8. Projectors
Projectors build read models by consuming events from the event store and updating SQLite databases. Their databases are designed to be queried by external processes — your HTTP API, dashboard, or reporting tools.
How projectors work
A projector:
- Calls init() once at startup to create tables and prepare statements
- Receives events from the event store in position order
- Calls handle() for each event to update its SQLite database
- Maintains a last_position watermark for crash recovery
Projectors are naturally idempotent — deleting the SQLite database and replaying all events from the beginning produces the exact same result. There is no need for explicit idempotency logic.
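The lifecycle above can be pictured with a small, self-contained sketch. It does not use the umari API: Event, ReadModel, and run are hypothetical stand-ins, an in-memory map replaces the SQLite database, and where exactly the watermark check lives in the real runtime is an assumption. It only illustrates why redelivered events do not double-count and why a rebuild from the first event gives the same result.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a stored event: a log position plus a payload.
#[derive(Clone, Debug)]
struct Event {
    position: u64,
    plan_id: String,
    sold: u32,
}

/// Toy read model: an in-memory map standing in for the SQLite database.
#[derive(Debug, Default, PartialEq)]
struct ReadModel {
    total_sold: HashMap<String, u32>,
    /// Position of the last applied event, or None if nothing was applied yet.
    last_position: Option<u64>,
}

impl ReadModel {
    /// Apply one event, skipping anything at or below the watermark.
    fn handle(&mut self, event: &Event) {
        if self.last_position.map_or(false, |last| event.position <= last) {
            return; // already applied before the restart
        }
        *self.total_sold.entry(event.plan_id.clone()).or_insert(0) += event.sold;
        self.last_position = Some(event.position);
    }
}

/// Feed events in position order, as the list above describes.
fn run(model: &mut ReadModel, events: &[Event]) {
    for event in events {
        model.handle(event);
    }
}
```

Calling run twice with overlapping slices of the same log leaves the model in the same state as a single pass over the whole log, which is the property the watermark is there to provide.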
The Projector trait
pub trait Projector: Sized {
    type Query: EventSet;
    fn init() -> anyhow::Result<Self>;
    fn handle(&mut self, event: StoredEvent<<Self::Query as EventSet>::Item>)
        -> anyhow::Result<()>;
}
A complete projector
use umari::prelude::*;
use rust_decimal::Decimal;
use std::str::FromStr;
export_projector!(Plans);
// Every event type this projector subscribes to.
#[derive(EventSet)]
enum Query {
    ShopConnected(ShopConnected),
    ShopReconnected(ShopReconnected),
    WarrantyPlanCreated(WarrantyPlanCreated),
    WarrantyPlanUpdated(WarrantyPlanUpdated),
    WarrantyPlanArchived(WarrantyPlanArchived),
    WarrantyPlanUnarchived(WarrantyPlanUnarchived),
    WarrantyPlanActivated(WarrantyPlanActivated),
    WarrantyPlanDeactivated(WarrantyPlanDeactivated),
    WarrantyPlanDeleted(WarrantyPlanDeleted),
    WarrantyPlanVariantSynced(WarrantyPlanVariantSynced),
    WarrantySold(WarrantySold),
}
struct Plans {}
impl Projector for Plans {
    type Query = Query;
    // Create the read-model tables if they do not exist yet.
    fn init() -> anyhow::Result<Self> {
        execute_batch(
            "
            CREATE TABLE IF NOT EXISTS shops (
                shop_id TEXT PRIMARY KEY,
                access_token TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS plans (
                plan_id TEXT PRIMARY KEY,
                shop_id TEXT NOT NULL,
                title TEXT,
                duration_months INTEGER,
                price TEXT NOT NULL,
                applicable_to TEXT NOT NULL,
                archived BOOLEAN NOT NULL DEFAULT FALSE,
                total_sold INTEGER NOT NULL DEFAULT 0,
                revenue TEXT NOT NULL DEFAULT '0.00',
                status TEXT NOT NULL DEFAULT 'draft',
                shopify_variant_id TEXT,
                created_at TEXT NOT NULL
            );
            ",
        )?;
        Ok(Plans {})
    }
    // Translate each event into one small update of the read model.
    fn handle(&mut self, event: StoredEvent<Self::Query>) -> anyhow::Result<()> {
        match event.data {
            Query::ShopConnected(ev) => {
                execute(
                    "INSERT INTO shops (shop_id, access_token) VALUES (?1, ?2)",
                    params![ev.shop_id.to_string(), ev.access_token],
                )?;
            }
            Query::ShopReconnected(ev) => {
                execute(
                    "UPDATE shops SET access_token = ?2 WHERE shop_id = ?1",
                    params![ev.shop_id.to_string(), ev.access_token],
                )?;
            }
            Query::WarrantyPlanCreated(WarrantyPlanCreated {
                plan_id, shop_id, title, duration_months, price,
                applicable_to, status,
            }) => {
                execute(
                    "INSERT INTO plans (plan_id, shop_id, title, duration_months, price, applicable_to, status, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                    params![plan_id, shop_id.to_string(), title, duration_months,
                        price.to_string(), serde_json::to_string(&applicable_to)?,
                        match status { PlanStatus::Active => "active", PlanStatus::Draft => "draft" },
                        event.timestamp.to_rfc3339(),
                    ],
                )?;
            }
            Query::WarrantyPlanUpdated(WarrantyPlanUpdated {
                plan_id, title, duration_months, price, applicable_to, status, ..
            }) => {
                execute(
                    "UPDATE plans SET title = ?2, duration_months = ?3, price = ?4, applicable_to = ?5, status = ?6 WHERE plan_id = ?1",
                    params![plan_id, title, duration_months, price.to_string(),
                        serde_json::to_string(&applicable_to)?,
                        match status { PlanStatus::Active => "active", PlanStatus::Draft => "draft" },
                    ],
                )?;
            }
            Query::WarrantyPlanArchived(WarrantyPlanArchived { plan_id, .. }) => {
                execute("UPDATE plans SET archived = TRUE WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanUnarchived(WarrantyPlanUnarchived { plan_id, .. }) => {
                execute("UPDATE plans SET archived = FALSE WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanActivated(WarrantyPlanActivated { plan_id, .. }) => {
                execute("UPDATE plans SET status = 'active' WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanDeactivated(WarrantyPlanDeactivated { plan_id, .. }) => {
                execute("UPDATE plans SET status = 'draft' WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanDeleted(WarrantyPlanDeleted { plan_id, .. }) => {
                execute("DELETE FROM plans WHERE plan_id = ?1", params![plan_id])?;
            }
            Query::WarrantyPlanVariantSynced(WarrantyPlanVariantSynced { plan_id, variant_id, .. }) => {
                execute(
                    "UPDATE plans SET shopify_variant_id = ?2 WHERE plan_id = ?1",
                    params![plan_id, variant_id.to_string()],
                )?;
            }
            Query::WarrantySold(WarrantySold { plan_id, price, .. }) => {
                // Revenue is stored as TEXT, so read it back, add as a Decimal, and write it again.
                let current_revenue: String = query_row(
                    "SELECT revenue FROM plans WHERE plan_id = ?1",
                    params![plan_id],
                )
                .map(|row| row.get(0))
                .unwrap_or_else(|| "0.00".to_string());
                let new_revenue = Decimal::from_str(&current_revenue)? + price;
                execute(
                    "UPDATE plans SET total_sold = total_sold + 1, revenue = ?2 WHERE plan_id = ?1",
                    params![plan_id, new_revenue.to_string()],
                )?;
            }
        }
        Ok(())
    }
}
The export_projector! macro
export_projector!(Plans);
This macro generates the WASM component interface glue — the projector() constructor, handle() entry point, and query() for declaring the event subscription. Your struct only needs to implement Projector.
Design guidelines
One table per concept
Each projector typically manages one primary concept (plans, shops, warranties). If you find yourself managing unrelated tables in one projector, split them.
Denormalize for reads
Projector tables should be optimized for query patterns, not normalized like an OLTP database. Denormalize freely — the source of truth is the event store, not the projector’s SQLite.
Use CREATE TABLE IF NOT EXISTS
Always use IF NOT EXISTS in init(). init() runs at every startup, when the tables usually already exist, and it also runs against an empty database before a replay begins. IF NOT EXISTS makes the same init() correct in both cases.
Keep handle() fast
Each event handler should be a single SQL statement or a small, bounded operation. Avoid complex computation or anything that could fail non-deterministically. If a handler fails, the projector stops and the error is logged. The runtime will retry (the event store subscription is persistent). Projectors don’t have access to HTTP or other side effects — they’re confined to their SQLite database — so this guidance is mostly about keeping per-event work tight.
Use prepared statements
For SQL that runs on every event, build a Statement once in init() and reuse it:
struct Widgets {
    insert: Statement,
    archive: Statement,
}
impl Projector for Widgets {
    type Query = WidgetEvents;
    fn init() -> anyhow::Result<Self> {
        execute_batch("CREATE TABLE IF NOT EXISTS widgets (...)")?;
        Ok(Widgets {
            insert: prepare("INSERT INTO widgets (id, name) VALUES (?1, ?2)"),
            archive: prepare("UPDATE widgets SET archived = TRUE WHERE id = ?1"),
        })
    }
    fn handle(&mut self, event: StoredEvent<WidgetEvents>) -> anyhow::Result<()> {
        match event.data {
            WidgetEvents::Created(ev) => { self.insert.execute(params![ev.id, ev.name])?; }
            WidgetEvents::Archived(ev) => { self.archive.execute(params![ev.id])?; }
        }
        Ok(())
    }
}
The statement is parsed once and reused across every event, avoiding the per-event compilation cost.
Replaying projectors
Projectors can be replayed at any time — the runtime will delete the projector’s SQLite database and reprocess all events from position 0. This is done via the API:
POST /projectors/{name}/replay
Or via the CLI:
umari projector replay plans
This is safe because projectors are naturally idempotent. Replaying is the standard way to fix schema changes or recover from corruption.
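To see why a replay is a natural way to roll out a changed read model, here is a minimal sketch that does not use the umari API. WarrantySold here is a simplified, hypothetical event with a price in cents, the two BTreeMap aliases stand in for an old and a new table layout, and replay models "delete the database, then apply every event from position 0".

```rust
use std::collections::BTreeMap;

/// Simplified, hypothetical event: one sale of a plan at a price in cents.
struct WarrantySold {
    plan_id: &'static str,
    price_cents: u64,
}

/// Replay: start from an empty model (the deleted database) and apply
/// every event in order with the given handler.
fn replay<M: Default>(events: &[WarrantySold], handle: impl Fn(&mut M, &WarrantySold)) -> M {
    let mut model = M::default();
    for event in events {
        handle(&mut model, event);
    }
    model
}

/// Old layout: plan_id -> total_sold.
type ModelV1 = BTreeMap<&'static str, u64>;
/// New layout: plan_id -> (total_sold, revenue_cents).
type ModelV2 = BTreeMap<&'static str, (u64, u64)>;

fn handle_v1(model: &mut ModelV1, ev: &WarrantySold) {
    *model.entry(ev.plan_id).or_insert(0) += 1;
}

fn handle_v2(model: &mut ModelV2, ev: &WarrantySold) {
    let row = model.entry(ev.plan_id).or_insert((0, 0));
    row.0 += 1;
    row.1 += ev.price_cents;
}
```

Replaying the same log with handle_v2 fills the new revenue column for historical sales as well, with no migration step, and replaying twice gives identical models because the handlers are deterministic.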
Scoping in projectors
Projectors have no fold bindings, so dynamic #[scope(field)] is meaningless. Only hardcoded scopes are useful:
#[derive(EventSet)]
enum Query {
    WarrantyPlanCreated(WarrantyPlanCreated), // All plans, all shops
    #[scope(topic = "orders/paid")] // Only this topic
    WebhookReceived(WebhookReceived),
}
Without #[scope(...)], the projector receives every event of that type from the entire event log.
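As a mental model only, the effect of a hardcoded scope can be sketched as a filter over the event log. Nothing below is the umari API: Envelope, Subscription, is_delivered, and delivered are hypothetical names, and the assumption that a scope compares a single topic tag for equality is a simplification of whatever matching the runtime actually performs.

```rust
/// Hypothetical event envelope: a type name plus an optional topic tag.
struct Envelope {
    event_type: &'static str,
    topic: Option<&'static str>,
}

/// One subscription entry: an event type and an optional hardcoded scope.
struct Subscription {
    event_type: &'static str,
    topic: Option<&'static str>,
}

/// An event is delivered when the type matches and, if the subscription
/// is scoped, the event carries exactly that topic.
fn is_delivered(sub: &Subscription, ev: &Envelope) -> bool {
    sub.event_type == ev.event_type
        && match sub.topic {
            None => true, // unscoped: every event of this type
            Some(topic) => ev.topic == Some(topic),
        }
}

/// Keep the events from the log that match at least one subscription entry.
fn delivered<'a>(subs: &[Subscription], log: &'a [Envelope]) -> Vec<&'a Envelope> {
    log.iter()
        .filter(|ev| subs.iter().any(|sub| is_delivered(sub, ev)))
        .collect()
}
```

With an unscoped WarrantyPlanCreated entry and a WebhookReceived entry scoped to "orders/paid", this filter passes every plan creation but only the webhooks tagged with that topic.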