From fae5c094ec1774bbd0854651c6b1f6e328fe9528 Mon Sep 17 00:00:00 2001 From: barshaul Date: Tue, 3 Feb 2026 11:59:55 +0000 Subject: [PATCH 1/4] Handle MOVED error pointing to same endpoint by triggering reconnection before retrying the request. --- src/StackExchange.Redis/ResultProcessor.cs | 62 ++--- .../MovedTestServer.cs | 211 ++++++++++++++++++ .../MovedToSameEndpointTests.cs | 113 ++++++++++ .../StackExchange.Redis.Tests.csproj | 1 + 4 files changed, 359 insertions(+), 28 deletions(-) create mode 100644 tests/StackExchange.Redis.Tests/MovedTestServer.cs create mode 100644 tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 196cabde5..bdafca8f6 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -259,43 +259,49 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in if (Format.TryParseInt32(parts[1], out int hashSlot) && Format.TryParseEndPoint(parts[2], out var endpoint)) { - // no point sending back to same server, and no point sending to a dead server - if (!Equals(server?.EndPoint, endpoint)) + // Check if MOVED points to same endpoint + bool isSameEndpoint = Equals(server?.EndPoint, endpoint); + if (isSameEndpoint && isMoved) { - if (bridge is null) - { - // already toast - } - else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved)) + // MOVED to same endpoint detected. + // This occurs when Redis/Valkey servers are behind DNS records, load balancers, or proxies. + // The MOVED error signals that the client should reconnect to allow the DNS/proxy/load balancer + // to route the connection to a different underlying server host, then retry the command. + bridge?.TryConnect(null)?.Dispose(); + } + if (bridge is null) + { + // already toast + } + else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved)) + { + bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK"); + return false; + } + else + { + if (isMoved && wasNoRedirect) { - bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK"); - return false; + if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) + { + err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. "; + } + else + { + err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. "; + } } else { - if (isMoved && wasNoRedirect) + unableToConnectError = true; + if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) { - if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) - { - err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. "; - } - else - { - err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. "; - } + err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. " + + PerfCounterHelper.GetThreadPoolAndCPUSummary(); } else { - unableToConnectError = true; - if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) - { - err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. " - + PerfCounterHelper.GetThreadPoolAndCPUSummary(); - } - else - { - err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "; - } + err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "; } } } diff --git a/tests/StackExchange.Redis.Tests/MovedTestServer.cs b/tests/StackExchange.Redis.Tests/MovedTestServer.cs new file mode 100644 index 000000000..5512f290d --- /dev/null +++ b/tests/StackExchange.Redis.Tests/MovedTestServer.cs @@ -0,0 +1,211 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using StackExchange.Redis.Server; + +namespace StackExchange.Redis.Tests; + +/// +/// Test Redis/Valkey server that simulates MOVED errors pointing to the same endpoint. +/// Used to verify client reconnection behavior when the server is behind DNS/load balancers/proxies. +/// When a MOVED error points to the same endpoint, it signals the client to reconnect before retrying the command, +/// allowing the DNS record/proxy/load balancer to route the connection to a different underlying server host. +/// +public class MovedTestServer : MemoryCacheRedisServer +{ + private int _setCmdCount = 0; + private int _movedResponseCount = 0; + private int _connectionCount = 0; + private readonly Func _getEndpoint; + private readonly string _triggerKey; + private readonly int _hashSlot; + private EndPoint? _actualEndpoint; + + public MovedTestServer(Func getEndpoint, string triggerKey = "testkey", int hashSlot = 12345) + { + _getEndpoint = getEndpoint; + _triggerKey = triggerKey; + _hashSlot = hashSlot; + } + + /// + /// Called when a new client connection is established. Increments the connection counter. + /// + public override RedisClient CreateClient() + { + Interlocked.Increment(ref _connectionCount); + Log($"New client connection established (total connections: {_connectionCount}), endpoint: {_actualEndpoint}"); + return base.CreateClient(); + } + + /// + /// Handles the INFO command, reporting cluster mode as enabled. + /// + protected override TypedRedisValue Info(RedisClient client, RedisRequest request) + { + // Override INFO to report cluster mode enabled + var section = request.Count >= 2 ? request.GetString(1) : null; + + // Return cluster-enabled info + var infoResponse = section?.Equals("CLUSTER", StringComparison.OrdinalIgnoreCase) == true + ? "# Cluster\r\ncluster_enabled:1\r\n" + : "# Server\r\nredis_version:7.0.0\r\n# Cluster\r\ncluster_enabled:1\r\n"; + + Log($"Returning INFO response (cluster_enabled:1), endpoint: {_actualEndpoint}"); + + return TypedRedisValue.BulkString(infoResponse); + } + + /// + /// Handles CLUSTER commands, supporting SLOTS and NODES subcommands for cluster mode simulation. + /// + protected override TypedRedisValue Cluster(RedisClient client, RedisRequest request) + { + if (request.Count < 2) + { + return TypedRedisValue.Error("ERR wrong number of arguments for 'cluster' command"); + } + + var subcommand = request.GetString(1); + + // Handle CLUSTER SLOTS command to support cluster mode + if (subcommand.Equals("SLOTS", StringComparison.OrdinalIgnoreCase)) + { + Log($"Returning CLUSTER SLOTS response, endpoint: {_actualEndpoint}"); + return GetClusterSlotsResponse(); + } + + // Handle CLUSTER NODES command + if (subcommand.Equals("NODES", StringComparison.OrdinalIgnoreCase)) + { + Log($"Returning CLUSTER NODES response, endpoint: {_actualEndpoint}"); + return GetClusterNodesResponse(); + } + + return TypedRedisValue.Error($"ERR Unknown CLUSTER subcommand '{subcommand}'"); + } + + /// + /// Handles SET commands. Returns MOVED error on first attempt for the trigger key, + /// then processes normally on subsequent attempts. + /// + protected override TypedRedisValue Set(RedisClient client, RedisRequest request) + { + var key = request.GetKey(1); + + // Only trigger MOVED on FIRST attempt for the trigger key + if (key == _triggerKey && Interlocked.Increment(ref _setCmdCount) == 1) + { + Interlocked.Increment(ref _movedResponseCount); + var endpoint = _getEndpoint(); + Log($"Returning MOVED {_hashSlot} {endpoint} for key '{key}', actual endpoint: {_actualEndpoint}"); + + // Return MOVED error pointing to same endpoint + // Don't close the connection - let the client handle reconnection naturally + return TypedRedisValue.Error($"MOVED {_hashSlot} {endpoint}"); + } + + // Normal processing on retry or other keys + Log($"Processing SET normally for key '{key}', endpoint: {_actualEndpoint}"); + return base.Set(client, request); + } + + /// + /// Returns a CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383). + /// + private TypedRedisValue GetClusterSlotsResponse() + { + // Return a minimal CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383) + // Format: Array of slot ranges, each containing: + // [start_slot, end_slot, [host, port, node_id]] + if (_actualEndpoint == null) + { + return TypedRedisValue.Error("ERR endpoint not set"); + } + + var endpoint = _getEndpoint(); + var parts = endpoint.Split(':'); + var host = parts.Length > 0 ? parts[0] : "127.0.0.1"; + var port = parts.Length > 1 ? parts[1] : "6379"; + + // Build response: [[0, 16383, [host, port, node-id]]] + // Inner array: [host, port, node-id] + var hostPortArray = TypedRedisValue.MultiBulk((ICollection)new[] + { + TypedRedisValue.BulkString(host), + TypedRedisValue.Integer(int.Parse(port)), + TypedRedisValue.BulkString("test-node-id"), + }); + // Slot range: [start_slot, end_slot, [host, port, node-id]] + var slotRange = TypedRedisValue.MultiBulk((ICollection)new[] + { + TypedRedisValue.Integer(0), // start slot + TypedRedisValue.Integer(16383), // end slot + hostPortArray, + }); + + // Outer array containing the single slot range + return TypedRedisValue.MultiBulk((ICollection)new[] { slotRange }); + } + + /// + /// Returns a CLUSTER NODES response. + /// + private TypedRedisValue GetClusterNodesResponse() + { + // Return CLUSTER NODES response + // Format: node-id host:port@cport flags master - ping-sent pong-recv config-epoch link-state slot-range + // Example: test-node-id 127.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-16383 + if (_actualEndpoint == null) + { + return TypedRedisValue.Error("ERR endpoint not set"); + } + + var endpoint = _getEndpoint(); + var nodesInfo = $"test-node-id {endpoint}@1{endpoint.Split(':')[1]} myself,master - 0 0 1 connected 0-16383\r\n"; + + return TypedRedisValue.BulkString(nodesInfo); + } + + /// + /// Gets the number of SET commands executed. + /// + public int SetCmdCount => _setCmdCount; + + /// + /// Gets the number of times MOVED response was returned. + /// + public int MovedResponseCount => _movedResponseCount; + + /// + /// Gets the number of client connections established. + /// + public int ConnectionCount => _connectionCount; + + /// + /// Gets the actual endpoint the server is listening on. + /// + public EndPoint? ActualEndpoint => _actualEndpoint; + + /// + /// Sets the actual endpoint the server is listening on. + /// This should be called externally after the server starts. + /// + public void SetActualEndpoint(EndPoint endPoint) + { + _actualEndpoint = endPoint; + Log($"MovedTestServer endpoint set to {endPoint}"); + } + + /// + /// Resets all counters for test reusability. + /// + public void ResetCounters() + { + Interlocked.Exchange(ref _setCmdCount, 0); + Interlocked.Exchange(ref _movedResponseCount, 0); + Interlocked.Exchange(ref _connectionCount, 0); + } +} diff --git a/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs new file mode 100644 index 000000000..97d29b433 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs @@ -0,0 +1,113 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using StackExchange.Redis.Server; +using Xunit; + +namespace StackExchange.Redis.Tests; + +/// +/// Integration tests for MOVED-to-same-endpoint error handling. +/// When a MOVED error points to the same endpoint, the client should reconnect before retrying, +/// allowing the DNS record/proxy/load balancer to route to a different underlying server host. +/// +public class MovedToSameEndpointTests +{ + /// + /// Gets a free port by temporarily binding to port 0 and retrieving the OS-assigned port. + /// + private static int GetFreePort() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); + return port; + } + + /// + /// Integration test: Verifies that when a MOVED error points to the same endpoint, + /// the client reconnects and successfully retries the operation. + /// + /// Test scenario: + /// 1. Client connects to test server + /// 2. Client sends SET command for trigger key + /// 3. Server returns MOVED error pointing to same endpoint + /// 4. Client detects MOVED-to-same-endpoint and triggers reconnection + /// 5. Client retries SET command after reconnection + /// 6. Server processes SET normally on retry + /// + /// Expected behavior: + /// - SET command count should increase by 2 (initial attempt + retry) + /// - MOVED response count should increase by 1 (only on first attempt) + /// - Connection count should increase by 1 (reconnection after MOVED) + /// - Final SET operation should succeed with value stored + /// + [Fact] + public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds() + { + // Arrange: Get a free port to avoid conflicts when tests run in parallel + var port = GetFreePort(); + var listenEndpoint = new IPEndPoint(IPAddress.Loopback, port); + + var testServer = new MovedTestServer( + getEndpoint: () => Format.ToString(listenEndpoint), + triggerKey: "testkey"); + + var socketServer = new RespSocketServer(testServer); + + try + { + // Start listening on the free port + socketServer.Listen(listenEndpoint); + testServer.SetActualEndpoint(listenEndpoint); + + // Wait a moment for the server to fully start + await Task.Delay(100); + + // Act: Connect to the test server + var config = new ConfigurationOptions + { + EndPoints = { listenEndpoint }, + ConnectTimeout = 5000, + SyncTimeout = 5000, + AsyncTimeout = 5000, + }; + + await using var conn = await ConnectionMultiplexer.ConnectAsync(config); + var db = conn.GetDatabase(); + + // Record baseline counters after initial connection + var initialSetCmdCount = testServer.SetCmdCount; + var initialMovedResponseCount = testServer.MovedResponseCount; + var initialConnectionCount = testServer.ConnectionCount; + + // Execute SET command: This should receive MOVED → reconnect → retry → succeed + var setResult = await db.StringSetAsync("testkey", "testvalue"); + + // Assert: Verify SET command succeeded + Assert.True(setResult, "SET command should return true (OK)"); + + // Verify the value was actually stored (proving retry succeeded) + var retrievedValue = await db.StringGetAsync("testkey"); + Assert.Equal("testvalue", (string?)retrievedValue); + + // Verify SET command was executed twice: once with MOVED response, once successfully + var expectedSetCmdCount = initialSetCmdCount + 2; + Assert.Equal(expectedSetCmdCount, testServer.SetCmdCount); + + // Verify MOVED response was returned exactly once + var expectedMovedResponseCount = initialMovedResponseCount + 1; + Assert.Equal(expectedMovedResponseCount, testServer.MovedResponseCount); + + // Verify reconnection occurred: connection count should have increased by 1 + var expectedConnectionCount = initialConnectionCount + 1; + Assert.Equal(expectedConnectionCount, testServer.ConnectionCount); + } + finally + { + socketServer?.Dispose(); + } + } +} diff --git a/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj b/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj index f6e38236b..1991501bc 100644 --- a/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj +++ b/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj @@ -19,6 +19,7 @@ + From 4200efe2f8eed18757f3b04247193825f79cff72 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Tue, 3 Feb 2026 12:43:45 +0000 Subject: [PATCH 2/4] Better stimulate proxy/LB --- .../MovedTestServer.cs | 56 +++++++++++++++---- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/MovedTestServer.cs b/tests/StackExchange.Redis.Tests/MovedTestServer.cs index 5512f290d..94202a1b2 100644 --- a/tests/StackExchange.Redis.Tests/MovedTestServer.cs +++ b/tests/StackExchange.Redis.Tests/MovedTestServer.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Net; using System.Threading; @@ -15,9 +16,27 @@ namespace StackExchange.Redis.Tests; /// public class MovedTestServer : MemoryCacheRedisServer { + /// + /// Represents the simulated server host state behind a proxy/load balancer. + /// + private enum SimulatedHost + { + /// + /// Old server that returns MOVED errors for the trigger key (pre-migration state). + /// + OldServer, + + /// + /// New server that handles requests normally (post-migration state). + /// + NewServer, + } + private int _setCmdCount = 0; private int _movedResponseCount = 0; private int _connectionCount = 0; + private SimulatedHost _currentServerHost = SimulatedHost.OldServer; + private readonly ConcurrentDictionary _clientHostAssignments = new(); private readonly Func _getEndpoint; private readonly string _triggerKey; private readonly int _hashSlot; @@ -31,13 +50,17 @@ public MovedTestServer(Func getEndpoint, string triggerKey = "testkey", } /// - /// Called when a new client connection is established. Increments the connection counter. + /// Called when a new client connection is established. + /// Assigns the client to the current server host state (simulating proxy/load balancer routing). /// public override RedisClient CreateClient() { + var client = base.CreateClient(); + var assignedHost = _currentServerHost; + _clientHostAssignments[client] = assignedHost; Interlocked.Increment(ref _connectionCount); - Log($"New client connection established (total connections: {_connectionCount}), endpoint: {_actualEndpoint}"); - return base.CreateClient(); + Log($"New client connection established (assigned to {assignedHost}, total connections: {_connectionCount}), endpoint: {_actualEndpoint}"); + return client; } /// @@ -88,27 +111,38 @@ protected override TypedRedisValue Cluster(RedisClient client, RedisRequest requ } /// - /// Handles SET commands. Returns MOVED error on first attempt for the trigger key, - /// then processes normally on subsequent attempts. + /// Handles SET commands. Returns MOVED error for the trigger key when requested by clients + /// connected to the old server, simulating a server migration behind a proxy/load balancer. /// protected override TypedRedisValue Set(RedisClient client, RedisRequest request) { var key = request.GetKey(1); - // Only trigger MOVED on FIRST attempt for the trigger key - if (key == _triggerKey && Interlocked.Increment(ref _setCmdCount) == 1) + // Increment SET command counter for every SET call + Interlocked.Increment(ref _setCmdCount); + + // Get the client's assigned server host + if (!_clientHostAssignments.TryGetValue(client, out var clientHost)) { + throw new InvalidOperationException("Client host assignment not found - this indicates a test infrastructure error"); + } + + // Check if this is the trigger key from an old server client + if (key == _triggerKey && clientHost == SimulatedHost.OldServer) + { + // Transition server to new host (so future connections route to new server) + _currentServerHost = SimulatedHost.NewServer; + Interlocked.Increment(ref _movedResponseCount); var endpoint = _getEndpoint(); - Log($"Returning MOVED {_hashSlot} {endpoint} for key '{key}', actual endpoint: {_actualEndpoint}"); + Log($"Returning MOVED {_hashSlot} {endpoint} for key '{key}' from {clientHost} client, server transitioned to {SimulatedHost.NewServer}, actual endpoint: {_actualEndpoint}"); // Return MOVED error pointing to same endpoint - // Don't close the connection - let the client handle reconnection naturally return TypedRedisValue.Error($"MOVED {_hashSlot} {endpoint}"); } - // Normal processing on retry or other keys - Log($"Processing SET normally for key '{key}', endpoint: {_actualEndpoint}"); + // Normal processing for new server clients or other keys + Log($"Processing SET normally for key '{key}' from {clientHost} client, endpoint: {_actualEndpoint}"); return base.Set(client, request); } From 9c32b440826f081cd6f3e4d90f8cfb7a2f1b0fb4 Mon Sep 17 00:00:00 2001 From: barshaul Date: Tue, 3 Feb 2026 13:23:32 +0000 Subject: [PATCH 3/4] Increase timeout --- tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs index 97d29b433..d39ca82c8 100644 --- a/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs +++ b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs @@ -70,7 +70,7 @@ public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds( var config = new ConfigurationOptions { EndPoints = { listenEndpoint }, - ConnectTimeout = 5000, + ConnectTimeout = 10000, SyncTimeout = 5000, AsyncTimeout = 5000, }; From 93609b0af94f6e43d1d630cf17213d9839242bd1 Mon Sep 17 00:00:00 2001 From: barshaul Date: Tue, 3 Feb 2026 13:43:16 +0000 Subject: [PATCH 4/4] Fixed key name to prevent collisions --- .../StackExchange.Redis.Tests/MovedToSameEndpointTests.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs index d39ca82c8..3c4319712 100644 --- a/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs +++ b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs @@ -47,13 +47,14 @@ private static int GetFreePort() [Fact] public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds() { + var keyName = "MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds"; // Arrange: Get a free port to avoid conflicts when tests run in parallel var port = GetFreePort(); var listenEndpoint = new IPEndPoint(IPAddress.Loopback, port); var testServer = new MovedTestServer( getEndpoint: () => Format.ToString(listenEndpoint), - triggerKey: "testkey"); + triggerKey: keyName); var socketServer = new RespSocketServer(testServer); @@ -82,15 +83,14 @@ public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds( var initialSetCmdCount = testServer.SetCmdCount; var initialMovedResponseCount = testServer.MovedResponseCount; var initialConnectionCount = testServer.ConnectionCount; - // Execute SET command: This should receive MOVED → reconnect → retry → succeed - var setResult = await db.StringSetAsync("testkey", "testvalue"); + var setResult = await db.StringSetAsync(keyName, "testvalue"); // Assert: Verify SET command succeeded Assert.True(setResult, "SET command should return true (OK)"); // Verify the value was actually stored (proving retry succeeded) - var retrievedValue = await db.StringGetAsync("testkey"); + var retrievedValue = await db.StringGetAsync(keyName); Assert.Equal("testvalue", (string?)retrievedValue); // Verify SET command was executed twice: once with MOVED response, once successfully