Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 38 additions & 192 deletions src/OpenClaw.Shared/OpenClawGatewayClient.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace OpenClaw.Shared;

public class OpenClawGatewayClient : IDisposable
public class OpenClawGatewayClient : WebSocketClientBase
{
private ClientWebSocket? _webSocket;
private readonly string _gatewayUrl;
private readonly string _gatewayUrlForDisplay;
private readonly string _token;
private readonly string? _credentials;
private readonly IOpenClawLogger _logger;
private CancellationTokenSource _cts;
private bool _disposed;
private int _reconnectAttempts;
private static readonly int[] BackoffMs = { 1000, 2000, 4000, 8000, 15000, 30000, 60000 };

// Tracked state
private readonly Dictionary<string, SessionInfo> _sessions = new();
private readonly Dictionary<string, GatewayNodeInfo> _nodes = new();
Expand All @@ -45,8 +31,32 @@ private void ResetUnsupportedMethodFlags()
_nodeListUnsupported = false;
}

protected override int ReceiveBufferSize => 16384;
protected override string ClientRole => "gateway";

protected override Task ProcessMessageAsync(string json)
{
ProcessMessage(json);
return Task.CompletedTask;
}

protected override Task OnConnectedAsync()
{
ResetUnsupportedMethodFlags();
return Task.CompletedTask;
}

protected override void OnDisconnected()
{
ClearPendingRequests();
}

protected override void OnDisposing()
{
ClearPendingRequests();
}

// Events
public event EventHandler<ConnectionStatus>? StatusChanged;
public event EventHandler<OpenClawNotification>? NotificationReceived;
public event EventHandler<AgentActivity>? ActivityChanged;
public event EventHandler<ChannelHealth[]>? ChannelHealthUpdated;
Expand All @@ -59,77 +69,31 @@ private void ResetUnsupportedMethodFlags()
public event EventHandler<SessionCommandResult>? SessionCommandCompleted;

public OpenClawGatewayClient(string gatewayUrl, string token, IOpenClawLogger? logger = null)
: base(gatewayUrl, token, logger)
{
_gatewayUrl = GatewayUrlHelper.NormalizeForWebSocket(gatewayUrl);
_gatewayUrlForDisplay = GatewayUrlHelper.SanitizeForDisplay(_gatewayUrl);
_token = token;
_credentials = GatewayUrlHelper.ExtractCredentials(gatewayUrl);
_logger = logger ?? NullLogger.Instance;
_cts = new CancellationTokenSource();
}

public async Task ConnectAsync()
{
try
{
StatusChanged?.Invoke(this, ConnectionStatus.Connecting);
_logger.Info($"Connecting to gateway: {_gatewayUrlForDisplay}");

_webSocket = new ClientWebSocket();
_webSocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(30);

// Set Origin header based on gateway URL (convert ws/wss to http/https)
var uri = new Uri(_gatewayUrl);
var originScheme = uri.Scheme == "wss" ? "https" : "http";
var origin = $"{originScheme}://{uri.Host}:{uri.Port}";
_webSocket.Options.SetRequestHeader("Origin", origin);

if (!string.IsNullOrEmpty(_credentials))
{
var credentialsToEncode = GatewayUrlHelper.DecodeCredentials(_credentials);

_webSocket.Options.SetRequestHeader(
"Authorization",
$"Basic {Convert.ToBase64String(Encoding.UTF8.GetBytes(credentialsToEncode))}");
}

await _webSocket.ConnectAsync(uri, _cts.Token);

ResetUnsupportedMethodFlags();
_reconnectAttempts = 0;
_logger.Info("Gateway connected, waiting for challenge...");

// Don't send connect yet - wait for challenge event in ListenForMessagesAsync
_ = Task.Run(() => ListenForMessagesAsync(), _cts.Token);
}
catch (Exception ex)
{
_logger.Error("Connection failed", ex);
StatusChanged?.Invoke(this, ConnectionStatus.Error);
}
}

public async Task DisconnectAsync()
{
if (_webSocket?.State == WebSocketState.Open)
if (IsConnected)
{
try
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Disconnecting", CancellationToken.None);
await CloseWebSocketAsync();
}
catch (Exception ex)
{
_logger.Warn($"Error during disconnect: {ex.Message}");
}
}
ClearPendingRequests();
StatusChanged?.Invoke(this, ConnectionStatus.Disconnected);
RaiseStatusChanged(ConnectionStatus.Disconnected);
_logger.Info("Disconnected");
}

public async Task CheckHealthAsync()
{
if (_webSocket?.State != WebSocketState.Open)
if (!IsConnected)
{
await ReconnectWithBackoffAsync();
return;
Expand All @@ -149,14 +113,14 @@ public async Task CheckHealthAsync()
catch (Exception ex)
{
_logger.Error("Health check failed", ex);
StatusChanged?.Invoke(this, ConnectionStatus.Error);
RaiseStatusChanged(ConnectionStatus.Error);
await ReconnectWithBackoffAsync();
}
}

public async Task SendChatMessageAsync(string message)
{
if (_webSocket?.State != WebSocketState.Open)
if (!IsConnected)
throw new InvalidOperationException("Gateway connection is not open");

var req = new
Expand All @@ -179,7 +143,7 @@ public async Task RequestSessionsAsync()
/// <summary>Request usage/context info from gateway (may not be supported on all gateways).</summary>
public async Task RequestUsageAsync()
{
if (_webSocket?.State != WebSocketState.Open) return;
if (!IsConnected) return;
try
{
if (_usageStatusUnsupported)
Expand Down Expand Up @@ -270,7 +234,7 @@ public Task<bool> CompactSessionAsync(string key, int maxLines = 400)
/// <summary>Start a channel (telegram, whatsapp, etc).</summary>
public async Task<bool> StartChannelAsync(string channelName)
{
if (_webSocket?.State != WebSocketState.Open) return false;
if (!IsConnected) return false;
try
{
var req = new
Expand All @@ -294,7 +258,7 @@ public async Task<bool> StartChannelAsync(string channelName)
/// <summary>Stop a channel (telegram, whatsapp, etc).</summary>
public async Task<bool> StopChannelAsync(string channelName)
{
if (_webSocket?.State != WebSocketState.Open) return false;
if (!IsConnected) return false;
try
{
var req = new
Expand All @@ -315,31 +279,6 @@ public async Task<bool> StopChannelAsync(string channelName)
}
}

// --- Connection management ---

private async Task ReconnectWithBackoffAsync()
{
var delay = BackoffMs[Math.Min(_reconnectAttempts, BackoffMs.Length - 1)];
_reconnectAttempts++;
_logger.Warn($"Reconnecting in {delay}ms (attempt {_reconnectAttempts})");
StatusChanged?.Invoke(this, ConnectionStatus.Connecting);

try
{
await Task.Delay(delay, _cts.Token);
_webSocket?.Dispose();
_webSocket = null;
await ConnectAsync();
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
_logger.Error("Reconnect failed", ex);
StatusChanged?.Invoke(this, ConnectionStatus.Error);
// Don't recurse — the listen loop will trigger reconnect again
}
}

private async Task SendConnectMessageAsync(string? nonce = null)
{
// Use "cli" client ID for native apps - no browser security checks
Expand Down Expand Up @@ -373,31 +312,9 @@ private async Task SendConnectMessageAsync(string? nonce = null)
await SendRawAsync(JsonSerializer.Serialize(msg));
}

private async Task SendRawAsync(string message)
{
// Capture local reference to avoid TOCTOU race with reconnect/dispose
var ws = _webSocket;
if (ws?.State != WebSocketState.Open) return;

try
{
var bytes = Encoding.UTF8.GetBytes(message);
await ws.SendAsync(new ArraySegment<byte>(bytes),
WebSocketMessageType.Text, true, _cts.Token);
}
catch (ObjectDisposedException)
{
// WebSocket was disposed between state check and send
}
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.InvalidState)
{
_logger.Warn($"WebSocket send failed (state changed): {ex.Message}");
}
}

private async Task SendTrackedRequestAsync(string method, object? parameters = null)
{
if (_webSocket?.State != WebSocketState.Open) return;
if (!IsConnected) return;

var requestId = Guid.NewGuid().ToString();
TrackPendingRequest(requestId, method);
Expand Down Expand Up @@ -482,60 +399,6 @@ private void ClearPendingRequests()
}
}

// --- Message loop ---

private async Task ListenForMessagesAsync()
{
var buffer = new byte[16384]; // Larger buffer for big events
var sb = new StringBuilder();

try
{
while (_webSocket?.State == WebSocketState.Open && !_cts.Token.IsCancellationRequested)
{
var result = await _webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), _cts.Token);

if (result.MessageType == WebSocketMessageType.Text)
{
sb.Append(Encoding.UTF8.GetString(buffer, 0, result.Count));
if (result.EndOfMessage)
{
ProcessMessage(sb.ToString());
sb.Clear();
}
}
else if (result.MessageType == WebSocketMessageType.Close)
{
var closeStatus = _webSocket.CloseStatus?.ToString() ?? "unknown";
var closeDesc = _webSocket.CloseStatusDescription ?? "no description";
_logger.Info($"Server closed connection: {closeStatus} - {closeDesc}");
ClearPendingRequests();
StatusChanged?.Invoke(this, ConnectionStatus.Disconnected);
break;
}
}
}
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
{
_logger.Warn("Connection closed prematurely");
ClearPendingRequests();
StatusChanged?.Invoke(this, ConnectionStatus.Disconnected);
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
_logger.Error("Listen error", ex);
StatusChanged?.Invoke(this, ConnectionStatus.Error);
}

// Auto-reconnect if not intentionally disposed
if (!_disposed && !_cts.Token.IsCancellationRequested)
{
await ReconnectWithBackoffAsync();
}
}

// --- Message processing ---

private void ProcessMessage(string json)
Expand Down Expand Up @@ -594,7 +457,7 @@ private void HandleResponse(JsonElement root)
if (payload.TryGetProperty("type", out var t) && t.GetString() == "hello-ok")
{
_logger.Info("Handshake complete (hello-ok)");
StatusChanged?.Invoke(this, ConnectionStatus.Connected);
RaiseStatusChanged(ConnectionStatus.Connected);

// Request initial state after handshake
_ = Task.Run(async () =>
Expand Down Expand Up @@ -1738,21 +1601,4 @@ private static string TruncateLabel(string text, int maxLen = 60)
if (string.IsNullOrEmpty(text) || text.Length <= maxLen) return text;
return text[..(maxLen - 1)] + "…";
}

public void Dispose()
{
if (_disposed) return;
_disposed = true;

try { _cts.Cancel(); } catch { }

ClearPendingRequests();

var ws = _webSocket;
_webSocket = null;
try { ws?.Dispose(); } catch { }

// Don't dispose _cts immediately — listen loop or reconnect may still reference it.
// It will be GC'd after all pending tasks complete.
}
}
Loading
Loading