From 3794c3912c568ce13a2d69d51c3a4c513ba5d073 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 01:04:16 -0600 Subject: [PATCH 1/3] feat: implement terminal streaming with diff algorithm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-agent terminal output streaming with efficient longest-common-suffix diff algorithm that only sends new lines, not the full buffer. - Per-agent subscriptions with lazy initialization (polling starts on first subscriber, stops when last unsubscribes) - 500ms polling interval for tmux pane content via capturePane() - Longest common suffix diff — finds overlap between previous and current buffer snapshots to emit only new lines - Shared timer across clients watching the same agent (single capture per interval regardless of subscriber count) - Auto-cleanup when subscriber count drops to 0 - Error handling for dead/missing panes with subscriber notification - Injectable capture function for testability - 23 tests covering diff algorithm, subscription lifecycle, shared timer, polling behavior, error handling, and cleanup Closes #75 --- src/server/ws/terminal.test.ts | 320 +++++++++++++++++++++++++++++++++ src/server/ws/terminal.ts | 233 ++++++++++++++++++++++++ 2 files changed, 553 insertions(+) create mode 100644 src/server/ws/terminal.test.ts create mode 100644 src/server/ws/terminal.ts diff --git a/src/server/ws/terminal.test.ts b/src/server/ws/terminal.test.ts new file mode 100644 index 0000000..c77024d --- /dev/null +++ b/src/server/ws/terminal.test.ts @@ -0,0 +1,320 @@ +import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest'; +import { diffLines, TerminalStreamer } from './terminal.js'; +import type { TerminalData, TerminalError } from './terminal.js'; + +// --------------------------------------------------------------------------- +// diffLines — longest common suffix algorithm +// --------------------------------------------------------------------------- + +describe('diffLines', () => { + test('given empty prev, should return all of curr', () => { + const result = diffLines([], ['line1', 'line2']); + expect(result).toEqual(['line1', 'line2']); + }); + + test('given empty curr, should return empty', () => { + const result = diffLines(['line1', 'line2'], []); + expect(result).toEqual([]); + }); + + test('given identical buffers, should return empty', () => { + const lines = ['a', 'b', 'c']; + const result = diffLines(lines, [...lines]); + expect(result).toEqual([]); + }); + + test('given appended lines, should return only new lines', () => { + const prev = ['line1', 'line2']; + const curr = ['line1', 'line2', 'line3', 'line4']; + const result = diffLines(prev, curr); + expect(result).toEqual(['line3', 'line4']); + }); + + test('given scrolled buffer with new lines, should return new lines', () => { + // Terminal scrolled: line1 is gone, lines 2-3 remain, line4 is new + const prev = ['line1', 'line2', 'line3']; + const curr = ['line2', 'line3', 'line4']; + const result = diffLines(prev, curr); + expect(result).toEqual(['line4']); + }); + + test('given completely different content, should return all of curr', () => { + const prev = ['aaa', 'bbb']; + const curr = ['xxx', 'yyy']; + const result = diffLines(prev, curr); + expect(result).toEqual(['xxx', 'yyy']); + }); + + test('given partial overlap in scrolled buffer, should detect suffix match', () => { + const prev = ['a', 'b', 'c', 'd']; + const curr = ['c', 'd', 'e', 'f']; + const result = diffLines(prev, curr); + expect(result).toEqual(['e', 'f']); + }); + + test('given single line overlap, should return new lines after overlap', () => { + const prev = ['x', 'y', 'z']; + const curr = ['z', 'new1', 'new2']; + const result = diffLines(prev, curr); + expect(result).toEqual(['new1', 'new2']); + }); + + test('given prev longer than curr with overlap, should return new lines', () => { + const prev = ['a', 'b', 'c', 'd', 'e']; + const curr = ['d', 'e', 'f']; + const result = diffLines(prev, curr); + expect(result).toEqual(['f']); + }); +}); + +// --------------------------------------------------------------------------- +// TerminalStreamer +// --------------------------------------------------------------------------- + +describe('TerminalStreamer', () => { + let streamer: TerminalStreamer; + let mockCapture: ReturnType; + + beforeEach(() => { + vi.useFakeTimers(); + mockCapture = vi.fn<(target: string, lines?: number) => Promise>(); + streamer = new TerminalStreamer({ + pollIntervalMs: 500, + capture: mockCapture, + }); + }); + + afterEach(() => { + streamer.destroy(); + vi.useRealTimers(); + }); + + // -- Subscription lifecycle ----------------------------------------------- + + describe('subscription lifecycle', () => { + test('given first subscriber, should start polling', () => { + mockCapture.mockResolvedValue('hello'); + const send = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send); + + expect(streamer.subscriberCount('ag-001')).toBe(1); + expect(streamer.isPolling('ag-001')).toBe(true); + }); + + test('given second subscriber, should share timer', () => { + mockCapture.mockResolvedValue('hello'); + const send1 = vi.fn(); + const send2 = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send1); + streamer.subscribe('ag-001', 'ppg:1.0', send2); + + expect(streamer.subscriberCount('ag-001')).toBe(2); + expect(streamer.isPolling('ag-001')).toBe(true); + }); + + test('given unsubscribe of one, should keep timer for remaining', () => { + mockCapture.mockResolvedValue('hello'); + const send1 = vi.fn(); + const send2 = vi.fn(); + + const unsub1 = streamer.subscribe('ag-001', 'ppg:1.0', send1); + streamer.subscribe('ag-001', 'ppg:1.0', send2); + + unsub1(); + + expect(streamer.subscriberCount('ag-001')).toBe(1); + expect(streamer.isPolling('ag-001')).toBe(true); + }); + + test('given all unsubscribed, should stop polling and cleanup', () => { + mockCapture.mockResolvedValue('hello'); + const send = vi.fn(); + + const unsub = streamer.subscribe('ag-001', 'ppg:1.0', send); + unsub(); + + expect(streamer.subscriberCount('ag-001')).toBe(0); + expect(streamer.isPolling('ag-001')).toBe(false); + }); + + test('given multiple agents, should track independently', () => { + mockCapture.mockResolvedValue('hello'); + const send1 = vi.fn(); + const send2 = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send1); + streamer.subscribe('ag-002', 'ppg:1.1', send2); + + expect(streamer.subscriberCount('ag-001')).toBe(1); + expect(streamer.subscriberCount('ag-002')).toBe(1); + expect(streamer.isPolling('ag-001')).toBe(true); + expect(streamer.isPolling('ag-002')).toBe(true); + }); + }); + + // -- Polling & diff ------------------------------------------------------- + + describe('polling and diff', () => { + test('given initial content, should send all lines on first poll', async () => { + mockCapture.mockResolvedValue('line1\nline2\nline3'); + const send = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send); + + await vi.advanceTimersByTimeAsync(500); + + expect(mockCapture).toHaveBeenCalledWith('ppg:1.0'); + expect(send).toHaveBeenCalledTimes(1); + + const msg: TerminalData = JSON.parse(send.mock.calls[0][0]); + expect(msg.type).toBe('terminal'); + expect(msg.agentId).toBe('ag-001'); + expect(msg.lines).toEqual(['line1', 'line2', 'line3']); + }); + + test('given unchanged content, should not send', async () => { + mockCapture.mockResolvedValue('line1\nline2'); + const send = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send); + + await vi.advanceTimersByTimeAsync(500); + expect(send).toHaveBeenCalledTimes(1); + + // Same content on next poll + await vi.advanceTimersByTimeAsync(500); + expect(send).toHaveBeenCalledTimes(1); // No new call + }); + + test('given new lines appended, should send only diff', async () => { + mockCapture.mockResolvedValueOnce('line1\nline2'); + const send = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send); + await vi.advanceTimersByTimeAsync(500); + + // New lines appended + mockCapture.mockResolvedValueOnce('line1\nline2\nline3\nline4'); + await vi.advanceTimersByTimeAsync(500); + + expect(send).toHaveBeenCalledTimes(2); + const msg: TerminalData = JSON.parse(send.mock.calls[1][0]); + expect(msg.lines).toEqual(['line3', 'line4']); + }); + + test('given content broadcast to multiple subscribers, should send to all', async () => { + mockCapture.mockResolvedValue('hello'); + const send1 = vi.fn(); + const send2 = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send1); + streamer.subscribe('ag-001', 'ppg:1.0', send2); + + await vi.advanceTimersByTimeAsync(500); + + expect(send1).toHaveBeenCalledTimes(1); + expect(send2).toHaveBeenCalledTimes(1); + expect(send1.mock.calls[0][0]).toBe(send2.mock.calls[0][0]); + }); + + test('given 500ms interval, should not poll before interval', async () => { + mockCapture.mockResolvedValue('hello'); + const send = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send); + + await vi.advanceTimersByTimeAsync(200); + expect(mockCapture).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(300); + expect(mockCapture).toHaveBeenCalledTimes(1); + }); + }); + + // -- Error handling ------------------------------------------------------- + + describe('error handling', () => { + test('given pane capture fails, should send error and cleanup', async () => { + mockCapture.mockRejectedValue(new Error('pane not found')); + const send = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send); + + await vi.advanceTimersByTimeAsync(500); + + expect(send).toHaveBeenCalledTimes(1); + const msg: TerminalError = JSON.parse(send.mock.calls[0][0]); + expect(msg.type).toBe('terminal:error'); + expect(msg.agentId).toBe('ag-001'); + expect(msg.error).toBe('Pane no longer available'); + + // Stream should be cleaned up + expect(streamer.subscriberCount('ag-001')).toBe(0); + expect(streamer.isPolling('ag-001')).toBe(false); + }); + + test('given dead subscriber send throws, should remove subscriber', async () => { + mockCapture.mockResolvedValue('line1'); + const goodSend = vi.fn(); + const badSend = vi.fn().mockImplementation(() => { + throw new Error('connection closed'); + }); + + streamer.subscribe('ag-001', 'ppg:1.0', badSend); + streamer.subscribe('ag-001', 'ppg:1.0', goodSend); + + await vi.advanceTimersByTimeAsync(500); + + // Good subscriber got the message + expect(goodSend).toHaveBeenCalledTimes(1); + // Bad subscriber was removed + expect(streamer.subscriberCount('ag-001')).toBe(1); + }); + }); + + // -- Shared timer --------------------------------------------------------- + + describe('shared timer', () => { + test('given shared timer, should only call capture once per interval', async () => { + mockCapture.mockResolvedValue('data'); + const send1 = vi.fn(); + const send2 = vi.fn(); + const send3 = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send1); + streamer.subscribe('ag-001', 'ppg:1.0', send2); + streamer.subscribe('ag-001', 'ppg:1.0', send3); + + await vi.advanceTimersByTimeAsync(500); + + // Only one capture call despite three subscribers + expect(mockCapture).toHaveBeenCalledTimes(1); + }); + }); + + // -- destroy -------------------------------------------------------------- + + describe('destroy', () => { + test('given active streams, should clean up everything', async () => { + mockCapture.mockResolvedValue('data'); + const send1 = vi.fn(); + const send2 = vi.fn(); + + streamer.subscribe('ag-001', 'ppg:1.0', send1); + streamer.subscribe('ag-002', 'ppg:1.1', send2); + + streamer.destroy(); + + expect(streamer.subscriberCount('ag-001')).toBe(0); + expect(streamer.subscriberCount('ag-002')).toBe(0); + expect(streamer.isPolling('ag-001')).toBe(false); + expect(streamer.isPolling('ag-002')).toBe(false); + + // No more polling after destroy + await vi.advanceTimersByTimeAsync(1000); + expect(mockCapture).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/server/ws/terminal.ts b/src/server/ws/terminal.ts new file mode 100644 index 0000000..d2e4b84 --- /dev/null +++ b/src/server/ws/terminal.ts @@ -0,0 +1,233 @@ +import { capturePane } from '../../core/tmux.js'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** A function that sends a message to a connected client. */ +export type SendFn = (message: string) => void; + +/** Wire format for terminal data pushed to subscribers. */ +export interface TerminalData { + type: 'terminal'; + agentId: string; + lines: string[]; +} + +/** Wire format for terminal errors pushed to subscribers. */ +export interface TerminalError { + type: 'terminal:error'; + agentId: string; + error: string; +} + +/** Internal state for a single subscriber. */ +interface Subscriber { + id: number; + send: SendFn; +} + +/** Shared polling state for all subscribers watching the same agent. */ +interface AgentStream { + tmuxTarget: string; + subscribers: Map; + timer: ReturnType | null; + /** Previous captured lines, used by the diff algorithm. */ + lastLines: string[]; +} + +// --------------------------------------------------------------------------- +// Diff algorithm — longest common suffix +// --------------------------------------------------------------------------- + +/** + * Given the previous set of lines and the current set, return only the new + * lines that were appended to the terminal buffer. + * + * Strategy: find the longest suffix of `prev` that is also a prefix of `curr`. + * Everything in `curr` after that shared region is new output. + * + * This handles the common terminal pattern where existing content scrolls up + * and new content appears at the bottom. It degrades gracefully when content + * is rewritten (e.g. TUI redraw) — in that case the full buffer is sent. + */ +export function diffLines(prev: string[], curr: string[]): string[] { + if (prev.length === 0) return curr; + if (curr.length === 0) return []; + + // Find the longest suffix of prev that matches a prefix of curr. + // We search from the longest possible overlap downward. + const maxOverlap = Math.min(prev.length, curr.length); + + for (let overlap = maxOverlap; overlap > 0; overlap--) { + const prevStart = prev.length - overlap; + let match = true; + for (let i = 0; i < overlap; i++) { + if (prev[prevStart + i] !== curr[i]) { + match = false; + break; + } + } + if (match) { + return curr.slice(overlap); + } + } + + // No shared suffix/prefix — full content is "new" + return curr; +} + +// --------------------------------------------------------------------------- +// TerminalStreamer — manages per-agent subscriptions and shared polling +// --------------------------------------------------------------------------- + +const POLL_INTERVAL_MS = 500; + +export class TerminalStreamer { + private streams = new Map(); + private nextSubscriberId = 1; + private readonly pollIntervalMs: number; + /** Injectable capture function — defaults to tmux capturePane. */ + private readonly capture: (target: string, lines?: number) => Promise; + + constructor(options?: { + pollIntervalMs?: number; + capture?: (target: string, lines?: number) => Promise; + }) { + this.pollIntervalMs = options?.pollIntervalMs ?? POLL_INTERVAL_MS; + this.capture = options?.capture ?? capturePane; + } + + /** + * Subscribe a client to terminal output for an agent. + * Returns an unsubscribe function. + */ + subscribe( + agentId: string, + tmuxTarget: string, + send: SendFn, + ): () => void { + const subId = this.nextSubscriberId++; + + let stream = this.streams.get(agentId); + if (!stream) { + stream = { + tmuxTarget, + subscribers: new Map(), + timer: null, + lastLines: [], + }; + this.streams.set(agentId, stream); + } + + stream.subscribers.set(subId, { id: subId, send }); + + // Lazy init: start polling only when the first subscriber arrives + if (stream.timer === null) { + this.startPolling(agentId, stream); + } + + // Return unsubscribe function + return () => { + this.unsubscribe(agentId, subId); + }; + } + + /** Number of active subscribers for an agent. */ + subscriberCount(agentId: string): number { + return this.streams.get(agentId)?.subscribers.size ?? 0; + } + + /** Whether a polling timer is active for an agent. */ + isPolling(agentId: string): boolean { + return this.streams.get(agentId)?.timer != null; + } + + /** Tear down all streams and timers. */ + destroy(): void { + for (const [agentId, stream] of this.streams) { + if (stream.timer !== null) { + clearInterval(stream.timer); + stream.timer = null; + } + stream.subscribers.clear(); + } + this.streams.clear(); + } + + // ----------------------------------------------------------------------- + // Private + // ----------------------------------------------------------------------- + + private unsubscribe(agentId: string, subId: number): void { + const stream = this.streams.get(agentId); + if (!stream) return; + + stream.subscribers.delete(subId); + + // Auto-cleanup: stop polling when no subscribers remain + if (stream.subscribers.size === 0) { + if (stream.timer !== null) { + clearInterval(stream.timer); + stream.timer = null; + } + this.streams.delete(agentId); + } + } + + private startPolling(agentId: string, stream: AgentStream): void { + stream.timer = setInterval(() => { + void this.poll(agentId, stream); + }, this.pollIntervalMs); + } + + private async poll(agentId: string, stream: AgentStream): Promise { + try { + const raw = await this.capture(stream.tmuxTarget); + const currentLines = raw.split('\n'); + + const newLines = diffLines(stream.lastLines, currentLines); + stream.lastLines = currentLines; + + if (newLines.length === 0) return; + + const message = JSON.stringify({ + type: 'terminal', + agentId, + lines: newLines, + } satisfies TerminalData); + + for (const sub of stream.subscribers.values()) { + try { + sub.send(message); + } catch { + // Dead client — remove on next tick + stream.subscribers.delete(sub.id); + } + } + } catch { + // Pane gone / tmux error — notify subscribers and clean up + const errorMsg = JSON.stringify({ + type: 'terminal:error', + agentId, + error: 'Pane no longer available', + } satisfies TerminalError); + + for (const sub of stream.subscribers.values()) { + try { + sub.send(errorMsg); + } catch { + // ignore + } + } + + // Stop polling — pane is dead + if (stream.timer !== null) { + clearInterval(stream.timer); + stream.timer = null; + } + stream.subscribers.clear(); + this.streams.delete(agentId); + } + } +} From 82e39cbd74c887b8a9b07c53a0b3f269b06a6e81 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 07:54:00 -0600 Subject: [PATCH 2/3] fix: address review findings for terminal streaming - Switch from setInterval to chained setTimeout to prevent concurrent poll races when capturePane takes longer than the poll interval - Replace loose equality (!=) with strict equality (===) in isPolling - Remove unused loop variable in destroy() - Log original error in catch block for debugging (console.error) - Add test for double-unsubscribe idempotency - Add test for trailing empty lines from tmux capturePane output - Verify error logging in pane-failure test --- src/server/ws/terminal.test.ts | 27 +++++++++++++++ src/server/ws/terminal.ts | 61 +++++++++++++++++++--------------- 2 files changed, 61 insertions(+), 27 deletions(-) diff --git a/src/server/ws/terminal.test.ts b/src/server/ws/terminal.test.ts index c77024d..125e022 100644 --- a/src/server/ws/terminal.test.ts +++ b/src/server/ws/terminal.test.ts @@ -65,6 +65,14 @@ describe('diffLines', () => { const result = diffLines(prev, curr); expect(result).toEqual(['f']); }); + + test('given trailing empty lines from tmux, should handle correctly', () => { + // capturePane often returns "line1\nline2\n" → split gives trailing '' + const prev = ['line1', 'line2', '']; + const curr = ['line1', 'line2', '', 'line3', '']; + const result = diffLines(prev, curr); + expect(result).toEqual(['line3', '']); + }); }); // --------------------------------------------------------------------------- @@ -139,6 +147,18 @@ describe('TerminalStreamer', () => { expect(streamer.isPolling('ag-001')).toBe(false); }); + test('given double unsubscribe, should be idempotent', () => { + mockCapture.mockResolvedValue('hello'); + const send = vi.fn(); + + const unsub = streamer.subscribe('ag-001', 'ppg:1.0', send); + unsub(); + unsub(); // second call should not throw + + expect(streamer.subscriberCount('ag-001')).toBe(0); + expect(streamer.isPolling('ag-001')).toBe(false); + }); + test('given multiple agents, should track independently', () => { mockCapture.mockResolvedValue('hello'); const send1 = vi.fn(); @@ -237,6 +257,7 @@ describe('TerminalStreamer', () => { describe('error handling', () => { test('given pane capture fails, should send error and cleanup', async () => { + const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); mockCapture.mockRejectedValue(new Error('pane not found')); const send = vi.fn(); @@ -250,9 +271,15 @@ describe('TerminalStreamer', () => { expect(msg.agentId).toBe('ag-001'); expect(msg.error).toBe('Pane no longer available'); + // Original error should be logged + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining('pane not found'), + ); + // Stream should be cleaned up expect(streamer.subscriberCount('ag-001')).toBe(0); expect(streamer.isPolling('ag-001')).toBe(false); + consoleSpy.mockRestore(); }); test('given dead subscriber send throws, should remove subscriber', async () => { diff --git a/src/server/ws/terminal.ts b/src/server/ws/terminal.ts index d2e4b84..1d9defd 100644 --- a/src/server/ws/terminal.ts +++ b/src/server/ws/terminal.ts @@ -31,7 +31,7 @@ interface Subscriber { interface AgentStream { tmuxTarget: string; subscribers: Map; - timer: ReturnType | null; + timer: ReturnType | null; /** Previous captured lines, used by the diff algorithm. */ lastLines: string[]; } @@ -124,7 +124,7 @@ export class TerminalStreamer { // Lazy init: start polling only when the first subscriber arrives if (stream.timer === null) { - this.startPolling(agentId, stream); + this.scheduleNextPoll(agentId, stream); } // Return unsubscribe function @@ -140,14 +140,15 @@ export class TerminalStreamer { /** Whether a polling timer is active for an agent. */ isPolling(agentId: string): boolean { - return this.streams.get(agentId)?.timer != null; + const stream = this.streams.get(agentId); + return stream !== undefined && stream.timer !== null; } /** Tear down all streams and timers. */ destroy(): void { - for (const [agentId, stream] of this.streams) { + for (const stream of this.streams.values()) { if (stream.timer !== null) { - clearInterval(stream.timer); + clearTimeout(stream.timer); stream.timer = null; } stream.subscribers.clear(); @@ -168,15 +169,15 @@ export class TerminalStreamer { // Auto-cleanup: stop polling when no subscribers remain if (stream.subscribers.size === 0) { if (stream.timer !== null) { - clearInterval(stream.timer); + clearTimeout(stream.timer); stream.timer = null; } this.streams.delete(agentId); } } - private startPolling(agentId: string, stream: AgentStream): void { - stream.timer = setInterval(() => { + private scheduleNextPoll(agentId: string, stream: AgentStream): void { + stream.timer = setTimeout(() => { void this.poll(agentId, stream); }, this.pollIntervalMs); } @@ -189,23 +190,28 @@ export class TerminalStreamer { const newLines = diffLines(stream.lastLines, currentLines); stream.lastLines = currentLines; - if (newLines.length === 0) return; - - const message = JSON.stringify({ - type: 'terminal', - agentId, - lines: newLines, - } satisfies TerminalData); - - for (const sub of stream.subscribers.values()) { - try { - sub.send(message); - } catch { - // Dead client — remove on next tick - stream.subscribers.delete(sub.id); + if (newLines.length > 0) { + const message = JSON.stringify({ + type: 'terminal', + agentId, + lines: newLines, + } satisfies TerminalData); + + for (const sub of stream.subscribers.values()) { + try { + sub.send(message); + } catch { + // Dead client — remove immediately + stream.subscribers.delete(sub.id); + } } } - } catch { + + // Schedule next poll only after this one completes + if (stream.subscribers.size > 0) { + this.scheduleNextPoll(agentId, stream); + } + } catch (err) { // Pane gone / tmux error — notify subscribers and clean up const errorMsg = JSON.stringify({ type: 'terminal:error', @@ -213,6 +219,10 @@ export class TerminalStreamer { error: 'Pane no longer available', } satisfies TerminalError); + if (err instanceof Error) { + console.error(`[ppg] terminal poll failed for ${agentId}: ${err.message}`); + } + for (const sub of stream.subscribers.values()) { try { sub.send(errorMsg); @@ -222,10 +232,7 @@ export class TerminalStreamer { } // Stop polling — pane is dead - if (stream.timer !== null) { - clearInterval(stream.timer); - stream.timer = null; - } + stream.timer = null; stream.subscribers.clear(); this.streams.delete(agentId); } From 9c95be1f48c3e3242d937fe1b7227786702ae35c Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 08:34:38 -0600 Subject: [PATCH 3/3] test: fix manifest typing in spawn test --- src/commands/spawn.test.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/commands/spawn.test.ts b/src/commands/spawn.test.ts index ee642c7..12ecbc8 100644 --- a/src/commands/spawn.test.ts +++ b/src/commands/spawn.test.ts @@ -7,6 +7,7 @@ import { spawnAgent } from '../core/agent.js'; import { getRepoRoot } from '../core/worktree.js'; import { agentId, sessionId } from '../lib/id.js'; import * as tmux from '../core/tmux.js'; +import type { Manifest } from '../types/manifest.js'; vi.mock('node:fs/promises', async () => { const actual = await vi.importActual('node:fs/promises'); @@ -79,7 +80,7 @@ const mockedEnsureSession = vi.mocked(tmux.ensureSession); const mockedCreateWindow = vi.mocked(tmux.createWindow); const mockedSplitPane = vi.mocked(tmux.splitPane); -function createManifest(tmuxWindow = '') { +function createManifest(tmuxWindow = ''): Manifest { return { version: 1 as const, projectRoot: '/tmp/repo', @@ -103,7 +104,7 @@ function createManifest(tmuxWindow = '') { } describe('spawnCommand', () => { - let manifestState = createManifest(); + let manifestState: Manifest = createManifest(); let nextAgent = 1; let nextSession = 1;