diff --git a/README.md b/README.md index 320c069..e1e5a03 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ durable = "0.1" ```rust use durable::{Durable, MIGRATOR, Task, TaskContext, TaskResult, WorkerOptions, async_trait}; use serde::{Deserialize, Serialize}; +use std::borrow::Cow; // Define your task parameters and output #[derive(Serialize, Deserialize)] @@ -58,15 +59,16 @@ struct ResearchResult { } // Implement the Task trait +#[derive(Default)] struct ResearchTask; #[async_trait] -impl Task for ResearchTask { - fn name() -> Cow<'static, str> { Cow::Borrowed("research") } +impl Task<()> for ResearchTask { + fn name(&self) -> Cow<'static, str> { Cow::Borrowed("research") } type Params = ResearchParams; type Output = ResearchResult; - async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { + async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { // Phase 1: Find relevant sources (checkpointed) // If the task crashes after this step, it won't re-run on retry let sources: Vec = ctx.step("find-sources", (), |_, _| async { @@ -84,10 +86,14 @@ impl Task for ResearchTask { }).await?; // Phase 3: Generate summary (checkpointed) - let summary: String = ctx.step("summarize", params, |params, _| async { - // Summarization logic here... - Ok(format!("Research summary for '{}': {}", params.query, analysis)) - }).await?; + // Note: step takes a fn pointer, so captured variables must be passed via params + let summary: String = ctx.step( + "summarize", + (params, analysis), + |(params, analysis), _| async move { + Ok(format!("Research summary for '{}': {}", params.query, analysis)) + }, + ).await?; Ok(ResearchResult { summary, sources }) } @@ -95,10 +101,11 @@ impl Task for ResearchTask { #[tokio::main] async fn main() -> anyhow::Result<()> { - // Create the client + // Create the client (register tasks on the builder) let client = Durable::builder() .database_url("postgres://localhost/myapp") .queue_name("research") + .register::()? .build() .await?; @@ -108,9 +115,6 @@ async fn main() -> anyhow::Result<()> { // Create the queue (idempotent - safe to call on every startup) client.create_queue(None).await?; - // Register your task - client.register::().await?; - // Spawn a task let result = client.spawn::(ResearchParams { query: "distributed systems consensus algorithms".into(), @@ -119,7 +123,7 @@ async fn main() -> anyhow::Result<()> { println!("Spawned task: {}", result.task_id); // Start a worker to process tasks - let worker = client.start_worker(WorkerOptions::default()).await; + let worker = client.start_worker(WorkerOptions::default()).await?; // Wait for shutdown signal tokio::signal::ctrl_c().await?; @@ -133,21 +137,62 @@ async fn main() -> anyhow::Result<()> { ### Tasks -Tasks are defined by implementing the [`Task`] trait: +Tasks are defined by implementing the [`Task`] trait. The `State` type parameter allows passing application state (e.g., HTTP clients, database pools) to your tasks. Use `()` if you don't need state. ```rust +#[derive(Default)] +struct MyTask; + #[async_trait] -impl Task for MyTask { - fn name() -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier - type Params = MyParams; // Input (JSON-serializable) - type Output = MyOutput; // Output (JSON-serializable) +impl Task<()> for MyTask { + fn name(&self) -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier + type Params = MyParams; // Input (JSON-serializable) + type Output = MyOutput; // Output (JSON-serializable) - async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { + async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { // Your task logic here } } ``` +### Application State + +Tasks can receive shared application state (like HTTP clients or database pools) via the generic `State` parameter: + +```rust +#[derive(Clone)] +struct AppState { + http_client: reqwest::Client, +} + +#[derive(Default)] +struct FetchTask; + +#[async_trait] +impl Task for FetchTask { + fn name(&self) -> Cow<'static, str> { Cow::Borrowed("fetch") } + type Params = String; + type Output = String; + + async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult { + ctx.step("fetch", url, |url, _| async move { + state.http_client.get(&url).send().await + .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))? + .text().await + .map_err(|e| anyhow::anyhow!("HTTP error: {}", e)) + }).await + } +} + +// Build client with state +let app_state = AppState { http_client: reqwest::Client::new() }; +let client = Durable::builder() + .database_url("postgres://localhost/myapp") + .register::()? + .build_with_state(app_state) + .await?; +``` + ### User Errors Return user errors with structured data using `TaskError::user()`: @@ -169,14 +214,15 @@ The error data is serialized to JSON and stored in the database for debugging an The [`TaskContext`] provides methods for durable execution: -- **`step(name, params, closure)`** - Execute a checkpointed operation. The closure receives `(params, state)`. If the step completed in a previous run with the same name and params, returns the cached result. +- **`step(name, params, f)`** - Execute a checkpointed operation. The `f` is a `fn(P, StepState) -> Future` (function pointer, not a closure that captures). If the step completed in a previous run with the same name and params hash, returns the cached result. - **`spawn::(name, params, options)`** - Spawn a subtask and return a handle. - **`spawn_by_name(name, task_name, params, options)`** - Spawn a subtask by task name (dynamic version). - **`join(handle)`** - Wait for a subtask to complete and get its result. - **`sleep_for(name, duration)`** - Suspend the task for a duration. - **`await_event(name, timeout)`** - Wait for an external event. - **`emit_event(name, payload)`** - Emit an event to wake waiting tasks. -- **`heartbeat(duration)`** - Extend the task lease for long operations. +- **`heartbeat(duration)`** - Extend the task lease for long operations. Takes `Option`. +- **`heartbeat_handle()`** - Get a cloneable `HeartbeatHandle` for use inside step closures. - **`rand()`** - Generate a durable random value in [0, 1). Checkpointed. - **`now()`** - Get the current time as a durable checkpoint. - **`uuid7()`** - Generate a durable UUIDv7. Checkpointed. @@ -186,12 +232,14 @@ The [`TaskContext`] provides methods for durable execution: Steps provide "at-least-once" execution. To achieve "exactly-once" semantics for side effects, use the `task_id` as an idempotency key: ```rust -ctx.step("charge-payment", ctx.task_id, |task_id, state| async { +ctx.step("charge-payment", ctx.task_id, |task_id, _step_state| async move { let idempotency_key = format!("{}:charge", task_id); stripe::charge(amount, &idempotency_key).await }).await?; ``` +The step closure receives `(params, StepState)`. Since `step` takes a function pointer (`fn`), the closure cannot capture variables from the surrounding scope — pass any needed data through the `params` argument. + ### Events Tasks can wait for and emit events: @@ -216,7 +264,7 @@ client.emit_event( Tasks can spawn subtasks and wait for their results using `spawn()` and `join()`: ```rust -async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { +async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { // Spawn subtasks (runs on same queue) let h1 = ctx.spawn::("item-1", Item { id: 1 }, Default::default()).await?; let h2 = ctx.spawn::("item-2", Item { id: 2 }, SpawnOptions { @@ -225,7 +273,7 @@ async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult`] | Trait for defining task types (generic over application state) | | [`TaskContext`] | Context passed to task execution | | [`TaskResult`] | Result type alias for task returns | | [`TaskError`] | Error type with control flow signals and user errors | @@ -409,10 +468,10 @@ This is useful when you need to guarantee that a task is only enqueued if relate | Type | Description | |------|-------------| -| [`SpawnOptions`] | Options for spawning tasks (retries, headers, queue) | -| [`WorkerOptions`] | Options for worker configuration (concurrency, timeouts) | +| [`SpawnOptions`] | Options for spawning tasks (retries, headers, cancellation) | +| [`WorkerOptions`] | Options for worker configuration (concurrency, timeouts, poll interval) | | [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` | -| [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration | +| [`CancellationPolicy`] | Auto-cancel tasks based on pending or running time | ### Cron Scheduling @@ -423,7 +482,7 @@ This is useful when you need to guarantee that a task is only enqueued if relate | [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) | | [`setup_pgcron()`] | Initialize the pg_cron extension | -### Results +### Results & Errors | Type | Description | |------|-------------| @@ -431,6 +490,31 @@ This is useful when you need to guarantee that a task is only enqueued if relate | [`TaskPollResult`] | Result of polling a task (status, output, error) | | [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` | | [`ControlFlow`] | Signals for suspension and cancellation | +| [`DurableError`] | Error type for client operations | +| [`DurableResult`] | Result type alias for client operations | + +### Heartbeat & Step State + +| Type | Description | +|------|-------------| +| [`HeartbeatHandle`] | Cloneable handle for extending task leases inside step closures | +| [`StepState`] | State passed to step closures (provides access to app state and heartbeat) | + +## Queue Management + +```rust +// Create a queue (idempotent) +client.create_queue(None).await?; + +// List all queues +let queues = client.list_queues().await?; + +// Count unclaimed ready tasks (useful for autoscaling) +let count = client.count_unclaimed_ready_tasks(None).await?; + +// Drop a queue (also unschedules its cron jobs) +client.drop_queue(None).await?; +``` ## Environment Variables diff --git a/src/lib.rs b/src/lib.rs index 557c4c3..fe71f04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,7 @@ //! type Output = MyOutput; //! //! async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { -//! let doubled = ctx.step("double", || async { +//! let doubled = ctx.step("double", params, |params, _| async move { //! Ok(params.value * 2) //! }).await?; //! @@ -47,7 +47,7 @@ //! //! client.spawn::(MyParams { value: 21 }).await?; //! -//! let worker = client.start_worker(WorkerOptions::default()).await; +//! let worker = client.start_worker(WorkerOptions::default()).await?; //! // ... worker processes tasks until shutdown //! worker.shutdown().await; //! Ok(()) @@ -75,9 +75,11 @@ //! type Output = String; //! //! async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult { -//! ctx.step("fetch", || async { -//! state.http_client.get(&url).send().await?.text().await -//! .map_err(|e| anyhow::anyhow!(e)) +//! ctx.step("fetch", url, |url, _| async move { +//! state.http_client.get(&url).send().await +//! .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))? +//! .text().await +//! .map_err(|e| anyhow::anyhow!("HTTP error: {}", e)) //! }).await //! } //! } @@ -86,6 +88,7 @@ //! let app_state = AppState { http_client: reqwest::Client::new() }; //! let client = Durable::builder() //! .database_url("postgres://localhost/myapp") +//! .register::()? //! .build_with_state(app_state) //! .await?; //! ``` diff --git a/src/task.rs b/src/task.rs index d02ad36..65b445a 100644 --- a/src/task.rs +++ b/src/task.rs @@ -21,6 +21,7 @@ use crate::error::{TaskError, TaskResult}; /// /// # Example /// ```ignore +/// #[derive(Default)] /// struct SendEmailTask; /// /// #[async_trait] @@ -30,7 +31,7 @@ use crate::error::{TaskError, TaskResult}; /// type Output = SendEmailResult; /// /// async fn run(&self, params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { -/// let result = ctx.step("send", || async { +/// let result = ctx.step("send", params, |params, _| async move { /// email_service::send(¶ms.to, ¶ms.subject, ¶ms.body).await /// }).await?; /// @@ -44,6 +45,7 @@ use crate::error::{TaskError, TaskResult}; /// http_client: reqwest::Client, /// } /// +/// #[derive(Default)] /// struct FetchUrlTask; /// /// #[async_trait] @@ -53,7 +55,7 @@ use crate::error::{TaskError, TaskResult}; /// type Output = String; /// /// async fn run(&self, url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult { -/// let body = ctx.step("fetch", || async { +/// let body = ctx.step("fetch", url, |url, _| async move { /// state.http_client.get(&url).send().await /// .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))? /// .text().await