diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
index 1b4eaca97..f5355bb29 100644
--- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
@@ -227,16 +227,28 @@ private void handleIncomingErrors() {
 
 	@Override
 	public Mono<Void> sendMessage(JSONRPCMessage message) {
-		if (this.outboundSink.tryEmitNext(message).isSuccess()) {
-			// TODO: essentially we could reschedule ourselves in some time and make
-			// another attempt with the already read data but pause reading until
-			// success
-			// In this approach we delegate the retry and the backpressure onto the
-			// caller. This might be enough for most cases.
+		// Concurrent callers race on the serialized sink's guard; the loser gets
+		// FAIL_NON_SERIALIZED even though nothing is wrong with the sink. That
+		// contention lasts only as long as the competing tryEmitNext call, so spin
+		// briefly (bounded at 100 ms to cover pathological stalls such as GC
+		// pauses) and retry. tryEmitNext is used instead of emitNext because
+		// emitNext does not throw for FAIL_TERMINATED, FAIL_CANCELLED or
+		// FAIL_ZERO_SUBSCRIBER, which would report success for a message that was
+		// never enqueued.
+		long deadlineNanos = System.nanoTime() + Duration.ofMillis(100).toNanos();
+		Sinks.EmitResult result = this.outboundSink.tryEmitNext(message);
+		while (result == Sinks.EmitResult.FAIL_NON_SERIALIZED && System.nanoTime() - deadlineNanos < 0) {
+			Thread.onSpinWait();
+			result = this.outboundSink.tryEmitNext(message);
+		}
+		if (result.isSuccess()) {
 			return Mono.empty();
 		}
 		else {
-			return Mono.error(new RuntimeException("Failed to enqueue message"));
+			// Carry the concrete EmitResult as the cause so callers can tell a
+			// contention timeout from a closed or overflowing sink.
+			return Mono.error(
+					new RuntimeException("Failed to enqueue message", new Sinks.EmissionException(result)));
 		}
 	}
 
diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportConcurrencyTest.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportConcurrencyTest.java
new file mode 100644
index 000000000..de70a10fd
--- /dev/null
+++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportConcurrencyTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2024-2024 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.json.gson.GsonMcpJsonMapper;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
+import reactor.core.publisher.Mono;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Reproduces a race condition in StdioClientTransport.sendMessage() when two threads call
+ * it concurrently on the same transport instance.
+ *
+ * <p>
+ * The outbound sink (Sinks.many().unicast()) is wrapped by Reactor's SinkManySerialized,
+ * which uses a CAS-based guard. When two threads call tryEmitNext concurrently, the CAS
+ * loser immediately gets FAIL_NON_SERIALIZED, causing "Failed to enqueue message".
+ *
+ * <p>
+ * This occurs when an MCP server proxies concurrent tool calls to a downstream MCP server
+ * via stdio transport — each call is dispatched on a separate thread and both call
+ * sendMessage() on the same transport.
+ *
+ * @see "Issue #875"
+ */
+class StdioClientTransportConcurrencyTest {
+
+	private StdioClientTransport transport;
+
+	@AfterEach
+	void tearDown() {
+		if (transport != null) {
+			transport.closeGracefully().block(Duration.ofSeconds(5));
+		}
+	}
+
+	@RepeatedTest(20)
+	void concurrent_sendMessage_should_not_fail() throws Exception {
+		// "cat" copies stdin to stdout, so it stands in for a stdio server without
+		// any MCP logic; this requires a POSIX environment with "cat" on the PATH.
+		var serverParams = ServerParameters.builder("cat").env(Map.of()).build();
+		transport = new StdioClientTransport(serverParams, new GsonMcpJsonMapper());
+
+		transport.connect(mono -> mono.flatMap(msg -> Mono.empty())).block(Duration.ofSeconds(5));
+
+		var msg1 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "1",
+				Map.of("name", "tool_a", "arguments", Map.of()));
+		var msg2 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "2",
+				Map.of("name", "tool_b", "arguments", Map.of()));
+
+		var barrier = new CyclicBarrier(2);
+		var errors = new CopyOnWriteArrayList<Throwable>();
+		var latch = new CountDownLatch(2);
+
+		for (var msg : new McpSchema.JSONRPCMessage[] { msg1, msg2 }) {
+			new Thread(() -> {
+				try {
+					barrier.await(2, TimeUnit.SECONDS);
+					transport.sendMessage(msg).block(Duration.ofSeconds(2));
+				}
+				catch (Exception e) {
+					errors.add(e);
+				}
+				finally {
+					latch.countDown();
+				}
+			}).start();
+		}
+
+		// await() returns false on timeout; without this check a hung sender would
+		// leave "errors" empty and the test would pass vacuously.
+		boolean finished = latch.await(5, TimeUnit.SECONDS);
+
+		assertThat(finished).as("Both sender threads should finish within the timeout").isTrue();
+		assertThat(errors)
+			.as("Concurrent sendMessage calls should both succeed, but the unicast sink "
+					+ "rejects one with FAIL_NON_SERIALIZED when two threads race on tryEmitNext")
+			.isEmpty();
+	}
+
+}