Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Appendix B: Migration from Traditional Event Sourcing

If you’re coming from a traditional event sourcing background (EventStoreDB, Marten, Axon, etc.), this chapter maps familiar concepts to Umari equivalents.

Key differences

ConceptTraditional ESUmari
Event streamsPer-aggregate streamsSingle global log with domain ID tags
ConcurrencyStream position checkDCB — dynamic boundaries by event overlap
AggregatesAggregate root with stateNo aggregates; folds derive state on-demand
Read modelsExternal projectionsProjectors (WASM modules with SQLite)
Side effectsProcess managers / sagasEffects (WASM modules with HTTP access)
Business logicIn-processWASM components, hot-reloadable
State derivationAggregate replayFolds (event → state reducer)
IdempotencyIdempotency keys on commandsBuilt-in via idempotency_key + domain checks

Aggregate → Fold + Command

Traditional:

public class Widget : Aggregate
{
    public WidgetId Id { get; private set; }
    public string Name { get; private set; }
    public bool Archived { get; private set; }

    public void Create(CreateWidget command)
    {
        if (Version > 0) throw new Exception("Already exists");
        Apply(new WidgetCreated(command.WidgetId, command.Name));
    }

    public void When(WidgetCreated e)
    {
        Id = e.WidgetId;
        Name = e.Name;
    }
}

Umari:

#![allow(unused)]
fn main() {
// State — just data
#[derive(Default)]
pub struct WidgetState {
    pub exists: bool,
    pub name: Option<String>,
    pub archived: bool,
}

// Fold — binds to domain IDs, replays events
#[derive(DomainIds, FromDomainIds)]
pub struct WidgetFold {
    #[domain_id]
    pub widget_id: Uuid,
}

#[derive(EventSet)]
pub enum WidgetEvents {
    #[scope(widget_id)]
    WidgetCreated(WidgetCreated),
    #[scope(widget_id)]
    WidgetArchived(WidgetArchived),
}

impl Fold for WidgetFold {
    type Events = WidgetEvents;
    type State = WidgetState;

    fn apply(&self, state: &mut WidgetState, event: StoredEvent<WidgetEvents>) {
        match event.data {
            WidgetEvents::WidgetCreated(ev) => {
                state.exists = true;
                state.name = Some(ev.name);
            }
            WidgetEvents::WidgetArchived(_) => state.archived = true,
        }
    }
}

// Command — validates, checks invariants, emits events
#[export_command]
pub fn create_widget(input: Input, context: CommandContext) -> anyhow::Result<ExecuteOutput> {
    Command::new(input, context)
        .fold::<WidgetFold>()
        .execute(|input, widget| {
            anyhow::ensure!(!widget.exists, "widget already exists");
            Ok(emit![WidgetCreated {
                widget_id: input.widget_id,
                name: input.name,
            }])
        })
}
}

Key differences:

  • The fold struct is separate from the state — the fold holds domain ID bindings, the state holds the derived data
  • apply() takes &self (the fold bindings) and a mutable state reference
  • The command function is stateless — no aggregate instance, just pure logic
  • Consistency is per-domain-ID, not per-aggregate-stream

Projection → Projector

Traditional:

public class WidgetProjection : Projection<WidgetReadModel>
{
    public WidgetProjection()
    {
        Project<WidgetCreated>(e => Insert(new WidgetReadModel { Id = e.WidgetId, Name = e.Name }));
        Project<WidgetArchived>(e => Update(e.WidgetId, w => w.Archived = true));
    }
}

Umari:

#![allow(unused)]
fn main() {
impl Projector for Widgets {
    type Query = WidgetQuery;

    fn init() -> anyhow::Result<Self> {
        execute_batch("CREATE TABLE IF NOT EXISTS widgets (
            widget_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            archived BOOLEAN NOT NULL DEFAULT FALSE
        )")?;
        Ok(Widgets {})
    }

    fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
        match event.data {
            WidgetQuery::WidgetCreated(ev) => {
                execute("INSERT INTO widgets (widget_id, name) VALUES (?1, ?2)",
                    params![ev.widget_id, ev.name])?;
            }
            WidgetQuery::WidgetArchived(ev) => {
                execute("UPDATE widgets SET archived = TRUE WHERE widget_id = ?1",
                    params![ev.widget_id])?;
            }
        }
        Ok(())
    }
}
}

Key differences:

  • Projectors are WASM modules, not in-process projections
  • Each projector gets its own SQLite database
  • init() is called once; handle() is called per event
  • Projectors are naturally idempotent (replay-safe)

Process Manager / Saga → Effect

Traditional:

public class OrderSaga : Saga<OrderSagaState>,
    IAmStartedBy<OrderPlaced>,
    IHandle<PaymentReceived>
{
    public async Task Handle(OrderPlaced e)
    {
        Data.OrderId = e.OrderId;
        await Bus.Send(new ProcessPayment(e.OrderId, e.Amount));
        MarkAsComplete();
    }
}

Umari:

#![allow(unused)]
fn main() {
impl Effect for OrderProcessor {
    type Query = OrderEvents;

    fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
        match event.data {
            OrderEvents::OrderPlaced(ev) => {
                // 1. Fold-check via a private command — if the "scheduled"
                //    event was already emitted, the command short-circuits
                //    and the receipt is empty.
                let receipt = schedule_payment_processing(
                    SchedulePaymentProcessingInput { order_id: ev.order_id },
                    CommandContext::new(),
                )?;
                if !receipt.has_event::<PaymentProcessingScheduled>() {
                    return Ok(()); // already processed on a previous run
                }

                // 2. Side effect — call the payment gateway.
                let response = self.http_client
                    .post("https://payment.example.com/process")
                    .json(&json!({ "order_id": ev.order_id, "amount": ev.amount }))
                    .send()?;

                // 3. Record outcome via another private command.
                record_payment_result(
                    RecordPaymentResultInput {
                        order_id: ev.order_id,
                        success: response.status().is_success(),
                    },
                    CommandContext::new(),
                )?;
            }
        }
        Ok(())
    }
}
}

Key differences:

  • Effects use the fold-check → side effect → record pattern for idempotency.
  • Effects have their own SQLite for internal state, but it’s not the idempotency source — the event store is.
  • Effects call commands directly as Rust functions; no message bus.
  • HTTP is provided via WASI; no host-side bridging code.

EventStoreDB streams → UmaDB DCB

Traditional:

Stream "widget-abc" → [WidgetCreated, WidgetRenamed, WidgetArchived]
Stream "widget-def" → [WidgetCreated]

Umari:

Global log:
  pos 1: WidgetCreated { widget_id: "abc", ... }           tags: [widget_id:abc]
  pos 2: WidgetCreated { widget_id: "def", ... }           tags: [widget_id:def]
  pos 3: WidgetRenamed { widget_id: "abc", name: "new" }  tags: [widget_id:abc]
  pos 4: WidgetArchived { widget_id: "abc" }               tags: [widget_id:abc]

When command queries widget_id=abc: gets events at positions 1, 3, 4. No pre-partitioning needed.

Common migration patterns

1. Start with events

Port your event definitions first:

#![allow(unused)]
fn main() {
// Old: AccountCreated { AccountId, OwnerName }
#[derive(Event, Serialize, Deserialize)]
#[event_type("account.created")]
pub struct AccountCreated {
    #[domain_id]
    pub account_id: String,
    pub owner_name: String,
}
}

Add domain IDs to all events. Choose which fields identify the entity.

2. Port aggregates to folds

For each aggregate, create:

  • A state struct with #[derive(Default)]
  • A fold struct with #[derive(DomainIds, FromDomainIds)]
  • An EventSet enum
  • Implement Fold

The apply() method replaces your aggregate’s When() handlers.

3. Port command handlers

Replace aggregate method calls with the Command::new().fold().execute() pattern. Invariant checks go in the execute closure. Event emission uses emit![].

4. Port projections to projectors

Port your projection SQL to init() and handle(). Each projector is a separate crate with its own SQLite database. Use execute_batch() for DDL, execute()/params![] for DML.

5. Port sagas to effects

Replace message bus interactions with direct command execution. Replace saga state stored in a database with SQLite (internal state) and the event store (idempotency anchor).