Skip to content

Latest commit

 

History

History
642 lines (493 loc) · 21.3 KB

File metadata and controls

642 lines (493 loc) · 21.3 KB

Concepts

Understanding the mental model behind workflows.

What is a Workflow?

A workflow is a durable state machine. Let's break that down:

State machine: Your code has states (idle, walking, fighting) and transitions between them (start walking, detect threat, win combat).

Durable: The state lives in the database, not in memory. If the server restarts, the workflow continues exactly where it left off.

The Magic of #[workflow]

Write sequential code. The macro transforms it into a state machine:

#[workflow]
fn buff(init: BuffInit) -> Result<BuffResult> {
    let mut stacks: u32 = 1;

    loop {
        select! {
            timer!(BuffTimer::Expire, init.duration_secs.secs()) => break,
            signal!(BuffSignal::Dispel) => break,
            signal!(BuffSignal::Stack(n)) => {
                stacks += n;
                continue
            },
        }.await;
    }

    Ok(BuffResult { final_stacks: stacks })
}

This looks like normal Rust async code, but:

  • stacks survives restarts - It's persisted to the database
  • The timer survives restarts - Server restarts after 3 minutes, timer fires 2 minutes later
  • Signals wake the workflow - External code can send Dispel or Stack(5)

Visual Model

                    ┌─────────────────────────────────────────────────┐
                    │                   WORKFLOW                       │
                    │                                                  │
    start ─────────▶│  ┌───────┐  timer   ┌─────────┐  signal         │
                    │  │ IDLE  │─────────▶│ WALKING │─────────┐       │
                    │  └───────┘          └─────────┘         │       │
                    │                          │              │       │
                    │                    timer │    "threat"  │       │
                    │                          ▼              ▼       │
                    │                    ┌───────────┐  ┌──────────┐  │
                    │                    │AT_WAYPOINT│  │ FIGHTING │  │
                    │                    └───────────┘  └────┬─────┘  │
                    │                          │              │       │
                    │                    timer │     complete │       │
                    │                          ▼              │       │
                    │                    ┌───────────┐        │       │
                    │                    │  WALKING  │◀───────┘       │
                    │                    └───────────┘                │
                    └─────────────────────────────────────────────────┘
                                              │
                                    stored in database
                                              │
                                              ▼
                    ┌─────────────────────────────────────────────────┐
                    │  workflow table                                  │
                    │  ┌────┬──────────┬───────────┬─────────────┐    │
                    │  │ id │   type   │   phase   │    state    │    │
                    │  ├────┼──────────┼───────────┼─────────────┤    │
                    │  │  1 │ "patrol" │     2     │ {waypoint:2}│    │
                    │  └────┴──────────┴───────────┴─────────────┘    │
                    └─────────────────────────────────────────────────┘

Await Points (Suspending)

Workflows suspend at await points. Each maps to a specific event type:

1. Timers - timer!()

Schedule something to happen later:

// Wait for 5 seconds
timer!(MyTimer::Tick, 5.secs()).await;

Key insight: Timers are stored in SpacetimeDB's scheduled table. The database automatically invokes workflow_timer_fire when the time comes. Server restarts don't lose them.

2. Signals - signal!()

External events sent to a workflow:

// From game code: "Hey patrol workflow, there's a threat!"
workflow_signal(ctx, patrol_id, "threat_detected", threat_data)?;

Wait for signals with select!:

select! {
    signal!(PatrolSignal::ThreatDetected(enemy_id)) => {
        // Transition to alert state
        // enemy_id is already deserialized!
    },
    signal!(PatrolSignal::StandDown) => {
        // Return to patrol
    },
}.await;

3. Child Workflows - spawn!()

Spawn a child workflow and wait for its result:

// Spawn combat workflow and wait for it to finish
let combat_result: CombatResult = spawn!("combat", combat_init).await;

// Combat is over, resume patrol
if combat_result.victory {
    // handle victory
}

The parent workflow suspends until the child completes. The child's result is automatically deserialized.

4. Procedures - procedure!()

Call an external procedure and wait for the result:

// Call a procedure to compute damage
let damage: i32 = procedure!("compute_damage", (target_id, weapon_power)).await;

Procedures differ from workflows - they're synchronous calculations that may involve database lookups. Complete them by calling workflow_procedure_complete.

Non-Suspending Calls

These execute immediately without suspending the workflow:

reducer!() - Fire-and-Forget

Call a reducer synchronously within the same transaction:

#[workflow]
fn combat(init: CombatInit) -> Result<CombatOutcome> {
    // These execute immediately, no .await needed
    reducer!(apply_damage(init.target_id, 50));
    reducer!(spawn_effect(init.target_id, "hit"));
    reducer!(log_combat_event(init.target_id, "attack"));

    // Now wait for next phase
    timer!(CombatTimer::Delay, 1.secs()).await;

    Ok(CombatOutcome::Victory)
}

Key insight: reducer!() calls happen in the same database transaction as the workflow state update. They're useful for side effects like updating game state, spawning entities, or logging.

subscribe!() - Signal Filtering

Register interest in specific signals for broadcast routing:

#[workflow]
fn quest(init: QuestInit) -> Result<QuestOutcome> {
    // Subscribe to specific events for this quest
    subscribe!(QuestSignal::EnemyKilled(init.target_enemy_type));
    subscribe!(QuestSignal::ItemCollected(init.target_item_id));

    // Now wait for matching signals via broadcast
    select! {
        signal!(QuestSignal::EnemyKilled(enemy_type)) => { /* ... */ },
        signal!(QuestSignal::ItemCollected(item_id)) => { /* ... */ },
    }.await;

    Ok(QuestOutcome { completed: true })
}

Broadcasting signals (from game code):

// Signal names use snake_case (matches Signal::name())
workflow_broadcast_signal(ctx, "enemy_killed".to_string(), Some(filter), payload)?;

Subscriptions enable workflow_broadcast_signal to route signals only to interested workflows.

The select! Macro

Wait for the first of multiple events:

select! {
    timer!(BuffTimer::Expire, duration) => {
        // Timer fired first
        "expired"
    },
    signal!(BuffSignal::Cancel) => {
        // Signal received first
        "cancelled"
    },
}.await;

Capturing results: The select arm bodies return values that can be captured:

let reason: &str = select! {
    timer!(Timer::Timeout, 30.secs()) => "timeout",
    signal!(Signal::Cancel) => "cancelled",
}.await;

Payload binding: Signal variants with data are automatically deserialized:

select! {
    signal!(BuffSignal::Stack(n)) => {
        // n is already a u32, extracted from the signal payload
        stacks += n;
    },
}.await;

Control Flow

The macro supports Rust control flow constructs with await points inside:

Loops with break / continue

loop {
    select! {
        timer!(Timer::Tick, 1.secs()) => {
            count -= 1;
            if count == 0 { break }  // Exit loop
            continue                  // Next iteration
        },
        signal!(Signal::Cancel) => break,  // Exit loop
    }.await;
}

For Loops

for i in 0..task_count {
    let result: TaskResult = spawn!("task", TaskInit { id: i }).await;
    total += result.value;
}

The loop counter and bounds are tracked in the state struct.

Conditionals

if use_short_timer {
    timer!(Timer::Short, 1.secs()).await;
    value += 10;
} else {
    timer!(Timer::Long, 5.secs()).await;
    value += 50;
}

The condition is evaluated once and stored, so the correct branch resumes after the timer fires.

Workflow Lifecycle

┌──────────┐
│ CREATED  │  workflow_start() called
└────┬─────┘
     │
     ▼
┌──────────┐
│ RUNNING  │  executing workflow code
└────┬─────┘
     │
     ├─────────────────────────────────────┐
     │ hits await point                    │ spawns child
     │ (timer!/signal!/select!)            │ workflow
     ▼                                     ▼
┌───────────┐                        ┌───────────┐
│ SUSPENDED │  waiting for timer     │ SUSPENDED │  waiting for child
└─────┬─────┘  or signal             └─────┬─────┘
      │                                    │
      │ timer fires or                     │ child completes
      │ signal received                    │
      ▼                                    ▼
┌──────────┐                         ┌──────────┐
│ RUNNING  │                         │ RUNNING  │
└────┬─────┘                         └────┬─────┘
     │                                    │
     │ returns Ok(result)                 │
     ▼                                    │
┌───────────┐                             │
│ COMPLETED │◀────────────────────────────┘
└───────────┘

     or

┌──────────┐
│  FAILED  │  workflow returned Err(...)
└──────────┘

     or

┌───────────┐
│ CANCELLED │  workflow_cancel() called
└───────────┘

Defining Timers and Signals

Each workflow defines its own timer and signal types using derive macros:

use workflow_core::prelude::*;
use workflow_macros::{Timer, Signal};

// Timer variants must be unit variants (no data)
#[derive(Timer)]
enum PatrolTimer {
    StartPatrol,
    Arrival,
    Wait,
}

// Signal variants can have payloads
#[derive(Signal)]
enum PatrolSignal {
    ThreatDetected(u64),  // Payload: enemy entity ID
    StandDown,            // No payload
    Configure { speed: f32, alert_range: f32 },  // Struct payload
}

Generated methods:

Trait Method Purpose
Timer name(&self) -> &'static str Get wire name ("arrival")
Timer from_name(name: &str) -> Option<Self> Parse from wire name
Signal name(&self) -> &'static str Get wire name ("threat_detected")
Signal from_name_and_payload(name, payload) Deserialize signal with data

Signal naming: Variant names are converted to snake_case for the wire format:

  • ThreatDetected"threat_detected"
  • StandDown"stand_down"

This is the name used for workflow_signal and workflow_broadcast_signal.

Mutable Variable Tracking

Variables that need to persist across await points must have explicit type annotations:

#[workflow]
fn counter(init: CounterInit) -> Result<CounterResult> {
    // TRACKED: explicit type annotation
    let mut count: u32 = 0;

    // NOT tracked: inferred type
    let multiplier = 2;  // Only valid before first await

    for _ in 0..init.iterations {
        timer!(Timer::Tick, 1.secs()).await;
        count += 1;  // count persists across iterations
    }

    Ok(CounterResult { final_count: count })
}

The macro generates a state struct:

#[derive(Serialize, Deserialize)]
struct CounterWorkflowState {
    phase: u8,
    init: CounterInit,
    count: u32,  // Tracked mutable variable
}

The WorkflowHandler Trait

The macro generates an implementation of WorkflowHandler:

pub trait WorkflowHandler: Send + Sync {
    /// Called once when workflow starts
    fn start(&self, ctx: &WorkflowContext, input: Vec<u8>)
        -> Result<(Vec<u8>, WorkflowResult)>;

    /// Called when an event wakes the workflow
    fn handle(&self, ctx: &WorkflowContext, state: Vec<u8>, event: WorkflowEvent)
        -> Result<(Vec<u8>, WorkflowResult)>;

    /// Returns the workflow type name
    fn workflow_type(&self) -> &'static str;
}

Key insight: Workflows are stateless handlers. All state is passed in and out as serialized bytes. This enables durability - the handler doesn't hold any state in memory.

WorkflowResult - What Handlers Return

Handlers return WorkflowResult to tell the engine what to do:

Variant Meaning
Suspend { timers, subscriptions, ... } Wait for timer or signal
SpawnChild { workflow_type, initial_data, ... } Start a child workflow
CallProcedure { procedure, ... } Call a procedure
Complete { result, ... } Workflow finished successfully
Fail { reason, ... } Workflow failed with error

WorkflowEvent - What Wakes Handlers

When an event occurs, the engine calls handle() with:

Event When
Timer { name } A scheduled timer fired
Signal { name, payload } A signal was sent
ChildComplete { child_id, result } A spawned child finished
ProcedureComplete { name, result } A procedure call completed

Generated Tables

The install! macro generates these SpacetimeDB tables:

Table Purpose
Workflow Primary state - id, type, status, phase, state_data
WorkflowTimer Scheduled timers (auto-fired by SpacetimeDB)
WorkflowSubscription Signal subscriptions for filtering
PendingProcedure In-flight procedure calls
LastWorkflowId Retrieve ID of last created workflow

Generated Reducers

Reducer Purpose
workflow_start Create and start a new workflow
workflow_start_test Start in test mode (timers don't auto-fire)
workflow_signal Send a signal to a specific workflow
workflow_broadcast_signal Send signal to all matching subscriptions
workflow_procedure_complete Complete a pending procedure
workflow_cancel Cancel a running workflow
workflow_timer_fire Internal: called when timer fires
debug_fire_timer Testing: manually fire a timer
debug_send_signal Testing: send signal bypassing subscriptions

Registration with install!

Register workflows in your SpacetimeDB module:

workflow_macros::install! {
    "buff" => BuffWorkflow,
    "patrol" => PatrolWorkflow,
    "combat" => CombatWorkflow,
}

This generates:

  1. All the tables and reducers above
  2. A registration function that runs on first reducer call
  3. Lazy initialization to survive module updates

Why lazy registration? SpacetimeDB WASM reloads reset static state, and init doesn't re-run on updates. The engine ensures workflows are registered before any reducer executes.

Organizing Workflows

Workflows can be associated with game entities and grouped together:

Entity ID

Attach workflows to game entities (players, NPCs, items):

// Start a buff attached to player 42
workflow_start(ctx, "buff".to_string(), Some(42), None, init_data)?;

// Query all workflows for an entity
SELECT * FROM workflow WHERE entity_id = 42;

// Find active buffs on a player
SELECT * FROM workflow
WHERE entity_id = 42
AND workflow_type = 'buff'
AND status IN ('Running', 'Suspended');

Use cases:

  • Buffs/debuffs on a character
  • AI behaviors for an NPC
  • Production queues for a building
  • Cooldowns on an ability

Correlation ID

Group related workflows that span multiple entities:

// A spell that affects multiple targets
let spell_id = uuid::Uuid::new_v4().to_string();

for target_id in affected_targets {
    workflow_start(ctx, "dot".to_string(), Some(target_id), Some(spell_id.clone()), init_data)?;
}

// Cancel all effects from one spell
SELECT id FROM workflow WHERE correlation_id = 'spell-uuid-here';
// Then cancel each one

Use cases:

  • AoE spell affecting multiple targets
  • Quest with multiple objectives (each a workflow)
  • Trade transaction with buyer and seller workflows
  • Raid event with per-player tracking

When to Use Workflows

Good Fit

Scenario Why
NPC AI behaviors Needs to survive restarts, respond to events
Timed game events Buff expires in 60s, respawn in 5 min
Multi-step processes Quest stages, crafting progress
Turn-based systems Combat rounds with timeouts
Economic simulations Factory production cycles

Not a Good Fit

Scenario Why Alternative
Real-time physics Too fast, no persistence needed Game loop
Simple CRUD No state machine needed Direct table ops
Request/response No long-running state Regular reducers
High-frequency updates 60fps is too fast Game loop + periodic sync

Workflows vs Regular Tables

"Why not just use a table with a state column?"

You could! But you'd need to:

  1. Poll for timers - Check every second if any timers expired
  2. Handle races - Two reducers read same workflow simultaneously
  3. Manage transitions - Ensure state changes are atomic
  4. Track timers separately - Another table, more bookkeeping
  5. Implement dispatch - Route events to the right handler code

The workflow engine handles all of this:

You Write Engine Handles
Sequential code State machine transformation
timer!().await Timer persistence and firing
signal!() Signal delivery and dispatch
reducer!() Synchronous reducer execution
subscribe!() Signal subscription routing
Mutable variables Serialization to/from database
- Atomic state updates
- Parent/child coordination

How the Macro Works

The #[workflow] macro transforms your sequential code:

  1. Analyzes the function body - Finds await points (timer!, signal!, spawn!, procedure!, select!), non-suspending calls (reducer!, subscribe!), local variables, and control flow
  2. Generates state struct - Tracks phase, init data, and mutable variables
  3. Creates state machine - Each await point becomes a phase with event dispatch
  4. Handles persistence - State serialized at each await, restored on wake

Your code:

let mut count: u32 = 0;
timer!(MyTimer::Tick, 1.secs()).await;
count += 1;
timer!(MyTimer::Tick, 1.secs()).await;

Becomes (conceptually):

struct State { phase: u8, init: MyInit, count: u32 }

// start(): Phase 0
state.count = 0;
return Suspend { timers: [Timer("tick", 1s)] }

// handle(event): Phase 0 -> 1
state.count += 1;
state.phase = 1;
return Suspend { timers: [Timer("tick", 1s)] }

// handle(event): Phase 1 -> Complete
return Complete { result: ... }

Key Concepts Summary

Concept What It Is
Workflow Durable state machine stored in database
Phase Current position in the workflow (internal tracking)
Timer Scheduled future event, survives restarts
Signal External event sent to workflow
select! Wait for first matching timer or signal
spawn! Start child workflow, wait for result
procedure! Call procedure, wait for result
reducer! Fire-and-forget reducer call (no suspend)
subscribe! Register interest in signals for broadcast routing
Mutable vars Variables with explicit types that persist across awaits
WorkflowHandler Trait implemented by generated workflow structs
WorkflowResult What handlers return to control execution
WorkflowEvent What wakes a suspended workflow

Next Steps

Now that you understand the model: