From 2bd6cb4b9bc8f6d2ea2bd648fe2898a4f55d6b27 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 01:05:38 -0600 Subject: [PATCH 1/3] feat: implement WebSocket handler and event system Add WebSocket endpoint with token auth, client tracking, command dispatch, and typed discriminated union events for real-time server-client communication. - ws://:/ws?token= endpoint via HTTP upgrade - Token validation from query parameter (sync and async) - Client set with per-client terminal subscription state - Command dispatch: ping, terminal:subscribe/unsubscribe, terminal:input - Server events: pong, manifest:updated, agent:status, terminal:output, error - broadcast() and sendEvent() helpers - Proper cleanup on disconnect and server shutdown - 26 tests covering auth, commands, broadcast, and cleanup Closes #73 --- package-lock.json | 33 +++ package.json | 2 + src/server/ws/events.ts | 110 ++++++++++ src/server/ws/handler.test.ts | 375 ++++++++++++++++++++++++++++++++++ src/server/ws/handler.ts | 159 ++++++++++++++ 5 files changed, 679 insertions(+) create mode 100644 src/server/ws/events.ts create mode 100644 src/server/ws/handler.test.ts create mode 100644 src/server/ws/handler.ts diff --git a/package-lock.json b/package-lock.json index a036a8f..97d7a7c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "nanoid": "^5.1.5", "proper-lockfile": "^4.1.2", "write-file-atomic": "^7.0.0", + "ws": "^8.19.0", "yaml": "^2.7.1" }, "bin": { @@ -23,6 +24,7 @@ "devDependencies": { "@types/node": "^22.13.4", "@types/proper-lockfile": "^4.1.4", + "@types/ws": "^8.18.1", "tsup": "^8.4.0", "tsx": "^4.19.3", "typescript": "^5.7.3", @@ -933,6 +935,16 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@vitest/expect": { "version": "3.2.4", "resolved": "https://registry.npmjs.org/@vitest/expect/-/expect-3.2.4.tgz", @@ -2491,6 +2503,27 @@ "node": "^20.17.0 || >=22.9.0" } }, + "node_modules/ws": { + "version": "8.19.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", + "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/yaml": { "version": "2.8.2", "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.2.tgz", diff --git a/package.json b/package.json index b4cd8bf..6782c1f 100644 --- a/package.json +++ b/package.json @@ -51,11 +51,13 @@ "nanoid": "^5.1.5", "proper-lockfile": "^4.1.2", "write-file-atomic": "^7.0.0", + "ws": "^8.19.0", "yaml": "^2.7.1" }, "devDependencies": { "@types/node": "^22.13.4", "@types/proper-lockfile": "^4.1.4", + "@types/ws": "^8.18.1", "tsup": "^8.4.0", "tsx": "^4.19.3", "typescript": "^5.7.3", diff --git a/src/server/ws/events.ts b/src/server/ws/events.ts new file mode 100644 index 0000000..82878a6 --- /dev/null +++ b/src/server/ws/events.ts @@ -0,0 +1,110 @@ +import type { AgentStatus, Manifest, WorktreeStatus } from '../../types/manifest.js'; + +// --- Inbound Commands (client → server) --- + +export interface PingCommand { + type: 'ping'; +} + +export interface TerminalSubscribeCommand { + type: 'terminal:subscribe'; + agentId: string; +} + +export interface TerminalUnsubscribeCommand { + type: 'terminal:unsubscribe'; + agentId: string; +} + +export interface TerminalInputCommand { + type: 'terminal:input'; + agentId: string; + data: string; +} + +export type ClientCommand = + | PingCommand + | TerminalSubscribeCommand + | TerminalUnsubscribeCommand + | TerminalInputCommand; + +// --- Outbound Events (server → client) --- + +export interface PongEvent { + type: 'pong'; +} + +export interface ManifestUpdatedEvent { + type: 'manifest:updated'; + manifest: Manifest; +} + +export interface AgentStatusEvent { + type: 'agent:status'; + worktreeId: string; + agentId: string; + status: AgentStatus; + worktreeStatus: WorktreeStatus; +} + +export interface TerminalOutputEvent { + type: 'terminal:output'; + agentId: string; + data: string; +} + +export interface ErrorEvent { + type: 'error'; + code: string; + message: string; +} + +export type ServerEvent = + | PongEvent + | ManifestUpdatedEvent + | AgentStatusEvent + | TerminalOutputEvent + | ErrorEvent; + +// --- Parsing --- + +const VALID_COMMAND_TYPES = new Set([ + 'ping', + 'terminal:subscribe', + 'terminal:unsubscribe', + 'terminal:input', +]); + +export function parseCommand(raw: string): ClientCommand | null { + let parsed: unknown; + try { + parsed = JSON.parse(raw); + } catch { + return null; + } + + if (typeof parsed !== 'object' || parsed === null) return null; + + const obj = parsed as Record; + if (typeof obj.type !== 'string' || !VALID_COMMAND_TYPES.has(obj.type)) return null; + + if (obj.type === 'ping') { + return { type: 'ping' }; + } + + if (obj.type === 'terminal:subscribe' || obj.type === 'terminal:unsubscribe') { + if (typeof obj.agentId !== 'string') return null; + return { type: obj.type, agentId: obj.agentId }; + } + + if (obj.type === 'terminal:input') { + if (typeof obj.agentId !== 'string' || typeof obj.data !== 'string') return null; + return { type: 'terminal:input', agentId: obj.agentId, data: obj.data }; + } + + return null; +} + +export function serializeEvent(event: ServerEvent): string { + return JSON.stringify(event); +} diff --git a/src/server/ws/handler.test.ts b/src/server/ws/handler.test.ts new file mode 100644 index 0000000..425fb75 --- /dev/null +++ b/src/server/ws/handler.test.ts @@ -0,0 +1,375 @@ +import { describe, test, expect, beforeEach, afterEach } from 'vitest'; +import http from 'node:http'; +import { WebSocket } from 'ws'; +import { createWsHandler, type WsHandler } from './handler.js'; +import { parseCommand, serializeEvent, type ServerEvent } from './events.js'; + +// --- Helpers --- + +function createTestServer(): http.Server { + return http.createServer((_req, res) => { + res.writeHead(404); + res.end(); + }); +} + +function listen(server: http.Server): Promise { + return new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => { + const addr = server.address(); + if (typeof addr === 'object' && addr !== null) { + resolve(addr.port); + } + }); + }); +} + +function closeServer(server: http.Server): Promise { + return new Promise((resolve) => { + server.close(() => resolve()); + }); +} + +function connectWs(port: number, token: string): Promise { + return new Promise((resolve, reject) => { + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws?token=${token}`); + ws.on('open', () => resolve(ws)); + ws.on('error', reject); + }); +} + +function waitForMessage(ws: WebSocket): Promise { + return new Promise((resolve) => { + ws.once('message', (data: Buffer | string) => { + const str = typeof data === 'string' ? data : data.toString('utf-8'); + resolve(JSON.parse(str) as ServerEvent); + }); + }); +} + +/** Wait for a ws client to close or error (rejected upgrades emit error then close) */ +function waitForDisconnect(ws: WebSocket): Promise { + return new Promise((resolve) => { + if (ws.readyState === WebSocket.CLOSED) { + resolve(); + return; + } + ws.on('close', () => resolve()); + ws.on('error', () => { + // error fires before close on rejected upgrades — wait for close + if (ws.readyState === WebSocket.CLOSED) resolve(); + }); + }); +} + +function send(ws: WebSocket, obj: Record): void { + ws.send(JSON.stringify(obj)); +} + +// --- Tests --- + +describe('WebSocket handler', () => { + let server: http.Server; + let handler: WsHandler; + const openSockets: WebSocket[] = []; + + async function setup( + opts: { + validateToken?: (token: string) => boolean | Promise; + onTerminalInput?: (agentId: string, data: string) => void; + } = {}, + ): Promise { + server = createTestServer(); + const port = await listen(server); + handler = createWsHandler({ + server, + validateToken: opts.validateToken ?? ((t) => t === 'valid-token'), + onTerminalInput: opts.onTerminalInput, + }); + return port; + } + + async function connect(port: number, token = 'valid-token'): Promise { + const ws = await connectWs(port, token); + openSockets.push(ws); + return ws; + } + + afterEach(async () => { + for (const ws of openSockets) { + if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) { + ws.close(); + } + } + openSockets.length = 0; + + if (handler) { + await handler.close().catch(() => {}); + } + if (server?.listening) { + await closeServer(server); + } + }); + + describe('connection and auth', () => { + test('accepts connection with valid token', async () => { + const port = await setup(); + const ws = await connect(port); + expect(ws.readyState).toBe(WebSocket.OPEN); + expect(handler.clients.size).toBe(1); + }); + + test('rejects connection with invalid token', async () => { + const port = await setup(); + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws?token=bad-token`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); + + test('rejects connection with no token', async () => { + const port = await setup(); + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); + + test('rejects connection on wrong path', async () => { + const port = await setup(); + const ws = new WebSocket(`ws://127.0.0.1:${port}/other?token=valid-token`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); + + test('supports async token validation', async () => { + const port = await setup({ + validateToken: async (t) => t === 'async-token', + }); + const ws = await connect(port, 'async-token'); + expect(ws.readyState).toBe(WebSocket.OPEN); + }); + }); + + describe('command dispatch', () => { + test('responds to ping with pong', async () => { + const port = await setup(); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'ping' }); + + const event = await msgPromise; + expect(event).toEqual({ type: 'pong' }); + }); + + test('sends error for invalid JSON', async () => { + const port = await setup(); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + ws.send('not json'); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('INVALID_COMMAND'); + }); + + test('sends error for unknown command type', async () => { + const port = await setup(); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'unknown' }); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('INVALID_COMMAND'); + }); + + test('handles terminal:subscribe', async () => { + const port = await setup(); + const ws = await connect(port); + + send(ws, { type: 'terminal:subscribe', agentId: 'ag-12345678' }); + await new Promise((r) => setTimeout(r, 50)); + + const [client] = handler.clients; + expect(client.subscribedAgents.has('ag-12345678')).toBe(true); + }); + + test('handles terminal:unsubscribe', async () => { + const port = await setup(); + const ws = await connect(port); + + send(ws, { type: 'terminal:subscribe', agentId: 'ag-12345678' }); + await new Promise((r) => setTimeout(r, 50)); + + send(ws, { type: 'terminal:unsubscribe', agentId: 'ag-12345678' }); + await new Promise((r) => setTimeout(r, 50)); + + const [client] = handler.clients; + expect(client.subscribedAgents.has('ag-12345678')).toBe(false); + }); + + test('handles terminal:input and calls onTerminalInput', async () => { + let capturedAgentId = ''; + let capturedData = ''; + + const port = await setup({ + onTerminalInput: (agentId, data) => { + capturedAgentId = agentId; + capturedData = data; + }, + }); + const ws = await connect(port); + + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + await new Promise((r) => setTimeout(r, 50)); + + expect(capturedAgentId).toBe('ag-12345678'); + expect(capturedData).toBe('hello\n'); + }); + }); + + describe('broadcast and sendEvent', () => { + test('broadcast sends to all connected clients', async () => { + const port = await setup(); + const ws1 = await connect(port); + const ws2 = await connect(port); + + expect(handler.clients.size).toBe(2); + + const msg1 = waitForMessage(ws1); + const msg2 = waitForMessage(ws2); + + handler.broadcast({ + type: 'manifest:updated', + manifest: { + version: 1, + projectRoot: '/tmp', + sessionName: 'test', + worktrees: {}, + createdAt: '2025-01-01T00:00:00Z', + updatedAt: '2025-01-01T00:00:00Z', + }, + }); + + const [event1, event2] = await Promise.all([msg1, msg2]); + expect(event1.type).toBe('manifest:updated'); + expect(event2.type).toBe('manifest:updated'); + }); + + test('sendEvent sends to specific client only', async () => { + const port = await setup(); + const ws1 = await connect(port); + await connect(port); // ws2 — should not receive + + const msg1 = waitForMessage(ws1); + const [client1] = handler.clients; + + handler.sendEvent(client1, { type: 'pong' }); + + const event = await msg1; + expect(event).toEqual({ type: 'pong' }); + }); + }); + + describe('cleanup', () => { + test('removes client on disconnect', async () => { + const port = await setup(); + const ws = await connect(port); + + expect(handler.clients.size).toBe(1); + + ws.close(); + await waitForDisconnect(ws); + await new Promise((r) => setTimeout(r, 50)); + + expect(handler.clients.size).toBe(0); + }); + + test('close() terminates all clients', async () => { + const port = await setup(); + const ws1 = await connect(port); + const ws2 = await connect(port); + + const close1 = waitForDisconnect(ws1); + const close2 = waitForDisconnect(ws2); + + await handler.close(); + await Promise.all([close1, close2]); + + expect(handler.clients.size).toBe(0); + }); + }); +}); + +describe('parseCommand', () => { + test('parses ping command', () => { + expect(parseCommand('{"type":"ping"}')).toEqual({ type: 'ping' }); + }); + + test('parses terminal:subscribe', () => { + expect(parseCommand('{"type":"terminal:subscribe","agentId":"ag-123"}')).toEqual({ + type: 'terminal:subscribe', + agentId: 'ag-123', + }); + }); + + test('parses terminal:unsubscribe', () => { + expect(parseCommand('{"type":"terminal:unsubscribe","agentId":"ag-123"}')).toEqual({ + type: 'terminal:unsubscribe', + agentId: 'ag-123', + }); + }); + + test('parses terminal:input', () => { + expect(parseCommand('{"type":"terminal:input","agentId":"ag-123","data":"ls\\n"}')).toEqual({ + type: 'terminal:input', + agentId: 'ag-123', + data: 'ls\n', + }); + }); + + test('returns null for invalid JSON', () => { + expect(parseCommand('not json')).toBeNull(); + }); + + test('returns null for unknown type', () => { + expect(parseCommand('{"type":"unknown"}')).toBeNull(); + }); + + test('returns null for missing required fields', () => { + expect(parseCommand('{"type":"terminal:subscribe"}')).toBeNull(); + expect(parseCommand('{"type":"terminal:input","agentId":"ag-123"}')).toBeNull(); + }); + + test('returns null for non-object', () => { + expect(parseCommand('"string"')).toBeNull(); + expect(parseCommand('42')).toBeNull(); + expect(parseCommand('null')).toBeNull(); + }); +}); + +describe('serializeEvent', () => { + test('serializes pong event', () => { + expect(serializeEvent({ type: 'pong' })).toBe('{"type":"pong"}'); + }); + + test('serializes error event', () => { + const event: ServerEvent = { type: 'error', code: 'TEST', message: 'msg' }; + const parsed = JSON.parse(serializeEvent(event)); + expect(parsed).toEqual({ type: 'error', code: 'TEST', message: 'msg' }); + }); + + test('serializes terminal:output event', () => { + const event: ServerEvent = { type: 'terminal:output', agentId: 'ag-1', data: 'hello' }; + const parsed = JSON.parse(serializeEvent(event)); + expect(parsed).toEqual({ type: 'terminal:output', agentId: 'ag-1', data: 'hello' }); + }); +}); diff --git a/src/server/ws/handler.ts b/src/server/ws/handler.ts new file mode 100644 index 0000000..f60f452 --- /dev/null +++ b/src/server/ws/handler.ts @@ -0,0 +1,159 @@ +import { URL } from 'node:url'; +import type { Server as HttpServer, IncomingMessage } from 'node:http'; +import { WebSocketServer, WebSocket } from 'ws'; +import type { Duplex } from 'node:stream'; +import { + parseCommand, + serializeEvent, + type ClientCommand, + type ServerEvent, +} from './events.js'; + +// --- Client State --- + +export interface ClientState { + ws: WebSocket; + subscribedAgents: Set; +} + +// --- Handler Options --- + +export interface WsHandlerOptions { + server: HttpServer; + validateToken: (token: string) => boolean | Promise; + onTerminalInput?: (agentId: string, data: string) => void | Promise; +} + +// --- WebSocket Handler --- + +export interface WsHandler { + wss: WebSocketServer; + clients: Set; + broadcast: (event: ServerEvent) => void; + sendEvent: (client: ClientState, event: ServerEvent) => void; + close: () => Promise; +} + +export function createWsHandler(options: WsHandlerOptions): WsHandler { + const { server, validateToken, onTerminalInput } = options; + + const wss = new WebSocketServer({ noServer: true }); + const clients = new Set(); + + function sendEvent(client: ClientState, event: ServerEvent): void { + if (client.ws.readyState === WebSocket.OPEN) { + client.ws.send(serializeEvent(event)); + } + } + + function broadcast(event: ServerEvent): void { + for (const client of clients) { + sendEvent(client, event); + } + } + + function handleCommand(client: ClientState, command: ClientCommand): void { + switch (command.type) { + case 'ping': + sendEvent(client, { type: 'pong' }); + break; + + case 'terminal:subscribe': + client.subscribedAgents.add(command.agentId); + break; + + case 'terminal:unsubscribe': + client.subscribedAgents.delete(command.agentId); + break; + + case 'terminal:input': + if (onTerminalInput) { + Promise.resolve(onTerminalInput(command.agentId, command.data)).catch(() => { + sendEvent(client, { + type: 'error', + code: 'TERMINAL_INPUT_FAILED', + message: `Failed to send input to agent ${command.agentId}`, + }); + }); + } + break; + } + } + + server.on('upgrade', (request: IncomingMessage, socket: Duplex, head: Buffer) => { + const url = new URL(request.url ?? '/', `http://${request.headers.host ?? 'localhost'}`); + + if (url.pathname !== '/ws') { + socket.destroy(); + return; + } + + const token = url.searchParams.get('token'); + if (!token) { + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.destroy(); + return; + } + + Promise.resolve(validateToken(token)) + .then((valid) => { + if (!valid) { + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.destroy(); + return; + } + + wss.handleUpgrade(request, socket, head, (ws) => { + wss.emit('connection', ws, request); + }); + }) + .catch(() => { + socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n'); + socket.destroy(); + }); + }); + + wss.on('connection', (ws: WebSocket) => { + const client: ClientState = { + ws, + subscribedAgents: new Set(), + }; + clients.add(client); + + ws.on('message', (raw: Buffer | string) => { + const data = typeof raw === 'string' ? raw : raw.toString('utf-8'); + const command = parseCommand(data); + + if (!command) { + sendEvent(client, { + type: 'error', + code: 'INVALID_COMMAND', + message: 'Could not parse command', + }); + return; + } + + handleCommand(client, command); + }); + + ws.on('close', () => { + clients.delete(client); + }); + + ws.on('error', () => { + clients.delete(client); + }); + }); + + async function close(): Promise { + for (const client of clients) { + client.ws.close(1001, 'Server shutting down'); + } + clients.clear(); + await new Promise((resolve, reject) => { + wss.close((err) => (err ? reject(err) : resolve())); + }); + } + + return { wss, clients, broadcast, sendEvent, close }; +} From 95ac0127ecea416d95f1802a2c6431edeee25723 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 07:54:58 -0600 Subject: [PATCH 2/3] fix: address code review findings for WebSocket handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: Remove upgrade listener from server on close() to prevent listener leak if handler is recreated on the same server. P2: Pre-serialize event once in broadcast() instead of per-client. P2: Add maxPayload (64KB) to WebSocketServer to prevent OOM. P2: Replace timer-based test synchronization (setTimeout 50ms) with deterministic ping/pong round-trip barriers. P3: Remove unused beforeEach import. P3: Strengthen sendEvent specificity test to verify ws2 does not receive. P3: Add sendEvent skip test for closed socket readyState guard. P3: Add tests for onTerminalInput not provided (no-op path). P3: Add tests for sync and async onTerminalInput throw/reject paths. P3: Add test verifying close() removes upgrade listener. P3: Fix close() ordering: close clients → close WSS → clear set. P3: Wrap synchronous onTerminalInput throws in try/catch. --- src/server/ws/handler.test.ts | 108 ++++++++++++++++++++++++++++++---- src/server/ws/handler.ts | 30 +++++++--- 2 files changed, 118 insertions(+), 20 deletions(-) diff --git a/src/server/ws/handler.test.ts b/src/server/ws/handler.test.ts index 425fb75..f87d1d9 100644 --- a/src/server/ws/handler.test.ts +++ b/src/server/ws/handler.test.ts @@ -1,4 +1,4 @@ -import { describe, test, expect, beforeEach, afterEach } from 'vitest'; +import { describe, test, expect, afterEach } from 'vitest'; import http from 'node:http'; import { WebSocket } from 'ws'; import { createWsHandler, type WsHandler } from './handler.js'; @@ -56,7 +56,6 @@ function waitForDisconnect(ws: WebSocket): Promise { } ws.on('close', () => resolve()); ws.on('error', () => { - // error fires before close on rejected upgrades — wait for close if (ws.readyState === WebSocket.CLOSED) resolve(); }); }); @@ -66,6 +65,13 @@ function send(ws: WebSocket, obj: Record): void { ws.send(JSON.stringify(obj)); } +/** Send a ping and wait for pong — acts as a deterministic sync barrier. */ +async function roundTrip(ws: WebSocket): Promise { + const msg = waitForMessage(ws); + send(ws, { type: 'ping' }); + await msg; +} + // --- Tests --- describe('WebSocket handler', () => { @@ -76,7 +82,7 @@ describe('WebSocket handler', () => { async function setup( opts: { validateToken?: (token: string) => boolean | Promise; - onTerminalInput?: (agentId: string, data: string) => void; + onTerminalInput?: (agentId: string, data: string) => void | Promise; } = {}, ): Promise { server = createTestServer(); @@ -196,7 +202,7 @@ describe('WebSocket handler', () => { const ws = await connect(port); send(ws, { type: 'terminal:subscribe', agentId: 'ag-12345678' }); - await new Promise((r) => setTimeout(r, 50)); + await roundTrip(ws); const [client] = handler.clients; expect(client.subscribedAgents.has('ag-12345678')).toBe(true); @@ -207,10 +213,10 @@ describe('WebSocket handler', () => { const ws = await connect(port); send(ws, { type: 'terminal:subscribe', agentId: 'ag-12345678' }); - await new Promise((r) => setTimeout(r, 50)); + await roundTrip(ws); send(ws, { type: 'terminal:unsubscribe', agentId: 'ag-12345678' }); - await new Promise((r) => setTimeout(r, 50)); + await roundTrip(ws); const [client] = handler.clients; expect(client.subscribedAgents.has('ag-12345678')).toBe(false); @@ -229,11 +235,55 @@ describe('WebSocket handler', () => { const ws = await connect(port); send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); - await new Promise((r) => setTimeout(r, 50)); + await roundTrip(ws); expect(capturedAgentId).toBe('ag-12345678'); expect(capturedData).toBe('hello\n'); }); + + test('terminal:input is a no-op when onTerminalInput is not provided', async () => { + const port = await setup(); // no onTerminalInput + const ws = await connect(port); + + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + // Should not throw or send error — verify via round-trip + const msg = waitForMessage(ws); + send(ws, { type: 'ping' }); + const event = await msg; + expect(event).toEqual({ type: 'pong' }); + }); + + test('terminal:input sends error when onTerminalInput throws', async () => { + const port = await setup({ + onTerminalInput: () => { + throw new Error('tmux exploded'); + }, + }); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('TERMINAL_INPUT_FAILED'); + }); + + test('terminal:input sends error when async onTerminalInput rejects', async () => { + const port = await setup({ + onTerminalInput: async () => { + throw new Error('async tmux exploded'); + }, + }); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('TERMINAL_INPUT_FAILED'); + }); }); describe('broadcast and sendEvent', () => { @@ -267,15 +317,33 @@ describe('WebSocket handler', () => { test('sendEvent sends to specific client only', async () => { const port = await setup(); const ws1 = await connect(port); - await connect(port); // ws2 — should not receive + const ws2 = await connect(port); - const msg1 = waitForMessage(ws1); const [client1] = handler.clients; - handler.sendEvent(client1, { type: 'pong' }); - const event = await msg1; + // ws1 should receive the pong + const event = await waitForMessage(ws1); expect(event).toEqual({ type: 'pong' }); + + // ws2 should have no pending messages — verify by sending a ping + // and confirming the next message is the pong, not the earlier event + const msg2 = waitForMessage(ws2); + send(ws2, { type: 'ping' }); + const event2 = await msg2; + expect(event2).toEqual({ type: 'pong' }); + }); + + test('sendEvent skips client with closed socket', async () => { + const port = await setup(); + const ws = await connect(port); + + const [client] = handler.clients; + ws.close(); + await waitForDisconnect(ws); + + // Should not throw when sending to a closed client + handler.sendEvent(client, { type: 'pong' }); }); }); @@ -288,9 +356,11 @@ describe('WebSocket handler', () => { ws.close(); await waitForDisconnect(ws); - await new Promise((r) => setTimeout(r, 50)); + // Use a round-trip on a second connection as a sync barrier + const ws2 = await connect(port); + await roundTrip(ws2); - expect(handler.clients.size).toBe(0); + expect(handler.clients.size).toBe(1); // only ws2 remains }); test('close() terminates all clients', async () => { @@ -306,6 +376,18 @@ describe('WebSocket handler', () => { expect(handler.clients.size).toBe(0); }); + + test('close() removes upgrade listener from server', async () => { + const port = await setup(); + await handler.close(); + + // After close, a new WS connection attempt should not be handled + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws?token=valid-token`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); }); }); diff --git a/src/server/ws/handler.ts b/src/server/ws/handler.ts index f60f452..c03398c 100644 --- a/src/server/ws/handler.ts +++ b/src/server/ws/handler.ts @@ -34,10 +34,12 @@ export interface WsHandler { close: () => Promise; } +const MAX_PAYLOAD = 65_536; // 64 KB + export function createWsHandler(options: WsHandlerOptions): WsHandler { const { server, validateToken, onTerminalInput } = options; - const wss = new WebSocketServer({ noServer: true }); + const wss = new WebSocketServer({ noServer: true, maxPayload: MAX_PAYLOAD }); const clients = new Set(); function sendEvent(client: ClientState, event: ServerEvent): void { @@ -47,8 +49,11 @@ export function createWsHandler(options: WsHandlerOptions): WsHandler { } function broadcast(event: ServerEvent): void { + const data = serializeEvent(event); for (const client of clients) { - sendEvent(client, event); + if (client.ws.readyState === WebSocket.OPEN) { + client.ws.send(data); + } } } @@ -68,19 +73,27 @@ export function createWsHandler(options: WsHandlerOptions): WsHandler { case 'terminal:input': if (onTerminalInput) { - Promise.resolve(onTerminalInput(command.agentId, command.data)).catch(() => { + try { + Promise.resolve(onTerminalInput(command.agentId, command.data)).catch(() => { + sendEvent(client, { + type: 'error', + code: 'TERMINAL_INPUT_FAILED', + message: `Failed to send input to agent ${command.agentId}`, + }); + }); + } catch { sendEvent(client, { type: 'error', code: 'TERMINAL_INPUT_FAILED', message: `Failed to send input to agent ${command.agentId}`, }); - }); + } } break; } } - server.on('upgrade', (request: IncomingMessage, socket: Duplex, head: Buffer) => { + function onUpgrade(request: IncomingMessage, socket: Duplex, head: Buffer): void { const url = new URL(request.url ?? '/', `http://${request.headers.host ?? 'localhost'}`); if (url.pathname !== '/ws') { @@ -111,7 +124,9 @@ export function createWsHandler(options: WsHandlerOptions): WsHandler { socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n'); socket.destroy(); }); - }); + } + + server.on('upgrade', onUpgrade); wss.on('connection', (ws: WebSocket) => { const client: ClientState = { @@ -146,13 +161,14 @@ export function createWsHandler(options: WsHandlerOptions): WsHandler { }); async function close(): Promise { + server.removeListener('upgrade', onUpgrade); for (const client of clients) { client.ws.close(1001, 'Server shutting down'); } - clients.clear(); await new Promise((resolve, reject) => { wss.close((err) => (err ? reject(err) : resolve())); }); + clients.clear(); } return { wss, clients, broadcast, sendEvent, close }; From 8e4159bb7c8e7597777df4ccd94c054c71210618 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 08:34:49 -0600 Subject: [PATCH 3/3] fix ws handler robustness and unblock strict typecheck --- src/commands/spawn.test.ts | 5 ++- src/server/ws/handler.test.ts | 12 ++++-- src/server/ws/handler.ts | 73 +++++++++++++++++++++++++++-------- 3 files changed, 68 insertions(+), 22 deletions(-) diff --git a/src/commands/spawn.test.ts b/src/commands/spawn.test.ts index ee642c7..541d560 100644 --- a/src/commands/spawn.test.ts +++ b/src/commands/spawn.test.ts @@ -6,6 +6,7 @@ import { readManifest, resolveWorktree, updateManifest } from '../core/manifest. import { spawnAgent } from '../core/agent.js'; import { getRepoRoot } from '../core/worktree.js'; import { agentId, sessionId } from '../lib/id.js'; +import type { Manifest } from '../types/manifest.js'; import * as tmux from '../core/tmux.js'; vi.mock('node:fs/promises', async () => { @@ -79,7 +80,7 @@ const mockedEnsureSession = vi.mocked(tmux.ensureSession); const mockedCreateWindow = vi.mocked(tmux.createWindow); const mockedSplitPane = vi.mocked(tmux.splitPane); -function createManifest(tmuxWindow = '') { +function createManifest(tmuxWindow = ''): Manifest { return { version: 1 as const, projectRoot: '/tmp/repo', @@ -103,7 +104,7 @@ function createManifest(tmuxWindow = '') { } describe('spawnCommand', () => { - let manifestState = createManifest(); + let manifestState: Manifest = createManifest(); let nextAgent = 1; let nextSession = 1; diff --git a/src/server/ws/handler.test.ts b/src/server/ws/handler.test.ts index f87d1d9..532b81f 100644 --- a/src/server/ws/handler.test.ts +++ b/src/server/ws/handler.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect, afterEach } from 'vitest'; import http from 'node:http'; -import { WebSocket } from 'ws'; +import { WebSocket, type RawData } from 'ws'; import { createWsHandler, type WsHandler } from './handler.js'; import { parseCommand, serializeEvent, type ServerEvent } from './events.js'; @@ -40,8 +40,14 @@ function connectWs(port: number, token: string): Promise { function waitForMessage(ws: WebSocket): Promise { return new Promise((resolve) => { - ws.once('message', (data: Buffer | string) => { - const str = typeof data === 'string' ? data : data.toString('utf-8'); + ws.once('message', (data: RawData) => { + const str = (() => { + if (typeof data === 'string') return data; + if (Buffer.isBuffer(data)) return data.toString('utf-8'); + if (data instanceof ArrayBuffer) return Buffer.from(data).toString('utf-8'); + if (Array.isArray(data)) return Buffer.concat(data).toString('utf-8'); + return ''; + })(); resolve(JSON.parse(str) as ServerEvent); }); }); diff --git a/src/server/ws/handler.ts b/src/server/ws/handler.ts index c03398c..757d690 100644 --- a/src/server/ws/handler.ts +++ b/src/server/ws/handler.ts @@ -1,6 +1,7 @@ import { URL } from 'node:url'; import type { Server as HttpServer, IncomingMessage } from 'node:http'; import { WebSocketServer, WebSocket } from 'ws'; +import type { RawData } from 'ws'; import type { Duplex } from 'node:stream'; import { parseCommand, @@ -42,17 +43,46 @@ export function createWsHandler(options: WsHandlerOptions): WsHandler { const wss = new WebSocketServer({ noServer: true, maxPayload: MAX_PAYLOAD }); const clients = new Set(); + function sendData(ws: WebSocket, data: string): boolean { + if (ws.readyState !== WebSocket.OPEN) return false; + try { + ws.send(data); + return true; + } catch { + return false; + } + } + + function decodeRawData(raw: RawData): string { + if (typeof raw === 'string') return raw; + if (Buffer.isBuffer(raw)) return raw.toString('utf-8'); + if (raw instanceof ArrayBuffer) return Buffer.from(raw).toString('utf-8'); + if (Array.isArray(raw)) return Buffer.concat(raw).toString('utf-8'); + return ''; + } + + function rejectUpgrade(socket: Duplex, statusLine: string): void { + if (socket.destroyed) return; + try { + socket.write(`${statusLine}\r\nConnection: close\r\n\r\n`); + } catch { + // ignore write errors on broken sockets + } finally { + socket.destroy(); + } + } + function sendEvent(client: ClientState, event: ServerEvent): void { - if (client.ws.readyState === WebSocket.OPEN) { - client.ws.send(serializeEvent(event)); + if (!sendData(client.ws, serializeEvent(event))) { + clients.delete(client); } } function broadcast(event: ServerEvent): void { const data = serializeEvent(event); for (const client of clients) { - if (client.ws.readyState === WebSocket.OPEN) { - client.ws.send(data); + if (!sendData(client.ws, data)) { + clients.delete(client); } } } @@ -94,35 +124,44 @@ export function createWsHandler(options: WsHandlerOptions): WsHandler { } function onUpgrade(request: IncomingMessage, socket: Duplex, head: Buffer): void { - const url = new URL(request.url ?? '/', `http://${request.headers.host ?? 'localhost'}`); + let url: URL; + try { + // The path/query in request.url is all we need; avoid trusting Host header. + url = new URL(request.url ?? '/', 'http://localhost'); + } catch { + rejectUpgrade(socket, 'HTTP/1.1 400 Bad Request'); + return; + } if (url.pathname !== '/ws') { - socket.destroy(); + rejectUpgrade(socket, 'HTTP/1.1 404 Not Found'); return; } const token = url.searchParams.get('token'); if (!token) { - socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); - socket.destroy(); + rejectUpgrade(socket, 'HTTP/1.1 401 Unauthorized'); return; } Promise.resolve(validateToken(token)) .then((valid) => { + if (socket.destroyed) return; if (!valid) { - socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); - socket.destroy(); + rejectUpgrade(socket, 'HTTP/1.1 401 Unauthorized'); return; } - wss.handleUpgrade(request, socket, head, (ws) => { - wss.emit('connection', ws, request); - }); + try { + wss.handleUpgrade(request, socket, head, (ws) => { + wss.emit('connection', ws, request); + }); + } catch { + rejectUpgrade(socket, 'HTTP/1.1 500 Internal Server Error'); + } }) .catch(() => { - socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n'); - socket.destroy(); + rejectUpgrade(socket, 'HTTP/1.1 500 Internal Server Error'); }); } @@ -135,8 +174,8 @@ export function createWsHandler(options: WsHandlerOptions): WsHandler { }; clients.add(client); - ws.on('message', (raw: Buffer | string) => { - const data = typeof raw === 'string' ? raw : raw.toString('utf-8'); + ws.on('message', (raw: RawData) => { + const data = decodeRawData(raw); const command = parseCommand(data); if (!command) {