Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 34 additions & 28 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,43 +259,49 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
if (Format.TryParseInt32(parts[1], out int hashSlot)
&& Format.TryParseEndPoint(parts[2], out var endpoint))
{
// no point sending back to same server, and no point sending to a dead server
if (!Equals(server?.EndPoint, endpoint))
// Check if MOVED points to same endpoint
bool isSameEndpoint = Equals(server?.EndPoint, endpoint);
if (isSameEndpoint && isMoved)
{
if (bridge is null)
{
// already toast
}
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
// MOVED to same endpoint detected.
// This occurs when Redis/Valkey servers are behind DNS records, load balancers, or proxies.
// The MOVED error signals that the client should reconnect to allow the DNS/proxy/load balancer
// to route the connection to a different underlying server host, then retry the command.
bridge?.TryConnect(null)?.Dispose();
Comment on lines 265 to +270
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it relies on timing between the connection being detected as closed, and the parser - but the parser is IIRC "inline" here, i.e. we haven't finished reading yet. I need to think very carefully here about whether this is reliable ... I'm not sure either way, and I invite your input!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional concern: from memory (@NickCraver may remember more), DNS resolution in .NET has a habit of being cached for the process duration, unless explicit steps are taken - and I'm not seeing any explicit steps. I'm concerned that this may result in instant reconnection on the old cached (in-proc) DNS entry; definitely something to check

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@philon-msft tells me that the DNS part might be safer than I recall - will discuss, but: might not be a problem

}
if (bridge is null)
{
// already toast
}
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
{
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
return false;
}
else
{
if (isMoved && wasNoRedirect)
{
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
return false;
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
{
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
}
else
{
err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. ";
}
}
else
{
if (isMoved && wasNoRedirect)
unableToConnectError = true;
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
{
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
{
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
}
else
{
err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. ";
}
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
+ PerfCounterHelper.GetThreadPoolAndCPUSummary();
}
else
{
unableToConnectError = true;
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
{
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
+ PerfCounterHelper.GetThreadPoolAndCPUSummary();
}
else
{
err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
}
err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
}
}
}
Expand Down
245 changes: 245 additions & 0 deletions tests/StackExchange.Redis.Tests/MovedTestServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis.Server;

namespace StackExchange.Redis.Tests;

/// <summary>
/// Test Redis/Valkey server that simulates MOVED errors pointing to the same endpoint.
/// Used to verify client reconnection behavior when the server is behind DNS/load balancers/proxies.
/// When a MOVED error points to the same endpoint, it signals the client to reconnect before retrying the command,
/// allowing the DNS record/proxy/load balancer to route the connection to a different underlying server host.
/// </summary>
public class MovedTestServer : MemoryCacheRedisServer
{
    /// <summary>
    /// Port assumed when the endpoint string supplied by the endpoint callback carries no explicit port.
    /// </summary>
    private const int DefaultPort = 6379;

    /// <summary>
    /// Conventional distance between a node's data port and its cluster bus port.
    /// </summary>
    private const int ClusterBusPortOffset = 10000;

    /// <summary>
    /// Represents the simulated server host state behind a proxy/load balancer.
    /// </summary>
    private enum SimulatedHost
    {
        /// <summary>
        /// Old server that returns MOVED errors for the trigger key (pre-migration state).
        /// </summary>
        OldServer,

        /// <summary>
        /// New server that handles requests normally (post-migration state).
        /// </summary>
        NewServer,
    }

    private int _setCmdCount;
    private int _movedResponseCount;
    private int _connectionCount;

    // Written by Set and read by CreateClient, which may run on different threads;
    // volatile keeps the hand-over visible, matching the Interlocked counters above.
    private volatile SimulatedHost _currentServerHost = SimulatedHost.OldServer;
    private readonly ConcurrentDictionary<RedisClient, SimulatedHost> _clientHostAssignments = new();
    private readonly Func<string> _getEndpoint;
    private readonly string _triggerKey;
    private readonly int _hashSlot;

    // Assigned via SetActualEndpoint and read from the command handlers.
    private volatile EndPoint? _actualEndpoint;

    /// <summary>
    /// Creates the test server.
    /// </summary>
    /// <param name="getEndpoint">Callback returning the <c>host:port</c> text advertised in MOVED and CLUSTER responses.</param>
    /// <param name="triggerKey">Key for which a SET from an old-server client is answered with MOVED.</param>
    /// <param name="hashSlot">Hash slot number reported in the MOVED error.</param>
    public MovedTestServer(Func<string> getEndpoint, string triggerKey = "testkey", int hashSlot = 12345)
    {
        _getEndpoint = getEndpoint;
        _triggerKey = triggerKey;
        _hashSlot = hashSlot;
    }

    /// <summary>
    /// Called when a new client connection is established.
    /// Assigns the client to the current server host state (simulating proxy/load balancer routing).
    /// </summary>
    public override RedisClient CreateClient()
    {
        var client = base.CreateClient();
        var assignedHost = _currentServerHost;
        _clientHostAssignments[client] = assignedHost;

        // Log the value produced by this increment rather than re-reading the field,
        // so the logged count always belongs to this connection.
        var totalConnections = Interlocked.Increment(ref _connectionCount);
        Log($"New client connection established (assigned to {assignedHost}, total connections: {totalConnections}), endpoint: {_actualEndpoint}");
        return client;
    }

    /// <summary>
    /// Handles the INFO command, reporting cluster mode as enabled.
    /// </summary>
    protected override TypedRedisValue Info(RedisClient client, RedisRequest request)
    {
        // Override INFO to report cluster mode enabled
        var section = request.Count >= 2 ? request.GetString(1) : null;

        // Only the cluster section is returned when it is requested explicitly;
        // every other request gets a minimal server section plus the cluster section.
        var infoResponse = string.Equals(section, "CLUSTER", StringComparison.OrdinalIgnoreCase)
            ? "# Cluster\r\ncluster_enabled:1\r\n"
            : "# Server\r\nredis_version:7.0.0\r\n# Cluster\r\ncluster_enabled:1\r\n";

        Log($"Returning INFO response (cluster_enabled:1), endpoint: {_actualEndpoint}");

        return TypedRedisValue.BulkString(infoResponse);
    }

    /// <summary>
    /// Handles CLUSTER commands, supporting SLOTS and NODES subcommands for cluster mode simulation.
    /// Any other subcommand is answered with an error reply.
    /// </summary>
    protected override TypedRedisValue Cluster(RedisClient client, RedisRequest request)
    {
        if (request.Count < 2)
        {
            return TypedRedisValue.Error("ERR wrong number of arguments for 'cluster' command");
        }

        var subcommand = request.GetString(1);

        // Static string.Equals is used so a null subcommand falls through to the error reply
        // instead of throwing.
        if (string.Equals(subcommand, "SLOTS", StringComparison.OrdinalIgnoreCase))
        {
            Log($"Returning CLUSTER SLOTS response, endpoint: {_actualEndpoint}");
            return GetClusterSlotsResponse();
        }

        if (string.Equals(subcommand, "NODES", StringComparison.OrdinalIgnoreCase))
        {
            Log($"Returning CLUSTER NODES response, endpoint: {_actualEndpoint}");
            return GetClusterNodesResponse();
        }

        return TypedRedisValue.Error($"ERR Unknown CLUSTER subcommand '{subcommand}'");
    }

    /// <summary>
    /// Handles SET commands. Returns MOVED error for the trigger key when requested by clients
    /// connected to the old server, simulating a server migration behind a proxy/load balancer.
    /// </summary>
    /// <exception cref="InvalidOperationException">The client was never registered through <see cref="CreateClient"/>.</exception>
    protected override TypedRedisValue Set(RedisClient client, RedisRequest request)
    {
        var key = request.GetKey(1);

        // Increment SET command counter for every SET call
        Interlocked.Increment(ref _setCmdCount);

        // Get the client's assigned server host
        if (!_clientHostAssignments.TryGetValue(client, out var clientHost))
        {
            throw new InvalidOperationException("Client host assignment not found - this indicates a test infrastructure error");
        }

        // Check if this is the trigger key from an old server client
        if (key == _triggerKey && clientHost == SimulatedHost.OldServer)
        {
            // Transition server to new host (so future connections route to new server)
            _currentServerHost = SimulatedHost.NewServer;

            Interlocked.Increment(ref _movedResponseCount);
            var endpoint = _getEndpoint();
            Log($"Returning MOVED {_hashSlot} {endpoint} for key '{key}' from {clientHost} client, server transitioned to {SimulatedHost.NewServer}, actual endpoint: {_actualEndpoint}");

            // Return MOVED error pointing to same endpoint
            return TypedRedisValue.Error($"MOVED {_hashSlot} {endpoint}");
        }

        // Normal processing for new server clients or other keys
        Log($"Processing SET normally for key '{key}' from {clientHost} client, endpoint: {_actualEndpoint}");
        return base.Set(client, request);
    }

    /// <summary>
    /// Splits a <c>host:port</c> string at its last colon.
    /// When no colon is present the whole text is treated as the host and <see cref="DefaultPort"/> is used.
    /// </summary>
    /// <exception cref="FormatException">The text after the last colon is not an integer.</exception>
    private static (string Host, int Port) ParseEndpoint(string endpoint)
    {
        // The last colon is used so that a bracketed IPv6 literal such as "[::1]:6379" keeps its host intact.
        var separator = endpoint.LastIndexOf(':');
        if (separator < 0)
        {
            return (endpoint, DefaultPort);
        }

        var port = int.Parse(endpoint.Substring(separator + 1), System.Globalization.CultureInfo.InvariantCulture);
        return (endpoint.Substring(0, separator), port);
    }

    /// <summary>
    /// Derives a synthetic cluster bus port that always stays inside the valid port range:
    /// data port + 10000 when that fits, otherwise data port - 10000.
    /// </summary>
    private static int GetClusterBusPort(int port) =>
        port <= IPEndPoint.MaxPort - ClusterBusPortOffset
            ? port + ClusterBusPortOffset
            : port - ClusterBusPortOffset;

    /// <summary>
    /// Returns a CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383).
    /// Replies with an error until <see cref="SetActualEndpoint"/> has been called.
    /// </summary>
    private TypedRedisValue GetClusterSlotsResponse()
    {
        // Format: Array of slot ranges, each containing:
        // [start_slot, end_slot, [host, port, node_id]]
        if (_actualEndpoint == null)
        {
            return TypedRedisValue.Error("ERR endpoint not set");
        }

        var (host, port) = ParseEndpoint(_getEndpoint());

        // Inner array: [host, port, node-id]
        var hostPortArray = TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[]
        {
            TypedRedisValue.BulkString(host),
            TypedRedisValue.Integer(port),
            TypedRedisValue.BulkString("test-node-id"),
        });

        // Slot range: [start_slot, end_slot, [host, port, node-id]]
        var slotRange = TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[]
        {
            TypedRedisValue.Integer(0), // start slot
            TypedRedisValue.Integer(16383), // end slot
            hostPortArray,
        });

        // Outer array containing the single slot range
        return TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[] { slotRange });
    }

    /// <summary>
    /// Returns a CLUSTER NODES response describing a single master that owns every slot.
    /// Replies with an error until <see cref="SetActualEndpoint"/> has been called.
    /// </summary>
    private TypedRedisValue GetClusterNodesResponse()
    {
        // Format: node-id host:port@cport flags master - ping-sent pong-recv config-epoch link-state slot-range
        // Example: test-node-id 127.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-16383
        if (_actualEndpoint == null)
        {
            return TypedRedisValue.Error("ERR endpoint not set");
        }

        var (host, port) = ParseEndpoint(_getEndpoint());

        // The bus port is computed numerically; prefixing the character '1' to the port text
        // only equals port + 10000 for four-digit ports.
        var nodesInfo = $"test-node-id {host}:{port}@{GetClusterBusPort(port)} myself,master - 0 0 1 connected 0-16383\r\n";

        return TypedRedisValue.BulkString(nodesInfo);
    }

    /// <summary>
    /// Gets the number of SET commands executed.
    /// </summary>
    public int SetCmdCount => Volatile.Read(ref _setCmdCount);

    /// <summary>
    /// Gets the number of times MOVED response was returned.
    /// </summary>
    public int MovedResponseCount => Volatile.Read(ref _movedResponseCount);

    /// <summary>
    /// Gets the number of client connections established.
    /// </summary>
    public int ConnectionCount => Volatile.Read(ref _connectionCount);

    /// <summary>
    /// Gets the actual endpoint the server is listening on.
    /// </summary>
    public EndPoint? ActualEndpoint => _actualEndpoint;

    /// <summary>
    /// Sets the actual endpoint the server is listening on.
    /// This should be called externally after the server starts.
    /// </summary>
    public void SetActualEndpoint(EndPoint endPoint)
    {
        _actualEndpoint = endPoint;
        Log($"MovedTestServer endpoint set to {endPoint}");
    }

    /// <summary>
    /// Resets all counters for test reusability.
    /// Only the three counters are cleared; the simulated host state and the per-client
    /// host assignments are left untouched.
    /// </summary>
    public void ResetCounters()
    {
        Interlocked.Exchange(ref _setCmdCount, 0);
        Interlocked.Exchange(ref _movedResponseCount, 0);
        Interlocked.Exchange(ref _connectionCount, 0);
    }
}
Loading
Loading