From df1bd03ec0184a112d552bc4f807f1a4f8f13fda Mon Sep 17 00:00:00 2001 From: Evgeny Shurakov Date: Thu, 2 Apr 2026 15:17:52 +0200 Subject: [PATCH 1/4] fix(cloud-agent-sdk): recover stale WebSocket connections on tab resume Add application-level ping/pong staleness detection for WebSocket connections that silently die when tabs are backgrounded. On tab resume, sends a ping and reconnects if no response within 5s. - base-connection: visibilitychange/pageshow/online handlers, ping timeout, proactive ticket refresh before reconnect - cloud-agent-transport/cli-live-transport: snapshot refetch on reconnect via onReconnected callback - session.ts: route completed sessions to historical transport - CloudAgentProvider: determine isLive from DO execution status --- .../cloud-agent-next/CloudAgentProvider.tsx | 29 +- .../cloud-agent-sdk/base-connection.test.ts | 501 ++++++++++++++++++ src/lib/cloud-agent-sdk/base-connection.ts | 154 +++++- .../cli-live-transport.test.ts | 175 ++++++ src/lib/cloud-agent-sdk/cli-live-transport.ts | 13 + .../cloud-agent-sdk/cloud-agent-connection.ts | 2 + .../cloud-agent-transport.test.ts | 160 ++++++ .../cloud-agent-sdk/cloud-agent-transport.ts | 13 + .../cloud-agent-sdk/session-routing.test.ts | 62 ++- src/lib/cloud-agent-sdk/session.ts | 6 +- 10 files changed, 1109 insertions(+), 6 deletions(-) create mode 100644 src/lib/cloud-agent-sdk/base-connection.test.ts diff --git a/src/components/cloud-agent-next/CloudAgentProvider.tsx b/src/components/cloud-agent-next/CloudAgentProvider.tsx index 38ea81721..266cd1ad6 100644 --- a/src/components/cloud-agent-next/CloudAgentProvider.tsx +++ b/src/components/cloud-agent-next/CloudAgentProvider.tsx @@ -40,10 +40,37 @@ export function CloudAgentProvider({ children, organizationId }: CloudAgentProvi try { const session = await trpcClient.cliSessionsV2.get.query({ session_id: kiloSessionId }); if (session.cloud_agent_session_id) { + let isLive = true; + try { + const withState = await trpcClient.cliSessionsV2.getWithRuntimeState.query({ + session_id: kiloSessionId, + }); + const rs = withState.runtimeState; + const executionStatus = rs?.execution?.status; + if ( + executionStatus === 'completed' || + executionStatus === 'failed' || + executionStatus === 'interrupted' + ) { + // Terminal execution status — session is done. + isLive = false; + } else if (executionStatus === 'pending' || executionStatus === 'running') { + // Active execution — session is live. + isLive = true; + } else { + // execution is null: either pre-execution (not yet initiated) or + // post-execution (DO cleaned up the execution record). + // If initiatedAt is set, the session ran and finished → not live. + // If initiatedAt is not set, the session is still being prepared → live. + isLive = !rs?.initiatedAt; + } + } catch { + // If we can't determine runtime state, assume live (safer — will just open a WebSocket) + } return { kiloSessionId, cloudAgentSessionId: session.cloud_agent_session_id as CloudAgentSessionId, - isLive: true, + isLive, }; } // CLI session — check if live diff --git a/src/lib/cloud-agent-sdk/base-connection.test.ts b/src/lib/cloud-agent-sdk/base-connection.test.ts new file mode 100644 index 000000000..b21e83dd7 --- /dev/null +++ b/src/lib/cloud-agent-sdk/base-connection.test.ts @@ -0,0 +1,501 @@ +import { + createBaseConnection, + PING_TIMEOUT_MS, + type BaseConnectionConfig, +} from './base-connection'; + +type MockWebSocket = { + url: string; + onopen: ((ev: Event) => void) | null; + onmessage: ((ev: MessageEvent) => void) | null; + onclose: ((ev: CloseEvent) => void) | null; + onerror: ((ev: Event) => void) | null; + close: jest.Mock; + send: jest.Mock; + readyState: number; +}; + +let sockets: MockWebSocket[]; +let webSocketMock: jest.Mock; + +// The source code guards browser APIs with `typeof document/window !== 'undefined'`. +// In Jest's node environment these don't exist, so we provide minimal mocks. +let mockDocument: EventTarget & { visibilityState: string }; +let mockWindow: EventTarget; + +let savedDocument: typeof globalThis.document; +let savedWindow: typeof globalThis.window; + +beforeEach(() => { + jest.useFakeTimers(); + sockets = []; + + webSocketMock = jest.fn((url: string) => { + const socket: MockWebSocket = { + url, + onopen: null, + onmessage: null, + onclose: null, + onerror: null, + close: jest.fn(), + send: jest.fn(), + readyState: 1, // WebSocket.OPEN + }; + sockets.push(socket); + return socket; + }); + + // @ts-expect-error -- test WebSocket mock + global.WebSocket = webSocketMock; + (global.WebSocket as unknown as Record).OPEN = 1; + (global.WebSocket as unknown as Record).CLOSED = 3; + + // Set up minimal document/window so base-connection registers event listeners + mockDocument = Object.assign(new EventTarget(), { visibilityState: 'visible' }); + mockWindow = new EventTarget(); + + savedDocument = globalThis.document; + savedWindow = globalThis.window; + // @ts-expect-error -- minimal mock for browser globals + globalThis.document = mockDocument; + // @ts-expect-error -- minimal mock for browser globals + globalThis.window = mockWindow; +}); + +afterEach(() => { + jest.useRealTimers(); + jest.restoreAllMocks(); + // @ts-expect-error -- cleanup test global + delete global.WebSocket; + + // Restore original document/window (undefined in node) + globalThis.document = savedDocument; + globalThis.window = savedWindow; +}); + +function createTestConnection(overrides: Partial = {}) { + const onConnected = jest.fn(); + const onReconnected = jest.fn(); + const onDisconnected = jest.fn(); + const onEvent = jest.fn(); + const onUnexpectedDisconnect = jest.fn(); + + const config: BaseConnectionConfig = { + buildUrl: () => 'ws://localhost:9999/test', + parseMessage: (data: unknown) => { + if (typeof data === 'string') { + return { type: 'event' as const, payload: data }; + } + return null; + }, + onEvent, + onConnected, + onDisconnected, + onReconnected, + onUnexpectedDisconnect, + ...overrides, + }; + + const connection = createBaseConnection(config); + return { + connection, + onConnected, + onReconnected, + onDisconnected, + onEvent, + onUnexpectedDisconnect, + }; +} + +function connectSocket(socketIndex = 0): void { + sockets[socketIndex].onmessage?.({ data: 'connected-msg' } as MessageEvent); +} + +function closeSocket(socketIndex: number, code = 1006): void { + sockets[socketIndex].onclose?.({ code, reason: '', wasClean: false } as CloseEvent); +} + +function simulateVisibilityChange(state: 'visible' | 'hidden'): void { + mockDocument.visibilityState = state; + mockDocument.dispatchEvent(new Event('visibilitychange')); +} + +function simulatePageshow(persisted: boolean): void { + const event = new Event('pageshow'); + Object.defineProperty(event, 'persisted', { value: persisted }); + mockWindow.dispatchEvent(event); +} + +describe('createBaseConnection – stale WebSocket recovery', () => { + describe('visibility change', () => { + it('reconnects with reset attempts when tab becomes visible and WS is dead', () => { + const { connection } = createTestConnection(); + connection.connect(); + connectSocket(0); + + // Mark socket as not open (simulating a dead connection) + sockets[0].readyState = 3; // WebSocket.CLOSED + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Should have created a new socket for reconnect + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('sends ping and reconnects if no message within timeout when tab becomes visible with open WS', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Socket is OPEN, so it should send a ping rather than immediately reconnecting + expect(sockets[0].send).toHaveBeenCalledWith('ping'); + expect(sockets).toHaveLength(1); + + // Advance past the ping timeout + jest.advanceTimersByTime(PING_TIMEOUT_MS); + + // Should have closed the stale socket and created a new one + expect(sockets[0].close).toHaveBeenCalled(); + expect(onDisconnected).toHaveBeenCalled(); + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('cancels ping timeout if a message arrives before deadline', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + expect(sockets[0].send).toHaveBeenCalledWith('ping'); + + // Receive a message before the timeout fires + sockets[0].onmessage?.({ data: 'server-reply' } as MessageEvent); + + // Advance past the ping timeout - should NOT trigger reconnect + jest.advanceTimersByTime(PING_TIMEOUT_MS); + + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + + it('clears ping timeout when tab is hidden', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + // Tab visible → sends ping + simulateVisibilityChange('visible'); + expect(sockets[0].send).toHaveBeenCalledWith('ping'); + + // Tab hidden → should clear the ping timeout + simulateVisibilityChange('hidden'); + + // Advance past the ping timeout - nothing should happen + jest.advanceTimersByTime(PING_TIMEOUT_MS); + + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + }); + + describe('BFCache (pageshow)', () => { + it('force-closes WS and reconnects on pageshow with persisted=true', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + simulatePageshow(true); + + expect(sockets[0].close).toHaveBeenCalled(); + expect(onDisconnected).toHaveBeenCalled(); + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('does nothing on pageshow with persisted=false', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + simulatePageshow(false); + + expect(sockets[0].close).not.toHaveBeenCalled(); + expect(onDisconnected).not.toHaveBeenCalled(); + expect(sockets).toHaveLength(1); + + connection.destroy(); + }); + }); + + describe('online event', () => { + it('resets attempts and reconnects when online fires while disconnected', () => { + const { connection } = createTestConnection(); + connection.connect(); + // Don't send a message, so `connected` stays false. + // Close the socket to simulate a disconnected state. + closeSocket(0); + + // Pending reconnect timer is scheduled. Clear it via online event. + const socketsBeforeOnline = sockets.length; + mockWindow.dispatchEvent(new Event('online')); + + // Should have created a new socket + expect(sockets.length).toBeGreaterThan(socketsBeforeOnline); + + connection.destroy(); + }); + + it('does nothing when online fires while already connected', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + mockWindow.dispatchEvent(new Event('online')); + + // Should not create a new socket + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + }); + + describe('reconnect attempts reset after exhaustion', () => { + function exhaustReconnectAttempts(startSocketIndex: number) { + jest.spyOn(Math, 'random').mockReturnValue(0); + + // Close 8 times without ever receiving a message to exhaust retries + for (let i = 0; i < 8; i++) { + const idx = startSocketIndex + i; + closeSocket(idx); + // Advance timers to trigger the next scheduled reconnect + jest.advanceTimersByTime(60_000); + } + } + + it('visibilitychange to visible resets counter and reconnects after exhausting retries', () => { + const { connection } = createTestConnection(); + connection.connect(); + + exhaustReconnectAttempts(0); + + // 1 initial + 8 reconnects = 9 sockets + const socketsAfterExhaustion = sockets.length; + + // Close the last socket to hit the max-attempts guard + closeSocket(sockets.length - 1); + jest.advanceTimersByTime(60_000); + + // No more sockets should be created (max attempts exceeded) + expect(sockets.length).toBe(socketsAfterExhaustion); + + // Now simulate tab becoming visible - should reset and reconnect + sockets[sockets.length - 1].readyState = 3; // WebSocket.CLOSED + simulateVisibilityChange('visible'); + + expect(sockets.length).toBe(socketsAfterExhaustion + 1); + + connection.destroy(); + }); + + it('online event resets counter and reconnects after exhausting retries', () => { + const { connection } = createTestConnection(); + connection.connect(); + + exhaustReconnectAttempts(0); + + const socketsAfterExhaustion = sockets.length; + + closeSocket(sockets.length - 1); + jest.advanceTimersByTime(60_000); + + expect(sockets.length).toBe(socketsAfterExhaustion); + + // online event should reset and reconnect + mockWindow.dispatchEvent(new Event('online')); + + expect(sockets.length).toBe(socketsAfterExhaustion + 1); + + connection.destroy(); + }); + }); + + describe('onReconnected vs onConnected', () => { + it('fires onConnected on first successful connection', () => { + const { connection, onConnected, onReconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + expect(onConnected).toHaveBeenCalledTimes(1); + expect(onReconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + + it('fires onReconnected on subsequent connections after disconnect', () => { + jest.spyOn(Math, 'random').mockReturnValue(0); + + const { connection, onConnected, onReconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + expect(onConnected).toHaveBeenCalledTimes(1); + + // Disconnect and let it reconnect + closeSocket(0); + jest.advanceTimersByTime(60_000); + + // Second socket is now open - send a message to mark it connected + connectSocket(1); + + expect(onConnected).toHaveBeenCalledTimes(1); + expect(onReconnected).toHaveBeenCalledTimes(1); + + connection.destroy(); + }); + }); + + describe('proactive auth refresh on reconnect', () => { + it('calls refreshAuth before reconnecting after ping timeout', async () => { + const refreshAuth = jest.fn(() => Promise.resolve()); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + connectSocket(0); + + // Simulate tab hidden then visible — triggers ping + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + expect(sockets[0].send).toHaveBeenCalledWith('ping'); + + // Advance past ping timeout + jest.advanceTimersByTime(PING_TIMEOUT_MS); + + // refreshAuth should be called to get a fresh ticket before reconnecting + expect(refreshAuth).toHaveBeenCalledTimes(1); + + // Allow the async refresh to complete + await Promise.resolve(); + await Promise.resolve(); + + // A new socket should be created after refresh + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('calls refreshAuth before reconnecting after BFCache restore', async () => { + const refreshAuth = jest.fn(() => Promise.resolve()); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + connectSocket(0); + + simulatePageshow(true); + + expect(refreshAuth).toHaveBeenCalledTimes(1); + + await Promise.resolve(); + await Promise.resolve(); + + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('calls refreshAuth before reconnecting after online event while disconnected', async () => { + const refreshAuth = jest.fn(() => Promise.resolve()); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + // Don't connect — simulate being disconnected + closeSocket(0); + + // Clear the reconnect timer that was scheduled by closeSocket + jest.advanceTimersByTime(0); + + refreshAuth.mockClear(); + mockWindow.dispatchEvent(new Event('online')); + + expect(refreshAuth).toHaveBeenCalledTimes(1); + + await Promise.resolve(); + await Promise.resolve(); + + // Should create a new socket after refresh + const socketsAfter = sockets.length; + expect(socketsAfter).toBeGreaterThan(1); + + connection.destroy(); + }); + + it('still reconnects if refreshAuth fails', async () => { + const refreshAuth = jest.fn(() => Promise.reject(new Error('refresh failed'))); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + connectSocket(0); + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + jest.advanceTimersByTime(PING_TIMEOUT_MS); + + expect(refreshAuth).toHaveBeenCalledTimes(1); + + await Promise.resolve(); + await Promise.resolve(); + + // Should still create a new socket even if refresh failed + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('does not call refreshAuth when no refreshAuth is configured', () => { + const { connection } = createTestConnection(); // no refreshAuth + connection.connect(); + connectSocket(0); + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + jest.advanceTimersByTime(PING_TIMEOUT_MS); + + // Should still create a new socket (direct connect, no refresh) + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + }); + + describe('listener lifecycle', () => { + it('destroy() removes visibilitychange, pageshow, and online listeners', () => { + const docRemoveSpy = jest.spyOn(mockDocument, 'removeEventListener'); + const winRemoveSpy = jest.spyOn(mockWindow, 'removeEventListener'); + + const { connection } = createTestConnection(); + connection.connect(); + + connection.destroy(); + + const docRemovedEvents = docRemoveSpy.mock.calls.map(call => call[0]); + const winRemovedEvents = winRemoveSpy.mock.calls.map(call => call[0]); + + expect(docRemovedEvents).toContain('visibilitychange'); + expect(winRemovedEvents).toContain('pageshow'); + expect(winRemovedEvents).toContain('online'); + }); + }); +}); diff --git a/src/lib/cloud-agent-sdk/base-connection.ts b/src/lib/cloud-agent-sdk/base-connection.ts index ef74bc536..4951c3f9b 100644 --- a/src/lib/cloud-agent-sdk/base-connection.ts +++ b/src/lib/cloud-agent-sdk/base-connection.ts @@ -6,6 +6,7 @@ export type BaseConnectionConfig = { onEvent: (payload: unknown) => void; onConnected: () => void; onDisconnected: () => void; + onReconnected?: () => void; onUnexpectedDisconnect?: () => void; onError?: (message: string) => void; isAuthFailure?: (event: CloseEvent) => boolean; @@ -22,6 +23,7 @@ export type Connection = { const MAX_RECONNECT_ATTEMPTS = 8; const BACKOFF_BASE_MS = 1000; const BACKOFF_CAP_MS = 30000; +export const PING_TIMEOUT_MS = 5000; // min(cap, base * 2^attempt) * (0.5 + random jitter) function calculateBackoffDelay(attempt: number): number { @@ -39,6 +41,13 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { let connected = false; let reconnectAttempt = 0; let generation = 0; + let hasConnectedOnce = false; + let pingTimeoutId: ReturnType | null = null; + + // Bound handler references for event listener cleanup + let boundVisibilityHandler: (() => void) | null = null; + let boundPageshowHandler: ((e: PageTransitionEvent) => void) | null = null; + let boundOnlineHandler: (() => void) | null = null; function clearReconnectTimer(): void { if (reconnectTimeoutId !== null) { @@ -47,6 +56,13 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { } } + function clearPingTimeout(): void { + if (pingTimeoutId !== null) { + clearTimeout(pingTimeoutId); + pingTimeoutId = null; + } + } + async function refreshAuthAndReconnect(expectedGeneration: number) { if (!config.refreshAuth) { return; @@ -67,6 +83,18 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { } } + async function refreshAndConnect(expectedGeneration: number): Promise { + if (config.refreshAuth) { + try { + await config.refreshAuth(); + } catch { + // Continue with existing auth — the old ticket might still work + } + if (destroyed || intentionalDisconnect || expectedGeneration !== generation) return; + } + connectInternal(0, expectedGeneration); + } + function scheduleReconnect(attempt: number, expectedGeneration: number) { if (destroyed || intentionalDisconnect || expectedGeneration !== generation) return; @@ -95,6 +123,7 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { if (destroyed || intentionalDisconnect || expectedGeneration !== generation) return; reconnectAttempt = attempt; + clearPingTimeout(); // Close existing socket - clear reference first so onclose ignores it const oldWs = ws; @@ -115,6 +144,9 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { }; newWs.onmessage = (messageEvent: MessageEvent) => { + // Any incoming message cancels an active ping staleness check + clearPingTimeout(); + const parsed = config.parseMessage(messageEvent.data); if (parsed === null) { return; @@ -131,7 +163,12 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { if (!connected) { connected = true; - config.onConnected(); + if (hasConnectedOnce) { + config.onReconnected?.(); + } else { + hasConnectedOnce = true; + config.onConnected(); + } } config.onEvent(parsed.payload); @@ -202,6 +239,115 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { }; } + function handleVisibilityChange(): void { + if (destroyed || intentionalDisconnect) return; + + if (typeof document === 'undefined') return; + + if (document.visibilityState === 'hidden') { + clearPingTimeout(); + return; + } + + // Tab became visible + reconnectAttempt = 0; + + if (ws === null || ws.readyState !== WebSocket.OPEN) { + clearReconnectTimer(); + void refreshAndConnect(generation); + return; + } + + // Socket appears open - send a ping to verify it's not stale + ws.send('ping'); + const currentGeneration = generation; + pingTimeoutId = setTimeout(() => { + pingTimeoutId = null; + if (destroyed || intentionalDisconnect || currentGeneration !== generation) return; + // No message received in time - connection is stale + console.log('[Connection] Ping timeout - connection stale, reconnecting'); + const staleWs = ws; + if (staleWs !== null) { + ws = null; + staleWs.close(); + } + if (connected) { + connected = false; + config.onDisconnected(); + } + void refreshAndConnect(currentGeneration); + }, PING_TIMEOUT_MS); + } + + function handlePageshow(event: PageTransitionEvent): void { + if (destroyed || intentionalDisconnect) return; + + if (!event.persisted) return; + + // BFCache restore - WebSocket is guaranteed dead + console.log('[Connection] BFCache restore detected, forcing reconnect'); + reconnectAttempt = 0; + clearReconnectTimer(); + clearPingTimeout(); + + const staleWs = ws; + if (staleWs !== null) { + ws = null; + staleWs.close(); + } + if (connected) { + connected = false; + config.onDisconnected(); + } + void refreshAndConnect(generation); + } + + function handleOnline(): void { + if (destroyed || intentionalDisconnect) return; + + // If already connected with an open socket, nothing to do + if (connected && ws !== null && ws.readyState === WebSocket.OPEN) return; + + console.log('[Connection] Online event - reconnecting'); + reconnectAttempt = 0; + clearReconnectTimer(); + void refreshAndConnect(generation); + } + + function addEventListeners(): void { + if (typeof document !== 'undefined' && boundVisibilityHandler === null) { + boundVisibilityHandler = handleVisibilityChange; + document.addEventListener('visibilitychange', boundVisibilityHandler); + } + if (typeof window !== 'undefined') { + if (boundPageshowHandler === null) { + boundPageshowHandler = handlePageshow; + window.addEventListener('pageshow', boundPageshowHandler); + } + if (boundOnlineHandler === null) { + boundOnlineHandler = handleOnline; + window.addEventListener('online', boundOnlineHandler); + } + } + } + + function removeEventListeners(): void { + if (typeof document !== 'undefined' && boundVisibilityHandler !== null) { + document.removeEventListener('visibilitychange', boundVisibilityHandler); + boundVisibilityHandler = null; + } + if (typeof window !== 'undefined') { + if (boundPageshowHandler !== null) { + window.removeEventListener('pageshow', boundPageshowHandler); + boundPageshowHandler = null; + } + if (boundOnlineHandler !== null) { + window.removeEventListener('online', boundOnlineHandler); + boundOnlineHandler = null; + } + } + } + function connect() { console.log('[Connection] connect() called - resetting state'); intentionalDisconnect = false; @@ -209,8 +355,11 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { authRefreshAttempted = false; connected = false; reconnectAttempt = 0; + hasConnectedOnce = false; generation += 1; clearReconnectTimer(); + clearPingTimeout(); + addEventListeners(); connectInternal(0, generation); } @@ -219,6 +368,7 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { generation += 1; clearReconnectTimer(); + clearPingTimeout(); if (ws !== null) { ws.close(); @@ -236,6 +386,8 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { generation += 1; clearReconnectTimer(); + clearPingTimeout(); + removeEventListeners(); if (ws !== null) { ws.close(); diff --git a/src/lib/cloud-agent-sdk/cli-live-transport.test.ts b/src/lib/cloud-agent-sdk/cli-live-transport.test.ts index ebfc669b5..e488d738d 100644 --- a/src/lib/cloud-agent-sdk/cli-live-transport.test.ts +++ b/src/lib/cloud-agent-sdk/cli-live-transport.test.ts @@ -1268,3 +1268,178 @@ describe('CliLiveTransport onError callback', () => { expect(onError).toHaveBeenCalledWith('snapshot fetch failed'); }); }); + +// --------------------------------------------------------------------------- +// Snapshot refetch on reconnect +// --------------------------------------------------------------------------- + +describe('CliLiveTransport snapshot refetch on reconnect', () => { + const validInboundMessage = { + type: 'event', + sessionId: KILO_SESSION_ID, + event: 'session.status', + data: { sessionID: KILO_SESSION_ID, status: { type: 'busy' } }, + }; + + function triggerReconnect(ws: MockWebSocket): void { + ws.onclose?.({ code: 1006 } as CloseEvent); + jest.advanceTimersByTime(60_000); + } + + function getLatestMockWs(): MockWebSocket { + return webSocketConstructor.mock.results.at(-1)?.value as MockWebSocket; + } + + function sendInboundOn(ws: MockWebSocket, msg: Record): void { + ws.onmessage?.({ data: JSON.stringify(msg) } as MessageEvent); + } + + it('refetches snapshot on reconnect and replays into sinks', async () => { + jest.useFakeTimers(); + try { + const snapshot: SessionSnapshot = makeSnapshot({ id: KILO_SESSION_ID }, [ + { + info: stubUserMessage({ id: 'msg-1', sessionID: KILO_SESSION_ID }), + parts: [stubTextPart({ id: 'part-1', sessionID: KILO_SESSION_ID, messageID: 'msg-1' })], + }, + ]); + + const fetchSnapshot = jest.fn(() => Promise.resolve(snapshot)); + const { transport, chatEvents, serviceEvents } = createTransportWithSinks({ fetchSnapshot }); + + transport.connect(); + + // Wait for initial snapshot fetch to resolve + await jest.advanceTimersByTimeAsync(0); + + // Open WS and mark connection as established with a valid message + openConnection(); + sendInbound(validInboundMessage); + + // Record event counts after initial snapshot + establishment message + const chatCountAfterInit = chatEvents.length; + const serviceCountAfterInit = serviceEvents.length; + + // Trigger reconnect: close with non-auth code, advance past backoff + triggerReconnect(mockWs); + + // Open new WS and send valid message to trigger onReconnected + const newMockWs = getLatestMockWs(); + newMockWs.onopen?.({} as Event); + sendInboundOn(newMockWs, validInboundMessage); + + // Flush promises for the async snapshot refetch + await jest.advanceTimersByTimeAsync(0); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + expect(fetchSnapshot).toHaveBeenCalledWith(KILO_SESSION_ID); + + // Snapshot replay adds: 1 session.created + 1 message.updated + 1 message.part.updated + // The valid inbound message also adds 1 session.status service event + expect(serviceEvents.length).toBeGreaterThan(serviceCountAfterInit); + expect(chatEvents.length).toBeGreaterThan(chatCountAfterInit); + + // Verify the replayed snapshot events are present + const sessionCreatedEvents = serviceEvents.filter(e => e.type === 'session.created'); + expect(sessionCreatedEvents).toHaveLength(2); // initial + reconnect + + const messageUpdatedEvents = chatEvents.filter(e => e.type === 'message.updated'); + expect(messageUpdatedEvents).toHaveLength(2); // initial + reconnect + + const partUpdatedEvents = chatEvents.filter(e => e.type === 'message.part.updated'); + expect(partUpdatedEvents).toHaveLength(2); // initial + reconnect + + transport.destroy(); + } finally { + jest.useRealTimers(); + } + }); + + it('reconnect works without fetchSnapshot configured (no replay)', async () => { + jest.useFakeTimers(); + try { + const { transport, serviceEvents } = createTransportWithSinks(); + + transport.connect(); + openConnection(); + sendInbound(validInboundMessage); + + const serviceCountAfterInit = serviceEvents.length; + + triggerReconnect(mockWs); + + const newMockWs = getLatestMockWs(); + newMockWs.onopen?.({} as Event); + sendInboundOn(newMockWs, validInboundMessage); + + await jest.advanceTimersByTimeAsync(0); + + // No snapshot replay, but the valid inbound message on the new WS still routes + const sessionStatusEvents = serviceEvents.filter(e => e.type === 'session.status'); + expect(sessionStatusEvents.length).toBeGreaterThan(0); + + // No session.created events (no snapshot) + const sessionCreatedEvents = serviceEvents.filter(e => e.type === 'session.created'); + expect(sessionCreatedEvents).toHaveLength(0); + + // Verify no errors — service events grew from the reconnected inbound message + expect(serviceEvents.length).toBeGreaterThan(serviceCountAfterInit); + + transport.destroy(); + } finally { + jest.useRealTimers(); + } + }); + + it('snapshot fetch failure on reconnect is non-fatal', async () => { + jest.useFakeTimers(); + try { + let callCount = 0; + const snapshot: SessionSnapshot = makeSnapshot({ id: KILO_SESSION_ID }); + const fetchSnapshot = jest.fn(() => { + callCount++; + if (callCount === 1) return Promise.resolve(snapshot); + return Promise.reject(new Error('network error')); + }); + const onError = jest.fn(); + + const { transport, serviceEvents } = createTransportWithSinks({ fetchSnapshot, onError }); + + transport.connect(); + await jest.advanceTimersByTimeAsync(0); + + openConnection(); + sendInbound(validInboundMessage); + + // Trigger reconnect + triggerReconnect(mockWs); + + const newMockWs = getLatestMockWs(); + newMockWs.onopen?.({} as Event); + sendInboundOn(newMockWs, validInboundMessage); + + // Flush promises — snapshot refetch rejects + await jest.advanceTimersByTimeAsync(0); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + + // No error propagated to onError (reconnect snapshot failure is silently swallowed) + expect(onError).not.toHaveBeenCalled(); + + // Transport still works — send another event on the new WS + sendInboundOn(newMockWs, { + type: 'event', + sessionId: KILO_SESSION_ID, + event: 'session.status', + data: { sessionID: KILO_SESSION_ID, status: { type: 'idle' } }, + }); + + const statusEvents = serviceEvents.filter(e => e.type === 'session.status'); + expect(statusEvents.length).toBeGreaterThanOrEqual(2); + + transport.destroy(); + } finally { + jest.useRealTimers(); + } + }); +}); diff --git a/src/lib/cloud-agent-sdk/cli-live-transport.ts b/src/lib/cloud-agent-sdk/cli-live-transport.ts index 9fc91b97b..a196bd00d 100644 --- a/src/lib/cloud-agent-sdk/cli-live-transport.ts +++ b/src/lib/cloud-agent-sdk/cli-live-transport.ts @@ -172,6 +172,19 @@ function createCliLiveTransport(config: CliLiveTransportConfig): TransportFactor ws.send(JSON.stringify({ type: 'subscribe', sessionId: config.kiloSessionId })); }, onConnected: () => {}, + onReconnected: () => { + if (expectedGeneration !== generation) return; + if (!config.fetchSnapshot) return; + void config.fetchSnapshot(config.kiloSessionId).then( + snapshot => { + if (expectedGeneration !== generation) return; + replaySnapshot(snapshot); + }, + () => { + // Snapshot refetch failure on reconnect is non-fatal + } + ); + }, onDisconnected: () => {}, onError: config.onError, isAuthFailure: (event: CloseEvent) => event.code === 4001 || event.code === 1008, diff --git a/src/lib/cloud-agent-sdk/cloud-agent-connection.ts b/src/lib/cloud-agent-sdk/cloud-agent-connection.ts index 8bdcac276..4a02d542d 100644 --- a/src/lib/cloud-agent-sdk/cloud-agent-connection.ts +++ b/src/lib/cloud-agent-sdk/cloud-agent-connection.ts @@ -13,6 +13,7 @@ export type ConnectionConfig = { onConnected: () => void; onDisconnected: () => void; onUnexpectedDisconnect?: () => void; + onReconnected?: () => void; onError?: (error: StreamError) => void; onRefreshTicket?: () => Promise; heartbeatTimeoutMs?: number; @@ -74,6 +75,7 @@ export function createConnection(config: ConnectionConfig): Connection { onConnected: config.onConnected, onDisconnected: config.onDisconnected, onUnexpectedDisconnect: config.onUnexpectedDisconnect, + onReconnected: config.onReconnected, onError: config.onError ? message => config.onError?.({ diff --git a/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts b/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts index 07600c0e5..4dd5d80f3 100644 --- a/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts +++ b/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts @@ -386,3 +386,163 @@ describe('CloudAgentTransport command delegation', () => { transport.destroy(); }); }); + +// --------------------------------------------------------------------------- +// Snapshot refetch on reconnect +// --------------------------------------------------------------------------- + +describe('CloudAgentTransport snapshot refetch on reconnect', () => { + // Microtask-based flush that works under jest.useFakeTimers() + // (unlike flushPromises which uses setTimeout and hangs with fake timers) + async function flushMicrotasks(): Promise { + for (let i = 0; i < 10; i++) { + await Promise.resolve(); + } + } + + function createTransportWithControllableSnapshot( + snapshotOverride?: ReturnType + ) { + const chatEvents: ChatEvent[] = []; + const serviceEvents: ServiceEvent[] = []; + const snapshot = snapshotOverride ?? emptySnapshot; + const fetchSnapshot = jest.fn(() => Promise.resolve(snapshot)); + + const factory = createCloudAgentTransport({ + sessionId: cloudAgentId('ses-1'), + kiloSessionId: kiloId('ses-1'), + api: createMockApi(), + getTicket: () => 'test-ticket', + fetchSnapshot, + websocketBaseUrl: 'ws://localhost:9999', + }); + + const transport = factory({ + onChatEvent: event => chatEvents.push(event), + onServiceEvent: event => serviceEvents.push(event), + }); + + return { transport, chatEvents, serviceEvents, fetchSnapshot }; + } + + function sendRawOn(ws: MockWebSocket, event: CloudAgentEvent): void { + ws.onmessage?.({ data: JSON.stringify(event) } as MessageEvent); + } + + /** Establish connection, simulate close + reconnect, return the new WS mock. */ + async function simulateReconnect(): Promise { + mockWs.onclose?.({ code: 1006, reason: '', wasClean: false } as CloseEvent); + + jest.advanceTimersByTime(2000); + await flushMicrotasks(); + + const newMockWs = webSocketConstructor.mock.results.at(-1)?.value as MockWebSocket; + + newMockWs.onopen?.(new Event('open')); + sendRawOn( + newMockWs, + kilocode('session.status', { sessionID: 'ses-1', status: { type: 'busy' } }) + ); + + return newMockWs; + } + + it('refetches snapshot on reconnect and replays events into sinks', async () => { + jest.useFakeTimers(); + try { + const { transport, serviceEvents, fetchSnapshot } = createTransportWithControllableSnapshot(); + + transport.connect(); + await flushMicrotasks(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(1); + + // Establish connection in base-connection by sending a valid event + sendRaw(kilocode('session.status', { sessionID: 'ses-1', status: { type: 'busy' } })); + + const serviceCountBefore = serviceEvents.length; + + const newMockWs = await simulateReconnect(); + await flushMicrotasks(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + + const replayedCreated = serviceEvents + .slice(serviceCountBefore) + .filter(e => e.type === 'session.created'); + expect(replayedCreated).toHaveLength(1); + + transport.destroy(); + newMockWs.onclose?.({ code: 1000, reason: '', wasClean: true } as CloseEvent); + } finally { + jest.useRealTimers(); + } + }); + + it('replayed snapshot with messages upserts into sinks correctly', async () => { + jest.useFakeTimers(); + try { + const snapshotWithMessages = makeSnapshot({ id: 'ses-1' }, [ + { + info: { + id: 'msg-1', + sessionID: 'ses-1', + role: 'user', + time: { created: 1 }, + agent: 'build', + model: { providerID: 'a', modelID: 'b' }, + }, + parts: [ + { + id: 'part-1', + sessionID: 'ses-1', + messageID: 'msg-1', + type: 'text', + text: 'hello', + }, + ], + }, + ]); + + const { transport, chatEvents, serviceEvents, fetchSnapshot } = + createTransportWithControllableSnapshot(snapshotWithMessages); + + transport.connect(); + await flushMicrotasks(); + + expect(serviceEvents.filter(e => e.type === 'session.created')).toHaveLength(1); + expect(chatEvents.filter(e => e.type === 'message.updated')).toHaveLength(1); + expect(chatEvents.filter(e => e.type === 'message.part.updated')).toHaveLength(1); + + // Establish connection + sendRaw(kilocode('session.status', { sessionID: 'ses-1', status: { type: 'busy' } })); + + const newMockWs = await simulateReconnect(); + await flushMicrotasks(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + expect(serviceEvents.filter(e => e.type === 'session.created')).toHaveLength(2); + expect(chatEvents.filter(e => e.type === 'message.updated')).toHaveLength(2); + expect(chatEvents.filter(e => e.type === 'message.part.updated')).toHaveLength(2); + + transport.destroy(); + newMockWs.onclose?.({ code: 1000, reason: '', wasClean: true } as CloseEvent); + } finally { + jest.useRealTimers(); + } + }); + + it('initial connect fetches snapshot once and opens WebSocket', async () => { + const { transport, serviceEvents, fetchSnapshot } = createTransportWithControllableSnapshot(); + + transport.connect(); + await flushPromises(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(1); + expect(fetchSnapshot).toHaveBeenCalledWith('ses-1'); + expect(webSocketConstructor).toHaveBeenCalledTimes(1); + expect(serviceEvents.filter(e => e.type === 'session.created')).toHaveLength(1); + + transport.destroy(); + }); +}); diff --git a/src/lib/cloud-agent-sdk/cloud-agent-transport.ts b/src/lib/cloud-agent-sdk/cloud-agent-transport.ts index 102738226..0927c8de4 100644 --- a/src/lib/cloud-agent-sdk/cloud-agent-transport.ts +++ b/src/lib/cloud-agent-sdk/cloud-agent-transport.ts @@ -85,6 +85,19 @@ function createCloudAgentTransport(config: CloudAgentTransportConfig): Transport } }, onConnected: () => {}, + onReconnected: () => { + if (expectedGeneration !== lifecycleGeneration) return; + stoppedReceived = false; + void config.fetchSnapshot(config.kiloSessionId).then( + snapshot => { + if (expectedGeneration !== lifecycleGeneration) return; + replaySnapshot(snapshot); + }, + () => { + // Snapshot refetch failure on reconnect — ignore, live events will still flow + } + ); + }, onDisconnected: () => {}, onUnexpectedDisconnect: () => { if (expectedGeneration !== lifecycleGeneration) return; diff --git a/src/lib/cloud-agent-sdk/session-routing.test.ts b/src/lib/cloud-agent-sdk/session-routing.test.ts index f0973e644..b88864de7 100644 --- a/src/lib/cloud-agent-sdk/session-routing.test.ts +++ b/src/lib/cloud-agent-sdk/session-routing.test.ts @@ -201,6 +201,66 @@ describe('session transport routing', () => { }); }); + describe('resolveSession returning completed Cloud Agent session', () => { + it('routes to historical transport when cloudAgentSessionId exists but isLive is false', async () => { + const snapshot = makeSnapshot({ id: SES_ID }, [ + { + info: stubUserMessage({ id: 'msg-1', sessionID: SES_ID }), + parts: [ + stubTextPart({ id: 'part-1', messageID: 'msg-1', sessionID: SES_ID, text: 'hi' }), + ], + }, + ]); + + const resolveSession = jest.fn( + (): Promise => + Promise.resolve({ + kiloSessionId: kiloId('ses-1'), + cloudAgentSessionId: cloudAgentId('do-456'), + isLive: false, + }) + ); + + const fetchSnapshot = jest.fn(() => Promise.resolve(snapshot)); + + const session = createCloudAgentSession({ + kiloSessionId: kiloId('ses-1'), + resolveSession, + transport: { + fetchSnapshot, + getTicket: () => 'ticket', + api: { + send: () => Promise.resolve(), + interrupt: () => Promise.resolve(), + answer: () => Promise.resolve(), + reject: () => Promise.resolve(), + respondToPermission: () => Promise.resolve(), + }, + }, + }); + + session.connect(); + await Promise.resolve(); + await Promise.resolve(); + + // Should NOT open a WebSocket — historical transport is snapshot-only + expect(webSocketConstructor).not.toHaveBeenCalled(); + + // Session info set from snapshot + expect(session.state.getSessionInfo()).toEqual({ id: 'ses-1', parentID: undefined }); + + // Messages in storage + const messageIds = session.storage.getMessageIds(); + expect(messageIds).toContain('msg-1'); + + // Historical session should not be interactive + expect(session.canSend).toBe(false); + expect(session.canInterrupt).toBe(false); + + session.destroy(); + }); + }); + describe('resolveSession failure', () => { it('sets error state and fires onError', async () => { const onError = jest.fn(); @@ -410,7 +470,7 @@ describe('session transport routing', () => { await Promise.resolve(); expect(onError).toHaveBeenCalledWith( - 'CloudAgentSession transport.fetchSnapshot is required for historical CLI sessions' + 'CloudAgentSession transport.fetchSnapshot is required for historical sessions' ); expect(session.state.getActivity()).toEqual({ type: 'idle' }); expect(session.state.getStatus().type).toBe('error'); diff --git a/src/lib/cloud-agent-sdk/session.ts b/src/lib/cloud-agent-sdk/session.ts index aa89e7b75..c9b04f4cd 100644 --- a/src/lib/cloud-agent-sdk/session.ts +++ b/src/lib/cloud-agent-sdk/session.ts @@ -142,7 +142,7 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes function pickTransportFactory(resolved: ResolvedSession): TransportFactory { console.log('[cli-debug] pickTransportFactory: resolved=%o', resolved); - if (resolved.cloudAgentSessionId) { + if (resolved.cloudAgentSessionId && resolved.isLive) { if (!config.transport.getTicket) { throw new Error( 'CloudAgentSession transport.getTicket is required for Cloud Agent sessions' @@ -193,11 +193,11 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes if (!config.transport.fetchSnapshot) { throw new Error( - 'CloudAgentSession transport.fetchSnapshot is required for historical CLI sessions' + 'CloudAgentSession transport.fetchSnapshot is required for historical sessions' ); } console.log( - '[cli-debug] pickTransportFactory: → CLI Historical transport (kiloSessionId=%s)', + '[cli-debug] pickTransportFactory: → Historical transport (kiloSessionId=%s)', resolved.kiloSessionId ); return createCliHistoricalTransport({ From ec6ea1e5a77cb59d2bd94a790db1ae6e359a3289 Mon Sep 17 00:00:00 2001 From: Evgeny Shurakov Date: Thu, 2 Apr 2026 17:30:00 +0200 Subject: [PATCH 2/4] fix(cloud-agent-sdk): replace client ping with passive staleness detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback on stale WebSocket recovery: - Remove ws.send('ping') — server never responds; staleness detection now relies on server heartbeats canceling the timeout - Make staleness timeout configurable (stalenessTimeoutMs) so the transport layer that knows the heartbeat interval controls the value - Increase default from 5s to 30s to exceed server heartbeat intervals - Track lastMessageTime to skip the check when a recent message proves the connection is alive - Wire heartbeatTimeoutMs through cloud-agent-connection - disconnect() now removes visibility/pageshow/online listeners, fixing a leak when transports disconnect without calling destroy() --- .../cloud-agent-sdk/base-connection.test.ts | 86 ++++++++++++++----- src/lib/cloud-agent-sdk/base-connection.ts | 54 +++++++----- .../cloud-agent-sdk/cloud-agent-connection.ts | 1 + 3 files changed, 97 insertions(+), 44 deletions(-) diff --git a/src/lib/cloud-agent-sdk/base-connection.test.ts b/src/lib/cloud-agent-sdk/base-connection.test.ts index b21e83dd7..19e5d8bd7 100644 --- a/src/lib/cloud-agent-sdk/base-connection.test.ts +++ b/src/lib/cloud-agent-sdk/base-connection.test.ts @@ -1,6 +1,6 @@ import { createBaseConnection, - PING_TIMEOUT_MS, + DEFAULT_STALENESS_TIMEOUT_MS, type BaseConnectionConfig, } from './base-connection'; @@ -145,20 +145,22 @@ describe('createBaseConnection – stale WebSocket recovery', () => { connection.destroy(); }); - it('sends ping and reconnects if no message within timeout when tab becomes visible with open WS', () => { + it('reconnects if no server message within timeout when tab becomes visible with open WS', () => { const { connection, onDisconnected } = createTestConnection(); connection.connect(); connectSocket(0); + // Advance past the recency window so the staleness check fires + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + simulateVisibilityChange('hidden'); simulateVisibilityChange('visible'); - // Socket is OPEN, so it should send a ping rather than immediately reconnecting - expect(sockets[0].send).toHaveBeenCalledWith('ping'); + // Socket is OPEN but last message is stale — timeout is armed expect(sockets).toHaveLength(1); - // Advance past the ping timeout - jest.advanceTimersByTime(PING_TIMEOUT_MS); + // Advance past the staleness timeout + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); // Should have closed the stale socket and created a new one expect(sockets[0].close).toHaveBeenCalled(); @@ -168,21 +170,39 @@ describe('createBaseConnection – stale WebSocket recovery', () => { connection.destroy(); }); - it('cancels ping timeout if a message arrives before deadline', () => { + it('skips staleness check when a message was received recently', () => { const { connection, onDisconnected } = createTestConnection(); connection.connect(); connectSocket(0); + // Do NOT advance time — last message is within the recency window simulateVisibilityChange('hidden'); simulateVisibilityChange('visible'); - expect(sockets[0].send).toHaveBeenCalledWith('ping'); + // Advance past the timeout — nothing should happen + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + + it('cancels staleness timeout if a server message arrives before deadline', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); // Receive a message before the timeout fires sockets[0].onmessage?.({ data: 'server-reply' } as MessageEvent); - // Advance past the ping timeout - should NOT trigger reconnect - jest.advanceTimersByTime(PING_TIMEOUT_MS); + // Advance past the timeout - should NOT trigger reconnect + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); expect(sockets).toHaveLength(1); expect(onDisconnected).not.toHaveBeenCalled(); @@ -190,20 +210,21 @@ describe('createBaseConnection – stale WebSocket recovery', () => { connection.destroy(); }); - it('clears ping timeout when tab is hidden', () => { + it('clears staleness timeout when tab is hidden', () => { const { connection, onDisconnected } = createTestConnection(); connection.connect(); connectSocket(0); - // Tab visible → sends ping + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + // Tab visible → arms staleness timeout simulateVisibilityChange('visible'); - expect(sockets[0].send).toHaveBeenCalledWith('ping'); - // Tab hidden → should clear the ping timeout + // Tab hidden → should clear the timeout simulateVisibilityChange('hidden'); - // Advance past the ping timeout - nothing should happen - jest.advanceTimersByTime(PING_TIMEOUT_MS); + // Advance past the timeout - nothing should happen + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); expect(sockets).toHaveLength(1); expect(onDisconnected).not.toHaveBeenCalled(); @@ -371,19 +392,19 @@ describe('createBaseConnection – stale WebSocket recovery', () => { }); describe('proactive auth refresh on reconnect', () => { - it('calls refreshAuth before reconnecting after ping timeout', async () => { + it('calls refreshAuth before reconnecting after staleness timeout', async () => { const refreshAuth = jest.fn(() => Promise.resolve()); const { connection } = createTestConnection({ refreshAuth }); connection.connect(); connectSocket(0); - // Simulate tab hidden then visible — triggers ping + // Make lastMessageTime stale, then trigger visibility check + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); simulateVisibilityChange('hidden'); simulateVisibilityChange('visible'); - expect(sockets[0].send).toHaveBeenCalledWith('ping'); - // Advance past ping timeout - jest.advanceTimersByTime(PING_TIMEOUT_MS); + // Advance past staleness timeout + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); // refreshAuth should be called to get a fresh ticket before reconnecting expect(refreshAuth).toHaveBeenCalledTimes(1); @@ -447,10 +468,11 @@ describe('createBaseConnection – stale WebSocket recovery', () => { connection.connect(); connectSocket(0); + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); simulateVisibilityChange('hidden'); simulateVisibilityChange('visible'); - jest.advanceTimersByTime(PING_TIMEOUT_MS); + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); expect(refreshAuth).toHaveBeenCalledTimes(1); @@ -468,10 +490,11 @@ describe('createBaseConnection – stale WebSocket recovery', () => { connection.connect(); connectSocket(0); + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); simulateVisibilityChange('hidden'); simulateVisibilityChange('visible'); - jest.advanceTimersByTime(PING_TIMEOUT_MS); + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); // Should still create a new socket (direct connect, no refresh) expect(sockets).toHaveLength(2); @@ -497,5 +520,22 @@ describe('createBaseConnection – stale WebSocket recovery', () => { expect(winRemovedEvents).toContain('pageshow'); expect(winRemovedEvents).toContain('online'); }); + + it('disconnect() removes visibilitychange, pageshow, and online listeners', () => { + const docRemoveSpy = jest.spyOn(mockDocument, 'removeEventListener'); + const winRemoveSpy = jest.spyOn(mockWindow, 'removeEventListener'); + + const { connection } = createTestConnection(); + connection.connect(); + + connection.disconnect(); + + const docRemovedEvents = docRemoveSpy.mock.calls.map(call => call[0]); + const winRemovedEvents = winRemoveSpy.mock.calls.map(call => call[0]); + + expect(docRemovedEvents).toContain('visibilitychange'); + expect(winRemovedEvents).toContain('pageshow'); + expect(winRemovedEvents).toContain('online'); + }); }); }); diff --git a/src/lib/cloud-agent-sdk/base-connection.ts b/src/lib/cloud-agent-sdk/base-connection.ts index 4951c3f9b..92d3ba5f8 100644 --- a/src/lib/cloud-agent-sdk/base-connection.ts +++ b/src/lib/cloud-agent-sdk/base-connection.ts @@ -12,6 +12,9 @@ export type BaseConnectionConfig = { isAuthFailure?: (event: CloseEvent) => boolean; refreshAuth?: () => Promise; onOpen?: (ws: WebSocket) => void; + /** How long to wait for a server message (e.g. heartbeat) on tab resume before + * treating the connection as stale. Should exceed the server's heartbeat interval. */ + stalenessTimeoutMs?: number; }; export type Connection = { @@ -23,7 +26,7 @@ export type Connection = { const MAX_RECONNECT_ATTEMPTS = 8; const BACKOFF_BASE_MS = 1000; const BACKOFF_CAP_MS = 30000; -export const PING_TIMEOUT_MS = 5000; +export const DEFAULT_STALENESS_TIMEOUT_MS = 30_000; // min(cap, base * 2^attempt) * (0.5 + random jitter) function calculateBackoffDelay(attempt: number): number { @@ -42,7 +45,9 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { let reconnectAttempt = 0; let generation = 0; let hasConnectedOnce = false; - let pingTimeoutId: ReturnType | null = null; + let stalenessTimeoutId: ReturnType | null = null; + let lastMessageTime = 0; + const stalenessTimeoutMs = config.stalenessTimeoutMs ?? DEFAULT_STALENESS_TIMEOUT_MS; // Bound handler references for event listener cleanup let boundVisibilityHandler: (() => void) | null = null; @@ -56,10 +61,10 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { } } - function clearPingTimeout(): void { - if (pingTimeoutId !== null) { - clearTimeout(pingTimeoutId); - pingTimeoutId = null; + function clearStalenessTimeout(): void { + if (stalenessTimeoutId !== null) { + clearTimeout(stalenessTimeoutId); + stalenessTimeoutId = null; } } @@ -123,7 +128,7 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { if (destroyed || intentionalDisconnect || expectedGeneration !== generation) return; reconnectAttempt = attempt; - clearPingTimeout(); + clearStalenessTimeout(); // Close existing socket - clear reference first so onclose ignores it const oldWs = ws; @@ -144,8 +149,9 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { }; newWs.onmessage = (messageEvent: MessageEvent) => { - // Any incoming message cancels an active ping staleness check - clearPingTimeout(); + // Any incoming message cancels an active staleness check + clearStalenessTimeout(); + lastMessageTime = Date.now(); const parsed = config.parseMessage(messageEvent.data); if (parsed === null) { @@ -245,7 +251,7 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { if (typeof document === 'undefined') return; if (document.visibilityState === 'hidden') { - clearPingTimeout(); + clearStalenessTimeout(); return; } @@ -258,14 +264,18 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { return; } - // Socket appears open - send a ping to verify it's not stale - ws.send('ping'); + // If a message arrived recently, the connection is verified alive + if (Date.now() - lastMessageTime < stalenessTimeoutMs) { + return; + } + + // Socket appears open but no recent message — wait for the next server + // heartbeat to confirm liveness; if nothing arrives, treat as stale. const currentGeneration = generation; - pingTimeoutId = setTimeout(() => { - pingTimeoutId = null; + stalenessTimeoutId = setTimeout(() => { + stalenessTimeoutId = null; if (destroyed || intentionalDisconnect || currentGeneration !== generation) return; - // No message received in time - connection is stale - console.log('[Connection] Ping timeout - connection stale, reconnecting'); + console.log('[Connection] Staleness timeout - no server message, reconnecting'); const staleWs = ws; if (staleWs !== null) { ws = null; @@ -276,7 +286,7 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { config.onDisconnected(); } void refreshAndConnect(currentGeneration); - }, PING_TIMEOUT_MS); + }, stalenessTimeoutMs); } function handlePageshow(event: PageTransitionEvent): void { @@ -288,7 +298,7 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { console.log('[Connection] BFCache restore detected, forcing reconnect'); reconnectAttempt = 0; clearReconnectTimer(); - clearPingTimeout(); + clearStalenessTimeout(); const staleWs = ws; if (staleWs !== null) { @@ -356,9 +366,10 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { connected = false; reconnectAttempt = 0; hasConnectedOnce = false; + lastMessageTime = 0; generation += 1; clearReconnectTimer(); - clearPingTimeout(); + clearStalenessTimeout(); addEventListeners(); connectInternal(0, generation); } @@ -368,7 +379,8 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { generation += 1; clearReconnectTimer(); - clearPingTimeout(); + clearStalenessTimeout(); + removeEventListeners(); if (ws !== null) { ws.close(); @@ -386,7 +398,7 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { generation += 1; clearReconnectTimer(); - clearPingTimeout(); + clearStalenessTimeout(); removeEventListeners(); if (ws !== null) { diff --git a/src/lib/cloud-agent-sdk/cloud-agent-connection.ts b/src/lib/cloud-agent-sdk/cloud-agent-connection.ts index 4a02d542d..12c697cfe 100644 --- a/src/lib/cloud-agent-sdk/cloud-agent-connection.ts +++ b/src/lib/cloud-agent-sdk/cloud-agent-connection.ts @@ -60,6 +60,7 @@ export function createConnection(config: ConnectionConfig): Connection { const refreshTicket = config.onRefreshTicket; return createBaseConnection({ + stalenessTimeoutMs: config.heartbeatTimeoutMs, buildUrl: () => { const url = new URL(config.websocketUrl); url.searchParams.set('ticket', currentTicket); From 816b6aa58b5dc08f0ff55125bbd40b1c378eddf6 Mon Sep 17 00:00:00 2001 From: Evgeny Shurakov Date: Thu, 2 Apr 2026 21:24:02 +0200 Subject: [PATCH 3/4] fix(cloud-agent-sdk): reset staleness clock when creating a replacement socket --- .../cloud-agent-sdk/base-connection.test.ts | 32 +++++++++++++++++++ src/lib/cloud-agent-sdk/base-connection.ts | 3 ++ 2 files changed, 35 insertions(+) diff --git a/src/lib/cloud-agent-sdk/base-connection.test.ts b/src/lib/cloud-agent-sdk/base-connection.test.ts index 19e5d8bd7..a8d20970e 100644 --- a/src/lib/cloud-agent-sdk/base-connection.test.ts +++ b/src/lib/cloud-agent-sdk/base-connection.test.ts @@ -170,6 +170,38 @@ describe('createBaseConnection – stale WebSocket recovery', () => { connection.destroy(); }); + it('anchors staleness clock to the new socket after reconnect', () => { + jest.spyOn(Math, 'random').mockReturnValue(0); + + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + // Advance time so old socket's lastMessageTime becomes stale + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS + 1000); + + // Force a reconnect via socket close + minimal backoff. + // With Math.random()=0, backoff for attempt 0 = 500ms. + closeSocket(0); + jest.advanceTimersByTime(500); + + // New socket created — connectInternal resets lastMessageTime to Date.now() + expect(sockets).toHaveLength(2); + + // Tab becomes visible immediately after new socket is created. + // Without the lastMessageTime reset, the old socket's stale timestamp + // would cause a spurious staleness-timeout reconnect here. + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Advance past the staleness window — no extra reconnect should occur + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + expect(sockets).toHaveLength(2); + expect(onDisconnected).toHaveBeenCalledTimes(1); // only from the first close + + connection.destroy(); + }); + it('skips staleness check when a message was received recently', () => { const { connection, onDisconnected } = createTestConnection(); connection.connect(); diff --git a/src/lib/cloud-agent-sdk/base-connection.ts b/src/lib/cloud-agent-sdk/base-connection.ts index 92d3ba5f8..14d091db5 100644 --- a/src/lib/cloud-agent-sdk/base-connection.ts +++ b/src/lib/cloud-agent-sdk/base-connection.ts @@ -129,6 +129,9 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { reconnectAttempt = attempt; clearStalenessTimeout(); + // Anchor the staleness clock to this socket so visibility checks don't + // inherit timing from a previous connection. + lastMessageTime = Date.now(); // Close existing socket - clear reference first so onclose ignores it const oldWs = ws; From 99cfaf440d00f31e445c440723d73a4d77a5b67e Mon Sep 17 00:00:00 2001 From: Evgeny Shurakov Date: Thu, 2 Apr 2026 22:31:55 +0200 Subject: [PATCH 4/4] refactor(cloud-agent-sdk): replace ResolvedSession struct with discriminated union Replace the flat { cloudAgentSessionId, isLive } shape with a discriminated union ('remote' | 'cloud-agent' | 'read-only') so transport routing is explicit and type-safe. Simplify session resolution in CloudAgentProvider by removing the runtime-state liveness check. Add runtime exhaustive check in pickTransportFactory. --- .../cloud-agent-next/CloudAgentProvider.tsx | 59 +++------ .../cloud-agent-sdk/session-manager.test.ts | 2 +- src/lib/cloud-agent-sdk/session-manager.ts | 10 +- src/lib/cloud-agent-sdk/session-phase.test.ts | 4 +- .../cloud-agent-sdk/session-routing.test.ts | 96 +++----------- .../cloud-agent-sdk/session-transport.test.ts | 52 ++++---- src/lib/cloud-agent-sdk/session.ts | 118 +++++++++--------- src/lib/cloud-agent-sdk/types.ts | 9 +- 8 files changed, 131 insertions(+), 219 deletions(-) diff --git a/src/components/cloud-agent-next/CloudAgentProvider.tsx b/src/components/cloud-agent-next/CloudAgentProvider.tsx index 266cd1ad6..9204b09ca 100644 --- a/src/components/cloud-agent-next/CloudAgentProvider.tsx +++ b/src/components/cloud-agent-next/CloudAgentProvider.tsx @@ -37,59 +37,32 @@ export function CloudAgentProvider({ children, organizationId }: CloudAgentProvi store: storeRef.current, resolveSession: async (kiloSessionId: KiloSessionId): Promise => { + // 1. Check if the session is in the active sessions list (remote CLI) + try { + const active = await trpcClient.activeSessions.list.query(); + if (active.sessions.some(s => s.id === kiloSessionId)) { + return { type: 'remote', kiloSessionId }; + } + } catch { + // Active sessions unavailable — fall through to other checks + } + + // 2. Check if the session has a cloud agent session ID try { const session = await trpcClient.cliSessionsV2.get.query({ session_id: kiloSessionId }); if (session.cloud_agent_session_id) { - let isLive = true; - try { - const withState = await trpcClient.cliSessionsV2.getWithRuntimeState.query({ - session_id: kiloSessionId, - }); - const rs = withState.runtimeState; - const executionStatus = rs?.execution?.status; - if ( - executionStatus === 'completed' || - executionStatus === 'failed' || - executionStatus === 'interrupted' - ) { - // Terminal execution status — session is done. - isLive = false; - } else if (executionStatus === 'pending' || executionStatus === 'running') { - // Active execution — session is live. - isLive = true; - } else { - // execution is null: either pre-execution (not yet initiated) or - // post-execution (DO cleaned up the execution record). - // If initiatedAt is set, the session ran and finished → not live. - // If initiatedAt is not set, the session is still being prepared → live. - isLive = !rs?.initiatedAt; - } - } catch { - // If we can't determine runtime state, assume live (safer — will just open a WebSocket) - } return { + type: 'cloud-agent', kiloSessionId, cloudAgentSessionId: session.cloud_agent_session_id as CloudAgentSessionId, - isLive, }; } - // CLI session — check if live - let isLive = false; - try { - const active = await trpcClient.activeSessions.list.query(); - isLive = active.sessions.some(s => s.id === kiloSessionId); - } catch { - /* not live */ - } - return { kiloSessionId, cloudAgentSessionId: null, isLive }; } catch { - // Not found — treat as cloud agent session ID directly (backward compat) - return { - kiloSessionId, - cloudAgentSessionId: kiloSessionId as unknown as CloudAgentSessionId, - isLive: true, - }; + // Session not found — fall through to read-only } + + // 3. Fallback: read-only historical session + return { type: 'read-only', kiloSessionId }; }, getTicket: async (sessionId: CloudAgentSessionId): Promise => { diff --git a/src/lib/cloud-agent-sdk/session-manager.test.ts b/src/lib/cloud-agent-sdk/session-manager.test.ts index 89dc7ec75..ffb9b547f 100644 --- a/src/lib/cloud-agent-sdk/session-manager.test.ts +++ b/src/lib/cloud-agent-sdk/session-manager.test.ts @@ -92,9 +92,9 @@ function createMockConfig(overrides: Partial = {}): Sessio return { store: createStore(), resolveSession: jest.fn().mockResolvedValue({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('agent-1'), - isLive: true, }), getTicket: jest.fn().mockResolvedValue('ticket-123'), fetchSnapshot: jest.fn().mockResolvedValue({ info: {}, messages: [] }), diff --git a/src/lib/cloud-agent-sdk/session-manager.ts b/src/lib/cloud-agent-sdk/session-manager.ts index 7d7ea6ef9..e253ff55e 100644 --- a/src/lib/cloud-agent-sdk/session-manager.ts +++ b/src/lib/cloud-agent-sdk/session-manager.ts @@ -40,7 +40,7 @@ type SessionConfig = { model: string; variant?: string | null; }; -type ActiveSessionType = 'cloud-agent' | 'cli'; +type ActiveSessionType = 'cloud-agent' | 'remote'; type StandaloneQuestion = { requestId: string; questions: QuestionInfo[] }; type StandalonePermission = { requestId: string; @@ -506,8 +506,8 @@ function createSessionManager(config: SessionManagerConfig): SessionManager { if (ap?.requestId === requestId) store.set(activePermissionAtom, null); }, onResolved: resolved => { - if (resolved.cloudAgentSessionId) activeSessionType = 'cloud-agent'; - else if (resolved.isLive) activeSessionType = 'cli'; + if (resolved.type === 'cloud-agent') activeSessionType = 'cloud-agent'; + else if (resolved.type === 'remote') activeSessionType = 'remote'; }, onBranchChanged: branch => { const currentFetched = store.get(fetchedSessionDataAtom); @@ -547,7 +547,7 @@ function createSessionManager(config: SessionManagerConfig): SessionManager { // Fallback: clear loading when events flow even if no root // session.created was replayed (e.g. CLI snapshot failure). store.set(isLoadingAtom, false); - if (activeSessionType === 'cli') { + if (activeSessionType === 'remote') { config.onRemoteSessionOpened?.({ kiloSessionId }); } }, @@ -588,7 +588,7 @@ function createSessionManager(config: SessionManagerConfig): SessionManager { model: payload.model, variant: payload.variant, }); - if (sessionType === 'cli' && kiloSessionId) { + if (sessionType === 'remote' && kiloSessionId) { config.onRemoteSessionMessageSent?.({ kiloSessionId }); } } catch (err) { diff --git a/src/lib/cloud-agent-sdk/session-phase.test.ts b/src/lib/cloud-agent-sdk/session-phase.test.ts index 35dc7b68a..3bb41d5dd 100644 --- a/src/lib/cloud-agent-sdk/session-phase.test.ts +++ b/src/lib/cloud-agent-sdk/session-phase.test.ts @@ -83,9 +83,9 @@ function createSessionWithStateCapture( const session = createCloudAgentSession({ kiloSessionId: TEST_KILO_ID, resolveSession: async () => ({ + type: 'cloud-agent' as const, kiloSessionId: TEST_KILO_ID, cloudAgentSessionId: TEST_CLOUD_AGENT_ID, - isLive: true, }), websocketBaseUrl: 'ws://localhost:9999', transport: { @@ -375,9 +375,9 @@ describe('session state transitions', () => { const session = createCloudAgentSession({ kiloSessionId: TEST_KILO_ID, resolveSession: async () => ({ + type: 'cloud-agent' as const, kiloSessionId: TEST_KILO_ID, cloudAgentSessionId: TEST_CLOUD_AGENT_ID, - isLive: true, }), websocketBaseUrl: 'ws://localhost:9999', transport: { diff --git a/src/lib/cloud-agent-sdk/session-routing.test.ts b/src/lib/cloud-agent-sdk/session-routing.test.ts index b88864de7..54ab3ab63 100644 --- a/src/lib/cloud-agent-sdk/session-routing.test.ts +++ b/src/lib/cloud-agent-sdk/session-routing.test.ts @@ -76,9 +76,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-456'), - isLive: true, }) ); @@ -112,14 +112,13 @@ describe('session transport routing', () => { }); }); - describe('resolveSession returning CLI live session', () => { + describe('resolveSession returning remote session', () => { it('creates CLI live transport and sends subscribe message', async () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'remote', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: true, }) ); @@ -150,7 +149,7 @@ describe('session transport routing', () => { }); }); - describe('resolveSession returning CLI historical session', () => { + describe('resolveSession returning read-only session', () => { it('replays snapshot events', async () => { const snapshot = makeSnapshot({ id: SES_ID }, [ { @@ -164,9 +163,8 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'read-only', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: false, }) ); @@ -201,65 +199,11 @@ describe('session transport routing', () => { }); }); - describe('resolveSession returning completed Cloud Agent session', () => { - it('routes to historical transport when cloudAgentSessionId exists but isLive is false', async () => { - const snapshot = makeSnapshot({ id: SES_ID }, [ - { - info: stubUserMessage({ id: 'msg-1', sessionID: SES_ID }), - parts: [ - stubTextPart({ id: 'part-1', messageID: 'msg-1', sessionID: SES_ID, text: 'hi' }), - ], - }, - ]); - - const resolveSession = jest.fn( - (): Promise => - Promise.resolve({ - kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: cloudAgentId('do-456'), - isLive: false, - }) - ); - - const fetchSnapshot = jest.fn(() => Promise.resolve(snapshot)); - - const session = createCloudAgentSession({ - kiloSessionId: kiloId('ses-1'), - resolveSession, - transport: { - fetchSnapshot, - getTicket: () => 'ticket', - api: { - send: () => Promise.resolve(), - interrupt: () => Promise.resolve(), - answer: () => Promise.resolve(), - reject: () => Promise.resolve(), - respondToPermission: () => Promise.resolve(), - }, - }, - }); - - session.connect(); - await Promise.resolve(); - await Promise.resolve(); - - // Should NOT open a WebSocket — historical transport is snapshot-only - expect(webSocketConstructor).not.toHaveBeenCalled(); - - // Session info set from snapshot - expect(session.state.getSessionInfo()).toEqual({ id: 'ses-1', parentID: undefined }); - - // Messages in storage - const messageIds = session.storage.getMessageIds(); - expect(messageIds).toContain('msg-1'); - - // Historical session should not be interactive - expect(session.canSend).toBe(false); - expect(session.canInterrupt).toBe(false); - - session.destroy(); - }); - }); + // NOTE: The old "completed Cloud Agent session" case (cloudAgentSessionId present + // but isLive=false) no longer exists. With the discriminated union, the resolver + // decides the session type. A completed cloud agent session is resolved as + // 'cloud-agent' (the transport handles completion via snapshot + WebSocket), or + // the resolver may choose 'read-only' for sessions without a live DO. describe('resolveSession failure', () => { it('sets error state and fires onError', async () => { @@ -323,9 +267,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-1'), - isLive: true, }) ); @@ -357,9 +301,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-1'), - isLive: true, }) ); @@ -388,9 +332,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-1'), - isLive: true, }) ); @@ -416,15 +360,14 @@ describe('session transport routing', () => { session.destroy(); }); - it('sets error state when CLI live session lacks required config', async () => { + it('sets error state when remote session lacks required config', async () => { const onError = jest.fn(); const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'remote', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: true, }) ); @@ -439,7 +382,7 @@ describe('session transport routing', () => { await Promise.resolve(); expect(onError).toHaveBeenCalledWith( - 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for live CLI sessions' + 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for remote CLI sessions' ); expect(session.state.getActivity()).toEqual({ type: 'idle' }); expect(session.state.getStatus().type).toBe('error'); @@ -447,15 +390,14 @@ describe('session transport routing', () => { session.destroy(); }); - it('sets error state when CLI historical session lacks fetchSnapshot', async () => { + it('sets error state when read-only session lacks fetchSnapshot', async () => { const onError = jest.fn(); const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'read-only', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: false, }) ); @@ -470,7 +412,7 @@ describe('session transport routing', () => { await Promise.resolve(); expect(onError).toHaveBeenCalledWith( - 'CloudAgentSession transport.fetchSnapshot is required for historical sessions' + 'CloudAgentSession transport.fetchSnapshot is required for read-only sessions' ); expect(session.state.getActivity()).toEqual({ type: 'idle' }); expect(session.state.getStatus().type).toBe('error'); diff --git a/src/lib/cloud-agent-sdk/session-transport.test.ts b/src/lib/cloud-agent-sdk/session-transport.test.ts index 47a5ea9aa..f13f04c91 100644 --- a/src/lib/cloud-agent-sdk/session-transport.test.ts +++ b/src/lib/cloud-agent-sdk/session-transport.test.ts @@ -65,9 +65,9 @@ function createCloudAgentResolvedSession(api: CloudAgentApi): CloudAgentSession return createCloudAgentSession({ kiloSessionId, resolveSession: async () => ({ + type: 'cloud-agent' as const, kiloSessionId, cloudAgentSessionId, - isLive: true, }), transport: { getTicket: () => 'ticket', @@ -198,14 +198,13 @@ describe('commands throw before transport is connected', () => { }); }); -describe('session transport missing command methods (historical session)', () => { +describe('session transport missing command methods (read-only session)', () => { function createHistoricalSession(): CloudAgentSession { return createCloudAgentSession({ kiloSessionId: kiloId('ses_historical'), resolveSession: async () => ({ + type: 'read-only' as const, kiloSessionId: kiloId('ses_historical'), - cloudAgentSessionId: null, - isLive: false, }), transport: { fetchSnapshot: () => Promise.resolve(makeSnapshot({ id: 'ses_historical' })), @@ -219,7 +218,7 @@ describe('session transport missing command methods (historical session)', () => await new Promise(r => setTimeout(r, 0)); } - it('session.send() throws for historical session', async () => { + it('session.send() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -230,7 +229,7 @@ describe('session transport missing command methods (historical session)', () => session.destroy(); }); - it('session.interrupt() throws for historical session', async () => { + it('session.interrupt() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -241,7 +240,7 @@ describe('session transport missing command methods (historical session)', () => session.destroy(); }); - it('session.answer() throws for historical session', async () => { + it('session.answer() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -252,7 +251,7 @@ describe('session transport missing command methods (historical session)', () => session.destroy(); }); - it('session.reject() throws for historical session', async () => { + it('session.reject() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -264,16 +263,15 @@ describe('session transport missing command methods (historical session)', () => }); }); -describe('CLI live session send via typed transport methods', () => { +describe('remote session send via typed transport methods', () => { const cliKiloSessionId = kiloId('ses_cli-live-session'); - it('session.send() uses kiloSessionId for CLI live sessions', async () => { + it('session.send() uses kiloSessionId for remote sessions', async () => { const session = createCloudAgentSession({ kiloSessionId: cliKiloSessionId, resolveSession: async () => ({ + type: 'remote' as const, kiloSessionId: cliKiloSessionId, - cloudAgentSessionId: null, - isLive: true, }), transport: { cliWebsocketUrl: 'wss://localhost:9999/api/user/web', @@ -350,13 +348,12 @@ describe('session capabilities', () => { session.destroy(); }); - it('canSend is true after connecting a CLI live session', async () => { + it('canSend is true after connecting a remote session', async () => { const session = createCloudAgentSession({ kiloSessionId: kiloId('ses_cli-live'), resolveSession: async () => ({ + type: 'remote' as const, kiloSessionId: kiloId('ses_cli-live'), - cloudAgentSessionId: null, - isLive: true, }), transport: { cliWebsocketUrl: 'wss://localhost:9999/api/user/web', @@ -373,13 +370,12 @@ describe('session capabilities', () => { session.destroy(); }); - it('canSend is false after connecting a historical session', async () => { + it('canSend is false after connecting a read-only session', async () => { const session = createCloudAgentSession({ kiloSessionId: kiloId('ses_historical'), resolveSession: async () => ({ + type: 'read-only' as const, kiloSessionId: kiloId('ses_historical'), - cloudAgentSessionId: null, - isLive: false, }), transport: { fetchSnapshot: () => Promise.resolve(makeSnapshot({ id: 'ses_historical' })), @@ -409,13 +405,12 @@ describe('session capabilities', () => { session.destroy(); }); - it('canInterrupt is false for historical sessions', async () => { + it('canInterrupt is false for read-only sessions', async () => { const session = createCloudAgentSession({ kiloSessionId: kiloId('ses_historical'), resolveSession: async () => ({ + type: 'read-only' as const, kiloSessionId: kiloId('ses_historical'), - cloudAgentSessionId: null, - isLive: false, }), transport: { fetchSnapshot: () => Promise.resolve(makeSnapshot({ id: 'ses_historical' })), @@ -434,16 +429,13 @@ describe('session capabilities', () => { describe('disconnect during resolution', () => { it('disconnect() before resolveSession settles prevents transport from attaching', async () => { const api = createMockApi(); - let resolveSession!: (value: { + type CloudAgentResolved = { + type: 'cloud-agent'; kiloSessionId: typeof kiloSessionId; cloudAgentSessionId: typeof cloudAgentSessionId; - isLive: boolean; - }) => void; - const resolvePromise = new Promise<{ - kiloSessionId: typeof kiloSessionId; - cloudAgentSessionId: typeof cloudAgentSessionId; - isLive: boolean; - }>(r => { + }; + let resolveSession!: (value: CloudAgentResolved) => void; + const resolvePromise = new Promise(r => { resolveSession = r; }); @@ -463,7 +455,7 @@ describe('disconnect during resolution', () => { session.disconnect(); // Now let the resolution complete - resolveSession({ kiloSessionId, cloudAgentSessionId, isLive: true }); + resolveSession({ type: 'cloud-agent', kiloSessionId, cloudAgentSessionId }); await resolvePromise; // Flush microtasks so resolveAndConnect can run its post-resolve code await new Promise(r => setTimeout(r, 0)); diff --git a/src/lib/cloud-agent-sdk/session.ts b/src/lib/cloud-agent-sdk/session.ts index c9b04f4cd..7f81fd2d4 100644 --- a/src/lib/cloud-agent-sdk/session.ts +++ b/src/lib/cloud-agent-sdk/session.ts @@ -142,69 +142,75 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes function pickTransportFactory(resolved: ResolvedSession): TransportFactory { console.log('[cli-debug] pickTransportFactory: resolved=%o', resolved); - if (resolved.cloudAgentSessionId && resolved.isLive) { - if (!config.transport.getTicket) { - throw new Error( - 'CloudAgentSession transport.getTicket is required for Cloud Agent sessions' + switch (resolved.type) { + case 'remote': { + if (!config.transport.cliWebsocketUrl || !config.transport.getAuthToken) { + throw new Error( + 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for remote CLI sessions' + ); + } + console.log( + '[cli-debug] pickTransportFactory: → CLI Live transport (kiloSessionId=%s, wsUrl=%s)', + resolved.kiloSessionId, + config.transport.cliWebsocketUrl ); + return createCliLiveTransport({ + kiloSessionId: resolved.kiloSessionId, + websocketUrl: config.transport.cliWebsocketUrl, + getAuthToken: config.transport.getAuthToken, + fetchSnapshot: config.transport.fetchSnapshot, + onError: config.onError, + }); } - if (!config.transport.fetchSnapshot) { - throw new Error( - 'CloudAgentSession transport.fetchSnapshot is required for Cloud Agent sessions' + case 'cloud-agent': { + if (!config.transport.getTicket) { + throw new Error( + 'CloudAgentSession transport.getTicket is required for Cloud Agent sessions' + ); + } + if (!config.transport.fetchSnapshot) { + throw new Error( + 'CloudAgentSession transport.fetchSnapshot is required for Cloud Agent sessions' + ); + } + if (!config.transport.api) { + throw new Error('CloudAgentSession transport.api is required for Cloud Agent sessions'); + } + console.log( + '[cli-debug] pickTransportFactory: → Cloud Agent transport (cloudAgentSessionId=%s)', + resolved.cloudAgentSessionId ); + return createCloudAgentTransport({ + sessionId: resolved.cloudAgentSessionId, + kiloSessionId: config.kiloSessionId, + api: config.transport.api, + getTicket: config.transport.getTicket, + fetchSnapshot: config.transport.fetchSnapshot, + websocketBaseUrl: config.websocketBaseUrl, + onError: config.onError, + }); } - if (!config.transport.api) { - throw new Error('CloudAgentSession transport.api is required for Cloud Agent sessions'); - } - console.log( - '[cli-debug] pickTransportFactory: → Cloud Agent transport (cloudAgentSessionId=%s)', - resolved.cloudAgentSessionId - ); - return createCloudAgentTransport({ - sessionId: resolved.cloudAgentSessionId, - kiloSessionId: config.kiloSessionId, - api: config.transport.api, - getTicket: config.transport.getTicket, - fetchSnapshot: config.transport.fetchSnapshot, - websocketBaseUrl: config.websocketBaseUrl, - onError: config.onError, - }); - } - - if (resolved.isLive) { - if (!config.transport.cliWebsocketUrl || !config.transport.getAuthToken) { - throw new Error( - 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for live CLI sessions' + case 'read-only': { + if (!config.transport.fetchSnapshot) { + throw new Error( + 'CloudAgentSession transport.fetchSnapshot is required for read-only sessions' + ); + } + console.log( + '[cli-debug] pickTransportFactory: → Historical transport (kiloSessionId=%s)', + resolved.kiloSessionId ); + return createCliHistoricalTransport({ + kiloSessionId: resolved.kiloSessionId, + fetchSnapshot: config.transport.fetchSnapshot, + onError: config.onError, + }); + } + default: { + const _exhaustive: never = resolved; + throw new Error(`Unknown resolved session type: ${(_exhaustive as { type: string }).type}`); } - console.log( - '[cli-debug] pickTransportFactory: → CLI Live transport (kiloSessionId=%s, wsUrl=%s)', - resolved.kiloSessionId, - config.transport.cliWebsocketUrl - ); - return createCliLiveTransport({ - kiloSessionId: resolved.kiloSessionId, - websocketUrl: config.transport.cliWebsocketUrl, - getAuthToken: config.transport.getAuthToken, - fetchSnapshot: config.transport.fetchSnapshot, - onError: config.onError, - }); - } - - if (!config.transport.fetchSnapshot) { - throw new Error( - 'CloudAgentSession transport.fetchSnapshot is required for historical sessions' - ); } - console.log( - '[cli-debug] pickTransportFactory: → Historical transport (kiloSessionId=%s)', - resolved.kiloSessionId - ); - return createCliHistoricalTransport({ - kiloSessionId: resolved.kiloSessionId, - fetchSnapshot: config.transport.fetchSnapshot, - onError: config.onError, - }); } async function resolveAndConnect(expectedGeneration: number): Promise { diff --git a/src/lib/cloud-agent-sdk/types.ts b/src/lib/cloud-agent-sdk/types.ts index f2b7ed46c..5c789363c 100644 --- a/src/lib/cloud-agent-sdk/types.ts +++ b/src/lib/cloud-agent-sdk/types.ts @@ -108,11 +108,10 @@ export type ServiceStateSnapshot = { // Session resolution — determines session type and transport routing // --------------------------------------------------------------------------- -export type ResolvedSession = { - kiloSessionId: KiloSessionId; - cloudAgentSessionId: CloudAgentSessionId | null; // null = CLI session - isLive: boolean; -}; +export type ResolvedSession = + | { type: 'remote'; kiloSessionId: KiloSessionId } + | { type: 'cloud-agent'; kiloSessionId: KiloSessionId; cloudAgentSessionId: CloudAgentSessionId } + | { type: 'read-only'; kiloSessionId: KiloSessionId }; // --------------------------------------------------------------------------- // Historical session snapshot — used by CLI historical transport