fix: concurrent message processing for all transports#610
fix: concurrent message processing for all transports#610rnett wants to merge 1 commit intomodelcontextprotocol:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses Issue #572 by enabling concurrent JSON-RPC message processing across transports, moving per-message coroutine launching/tracking into AbstractTransport so long-running handlers don’t block unrelated requests/notifications.
Changes:
- Add centralized concurrent dispatch helpers in
AbstractTransport(handleMessage,joinInProgressHandlers, cancellation helpers) and require transports to provide ascope. - Update multiple transports to route inbound messages through the new dispatch path and add coroutine naming for better observability.
- Add/extend unit + integration tests to verify concurrent handling (including a new CHANNEL transport integration variant).
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| test-utils/src/jvmMain/kotlin/io/modelcontextprotocol/kotlin/test/utils/TypeScriptRunner.kt | Adjusts npx tsx invocation to work reliably on Windows. |
| kotlin-sdk-testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt | Makes channel message handling concurrent via coroutine launches. |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt | Adds a concurrency regression test for stdio server message processing. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt | Runs batches of inbound HTTP messages via concurrent handler jobs. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt | Switches stdio server inbound handling to the new dispatch helpers and joins/cancels handler work on close. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt | Introduces a transport scope and routes inbound SSE messages through inline handling. |
| kotlin-sdk-core/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransportTest.kt | Updates test transport to provide a scope compatible with new AbstractTransport API. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt | Adds coroutine naming utilities and optimizes JSON->domain conversion for request/notification decoding. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt | Uses new scope + concurrent dispatch for websocket inbound messages. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt | Downgrades “unknown progress token” handling from error to warning to reduce noise under concurrency. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt | Core change: centralizes concurrent per-message dispatch, tracking, and handler lifecycle helpers. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt | Routes decoded responses/stream events through concurrent dispatch helpers. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt | Switches stdio client inbound handling to concurrent dispatch + updated shutdown semantics. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport.kt | Provides scope + joins in-flight handlers during shutdown. |
| integration-test/src/jvmTest/typescript/package-lock.json | Lockfile update (adds metadata field). |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/sse/KotlinServerForTsClientSse.kt | Updates test transport to provide scope and use concurrent dispatch helper. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/channel/ToolIntegrationTestChannel.kt | Adds channel-based integration test variant for tool concurrency behavior. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt | Adds CHANNEL transport option and lifecycle management for linked channel transports. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractToolIntegrationTest.kt | Adds a parameterized concurrency test validating fast calls complete while slow tools run. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/InMemoryTransport.kt | Provides scope and updates in-memory delivery to use inline dispatch helper. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt | Updates local test transports to provide a scope and use concurrent dispatch helper. |
Files not reviewed (1)
- integration-test/src/jvmTest/typescript/package-lock.json: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...dk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt
Outdated
Show resolved
Hide resolved
...testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt
Outdated
Show resolved
Hide resolved
...commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt
Outdated
Show resolved
Hide resolved
❌ 3 Tests Failed:
View the top 3 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
a2dbd83 to
fb00602
Compare
fb00602 to
5b88d56
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces transport-level concurrent message handling so that long-running request handlers don’t block processing of subsequent messages (e.g., pings/progress), addressing the sequential-processing limitation reported in issue #572.
Changes:
- Add a per-transport coroutine
scopeand centralized concurrent dispatch helpers inAbstractTransport. - Update all transports to route inbound messages through the new concurrent dispatcher (with special inline handling for SSE server).
- Expand/adjust unit + integration tests to be concurrency-safe (Mutex-protected collections) and add coverage for concurrent tool execution; also fix SSE
data:line concatenation and a Windowsnpxinvocation edge.
Reviewed changes
Copilot reviewed 31 out of 32 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| test-utils/src/jvmMain/kotlin/io/modelcontextprotocol/kotlin/test/utils/TypeScriptRunner.kt | Windows-safe npx/tsx command invocation for TS integration tests. |
| kotlin-sdk-testing/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTest.kt | Make assertions/order tolerant and add Mutex protection for concurrent callbacks. |
| kotlin-sdk-testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt | Route message handling via AbstractTransport.handleMessage and track handler jobs. |
| kotlin-sdk-testing/api/kotlin-sdk-testing.api | Public API surface update for LinkedTransports.close(). |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt | Mutex-protect message collection for concurrent transport dispatch. |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt | Mutex-protect message collection and add explicit concurrent-processing test. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt | Dispatch messages via concurrent handler jobs and join per-request. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt | Dispatch inbound messages through concurrent handler jobs and join/cancel on close. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt | Provide transport scope and switch to inline handler path. |
| kotlin-sdk-core/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransportTest.kt | Add required scope to test transport implementation. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt | Validate JSON-RPC messages are JSON objects during polymorphic selection. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt | Add message-derived CoroutineName, RequestId.asString(), and more direct fromJSON() decoding. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt | Route inbound messages via concurrent dispatch helper. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt | Treat unknown progress tokens as warn-and-ignore (instead of error escalation). |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt | Core concurrent dispatch + in-flight job tracking added to base transport. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransport.kt | Ensure shutdown waits for in-flight handlers before resource cleanup. |
| kotlin-sdk-core/api/kotlin-sdk-core.api | Public API surface updates for new transport methods and RequestId.asString(). |
| kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt | Mutex-protect message collection for concurrent dispatch. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt | Use concurrent dispatch helper and fix multi-line SSE data: concatenation. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt | Add transport scope, route inbound messages through concurrent dispatcher, and cancel in-flight jobs on shutdown. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport.kt | Add transport scope, route inbound messages through concurrent dispatcher, and join in-flight handlers on close. |
| integration-test/src/jvmTest/typescript/package-lock.json | Lockfile metadata update. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTestJvm.kt | Rename JVM test class to avoid conflicts/clarify target. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/sse/KotlinServerForTsClientSse.kt | Add transport scope and route inbound messages via concurrent dispatch helper. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/channel/ToolIntegrationTestChannel.kt | Add channel transport variant of tool integration tests. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt | Add CHANNEL transport option and lifecycle wiring. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractToolIntegrationTest.kt | Add concurrent tool processing test (blocking + suspending variants). |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/InMemoryTransport.kt | Add transport scope and inline dispatch for in-memory test transport. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt | Mutex-protect message collection for concurrent dispatch. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt | Mutex-protect message collection for concurrent dispatch. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt | Update test transport to satisfy new scope requirement and use concurrent dispatch helper. |
| AGENTS.md | Document new testing + SSE parsing conventions related to concurrency. |
Files not reviewed (1)
- integration-test/src/jvmTest/typescript/package-lock.json: Language not supported
Comments suppressed due to low confidence (3)
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt:119
handleMessageInline(parsedMessage)swallows non-cancellation exceptions from the user_onMessagehandler (viaAbstractTransport.doHandle). As a result, handler failures won’t throw out of this method, andhandlePostMessagewill still respond "Accepted" even if processing failed. If POST requests should return an error on handler failure,handleMessageInlineneeds a variant that propagates handler exceptions (or returns a failure signal) so this transport can reject the request appropriately.
public suspend fun handleMessage(message: String) {
try {
val parsedMessage = McpJson.decodeFromString<JSONRPCMessage>(message)
handleMessageInline(parsedMessage)
} catch (e: Exception) {
_onError.invoke(e)
throw e
}
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt:168
handleMessage(message)just launches a coroutine and doesn’t throw handler exceptions (they’re handled insideAbstractTransport). The try/catch here won’t catch message-handler failures and may give the impression that errors are handled twice. Consider removing this try/catch and, if you need per-message completion logging or cancellation behavior, attachinvokeOnCompletionto the returned Job instead.
if (message == null) break
try {
handleMessage(message)
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
logger.error(e) { "Error processing message" }
_onError.invoke(e)
}
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt:79
- Similar to SSE:
handleMessage(message)won’t throw on handler failure becauseAbstractTransportcatches exceptions internally. The surrounding try/catch will therefore only catch decode/parsing errors, and handler exceptions will no longer close the websocket loop (where the current code appears to expect throwing to abort). If handler failures should terminate the connection, consider handling completion on the returned Job (and closing the session) or using an inline/propagating handler path.
try {
val message = McpJson.decodeFromString<JSONRPCMessage>(message.readText())
handleMessage(message)
} catch (e: Exception) {
_onError.invoke(e)
throw e
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...dk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt
Show resolved
Hide resolved
...testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt
Show resolved
Hide resolved
...ing/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTest.kt
Outdated
Show resolved
Hide resolved
...ver/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt
Outdated
Show resolved
Hide resolved
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt
Outdated
Show resolved
Hide resolved
...testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt
Show resolved
Hide resolved
...rver/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt
Show resolved
Hide resolved
...dk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt
Outdated
Show resolved
Hide resolved
5b88d56 to
848d6e7
Compare
848d6e7 to
12fee57
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces unified concurrent inbound message handling across transports by centralizing message dispatch and lifecycle management in AbstractTransport, addressing sequential processing issues (e.g., stdio blocking long-running requests and preventing timely handling of ping/pong and other messages).
Changes:
- Add concurrent message dispatch + handler shutdown/join support to
AbstractTransport, then migrate transports to usehandleMessage(...),handleMessageInline(...),handleError(...), andshutdownHandlers(). - Update transports (stdio, SSE, WebSocket, Streamable HTTP, channel/in-memory test transports) to use the new dispatch/error APIs and align close/shutdown behavior.
- Adjust and add tests for concurrency (thread-safe message collection, order-independent assertions, new integration coverage).
Reviewed changes
Copilot reviewed 34 out of 35 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| test-utils/src/jvmMain/kotlin/io/modelcontextprotocol/kotlin/test/utils/TypeScriptRunner.kt | Use Windows-friendly cmd.exe /c invocation for npx tsx. |
| kotlin-sdk-testing/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTest.kt | Make tests resilient to concurrent callbacks (Mutex/Channel + order-independent assertions). |
| kotlin-sdk-testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt | Route inbound messages through new concurrent handler pipeline and graceful shutdown. |
| kotlin-sdk-testing/api/kotlin-sdk-testing.api | Update API surface for LinkedTransports.close(). |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt | Make server-side test message capture thread-safe under concurrent dispatch. |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt | Update ordering expectations; add concurrency-oriented coverage. |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/AbstractKtorExtensionsTest.kt | Add timeout guard to reduce SSE test hangs. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpServerTransport.kt | Pass session coroutine context into base transport for better scoping. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt | Use new handler/error helpers; ensure handler shutdown on close; process batch messages via jobs. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt | Move to AbstractTransport scope + concurrent message handling + unified error handling. |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt | Bind transport scope to call context; use inline handler path where required; unified error handling. |
| kotlin-sdk-core/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransportTest.kt | Minor formatting adjustment. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt | Improve JSON-RPC polymorphic deserialization error clarity for non-object payloads. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt | Add coroutine naming for messages + RequestId.asString(); optimize fromJSON() conversion. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt | Migrate WebSocket transport to new handler/error model; shutdown handlers on close. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt | Downgrade unknown progress-token handling from error to warning (concurrency-friendly). |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt | Core change: add concurrent dispatch, handler tracking, safe error handling, and shutdown helpers. |
| kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransport.kt | Plumb transport/handler coroutine contexts into base; route send errors via handleError. |
| kotlin-sdk-core/api/kotlin-sdk-core.api | Update published API due to new constructors/helpers. |
| kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt | Make message capture thread-safe under concurrent dispatch. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt | Route inbound messages/errors through new helpers; fix SSE multi-line data: concatenation. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt | Remove custom scope in favor of base transport scope; concurrent message handling + unified shutdown. |
| kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport.kt | Adopt new handler/error helpers; adjust close behavior in collector. |
| integration-test/src/jvmTest/typescript/package-lock.json | Lockfile metadata update (hasInstallScript). |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTestJvm.kt | Rename JVM test class to avoid collisions/clarify intent. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/sse/KotlinServerForTsClientSse.kt | Update custom HTTP transport to use new handler helpers and scoped context. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/channel/ToolIntegrationTestChannel.kt | Add channel-based integration test variant for concurrency behavior. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt | Add Channel transport option and lifecycle management in integration harness. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractToolIntegrationTest.kt | Add parameterized concurrency test ensuring long tool execution doesn’t block other calls. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/AbstractAuthenticationTest.kt | Increase connect timeout to reduce flakiness. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/InMemoryTransport.kt | Update in-memory transport to use new handler helpers and shutdown semantics. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt | Make base transport test thread-safe and order-independent under concurrent processing. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt | Update in-memory tests for new transport construction and thread-safety. |
| integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt | Update mock transport to use new base constructor + handleMessage(...). |
| AGENTS.md | Document concurrency-safe testing guidance and SSE data: concatenation rule. |
Files not reviewed (1)
- integration-test/src/jvmTest/typescript/package-lock.json: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } finally { | ||
| closeResources() | ||
| close() | ||
| } |
There was a problem hiding this comment.
collectMessages() calls close() in finally. If the SSE connection fails before endpoint is received, the transport is still in Initializing state, and AbstractClientTransport.close() is a no-op for that state; this can leave initialize() stuck on endpoint.await() and leak the session/job. Prefer calling closeResources() (as before) and/or completing endpoint exceptionally on failure so initialize() can terminate and start() can clean up correctly.
|
|
||
| protected fun handleMessage(message: JSONRPCMessage): Job { | ||
| val name = message.coroutineName | ||
| return scope.launch(handlerContext + name) { |
There was a problem hiding this comment.
handlerContext is accepted as a full CoroutineContext, so callers can (and in tests do) pass a context that contains a Job. In handleMessage, scope.launch(handlerContext + name) will then override the transport scope’s Job, so handler coroutines may not be cancelled when the transport scope is cancelled (e.g., via invokeOnCloseCallback). Consider sanitizing handlerContext (e.g., handlerContext.minusKey(Job)) or narrowing the type to a CoroutineDispatcher to ensure handler jobs remain children of the transport scope.
| return scope.launch(handlerContext + name) { | |
| // Ensure handler jobs remain children of the transport scope by removing any Job from handlerContext | |
| return scope.launch(handlerContext.minusKey(Job) + name) { |
| // Simulate Protocol's new behavior: launch a coroutine for each message | ||
| launch { | ||
| if ((message as? JSONRPCRequest)?.method == "slow") { | ||
| firstMessageReceived.complete(Unit) | ||
| firstMessageProceed.await() | ||
| } else { | ||
| secondMessageHandled.complete(Unit) | ||
| } |
There was a problem hiding this comment.
This test intends to prove transport-level concurrent processing, but the handler immediately launches and returns, which allows sequential transports to keep reading the next message even if the launched coroutine is still suspended. To actually validate concurrent handling, the slow-path should suspend directly inside the onMessage handler (no extra launch), so a sequential implementation would block message2 while a concurrent one would not.
| // Simulate Protocol's new behavior: launch a coroutine for each message | |
| launch { | |
| if ((message as? JSONRPCRequest)?.method == "slow") { | |
| firstMessageReceived.complete(Unit) | |
| firstMessageProceed.await() | |
| } else { | |
| secondMessageHandled.complete(Unit) | |
| } | |
| // Simulate a slow-path handler that actually suspends inside onMessage | |
| if ((message as? JSONRPCRequest)?.method == "slow") { | |
| firstMessageReceived.complete(Unit) | |
| firstMessageProceed.await() | |
| } else { | |
| secondMessageHandled.complete(Unit) |
| receiveChannel.send(JSONRPCRequest(RequestId.NumberId(2), "m2")) | ||
| delay(100) | ||
| receiveChannel.send(JSONRPCRequest(RequestId.NumberId(3), "m3")) |
There was a problem hiding this comment.
The fixed delay(100) makes this test timing-dependent and can still be flaky on slow/contended CI (or unnecessarily slow on fast machines). Prefer synchronizing deterministically (e.g., wait until message 2 has been observed/started via a CompletableDeferred, or use yield()/eventually around assertions) instead of a hard-coded sleep.
Motivation and Context
Adds somewhat-unified concurrent request handling as mentioned in #572 and related issues. Unfortunately it wasn't possible to handle this in
Protocol, since some transports (sse server mostly) need to handle the request inline and are already in a pre-request coroutine. I went with the next best place I could find, which wasAbstractTransport.I do think there's room for improvement here, including:
Protocolbut the current lateinit setup architecture doesn't allow for it.setNotificationHandlerand friends to take a suspending lambda, not one that returnsDeferred.However, as per the discussion on the issue, I wanted to keep this change as minimal as possible.
How Has This Been Tested?
All unit and integration tests pass, and I added tests to ensure that suspending and blocking handlers would not block other calls.
testToolConcurrentProcessingin particular ensures that new messages can be handled while tool execution is ongoing. It fails for stdio and channel onmain.Breaking Changes
None.
Types of changes
Checklist
Additional context