diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 196cabde5..bdafca8f6 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -259,43 +259,49 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in if (Format.TryParseInt32(parts[1], out int hashSlot) && Format.TryParseEndPoint(parts[2], out var endpoint)) { - // no point sending back to same server, and no point sending to a dead server - if (!Equals(server?.EndPoint, endpoint)) + // Check if MOVED points to same endpoint + bool isSameEndpoint = Equals(server?.EndPoint, endpoint); + if (isSameEndpoint && isMoved) { - if (bridge is null) - { - // already toast - } - else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved)) + // MOVED to same endpoint detected. + // This occurs when Redis/Valkey servers are behind DNS records, load balancers, or proxies. + // The MOVED error signals that the client should reconnect to allow the DNS/proxy/load balancer + // to route the connection to a different underlying server host, then retry the command. + bridge?.TryConnect(null)?.Dispose(); + } + if (bridge is null) + { + // already toast + } + else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved)) + { + bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK"); + return false; + } + else + { + if (isMoved && wasNoRedirect) { - bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK"); - return false; + if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) + { + err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. "; + } + else + { + err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. "; + } } else { - if (isMoved && wasNoRedirect) + unableToConnectError = true; + if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) { - if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) - { - err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. "; - } - else - { - err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. "; - } + err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. " + + PerfCounterHelper.GetThreadPoolAndCPUSummary(); } else { - unableToConnectError = true; - if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions) - { - err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. " - + PerfCounterHelper.GetThreadPoolAndCPUSummary(); - } - else - { - err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "; - } + err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "; } } } diff --git a/tests/StackExchange.Redis.Tests/MovedTestServer.cs b/tests/StackExchange.Redis.Tests/MovedTestServer.cs new file mode 100644 index 000000000..94202a1b2 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/MovedTestServer.cs @@ -0,0 +1,245 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using StackExchange.Redis.Server; + +namespace StackExchange.Redis.Tests; + +/// +/// Test Redis/Valkey server that simulates MOVED errors pointing to the same endpoint. +/// Used to verify client reconnection behavior when the server is behind DNS/load balancers/proxies. +/// When a MOVED error points to the same endpoint, it signals the client to reconnect before retrying the command, +/// allowing the DNS record/proxy/load balancer to route the connection to a different underlying server host. +/// +public class MovedTestServer : MemoryCacheRedisServer +{ + /// + /// Represents the simulated server host state behind a proxy/load balancer. + /// + private enum SimulatedHost + { + /// + /// Old server that returns MOVED errors for the trigger key (pre-migration state). + /// + OldServer, + + /// + /// New server that handles requests normally (post-migration state). + /// + NewServer, + } + + private int _setCmdCount = 0; + private int _movedResponseCount = 0; + private int _connectionCount = 0; + private SimulatedHost _currentServerHost = SimulatedHost.OldServer; + private readonly ConcurrentDictionary _clientHostAssignments = new(); + private readonly Func _getEndpoint; + private readonly string _triggerKey; + private readonly int _hashSlot; + private EndPoint? _actualEndpoint; + + public MovedTestServer(Func getEndpoint, string triggerKey = "testkey", int hashSlot = 12345) + { + _getEndpoint = getEndpoint; + _triggerKey = triggerKey; + _hashSlot = hashSlot; + } + + /// + /// Called when a new client connection is established. + /// Assigns the client to the current server host state (simulating proxy/load balancer routing). + /// + public override RedisClient CreateClient() + { + var client = base.CreateClient(); + var assignedHost = _currentServerHost; + _clientHostAssignments[client] = assignedHost; + Interlocked.Increment(ref _connectionCount); + Log($"New client connection established (assigned to {assignedHost}, total connections: {_connectionCount}), endpoint: {_actualEndpoint}"); + return client; + } + + /// + /// Handles the INFO command, reporting cluster mode as enabled. + /// + protected override TypedRedisValue Info(RedisClient client, RedisRequest request) + { + // Override INFO to report cluster mode enabled + var section = request.Count >= 2 ? request.GetString(1) : null; + + // Return cluster-enabled info + var infoResponse = section?.Equals("CLUSTER", StringComparison.OrdinalIgnoreCase) == true + ? "# Cluster\r\ncluster_enabled:1\r\n" + : "# Server\r\nredis_version:7.0.0\r\n# Cluster\r\ncluster_enabled:1\r\n"; + + Log($"Returning INFO response (cluster_enabled:1), endpoint: {_actualEndpoint}"); + + return TypedRedisValue.BulkString(infoResponse); + } + + /// + /// Handles CLUSTER commands, supporting SLOTS and NODES subcommands for cluster mode simulation. + /// + protected override TypedRedisValue Cluster(RedisClient client, RedisRequest request) + { + if (request.Count < 2) + { + return TypedRedisValue.Error("ERR wrong number of arguments for 'cluster' command"); + } + + var subcommand = request.GetString(1); + + // Handle CLUSTER SLOTS command to support cluster mode + if (subcommand.Equals("SLOTS", StringComparison.OrdinalIgnoreCase)) + { + Log($"Returning CLUSTER SLOTS response, endpoint: {_actualEndpoint}"); + return GetClusterSlotsResponse(); + } + + // Handle CLUSTER NODES command + if (subcommand.Equals("NODES", StringComparison.OrdinalIgnoreCase)) + { + Log($"Returning CLUSTER NODES response, endpoint: {_actualEndpoint}"); + return GetClusterNodesResponse(); + } + + return TypedRedisValue.Error($"ERR Unknown CLUSTER subcommand '{subcommand}'"); + } + + /// + /// Handles SET commands. Returns MOVED error for the trigger key when requested by clients + /// connected to the old server, simulating a server migration behind a proxy/load balancer. + /// + protected override TypedRedisValue Set(RedisClient client, RedisRequest request) + { + var key = request.GetKey(1); + + // Increment SET command counter for every SET call + Interlocked.Increment(ref _setCmdCount); + + // Get the client's assigned server host + if (!_clientHostAssignments.TryGetValue(client, out var clientHost)) + { + throw new InvalidOperationException("Client host assignment not found - this indicates a test infrastructure error"); + } + + // Check if this is the trigger key from an old server client + if (key == _triggerKey && clientHost == SimulatedHost.OldServer) + { + // Transition server to new host (so future connections route to new server) + _currentServerHost = SimulatedHost.NewServer; + + Interlocked.Increment(ref _movedResponseCount); + var endpoint = _getEndpoint(); + Log($"Returning MOVED {_hashSlot} {endpoint} for key '{key}' from {clientHost} client, server transitioned to {SimulatedHost.NewServer}, actual endpoint: {_actualEndpoint}"); + + // Return MOVED error pointing to same endpoint + return TypedRedisValue.Error($"MOVED {_hashSlot} {endpoint}"); + } + + // Normal processing for new server clients or other keys + Log($"Processing SET normally for key '{key}' from {clientHost} client, endpoint: {_actualEndpoint}"); + return base.Set(client, request); + } + + /// + /// Returns a CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383). + /// + private TypedRedisValue GetClusterSlotsResponse() + { + // Return a minimal CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383) + // Format: Array of slot ranges, each containing: + // [start_slot, end_slot, [host, port, node_id]] + if (_actualEndpoint == null) + { + return TypedRedisValue.Error("ERR endpoint not set"); + } + + var endpoint = _getEndpoint(); + var parts = endpoint.Split(':'); + var host = parts.Length > 0 ? parts[0] : "127.0.0.1"; + var port = parts.Length > 1 ? parts[1] : "6379"; + + // Build response: [[0, 16383, [host, port, node-id]]] + // Inner array: [host, port, node-id] + var hostPortArray = TypedRedisValue.MultiBulk((ICollection)new[] + { + TypedRedisValue.BulkString(host), + TypedRedisValue.Integer(int.Parse(port)), + TypedRedisValue.BulkString("test-node-id"), + }); + // Slot range: [start_slot, end_slot, [host, port, node-id]] + var slotRange = TypedRedisValue.MultiBulk((ICollection)new[] + { + TypedRedisValue.Integer(0), // start slot + TypedRedisValue.Integer(16383), // end slot + hostPortArray, + }); + + // Outer array containing the single slot range + return TypedRedisValue.MultiBulk((ICollection)new[] { slotRange }); + } + + /// + /// Returns a CLUSTER NODES response. + /// + private TypedRedisValue GetClusterNodesResponse() + { + // Return CLUSTER NODES response + // Format: node-id host:port@cport flags master - ping-sent pong-recv config-epoch link-state slot-range + // Example: test-node-id 127.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-16383 + if (_actualEndpoint == null) + { + return TypedRedisValue.Error("ERR endpoint not set"); + } + + var endpoint = _getEndpoint(); + var nodesInfo = $"test-node-id {endpoint}@1{endpoint.Split(':')[1]} myself,master - 0 0 1 connected 0-16383\r\n"; + + return TypedRedisValue.BulkString(nodesInfo); + } + + /// + /// Gets the number of SET commands executed. + /// + public int SetCmdCount => _setCmdCount; + + /// + /// Gets the number of times MOVED response was returned. + /// + public int MovedResponseCount => _movedResponseCount; + + /// + /// Gets the number of client connections established. + /// + public int ConnectionCount => _connectionCount; + + /// + /// Gets the actual endpoint the server is listening on. + /// + public EndPoint? ActualEndpoint => _actualEndpoint; + + /// + /// Sets the actual endpoint the server is listening on. + /// This should be called externally after the server starts. + /// + public void SetActualEndpoint(EndPoint endPoint) + { + _actualEndpoint = endPoint; + Log($"MovedTestServer endpoint set to {endPoint}"); + } + + /// + /// Resets all counters for test reusability. + /// + public void ResetCounters() + { + Interlocked.Exchange(ref _setCmdCount, 0); + Interlocked.Exchange(ref _movedResponseCount, 0); + Interlocked.Exchange(ref _connectionCount, 0); + } +} diff --git a/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs new file mode 100644 index 000000000..3c4319712 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs @@ -0,0 +1,113 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using StackExchange.Redis.Server; +using Xunit; + +namespace StackExchange.Redis.Tests; + +/// +/// Integration tests for MOVED-to-same-endpoint error handling. +/// When a MOVED error points to the same endpoint, the client should reconnect before retrying, +/// allowing the DNS record/proxy/load balancer to route to a different underlying server host. +/// +public class MovedToSameEndpointTests +{ + /// + /// Gets a free port by temporarily binding to port 0 and retrieving the OS-assigned port. + /// + private static int GetFreePort() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); + return port; + } + + /// + /// Integration test: Verifies that when a MOVED error points to the same endpoint, + /// the client reconnects and successfully retries the operation. + /// + /// Test scenario: + /// 1. Client connects to test server + /// 2. Client sends SET command for trigger key + /// 3. Server returns MOVED error pointing to same endpoint + /// 4. Client detects MOVED-to-same-endpoint and triggers reconnection + /// 5. Client retries SET command after reconnection + /// 6. Server processes SET normally on retry + /// + /// Expected behavior: + /// - SET command count should increase by 2 (initial attempt + retry) + /// - MOVED response count should increase by 1 (only on first attempt) + /// - Connection count should increase by 1 (reconnection after MOVED) + /// - Final SET operation should succeed with value stored + /// + [Fact] + public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds() + { + var keyName = "MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds"; + // Arrange: Get a free port to avoid conflicts when tests run in parallel + var port = GetFreePort(); + var listenEndpoint = new IPEndPoint(IPAddress.Loopback, port); + + var testServer = new MovedTestServer( + getEndpoint: () => Format.ToString(listenEndpoint), + triggerKey: keyName); + + var socketServer = new RespSocketServer(testServer); + + try + { + // Start listening on the free port + socketServer.Listen(listenEndpoint); + testServer.SetActualEndpoint(listenEndpoint); + + // Wait a moment for the server to fully start + await Task.Delay(100); + + // Act: Connect to the test server + var config = new ConfigurationOptions + { + EndPoints = { listenEndpoint }, + ConnectTimeout = 10000, + SyncTimeout = 5000, + AsyncTimeout = 5000, + }; + + await using var conn = await ConnectionMultiplexer.ConnectAsync(config); + var db = conn.GetDatabase(); + + // Record baseline counters after initial connection + var initialSetCmdCount = testServer.SetCmdCount; + var initialMovedResponseCount = testServer.MovedResponseCount; + var initialConnectionCount = testServer.ConnectionCount; + // Execute SET command: This should receive MOVED → reconnect → retry → succeed + var setResult = await db.StringSetAsync(keyName, "testvalue"); + + // Assert: Verify SET command succeeded + Assert.True(setResult, "SET command should return true (OK)"); + + // Verify the value was actually stored (proving retry succeeded) + var retrievedValue = await db.StringGetAsync(keyName); + Assert.Equal("testvalue", (string?)retrievedValue); + + // Verify SET command was executed twice: once with MOVED response, once successfully + var expectedSetCmdCount = initialSetCmdCount + 2; + Assert.Equal(expectedSetCmdCount, testServer.SetCmdCount); + + // Verify MOVED response was returned exactly once + var expectedMovedResponseCount = initialMovedResponseCount + 1; + Assert.Equal(expectedMovedResponseCount, testServer.MovedResponseCount); + + // Verify reconnection occurred: connection count should have increased by 1 + var expectedConnectionCount = initialConnectionCount + 1; + Assert.Equal(expectedConnectionCount, testServer.ConnectionCount); + } + finally + { + socketServer?.Dispose(); + } + } +} diff --git a/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj b/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj index f6e38236b..1991501bc 100644 --- a/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj +++ b/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj @@ -19,6 +19,7 @@ +