diff --git a/PPG CLI/PPG CLI/WebSocketManager.swift b/PPG CLI/PPG CLI/WebSocketManager.swift new file mode 100644 index 0000000..32975fa --- /dev/null +++ b/PPG CLI/PPG CLI/WebSocketManager.swift @@ -0,0 +1,383 @@ +import Foundation + +// MARK: - Notifications + +extension Notification.Name { + static let webSocketStateDidChange = Notification.Name("PPGWebSocketStateDidChange") + static let webSocketDidReceiveEvent = Notification.Name("PPGWebSocketDidReceiveEvent") +} + +// MARK: - Connection State + +nonisolated enum WebSocketConnectionState: Equatable, Sendable { + case disconnected + case connecting + case connected + case reconnecting(attempt: Int) + + var isConnected: Bool { self == .connected } + + var isReconnecting: Bool { + if case .reconnecting = self { return true } + return false + } +} + +// MARK: - Server Events + +nonisolated enum WebSocketEvent: Sendable { + case manifestUpdated(ManifestModel) + case agentStatusChanged(agentId: String, status: AgentStatus) + case worktreeStatusChanged(worktreeId: String, status: String) + case pong + case unknown(type: String, payload: String) +} + +// MARK: - Client Commands + +nonisolated enum WebSocketCommand: Sendable { + case subscribe(channel: String) + case unsubscribe(channel: String) + case terminalInput(agentId: String, data: String) + + var jsonString: String { + let dict: [String: String] + switch self { + case .subscribe(let channel): + dict = ["type": "subscribe", "channel": channel] + case .unsubscribe(let channel): + dict = ["type": "unsubscribe", "channel": channel] + case .terminalInput(let agentId, let data): + dict = ["type": "terminal_input", "agentId": agentId, "data": data] + } + guard let data = try? JSONSerialization.data(withJSONObject: dict, options: [.sortedKeys]), + let str = String(data: data, encoding: .utf8) else { + return "{}" + } + return str + } +} + +// MARK: - WebSocketManager + +nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWebSocketDelegate { + + /// Notification userInfo key for connection state. + static let stateUserInfoKey = "PPGWebSocketState" + /// Notification userInfo key for received event. + static let eventUserInfoKey = "PPGWebSocketEvent" + + // MARK: - Configuration + + private let url: URL + private let maxReconnectDelay: TimeInterval = 30.0 + private let baseReconnectDelay: TimeInterval = 1.0 + private let pingInterval: TimeInterval = 30.0 + + // MARK: - State + + private let queue = DispatchQueue(label: "ppg.websocket-manager", qos: .utility) + + /// Internal state — only read/write on `queue`. + private var _state: WebSocketConnectionState = .disconnected + + /// Thread-safe read of the current connection state. + var state: WebSocketConnectionState { + queue.sync { _state } + } + + private var session: URLSession? + private var task: URLSessionWebSocketTask? + private var pingTimer: DispatchSourceTimer? + private var reconnectWorkItem: DispatchWorkItem? + private var reconnectAttempt = 0 + private var intentionalDisconnect = false + private var isHandlingConnectionLoss = false + + // MARK: - Init + + init(url: URL) { + self.url = url + super.init() + } + + convenience init?(urlString: String) { + guard let url = URL(string: urlString) else { return nil } + self.init(url: url) + } + + deinit { + // Synchronous cleanup — safe because we're the last reference holder. + intentionalDisconnect = true + pingTimer?.cancel() + pingTimer = nil + task?.cancel(with: .goingAway, reason: nil) + task = nil + session?.invalidateAndCancel() + session = nil + } + + // MARK: - Public API + + func connect() { + queue.async { [weak self] in + self?.doConnect() + } + } + + func disconnect() { + queue.async { [weak self] in + self?.doDisconnect() + } + } + + func send(_ command: WebSocketCommand) { + queue.async { [weak self] in + self?.doSend(command.jsonString) + } + } + + // MARK: - Connection Lifecycle + + private func doConnect() { + guard _state == .disconnected || _state.isReconnecting else { return } + + intentionalDisconnect = false + isHandlingConnectionLoss = false + reconnectWorkItem?.cancel() + reconnectWorkItem = nil + + if _state.isReconnecting { + // Already in reconnect flow — keep the attempt counter + } else { + reconnectAttempt = 0 + setState(.connecting) + } + + let config = URLSessionConfiguration.default + config.waitsForConnectivity = true + session = URLSession(configuration: config, delegate: self, delegateQueue: nil) + + let wsTask = session!.webSocketTask(with: url) + task = wsTask + wsTask.resume() + } + + private func doDisconnect() { + intentionalDisconnect = true + isHandlingConnectionLoss = false + reconnectWorkItem?.cancel() + reconnectWorkItem = nil + stopPingTimer() + task?.cancel(with: .goingAway, reason: nil) + task = nil + session?.invalidateAndCancel() + session = nil + reconnectAttempt = 0 + setState(.disconnected) + } + + /// Set state on the queue and post a notification on main. + private func setState(_ newState: WebSocketConnectionState) { + guard _state != newState else { return } + _state = newState + DispatchQueue.main.async { + NotificationCenter.default.post( + name: .webSocketStateDidChange, + object: nil, + userInfo: [WebSocketManager.stateUserInfoKey: newState] + ) + } + } + + // MARK: - Sending + + private func doSend(_ text: String) { + guard _state == .connected, let task = task else { return } + task.send(.string(text)) { error in + if let error = error { + NSLog("[WebSocketManager] send error: \(error.localizedDescription)") + } + } + } + + // MARK: - Receiving + + private func listenForMessages(for expectedTask: URLSessionWebSocketTask) { + expectedTask.receive { [weak self] result in + guard let self = self else { return } + self.queue.async { + guard self.task === expectedTask else { return } + switch result { + case .success(let message): + self.handleMessage(message) + self.listenForMessages(for: expectedTask) + case .failure(let error): + if !self.intentionalDisconnect { + NSLog("[WebSocketManager] receive error: \(error.localizedDescription)") + self.handleConnectionLost() + } + } + } + } + } + + private func handleMessage(_ message: URLSessionWebSocketTask.Message) { + let text: String + switch message { + case .string(let s): + text = s + case .data(let d): + guard let s = String(data: d, encoding: .utf8) else { return } + text = s + @unknown default: + return + } + + guard let event = parseEvent(text) else { return } + + DispatchQueue.main.async { + NotificationCenter.default.post( + name: .webSocketDidReceiveEvent, + object: nil, + userInfo: [WebSocketManager.eventUserInfoKey: event] + ) + } + } + + // MARK: - Event Parsing + + /// Parse a JSON text message into a typed event. Internal for testability. + func parseEvent(_ text: String) -> WebSocketEvent? { + guard let data = text.data(using: .utf8), + let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + let type = json["type"] as? String else { + return nil + } + + switch type { + case "manifest_updated": + if let payloadData = json["manifest"], + let payloadJSON = try? JSONSerialization.data(withJSONObject: payloadData), + let manifest = try? JSONDecoder().decode(ManifestModel.self, from: payloadJSON) { + return .manifestUpdated(manifest) + } + return .unknown(type: type, payload: text) + + case "agent_status_changed": + if let agentId = json["agentId"] as? String, + let statusRaw = json["status"] as? String, + let status = AgentStatus(rawValue: statusRaw) { + return .agentStatusChanged(agentId: agentId, status: status) + } + return .unknown(type: type, payload: text) + + case "worktree_status_changed": + if let worktreeId = json["worktreeId"] as? String, + let status = json["status"] as? String { + return .worktreeStatusChanged(worktreeId: worktreeId, status: status) + } + return .unknown(type: type, payload: text) + + case "pong": + return .pong + + default: + return .unknown(type: type, payload: text) + } + } + + // MARK: - Keepalive Ping + + private func startPingTimer() { + stopPingTimer() + let timer = DispatchSource.makeTimerSource(queue: queue) + timer.schedule(deadline: .now() + pingInterval, repeating: pingInterval) + timer.setEventHandler { [weak self] in + self?.sendPing() + } + timer.resume() + pingTimer = timer + } + + private func stopPingTimer() { + pingTimer?.cancel() + pingTimer = nil + } + + private func sendPing() { + task?.sendPing { [weak self] error in + if let error = error { + NSLog("[WebSocketManager] ping error: \(error.localizedDescription)") + self?.queue.async { self?.handleConnectionLost() } + } + } + } + + // MARK: - Reconnect + + private func handleConnectionLost() { + guard !intentionalDisconnect else { return } + guard !isHandlingConnectionLoss else { return } + isHandlingConnectionLoss = true + stopPingTimer() + task?.cancel(with: .abnormalClosure, reason: nil) + task = nil + session?.invalidateAndCancel() + session = nil + scheduleReconnect() + } + + private func scheduleReconnect() { + reconnectAttempt += 1 + setState(.reconnecting(attempt: reconnectAttempt)) + + let delay = min(baseReconnectDelay * pow(2.0, Double(reconnectAttempt - 1)), maxReconnectDelay) + NSLog("[WebSocketManager] reconnecting in %.1fs (attempt %d)", delay, reconnectAttempt) + + let workItem = DispatchWorkItem { [weak self] in + guard let self = self, !self.intentionalDisconnect else { return } + self.reconnectWorkItem = nil + self.doConnect() + } + reconnectWorkItem?.cancel() + reconnectWorkItem = workItem + queue.asyncAfter(deadline: .now() + delay, execute: workItem) + } + + // MARK: - URLSessionWebSocketDelegate + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + queue.async { [weak self] in + guard let self = self else { return } + guard self.task === webSocketTask else { return } + self.reconnectAttempt = 0 + self.isHandlingConnectionLoss = false + self.setState(.connected) + self.startPingTimer() + self.listenForMessages(for: webSocketTask) + } + } + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + queue.async { [weak self] in + guard let self = self else { return } + guard self.task === webSocketTask else { return } + if self.intentionalDisconnect { + self.setState(.disconnected) + } else { + self.handleConnectionLost() + } + } + } + + func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { + guard error != nil else { return } + queue.async { [weak self] in + guard let self = self, !self.intentionalDisconnect else { return } + guard let webSocketTask = task as? URLSessionWebSocketTask, + self.task === webSocketTask else { return } + self.handleConnectionLost() + } + } +} diff --git a/PPG CLI/PPG CLITests/WebSocketManagerTests.swift b/PPG CLI/PPG CLITests/WebSocketManagerTests.swift new file mode 100644 index 0000000..70dac9d --- /dev/null +++ b/PPG CLI/PPG CLITests/WebSocketManagerTests.swift @@ -0,0 +1,212 @@ +import XCTest +@testable import PPG_CLI + +final class WebSocketManagerTests: XCTestCase { + + // MARK: - WebSocketConnectionState + + func testIsConnectedReturnsTrueOnlyWhenConnected() { + XCTAssertTrue(WebSocketConnectionState.connected.isConnected) + XCTAssertFalse(WebSocketConnectionState.disconnected.isConnected) + XCTAssertFalse(WebSocketConnectionState.connecting.isConnected) + XCTAssertFalse(WebSocketConnectionState.reconnecting(attempt: 1).isConnected) + } + + func testIsReconnectingReturnsTrueOnlyWhenReconnecting() { + XCTAssertTrue(WebSocketConnectionState.reconnecting(attempt: 1).isReconnecting) + XCTAssertTrue(WebSocketConnectionState.reconnecting(attempt: 5).isReconnecting) + XCTAssertFalse(WebSocketConnectionState.connected.isReconnecting) + XCTAssertFalse(WebSocketConnectionState.disconnected.isReconnecting) + XCTAssertFalse(WebSocketConnectionState.connecting.isReconnecting) + } + + func testReconnectingEquality() { + XCTAssertEqual( + WebSocketConnectionState.reconnecting(attempt: 3), + WebSocketConnectionState.reconnecting(attempt: 3) + ) + XCTAssertNotEqual( + WebSocketConnectionState.reconnecting(attempt: 1), + WebSocketConnectionState.reconnecting(attempt: 2) + ) + } + + // MARK: - WebSocketCommand.jsonString + + func testSubscribeCommandProducesValidJSON() { + let cmd = WebSocketCommand.subscribe(channel: "manifest") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["type"] as? String, "subscribe") + XCTAssertEqual(json?["channel"] as? String, "manifest") + } + + func testUnsubscribeCommandProducesValidJSON() { + let cmd = WebSocketCommand.unsubscribe(channel: "agents") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["type"] as? String, "unsubscribe") + XCTAssertEqual(json?["channel"] as? String, "agents") + } + + func testTerminalInputCommandProducesValidJSON() { + let cmd = WebSocketCommand.terminalInput(agentId: "ag-12345678", data: "ls -la\n") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["type"] as? String, "terminal_input") + XCTAssertEqual(json?["agentId"] as? String, "ag-12345678") + XCTAssertEqual(json?["data"] as? String, "ls -la\n") + } + + func testCommandEscapesSpecialCharactersInChannel() { + // A channel name with quotes should not break JSON structure + let cmd = WebSocketCommand.subscribe(channel: #"test"channel"#) + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["channel"] as? String, #"test"channel"#) + } + + func testCommandEscapesSpecialCharactersInAgentId() { + let cmd = WebSocketCommand.terminalInput(agentId: #"id"with"quotes"#, data: "x") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["agentId"] as? String, #"id"with"quotes"#) + } + + func testTerminalInputPreservesControlCharacters() { + let cmd = WebSocketCommand.terminalInput(agentId: "ag-1", data: "line1\nline2\ttab\r") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["data"] as? String, "line1\nline2\ttab\r") + } + + // MARK: - parseEvent + + func testParseAgentStatusChangedEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"agent_status_changed","agentId":"ag-abc","status":"completed"}"# + let event = manager.parseEvent(json) + + if case .agentStatusChanged(let agentId, let status) = event { + XCTAssertEqual(agentId, "ag-abc") + XCTAssertEqual(status, .completed) + } else { + XCTFail("Expected agentStatusChanged, got \(String(describing: event))") + } + } + + func testParseWorktreeStatusChangedEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"worktree_status_changed","worktreeId":"wt-xyz","status":"active"}"# + let event = manager.parseEvent(json) + + if case .worktreeStatusChanged(let worktreeId, let status) = event { + XCTAssertEqual(worktreeId, "wt-xyz") + XCTAssertEqual(status, "active") + } else { + XCTFail("Expected worktreeStatusChanged, got \(String(describing: event))") + } + } + + func testParsePongEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let event = manager.parseEvent(#"{"type":"pong"}"#) + + if case .pong = event { + // pass + } else { + XCTFail("Expected pong, got \(String(describing: event))") + } + } + + func testParseUnknownEventType() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"custom_event","foo":"bar"}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, let payload) = event { + XCTAssertEqual(type, "custom_event") + XCTAssertEqual(payload, json) + } else { + XCTFail("Expected unknown, got \(String(describing: event))") + } + } + + func testParseManifestUpdatedEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = """ + {"type":"manifest_updated","manifest":{"version":1,"projectRoot":"/tmp","sessionName":"s","worktrees":{},"createdAt":"t","updatedAt":"t"}} + """ + let event = manager.parseEvent(json) + + if case .manifestUpdated(let manifest) = event { + XCTAssertEqual(manifest.version, 1) + XCTAssertEqual(manifest.projectRoot, "/tmp") + XCTAssertEqual(manifest.sessionName, "s") + } else { + XCTFail("Expected manifestUpdated, got \(String(describing: event))") + } + } + + func testParseManifestUpdatedWithInvalidManifestFallsBackToUnknown() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"manifest_updated","manifest":{"bad":"data"}}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, _) = event { + XCTAssertEqual(type, "manifest_updated") + } else { + XCTFail("Expected unknown fallback, got \(String(describing: event))") + } + } + + func testParseReturnsNilForInvalidJSON() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + XCTAssertNil(manager.parseEvent("not json")) + } + + func testParseReturnsNilForMissingType() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + XCTAssertNil(manager.parseEvent(#"{"channel":"test"}"#)) + } + + func testParseAgentStatusWithInvalidStatusFallsBackToUnknown() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"agent_status_changed","agentId":"ag-1","status":"bogus"}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, _) = event { + XCTAssertEqual(type, "agent_status_changed") + } else { + XCTFail("Expected unknown fallback for invalid status, got \(String(describing: event))") + } + } + + func testParseAgentStatusWithMissingFieldsFallsBackToUnknown() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"agent_status_changed","agentId":"ag-1"}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, _) = event { + XCTAssertEqual(type, "agent_status_changed") + } else { + XCTFail("Expected unknown fallback for missing status, got \(String(describing: event))") + } + } + + // MARK: - Initial State + + func testInitialStateIsDisconnected() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + XCTAssertEqual(manager.state, .disconnected) + } + + func testConvenienceInitReturnsNilForEmptyString() { + XCTAssertNil(WebSocketManager(urlString: "")) + } + + func testConvenienceInitSucceedsForValidURL() { + XCTAssertNotNil(WebSocketManager(urlString: "ws://localhost:8080")) + } + + // MARK: - Helpers + + private func parseJSON(_ string: String) -> [String: Any]? { + guard let data = string.data(using: .utf8) else { return nil } + return try? JSONSerialization.jsonObject(with: data) as? [String: Any] + } +} diff --git a/src/commands/spawn.test.ts b/src/commands/spawn.test.ts index ee642c7..15ba3bc 100644 --- a/src/commands/spawn.test.ts +++ b/src/commands/spawn.test.ts @@ -7,6 +7,7 @@ import { spawnAgent } from '../core/agent.js'; import { getRepoRoot } from '../core/worktree.js'; import { agentId, sessionId } from '../lib/id.js'; import * as tmux from '../core/tmux.js'; +import type { AgentEntry, Manifest } from '../types/manifest.js'; vi.mock('node:fs/promises', async () => { const actual = await vi.importActual('node:fs/promises'); @@ -79,7 +80,7 @@ const mockedEnsureSession = vi.mocked(tmux.ensureSession); const mockedCreateWindow = vi.mocked(tmux.createWindow); const mockedSplitPane = vi.mocked(tmux.splitPane); -function createManifest(tmuxWindow = '') { +function createManifest(tmuxWindow = ''): Manifest { return { version: 1 as const, projectRoot: '/tmp/repo', @@ -93,7 +94,7 @@ function createManifest(tmuxWindow = '') { baseBranch: 'main', status: 'active' as const, tmuxWindow, - agents: {} as Record, + agents: {} as Record, createdAt: '2026-02-27T00:00:00.000Z', }, }, @@ -103,7 +104,7 @@ function createManifest(tmuxWindow = '') { } describe('spawnCommand', () => { - let manifestState = createManifest(); + let manifestState: Manifest = createManifest(); let nextAgent = 1; let nextSession = 1;