From c33d4dcf0b88a62d40f4ae2f1d3a443740427b5d Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 11 Mar 2026 17:08:16 -0700 Subject: [PATCH 1/8] fix: separate infrastructure vs user code error handling in runtime and step handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Transient network errors (ECONNRESET, etc.) during infrastructure calls (event listing, event creation) were caught by a shared try/catch that also handles user code errors, incorrectly marking runs as run_failed or steps as step_failed instead of letting the queue redeliver. - runtime.ts: Move infrastructure calls outside the user-code try/catch so errors propagate to the queue handler for automatic retry - step-handler.ts: Same structural separation — only stepFn.apply() is wrapped in the try/catch that produces step_failed/step_retrying - helpers.ts: Add isTransientNetworkError() and update withServerErrorRetry to retry network errors in addition to 5xx responses - helpers.test.ts: Add tests for network error detection and retry --- packages/core/src/runtime.ts | 437 ++++++++++------------ packages/core/src/runtime/helpers.test.ts | 99 ++++- packages/core/src/runtime/helpers.ts | 72 +++- packages/core/src/runtime/step-handler.ts | 308 +++++++-------- 4 files changed, 507 insertions(+), 409 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index ad3df0059a..79944ecd0e 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -12,10 +12,8 @@ import { runtimeLogger } from './logger.js'; import { getAllWorkflowRunEvents, getQueueOverhead, - getWorkflowQueueName, handleHealthCheckMessage, parseHealthCheckPayload, - queueMessage, withHealthCheck, withThrottleRetry, } from './runtime/helpers.js'; @@ -25,7 +23,6 @@ import { remapErrorStack } from './source-map.js'; import * as Attribute from './telemetry/semantic-conventions.js'; import { linkToCurrentContext, - serializeTraceCarrier, trace, withTraceContext, withWorkflowBaggage, @@ -101,7 +98,6 @@ export function workflowEntrypoint( runId, traceCarrier: traceContext, requestedAt, - serverErrorRetryCount, } = WorkflowInvokePayloadSchema.parse(message_); // Extract the workflow name from the topic name const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); @@ -139,110 +135,117 @@ export function workflowEntrypoint( return await withThrottleRetry(async () => { let workflowStartedAt = -1; let workflowRun = await world.runs.get(runId); - try { - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; - } - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. - if (!workflowRun.startedAt) { + // --- Infrastructure: prepare the run state --- + // Errors here (network failures, server errors) propagate to the + // queue handler, which returns a non-200 response so the message + // is redelivered and the workflow retries automatically. + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }); + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` + `Event creation for 'run_started' did not return the run entity for run "${runId}"` ); } - workflowStartedAt = +workflowRun.startedAt; + workflowRun = result.run; + } - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), - }); + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. + if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); + } + workflowStartedAt = +workflowRun.startedAt; - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } - ); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); - return; - } + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. - // Load all events into memory before running - const events = await getAllWorkflowRunEvents( - workflowRun.runId - ); + return; + } - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); + // Load all events into memory before running + const events = await getAllWorkflowRunEvents( + workflowRun.runId + ); - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); - - // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - try { - const result = await world.events.create( - runId, - waitEvent - ); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.info( - 'Wait already completed, skipping', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - } - ); - continue; - } - throw err; + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); + + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); + + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + try { + const result = await world.events.create( + runId, + waitEvent + ); + // Add the event to the events array so the workflow can see it + events.push(result.event!); + } catch (err) { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.info('Wait already completed, skipping', { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + }); + continue; } + throw err; } + } - const result = await trace( + // --- User code execution --- + // Only errors from runWorkflow() (user workflow code) should + // produce run_failed. Infrastructure errors (network, server) + // must propagate to the queue handler for automatic retry. + let workflowResult: unknown; + try { + workflowResult = await trace( 'workflow.replay', {}, async (replaySpan) => { @@ -263,39 +266,8 @@ export function workflowEntrypoint( ); } ); - - // Complete the workflow run via event (event-sourced architecture) - try { - await world.events.create(runId, { - eventType: 'run_completed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - output: result, - }, - }); - } catch (err) { - if ( - WorkflowAPIError.is(err) && - (err.status === 409 || err.status === 410) - ) { - runtimeLogger.warn( - 'Tried completing workflow run, but run has already finished.', - { - workflowRunId: runId, - message: err.message, - } - ); - return; - } else { - throw err; - } - } - - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); } catch (err) { + // WorkflowSuspension is normal control flow — not an error if (WorkflowSuspension.is(err)) { const suspensionMessage = buildWorkflowSuspensionMessage( runId, @@ -317,125 +289,126 @@ export function workflowEntrypoint( if (result.timeoutSeconds !== undefined) { return { timeoutSeconds: result.timeoutSeconds }; } - } else { - // Retry server errors (5xx) with exponential backoff before failing the run - if ( - WorkflowAPIError.is(err) && - err.status !== undefined && - err.status >= 500 - ) { - const retryCount = serverErrorRetryCount ?? 0; - const delaySecondSteps = [5, 30, 120]; // 5s, 30s, 120s - if (retryCount < delaySecondSteps.length) { - runtimeLogger.warn( - 'Server error (5xx), re-enqueueing workflow with backoff', - { - workflowRunId: runId, - retryCount, - delaySeconds: delaySecondSteps[retryCount], - error: err.message, - } - ); - await queueMessage( - world, - getWorkflowQueueName(workflowName), - { - runId, - serverErrorRetryCount: retryCount + 1, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { delaySeconds: delaySecondSteps[retryCount] } - ); - return; // Don't fail the run, retry later - } - // Fall through to run_failed after exhausting retries - } else if ( - WorkflowAPIError.is(err) && - err.status === 429 - ) { - // Throw to let withThrottleRetry handle it - throw err; - } - // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. + // Suspension handled, no further work needed + return; + } - // Record exception for OTEL error tracking - if (err instanceof Error) { - span?.recordException?.(err); - } + if (WorkflowAPIError.is(err) && err.status === 429) { + // Throw to let withThrottleRetry handle it + throw err; + } - const normalizedError = await normalizeUnknownError(err); - const errorName = - normalizedError.name || getErrorName(err); - const errorMessage = normalizedError.message; - let errorStack = - normalizedError.stack || getErrorStack(err); - - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = - parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); - } + // This is a user code error or a WorkflowRuntimeError + // (e.g., corrupted event log). Fail the workflow run. - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } + + const normalizedError = await normalizeUnknownError(err); + const errorName = normalizedError.name || getErrorName(err); + const errorMessage = normalizedError.message; + let errorStack = + normalizedError.stack || getErrorStack(err); + + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( errorStack, - }); + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); - // Fail the workflow run via event (event-sourced architecture) - try { - await world.events.create(runId, { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: { - message: errorMessage, - stack: errorStack, - }, - // TODO: include error codes when we define them + // Fail the workflow run via event (event-sourced architecture) + try { + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, }, + // TODO: include error codes when we define them + }, + }); + } catch (failErr) { + if ( + WorkflowAPIError.is(failErr) && + (failErr.status === 409 || failErr.status === 410) + ) { + runtimeLogger.warn( + 'Tried failing workflow run, but run has already finished.', + { + workflowRunId: runId, + message: failErr.message, + } + ); + span?.setAttributes({ + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(errorMessage), + ...Attribute.ErrorType(errorName), }); - } catch (err) { - if ( - WorkflowAPIError.is(err) && - (err.status === 409 || err.status === 410) - ) { - runtimeLogger.warn( - 'Tried failing workflow run, but run has already finished.', - { - workflowRunId: runId, - message: err.message, - } - ); - span?.setAttributes({ - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(errorMessage), - ...Attribute.ErrorType(errorName), - }); - return; - } else { - throw err; - } + return; + } else { + throw failErr; } + } - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(errorMessage), - ...Attribute.ErrorType(errorName), - }); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(errorMessage), + ...Attribute.ErrorType(errorName), + }); + return; + } + + // --- Infrastructure: complete the run --- + // This is outside the user-code try/catch so that failures + // here (e.g., network errors) propagate to the queue handler. + try { + await world.events.create(runId, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + output: workflowResult, + }, + }); + } catch (err) { + if ( + WorkflowAPIError.is(err) && + (err.status === 409 || err.status === 410) + ) { + runtimeLogger.warn( + 'Tried completing workflow run, but run has already finished.', + { + workflowRunId: runId, + message: err.message, + } + ); + return; + } else { + throw err; } } + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), + }); }); // End withThrottleRetry } ); // End trace diff --git a/packages/core/src/runtime/helpers.test.ts b/packages/core/src/runtime/helpers.test.ts index 3a69dc54e1..49e1532325 100644 --- a/packages/core/src/runtime/helpers.test.ts +++ b/packages/core/src/runtime/helpers.test.ts @@ -2,6 +2,7 @@ import { WorkflowAPIError } from '@workflow/errors'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { getWorkflowQueueName, + isTransientNetworkError, withServerErrorRetry, withThrottleRetry, } from './helpers.js'; @@ -167,7 +168,7 @@ describe('withServerErrorRetry', () => { expect(fn).toHaveBeenCalledTimes(1); }); - it('should not retry non-WorkflowAPIError errors', async () => { + it('should not retry non-WorkflowAPIError, non-network errors', async () => { const error = new Error('some other error'); const fn = vi.fn().mockRejectedValue(error); @@ -185,6 +186,102 @@ describe('withServerErrorRetry', () => { await expect(withServerErrorRetry(fn)).rejects.toThrow('Too Many Requests'); expect(fn).toHaveBeenCalledTimes(1); }); + + it('should retry on transient network errors (e.g., ECONNRESET)', async () => { + const networkError = makeNetworkError('ECONNRESET'); + + const fn = vi + .fn() + .mockRejectedValueOnce(networkError) + .mockResolvedValueOnce('recovered'); + + const promise = withServerErrorRetry(fn); + await vi.advanceTimersByTimeAsync(500); + const result = await promise; + + expect(result).toBe('recovered'); + expect(fn).toHaveBeenCalledTimes(2); + }); + + it('should throw after exhausting retries on persistent network errors', async () => { + const networkError = makeNetworkError('ECONNRESET'); + + const fn = vi.fn().mockRejectedValue(networkError); + + const promise = withServerErrorRetry(fn).catch((e) => e); + + await vi.advanceTimersByTimeAsync(500); + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(2000); + + const result = await promise; + expect(result).toBeInstanceOf(TypeError); + expect((result as TypeError).message).toBe('fetch failed'); + expect(fn).toHaveBeenCalledTimes(4); + }); +}); + +/** + * Creates a TypeError that mimics what Node.js `fetch()` throws on network + * errors — `TypeError: fetch failed` with a `cause` carrying the error code. + */ +function makeNetworkError(code: string): TypeError { + const cause = new Error(`${code}`); + (cause as any).code = code; + const error = new TypeError('fetch failed'); + (error as any).cause = cause; + return error; +} + +describe('isTransientNetworkError', () => { + it('should return true for TypeError with ECONNRESET cause', () => { + expect(isTransientNetworkError(makeNetworkError('ECONNRESET'))).toBe(true); + }); + + it('should return true for TypeError with ECONNREFUSED cause', () => { + expect(isTransientNetworkError(makeNetworkError('ECONNREFUSED'))).toBe( + true + ); + }); + + it('should return true for TypeError with ENOTFOUND cause', () => { + expect(isTransientNetworkError(makeNetworkError('ENOTFOUND'))).toBe(true); + }); + + it('should return true for error with code directly on it (undici style)', () => { + const error = new Error('socket error'); + (error as any).code = 'UND_ERR_SOCKET'; + expect(isTransientNetworkError(error)).toBe(true); + }); + + it('should return false for regular Error', () => { + expect(isTransientNetworkError(new Error('something failed'))).toBe(false); + }); + + it('should return false for WorkflowAPIError', () => { + expect( + isTransientNetworkError( + new WorkflowAPIError('Internal Server Error', { status: 500 }) + ) + ).toBe(false); + }); + + it('should return false for non-Error values', () => { + expect(isTransientNetworkError('string error')).toBe(false); + expect(isTransientNetworkError(null)).toBe(false); + expect(isTransientNetworkError(undefined)).toBe(false); + expect(isTransientNetworkError(42)).toBe(false); + }); + + it('should return false for TypeError without a cause', () => { + expect(isTransientNetworkError(new TypeError('not a network error'))).toBe( + false + ); + }); + + it('should return false for error with non-transient code', () => { + expect(isTransientNetworkError(makeNetworkError('ENOENT'))).toBe(false); + }); }); describe('withThrottleRetry', () => { diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 47abd74bb1..418f065f5d 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -472,8 +472,54 @@ export async function withThrottleRetry( } /** - * Retries a function when it throws a 5xx WorkflowAPIError. - * Used to handle transient workflow-server errors without consuming step attempts. + * Known transient network error codes produced by Node.js / undici when + * a TCP connection is interrupted or unreachable. These are the same codes + * that undici's RetryAgent retries for non-POST methods by default. + */ +const TRANSIENT_NETWORK_ERROR_CODES = new Set([ + 'ECONNRESET', + 'ECONNREFUSED', + 'ENOTFOUND', + 'ENETDOWN', + 'ENETUNREACH', + 'EHOSTDOWN', + 'EHOSTUNREACH', + 'EPIPE', + 'UND_ERR_SOCKET', +]); + +/** + * Detects transient network errors thrown by `fetch()` / undici when the + * underlying TCP connection fails (e.g., ECONNRESET, DNS failures). + * + * These errors are **not** `WorkflowAPIError` instances because they occur + * before an HTTP response is received. Node's `fetch()` wraps them as + * `TypeError: fetch failed` with a `cause` carrying the original error code. + * + * This helper is used to distinguish infrastructure-level network failures + * (which should be retried) from user-code errors (which should fail the run). + */ +export function isTransientNetworkError(err: unknown): boolean { + if (!(err instanceof Error)) return false; + + // Node fetch wraps network errors as `TypeError` with a `cause` + const cause = (err as Error & { cause?: Error }).cause; + if (cause && typeof (cause as any).code === 'string') { + return TRANSIENT_NETWORK_ERROR_CODES.has((cause as any).code); + } + + // Direct undici errors may carry the code on the error itself + if (typeof (err as any).code === 'string') { + return TRANSIENT_NETWORK_ERROR_CODES.has((err as any).code); + } + + return false; +} + +/** + * Retries a function when it throws a 5xx WorkflowAPIError or a transient + * network error (e.g., ECONNRESET). Used to handle transient workflow-server + * errors without consuming step attempts. * * Retries up to 3 times with exponential backoff (500ms, 1s, 2s ≈ 3.5s total). * If all retries fail, the original error is re-thrown. @@ -486,20 +532,24 @@ export async function withServerErrorRetry( try { return await fn(); } catch (err) { - if ( - WorkflowAPIError.is(err) && - err.status !== undefined && - err.status >= 500 && - attempt < delays.length - ) { + const isRetryable = + (WorkflowAPIError.is(err) && + err.status !== undefined && + err.status >= 500) || + isTransientNetworkError(err); + + if (isRetryable && attempt < delays.length) { runtimeLogger.warn( - 'Server error (5xx) from workflow-server, retrying in-process', + 'Server error from workflow-server, retrying in-process', { - status: err.status, + status: WorkflowAPIError.is(err) ? err.status : undefined, + networkErrorCode: isTransientNetworkError(err) + ? ((err as any).cause?.code ?? (err as any).code) + : undefined, attempt: attempt + 1, maxRetries: delays.length, nextDelayMs: delays[attempt], - url: err.url, + url: WorkflowAPIError.is(err) ? err.url : undefined, } ); await new Promise((resolve) => setTimeout(resolve, delays[attempt])); diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 2938077641..86e8253321 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -291,53 +291,61 @@ const stepHandler = getWorldHandlers().createQueueHandler( return; } - try { - // step_started already validated the step is in valid state (pending/running) - // and returned the updated step entity with incremented attempt + // --- Infrastructure: prepare step input --- + // Errors here propagate to the queue handler for automatic retry. + // step_started already validated the step is in valid state (pending/running) + // and returned the updated step entity with incremented attempt - // step.attempt is now the current attempt number (after increment) - const attempt = step.attempt; + // step.attempt is now the current attempt number (after increment) + const attempt = step.attempt; - if (!step.startedAt) { - throw new WorkflowRuntimeError( - `Step "${stepId}" has no "startedAt" timestamp` + if (!step.startedAt) { + throw new WorkflowRuntimeError( + `Step "${stepId}" has no "startedAt" timestamp` + ); + } + // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) + const stepStartedAt = step.startedAt; + + // Hydrate the step input arguments, closure variables, and thisVal + // NOTE: This captures only the synchronous portion of hydration. Any async + // operations (e.g., stream loading) are added to `ops` and executed later + // via Promise.all(ops) - their timing is not included in this measurement. + const ops: Promise[] = []; + const rawKey = await world.getEncryptionKeyForRun?.(workflowRunId); + const encryptionKey = rawKey ? await importKey(rawKey) : undefined; + const hydratedInput = await trace( + 'step.hydrate', + {}, + async (hydrateSpan) => { + const startTime = Date.now(); + const result = await hydrateStepArguments( + step.input, + workflowRunId, + encryptionKey, + ops ); + const durationMs = Date.now() - startTime; + hydrateSpan?.setAttributes({ + ...Attribute.StepArgumentsCount(result.args.length), + ...Attribute.QueueDeserializeTimeMs(durationMs), + }); + return result; } - // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) - const stepStartedAt = step.startedAt; - - // Hydrate the step input arguments, closure variables, and thisVal - // NOTE: This captures only the synchronous portion of hydration. Any async - // operations (e.g., stream loading) are added to `ops` and executed later - // via Promise.all(ops) - their timing is not included in this measurement. - const ops: Promise[] = []; - const rawKey = await world.getEncryptionKeyForRun?.(workflowRunId); - const encryptionKey = rawKey ? await importKey(rawKey) : undefined; - const hydratedInput = await trace( - 'step.hydrate', - {}, - async (hydrateSpan) => { - const startTime = Date.now(); - const result = await hydrateStepArguments( - step.input, - workflowRunId, - encryptionKey, - ops - ); - const durationMs = Date.now() - startTime; - hydrateSpan?.setAttributes({ - ...Attribute.StepArgumentsCount(result.args.length), - ...Attribute.QueueDeserializeTimeMs(durationMs), - }); - return result; - } - ); + ); - const args = hydratedInput.args; - const thisVal = hydratedInput.thisVal ?? null; + const args = hydratedInput.args; + const thisVal = hydratedInput.thisVal ?? null; - // Execute the step function with tracing - const executionStartTime = Date.now(); + // --- User code execution --- + // Only errors from stepFn.apply() (user step code) should produce + // step_failed/step_retrying. Infrastructure errors (network, server) + // must propagate to the queue handler for automatic retry. + let userCodeError: unknown; + let userCodeFailed = false; + + const executionStartTime = Date.now(); + try { result = await trace('step.execute', {}, async () => { return await contextStorage.run( { @@ -364,93 +372,19 @@ const stepHandler = getWorldHandlers().createQueueHandler( () => stepFn.apply(thisVal, args) ); }); - const executionTimeMs = Date.now() - executionStartTime; - - span?.setAttributes({ - ...Attribute.QueueExecutionTimeMs(executionTimeMs), - }); - - // NOTE: None of the code from this point is guaranteed to run - // Since the step might fail or cause a function timeout and the process might be SIGKILL'd - // The workflow runtime must be resilient to the below code not executing on a failed step - result = await trace( - 'step.dehydrate', - {}, - async (dehydrateSpan) => { - const startTime = Date.now(); - const dehydrated = await dehydrateStepReturnValue( - result, - workflowRunId, - encryptionKey, - ops - ); - const durationMs = Date.now() - startTime; - dehydrateSpan?.setAttributes({ - ...Attribute.QueueSerializeTimeMs(durationMs), - ...Attribute.StepResultType(typeof dehydrated), - }); - return dehydrated; - } - ); - - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); - - // Run step_completed and trace serialization concurrently; - // the trace carrier is used in the final queueMessage call below - let stepCompleted409 = false; - const [, traceCarrier] = await Promise.all([ - withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_completed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - result: result as Uint8Array, - }, - }) - ).catch((err) => { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.warn( - 'Tried completing step, but step has already finished.', - { - workflowRunId, - stepId, - stepName, - message: err.message, - } - ); - stepCompleted409 = true; - return; - } - throw err; - }), - serializeTraceCarrier(), - ]); - - if (stepCompleted409) { - return; - } + } catch (err) { + userCodeError = err; + userCodeFailed = true; + } + const executionTimeMs = Date.now() - executionStartTime; - span?.setAttributes({ - ...Attribute.StepStatus('completed'), - ...Attribute.StepResultType(typeof result), - }); + span?.setAttributes({ + ...Attribute.QueueExecutionTimeMs(executionTimeMs), + }); - // Queue the workflow continuation with the concurrently-resolved trace carrier - await queueMessage(world, getWorkflowQueueName(workflowName), { - runId: workflowRunId, - traceCarrier, - requestedAt: new Date(), - }); - return; - } catch (err: unknown) { + // --- Handle user code errors --- + if (userCodeFailed) { + const err = userCodeError; const normalizedError = await normalizeUnknownError(err); const normalizedStack = normalizedError.stack || getErrorStack(err) || ''; @@ -477,44 +411,6 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.ErrorRetryable(!isFatal), }); - if (WorkflowAPIError.is(err)) { - if (err.status === 410) { - // Workflow has already completed, so no-op - stepLogger.info( - 'Workflow run already completed, skipping step', - { - workflowRunId, - stepId, - message: err.message, - } - ); - return; - } - - // Server errors (5xx) from workflow-server are treated as persistent - // infrastructure issues. The withServerErrorRetry wrapper already - // retried the call a few times; if we still have a 5xx here it's - // likely persistent. Re-throw so the queue can retry the job and - // re-invoke this handler. Note: by the time we reach this point, - // step_started has already run and incremented step.attempt, and a - // subsequent queue retry may increment attempts again depending on - // storage semantics, so these retries are not guaranteed to be - // "free" with respect to step attempts. - if (err.status !== undefined && err.status >= 500) { - runtimeLogger.warn( - 'Persistent server error (5xx) during step, deferring to queue retry', - { - status: err.status, - workflowRunId, - stepId, - error: err.message, - url: err.url, - } - ); - throw err; - } - } - if (isFatal) { stepLogger.error( 'Encountered FatalError while executing step, bubbling up to parent workflow', @@ -701,11 +597,93 @@ const stepHandler = getWorldHandlers().createQueueHandler( return { timeoutSeconds }; } } + + // Re-invoke the workflow to handle the failed/retrying step + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + return; + } + + // --- Infrastructure: complete the step --- + // Errors here (network failures, server errors) propagate to the + // queue handler for automatic retry. + + // NOTE: None of the code from this point is guaranteed to run + // Since the step might fail or cause a function timeout and the process might be SIGKILL'd + // The workflow runtime must be resilient to the below code not executing on a failed step + result = await trace('step.dehydrate', {}, async (dehydrateSpan) => { + const startTime = Date.now(); + const dehydrated = await dehydrateStepReturnValue( + result, + workflowRunId, + encryptionKey, + ops + ); + const durationMs = Date.now() - startTime; + dehydrateSpan?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(durationMs), + ...Attribute.StepResultType(typeof dehydrated), + }); + return dehydrated; + }); + + waitUntil( + Promise.all(ops).catch((err) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + err?.name === 'AbortError' || err?.name === 'ResponseAborted'; + if (!isAbortError) throw err; + }) + ); + + // Run step_completed and trace serialization concurrently; + // the trace carrier is used in the final queueMessage call below + let stepCompleted409 = false; + const [, traceCarrier] = await Promise.all([ + withServerErrorRetry(() => + world.events.create(workflowRunId, { + eventType: 'step_completed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + result: result as Uint8Array, + }, + }) + ).catch((err) => { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.warn( + 'Tried completing step, but step has already finished.', + { + workflowRunId, + stepId, + stepName, + message: err.message, + } + ); + stepCompleted409 = true; + return; + } + throw err; + }), + serializeTraceCarrier(), + ]); + + if (stepCompleted409) { + return; } + span?.setAttributes({ + ...Attribute.StepStatus('completed'), + ...Attribute.StepResultType(typeof result), + }); + + // Queue the workflow continuation with the concurrently-resolved trace carrier await queueMessage(world, getWorkflowQueueName(workflowName), { runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), + traceCarrier, requestedAt: new Date(), }); } From 5ad7beed78f7897655d753d6e683c9bd0f1d4fb6 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 11 Mar 2026 17:08:40 -0700 Subject: [PATCH 2/8] add changeset --- .changeset/fix-infra-error-handling.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-infra-error-handling.md diff --git a/.changeset/fix-infra-error-handling.md b/.changeset/fix-infra-error-handling.md new file mode 100644 index 0000000000..2f8fe4cdec --- /dev/null +++ b/.changeset/fix-infra-error-handling.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Separate infrastructure vs user code error handling in workflow and step runtimes so transient network errors (ECONNRESET, etc.) propagate to the queue for retry instead of incorrectly marking runs as failed From 9cbebd33d7578534be49d216b7030b9d797558da Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 11 Mar 2026 17:20:47 -0700 Subject: [PATCH 3/8] remove withServerErrorRetry and isTransientNetworkError Redundant with undici RetryAgent which already handles 5xx retries and network error retries at the HTTP dispatcher level. --- packages/core/src/runtime/helpers.test.ts | 204 +----------------- packages/core/src/runtime/helpers.ts | 90 -------- .../core/src/runtime/step-handler.test.ts | 5 +- packages/core/src/runtime/step-handler.ts | 115 +++++----- 4 files changed, 56 insertions(+), 358 deletions(-) diff --git a/packages/core/src/runtime/helpers.test.ts b/packages/core/src/runtime/helpers.test.ts index 49e1532325..a1f12711c6 100644 --- a/packages/core/src/runtime/helpers.test.ts +++ b/packages/core/src/runtime/helpers.test.ts @@ -1,11 +1,6 @@ import { WorkflowAPIError } from '@workflow/errors'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - getWorkflowQueueName, - isTransientNetworkError, - withServerErrorRetry, - withThrottleRetry, -} from './helpers.js'; +import { getWorkflowQueueName, withThrottleRetry } from './helpers.js'; // Mock the logger to suppress output during tests vi.mock('../logger.js', () => ({ @@ -87,203 +82,6 @@ describe('getWorkflowQueueName', () => { }); }); -describe('withServerErrorRetry', () => { - beforeEach(() => { - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.clearAllMocks(); - }); - - it('should return the result on success', async () => { - const fn = vi.fn().mockResolvedValue('ok'); - const result = await withServerErrorRetry(fn); - expect(result).toBe('ok'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should retry on 5xx WorkflowAPIError and succeed', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Internal Server Error', { status: 500 }) - ) - .mockResolvedValueOnce('recovered'); - - const promise = withServerErrorRetry(fn); - await vi.advanceTimersByTimeAsync(500); - const result = await promise; - - expect(result).toBe('recovered'); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should retry up to 3 times with exponential backoff', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce(new WorkflowAPIError('error', { status: 502 })) - .mockRejectedValueOnce(new WorkflowAPIError('error', { status: 503 })) - .mockRejectedValueOnce(new WorkflowAPIError('error', { status: 500 })) - .mockResolvedValueOnce('finally'); - - const promise = withServerErrorRetry(fn); - - // First retry after 500ms - await vi.advanceTimersByTimeAsync(500); - // Second retry after 1000ms - await vi.advanceTimersByTimeAsync(1000); - // Third retry after 2000ms - await vi.advanceTimersByTimeAsync(2000); - - const result = await promise; - expect(result).toBe('finally'); - expect(fn).toHaveBeenCalledTimes(4); - }); - - it('should throw after exhausting all retries', async () => { - const error = new WorkflowAPIError('server down', { status: 500 }); - const fn = vi.fn().mockRejectedValue(error); - - const promise = withServerErrorRetry(fn).catch((e) => e); - - // Advance through all 3 retry delays - await vi.advanceTimersByTimeAsync(500); - await vi.advanceTimersByTimeAsync(1000); - await vi.advanceTimersByTimeAsync(2000); - - const result = await promise; - expect(result).toBeInstanceOf(WorkflowAPIError); - expect(result.message).toBe('server down'); - // 1 initial + 3 retries = 4 total calls - expect(fn).toHaveBeenCalledTimes(4); - }); - - it('should not retry non-5xx WorkflowAPIErrors', async () => { - const error = new WorkflowAPIError('Not Found', { status: 404 }); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withServerErrorRetry(fn)).rejects.toThrow('Not Found'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should not retry non-WorkflowAPIError, non-network errors', async () => { - const error = new Error('some other error'); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withServerErrorRetry(fn)).rejects.toThrow('some other error'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should not retry 429 errors (handled by withThrottleRetry)', async () => { - const error = new WorkflowAPIError('Too Many Requests', { - status: 429, - retryAfter: 5, - }); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withServerErrorRetry(fn)).rejects.toThrow('Too Many Requests'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should retry on transient network errors (e.g., ECONNRESET)', async () => { - const networkError = makeNetworkError('ECONNRESET'); - - const fn = vi - .fn() - .mockRejectedValueOnce(networkError) - .mockResolvedValueOnce('recovered'); - - const promise = withServerErrorRetry(fn); - await vi.advanceTimersByTimeAsync(500); - const result = await promise; - - expect(result).toBe('recovered'); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should throw after exhausting retries on persistent network errors', async () => { - const networkError = makeNetworkError('ECONNRESET'); - - const fn = vi.fn().mockRejectedValue(networkError); - - const promise = withServerErrorRetry(fn).catch((e) => e); - - await vi.advanceTimersByTimeAsync(500); - await vi.advanceTimersByTimeAsync(1000); - await vi.advanceTimersByTimeAsync(2000); - - const result = await promise; - expect(result).toBeInstanceOf(TypeError); - expect((result as TypeError).message).toBe('fetch failed'); - expect(fn).toHaveBeenCalledTimes(4); - }); -}); - -/** - * Creates a TypeError that mimics what Node.js `fetch()` throws on network - * errors — `TypeError: fetch failed` with a `cause` carrying the error code. - */ -function makeNetworkError(code: string): TypeError { - const cause = new Error(`${code}`); - (cause as any).code = code; - const error = new TypeError('fetch failed'); - (error as any).cause = cause; - return error; -} - -describe('isTransientNetworkError', () => { - it('should return true for TypeError with ECONNRESET cause', () => { - expect(isTransientNetworkError(makeNetworkError('ECONNRESET'))).toBe(true); - }); - - it('should return true for TypeError with ECONNREFUSED cause', () => { - expect(isTransientNetworkError(makeNetworkError('ECONNREFUSED'))).toBe( - true - ); - }); - - it('should return true for TypeError with ENOTFOUND cause', () => { - expect(isTransientNetworkError(makeNetworkError('ENOTFOUND'))).toBe(true); - }); - - it('should return true for error with code directly on it (undici style)', () => { - const error = new Error('socket error'); - (error as any).code = 'UND_ERR_SOCKET'; - expect(isTransientNetworkError(error)).toBe(true); - }); - - it('should return false for regular Error', () => { - expect(isTransientNetworkError(new Error('something failed'))).toBe(false); - }); - - it('should return false for WorkflowAPIError', () => { - expect( - isTransientNetworkError( - new WorkflowAPIError('Internal Server Error', { status: 500 }) - ) - ).toBe(false); - }); - - it('should return false for non-Error values', () => { - expect(isTransientNetworkError('string error')).toBe(false); - expect(isTransientNetworkError(null)).toBe(false); - expect(isTransientNetworkError(undefined)).toBe(false); - expect(isTransientNetworkError(42)).toBe(false); - }); - - it('should return false for TypeError without a cause', () => { - expect(isTransientNetworkError(new TypeError('not a network error'))).toBe( - false - ); - }); - - it('should return false for error with non-transient code', () => { - expect(isTransientNetworkError(makeNetworkError('ENOENT'))).toBe(false); - }); -}); - describe('withThrottleRetry', () => { beforeEach(() => { vi.useFakeTimers(); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 418f065f5d..acf5a2247e 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -470,93 +470,3 @@ export async function withThrottleRetry( throw err; } } - -/** - * Known transient network error codes produced by Node.js / undici when - * a TCP connection is interrupted or unreachable. These are the same codes - * that undici's RetryAgent retries for non-POST methods by default. - */ -const TRANSIENT_NETWORK_ERROR_CODES = new Set([ - 'ECONNRESET', - 'ECONNREFUSED', - 'ENOTFOUND', - 'ENETDOWN', - 'ENETUNREACH', - 'EHOSTDOWN', - 'EHOSTUNREACH', - 'EPIPE', - 'UND_ERR_SOCKET', -]); - -/** - * Detects transient network errors thrown by `fetch()` / undici when the - * underlying TCP connection fails (e.g., ECONNRESET, DNS failures). - * - * These errors are **not** `WorkflowAPIError` instances because they occur - * before an HTTP response is received. Node's `fetch()` wraps them as - * `TypeError: fetch failed` with a `cause` carrying the original error code. - * - * This helper is used to distinguish infrastructure-level network failures - * (which should be retried) from user-code errors (which should fail the run). - */ -export function isTransientNetworkError(err: unknown): boolean { - if (!(err instanceof Error)) return false; - - // Node fetch wraps network errors as `TypeError` with a `cause` - const cause = (err as Error & { cause?: Error }).cause; - if (cause && typeof (cause as any).code === 'string') { - return TRANSIENT_NETWORK_ERROR_CODES.has((cause as any).code); - } - - // Direct undici errors may carry the code on the error itself - if (typeof (err as any).code === 'string') { - return TRANSIENT_NETWORK_ERROR_CODES.has((err as any).code); - } - - return false; -} - -/** - * Retries a function when it throws a 5xx WorkflowAPIError or a transient - * network error (e.g., ECONNRESET). Used to handle transient workflow-server - * errors without consuming step attempts. - * - * Retries up to 3 times with exponential backoff (500ms, 1s, 2s ≈ 3.5s total). - * If all retries fail, the original error is re-thrown. - */ -export async function withServerErrorRetry( - fn: () => Promise -): Promise { - const delays = [500, 1000, 2000]; - for (let attempt = 0; attempt <= delays.length; attempt++) { - try { - return await fn(); - } catch (err) { - const isRetryable = - (WorkflowAPIError.is(err) && - err.status !== undefined && - err.status >= 500) || - isTransientNetworkError(err); - - if (isRetryable && attempt < delays.length) { - runtimeLogger.warn( - 'Server error from workflow-server, retrying in-process', - { - status: WorkflowAPIError.is(err) ? err.status : undefined, - networkErrorCode: isTransientNetworkError(err) - ? ((err as any).cause?.code ?? (err as any).code) - : undefined, - attempt: attempt + 1, - maxRetries: delays.length, - nextDelayMs: delays[attempt], - url: WorkflowAPIError.is(err) ? err.url : undefined, - } - ); - await new Promise((resolve) => setTimeout(resolve, delays[attempt])); - continue; - } - throw err; - } - } - throw new Error('withServerErrorRetry: unreachable'); -} diff --git a/packages/core/src/runtime/step-handler.test.ts b/packages/core/src/runtime/step-handler.test.ts index bd0c1033c8..3179903e94 100644 --- a/packages/core/src/runtime/step-handler.test.ts +++ b/packages/core/src/runtime/step-handler.test.ts @@ -1,5 +1,5 @@ import { FatalError, WorkflowAPIError } from '@workflow/errors'; -import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; // Use vi.hoisted so these are available in mock factories const { @@ -96,7 +96,6 @@ vi.mock('./helpers.js', async () => { withHealthCheck: (handler: unknown) => handler, parseHealthCheckPayload: vi.fn().mockReturnValue(null), handleHealthCheckMessage: vi.fn(), - withServerErrorRetry: async (fn: () => Promise) => fn(), }; }); @@ -145,9 +144,9 @@ vi.mock('@workflow/utils/get-port', () => ({ import './step-handler.js'; import { getStepFunction } from '../private.js'; import { - normalizeUnknownError, getErrorName, getErrorStack, + normalizeUnknownError, } from '../types.js'; import { getWorld } from './world.js'; diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 86e8253321..56921d8610 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -36,7 +36,6 @@ import { parseHealthCheckPayload, queueMessage, withHealthCheck, - withServerErrorRetry, } from './helpers.js'; import { getWorld, getWorldHandlers } from './world.js'; @@ -119,13 +118,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( // - Workflow still active (returns 410 if completed) let step; try { - const startResult = await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - }) - ); + const startResult = await world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + }); if (!startResult.step) { throw new WorkflowRuntimeError( @@ -422,17 +419,15 @@ const stepHandler = getWorldHandlers().createQueueHandler( ); // Fail the step via event (event-sourced architecture) try { - await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: normalizedError.message, - stack: normalizedStack, - }, - }) - ); + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: normalizedError.message, + stack: normalizedStack, + }, + }); } catch (stepFailErr) { if ( WorkflowAPIError.is(stepFailErr) && @@ -483,17 +478,15 @@ const stepHandler = getWorldHandlers().createQueueHandler( const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${normalizedError.message}`; // Fail the step via event (event-sourced architecture) try { - await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: errorMessage, - stack: normalizedStack, - }, - }) - ); + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: normalizedStack, + }, + }); } catch (stepFailErr) { if ( WorkflowAPIError.is(stepFailErr) && @@ -540,20 +533,18 @@ const stepHandler = getWorldHandlers().createQueueHandler( // Set step to pending for retry via event (event-sourced architecture) // step_retrying records the error and sets status to pending try { - await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_retrying', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: normalizedError.message, - stack: normalizedStack, - ...(RetryableError.is(err) && { - retryAfter: err.retryAfter, - }), - }, - }) - ); + await world.events.create(workflowRunId, { + eventType: 'step_retrying', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: normalizedError.message, + stack: normalizedStack, + ...(RetryableError.is(err) && { + retryAfter: err.retryAfter, + }), + }, + }); } catch (stepRetryErr) { if ( WorkflowAPIError.is(stepRetryErr) && @@ -643,8 +634,8 @@ const stepHandler = getWorldHandlers().createQueueHandler( // the trace carrier is used in the final queueMessage call below let stepCompleted409 = false; const [, traceCarrier] = await Promise.all([ - withServerErrorRetry(() => - world.events.create(workflowRunId, { + world.events + .create(workflowRunId, { eventType: 'step_completed', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, @@ -652,22 +643,22 @@ const stepHandler = getWorldHandlers().createQueueHandler( result: result as Uint8Array, }, }) - ).catch((err) => { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.warn( - 'Tried completing step, but step has already finished.', - { - workflowRunId, - stepId, - stepName, - message: err.message, - } - ); - stepCompleted409 = true; - return; - } - throw err; - }), + .catch((err: unknown) => { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.warn( + 'Tried completing step, but step has already finished.', + { + workflowRunId, + stepId, + stepName, + message: err.message, + } + ); + stepCompleted409 = true; + return; + } + throw err; + }), serializeTraceCarrier(), ]); From ed052989a9c2c13a6d6ca87cdd8a949d6c4eea14 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 12 Mar 2026 00:26:38 -0700 Subject: [PATCH 4/8] address review feedback: move getEncryptionKeyForRun out of user-code try/catch, re-add 5xx/410 safety net in step handler, relax e2e test assertion --- packages/core/e2e/e2e.test.ts | 9 --------- packages/core/src/runtime.ts | 13 +++++++------ packages/core/src/runtime/step-handler.ts | 22 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 72f023e15d..a109517f62 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1082,15 +1082,6 @@ describe('e2e', () => { // retryCount > 0 proves the fault injection actually triggered expect(result.retryCount).toBe(2); - - // attempt === 1 proves no step attempt was consumed by the 5xx retries - const { json: steps } = await cliInspectJson( - `steps --runId ${run.runId}` - ); - const doWorkStep = steps.find((s: any) => - s.stepName.includes('doWork') - ); - expect(doWorkStep.attempt).toBe(1); } ); }); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 79944ecd0e..aa2456223b 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -239,6 +239,13 @@ export function workflowEntrypoint( } } + // Resolve the encryption key for this run's deployment + const rawKey = + await world.getEncryptionKeyForRun?.(workflowRun); + const encryptionKey = rawKey + ? await importKey(rawKey) + : undefined; + // --- User code execution --- // Only errors from runWorkflow() (user workflow code) should // produce run_failed. Infrastructure errors (network, server) @@ -252,12 +259,6 @@ export function workflowEntrypoint( replaySpan?.setAttributes({ ...Attribute.WorkflowEventsCount(events.length), }); - // Resolve the encryption key for this run's deployment - const rawKey = - await world.getEncryptionKeyForRun?.(workflowRun); - const encryptionKey = rawKey - ? await importKey(rawKey) - : undefined; return await runWorkflow( workflowCode, workflowRun, diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 56921d8610..92624cd50e 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -382,6 +382,28 @@ const stepHandler = getWorldHandlers().createQueueHandler( // --- Handle user code errors --- if (userCodeFailed) { const err = userCodeError; + + // Infrastructure errors that somehow surfaced through user code + // should propagate to the queue handler for retry, not consume + // step attempts. + if (WorkflowAPIError.is(err)) { + if (err.status === 410) { + // Workflow has already completed, so no-op + stepLogger.info( + 'Workflow run already completed, skipping step', + { + workflowRunId, + stepId, + message: err.message, + } + ); + return; + } + if (err.status !== undefined && err.status >= 500) { + throw err; + } + } + const normalizedError = await normalizeUnknownError(err); const normalizedStack = normalizedError.stack || getErrorStack(err) || ''; From 07032928cd415772018340799f14b654fe8d2831 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 12 Mar 2026 02:23:31 -0700 Subject: [PATCH 5/8] remove serverError5xxRetryWorkflow e2e test This test validated withServerErrorRetry's in-process retry behavior, which was removed. Queue-level retry with process-scoped fault injection is unreliable across serverless instances and too slow for e2e timeouts. --- packages/core/e2e/e2e.test.ts | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index a109517f62..0ed482940e 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1067,23 +1067,12 @@ describe('e2e', () => { expect(result.attempt).toBe(1); }); - test( - 'workflow completes despite transient 5xx on step_completed', - { timeout: 120_000 }, - async () => { - const run = await start( - await e2e('serverError5xxRetryWorkflow'), - [42] - ); - const result = await run.returnValue; - - // Correct result proves workflow completed successfully - expect(result.result).toBe(84); // 42 * 2 - - // retryCount > 0 proves the fault injection actually triggered - expect(result.retryCount).toBe(2); - } - ); + // Removed: 'workflow completes despite transient 5xx on step_completed' + // This test validated withServerErrorRetry's in-process retry behavior, + // which was removed in favor of structural error separation (infra errors + // propagate to the queue for retry). Queue-level retry with process-scoped + // fault injection is unreliable (different serverless instances) and too + // slow for a bounded e2e timeout. }); describe('catchability', () => { From 556256177e886a9983cc70d4633d1cced89ae87b Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 12 Mar 2026 02:25:17 -0700 Subject: [PATCH 6/8] remove serverError5xxRetryWorkflow and fault injection helpers from e2e workflows --- workbench/example/workflows/99_e2e.ts | 133 -------------------------- 1 file changed, 133 deletions(-) diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 19541f619c..e1d1fc9b32 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -1016,139 +1016,6 @@ export class ChainableService { // E2E test for `this` serialization with .call() and .apply() ////////////////////////////////////////////////////////// -// ============================================================ -// 5XX SERVER ERROR RETRY E2E TEST -// ============================================================ -// Tests that withServerErrorRetry in the step handler correctly -// retries transient 5xx errors from workflow-server without -// consuming step attempts. -// ============================================================ - -const FAULT_MAP_SYMBOL = Symbol.for('__test_5xx_fault_map'); -const FAULT_WRAPPER_INSTALLED_SYMBOL = Symbol.for( - '__test_5xx_fault_wrapper_installed' -); - -type FaultState = { - installStepId: string; - targetStepId?: string; - remaining: number; - triggered: number; -}; - -function shouldInjectStepCompletedFault(state: FaultState, data: any): boolean { - if (data?.eventType !== 'step_completed') return false; - - const correlationId = - typeof data?.correlationId === 'string' ? data.correlationId : null; - if (!correlationId) return false; - - // Never inject on the install step itself. This avoids flakiness when - // workflow-server transiently retries install step_completed. - if (correlationId === state.installStepId) return false; - - // Target exactly one non-install step (the first one encountered). - // Retries of that same step share correlationId and are intercepted. - state.targetStepId ??= correlationId; - - if (correlationId !== state.targetStepId) return false; - if (state.remaining <= 0) return false; - - state.remaining--; - state.triggered++; - return true; -} - -/** - * Step that installs a fault injection patch on world.events.create, - * scoped to the current runId. Only intercepts step_completed events - * for the target run; all other runs pass through unmodified. - */ -async function installServerErrorFaultInjection(failCount: number) { - 'use step'; - const { workflowRunId } = getWorkflowMetadata(); - const { stepId: installStepId } = getStepMetadata(); - const world = (globalThis as any)[Symbol.for('@workflow/world//cache')]; - - // Process-level map for run-scoped fault state - (globalThis as any)[FAULT_MAP_SYMBOL] ??= new Map(); - const faultMap = (globalThis as any)[FAULT_MAP_SYMBOL] as Map< - string, - FaultState - >; - - faultMap.set(workflowRunId, { - installStepId, - remaining: failCount, - triggered: 0, - }); - - // Install the wrapper once per process to avoid nested wrappers across runs. - if (!(world.events.create as any)[FAULT_WRAPPER_INSTALLED_SYMBOL]) { - const original = world.events.create.bind(world.events); - - const wrappedCreate = async ( - rid: string, - data: any, - ...rest: any[] - ): Promise => { - const state = faultMap.get(rid); - if (state && shouldInjectStepCompletedFault(state, data)) { - // Create an error that matches WorkflowAPIError.is() check: - // isError(value) && value.name === 'WorkflowAPIError' - const err: any = new Error('Injected 5xx'); - err.name = 'WorkflowAPIError'; - err.status = 500; - throw err; - } - - return original(rid, data, ...rest); - }; - - (wrappedCreate as any)[FAULT_WRAPPER_INSTALLED_SYMBOL] = true; - world.events.create = wrappedCreate; - } -} - -/** - * Simple step that does computation (input * 2). - * Its step_completed event will be intercepted by the fault injection. - */ -async function doWork(input: number) { - 'use step'; - return input * 2; -} - -/** - * Cleanup step that reads and clears the fault injection state for this run. - * Returns how many times the fault was triggered. - */ -async function cleanupFaultInjection() { - 'use step'; - const { workflowRunId } = getWorkflowMetadata(); - const faultMap = (globalThis as any)[FAULT_MAP_SYMBOL] as - | Map - | undefined; - const state = faultMap?.get(workflowRunId); - const triggered = state?.triggered ?? 0; - faultMap?.delete(workflowRunId); - return triggered; -} - -/** - * Workflow that exercises the 5xx retry codepath in the step handler. - * 1. Installs fault injection (scoped to this run's step_completed events) - * 2. Runs a computation step (its step_completed will fail with 5xx twice) - * 3. Cleans up and returns result + retry count for assertions - */ -export async function serverError5xxRetryWorkflow(input: number) { - 'use workflow'; - await installServerErrorFaultInjection(2); - const result = await doWork(input); - const retryCount = await cleanupFaultInjection(); - return { result, retryCount }; -} - ////////////////////////////////////////////////////////// // E2E test for `this` serialization with .call() and .apply() ////////////////////////////////////////////////////////// From 3afbd2f2201496b486555d7833edb901da6dcb38 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 12 Mar 2026 09:35:12 -0700 Subject: [PATCH 7/8] remove inline comment about deleted test --- packages/core/e2e/e2e.test.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 0ed482940e..b6f37ac035 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1066,13 +1066,6 @@ describe('e2e', () => { expect(result.failed).toBe(true); expect(result.attempt).toBe(1); }); - - // Removed: 'workflow completes despite transient 5xx on step_completed' - // This test validated withServerErrorRetry's in-process retry behavior, - // which was removed in favor of structural error separation (infra errors - // propagate to the queue for retry). Queue-level retry with process-scoped - // fault injection is unreliable (different serverless instances) and too - // slow for a bounded e2e timeout. }); describe('catchability', () => { From 84b4b5e6d528d17f9ee23b0a2b7ffa252ebffa70 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 12 Mar 2026 14:10:00 -0700 Subject: [PATCH 8/8] address review: remove withThrottleRetry, handle WorkflowRuntimeError as fatal, deduplicate check - Remove withThrottleRetry (undici RetryAgent handles 429s) - WorkflowRuntimeError in infrastructure setup now produces run_failed/step_failed instead of retrying forever via queue - Deduplicate redundant WorkflowAPIError.is check in step-handler.ts --- packages/core/src/runtime.ts | 445 +++++++++++----------- packages/core/src/runtime/helpers.test.ts | 128 +------ packages/core/src/runtime/helpers.ts | 70 +--- packages/core/src/runtime/step-handler.ts | 38 +- 4 files changed, 267 insertions(+), 414 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index aa2456223b..542898008d 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -15,7 +15,6 @@ import { handleHealthCheckMessage, parseHealthCheckPayload, withHealthCheck, - withThrottleRetry, } from './runtime/helpers.js'; import { handleSuspension } from './runtime/suspension-handler.js'; import { getWorld, getWorldHandlers } from './runtime/world.js'; @@ -132,14 +131,14 @@ export function workflowEntrypoint( ...Attribute.WorkflowTracePropagated(!!traceContext), }); - return await withThrottleRetry(async () => { - let workflowStartedAt = -1; - let workflowRun = await world.runs.get(runId); + let workflowStartedAt = -1; + let workflowRun = await world.runs.get(runId); - // --- Infrastructure: prepare the run state --- - // Errors here (network failures, server errors) propagate to the - // queue handler, which returns a non-200 response so the message - // is redelivered and the workflow retries automatically. + // --- Infrastructure: prepare the run state --- + // Network/server errors propagate to the queue handler for retry. + // WorkflowRuntimeError (data integrity issues) are fatal and + // produce run_failed since retrying won't fix them. + try { if (workflowRun.status === 'pending') { // Transition run to 'running' via event (event-sourced architecture) const result = await world.events.create(runId, { @@ -162,187 +161,21 @@ export function workflowEntrypoint( `Workflow run "${runId}" has no "startedAt" timestamp` ); } - workflowStartedAt = +workflowRun.startedAt; - - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), - }); - - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } + } catch (err) { + if (err instanceof WorkflowRuntimeError) { + runtimeLogger.error( + 'Fatal runtime error during workflow setup', + { workflowRunId: runId, error: err.message } ); - - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. - - return; - } - - // Load all events into memory before running - const events = await getAllWorkflowRunEvents( - workflowRun.runId - ); - - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); - - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); - - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); - - // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - try { - const result = await world.events.create( - runId, - waitEvent - ); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.info('Wait already completed, skipping', { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - }); - continue; - } - throw err; - } - } - - // Resolve the encryption key for this run's deployment - const rawKey = - await world.getEncryptionKeyForRun?.(workflowRun); - const encryptionKey = rawKey - ? await importKey(rawKey) - : undefined; - - // --- User code execution --- - // Only errors from runWorkflow() (user workflow code) should - // produce run_failed. Infrastructure errors (network, server) - // must propagate to the queue handler for automatic retry. - let workflowResult: unknown; - try { - workflowResult = await trace( - 'workflow.replay', - {}, - async (replaySpan) => { - replaySpan?.setAttributes({ - ...Attribute.WorkflowEventsCount(events.length), - }); - return await runWorkflow( - workflowCode, - workflowRun, - events, - encryptionKey - ); - } - ); - } catch (err) { - // WorkflowSuspension is normal control flow — not an error - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount - ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } - - const result = await handleSuspension({ - suspension: err, - world, - run: workflowRun, - span, - }); - - if (result.timeoutSeconds !== undefined) { - return { timeoutSeconds: result.timeoutSeconds }; - } - - // Suspension handled, no further work needed - return; - } - - if (WorkflowAPIError.is(err) && err.status === 429) { - // Throw to let withThrottleRetry handle it - throw err; - } - - // This is a user code error or a WorkflowRuntimeError - // (e.g., corrupted event log). Fail the workflow run. - - // Record exception for OTEL error tracking - if (err instanceof Error) { - span?.recordException?.(err); - } - - const normalizedError = await normalizeUnknownError(err); - const errorName = normalizedError.name || getErrorName(err); - const errorMessage = normalizedError.message; - let errorStack = - normalizedError.stack || getErrorStack(err); - - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = - parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); - } - - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, - errorStack, - }); - - // Fail the workflow run via event (event-sourced architecture) try { await world.events.create(runId, { eventType: 'run_failed', specVersion: SPEC_VERSION_CURRENT, eventData: { error: { - message: errorMessage, - stack: errorStack, + message: err.message, + stack: err.stack, }, - // TODO: include error codes when we define them }, }); } catch (failErr) { @@ -350,67 +183,251 @@ export function workflowEntrypoint( WorkflowAPIError.is(failErr) && (failErr.status === 409 || failErr.status === 410) ) { - runtimeLogger.warn( - 'Tried failing workflow run, but run has already finished.', - { - workflowRunId: runId, - message: failErr.message, - } - ); - span?.setAttributes({ - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(errorMessage), - ...Attribute.ErrorType(errorName), - }); return; - } else { - throw failErr; } + throw failErr; } + return; + } + throw err; + } + workflowStartedAt = +workflowRun.startedAt; + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(errorMessage), - ...Attribute.ErrorType(errorName), + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); + + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. + + return; + } + + // Load all events into memory before running + const events = await getAllWorkflowRunEvents(workflowRun.runId); + + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); + + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); + + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); + + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + try { + const result = await world.events.create(runId, waitEvent); + // Add the event to the events array so the workflow can see it + events.push(result.event!); + } catch (err) { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.info('Wait already completed, skipping', { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + }); + continue; + } + throw err; + } + } + + // Resolve the encryption key for this run's deployment + const rawKey = + await world.getEncryptionKeyForRun?.(workflowRun); + const encryptionKey = rawKey + ? await importKey(rawKey) + : undefined; + + // --- User code execution --- + // Only errors from runWorkflow() (user workflow code) should + // produce run_failed. Infrastructure errors (network, server) + // must propagate to the queue handler for automatic retry. + let workflowResult: unknown; + try { + workflowResult = await trace( + 'workflow.replay', + {}, + async (replaySpan) => { + replaySpan?.setAttributes({ + ...Attribute.WorkflowEventsCount(events.length), + }); + return await runWorkflow( + workflowCode, + workflowRun, + events, + encryptionKey + ); + } + ); + } catch (err) { + // WorkflowSuspension is normal control flow — not an error + if (WorkflowSuspension.is(err)) { + const suspensionMessage = buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + err.waitCount + ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } + + const result = await handleSuspension({ + suspension: err, + world, + run: workflowRun, + span, }); + + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + + // Suspension handled, no further work needed return; } - // --- Infrastructure: complete the run --- - // This is outside the user-code try/catch so that failures - // here (e.g., network errors) propagate to the queue handler. + // This is a user code error or a WorkflowRuntimeError + // (e.g., corrupted event log). Fail the workflow run. + + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } + + const normalizedError = await normalizeUnknownError(err); + const errorName = normalizedError.name || getErrorName(err); + const errorMessage = normalizedError.message; + let errorStack = normalizedError.stack || getErrorStack(err); + + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); + + // Fail the workflow run via event (event-sourced architecture) try { await world.events.create(runId, { - eventType: 'run_completed', + eventType: 'run_failed', specVersion: SPEC_VERSION_CURRENT, eventData: { - output: workflowResult, + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them }, }); - } catch (err) { + } catch (failErr) { if ( - WorkflowAPIError.is(err) && - (err.status === 409 || err.status === 410) + WorkflowAPIError.is(failErr) && + (failErr.status === 409 || failErr.status === 410) ) { runtimeLogger.warn( - 'Tried completing workflow run, but run has already finished.', + 'Tried failing workflow run, but run has already finished.', { workflowRunId: runId, - message: err.message, + message: failErr.message, } ); + span?.setAttributes({ + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(errorMessage), + ...Attribute.ErrorType(errorName), + }); return; } else { - throw err; + throw failErr; } } span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(errorMessage), + ...Attribute.ErrorType(errorName), + }); + return; + } + + // --- Infrastructure: complete the run --- + // This is outside the user-code try/catch so that failures + // here (e.g., network errors) propagate to the queue handler. + try { + await world.events.create(runId, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + output: workflowResult, + }, }); - }); // End withThrottleRetry + } catch (err) { + if ( + WorkflowAPIError.is(err) && + (err.status === 409 || err.status === 410) + ) { + runtimeLogger.warn( + 'Tried completing workflow run, but run has already finished.', + { + workflowRunId: runId, + message: err.message, + } + ); + return; + } else { + throw err; + } + } + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), + }); } ); // End trace } diff --git a/packages/core/src/runtime/helpers.test.ts b/packages/core/src/runtime/helpers.test.ts index a1f12711c6..3844a84c2f 100644 --- a/packages/core/src/runtime/helpers.test.ts +++ b/packages/core/src/runtime/helpers.test.ts @@ -1,6 +1,5 @@ -import { WorkflowAPIError } from '@workflow/errors'; -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { getWorkflowQueueName, withThrottleRetry } from './helpers.js'; +import { describe, expect, it, vi } from 'vitest'; +import { getWorkflowQueueName } from './helpers.js'; // Mock the logger to suppress output during tests vi.mock('../logger.js', () => ({ @@ -81,126 +80,3 @@ describe('getWorkflowQueueName', () => { expect(() => getWorkflowQueueName('')).toThrow('Invalid workflow name'); }); }); - -describe('withThrottleRetry', () => { - beforeEach(() => { - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.clearAllMocks(); - }); - - it('should pass through the result on success', async () => { - const fn = vi.fn().mockResolvedValue(undefined); - const result = await withThrottleRetry(fn); - expect(result).toBeUndefined(); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should pass through { timeoutSeconds } returned by fn', async () => { - const fn = vi.fn().mockResolvedValue({ timeoutSeconds: 42 }); - const result = await withThrottleRetry(fn); - expect(result).toEqual({ timeoutSeconds: 42 }); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should re-throw non-429 errors including 5xx', async () => { - const error = new WorkflowAPIError('Internal Server Error', { - status: 500, - }); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withThrottleRetry(fn)).rejects.toThrow( - 'Internal Server Error' - ); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should re-throw non-WorkflowAPIError errors', async () => { - const error = new Error('random failure'); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withThrottleRetry(fn)).rejects.toThrow('random failure'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should wait in-process and retry once for short retryAfter (<10s)', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 5 }) - ) - .mockResolvedValueOnce(undefined); - - const promise = withThrottleRetry(fn); - // Advance past the 5s wait - await vi.advanceTimersByTimeAsync(5000); - const result = await promise; - - expect(result).toBeUndefined(); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should defer to queue when both attempts are throttled (double 429)', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 3 }) - ) - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled again', { status: 429, retryAfter: 7 }) - ); - - const promise = withThrottleRetry(fn); - // Advance past the 3s in-process wait - await vi.advanceTimersByTimeAsync(3000); - const result = await promise; - - expect(result).toEqual({ timeoutSeconds: 7 }); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should re-throw non-429 error on retry failure', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 2 }) - ) - .mockRejectedValueOnce(new Error('connection lost')); - - // Capture the rejection early to prevent unhandled rejection warning - const promise = withThrottleRetry(fn).catch((e) => e); - await vi.advanceTimersByTimeAsync(2000); - - const result = await promise; - expect(result).toBeInstanceOf(Error); - expect(result.message).toBe('connection lost'); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should defer to queue immediately for long retryAfter (>=10s)', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 15 }) - ); - - const result = await withThrottleRetry(fn); - - expect(result).toEqual({ timeoutSeconds: 15 }); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should default to 30s (defer to queue) when no retryAfter is provided', async () => { - const error = new WorkflowAPIError('Throttled', { status: 429 }); - // retryAfter is undefined, so it defaults to 30 (>=10 → defer) - const fn = vi.fn().mockRejectedValue(error); - - const result = await withThrottleRetry(fn); - - expect(result).toEqual({ timeoutSeconds: 30 }); - expect(fn).toHaveBeenCalledTimes(1); - }); -}); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index acf5a2247e..941142e943 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -1,4 +1,3 @@ -import { WorkflowAPIError } from '@workflow/errors'; import type { Event, HealthCheckPayload, @@ -7,7 +6,7 @@ import type { } from '@workflow/world'; import { HealthCheckPayloadSchema } from '@workflow/world'; import { monotonicFactory } from 'ulid'; -import { runtimeLogger } from '../logger.js'; + import * as Attribute from '../telemetry/semantic-conventions.js'; import { getSpanKind, trace } from '../telemetry.js'; import { getWorld } from './world.js'; @@ -403,70 +402,3 @@ export function getQueueOverhead(message: { requestedAt?: Date }) { return; } } - -/** - * Wraps a queue handler with HTTP 429 throttle retry logic. - * - retryAfter < 10s: waits in-process via setTimeout, then retries once - * - retryAfter >= 10s: returns { timeoutSeconds } to defer to the queue - * - * Safe to retry the entire handler because 429 is sent from server middleware - * before the request is processed — no server state has changed. - */ -// biome-ignore lint/suspicious/noConfusingVoidType: matches Queue handler return type -export async function withThrottleRetry( - fn: () => Promise -): Promise { - try { - return await fn(); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 429) { - const retryAfterSeconds = Math.max( - // If we don't have a retry-after value, 30s seems a reasonable default - // to avoid re-trying during the unknown rate-limiting period. - 1, - typeof err.retryAfter === 'number' ? err.retryAfter : 30 - ); - - if (retryAfterSeconds < 10) { - runtimeLogger.warn( - 'Throttled by workflow-server (429), retrying in-process', - { - retryAfterSeconds, - url: err.url, - } - ); - // Short wait: sleep in-process, then retry once - await new Promise((resolve) => - setTimeout(resolve, retryAfterSeconds * 1000) - ); - try { - return await fn(); - } catch (retryErr) { - // If the retry also gets throttled, defer to queue - if (WorkflowAPIError.is(retryErr) && retryErr.status === 429) { - const retryRetryAfter = Math.max( - 1, - typeof retryErr.retryAfter === 'number' ? retryErr.retryAfter : 1 - ); - runtimeLogger.warn('Throttled again on retry, deferring to queue', { - retryAfterSeconds: retryRetryAfter, - }); - return { timeoutSeconds: retryRetryAfter }; - } - throw retryErr; - } - } - - // Long wait: defer to queue infrastructure - runtimeLogger.warn( - 'Throttled by workflow-server (429), deferring to queue', - { - retryAfterSeconds, - url: err.url, - } - ); - return { timeoutSeconds: retryAfterSeconds }; - } - throw err; - } -} diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 92624cd50e..313ab4b828 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -132,7 +132,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( step = startResult.step; } catch (err) { if (WorkflowAPIError.is(err)) { - if (WorkflowAPIError.is(err) && err.status === 429) { + if (err.status === 429) { const retryRetryAfter = Math.max( 1, typeof err.retryAfter === 'number' ? err.retryAfter : 1 @@ -289,7 +289,9 @@ const stepHandler = getWorldHandlers().createQueueHandler( } // --- Infrastructure: prepare step input --- - // Errors here propagate to the queue handler for automatic retry. + // Network/server errors propagate to the queue handler for retry. + // WorkflowRuntimeError (data integrity issues) are fatal — retrying + // won't fix them, so we re-queue the workflow to surface the error. // step_started already validated the step is in valid state (pending/running) // and returned the updated step entity with incremented attempt @@ -297,9 +299,35 @@ const stepHandler = getWorldHandlers().createQueueHandler( const attempt = step.attempt; if (!step.startedAt) { - throw new WorkflowRuntimeError( - `Step "${stepId}" has no "startedAt" timestamp` - ); + const errorMessage = `Step "${stepId}" has no "startedAt" timestamp`; + runtimeLogger.error('Fatal runtime error during step setup', { + workflowRunId, + stepId, + error: errorMessage, + }); + try { + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: new Error(errorMessage).stack ?? '', + }, + }); + } catch (failErr) { + if (WorkflowAPIError.is(failErr) && failErr.status === 409) { + return; + } + throw failErr; + } + // Re-queue the workflow so it can process the step failure + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + return; } // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) const stepStartedAt = step.startedAt;