The Umari Book
Status: Draft.
This book is the official guide to building event-sourced systems with Umari. It covers concepts, patterns, API reference, and operations — everything you need to go from zero to production.
Who this book is for
You should be comfortable with Rust. Prior event sourcing experience helps but is not required — the first part explains the concepts from scratch.
How to read this book
- Part 1 — Concepts: Read straight through. These chapters establish the mental model.
- Part 2 — Building Blocks: Read straight through. You’ll use every concept here.
- Part 3 — Module Types: Reference each chapter as you build that module type.
- Part 4 — Working with Umari: Practical patterns, project structure, and API reference.
- Part 5 — Runtime & Operations: Read when deploying or debugging.
Code examples follow current API conventions.
Conventions
- Crate names:
umarirefers to the SDK crate (crates/umari).umari-runtimerefers to the runtime. Module crates use kebab-case names.
1. Introduction
Umari is a WASM-native event sourcing runtime. You write business logic in Rust, compile it to WebAssembly, and the runtime handles event persistence, module lifecycle, and state derivation.
What is event sourcing?
In event sourcing, every state change is recorded as an immutable event in an append-only log. The current state of the system is derived by replaying events from the beginning. This gives you:
- Full audit trail — every change is recorded, forever
- Temporal queries — ask “what was the state at time T?” by replaying up to that point
- Complete replayability — delete all derived state and rebuild it from events
- Causal traceability — every event knows which action produced it and which event triggered that action
Umari adds two key innovations on top of classical event sourcing:
- No aggregates, no streams — consistency boundaries are dynamic (DCB), not pre-partitioned
- Everything is WASM — business logic is compiled to WebAssembly and loaded at runtime
Three module types
Umari splits business logic into three distinct concerns, each compiled as a separate WASM module:
| Module | Job | Writes events? | SQLite? |
|---|---|---|---|
| Command | Validate input, check invariants, emit events | Yes — the only writer | No |
| Projector | Build queryable read models from the event stream | No | Yes |
| Effect | React to events with side effects (HTTP, email, third-party APIs) | Only by calling commands | Yes |
Commands are the only mechanism for writing events. Projectors and effects subscribe to events and react. Effects can call commands, which write events, which trigger more projectors and effects — forming a causal chain.
How it fits together
External trigger (HTTP, webhook, cron)
│
▼
Command ──► emits events ──► Event Store (UmaDB)
│
┌─────────────────┴─────────────────┐
▼ ▼
Projector Effect
(builds read model (side effects:
in SQLite) HTTP, execute commands)
│
▼
Command
(private, for
idempotency)
Prerequisites
- Rust — you write modules in Rust using the
umariSDK crate - UmaDB — the event store. Must be running before starting the Umari server
- wasm32-wasip2 target —
rustup target add wasm32-wasip2
What you’ll build
A typical Umari application looks like this:
my-project/
├── src/ # Shared library: events, folds
│ ├── events/
│ │ ├── shop.rs
│ │ ├── warranty.rs
│ │ └── claim.rs
│ └── folds/
│ └── mod.rs
├── commands/
│ ├── connect-shop/ # Crate: connect-shop
│ ├── create-warranty-plan/
│ └── cancel-warranty/
├── projectors/
│ ├── plans/ # Crate: plans
│ ├── shops/
│ └── warranties/
├── effects/
│ ├── register-shopify-webhooks/
│ └── record-warranty-sale/
└── Cargo.toml # Workspace root
Each command, projector, and effect is its own crate, compiled as a WASM component. They all depend on the shared library for event and fold definitions.
About this book
This book is the canonical reference for building on Umari. It walks through the runtime, the SDK, and the patterns you’ll use to write commands, projectors, and effects.
2. Core Concepts
This chapter establishes the foundational concepts that Umari is built on. Understanding these will make the rest of the book straightforward.
Events as immutable facts
An event represents something that has already happened. It is named in past tense and carries all the data needed to describe what occurred. Once written, an event is never modified or deleted — it is a permanent fact in the system’s history.
#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("shop.connected")]
pub struct ShopConnected {
#[domain_id]
pub shop_id: u64,
pub shop_domain: String,
pub shop_name: String,
pub access_token: String,
}
}
Every event carries metadata in the event envelope:
| Field | Purpose |
|---|---|
id | Unique UUID for this event |
position | Global position in the event log (monotonic) |
event_type | String identifier ("shop.connected") |
tags | Domain ID key-value pairs used to query events by domain ID (["shop_id:42"]) |
timestamp | When the event was written |
correlation_id | Traces back to the originating user action |
causation_id | The specific command execution that produced this event |
triggering_event_id | The event that caused an effect to trigger this command |
No aggregates — Dynamic Consistency Boundaries
Traditional event sourcing uses aggregates: each entity has its own event stream, and concurrency is managed by comparing stream positions. Umari does not use aggregates.
Instead, Umari uses Dynamic Consistency Boundaries (DCB). When a command runs, it declares exactly which events it needs — specified by event types and domain ID tags. The runtime fetches those events and uses their positions to form a consistency boundary at execution time.
This means:
- Different commands touching different domain IDs form different boundaries
- Two commands can run concurrently as long as their event sets don’t overlap
- There is no pre-partitioning of the event log into streams
- Consistency is dynamic, not pre-defined
Domain IDs
Domain IDs are the mechanism that makes DCB work. They are fields on events that identify what the event is “about.” When a command queries events, it specifies domain ID values, and the event store returns only events tagged with those values.
#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.sold")]
pub struct WarrantySold {
#[domain_id]
pub shop_id: u64, // tag: shop_id:42
#[domain_id]
pub warranty_id: Uuid, // tag: warranty_id:abc-def
#[domain_id]
pub order_id: u64, // tag: order_id:1001
pub plan_title: String, // not a domain ID — just data
}
}
Events are stored in a single global log. Domain ID tags enable the runtime to efficiently fetch only the relevant subset.
State is derived, not stored
There is no “current state” table. Everything is derived from events:
| Module | How it derives state |
|---|---|
| Command | Folds — replay just the events this domain ID touches, in memory, on every call |
| Projector | handle() updates SQLite as events arrive. The SQLite file is a rebuildable cache, not the source of truth |
| Effect | Tracks internal work in SQLite, but checks the event store (via folds) to decide whether a side effect has already happened |
The contract: delete every SQLite file, replay events from position 0, end up in the same state — and side effects that already ran don’t run again.
Commands are the only writers
Every event in the store comes from a command. Projectors never write events; effects don’t either — when an effect needs to write, it calls a command function inline. A command is just a regular Rust function exported with #[export_command], so any module can import and call one directly.
That single rule keeps every write going through validation and invariant checks, and gives every event a clear causal chain.
Causal chain
Every event traces back to the user action that initiated it:
User HTTP request
└── Command "create-warranty-plan"
└── Event "warranty.plan.created" (correlation_id = req_id)
└── Effect "sync-warranty-plan-product-variations"
└── Command "create-master-product" (triggering_event_id = above)
└── Event "shop.master_product.created"
The correlation_id flows through the entire chain. The triggering_event_id links each downstream command to the specific event that caused it.
Key principles
- Events are immutable facts — never modified, only appended
- Commands are the only writers — all events originate from command execution
- No aggregates or streams — DCB forms consistency boundaries dynamically
- State is derived by replay — folds for commands, SQLite for projectors
- The system is fully replayable — delete SQLite, replay events, identical result
3. Architecture Overview
This chapter describes the Umari runtime architecture — how the pieces connect, how modules are loaded, and how events flow through the system.
Crate map
Umari is a Cargo workspace with these crates:
| Crate | Role |
|---|---|
umari (crates/umari) | SDK — traits, types, macros, and the WASM guest library |
umari-macros (crates/macros) | Derive macros: Event, EventSet, DomainIds, FromDomainIds, #[export_command] |
umari-runtime (crates/runtime) | Runtime — Wasmtime-based module runner, event dispatch, actor system |
umari-api (crates/api) | HTTP API server (Axum) — upload modules, execute commands, manage lifecycle |
umari-server (crates/server) | Server binary — starts the runtime + API + optional Web UI |
umari-cli (crates/cli) | CLI tool — upload modules, execute commands, manage the system |
umari-ui (crates/ui) | Web UI built with HTMX |
umari-types (crates/types) | Shared API types |
Runtime architecture
The runtime is built on the kameo actor framework with Wasmtime as the WASM engine.
RuntimeSupervisor
├── PubSub<ModuleEvent> # Module lifecycle events (upload, activate, deactivate)
├── ModuleStoreActor # SQLite store for WASM bytes, metadata, crypto keys
├── CommandActor # Handles command execution requests
│ └── subscribes to module_pubsub for command modules
├── ModuleSupervisor<ProjectorWorld>
│ ├── ModuleActor (plans) # One per active projector
│ └── ModuleActor (shops)
└── ModuleSupervisor<EffectWorld>
├── ModuleActor (register-shopify-webhooks)
│ ├── Worker (global) # Pool of workers for parallel processing
│ ├── Worker (keyed, 0) # 8 keyed workers by default
│ └── Worker (keyed, 7)
└── ModuleActor (record-warranty-sale)
└── Worker (global) # Workers run on dedicated OS threads
ModuleSupervisor
Each module type (projector, effect) has a ModuleSupervisor that:
- Subscribes to
ModuleEventvia the pubsub - On
ModuleActivated: compiles the WASM, spawns aModuleActor - On
ModuleDeactivated: stops theModuleActor - On
ModuleUpdated(new version uploaded): spawns new actor, waits for old to stop, swaps
ModuleActor
Each active module has one ModuleActor that:
- Opens a subscription to the event store via DCB
- Receives event batches from the event stream
- For projectors: processes events sequentially inline
- For effects: dispatches events to a worker pool based on
partition_key() - Maintains a
last_positionwatermark in SQLite for crash recovery
Worker pool (effects only)
Effects use a worker pool for parallel processing. The pool consists of:
- One global worker — handles events with no partition key or
PartitionKey::Inline - N keyed workers (default: 8) — each handles a hash-consistent subset of partition keys
The partition_key() method on an effect determines routing. Returning None routes to the global worker (sequential for that effect). Returning Some(key) routes to hash(key) % 8, enabling parallel processing of independent event streams.
Workers acknowledge completion back to the ModuleActor, which tracks the watermark — the highest contiguous position acknowledged. The watermark prevents data loss: if a worker crashes, events above the watermark are replayed.
CommandActor
Commands are different from projectors/effects — they don’t subscribe to event streams. Instead, the CommandActor handles on-demand execution:
- A client (HTTP API, effect, or direct call) sends an execution request
- The CommandActor compiles (or retrieves from cache) the command’s WASM
- The command runs: queries events via DCB, applies folds, executes user logic, emits events
- Events are appended to the event store atomically
WASM runtime
Each module runs as a WASM component using the component model. The WIT interfaces define the contract between the guest (your Rust code) and the host (the Umari runtime):
| WIT interface | Provided to | Purpose |
|---|---|---|
command/transaction | Commands, Effects | Read events, commit new events |
common | All modules | Event types, event query types |
sqlite | Projectors, Effects | SQLite database access |
crypto | Effects | Delete encryption keys (crypto-shredding) |
wasi:http | Effects | Make HTTP requests |
Commands have a simpler interface — they only need the transaction interface. Effects have the richest interface, including HTTP and the SQLite database for their own internal state.
Note: An older
command/executorinterface still exists in the WIT package but is unused — effects today execute commands by calling the command function directly (see Chapter 9: Effects).
Event flow
1. External trigger (HTTP POST /execute)
2. CommandActor instantiates the already-compiled command component
(all active modules are compiled once at runtime startup and cached)
3. Command executes:
a. Builds DCB query from fold domain IDs
b. Opens transaction, reads events in batches
c. Applies events to folds, checks idempotency
d. Calls user's execute closure with fold state
e. User closure emits events (or returns empty emit)
f. Events appended to event store atomically
4. Event store notifies subscribers
5. Projector ModuleActor receives event batch
6. Projector handles each event, updates SQLite
7. Effect ModuleActor receives event batch
8. Effect dispatches to worker by partition key
9. Worker handles event, may execute commands or make HTTP calls
Input validation (e.g. the validator crate) is a guest-side convention used by the Rust SDK — it isn’t part of the runtime contract. Other SDKs may handle validation differently or not at all.
Data directory layout
umari-data/
├── umari.sqlite # Module store: WASM bytes, metadata, crypto keys
├── projector/
│ ├── plans.sqlite # Per-projector read model
│ ├── plans.log # Captured stdout/stderr from the projector
│ ├── shops.sqlite
│ └── warranties.sqlite
├── effect/
│ ├── register-shopify-webhooks.sqlite # Per-effect internal state
│ ├── register-shopify-webhooks.log # Captured stdout/stderr from the effect
│ └── record-warranty-sale.sqlite
└── cache/
└── *.cwasm # Compiled WASM component cache
Anything a module writes to stdout or stderr (e.g. println!, eprintln!, tracing logs configured to write to stderr) is captured by the runtime, buffered in memory for inspection via the API/UI, and persisted to the per-module .log file shown above. Commands don’t run as long-lived actors and don’t have their own per-module directory.
Module store
The module store (umari.sqlite) is a SQLite database that holds:
- WASM bytecode — raw
.wasmbytes for each module version - Module metadata — name, version, module type, active status
- Environment variables — key-value pairs injected as WASI env vars
- Crypto keys — AES-256 keys per encryption scope
When a module is uploaded, the bytes are stored. When activated, the WASM is compiled (cached to cache/*.cwasm) and a ModuleActor is spawned. Multiple versions can coexist — only one is “active” at a time.
4. Events
Events are the immutable facts of your system. This chapter covers how to define them, what data they carry, and how they flow through Umari.
Defining an event
An event is a Rust struct that derives Event, DomainIds, Serialize, and Deserialize:
#![allow(unused)]
fn main() {
use umari::prelude::*;
use serde::{Serialize, Deserialize};
use uuid::Uuid;
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.plan.created")]
pub struct WarrantyPlanCreated {
#[domain_id]
pub plan_id: Uuid,
#[domain_id]
pub shop_id: u64,
pub title: String,
pub duration_months: u32,
pub price: Decimal,
pub applicable_to: ProductApplicability,
}
}
Event and DomainIds are independent derives — Event does not include DomainIds, so you need to add both. Clone and Debug are optional; add them if you want them on a particular event.
The #[event_type("warranty.plan.created")] attribute sets the EVENT_TYPE constant. This string is what the event store uses to identify the event, what projectors and effects filter on, and what appears in the event_type field of StoredEvent. The attribute is optional — if omitted, EVENT_TYPE defaults to the struct name.
Naming convention: A common convention is dot-separated past-tense verb phrases like shop.connected, warranty.plan.created, order.paid — the first segment is the domain entity and the second is the action. Umari doesn’t enforce this; use whatever naming scheme fits your project.
Domain IDs
Fields annotated with #[domain_id] become tags on the stored event. Commands query for events by these tags via DCB, and event sets (the queries used by folds, projectors, and effects) use them to declare what they want to see. Choose domain IDs carefully:
- What entity is this event “about”? That’s a domain ID.
- Is this just reference data? Not a domain ID.
#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.sold")]
pub struct WarrantySold {
#[domain_id] pub shop_id: u64, // ✓ — identifies the shop
#[domain_id] pub warranty_id: Uuid, // ✓ — identifies the warranty
#[domain_id] pub order_id: u64, // ✓ — identifies the order
#[domain_id] pub line_item_id: u64, // ✓ — identifies the order line
pub plan_title: String, // ✗ — just data
pub customer_email: String, // ✗ — just data
pub price: Decimal, // ✗ — just data
}
}
Each domain ID becomes a tag like shop_id:42. When a command queries for events with shop_id=42, the event store returns only events carrying that tag.
Renaming domain IDs
You can override the tag name — useful when the Rust field name differs from the domain concept:
#![allow(unused)]
fn main() {
#[domain_id("plan_id")]
pub warranty_plan_id: Uuid,
}
This produces a tag like plan_id:abc-def rather than warranty_plan_id:abc-def.
The DomainIds derive
#[derive(DomainIds)] generates the domain_ids() method, which collects all #[domain_id] fields into a DomainIdBindings map used for querying. It’s a separate derive from Event — events need both, and so do other structs that participate in DCB queries (see Chapter 5: Domain IDs).
Encryption scopes
Events that contain sensitive data (PII, access tokens, etc.) can be encrypted at rest. To enable encryption, mark a single field on the event with #[crypto_scope]:
#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("shop.connected")]
pub struct ShopConnected {
#[domain_id]
#[crypto_scope]
pub shop_id: u64,
pub shop_domain: String,
pub access_token: String,
}
}
The field marked #[crypto_scope] does not mean “encrypt this field.” Instead, its value (combined with the field name as field_name:value, e.g. shop_id:42) becomes a lookup key for an encryption key, and the runtime then encrypts the entire event payload with that key. The event envelope (id, position, tags, timestamps, etc.) is never encrypted.
- No
#[crypto_scope]on any field → the event is stored in plaintext. - One field with
#[crypto_scope]→ the whole payload is encrypted under a per-scope AES-256-GCM key. Each unique scope value (e.g. eachshop_id) gets its own key.
Encryption is transparent:
- Writing: the runtime serializes the event, fetches/creates the key for the scope, and encrypts the payload before appending.
- Reading: the runtime decrypts the payload before passing it to folds, projectors, and effects.
- Key deletion: deleting a scope’s key (crypto-shredding) makes all events for that scope permanently unreadable — they appear as
Value::Nulland are skipped.
See Chapter 14: Encryption & Crypto-Shredding for details.
The Event trait
The Event derive macro generates an implementation of:
#![allow(unused)]
fn main() {
pub trait Event: DomainIds + Serialize + DeserializeOwned + Sized {
const EVENT_TYPE: &'static str;
fn encryption_scope(&self) -> Option<String> {
None // Overridden if any field has #[crypto_scope]
}
}
}
You rarely need to know this — the derive macro handles everything. But it’s useful to understand when debugging.
The StoredEvent envelope
When you receive an event in a fold, projector, or effect, it arrives as StoredEvent<T>:
#![allow(unused)]
fn main() {
pub struct StoredEvent<T> {
pub id: Uuid,
pub position: u64,
pub event_type: String,
pub tags: Vec<String>, // ["shop_id:42", "plan_id:abc"]
pub timestamp: DateTime<Utc>,
pub correlation_id: Uuid,
pub causation_id: Uuid,
pub triggering_event_id: Option<Uuid>,
pub idempotency_key: Option<Uuid>,
pub encryption_scope: Option<String>,
pub encryption_key_id: Option<Uuid>,
pub data: T, // Your typed event data
}
}
The data field contains your deserialized event struct. The envelope fields carry tracing and routing metadata.
Event sets
An event set is an enum that groups multiple event types together. It’s used by folds, projectors, and effects to declare which events they care about.
#![allow(unused)]
fn main() {
#[derive(EventSet)]
enum Query {
ShopConnected(ShopConnected),
ShopReconnected(ShopReconnected),
}
}
The #[derive(EventSet)] macro generates the EventSet trait implementation:
#![allow(unused)]
fn main() {
pub trait EventSet: Sized {
type Item;
fn event_types() -> Vec<&'static str>; // ["shop.connected", "shop.reconnected"]
fn event_domain_ids() -> Vec<EventDomainId>; // Domain ID requirements per event type
fn from_event(event_type: &str, data: &Value)
-> Option<Result<Self::Item, SerializationError>>;
}
}
Single events as event sets
For folds that only care about one event type, use SingleEvent<E>:
#![allow(unused)]
fn main() {
impl Fold for ShopExistsFold {
type Events = SingleEvent<ShopConnected>;
type State = bool;
fn apply(&self, exists: &mut bool, event: StoredEvent<ShopConnected>) {
*exists = true;
}
}
}
SingleEvent<E> is a built-in event set for a single event type. It’s the simplest way to subscribe to one event.
Scoping with #[scope(...)]
The #[scope(...)] attribute on an EventSet variant controls which domain ID tags are used to filter that event type:
#![allow(unused)]
fn main() {
#[derive(EventSet)]
pub enum WidgetFoldEvents {
// Only receive WidgetCreated events for this specific widget_id
#[scope(widget_id)]
WidgetCreated(WidgetCreated),
// Receive WidgetArchived events — uses all domain ID bindings from the fold
WidgetArchived(WidgetArchived),
// Always match events where shop_id = "acme", regardless of fold bindings
#[scope(shop_id = "acme")]
GlobalSettingsChanged(GlobalSettingsChanged),
}
}
Three forms:
- No attribute: The variant is filtered using all domain ID bindings from the fold/command input
#[scope(field_name)]: Filter only by the named domain ID — restricts scope to fewer IDs#[scope(field_name = "literal")]: Hardcoded tag value — matches events with that fixed value
Scoping matters. A fold that checks whether a widget name is unique within a shop should be scoped by
shop_idonly. Without#[scope(shop_id)], the fold would also filter bywidget_idand only see events for that specific widget — missing other widgets in the shop.
For projectors and effects (which have no fold bindings), only the hardcoded form is meaningful:
#![allow(unused)]
fn main() {
#[derive(EventSet)]
enum Query {
WidgetCreated(WidgetCreated),
// Only webhook events for the "orders/paid" topic
#[scope(topic = "orders/paid")]
WebhookReceived(WebhookReceived),
}
}
Naming conventions
- Event struct:
PascalCasepast-tense verb phrase:WidgetCreated,ShopConnected,WarrantyClaimFiled - Event type string:
object.verbdot notation:"widget.created","shop.connected" - Event set enum: Always named
Query
5. Domain IDs
Domain IDs are the indexing and routing mechanism in Umari. They determine which events a command reads, which events a projector receives, and how effects partition work.
What domain IDs are
A domain ID is a field on an event that identifies the entity the event is about. Each domain ID becomes a tag in the format field_name:value stored alongside the event.
#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.sold")]
pub struct WarrantySold {
#[domain_id] pub shop_id: u64, // tag: shop_id:42
#[domain_id] pub warranty_id: Uuid, // tag: warranty_id:abc-def-123
#[domain_id] pub order_id: u64, // tag: order_id:1001
}
}
These tags are used by the event store (UmaDB) for DCB queries. When a command requests events with shop_id=42, only events tagged shop_id:42 are returned.
Choosing domain IDs
Ask yourself: “If this field changes, does it identify a different entity’s consistency boundary?”
shop_idonWarrantySold— yes, the warranty belongs to a specific shop. Domain ID.customer_emailonWarrantySold— no, it’s just data about the warranty. Not a domain ID.line_item_idonWarrantySold— yes, a single line item can only be sold once. Adding it as a domain ID lets a fold ask “has this line ever been sold?” and reject duplicates. Domain ID.
If you’re unsure, err on the side of fewer domain IDs. Adding one later is backwards-compatible — existing events just won’t have the new tag. Removing one is not — you’d have to backfill every existing event.
The DomainIds trait
The #[derive(DomainIds)] macro generates an implementation of:
#![allow(unused)]
fn main() {
pub trait DomainIds {
const DOMAIN_ID_FIELDS: &'static [&'static str];
fn domain_ids(&self) -> DomainIdBindings;
}
}
DomainIdBindings is IndexMap<&'static str, String> — a map from field name to string value. This is what the runtime uses to construct DCB queries.
Important: #[derive(DomainIds)] is separate from #[derive(Event)]. The Event derive does not include DomainIds. You need DomainIds on:
- Events — so their tags can be written and queried
- Command input structs — so the command’s domain ID bindings can be derived from the input
- Fold structs — so folds can declare which bindings they need
The FromDomainIds trait
Fold structs implement FromDomainIds to be constructed from command input bindings:
#![allow(unused)]
fn main() {
#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
#[domain_id]
pub shop_id: u64,
}
}
FromDomainIds generates a constructor that takes domain ID bindings and creates the fold struct. Only fields matching the fold’s #[domain_id] fields are copied from the bindings. This is how a command that declares shop_id=42 and plan_id=abc on its input automatically passes just shop_id=42 to the ShopExistsFold.
Where it’s used: FromDomainIds is only used by the Command builder’s .fold::<T>() and .fold_args::<T>(args) methods — those are the call sites that take a fold type by name and construct it from the command’s input bindings. If you always reach for .fold_with(|input| ...) instead, you don’t strictly need FromDomainIds. It may be removed in the future, but for now derive it on your fold structs so they’re usable with .fold::<T>().
The generic EventFold<E>, LatestEvent<E>, EventCounter<E>, and EventToggle<A,B> all implement FromDomainIds automatically — they filter bindings to only the fields that the event type E declares as domain IDs.
How DCB queries are built
When a command has registered folds, the runtime builds a DCB query by:
- Collecting all
EventDomainIdentries from every fold’sEventSet - For each entry, looking up the dynamic field values from the input’s domain ID bindings
- Grouping by tag sets — events that share the same tag combination are requested together
For example, a command with input { shop_id: 42, plan_id: abc } and two folds:
ShopExistsFold: reads SingleEvent<ShopConnected>, dynamic_fields: [shop_id]
→ DCB item: type="shop.connected", tags=["shop_id:42"]
PlanExistsFold: reads SingleEvent<WarrantyPlanCreated>, dynamic_fields: [shop_id, plan_id]
→ DCB item: type="warranty.plan.created", tags=["shop_id:42", "plan_id:abc"]
The event store returns events matching either query, deduplicated and in position order.
Scoping
The #[scope(...)] attribute on EventSet variants controls how an event type is filtered against the surrounding bindings (for folds) or the global event log (for projectors and effects). It’s the main knob for narrowing — or broadening — what a query sees.
See Chapter 4: Events → Scoping with #[scope(...)] for the full description and examples.
6. Folds
A fold is a “reduce” over the event log. You declare which events to read and how each one updates a piece of in-memory state. Commands replay folds on every call to recover whatever state they need to make a decision — they have no SQLite, no other query path.
If you’ve used Iterator::fold, the mental model is identical:
#![allow(unused)]
fn main() {
// Iterator::fold
events.iter().fold(State::default(), |state, event| apply(state, event));
// Umari Fold trait
fn apply(&self, state: &mut Self::State, event: StoredEvent<E>) { ... }
}
The runtime supplies the events (scoped by the fold’s domain IDs) and the initial state (State::default()). You only write the body.
The Fold trait
#![allow(unused)]
fn main() {
pub trait Fold: DomainIds + 'static {
type Events: EventSet;
type State: Default + 'static;
fn apply(&self, state: &mut Self::State, event: StoredEvent<<Self::Events as EventSet>::Item>);
}
}
Events— which events this fold subscribes to (anEventSet)State— the type of state produced by replaying those events (must implementDefault)apply()— called once per matching event, in position order, to update the state
Simple fold
#![allow(unused)]
fn main() {
use umari::prelude::*;
#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
#[domain_id]
pub shop_id: u64,
}
impl Fold for ShopExistsFold {
type Events = SingleEvent<ShopConnected>;
type State = bool;
fn apply(&self, exists: &mut bool, _event: StoredEvent<ShopConnected>) {
*exists = true;
}
}
}
This fold subscribes to ShopConnected events scoped by shop_id. The state starts as false (the Default for bool). When a ShopConnected event is encountered during replay, the state becomes true. The command can then check if !exists { ... }.
Fold with an EventSet enum
For folds that need multiple event types:
#![allow(unused)]
fn main() {
#[derive(EventSet)]
pub enum ShopDomainQuery {
ShopConnected(ShopConnected),
ShopReconnected(ShopReconnected),
}
#[derive(DomainIds, FromDomainIds)]
pub struct ShopDomainFold {
#[domain_id]
pub shop_id: u64,
}
impl Fold for ShopDomainFold {
type Events = ShopDomainQuery;
type State = Option<String>;
fn apply(&self, domain: &mut Option<String>, event: StoredEvent<ShopDomainQuery>) {
match event.data {
ShopDomainQuery::ShopConnected(ev) => *domain = Some(ev.shop_domain),
ShopDomainQuery::ShopReconnected(ev) => *domain = Some(ev.shop_domain),
}
}
}
}
The apply method receives the deserialized event and decides how to update state. Events arrive in position order — the state after all events have been applied is what’s passed to the command’s execute closure.
Built-in fold types
Umari provides several generic folds for common patterns:
EventFold
Collects ALL occurrences of event E into a Vec. Use when you need the full history.
#![allow(unused)]
fn main() {
let connected = cmd.fold::<EventFold<ShopConnected>>();
// State: EventState<ShopConnected> { events: Vec<StoredEvent<ShopConnected>> }
// connected.exists() → true if at least one event exists
}
LatestEvent
Keeps only the most recent occurrence of event E. More efficient than EventFold when you only need the current value.
#![allow(unused)]
fn main() {
let latest_connected = cmd.fold::<LatestEvent<ShopConnected>>();
// State: Option<StoredEvent<ShopConnected>>
}
EventCounter
Counts occurrences of event E. Efficient — doesn’t store events.
#![allow(unused)]
fn main() {
let sale_count = cmd.fold::<EventCounter<WarrantySold>>();
// State: u64
}
EventToggle
Tracks which of two opposing events occurred last. Ideal for created/deleted, activated/deactivated, archived/unarchived pairs.
#![allow(unused)]
fn main() {
let toggle = cmd.fold::<EventToggle<WarrantyPlanArchived, WarrantyPlanUnarchived>>();
// State: ToggleState<A, B> {
// last: Option<ToggleSide<A, B>> // None, Some(ToggleSide::A(...)), or Some(ToggleSide::B(...))
// }
}
Custom folds
For anything beyond the built-in types, implement Fold directly:
#![allow(unused)]
fn main() {
#[derive(Default)]
pub struct WarrantyPlanState {
pub exists: bool,
pub title: Option<String>,
pub status: PlanStatus,
pub archived: bool,
}
#[derive(EventSet)]
pub enum WarrantyPlanEvents {
#[scope(plan_id)]
WarrantyPlanCreated(WarrantyPlanCreated),
#[scope(plan_id)]
WarrantyPlanUpdated(WarrantyPlanUpdated),
#[scope(plan_id)]
WarrantyPlanArchived(WarrantyPlanArchived),
#[scope(plan_id)]
WarrantyPlanUnarchived(WarrantyPlanUnarchived),
}
#[derive(DomainIds, FromDomainIds)]
pub struct WarrantyPlanFold {
#[domain_id]
pub plan_id: Uuid,
}
impl Fold for WarrantyPlanFold {
type Events = WarrantyPlanEvents;
type State = WarrantyPlanState;
fn apply(&self, state: &mut WarrantyPlanState, event: StoredEvent<WarrantyPlanEvents>) {
match event.data {
WarrantyPlanEvents::WarrantyPlanCreated(ev) => {
state.exists = true;
state.title = Some(ev.title);
state.status = ev.status;
}
WarrantyPlanEvents::WarrantyPlanUpdated(ev) => {
state.title = Some(ev.title);
state.status = ev.status;
}
WarrantyPlanEvents::WarrantyPlanArchived(_) => state.archived = true,
WarrantyPlanEvents::WarrantyPlanUnarchived(_) => state.archived = false,
}
}
}
}
Note that #[scope(plan_id)] ensures we only see events for the specific plan being queried. Without the scope attribute, the fold would filter by all domain ID bindings from the command input — which may be too narrow.
How folds are registered in commands
Commands register folds through the builder pattern:
#![allow(unused)]
fn main() {
Command::new(input, context)
.fold::<ShopExistsFold>() // No extra args
.fold::<WarrantyPlanFold>() // No extra args
.fold_args::<CustomFold>(args) // With additional constructor args
.fold_with(|input| MyFold { ... }) // Manual construction from input
.execute(|input, (shop_exists, plan_state)| {
// ...
})
}
Each .fold::<T>() call:
- Extends the DCB query with the fold’s event domain IDs
- Creates the fold from the input’s domain ID bindings (via
FromDomainIds) - Initializes the fold’s state to
Default - Returns a
FoldHandle<T>— a typed token for extracting state in the execute closure
The execute closure receives the input and a tuple of fold states, in the order they were registered. Up to 12 folds are supported in a single tuple.
Fold state and idempotency
When the runtime replays events into folds, it also checks for idempotency. If the command was called with an idempotency_key, and any event in the fold’s scope has a matching idempotency_key, the command exits early without calling the execute closure — returning an empty result.
This means you can safely retry command executions without worrying about duplicate events. The runtime handles deduplication at the event store level.
Crypto-shredded events in folds
When an event’s encryption key has been deleted, the event data is Value::Null and encryption_scope is Some. Folds skip these events silently — from_event returns None for null data, so apply is never called. Your fold state simply won’t reflect the shredded event, which is the intended behavior.
FoldQuery — standalone fold execution
Outside of commands, you can run folds standalone with FoldQuery. This is useful in effects for checking event store state before performing side effects:
#![allow(unused)]
fn main() {
use umari::prelude::FoldQuery;
let topics_registered = FoldQuery::new()
.fold_iter(topics.iter().map(|topic| AlreadyRegisteredFold {
shop_id,
topic: topic.to_string(),
current_event_id: event.id,
}))
.run()?;
}
FoldQuery opens a transaction, reads events, applies them to the registered folds, and returns the fold states. It’s the same mechanism commands use internally, exposed for effects that need to check event store state without executing a full command.
7. Commands
Commands are the entry point for all mutations. They are pure, deterministic functions that validate input, check invariants against event history, and emit new events. Commands are the only mechanism for writing to the event store.
Anatomy of a command
A command is a function annotated with #[export_command]. It receives typed input and a CommandContext, then uses the Command builder to declare folds and execute logic.
#![allow(unused)]
fn main() {
use umari::prelude::*;
use schemars::JsonSchema;
use serde::{Serialize, Deserialize};
use validator::Validate;
#[derive(DomainIds, Validate, JsonSchema, Serialize, Deserialize)]
pub struct Input {
#[domain_id]
pub shop_id: u64,
#[domain_id]
pub plan_id: Uuid,
#[validate(length(min = 1, max = 200))]
pub title: String,
#[validate(range(min = 1, max = 120))]
pub duration_months: u32,
pub price: Decimal,
}
#[export_command]
pub fn execute(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
// 1. Validate input
input.validate()?;
// 2. Build command with folds, execute
Command::new(input, context)
.fold::<ShopExistsFold>()
.fold::<WarrantyPlanFold>()
.execute(|input, (shop_exists, plan_state)| {
// 3. Check invariants
anyhow::ensure!(shop_exists, "shop does not exist");
anyhow::ensure!(!plan_state.exists, "plan already exists with this ID");
anyhow::ensure!(
!plan_state.archived,
"a plan with this ID was previously archived"
);
// 4. Emit events
Ok(emit![WarrantyPlanCreated {
plan_id: input.plan_id,
shop_id: input.shop_id,
title: input.title,
duration_months: input.duration_months,
price: input.price,
applicable_to: input.applicable_to,
status: PlanStatus::Draft,
}])
})
}
}
The Command builder
Command::new(input, context)
Creates a new command builder. The input must implement DomainIds (derive it). The context is a CommandContext.
.fold::<T>()
Registers a fold. T must implement Fold + FromDomainIds<Args = ()>. The fold is constructed from the input’s domain ID bindings automatically. Returns a Command with the fold’s handle appended to the fold state tuple.
.fold_args::<T>(args)
Same as .fold::<T>() but passes additional constructor arguments to T::from_domain_ids(args, bindings).
.fold_with(|input| MyFold { ... })
Manually construct a fold from the raw input. Use this when the fold has custom construction logic beyond domain ID binding.
.execute(|input, fold_states| { ... })
Runs the command. The closure receives the input (by value) and the fold states as a tuple. Inside the closure, you:
- Check invariants against fold state (use
anyhow::ensure!orbail!) - Decide which events to emit (use the
emit!macro) - Return
Ok(emit![...])orOk(emit![])for a no-op
The emit! macro
#![allow(unused)]
fn main() {
emit![] // No events
emit![SomeEvent { field: value }] // Single event
emit![EventA { .. }, EventB { .. }] // Multiple events
}
Each event expression must be a struct implementing Event. The macro serializes each event, collects its domain IDs, and returns an Emit value.
You can also build an Emit manually:
#![allow(unused)]
fn main() {
Emit::new()
.event(FirstEvent { .. })
.event(SecondEvent { .. })
}
Command idempotency
Commands support built-in idempotency through the idempotency_key field in CommandContext. When present, the runtime checks whether any event in the fold scope already carries this key. If a match is found, the command exits early — the execute closure is never called, and no events are emitted.
#![allow(unused)]
fn main() {
let context = CommandContext::new()
.with_idempotency_key(Some(request_id));
}
This deduplication happens at the event store level, so it survives crashes and restarts. You can safely retry commands without producing duplicate events.
You can also implement domain-level idempotency inside the execute closure:
#![allow(unused)]
fn main() {
.execute(|input, plan_state| {
if plan_state.exists && plan_state.title.as_deref() == Some(&input.title) {
return Ok(emit![]); // Plan already exists with same data — idempotent
}
// ... emit WarrantyPlanCreated
})
}
CommandContext
#![allow(unused)]
fn main() {
pub struct CommandContext {
pub correlation_id: Uuid, // request that started the chain
pub causation_id: Uuid, // this specific execution
pub triggering_event_id: Option<Uuid>, // the event that called us, if any
pub idempotency_key: Option<Uuid>,
}
}
You almost never construct this by hand. Use CommandContext::new() and the right values are populated automatically:
| Where the command runs | What new() produces |
|---|---|
| HTTP / CLI entry point | fresh correlation_id, fresh causation_id, triggering_event_id = None |
Inside an effect’s handle() | inherits correlation_id and triggering_event_id from the effect’s current event; fresh causation_id |
The effect context lives in a thread-local (CURRENT_EVENT_CONTEXT) that the runtime sets before each handle() call. To override fields explicitly:
#![allow(unused)]
fn main() {
CommandContext::new()
.with_correlation_id(id)
.with_triggering_event_id(id)
.with_idempotency_key(key)
}
Private vs public commands
Commands fall into two categories by convention, not by type:
-
Public commands — part of the domain API. Called by external services, HTTP handlers, or scheduled jobs. These commands live in the
commands/directory and are uploaded to the runtime. -
Private commands — implementation details of effect idempotency. Only called from within effects. These are often defined as plain Rust functions (not
#[export_command]) inside the effect crate itself, or as separate modules within the effect.
#![allow(unused)]
fn main() {
// In effects/register-shopify-webhooks/src/commands.rs
use umari::prelude::*;
#[derive(DomainIds)]
pub struct RecordWebhookRegistrationCompletedInput {
#[domain_id] pub shop_id: u64,
#[domain_id] pub topic: String,
}
pub fn record_webhook_registration_completed(
input: RecordWebhookRegistrationCompletedInput,
context: CommandContext,
) -> anyhow::Result<ExecuteOutput> {
Command::new(input, context)
.fold::<ShopExistsFold>()
.execute(|input, shop_exists| {
anyhow::ensure!(shop_exists, "shop does not exist");
Ok(emit![ShopWebhookRegistrationCompleted {
shop_id: input.shop_id,
topic: input.topic,
}])
})
}
}
Private commands use the same Command::new(...).fold::<T>().execute(...) pattern — they just aren’t exported as WASM modules.
Validation
Use the validator crate for input validation:
#![allow(unused)]
fn main() {
#[derive(DomainIds, Validate, Serialize, Deserialize)]
pub struct Input {
#[validate(length(min = 1, max = 200))]
pub title: String,
#[validate(range(min = 1, max = 120))]
pub duration_months: u32,
}
#[export_command]
pub fn execute(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
input.validate()?; // Call this first
// ...
}
}
Custom validators:
#![allow(unused)]
fn main() {
fn non_nil_uuid(value: &Uuid) -> Result<(), validator::ValidationError> {
if value.is_nil() {
return Err(validator::ValidationError::new("uuid")
.with_message("must not be nil".into()));
}
Ok(())
}
#[derive(DomainIds, Validate, Serialize, Deserialize)]
pub struct Input {
#[validate(custom(function = "non_nil_uuid"))]
pub plan_id: Uuid,
}
}
ExecuteOutput
The return type of every command:
#![allow(unused)]
fn main() {
pub struct ExecuteOutput {
pub position: Option<u64>, // Event store position after commit
pub events: Vec<EmittedEvent>, // Events that were emitted
}
pub struct EmittedEvent {
pub id: Uuid,
pub event_type: String,
pub domain_ids: IndexMap<String, String>, // field_name → value
}
}
Effects use ExecuteOutput to ask “did this idempotency-guard command actually emit anything?”:
#![allow(unused)]
fn main() {
let receipt = ScheduleWebhookRegistration::execute(&input)?;
if !receipt.has_event::<ShopWebhooksRegistrationScheduled>() {
return Ok(()); // already scheduled — skip the side effect
}
}
has_event::<E>() checks whether any event of type E is present in receipt.events. If the command short-circuited (idempotency hit, or invariant failed silently), the receipt will be empty and the effect can bail out cleanly.
Complete command example
#![allow(unused)]
fn main() {
#[derive(DomainIds, Validate, JsonSchema, Serialize, Deserialize)]
pub struct Input {
#[domain_id]
pub shop_id: u64,
pub shop_domain: String,
pub shop_name: String,
pub access_token: String,
#[validate(length(min = 1))]
pub owner_email: String,
}
#[export_command]
pub fn execute(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
input.validate()?;
Command::new(input, context)
.fold::<EventFold<ShopConnected>>()
.execute(|input, connected| {
if connected.exists() {
Ok(emit![ShopReconnected {
shop_id: input.shop_id,
shop_domain: input.shop_domain,
shop_name: input.shop_name,
access_token: input.access_token,
owner_email: input.owner_email,
}])
} else {
Ok(emit![ShopConnected {
shop_id: input.shop_id,
shop_domain: input.shop_domain,
shop_name: input.shop_name,
access_token: input.access_token,
owner_email: input.owner_email,
}])
}
})
}
}
This command uses EventFold<ShopConnected> to determine whether the shop has been connected before. If yes, it emits ShopReconnected; if no, it emits ShopConnected. Both events carry the same data but have different semantics downstream.
8. Projectors
Projectors build read models by consuming events from the event store and updating SQLite databases. Their databases are designed to be queried by external processes — your HTTP API, dashboard, or reporting tools.
How projectors work
A projector:
- Calls
init()once at startup to create tables and prepare statements - Receives events from the event store in position order
- Calls
handle()for each event to update its SQLite database - Maintains a
last_positionwatermark for crash recovery
Projectors are naturally idempotent — deleting the SQLite database and replaying all events from the beginning produces the exact same result. There is no need for explicit idempotency logic.
The Projector trait
#![allow(unused)]
fn main() {
pub trait Projector: Sized {
type Query: EventSet;
fn init() -> anyhow::Result<Self>;
fn handle(&mut self, event: StoredEvent<<Self::Query as EventSet>::Item>)
-> anyhow::Result<()>;
}
}
A complete projector
#![allow(unused)]
fn main() {
use umari::prelude::*;
use rust_decimal::Decimal;
use std::str::FromStr;
export_projector!(Plans);
#[derive(EventSet)]
enum Query {
ShopConnected(ShopConnected),
ShopReconnected(ShopReconnected),
WarrantyPlanCreated(WarrantyPlanCreated),
WarrantyPlanUpdated(WarrantyPlanUpdated),
WarrantyPlanArchived(WarrantyPlanArchived),
WarrantyPlanUnarchived(WarrantyPlanUnarchived),
WarrantyPlanActivated(WarrantyPlanActivated),
WarrantyPlanDeactivated(WarrantyPlanDeactivated),
WarrantyPlanDeleted(WarrantyPlanDeleted),
WarrantyPlanVariantSynced(WarrantyPlanVariantSynced),
WarrantySold(WarrantySold),
}
struct Plans {}
impl Projector for Plans {
type Query = Query;
fn init() -> anyhow::Result<Self> {
execute_batch(
"
CREATE TABLE IF NOT EXISTS shops (
shop_id TEXT PRIMARY KEY,
access_token TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS plans (
plan_id TEXT PRIMARY KEY,
shop_id TEXT NOT NULL,
title TEXT,
duration_months INTEGER,
price TEXT NOT NULL,
applicable_to TEXT NOT NULL,
archived BOOLEAN NOT NULL DEFAULT FALSE,
total_sold INTEGER NOT NULL DEFAULT 0,
revenue TEXT NOT NULL DEFAULT '0.00',
status TEXT NOT NULL DEFAULT 'draft',
shopify_variant_id TEXT,
created_at TEXT NOT NULL
);
",
)?;
Ok(Plans {})
}
fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
match event.data {
Query::ShopConnected(ev) => {
execute(
"INSERT INTO shops (shop_id, access_token) VALUES (?1, ?2)",
params![ev.shop_id.to_string(), ev.access_token],
)?;
}
Query::ShopReconnected(ev) => {
execute(
"UPDATE shops SET access_token = ?2 WHERE shop_id = ?1",
params![ev.shop_id.to_string(), ev.access_token],
)?;
}
Query::WarrantyPlanCreated(WarrantyPlanCreated {
plan_id, shop_id, title, duration_months, price,
applicable_to, status,
}) => {
execute(
"INSERT INTO plans (plan_id, shop_id, title, duration_months, price, applicable_to, status, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![plan_id, shop_id.to_string(), title, duration_months,
price.to_string(), serde_json::to_string(&applicable_to)?,
match status { PlanStatus::Active => "active", PlanStatus::Draft => "draft" },
event.timestamp.to_rfc3339(),
],
)?;
}
Query::WarrantyPlanUpdated(WarrantyPlanUpdated {
plan_id, title, duration_months, price, applicable_to, status, ..
}) => {
execute(
"UPDATE plans SET title = ?2, duration_months = ?3, price = ?4, applicable_to = ?5, status = ?6 WHERE plan_id = ?1",
params![plan_id, title, duration_months, price.to_string(),
serde_json::to_string(&applicable_to)?,
match status { PlanStatus::Active => "active", PlanStatus::Draft => "draft" },
],
)?;
}
Query::WarrantyPlanArchived(WarrantyPlanArchived { plan_id, .. }) => {
execute("UPDATE plans SET archived = TRUE where plan_id = ?1", params![plan_id])?;
}
Query::WarrantyPlanUnarchived(WarrantyPlanUnarchived { plan_id, .. }) => {
execute("UPDATE plans SET archived = FALSE where plan_id = ?1", params![plan_id])?;
}
Query::WarrantyPlanActivated(WarrantyPlanActivated { plan_id, .. }) => {
execute("UPDATE plans SET status = 'active' WHERE plan_id = ?1", params![plan_id])?;
}
Query::WarrantyPlanDeactivated(WarrantyPlanDeactivated { plan_id, .. }) => {
execute("UPDATE plans SET status = 'draft' WHERE plan_id = ?1", params![plan_id])?;
}
Query::WarrantyPlanDeleted(WarrantyPlanDeleted { plan_id, .. }) => {
execute("DELETE FROM plans WHERE plan_id = ?1", params![plan_id])?;
}
Query::WarrantyPlanVariantSynced(WarrantyPlanVariantSynced { plan_id, variant_id, .. }) => {
execute(
"UPDATE plans SET shopify_variant_id = ?2 WHERE plan_id = ?1",
params![plan_id, variant_id.to_string()],
)?;
}
Query::WarrantySold(WarrantySold { plan_id, price, .. }) => {
let current_revenue: String = query_row(
"SELECT revenue FROM plans WHERE plan_id = ?1",
params![plan_id],
)
.map(|row| row.get(0))
.unwrap_or_else(|| "0.00".to_string());
let new_revenue = Decimal::from_str(¤t_revenue)? + price;
execute(
"UPDATE plans SET total_sold = total_sold + 1, revenue = ?2 WHERE plan_id = ?1",
params![plan_id, new_revenue.to_string()],
)?;
}
}
Ok(())
}
}
}
The export_projector! macro
#![allow(unused)]
fn main() {
export_projector!(Plans);
}
This macro generates the WASM component interface glue — the projector() constructor, handle() entry point, and query() for declaring the event subscription. Your struct only needs to implement Projector.
Design guidelines
One table per concept
Each projector typically manages one primary concept (plans, shops, warranties). If you find yourself managing unrelated tables in one projector, split them.
Denormalize for reads
Projector tables should be optimized for query patterns, not normalized like an OLTP database. Denormalize freely — the source of truth is the event store, not the projector’s SQLite.
Use CREATE TABLE IF NOT EXISTS
Always use IF NOT EXISTS in init(). Projectors may be replayed from scratch (empty database), and init() is called before replay begins.
Keep handle() fast
Each event handler should be a single SQL statement or a small, bounded operation. Avoid complex computation or anything that could fail non-deterministically. If a handler fails, the projector stops and the error is logged. The runtime will retry (the event store subscription is persistent). Projectors don’t have access to HTTP or other side effects — they’re confined to their SQLite database — so this guidance is mostly about keeping per-event work tight.
Use prepared statements
For SQL that runs on every event, build a Statement once in init() and reuse it:
#![allow(unused)]
fn main() {
struct Widgets {
insert: Statement,
archive: Statement,
}
impl Projector for Widgets {
type Query = WidgetEvents;
fn init() -> anyhow::Result<Self> {
execute_batch("CREATE TABLE IF NOT EXISTS widgets (...)")?;
Ok(Widgets {
insert: prepare("INSERT INTO widgets (id, name) VALUES (?1, ?2)"),
archive: prepare("UPDATE widgets SET archived = TRUE WHERE id = ?1"),
})
}
fn handle(&mut self, event: StoredEvent<WidgetEvents>) -> anyhow::Result<()> {
match event.data {
WidgetEvents::Created(ev) => { self.insert.execute(params![ev.id, ev.name])?; }
WidgetEvents::Archived(ev) => { self.archive.execute(params![ev.id])?; }
}
Ok(())
}
}
}
The statement is parsed once and reused across every event, avoiding the per-event compilation cost.
Replaying projectors
Projectors can be replayed at any time — the runtime will delete the projector’s SQLite database and reprocess all events from position 0. This is done via the API:
POST /projectors/{name}/replay
Or via the CLI:
umari projector replay plans
This is safe because projectors are naturally idempotent. Replaying is the standard way to fix schema changes or recover from corruption.
Scoping in projectors
Projectors have no fold bindings, so dynamic #[scope(field)] is meaningless. Only hardcoded scopes are useful:
#![allow(unused)]
fn main() {
#[derive(EventSet)]
enum Query {
WarrantyPlanCreated(WarrantyPlanCreated), // All plans, all shops
#[scope(topic = "orders/paid")] // Only this topic
WebhookReceived(WebhookReceived),
}
}
Without #[scope(...)], the projector receives every event of that type from the entire event log.
9. Effects
Effects react to events by performing side effects — HTTP requests, sending emails, or executing commands. They have their own SQLite database for internal state, but idempotency is anchored entirely in the event store.
The Effect trait
#![allow(unused)]
fn main() {
pub trait Effect: Sized {
type Query: EventSet;
fn init() -> anyhow::Result<Self>;
fn partition_key(&self, event: StoredEvent<<Self::Query as EventSet>::Item>) -> Option<String> {
None
}
fn handle(&mut self, event: StoredEvent<<Self::Query as EventSet>::Item>) -> anyhow::Result<()>;
}
}
Query— which events trigger this effectinit()— called once at startup; return the effect instancepartition_key()— controls parallel processing (returnNonefor sequential)handle()— called for each matching event
The idempotency contract
Effects must be idempotent. This is the most important rule in Umari. The runtime may redeliver events, and effects may be replayed from scratch. If an effect performs a side effect twice, that’s a bug.
The standard pattern is fold-check → side effect → record:
- Fold-check: Use
FoldQuery(or a fold inside a private command) to check whether the work has already been done, based on events already in the event store - Side effect: If not already done, perform the external action (HTTP call, email, etc.)
- Record: Execute a private command to record the outcome in the event store, so future runs will skip the side effect
Because idempotency is anchored in the event store (steps 1 and 3), deleting all SQLite databases and replaying events produces the same result without repeating side effects.
Scheduled events are usually unnecessary. A fold that checks for a completion event (step 3) is sufficient to determine whether the work was already done. Adding a separate “scheduled” event adds complexity without benefit in most cases.
Example: Registering webhooks
#![allow(unused)]
fn main() {
use umari::prelude::*;
use wasi_http_client::Client;
export_effect!(RegisterWebhooks);
#[derive(EventSet)]
enum Query {
ShopConnected(ShopConnected),
ShopReconnected(ShopReconnected),
}
struct RegisterWebhooks {
client: Client,
webhook_address: String,
}
impl Effect for RegisterWebhooks {
type Query = Query;
fn init() -> anyhow::Result<Self> {
let webhook_address = env::var("WEBHOOK_ADDRESS")
.expect("missing WEBHOOK_ADDRESS env var");
Ok(Self {
client: Client::new(),
webhook_address,
})
}
fn partition_key(&self, event: StoredEvent<Self::Query>) -> Option<String> {
match event.data {
Query::ShopConnected(ShopConnected { shop_id, .. })
| Query::ShopReconnected(ShopReconnected { shop_id, .. }) => Some(shop_id.to_string()),
}
}
fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
let (shop_id, shop_domain, access_token) = match event.data {
Query::ShopConnected(ShopConnected { shop_id, shop_domain, access_token, .. })
| Query::ShopReconnected(ShopReconnected { shop_id, shop_domain, access_token, .. }) => {
(shop_id, shop_domain, access_token)
}
};
let topics = ["orders/paid", "orders/cancelled"];
// 1. FOLD-CHECK — use a fold to check which topics are already registered
let topics_registered = FoldQuery::new()
.fold_iter(topics.iter().map(|topic| AlreadyRegisteredFold {
shop_id,
topic: topic.to_string(),
current_event_id: event.id,
}))
.run()?;
for (topic, registered) in topics.into_iter().zip(topics_registered) {
// Already done — skip
if registered {
continue;
}
// 2. SIDE EFFECT — make the HTTP call
let result = self.register_webhook(shop_id, &shop_domain, &access_token, topic)?;
match result {
Ok(()) => {
// 3. RECORD — persist completion in the event store
record_webhook_registration_completed(
RecordWebhookRegistrationCompletedInput {
shop_id,
topic: topic.to_string(),
},
CommandContext::new(),
)?;
}
Err(err) => {
eprintln!("failed to register webhook for topic {topic}: {err}");
// Don't fail the effect — retry on next event
return Ok(());
}
}
}
Ok(())
}
}
}
The fold (AlreadyRegisteredFold) checks whether a WebhookRegistrationCompleted event already exists for this shop and topic, scoped to the current triggering event. If it exists, the webhook was already registered in a previous run — skip. If not, perform the HTTP call and record completion.
partition_key and parallel processing
partition_key() enables parallel event processing. The runtime routes events to workers based on the key:
None→ global worker (sequential for this effect)Some(key)→ hashed to one of 8 keyed workers, enabling parallelism across independent streams
In the webhook example, partition_key returns shop_id — events for different shops are processed in parallel, but events for the same shop are serialized (same worker). This prevents race conditions within a shop while maximizing throughput across shops.
When to use partition keys:
- Use
Nonewhen events must be processed strictly in order - Use
Some(entity_id)when events for different entities are independent (different shops, different users) - Use
Some(event_id)with care — this parallelizes everything but may cause out-of-order processing within an entity
HTTP requests
Effects can make HTTP requests via wasi-http-client:
#![allow(unused)]
fn main() {
use wasi_http_client::Client;
let client = Client::new();
let resp = client
.post("https://api.example.com/endpoint")
.header("Content-Type", "application/json")
.header("Authorization", &format!("Bearer {token}"))
.json(&json!({ "key": "value" }))
.connect_timeout(Duration::from_secs(5))
.send()?;
let status = resp.status();
let body = resp.body()?;
}
The WASI HTTP interface is provided by the runtime. Effects get full HTTP client capability — they can make any outgoing request.
Executing commands from effects
Effects execute private commands directly as function calls:
#![allow(unused)]
fn main() {
use crate::commands::record_webhook_registration_completed;
record_webhook_registration_completed(
RecordWebhookRegistrationCompletedInput { shop_id, topic },
CommandContext::new(), // Inherits correlation context automatically
)?;
}
The CommandContext::new() inside an effect automatically:
- Inherits
correlation_idfrom the triggering event - Sets
triggering_event_idto the event being processed - Generates a new
causation_idfor this specific command execution
This creates a proper causal chain: the original event → effect → command → new events.
FoldQuery in effects
Effects can use FoldQuery to check event store state without executing a full command:
#![allow(unused)]
fn main() {
let registered = FoldQuery::new()
.fold(AlreadyRegisteredFold {
shop_id,
topic: "orders/paid".into(),
current_event_id: event.id,
})
.run()?;
}
This opens a transaction, reads events, applies them to the fold, and returns the state. It’s the same mechanism commands use internally, and is the recommended way to check idempotency in effects — lightweight, no extra events needed.
In the webhook example, FoldQuery::fold_iter() runs multiple folds in one transaction — one per topic — checking each topic’s registration status in a single event store read.
Environment variables
Effects can access environment variables:
#![allow(unused)]
fn main() {
fn init() -> anyhow::Result<Self> {
let webhook_address = env::var("WEBHOOK_ADDRESS")
.expect("missing WEBHOOK_ADDRESS env var");
Ok(Self { webhook_address, .. })
}
}
Env vars are set per-module and injected by the runtime as WASI environment variables. This separates configuration from code — you can deploy the same WASM module to staging and production with different env vars.
There are two ways to set them:
-
In the module’s
Cargo.tomlunder[package.metadata.umari.env]— picked up automatically byumari deploy. This is the easiest path and keeps env config alongside the module:[package.metadata.umari.env] DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/..." WEBHOOK_ADDRESS = "https://example.com/hooks" -
Via the lower-level CLI / API upload commands — when calling
umari effects upload(or the equivalent HTTP API) directly, pass--env KEY=VALUEflags. Useful for ad-hoc uploads or one-off overrides.
Error handling
Effect handle() returns anyhow::Result<()>. When a handler returns an error (or otherwise panics / traps), the runtime treats the module as having failed and retries indefinitely with exponential backoff:
- The first retry runs after 1 second.
- If the module keeps failing on the same event store position, the delay doubles each attempt, capped at 10 minutes.
- If a retry makes progress (the position advances) before failing again, the backoff resets to 1 second.
- There is no maximum attempt count — retries continue forever until the module starts succeeding (or is deactivated/replaced).
The effect’s position watermark is never advanced past a failed event, so the same event is replayed on each restart. The module’s captured stderr and the system log lines from the supervisor will show "module failed, retrying with backoff" along with the current attempt and delay.
This means effects should be designed to fail loudly on transient errors and let the runtime back off and retry — that’s the recovery path. The flip side is that returning an error from handle() will pause forward progress for that effect until the underlying problem is fixed, so you should reserve Err for situations that genuinely warrant blocking.
For known-permanent failures, prefer catching them inside handle() and recording the failure as an event (via a private command) so the fold-check pattern can skip the work on future runs.
The export_effect! macro
#![allow(unused)]
fn main() {
export_effect!(RegisterWebhooks);
}
This macro generates the WASM component interface, wiring up init(), handle(), partition_key(), and query().
Crate structure for effects with private commands
When an effect needs private commands and custom events, define them in the same crate:
effects/register-webhooks/
├── Cargo.toml
└── src/
├── lib.rs # Effect implementation + export_effect!
├── commands.rs # Private commands (plain functions)
└── events.rs # Effect-private events
Private events and commands are scoped to the effect — they’re not part of the shared domain library. This keeps the shared library focused on domain events and prevents effect implementation details from leaking.
10. Project Structure
This chapter describes how to organize an Umari project workspace. The structure is opinionated but flexible — you can adapt it to your needs.
Workspace layout
my-project/
├── Cargo.toml # Workspace root + shared library crate
├── src/ # Shared library: events, folds
│ ├── lib.rs
│ ├── events/
│ │ ├── mod.rs
│ │ ├── shop.rs # Shop events
│ │ ├── product.rs # Product events
│ │ └── order.rs # Order events
│ ├── folds/
│ │ └── mod.rs # Fold definitions
│ └── helpers.rs # Utility functions
├── commands/
│ ├── create-product/ # crate: create-product
│ │ ├── Cargo.toml
│ │ └── src/lib.rs
│ ├── update-product/
│ └── archive-product/
├── projectors/
│ ├── products/ # crate: products
│ │ ├── Cargo.toml
│ │ └── src/lib.rs
│ ├── shops/
│ └── orders/
├── effects/
│ ├── notify-external-service/
│ │ ├── Cargo.toml
│ │ └── src/
│ │ ├── lib.rs # Effect + export_effect!
│ │ ├── commands.rs # Private commands
│ │ └── events.rs # Effect-private events
│ └── sync-inventory/
└── .gitignore
Root Cargo.toml
The root is both the workspace definition and the shared library crate:
[package]
name = "my-project"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true
umari.workspace = true
uuid.workspace = true
validator.workspace = true
[workspace]
resolver = "2"
members = [
".",
"commands/create-product",
"commands/update-product",
"projectors/products",
"projectors/shops",
"effects/notify-external-service",
# ... etc
]
[workspace.dependencies]
my-project = { path = "." }
umari = { path = "/path/to/umari/crates/umari" }
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.22", features = ["v4"] }
validator = { version = "0.20", features = ["derive"] }
wasi-http-client = { version = "0.2.1", features = ["json"] }
Key points:
- The root package (
.) is listed as a workspace member — it’s the shared library my-project = { path = "." }lets command/projector/effect crates depend onmy-project.workspace = trueumaripath points to the SDK crate (not the runtime workspace)- Each module crate uses
crate-type = ["cdylib", "rlib"]
Module crate Cargo.toml
Each module is a minimal crate:
# commands/create-product/Cargo.toml
[package]
name = "create-product"
version = "1.0.0"
edition = "2024"
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
my-project.workspace = true # Shared library
umari.workspace = true # SDK
anyhow.workspace = true
serde.workspace = true
validator.workspace = true
crate-type = ["cdylib", "rlib"] is required. cdylib produces the .wasm file; rlib enables Rust-level linking for tests.
For effects with HTTP:
[dependencies]
wasi-http-client.workspace = true
For projectors with additional dependencies:
[dependencies]
rust_decimal.workspace = true
The shared library
The shared library crate (root src/) contains:
Events (src/events/)
#![allow(unused)]
fn main() {
// src/events/mod.rs
pub mod shop;
pub mod product;
pub mod order;
pub use shop::*;
pub use product::*;
pub use order::*;
}
Each event module groups related events:
#![allow(unused)]
fn main() {
// src/events/shop.rs
use umari::prelude::*;
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("shop.connected")]
pub struct ShopConnected {
#[domain_id]
#[crypto_scope]
pub shop_id: u64,
pub shop_domain: String,
pub access_token: String,
}
}
Folds (src/folds/)
#![allow(unused)]
fn main() {
// src/folds/mod.rs
use umari::prelude::*;
use crate::events::*;
#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
#[domain_id]
pub shop_id: u64,
}
impl Fold for ShopExistsFold {
type Events = SingleEvent<ShopConnected>;
type State = bool;
fn apply(&self, exists: &mut bool, _event: StoredEvent<ShopConnected>) {
*exists = true;
}
}
}
Library root (src/lib.rs)
#![allow(unused)]
fn main() {
// src/lib.rs
pub mod events;
pub mod folds;
pub mod helpers;
}
Naming conventions
| Item | Convention | Example |
|---|---|---|
| Event struct | PascalCase, past tense | ShopConnected, ProductCreated |
| Event type string | object.verb dot notation | "shop.connected", "product.created" |
| Command crate | kebab-case, imperative | create-product, update-product |
| Command function | snake_case | pub fn execute(...) |
| Command input struct | Always Input | pub struct Input |
| Projector crate | kebab-case, plural noun | products, shops |
| Projector struct | PascalCase, plural | Products, Shops |
| Effect crate | kebab-case, verb phrase | notify-external-service |
| Effect struct | PascalCase | NotifyExternalService |
| Fold struct | PascalCase noun + Fold | ShopExistsFold, ProductStateFold |
| Fold state | PascalCase noun + State | ProductState, WidgetState |
| EventSet enum | Always Query | enum Query { ... } |
Dependencies between module types
shared library (events, folds)
↑ ↑ ↑
| | |
commands/ projectors/ effects/
(import events, (import events) (import events,
import folds) import folds,
may have own
events + commands)
- Commands import events and folds from the shared library
- Projectors import events from the shared library
- Effects import events and folds from the shared library; may define their own events and commands locally
Working with modules
The umari CLI is the intended way to scaffold, build, and deploy modules — it understands the workspace layout above, picks up env vars from [package.metadata.umari.env], and handles the wasm32-wasip2 build for you.
umari new
Scaffold a new module crate, wire it into the workspace Cargo.toml, and drop in starter lib.rs content:
umari new command create-product
umari new projector products
umari new effect notify-external-service
The default language is Rust; pass --lang js to scaffold a JS module instead.
umari build
Build every module crate in the workspace (Rust → wasm32-wasip2, JS → componentized wasm) in release mode. Pass paths to scope the build to a subset of crates:
umari build # build everything
umari build commands/create-product # build a single crate
umari build commands/ projectors/products # multiple paths
umari build --debug # debug profile
umari build -j 4 # cap parallelism
Outputs land in the usual target/wasm32-wasip2/{release,debug}/<name>.wasm paths.
umari deploy
Build everything (same flags as umari build) and upload + activate each module against the server configured for your client:
umari deploy # build + upload + activate all modules
umari deploy --no-activate # upload but don't activate
umari deploy --bump-patch # auto-bump patch version on conflict
umari deploy commands/create-product
Env vars declared under [package.metadata.umari.env] in each module’s Cargo.toml are sent along with the upload.
Manual cargo / lower-level CLI
If you want to bypass the workspace tooling — e.g. one-off uploads, custom build flags, or wiring this into your own scripts — you can still build with plain cargo and upload through the typed subcommands:
cargo build --target wasm32-wasip2 --release -p create-product
umari commands upload create-product 1.0.0 \
target/wasm32-wasip2/release/create_product.wasm \
--env API_KEY=... --activate
For production, always build in release mode — debug builds can be 10× larger and slower.
Adding a new module by hand
If you don’t want to use umari new:
- Create the crate directory with
Cargo.tomlandsrc/lib.rs - Add it to the workspace
memberslist in the rootCargo.toml - Implement the trait (
Commandvia#[export_command],Projector, orEffect) umari build(orcargo build --target wasm32-wasip2 --release -p <name>)umari deploy(or upload manually via the lower-level CLI / API)
11. SQLite API Reference
Projectors and effects each get their own isolated SQLite database file. Modules cannot read each other’s data. This chapter is the complete reference for the SQLite API available inside WASM modules.
Mental model
execute*/query*free functions run against the implicit connection — convenient for one-offs.prepare()returns aStatementyou store on your module struct, so the SQL is compiled once and reused per event.- Errors that return
Resultare constraint violations you can recover from. Everything else (wrong column name, wrong type, “expected one row but got two”) traps the module — the runtime treats traps as bugs, not business failures.
Every handle() call runs inside an implicit transaction. Don’t reach for BEGIN/COMMIT yourself.
Free functions
These all operate on the module’s connection.
execute(sql, params) -> Result<usize, SqliteError>
Run a single statement, returning rows affected.
#![allow(unused)]
fn main() {
execute(
"INSERT INTO plans (plan_id, shop_id, title) VALUES (?1, ?2, ?3)",
params![plan_id, shop_id.to_string(), title],
)?;
}
execute_batch(sql) -> Result<(), SqliteError>
Run multiple statements separated by semicolons. Use this in init() for DDL.
#![allow(unused)]
fn main() {
execute_batch(
"
CREATE TABLE IF NOT EXISTS widgets (
widget_id TEXT PRIMARY KEY,
name TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_widgets_name ON widgets (name);
",
)?;
}
query_one(sql, params) -> Row
Return exactly one row. Traps if zero or multiple rows match.
#![allow(unused)]
fn main() {
let row = query_one(
"SELECT name, price FROM plans WHERE plan_id = ?1",
params![plan_id],
);
let name: String = row.get("name");
let price: String = row.get(1); // by column index
}
query_row(sql, params) -> Option<Row>
Return at most one row. Extra rows are silently dropped — the first match wins.
#![allow(unused)]
fn main() {
if let Some(row) = query_row(
"SELECT access_token FROM shops WHERE shop_id = ?1",
params![id],
) {
let token: String = row.get(0);
}
}
last_insert_rowid() -> Option<i64>
Returns the rowid of the most recent successful INSERT on this connection, or None if no insert has happened yet.
Prepared statements
For queries that run on every event, prepare once in init() and reuse:
prepare(sql) -> Statement
Returns a Statement directly — there’s no Result. A malformed SQL string traps the module.
#![allow(unused)]
fn main() {
struct MyProjector {
insert_widget: Statement,
archive_widget: Statement,
}
impl Projector for MyProjector {
type Query = WidgetEvents;
fn init() -> anyhow::Result<Self> {
execute_batch("CREATE TABLE IF NOT EXISTS widgets (...)")?;
Ok(MyProjector {
insert_widget: prepare("INSERT INTO widgets (id, name) VALUES (?1, ?2)"),
archive_widget: prepare("UPDATE widgets SET archived = TRUE WHERE id = ?1"),
})
}
fn handle(&mut self, event: StoredEvent<WidgetEvents>) -> anyhow::Result<()> {
// ...
Ok(())
}
}
}
Statement methods
| Method | Returns | Traps on |
|---|---|---|
execute(params) | Result<usize, SqliteError> | — |
query(params) | Vec<Row> | — |
query_one(params) | Row | zero rows, or more than one row |
query_row(params) | Option<Row> | — |
#![allow(unused)]
fn main() {
self.insert_widget.execute(params![id.to_string(), name])?;
let rows = self.list_widgets.query(params![shop_id]);
for row in rows {
let name: String = row.get("name");
}
}
Parameters
Pass parameters using the params! macro:
#![allow(unused)]
fn main() {
params![] // no params
params![value1, value2, value3] // positional params for ?1, ?2, ?3
}
Each value is converted through Into<SqliteValue>. To pass a single value, write params![id] — there’s no trailing-comma syntax.
Supported parameter types
| Rust type | SQLite type |
|---|---|
bool | Integer (0 or 1) |
i8, i16, i32, i64, isize | Integer |
u8, u16, u32 | Integer |
f32, f64 | Real |
String, &str | Text |
Vec<u8> | Blob |
Uuid | Text (canonical hyphenated form) |
Option<T> | Null when None, otherwise T |
Reading rows
Row::get<I, T>(column) -> T
Get a column value by name (&str) or zero-based position (usize). Traps on type mismatch or unknown column.
#![allow(unused)]
fn main() {
let name: String = row.get("name");
let count: i64 = row.get(0);
let maybe: Option<String> = row.get("nullable_col");
}
Row::tuple<T>() -> T
Unpack the first N columns into a tuple by position (up to 8 columns).
#![allow(unused)]
fn main() {
let (id, name, price): (String, String, String) = row.tuple();
}
Supported column types
| Rust type | SQLite value |
|---|---|
bool | Integer (0 = false, 1 = true; other values trap) |
String | Text |
i64 | Integer |
f64 | Real |
Vec<u8> | Blob |
Option<T> | Null → None, otherwise Some(T) |
Errors
SqliteError only covers constraint violations — those are the only failures the API surfaces as Result:
#![allow(unused)]
fn main() {
pub enum SqliteError {
ConstraintViolation(ConstraintViolation),
}
pub struct ConstraintViolation {
pub kind: ConstraintViolationKind, // Unique, PrimaryKey, NotNull, ForeignKey, Check, Other
pub message: String,
}
}
Use it when you want a UNIQUE collision to mean “this event was already projected, skip”:
#![allow(unused)]
fn main() {
match execute("INSERT INTO widgets (id, name) VALUES (?1, ?2)", params![id, name]) {
Ok(_) => {}
Err(SqliteError::ConstraintViolation(v)) if v.kind == ConstraintViolationKind::Unique => {
// already projected — fine
}
Err(err) => return Err(err.into()),
}
}
Transactions
The runtime wraps every handle() call in a transaction: it begins before the call and commits if handle() returns Ok. Returning Err rolls back. You don’t manage transactions manually.
Best practices
- Use
IF NOT EXISTSin DDL soinit()is idempotent across module restarts. - Store UUIDs as
TEXT(SQLite has no UUID type) —Uuidalready converts to canonical text. - Store decimals as
TEXTto avoid floating-point precision issues. - Always use
params!— never interpolate into the SQL string. - Prepare statements that run per-event in
init(); use the free functions for one-offs.
12. Fold Reference
umari::prelude ships a handful of generic folds for the most common state-derivation shapes — “did this happen?”, “what’s the latest value?”, “how many times?”, “which of these two won?”. Reach for these first; only write a custom fold when none of them fit.
| Fold | State | Pick when you ask… |
|---|---|---|
EventFold<E> | EventState<E> | “Give me every occurrence of E” |
LatestEvent<E> | Option<StoredEvent<E>> | “What’s the most recent E?” |
EventCounter<E> | u64 | “How many E have happened?” |
EventToggle<A, B> | ToggleState<A, B> | “Was the last event A or B?” |
SingleEvent<E> | — (an EventSet) | Custom fold that only reads one event type |
EventFold
Collects every occurrence of event type E into a Vec.
State: EventState<E>
#![allow(unused)]
fn main() {
.fold::<EventFold<ShopConnected>>()
.execute(|input, connected| {
if connected.exists() {
let first = &connected.events[0];
// ...
}
Ok(emit![])
})
}
#![allow(unused)]
fn main() {
impl<E: Event> EventState<E> {
pub events: Vec<StoredEvent<E>>;
pub fn exists(&self) -> bool;
}
}
Use when: you need to inspect or aggregate over the full history.
Avoid when: you only need the most recent value — LatestEvent is cheaper.
LatestEvent
Keeps only the most recent E. Each new event replaces the previous.
State: Option<StoredEvent<E>>
#![allow(unused)]
fn main() {
.fold::<LatestEvent<WarrantyPlanUpdated>>()
.execute(|input, latest| {
if let Some(event) = latest {
// event.data is the most recent WarrantyPlanUpdated
}
Ok(emit![])
})
}
Use when: you care about the current value — last Updated, last Connected, etc.
Avoid when: you need the full history.
EventCounter
Counts occurrences of E without storing event data.
State: u64
#![allow(unused)]
fn main() {
.fold::<EventCounter<WarrantySold>>()
.execute(|input, sale_count| {
anyhow::ensure!(
sale_count < MAX_WARRANTIES,
"shop has reached the maximum number of warranties"
);
Ok(emit![/* ... */])
})
}
Use when: you only need a count. Avoid when: you need to inspect individual events.
EventToggle
Tracks which of two opposing events occurred last. Designed for created/deleted, activated/deactivated, archived/unarchived pairs.
State: ToggleState<A, B>
#![allow(unused)]
fn main() {
.fold::<EventToggle<WarrantyPlanArchived, WarrantyPlanUnarchived>>()
.execute(|input, toggle| {
if toggle.is_a() {
// currently archived
} else if toggle.is_b() {
// currently unarchived
} else {
// neither has happened
}
Ok(emit![])
})
}
#![allow(unused)]
fn main() {
impl<A: Event, B: Event> ToggleState<A, B> {
pub last: Option<ToggleSide<A, B>>;
pub fn is_a(&self) -> bool;
pub fn is_b(&self) -> bool;
pub fn as_a(&self) -> Option<&StoredEvent<A>>;
pub fn as_b(&self) -> Option<&StoredEvent<B>>;
}
}
Use when: paired opposing events — archived/unarchived, activated/deactivated, locked/unlocked.
Constraint: A and B must share the same domain ID fields.
SingleEvent
Not a fold — an EventSet shorthand for custom folds that only read a single event type. Use it as type Events = SingleEvent<MyEvent>;.
#![allow(unused)]
fn main() {
#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
#[domain_id]
pub shop_id: u64,
}
impl Fold for ShopExistsFold {
type Events = SingleEvent<ShopConnected>;
type State = bool;
fn apply(&self, exists: &mut bool, _event: StoredEvent<ShopConnected>) {
*exists = true;
}
}
}
Custom folds
When none of the built-ins fit, implement Fold yourself:
#![allow(unused)]
fn main() {
#[derive(DomainIds, FromDomainIds)]
pub struct MyFold {
#[domain_id] pub shop_id: u64,
#[from_domain_id(default)]
pub custom_field: String,
}
#[derive(EventSet)]
pub enum MyFoldEvents {
#[scope(shop_id)]
EventA(EventA),
EventB(EventB),
}
#[derive(Default)]
pub struct MyState {
pub a_count: u64,
pub b_count: u64,
pub latest_value: Option<String>,
}
impl Fold for MyFold {
type Events = MyFoldEvents;
type State = MyState;
fn apply(&self, state: &mut MyState, event: StoredEvent<MyFoldEvents>) {
match event.data {
MyFoldEvents::EventA(_) => state.a_count += 1,
MyFoldEvents::EventB(ev) => {
state.b_count += 1;
state.latest_value = Some(ev.some_field);
}
}
}
}
}
#[from_domain_id(default)]
Fields not annotated with #[domain_id] can use #[from_domain_id(default)] to get their default value during fold construction. The fold won’t try to bind these from domain IDs.
Fold composition limit
Commands support up to 12 folds in a single tuple. If you need more, compose multiple folds into a single fold by nesting state types, or split the command into multiple commands.
Appendix A: Quick Reference
Derive macros
| Macro | Applies to | Purpose |
|---|---|---|
#[derive(Event)] | Struct | Makes a struct a persisted event |
#[derive(EventSet)] | Enum | Creates a typed event set for queries |
#[derive(DomainIds)] | Struct | Generates domain_ids() method |
#[derive(FromDomainIds)] | Struct | Generates constructor from domain ID bindings |
Attribute macros
| Attribute | Placement | Purpose |
|---|---|---|
#[event_type("...")] | Event struct | Sets the event type string |
#[domain_id] | Field | Marks a field as a domain ID tag |
#[domain_id("alt_name")] | Field | Domain ID with alternate tag name |
#[crypto_scope] | Field on Event | Encrypts the event (must be on a #[domain_id] field) |
#[scope(field)] | EventSet variant | Filter by a single domain ID field |
#[scope(field = "value")] | EventSet variant | Hardcoded tag filter |
#[from_domain_id(default)] | Fold field | Use default value, don’t bind from domain IDs |
#[validate(...)] | Input field | Validation rules (validator crate) |
Export macros
| Macro | Usage |
|---|---|
#[export_command] | Annotate the command function |
export_projector!(Name); | Wire up projector WASM interface |
export_effect!(Name); | Wire up effect WASM interface |
Command builder API
#![allow(unused)]
fn main() {
Command::new(input, context) // Create builder
.fold::<T>() // Register fold (no args)
.fold_args::<T>(args) // Register fold with args
.fold_with(|input| MyFold { .. }) // Register fold manually
.execute(|input, states| { .. }) // Run with fold states
}
Emit and reject
#![allow(unused)]
fn main() {
emit![] // No events
emit![Event { field: val }] // Single event
emit![EventA { .. }, EventB { .. }] // Multiple events
// Business rejections — use anyhow::ensure! / bail!:
anyhow::ensure!(balance >= amount, "insufficient funds");
anyhow::bail!("shop not connected");
}
SQLite API
#![allow(unused)]
fn main() {
// Connection-level
execute(sql, params) -> Result<usize, SqliteError>
execute_batch(sql) -> Result<(), SqliteError>
query_one(sql, params) -> Row // traps on 0 or >1 rows
query_row(sql, params) -> Option<Row>
last_insert_rowid() -> Option<i64>
// Prepared statements (built with prepare(sql) -> Statement)
stmt.execute(params) -> Result<usize, SqliteError>
stmt.query(params) -> Vec<Row>
stmt.query_one(params) -> Row // traps on 0 or >1 rows
stmt.query_row(params) -> Option<Row>
// Parameters
params![] // no params
params![val1, val2, val3] // positional
// Reading
row.get::<&str, String>("column_name")
row.get::<usize, i64>(0)
row.tuple::<(String, String, i64)>()
}
Built-in fold types
| Type | State | Use for |
|---|---|---|
EventFold<E> | EventState<E> (vec of all events) | Full history |
LatestEvent<E> | Option<StoredEvent<E>> | Most recent event |
EventCounter<E> | u64 | Counting events |
EventToggle<A, B> | ToggleState<A, B> | Paired opposing events |
SingleEvent<E> | N/A (it’s an EventSet) | Single event type queries |
Event envelope fields
| Field | Type | Description |
|---|---|---|
id | Uuid | Event unique ID |
position | u64 | Global log position |
event_type | String | Event type identifier |
tags | Vec<String> | Domain ID tags |
timestamp | DateTime<Utc> | When written |
correlation_id | Uuid | Originating action |
causation_id | Uuid | Command execution |
triggering_event_id | Option<Uuid> | Causal event |
idempotency_key | Option<Uuid> | Deduplication |
encryption_scope | Option<String> | Encryption scope |
encryption_key_id | Option<Uuid> | Key identifier |
CommandContext
#![allow(unused)]
fn main() {
CommandContext::new() // Auto-detect (effect or external)
.with_correlation_id(id) // Set correlation ID
.with_triggering_event_id(id) // Set triggering event
.with_idempotency_key(key) // Set idempotency key
}
Environment variables
Server (umari binary)
| Variable | Default | Description |
|---|---|---|
UMARI_DATA_DIR | ./umari-data | runtime database directory |
UMARI_EVENT_STORE_URL | http://localhost:50051 | UmaDB event store URL |
UMARI_API_ADDR | 127.0.0.1:3000 | HTTP API bind address |
UMARI_API_KEY | (none) | required Authorization: Bearer <key> |
UMARI_LOG | umari=info | tracing-subscriber filter |
UMARI_VERBOSE | false | set log level to trace |
UMARI_NO_BANNER | false | hide the startup banner |
UMARI_SHUTDOWN_TIMEOUT | 10s | graceful shutdown deadline |
CLI (umari-cli / umari client)
| Variable | Default | Description |
|---|---|---|
UMARI_URL | http://localhost:3000 | server URL |
UMARI_API_KEY | (none) | bearer token sent with each request |
Essential imports
#![allow(unused)]
fn main() {
use umari::prelude::*; // Everything you need
use serde::{Serialize, Deserialize};
use validator::Validate;
use schemars::JsonSchema; // Optional, for OpenAPI docs
}
Naming conventions
| Item | Convention |
|---|---|
| Event struct | PascalCase past tense |
| Event type string | "object.verb" |
| Command crate | kebab-case imperative |
| Command input struct | Input |
| Projector crate | kebab-case plural noun |
| Effect crate | kebab-case verb phrase |
| Fold struct | PascalCase + Fold |
| Fold state | PascalCase + State |
| EventSet enum | Query |
Appendix B: Migration from Traditional Event Sourcing
If you’re coming from a traditional event sourcing background (EventStoreDB, Marten, Axon, etc.), this chapter maps familiar concepts to Umari equivalents.
Key differences
| Concept | Traditional ES | Umari |
|---|---|---|
| Event streams | Per-aggregate streams | Single global log with domain ID tags |
| Concurrency | Stream position check | DCB — dynamic boundaries by event overlap |
| Aggregates | Aggregate root with state | No aggregates; folds derive state on-demand |
| Read models | External projections | Projectors (WASM modules with SQLite) |
| Side effects | Process managers / sagas | Effects (WASM modules with HTTP access) |
| Business logic | In-process | WASM components, hot-reloadable |
| State derivation | Aggregate replay | Folds (event → state reducer) |
| Idempotency | Idempotency keys on commands | Built-in via idempotency_key + domain checks |
Aggregate → Fold + Command
Traditional:
public class Widget : Aggregate
{
public WidgetId Id { get; private set; }
public string Name { get; private set; }
public bool Archived { get; private set; }
public void Create(CreateWidget command)
{
if (Version > 0) throw new Exception("Already exists");
Apply(new WidgetCreated(command.WidgetId, command.Name));
}
public void When(WidgetCreated e)
{
Id = e.WidgetId;
Name = e.Name;
}
}
Umari:
#![allow(unused)]
fn main() {
// State — just data
#[derive(Default)]
pub struct WidgetState {
pub exists: bool,
pub name: Option<String>,
pub archived: bool,
}
// Fold — binds to domain IDs, replays events
#[derive(DomainIds, FromDomainIds)]
pub struct WidgetFold {
#[domain_id]
pub widget_id: Uuid,
}
#[derive(EventSet)]
pub enum WidgetEvents {
#[scope(widget_id)]
WidgetCreated(WidgetCreated),
#[scope(widget_id)]
WidgetArchived(WidgetArchived),
}
impl Fold for WidgetFold {
type Events = WidgetEvents;
type State = WidgetState;
fn apply(&self, state: &mut WidgetState, event: StoredEvent<WidgetEvents>) {
match event.data {
WidgetEvents::WidgetCreated(ev) => {
state.exists = true;
state.name = Some(ev.name);
}
WidgetEvents::WidgetArchived(_) => state.archived = true,
}
}
}
// Command — validates, checks invariants, emits events
#[export_command]
pub fn create_widget(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
Command::new(input, context)
.fold::<WidgetFold>()
.execute(|input, widget| {
anyhow::ensure!(!widget.exists, "widget already exists");
Ok(emit![WidgetCreated {
widget_id: input.widget_id,
name: input.name,
}])
})
}
}
Key differences:
- The fold struct is separate from the state — the fold holds domain ID bindings, the state holds the derived data
apply()takes&self(the fold bindings) and a mutable state reference- The command function is stateless — no aggregate instance, just pure logic
- Consistency is per-domain-ID, not per-aggregate-stream
Projection → Projector
Traditional:
public class WidgetProjection : Projection<WidgetReadModel>
{
public WidgetProjection()
{
Project<WidgetCreated>(e => Insert(new WidgetReadModel { Id = e.WidgetId, Name = e.Name }));
Project<WidgetArchived>(e => Update(e.WidgetId, w => w.Archived = true));
}
}
Umari:
#![allow(unused)]
fn main() {
impl Projector for Widgets {
type Query = WidgetQuery;
fn init() -> anyhow::Result<Self> {
execute_batch("CREATE TABLE IF NOT EXISTS widgets (
widget_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
archived BOOLEAN NOT NULL DEFAULT FALSE
)")?;
Ok(Widgets {})
}
fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
match event.data {
WidgetQuery::WidgetCreated(ev) => {
execute("INSERT INTO widgets (widget_id, name) VALUES (?1, ?2)",
params![ev.widget_id, ev.name])?;
}
WidgetQuery::WidgetArchived(ev) => {
execute("UPDATE widgets SET archived = TRUE WHERE widget_id = ?1",
params![ev.widget_id])?;
}
}
Ok(())
}
}
}
Key differences:
- Projectors are WASM modules, not in-process projections
- Each projector gets its own SQLite database
init()is called once;handle()is called per event- Projectors are naturally idempotent (replay-safe)
Process Manager / Saga → Effect
Traditional:
public class OrderSaga : Saga<OrderSagaState>,
IAmStartedBy<OrderPlaced>,
IHandle<PaymentReceived>
{
public async Task Handle(OrderPlaced e)
{
Data.OrderId = e.OrderId;
await Bus.Send(new ProcessPayment(e.OrderId, e.Amount));
MarkAsComplete();
}
}
Umari:
#![allow(unused)]
fn main() {
impl Effect for OrderProcessor {
type Query = OrderEvents;
fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
match event.data {
OrderEvents::OrderPlaced(ev) => {
// 1. Fold-check via a private command — if the "scheduled"
// event was already emitted, the command short-circuits
// and the receipt is empty.
let receipt = schedule_payment_processing(
SchedulePaymentProcessingInput { order_id: ev.order_id },
CommandContext::new(),
)?;
if !receipt.has_event::<PaymentProcessingScheduled>() {
return Ok(()); // already processed on a previous run
}
// 2. Side effect — call the payment gateway.
let response = self.http_client
.post("https://payment.example.com/process")
.json(&json!({ "order_id": ev.order_id, "amount": ev.amount }))
.send()?;
// 3. Record outcome via another private command.
record_payment_result(
RecordPaymentResultInput {
order_id: ev.order_id,
success: response.status().is_success(),
},
CommandContext::new(),
)?;
}
}
Ok(())
}
}
}
Key differences:
- Effects use the fold-check → side effect → record pattern for idempotency.
- Effects have their own SQLite for internal state, but it’s not the idempotency source — the event store is.
- Effects call commands directly as Rust functions; no message bus.
- HTTP is provided via WASI; no host-side bridging code.
EventStoreDB streams → UmaDB DCB
Traditional:
Stream "widget-abc" → [WidgetCreated, WidgetRenamed, WidgetArchived]
Stream "widget-def" → [WidgetCreated]
Umari:
Global log:
pos 1: WidgetCreated { widget_id: "abc", ... } tags: [widget_id:abc]
pos 2: WidgetCreated { widget_id: "def", ... } tags: [widget_id:def]
pos 3: WidgetRenamed { widget_id: "abc", name: "new" } tags: [widget_id:abc]
pos 4: WidgetArchived { widget_id: "abc" } tags: [widget_id:abc]
When command queries widget_id=abc: gets events at positions 1, 3, 4. No pre-partitioning needed.
Common migration patterns
1. Start with events
Port your event definitions first:
#![allow(unused)]
fn main() {
// Old: AccountCreated { AccountId, OwnerName }
#[derive(Event, Serialize, Deserialize)]
#[event_type("account.created")]
pub struct AccountCreated {
#[domain_id]
pub account_id: String,
pub owner_name: String,
}
}
Add domain IDs to all events. Choose which fields identify the entity.
2. Port aggregates to folds
For each aggregate, create:
- A state struct with
#[derive(Default)] - A fold struct with
#[derive(DomainIds, FromDomainIds)] - An EventSet enum
- Implement
Fold
The apply() method replaces your aggregate’s When() handlers.
3. Port command handlers
Replace aggregate method calls with the Command::new().fold().execute() pattern. Invariant checks go in the execute closure. Event emission uses emit![].
4. Port projections to projectors
Port your projection SQL to init() and handle(). Each projector is a separate crate with its own SQLite database. Use execute_batch() for DDL, execute()/params![] for DML.
5. Port sagas to effects
Replace message bus interactions with direct command execution. Replace saga state stored in a database with SQLite (internal state) and the event store (idempotency anchor).