From 4348299ea787b29f87ea07a5a8ecf540f5cc6770 Mon Sep 17 00:00:00 2001
From: Abbas Sabra
Date: Fri, 20 Mar 2026 15:11:21 +0100
Subject: [PATCH 1/2] Add test reproducing concurrent sendMessage race condition

When two threads call sendMessage() concurrently on the same
StdioClientTransport, the unicast sink's SinkManySerialized wrapper
returns FAIL_NON_SERIALIZED via its CAS guard, causing "Failed to
enqueue message".

This test reproduces the race: 19/20 repetitions fail.

Closes #875
---
 Review note: the "index" blob ids below are carried over from the original
 revision of this patch and do not match the revised file contents.

 .../StdioClientTransportConcurrencyTest.java  | 91 +++++++++++++++++++
 1 file changed, 91 insertions(+)
 create mode 100644 mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportConcurrencyTest.java

diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportConcurrencyTest.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportConcurrencyTest.java
new file mode 100644
index 000000000..de70a10fd
--- /dev/null
+++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportConcurrencyTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2024-2024 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.json.gson.GsonMcpJsonMapper;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
+import reactor.core.publisher.Mono;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Reproduces a race condition in StdioClientTransport.sendMessage() when two threads call
+ * it concurrently on the same transport instance.
+ *
+ * <p>
+ * The outbound sink (Sinks.many().unicast()) is wrapped by Reactor's SinkManySerialized,
+ * which uses a CAS-based guard. When two threads call tryEmitNext concurrently, the CAS
+ * loser immediately gets FAIL_NON_SERIALIZED, causing "Failed to enqueue message".
+ *
+ * <p>
+ * This occurs when an MCP server proxies concurrent tool calls to a downstream MCP server
+ * via stdio transport — each call is dispatched on a separate thread and both call
+ * sendMessage() on the same transport.
+ *
+ * @see "Issue #875"
+ */
+class StdioClientTransportConcurrencyTest {
+
+	private StdioClientTransport transport;
+
+	@AfterEach
+	void tearDown() {
+		if (transport != null) {
+			transport.closeGracefully().block(Duration.ofSeconds(5));
+		}
+	}
+
+	@RepeatedTest(20)
+	void concurrent_sendMessage_should_not_fail() throws Exception {
+		var serverParams = ServerParameters.builder("cat").env(Map.of()).build();
+		transport = new StdioClientTransport(serverParams, new GsonMcpJsonMapper());
+
+		transport.connect(mono -> mono.flatMap(msg -> Mono.empty())).block(Duration.ofSeconds(5));
+
+		var msg1 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "1",
+				Map.of("name", "tool_a", "arguments", Map.of()));
+		var msg2 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "2",
+				Map.of("name", "tool_b", "arguments", Map.of()));
+
+		// Release both sender threads together so they race inside sendMessage.
+		var barrier = new CyclicBarrier(2);
+		var errors = new CopyOnWriteArrayList<Throwable>();
+		var latch = new CountDownLatch(2);
+
+		for (var msg : new McpSchema.JSONRPCMessage[] { msg1, msg2 }) {
+			new Thread(() -> {
+				try {
+					barrier.await(2, TimeUnit.SECONDS);
+					transport.sendMessage(msg).block(Duration.ofSeconds(2));
+				}
+				catch (Exception e) {
+					errors.add(e);
+				}
+				finally {
+					latch.countDown();
+				}
+			}).start();
+		}
+
+		// Without this check a timed-out wait would leave errors empty and pass the test.
+		assertThat(latch.await(5, TimeUnit.SECONDS))
+			.as("Both sender threads should finish within 5 seconds")
+			.isTrue();
+
+		assertThat(errors)
+			.as("Concurrent sendMessage calls should both succeed, but the unicast sink "
+					+ "rejects one with FAIL_NON_SERIALIZED when two threads race on tryEmitNext")
+			.isEmpty();
+	}
+
+}

From 705b2e8358f65d445bc75e3e977f554c9ba8b778 Mon Sep 17 00:00:00 2001
From: Abbas Sabra
Date: Fri, 20 Mar 2026 15:11:58 +0100
Subject: [PATCH 2/2] Fix concurrent sendMessage race by retrying on FAIL_NON_SERIALIZED

Replace tryEmitNext (fail-fast) with emitNext + busyLooping(100ms) in
StdioClientTransport.sendMessage(). The unicast sink's SinkManySerialized
wrapper returns FAIL_NON_SERIALIZED when two threads call tryEmitNext
concurrently. busyLooping retries the CAS spin instead of immediately
failing, making concurrent sends safe.

The retry is driven from sendMessage itself: it calls tryEmitNext in a
loop and asks the busyLooping handler whether to retry, rather than
delegating to emitNext. emitNext drops the message without throwing on
FAIL_TERMINATED, FAIL_CANCELLED and FAIL_ZERO_SUBSCRIBER, so sendMessage
would report success for a message that was never enqueued (for example
after the transport has been closed).

The contention window is microseconds (single CAS operation), so the
100ms duration is just a generous upper bound for pathological cases
like GC pauses.

Before: 19/20 test repetitions fail
After: 20/20 pass

Closes #875
---
 Review notes: the Before/After figures above were reported for the original
 emitNext-based version of this change; re-run the repeated test against this
 revision. The "index" blob ids below are carried over from that original
 revision and do not match the revised file contents.

 .../client/transport/StdioClientTransport.java | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
index 1b4eaca97..f5355bb29 100644
--- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
@@ -227,16 +227,26 @@ private void handleIncomingErrors() {
 
 	@Override
 	public Mono<Void> sendMessage(JSONRPCMessage message) {
-		if (this.outboundSink.tryEmitNext(message).isSuccess()) {
-			// TODO: essentially we could reschedule ourselves in some time and make
-			// another attempt with the already read data but pause reading until
-			// success
-			// In this approach we delegate the retry and the backpressure onto the
-			// caller. This might be enough for most cases.
+		// Concurrent callers make the serialized sink reject the CAS loser with
+		// FAIL_NON_SERIALIZED. The busyLooping handler approves a retry only for that
+		// result and only until its deadline, which is fixed when the handler is
+		// created - hence a fresh handler per call. tryEmitNext is looped here instead
+		// of delegating to emitNext because emitNext drops the message without an
+		// exception on FAIL_TERMINATED or FAIL_CANCELLED, which would make this method
+		// report success for a message that was never enqueued.
+		Sinks.EmitFailureHandler contentionRetry = Sinks.EmitFailureHandler
+			.busyLooping(Duration.ofMillis(100));
+		Sinks.EmitResult result = this.outboundSink.tryEmitNext(message);
+		while (result.isFailure()
+				&& contentionRetry.onEmitFailure(reactor.core.publisher.SignalType.ON_NEXT, result)) {
+			result = this.outboundSink.tryEmitNext(message);
+		}
+		if (result.isSuccess()) {
 			return Mono.empty();
 		}
 		else {
-			return Mono.error(new RuntimeException("Failed to enqueue message"));
+			return Mono.error(new RuntimeException("Failed to enqueue message",
+					new Sinks.EmissionException(result)));
 		}
 	}
 