Skip to content

Commit ac7ed7f

Browse files
authored
refreshed documentation (#86)
1 parent b5f2664 commit ac7ed7f

3 files changed

Lines changed: 125 additions & 36 deletions

File tree

README.md

Lines changed: 113 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ durable = "0.1"
4444
```rust
4545
use durable::{Durable, MIGRATOR, Task, TaskContext, TaskResult, WorkerOptions, async_trait};
4646
use serde::{Deserialize, Serialize};
47+
use std::borrow::Cow;
4748

4849
// Define your task parameters and output
4950
#[derive(Serialize, Deserialize)]
@@ -58,15 +59,16 @@ struct ResearchResult {
5859
}
5960

6061
// Implement the Task trait
62+
#[derive(Default)]
6163
struct ResearchTask;
6264

6365
#[async_trait]
64-
impl Task for ResearchTask {
65-
fn name() -> Cow<'static, str> { Cow::Borrowed("research") }
66+
impl Task<()> for ResearchTask {
67+
fn name(&self) -> Cow<'static, str> { Cow::Borrowed("research") }
6668
type Params = ResearchParams;
6769
type Output = ResearchResult;
6870

69-
async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Output> {
71+
async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
7072
// Phase 1: Find relevant sources (checkpointed)
7173
// If the task crashes after this step, it won't re-run on retry
7274
let sources: Vec<String> = ctx.step("find-sources", (), |_, _| async {
@@ -84,21 +86,26 @@ impl Task for ResearchTask {
8486
}).await?;
8587

8688
// Phase 3: Generate summary (checkpointed)
87-
let summary: String = ctx.step("summarize", params, |params, _| async {
88-
// Summarization logic here...
89-
Ok(format!("Research summary for '{}': {}", params.query, analysis))
90-
}).await?;
89+
// Note: step takes a fn pointer, so captured variables must be passed via params
90+
let summary: String = ctx.step(
91+
"summarize",
92+
(params, analysis),
93+
|(params, analysis), _| async move {
94+
Ok(format!("Research summary for '{}': {}", params.query, analysis))
95+
},
96+
).await?;
9197

9298
Ok(ResearchResult { summary, sources })
9399
}
94100
}
95101

96102
#[tokio::main]
97103
async fn main() -> anyhow::Result<()> {
98-
// Create the client
104+
// Create the client (register tasks on the builder)
99105
let client = Durable::builder()
100106
.database_url("postgres://localhost/myapp")
101107
.queue_name("research")
108+
.register::<ResearchTask>()?
102109
.build()
103110
.await?;
104111

@@ -108,9 +115,6 @@ async fn main() -> anyhow::Result<()> {
108115
// Create the queue (idempotent - safe to call on every startup)
109116
client.create_queue(None).await?;
110117

111-
// Register your task
112-
client.register::<ResearchTask>().await?;
113-
114118
// Spawn a task
115119
let result = client.spawn::<ResearchTask>(ResearchParams {
116120
query: "distributed systems consensus algorithms".into(),
@@ -119,7 +123,7 @@ async fn main() -> anyhow::Result<()> {
119123
println!("Spawned task: {}", result.task_id);
120124

121125
// Start a worker to process tasks
122-
let worker = client.start_worker(WorkerOptions::default()).await;
126+
let worker = client.start_worker(WorkerOptions::default()).await?;
123127

124128
// Wait for shutdown signal
125129
tokio::signal::ctrl_c().await?;
@@ -133,21 +137,62 @@ async fn main() -> anyhow::Result<()> {
133137

134138
### Tasks
135139

136-
Tasks are defined by implementing the [`Task`] trait:
140+
Tasks are defined by implementing the [`Task`] trait. The `State` type parameter allows passing application state (e.g., HTTP clients, database pools) to your tasks. Use `()` if you don't need state.
137141

138142
```rust
143+
#[derive(Default)]
144+
struct MyTask;
145+
139146
#[async_trait]
140-
impl Task for MyTask {
141-
fn name() -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier
142-
type Params = MyParams; // Input (JSON-serializable)
143-
type Output = MyOutput; // Output (JSON-serializable)
147+
impl Task<()> for MyTask {
148+
fn name(&self) -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier
149+
type Params = MyParams; // Input (JSON-serializable)
150+
type Output = MyOutput; // Output (JSON-serializable)
144151

145-
async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Output> {
152+
async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
146153
// Your task logic here
147154
}
148155
}
149156
```
150157

158+
### Application State
159+
160+
Tasks can receive shared application state (like HTTP clients or database pools) via the generic `State` parameter:
161+
162+
```rust
163+
#[derive(Clone)]
164+
struct AppState {
165+
http_client: reqwest::Client,
166+
}
167+
168+
#[derive(Default)]
169+
struct FetchTask;
170+
171+
#[async_trait]
172+
impl Task<AppState> for FetchTask {
173+
fn name(&self) -> Cow<'static, str> { Cow::Borrowed("fetch") }
174+
type Params = String;
175+
type Output = String;
176+
177+
async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult<Self::Output> {
178+
ctx.step("fetch", url, |url, _| async move {
179+
state.http_client.get(&url).send().await
180+
.map_err(|e| anyhow::anyhow!("HTTP error: {}", e))?
181+
.text().await
182+
.map_err(|e| anyhow::anyhow!("HTTP error: {}", e))
183+
}).await
184+
}
185+
}
186+
187+
// Build client with state
188+
let app_state = AppState { http_client: reqwest::Client::new() };
189+
let client = Durable::builder()
190+
.database_url("postgres://localhost/myapp")
191+
.register::<FetchTask>()?
192+
.build_with_state(app_state)
193+
.await?;
194+
```
195+
151196
### User Errors
152197

153198
Return user errors with structured data using `TaskError::user()`:
@@ -169,14 +214,15 @@ The error data is serialized to JSON and stored in the database for debugging an
169214

170215
The [`TaskContext`] provides methods for durable execution:
171216

172-
- **`step(name, params, closure)`** - Execute a checkpointed operation. The closure receives `(params, state)`. If the step completed in a previous run with the same name and params, returns the cached result.
217+
- **`step(name, params, f)`** - Execute a checkpointed operation. The `f` is a `fn(P, StepState<State>) -> Future` (function pointer, not a closure that captures). If the step completed in a previous run with the same name and params hash, returns the cached result.
173218
- **`spawn::<T>(name, params, options)`** - Spawn a subtask and return a handle.
174219
- **`spawn_by_name(name, task_name, params, options)`** - Spawn a subtask by task name (dynamic version).
175220
- **`join(handle)`** - Wait for a subtask to complete and get its result.
176221
- **`sleep_for(name, duration)`** - Suspend the task for a duration.
177222
- **`await_event(name, timeout)`** - Wait for an external event.
178223
- **`emit_event(name, payload)`** - Emit an event to wake waiting tasks.
179-
- **`heartbeat(duration)`** - Extend the task lease for long operations.
224+
- **`heartbeat(duration)`** - Extend the task lease for long operations. Takes `Option<Duration>`.
225+
- **`heartbeat_handle()`** - Get a cloneable `HeartbeatHandle` for use inside step closures.
180226
- **`rand()`** - Generate a durable random value in [0, 1). Checkpointed.
181227
- **`now()`** - Get the current time as a durable checkpoint.
182228
- **`uuid7()`** - Generate a durable UUIDv7. Checkpointed.
@@ -186,12 +232,14 @@ The [`TaskContext`] provides methods for durable execution:
186232
Steps provide "at-least-once" execution. To achieve "exactly-once" semantics for side effects, use the `task_id` as an idempotency key:
187233

188234
```rust
189-
ctx.step("charge-payment", ctx.task_id, |task_id, state| async {
235+
ctx.step("charge-payment", ctx.task_id, |task_id, _step_state| async move {
190236
let idempotency_key = format!("{}:charge", task_id);
191237
stripe::charge(amount, &idempotency_key).await
192238
}).await?;
193239
```
194240

241+
The step closure receives `(params, StepState<State>)`. Since `step` takes a function pointer (`fn`), the closure cannot capture variables from the surrounding scope — pass any needed data through the `params` argument.
242+
195243
### Events
196244

197245
Tasks can wait for and emit events:
@@ -216,7 +264,7 @@ client.emit_event(
216264
Tasks can spawn subtasks and wait for their results using `spawn()` and `join()`:
217265

218266
```rust
219-
async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Output> {
267+
async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
220268
// Spawn subtasks (runs on same queue)
221269
let h1 = ctx.spawn::<ProcessItem>("item-1", Item { id: 1 }, Default::default()).await?;
222270
let h2 = ctx.spawn::<ProcessItem>("item-2", Item { id: 2 }, SpawnOptions {
@@ -225,7 +273,7 @@ async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult<Self::Out
225273
}).await?;
226274

227275
// Do local work while subtasks run...
228-
let local = ctx.step("local-work", (), |_params, _state| async { Ok(compute()) }).await?;
276+
let local = ctx.step("local-work", (), |_, _| async { Ok(compute()) }).await?;
229277

230278
// Wait for subtask results
231279
let r1: ItemResult = ctx.join(h1).await?;
@@ -356,6 +404,16 @@ if let Some(poll) = result {
356404
}
357405
```
358406

407+
### Task Cancellation
408+
409+
Cancel a running or pending task:
410+
411+
```rust
412+
client.cancel_task(task_id, None).await?;
413+
```
414+
415+
When a task is cancelled, all its subtasks are automatically cancelled as well.
416+
359417
### Transactional Spawning
360418

361419
You can atomically enqueue a task as part of a larger database transaction. This ensures that either both your write and the task spawn succeed, or neither does:
@@ -382,6 +440,7 @@ This is useful when you need to guarantee that a task is only enqueued if relate
382440
- `spawn_with(executor, params)` - Spawn with default options
383441
- `spawn_with_options_with(executor, params, options)` - Spawn with custom options
384442
- `spawn_by_name_with(executor, task_name, params, options)` - Dynamic spawn by name
443+
- `emit_event_with(executor, event_name, payload, queue_name)` - Emit event transactionally
385444

386445
## API Overview
387446

@@ -390,14 +449,14 @@ This is useful when you need to guarantee that a task is only enqueued if relate
390449
| Type | Description |
391450
|------|-------------|
392451
| [`Durable`] | Main client for spawning tasks and managing queues |
393-
| [`DurableBuilder`] | Builder for configuring the client |
452+
| [`DurableBuilder`] | Builder for configuring the client (register tasks, set defaults) |
394453
| [`Worker`] | Background worker that processes tasks |
395454

396455
### Task Definition
397456

398457
| Type | Description |
399458
|------|-------------|
400-
| [`Task`] | Trait for defining task types |
459+
| [`Task<State>`] | Trait for defining task types (generic over application state) |
401460
| [`TaskContext`] | Context passed to task execution |
402461
| [`TaskResult<T>`] | Result type alias for task returns |
403462
| [`TaskError`] | Error type with control flow signals and user errors |
@@ -409,10 +468,10 @@ This is useful when you need to guarantee that a task is only enqueued if relate
409468

410469
| Type | Description |
411470
|------|-------------|
412-
| [`SpawnOptions`] | Options for spawning tasks (retries, headers, queue) |
413-
| [`WorkerOptions`] | Options for worker configuration (concurrency, timeouts) |
471+
| [`SpawnOptions`] | Options for spawning tasks (retries, headers, cancellation) |
472+
| [`WorkerOptions`] | Options for worker configuration (concurrency, timeouts, poll interval) |
414473
| [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` |
415-
| [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration |
474+
| [`CancellationPolicy`] | Auto-cancel tasks based on pending or running time |
416475

417476
### Cron Scheduling
418477

@@ -423,14 +482,39 @@ This is useful when you need to guarantee that a task is only enqueued if relate
423482
| [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) |
424483
| [`setup_pgcron()`] | Initialize the pg_cron extension |
425484

426-
### Results
485+
### Results & Errors
427486

428487
| Type | Description |
429488
|------|-------------|
430489
| [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) |
431490
| [`TaskPollResult`] | Result of polling a task (status, output, error) |
432491
| [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` |
433492
| [`ControlFlow`] | Signals for suspension and cancellation |
493+
| [`DurableError`] | Error type for client operations |
494+
| [`DurableResult<T>`] | Result type alias for client operations |
495+
496+
### Heartbeat & Step State
497+
498+
| Type | Description |
499+
|------|-------------|
500+
| [`HeartbeatHandle`] | Cloneable handle for extending task leases inside step closures |
501+
| [`StepState<State>`] | State passed to step closures (provides access to app state and heartbeat) |
502+
503+
## Queue Management
504+
505+
```rust
506+
// Create a queue (idempotent)
507+
client.create_queue(None).await?;
508+
509+
// List all queues
510+
let queues = client.list_queues().await?;
511+
512+
// Count unclaimed ready tasks (useful for autoscaling)
513+
let count = client.count_unclaimed_ready_tasks(None).await?;
514+
515+
// Drop a queue (also unschedules its cron jobs)
516+
client.drop_queue(None).await?;
517+
```
434518

435519
## Environment Variables
436520

src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
//! type Output = MyOutput;
3030
//!
3131
//! async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
32-
//! let doubled = ctx.step("double", || async {
32+
//! let doubled = ctx.step("double", params, |params, _| async move {
3333
//! Ok(params.value * 2)
3434
//! }).await?;
3535
//!
@@ -47,7 +47,7 @@
4747
//!
4848
//! client.spawn::<MyTask>(MyParams { value: 21 }).await?;
4949
//!
50-
//! let worker = client.start_worker(WorkerOptions::default()).await;
50+
//! let worker = client.start_worker(WorkerOptions::default()).await?;
5151
//! // ... worker processes tasks until shutdown
5252
//! worker.shutdown().await;
5353
//! Ok(())
@@ -75,9 +75,11 @@
7575
//! type Output = String;
7676
//!
7777
//! async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult<Self::Output> {
78-
//! ctx.step("fetch", || async {
79-
//! state.http_client.get(&url).send().await?.text().await
80-
//! .map_err(|e| anyhow::anyhow!(e))
78+
//! ctx.step("fetch", url, |url, _| async move {
79+
//! state.http_client.get(&url).send().await
80+
//! .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))?
81+
//! .text().await
82+
//! .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))
8183
//! }).await
8284
//! }
8385
//! }
@@ -86,6 +88,7 @@
8688
//! let app_state = AppState { http_client: reqwest::Client::new() };
8789
//! let client = Durable::builder()
8890
//! .database_url("postgres://localhost/myapp")
91+
//! .register::<FetchTask>()?
8992
//! .build_with_state(app_state)
9093
//! .await?;
9194
//! ```

src/task.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::error::{TaskError, TaskResult};
2121
///
2222
/// # Example
2323
/// ```ignore
24+
/// #[derive(Default)]
2425
/// struct SendEmailTask;
2526
///
2627
/// #[async_trait]
@@ -30,7 +31,7 @@ use crate::error::{TaskError, TaskResult};
3031
/// type Output = SendEmailResult;
3132
///
3233
/// async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
33-
/// let result = ctx.step("send", || async {
34+
/// let result = ctx.step("send", params, |params, _| async move {
3435
/// email_service::send(&params.to, &params.subject, &params.body).await
3536
/// }).await?;
3637
///
@@ -44,6 +45,7 @@ use crate::error::{TaskError, TaskResult};
4445
/// http_client: reqwest::Client,
4546
/// }
4647
///
48+
/// #[derive(Default)]
4749
/// struct FetchUrlTask;
4850
///
4951
/// #[async_trait]
@@ -53,7 +55,7 @@ use crate::error::{TaskError, TaskResult};
5355
/// type Output = String;
5456
///
5557
/// async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult<Self::Output> {
56-
/// let body = ctx.step("fetch", || async {
58+
/// let body = ctx.step("fetch", url, |url, _| async move {
5759
/// state.http_client.get(&url).send().await
5860
/// .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))?
5961
/// .text().await

0 commit comments

Comments
 (0)