diff --git a/.changeset/fix-infra-error-handling.md b/.changeset/fix-infra-error-handling.md new file mode 100644 index 0000000000..2f8fe4cdec --- /dev/null +++ b/.changeset/fix-infra-error-handling.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Separate infrastructure vs user code error handling in workflow and step runtimes so transient network errors (ECONNRESET, etc.) propagate to the queue for retry instead of incorrectly marking runs as failed diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 72f023e15d..b6f37ac035 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1066,33 +1066,6 @@ describe('e2e', () => { expect(result.failed).toBe(true); expect(result.attempt).toBe(1); }); - - test( - 'workflow completes despite transient 5xx on step_completed', - { timeout: 120_000 }, - async () => { - const run = await start( - await e2e('serverError5xxRetryWorkflow'), - [42] - ); - const result = await run.returnValue; - - // Correct result proves workflow completed successfully - expect(result.result).toBe(84); // 42 * 2 - - // retryCount > 0 proves the fault injection actually triggered - expect(result.retryCount).toBe(2); - - // attempt === 1 proves no step attempt was consumed by the 5xx retries - const { json: steps } = await cliInspectJson( - `steps --runId ${run.runId}` - ); - const doWorkStep = steps.find((s: any) => - s.stepName.includes('doWork') - ); - expect(doWorkStep.attempt).toBe(1); - } - ); }); describe('catchability', () => { diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index ad3df0059a..542898008d 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -12,12 +12,9 @@ import { runtimeLogger } from './logger.js'; import { getAllWorkflowRunEvents, getQueueOverhead, - getWorkflowQueueName, handleHealthCheckMessage, parseHealthCheckPayload, - queueMessage, withHealthCheck, - withThrottleRetry, } from './runtime/helpers.js'; import { 
handleSuspension } from './runtime/suspension-handler.js'; import { getWorld, getWorldHandlers } from './runtime/world.js'; @@ -25,7 +22,6 @@ import { remapErrorStack } from './source-map.js'; import * as Attribute from './telemetry/semantic-conventions.js'; import { linkToCurrentContext, - serializeTraceCarrier, trace, withTraceContext, withWorkflowBaggage, @@ -101,7 +97,6 @@ export function workflowEntrypoint( runId, traceCarrier: traceContext, requestedAt, - serverErrorRetryCount, } = WorkflowInvokePayloadSchema.parse(message_); // Extract the workflow name from the topic name const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); @@ -136,307 +131,303 @@ export function workflowEntrypoint( ...Attribute.WorkflowTracePropagated(!!traceContext), }); - return await withThrottleRetry(async () => { - let workflowStartedAt = -1; - let workflowRun = await world.runs.get(runId); - try { - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; - } + let workflowStartedAt = -1; + let workflowRun = await world.runs.get(runId); - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. 
- if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - workflowStartedAt = +workflowRun.startedAt; - - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), + // --- Infrastructure: prepare the run state --- + // Network/server errors propagate to the queue handler for retry. + // WorkflowRuntimeError (data integrity issues) are fatal and + // produce run_failed since retrying won't fix them. + try { + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, }); - - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` ); - - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. 
- - return; } + workflowRun = result.run; + } - // Load all events into memory before running - const events = await getAllWorkflowRunEvents( - workflowRun.runId - ); - - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); - - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. + if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` ); - - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); - - // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - try { - const result = await world.events.create( - runId, - waitEvent - ); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.info( - 'Wait already completed, skipping', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - } - ); - continue; - } - throw err; - } - } - - const result = await trace( - 'workflow.replay', - {}, - async (replaySpan) => { - replaySpan?.setAttributes({ - ...Attribute.WorkflowEventsCount(events.length), - }); - // Resolve the encryption key for this run's deployment - const rawKey = - await world.getEncryptionKeyForRun?.(workflowRun); - const encryptionKey = rawKey - ? 
await importKey(rawKey) - : undefined; - return await runWorkflow( - workflowCode, - workflowRun, - events, - encryptionKey - ); - } + } + } catch (err) { + if (err instanceof WorkflowRuntimeError) { + runtimeLogger.error( + 'Fatal runtime error during workflow setup', + { workflowRunId: runId, error: err.message } ); - - // Complete the workflow run via event (event-sourced architecture) try { await world.events.create(runId, { - eventType: 'run_completed', + eventType: 'run_failed', specVersion: SPEC_VERSION_CURRENT, eventData: { - output: result, + error: { + message: err.message, + stack: err.stack, + }, }, }); - } catch (err) { + } catch (failErr) { if ( - WorkflowAPIError.is(err) && - (err.status === 409 || err.status === 410) + WorkflowAPIError.is(failErr) && + (failErr.status === 409 || failErr.status === 410) ) { - runtimeLogger.warn( - 'Tried completing workflow run, but run has already finished.', - { - workflowRunId: runId, - message: err.message, - } - ); return; - } else { - throw err; } + throw failErr; } + return; + } + throw err; + } + workflowStartedAt = +workflowRun.startedAt; - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); - } catch (err) { - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount - ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - const result = await handleSuspension({ - suspension: err, - world, - run: workflowRun, - span, - }); + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); - if 
(result.timeoutSeconds !== undefined) { - return { timeoutSeconds: result.timeoutSeconds }; - } - } else { - // Retry server errors (5xx) with exponential backoff before failing the run - if ( - WorkflowAPIError.is(err) && - err.status !== undefined && - err.status >= 500 - ) { - const retryCount = serverErrorRetryCount ?? 0; - const delaySecondSteps = [5, 30, 120]; // 5s, 30s, 120s - if (retryCount < delaySecondSteps.length) { - runtimeLogger.warn( - 'Server error (5xx), re-enqueueing workflow with backoff', - { - workflowRunId: runId, - retryCount, - delaySeconds: delaySecondSteps[retryCount], - error: err.message, - } - ); - await queueMessage( - world, - getWorkflowQueueName(workflowName), - { - runId, - serverErrorRetryCount: retryCount + 1, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { delaySeconds: delaySecondSteps[retryCount] } - ); - return; // Don't fail the run, retry later - } - // Fall through to run_failed after exhausting retries - } else if ( - WorkflowAPIError.is(err) && - err.status === 429 - ) { - // Throw to let withThrottleRetry handle it - throw err; - } + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // that replaying the workflow is itself failing. - // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. 
+ return; + } - // Record exception for OTEL error tracking - if (err instanceof Error) { - span?.recordException?.(err); - } + // Load all events into memory before running + const events = await getAllWorkflowRunEvents(workflowRun.runId); - const normalizedError = await normalizeUnknownError(err); - const errorName = - normalizedError.name || getErrorName(err); - const errorMessage = normalizedError.message; - let errorStack = - normalizedError.stack || getErrorStack(err); - - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = - parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); - } + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); + + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); + + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); - runtimeLogger.error('Error while running workflow', { + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + try { + const result = await world.events.create(runId, waitEvent); + // Add the event to the events array so the workflow can see it + events.push(result.event!); + } catch (err) { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.info('Wait already completed, skipping', { workflowRunId: runId, - errorName, 
- errorStack, + correlationId: waitEvent.correlationId, }); + continue; + } + throw err; + } + } - // Fail the workflow run via event (event-sourced architecture) - try { - await world.events.create(runId, { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: { - message: errorMessage, - stack: errorStack, - }, - // TODO: include error codes when we define them - }, - }); - } catch (err) { - if ( - WorkflowAPIError.is(err) && - (err.status === 409 || err.status === 410) - ) { - runtimeLogger.warn( - 'Tried failing workflow run, but run has already finished.', - { - workflowRunId: runId, - message: err.message, - } - ); - span?.setAttributes({ - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(errorMessage), - ...Attribute.ErrorType(errorName), - }); - return; - } else { - throw err; - } - } + // Resolve the encryption key for this run's deployment + const rawKey = + await world.getEncryptionKeyForRun?.(workflowRun); + const encryptionKey = rawKey + ? await importKey(rawKey) + : undefined; + // --- User code execution --- + // Only errors from runWorkflow() (user workflow code) should + // produce run_failed. Infrastructure errors (network, server) + // must propagate to the queue handler for automatic retry. 
+ let workflowResult: unknown; + try { + workflowResult = await trace( + 'workflow.replay', + {}, + async (replaySpan) => { + replaySpan?.setAttributes({ + ...Attribute.WorkflowEventsCount(events.length), + }); + return await runWorkflow( + workflowCode, + workflowRun, + events, + encryptionKey + ); + } + ); + } catch (err) { + // WorkflowSuspension is normal control flow — not an error + if (WorkflowSuspension.is(err)) { + const suspensionMessage = buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + err.waitCount + ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } + + const result = await handleSuspension({ + suspension: err, + world, + run: workflowRun, + span, + }); + + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + + // Suspension handled, no further work needed + return; + } + + // This is a user code error or a WorkflowRuntimeError + // (e.g., corrupted event log). Fail the workflow run. 
+ + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } + + const normalizedError = await normalizeUnknownError(err); + const errorName = normalizedError.name || getErrorName(err); + const errorMessage = normalizedError.message; + let errorStack = normalizedError.stack || getErrorStack(err); + + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); + + // Fail the workflow run via event (event-sourced architecture) + try { + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them + }, + }); + } catch (failErr) { + if ( + WorkflowAPIError.is(failErr) && + (failErr.status === 409 || failErr.status === 410) + ) { + runtimeLogger.warn( + 'Tried failing workflow run, but run has already finished.', + { + workflowRunId: runId, + message: failErr.message, + } + ); span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), ...Attribute.WorkflowErrorName(errorName), ...Attribute.WorkflowErrorMessage(errorMessage), ...Attribute.ErrorType(errorName), }); + return; + } else { + throw failErr; } } - }); // End withThrottleRetry + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(errorMessage), + ...Attribute.ErrorType(errorName), + }); + return; + } + + // --- Infrastructure: complete the run --- + // This is outside the user-code try/catch so that failures + // here (e.g., network errors) propagate 
to the queue handler. + try { + await world.events.create(runId, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + output: workflowResult, + }, + }); + } catch (err) { + if ( + WorkflowAPIError.is(err) && + (err.status === 409 || err.status === 410) + ) { + runtimeLogger.warn( + 'Tried completing workflow run, but run has already finished.', + { + workflowRunId: runId, + message: err.message, + } + ); + return; + } else { + throw err; + } + } + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), + }); } ); // End trace } diff --git a/packages/core/src/runtime/helpers.test.ts b/packages/core/src/runtime/helpers.test.ts index 3a69dc54e1..3844a84c2f 100644 --- a/packages/core/src/runtime/helpers.test.ts +++ b/packages/core/src/runtime/helpers.test.ts @@ -1,10 +1,5 @@ -import { WorkflowAPIError } from '@workflow/errors'; -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - getWorkflowQueueName, - withServerErrorRetry, - withThrottleRetry, -} from './helpers.js'; +import { describe, expect, it, vi } from 'vitest'; +import { getWorkflowQueueName } from './helpers.js'; // Mock the logger to suppress output during tests vi.mock('../logger.js', () => ({ @@ -85,227 +80,3 @@ describe('getWorkflowQueueName', () => { expect(() => getWorkflowQueueName('')).toThrow('Invalid workflow name'); }); }); - -describe('withServerErrorRetry', () => { - beforeEach(() => { - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.clearAllMocks(); - }); - - it('should return the result on success', async () => { - const fn = vi.fn().mockResolvedValue('ok'); - const result = await withServerErrorRetry(fn); - expect(result).toBe('ok'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should retry on 5xx WorkflowAPIError and succeed', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Internal 
Server Error', { status: 500 }) - ) - .mockResolvedValueOnce('recovered'); - - const promise = withServerErrorRetry(fn); - await vi.advanceTimersByTimeAsync(500); - const result = await promise; - - expect(result).toBe('recovered'); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should retry up to 3 times with exponential backoff', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce(new WorkflowAPIError('error', { status: 502 })) - .mockRejectedValueOnce(new WorkflowAPIError('error', { status: 503 })) - .mockRejectedValueOnce(new WorkflowAPIError('error', { status: 500 })) - .mockResolvedValueOnce('finally'); - - const promise = withServerErrorRetry(fn); - - // First retry after 500ms - await vi.advanceTimersByTimeAsync(500); - // Second retry after 1000ms - await vi.advanceTimersByTimeAsync(1000); - // Third retry after 2000ms - await vi.advanceTimersByTimeAsync(2000); - - const result = await promise; - expect(result).toBe('finally'); - expect(fn).toHaveBeenCalledTimes(4); - }); - - it('should throw after exhausting all retries', async () => { - const error = new WorkflowAPIError('server down', { status: 500 }); - const fn = vi.fn().mockRejectedValue(error); - - const promise = withServerErrorRetry(fn).catch((e) => e); - - // Advance through all 3 retry delays - await vi.advanceTimersByTimeAsync(500); - await vi.advanceTimersByTimeAsync(1000); - await vi.advanceTimersByTimeAsync(2000); - - const result = await promise; - expect(result).toBeInstanceOf(WorkflowAPIError); - expect(result.message).toBe('server down'); - // 1 initial + 3 retries = 4 total calls - expect(fn).toHaveBeenCalledTimes(4); - }); - - it('should not retry non-5xx WorkflowAPIErrors', async () => { - const error = new WorkflowAPIError('Not Found', { status: 404 }); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withServerErrorRetry(fn)).rejects.toThrow('Not Found'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should not retry non-WorkflowAPIError 
errors', async () => { - const error = new Error('some other error'); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withServerErrorRetry(fn)).rejects.toThrow('some other error'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should not retry 429 errors (handled by withThrottleRetry)', async () => { - const error = new WorkflowAPIError('Too Many Requests', { - status: 429, - retryAfter: 5, - }); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withServerErrorRetry(fn)).rejects.toThrow('Too Many Requests'); - expect(fn).toHaveBeenCalledTimes(1); - }); -}); - -describe('withThrottleRetry', () => { - beforeEach(() => { - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.clearAllMocks(); - }); - - it('should pass through the result on success', async () => { - const fn = vi.fn().mockResolvedValue(undefined); - const result = await withThrottleRetry(fn); - expect(result).toBeUndefined(); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should pass through { timeoutSeconds } returned by fn', async () => { - const fn = vi.fn().mockResolvedValue({ timeoutSeconds: 42 }); - const result = await withThrottleRetry(fn); - expect(result).toEqual({ timeoutSeconds: 42 }); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should re-throw non-429 errors including 5xx', async () => { - const error = new WorkflowAPIError('Internal Server Error', { - status: 500, - }); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withThrottleRetry(fn)).rejects.toThrow( - 'Internal Server Error' - ); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should re-throw non-WorkflowAPIError errors', async () => { - const error = new Error('random failure'); - const fn = vi.fn().mockRejectedValue(error); - - await expect(withThrottleRetry(fn)).rejects.toThrow('random failure'); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should wait in-process and retry once for short retryAfter (<10s)', async () => { - const 
fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 5 }) - ) - .mockResolvedValueOnce(undefined); - - const promise = withThrottleRetry(fn); - // Advance past the 5s wait - await vi.advanceTimersByTimeAsync(5000); - const result = await promise; - - expect(result).toBeUndefined(); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should defer to queue when both attempts are throttled (double 429)', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 3 }) - ) - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled again', { status: 429, retryAfter: 7 }) - ); - - const promise = withThrottleRetry(fn); - // Advance past the 3s in-process wait - await vi.advanceTimersByTimeAsync(3000); - const result = await promise; - - expect(result).toEqual({ timeoutSeconds: 7 }); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should re-throw non-429 error on retry failure', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 2 }) - ) - .mockRejectedValueOnce(new Error('connection lost')); - - // Capture the rejection early to prevent unhandled rejection warning - const promise = withThrottleRetry(fn).catch((e) => e); - await vi.advanceTimersByTimeAsync(2000); - - const result = await promise; - expect(result).toBeInstanceOf(Error); - expect(result.message).toBe('connection lost'); - expect(fn).toHaveBeenCalledTimes(2); - }); - - it('should defer to queue immediately for long retryAfter (>=10s)', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce( - new WorkflowAPIError('Throttled', { status: 429, retryAfter: 15 }) - ); - - const result = await withThrottleRetry(fn); - - expect(result).toEqual({ timeoutSeconds: 15 }); - expect(fn).toHaveBeenCalledTimes(1); - }); - - it('should default to 30s (defer to queue) when no retryAfter is provided', async () => { - 
const error = new WorkflowAPIError('Throttled', { status: 429 }); - // retryAfter is undefined, so it defaults to 30 (>=10 → defer) - const fn = vi.fn().mockRejectedValue(error); - - const result = await withThrottleRetry(fn); - - expect(result).toEqual({ timeoutSeconds: 30 }); - expect(fn).toHaveBeenCalledTimes(1); - }); -}); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 47abd74bb1..941142e943 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -1,4 +1,3 @@ -import { WorkflowAPIError } from '@workflow/errors'; import type { Event, HealthCheckPayload, @@ -7,7 +6,7 @@ import type { } from '@workflow/world'; import { HealthCheckPayloadSchema } from '@workflow/world'; import { monotonicFactory } from 'ulid'; -import { runtimeLogger } from '../logger.js'; + import * as Attribute from '../telemetry/semantic-conventions.js'; import { getSpanKind, trace } from '../telemetry.js'; import { getWorld } from './world.js'; @@ -403,110 +402,3 @@ export function getQueueOverhead(message: { requestedAt?: Date }) { return; } } - -/** - * Wraps a queue handler with HTTP 429 throttle retry logic. - * - retryAfter < 10s: waits in-process via setTimeout, then retries once - * - retryAfter >= 10s: returns { timeoutSeconds } to defer to the queue - * - * Safe to retry the entire handler because 429 is sent from server middleware - * before the request is processed — no server state has changed. - */ -// biome-ignore lint/suspicious/noConfusingVoidType: matches Queue handler return type -export async function withThrottleRetry( - fn: () => Promise -): Promise { - try { - return await fn(); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 429) { - const retryAfterSeconds = Math.max( - // If we don't have a retry-after value, 30s seems a reasonable default - // to avoid re-trying during the unknown rate-limiting period. - 1, - typeof err.retryAfter === 'number' ? 
err.retryAfter : 30 - ); - - if (retryAfterSeconds < 10) { - runtimeLogger.warn( - 'Throttled by workflow-server (429), retrying in-process', - { - retryAfterSeconds, - url: err.url, - } - ); - // Short wait: sleep in-process, then retry once - await new Promise((resolve) => - setTimeout(resolve, retryAfterSeconds * 1000) - ); - try { - return await fn(); - } catch (retryErr) { - // If the retry also gets throttled, defer to queue - if (WorkflowAPIError.is(retryErr) && retryErr.status === 429) { - const retryRetryAfter = Math.max( - 1, - typeof retryErr.retryAfter === 'number' ? retryErr.retryAfter : 1 - ); - runtimeLogger.warn('Throttled again on retry, deferring to queue', { - retryAfterSeconds: retryRetryAfter, - }); - return { timeoutSeconds: retryRetryAfter }; - } - throw retryErr; - } - } - - // Long wait: defer to queue infrastructure - runtimeLogger.warn( - 'Throttled by workflow-server (429), deferring to queue', - { - retryAfterSeconds, - url: err.url, - } - ); - return { timeoutSeconds: retryAfterSeconds }; - } - throw err; - } -} - -/** - * Retries a function when it throws a 5xx WorkflowAPIError. - * Used to handle transient workflow-server errors without consuming step attempts. - * - * Retries up to 3 times with exponential backoff (500ms, 1s, 2s ≈ 3.5s total). - * If all retries fail, the original error is re-thrown. 
- */ -export async function withServerErrorRetry( - fn: () => Promise -): Promise { - const delays = [500, 1000, 2000]; - for (let attempt = 0; attempt <= delays.length; attempt++) { - try { - return await fn(); - } catch (err) { - if ( - WorkflowAPIError.is(err) && - err.status !== undefined && - err.status >= 500 && - attempt < delays.length - ) { - runtimeLogger.warn( - 'Server error (5xx) from workflow-server, retrying in-process', - { - status: err.status, - attempt: attempt + 1, - maxRetries: delays.length, - nextDelayMs: delays[attempt], - url: err.url, - } - ); - await new Promise((resolve) => setTimeout(resolve, delays[attempt])); - continue; - } - throw err; - } - } - throw new Error('withServerErrorRetry: unreachable'); -} diff --git a/packages/core/src/runtime/step-handler.test.ts b/packages/core/src/runtime/step-handler.test.ts index bd0c1033c8..3179903e94 100644 --- a/packages/core/src/runtime/step-handler.test.ts +++ b/packages/core/src/runtime/step-handler.test.ts @@ -1,5 +1,5 @@ import { FatalError, WorkflowAPIError } from '@workflow/errors'; -import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; // Use vi.hoisted so these are available in mock factories const { @@ -96,7 +96,6 @@ vi.mock('./helpers.js', async () => { withHealthCheck: (handler: unknown) => handler, parseHealthCheckPayload: vi.fn().mockReturnValue(null), handleHealthCheckMessage: vi.fn(), - withServerErrorRetry: async (fn: () => Promise) => fn(), }; }); @@ -145,9 +144,9 @@ vi.mock('@workflow/utils/get-port', () => ({ import './step-handler.js'; import { getStepFunction } from '../private.js'; import { - normalizeUnknownError, getErrorName, getErrorStack, + normalizeUnknownError, } from '../types.js'; import { getWorld } from './world.js'; diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 2938077641..313ab4b828 100644 --- 
a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -36,7 +36,6 @@ import { parseHealthCheckPayload, queueMessage, withHealthCheck, - withServerErrorRetry, } from './helpers.js'; import { getWorld, getWorldHandlers } from './world.js'; @@ -119,13 +118,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( // - Workflow still active (returns 410 if completed) let step; try { - const startResult = await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - }) - ); + const startResult = await world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + }); if (!startResult.step) { throw new WorkflowRuntimeError( @@ -135,7 +132,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( step = startResult.step; } catch (err) { if (WorkflowAPIError.is(err)) { - if (WorkflowAPIError.is(err) && err.status === 429) { + if (err.status === 429) { const retryRetryAfter = Math.max( 1, typeof err.retryAfter === 'number' ? err.retryAfter : 1 @@ -291,53 +288,89 @@ const stepHandler = getWorldHandlers().createQueueHandler( return; } - try { - // step_started already validated the step is in valid state (pending/running) - // and returned the updated step entity with incremented attempt + // --- Infrastructure: prepare step input --- + // Network/server errors propagate to the queue handler for retry. + // WorkflowRuntimeError (data integrity issues) are fatal — retrying + // won't fix them, so we re-queue the workflow to surface the error. 
+ // step_started already validated the step is in valid state (pending/running) + // and returned the updated step entity with incremented attempt - // step.attempt is now the current attempt number (after increment) - const attempt = step.attempt; + // step.attempt is now the current attempt number (after increment) + const attempt = step.attempt; - if (!step.startedAt) { - throw new WorkflowRuntimeError( - `Step "${stepId}" has no "startedAt" timestamp` + if (!step.startedAt) { + const errorMessage = `Step "${stepId}" has no "startedAt" timestamp`; + runtimeLogger.error('Fatal runtime error during step setup', { + workflowRunId, + stepId, + error: errorMessage, + }); + try { + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: new Error(errorMessage).stack ?? '', + }, + }); + } catch (failErr) { + if (WorkflowAPIError.is(failErr) && failErr.status === 409) { + return; + } + throw failErr; + } + // Re-queue the workflow so it can process the step failure + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + return; + } + // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) + const stepStartedAt = step.startedAt; + + // Hydrate the step input arguments, closure variables, and thisVal + // NOTE: This captures only the synchronous portion of hydration. Any async + // operations (e.g., stream loading) are added to `ops` and executed later + // via Promise.all(ops) - their timing is not included in this measurement. + const ops: Promise[] = []; + const rawKey = await world.getEncryptionKeyForRun?.(workflowRunId); + const encryptionKey = rawKey ? 
await importKey(rawKey) : undefined; + const hydratedInput = await trace( + 'step.hydrate', + {}, + async (hydrateSpan) => { + const startTime = Date.now(); + const result = await hydrateStepArguments( + step.input, + workflowRunId, + encryptionKey, + ops ); + const durationMs = Date.now() - startTime; + hydrateSpan?.setAttributes({ + ...Attribute.StepArgumentsCount(result.args.length), + ...Attribute.QueueDeserializeTimeMs(durationMs), + }); + return result; } - // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) - const stepStartedAt = step.startedAt; - - // Hydrate the step input arguments, closure variables, and thisVal - // NOTE: This captures only the synchronous portion of hydration. Any async - // operations (e.g., stream loading) are added to `ops` and executed later - // via Promise.all(ops) - their timing is not included in this measurement. - const ops: Promise[] = []; - const rawKey = await world.getEncryptionKeyForRun?.(workflowRunId); - const encryptionKey = rawKey ? await importKey(rawKey) : undefined; - const hydratedInput = await trace( - 'step.hydrate', - {}, - async (hydrateSpan) => { - const startTime = Date.now(); - const result = await hydrateStepArguments( - step.input, - workflowRunId, - encryptionKey, - ops - ); - const durationMs = Date.now() - startTime; - hydrateSpan?.setAttributes({ - ...Attribute.StepArgumentsCount(result.args.length), - ...Attribute.QueueDeserializeTimeMs(durationMs), - }); - return result; - } - ); + ); + + const args = hydratedInput.args; + const thisVal = hydratedInput.thisVal ?? null; - const args = hydratedInput.args; - const thisVal = hydratedInput.thisVal ?? null; + // --- User code execution --- + // Only errors from stepFn.apply() (user step code) should produce + // step_failed/step_retrying. Infrastructure errors (network, server) + // must propagate to the queue handler for automatic retry. 
+ let userCodeError: unknown; + let userCodeFailed = false; - // Execute the step function with tracing - const executionStartTime = Date.now(); + const executionStartTime = Date.now(); + try { result = await trace('step.execute', {}, async () => { return await contextStorage.run( { @@ -364,93 +397,41 @@ const stepHandler = getWorldHandlers().createQueueHandler( () => stepFn.apply(thisVal, args) ); }); - const executionTimeMs = Date.now() - executionStartTime; + } catch (err) { + userCodeError = err; + userCodeFailed = true; + } + const executionTimeMs = Date.now() - executionStartTime; - span?.setAttributes({ - ...Attribute.QueueExecutionTimeMs(executionTimeMs), - }); + span?.setAttributes({ + ...Attribute.QueueExecutionTimeMs(executionTimeMs), + }); - // NOTE: None of the code from this point is guaranteed to run - // Since the step might fail or cause a function timeout and the process might be SIGKILL'd - // The workflow runtime must be resilient to the below code not executing on a failed step - result = await trace( - 'step.dehydrate', - {}, - async (dehydrateSpan) => { - const startTime = Date.now(); - const dehydrated = await dehydrateStepReturnValue( - result, - workflowRunId, - encryptionKey, - ops + // --- Handle user code errors --- + if (userCodeFailed) { + const err = userCodeError; + + // Infrastructure errors that somehow surfaced through user code + // should propagate to the queue handler for retry, not consume + // step attempts. 
+ if (WorkflowAPIError.is(err)) { + if (err.status === 410) { + // Workflow has already completed, so no-op + stepLogger.info( + 'Workflow run already completed, skipping step', + { + workflowRunId, + stepId, + message: err.message, + } ); - const durationMs = Date.now() - startTime; - dehydrateSpan?.setAttributes({ - ...Attribute.QueueSerializeTimeMs(durationMs), - ...Attribute.StepResultType(typeof dehydrated), - }); - return dehydrated; + return; } - ); - - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); - - // Run step_completed and trace serialization concurrently; - // the trace carrier is used in the final queueMessage call below - let stepCompleted409 = false; - const [, traceCarrier] = await Promise.all([ - withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_completed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - result: result as Uint8Array, - }, - }) - ).catch((err) => { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.warn( - 'Tried completing step, but step has already finished.', - { - workflowRunId, - stepId, - stepName, - message: err.message, - } - ); - stepCompleted409 = true; - return; - } + if (err.status !== undefined && err.status >= 500) { throw err; - }), - serializeTraceCarrier(), - ]); - - if (stepCompleted409) { - return; + } } - span?.setAttributes({ - ...Attribute.StepStatus('completed'), - ...Attribute.StepResultType(typeof result), - }); - - // Queue the workflow continuation with the concurrently-resolved trace carrier - await queueMessage(world, getWorkflowQueueName(workflowName), { - runId: workflowRunId, - traceCarrier, - requestedAt: new Date(), - }); - return; - } catch (err: unknown) { const normalizedError = await 
normalizeUnknownError(err); const normalizedStack = normalizedError.stack || getErrorStack(err) || ''; @@ -477,44 +458,6 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.ErrorRetryable(!isFatal), }); - if (WorkflowAPIError.is(err)) { - if (err.status === 410) { - // Workflow has already completed, so no-op - stepLogger.info( - 'Workflow run already completed, skipping step', - { - workflowRunId, - stepId, - message: err.message, - } - ); - return; - } - - // Server errors (5xx) from workflow-server are treated as persistent - // infrastructure issues. The withServerErrorRetry wrapper already - // retried the call a few times; if we still have a 5xx here it's - // likely persistent. Re-throw so the queue can retry the job and - // re-invoke this handler. Note: by the time we reach this point, - // step_started has already run and incremented step.attempt, and a - // subsequent queue retry may increment attempts again depending on - // storage semantics, so these retries are not guaranteed to be - // "free" with respect to step attempts. 
- if (err.status !== undefined && err.status >= 500) { - runtimeLogger.warn( - 'Persistent server error (5xx) during step, deferring to queue retry', - { - status: err.status, - workflowRunId, - stepId, - error: err.message, - url: err.url, - } - ); - throw err; - } - } - if (isFatal) { stepLogger.error( 'Encountered FatalError while executing step, bubbling up to parent workflow', @@ -526,17 +469,15 @@ const stepHandler = getWorldHandlers().createQueueHandler( ); // Fail the step via event (event-sourced architecture) try { - await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: normalizedError.message, - stack: normalizedStack, - }, - }) - ); + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: normalizedError.message, + stack: normalizedStack, + }, + }); } catch (stepFailErr) { if ( WorkflowAPIError.is(stepFailErr) && @@ -587,17 +528,15 @@ const stepHandler = getWorldHandlers().createQueueHandler( const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${normalizedError.message}`; // Fail the step via event (event-sourced architecture) try { - await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: errorMessage, - stack: normalizedStack, - }, - }) - ); + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: normalizedStack, + }, + }); } catch (stepFailErr) { if ( WorkflowAPIError.is(stepFailErr) && @@ -644,20 +583,18 @@ const stepHandler = getWorldHandlers().createQueueHandler( // Set step to pending for retry via event 
(event-sourced architecture) // step_retrying records the error and sets status to pending try { - await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_retrying', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: normalizedError.message, - stack: normalizedStack, - ...(RetryableError.is(err) && { - retryAfter: err.retryAfter, - }), - }, - }) - ); + await world.events.create(workflowRunId, { + eventType: 'step_retrying', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: normalizedError.message, + stack: normalizedStack, + ...(RetryableError.is(err) && { + retryAfter: err.retryAfter, + }), + }, + }); } catch (stepRetryErr) { if ( WorkflowAPIError.is(stepRetryErr) && @@ -701,11 +638,93 @@ const stepHandler = getWorldHandlers().createQueueHandler( return { timeoutSeconds }; } } + + // Re-invoke the workflow to handle the failed/retrying step + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + return; + } + + // --- Infrastructure: complete the step --- + // Errors here (network failures, server errors) propagate to the + // queue handler for automatic retry. 
+ + // NOTE: None of the code from this point is guaranteed to run + // Since the step might fail or cause a function timeout and the process might be SIGKILL'd + // The workflow runtime must be resilient to the below code not executing on a failed step + result = await trace('step.dehydrate', {}, async (dehydrateSpan) => { + const startTime = Date.now(); + const dehydrated = await dehydrateStepReturnValue( + result, + workflowRunId, + encryptionKey, + ops + ); + const durationMs = Date.now() - startTime; + dehydrateSpan?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(durationMs), + ...Attribute.StepResultType(typeof dehydrated), + }); + return dehydrated; + }); + + waitUntil( + Promise.all(ops).catch((err) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + err?.name === 'AbortError' || err?.name === 'ResponseAborted'; + if (!isAbortError) throw err; + }) + ); + + // Run step_completed and trace serialization concurrently; + // the trace carrier is used in the final queueMessage call below + let stepCompleted409 = false; + const [, traceCarrier] = await Promise.all([ + world.events + .create(workflowRunId, { + eventType: 'step_completed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + result: result as Uint8Array, + }, + }) + .catch((err: unknown) => { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.warn( + 'Tried completing step, but step has already finished.', + { + workflowRunId, + stepId, + stepName, + message: err.message, + } + ); + stepCompleted409 = true; + return; + } + throw err; + }), + serializeTraceCarrier(), + ]); + + if (stepCompleted409) { + return; } + span?.setAttributes({ + ...Attribute.StepStatus('completed'), + ...Attribute.StepResultType(typeof result), + }); + + // Queue the workflow continuation with the concurrently-resolved trace carrier await queueMessage(world, getWorkflowQueueName(workflowName), { runId: 
workflowRunId, - traceCarrier: await serializeTraceCarrier(), + traceCarrier, requestedAt: new Date(), }); } diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 19541f619c..e1d1fc9b32 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -1016,139 +1016,6 @@ export class ChainableService { // E2E test for `this` serialization with .call() and .apply() ////////////////////////////////////////////////////////// -// ============================================================ -// 5XX SERVER ERROR RETRY E2E TEST -// ============================================================ -// Tests that withServerErrorRetry in the step handler correctly -// retries transient 5xx errors from workflow-server without -// consuming step attempts. -// ============================================================ - -const FAULT_MAP_SYMBOL = Symbol.for('__test_5xx_fault_map'); -const FAULT_WRAPPER_INSTALLED_SYMBOL = Symbol.for( - '__test_5xx_fault_wrapper_installed' -); - -type FaultState = { - installStepId: string; - targetStepId?: string; - remaining: number; - triggered: number; -}; - -function shouldInjectStepCompletedFault(state: FaultState, data: any): boolean { - if (data?.eventType !== 'step_completed') return false; - - const correlationId = - typeof data?.correlationId === 'string' ? data.correlationId : null; - if (!correlationId) return false; - - // Never inject on the install step itself. This avoids flakiness when - // workflow-server transiently retries install step_completed. - if (correlationId === state.installStepId) return false; - - // Target exactly one non-install step (the first one encountered). - // Retries of that same step share correlationId and are intercepted. 
- state.targetStepId ??= correlationId; - - if (correlationId !== state.targetStepId) return false; - if (state.remaining <= 0) return false; - - state.remaining--; - state.triggered++; - return true; -} - -/** - * Step that installs a fault injection patch on world.events.create, - * scoped to the current runId. Only intercepts step_completed events - * for the target run; all other runs pass through unmodified. - */ -async function installServerErrorFaultInjection(failCount: number) { - 'use step'; - const { workflowRunId } = getWorkflowMetadata(); - const { stepId: installStepId } = getStepMetadata(); - const world = (globalThis as any)[Symbol.for('@workflow/world//cache')]; - - // Process-level map for run-scoped fault state - (globalThis as any)[FAULT_MAP_SYMBOL] ??= new Map(); - const faultMap = (globalThis as any)[FAULT_MAP_SYMBOL] as Map< - string, - FaultState - >; - - faultMap.set(workflowRunId, { - installStepId, - remaining: failCount, - triggered: 0, - }); - - // Install the wrapper once per process to avoid nested wrappers across runs. - if (!(world.events.create as any)[FAULT_WRAPPER_INSTALLED_SYMBOL]) { - const original = world.events.create.bind(world.events); - - const wrappedCreate = async ( - rid: string, - data: any, - ...rest: any[] - ): Promise => { - const state = faultMap.get(rid); - if (state && shouldInjectStepCompletedFault(state, data)) { - // Create an error that matches WorkflowAPIError.is() check: - // isError(value) && value.name === 'WorkflowAPIError' - const err: any = new Error('Injected 5xx'); - err.name = 'WorkflowAPIError'; - err.status = 500; - throw err; - } - - return original(rid, data, ...rest); - }; - - (wrappedCreate as any)[FAULT_WRAPPER_INSTALLED_SYMBOL] = true; - world.events.create = wrappedCreate; - } -} - -/** - * Simple step that does computation (input * 2). - * Its step_completed event will be intercepted by the fault injection. 
- */ -async function doWork(input: number) { - 'use step'; - return input * 2; -} - -/** - * Cleanup step that reads and clears the fault injection state for this run. - * Returns how many times the fault was triggered. - */ -async function cleanupFaultInjection() { - 'use step'; - const { workflowRunId } = getWorkflowMetadata(); - const faultMap = (globalThis as any)[FAULT_MAP_SYMBOL] as - | Map - | undefined; - const state = faultMap?.get(workflowRunId); - const triggered = state?.triggered ?? 0; - faultMap?.delete(workflowRunId); - return triggered; -} - -/** - * Workflow that exercises the 5xx retry codepath in the step handler. - * 1. Installs fault injection (scoped to this run's step_completed events) - * 2. Runs a computation step (its step_completed will fail with 5xx twice) - * 3. Cleans up and returns result + retry count for assertions - */ -export async function serverError5xxRetryWorkflow(input: number) { - 'use workflow'; - await installServerErrorFaultInjection(2); - const result = await doWork(input); - const retryCount = await cleanupFaultInjection(); - return { result, retryCount }; -} - ////////////////////////////////////////////////////////// // E2E test for `this` serialization with .call() and .apply() //////////////////////////////////////////////////////////