diff --git a/src/commands/spawn.test.ts b/src/commands/spawn.test.ts index ee642c7..3b1b54f 100644 --- a/src/commands/spawn.test.ts +++ b/src/commands/spawn.test.ts @@ -7,6 +7,7 @@ import { spawnAgent } from '../core/agent.js'; import { getRepoRoot } from '../core/worktree.js'; import { agentId, sessionId } from '../lib/id.js'; import * as tmux from '../core/tmux.js'; +import type { Manifest } from '../types/manifest.js'; vi.mock('node:fs/promises', async () => { const actual = await vi.importActual('node:fs/promises'); @@ -79,7 +80,7 @@ const mockedEnsureSession = vi.mocked(tmux.ensureSession); const mockedCreateWindow = vi.mocked(tmux.createWindow); const mockedSplitPane = vi.mocked(tmux.splitPane); -function createManifest(tmuxWindow = '') { +function createManifest(tmuxWindow = ''): Manifest { return { version: 1 as const, projectRoot: '/tmp/repo', diff --git a/src/server/ws/watcher.test.ts b/src/server/ws/watcher.test.ts new file mode 100644 index 0000000..f246ddc --- /dev/null +++ b/src/server/ws/watcher.test.ts @@ -0,0 +1,391 @@ +import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest'; +import { makeAgent, makeManifest, makeWorktree } from '../../test-fixtures.js'; +import type { WsEvent } from './watcher.js'; + +// Mock fs (synchronous watch API) +vi.mock('node:fs', () => ({ + default: { + watch: vi.fn((_path: string, _cb: (...args: unknown[]) => void) => ({ + on: vi.fn(), + close: vi.fn(), + })), + }, +})); + +// Mock core modules +vi.mock('../../core/manifest.js', () => ({ + readManifest: vi.fn(), +})); + +vi.mock('../../core/agent.js', () => ({ + checkAgentStatus: vi.fn(), +})); + +vi.mock('../../core/tmux.js', () => ({ + listSessionPanes: vi.fn(), +})); + +vi.mock('../../lib/paths.js', () => ({ + manifestPath: vi.fn(() => '/tmp/project/.ppg/manifest.json'), + ppgDir: vi.fn(() => '/tmp/project/.ppg'), +})); + +import nodefs from 'node:fs'; +import { readManifest } from '../../core/manifest.js'; +import { checkAgentStatus } from '../../core/agent.js'; +import { listSessionPanes } from '../../core/tmux.js'; +import { startManifestWatcher } from './watcher.js'; + +const mockedReadManifest = vi.mocked(readManifest); +const mockedCheckAgentStatus = vi.mocked(checkAgentStatus); +const mockedListSessionPanes = vi.mocked(listSessionPanes); +const mockedFsWatch = vi.mocked(nodefs.watch); + +const PROJECT_ROOT = '/tmp/project'; + +/** Trigger the most recent fs.watch callback (simulates file change) */ +function triggerFsWatch(): void { + const calls = mockedFsWatch.mock.calls; + if (calls.length > 0) { + const cb = calls[calls.length - 1][1] as () => void; + cb(); + } +} + +beforeEach(() => { + vi.useFakeTimers(); + vi.clearAllMocks(); + mockedListSessionPanes.mockResolvedValue(new Map()); +}); + +afterEach(() => { + vi.useRealTimers(); +}); + +describe('startManifestWatcher', () => { + describe('fs.watch debounce', () => { + test('given file change, should broadcast manifest:updated after debounce', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); + mockedReadManifest.mockResolvedValue(manifest); + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 60_000, // effectively disable polling for this test + }); + + triggerFsWatch(); + + // Before debounce fires — no event yet + expect(events).toHaveLength(0); + + // Advance past debounce + await vi.advanceTimersByTimeAsync(350); + + expect(events).toHaveLength(1); + expect(events[0].type).toBe('manifest:updated'); + expect(events[0].payload).toEqual(manifest); + + watcher.stop(); + }); + + test('given rapid file changes, should debounce to single broadcast', async () => { + const manifest = makeManifest({ projectRoot: PROJECT_ROOT }); + mockedReadManifest.mockResolvedValue(manifest); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 60_000, + }); + + // Three rapid changes + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(100); + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(100); + triggerFsWatch(); + + // Advance past debounce from last trigger + await vi.advanceTimersByTimeAsync(350); + + expect(events).toHaveLength(1); + expect(events[0].type).toBe('manifest:updated'); + + watcher.stop(); + }); + + test('given manifest read error during file change, should not broadcast', async () => { + mockedReadManifest.mockRejectedValue(new SyntaxError('Unexpected end of JSON')); + + const errors: unknown[] = []; + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 60_000, + onError: (err) => errors.push(err), + }); + + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(350); + + expect(events).toHaveLength(0); + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SyntaxError); + + watcher.stop(); + }); + }); + + describe('status polling', () => { + test('given agent status change, should broadcast agent:status', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); + mockedReadManifest.mockResolvedValue(manifest); + + // First poll: running, second poll: idle + mockedCheckAgentStatus + .mockResolvedValueOnce({ status: 'running' }) + .mockResolvedValueOnce({ status: 'idle' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First poll — establishes baseline, no change event + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + // Second poll — status changed from running → idle + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ + type: 'agent:status', + payload: { + agentId: 'ag-aaa11111', + worktreeId: 'wt-abc123', + status: 'idle', + previousStatus: 'running', + }, + }); + + watcher.stop(); + }); + + test('given multiple agents across worktrees, should broadcast each change', async () => { + const agent1 = makeAgent({ id: 'ag-aaa11111', status: 'running', tmuxTarget: 'ppg:1.0' }); + const agent2 = makeAgent({ id: 'ag-bbb22222', status: 'running', tmuxTarget: 'ppg:2.0' }); + const wt1 = makeWorktree({ id: 'wt-aaa111', name: 'auth', agents: { [agent1.id]: agent1 } }); + const wt2 = makeWorktree({ id: 'wt-bbb222', name: 'api', agents: { [agent2.id]: agent2 } }); + const manifest = makeManifest({ + projectRoot: PROJECT_ROOT, + worktrees: { [wt1.id]: wt1, [wt2.id]: wt2 }, + }); + mockedReadManifest.mockResolvedValue(manifest); + + // First poll: both running. Second poll: agent1 idle, agent2 gone + mockedCheckAgentStatus + .mockResolvedValueOnce({ status: 'running' }) + .mockResolvedValueOnce({ status: 'running' }) + .mockResolvedValueOnce({ status: 'idle' }) + .mockResolvedValueOnce({ status: 'gone' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First poll — baseline + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + // Second poll — both changed + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(2); + + const statusEvents = events.filter((e) => e.type === 'agent:status'); + expect(statusEvents).toHaveLength(2); + + const payloads = statusEvents.map((e) => e.payload); + expect(payloads).toContainEqual({ + agentId: 'ag-aaa11111', + worktreeId: 'wt-aaa111', + status: 'idle', + previousStatus: 'running', + }); + expect(payloads).toContainEqual({ + agentId: 'ag-bbb22222', + worktreeId: 'wt-bbb222', + status: 'gone', + previousStatus: 'running', + }); + + watcher.stop(); + }); + + test('given agent removed between polls, should not emit stale event', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifestWithAgent = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); + const manifestEmpty = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: {} }); + + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + // First poll sees agent, second poll agent's worktree is gone + mockedReadManifest + .mockResolvedValueOnce(manifestWithAgent) + .mockResolvedValueOnce(manifestEmpty); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First poll — baseline with agent + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + // Second poll — agent gone from manifest, no stale event emitted + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + watcher.stop(); + }); + + test('given no status change, should not broadcast', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); + mockedReadManifest.mockResolvedValue(manifest); + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // Two polls — same status each time + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + + expect(events).toHaveLength(0); + + watcher.stop(); + }); + + test('given manifest read failure during poll, should skip cycle and report error', async () => { + const readError = new Error('ENOENT'); + mockedReadManifest.mockRejectedValue(readError); + + const errors: unknown[] = []; + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + onError: (err) => errors.push(err), + }); + + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + expect(errors).toHaveLength(1); + expect(errors[0]).toBe(readError); + + watcher.stop(); + }); + + test('given tmux unavailable during poll, should skip cycle and report error', async () => { + const manifest = makeManifest({ projectRoot: PROJECT_ROOT }); + mockedReadManifest.mockResolvedValue(manifest); + const tmuxError = new Error('tmux not found'); + mockedListSessionPanes.mockRejectedValue(tmuxError); + + const errors: unknown[] = []; + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + onError: (err) => errors.push(err), + }); + + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + expect(errors).toHaveLength(1); + expect(errors[0]).toBe(tmuxError); + + watcher.stop(); + }); + }); + + describe('overlap guard', () => { + test('given slow poll, should skip overlapping tick', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); + + // readManifest takes 1500ms on first call (longer than pollInterval) + let callCount = 0; + mockedReadManifest.mockImplementation(() => { + callCount++; + if (callCount === 1) { + return new Promise((resolve) => setTimeout(() => resolve(manifest), 1500)); + } + return Promise.resolve(manifest); + }); + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First tick at 1000ms starts a slow poll + await vi.advanceTimersByTimeAsync(1000); + // Second tick at 2000ms — poll still running, should be skipped + await vi.advanceTimersByTimeAsync(1000); + // Finish slow poll at 2500ms + await vi.advanceTimersByTimeAsync(500); + + // readManifest called once for the slow poll, second tick was skipped + expect(callCount).toBe(1); + + watcher.stop(); + }); + }); + + describe('cleanup', () => { + test('stop should clear all timers and close watcher', async () => { + const manifest = makeManifest({ projectRoot: PROJECT_ROOT }); + mockedReadManifest.mockResolvedValue(manifest); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + watcher.stop(); + + // Trigger fs.watch and advance timers — nothing should fire + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(5000); + + expect(events).toHaveLength(0); + + // Verify fs.watch close was called + const watchResults = mockedFsWatch.mock.results; + expect(watchResults.length).toBeGreaterThan(0); + const fsWatcher = watchResults[0].value as { close: ReturnType }; + expect(fsWatcher.close).toHaveBeenCalled(); + }); + }); +}); diff --git a/src/server/ws/watcher.ts b/src/server/ws/watcher.ts new file mode 100644 index 0000000..7e149dd --- /dev/null +++ b/src/server/ws/watcher.ts @@ -0,0 +1,172 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import { readManifest } from '../../core/manifest.js'; +import { checkAgentStatus } from '../../core/agent.js'; +import { listSessionPanes, type PaneInfo } from '../../core/tmux.js'; +import { manifestPath, ppgDir } from '../../lib/paths.js'; +import type { AgentStatus, Manifest } from '../../types/manifest.js'; + +export type WsEvent = + | { type: 'manifest:updated'; payload: Manifest } + | { type: 'agent:status'; payload: { agentId: string; worktreeId: string; status: AgentStatus; previousStatus: AgentStatus } }; + +export type BroadcastFn = (event: WsEvent) => void; + +export type ErrorFn = (error: unknown) => void; + +export interface ManifestWatcher { + stop(): void; +} + +/** + * Start watching manifest.json for changes and polling agent statuses. + * + * Two sources of change: + * 1. `fs.watch` on manifest.json — fires `manifest:updated` (debounced 300ms) + * 2. Status poll at `pollIntervalMs` — fires `agent:status` per changed agent + * + * Note: `manifest:updated` and `agent:status` are independent streams. + * A file change that adds/removes agents won't produce `agent:status` events + * until the next poll cycle. Consumers needing immediate agent awareness + * should derive it from the `manifest:updated` payload. + * + * The watcher must start after `ppg init` — if manifest.json doesn't exist + * at startup, the parent directory is watched and the file watcher is + * established once the manifest appears. + */ +export function startManifestWatcher( + projectRoot: string, + broadcast: BroadcastFn, + options?: { debounceMs?: number; pollIntervalMs?: number; onError?: ErrorFn }, +): ManifestWatcher { + const debounceMs = options?.debounceMs ?? 300; + const pollIntervalMs = options?.pollIntervalMs ?? 3000; + const onError = options?.onError; + + let debounceTimer: ReturnType | null = null; + let previousStatuses = new Map(); + let polling = false; + let stopped = false; + + // --- fs.watch on manifest.json (with directory fallback) --- + const mPath = manifestPath(projectRoot); + let fileWatcher: fs.FSWatcher | null = null; + let dirWatcher: fs.FSWatcher | null = null; + + function onFsChange(): void { + if (stopped) return; + if (debounceTimer) clearTimeout(debounceTimer); + debounceTimer = setTimeout(() => { + if (stopped) return; + onManifestFileChange().catch((err) => onError?.(err)); + }, debounceMs); + } + + function watchManifestFile(): boolean { + try { + fileWatcher = fs.watch(mPath, onFsChange); + fileWatcher.on('error', () => {}); + return true; + } catch { + return false; + } + } + + // Try to watch manifest directly; fall back to watching .ppg/ directory + if (!watchManifestFile()) { + try { + const dir = ppgDir(projectRoot); + dirWatcher = fs.watch(dir, (_event, filename) => { + if (filename === path.basename(mPath) && !fileWatcher) { + if (watchManifestFile()) { + dirWatcher?.close(); + dirWatcher = null; + } + onFsChange(); + } + }); + dirWatcher.on('error', () => {}); + } catch { + // .ppg/ doesn't exist yet either — polling still works + } + } + + async function onManifestFileChange(): Promise { + try { + const manifest = await readManifest(projectRoot); + broadcast({ type: 'manifest:updated', payload: manifest }); + } catch (err) { + onError?.(err); + } + } + + // --- Status polling --- + const pollTimer = setInterval(() => { + if (stopped) return; + pollStatuses().catch((err) => onError?.(err)); + }, pollIntervalMs); + + async function pollStatuses(): Promise { + if (polling) return; + polling = true; + try { + let manifest: Manifest; + try { + manifest = await readManifest(projectRoot); + } catch (err) { + onError?.(err); + return; + } + + let paneMap: Map; + try { + paneMap = await listSessionPanes(manifest.sessionName); + } catch (err) { + onError?.(err); + return; + } + + // Collect all agents with their worktree context + const agents = Object.values(manifest.worktrees).flatMap((wt) => + Object.values(wt.agents).map((agent) => ({ agent, worktreeId: wt.id })), + ); + + // Check statuses in parallel (checkAgentStatus does no I/O when paneMap is provided) + const results = await Promise.all( + agents.map(({ agent }) => + checkAgentStatus(agent, projectRoot, paneMap).catch(() => null), + ), + ); + + const nextStatuses = new Map(); + for (let i = 0; i < agents.length; i++) { + const result = results[i]; + if (!result) continue; + + const { agent, worktreeId } = agents[i]; + nextStatuses.set(agent.id, result.status); + + const prev = previousStatuses.get(agent.id); + if (prev !== undefined && prev !== result.status) { + broadcast({ + type: 'agent:status', + payload: { agentId: agent.id, worktreeId, status: result.status, previousStatus: prev }, + }); + } + } + previousStatuses = nextStatuses; + } finally { + polling = false; + } + } + + return { + stop() { + stopped = true; + if (debounceTimer) clearTimeout(debounceTimer); + if (fileWatcher) fileWatcher.close(); + if (dirWatcher) dirWatcher.close(); + clearInterval(pollTimer); + }, + }; +} diff --git a/src/test-fixtures.ts b/src/test-fixtures.ts index 3a4c12b..38c7c4b 100644 --- a/src/test-fixtures.ts +++ b/src/test-fixtures.ts @@ -1,4 +1,4 @@ -import type { AgentEntry, WorktreeEntry } from './types/manifest.js'; +import type { AgentEntry, Manifest, WorktreeEntry } from './types/manifest.js'; import type { PaneInfo } from './core/tmux.js'; export function makeAgent(overrides?: Partial): AgentEntry { @@ -38,3 +38,15 @@ export function makePaneInfo(overrides?: Partial): PaneInfo { ...overrides, }; } + +export function makeManifest(overrides?: Partial): Manifest { + return { + version: 1, + projectRoot: '/tmp/project', + sessionName: 'ppg', + worktrees: {}, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + ...overrides, + }; +}