From 928e0a7a29e33a0a1dc707b6c0a71356d0e4867d Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 01:01:57 -0600 Subject: [PATCH 1/3] feat: implement manifest watcher with dual-source change detection Add fs.watch on manifest.json (300ms debounce) and status polling (3s interval) that broadcast events to connected WebSocket clients. Tracks previous agent statuses to emit granular agent:status change events. Gracefully handles in-flight writes and tmux unavailability. Closes #74 --- src/server/ws/watcher.test.ts | 272 ++++++++++++++++++++++++++++++++++ src/server/ws/watcher.ts | 119 +++++++++++++++ 2 files changed, 391 insertions(+) create mode 100644 src/server/ws/watcher.test.ts create mode 100644 src/server/ws/watcher.ts diff --git a/src/server/ws/watcher.test.ts b/src/server/ws/watcher.test.ts new file mode 100644 index 0000000..28289d0 --- /dev/null +++ b/src/server/ws/watcher.test.ts @@ -0,0 +1,272 @@ +import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest'; +import { makeAgent, makeWorktree } from '../../test-fixtures.js'; +import type { WsEvent } from './watcher.js'; +import type { Manifest } from '../../types/manifest.js'; + +// Mock fs (synchronous watch API) +vi.mock('node:fs', () => ({ + default: { + watch: vi.fn((_path: string, _cb: () => void) => ({ + on: vi.fn(), + close: vi.fn(), + })), + }, +})); + +// Mock core modules +vi.mock('../../core/manifest.js', () => ({ + readManifest: vi.fn(), +})); + +vi.mock('../../core/agent.js', () => ({ + checkAgentStatus: vi.fn(), +})); + +vi.mock('../../core/tmux.js', () => ({ + listSessionPanes: vi.fn(), +})); + +vi.mock('../../lib/paths.js', () => ({ + manifestPath: vi.fn(() => '/tmp/project/.ppg/manifest.json'), +})); + +import nodefs from 'node:fs'; +import { readManifest } from '../../core/manifest.js'; +import { checkAgentStatus } from '../../core/agent.js'; +import { listSessionPanes } from '../../core/tmux.js'; +import { startManifestWatcher } from './watcher.js'; + +const mockedReadManifest = vi.mocked(readManifest); +const mockedCheckAgentStatus = vi.mocked(checkAgentStatus); +const mockedListSessionPanes = vi.mocked(listSessionPanes); +const mockedFsWatch = vi.mocked(nodefs.watch); + +const PROJECT_ROOT = '/tmp/project'; + +function makeManifest(overrides?: Partial): Manifest { + return { + version: 1, + projectRoot: PROJECT_ROOT, + sessionName: 'ppg', + worktrees: {}, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + ...overrides, + }; +} + +/** Trigger the most recent fs.watch callback (simulates file change) */ +function triggerFsWatch(): void { + const calls = mockedFsWatch.mock.calls; + if (calls.length > 0) { + const cb = calls[calls.length - 1][1] as () => void; + cb(); + } +} + +beforeEach(() => { + vi.useFakeTimers(); + vi.clearAllMocks(); + mockedListSessionPanes.mockResolvedValue(new Map()); +}); + +afterEach(() => { + vi.useRealTimers(); +}); + +describe('startManifestWatcher', () => { + describe('fs.watch debounce', () => { + test('given file change, should broadcast manifest:updated after debounce', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ worktrees: { [wt.id]: wt } }); + mockedReadManifest.mockResolvedValue(manifest); + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 60_000, // effectively disable polling for this test + }); + + // Trigger fs.watch callback + triggerFsWatch(); + + // Before debounce fires — no event yet + expect(events).toHaveLength(0); + + // Advance past debounce + await vi.advanceTimersByTimeAsync(350); + + expect(events).toHaveLength(1); + expect(events[0].type).toBe('manifest:updated'); + expect(events[0].payload).toEqual(manifest); + + watcher.stop(); + }); + + test('given rapid file changes, should debounce to single broadcast', async () => { + const manifest = makeManifest(); + mockedReadManifest.mockResolvedValue(manifest); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 60_000, + }); + + // Three rapid changes + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(100); + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(100); + triggerFsWatch(); + + // Advance past debounce from last trigger + await vi.advanceTimersByTimeAsync(350); + + expect(events).toHaveLength(1); + expect(events[0].type).toBe('manifest:updated'); + + watcher.stop(); + }); + + test('given manifest read error during file change, should not broadcast', async () => { + mockedReadManifest.mockRejectedValue(new SyntaxError('Unexpected end of JSON')); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 60_000, + }); + + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(350); + + expect(events).toHaveLength(0); + + watcher.stop(); + }); + }); + + describe('status polling', () => { + test('given agent status change, should broadcast agent:status', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ worktrees: { [wt.id]: wt } }); + mockedReadManifest.mockResolvedValue(manifest); + + // First poll: running, second poll: idle + mockedCheckAgentStatus + .mockResolvedValueOnce({ status: 'running' }) + .mockResolvedValueOnce({ status: 'idle' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First poll — establishes baseline, no change event + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + // Second poll — status changed from running → idle + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ + type: 'agent:status', + payload: { + agentId: 'ag-aaa11111', + worktreeId: 'wt-abc123', + status: 'idle', + previousStatus: 'running', + }, + }); + + watcher.stop(); + }); + + test('given no status change, should not broadcast', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ worktrees: { [wt.id]: wt } }); + mockedReadManifest.mockResolvedValue(manifest); + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // Two polls — same status each time + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + + expect(events).toHaveLength(0); + + watcher.stop(); + }); + + test('given manifest read failure during poll, should skip cycle', async () => { + mockedReadManifest.mockRejectedValue(new Error('ENOENT')); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + watcher.stop(); + }); + + test('given tmux unavailable during poll, should skip cycle', async () => { + const manifest = makeManifest(); + mockedReadManifest.mockResolvedValue(manifest); + mockedListSessionPanes.mockRejectedValue(new Error('tmux not found')); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + watcher.stop(); + }); + }); + + describe('cleanup', () => { + test('stop should clear all timers and close watcher', async () => { + const manifest = makeManifest(); + mockedReadManifest.mockResolvedValue(manifest); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + watcher.stop(); + + // Trigger fs.watch and advance timers — nothing should fire + triggerFsWatch(); + await vi.advanceTimersByTimeAsync(5000); + + expect(events).toHaveLength(0); + + // Verify fs.watch close was called + const watchResults = mockedFsWatch.mock.results; + if (watchResults.length > 0) { + const fsWatcher = watchResults[0].value as { close: ReturnType }; + expect(fsWatcher.close).toHaveBeenCalled(); + } + }); + }); +}); diff --git a/src/server/ws/watcher.ts b/src/server/ws/watcher.ts new file mode 100644 index 0000000..7d3fc5a --- /dev/null +++ b/src/server/ws/watcher.ts @@ -0,0 +1,119 @@ +import fs from 'node:fs'; +import { readManifest } from '../../core/manifest.js'; +import { checkAgentStatus } from '../../core/agent.js'; +import { listSessionPanes } from '../../core/tmux.js'; +import { manifestPath } from '../../lib/paths.js'; +import type { AgentStatus, Manifest } from '../../types/manifest.js'; + +export interface WsEvent { + type: 'manifest:updated' | 'agent:status'; + payload: unknown; +} + +export type BroadcastFn = (event: WsEvent) => void; + +export interface ManifestWatcher { + stop(): void; +} + +/** + * Start watching manifest.json for changes and polling agent statuses. + * + * Two sources of change: + * 1. `fs.watch` on manifest.json — fires `manifest:updated` (debounced 300ms) + * 2. Status poll at `pollIntervalMs` — fires `agent:status` per changed agent + */ +export function startManifestWatcher( + projectRoot: string, + broadcast: BroadcastFn, + options?: { debounceMs?: number; pollIntervalMs?: number }, +): ManifestWatcher { + const debounceMs = options?.debounceMs ?? 300; + const pollIntervalMs = options?.pollIntervalMs ?? 3000; + + let debounceTimer: ReturnType | null = null; + let previousStatuses = new Map(); + let stopped = false; + + // --- fs.watch on manifest.json --- + const mPath = manifestPath(projectRoot); + let watcher: fs.FSWatcher | null = null; + try { + watcher = fs.watch(mPath, () => { + if (stopped) return; + if (debounceTimer) clearTimeout(debounceTimer); + debounceTimer = setTimeout(() => { + if (stopped) return; + onManifestFileChange(); + }, debounceMs); + }); + watcher.on('error', () => { + // File may be deleted or inaccessible — silently ignore + }); + } catch { + // manifest.json may not exist yet — that's OK + } + + async function onManifestFileChange(): Promise { + try { + const manifest = await readManifest(projectRoot); + broadcast({ type: 'manifest:updated', payload: manifest }); + } catch { + // In-flight write or corrupted JSON — skip this cycle + } + } + + // --- Status polling --- + const pollTimer = setInterval(() => { + if (stopped) return; + pollStatuses(); + }, pollIntervalMs); + + async function pollStatuses(): Promise { + let manifest: Manifest; + try { + manifest = await readManifest(projectRoot); + } catch { + return; // manifest unreadable — skip + } + + // Batch-fetch pane info + let paneMap: Map | undefined; + try { + paneMap = await listSessionPanes(manifest.sessionName); + } catch { + return; // tmux unavailable — skip + } + + // Check each agent's live status + const nextStatuses = new Map(); + for (const wt of Object.values(manifest.worktrees)) { + for (const agent of Object.values(wt.agents)) { + try { + const { status } = await checkAgentStatus(agent, projectRoot, paneMap); + nextStatuses.set(agent.id, status); + + const prev = previousStatuses.get(agent.id); + if (prev !== undefined && prev !== status) { + broadcast({ + type: 'agent:status', + payload: { agentId: agent.id, worktreeId: wt.id, status, previousStatus: prev }, + }); + } + } catch { + // Individual agent check failed — skip + } + } + } + previousStatuses = nextStatuses; + } + + return { + stop() { + stopped = true; + if (debounceTimer) clearTimeout(debounceTimer); + if (watcher) watcher.close(); + clearInterval(pollTimer); + }, + }; +} From 8413043e0e820a5a88898559375c67b8c059607d Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 07:53:53 -0600 Subject: [PATCH 2/3] fix: address code review findings for manifest watcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: Replace inline import() type with top-level import type for PaneInfo P1: Add multi-agent/multi-worktree test covering nested loop + worktreeId mapping P2: Replace WsEvent payload: unknown with discriminated union for type-safe consumers P2: Add directory watcher fallback when manifest.json doesn't exist at startup P2: Add test for agent removed between polls (stale entry handling) P3: Use Promise.all for parallel agent status checks (consistent with refreshAllAgentStatuses) P3: Add polling overlap guard to prevent duplicate events from slow polls P3: Fix cleanup test silent if-guard — assert watchResults.length > 0 P3: Move makeManifest factory to shared test-fixtures.ts P3: Add optional onError callback for error observability P3: Document manifest:updated vs agent:status event gap in JSDoc --- src/server/ws/watcher.test.ts | 179 ++++++++++++++++++++++++++++------ src/server/ws/watcher.ts | 165 ++++++++++++++++++++----------- src/test-fixtures.ts | 14 ++- 3 files changed, 271 insertions(+), 87 deletions(-) diff --git a/src/server/ws/watcher.test.ts b/src/server/ws/watcher.test.ts index 28289d0..f246ddc 100644 --- a/src/server/ws/watcher.test.ts +++ b/src/server/ws/watcher.test.ts @@ -1,12 +1,11 @@ import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest'; -import { makeAgent, makeWorktree } from '../../test-fixtures.js'; +import { makeAgent, makeManifest, makeWorktree } from '../../test-fixtures.js'; import type { WsEvent } from './watcher.js'; -import type { Manifest } from '../../types/manifest.js'; // Mock fs (synchronous watch API) vi.mock('node:fs', () => ({ default: { - watch: vi.fn((_path: string, _cb: () => void) => ({ + watch: vi.fn((_path: string, _cb: (...args: unknown[]) => void) => ({ on: vi.fn(), close: vi.fn(), })), @@ -28,6 +27,7 @@ vi.mock('../../core/tmux.js', () => ({ vi.mock('../../lib/paths.js', () => ({ manifestPath: vi.fn(() => '/tmp/project/.ppg/manifest.json'), + ppgDir: vi.fn(() => '/tmp/project/.ppg'), })); import nodefs from 'node:fs'; @@ -43,18 +43,6 @@ const mockedFsWatch = vi.mocked(nodefs.watch); const PROJECT_ROOT = '/tmp/project'; -function makeManifest(overrides?: Partial): Manifest { - return { - version: 1, - projectRoot: PROJECT_ROOT, - sessionName: 'ppg', - worktrees: {}, - createdAt: '2026-01-01T00:00:00.000Z', - updatedAt: '2026-01-01T00:00:00.000Z', - ...overrides, - }; -} - /** Trigger the most recent fs.watch callback (simulates file change) */ function triggerFsWatch(): void { const calls = mockedFsWatch.mock.calls; @@ -79,7 +67,7 @@ describe('startManifestWatcher', () => { test('given file change, should broadcast manifest:updated after debounce', async () => { const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); - const manifest = makeManifest({ worktrees: { [wt.id]: wt } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); mockedReadManifest.mockResolvedValue(manifest); mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); @@ -89,7 +77,6 @@ describe('startManifestWatcher', () => { pollIntervalMs: 60_000, // effectively disable polling for this test }); - // Trigger fs.watch callback triggerFsWatch(); // Before debounce fires — no event yet @@ -106,7 +93,7 @@ describe('startManifestWatcher', () => { }); test('given rapid file changes, should debounce to single broadcast', async () => { - const manifest = makeManifest(); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT }); mockedReadManifest.mockResolvedValue(manifest); const events: WsEvent[] = []; @@ -134,16 +121,20 @@ describe('startManifestWatcher', () => { test('given manifest read error during file change, should not broadcast', async () => { mockedReadManifest.mockRejectedValue(new SyntaxError('Unexpected end of JSON')); + const errors: unknown[] = []; const events: WsEvent[] = []; const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { debounceMs: 300, pollIntervalMs: 60_000, + onError: (err) => errors.push(err), }); triggerFsWatch(); await vi.advanceTimersByTimeAsync(350); expect(events).toHaveLength(0); + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SyntaxError); watcher.stop(); }); @@ -153,7 +144,7 @@ describe('startManifestWatcher', () => { test('given agent status change, should broadcast agent:status', async () => { const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); - const manifest = makeManifest({ worktrees: { [wt.id]: wt } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); mockedReadManifest.mockResolvedValue(manifest); // First poll: running, second poll: idle @@ -187,10 +178,92 @@ describe('startManifestWatcher', () => { watcher.stop(); }); + test('given multiple agents across worktrees, should broadcast each change', async () => { + const agent1 = makeAgent({ id: 'ag-aaa11111', status: 'running', tmuxTarget: 'ppg:1.0' }); + const agent2 = makeAgent({ id: 'ag-bbb22222', status: 'running', tmuxTarget: 'ppg:2.0' }); + const wt1 = makeWorktree({ id: 'wt-aaa111', name: 'auth', agents: { [agent1.id]: agent1 } }); + const wt2 = makeWorktree({ id: 'wt-bbb222', name: 'api', agents: { [agent2.id]: agent2 } }); + const manifest = makeManifest({ + projectRoot: PROJECT_ROOT, + worktrees: { [wt1.id]: wt1, [wt2.id]: wt2 }, + }); + mockedReadManifest.mockResolvedValue(manifest); + + // First poll: both running. Second poll: agent1 idle, agent2 gone + mockedCheckAgentStatus + .mockResolvedValueOnce({ status: 'running' }) + .mockResolvedValueOnce({ status: 'running' }) + .mockResolvedValueOnce({ status: 'idle' }) + .mockResolvedValueOnce({ status: 'gone' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First poll — baseline + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + // Second poll — both changed + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(2); + + const statusEvents = events.filter((e) => e.type === 'agent:status'); + expect(statusEvents).toHaveLength(2); + + const payloads = statusEvents.map((e) => e.payload); + expect(payloads).toContainEqual({ + agentId: 'ag-aaa11111', + worktreeId: 'wt-aaa111', + status: 'idle', + previousStatus: 'running', + }); + expect(payloads).toContainEqual({ + agentId: 'ag-bbb22222', + worktreeId: 'wt-bbb222', + status: 'gone', + previousStatus: 'running', + }); + + watcher.stop(); + }); + + test('given agent removed between polls, should not emit stale event', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifestWithAgent = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); + const manifestEmpty = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: {} }); + + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + // First poll sees agent, second poll agent's worktree is gone + mockedReadManifest + .mockResolvedValueOnce(manifestWithAgent) + .mockResolvedValueOnce(manifestEmpty); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First poll — baseline with agent + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + // Second poll — agent gone from manifest, no stale event emitted + await vi.advanceTimersByTimeAsync(1000); + expect(events).toHaveLength(0); + + watcher.stop(); + }); + test('given no status change, should not broadcast', async () => { const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); - const manifest = makeManifest({ worktrees: { [wt.id]: wt } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); mockedReadManifest.mockResolvedValue(manifest); mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); @@ -209,34 +282,81 @@ describe('startManifestWatcher', () => { watcher.stop(); }); - test('given manifest read failure during poll, should skip cycle', async () => { - mockedReadManifest.mockRejectedValue(new Error('ENOENT')); + test('given manifest read failure during poll, should skip cycle and report error', async () => { + const readError = new Error('ENOENT'); + mockedReadManifest.mockRejectedValue(readError); + const errors: unknown[] = []; const events: WsEvent[] = []; const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { debounceMs: 300, pollIntervalMs: 1000, + onError: (err) => errors.push(err), }); await vi.advanceTimersByTimeAsync(1000); expect(events).toHaveLength(0); + expect(errors).toHaveLength(1); + expect(errors[0]).toBe(readError); watcher.stop(); }); - test('given tmux unavailable during poll, should skip cycle', async () => { - const manifest = makeManifest(); + test('given tmux unavailable during poll, should skip cycle and report error', async () => { + const manifest = makeManifest({ projectRoot: PROJECT_ROOT }); mockedReadManifest.mockResolvedValue(manifest); - mockedListSessionPanes.mockRejectedValue(new Error('tmux not found')); + const tmuxError = new Error('tmux not found'); + mockedListSessionPanes.mockRejectedValue(tmuxError); + const errors: unknown[] = []; const events: WsEvent[] = []; const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { debounceMs: 300, pollIntervalMs: 1000, + onError: (err) => errors.push(err), }); await vi.advanceTimersByTimeAsync(1000); expect(events).toHaveLength(0); + expect(errors).toHaveLength(1); + expect(errors[0]).toBe(tmuxError); + + watcher.stop(); + }); + }); + + describe('overlap guard', () => { + test('given slow poll, should skip overlapping tick', async () => { + const agent = makeAgent({ id: 'ag-aaa11111', status: 'running' }); + const wt = makeWorktree({ id: 'wt-abc123', agents: { [agent.id]: agent } }); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT, worktrees: { [wt.id]: wt } }); + + // readManifest takes 1500ms on first call (longer than pollInterval) + let callCount = 0; + mockedReadManifest.mockImplementation(() => { + callCount++; + if (callCount === 1) { + return new Promise((resolve) => setTimeout(() => resolve(manifest), 1500)); + } + return Promise.resolve(manifest); + }); + mockedCheckAgentStatus.mockResolvedValue({ status: 'running' }); + + const events: WsEvent[] = []; + const watcher = startManifestWatcher(PROJECT_ROOT, (e) => events.push(e), { + debounceMs: 300, + pollIntervalMs: 1000, + }); + + // First tick at 1000ms starts a slow poll + await vi.advanceTimersByTimeAsync(1000); + // Second tick at 2000ms — poll still running, should be skipped + await vi.advanceTimersByTimeAsync(1000); + // Finish slow poll at 2500ms + await vi.advanceTimersByTimeAsync(500); + + // readManifest called once for the slow poll, second tick was skipped + expect(callCount).toBe(1); watcher.stop(); }); @@ -244,7 +364,7 @@ describe('startManifestWatcher', () => { describe('cleanup', () => { test('stop should clear all timers and close watcher', async () => { - const manifest = makeManifest(); + const manifest = makeManifest({ projectRoot: PROJECT_ROOT }); mockedReadManifest.mockResolvedValue(manifest); const events: WsEvent[] = []; @@ -263,10 +383,9 @@ describe('startManifestWatcher', () => { // Verify fs.watch close was called const watchResults = mockedFsWatch.mock.results; - if (watchResults.length > 0) { - const fsWatcher = watchResults[0].value as { close: ReturnType }; - expect(fsWatcher.close).toHaveBeenCalled(); - } + expect(watchResults.length).toBeGreaterThan(0); + const fsWatcher = watchResults[0].value as { close: ReturnType }; + expect(fsWatcher.close).toHaveBeenCalled(); }); }); }); diff --git a/src/server/ws/watcher.ts b/src/server/ws/watcher.ts index 7d3fc5a..7e149dd 100644 --- a/src/server/ws/watcher.ts +++ b/src/server/ws/watcher.ts @@ -1,17 +1,19 @@ import fs from 'node:fs'; +import path from 'node:path'; import { readManifest } from '../../core/manifest.js'; import { checkAgentStatus } from '../../core/agent.js'; -import { listSessionPanes } from '../../core/tmux.js'; -import { manifestPath } from '../../lib/paths.js'; +import { listSessionPanes, type PaneInfo } from '../../core/tmux.js'; +import { manifestPath, ppgDir } from '../../lib/paths.js'; import type { AgentStatus, Manifest } from '../../types/manifest.js'; -export interface WsEvent { - type: 'manifest:updated' | 'agent:status'; - payload: unknown; -} +export type WsEvent = + | { type: 'manifest:updated'; payload: Manifest } + | { type: 'agent:status'; payload: { agentId: string; worktreeId: string; status: AgentStatus; previousStatus: AgentStatus } }; export type BroadcastFn = (event: WsEvent) => void; +export type ErrorFn = (error: unknown) => void; + export interface ManifestWatcher { stop(): void; } @@ -22,97 +24,148 @@ export interface ManifestWatcher { * Two sources of change: * 1. `fs.watch` on manifest.json — fires `manifest:updated` (debounced 300ms) * 2. Status poll at `pollIntervalMs` — fires `agent:status` per changed agent + * + * Note: `manifest:updated` and `agent:status` are independent streams. + * A file change that adds/removes agents won't produce `agent:status` events + * until the next poll cycle. Consumers needing immediate agent awareness + * should derive it from the `manifest:updated` payload. + * + * The watcher must start after `ppg init` — if manifest.json doesn't exist + * at startup, the parent directory is watched and the file watcher is + * established once the manifest appears. */ export function startManifestWatcher( projectRoot: string, broadcast: BroadcastFn, - options?: { debounceMs?: number; pollIntervalMs?: number }, + options?: { debounceMs?: number; pollIntervalMs?: number; onError?: ErrorFn }, ): ManifestWatcher { const debounceMs = options?.debounceMs ?? 300; const pollIntervalMs = options?.pollIntervalMs ?? 3000; + const onError = options?.onError; let debounceTimer: ReturnType | null = null; let previousStatuses = new Map(); + let polling = false; let stopped = false; - // --- fs.watch on manifest.json --- + // --- fs.watch on manifest.json (with directory fallback) --- const mPath = manifestPath(projectRoot); - let watcher: fs.FSWatcher | null = null; - try { - watcher = fs.watch(mPath, () => { + let fileWatcher: fs.FSWatcher | null = null; + let dirWatcher: fs.FSWatcher | null = null; + + function onFsChange(): void { + if (stopped) return; + if (debounceTimer) clearTimeout(debounceTimer); + debounceTimer = setTimeout(() => { if (stopped) return; - if (debounceTimer) clearTimeout(debounceTimer); - debounceTimer = setTimeout(() => { - if (stopped) return; - onManifestFileChange(); - }, debounceMs); - }); - watcher.on('error', () => { - // File may be deleted or inaccessible — silently ignore - }); - } catch { - // manifest.json may not exist yet — that's OK + onManifestFileChange().catch((err) => onError?.(err)); + }, debounceMs); + } + + function watchManifestFile(): boolean { + try { + fileWatcher = fs.watch(mPath, onFsChange); + fileWatcher.on('error', () => {}); + return true; + } catch { + return false; + } + } + + // Try to watch manifest directly; fall back to watching .ppg/ directory + if (!watchManifestFile()) { + try { + const dir = ppgDir(projectRoot); + dirWatcher = fs.watch(dir, (_event, filename) => { + if (filename === path.basename(mPath) && !fileWatcher) { + if (watchManifestFile()) { + dirWatcher?.close(); + dirWatcher = null; + } + onFsChange(); + } + }); + dirWatcher.on('error', () => {}); + } catch { + // .ppg/ doesn't exist yet either — polling still works + } } async function onManifestFileChange(): Promise { try { const manifest = await readManifest(projectRoot); broadcast({ type: 'manifest:updated', payload: manifest }); - } catch { - // In-flight write or corrupted JSON — skip this cycle + } catch (err) { + onError?.(err); } } // --- Status polling --- const pollTimer = setInterval(() => { if (stopped) return; - pollStatuses(); + pollStatuses().catch((err) => onError?.(err)); }, pollIntervalMs); async function pollStatuses(): Promise { - let manifest: Manifest; + if (polling) return; + polling = true; try { - manifest = await readManifest(projectRoot); - } catch { - return; // manifest unreadable — skip - } + let manifest: Manifest; + try { + manifest = await readManifest(projectRoot); + } catch (err) { + onError?.(err); + return; + } - // Batch-fetch pane info - let paneMap: Map | undefined; - try { - paneMap = await listSessionPanes(manifest.sessionName); - } catch { - return; // tmux unavailable — skip - } + let paneMap: Map; + try { + paneMap = await listSessionPanes(manifest.sessionName); + } catch (err) { + onError?.(err); + return; + } - // Check each agent's live status - const nextStatuses = new Map(); - for (const wt of Object.values(manifest.worktrees)) { - for (const agent of Object.values(wt.agents)) { - try { - const { status } = await checkAgentStatus(agent, projectRoot, paneMap); - nextStatuses.set(agent.id, status); - - const prev = previousStatuses.get(agent.id); - if (prev !== undefined && prev !== status) { - broadcast({ - type: 'agent:status', - payload: { agentId: agent.id, worktreeId: wt.id, status, previousStatus: prev }, - }); - } - } catch { - // Individual agent check failed — skip + // Collect all agents with their worktree context + const agents = Object.values(manifest.worktrees).flatMap((wt) => + Object.values(wt.agents).map((agent) => ({ agent, worktreeId: wt.id })), + ); + + // Check statuses in parallel (checkAgentStatus does no I/O when paneMap is provided) + const results = await Promise.all( + agents.map(({ agent }) => + checkAgentStatus(agent, projectRoot, paneMap).catch(() => null), + ), + ); + + const nextStatuses = new Map(); + for (let i = 0; i < agents.length; i++) { + const result = results[i]; + if (!result) continue; + + const { agent, worktreeId } = agents[i]; + nextStatuses.set(agent.id, result.status); + + const prev = previousStatuses.get(agent.id); + if (prev !== undefined && prev !== result.status) { + broadcast({ + type: 'agent:status', + payload: { agentId: agent.id, worktreeId, status: result.status, previousStatus: prev }, + }); } } + previousStatuses = nextStatuses; + } finally { + polling = false; } - previousStatuses = nextStatuses; } return { stop() { stopped = true; if (debounceTimer) clearTimeout(debounceTimer); - if (watcher) watcher.close(); + if (fileWatcher) fileWatcher.close(); + if (dirWatcher) dirWatcher.close(); clearInterval(pollTimer); }, }; diff --git a/src/test-fixtures.ts b/src/test-fixtures.ts index 3a4c12b..38c7c4b 100644 --- a/src/test-fixtures.ts +++ b/src/test-fixtures.ts @@ -1,4 +1,4 @@ -import type { AgentEntry, WorktreeEntry } from './types/manifest.js'; +import type { AgentEntry, Manifest, WorktreeEntry } from './types/manifest.js'; import type { PaneInfo } from './core/tmux.js'; export function makeAgent(overrides?: Partial): AgentEntry { @@ -38,3 +38,15 @@ export function makePaneInfo(overrides?: Partial): PaneInfo { ...overrides, }; } + +export function makeManifest(overrides?: Partial): Manifest { + return { + version: 1, + projectRoot: '/tmp/project', + sessionName: 'ppg', + worktrees: {}, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + ...overrides, + }; +} From a997446434105d05c6cdfb2b40e6bbbb81dad920 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 08:34:42 -0600 Subject: [PATCH 3/3] test: type manifest fixture in spawn tests --- src/commands/spawn.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/commands/spawn.test.ts b/src/commands/spawn.test.ts index ee642c7..3b1b54f 100644 --- a/src/commands/spawn.test.ts +++ b/src/commands/spawn.test.ts @@ -7,6 +7,7 @@ import { spawnAgent } from '../core/agent.js'; import { getRepoRoot } from '../core/worktree.js'; import { agentId, sessionId } from '../lib/id.js'; import * as tmux from '../core/tmux.js'; +import type { Manifest } from '../types/manifest.js'; vi.mock('node:fs/promises', async () => { const actual = await vi.importActual('node:fs/promises'); @@ -79,7 +80,7 @@ const mockedEnsureSession = vi.mocked(tmux.ensureSession); const mockedCreateWindow = vi.mocked(tmux.createWindow); const mockedSplitPane = vi.mocked(tmux.splitPane); -function createManifest(tmuxWindow = '') { +function createManifest(tmuxWindow = ''): Manifest { return { version: 1 as const, projectRoot: '/tmp/repo',