Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

The Umari Book

Status: Draft.

This book is the official guide to building event-sourced systems with Umari. It covers concepts, patterns, API reference, and operations — everything you need to go from zero to production.

Who this book is for

You should be comfortable with Rust. Prior event sourcing experience helps but is not required — the first part explains the concepts from scratch.

How to read this book

  • Part 1 — Concepts: Read straight through. These chapters establish the mental model.
  • Part 2 — Building Blocks: Read straight through. You’ll use every concept here.
  • Part 3 — Module Types: Reference each chapter as you build that module type.
  • Part 4 — Working with Umari: Practical patterns, project structure, and API reference.
  • Part 5 — Runtime & Operations: Read when deploying or debugging.

Code examples follow current API conventions.

Conventions

  • Crate names: umari refers to the SDK crate (crates/umari). umari-runtime refers to the runtime. Module crates use kebab-case names.

1. Introduction

Umari is a WASM-native event sourcing runtime. You write business logic in Rust, compile it to WebAssembly, and the runtime handles event persistence, module lifecycle, and state derivation.

What is event sourcing?

In event sourcing, every state change is recorded as an immutable event in an append-only log. The current state of the system is derived by replaying events from the beginning. This gives you:

  • Full audit trail — every change is recorded, forever
  • Temporal queries — ask “what was the state at time T?” by replaying up to that point
  • Complete replayability — delete all derived state and rebuild it from events
  • Causal traceability — every event knows which action produced it and which event triggered that action

Umari adds two key innovations on top of classical event sourcing:

  1. No aggregates, no streams — consistency boundaries are dynamic (DCB), not pre-partitioned
  2. Everything is WASM — business logic is compiled to WebAssembly and loaded at runtime

Three module types

Umari splits business logic into three distinct concerns, each compiled as a separate WASM module:

ModuleJobWrites events?SQLite?
CommandValidate input, check invariants, emit eventsYes — the only writerNo
ProjectorBuild queryable read models from the event streamNoYes
EffectReact to events with side effects (HTTP, email, third-party APIs)Only by calling commandsYes

Commands are the only mechanism for writing events. Projectors and effects subscribe to events and react. Effects can call commands, which write events, which trigger more projectors and effects — forming a causal chain.

How it fits together

External trigger (HTTP, webhook, cron)
    │
    ▼
Command ──► emits events ──► Event Store (UmaDB)
                                  │
                ┌─────────────────┴─────────────────┐
                ▼                                   ▼
           Projector                              Effect
           (builds read model                    (side effects:
            in SQLite)                           HTTP, execute commands)
                                                     │
                                                     ▼
                                                  Command
                                                  (private, for
                                                   idempotency)

Prerequisites

  • Rust — you write modules in Rust using the umari SDK crate
  • UmaDB — the event store. Must be running before starting the Umari server
  • wasm32-wasip2 targetrustup target add wasm32-wasip2

What you’ll build

A typical Umari application looks like this:

my-project/
├── src/                     # Shared library: events, folds
│   ├── events/
│   │   ├── shop.rs
│   │   ├── warranty.rs
│   │   └── claim.rs
│   └── folds/
│       └── mod.rs
├── commands/
│   ├── connect-shop/        # Crate: connect-shop
│   ├── create-warranty-plan/
│   └── cancel-warranty/
├── projectors/
│   ├── plans/               # Crate: plans
│   ├── shops/
│   └── warranties/
├── effects/
│   ├── register-shopify-webhooks/
│   └── record-warranty-sale/
└── Cargo.toml               # Workspace root

Each command, projector, and effect is its own crate, compiled as a WASM component. They all depend on the shared library for event and fold definitions.

About this book

This book is the canonical reference for building on Umari. It walks through the runtime, the SDK, and the patterns you’ll use to write commands, projectors, and effects.

2. Core Concepts

This chapter establishes the foundational concepts that Umari is built on. Understanding these will make the rest of the book straightforward.

Events as immutable facts

An event represents something that has already happened. It is named in past tense and carries all the data needed to describe what occurred. Once written, an event is never modified or deleted — it is a permanent fact in the system’s history.

#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("shop.connected")]
pub struct ShopConnected {
    #[domain_id]
    pub shop_id: u64,
    pub shop_domain: String,
    pub shop_name: String,
    pub access_token: String,
}
}

Every event carries metadata in the event envelope:

FieldPurpose
idUnique UUID for this event
positionGlobal position in the event log (monotonic)
event_typeString identifier ("shop.connected")
tagsDomain ID key-value pairs used to query events by domain ID (["shop_id:42"])
timestampWhen the event was written
correlation_idTraces back to the originating user action
causation_idThe specific command execution that produced this event
triggering_event_idThe event that caused an effect to trigger this command

No aggregates — Dynamic Consistency Boundaries

Traditional event sourcing uses aggregates: each entity has its own event stream, and concurrency is managed by comparing stream positions. Umari does not use aggregates.

Instead, Umari uses Dynamic Consistency Boundaries (DCB). When a command runs, it declares exactly which events it needs — specified by event types and domain ID tags. The runtime fetches those events and uses their positions to form a consistency boundary at execution time.

This means:

  • Different commands touching different domain IDs form different boundaries
  • Two commands can run concurrently as long as their event sets don’t overlap
  • There is no pre-partitioning of the event log into streams
  • Consistency is dynamic, not pre-defined

Domain IDs

Domain IDs are the mechanism that makes DCB work. They are fields on events that identify what the event is “about.” When a command queries events, it specifies domain ID values, and the event store returns only events tagged with those values.

#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.sold")]
pub struct WarrantySold {
    #[domain_id]
    pub shop_id: u64,        // tag: shop_id:42
    #[domain_id]
    pub warranty_id: Uuid,   // tag: warranty_id:abc-def
    #[domain_id]
    pub order_id: u64,       // tag: order_id:1001
    pub plan_title: String,   // not a domain ID — just data
}
}

Events are stored in a single global log. Domain ID tags enable the runtime to efficiently fetch only the relevant subset.

State is derived, not stored

There is no “current state” table. Everything is derived from events:

ModuleHow it derives state
CommandFolds — replay just the events this domain ID touches, in memory, on every call
Projectorhandle() updates SQLite as events arrive. The SQLite file is a rebuildable cache, not the source of truth
EffectTracks internal work in SQLite, but checks the event store (via folds) to decide whether a side effect has already happened

The contract: delete every SQLite file, replay events from position 0, end up in the same state — and side effects that already ran don’t run again.

Commands are the only writers

Every event in the store comes from a command. Projectors never write events; effects don’t either — when an effect needs to write, it calls a command function inline. A command is just a regular Rust function exported with #[export_command], so any module can import and call one directly.

That single rule keeps every write going through validation and invariant checks, and gives every event a clear causal chain.

Causal chain

Every event traces back to the user action that initiated it:

User HTTP request
  └── Command "create-warranty-plan"
        └── Event "warranty.plan.created"  (correlation_id = req_id)
              └── Effect "sync-warranty-plan-product-variations"
                    └── Command "create-master-product"  (triggering_event_id = above)
                          └── Event "shop.master_product.created"

The correlation_id flows through the entire chain. The triggering_event_id links each downstream command to the specific event that caused it.

Key principles

  1. Events are immutable facts — never modified, only appended
  2. Commands are the only writers — all events originate from command execution
  3. No aggregates or streams — DCB forms consistency boundaries dynamically
  4. State is derived by replay — folds for commands, SQLite for projectors
  5. The system is fully replayable — delete SQLite, replay events, identical result

3. Architecture Overview

This chapter describes the Umari runtime architecture — how the pieces connect, how modules are loaded, and how events flow through the system.

Crate map

Umari is a Cargo workspace with these crates:

CrateRole
umari (crates/umari)SDK — traits, types, macros, and the WASM guest library
umari-macros (crates/macros)Derive macros: Event, EventSet, DomainIds, FromDomainIds, #[export_command]
umari-runtime (crates/runtime)Runtime — Wasmtime-based module runner, event dispatch, actor system
umari-api (crates/api)HTTP API server (Axum) — upload modules, execute commands, manage lifecycle
umari-server (crates/server)Server binary — starts the runtime + API + optional Web UI
umari-cli (crates/cli)CLI tool — upload modules, execute commands, manage the system
umari-ui (crates/ui)Web UI built with HTMX
umari-types (crates/types)Shared API types

Runtime architecture

The runtime is built on the kameo actor framework with Wasmtime as the WASM engine.

RuntimeSupervisor
├── PubSub<ModuleEvent>          # Module lifecycle events (upload, activate, deactivate)
├── ModuleStoreActor             # SQLite store for WASM bytes, metadata, crypto keys
├── CommandActor                 # Handles command execution requests
│   └── subscribes to module_pubsub for command modules
├── ModuleSupervisor<ProjectorWorld>
│   ├── ModuleActor (plans)      # One per active projector
│   └── ModuleActor (shops)
└── ModuleSupervisor<EffectWorld>
    ├── ModuleActor (register-shopify-webhooks)
    │   ├── Worker (global)      # Pool of workers for parallel processing
    │   ├── Worker (keyed, 0)    # 8 keyed workers by default
    │   └── Worker (keyed, 7)
    └── ModuleActor (record-warranty-sale)
        └── Worker (global)      # Workers run on dedicated OS threads

ModuleSupervisor

Each module type (projector, effect) has a ModuleSupervisor that:

  • Subscribes to ModuleEvent via the pubsub
  • On ModuleActivated: compiles the WASM, spawns a ModuleActor
  • On ModuleDeactivated: stops the ModuleActor
  • On ModuleUpdated (new version uploaded): spawns new actor, waits for old to stop, swaps

ModuleActor

Each active module has one ModuleActor that:

  • Opens a subscription to the event store via DCB
  • Receives event batches from the event stream
  • For projectors: processes events sequentially inline
  • For effects: dispatches events to a worker pool based on partition_key()
  • Maintains a last_position watermark in SQLite for crash recovery

Worker pool (effects only)

Effects use a worker pool for parallel processing. The pool consists of:

  • One global worker — handles events with no partition key or PartitionKey::Inline
  • N keyed workers (default: 8) — each handles a hash-consistent subset of partition keys

The partition_key() method on an effect determines routing. Returning None routes to the global worker (sequential for that effect). Returning Some(key) routes to hash(key) % 8, enabling parallel processing of independent event streams.

Workers acknowledge completion back to the ModuleActor, which tracks the watermark — the highest contiguous position acknowledged. The watermark prevents data loss: if a worker crashes, events above the watermark are replayed.

CommandActor

Commands are different from projectors/effects — they don’t subscribe to event streams. Instead, the CommandActor handles on-demand execution:

  1. A client (HTTP API, effect, or direct call) sends an execution request
  2. The CommandActor compiles (or retrieves from cache) the command’s WASM
  3. The command runs: queries events via DCB, applies folds, executes user logic, emits events
  4. Events are appended to the event store atomically

WASM runtime

Each module runs as a WASM component using the component model. The WIT interfaces define the contract between the guest (your Rust code) and the host (the Umari runtime):

WIT interfaceProvided toPurpose
command/transactionCommands, EffectsRead events, commit new events
commonAll modulesEvent types, event query types
sqliteProjectors, EffectsSQLite database access
cryptoEffectsDelete encryption keys (crypto-shredding)
wasi:httpEffectsMake HTTP requests

Commands have a simpler interface — they only need the transaction interface. Effects have the richest interface, including HTTP and the SQLite database for their own internal state.

Note: An older command/executor interface still exists in the WIT package but is unused — effects today execute commands by calling the command function directly (see Chapter 9: Effects).

Event flow

1. External trigger (HTTP POST /execute)
2. CommandActor instantiates the already-compiled command component
   (all active modules are compiled once at runtime startup and cached)
3. Command executes:
   a. Builds DCB query from fold domain IDs
   b. Opens transaction, reads events in batches
   c. Applies events to folds, checks idempotency
   d. Calls user's execute closure with fold state
   e. User closure emits events (or returns empty emit)
   f. Events appended to event store atomically
4. Event store notifies subscribers
5. Projector ModuleActor receives event batch
6. Projector handles each event, updates SQLite
7. Effect ModuleActor receives event batch
8. Effect dispatches to worker by partition key
9. Worker handles event, may execute commands or make HTTP calls

Input validation (e.g. the validator crate) is a guest-side convention used by the Rust SDK — it isn’t part of the runtime contract. Other SDKs may handle validation differently or not at all.

Data directory layout

umari-data/
├── umari.sqlite              # Module store: WASM bytes, metadata, crypto keys
├── projector/
│   ├── plans.sqlite           # Per-projector read model
│   ├── plans.log              # Captured stdout/stderr from the projector
│   ├── shops.sqlite
│   └── warranties.sqlite
├── effect/
│   ├── register-shopify-webhooks.sqlite  # Per-effect internal state
│   ├── register-shopify-webhooks.log     # Captured stdout/stderr from the effect
│   └── record-warranty-sale.sqlite
└── cache/
    └── *.cwasm                # Compiled WASM component cache

Anything a module writes to stdout or stderr (e.g. println!, eprintln!, tracing logs configured to write to stderr) is captured by the runtime, buffered in memory for inspection via the API/UI, and persisted to the per-module .log file shown above. Commands don’t run as long-lived actors and don’t have their own per-module directory.

Module store

The module store (umari.sqlite) is a SQLite database that holds:

  • WASM bytecode — raw .wasm bytes for each module version
  • Module metadata — name, version, module type, active status
  • Environment variables — key-value pairs injected as WASI env vars
  • Crypto keys — AES-256 keys per encryption scope

When a module is uploaded, the bytes are stored. When activated, the WASM is compiled (cached to cache/*.cwasm) and a ModuleActor is spawned. Multiple versions can coexist — only one is “active” at a time.

4. Events

Events are the immutable facts of your system. This chapter covers how to define them, what data they carry, and how they flow through Umari.

Defining an event

An event is a Rust struct that derives Event, DomainIds, Serialize, and Deserialize:

#![allow(unused)]
fn main() {
use umari::prelude::*;
use serde::{Serialize, Deserialize};
use uuid::Uuid;

#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.plan.created")]
pub struct WarrantyPlanCreated {
    #[domain_id]
    pub plan_id: Uuid,
    #[domain_id]
    pub shop_id: u64,
    pub title: String,
    pub duration_months: u32,
    pub price: Decimal,
    pub applicable_to: ProductApplicability,
}
}

Event and DomainIds are independent derives — Event does not include DomainIds, so you need to add both. Clone and Debug are optional; add them if you want them on a particular event.

The #[event_type("warranty.plan.created")] attribute sets the EVENT_TYPE constant. This string is what the event store uses to identify the event, what projectors and effects filter on, and what appears in the event_type field of StoredEvent. The attribute is optional — if omitted, EVENT_TYPE defaults to the struct name.

Naming convention: A common convention is dot-separated past-tense verb phrases like shop.connected, warranty.plan.created, order.paid — the first segment is the domain entity and the second is the action. Umari doesn’t enforce this; use whatever naming scheme fits your project.

Domain IDs

Fields annotated with #[domain_id] become tags on the stored event. Commands query for events by these tags via DCB, and event sets (the queries used by folds, projectors, and effects) use them to declare what they want to see. Choose domain IDs carefully:

  • What entity is this event “about”? That’s a domain ID.
  • Is this just reference data? Not a domain ID.
#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.sold")]
pub struct WarrantySold {
    #[domain_id] pub shop_id: u64,       // ✓ — identifies the shop
    #[domain_id] pub warranty_id: Uuid,  // ✓ — identifies the warranty
    #[domain_id] pub order_id: u64,      // ✓ — identifies the order
    #[domain_id] pub line_item_id: u64,  // ✓ — identifies the order line
    pub plan_title: String,              // ✗ — just data
    pub customer_email: String,          // ✗ — just data
    pub price: Decimal,                  // ✗ — just data
}
}

Each domain ID becomes a tag like shop_id:42. When a command queries for events with shop_id=42, the event store returns only events carrying that tag.

Renaming domain IDs

You can override the tag name — useful when the Rust field name differs from the domain concept:

#![allow(unused)]
fn main() {
#[domain_id("plan_id")]
pub warranty_plan_id: Uuid,
}

This produces a tag like plan_id:abc-def rather than warranty_plan_id:abc-def.

The DomainIds derive

#[derive(DomainIds)] generates the domain_ids() method, which collects all #[domain_id] fields into a DomainIdBindings map used for querying. It’s a separate derive from Event — events need both, and so do other structs that participate in DCB queries (see Chapter 5: Domain IDs).

Encryption scopes

Events that contain sensitive data (PII, access tokens, etc.) can be encrypted at rest. To enable encryption, mark a single field on the event with #[crypto_scope]:

#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("shop.connected")]
pub struct ShopConnected {
    #[domain_id]
    #[crypto_scope]
    pub shop_id: u64,
    pub shop_domain: String,
    pub access_token: String,
}
}

The field marked #[crypto_scope] does not mean “encrypt this field.” Instead, its value (combined with the field name as field_name:value, e.g. shop_id:42) becomes a lookup key for an encryption key, and the runtime then encrypts the entire event payload with that key. The event envelope (id, position, tags, timestamps, etc.) is never encrypted.

  • No #[crypto_scope] on any field → the event is stored in plaintext.
  • One field with #[crypto_scope] → the whole payload is encrypted under a per-scope AES-256-GCM key. Each unique scope value (e.g. each shop_id) gets its own key.

Encryption is transparent:

  • Writing: the runtime serializes the event, fetches/creates the key for the scope, and encrypts the payload before appending.
  • Reading: the runtime decrypts the payload before passing it to folds, projectors, and effects.
  • Key deletion: deleting a scope’s key (crypto-shredding) makes all events for that scope permanently unreadable — they appear as Value::Null and are skipped.

See Chapter 14: Encryption & Crypto-Shredding for details.

The Event trait

The Event derive macro generates an implementation of:

#![allow(unused)]
fn main() {
pub trait Event: DomainIds + Serialize + DeserializeOwned + Sized {
    const EVENT_TYPE: &'static str;

    fn encryption_scope(&self) -> Option<String> {
        None  // Overridden if any field has #[crypto_scope]
    }
}
}

You rarely need to know this — the derive macro handles everything. But it’s useful to understand when debugging.

The StoredEvent envelope

When you receive an event in a fold, projector, or effect, it arrives as StoredEvent<T>:

#![allow(unused)]
fn main() {
pub struct StoredEvent<T> {
    pub id: Uuid,
    pub position: u64,
    pub event_type: String,
    pub tags: Vec<String>,               // ["shop_id:42", "plan_id:abc"]
    pub timestamp: DateTime<Utc>,
    pub correlation_id: Uuid,
    pub causation_id: Uuid,
    pub triggering_event_id: Option<Uuid>,
    pub idempotency_key: Option<Uuid>,
    pub encryption_scope: Option<String>,
    pub encryption_key_id: Option<Uuid>,
    pub data: T,                          // Your typed event data
}
}

The data field contains your deserialized event struct. The envelope fields carry tracing and routing metadata.

Event sets

An event set is an enum that groups multiple event types together. It’s used by folds, projectors, and effects to declare which events they care about.

#![allow(unused)]
fn main() {
#[derive(EventSet)]
enum Query {
    ShopConnected(ShopConnected),
    ShopReconnected(ShopReconnected),
}
}

The #[derive(EventSet)] macro generates the EventSet trait implementation:

#![allow(unused)]
fn main() {
pub trait EventSet: Sized {
    type Item;

    fn event_types() -> Vec<&'static str>;      // ["shop.connected", "shop.reconnected"]
    fn event_domain_ids() -> Vec<EventDomainId>; // Domain ID requirements per event type
    fn from_event(event_type: &str, data: &Value)
        -> Option<Result<Self::Item, SerializationError>>;
}
}

Single events as event sets

For folds that only care about one event type, use SingleEvent<E>:

#![allow(unused)]
fn main() {
impl Fold for ShopExistsFold {
    type Events = SingleEvent<ShopConnected>;
    type State = bool;

    fn apply(&self, exists: &mut bool, event: StoredEvent<ShopConnected>) {
        *exists = true;
    }
}
}

SingleEvent<E> is a built-in event set for a single event type. It’s the simplest way to subscribe to one event.

Scoping with #[scope(...)]

The #[scope(...)] attribute on an EventSet variant controls which domain ID tags are used to filter that event type:

#![allow(unused)]
fn main() {
#[derive(EventSet)]
pub enum WidgetFoldEvents {
    // Only receive WidgetCreated events for this specific widget_id
    #[scope(widget_id)]
    WidgetCreated(WidgetCreated),

    // Receive WidgetArchived events — uses all domain ID bindings from the fold
    WidgetArchived(WidgetArchived),

    // Always match events where shop_id = "acme", regardless of fold bindings
    #[scope(shop_id = "acme")]
    GlobalSettingsChanged(GlobalSettingsChanged),
}
}

Three forms:

  • No attribute: The variant is filtered using all domain ID bindings from the fold/command input
  • #[scope(field_name)]: Filter only by the named domain ID — restricts scope to fewer IDs
  • #[scope(field_name = "literal")]: Hardcoded tag value — matches events with that fixed value

Scoping matters. A fold that checks whether a widget name is unique within a shop should be scoped by shop_id only. Without #[scope(shop_id)], the fold would also filter by widget_id and only see events for that specific widget — missing other widgets in the shop.

For projectors and effects (which have no fold bindings), only the hardcoded form is meaningful:

#![allow(unused)]
fn main() {
#[derive(EventSet)]
enum Query {
    WidgetCreated(WidgetCreated),
    // Only webhook events for the "orders/paid" topic
    #[scope(topic = "orders/paid")]
    WebhookReceived(WebhookReceived),
}
}

Naming conventions

  • Event struct: PascalCase past-tense verb phrase: WidgetCreated, ShopConnected, WarrantyClaimFiled
  • Event type string: object.verb dot notation: "widget.created", "shop.connected"
  • Event set enum: Always named Query

5. Domain IDs

Domain IDs are the indexing and routing mechanism in Umari. They determine which events a command reads, which events a projector receives, and how effects partition work.

What domain IDs are

A domain ID is a field on an event that identifies the entity the event is about. Each domain ID becomes a tag in the format field_name:value stored alongside the event.

#![allow(unused)]
fn main() {
#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("warranty.sold")]
pub struct WarrantySold {
    #[domain_id] pub shop_id: u64,       // tag: shop_id:42
    #[domain_id] pub warranty_id: Uuid,  // tag: warranty_id:abc-def-123
    #[domain_id] pub order_id: u64,      // tag: order_id:1001
}
}

These tags are used by the event store (UmaDB) for DCB queries. When a command requests events with shop_id=42, only events tagged shop_id:42 are returned.

Choosing domain IDs

Ask yourself: “If this field changes, does it identify a different entity’s consistency boundary?”

  • shop_id on WarrantySold — yes, the warranty belongs to a specific shop. Domain ID.
  • customer_email on WarrantySold — no, it’s just data about the warranty. Not a domain ID.
  • line_item_id on WarrantySold — yes, a single line item can only be sold once. Adding it as a domain ID lets a fold ask “has this line ever been sold?” and reject duplicates. Domain ID.

If you’re unsure, err on the side of fewer domain IDs. Adding one later is backwards-compatible — existing events just won’t have the new tag. Removing one is not — you’d have to backfill every existing event.

The DomainIds trait

The #[derive(DomainIds)] macro generates an implementation of:

#![allow(unused)]
fn main() {
pub trait DomainIds {
    const DOMAIN_ID_FIELDS: &'static [&'static str];
    fn domain_ids(&self) -> DomainIdBindings;
}
}

DomainIdBindings is IndexMap<&'static str, String> — a map from field name to string value. This is what the runtime uses to construct DCB queries.

Important: #[derive(DomainIds)] is separate from #[derive(Event)]. The Event derive does not include DomainIds. You need DomainIds on:

  • Events — so their tags can be written and queried
  • Command input structs — so the command’s domain ID bindings can be derived from the input
  • Fold structs — so folds can declare which bindings they need

The FromDomainIds trait

Fold structs implement FromDomainIds to be constructed from command input bindings:

#![allow(unused)]
fn main() {
#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
    #[domain_id]
    pub shop_id: u64,
}
}

FromDomainIds generates a constructor that takes domain ID bindings and creates the fold struct. Only fields matching the fold’s #[domain_id] fields are copied from the bindings. This is how a command that declares shop_id=42 and plan_id=abc on its input automatically passes just shop_id=42 to the ShopExistsFold.

Where it’s used: FromDomainIds is only used by the Command builder’s .fold::<T>() and .fold_args::<T>(args) methods — those are the call sites that take a fold type by name and construct it from the command’s input bindings. If you always reach for .fold_with(|input| ...) instead, you don’t strictly need FromDomainIds. It may be removed in the future, but for now derive it on your fold structs so they’re usable with .fold::<T>().

The generic EventFold<E>, LatestEvent<E>, EventCounter<E>, and EventToggle<A,B> all implement FromDomainIds automatically — they filter bindings to only the fields that the event type E declares as domain IDs.

How DCB queries are built

When a command has registered folds, the runtime builds a DCB query by:

  1. Collecting all EventDomainId entries from every fold’s EventSet
  2. For each entry, looking up the dynamic field values from the input’s domain ID bindings
  3. Grouping by tag sets — events that share the same tag combination are requested together

For example, a command with input { shop_id: 42, plan_id: abc } and two folds:

ShopExistsFold: reads SingleEvent<ShopConnected>, dynamic_fields: [shop_id]
  → DCB item: type="shop.connected", tags=["shop_id:42"]

PlanExistsFold: reads SingleEvent<WarrantyPlanCreated>, dynamic_fields: [shop_id, plan_id]
  → DCB item: type="warranty.plan.created", tags=["shop_id:42", "plan_id:abc"]

The event store returns events matching either query, deduplicated and in position order.

Scoping

The #[scope(...)] attribute on EventSet variants controls how an event type is filtered against the surrounding bindings (for folds) or the global event log (for projectors and effects). It’s the main knob for narrowing — or broadening — what a query sees.

See Chapter 4: Events → Scoping with #[scope(...)] for the full description and examples.

6. Folds

A fold is a “reduce” over the event log. You declare which events to read and how each one updates a piece of in-memory state. Commands replay folds on every call to recover whatever state they need to make a decision — they have no SQLite, no other query path.

If you’ve used Iterator::fold, the mental model is identical:

#![allow(unused)]
fn main() {
// Iterator::fold
events.iter().fold(State::default(), |state, event| apply(state, event));

// Umari Fold trait
fn apply(&self, state: &mut Self::State, event: StoredEvent<E>) { ... }
}

The runtime supplies the events (scoped by the fold’s domain IDs) and the initial state (State::default()). You only write the body.

The Fold trait

#![allow(unused)]
fn main() {
pub trait Fold: DomainIds + 'static {
    type Events: EventSet;
    type State: Default + 'static;

    fn apply(&self, state: &mut Self::State, event: StoredEvent<<Self::Events as EventSet>::Item>);
}
}
  • Events — which events this fold subscribes to (an EventSet)
  • State — the type of state produced by replaying those events (must implement Default)
  • apply() — called once per matching event, in position order, to update the state

Simple fold

#![allow(unused)]
fn main() {
use umari::prelude::*;

#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
    #[domain_id]
    pub shop_id: u64,
}

impl Fold for ShopExistsFold {
    type Events = SingleEvent<ShopConnected>;
    type State = bool;

    fn apply(&self, exists: &mut bool, _event: StoredEvent<ShopConnected>) {
        *exists = true;
    }
}
}

This fold subscribes to ShopConnected events scoped by shop_id. The state starts as false (the Default for bool). When a ShopConnected event is encountered during replay, the state becomes true. The command can then check if !exists { ... }.

Fold with an EventSet enum

For folds that need multiple event types:

#![allow(unused)]
fn main() {
#[derive(EventSet)]
pub enum ShopDomainQuery {
    ShopConnected(ShopConnected),
    ShopReconnected(ShopReconnected),
}

#[derive(DomainIds, FromDomainIds)]
pub struct ShopDomainFold {
    #[domain_id]
    pub shop_id: u64,
}

impl Fold for ShopDomainFold {
    type Events = ShopDomainQuery;
    type State = Option<String>;

    fn apply(&self, domain: &mut Option<String>, event: StoredEvent<ShopDomainQuery>) {
        match event.data {
            ShopDomainQuery::ShopConnected(ev) => *domain = Some(ev.shop_domain),
            ShopDomainQuery::ShopReconnected(ev) => *domain = Some(ev.shop_domain),
        }
    }
}
}

The apply method receives the deserialized event and decides how to update state. Events arrive in position order — the state after all events have been applied is what’s passed to the command’s execute closure.

Built-in fold types

Umari provides several generic folds for common patterns:

EventFold

Collects ALL occurrences of event E into a Vec. Use when you need the full history.

#![allow(unused)]
fn main() {
let connected = cmd.fold::<EventFold<ShopConnected>>();
// State: EventState<ShopConnected> { events: Vec<StoredEvent<ShopConnected>> }
// connected.exists() → true if at least one event exists
}

LatestEvent

Keeps only the most recent occurrence of event E. More efficient than EventFold when you only need the current value.

#![allow(unused)]
fn main() {
let latest_connected = cmd.fold::<LatestEvent<ShopConnected>>();
// State: Option<StoredEvent<ShopConnected>>
}

EventCounter

Counts occurrences of event E. Efficient — doesn’t store events.

#![allow(unused)]
fn main() {
let sale_count = cmd.fold::<EventCounter<WarrantySold>>();
// State: u64
}

EventToggle

Tracks which of two opposing events occurred last. Ideal for created/deleted, activated/deactivated, archived/unarchived pairs.

#![allow(unused)]
fn main() {
let toggle = cmd.fold::<EventToggle<WarrantyPlanArchived, WarrantyPlanUnarchived>>();
// State: ToggleState<A, B> {
//     last: Option<ToggleSide<A, B>>  // None, Some(ToggleSide::A(...)), or Some(ToggleSide::B(...))
// }
}

Custom folds

For anything beyond the built-in types, implement Fold directly:

#![allow(unused)]
fn main() {
#[derive(Default)]
pub struct WarrantyPlanState {
    pub exists: bool,
    pub title: Option<String>,
    pub status: PlanStatus,
    pub archived: bool,
}

#[derive(EventSet)]
pub enum WarrantyPlanEvents {
    #[scope(plan_id)]
    WarrantyPlanCreated(WarrantyPlanCreated),
    #[scope(plan_id)]
    WarrantyPlanUpdated(WarrantyPlanUpdated),
    #[scope(plan_id)]
    WarrantyPlanArchived(WarrantyPlanArchived),
    #[scope(plan_id)]
    WarrantyPlanUnarchived(WarrantyPlanUnarchived),
}

#[derive(DomainIds, FromDomainIds)]
pub struct WarrantyPlanFold {
    #[domain_id]
    pub plan_id: Uuid,
}

impl Fold for WarrantyPlanFold {
    type Events = WarrantyPlanEvents;
    type State = WarrantyPlanState;

    fn apply(&self, state: &mut WarrantyPlanState, event: StoredEvent<WarrantyPlanEvents>) {
        match event.data {
            WarrantyPlanEvents::WarrantyPlanCreated(ev) => {
                state.exists = true;
                state.title = Some(ev.title);
                state.status = ev.status;
            }
            WarrantyPlanEvents::WarrantyPlanUpdated(ev) => {
                state.title = Some(ev.title);
                state.status = ev.status;
            }
            WarrantyPlanEvents::WarrantyPlanArchived(_) => state.archived = true,
            WarrantyPlanEvents::WarrantyPlanUnarchived(_) => state.archived = false,
        }
    }
}
}

Note that #[scope(plan_id)] ensures we only see events for the specific plan being queried. Without the scope attribute, the fold would filter by all domain ID bindings from the command input — which may be too narrow.

How folds are registered in commands

Commands register folds through the builder pattern:

#![allow(unused)]
fn main() {
Command::new(input, context)
    .fold::<ShopExistsFold>()           // No extra args
    .fold::<WarrantyPlanFold>()         // No extra args
    .fold_args::<CustomFold>(args)      // With additional constructor args
    .fold_with(|input| MyFold { ... })  // Manual construction from input
    .execute(|input, (shop_exists, plan_state)| {
        // ...
    })
}

Each .fold::<T>() call:

  1. Extends the DCB query with the fold’s event domain IDs
  2. Creates the fold from the input’s domain ID bindings (via FromDomainIds)
  3. Initializes the fold’s state to Default
  4. Returns a FoldHandle<T> — a typed token for extracting state in the execute closure

The execute closure receives the input and a tuple of fold states, in the order they were registered. Up to 12 folds are supported in a single tuple.

Fold state and idempotency

When the runtime replays events into folds, it also checks for idempotency. If the command was called with an idempotency_key, and any event in the fold’s scope has a matching idempotency_key, the command exits early without calling the execute closure — returning an empty result.

This means you can safely retry command executions without worrying about duplicate events. The runtime handles deduplication at the event store level.

Crypto-shredded events in folds

When an event’s encryption key has been deleted, the event data is Value::Null and encryption_scope is Some. Folds skip these events silently — from_event returns None for null data, so apply is never called. Your fold state simply won’t reflect the shredded event, which is the intended behavior.

FoldQuery — standalone fold execution

Outside of commands, you can run folds standalone with FoldQuery. This is useful in effects for checking event store state before performing side effects:

#![allow(unused)]
fn main() {
use umari::prelude::FoldQuery;

let topics_registered = FoldQuery::new()
    .fold_iter(topics.iter().map(|topic| AlreadyRegisteredFold {
        shop_id,
        topic: topic.to_string(),
        current_event_id: event.id,
    }))
    .run()?;
}

FoldQuery opens a transaction, reads events, applies them to the registered folds, and returns the fold states. It’s the same mechanism commands use internally, exposed for effects that need to check event store state without executing a full command.

7. Commands

Commands are the entry point for all mutations. They are pure, deterministic functions that validate input, check invariants against event history, and emit new events. Commands are the only mechanism for writing to the event store.

Anatomy of a command

A command is a function annotated with #[export_command]. It receives typed input and a CommandContext, then uses the Command builder to declare folds and execute logic.

#![allow(unused)]
fn main() {
use umari::prelude::*;
use schemars::JsonSchema;
use serde::{Serialize, Deserialize};
use validator::Validate;

#[derive(DomainIds, Validate, JsonSchema, Serialize, Deserialize)]
pub struct Input {
    #[domain_id]
    pub shop_id: u64,
    #[domain_id]
    pub plan_id: Uuid,
    #[validate(length(min = 1, max = 200))]
    pub title: String,
    #[validate(range(min = 1, max = 120))]
    pub duration_months: u32,
    pub price: Decimal,
}

#[export_command]
pub fn execute(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
    // 1. Validate input
    input.validate()?;

    // 2. Build command with folds, execute
    Command::new(input, context)
        .fold::<ShopExistsFold>()
        .fold::<WarrantyPlanFold>()
        .execute(|input, (shop_exists, plan_state)| {
            // 3. Check invariants
            anyhow::ensure!(shop_exists, "shop does not exist");
            anyhow::ensure!(!plan_state.exists, "plan already exists with this ID");
            anyhow::ensure!(
                !plan_state.archived,
                "a plan with this ID was previously archived"
            );

            // 4. Emit events
            Ok(emit![WarrantyPlanCreated {
                plan_id: input.plan_id,
                shop_id: input.shop_id,
                title: input.title,
                duration_months: input.duration_months,
                price: input.price,
                applicable_to: input.applicable_to,
                status: PlanStatus::Draft,
            }])
        })
}
}

The Command builder

Command::new(input, context)

Creates a new command builder. The input must implement DomainIds (derive it). The context is a CommandContext.

.fold::<T>()

Registers a fold. T must implement Fold + FromDomainIds<Args = ()>. The fold is constructed from the input’s domain ID bindings automatically. Returns a Command with the fold’s handle appended to the fold state tuple.

.fold_args::<T>(args)

Same as .fold::<T>() but passes additional constructor arguments to T::from_domain_ids(args, bindings).

.fold_with(|input| MyFold { ... })

Manually construct a fold from the raw input. Use this when the fold has custom construction logic beyond domain ID binding.

.execute(|input, fold_states| { ... })

Runs the command. The closure receives the input (by value) and the fold states as a tuple. Inside the closure, you:

  1. Check invariants against fold state (use anyhow::ensure! or bail!)
  2. Decide which events to emit (use the emit! macro)
  3. Return Ok(emit![...]) or Ok(emit![]) for a no-op

The emit! macro

#![allow(unused)]
fn main() {
emit![]                                 // No events
emit![SomeEvent { field: value }]       // Single event
emit![EventA { .. }, EventB { .. }]     // Multiple events
}

Each event expression must be a struct implementing Event. The macro serializes each event, collects its domain IDs, and returns an Emit value.

You can also build an Emit manually:

#![allow(unused)]
fn main() {
Emit::new()
    .event(FirstEvent { .. })
    .event(SecondEvent { .. })
}

Command idempotency

Commands support built-in idempotency through the idempotency_key field in CommandContext. When present, the runtime checks whether any event in the fold scope already carries this key. If a match is found, the command exits early — the execute closure is never called, and no events are emitted.

#![allow(unused)]
fn main() {
let context = CommandContext::new()
    .with_idempotency_key(Some(request_id));
}

This deduplication happens at the event store level, so it survives crashes and restarts. You can safely retry commands without producing duplicate events.

You can also implement domain-level idempotency inside the execute closure:

#![allow(unused)]
fn main() {
.execute(|input, plan_state| {
    if plan_state.exists && plan_state.title.as_deref() == Some(&input.title) {
        return Ok(emit![]);  // Plan already exists with same data — idempotent
    }
    // ... emit WarrantyPlanCreated
})
}

CommandContext

#![allow(unused)]
fn main() {
pub struct CommandContext {
    pub correlation_id: Uuid,           // request that started the chain
    pub causation_id: Uuid,             // this specific execution
    pub triggering_event_id: Option<Uuid>,  // the event that called us, if any
    pub idempotency_key: Option<Uuid>,
}
}

You almost never construct this by hand. Use CommandContext::new() and the right values are populated automatically:

Where the command runsWhat new() produces
HTTP / CLI entry pointfresh correlation_id, fresh causation_id, triggering_event_id = None
Inside an effect’s handle()inherits correlation_id and triggering_event_id from the effect’s current event; fresh causation_id

The effect context lives in a thread-local (CURRENT_EVENT_CONTEXT) that the runtime sets before each handle() call. To override fields explicitly:

#![allow(unused)]
fn main() {
CommandContext::new()
    .with_correlation_id(id)
    .with_triggering_event_id(id)
    .with_idempotency_key(key)
}

Private vs public commands

Commands fall into two categories by convention, not by type:

  • Public commands — part of the domain API. Called by external services, HTTP handlers, or scheduled jobs. These commands live in the commands/ directory and are uploaded to the runtime.

  • Private commands — implementation details of effect idempotency. Only called from within effects. These are often defined as plain Rust functions (not #[export_command]) inside the effect crate itself, or as separate modules within the effect.

#![allow(unused)]
fn main() {
// In effects/register-shopify-webhooks/src/commands.rs

use umari::prelude::*;

#[derive(DomainIds)]
pub struct RecordWebhookRegistrationCompletedInput {
    #[domain_id] pub shop_id: u64,
    #[domain_id] pub topic: String,
}

pub fn record_webhook_registration_completed(
    input: RecordWebhookRegistrationCompletedInput,
    context: CommandContext,
) -> anyhow::Result<ExecuteOutput> {
    Command::new(input, context)
        .fold::<ShopExistsFold>()
        .execute(|input, shop_exists| {
            anyhow::ensure!(shop_exists, "shop does not exist");
            Ok(emit![ShopWebhookRegistrationCompleted {
                shop_id: input.shop_id,
                topic: input.topic,
            }])
        })
}
}

Private commands use the same Command::new(...).fold::<T>().execute(...) pattern — they just aren’t exported as WASM modules.

Validation

Use the validator crate for input validation:

#![allow(unused)]
fn main() {
#[derive(DomainIds, Validate, Serialize, Deserialize)]
pub struct Input {
    #[validate(length(min = 1, max = 200))]
    pub title: String,
    #[validate(range(min = 1, max = 120))]
    pub duration_months: u32,
}

#[export_command]
pub fn execute(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
    input.validate()?;  // Call this first
    // ...
}
}

Custom validators:

#![allow(unused)]
fn main() {
fn non_nil_uuid(value: &Uuid) -> Result<(), validator::ValidationError> {
    if value.is_nil() {
        return Err(validator::ValidationError::new("uuid")
            .with_message("must not be nil".into()));
    }
    Ok(())
}

#[derive(DomainIds, Validate, Serialize, Deserialize)]
pub struct Input {
    #[validate(custom(function = "non_nil_uuid"))]
    pub plan_id: Uuid,
}
}

ExecuteOutput

The return type of every command:

#![allow(unused)]
fn main() {
pub struct ExecuteOutput {
    pub position: Option<u64>,      // Event store position after commit
    pub events: Vec<EmittedEvent>,  // Events that were emitted
}

pub struct EmittedEvent {
    pub id: Uuid,
    pub event_type: String,
    pub domain_ids: IndexMap<String, String>,  // field_name → value
}
}

Effects use ExecuteOutput to ask “did this idempotency-guard command actually emit anything?”:

#![allow(unused)]
fn main() {
let receipt = ScheduleWebhookRegistration::execute(&input)?;
if !receipt.has_event::<ShopWebhooksRegistrationScheduled>() {
    return Ok(());  // already scheduled — skip the side effect
}
}

has_event::<E>() checks whether any event of type E is present in receipt.events. If the command short-circuited (idempotency hit, or invariant failed silently), the receipt will be empty and the effect can bail out cleanly.

Complete command example

#![allow(unused)]
fn main() {
#[derive(DomainIds, Validate, JsonSchema, Serialize, Deserialize)]
pub struct Input {
    #[domain_id]
    pub shop_id: u64,
    pub shop_domain: String,
    pub shop_name: String,
    pub access_token: String,
    #[validate(length(min = 1))]
    pub owner_email: String,
}

#[export_command]
pub fn execute(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
    input.validate()?;

    Command::new(input, context)
        .fold::<EventFold<ShopConnected>>()
        .execute(|input, connected| {
            if connected.exists() {
                Ok(emit![ShopReconnected {
                    shop_id: input.shop_id,
                    shop_domain: input.shop_domain,
                    shop_name: input.shop_name,
                    access_token: input.access_token,
                    owner_email: input.owner_email,
                }])
            } else {
                Ok(emit![ShopConnected {
                    shop_id: input.shop_id,
                    shop_domain: input.shop_domain,
                    shop_name: input.shop_name,
                    access_token: input.access_token,
                    owner_email: input.owner_email,
                }])
            }
        })
}
}

This command uses EventFold<ShopConnected> to determine whether the shop has been connected before. If yes, it emits ShopReconnected; if no, it emits ShopConnected. Both events carry the same data but have different semantics downstream.

8. Projectors

Projectors build read models by consuming events from the event store and updating SQLite databases. Their databases are designed to be queried by external processes — your HTTP API, dashboard, or reporting tools.

How projectors work

A projector:

  1. Calls init() once at startup to create tables and prepare statements
  2. Receives events from the event store in position order
  3. Calls handle() for each event to update its SQLite database
  4. Maintains a last_position watermark for crash recovery

Projectors are naturally idempotent — deleting the SQLite database and replaying all events from the beginning produces the exact same result. There is no need for explicit idempotency logic.

The Projector trait

#![allow(unused)]
fn main() {
pub trait Projector: Sized {
    type Query: EventSet;

    fn init() -> anyhow::Result<Self>;
    fn handle(&mut self, event: StoredEvent<<Self::Query as EventSet>::Item>)
        -> anyhow::Result<()>;
}
}

A complete projector

#![allow(unused)]
fn main() {
use umari::prelude::*;
use rust_decimal::Decimal;
use std::str::FromStr;

export_projector!(Plans);

#[derive(EventSet)]
enum Query {
    ShopConnected(ShopConnected),
    ShopReconnected(ShopReconnected),
    WarrantyPlanCreated(WarrantyPlanCreated),
    WarrantyPlanUpdated(WarrantyPlanUpdated),
    WarrantyPlanArchived(WarrantyPlanArchived),
    WarrantyPlanUnarchived(WarrantyPlanUnarchived),
    WarrantyPlanActivated(WarrantyPlanActivated),
    WarrantyPlanDeactivated(WarrantyPlanDeactivated),
    WarrantyPlanDeleted(WarrantyPlanDeleted),
    WarrantyPlanVariantSynced(WarrantyPlanVariantSynced),
    WarrantySold(WarrantySold),
}

struct Plans {}

impl Projector for Plans {
    type Query = Query;

    fn init() -> anyhow::Result<Self> {
        execute_batch(
            "
                CREATE TABLE IF NOT EXISTS shops (
                    shop_id TEXT PRIMARY KEY,
                    access_token TEXT NOT NULL
                );

                CREATE TABLE IF NOT EXISTS plans (
                    plan_id TEXT PRIMARY KEY,
                    shop_id TEXT NOT NULL,
                    title TEXT,
                    duration_months INTEGER,
                    price TEXT NOT NULL,
                    applicable_to TEXT NOT NULL,
                    archived BOOLEAN NOT NULL DEFAULT FALSE,
                    total_sold INTEGER NOT NULL DEFAULT 0,
                    revenue TEXT NOT NULL DEFAULT '0.00',
                    status TEXT NOT NULL DEFAULT 'draft',
                    shopify_variant_id TEXT,
                    created_at TEXT NOT NULL
                );
            ",
        )?;

        Ok(Plans {})
    }

    fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
        match event.data {
            Query::ShopConnected(ev) => {
                execute(
                    "INSERT INTO shops (shop_id, access_token) VALUES (?1, ?2)",
                    params![ev.shop_id.to_string(), ev.access_token],
                )?;
            }
            Query::ShopReconnected(ev) => {
                execute(
                    "UPDATE shops SET access_token = ?2 WHERE shop_id = ?1",
                    params![ev.shop_id.to_string(), ev.access_token],
                )?;
            }
            Query::WarrantyPlanCreated(WarrantyPlanCreated {
                plan_id, shop_id, title, duration_months, price,
                applicable_to, status,
            }) => {
                execute(
                    "INSERT INTO plans (plan_id, shop_id, title, duration_months, price, applicable_to, status, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                    params![plan_id, shop_id.to_string(), title, duration_months,
                        price.to_string(), serde_json::to_string(&applicable_to)?,
                        match status { PlanStatus::Active => "active", PlanStatus::Draft => "draft" },
                        event.timestamp.to_rfc3339(),
                    ],
                )?;
            }
            Query::WarrantyPlanUpdated(WarrantyPlanUpdated {
                plan_id, title, duration_months, price, applicable_to, status, ..
            }) => {
                execute(
                    "UPDATE plans SET title = ?2, duration_months = ?3, price = ?4, applicable_to = ?5, status = ?6 WHERE plan_id = ?1",
                    params![plan_id, title, duration_months, price.to_string(),
                        serde_json::to_string(&applicable_to)?,
                        match status { PlanStatus::Active => "active", PlanStatus::Draft => "draft" },
                    ],
                )?;
            }
            Query::WarrantyPlanArchived(WarrantyPlanArchived { plan_id, .. }) => {
                execute("UPDATE plans SET archived = TRUE where plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanUnarchived(WarrantyPlanUnarchived { plan_id, .. }) => {
                execute("UPDATE plans SET archived = FALSE where plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanActivated(WarrantyPlanActivated { plan_id, .. }) => {
                execute("UPDATE plans SET status = 'active' WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanDeactivated(WarrantyPlanDeactivated { plan_id, .. }) => {
                execute("UPDATE plans SET status = 'draft' WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanDeleted(WarrantyPlanDeleted { plan_id, .. }) => {
                execute("DELETE FROM plans WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanVariantSynced(WarrantyPlanVariantSynced { plan_id, variant_id, .. }) => {
                execute(
                    "UPDATE plans SET shopify_variant_id = ?2 WHERE plan_id = ?1",
                    params![plan_id, variant_id.to_string()],
                )?;
            }
            Query::WarrantySold(WarrantySold { plan_id, price, .. }) => {
                let current_revenue: String = query_row(
                    "SELECT revenue FROM plans WHERE plan_id = ?1",
                    params![plan_id],
                )
                .map(|row| row.get(0))
                .unwrap_or_else(|| "0.00".to_string());
                let new_revenue = Decimal::from_str(&current_revenue)? + price;
                execute(
                    "UPDATE plans SET total_sold = total_sold + 1, revenue = ?2 WHERE plan_id = ?1",
                    params![plan_id, new_revenue.to_string()],
                )?;
            }
        }
        Ok(())
    }
}
}

The export_projector! macro

#![allow(unused)]
fn main() {
export_projector!(Plans);
}

This macro generates the WASM component interface glue — the projector() constructor, handle() entry point, and query() for declaring the event subscription. Your struct only needs to implement Projector.

Design guidelines

One table per concept

Each projector typically manages one primary concept (plans, shops, warranties). If you find yourself managing unrelated tables in one projector, split them.

Denormalize for reads

Projector tables should be optimized for query patterns, not normalized like an OLTP database. Denormalize freely — the source of truth is the event store, not the projector’s SQLite.

Use CREATE TABLE IF NOT EXISTS

Always use IF NOT EXISTS in init(). Projectors may be replayed from scratch (empty database), and init() is called before replay begins.

Keep handle() fast

Each event handler should be a single SQL statement or a small, bounded operation. Avoid complex computation or anything that could fail non-deterministically. If a handler fails, the projector stops and the error is logged. The runtime will retry (the event store subscription is persistent). Projectors don’t have access to HTTP or other side effects — they’re confined to their SQLite database — so this guidance is mostly about keeping per-event work tight.

Use prepared statements

For SQL that runs on every event, build a Statement once in init() and reuse it:

#![allow(unused)]
fn main() {
struct Widgets {
    insert: Statement,
    archive: Statement,
}

impl Projector for Widgets {
    type Query = WidgetEvents;

    fn init() -> anyhow::Result<Self> {
        execute_batch("CREATE TABLE IF NOT EXISTS widgets (...)")?;
        Ok(Widgets {
            insert: prepare("INSERT INTO widgets (id, name) VALUES (?1, ?2)"),
            archive: prepare("UPDATE widgets SET archived = TRUE WHERE id = ?1"),
        })
    }

    fn handle(&mut self, event: StoredEvent<WidgetEvents>) -> anyhow::Result<()> {
        match event.data {
            WidgetEvents::Created(ev) => { self.insert.execute(params![ev.id, ev.name])?; }
            WidgetEvents::Archived(ev) => { self.archive.execute(params![ev.id])?; }
        }
        Ok(())
    }
}
}

The statement is parsed once and reused across every event, avoiding the per-event compilation cost.

Replaying projectors

Projectors can be replayed at any time — the runtime will delete the projector’s SQLite database and reprocess all events from position 0. This is done via the API:

POST /projectors/{name}/replay

Or via the CLI:

umari projector replay plans

This is safe because projectors are naturally idempotent. Replaying is the standard way to fix schema changes or recover from corruption.

Scoping in projectors

Projectors have no fold bindings, so dynamic #[scope(field)] is meaningless. Only hardcoded scopes are useful:

#![allow(unused)]
fn main() {
#[derive(EventSet)]
enum Query {
    WarrantyPlanCreated(WarrantyPlanCreated),  // All plans, all shops
    #[scope(topic = "orders/paid")]             // Only this topic
    WebhookReceived(WebhookReceived),
}
}

Without #[scope(...)], the projector receives every event of that type from the entire event log.

9. Effects

Effects react to events by performing side effects — HTTP requests, sending emails, or executing commands. They have their own SQLite database for internal state, but idempotency is anchored entirely in the event store.

The Effect trait

#![allow(unused)]
fn main() {
pub trait Effect: Sized {
    type Query: EventSet;

    fn init() -> anyhow::Result<Self>;
    fn partition_key(&self, event: StoredEvent<<Self::Query as EventSet>::Item>) -> Option<String> {
        None
    }
    fn handle(&mut self, event: StoredEvent<<Self::Query as EventSet>::Item>) -> anyhow::Result<()>;
}
}
  • Query — which events trigger this effect
  • init() — called once at startup; return the effect instance
  • partition_key() — controls parallel processing (return None for sequential)
  • handle() — called for each matching event

The idempotency contract

Effects must be idempotent. This is the most important rule in Umari. The runtime may redeliver events, and effects may be replayed from scratch. If an effect performs a side effect twice, that’s a bug.

The standard pattern is fold-check → side effect → record:

  1. Fold-check: Use FoldQuery (or a fold inside a private command) to check whether the work has already been done, based on events already in the event store
  2. Side effect: If not already done, perform the external action (HTTP call, email, etc.)
  3. Record: Execute a private command to record the outcome in the event store, so future runs will skip the side effect

Because idempotency is anchored in the event store (steps 1 and 3), deleting all SQLite databases and replaying events produces the same result without repeating side effects.

Scheduled events are usually unnecessary. A fold that checks for a completion event (step 3) is sufficient to determine whether the work was already done. Adding a separate “scheduled” event adds complexity without benefit in most cases.

Example: Registering webhooks

#![allow(unused)]
fn main() {
use umari::prelude::*;
use wasi_http_client::Client;

export_effect!(RegisterWebhooks);

#[derive(EventSet)]
enum Query {
    ShopConnected(ShopConnected),
    ShopReconnected(ShopReconnected),
}

struct RegisterWebhooks {
    client: Client,
    webhook_address: String,
}

impl Effect for RegisterWebhooks {
    type Query = Query;

    fn init() -> anyhow::Result<Self> {
        let webhook_address = env::var("WEBHOOK_ADDRESS")
            .expect("missing WEBHOOK_ADDRESS env var");
        Ok(Self {
            client: Client::new(),
            webhook_address,
        })
    }

    fn partition_key(&self, event: StoredEvent<Self::Query>) -> Option<String> {
        match event.data {
            Query::ShopConnected(ShopConnected { shop_id, .. })
            | Query::ShopReconnected(ShopReconnected { shop_id, .. }) => Some(shop_id.to_string()),
        }
    }

    fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
        let (shop_id, shop_domain, access_token) = match event.data {
            Query::ShopConnected(ShopConnected { shop_id, shop_domain, access_token, .. })
            | Query::ShopReconnected(ShopReconnected { shop_id, shop_domain, access_token, .. }) => {
                (shop_id, shop_domain, access_token)
            }
        };

        let topics = ["orders/paid", "orders/cancelled"];

        // 1. FOLD-CHECK — use a fold to check which topics are already registered
        let topics_registered = FoldQuery::new()
            .fold_iter(topics.iter().map(|topic| AlreadyRegisteredFold {
                shop_id,
                topic: topic.to_string(),
                current_event_id: event.id,
            }))
            .run()?;

        for (topic, registered) in topics.into_iter().zip(topics_registered) {
            // Already done — skip
            if registered {
                continue;
            }

            // 2. SIDE EFFECT — make the HTTP call
            let result = self.register_webhook(shop_id, &shop_domain, &access_token, topic)?;
            match result {
                Ok(()) => {
                    // 3. RECORD — persist completion in the event store
                    record_webhook_registration_completed(
                        RecordWebhookRegistrationCompletedInput {
                            shop_id,
                            topic: topic.to_string(),
                        },
                        CommandContext::new(),
                    )?;
                }
                Err(err) => {
                    eprintln!("failed to register webhook for topic {topic}: {err}");
                    // Don't fail the effect — retry on next event
                    return Ok(());
                }
            }
        }

        Ok(())
    }
}
}

The fold (AlreadyRegisteredFold) checks whether a WebhookRegistrationCompleted event already exists for this shop and topic, scoped to the current triggering event. If it exists, the webhook was already registered in a previous run — skip. If not, perform the HTTP call and record completion.

partition_key and parallel processing

partition_key() enables parallel event processing. The runtime routes events to workers based on the key:

  • None → global worker (sequential for this effect)
  • Some(key) → hashed to one of 8 keyed workers, enabling parallelism across independent streams

In the webhook example, partition_key returns shop_id — events for different shops are processed in parallel, but events for the same shop are serialized (same worker). This prevents race conditions within a shop while maximizing throughput across shops.

When to use partition keys:

  • Use None when events must be processed strictly in order
  • Use Some(entity_id) when events for different entities are independent (different shops, different users)
  • Use Some(event_id) with care — this parallelizes everything but may cause out-of-order processing within an entity

HTTP requests

Effects can make HTTP requests via wasi-http-client:

#![allow(unused)]
fn main() {
use wasi_http_client::Client;

let client = Client::new();
let resp = client
    .post("https://api.example.com/endpoint")
    .header("Content-Type", "application/json")
    .header("Authorization", &format!("Bearer {token}"))
    .json(&json!({ "key": "value" }))
    .connect_timeout(Duration::from_secs(5))
    .send()?;

let status = resp.status();
let body = resp.body()?;
}

The WASI HTTP interface is provided by the runtime. Effects get full HTTP client capability — they can make any outgoing request.

Executing commands from effects

Effects execute private commands directly as function calls:

#![allow(unused)]
fn main() {
use crate::commands::record_webhook_registration_completed;

record_webhook_registration_completed(
    RecordWebhookRegistrationCompletedInput { shop_id, topic },
    CommandContext::new(),  // Inherits correlation context automatically
)?;
}

The CommandContext::new() inside an effect automatically:

  • Inherits correlation_id from the triggering event
  • Sets triggering_event_id to the event being processed
  • Generates a new causation_id for this specific command execution

This creates a proper causal chain: the original event → effect → command → new events.

FoldQuery in effects

Effects can use FoldQuery to check event store state without executing a full command:

#![allow(unused)]
fn main() {
let registered = FoldQuery::new()
    .fold(AlreadyRegisteredFold {
        shop_id,
        topic: "orders/paid".into(),
        current_event_id: event.id,
    })
    .run()?;
}

This opens a transaction, reads events, applies them to the fold, and returns the state. It’s the same mechanism commands use internally, and is the recommended way to check idempotency in effects — lightweight, no extra events needed.

In the webhook example, FoldQuery::fold_iter() runs multiple folds in one transaction — one per topic — checking each topic’s registration status in a single event store read.

Environment variables

Effects can access environment variables:

#![allow(unused)]
fn main() {
fn init() -> anyhow::Result<Self> {
    let webhook_address = env::var("WEBHOOK_ADDRESS")
        .expect("missing WEBHOOK_ADDRESS env var");
    Ok(Self { webhook_address, .. })
}
}

Env vars are set per-module and injected by the runtime as WASI environment variables. This separates configuration from code — you can deploy the same WASM module to staging and production with different env vars.

There are two ways to set them:

  • In the module’s Cargo.toml under [package.metadata.umari.env] — picked up automatically by umari deploy. This is the easiest path and keeps env config alongside the module:

    [package.metadata.umari.env]
    DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/..."
    WEBHOOK_ADDRESS = "https://example.com/hooks"
    
  • Via the lower-level CLI / API upload commands — when calling umari effects upload (or the equivalent HTTP API) directly, pass --env KEY=VALUE flags. Useful for ad-hoc uploads or one-off overrides.

Error handling

Effect handle() returns anyhow::Result<()>. When a handler returns an error (or otherwise panics / traps), the runtime treats the module as having failed and retries indefinitely with exponential backoff:

  • The first retry runs after 1 second.
  • If the module keeps failing on the same event store position, the delay doubles each attempt, capped at 10 minutes.
  • If a retry makes progress (the position advances) before failing again, the backoff resets to 1 second.
  • There is no maximum attempt count — retries continue forever until the module starts succeeding (or is deactivated/replaced).

The effect’s position watermark is never advanced past a failed event, so the same event is replayed on each restart. The module’s captured stderr and the system log lines from the supervisor will show "module failed, retrying with backoff" along with the current attempt and delay.

This means effects should be designed to fail loudly on transient errors and let the runtime back off and retry — that’s the recovery path. The flip side is that returning an error from handle() will pause forward progress for that effect until the underlying problem is fixed, so you should reserve Err for situations that genuinely warrant blocking.

For known-permanent failures, prefer catching them inside handle() and recording the failure as an event (via a private command) so the fold-check pattern can skip the work on future runs.

The export_effect! macro

#![allow(unused)]
fn main() {
export_effect!(RegisterWebhooks);
}

This macro generates the WASM component interface, wiring up init(), handle(), partition_key(), and query().

Crate structure for effects with private commands

When an effect needs private commands and custom events, define them in the same crate:

effects/register-webhooks/
├── Cargo.toml
└── src/
    ├── lib.rs          # Effect implementation + export_effect!
    ├── commands.rs     # Private commands (plain functions)
    └── events.rs       # Effect-private events

Private events and commands are scoped to the effect — they’re not part of the shared domain library. This keeps the shared library focused on domain events and prevents effect implementation details from leaking.

10. Project Structure

This chapter describes how to organize an Umari project workspace. The structure is opinionated but flexible — you can adapt it to your needs.

Workspace layout

my-project/
├── Cargo.toml                 # Workspace root + shared library crate
├── src/                       # Shared library: events, folds
│   ├── lib.rs
│   ├── events/
│   │   ├── mod.rs
│   │   ├── shop.rs            # Shop events
│   │   ├── product.rs         # Product events
│   │   └── order.rs           # Order events
│   ├── folds/
│   │   └── mod.rs             # Fold definitions
│   └── helpers.rs             # Utility functions
├── commands/
│   ├── create-product/        # crate: create-product
│   │   ├── Cargo.toml
│   │   └── src/lib.rs
│   ├── update-product/
│   └── archive-product/
├── projectors/
│   ├── products/              # crate: products
│   │   ├── Cargo.toml
│   │   └── src/lib.rs
│   ├── shops/
│   └── orders/
├── effects/
│   ├── notify-external-service/
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs         # Effect + export_effect!
│   │       ├── commands.rs    # Private commands
│   │       └── events.rs      # Effect-private events
│   └── sync-inventory/
└── .gitignore

Root Cargo.toml

The root is both the workspace definition and the shared library crate:

[package]
name = "my-project"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true
umari.workspace = true
uuid.workspace = true
validator.workspace = true

[workspace]
resolver = "2"
members = [
    ".",
    "commands/create-product",
    "commands/update-product",
    "projectors/products",
    "projectors/shops",
    "effects/notify-external-service",
    # ... etc
]

[workspace.dependencies]
my-project = { path = "." }
umari = { path = "/path/to/umari/crates/umari" }
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.22", features = ["v4"] }
validator = { version = "0.20", features = ["derive"] }
wasi-http-client = { version = "0.2.1", features = ["json"] }

Key points:

  • The root package (.) is listed as a workspace member — it’s the shared library
  • my-project = { path = "." } lets command/projector/effect crates depend on my-project.workspace = true
  • umari path points to the SDK crate (not the runtime workspace)
  • Each module crate uses crate-type = ["cdylib", "rlib"]

Module crate Cargo.toml

Each module is a minimal crate:

# commands/create-product/Cargo.toml
[package]
name = "create-product"
version = "1.0.0"
edition = "2024"

[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
my-project.workspace = true     # Shared library
umari.workspace = true           # SDK
anyhow.workspace = true
serde.workspace = true
validator.workspace = true

crate-type = ["cdylib", "rlib"] is required. cdylib produces the .wasm file; rlib enables Rust-level linking for tests.

For effects with HTTP:

[dependencies]
wasi-http-client.workspace = true

For projectors with additional dependencies:

[dependencies]
rust_decimal.workspace = true

The shared library

The shared library crate (root src/) contains:

Events (src/events/)

#![allow(unused)]
fn main() {
// src/events/mod.rs
pub mod shop;
pub mod product;
pub mod order;

pub use shop::*;
pub use product::*;
pub use order::*;
}

Each event module groups related events:

#![allow(unused)]
fn main() {
// src/events/shop.rs
use umari::prelude::*;

#[derive(Event, DomainIds, Serialize, Deserialize)]
#[event_type("shop.connected")]
pub struct ShopConnected {
    #[domain_id]
    #[crypto_scope]
    pub shop_id: u64,
    pub shop_domain: String,
    pub access_token: String,
}
}

Folds (src/folds/)

#![allow(unused)]
fn main() {
// src/folds/mod.rs
use umari::prelude::*;
use crate::events::*;

#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
    #[domain_id]
    pub shop_id: u64,
}

impl Fold for ShopExistsFold {
    type Events = SingleEvent<ShopConnected>;
    type State = bool;

    fn apply(&self, exists: &mut bool, _event: StoredEvent<ShopConnected>) {
        *exists = true;
    }
}
}

Library root (src/lib.rs)

#![allow(unused)]
fn main() {
// src/lib.rs
pub mod events;
pub mod folds;
pub mod helpers;
}

Naming conventions

ItemConventionExample
Event structPascalCase, past tenseShopConnected, ProductCreated
Event type stringobject.verb dot notation"shop.connected", "product.created"
Command cratekebab-case, imperativecreate-product, update-product
Command functionsnake_casepub fn execute(...)
Command input structAlways Inputpub struct Input
Projector cratekebab-case, plural nounproducts, shops
Projector structPascalCase, pluralProducts, Shops
Effect cratekebab-case, verb phrasenotify-external-service
Effect structPascalCaseNotifyExternalService
Fold structPascalCase noun + FoldShopExistsFold, ProductStateFold
Fold statePascalCase noun + StateProductState, WidgetState
EventSet enumAlways Queryenum Query { ... }

Dependencies between module types

shared library (events, folds)
    ↑                    ↑                    ↑
    |                    |                    |
 commands/          projectors/           effects/
 (import events,    (import events)       (import events,
  import folds)                             import folds,
                                            may have own
                                            events + commands)
  • Commands import events and folds from the shared library
  • Projectors import events from the shared library
  • Effects import events and folds from the shared library; may define their own events and commands locally

Working with modules

The umari CLI is the intended way to scaffold, build, and deploy modules — it understands the workspace layout above, picks up env vars from [package.metadata.umari.env], and handles the wasm32-wasip2 build for you.

umari new

Scaffold a new module crate, wire it into the workspace Cargo.toml, and drop in starter lib.rs content:

umari new command create-product
umari new projector products
umari new effect notify-external-service

The default language is Rust; pass --lang js to scaffold a JS module instead.

umari build

Build every module crate in the workspace (Rust → wasm32-wasip2, JS → componentized wasm) in release mode. Pass paths to scope the build to a subset of crates:

umari build                                # build everything
umari build commands/create-product        # build a single crate
umari build commands/ projectors/products  # multiple paths
umari build --debug                        # debug profile
umari build -j 4                           # cap parallelism

Outputs land in the usual target/wasm32-wasip2/{release,debug}/<name>.wasm paths.

umari deploy

Build everything (same flags as umari build) and upload + activate each module against the server configured for your client:

umari deploy                       # build + upload + activate all modules
umari deploy --no-activate         # upload but don't activate
umari deploy --bump-patch          # auto-bump patch version on conflict
umari deploy commands/create-product

Env vars declared under [package.metadata.umari.env] in each module’s Cargo.toml are sent along with the upload.

Manual cargo / lower-level CLI

If you want to bypass the workspace tooling — e.g. one-off uploads, custom build flags, or wiring this into your own scripts — you can still build with plain cargo and upload through the typed subcommands:

cargo build --target wasm32-wasip2 --release -p create-product

umari commands upload create-product 1.0.0 \
    target/wasm32-wasip2/release/create_product.wasm \
    --env API_KEY=... --activate

For production, always build in release mode — debug builds can be 10× larger and slower.

Adding a new module by hand

If you don’t want to use umari new:

  1. Create the crate directory with Cargo.toml and src/lib.rs
  2. Add it to the workspace members list in the root Cargo.toml
  3. Implement the trait (Command via #[export_command], Projector, or Effect)
  4. umari build (or cargo build --target wasm32-wasip2 --release -p <name>)
  5. umari deploy (or upload manually via the lower-level CLI / API)

11. SQLite API Reference

Projectors and effects each get their own isolated SQLite database file. Modules cannot read each other’s data. This chapter is the complete reference for the SQLite API available inside WASM modules.

Mental model

  • execute* / query* free functions run against the implicit connection — convenient for one-offs.
  • prepare() returns a Statement you store on your module struct, so the SQL is compiled once and reused per event.
  • Errors that return Result are constraint violations you can recover from. Everything else (wrong column name, wrong type, “expected one row but got two”) traps the module — the runtime treats traps as bugs, not business failures.

Every handle() call runs inside an implicit transaction. Don’t reach for BEGIN/COMMIT yourself.

Free functions

These all operate on the module’s connection.

execute(sql, params) -> Result<usize, SqliteError>

Run a single statement, returning rows affected.

#![allow(unused)]
fn main() {
execute(
    "INSERT INTO plans (plan_id, shop_id, title) VALUES (?1, ?2, ?3)",
    params![plan_id, shop_id.to_string(), title],
)?;
}

execute_batch(sql) -> Result<(), SqliteError>

Run multiple statements separated by semicolons. Use this in init() for DDL.

#![allow(unused)]
fn main() {
execute_batch(
    "
        CREATE TABLE IF NOT EXISTS widgets (
            widget_id TEXT PRIMARY KEY,
            name TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_widgets_name ON widgets (name);
    ",
)?;
}

query_one(sql, params) -> Row

Return exactly one row. Traps if zero or multiple rows match.

#![allow(unused)]
fn main() {
let row = query_one(
    "SELECT name, price FROM plans WHERE plan_id = ?1",
    params![plan_id],
);
let name: String = row.get("name");
let price: String = row.get(1);  // by column index
}

query_row(sql, params) -> Option<Row>

Return at most one row. Extra rows are silently dropped — the first match wins.

#![allow(unused)]
fn main() {
if let Some(row) = query_row(
    "SELECT access_token FROM shops WHERE shop_id = ?1",
    params![id],
) {
    let token: String = row.get(0);
}
}

last_insert_rowid() -> Option<i64>

Returns the rowid of the most recent successful INSERT on this connection, or None if no insert has happened yet.

Prepared statements

For queries that run on every event, prepare once in init() and reuse:

prepare(sql) -> Statement

Returns a Statement directly — there’s no Result. A malformed SQL string traps the module.

#![allow(unused)]
fn main() {
struct MyProjector {
    insert_widget: Statement,
    archive_widget: Statement,
}

impl Projector for MyProjector {
    type Query = WidgetEvents;

    fn init() -> anyhow::Result<Self> {
        execute_batch("CREATE TABLE IF NOT EXISTS widgets (...)")?;
        Ok(MyProjector {
            insert_widget: prepare("INSERT INTO widgets (id, name) VALUES (?1, ?2)"),
            archive_widget: prepare("UPDATE widgets SET archived = TRUE WHERE id = ?1"),
        })
    }

    fn handle(&mut self, event: StoredEvent<WidgetEvents>) -> anyhow::Result<()> {
        // ...
        Ok(())
    }
}
}

Statement methods

MethodReturnsTraps on
execute(params)Result<usize, SqliteError>
query(params)Vec<Row>
query_one(params)Rowzero rows, or more than one row
query_row(params)Option<Row>
#![allow(unused)]
fn main() {
self.insert_widget.execute(params![id.to_string(), name])?;

let rows = self.list_widgets.query(params![shop_id]);
for row in rows {
    let name: String = row.get("name");
}
}

Parameters

Pass parameters using the params! macro:

#![allow(unused)]
fn main() {
params![]                          // no params
params![value1, value2, value3]    // positional params for ?1, ?2, ?3
}

Each value is converted through Into<SqliteValue>. To pass a single value, write params![id] — there’s no trailing-comma syntax.

Supported parameter types

Rust typeSQLite type
boolInteger (0 or 1)
i8, i16, i32, i64, isizeInteger
u8, u16, u32Integer
f32, f64Real
String, &strText
Vec<u8>Blob
UuidText (canonical hyphenated form)
Option<T>Null when None, otherwise T

Reading rows

Row::get<I, T>(column) -> T

Get a column value by name (&str) or zero-based position (usize). Traps on type mismatch or unknown column.

#![allow(unused)]
fn main() {
let name: String = row.get("name");
let count: i64 = row.get(0);
let maybe: Option<String> = row.get("nullable_col");
}

Row::tuple<T>() -> T

Unpack the first N columns into a tuple by position (up to 8 columns).

#![allow(unused)]
fn main() {
let (id, name, price): (String, String, String) = row.tuple();
}

Supported column types

Rust typeSQLite value
boolInteger (0 = false, 1 = true; other values trap)
StringText
i64Integer
f64Real
Vec<u8>Blob
Option<T>Null → None, otherwise Some(T)

Errors

SqliteError only covers constraint violations — those are the only failures the API surfaces as Result:

#![allow(unused)]
fn main() {
pub enum SqliteError {
    ConstraintViolation(ConstraintViolation),
}

pub struct ConstraintViolation {
    pub kind: ConstraintViolationKind, // Unique, PrimaryKey, NotNull, ForeignKey, Check, Other
    pub message: String,
}
}

Use it when you want a UNIQUE collision to mean “this event was already projected, skip”:

#![allow(unused)]
fn main() {
match execute("INSERT INTO widgets (id, name) VALUES (?1, ?2)", params![id, name]) {
    Ok(_) => {}
    Err(SqliteError::ConstraintViolation(v)) if v.kind == ConstraintViolationKind::Unique => {
        // already projected — fine
    }
    Err(err) => return Err(err.into()),
}
}

Transactions

The runtime wraps every handle() call in a transaction: it begins before the call and commits if handle() returns Ok. Returning Err rolls back. You don’t manage transactions manually.

Best practices

  • Use IF NOT EXISTS in DDL so init() is idempotent across module restarts.
  • Store UUIDs as TEXT (SQLite has no UUID type) — Uuid already converts to canonical text.
  • Store decimals as TEXT to avoid floating-point precision issues.
  • Always use params! — never interpolate into the SQL string.
  • Prepare statements that run per-event in init(); use the free functions for one-offs.

12. Fold Reference

umari::prelude ships a handful of generic folds for the most common state-derivation shapes — “did this happen?”, “what’s the latest value?”, “how many times?”, “which of these two won?”. Reach for these first; only write a custom fold when none of them fit.

FoldStatePick when you ask…
EventFold<E>EventState<E>“Give me every occurrence of E
LatestEvent<E>Option<StoredEvent<E>>“What’s the most recent E?”
EventCounter<E>u64“How many E have happened?”
EventToggle<A, B>ToggleState<A, B>“Was the last event A or B?”
SingleEvent<E>— (an EventSet)Custom fold that only reads one event type

EventFold

Collects every occurrence of event type E into a Vec.

State: EventState<E>

#![allow(unused)]
fn main() {
.fold::<EventFold<ShopConnected>>()
.execute(|input, connected| {
    if connected.exists() {
        let first = &connected.events[0];
        // ...
    }
    Ok(emit![])
})
}
#![allow(unused)]
fn main() {
impl<E: Event> EventState<E> {
    pub events: Vec<StoredEvent<E>>;
    pub fn exists(&self) -> bool;
}
}

Use when: you need to inspect or aggregate over the full history. Avoid when: you only need the most recent value — LatestEvent is cheaper.

LatestEvent

Keeps only the most recent E. Each new event replaces the previous.

State: Option<StoredEvent<E>>

#![allow(unused)]
fn main() {
.fold::<LatestEvent<WarrantyPlanUpdated>>()
.execute(|input, latest| {
    if let Some(event) = latest {
        // event.data is the most recent WarrantyPlanUpdated
    }
    Ok(emit![])
})
}

Use when: you care about the current value — last Updated, last Connected, etc. Avoid when: you need the full history.

EventCounter

Counts occurrences of E without storing event data.

State: u64

#![allow(unused)]
fn main() {
.fold::<EventCounter<WarrantySold>>()
.execute(|input, sale_count| {
    anyhow::ensure!(
        sale_count < MAX_WARRANTIES,
        "shop has reached the maximum number of warranties"
    );
    Ok(emit![/* ... */])
})
}

Use when: you only need a count. Avoid when: you need to inspect individual events.

EventToggle

Tracks which of two opposing events occurred last. Designed for created/deleted, activated/deactivated, archived/unarchived pairs.

State: ToggleState<A, B>

#![allow(unused)]
fn main() {
.fold::<EventToggle<WarrantyPlanArchived, WarrantyPlanUnarchived>>()
.execute(|input, toggle| {
    if toggle.is_a() {
        // currently archived
    } else if toggle.is_b() {
        // currently unarchived
    } else {
        // neither has happened
    }
    Ok(emit![])
})
}
#![allow(unused)]
fn main() {
impl<A: Event, B: Event> ToggleState<A, B> {
    pub last: Option<ToggleSide<A, B>>;
    pub fn is_a(&self) -> bool;
    pub fn is_b(&self) -> bool;
    pub fn as_a(&self) -> Option<&StoredEvent<A>>;
    pub fn as_b(&self) -> Option<&StoredEvent<B>>;
}
}

Use when: paired opposing events — archived/unarchived, activated/deactivated, locked/unlocked. Constraint: A and B must share the same domain ID fields.

SingleEvent

Not a fold — an EventSet shorthand for custom folds that only read a single event type. Use it as type Events = SingleEvent<MyEvent>;.

#![allow(unused)]
fn main() {
#[derive(DomainIds, FromDomainIds)]
pub struct ShopExistsFold {
    #[domain_id]
    pub shop_id: u64,
}

impl Fold for ShopExistsFold {
    type Events = SingleEvent<ShopConnected>;
    type State = bool;

    fn apply(&self, exists: &mut bool, _event: StoredEvent<ShopConnected>) {
        *exists = true;
    }
}
}

Custom folds

When none of the built-ins fit, implement Fold yourself:

#![allow(unused)]
fn main() {
#[derive(DomainIds, FromDomainIds)]
pub struct MyFold {
    #[domain_id] pub shop_id: u64,
    #[from_domain_id(default)]
    pub custom_field: String,
}

#[derive(EventSet)]
pub enum MyFoldEvents {
    #[scope(shop_id)]
    EventA(EventA),
    EventB(EventB),
}

#[derive(Default)]
pub struct MyState {
    pub a_count: u64,
    pub b_count: u64,
    pub latest_value: Option<String>,
}

impl Fold for MyFold {
    type Events = MyFoldEvents;
    type State = MyState;

    fn apply(&self, state: &mut MyState, event: StoredEvent<MyFoldEvents>) {
        match event.data {
            MyFoldEvents::EventA(_) => state.a_count += 1,
            MyFoldEvents::EventB(ev) => {
                state.b_count += 1;
                state.latest_value = Some(ev.some_field);
            }
        }
    }
}
}

#[from_domain_id(default)]

Fields not annotated with #[domain_id] can use #[from_domain_id(default)] to get their default value during fold construction. The fold won’t try to bind these from domain IDs.

Fold composition limit

Commands support up to 12 folds in a single tuple. If you need more, compose multiple folds into a single fold by nesting state types, or split the command into multiple commands.

Appendix A: Quick Reference

Derive macros

MacroApplies toPurpose
#[derive(Event)]StructMakes a struct a persisted event
#[derive(EventSet)]EnumCreates a typed event set for queries
#[derive(DomainIds)]StructGenerates domain_ids() method
#[derive(FromDomainIds)]StructGenerates constructor from domain ID bindings

Attribute macros

AttributePlacementPurpose
#[event_type("...")]Event structSets the event type string
#[domain_id]FieldMarks a field as a domain ID tag
#[domain_id("alt_name")]FieldDomain ID with alternate tag name
#[crypto_scope]Field on EventEncrypts the event (must be on a #[domain_id] field)
#[scope(field)]EventSet variantFilter by a single domain ID field
#[scope(field = "value")]EventSet variantHardcoded tag filter
#[from_domain_id(default)]Fold fieldUse default value, don’t bind from domain IDs
#[validate(...)]Input fieldValidation rules (validator crate)

Export macros

MacroUsage
#[export_command]Annotate the command function
export_projector!(Name);Wire up projector WASM interface
export_effect!(Name);Wire up effect WASM interface

Command builder API

#![allow(unused)]
fn main() {
Command::new(input, context)           // Create builder
    .fold::<T>()                        // Register fold (no args)
    .fold_args::<T>(args)               // Register fold with args
    .fold_with(|input| MyFold { .. })   // Register fold manually
    .execute(|input, states| { .. })    // Run with fold states
}

Emit and reject

#![allow(unused)]
fn main() {
emit![]                                // No events
emit![Event { field: val }]            // Single event
emit![EventA { .. }, EventB { .. }]    // Multiple events

// Business rejections — use anyhow::ensure! / bail!:
anyhow::ensure!(balance >= amount, "insufficient funds");
anyhow::bail!("shop not connected");
}

SQLite API

#![allow(unused)]
fn main() {
// Connection-level
execute(sql, params)       -> Result<usize, SqliteError>
execute_batch(sql)         -> Result<(), SqliteError>
query_one(sql, params)     -> Row              // traps on 0 or >1 rows
query_row(sql, params)     -> Option<Row>
last_insert_rowid()        -> Option<i64>

// Prepared statements (built with prepare(sql) -> Statement)
stmt.execute(params)       -> Result<usize, SqliteError>
stmt.query(params)         -> Vec<Row>
stmt.query_one(params)     -> Row              // traps on 0 or >1 rows
stmt.query_row(params)     -> Option<Row>

// Parameters
params![]                        // no params
params![val1, val2, val3]        // positional

// Reading
row.get::<&str, String>("column_name")
row.get::<usize, i64>(0)
row.tuple::<(String, String, i64)>()
}

Built-in fold types

TypeStateUse for
EventFold<E>EventState<E> (vec of all events)Full history
LatestEvent<E>Option<StoredEvent<E>>Most recent event
EventCounter<E>u64Counting events
EventToggle<A, B>ToggleState<A, B>Paired opposing events
SingleEvent<E>N/A (it’s an EventSet)Single event type queries

Event envelope fields

FieldTypeDescription
idUuidEvent unique ID
positionu64Global log position
event_typeStringEvent type identifier
tagsVec<String>Domain ID tags
timestampDateTime<Utc>When written
correlation_idUuidOriginating action
causation_idUuidCommand execution
triggering_event_idOption<Uuid>Causal event
idempotency_keyOption<Uuid>Deduplication
encryption_scopeOption<String>Encryption scope
encryption_key_idOption<Uuid>Key identifier

CommandContext

#![allow(unused)]
fn main() {
CommandContext::new()                           // Auto-detect (effect or external)
    .with_correlation_id(id)                    // Set correlation ID
    .with_triggering_event_id(id)               // Set triggering event
    .with_idempotency_key(key)                  // Set idempotency key
}

Environment variables

Server (umari binary)

VariableDefaultDescription
UMARI_DATA_DIR./umari-dataruntime database directory
UMARI_EVENT_STORE_URLhttp://localhost:50051UmaDB event store URL
UMARI_API_ADDR127.0.0.1:3000HTTP API bind address
UMARI_API_KEY(none)required Authorization: Bearer <key>
UMARI_LOGumari=infotracing-subscriber filter
UMARI_VERBOSEfalseset log level to trace
UMARI_NO_BANNERfalsehide the startup banner
UMARI_SHUTDOWN_TIMEOUT10sgraceful shutdown deadline

CLI (umari-cli / umari client)

VariableDefaultDescription
UMARI_URLhttp://localhost:3000server URL
UMARI_API_KEY(none)bearer token sent with each request

Essential imports

#![allow(unused)]
fn main() {
use umari::prelude::*;           // Everything you need
use serde::{Serialize, Deserialize};
use validator::Validate;
use schemars::JsonSchema;        // Optional, for OpenAPI docs
}

Naming conventions

ItemConvention
Event structPascalCase past tense
Event type string"object.verb"
Command cratekebab-case imperative
Command input structInput
Projector cratekebab-case plural noun
Effect cratekebab-case verb phrase
Fold structPascalCase + Fold
Fold statePascalCase + State
EventSet enumQuery

Appendix B: Migration from Traditional Event Sourcing

If you’re coming from a traditional event sourcing background (EventStoreDB, Marten, Axon, etc.), this chapter maps familiar concepts to Umari equivalents.

Key differences

ConceptTraditional ESUmari
Event streamsPer-aggregate streamsSingle global log with domain ID tags
ConcurrencyStream position checkDCB — dynamic boundaries by event overlap
AggregatesAggregate root with stateNo aggregates; folds derive state on-demand
Read modelsExternal projectionsProjectors (WASM modules with SQLite)
Side effectsProcess managers / sagasEffects (WASM modules with HTTP access)
Business logicIn-processWASM components, hot-reloadable
State derivationAggregate replayFolds (event → state reducer)
IdempotencyIdempotency keys on commandsBuilt-in via idempotency_key + domain checks

Aggregate → Fold + Command

Traditional:

public class Widget : Aggregate
{
    public WidgetId Id { get; private set; }
    public string Name { get; private set; }
    public bool Archived { get; private set; }

    public void Create(CreateWidget command)
    {
        if (Version > 0) throw new Exception("Already exists");
        Apply(new WidgetCreated(command.WidgetId, command.Name));
    }

    public void When(WidgetCreated e)
    {
        Id = e.WidgetId;
        Name = e.Name;
    }
}

Umari:

#![allow(unused)]
fn main() {
// State — just data
#[derive(Default)]
pub struct WidgetState {
    pub exists: bool,
    pub name: Option<String>,
    pub archived: bool,
}

// Fold — binds to domain IDs, replays events
#[derive(DomainIds, FromDomainIds)]
pub struct WidgetFold {
    #[domain_id]
    pub widget_id: Uuid,
}

#[derive(EventSet)]
pub enum WidgetEvents {
    #[scope(widget_id)]
    WidgetCreated(WidgetCreated),
    #[scope(widget_id)]
    WidgetArchived(WidgetArchived),
}

impl Fold for WidgetFold {
    type Events = WidgetEvents;
    type State = WidgetState;

    fn apply(&self, state: &mut WidgetState, event: StoredEvent<WidgetEvents>) {
        match event.data {
            WidgetEvents::WidgetCreated(ev) => {
                state.exists = true;
                state.name = Some(ev.name);
            }
            WidgetEvents::WidgetArchived(_) => state.archived = true,
        }
    }
}

// Command — validates, checks invariants, emits events
#[export_command]
pub fn create_widget(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
    Command::new(input, context)
        .fold::<WidgetFold>()
        .execute(|input, widget| {
            anyhow::ensure!(!widget.exists, "widget already exists");
            Ok(emit![WidgetCreated {
                widget_id: input.widget_id,
                name: input.name,
            }])
        })
}
}

Key differences:

  • The fold struct is separate from the state — the fold holds domain ID bindings, the state holds the derived data
  • apply() takes &self (the fold bindings) and a mutable state reference
  • The command function is stateless — no aggregate instance, just pure logic
  • Consistency is per-domain-ID, not per-aggregate-stream

Projection → Projector

Traditional:

public class WidgetProjection : Projection<WidgetReadModel>
{
    public WidgetProjection()
    {
        Project<WidgetCreated>(e => Insert(new WidgetReadModel { Id = e.WidgetId, Name = e.Name }));
        Project<WidgetArchived>(e => Update(e.WidgetId, w => w.Archived = true));
    }
}

Umari:

#![allow(unused)]
fn main() {
impl Projector for Widgets {
    type Query = WidgetQuery;

    fn init() -> anyhow::Result<Self> {
        execute_batch("CREATE TABLE IF NOT EXISTS widgets (
            widget_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            archived BOOLEAN NOT NULL DEFAULT FALSE
        )")?;
        Ok(Widgets {})
    }

    fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
        match event.data {
            WidgetQuery::WidgetCreated(ev) => {
                execute("INSERT INTO widgets (widget_id, name) VALUES (?1, ?2)",
                    params![ev.widget_id, ev.name])?;
            }
            WidgetQuery::WidgetArchived(ev) => {
                execute("UPDATE widgets SET archived = TRUE WHERE widget_id = ?1",
                    params![ev.widget_id])?;
            }
        }
        Ok(())
    }
}
}

Key differences:

  • Projectors are WASM modules, not in-process projections
  • Each projector gets its own SQLite database
  • init() is called once; handle() is called per event
  • Projectors are naturally idempotent (replay-safe)

Process Manager / Saga → Effect

Traditional:

public class OrderSaga : Saga<OrderSagaState>,
    IAmStartedBy<OrderPlaced>,
    IHandle<PaymentReceived>
{
    public async Task Handle(OrderPlaced e)
    {
        Data.OrderId = e.OrderId;
        await Bus.Send(new ProcessPayment(e.OrderId, e.Amount));
        MarkAsComplete();
    }
}

Umari:

#![allow(unused)]
fn main() {
impl Effect for OrderProcessor {
    type Query = OrderEvents;

    fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
        match event.data {
            OrderEvents::OrderPlaced(ev) => {
                // 1. Fold-check via a private command — if the "scheduled"
                //    event was already emitted, the command short-circuits
                //    and the receipt is empty.
                let receipt = schedule_payment_processing(
                    SchedulePaymentProcessingInput { order_id: ev.order_id },
                    CommandContext::new(),
                )?;
                if !receipt.has_event::<PaymentProcessingScheduled>() {
                    return Ok(()); // already processed on a previous run
                }

                // 2. Side effect — call the payment gateway.
                let response = self.http_client
                    .post("https://payment.example.com/process")
                    .json(&json!({ "order_id": ev.order_id, "amount": ev.amount }))
                    .send()?;

                // 3. Record outcome via another private command.
                record_payment_result(
                    RecordPaymentResultInput {
                        order_id: ev.order_id,
                        success: response.status().is_success(),
                    },
                    CommandContext::new(),
                )?;
            }
        }
        Ok(())
    }
}
}

Key differences:

  • Effects use the fold-check → side effect → record pattern for idempotency.
  • Effects have their own SQLite for internal state, but it’s not the idempotency source — the event store is.
  • Effects call commands directly as Rust functions; no message bus.
  • HTTP is provided via WASI; no host-side bridging code.

EventStoreDB streams → UmaDB DCB

Traditional:

Stream "widget-abc" → [WidgetCreated, WidgetRenamed, WidgetArchived]
Stream "widget-def" → [WidgetCreated]

Umari:

Global log:
  pos 1: WidgetCreated { widget_id: "abc", ... }           tags: [widget_id:abc]
  pos 2: WidgetCreated { widget_id: "def", ... }           tags: [widget_id:def]
  pos 3: WidgetRenamed { widget_id: "abc", name: "new" }  tags: [widget_id:abc]
  pos 4: WidgetArchived { widget_id: "abc" }               tags: [widget_id:abc]

When command queries widget_id=abc: gets events at positions 1, 3, 4. No pre-partitioning needed.

Common migration patterns

1. Start with events

Port your event definitions first:

#![allow(unused)]
fn main() {
// Old: AccountCreated { AccountId, OwnerName }
#[derive(Event, Serialize, Deserialize)]
#[event_type("account.created")]
pub struct AccountCreated {
    #[domain_id]
    pub account_id: String,
    pub owner_name: String,
}
}

Add domain IDs to all events. Choose which fields identify the entity.

2. Port aggregates to folds

For each aggregate, create:

  • A state struct with #[derive(Default)]
  • A fold struct with #[derive(DomainIds, FromDomainIds)]
  • An EventSet enum
  • Implement Fold

The apply() method replaces your aggregate’s When() handlers.

3. Port command handlers

Replace aggregate method calls with the Command::new().fold().execute() pattern. Invariant checks go in the execute closure. Event emission uses emit![].

4. Port projections to projectors

Port your projection SQL to init() and handle(). Each projector is a separate crate with its own SQLite database. Use execute_batch() for DDL, execute()/params![] for DML.

5. Port sagas to effects

Replace message bus interactions with direct command execution. Replace saga state stored in a database with SQLite (internal state) and the event store (idempotency anchor).