diff --git a/package-lock.json b/package-lock.json index a036a8f..97d7a7c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "nanoid": "^5.1.5", "proper-lockfile": "^4.1.2", "write-file-atomic": "^7.0.0", + "ws": "^8.19.0", "yaml": "^2.7.1" }, "bin": { @@ -23,6 +24,7 @@ "devDependencies": { "@types/node": "^22.13.4", "@types/proper-lockfile": "^4.1.4", + "@types/ws": "^8.18.1", "tsup": "^8.4.0", "tsx": "^4.19.3", "typescript": "^5.7.3", @@ -933,6 +935,16 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@vitest/expect": { "version": "3.2.4", "resolved": "https://registry.npmjs.org/@vitest/expect/-/expect-3.2.4.tgz", @@ -2491,6 +2503,27 @@ "node": "^20.17.0 || >=22.9.0" } }, + "node_modules/ws": { + "version": "8.19.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", + "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/yaml": { "version": "2.8.2", "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.2.tgz", diff --git a/package.json b/package.json index b4cd8bf..6782c1f 100644 --- a/package.json +++ b/package.json @@ -51,11 +51,13 @@ "nanoid": "^5.1.5", "proper-lockfile": "^4.1.2", "write-file-atomic": "^7.0.0", + "ws": "^8.19.0", "yaml": "^2.7.1" }, "devDependencies": { "@types/node": "^22.13.4", "@types/proper-lockfile": "^4.1.4", + "@types/ws": "^8.18.1", "tsup": "^8.4.0", "tsx": "^4.19.3", "typescript": "^5.7.3", diff --git a/src/commands/spawn.test.ts b/src/commands/spawn.test.ts index ee642c7..541d560 100644 --- a/src/commands/spawn.test.ts +++ b/src/commands/spawn.test.ts @@ -6,6 +6,7 @@ import { readManifest, resolveWorktree, updateManifest } from '../core/manifest. import { spawnAgent } from '../core/agent.js'; import { getRepoRoot } from '../core/worktree.js'; import { agentId, sessionId } from '../lib/id.js'; +import type { Manifest } from '../types/manifest.js'; import * as tmux from '../core/tmux.js'; vi.mock('node:fs/promises', async () => { @@ -79,7 +80,7 @@ const mockedEnsureSession = vi.mocked(tmux.ensureSession); const mockedCreateWindow = vi.mocked(tmux.createWindow); const mockedSplitPane = vi.mocked(tmux.splitPane); -function createManifest(tmuxWindow = '') { +function createManifest(tmuxWindow = ''): Manifest { return { version: 1 as const, projectRoot: '/tmp/repo', @@ -103,7 +104,7 @@ function createManifest(tmuxWindow = '') { } describe('spawnCommand', () => { - let manifestState = createManifest(); + let manifestState: Manifest = createManifest(); let nextAgent = 1; let nextSession = 1; diff --git a/src/server/ws/events.ts b/src/server/ws/events.ts new file mode 100644 index 0000000..82878a6 --- /dev/null +++ b/src/server/ws/events.ts @@ -0,0 +1,110 @@ +import type { AgentStatus, Manifest, WorktreeStatus } from '../../types/manifest.js'; + +// --- Inbound Commands (client → server) --- + +export interface PingCommand { + type: 'ping'; +} + +export interface TerminalSubscribeCommand { + type: 'terminal:subscribe'; + agentId: string; +} + +export interface TerminalUnsubscribeCommand { + type: 'terminal:unsubscribe'; + agentId: string; +} + +export interface TerminalInputCommand { + type: 'terminal:input'; + agentId: string; + data: string; +} + +export type ClientCommand = + | PingCommand + | TerminalSubscribeCommand + | TerminalUnsubscribeCommand + | TerminalInputCommand; + +// --- Outbound Events (server → client) --- + +export interface PongEvent { + type: 'pong'; +} + +export interface ManifestUpdatedEvent { + type: 'manifest:updated'; + manifest: Manifest; +} + +export interface AgentStatusEvent { + type: 'agent:status'; + worktreeId: string; + agentId: string; + status: AgentStatus; + worktreeStatus: WorktreeStatus; +} + +export interface TerminalOutputEvent { + type: 'terminal:output'; + agentId: string; + data: string; +} + +export interface ErrorEvent { + type: 'error'; + code: string; + message: string; +} + +export type ServerEvent = + | PongEvent + | ManifestUpdatedEvent + | AgentStatusEvent + | TerminalOutputEvent + | ErrorEvent; + +// --- Parsing --- + +const VALID_COMMAND_TYPES = new Set([ + 'ping', + 'terminal:subscribe', + 'terminal:unsubscribe', + 'terminal:input', +]); + +export function parseCommand(raw: string): ClientCommand | null { + let parsed: unknown; + try { + parsed = JSON.parse(raw); + } catch { + return null; + } + + if (typeof parsed !== 'object' || parsed === null) return null; + + const obj = parsed as Record; + if (typeof obj.type !== 'string' || !VALID_COMMAND_TYPES.has(obj.type)) return null; + + if (obj.type === 'ping') { + return { type: 'ping' }; + } + + if (obj.type === 'terminal:subscribe' || obj.type === 'terminal:unsubscribe') { + if (typeof obj.agentId !== 'string') return null; + return { type: obj.type, agentId: obj.agentId }; + } + + if (obj.type === 'terminal:input') { + if (typeof obj.agentId !== 'string' || typeof obj.data !== 'string') return null; + return { type: 'terminal:input', agentId: obj.agentId, data: obj.data }; + } + + return null; +} + +export function serializeEvent(event: ServerEvent): string { + return JSON.stringify(event); +} diff --git a/src/server/ws/handler.test.ts b/src/server/ws/handler.test.ts new file mode 100644 index 0000000..532b81f --- /dev/null +++ b/src/server/ws/handler.test.ts @@ -0,0 +1,463 @@ +import { describe, test, expect, afterEach } from 'vitest'; +import http from 'node:http'; +import { WebSocket, type RawData } from 'ws'; +import { createWsHandler, type WsHandler } from './handler.js'; +import { parseCommand, serializeEvent, type ServerEvent } from './events.js'; + +// --- Helpers --- + +function createTestServer(): http.Server { + return http.createServer((_req, res) => { + res.writeHead(404); + res.end(); + }); +} + +function listen(server: http.Server): Promise { + return new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => { + const addr = server.address(); + if (typeof addr === 'object' && addr !== null) { + resolve(addr.port); + } + }); + }); +} + +function closeServer(server: http.Server): Promise { + return new Promise((resolve) => { + server.close(() => resolve()); + }); +} + +function connectWs(port: number, token: string): Promise { + return new Promise((resolve, reject) => { + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws?token=${token}`); + ws.on('open', () => resolve(ws)); + ws.on('error', reject); + }); +} + +function waitForMessage(ws: WebSocket): Promise { + return new Promise((resolve) => { + ws.once('message', (data: RawData) => { + const str = (() => { + if (typeof data === 'string') return data; + if (Buffer.isBuffer(data)) return data.toString('utf-8'); + if (data instanceof ArrayBuffer) return Buffer.from(data).toString('utf-8'); + if (Array.isArray(data)) return Buffer.concat(data).toString('utf-8'); + return ''; + })(); + resolve(JSON.parse(str) as ServerEvent); + }); + }); +} + +/** Wait for a ws client to close or error (rejected upgrades emit error then close) */ +function waitForDisconnect(ws: WebSocket): Promise { + return new Promise((resolve) => { + if (ws.readyState === WebSocket.CLOSED) { + resolve(); + return; + } + ws.on('close', () => resolve()); + ws.on('error', () => { + if (ws.readyState === WebSocket.CLOSED) resolve(); + }); + }); +} + +function send(ws: WebSocket, obj: Record): void { + ws.send(JSON.stringify(obj)); +} + +/** Send a ping and wait for pong — acts as a deterministic sync barrier. */ +async function roundTrip(ws: WebSocket): Promise { + const msg = waitForMessage(ws); + send(ws, { type: 'ping' }); + await msg; +} + +// --- Tests --- + +describe('WebSocket handler', () => { + let server: http.Server; + let handler: WsHandler; + const openSockets: WebSocket[] = []; + + async function setup( + opts: { + validateToken?: (token: string) => boolean | Promise; + onTerminalInput?: (agentId: string, data: string) => void | Promise; + } = {}, + ): Promise { + server = createTestServer(); + const port = await listen(server); + handler = createWsHandler({ + server, + validateToken: opts.validateToken ?? ((t) => t === 'valid-token'), + onTerminalInput: opts.onTerminalInput, + }); + return port; + } + + async function connect(port: number, token = 'valid-token'): Promise { + const ws = await connectWs(port, token); + openSockets.push(ws); + return ws; + } + + afterEach(async () => { + for (const ws of openSockets) { + if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) { + ws.close(); + } + } + openSockets.length = 0; + + if (handler) { + await handler.close().catch(() => {}); + } + if (server?.listening) { + await closeServer(server); + } + }); + + describe('connection and auth', () => { + test('accepts connection with valid token', async () => { + const port = await setup(); + const ws = await connect(port); + expect(ws.readyState).toBe(WebSocket.OPEN); + expect(handler.clients.size).toBe(1); + }); + + test('rejects connection with invalid token', async () => { + const port = await setup(); + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws?token=bad-token`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); + + test('rejects connection with no token', async () => { + const port = await setup(); + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); + + test('rejects connection on wrong path', async () => { + const port = await setup(); + const ws = new WebSocket(`ws://127.0.0.1:${port}/other?token=valid-token`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); + + test('supports async token validation', async () => { + const port = await setup({ + validateToken: async (t) => t === 'async-token', + }); + const ws = await connect(port, 'async-token'); + expect(ws.readyState).toBe(WebSocket.OPEN); + }); + }); + + describe('command dispatch', () => { + test('responds to ping with pong', async () => { + const port = await setup(); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'ping' }); + + const event = await msgPromise; + expect(event).toEqual({ type: 'pong' }); + }); + + test('sends error for invalid JSON', async () => { + const port = await setup(); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + ws.send('not json'); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('INVALID_COMMAND'); + }); + + test('sends error for unknown command type', async () => { + const port = await setup(); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'unknown' }); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('INVALID_COMMAND'); + }); + + test('handles terminal:subscribe', async () => { + const port = await setup(); + const ws = await connect(port); + + send(ws, { type: 'terminal:subscribe', agentId: 'ag-12345678' }); + await roundTrip(ws); + + const [client] = handler.clients; + expect(client.subscribedAgents.has('ag-12345678')).toBe(true); + }); + + test('handles terminal:unsubscribe', async () => { + const port = await setup(); + const ws = await connect(port); + + send(ws, { type: 'terminal:subscribe', agentId: 'ag-12345678' }); + await roundTrip(ws); + + send(ws, { type: 'terminal:unsubscribe', agentId: 'ag-12345678' }); + await roundTrip(ws); + + const [client] = handler.clients; + expect(client.subscribedAgents.has('ag-12345678')).toBe(false); + }); + + test('handles terminal:input and calls onTerminalInput', async () => { + let capturedAgentId = ''; + let capturedData = ''; + + const port = await setup({ + onTerminalInput: (agentId, data) => { + capturedAgentId = agentId; + capturedData = data; + }, + }); + const ws = await connect(port); + + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + await roundTrip(ws); + + expect(capturedAgentId).toBe('ag-12345678'); + expect(capturedData).toBe('hello\n'); + }); + + test('terminal:input is a no-op when onTerminalInput is not provided', async () => { + const port = await setup(); // no onTerminalInput + const ws = await connect(port); + + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + // Should not throw or send error — verify via round-trip + const msg = waitForMessage(ws); + send(ws, { type: 'ping' }); + const event = await msg; + expect(event).toEqual({ type: 'pong' }); + }); + + test('terminal:input sends error when onTerminalInput throws', async () => { + const port = await setup({ + onTerminalInput: () => { + throw new Error('tmux exploded'); + }, + }); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('TERMINAL_INPUT_FAILED'); + }); + + test('terminal:input sends error when async onTerminalInput rejects', async () => { + const port = await setup({ + onTerminalInput: async () => { + throw new Error('async tmux exploded'); + }, + }); + const ws = await connect(port); + + const msgPromise = waitForMessage(ws); + send(ws, { type: 'terminal:input', agentId: 'ag-12345678', data: 'hello\n' }); + + const event = await msgPromise; + expect(event.type).toBe('error'); + expect((event as { code: string }).code).toBe('TERMINAL_INPUT_FAILED'); + }); + }); + + describe('broadcast and sendEvent', () => { + test('broadcast sends to all connected clients', async () => { + const port = await setup(); + const ws1 = await connect(port); + const ws2 = await connect(port); + + expect(handler.clients.size).toBe(2); + + const msg1 = waitForMessage(ws1); + const msg2 = waitForMessage(ws2); + + handler.broadcast({ + type: 'manifest:updated', + manifest: { + version: 1, + projectRoot: '/tmp', + sessionName: 'test', + worktrees: {}, + createdAt: '2025-01-01T00:00:00Z', + updatedAt: '2025-01-01T00:00:00Z', + }, + }); + + const [event1, event2] = await Promise.all([msg1, msg2]); + expect(event1.type).toBe('manifest:updated'); + expect(event2.type).toBe('manifest:updated'); + }); + + test('sendEvent sends to specific client only', async () => { + const port = await setup(); + const ws1 = await connect(port); + const ws2 = await connect(port); + + const [client1] = handler.clients; + handler.sendEvent(client1, { type: 'pong' }); + + // ws1 should receive the pong + const event = await waitForMessage(ws1); + expect(event).toEqual({ type: 'pong' }); + + // ws2 should have no pending messages — verify by sending a ping + // and confirming the next message is the pong, not the earlier event + const msg2 = waitForMessage(ws2); + send(ws2, { type: 'ping' }); + const event2 = await msg2; + expect(event2).toEqual({ type: 'pong' }); + }); + + test('sendEvent skips client with closed socket', async () => { + const port = await setup(); + const ws = await connect(port); + + const [client] = handler.clients; + ws.close(); + await waitForDisconnect(ws); + + // Should not throw when sending to a closed client + handler.sendEvent(client, { type: 'pong' }); + }); + }); + + describe('cleanup', () => { + test('removes client on disconnect', async () => { + const port = await setup(); + const ws = await connect(port); + + expect(handler.clients.size).toBe(1); + + ws.close(); + await waitForDisconnect(ws); + // Use a round-trip on a second connection as a sync barrier + const ws2 = await connect(port); + await roundTrip(ws2); + + expect(handler.clients.size).toBe(1); // only ws2 remains + }); + + test('close() terminates all clients', async () => { + const port = await setup(); + const ws1 = await connect(port); + const ws2 = await connect(port); + + const close1 = waitForDisconnect(ws1); + const close2 = waitForDisconnect(ws2); + + await handler.close(); + await Promise.all([close1, close2]); + + expect(handler.clients.size).toBe(0); + }); + + test('close() removes upgrade listener from server', async () => { + const port = await setup(); + await handler.close(); + + // After close, a new WS connection attempt should not be handled + const ws = new WebSocket(`ws://127.0.0.1:${port}/ws?token=valid-token`); + openSockets.push(ws); + + await waitForDisconnect(ws); + expect(handler.clients.size).toBe(0); + }); + }); +}); + +describe('parseCommand', () => { + test('parses ping command', () => { + expect(parseCommand('{"type":"ping"}')).toEqual({ type: 'ping' }); + }); + + test('parses terminal:subscribe', () => { + expect(parseCommand('{"type":"terminal:subscribe","agentId":"ag-123"}')).toEqual({ + type: 'terminal:subscribe', + agentId: 'ag-123', + }); + }); + + test('parses terminal:unsubscribe', () => { + expect(parseCommand('{"type":"terminal:unsubscribe","agentId":"ag-123"}')).toEqual({ + type: 'terminal:unsubscribe', + agentId: 'ag-123', + }); + }); + + test('parses terminal:input', () => { + expect(parseCommand('{"type":"terminal:input","agentId":"ag-123","data":"ls\\n"}')).toEqual({ + type: 'terminal:input', + agentId: 'ag-123', + data: 'ls\n', + }); + }); + + test('returns null for invalid JSON', () => { + expect(parseCommand('not json')).toBeNull(); + }); + + test('returns null for unknown type', () => { + expect(parseCommand('{"type":"unknown"}')).toBeNull(); + }); + + test('returns null for missing required fields', () => { + expect(parseCommand('{"type":"terminal:subscribe"}')).toBeNull(); + expect(parseCommand('{"type":"terminal:input","agentId":"ag-123"}')).toBeNull(); + }); + + test('returns null for non-object', () => { + expect(parseCommand('"string"')).toBeNull(); + expect(parseCommand('42')).toBeNull(); + expect(parseCommand('null')).toBeNull(); + }); +}); + +describe('serializeEvent', () => { + test('serializes pong event', () => { + expect(serializeEvent({ type: 'pong' })).toBe('{"type":"pong"}'); + }); + + test('serializes error event', () => { + const event: ServerEvent = { type: 'error', code: 'TEST', message: 'msg' }; + const parsed = JSON.parse(serializeEvent(event)); + expect(parsed).toEqual({ type: 'error', code: 'TEST', message: 'msg' }); + }); + + test('serializes terminal:output event', () => { + const event: ServerEvent = { type: 'terminal:output', agentId: 'ag-1', data: 'hello' }; + const parsed = JSON.parse(serializeEvent(event)); + expect(parsed).toEqual({ type: 'terminal:output', agentId: 'ag-1', data: 'hello' }); + }); +}); diff --git a/src/server/ws/handler.ts b/src/server/ws/handler.ts new file mode 100644 index 0000000..757d690 --- /dev/null +++ b/src/server/ws/handler.ts @@ -0,0 +1,214 @@ +import { URL } from 'node:url'; +import type { Server as HttpServer, IncomingMessage } from 'node:http'; +import { WebSocketServer, WebSocket } from 'ws'; +import type { RawData } from 'ws'; +import type { Duplex } from 'node:stream'; +import { + parseCommand, + serializeEvent, + type ClientCommand, + type ServerEvent, +} from './events.js'; + +// --- Client State --- + +export interface ClientState { + ws: WebSocket; + subscribedAgents: Set; +} + +// --- Handler Options --- + +export interface WsHandlerOptions { + server: HttpServer; + validateToken: (token: string) => boolean | Promise; + onTerminalInput?: (agentId: string, data: string) => void | Promise; +} + +// --- WebSocket Handler --- + +export interface WsHandler { + wss: WebSocketServer; + clients: Set; + broadcast: (event: ServerEvent) => void; + sendEvent: (client: ClientState, event: ServerEvent) => void; + close: () => Promise; +} + +const MAX_PAYLOAD = 65_536; // 64 KB + +export function createWsHandler(options: WsHandlerOptions): WsHandler { + const { server, validateToken, onTerminalInput } = options; + + const wss = new WebSocketServer({ noServer: true, maxPayload: MAX_PAYLOAD }); + const clients = new Set(); + + function sendData(ws: WebSocket, data: string): boolean { + if (ws.readyState !== WebSocket.OPEN) return false; + try { + ws.send(data); + return true; + } catch { + return false; + } + } + + function decodeRawData(raw: RawData): string { + if (typeof raw === 'string') return raw; + if (Buffer.isBuffer(raw)) return raw.toString('utf-8'); + if (raw instanceof ArrayBuffer) return Buffer.from(raw).toString('utf-8'); + if (Array.isArray(raw)) return Buffer.concat(raw).toString('utf-8'); + return ''; + } + + function rejectUpgrade(socket: Duplex, statusLine: string): void { + if (socket.destroyed) return; + try { + socket.write(`${statusLine}\r\nConnection: close\r\n\r\n`); + } catch { + // ignore write errors on broken sockets + } finally { + socket.destroy(); + } + } + + function sendEvent(client: ClientState, event: ServerEvent): void { + if (!sendData(client.ws, serializeEvent(event))) { + clients.delete(client); + } + } + + function broadcast(event: ServerEvent): void { + const data = serializeEvent(event); + for (const client of clients) { + if (!sendData(client.ws, data)) { + clients.delete(client); + } + } + } + + function handleCommand(client: ClientState, command: ClientCommand): void { + switch (command.type) { + case 'ping': + sendEvent(client, { type: 'pong' }); + break; + + case 'terminal:subscribe': + client.subscribedAgents.add(command.agentId); + break; + + case 'terminal:unsubscribe': + client.subscribedAgents.delete(command.agentId); + break; + + case 'terminal:input': + if (onTerminalInput) { + try { + Promise.resolve(onTerminalInput(command.agentId, command.data)).catch(() => { + sendEvent(client, { + type: 'error', + code: 'TERMINAL_INPUT_FAILED', + message: `Failed to send input to agent ${command.agentId}`, + }); + }); + } catch { + sendEvent(client, { + type: 'error', + code: 'TERMINAL_INPUT_FAILED', + message: `Failed to send input to agent ${command.agentId}`, + }); + } + } + break; + } + } + + function onUpgrade(request: IncomingMessage, socket: Duplex, head: Buffer): void { + let url: URL; + try { + // The path/query in request.url is all we need; avoid trusting Host header. + url = new URL(request.url ?? '/', 'http://localhost'); + } catch { + rejectUpgrade(socket, 'HTTP/1.1 400 Bad Request'); + return; + } + + if (url.pathname !== '/ws') { + rejectUpgrade(socket, 'HTTP/1.1 404 Not Found'); + return; + } + + const token = url.searchParams.get('token'); + if (!token) { + rejectUpgrade(socket, 'HTTP/1.1 401 Unauthorized'); + return; + } + + Promise.resolve(validateToken(token)) + .then((valid) => { + if (socket.destroyed) return; + if (!valid) { + rejectUpgrade(socket, 'HTTP/1.1 401 Unauthorized'); + return; + } + + try { + wss.handleUpgrade(request, socket, head, (ws) => { + wss.emit('connection', ws, request); + }); + } catch { + rejectUpgrade(socket, 'HTTP/1.1 500 Internal Server Error'); + } + }) + .catch(() => { + rejectUpgrade(socket, 'HTTP/1.1 500 Internal Server Error'); + }); + } + + server.on('upgrade', onUpgrade); + + wss.on('connection', (ws: WebSocket) => { + const client: ClientState = { + ws, + subscribedAgents: new Set(), + }; + clients.add(client); + + ws.on('message', (raw: RawData) => { + const data = decodeRawData(raw); + const command = parseCommand(data); + + if (!command) { + sendEvent(client, { + type: 'error', + code: 'INVALID_COMMAND', + message: 'Could not parse command', + }); + return; + } + + handleCommand(client, command); + }); + + ws.on('close', () => { + clients.delete(client); + }); + + ws.on('error', () => { + clients.delete(client); + }); + }); + + async function close(): Promise { + server.removeListener('upgrade', onUpgrade); + for (const client of clients) { + client.ws.close(1001, 'Server shutting down'); + } + await new Promise((resolve, reject) => { + wss.close((err) => (err ? reject(err) : resolve())); + }); + clients.clear(); + } + + return { wss, clients, broadcast, sendEvent, close }; +}