From 7e8375eaf59c9d6b03a76d99c9400ac2a5c02c4f Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 00:05:15 -0600 Subject: [PATCH 1/3] feat: implement WebSocket manager with auto-reconnect and keepalive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Observable WebSocket manager using URLSessionWebSocketTask with: - Connection states: disconnected, connecting, connected, reconnecting - Exponential backoff reconnect (1s→30s max) - Keepalive ping every 30s - Event parsing into typed enums (manifest updates, agent/worktree status) - Terminal subscribe/unsubscribe/input commands - NotificationCenter integration matching existing app patterns Closes #79 --- PPG CLI/PPG CLI/WebSocketManager.swift | 347 +++++++++++++++++++++++++ 1 file changed, 347 insertions(+) create mode 100644 PPG CLI/PPG CLI/WebSocketManager.swift diff --git a/PPG CLI/PPG CLI/WebSocketManager.swift b/PPG CLI/PPG CLI/WebSocketManager.swift new file mode 100644 index 0000000..096fef3 --- /dev/null +++ b/PPG CLI/PPG CLI/WebSocketManager.swift @@ -0,0 +1,347 @@ +import Foundation + +// MARK: - Notifications + +extension Notification.Name { + static let webSocketStateDidChange = Notification.Name("PPGWebSocketStateDidChange") + static let webSocketDidReceiveEvent = Notification.Name("PPGWebSocketDidReceiveEvent") +} + +// MARK: - Connection State + +enum WebSocketConnectionState: Equatable, Sendable { + case disconnected + case connecting + case connected + case reconnecting(attempt: Int) + + var isConnected: Bool { self == .connected } +} + +// MARK: - Server Events + +enum WebSocketEvent: Sendable { + case manifestUpdated(ManifestModel) + case agentStatusChanged(agentId: String, status: AgentStatus) + case worktreeStatusChanged(worktreeId: String, status: String) + case pong + case unknown(type: String, payload: String) +} + +// MARK: - Client Commands + +enum WebSocketCommand { + case subscribe(channel: String) + case unsubscribe(channel: String) + case terminalInput(agentId: String, data: String) + + var jsonString: String { + switch self { + case .subscribe(let channel): + return #"{"type":"subscribe","channel":"\#(channel)"}"# + case .unsubscribe(let channel): + return #"{"type":"unsubscribe","channel":"\#(channel)"}"# + case .terminalInput(let agentId, let data): + let escaped = data + .replacingOccurrences(of: "\\", with: "\\\\") + .replacingOccurrences(of: "\"", with: "\\\"") + .replacingOccurrences(of: "\n", with: "\\n") + .replacingOccurrences(of: "\r", with: "\\r") + .replacingOccurrences(of: "\t", with: "\\t") + return #"{"type":"terminal_input","agentId":"\#(agentId)","data":"\#(escaped)"}"# + } + } +} + +// MARK: - WebSocketManager + +nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWebSocketDelegate { + + /// Notification userInfo key for connection state. + static let stateUserInfoKey = "PPGWebSocketState" + /// Notification userInfo key for received event. + static let eventUserInfoKey = "PPGWebSocketEvent" + + // MARK: - Configuration + + private let url: URL + private let maxReconnectDelay: TimeInterval = 30.0 + private let baseReconnectDelay: TimeInterval = 1.0 + private let pingInterval: TimeInterval = 30.0 + + // MARK: - State + + private let queue = DispatchQueue(label: "ppg.websocket-manager", qos: .utility) + private(set) var state: WebSocketConnectionState = .disconnected { + didSet { + guard state != oldValue else { return } + let newState = state + DispatchQueue.main.async { + NotificationCenter.default.post( + name: .webSocketStateDidChange, + object: nil, + userInfo: [WebSocketManager.stateUserInfoKey: newState] + ) + } + } + } + + private var session: URLSession? + private var task: URLSessionWebSocketTask? + private var pingTimer: DispatchSourceTimer? + private var reconnectAttempt = 0 + private var intentionalDisconnect = false + + // MARK: - Callbacks (alternative to NotificationCenter) + + var onStateChange: ((WebSocketConnectionState) -> Void)? + var onEvent: ((WebSocketEvent) -> Void)? + + // MARK: - Init + + init(url: URL) { + self.url = url + super.init() + } + + convenience init?(urlString: String) { + guard let url = URL(string: urlString) else { return nil } + self.init(url: url) + } + + deinit { + disconnect() + } + + // MARK: - Public API + + func connect() { + queue.async { [weak self] in + self?.doConnect() + } + } + + func disconnect() { + queue.async { [weak self] in + self?.doDisconnect() + } + } + + func send(_ command: WebSocketCommand) { + queue.async { [weak self] in + self?.doSend(command.jsonString) + } + } + + // MARK: - Connection Lifecycle + + private func doConnect() { + guard state == .disconnected || state != .connecting else { return } + + intentionalDisconnect = false + + if case .reconnecting = state { + // Already in reconnect flow — keep the attempt counter + } else { + reconnectAttempt = 0 + state = .connecting + } + + let config = URLSessionConfiguration.default + config.waitsForConnectivity = true + session = URLSession(configuration: config, delegate: self, delegateQueue: nil) + + let wsTask = session!.webSocketTask(with: url) + task = wsTask + wsTask.resume() + } + + private func doDisconnect() { + intentionalDisconnect = true + stopPingTimer() + task?.cancel(with: .goingAway, reason: nil) + task = nil + session?.invalidateAndCancel() + session = nil + reconnectAttempt = 0 + state = .disconnected + } + + // MARK: - Sending + + private func doSend(_ text: String) { + guard state == .connected, let task = task else { return } + task.send(.string(text)) { error in + if let error = error { + NSLog("[WebSocketManager] send error: \(error.localizedDescription)") + } + } + } + + // MARK: - Receiving + + private func listenForMessages() { + task?.receive { [weak self] result in + guard let self = self else { return } + switch result { + case .success(let message): + self.handleMessage(message) + self.listenForMessages() + case .failure(let error): + if !self.intentionalDisconnect { + NSLog("[WebSocketManager] receive error: \(error.localizedDescription)") + self.queue.async { self.handleConnectionLost() } + } + } + } + } + + private func handleMessage(_ message: URLSessionWebSocketTask.Message) { + let text: String + switch message { + case .string(let s): + text = s + case .data(let d): + guard let s = String(data: d, encoding: .utf8) else { return } + text = s + @unknown default: + return + } + + guard let event = parseEvent(text) else { return } + + // Notify via callback + DispatchQueue.main.async { [weak self] in + self?.onEvent?(event) + NotificationCenter.default.post( + name: .webSocketDidReceiveEvent, + object: nil, + userInfo: [WebSocketManager.eventUserInfoKey: event] + ) + } + } + + // MARK: - Event Parsing + + private func parseEvent(_ text: String) -> WebSocketEvent? { + guard let data = text.data(using: .utf8), + let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + let type = json["type"] as? String else { + return nil + } + + switch type { + case "manifest_updated": + if let payloadData = json["manifest"], + let payloadJSON = try? JSONSerialization.data(withJSONObject: payloadData), + let manifest = try? JSONDecoder().decode(ManifestModel.self, from: payloadJSON) { + return .manifestUpdated(manifest) + } + return .unknown(type: type, payload: text) + + case "agent_status_changed": + if let agentId = json["agentId"] as? String, + let statusRaw = json["status"] as? String, + let status = AgentStatus(rawValue: statusRaw) { + return .agentStatusChanged(agentId: agentId, status: status) + } + return .unknown(type: type, payload: text) + + case "worktree_status_changed": + if let worktreeId = json["worktreeId"] as? String, + let status = json["status"] as? String { + return .worktreeStatusChanged(worktreeId: worktreeId, status: status) + } + return .unknown(type: type, payload: text) + + case "pong": + return .pong + + default: + return .unknown(type: type, payload: text) + } + } + + // MARK: - Keepalive Ping + + private func startPingTimer() { + stopPingTimer() + let timer = DispatchSource.makeTimerSource(queue: queue) + timer.schedule(deadline: .now() + pingInterval, repeating: pingInterval) + timer.setEventHandler { [weak self] in + self?.sendPing() + } + timer.resume() + pingTimer = timer + } + + private func stopPingTimer() { + pingTimer?.cancel() + pingTimer = nil + } + + private func sendPing() { + task?.sendPing { [weak self] error in + if let error = error { + NSLog("[WebSocketManager] ping error: \(error.localizedDescription)") + self?.queue.async { self?.handleConnectionLost() } + } + } + } + + // MARK: - Reconnect + + private func handleConnectionLost() { + guard !intentionalDisconnect else { return } + stopPingTimer() + task?.cancel(with: .abnormalClosure, reason: nil) + task = nil + session?.invalidateAndCancel() + session = nil + scheduleReconnect() + } + + private func scheduleReconnect() { + reconnectAttempt += 1 + state = .reconnecting(attempt: reconnectAttempt) + + let delay = min(baseReconnectDelay * pow(2.0, Double(reconnectAttempt - 1)), maxReconnectDelay) + NSLog("[WebSocketManager] reconnecting in %.1fs (attempt %d)", delay, reconnectAttempt) + + queue.asyncAfter(deadline: .now() + delay) { [weak self] in + guard let self = self, !self.intentionalDisconnect else { return } + self.doConnect() + } + } + + // MARK: - URLSessionWebSocketDelegate + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + queue.async { [weak self] in + guard let self = self else { return } + self.reconnectAttempt = 0 + self.state = .connected + self.startPingTimer() + self.listenForMessages() + } + } + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + queue.async { [weak self] in + guard let self = self else { return } + if self.intentionalDisconnect { + self.state = .disconnected + } else { + self.handleConnectionLost() + } + } + } + + func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { + guard error != nil else { return } + queue.async { [weak self] in + guard let self = self, !self.intentionalDisconnect else { return } + self.handleConnectionLost() + } + } +} From 406e01fd1f547d8d2651a39ce68d29b7b0be1747 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 08:03:03 -0600 Subject: [PATCH 2/3] fix: address review findings in WebSocketManager - B2: fix doConnect() guard to reject double-connect (was tautological) - B1: remove unused closure callbacks (onStateChange/onEvent) for consistency with codebase's NotificationCenter-only pattern - B3: synchronous cleanup in deinit instead of unreliable async dispatch - T1: thread-safe state reads via queue.sync backing store - Security: replace manual JSON string interpolation with JSONSerialization to prevent injection in channel/agentId fields - Make parseEvent() internal for testability - Add WebSocketManagerTests (21 tests covering event parsing, command serialization, connection state, and edge cases) --- PPG CLI/PPG CLI/WebSocketManager.swift | 94 ++++---- .../PPG CLITests/WebSocketManagerTests.swift | 212 ++++++++++++++++++ 2 files changed, 266 insertions(+), 40 deletions(-) create mode 100644 PPG CLI/PPG CLITests/WebSocketManagerTests.swift diff --git a/PPG CLI/PPG CLI/WebSocketManager.swift b/PPG CLI/PPG CLI/WebSocketManager.swift index 096fef3..a1a6848 100644 --- a/PPG CLI/PPG CLI/WebSocketManager.swift +++ b/PPG CLI/PPG CLI/WebSocketManager.swift @@ -16,6 +16,11 @@ enum WebSocketConnectionState: Equatable, Sendable { case reconnecting(attempt: Int) var isConnected: Bool { self == .connected } + + var isReconnecting: Bool { + if case .reconnecting = self { return true } + return false + } } // MARK: - Server Events @@ -30,26 +35,26 @@ enum WebSocketEvent: Sendable { // MARK: - Client Commands -enum WebSocketCommand { +enum WebSocketCommand: Sendable { case subscribe(channel: String) case unsubscribe(channel: String) case terminalInput(agentId: String, data: String) var jsonString: String { + let dict: [String: String] switch self { case .subscribe(let channel): - return #"{"type":"subscribe","channel":"\#(channel)"}"# + dict = ["type": "subscribe", "channel": channel] case .unsubscribe(let channel): - return #"{"type":"unsubscribe","channel":"\#(channel)"}"# + dict = ["type": "unsubscribe", "channel": channel] case .terminalInput(let agentId, let data): - let escaped = data - .replacingOccurrences(of: "\\", with: "\\\\") - .replacingOccurrences(of: "\"", with: "\\\"") - .replacingOccurrences(of: "\n", with: "\\n") - .replacingOccurrences(of: "\r", with: "\\r") - .replacingOccurrences(of: "\t", with: "\\t") - return #"{"type":"terminal_input","agentId":"\#(agentId)","data":"\#(escaped)"}"# + dict = ["type": "terminal_input", "agentId": agentId, "data": data] + } + guard let data = try? JSONSerialization.data(withJSONObject: dict, options: [.sortedKeys]), + let str = String(data: data, encoding: .utf8) else { + return "{}" } + return str } } @@ -72,18 +77,13 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb // MARK: - State private let queue = DispatchQueue(label: "ppg.websocket-manager", qos: .utility) - private(set) var state: WebSocketConnectionState = .disconnected { - didSet { - guard state != oldValue else { return } - let newState = state - DispatchQueue.main.async { - NotificationCenter.default.post( - name: .webSocketStateDidChange, - object: nil, - userInfo: [WebSocketManager.stateUserInfoKey: newState] - ) - } - } + + /// Internal state — only read/write on `queue`. + private var _state: WebSocketConnectionState = .disconnected + + /// Thread-safe read of the current connection state. + var state: WebSocketConnectionState { + queue.sync { _state } } private var session: URLSession? @@ -92,11 +92,6 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb private var reconnectAttempt = 0 private var intentionalDisconnect = false - // MARK: - Callbacks (alternative to NotificationCenter) - - var onStateChange: ((WebSocketConnectionState) -> Void)? - var onEvent: ((WebSocketEvent) -> Void)? - // MARK: - Init init(url: URL) { @@ -110,7 +105,14 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb } deinit { - disconnect() + // Synchronous cleanup — safe because we're the last reference holder. + intentionalDisconnect = true + pingTimer?.cancel() + pingTimer = nil + task?.cancel(with: .goingAway, reason: nil) + task = nil + session?.invalidateAndCancel() + session = nil } // MARK: - Public API @@ -136,15 +138,15 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb // MARK: - Connection Lifecycle private func doConnect() { - guard state == .disconnected || state != .connecting else { return } + guard _state == .disconnected || _state.isReconnecting else { return } intentionalDisconnect = false - if case .reconnecting = state { + if _state.isReconnecting { // Already in reconnect flow — keep the attempt counter } else { reconnectAttempt = 0 - state = .connecting + setState(.connecting) } let config = URLSessionConfiguration.default @@ -164,13 +166,26 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb session?.invalidateAndCancel() session = nil reconnectAttempt = 0 - state = .disconnected + setState(.disconnected) + } + + /// Set state on the queue and post a notification on main. + private func setState(_ newState: WebSocketConnectionState) { + guard _state != newState else { return } + _state = newState + DispatchQueue.main.async { + NotificationCenter.default.post( + name: .webSocketStateDidChange, + object: nil, + userInfo: [WebSocketManager.stateUserInfoKey: newState] + ) + } } // MARK: - Sending private func doSend(_ text: String) { - guard state == .connected, let task = task else { return } + guard _state == .connected, let task = task else { return } task.send(.string(text)) { error in if let error = error { NSLog("[WebSocketManager] send error: \(error.localizedDescription)") @@ -210,9 +225,7 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb guard let event = parseEvent(text) else { return } - // Notify via callback - DispatchQueue.main.async { [weak self] in - self?.onEvent?(event) + DispatchQueue.main.async { NotificationCenter.default.post( name: .webSocketDidReceiveEvent, object: nil, @@ -223,7 +236,8 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb // MARK: - Event Parsing - private func parseEvent(_ text: String) -> WebSocketEvent? { + /// Parse a JSON text message into a typed event. Internal for testability. + func parseEvent(_ text: String) -> WebSocketEvent? { guard let data = text.data(using: .utf8), let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any], let type = json["type"] as? String else { @@ -303,7 +317,7 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb private func scheduleReconnect() { reconnectAttempt += 1 - state = .reconnecting(attempt: reconnectAttempt) + setState(.reconnecting(attempt: reconnectAttempt)) let delay = min(baseReconnectDelay * pow(2.0, Double(reconnectAttempt - 1)), maxReconnectDelay) NSLog("[WebSocketManager] reconnecting in %.1fs (attempt %d)", delay, reconnectAttempt) @@ -320,7 +334,7 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb queue.async { [weak self] in guard let self = self else { return } self.reconnectAttempt = 0 - self.state = .connected + self.setState(.connected) self.startPingTimer() self.listenForMessages() } @@ -330,7 +344,7 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb queue.async { [weak self] in guard let self = self else { return } if self.intentionalDisconnect { - self.state = .disconnected + self.setState(.disconnected) } else { self.handleConnectionLost() } diff --git a/PPG CLI/PPG CLITests/WebSocketManagerTests.swift b/PPG CLI/PPG CLITests/WebSocketManagerTests.swift new file mode 100644 index 0000000..70dac9d --- /dev/null +++ b/PPG CLI/PPG CLITests/WebSocketManagerTests.swift @@ -0,0 +1,212 @@ +import XCTest +@testable import PPG_CLI + +final class WebSocketManagerTests: XCTestCase { + + // MARK: - WebSocketConnectionState + + func testIsConnectedReturnsTrueOnlyWhenConnected() { + XCTAssertTrue(WebSocketConnectionState.connected.isConnected) + XCTAssertFalse(WebSocketConnectionState.disconnected.isConnected) + XCTAssertFalse(WebSocketConnectionState.connecting.isConnected) + XCTAssertFalse(WebSocketConnectionState.reconnecting(attempt: 1).isConnected) + } + + func testIsReconnectingReturnsTrueOnlyWhenReconnecting() { + XCTAssertTrue(WebSocketConnectionState.reconnecting(attempt: 1).isReconnecting) + XCTAssertTrue(WebSocketConnectionState.reconnecting(attempt: 5).isReconnecting) + XCTAssertFalse(WebSocketConnectionState.connected.isReconnecting) + XCTAssertFalse(WebSocketConnectionState.disconnected.isReconnecting) + XCTAssertFalse(WebSocketConnectionState.connecting.isReconnecting) + } + + func testReconnectingEquality() { + XCTAssertEqual( + WebSocketConnectionState.reconnecting(attempt: 3), + WebSocketConnectionState.reconnecting(attempt: 3) + ) + XCTAssertNotEqual( + WebSocketConnectionState.reconnecting(attempt: 1), + WebSocketConnectionState.reconnecting(attempt: 2) + ) + } + + // MARK: - WebSocketCommand.jsonString + + func testSubscribeCommandProducesValidJSON() { + let cmd = WebSocketCommand.subscribe(channel: "manifest") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["type"] as? String, "subscribe") + XCTAssertEqual(json?["channel"] as? String, "manifest") + } + + func testUnsubscribeCommandProducesValidJSON() { + let cmd = WebSocketCommand.unsubscribe(channel: "agents") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["type"] as? String, "unsubscribe") + XCTAssertEqual(json?["channel"] as? String, "agents") + } + + func testTerminalInputCommandProducesValidJSON() { + let cmd = WebSocketCommand.terminalInput(agentId: "ag-12345678", data: "ls -la\n") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["type"] as? String, "terminal_input") + XCTAssertEqual(json?["agentId"] as? String, "ag-12345678") + XCTAssertEqual(json?["data"] as? String, "ls -la\n") + } + + func testCommandEscapesSpecialCharactersInChannel() { + // A channel name with quotes should not break JSON structure + let cmd = WebSocketCommand.subscribe(channel: #"test"channel"#) + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["channel"] as? String, #"test"channel"#) + } + + func testCommandEscapesSpecialCharactersInAgentId() { + let cmd = WebSocketCommand.terminalInput(agentId: #"id"with"quotes"#, data: "x") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["agentId"] as? String, #"id"with"quotes"#) + } + + func testTerminalInputPreservesControlCharacters() { + let cmd = WebSocketCommand.terminalInput(agentId: "ag-1", data: "line1\nline2\ttab\r") + let json = parseJSON(cmd.jsonString) + XCTAssertEqual(json?["data"] as? String, "line1\nline2\ttab\r") + } + + // MARK: - parseEvent + + func testParseAgentStatusChangedEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"agent_status_changed","agentId":"ag-abc","status":"completed"}"# + let event = manager.parseEvent(json) + + if case .agentStatusChanged(let agentId, let status) = event { + XCTAssertEqual(agentId, "ag-abc") + XCTAssertEqual(status, .completed) + } else { + XCTFail("Expected agentStatusChanged, got \(String(describing: event))") + } + } + + func testParseWorktreeStatusChangedEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"worktree_status_changed","worktreeId":"wt-xyz","status":"active"}"# + let event = manager.parseEvent(json) + + if case .worktreeStatusChanged(let worktreeId, let status) = event { + XCTAssertEqual(worktreeId, "wt-xyz") + XCTAssertEqual(status, "active") + } else { + XCTFail("Expected worktreeStatusChanged, got \(String(describing: event))") + } + } + + func testParsePongEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let event = manager.parseEvent(#"{"type":"pong"}"#) + + if case .pong = event { + // pass + } else { + XCTFail("Expected pong, got \(String(describing: event))") + } + } + + func testParseUnknownEventType() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"custom_event","foo":"bar"}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, let payload) = event { + XCTAssertEqual(type, "custom_event") + XCTAssertEqual(payload, json) + } else { + XCTFail("Expected unknown, got \(String(describing: event))") + } + } + + func testParseManifestUpdatedEvent() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = """ + {"type":"manifest_updated","manifest":{"version":1,"projectRoot":"/tmp","sessionName":"s","worktrees":{},"createdAt":"t","updatedAt":"t"}} + """ + let event = manager.parseEvent(json) + + if case .manifestUpdated(let manifest) = event { + XCTAssertEqual(manifest.version, 1) + XCTAssertEqual(manifest.projectRoot, "/tmp") + XCTAssertEqual(manifest.sessionName, "s") + } else { + XCTFail("Expected manifestUpdated, got \(String(describing: event))") + } + } + + func testParseManifestUpdatedWithInvalidManifestFallsBackToUnknown() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"manifest_updated","manifest":{"bad":"data"}}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, _) = event { + XCTAssertEqual(type, "manifest_updated") + } else { + XCTFail("Expected unknown fallback, got \(String(describing: event))") + } + } + + func testParseReturnsNilForInvalidJSON() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + XCTAssertNil(manager.parseEvent("not json")) + } + + func testParseReturnsNilForMissingType() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + XCTAssertNil(manager.parseEvent(#"{"channel":"test"}"#)) + } + + func testParseAgentStatusWithInvalidStatusFallsBackToUnknown() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"agent_status_changed","agentId":"ag-1","status":"bogus"}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, _) = event { + XCTAssertEqual(type, "agent_status_changed") + } else { + XCTFail("Expected unknown fallback for invalid status, got \(String(describing: event))") + } + } + + func testParseAgentStatusWithMissingFieldsFallsBackToUnknown() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + let json = #"{"type":"agent_status_changed","agentId":"ag-1"}"# + let event = manager.parseEvent(json) + + if case .unknown(let type, _) = event { + XCTAssertEqual(type, "agent_status_changed") + } else { + XCTFail("Expected unknown fallback for missing status, got \(String(describing: event))") + } + } + + // MARK: - Initial State + + func testInitialStateIsDisconnected() { + let manager = WebSocketManager(url: URL(string: "ws://localhost")!) + XCTAssertEqual(manager.state, .disconnected) + } + + func testConvenienceInitReturnsNilForEmptyString() { + XCTAssertNil(WebSocketManager(urlString: "")) + } + + func testConvenienceInitSucceedsForValidURL() { + XCTAssertNotNil(WebSocketManager(urlString: "ws://localhost:8080")) + } + + // MARK: - Helpers + + private func parseJSON(_ string: String) -> [String: Any]? { + guard let data = string.data(using: .utf8) else { return nil } + return try? JSONSerialization.jsonObject(with: data) as? [String: Any] + } +} From 583e545cc93ff68e1c6a0f28309e2aff4f1b5908 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Fri, 27 Feb 2026 08:36:33 -0600 Subject: [PATCH 3/3] Fix websocket reconnect race and typecheck manifest typing --- PPG CLI/PPG CLI/WebSocketManager.swift | 52 ++++++++++++++++++-------- src/commands/spawn.test.ts | 7 ++-- 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/PPG CLI/PPG CLI/WebSocketManager.swift b/PPG CLI/PPG CLI/WebSocketManager.swift index a1a6848..32975fa 100644 --- a/PPG CLI/PPG CLI/WebSocketManager.swift +++ b/PPG CLI/PPG CLI/WebSocketManager.swift @@ -9,7 +9,7 @@ extension Notification.Name { // MARK: - Connection State -enum WebSocketConnectionState: Equatable, Sendable { +nonisolated enum WebSocketConnectionState: Equatable, Sendable { case disconnected case connecting case connected @@ -25,7 +25,7 @@ enum WebSocketConnectionState: Equatable, Sendable { // MARK: - Server Events -enum WebSocketEvent: Sendable { +nonisolated enum WebSocketEvent: Sendable { case manifestUpdated(ManifestModel) case agentStatusChanged(agentId: String, status: AgentStatus) case worktreeStatusChanged(worktreeId: String, status: String) @@ -35,7 +35,7 @@ enum WebSocketEvent: Sendable { // MARK: - Client Commands -enum WebSocketCommand: Sendable { +nonisolated enum WebSocketCommand: Sendable { case subscribe(channel: String) case unsubscribe(channel: String) case terminalInput(agentId: String, data: String) @@ -89,8 +89,10 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb private var session: URLSession? private var task: URLSessionWebSocketTask? private var pingTimer: DispatchSourceTimer? + private var reconnectWorkItem: DispatchWorkItem? private var reconnectAttempt = 0 private var intentionalDisconnect = false + private var isHandlingConnectionLoss = false // MARK: - Init @@ -141,6 +143,9 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb guard _state == .disconnected || _state.isReconnecting else { return } intentionalDisconnect = false + isHandlingConnectionLoss = false + reconnectWorkItem?.cancel() + reconnectWorkItem = nil if _state.isReconnecting { // Already in reconnect flow — keep the attempt counter @@ -160,6 +165,9 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb private func doDisconnect() { intentionalDisconnect = true + isHandlingConnectionLoss = false + reconnectWorkItem?.cancel() + reconnectWorkItem = nil stopPingTimer() task?.cancel(with: .goingAway, reason: nil) task = nil @@ -195,17 +203,20 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb // MARK: - Receiving - private func listenForMessages() { - task?.receive { [weak self] result in + private func listenForMessages(for expectedTask: URLSessionWebSocketTask) { + expectedTask.receive { [weak self] result in guard let self = self else { return } - switch result { - case .success(let message): - self.handleMessage(message) - self.listenForMessages() - case .failure(let error): - if !self.intentionalDisconnect { - NSLog("[WebSocketManager] receive error: \(error.localizedDescription)") - self.queue.async { self.handleConnectionLost() } + self.queue.async { + guard self.task === expectedTask else { return } + switch result { + case .success(let message): + self.handleMessage(message) + self.listenForMessages(for: expectedTask) + case .failure(let error): + if !self.intentionalDisconnect { + NSLog("[WebSocketManager] receive error: \(error.localizedDescription)") + self.handleConnectionLost() + } } } } @@ -307,6 +318,8 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb private func handleConnectionLost() { guard !intentionalDisconnect else { return } + guard !isHandlingConnectionLoss else { return } + isHandlingConnectionLoss = true stopPingTimer() task?.cancel(with: .abnormalClosure, reason: nil) task = nil @@ -322,10 +335,14 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb let delay = min(baseReconnectDelay * pow(2.0, Double(reconnectAttempt - 1)), maxReconnectDelay) NSLog("[WebSocketManager] reconnecting in %.1fs (attempt %d)", delay, reconnectAttempt) - queue.asyncAfter(deadline: .now() + delay) { [weak self] in + let workItem = DispatchWorkItem { [weak self] in guard let self = self, !self.intentionalDisconnect else { return } + self.reconnectWorkItem = nil self.doConnect() } + reconnectWorkItem?.cancel() + reconnectWorkItem = workItem + queue.asyncAfter(deadline: .now() + delay, execute: workItem) } // MARK: - URLSessionWebSocketDelegate @@ -333,16 +350,19 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { queue.async { [weak self] in guard let self = self else { return } + guard self.task === webSocketTask else { return } self.reconnectAttempt = 0 + self.isHandlingConnectionLoss = false self.setState(.connected) self.startPingTimer() - self.listenForMessages() + self.listenForMessages(for: webSocketTask) } } func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { queue.async { [weak self] in guard let self = self else { return } + guard self.task === webSocketTask else { return } if self.intentionalDisconnect { self.setState(.disconnected) } else { @@ -355,6 +375,8 @@ nonisolated class WebSocketManager: NSObject, @unchecked Sendable, URLSessionWeb guard error != nil else { return } queue.async { [weak self] in guard let self = self, !self.intentionalDisconnect else { return } + guard let webSocketTask = task as? URLSessionWebSocketTask, + self.task === webSocketTask else { return } self.handleConnectionLost() } } diff --git a/src/commands/spawn.test.ts b/src/commands/spawn.test.ts index ee642c7..15ba3bc 100644 --- a/src/commands/spawn.test.ts +++ b/src/commands/spawn.test.ts @@ -7,6 +7,7 @@ import { spawnAgent } from '../core/agent.js'; import { getRepoRoot } from '../core/worktree.js'; import { agentId, sessionId } from '../lib/id.js'; import * as tmux from '../core/tmux.js'; +import type { AgentEntry, Manifest } from '../types/manifest.js'; vi.mock('node:fs/promises', async () => { const actual = await vi.importActual('node:fs/promises'); @@ -79,7 +80,7 @@ const mockedEnsureSession = vi.mocked(tmux.ensureSession); const mockedCreateWindow = vi.mocked(tmux.createWindow); const mockedSplitPane = vi.mocked(tmux.splitPane); -function createManifest(tmuxWindow = '') { +function createManifest(tmuxWindow = ''): Manifest { return { version: 1 as const, projectRoot: '/tmp/repo', @@ -93,7 +94,7 @@ function createManifest(tmuxWindow = '') { baseBranch: 'main', status: 'active' as const, tmuxWindow, - agents: {} as Record, + agents: {} as Record, createdAt: '2026-02-27T00:00:00.000Z', }, }, @@ -103,7 +104,7 @@ function createManifest(tmuxWindow = '') { } describe('spawnCommand', () => { - let manifestState = createManifest(); + let manifestState: Manifest = createManifest(); let nextAgent = 1; let nextSession = 1;