Add retry semantics and error recovery for adaptive interruption detection #1149
   * @defaultValue undefined
   */
-  mode: 'adaptive' | 'vad' | false | undefined;
+  mode: 'adaptive' | 'vad' | undefined;
Remove false to stay aligned with the Python implementation.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f1141df03e
if (numRetries > 0 && this.isAgentSpeaking) {
  await stream.pushFrame(InterruptionStreamSentinel.agentSpeechStarted());
}
Rehydrate overlap state when recreating interruption stream
On retry, the code only re-injects agent-speech-started into the new interruption stream. If the transport failure happens during an active overlap, the replacement stream never receives overlap-speech-started, so its internal overlapSpeechStarted flag stays false and incoming audio frames are not sent for adaptive inference until a brand-new overlap begins. This causes the current interruption to be missed after a transient error, which undermines the intended error-recovery behavior.
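One way to picture the suggested fix is a small helper that decides which sentinels a replacement stream should receive. This is only a sketch: `Sentinel`, `ReplayState`, `isOverlapActive`, and `sentinelsToReplay` are hypothetical names, not part of agents-js, and it assumes the caller already tracks whether an overlap is in progress.

```typescript
// Hypothetical sketch; none of these names come from the agents-js codebase.
type Sentinel = "agent-speech-started" | "overlap-speech-started";

interface ReplayState {
  isAgentSpeaking: boolean;
  // Assumption: the caller tracks whether user speech currently overlaps agent speech.
  isOverlapActive: boolean;
}

// Returns the sentinels to push into a freshly created stream, in the order
// they originally occurred. The first attempt needs no replay.
function sentinelsToReplay(numRetries: number, state: ReplayState): Sentinel[] {
  if (numRetries === 0 || !state.isAgentSpeaking) {
    return [];
  }
  const replay: Sentinel[] = ["agent-speech-started"];
  if (state.isOverlapActive) {
    // Without this, the new stream would wait for a brand-new overlap
    // before forwarding audio for adaptive inference.
    replay.push("overlap-speech-started");
  }
  return replay;
}
```

With a helper like this, the retry path would push every returned sentinel into the new stream before forwarding audio, instead of re-injecting only the agent-speech one.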
Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Apart from my other comment about catching the abort before the async push, this looks good to me.
I'm generally not a fan of the nested while loop, since it routes control flow through multiple break levels, but we can address that afterwards.
…ttps://github.com/livekit/agents-js into brian/unify-interruption-transport-error-handling
outputController?.error(
  new APIConnectionError({ message: `WebSocket error: ${err.message}` }),
);
🔴 Multiple controller.error() calls in ws_transport can crash due to WHATWG Streams assertion
The PR changes the socket.on('error') handler (line 141) and transform catch block (line 381) from logging errors to calling controller.error() / outputController.error(). These join the existing MSG_ERROR handler (ws_transport.ts:277) and timeout check (ws_transport.ts:368) that already call controller.error(). If any two of these fire in sequence (e.g., sendAudioData throws APIConnectionError because ws.readyState !== WebSocket.OPEN, the transform catch calls controller.error(), and then the pending socket.on('error') event fires and calls outputController.error()), the second call attempts to error an already-errored stream. In Node.js's WHATWG Streams implementation, ReadableStreamError asserts stream.state === 'readable', so the second call throws an AssertionError that is unhandled in the event handler, crashing the process.
The old code avoided this by only logging in socket.on('error') and transform catch, so only MSG_ERROR and the timeout check could call controller.error().
Prompt for agents
In agents/src/inference/interruption/ws_transport.ts, guard all calls to outputController.error() and controller.error() against the stream already being in an errored state. One approach: introduce a boolean flag (e.g. let streamErrored = false) scoped alongside outputController at line 123. Before each controller.error() call (lines 141-143, 277-280, 368-370, 381), check if streamErrored is false, and after a successful controller.error() call, set streamErrored = true. This prevents the second controller.error() call from triggering an assertion failure in Node.js's WHATWG Streams implementation. Affected locations:
- Line 141-143: socket.on('error') handler in setupMessageHandler
- Line 277-280: MSG_ERROR case in handleMessage
- Line 368-370: timeout check in transform
- Line 381: catch block in transform
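The flag-based guard described above could look roughly like the following. This is a minimal sketch under stated assumptions: `ErrorableController` and `createErrorOnce` are hypothetical names, and the controller is modelled as any object with an `error()` method rather than the real stream controller used in ws_transport.ts.

```typescript
// Hypothetical sketch of an "error at most once" guard; names are illustrative.
interface ErrorableController {
  error(reason?: unknown): void;
}

// Wraps access to a controller so that only the first error is forwarded.
// Returns a function that reports whether the error was actually delivered.
function createErrorOnce(
  getController: () => ErrorableController | undefined,
): (reason: unknown) => boolean {
  let streamErrored = false;
  return (reason: unknown): boolean => {
    if (streamErrored) {
      return false; // a previous handler already errored the stream
    }
    const controller = getController();
    if (!controller) {
      return false; // stream not set up yet, nothing to error
    }
    // Set the flag before calling error() so a throwing controller
    // cannot cause a second attempt from another handler.
    streamErrored = true;
    controller.error(reason);
    return true;
  };
}

// Usage with a fake controller that records how often error() is called.
const calls: unknown[] = [];
const fakeController: ErrorableController = {
  error: (reason) => {
    calls.push(reason);
  },
};
const errorOnce = createErrorOnce(() => fakeController);
const firstDelivered = errorOnce(new Error("transform failed"));
const secondDelivered = errorOnce(new Error("socket error"));
```

Each of the four call sites would then go through the same `errorOnce` function, so the order in which the socket handler, the message handler, the timeout check, and the catch block fire no longer matters.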
Summary
Ports Python's recoverable/unrecoverable retry semantics for adaptive interruption detection (agents#5142) to the JS SDK, and fixes a JS-specific bug where interruption detection becomes permanently dead after a transient transport error.
Changes
Transport error classification (http_transport.ts, ws_transport.ts)
- Use APIError subclasses (APIStatusError, APIConnectionError, APITimeoutError) instead of generic Error; this enables the retry loop to classify errors via isAPIError() and .retryable
- Disable ofetch's internal retry (retry: 0) so retry logic is centralized in the stream consumer
- Propagate errors through controller.error() instead of swallowing them with logger.error
- Drop the retry in ensureConnection() (retry is now handled at the stream level)
Retry loop in createInterruptionTask (audio_recognition.ts)
- A while loop that creates a fresh InterruptionStreamBase on each retry attempt, matching Python's _main_task pattern
- Retryable APIError → emit recoverable error + retry with exponential backoff; non-retryable or retries exhausted → emit unrecoverable error + break (triggers VAD fallback)
- Handle forwardTask rejections in finally to prevent unhandled rejections from breaking the retry loop
- Re-inject the agent-speech-started sentinel on retry: JS creates a new stream per retry (unlike Python, where self._agent_speech_started persists on the instance), so the new stream's agentSpeechStarted closure variable must be re-initialized
Type and logic alignment (interruption.ts, agent_activity.ts, report.ts)
- Remove false from the InterruptionOptions.mode union type to match Python's Literal["adaptive", "vad"]
- Update resolveInterruptionDetector conditions accordingly
- allowInterruptions computation in session report
Test plan
- Build (pnpm build --filter agents)
- onInterruptionError correctly routes recoverable errors (log only) vs unrecoverable errors (fallback to VAD)
- APIError subclasses through controller.error(), consumed identically by the retry loop
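
The recoverable/unrecoverable split in the retry loop can be sketched as follows. This is an illustration, not the code in audio_recognition.ts: `RetryableError`, `decide`, `runWithRetry`, the base delay of 100 ms, and the default of 3 retries are all assumptions made for the example.

```typescript
// Hypothetical sketch; class, function names, and defaults are illustrative only.
class RetryableError extends Error {
  retryable: boolean;
  constructor(message: string, retryable: boolean) {
    super(message);
    this.retryable = retryable;
  }
}

type Decision =
  | { kind: "retry"; recoverable: true; delayMs: number }
  | { kind: "give-up"; recoverable: false };

// Classifies a failure: retry with exponential backoff while the error is
// retryable and attempts remain, otherwise give up (the caller falls back).
function decide(err: unknown, attempt: number, maxRetries: number, baseDelayMs = 100): Decision {
  const retryable = err instanceof RetryableError && err.retryable;
  if (!retryable || attempt >= maxRetries) {
    return { kind: "give-up", recoverable: false };
  }
  return { kind: "retry", recoverable: true, delayMs: baseDelayMs * 2 ** attempt };
}

// Runs `runOnce` until it succeeds or the failure is unrecoverable.
// `runOnce` is expected to build a fresh stream on every attempt.
async function runWithRetry(
  runOnce: (attempt: number) => Promise<void>,
  onError: (err: unknown, recoverable: boolean) => void,
  maxRetries = 3,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<boolean> {
  for (let attempt = 0; ; attempt++) {
    try {
      await runOnce(attempt);
      return true;
    } catch (err) {
      const decision = decide(err, attempt, maxRetries);
      onError(err, decision.recoverable);
      if (decision.kind === "give-up") {
        return false; // caller would switch to the VAD fallback here
      }
      await sleep(decision.delayMs);
    }
  }
}
```

Keeping the classification in a pure function makes the backoff schedule easy to test in isolation, while the loop itself stays a single level deep.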