diff --git a/src/components/cloud-agent-next/CloudAgentProvider.tsx b/src/components/cloud-agent-next/CloudAgentProvider.tsx index 38ea81721..9204b09ca 100644 --- a/src/components/cloud-agent-next/CloudAgentProvider.tsx +++ b/src/components/cloud-agent-next/CloudAgentProvider.tsx @@ -37,32 +37,32 @@ export function CloudAgentProvider({ children, organizationId }: CloudAgentProvi store: storeRef.current, resolveSession: async (kiloSessionId: KiloSessionId): Promise => { + // 1. Check if the session is in the active sessions list (remote CLI) + try { + const active = await trpcClient.activeSessions.list.query(); + if (active.sessions.some(s => s.id === kiloSessionId)) { + return { type: 'remote', kiloSessionId }; + } + } catch { + // Active sessions unavailable — fall through to other checks + } + + // 2. Check if the session has a cloud agent session ID try { const session = await trpcClient.cliSessionsV2.get.query({ session_id: kiloSessionId }); if (session.cloud_agent_session_id) { return { + type: 'cloud-agent', kiloSessionId, cloudAgentSessionId: session.cloud_agent_session_id as CloudAgentSessionId, - isLive: true, }; } - // CLI session — check if live - let isLive = false; - try { - const active = await trpcClient.activeSessions.list.query(); - isLive = active.sessions.some(s => s.id === kiloSessionId); - } catch { - /* not live */ - } - return { kiloSessionId, cloudAgentSessionId: null, isLive }; } catch { - // Not found — treat as cloud agent session ID directly (backward compat) - return { - kiloSessionId, - cloudAgentSessionId: kiloSessionId as unknown as CloudAgentSessionId, - isLive: true, - }; + // Session not found — fall through to read-only } + + // 3. Fallback: read-only historical session + return { type: 'read-only', kiloSessionId }; }, getTicket: async (sessionId: CloudAgentSessionId): Promise => { diff --git a/src/lib/cloud-agent-sdk/base-connection.test.ts b/src/lib/cloud-agent-sdk/base-connection.test.ts new file mode 100644 index 000000000..a8d20970e --- /dev/null +++ b/src/lib/cloud-agent-sdk/base-connection.test.ts @@ -0,0 +1,573 @@ +import { + createBaseConnection, + DEFAULT_STALENESS_TIMEOUT_MS, + type BaseConnectionConfig, +} from './base-connection'; + +type MockWebSocket = { + url: string; + onopen: ((ev: Event) => void) | null; + onmessage: ((ev: MessageEvent) => void) | null; + onclose: ((ev: CloseEvent) => void) | null; + onerror: ((ev: Event) => void) | null; + close: jest.Mock; + send: jest.Mock; + readyState: number; +}; + +let sockets: MockWebSocket[]; +let webSocketMock: jest.Mock; + +// The source code guards browser APIs with `typeof document/window !== 'undefined'`. +// In Jest's node environment these don't exist, so we provide minimal mocks. +let mockDocument: EventTarget & { visibilityState: string }; +let mockWindow: EventTarget; + +let savedDocument: typeof globalThis.document; +let savedWindow: typeof globalThis.window; + +beforeEach(() => { + jest.useFakeTimers(); + sockets = []; + + webSocketMock = jest.fn((url: string) => { + const socket: MockWebSocket = { + url, + onopen: null, + onmessage: null, + onclose: null, + onerror: null, + close: jest.fn(), + send: jest.fn(), + readyState: 1, // WebSocket.OPEN + }; + sockets.push(socket); + return socket; + }); + + // @ts-expect-error -- test WebSocket mock + global.WebSocket = webSocketMock; + (global.WebSocket as unknown as Record).OPEN = 1; + (global.WebSocket as unknown as Record).CLOSED = 3; + + // Set up minimal document/window so base-connection registers event listeners + mockDocument = Object.assign(new EventTarget(), { visibilityState: 'visible' }); + mockWindow = new EventTarget(); + + savedDocument = globalThis.document; + savedWindow = globalThis.window; + // @ts-expect-error -- minimal mock for browser globals + globalThis.document = mockDocument; + // @ts-expect-error -- minimal mock for browser globals + globalThis.window = mockWindow; +}); + +afterEach(() => { + jest.useRealTimers(); + jest.restoreAllMocks(); + // @ts-expect-error -- cleanup test global + delete global.WebSocket; + + // Restore original document/window (undefined in node) + globalThis.document = savedDocument; + globalThis.window = savedWindow; +}); + +function createTestConnection(overrides: Partial = {}) { + const onConnected = jest.fn(); + const onReconnected = jest.fn(); + const onDisconnected = jest.fn(); + const onEvent = jest.fn(); + const onUnexpectedDisconnect = jest.fn(); + + const config: BaseConnectionConfig = { + buildUrl: () => 'ws://localhost:9999/test', + parseMessage: (data: unknown) => { + if (typeof data === 'string') { + return { type: 'event' as const, payload: data }; + } + return null; + }, + onEvent, + onConnected, + onDisconnected, + onReconnected, + onUnexpectedDisconnect, + ...overrides, + }; + + const connection = createBaseConnection(config); + return { + connection, + onConnected, + onReconnected, + onDisconnected, + onEvent, + onUnexpectedDisconnect, + }; +} + +function connectSocket(socketIndex = 0): void { + sockets[socketIndex].onmessage?.({ data: 'connected-msg' } as MessageEvent); +} + +function closeSocket(socketIndex: number, code = 1006): void { + sockets[socketIndex].onclose?.({ code, reason: '', wasClean: false } as CloseEvent); +} + +function simulateVisibilityChange(state: 'visible' | 'hidden'): void { + mockDocument.visibilityState = state; + mockDocument.dispatchEvent(new Event('visibilitychange')); +} + +function simulatePageshow(persisted: boolean): void { + const event = new Event('pageshow'); + Object.defineProperty(event, 'persisted', { value: persisted }); + mockWindow.dispatchEvent(event); +} + +describe('createBaseConnection – stale WebSocket recovery', () => { + describe('visibility change', () => { + it('reconnects with reset attempts when tab becomes visible and WS is dead', () => { + const { connection } = createTestConnection(); + connection.connect(); + connectSocket(0); + + // Mark socket as not open (simulating a dead connection) + sockets[0].readyState = 3; // WebSocket.CLOSED + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Should have created a new socket for reconnect + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('reconnects if no server message within timeout when tab becomes visible with open WS', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + // Advance past the recency window so the staleness check fires + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Socket is OPEN but last message is stale — timeout is armed + expect(sockets).toHaveLength(1); + + // Advance past the staleness timeout + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + // Should have closed the stale socket and created a new one + expect(sockets[0].close).toHaveBeenCalled(); + expect(onDisconnected).toHaveBeenCalled(); + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('anchors staleness clock to the new socket after reconnect', () => { + jest.spyOn(Math, 'random').mockReturnValue(0); + + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + // Advance time so old socket's lastMessageTime becomes stale + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS + 1000); + + // Force a reconnect via socket close + minimal backoff. + // With Math.random()=0, backoff for attempt 0 = 500ms. + closeSocket(0); + jest.advanceTimersByTime(500); + + // New socket created — connectInternal resets lastMessageTime to Date.now() + expect(sockets).toHaveLength(2); + + // Tab becomes visible immediately after new socket is created. + // Without the lastMessageTime reset, the old socket's stale timestamp + // would cause a spurious staleness-timeout reconnect here. + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Advance past the staleness window — no extra reconnect should occur + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + expect(sockets).toHaveLength(2); + expect(onDisconnected).toHaveBeenCalledTimes(1); // only from the first close + + connection.destroy(); + }); + + it('skips staleness check when a message was received recently', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + // Do NOT advance time — last message is within the recency window + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Advance past the timeout — nothing should happen + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + + it('cancels staleness timeout if a server message arrives before deadline', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Receive a message before the timeout fires + sockets[0].onmessage?.({ data: 'server-reply' } as MessageEvent); + + // Advance past the timeout - should NOT trigger reconnect + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + + it('clears staleness timeout when tab is hidden', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + // Tab visible → arms staleness timeout + simulateVisibilityChange('visible'); + + // Tab hidden → should clear the timeout + simulateVisibilityChange('hidden'); + + // Advance past the timeout - nothing should happen + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + }); + + describe('BFCache (pageshow)', () => { + it('force-closes WS and reconnects on pageshow with persisted=true', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + simulatePageshow(true); + + expect(sockets[0].close).toHaveBeenCalled(); + expect(onDisconnected).toHaveBeenCalled(); + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('does nothing on pageshow with persisted=false', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + simulatePageshow(false); + + expect(sockets[0].close).not.toHaveBeenCalled(); + expect(onDisconnected).not.toHaveBeenCalled(); + expect(sockets).toHaveLength(1); + + connection.destroy(); + }); + }); + + describe('online event', () => { + it('resets attempts and reconnects when online fires while disconnected', () => { + const { connection } = createTestConnection(); + connection.connect(); + // Don't send a message, so `connected` stays false. + // Close the socket to simulate a disconnected state. + closeSocket(0); + + // Pending reconnect timer is scheduled. Clear it via online event. + const socketsBeforeOnline = sockets.length; + mockWindow.dispatchEvent(new Event('online')); + + // Should have created a new socket + expect(sockets.length).toBeGreaterThan(socketsBeforeOnline); + + connection.destroy(); + }); + + it('does nothing when online fires while already connected', () => { + const { connection, onDisconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + mockWindow.dispatchEvent(new Event('online')); + + // Should not create a new socket + expect(sockets).toHaveLength(1); + expect(onDisconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + }); + + describe('reconnect attempts reset after exhaustion', () => { + function exhaustReconnectAttempts(startSocketIndex: number) { + jest.spyOn(Math, 'random').mockReturnValue(0); + + // Close 8 times without ever receiving a message to exhaust retries + for (let i = 0; i < 8; i++) { + const idx = startSocketIndex + i; + closeSocket(idx); + // Advance timers to trigger the next scheduled reconnect + jest.advanceTimersByTime(60_000); + } + } + + it('visibilitychange to visible resets counter and reconnects after exhausting retries', () => { + const { connection } = createTestConnection(); + connection.connect(); + + exhaustReconnectAttempts(0); + + // 1 initial + 8 reconnects = 9 sockets + const socketsAfterExhaustion = sockets.length; + + // Close the last socket to hit the max-attempts guard + closeSocket(sockets.length - 1); + jest.advanceTimersByTime(60_000); + + // No more sockets should be created (max attempts exceeded) + expect(sockets.length).toBe(socketsAfterExhaustion); + + // Now simulate tab becoming visible - should reset and reconnect + sockets[sockets.length - 1].readyState = 3; // WebSocket.CLOSED + simulateVisibilityChange('visible'); + + expect(sockets.length).toBe(socketsAfterExhaustion + 1); + + connection.destroy(); + }); + + it('online event resets counter and reconnects after exhausting retries', () => { + const { connection } = createTestConnection(); + connection.connect(); + + exhaustReconnectAttempts(0); + + const socketsAfterExhaustion = sockets.length; + + closeSocket(sockets.length - 1); + jest.advanceTimersByTime(60_000); + + expect(sockets.length).toBe(socketsAfterExhaustion); + + // online event should reset and reconnect + mockWindow.dispatchEvent(new Event('online')); + + expect(sockets.length).toBe(socketsAfterExhaustion + 1); + + connection.destroy(); + }); + }); + + describe('onReconnected vs onConnected', () => { + it('fires onConnected on first successful connection', () => { + const { connection, onConnected, onReconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + expect(onConnected).toHaveBeenCalledTimes(1); + expect(onReconnected).not.toHaveBeenCalled(); + + connection.destroy(); + }); + + it('fires onReconnected on subsequent connections after disconnect', () => { + jest.spyOn(Math, 'random').mockReturnValue(0); + + const { connection, onConnected, onReconnected } = createTestConnection(); + connection.connect(); + connectSocket(0); + + expect(onConnected).toHaveBeenCalledTimes(1); + + // Disconnect and let it reconnect + closeSocket(0); + jest.advanceTimersByTime(60_000); + + // Second socket is now open - send a message to mark it connected + connectSocket(1); + + expect(onConnected).toHaveBeenCalledTimes(1); + expect(onReconnected).toHaveBeenCalledTimes(1); + + connection.destroy(); + }); + }); + + describe('proactive auth refresh on reconnect', () => { + it('calls refreshAuth before reconnecting after staleness timeout', async () => { + const refreshAuth = jest.fn(() => Promise.resolve()); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + connectSocket(0); + + // Make lastMessageTime stale, then trigger visibility check + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + // Advance past staleness timeout + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + // refreshAuth should be called to get a fresh ticket before reconnecting + expect(refreshAuth).toHaveBeenCalledTimes(1); + + // Allow the async refresh to complete + await Promise.resolve(); + await Promise.resolve(); + + // A new socket should be created after refresh + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('calls refreshAuth before reconnecting after BFCache restore', async () => { + const refreshAuth = jest.fn(() => Promise.resolve()); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + connectSocket(0); + + simulatePageshow(true); + + expect(refreshAuth).toHaveBeenCalledTimes(1); + + await Promise.resolve(); + await Promise.resolve(); + + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('calls refreshAuth before reconnecting after online event while disconnected', async () => { + const refreshAuth = jest.fn(() => Promise.resolve()); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + // Don't connect — simulate being disconnected + closeSocket(0); + + // Clear the reconnect timer that was scheduled by closeSocket + jest.advanceTimersByTime(0); + + refreshAuth.mockClear(); + mockWindow.dispatchEvent(new Event('online')); + + expect(refreshAuth).toHaveBeenCalledTimes(1); + + await Promise.resolve(); + await Promise.resolve(); + + // Should create a new socket after refresh + const socketsAfter = sockets.length; + expect(socketsAfter).toBeGreaterThan(1); + + connection.destroy(); + }); + + it('still reconnects if refreshAuth fails', async () => { + const refreshAuth = jest.fn(() => Promise.reject(new Error('refresh failed'))); + const { connection } = createTestConnection({ refreshAuth }); + connection.connect(); + connectSocket(0); + + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + expect(refreshAuth).toHaveBeenCalledTimes(1); + + await Promise.resolve(); + await Promise.resolve(); + + // Should still create a new socket even if refresh failed + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + + it('does not call refreshAuth when no refreshAuth is configured', () => { + const { connection } = createTestConnection(); // no refreshAuth + connection.connect(); + connectSocket(0); + + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + simulateVisibilityChange('hidden'); + simulateVisibilityChange('visible'); + + jest.advanceTimersByTime(DEFAULT_STALENESS_TIMEOUT_MS); + + // Should still create a new socket (direct connect, no refresh) + expect(sockets).toHaveLength(2); + + connection.destroy(); + }); + }); + + describe('listener lifecycle', () => { + it('destroy() removes visibilitychange, pageshow, and online listeners', () => { + const docRemoveSpy = jest.spyOn(mockDocument, 'removeEventListener'); + const winRemoveSpy = jest.spyOn(mockWindow, 'removeEventListener'); + + const { connection } = createTestConnection(); + connection.connect(); + + connection.destroy(); + + const docRemovedEvents = docRemoveSpy.mock.calls.map(call => call[0]); + const winRemovedEvents = winRemoveSpy.mock.calls.map(call => call[0]); + + expect(docRemovedEvents).toContain('visibilitychange'); + expect(winRemovedEvents).toContain('pageshow'); + expect(winRemovedEvents).toContain('online'); + }); + + it('disconnect() removes visibilitychange, pageshow, and online listeners', () => { + const docRemoveSpy = jest.spyOn(mockDocument, 'removeEventListener'); + const winRemoveSpy = jest.spyOn(mockWindow, 'removeEventListener'); + + const { connection } = createTestConnection(); + connection.connect(); + + connection.disconnect(); + + const docRemovedEvents = docRemoveSpy.mock.calls.map(call => call[0]); + const winRemovedEvents = winRemoveSpy.mock.calls.map(call => call[0]); + + expect(docRemovedEvents).toContain('visibilitychange'); + expect(winRemovedEvents).toContain('pageshow'); + expect(winRemovedEvents).toContain('online'); + }); + }); +}); diff --git a/src/lib/cloud-agent-sdk/base-connection.ts b/src/lib/cloud-agent-sdk/base-connection.ts index ef74bc536..14d091db5 100644 --- a/src/lib/cloud-agent-sdk/base-connection.ts +++ b/src/lib/cloud-agent-sdk/base-connection.ts @@ -6,11 +6,15 @@ export type BaseConnectionConfig = { onEvent: (payload: unknown) => void; onConnected: () => void; onDisconnected: () => void; + onReconnected?: () => void; onUnexpectedDisconnect?: () => void; onError?: (message: string) => void; isAuthFailure?: (event: CloseEvent) => boolean; refreshAuth?: () => Promise; onOpen?: (ws: WebSocket) => void; + /** How long to wait for a server message (e.g. heartbeat) on tab resume before + * treating the connection as stale. Should exceed the server's heartbeat interval. */ + stalenessTimeoutMs?: number; }; export type Connection = { @@ -22,6 +26,7 @@ export type Connection = { const MAX_RECONNECT_ATTEMPTS = 8; const BACKOFF_BASE_MS = 1000; const BACKOFF_CAP_MS = 30000; +export const DEFAULT_STALENESS_TIMEOUT_MS = 30_000; // min(cap, base * 2^attempt) * (0.5 + random jitter) function calculateBackoffDelay(attempt: number): number { @@ -39,6 +44,15 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { let connected = false; let reconnectAttempt = 0; let generation = 0; + let hasConnectedOnce = false; + let stalenessTimeoutId: ReturnType | null = null; + let lastMessageTime = 0; + const stalenessTimeoutMs = config.stalenessTimeoutMs ?? DEFAULT_STALENESS_TIMEOUT_MS; + + // Bound handler references for event listener cleanup + let boundVisibilityHandler: (() => void) | null = null; + let boundPageshowHandler: ((e: PageTransitionEvent) => void) | null = null; + let boundOnlineHandler: (() => void) | null = null; function clearReconnectTimer(): void { if (reconnectTimeoutId !== null) { @@ -47,6 +61,13 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { } } + function clearStalenessTimeout(): void { + if (stalenessTimeoutId !== null) { + clearTimeout(stalenessTimeoutId); + stalenessTimeoutId = null; + } + } + async function refreshAuthAndReconnect(expectedGeneration: number) { if (!config.refreshAuth) { return; @@ -67,6 +88,18 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { } } + async function refreshAndConnect(expectedGeneration: number): Promise { + if (config.refreshAuth) { + try { + await config.refreshAuth(); + } catch { + // Continue with existing auth — the old ticket might still work + } + if (destroyed || intentionalDisconnect || expectedGeneration !== generation) return; + } + connectInternal(0, expectedGeneration); + } + function scheduleReconnect(attempt: number, expectedGeneration: number) { if (destroyed || intentionalDisconnect || expectedGeneration !== generation) return; @@ -95,6 +128,10 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { if (destroyed || intentionalDisconnect || expectedGeneration !== generation) return; reconnectAttempt = attempt; + clearStalenessTimeout(); + // Anchor the staleness clock to this socket so visibility checks don't + // inherit timing from a previous connection. + lastMessageTime = Date.now(); // Close existing socket - clear reference first so onclose ignores it const oldWs = ws; @@ -115,6 +152,10 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { }; newWs.onmessage = (messageEvent: MessageEvent) => { + // Any incoming message cancels an active staleness check + clearStalenessTimeout(); + lastMessageTime = Date.now(); + const parsed = config.parseMessage(messageEvent.data); if (parsed === null) { return; @@ -131,7 +172,12 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { if (!connected) { connected = true; - config.onConnected(); + if (hasConnectedOnce) { + config.onReconnected?.(); + } else { + hasConnectedOnce = true; + config.onConnected(); + } } config.onEvent(parsed.payload); @@ -202,6 +248,119 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { }; } + function handleVisibilityChange(): void { + if (destroyed || intentionalDisconnect) return; + + if (typeof document === 'undefined') return; + + if (document.visibilityState === 'hidden') { + clearStalenessTimeout(); + return; + } + + // Tab became visible + reconnectAttempt = 0; + + if (ws === null || ws.readyState !== WebSocket.OPEN) { + clearReconnectTimer(); + void refreshAndConnect(generation); + return; + } + + // If a message arrived recently, the connection is verified alive + if (Date.now() - lastMessageTime < stalenessTimeoutMs) { + return; + } + + // Socket appears open but no recent message — wait for the next server + // heartbeat to confirm liveness; if nothing arrives, treat as stale. + const currentGeneration = generation; + stalenessTimeoutId = setTimeout(() => { + stalenessTimeoutId = null; + if (destroyed || intentionalDisconnect || currentGeneration !== generation) return; + console.log('[Connection] Staleness timeout - no server message, reconnecting'); + const staleWs = ws; + if (staleWs !== null) { + ws = null; + staleWs.close(); + } + if (connected) { + connected = false; + config.onDisconnected(); + } + void refreshAndConnect(currentGeneration); + }, stalenessTimeoutMs); + } + + function handlePageshow(event: PageTransitionEvent): void { + if (destroyed || intentionalDisconnect) return; + + if (!event.persisted) return; + + // BFCache restore - WebSocket is guaranteed dead + console.log('[Connection] BFCache restore detected, forcing reconnect'); + reconnectAttempt = 0; + clearReconnectTimer(); + clearStalenessTimeout(); + + const staleWs = ws; + if (staleWs !== null) { + ws = null; + staleWs.close(); + } + if (connected) { + connected = false; + config.onDisconnected(); + } + void refreshAndConnect(generation); + } + + function handleOnline(): void { + if (destroyed || intentionalDisconnect) return; + + // If already connected with an open socket, nothing to do + if (connected && ws !== null && ws.readyState === WebSocket.OPEN) return; + + console.log('[Connection] Online event - reconnecting'); + reconnectAttempt = 0; + clearReconnectTimer(); + void refreshAndConnect(generation); + } + + function addEventListeners(): void { + if (typeof document !== 'undefined' && boundVisibilityHandler === null) { + boundVisibilityHandler = handleVisibilityChange; + document.addEventListener('visibilitychange', boundVisibilityHandler); + } + if (typeof window !== 'undefined') { + if (boundPageshowHandler === null) { + boundPageshowHandler = handlePageshow; + window.addEventListener('pageshow', boundPageshowHandler); + } + if (boundOnlineHandler === null) { + boundOnlineHandler = handleOnline; + window.addEventListener('online', boundOnlineHandler); + } + } + } + + function removeEventListeners(): void { + if (typeof document !== 'undefined' && boundVisibilityHandler !== null) { + document.removeEventListener('visibilitychange', boundVisibilityHandler); + boundVisibilityHandler = null; + } + if (typeof window !== 'undefined') { + if (boundPageshowHandler !== null) { + window.removeEventListener('pageshow', boundPageshowHandler); + boundPageshowHandler = null; + } + if (boundOnlineHandler !== null) { + window.removeEventListener('online', boundOnlineHandler); + boundOnlineHandler = null; + } + } + } + function connect() { console.log('[Connection] connect() called - resetting state'); intentionalDisconnect = false; @@ -209,8 +368,12 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { authRefreshAttempted = false; connected = false; reconnectAttempt = 0; + hasConnectedOnce = false; + lastMessageTime = 0; generation += 1; clearReconnectTimer(); + clearStalenessTimeout(); + addEventListeners(); connectInternal(0, generation); } @@ -219,6 +382,8 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { generation += 1; clearReconnectTimer(); + clearStalenessTimeout(); + removeEventListeners(); if (ws !== null) { ws.close(); @@ -236,6 +401,8 @@ export function createBaseConnection(config: BaseConnectionConfig): Connection { generation += 1; clearReconnectTimer(); + clearStalenessTimeout(); + removeEventListeners(); if (ws !== null) { ws.close(); diff --git a/src/lib/cloud-agent-sdk/cli-live-transport.test.ts b/src/lib/cloud-agent-sdk/cli-live-transport.test.ts index ebfc669b5..e488d738d 100644 --- a/src/lib/cloud-agent-sdk/cli-live-transport.test.ts +++ b/src/lib/cloud-agent-sdk/cli-live-transport.test.ts @@ -1268,3 +1268,178 @@ describe('CliLiveTransport onError callback', () => { expect(onError).toHaveBeenCalledWith('snapshot fetch failed'); }); }); + +// --------------------------------------------------------------------------- +// Snapshot refetch on reconnect +// --------------------------------------------------------------------------- + +describe('CliLiveTransport snapshot refetch on reconnect', () => { + const validInboundMessage = { + type: 'event', + sessionId: KILO_SESSION_ID, + event: 'session.status', + data: { sessionID: KILO_SESSION_ID, status: { type: 'busy' } }, + }; + + function triggerReconnect(ws: MockWebSocket): void { + ws.onclose?.({ code: 1006 } as CloseEvent); + jest.advanceTimersByTime(60_000); + } + + function getLatestMockWs(): MockWebSocket { + return webSocketConstructor.mock.results.at(-1)?.value as MockWebSocket; + } + + function sendInboundOn(ws: MockWebSocket, msg: Record): void { + ws.onmessage?.({ data: JSON.stringify(msg) } as MessageEvent); + } + + it('refetches snapshot on reconnect and replays into sinks', async () => { + jest.useFakeTimers(); + try { + const snapshot: SessionSnapshot = makeSnapshot({ id: KILO_SESSION_ID }, [ + { + info: stubUserMessage({ id: 'msg-1', sessionID: KILO_SESSION_ID }), + parts: [stubTextPart({ id: 'part-1', sessionID: KILO_SESSION_ID, messageID: 'msg-1' })], + }, + ]); + + const fetchSnapshot = jest.fn(() => Promise.resolve(snapshot)); + const { transport, chatEvents, serviceEvents } = createTransportWithSinks({ fetchSnapshot }); + + transport.connect(); + + // Wait for initial snapshot fetch to resolve + await jest.advanceTimersByTimeAsync(0); + + // Open WS and mark connection as established with a valid message + openConnection(); + sendInbound(validInboundMessage); + + // Record event counts after initial snapshot + establishment message + const chatCountAfterInit = chatEvents.length; + const serviceCountAfterInit = serviceEvents.length; + + // Trigger reconnect: close with non-auth code, advance past backoff + triggerReconnect(mockWs); + + // Open new WS and send valid message to trigger onReconnected + const newMockWs = getLatestMockWs(); + newMockWs.onopen?.({} as Event); + sendInboundOn(newMockWs, validInboundMessage); + + // Flush promises for the async snapshot refetch + await jest.advanceTimersByTimeAsync(0); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + expect(fetchSnapshot).toHaveBeenCalledWith(KILO_SESSION_ID); + + // Snapshot replay adds: 1 session.created + 1 message.updated + 1 message.part.updated + // The valid inbound message also adds 1 session.status service event + expect(serviceEvents.length).toBeGreaterThan(serviceCountAfterInit); + expect(chatEvents.length).toBeGreaterThan(chatCountAfterInit); + + // Verify the replayed snapshot events are present + const sessionCreatedEvents = serviceEvents.filter(e => e.type === 'session.created'); + expect(sessionCreatedEvents).toHaveLength(2); // initial + reconnect + + const messageUpdatedEvents = chatEvents.filter(e => e.type === 'message.updated'); + expect(messageUpdatedEvents).toHaveLength(2); // initial + reconnect + + const partUpdatedEvents = chatEvents.filter(e => e.type === 'message.part.updated'); + expect(partUpdatedEvents).toHaveLength(2); // initial + reconnect + + transport.destroy(); + } finally { + jest.useRealTimers(); + } + }); + + it('reconnect works without fetchSnapshot configured (no replay)', async () => { + jest.useFakeTimers(); + try { + const { transport, serviceEvents } = createTransportWithSinks(); + + transport.connect(); + openConnection(); + sendInbound(validInboundMessage); + + const serviceCountAfterInit = serviceEvents.length; + + triggerReconnect(mockWs); + + const newMockWs = getLatestMockWs(); + newMockWs.onopen?.({} as Event); + sendInboundOn(newMockWs, validInboundMessage); + + await jest.advanceTimersByTimeAsync(0); + + // No snapshot replay, but the valid inbound message on the new WS still routes + const sessionStatusEvents = serviceEvents.filter(e => e.type === 'session.status'); + expect(sessionStatusEvents.length).toBeGreaterThan(0); + + // No session.created events (no snapshot) + const sessionCreatedEvents = serviceEvents.filter(e => e.type === 'session.created'); + expect(sessionCreatedEvents).toHaveLength(0); + + // Verify no errors — service events grew from the reconnected inbound message + expect(serviceEvents.length).toBeGreaterThan(serviceCountAfterInit); + + transport.destroy(); + } finally { + jest.useRealTimers(); + } + }); + + it('snapshot fetch failure on reconnect is non-fatal', async () => { + jest.useFakeTimers(); + try { + let callCount = 0; + const snapshot: SessionSnapshot = makeSnapshot({ id: KILO_SESSION_ID }); + const fetchSnapshot = jest.fn(() => { + callCount++; + if (callCount === 1) return Promise.resolve(snapshot); + return Promise.reject(new Error('network error')); + }); + const onError = jest.fn(); + + const { transport, serviceEvents } = createTransportWithSinks({ fetchSnapshot, onError }); + + transport.connect(); + await jest.advanceTimersByTimeAsync(0); + + openConnection(); + sendInbound(validInboundMessage); + + // Trigger reconnect + triggerReconnect(mockWs); + + const newMockWs = getLatestMockWs(); + newMockWs.onopen?.({} as Event); + sendInboundOn(newMockWs, validInboundMessage); + + // Flush promises — snapshot refetch rejects + await jest.advanceTimersByTimeAsync(0); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + + // No error propagated to onError (reconnect snapshot failure is silently swallowed) + expect(onError).not.toHaveBeenCalled(); + + // Transport still works — send another event on the new WS + sendInboundOn(newMockWs, { + type: 'event', + sessionId: KILO_SESSION_ID, + event: 'session.status', + data: { sessionID: KILO_SESSION_ID, status: { type: 'idle' } }, + }); + + const statusEvents = serviceEvents.filter(e => e.type === 'session.status'); + expect(statusEvents.length).toBeGreaterThanOrEqual(2); + + transport.destroy(); + } finally { + jest.useRealTimers(); + } + }); +}); diff --git a/src/lib/cloud-agent-sdk/cli-live-transport.ts b/src/lib/cloud-agent-sdk/cli-live-transport.ts index 9fc91b97b..a196bd00d 100644 --- a/src/lib/cloud-agent-sdk/cli-live-transport.ts +++ b/src/lib/cloud-agent-sdk/cli-live-transport.ts @@ -172,6 +172,19 @@ function createCliLiveTransport(config: CliLiveTransportConfig): TransportFactor ws.send(JSON.stringify({ type: 'subscribe', sessionId: config.kiloSessionId })); }, onConnected: () => {}, + onReconnected: () => { + if (expectedGeneration !== generation) return; + if (!config.fetchSnapshot) return; + void config.fetchSnapshot(config.kiloSessionId).then( + snapshot => { + if (expectedGeneration !== generation) return; + replaySnapshot(snapshot); + }, + () => { + // Snapshot refetch failure on reconnect is non-fatal + } + ); + }, onDisconnected: () => {}, onError: config.onError, isAuthFailure: (event: CloseEvent) => event.code === 4001 || event.code === 1008, diff --git a/src/lib/cloud-agent-sdk/cloud-agent-connection.ts b/src/lib/cloud-agent-sdk/cloud-agent-connection.ts index 8bdcac276..12c697cfe 100644 --- a/src/lib/cloud-agent-sdk/cloud-agent-connection.ts +++ b/src/lib/cloud-agent-sdk/cloud-agent-connection.ts @@ -13,6 +13,7 @@ export type ConnectionConfig = { onConnected: () => void; onDisconnected: () => void; onUnexpectedDisconnect?: () => void; + onReconnected?: () => void; onError?: (error: StreamError) => void; onRefreshTicket?: () => Promise; heartbeatTimeoutMs?: number; @@ -59,6 +60,7 @@ export function createConnection(config: ConnectionConfig): Connection { const refreshTicket = config.onRefreshTicket; return createBaseConnection({ + stalenessTimeoutMs: config.heartbeatTimeoutMs, buildUrl: () => { const url = new URL(config.websocketUrl); url.searchParams.set('ticket', currentTicket); @@ -74,6 +76,7 @@ export function createConnection(config: ConnectionConfig): Connection { onConnected: config.onConnected, onDisconnected: config.onDisconnected, onUnexpectedDisconnect: config.onUnexpectedDisconnect, + onReconnected: config.onReconnected, onError: config.onError ? message => config.onError?.({ diff --git a/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts b/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts index 07600c0e5..4dd5d80f3 100644 --- a/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts +++ b/src/lib/cloud-agent-sdk/cloud-agent-transport.test.ts @@ -386,3 +386,163 @@ describe('CloudAgentTransport command delegation', () => { transport.destroy(); }); }); + +// --------------------------------------------------------------------------- +// Snapshot refetch on reconnect +// --------------------------------------------------------------------------- + +describe('CloudAgentTransport snapshot refetch on reconnect', () => { + // Microtask-based flush that works under jest.useFakeTimers() + // (unlike flushPromises which uses setTimeout and hangs with fake timers) + async function flushMicrotasks(): Promise { + for (let i = 0; i < 10; i++) { + await Promise.resolve(); + } + } + + function createTransportWithControllableSnapshot( + snapshotOverride?: ReturnType + ) { + const chatEvents: ChatEvent[] = []; + const serviceEvents: ServiceEvent[] = []; + const snapshot = snapshotOverride ?? emptySnapshot; + const fetchSnapshot = jest.fn(() => Promise.resolve(snapshot)); + + const factory = createCloudAgentTransport({ + sessionId: cloudAgentId('ses-1'), + kiloSessionId: kiloId('ses-1'), + api: createMockApi(), + getTicket: () => 'test-ticket', + fetchSnapshot, + websocketBaseUrl: 'ws://localhost:9999', + }); + + const transport = factory({ + onChatEvent: event => chatEvents.push(event), + onServiceEvent: event => serviceEvents.push(event), + }); + + return { transport, chatEvents, serviceEvents, fetchSnapshot }; + } + + function sendRawOn(ws: MockWebSocket, event: CloudAgentEvent): void { + ws.onmessage?.({ data: JSON.stringify(event) } as MessageEvent); + } + + /** Establish connection, simulate close + reconnect, return the new WS mock. */ + async function simulateReconnect(): Promise { + mockWs.onclose?.({ code: 1006, reason: '', wasClean: false } as CloseEvent); + + jest.advanceTimersByTime(2000); + await flushMicrotasks(); + + const newMockWs = webSocketConstructor.mock.results.at(-1)?.value as MockWebSocket; + + newMockWs.onopen?.(new Event('open')); + sendRawOn( + newMockWs, + kilocode('session.status', { sessionID: 'ses-1', status: { type: 'busy' } }) + ); + + return newMockWs; + } + + it('refetches snapshot on reconnect and replays events into sinks', async () => { + jest.useFakeTimers(); + try { + const { transport, serviceEvents, fetchSnapshot } = createTransportWithControllableSnapshot(); + + transport.connect(); + await flushMicrotasks(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(1); + + // Establish connection in base-connection by sending a valid event + sendRaw(kilocode('session.status', { sessionID: 'ses-1', status: { type: 'busy' } })); + + const serviceCountBefore = serviceEvents.length; + + const newMockWs = await simulateReconnect(); + await flushMicrotasks(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + + const replayedCreated = serviceEvents + .slice(serviceCountBefore) + .filter(e => e.type === 'session.created'); + expect(replayedCreated).toHaveLength(1); + + transport.destroy(); + newMockWs.onclose?.({ code: 1000, reason: '', wasClean: true } as CloseEvent); + } finally { + jest.useRealTimers(); + } + }); + + it('replayed snapshot with messages upserts into sinks correctly', async () => { + jest.useFakeTimers(); + try { + const snapshotWithMessages = makeSnapshot({ id: 'ses-1' }, [ + { + info: { + id: 'msg-1', + sessionID: 'ses-1', + role: 'user', + time: { created: 1 }, + agent: 'build', + model: { providerID: 'a', modelID: 'b' }, + }, + parts: [ + { + id: 'part-1', + sessionID: 'ses-1', + messageID: 'msg-1', + type: 'text', + text: 'hello', + }, + ], + }, + ]); + + const { transport, chatEvents, serviceEvents, fetchSnapshot } = + createTransportWithControllableSnapshot(snapshotWithMessages); + + transport.connect(); + await flushMicrotasks(); + + expect(serviceEvents.filter(e => e.type === 'session.created')).toHaveLength(1); + expect(chatEvents.filter(e => e.type === 'message.updated')).toHaveLength(1); + expect(chatEvents.filter(e => e.type === 'message.part.updated')).toHaveLength(1); + + // Establish connection + sendRaw(kilocode('session.status', { sessionID: 'ses-1', status: { type: 'busy' } })); + + const newMockWs = await simulateReconnect(); + await flushMicrotasks(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + expect(serviceEvents.filter(e => e.type === 'session.created')).toHaveLength(2); + expect(chatEvents.filter(e => e.type === 'message.updated')).toHaveLength(2); + expect(chatEvents.filter(e => e.type === 'message.part.updated')).toHaveLength(2); + + transport.destroy(); + newMockWs.onclose?.({ code: 1000, reason: '', wasClean: true } as CloseEvent); + } finally { + jest.useRealTimers(); + } + }); + + it('initial connect fetches snapshot once and opens WebSocket', async () => { + const { transport, serviceEvents, fetchSnapshot } = createTransportWithControllableSnapshot(); + + transport.connect(); + await flushPromises(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(1); + expect(fetchSnapshot).toHaveBeenCalledWith('ses-1'); + expect(webSocketConstructor).toHaveBeenCalledTimes(1); + expect(serviceEvents.filter(e => e.type === 'session.created')).toHaveLength(1); + + transport.destroy(); + }); +}); diff --git a/src/lib/cloud-agent-sdk/cloud-agent-transport.ts b/src/lib/cloud-agent-sdk/cloud-agent-transport.ts index 102738226..0927c8de4 100644 --- a/src/lib/cloud-agent-sdk/cloud-agent-transport.ts +++ b/src/lib/cloud-agent-sdk/cloud-agent-transport.ts @@ -85,6 +85,19 @@ function createCloudAgentTransport(config: CloudAgentTransportConfig): Transport } }, onConnected: () => {}, + onReconnected: () => { + if (expectedGeneration !== lifecycleGeneration) return; + stoppedReceived = false; + void config.fetchSnapshot(config.kiloSessionId).then( + snapshot => { + if (expectedGeneration !== lifecycleGeneration) return; + replaySnapshot(snapshot); + }, + () => { + // Snapshot refetch failure on reconnect — ignore, live events will still flow + } + ); + }, onDisconnected: () => {}, onUnexpectedDisconnect: () => { if (expectedGeneration !== lifecycleGeneration) return; diff --git a/src/lib/cloud-agent-sdk/session-manager.test.ts b/src/lib/cloud-agent-sdk/session-manager.test.ts index 89dc7ec75..ffb9b547f 100644 --- a/src/lib/cloud-agent-sdk/session-manager.test.ts +++ b/src/lib/cloud-agent-sdk/session-manager.test.ts @@ -92,9 +92,9 @@ function createMockConfig(overrides: Partial = {}): Sessio return { store: createStore(), resolveSession: jest.fn().mockResolvedValue({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('agent-1'), - isLive: true, }), getTicket: jest.fn().mockResolvedValue('ticket-123'), fetchSnapshot: jest.fn().mockResolvedValue({ info: {}, messages: [] }), diff --git a/src/lib/cloud-agent-sdk/session-manager.ts b/src/lib/cloud-agent-sdk/session-manager.ts index 7d7ea6ef9..e253ff55e 100644 --- a/src/lib/cloud-agent-sdk/session-manager.ts +++ b/src/lib/cloud-agent-sdk/session-manager.ts @@ -40,7 +40,7 @@ type SessionConfig = { model: string; variant?: string | null; }; -type ActiveSessionType = 'cloud-agent' | 'cli'; +type ActiveSessionType = 'cloud-agent' | 'remote'; type StandaloneQuestion = { requestId: string; questions: QuestionInfo[] }; type StandalonePermission = { requestId: string; @@ -506,8 +506,8 @@ function createSessionManager(config: SessionManagerConfig): SessionManager { if (ap?.requestId === requestId) store.set(activePermissionAtom, null); }, onResolved: resolved => { - if (resolved.cloudAgentSessionId) activeSessionType = 'cloud-agent'; - else if (resolved.isLive) activeSessionType = 'cli'; + if (resolved.type === 'cloud-agent') activeSessionType = 'cloud-agent'; + else if (resolved.type === 'remote') activeSessionType = 'remote'; }, onBranchChanged: branch => { const currentFetched = store.get(fetchedSessionDataAtom); @@ -547,7 +547,7 @@ function createSessionManager(config: SessionManagerConfig): SessionManager { // Fallback: clear loading when events flow even if no root // session.created was replayed (e.g. CLI snapshot failure). store.set(isLoadingAtom, false); - if (activeSessionType === 'cli') { + if (activeSessionType === 'remote') { config.onRemoteSessionOpened?.({ kiloSessionId }); } }, @@ -588,7 +588,7 @@ function createSessionManager(config: SessionManagerConfig): SessionManager { model: payload.model, variant: payload.variant, }); - if (sessionType === 'cli' && kiloSessionId) { + if (sessionType === 'remote' && kiloSessionId) { config.onRemoteSessionMessageSent?.({ kiloSessionId }); } } catch (err) { diff --git a/src/lib/cloud-agent-sdk/session-phase.test.ts b/src/lib/cloud-agent-sdk/session-phase.test.ts index 35dc7b68a..3bb41d5dd 100644 --- a/src/lib/cloud-agent-sdk/session-phase.test.ts +++ b/src/lib/cloud-agent-sdk/session-phase.test.ts @@ -83,9 +83,9 @@ function createSessionWithStateCapture( const session = createCloudAgentSession({ kiloSessionId: TEST_KILO_ID, resolveSession: async () => ({ + type: 'cloud-agent' as const, kiloSessionId: TEST_KILO_ID, cloudAgentSessionId: TEST_CLOUD_AGENT_ID, - isLive: true, }), websocketBaseUrl: 'ws://localhost:9999', transport: { @@ -375,9 +375,9 @@ describe('session state transitions', () => { const session = createCloudAgentSession({ kiloSessionId: TEST_KILO_ID, resolveSession: async () => ({ + type: 'cloud-agent' as const, kiloSessionId: TEST_KILO_ID, cloudAgentSessionId: TEST_CLOUD_AGENT_ID, - isLive: true, }), websocketBaseUrl: 'ws://localhost:9999', transport: { diff --git a/src/lib/cloud-agent-sdk/session-routing.test.ts b/src/lib/cloud-agent-sdk/session-routing.test.ts index f0973e644..54ab3ab63 100644 --- a/src/lib/cloud-agent-sdk/session-routing.test.ts +++ b/src/lib/cloud-agent-sdk/session-routing.test.ts @@ -76,9 +76,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-456'), - isLive: true, }) ); @@ -112,14 +112,13 @@ describe('session transport routing', () => { }); }); - describe('resolveSession returning CLI live session', () => { + describe('resolveSession returning remote session', () => { it('creates CLI live transport and sends subscribe message', async () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'remote', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: true, }) ); @@ -150,7 +149,7 @@ describe('session transport routing', () => { }); }); - describe('resolveSession returning CLI historical session', () => { + describe('resolveSession returning read-only session', () => { it('replays snapshot events', async () => { const snapshot = makeSnapshot({ id: SES_ID }, [ { @@ -164,9 +163,8 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'read-only', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: false, }) ); @@ -201,6 +199,12 @@ describe('session transport routing', () => { }); }); + // NOTE: The old "completed Cloud Agent session" case (cloudAgentSessionId present + // but isLive=false) no longer exists. With the discriminated union, the resolver + // decides the session type. A completed cloud agent session is resolved as + // 'cloud-agent' (the transport handles completion via snapshot + WebSocket), or + // the resolver may choose 'read-only' for sessions without a live DO. + describe('resolveSession failure', () => { it('sets error state and fires onError', async () => { const onError = jest.fn(); @@ -263,9 +267,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-1'), - isLive: true, }) ); @@ -297,9 +301,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-1'), - isLive: true, }) ); @@ -328,9 +332,9 @@ describe('session transport routing', () => { const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'cloud-agent', kiloSessionId: kiloId('ses-1'), cloudAgentSessionId: cloudAgentId('do-1'), - isLive: true, }) ); @@ -356,15 +360,14 @@ describe('session transport routing', () => { session.destroy(); }); - it('sets error state when CLI live session lacks required config', async () => { + it('sets error state when remote session lacks required config', async () => { const onError = jest.fn(); const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'remote', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: true, }) ); @@ -379,7 +382,7 @@ describe('session transport routing', () => { await Promise.resolve(); expect(onError).toHaveBeenCalledWith( - 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for live CLI sessions' + 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for remote CLI sessions' ); expect(session.state.getActivity()).toEqual({ type: 'idle' }); expect(session.state.getStatus().type).toBe('error'); @@ -387,15 +390,14 @@ describe('session transport routing', () => { session.destroy(); }); - it('sets error state when CLI historical session lacks fetchSnapshot', async () => { + it('sets error state when read-only session lacks fetchSnapshot', async () => { const onError = jest.fn(); const resolveSession = jest.fn( (): Promise => Promise.resolve({ + type: 'read-only', kiloSessionId: kiloId('ses-1'), - cloudAgentSessionId: null, - isLive: false, }) ); @@ -410,7 +412,7 @@ describe('session transport routing', () => { await Promise.resolve(); expect(onError).toHaveBeenCalledWith( - 'CloudAgentSession transport.fetchSnapshot is required for historical CLI sessions' + 'CloudAgentSession transport.fetchSnapshot is required for read-only sessions' ); expect(session.state.getActivity()).toEqual({ type: 'idle' }); expect(session.state.getStatus().type).toBe('error'); diff --git a/src/lib/cloud-agent-sdk/session-transport.test.ts b/src/lib/cloud-agent-sdk/session-transport.test.ts index 47a5ea9aa..f13f04c91 100644 --- a/src/lib/cloud-agent-sdk/session-transport.test.ts +++ b/src/lib/cloud-agent-sdk/session-transport.test.ts @@ -65,9 +65,9 @@ function createCloudAgentResolvedSession(api: CloudAgentApi): CloudAgentSession return createCloudAgentSession({ kiloSessionId, resolveSession: async () => ({ + type: 'cloud-agent' as const, kiloSessionId, cloudAgentSessionId, - isLive: true, }), transport: { getTicket: () => 'ticket', @@ -198,14 +198,13 @@ describe('commands throw before transport is connected', () => { }); }); -describe('session transport missing command methods (historical session)', () => { +describe('session transport missing command methods (read-only session)', () => { function createHistoricalSession(): CloudAgentSession { return createCloudAgentSession({ kiloSessionId: kiloId('ses_historical'), resolveSession: async () => ({ + type: 'read-only' as const, kiloSessionId: kiloId('ses_historical'), - cloudAgentSessionId: null, - isLive: false, }), transport: { fetchSnapshot: () => Promise.resolve(makeSnapshot({ id: 'ses_historical' })), @@ -219,7 +218,7 @@ describe('session transport missing command methods (historical session)', () => await new Promise(r => setTimeout(r, 0)); } - it('session.send() throws for historical session', async () => { + it('session.send() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -230,7 +229,7 @@ describe('session transport missing command methods (historical session)', () => session.destroy(); }); - it('session.interrupt() throws for historical session', async () => { + it('session.interrupt() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -241,7 +240,7 @@ describe('session transport missing command methods (historical session)', () => session.destroy(); }); - it('session.answer() throws for historical session', async () => { + it('session.answer() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -252,7 +251,7 @@ describe('session transport missing command methods (historical session)', () => session.destroy(); }); - it('session.reject() throws for historical session', async () => { + it('session.reject() throws for read-only session', async () => { const session = createHistoricalSession(); await connectHistorical(session); @@ -264,16 +263,15 @@ describe('session transport missing command methods (historical session)', () => }); }); -describe('CLI live session send via typed transport methods', () => { +describe('remote session send via typed transport methods', () => { const cliKiloSessionId = kiloId('ses_cli-live-session'); - it('session.send() uses kiloSessionId for CLI live sessions', async () => { + it('session.send() uses kiloSessionId for remote sessions', async () => { const session = createCloudAgentSession({ kiloSessionId: cliKiloSessionId, resolveSession: async () => ({ + type: 'remote' as const, kiloSessionId: cliKiloSessionId, - cloudAgentSessionId: null, - isLive: true, }), transport: { cliWebsocketUrl: 'wss://localhost:9999/api/user/web', @@ -350,13 +348,12 @@ describe('session capabilities', () => { session.destroy(); }); - it('canSend is true after connecting a CLI live session', async () => { + it('canSend is true after connecting a remote session', async () => { const session = createCloudAgentSession({ kiloSessionId: kiloId('ses_cli-live'), resolveSession: async () => ({ + type: 'remote' as const, kiloSessionId: kiloId('ses_cli-live'), - cloudAgentSessionId: null, - isLive: true, }), transport: { cliWebsocketUrl: 'wss://localhost:9999/api/user/web', @@ -373,13 +370,12 @@ describe('session capabilities', () => { session.destroy(); }); - it('canSend is false after connecting a historical session', async () => { + it('canSend is false after connecting a read-only session', async () => { const session = createCloudAgentSession({ kiloSessionId: kiloId('ses_historical'), resolveSession: async () => ({ + type: 'read-only' as const, kiloSessionId: kiloId('ses_historical'), - cloudAgentSessionId: null, - isLive: false, }), transport: { fetchSnapshot: () => Promise.resolve(makeSnapshot({ id: 'ses_historical' })), @@ -409,13 +405,12 @@ describe('session capabilities', () => { session.destroy(); }); - it('canInterrupt is false for historical sessions', async () => { + it('canInterrupt is false for read-only sessions', async () => { const session = createCloudAgentSession({ kiloSessionId: kiloId('ses_historical'), resolveSession: async () => ({ + type: 'read-only' as const, kiloSessionId: kiloId('ses_historical'), - cloudAgentSessionId: null, - isLive: false, }), transport: { fetchSnapshot: () => Promise.resolve(makeSnapshot({ id: 'ses_historical' })), @@ -434,16 +429,13 @@ describe('session capabilities', () => { describe('disconnect during resolution', () => { it('disconnect() before resolveSession settles prevents transport from attaching', async () => { const api = createMockApi(); - let resolveSession!: (value: { + type CloudAgentResolved = { + type: 'cloud-agent'; kiloSessionId: typeof kiloSessionId; cloudAgentSessionId: typeof cloudAgentSessionId; - isLive: boolean; - }) => void; - const resolvePromise = new Promise<{ - kiloSessionId: typeof kiloSessionId; - cloudAgentSessionId: typeof cloudAgentSessionId; - isLive: boolean; - }>(r => { + }; + let resolveSession!: (value: CloudAgentResolved) => void; + const resolvePromise = new Promise(r => { resolveSession = r; }); @@ -463,7 +455,7 @@ describe('disconnect during resolution', () => { session.disconnect(); // Now let the resolution complete - resolveSession({ kiloSessionId, cloudAgentSessionId, isLive: true }); + resolveSession({ type: 'cloud-agent', kiloSessionId, cloudAgentSessionId }); await resolvePromise; // Flush microtasks so resolveAndConnect can run its post-resolve code await new Promise(r => setTimeout(r, 0)); diff --git a/src/lib/cloud-agent-sdk/session.ts b/src/lib/cloud-agent-sdk/session.ts index aa89e7b75..7f81fd2d4 100644 --- a/src/lib/cloud-agent-sdk/session.ts +++ b/src/lib/cloud-agent-sdk/session.ts @@ -142,69 +142,75 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes function pickTransportFactory(resolved: ResolvedSession): TransportFactory { console.log('[cli-debug] pickTransportFactory: resolved=%o', resolved); - if (resolved.cloudAgentSessionId) { - if (!config.transport.getTicket) { - throw new Error( - 'CloudAgentSession transport.getTicket is required for Cloud Agent sessions' + switch (resolved.type) { + case 'remote': { + if (!config.transport.cliWebsocketUrl || !config.transport.getAuthToken) { + throw new Error( + 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for remote CLI sessions' + ); + } + console.log( + '[cli-debug] pickTransportFactory: → CLI Live transport (kiloSessionId=%s, wsUrl=%s)', + resolved.kiloSessionId, + config.transport.cliWebsocketUrl ); + return createCliLiveTransport({ + kiloSessionId: resolved.kiloSessionId, + websocketUrl: config.transport.cliWebsocketUrl, + getAuthToken: config.transport.getAuthToken, + fetchSnapshot: config.transport.fetchSnapshot, + onError: config.onError, + }); } - if (!config.transport.fetchSnapshot) { - throw new Error( - 'CloudAgentSession transport.fetchSnapshot is required for Cloud Agent sessions' + case 'cloud-agent': { + if (!config.transport.getTicket) { + throw new Error( + 'CloudAgentSession transport.getTicket is required for Cloud Agent sessions' + ); + } + if (!config.transport.fetchSnapshot) { + throw new Error( + 'CloudAgentSession transport.fetchSnapshot is required for Cloud Agent sessions' + ); + } + if (!config.transport.api) { + throw new Error('CloudAgentSession transport.api is required for Cloud Agent sessions'); + } + console.log( + '[cli-debug] pickTransportFactory: → Cloud Agent transport (cloudAgentSessionId=%s)', + resolved.cloudAgentSessionId ); + return createCloudAgentTransport({ + sessionId: resolved.cloudAgentSessionId, + kiloSessionId: config.kiloSessionId, + api: config.transport.api, + getTicket: config.transport.getTicket, + fetchSnapshot: config.transport.fetchSnapshot, + websocketBaseUrl: config.websocketBaseUrl, + onError: config.onError, + }); } - if (!config.transport.api) { - throw new Error('CloudAgentSession transport.api is required for Cloud Agent sessions'); - } - console.log( - '[cli-debug] pickTransportFactory: → Cloud Agent transport (cloudAgentSessionId=%s)', - resolved.cloudAgentSessionId - ); - return createCloudAgentTransport({ - sessionId: resolved.cloudAgentSessionId, - kiloSessionId: config.kiloSessionId, - api: config.transport.api, - getTicket: config.transport.getTicket, - fetchSnapshot: config.transport.fetchSnapshot, - websocketBaseUrl: config.websocketBaseUrl, - onError: config.onError, - }); - } - - if (resolved.isLive) { - if (!config.transport.cliWebsocketUrl || !config.transport.getAuthToken) { - throw new Error( - 'CloudAgentSession transport.cliWebsocketUrl and getAuthToken are required for live CLI sessions' + case 'read-only': { + if (!config.transport.fetchSnapshot) { + throw new Error( + 'CloudAgentSession transport.fetchSnapshot is required for read-only sessions' + ); + } + console.log( + '[cli-debug] pickTransportFactory: → Historical transport (kiloSessionId=%s)', + resolved.kiloSessionId ); + return createCliHistoricalTransport({ + kiloSessionId: resolved.kiloSessionId, + fetchSnapshot: config.transport.fetchSnapshot, + onError: config.onError, + }); + } + default: { + const _exhaustive: never = resolved; + throw new Error(`Unknown resolved session type: ${(_exhaustive as { type: string }).type}`); } - console.log( - '[cli-debug] pickTransportFactory: → CLI Live transport (kiloSessionId=%s, wsUrl=%s)', - resolved.kiloSessionId, - config.transport.cliWebsocketUrl - ); - return createCliLiveTransport({ - kiloSessionId: resolved.kiloSessionId, - websocketUrl: config.transport.cliWebsocketUrl, - getAuthToken: config.transport.getAuthToken, - fetchSnapshot: config.transport.fetchSnapshot, - onError: config.onError, - }); - } - - if (!config.transport.fetchSnapshot) { - throw new Error( - 'CloudAgentSession transport.fetchSnapshot is required for historical CLI sessions' - ); } - console.log( - '[cli-debug] pickTransportFactory: → CLI Historical transport (kiloSessionId=%s)', - resolved.kiloSessionId - ); - return createCliHistoricalTransport({ - kiloSessionId: resolved.kiloSessionId, - fetchSnapshot: config.transport.fetchSnapshot, - onError: config.onError, - }); } async function resolveAndConnect(expectedGeneration: number): Promise { diff --git a/src/lib/cloud-agent-sdk/types.ts b/src/lib/cloud-agent-sdk/types.ts index f2b7ed46c..5c789363c 100644 --- a/src/lib/cloud-agent-sdk/types.ts +++ b/src/lib/cloud-agent-sdk/types.ts @@ -108,11 +108,10 @@ export type ServiceStateSnapshot = { // Session resolution — determines session type and transport routing // --------------------------------------------------------------------------- -export type ResolvedSession = { - kiloSessionId: KiloSessionId; - cloudAgentSessionId: CloudAgentSessionId | null; // null = CLI session - isLive: boolean; -}; +export type ResolvedSession = + | { type: 'remote'; kiloSessionId: KiloSessionId } + | { type: 'cloud-agent'; kiloSessionId: KiloSessionId; cloudAgentSessionId: CloudAgentSessionId } + | { type: 'read-only'; kiloSessionId: KiloSessionId }; // --------------------------------------------------------------------------- // Historical session snapshot — used by CLI historical transport