diff --git a/src/context.rs b/src/context.rs index 50c14bd..db972f3 100644 --- a/src/context.rs +++ b/src/context.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::Durable; use crate::error::suspend_handle::SuspendMarker; -use crate::error::{ControlFlow, TaskError, TaskResult}; +use crate::error::{ControlFlow, NonControlTaskError, TaskError, TaskResult}; use std::sync::Arc; use crate::heartbeat::{HeartbeatHandle, Heartbeater, StepState}; @@ -93,9 +93,9 @@ where /// Validate that a user-provided step name doesn't use reserved prefix. fn validate_user_name(name: &str) -> TaskResult<()> { if name.starts_with('$') { - return Err(TaskError::Validation { + return Err(TaskError::NonControl(NonControlTaskError::Validation { message: "Step names cannot start with '$' (reserved for internal use)".to_string(), - }); + })); } Ok(()) } @@ -106,9 +106,9 @@ where { pub(crate) fn mark_suspended(&mut self) -> TaskResult<()> { if self.has_suspended { - return Err(TaskError::Validation { + return Err(TaskError::NonControl(NonControlTaskError::Validation { message: "Task has already been suspended during this execution".to_string(), - }); + })); } self.has_suspended = true; Ok(()) @@ -242,9 +242,11 @@ where state: self.durable.state().clone(), heartbeater: Arc::new(self.heartbeat_handle.clone()), }; - let result = f(params, step_state).await.map_err(|e| TaskError::Step { - base_name: base_name.to_string(), - error: e, + let result = f(params, step_state).await.map_err(|e| { + TaskError::NonControl(NonControlTaskError::Step { + base_name: base_name.to_string(), + error: e, + }) })?; // Persist checkpoint (also extends claim lease) @@ -423,9 +425,9 @@ where // Check if we were woken by this event but it timed out (null payload) if self.task.wake_event.as_deref() == Some(event_name) && self.task.event_payload.is_none() { - return Err(TaskError::Timeout { + return Err(TaskError::NonControl(NonControlTaskError::Timeout { step_name: event_name.to_string(), - }); + })); } // Call await_event stored procedure @@ -505,9 +507,11 @@ where self.durable .emit_event(event_name, payload, None) .await - .map_err(|e| TaskError::EmitEventFailed { - event_name: event_name.to_string(), - error: e, + .map_err(|e| { + TaskError::NonControl(NonControlTaskError::EmitEventFailed { + event_name: event_name.to_string(), + error: e, + }) }) } @@ -566,8 +570,10 @@ where if let Some(cached) = self.checkpoint_cache.get(&checkpoint_name) { let stored: String = serde_json::from_value(cached.clone())?; return Ok(DateTime::parse_from_rfc3339(&stored) - .map_err(|e| TaskError::Validation { - message: format!("Invalid stored time: {e}"), + .map_err(|e| { + TaskError::NonControl(NonControlTaskError::Validation { + message: format!("Invalid stored time: {e}"), + }) })? .with_timezone(&Utc)); } @@ -716,9 +722,11 @@ where }, ) .await - .map_err(|e| TaskError::SubtaskSpawnFailed { - name: task_name.to_string(), - error: e, + .map_err(|e| { + TaskError::NonControl(NonControlTaskError::SubtaskSpawnFailed { + name: task_name.to_string(), + error: e, + }) })?; // Checkpoint the spawn self.persist_checkpoint(&checkpoint_name, &spawned_task.task_id) @@ -781,9 +789,9 @@ where // Check if we were woken by this event but it timed out (null payload) if self.task.wake_event.as_deref() == Some(&event_name) && self.task.event_payload.is_none() { - return Err(TaskError::Timeout { + return Err(TaskError::NonControl(NonControlTaskError::Timeout { step_name: step_name.to_string(), - }); + })); } // Call await_event stored procedure (no timeout for join - we wait indefinitely) @@ -829,8 +837,10 @@ where ) -> TaskResult { match payload.status { ChildStatus::Completed => { - let result = payload.result.ok_or_else(|| TaskError::Validation { - message: "Child completed but no result available".to_string(), + let result = payload.result.ok_or_else(|| { + TaskError::NonControl(NonControlTaskError::Validation { + message: "Child completed but no result available".to_string(), + }) })?; Ok(serde_json::from_value(result)?) } @@ -839,14 +849,16 @@ where .error .and_then(|e| e.get("message").and_then(|m| m.as_str()).map(String::from)) .unwrap_or_else(|| "Unknown error".to_string()); - Err(TaskError::ChildFailed { + Err(TaskError::NonControl(NonControlTaskError::ChildFailed { step_name: step_name.to_string(), message, - }) + })) + } + ChildStatus::Cancelled => { + Err(TaskError::NonControl(NonControlTaskError::ChildCancelled { + step_name: step_name.to_string(), + })) } - ChildStatus::Cancelled => Err(TaskError::ChildCancelled { - step_name: step_name.to_string(), - }), } } } diff --git a/src/error.rs b/src/error.rs index b93a069..f706db5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -46,38 +46,13 @@ pub mod suspend_handle { } } -/// Error type for task execution. -/// -/// This enum distinguishes between control flow signals (suspension, cancellation) -/// and actual failures. The worker handles these differently: -/// -/// - `Control(Suspend)` - Task is waiting; worker does nothing (scheduler will resume it) -/// - `Control(Cancelled)` - Task was cancelled; worker does nothing -/// - `Control(LeaseExpired)` - Task lost its lease; worker stops without failing the run -/// - All other variants - Actual errors; worker records failure and may retry -/// -/// # Example +/// Non-control-flow error variants for task execution. /// -/// ```ignore -/// match ctx.await_event::("my-event", Some(Duration::from_secs(30))).await { -/// Ok(payload) => { /* handle payload */ } -/// Err(TaskError::Timeout { step_name }) => { -/// println!("Timed out waiting for {}", step_name); -/// } -/// Err(TaskError::Control(ControlFlow::Cancelled)) => { -/// println!("Task was cancelled"); -/// } -/// Err(e) => { /* handle other errors */ } -/// } -/// ``` +/// These represent actual failures (as opposed to control flow signals like suspension +/// or cancellation). The worker records these as failures and may retry depending on +/// [`NonControlTaskError::retryable`]. #[derive(Debug, Error)] -pub enum TaskError { - /// Control flow signal - not an actual error. - /// - /// The worker will not mark the task as failed or trigger retries. - #[error("control flow: {0:?}")] - Control(ControlFlow), - +pub enum NonControlTaskError { /// The operation timed out. /// /// Returned by [`TaskContext::await_event`](crate::TaskContext::await_event) when @@ -170,23 +145,70 @@ pub enum TaskError { }, } -impl TaskError { +impl NonControlTaskError { pub fn retryable(&self) -> bool { match self { // These are non-deterministic errors, which might succeed on a retry // (which will have the same checkpoint cache up to the point of the error) - TaskError::Timeout { .. } | TaskError::Database(_) | TaskError::Step { .. } => true, + NonControlTaskError::Timeout { .. } + | NonControlTaskError::Database(_) + | NonControlTaskError::Step { .. } => true, // Everything else is considered to be a deterministic error, which will fail again // on a retry - TaskError::SubtaskSpawnFailed { .. } - | TaskError::EmitEventFailed { .. } - | TaskError::Control(_) - | TaskError::Serialization(_) - | TaskError::ChildFailed { .. } - | TaskError::ChildCancelled { .. } - | TaskError::Validation { .. } - | TaskError::User { .. } - | TaskError::TaskPanicked { .. } => false, + NonControlTaskError::SubtaskSpawnFailed { .. } + | NonControlTaskError::EmitEventFailed { .. } + | NonControlTaskError::Serialization(_) + | NonControlTaskError::ChildFailed { .. } + | NonControlTaskError::ChildCancelled { .. } + | NonControlTaskError::Validation { .. } + | NonControlTaskError::User { .. } + | NonControlTaskError::TaskPanicked { .. } => false, + } + } +} + +/// Error type for task execution. +/// +/// This enum distinguishes between control flow signals (suspension, cancellation) +/// and actual failures. The worker handles these differently: +/// +/// - `Control(Suspend)` - Task is waiting; worker does nothing (scheduler will resume it) +/// - `Control(Cancelled)` - Task was cancelled; worker does nothing +/// - `Control(LeaseExpired)` - Task lost its lease; worker stops without failing the run +/// - `NonControl(_)` - Actual errors; worker records failure and may retry +/// +/// # Example +/// +/// ```ignore +/// match ctx.await_event::("my-event", Some(Duration::from_secs(30))).await { +/// Ok(payload) => { /* handle payload */ } +/// Err(TaskError::NonControl(NonControlTaskError::Timeout { step_name })) => { +/// println!("Timed out waiting for {}", step_name); +/// } +/// Err(TaskError::Control(ControlFlow::Cancelled)) => { +/// println!("Task was cancelled"); +/// } +/// Err(e) => { /* handle other errors */ } +/// } +/// ``` +#[derive(Debug, Error)] +pub enum TaskError { + /// Control flow signal - not an actual error. + /// + /// The worker will not mark the task as failed or trigger retries. + #[error("control flow: {0:?}")] + Control(ControlFlow), + + /// An actual task error (not control flow). + #[error("{0}")] + NonControl(NonControlTaskError), +} + +impl TaskError { + pub fn retryable(&self) -> bool { + match self { + TaskError::Control(_) => false, + TaskError::NonControl(e) => e.retryable(), } } } @@ -216,25 +238,31 @@ impl TaskError { .and_then(|v| v.as_str()) .map(|s| s.to_string()) .unwrap_or_else(|| error_data.to_string()); - TaskError::User { + TaskError::NonControl(NonControlTaskError::User { message, error_data, - } + }) } /// Create a user error from just a message string. pub fn user_message(message: impl Into) -> Self { let message = message.into(); - TaskError::User { + TaskError::NonControl(NonControlTaskError::User { error_data: serde_json::Value::String(message.clone()), message, - } + }) } } impl From for TaskError { fn from(err: serde_json::Error) -> Self { - TaskError::Serialization(err) + TaskError::NonControl(NonControlTaskError::Serialization(err)) + } +} + +impl From for TaskError { + fn from(err: NonControlTaskError) -> Self { + TaskError::NonControl(err) } } @@ -247,7 +275,7 @@ impl TaskError { } else if is_lease_expired_error(&err) { TaskError::Control(ControlFlow::LeaseExpired) } else { - TaskError::Database(err) + TaskError::NonControl(NonControlTaskError::Database(err)) } } } @@ -280,60 +308,67 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue { "message": err.to_string(), }) } - TaskError::Timeout { step_name } => { + TaskError::NonControl(inner) => serialize_non_control_task_error(inner), + } +} + +/// Serialize a NonControlTaskError for storage in fail_run +pub fn serialize_non_control_task_error(err: &NonControlTaskError) -> JsonValue { + match err { + NonControlTaskError::Timeout { step_name } => { serde_json::json!({ "name": "Timeout", "message": err.to_string(), "step_name": step_name, }) } - TaskError::Database(e) => { + NonControlTaskError::Database(e) => { serde_json::json!({ "name": "Database", "message": e.to_string(), }) } - TaskError::Serialization(e) => { + NonControlTaskError::Serialization(e) => { serde_json::json!({ "name": "Serialization", "message": e.to_string(), }) } - TaskError::SubtaskSpawnFailed { name, error } => { + NonControlTaskError::SubtaskSpawnFailed { name, error } => { serde_json::json!({ "name": "SubtaskSpawnFailed", "message": error.to_string(), "subtask_name": name, }) } - TaskError::EmitEventFailed { event_name, error } => { + NonControlTaskError::EmitEventFailed { event_name, error } => { serde_json::json!({ "name": "EmitEventFailed", "message": error.to_string(), "event_name": event_name, }) } - TaskError::ChildFailed { step_name, message } => { + NonControlTaskError::ChildFailed { step_name, message } => { serde_json::json!({ "name": "ChildFailed", "message": message, "step_name": step_name, }) } - TaskError::ChildCancelled { step_name } => { + NonControlTaskError::ChildCancelled { step_name } => { serde_json::json!({ "name": "ChildCancelled", "message": err.to_string(), "step_name": step_name, }) } - TaskError::Validation { message } => { + NonControlTaskError::Validation { message } => { serde_json::json!({ "name": "Validation", "message": message, }) } - TaskError::User { + NonControlTaskError::User { message, error_data, } => { @@ -343,14 +378,14 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue { "error_data": error_data, }) } - TaskError::Step { base_name, error } => { + NonControlTaskError::Step { base_name, error } => { serde_json::json!({ "name": "Step", "base_name": base_name, "message": error.to_string(), }) } - TaskError::TaskPanicked { message } => { + NonControlTaskError::TaskPanicked { message } => { serde_json::json!({ "name": "TaskPanicked", "message": message, diff --git a/src/heartbeat.rs b/src/heartbeat.rs index b0543c5..1ab8fc7 100644 --- a/src/heartbeat.rs +++ b/src/heartbeat.rs @@ -4,7 +4,7 @@ use std::time::Duration; use async_trait::async_trait; use uuid::Uuid; -use crate::error::{TaskError, TaskResult}; +use crate::error::{NonControlTaskError, TaskError, TaskResult}; use crate::worker::LeaseExtender; /// Trait for extending task leases during long-running operations. @@ -64,9 +64,9 @@ impl Heartbeater for HeartbeatHandle { let extend_by = duration.unwrap_or(self.claim_timeout); if extend_by < Duration::from_secs(1) { - return Err(TaskError::Validation { + return Err(TaskError::NonControl(NonControlTaskError::Validation { message: "heartbeat duration must be at least 1 second".to_string(), - }); + })); } let query = "SELECT durable.extend_claim($1, $2, $3)"; diff --git a/src/lib.rs b/src/lib.rs index 3413829..557c4c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -112,7 +112,9 @@ mod worker; pub use client::{Durable, DurableBuilder}; pub use context::TaskContext; pub use cron::{ScheduleFilter, ScheduleInfo, ScheduleOptions, setup_pgcron}; -pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult}; +pub use error::{ + ControlFlow, DurableError, DurableResult, NonControlTaskError, TaskError, TaskResult, +}; pub use heartbeat::{HeartbeatHandle, Heartbeater, NoopHeartbeater, StepState}; pub use task::{ErasedTask, Task, TaskWrapper}; pub use types::{ diff --git a/src/worker.rs b/src/worker.rs index 42df8ea..e9002e4 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -10,7 +10,7 @@ use uuid::Uuid; use crate::Durable; use crate::context::TaskContext; -use crate::error::{ControlFlow, TaskError, serialize_task_error}; +use crate::error::{ControlFlow, NonControlTaskError, TaskError, serialize_task_error}; use crate::types::{ClaimedTask, ClaimedTaskRow, WorkerOptions}; /// Notifies the worker that the lease has been extended. @@ -327,9 +327,9 @@ impl Worker { durable.queue_name(), task.task_id, task.run_id, - &TaskError::Validation { + &TaskError::NonControl(NonControlTaskError::Validation { message: format!("Unknown task: {}", task.task_name), - }, + }), ) .await; return; @@ -413,7 +413,7 @@ impl Worker { Err(e) => { let message = format!("Task {} panicked: {}", task_label, e); tracing::error!("{}", message); - Some(Err(TaskError::TaskPanicked { message })) + Some(Err(TaskError::NonControl(NonControlTaskError::TaskPanicked { message }))) } } }