Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 113 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ durable = "0.1"
```rust
use durable::{Durable, MIGRATOR, Task, TaskContext, TaskResult, WorkerOptions, async_trait};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

// Define your task parameters and output
#[derive(Serialize, Deserialize)]
Expand All @@ -58,15 +59,16 @@ struct ResearchResult {
}

// Implement the Task trait
#[derive(Default)]
struct ResearchTask;

#[async_trait]
impl Task for ResearchTask {
fn name() -> Cow<'static, str> { Cow::Borrowed("research") }
impl Task<()> for ResearchTask {
fn name(&self) -> Cow<'static, str> { Cow::Borrowed("research") }
type Params = ResearchParams;
type Output = ResearchResult;

async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Output> {
async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
// Phase 1: Find relevant sources (checkpointed)
// If the task crashes after this step, it won't re-run on retry
let sources: Vec<String> = ctx.step("find-sources", (), |_, _| async {
Expand All @@ -84,21 +86,26 @@ impl Task for ResearchTask {
}).await?;

// Phase 3: Generate summary (checkpointed)
let summary: String = ctx.step("summarize", params, |params, _| async {
// Summarization logic here...
Ok(format!("Research summary for '{}': {}", params.query, analysis))
}).await?;
// Note: step takes a fn pointer, so captured variables must be passed via params
let summary: String = ctx.step(
"summarize",
(params, analysis),
|(params, analysis), _| async move {
Ok(format!("Research summary for '{}': {}", params.query, analysis))
},
).await?;

Ok(ResearchResult { summary, sources })
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create the client
// Create the client (register tasks on the builder)
let client = Durable::builder()
.database_url("postgres://localhost/myapp")
.queue_name("research")
.register::<ResearchTask>()?
.build()
.await?;

Expand All @@ -108,9 +115,6 @@ async fn main() -> anyhow::Result<()> {
// Create the queue (idempotent - safe to call on every startup)
client.create_queue(None).await?;

// Register your task
client.register::<ResearchTask>().await?;

// Spawn a task
let result = client.spawn::<ResearchTask>(ResearchParams {
query: "distributed systems consensus algorithms".into(),
Expand All @@ -119,7 +123,7 @@ async fn main() -> anyhow::Result<()> {
println!("Spawned task: {}", result.task_id);

// Start a worker to process tasks
let worker = client.start_worker(WorkerOptions::default()).await;
let worker = client.start_worker(WorkerOptions::default()).await?;

// Wait for shutdown signal
tokio::signal::ctrl_c().await?;
Expand All @@ -133,21 +137,62 @@ async fn main() -> anyhow::Result<()> {

### Tasks

Tasks are defined by implementing the [`Task`] trait:
Tasks are defined by implementing the [`Task`] trait. The `State` type parameter allows passing application state (e.g., HTTP clients, database pools) to your tasks. Use `()` if you don't need state.

```rust
#[derive(Default)]
struct MyTask;

#[async_trait]
impl Task for MyTask {
fn name() -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier
type Params = MyParams; // Input (JSON-serializable)
type Output = MyOutput; // Output (JSON-serializable)
impl Task<()> for MyTask {
fn name(&self) -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier
type Params = MyParams; // Input (JSON-serializable)
type Output = MyOutput; // Output (JSON-serializable)

async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Output> {
async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
// Your task logic here
}
}
```

### Application State

Tasks can receive shared application state (like HTTP clients or database pools) via the generic `State` parameter:

```rust
#[derive(Clone)]
struct AppState {
http_client: reqwest::Client,
}

#[derive(Default)]
struct FetchTask;

#[async_trait]
impl Task<AppState> for FetchTask {
fn name(&self) -> Cow<'static, str> { Cow::Borrowed("fetch") }
type Params = String;
type Output = String;

async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult<Self::Output> {
ctx.step("fetch", url, |url, _| async move {
state.http_client.get(&url).send().await
.map_err(|e| anyhow::anyhow!("HTTP error: {}", e))?
.text().await
.map_err(|e| anyhow::anyhow!("HTTP error: {}", e))
}).await
}
}

// Build client with state
let app_state = AppState { http_client: reqwest::Client::new() };
let client = Durable::builder()
.database_url("postgres://localhost/myapp")
.register::<FetchTask>()?
.build_with_state(app_state)
.await?;
```

### User Errors

Return user errors with structured data using `TaskError::user()`:
Expand All @@ -169,14 +214,15 @@ The error data is serialized to JSON and stored in the database for debugging an

The [`TaskContext`] provides methods for durable execution:

- **`step(name, params, closure)`** - Execute a checkpointed operation. The closure receives `(params, state)`. If the step completed in a previous run with the same name and params, returns the cached result.
- **`step(name, params, f)`** - Execute a checkpointed operation. The `f` is a `fn(P, StepState<State>) -> Future` (function pointer, not a closure that captures). If the step completed in a previous run with the same name and params hash, returns the cached result.
- **`spawn::<T>(name, params, options)`** - Spawn a subtask and return a handle.
- **`spawn_by_name(name, task_name, params, options)`** - Spawn a subtask by task name (dynamic version).
- **`join(handle)`** - Wait for a subtask to complete and get its result.
- **`sleep_for(name, duration)`** - Suspend the task for a duration.
- **`await_event(name, timeout)`** - Wait for an external event.
- **`emit_event(name, payload)`** - Emit an event to wake waiting tasks.
- **`heartbeat(duration)`** - Extend the task lease for long operations.
- **`heartbeat(duration)`** - Extend the task lease for long operations. Takes `Option<Duration>`.
- **`heartbeat_handle()`** - Get a cloneable `HeartbeatHandle` for use inside step closures.
- **`rand()`** - Generate a durable random value in [0, 1). Checkpointed.
- **`now()`** - Get the current time as a durable checkpoint.
- **`uuid7()`** - Generate a durable UUIDv7. Checkpointed.
Expand All @@ -186,12 +232,14 @@ The [`TaskContext`] provides methods for durable execution:
Steps provide "at-least-once" execution. To achieve "exactly-once" semantics for side effects, use the `task_id` as an idempotency key:

```rust
ctx.step("charge-payment", ctx.task_id, |task_id, state| async {
ctx.step("charge-payment", ctx.task_id, |task_id, _step_state| async move {
let idempotency_key = format!("{}:charge", task_id);
stripe::charge(amount, &idempotency_key).await
}).await?;
```

The step closure receives `(params, StepState<State>)`. Since `step` takes a function pointer (`fn`), the closure cannot capture variables from the surrounding scope — pass any needed data through the `params` argument.

### Events

Tasks can wait for and emit events:
Expand All @@ -216,7 +264,7 @@ client.emit_event(
Tasks can spawn subtasks and wait for their results using `spawn()` and `join()`:

```rust
async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Output> {
async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
// Spawn subtasks (runs on same queue)
let h1 = ctx.spawn::<ProcessItem>("item-1", Item { id: 1 }, Default::default()).await?;
let h2 = ctx.spawn::<ProcessItem>("item-2", Item { id: 2 }, SpawnOptions {
Expand All @@ -225,7 +273,7 @@ async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Out
}).await?;

// Do local work while subtasks run...
let local = ctx.step("local-work", (), |_params, _state| async { Ok(compute()) }).await?;
let local = ctx.step("local-work", (), |_, _| async { Ok(compute()) }).await?;

// Wait for subtask results
let r1: ItemResult = ctx.join(h1).await?;
Expand Down Expand Up @@ -356,6 +404,16 @@ if let Some(poll) = result {
}
```

### Task Cancellation

Cancel a running or pending task:

```rust
client.cancel_task(task_id, None).await?;
```

When a task is cancelled, all its subtasks are automatically cancelled as well.

### Transactional Spawning

You can atomically enqueue a task as part of a larger database transaction. This ensures that either both your write and the task spawn succeed, or neither does:
Expand All @@ -382,6 +440,7 @@ This is useful when you need to guarantee that a task is only enqueued if relate
- `spawn_with(executor, params)` - Spawn with default options
- `spawn_with_options_with(executor, params, options)` - Spawn with custom options
- `spawn_by_name_with(executor, task_name, params, options)` - Dynamic spawn by name
- `emit_event_with(executor, event_name, payload, queue_name)` - Emit event transactionally

## API Overview

Expand All @@ -390,14 +449,14 @@ This is useful when you need to guarantee that a task is only enqueued if relate
| Type | Description |
|------|-------------|
| [`Durable`] | Main client for spawning tasks and managing queues |
| [`DurableBuilder`] | Builder for configuring the client |
| [`DurableBuilder`] | Builder for configuring the client (register tasks, set defaults) |
| [`Worker`] | Background worker that processes tasks |

### Task Definition

| Type | Description |
|------|-------------|
| [`Task`] | Trait for defining task types |
| [`Task<State>`] | Trait for defining task types (generic over application state) |
| [`TaskContext`] | Context passed to task execution |
| [`TaskResult<T>`] | Result type alias for task returns |
| [`TaskError`] | Error type with control flow signals and user errors |
Expand All @@ -409,10 +468,10 @@ This is useful when you need to guarantee that a task is only enqueued if relate

| Type | Description |
|------|-------------|
| [`SpawnOptions`] | Options for spawning tasks (retries, headers, queue) |
| [`WorkerOptions`] | Options for worker configuration (concurrency, timeouts) |
| [`SpawnOptions`] | Options for spawning tasks (retries, headers, cancellation) |
| [`WorkerOptions`] | Options for worker configuration (concurrency, timeouts, poll interval) |
| [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` |
| [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration |
| [`CancellationPolicy`] | Auto-cancel tasks based on pending or running time |

### Cron Scheduling

Expand All @@ -423,14 +482,39 @@ This is useful when you need to guarantee that a task is only enqueued if relate
| [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) |
| [`setup_pgcron()`] | Initialize the pg_cron extension |

### Results
### Results & Errors

| Type | Description |
|------|-------------|
| [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) |
| [`TaskPollResult`] | Result of polling a task (status, output, error) |
| [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` |
| [`ControlFlow`] | Signals for suspension and cancellation |
| [`DurableError`] | Error type for client operations |
| [`DurableResult<T>`] | Result type alias for client operations |

### Heartbeat & Step State

| Type | Description |
|------|-------------|
| [`HeartbeatHandle`] | Cloneable handle for extending task leases inside step closures |
| [`StepState<State>`] | State passed to step closures (provides access to app state and heartbeat) |

## Queue Management

```rust
// Create a queue (idempotent)
client.create_queue(None).await?;

// List all queues
let queues = client.list_queues().await?;

// Count unclaimed ready tasks (useful for autoscaling)
let count = client.count_unclaimed_ready_tasks(None).await?;

// Drop a queue (also unschedules its cron jobs)
client.drop_queue(None).await?;
```

## Environment Variables

Expand Down
13 changes: 8 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! type Output = MyOutput;
//!
//! async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
//! let doubled = ctx.step("double", || async {
//! let doubled = ctx.step("double", params, |params, _| async move {
//! Ok(params.value * 2)
//! }).await?;
//!
Expand All @@ -47,7 +47,7 @@
//!
//! client.spawn::<MyTask>(MyParams { value: 21 }).await?;
//!
//! let worker = client.start_worker(WorkerOptions::default()).await;
//! let worker = client.start_worker(WorkerOptions::default()).await?;
//! // ... worker processes tasks until shutdown
//! worker.shutdown().await;
//! Ok(())
Expand Down Expand Up @@ -75,9 +75,11 @@
//! type Output = String;
//!
//! async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult<Self::Output> {
//! ctx.step("fetch", || async {
//! state.http_client.get(&url).send().await?.text().await
//! .map_err(|e| anyhow::anyhow!(e))
//! ctx.step("fetch", url, |url, _| async move {
//! state.http_client.get(&url).send().await
//! .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))?
//! .text().await
//! .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))
//! }).await
//! }
//! }
Expand All @@ -86,6 +88,7 @@
//! let app_state = AppState { http_client: reqwest::Client::new() };
//! let client = Durable::builder()
//! .database_url("postgres://localhost/myapp")
//! .register::<FetchTask>()?
//! .build_with_state(app_state)
//! .await?;
//! ```
Expand Down
6 changes: 4 additions & 2 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::error::{TaskError, TaskResult};
///
/// # Example
/// ```ignore
/// #[derive(Default)]
/// struct SendEmailTask;
///
/// #[async_trait]
Expand All @@ -30,7 +31,7 @@ use crate::error::{TaskError, TaskResult};
/// type Output = SendEmailResult;
///
/// async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
/// let result = ctx.step("send", || async {
/// let result = ctx.step("send", params, |params, _| async move {
/// email_service::send(&params.to, &params.subject, &params.body).await
/// }).await?;
///
Expand All @@ -44,6 +45,7 @@ use crate::error::{TaskError, TaskResult};
/// http_client: reqwest::Client,
/// }
///
/// #[derive(Default)]
/// struct FetchUrlTask;
///
/// #[async_trait]
Expand All @@ -53,7 +55,7 @@ use crate::error::{TaskError, TaskResult};
/// type Output = String;
///
/// async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult<Self::Output> {
/// let body = ctx.step("fetch", || async {
/// let body = ctx.step("fetch", url, |url, _| async move {
/// state.http_client.get(&url).send().await
/// .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))?
/// .text().await
Expand Down
Loading