Skip to content

fix: concurrent message processing for all transports#610

Draft
rnett wants to merge 1 commit intomodelcontextprotocol:mainfrom
rnett:rnett/non-blocking-call-handlers
Draft

fix: concurrent message processing for all transports#610
rnett wants to merge 1 commit intomodelcontextprotocol:mainfrom
rnett:rnett/non-blocking-call-handlers

Conversation

@rnett
Copy link
Contributor

@rnett rnett commented Mar 18, 2026

Motivation and Context

Adds somewhat-unified concurrent request handling as mentioned in #572 and related issues. Unfortunately it wasn't possible to handle this in Protocol, since some transports (sse server mostly) need to handle the request inline and are already in a pre-request coroutine. I went with the next best place I could find, which was AbstractTransport.

I do think there's room for improvement here, including:

  • More centralized handling of this sort of thing, the coroutine scopes, and error handling. I would think the best place is in Protocol but the current lateinit setup architecture doesn't allow for it.
  • Unified graceful shutdown with timeout + cancellation
  • Updating setNotificationHandler and friends to take a suspending lambda, not one that returns Deferred.

However, as per the discussion on the issue, I wanted to keep this change as minimal as possible.

How Has This Been Tested?

All unit and integration tests pass, and I added tests to ensure that suspending and blocking handlers would not block other calls.
testToolConcurrentProcessing in particular ensures that new messages can be handled while tool execution is ongoing. It fails for stdio and channel on main.

Breaking Changes

None.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

Copilot AI review requested due to automatic review settings March 18, 2026 06:11
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses Issue #572 by enabling concurrent JSON-RPC message processing across transports, moving per-message coroutine launching/tracking into AbstractTransport so long-running handlers don’t block unrelated requests/notifications.

Changes:

  • Add centralized concurrent dispatch helpers in AbstractTransport (handleMessage, joinInProgressHandlers, cancellation helpers) and require transports to provide a scope.
  • Update multiple transports to route inbound messages through the new dispatch path and add coroutine naming for better observability.
  • Add/extend unit + integration tests to verify concurrent handling (including a new CHANNEL transport integration variant).

Reviewed changes

Copilot reviewed 20 out of 21 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
test-utils/src/jvmMain/kotlin/io/modelcontextprotocol/kotlin/test/utils/TypeScriptRunner.kt Adjusts npx tsx invocation to work reliably on Windows.
kotlin-sdk-testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt Makes channel message handling concurrent via coroutine launches.
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt Adds a concurrency regression test for stdio server message processing.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt Runs batches of inbound HTTP messages via concurrent handler jobs.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt Switches stdio server inbound handling to the new dispatch helpers and joins/cancels handler work on close.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt Introduces a transport scope and routes inbound SSE messages through inline handling.
kotlin-sdk-core/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransportTest.kt Updates test transport to provide a scope compatible with new AbstractTransport API.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt Adds coroutine naming utilities and optimizes JSON->domain conversion for request/notification decoding.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt Uses new scope + concurrent dispatch for websocket inbound messages.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt Downgrades “unknown progress token” handling from error to warning to reduce noise under concurrency.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt Core change: centralizes concurrent per-message dispatch, tracking, and handler lifecycle helpers.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt Routes decoded responses/stream events through concurrent dispatch helpers.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt Switches stdio client inbound handling to concurrent dispatch + updated shutdown semantics.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport.kt Provides scope + joins in-flight handlers during shutdown.
integration-test/src/jvmTest/typescript/package-lock.json Lockfile update (adds metadata field).
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/sse/KotlinServerForTsClientSse.kt Updates test transport to provide scope and use concurrent dispatch helper.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/channel/ToolIntegrationTestChannel.kt Adds channel-based integration test variant for tool concurrency behavior.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt Adds CHANNEL transport option and lifecycle management for linked channel transports.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractToolIntegrationTest.kt Adds a parameterized concurrency test validating fast calls complete while slow tools run.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/InMemoryTransport.kt Provides scope and updates in-memory delivery to use inline dispatch helper.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt Updates local test transports to provide a scope and use concurrent dispatch helper.
Files not reviewed (1)
  • integration-test/src/jvmTest/typescript/package-lock.json: Language not supported

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@codecov-commenter
Copy link

codecov-commenter commented Mar 18, 2026

❌ 3 Tests Failed:

Tests completed Failed Passed Skipped
2827 3 2824 50
View the top 3 failed test(s) by shortest run time
linuxX64Test.io.modelcontextprotocol.kotlin.sdk.client.streamable.http.StreamableHttpClientTransportTest::testInlineSseRetryParsing[linuxX64]
Stack Traces | 0.066s run time
kotlin.AssertionError: Block failed after 247.239us; attempted 2 time(s)
The first error was caused by: Timed out after Infinity of _virtual_ (kotlinx.coroutines.test) time. To use the real time, wrap 'withTimeout' in 'withContext(Dispatchers.Default.limitedParallelism(1))'
kotlinx.coroutines.TimeoutCancellationException: Timed out after Infinity of _virtual_ (kotlinx.coroutines.test) time. To use the real time, wrap 'withTimeout' in 'withContext(Dispatchers.Default.limitedParallelism(1))'
	at kotlin.Throwable#<init>(Unknown Source)
	at kotlin.Exception#<init>(Unknown Source)
	at kotlin.RuntimeException#<init>(Unknown Source)
	at kotlin.IllegalStateException#<init>(Unknown Source)
	at kotlin.coroutines.cancellation.CancellationException#<init>(Unknown Source)
	at kotlinx.coroutines.TimeoutCancellationException#<init>(Unknown Source)
	at kotlinx.coroutines#TimeoutCancellationException(Unknown Source)
	at kotlinx.coroutines.TimeoutCoroutine.run#internal(Unknown Source)
	at kotlinx.coroutines.Runnable#run(Unknown Source)
	at kotlinx.coroutines.test.TestDispatcher#processEvent(Unknown Source)
	at kotlinx.coroutines.test.TestCoroutineScheduler#tryRunNextTaskUnless(Unknown Source)
	at kotlinx.coroutines.test.runTest$$inlined$let$1.runTest$$inlined$let$1$invoke$3.$invokeCOROUTINE$2.invokeSuspend#internal(Unknown Source)
	at kotlin.coroutines.native.internal.BaseContinuationImpl#invokeSuspend(Unknown Source)
	at kotlin.coroutines.native.internal.BaseContinuationImpl#resumeWith(Unknown Source)
	at kotlin.coroutines.Continuation#resumeWith(Unknown Source)
	at kotlinx.coroutines.DispatchedTask#run(Unknown Source)
	at kotlinx.coroutines.Runnable#run(Unknown Source)
	at kotlinx.coroutines.EventLoopImplBase#processNextEvent(Unknown Source)
	at kotlinx.coroutines.EventLoop#processNextEvent(Unknown Source)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking#internal(Unknown Source)
	at kotlinx.coroutines#runBlocking(Unknown Source)
	at kotlinx.coroutines#runBlocking$default(Unknown Source)
	at kotlinx.coroutines.test#createTestResult(Unknown Source)
	at kotlinx.coroutines.test#runTest__at__kotlinx.coroutines.test.TestScope(Unknown Source)
	at kotlinx.coroutines.test#runTest(Unknown Source)
	at kotlinx.coroutines.test#runTest$default(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.client.streamable.http.StreamableHttpClientTransportTest#testInlineSseRetryParsing(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.client.streamable.http.$StreamableHttpClientTransportTest$test$0.$StreamableHttpClientTransportTest$test$0$$FUNCTION_REFERENCE_FOR$testInlineSseRetryParsing$13.invoke#internal(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.client.streamable.http.$StreamableHttpClientTransportTest$test$0.$StreamableHttpClientTransportTest$test$0$$FUNCTION_REFERENCE_FOR$testInlineSseRetryParsing$13.$<bridge-DNN>invoke(Unknown Source)
	at kotlin.Function1#invoke(Unknown Source)
	at kotlin.native.internal.test.BaseClassSuite.TestCase#doRun(Unknown Source)
	at kotlin.native.internal.test.TestCase#doRun(Unknown Source)
	at kotlin.native.internal.test.TestCase#run(Unknown Source)
	at kotlin.native.internal.test.TestCase#run(Unknown Source)
	at kotlin.native.internal.test.TestRunner.run#internal(Unknown Source)
	at kotlin.native.internal.test.TestRunner.runIteration#internal(Unknown Source)
	at kotlin.native.internal.test.TestRunner#run(Unknown Source)
	at kotlin.native.internal.test#testLauncherEntryPoint(Unknown Source)
	at kotlin.native.internal.test#main(Unknown Source)
	at <global>.Konan_start(Unknown Source)
	at <global>.Init_and_run_start(Unknown Source)
	at <global>.0x0(Unknown Source)
	at <global>.__libc_start_main(Unknown Source)
	at <global>.0x0(Unknown Source)
	at kotlin.Error#<init>(Unknown Source)
	at kotlin.AssertionError#<init>(Unknown Source)
	at io.kotest.assertions#createAssertionError(Unknown Source)
	at io.kotest.assertions.AssertionErrorBuilder#build(Unknown Source)
	at io.kotest.assertions.nondeterministic.$runIterationsCOROUTINE$1.invokeSuspend#internal(Unknown Source)
	at kotlin.coroutines.native.internal.BaseContinuationImpl#invokeSuspend(Unknown Source)
	at kotlin.coroutines.native.internal.BaseContinuationImpl#resumeWith(Unknown Source)
	at kotlin.coroutines.Continuation#resumeWith(Unknown Source)
	at kotlinx.coroutines.DispatchedTask#run(Unknown Source)
	at kotlinx.coroutines.Runnable#run(Unknown Source)
	at kotlinx.coroutines.test.TestDispatcher#processEvent(Unknown Source)
	at kotlinx.coroutines.test.TestCoroutineScheduler#tryRunNextTaskUnless(Unknown Source)
	at kotlinx.coroutines.test.runTest$$inlined$let$1.runTest$$inlined$let$1$invoke$3.$invokeCOROUTINE$2.invokeSuspend#internal(Unknown Source)
	at kotlin.coroutines.native.internal.BaseContinuationImpl#invokeSuspend(Unknown Source)
	at kotlin.coroutines.native.internal.BaseContinuationImpl#resumeWith(Unknown Source)
	at kotlin.coroutines.Continuation#resumeWith(Unknown Source)
	at kotlinx.coroutines.DispatchedTask#run(Unknown Source)
	at kotlinx.coroutines.Runnable#run(Unknown Source)
	at kotlinx.coroutines.EventLoopImplBase#processNextEvent(Unknown Source)
	at kotlinx.coroutines.EventLoop#processNextEvent(Unknown Source)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking#internal(Unknown Source)
	at kotlinx.coroutines#runBlocking(Unknown Source)
	at kotlinx.coroutines#runBlocking$default(Unknown Source)
	at kotlinx.coroutines.test#createTestResult(Unknown Source)
	at kotlinx.coroutines.test#runTest__at__kotlinx.coroutines.test.TestScope(Unknown Source)
	at kotlinx.coroutines.test#runTest(Unknown Source)
	at kotlinx.coroutines.test#runTest$default(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.client.streamable.http.StreamableHttpClientTransportTest#testInlineSseRetryParsing(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.client.streamable.http.$StreamableHttpClientTransportTest$test$0.$StreamableHttpClientTransportTest$test$0$$FUNCTION_REFERENCE_FOR$testInlineSseRetryParsing$13.invoke#internal(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.client.streamable.http.$StreamableHttpClientTransportTest$test$0.$StreamableHttpClientTransportTest$test$0$$FUNCTION_REFERENCE_FOR$testInlineSseRetryParsing$13.$<bridge-DNN>invoke(Unknown Source)
	at kotlin.Function1#invoke(Unknown Source)
	at kotlin.native.internal.test.BaseClassSuite.TestCase#doRun(Unknown Source)
	at kotlin.native.internal.test.TestCase#doRun(Unknown Source)
	at kotlin.native.internal.test.TestCase#run(Unknown Source)
	at kotlin.native.internal.test.TestCase#run(Unknown Source)
	at kotlin.native.internal.test.TestRunner.run#internal(Unknown Source)
	at kotlin.native.internal.test.TestRunner.runIteration#internal(Unknown Source)
	at kotlin.native.internal.test.TestRunner#run(Unknown Source)
	at kotlin.native.internal.test#testLauncherEntryPoint(Unknown Source)
	at kotlin.native.internal.test#main(Unknown Source)
	at <global>.Konan_start(Unknown Source)
	at <global>.Init_and_run_start(Unknown Source)
	at <global>.0x0(Unknown Source)
	at <global>.__libc_start_main(Unknown Source)
	at <global>.0x0(Unknown Source)
io.modelcontextprotocol.kotlin.sdk.integration.kotlin.sse.SchemaPromptIntegrationTestSse::testConcurrentPromptRequests()[jvm]
Stack Traces | 0.743s run time
kotlinx.coroutines.test.UncaughtExceptionsBeforeTest: There were uncaught exceptions before the test started. Please avoid this, as such exceptions are also reported in a platform-dependent manner so that they are not lost.
	at kotlinx.coroutines.test.TestScopeImpl.enter(TestScope.kt:238)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTest-8Mi8wO0(TestBuilders.kt:308)
	at kotlinx.coroutines.test.TestBuildersKt.runTest-8Mi8wO0(Unknown Source)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTest-8Mi8wO0(TestBuilders.kt:167)
	at kotlinx.coroutines.test.TestBuildersKt.runTest-8Mi8wO0(Unknown Source)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTest-8Mi8wO0$default(TestBuilders.kt:159)
	at kotlinx.coroutines.test.TestBuildersKt.runTest-8Mi8wO0$default(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.integration.kotlin.AbstractPromptIntegrationTest.testConcurrentPromptRequests(AbstractPromptIntegrationTest.kt:629)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.modelcontextprotocol.kotlin.test.utils.RetryExtension.executeWithRetry(Retry.kt:40)
	at io.modelcontextprotocol.kotlin.test.utils.RetryExtension.interceptTestMethod(Retry.kt:26)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1351)
	at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:422)
	at java.base/java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:651)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
	Suppressed: kotlinx.coroutines.CompletionHandlerException: Exception in completion handler ChildCompletion@402a177c[job@4829a23a] for "ws-default#1055":StandaloneCoroutine{Cancelled}@4829a23a
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1629)
		at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:316)
		at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:233)
		at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:946)
		at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:894)
		at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:859)
		at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:99)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:47)
		at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
		at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:124)
		at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
		at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
		Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [CoroutineName(ws-default), CoroutineId(1055), "ws-default#1055":StandaloneCoroutine{Cancelled}@4829a23a, Dispatchers.IO]
	Caused by: kotlinx.coroutines.CompletionHandlerException: Exception in completion handler InvokeOnCompletion@25c4ca3d[job@41b32733] for JobImpl{Cancelled}@41b32733
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1629)
		at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:316)
		at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:233)
		at kotlinx.coroutines.JobSupport.continueCompleting(JobSupport.kt:986)
		at kotlinx.coroutines.JobSupport.access$continueCompleting(JobSupport.kt:22)
		at kotlinx.coroutines.JobSupport$ChildCompletion.invoke(JobSupport.kt:1268)
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1625)
		... 14 more
	Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelled}@41b32733
		at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1685)
		at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:207)
		at io.ktor.websocket.DefaultWebSocketSessionImpl$start$2.invokeSuspend(DefaultWebSocketSession.kt:168)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:34)
		... 7 more
io.modelcontextprotocol.kotlin.sdk.integration.kotlin.streamablehttp.ResourceIntegrationTestStreamableHttp::testConcurrentResourceOperations()[jvm]
Stack Traces | 3.88s run time
kotlinx.coroutines.test.UncaughtExceptionsBeforeTest: There were uncaught exceptions before the test started. Please avoid this, as such exceptions are also reported in a platform-dependent manner so that they are not lost.
	at kotlinx.coroutines.test.TestScopeImpl.enter(TestScope.kt:238)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTest-8Mi8wO0(TestBuilders.kt:308)
	at kotlinx.coroutines.test.TestBuildersKt.runTest-8Mi8wO0(Unknown Source)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTest-8Mi8wO0(TestBuilders.kt:167)
	at kotlinx.coroutines.test.TestBuildersKt.runTest-8Mi8wO0(Unknown Source)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTest-8Mi8wO0$default(TestBuilders.kt:159)
	at kotlinx.coroutines.test.TestBuildersKt.runTest-8Mi8wO0$default(Unknown Source)
	at io.modelcontextprotocol.kotlin.sdk.integration.kotlin.AbstractResourceIntegrationTest.testConcurrentResourceOperations(AbstractResourceIntegrationTest.kt:285)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.modelcontextprotocol.kotlin.test.utils.RetryExtension.executeWithRetry(Retry.kt:40)
	at io.modelcontextprotocol.kotlin.test.utils.RetryExtension.interceptTestMethod(Retry.kt:26)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1351)
	at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:422)
	at java.base/java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:651)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
	Suppressed: kotlinx.coroutines.CompletionHandlerException: Exception in completion handler ChildCompletion@5eb4961b[job@2c9038b6] for "ws-default#705":StandaloneCoroutine{Cancelled}@2c9038b6
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1629)
		at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:316)
		at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:233)
		at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:946)
		at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:894)
		at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:859)
		at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:99)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:47)
		at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
		at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:124)
		at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
		at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
		Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [CoroutineName(ws-default), CoroutineId(705), "ws-default#705":StandaloneCoroutine{Cancelled}@2c9038b6, Dispatchers.IO]
	Caused by: kotlinx.coroutines.CompletionHandlerException: Exception in completion handler InvokeOnCompletion@43b3b7a8[job@7d49e5fe] for JobImpl{Cancelled}@7d49e5fe
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1629)
		at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:316)
		at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:233)
		at kotlinx.coroutines.JobSupport.continueCompleting(JobSupport.kt:986)
		at kotlinx.coroutines.JobSupport.access$continueCompleting(JobSupport.kt:22)
		at kotlinx.coroutines.JobSupport$ChildCompletion.invoke(JobSupport.kt:1268)
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1625)
		... 14 more
	Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelled}@7d49e5fe
		at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1685)
		at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:207)
		at io.ktor.websocket.DefaultWebSocketSessionImpl$start$2.invokeSuspend(DefaultWebSocketSession.kt:168)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:34)
		... 7 more
	Suppressed: kotlinx.coroutines.CompletionHandlerException: Exception in completion handler ChildCompletion@41bc5ebf[job@28a99ea5] for "ws-default#793":StandaloneCoroutine{Cancelled}@28a99ea5
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1629)
		at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:316)
		at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:233)
		at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:946)
		at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:894)
		at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:859)
		at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:99)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:47)
		at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
		at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:124)
		at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
		at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
		Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [CoroutineName(ws-default), CoroutineId(793), "ws-default#793":StandaloneCoroutine{Cancelled}@28a99ea5, Dispatchers.IO]
	Caused by: kotlinx.coroutines.CompletionHandlerException: Exception in completion handler InvokeOnCompletion@63b2a1dc[job@12496aa8] for JobImpl{Cancelled}@12496aa8
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1629)
		at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:316)
		at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:233)
		at kotlinx.coroutines.JobSupport.continueCompleting(JobSupport.kt:986)
		at kotlinx.coroutines.JobSupport.access$continueCompleting(JobSupport.kt:22)
		at kotlinx.coroutines.JobSupport$ChildCompletion.invoke(JobSupport.kt:1268)
		at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1625)
		... 14 more
	Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelled}@12496aa8
		at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1685)
		at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:207)
		at io.ktor.websocket.DefaultWebSocketSessionImpl$start$2.invokeSuspend(DefaultWebSocketSession.kt:168)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:34)
		... 7 more

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@rnett rnett force-pushed the rnett/non-blocking-call-handlers branch from a2dbd83 to fb00602 Compare March 19, 2026 01:00
Copilot AI review requested due to automatic review settings March 19, 2026 02:32
@rnett rnett force-pushed the rnett/non-blocking-call-handlers branch from fb00602 to 5b88d56 Compare March 19, 2026 02:32
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces transport-level concurrent message handling so that long-running request handlers don’t block processing of subsequent messages (e.g., pings/progress), addressing the sequential-processing limitation reported in issue #572.

Changes:

  • Add a per-transport coroutine scope and centralized concurrent dispatch helpers in AbstractTransport.
  • Update all transports to route inbound messages through the new concurrent dispatcher (with special inline handling for SSE server).
  • Expand/adjust unit + integration tests to be concurrency-safe (Mutex-protected collections) and add coverage for concurrent tool execution; also fix SSE data: line concatenation and a Windows npx invocation edge.

Reviewed changes

Copilot reviewed 31 out of 32 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
test-utils/src/jvmMain/kotlin/io/modelcontextprotocol/kotlin/test/utils/TypeScriptRunner.kt Windows-safe npx/tsx command invocation for TS integration tests.
kotlin-sdk-testing/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTest.kt Make assertions/order tolerant and add Mutex protection for concurrent callbacks.
kotlin-sdk-testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt Route message handling via AbstractTransport.handleMessage and track handler jobs.
kotlin-sdk-testing/api/kotlin-sdk-testing.api Public API surface update for LinkedTransports.close().
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt Mutex-protect message collection for concurrent transport dispatch.
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt Mutex-protect message collection and add explicit concurrent-processing test.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt Dispatch messages via concurrent handler jobs and join per-request.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt Dispatch inbound messages through concurrent handler jobs and join/cancel on close.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt Provide transport scope and switch to inline handler path.
kotlin-sdk-core/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransportTest.kt Add required scope to test transport implementation.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt Validate JSON-RPC messages are JSON objects during polymorphic selection.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt Add message-derived CoroutineName, RequestId.asString(), and more direct fromJSON() decoding.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt Route inbound messages via concurrent dispatch helper.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt Treat unknown progress tokens as warn-and-ignore (instead of error escalation).
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt Core concurrent dispatch + in-flight job tracking added to base transport.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransport.kt Ensure shutdown waits for in-flight handlers before resource cleanup.
kotlin-sdk-core/api/kotlin-sdk-core.api Public API surface updates for new transport methods and RequestId.asString().
kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt Mutex-protect message collection for concurrent dispatch.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt Use concurrent dispatch helper and fix multi-line SSE data: concatenation.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt Add transport scope, route inbound messages through concurrent dispatcher, and cancel in-flight jobs on shutdown.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport.kt Add transport scope, route inbound messages through concurrent dispatcher, and join in-flight handlers on close.
integration-test/src/jvmTest/typescript/package-lock.json Lockfile metadata update.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTestJvm.kt Rename JVM test class to avoid conflicts/clarify target.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/sse/KotlinServerForTsClientSse.kt Add transport scope and route inbound messages via concurrent dispatch helper.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/channel/ToolIntegrationTestChannel.kt Add channel transport variant of tool integration tests.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt Add CHANNEL transport option and lifecycle wiring.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractToolIntegrationTest.kt Add concurrent tool processing test (blocking + suspending variants).
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/InMemoryTransport.kt Add transport scope and inline dispatch for in-memory test transport.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt Mutex-protect message collection for concurrent dispatch.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt Mutex-protect message collection for concurrent dispatch.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt Update test transport to satisfy new scope requirement and use concurrent dispatch helper.
AGENTS.md Document new testing + SSE parsing conventions related to concurrency.
Files not reviewed (1)
  • integration-test/src/jvmTest/typescript/package-lock.json: Language not supported
Comments suppressed due to low confidence (3)

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt:119

  • handleMessageInline(parsedMessage) swallows non-cancellation exceptions from the user _onMessage handler (via AbstractTransport.doHandle). As a result, handler failures won’t throw out of this method, and handlePostMessage will still respond "Accepted" even if processing failed. If POST requests should return an error on handler failure, handleMessageInline needs a variant that propagates handler exceptions (or returns a failure signal) so this transport can reject the request appropriately.
    public suspend fun handleMessage(message: String) {
        try {
            val parsedMessage = McpJson.decodeFromString<JSONRPCMessage>(message)
            handleMessageInline(parsedMessage)
        } catch (e: Exception) {
            _onError.invoke(e)
            throw e
        }

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt:168

  • handleMessage(message) just launches a coroutine and doesn’t throw handler exceptions (they’re handled inside AbstractTransport). The try/catch here won’t catch message-handler failures and may give the impression that errors are handled twice. Consider removing this try/catch and, if you need per-message completion logging or cancellation behavior, attach invokeOnCompletion to the returned Job instead.
            if (message == null) break
            try {
                handleMessage(message)
            } catch (e: CancellationException) {
                throw e
            } catch (e: Throwable) {
                logger.error(e) { "Error processing message" }
                _onError.invoke(e)
            }

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt:79

  • Similar to SSE: handleMessage(message) won’t throw on handler failure because AbstractTransport catches exceptions internally. The surrounding try/catch will therefore only catch decode/parsing errors, and handler exceptions will no longer close the websocket loop (where the current code appears to expect throwing to abort). If handler failures should terminate the connection, consider handling completion on the returned Job (and closing the session) or using an inline/propagating handler path.
                try {
                    val message = McpJson.decodeFromString<JSONRPCMessage>(message.readText())
                    handleMessage(message)
                } catch (e: Exception) {
                    _onError.invoke(e)
                    throw e
                }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@rnett rnett marked this pull request as draft March 19, 2026 02:53
@rnett rnett force-pushed the rnett/non-blocking-call-handlers branch from 5b88d56 to 848d6e7 Compare March 19, 2026 23:14
@rnett rnett force-pushed the rnett/non-blocking-call-handlers branch from 848d6e7 to 12fee57 Compare March 19, 2026 23:49
Copilot AI review requested due to automatic review settings March 19, 2026 23:49
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces unified concurrent inbound message handling across transports by centralizing message dispatch and lifecycle management in AbstractTransport, addressing sequential processing issues (e.g., stdio blocking long-running requests and preventing timely handling of ping/pong and other messages).

Changes:

  • Add concurrent message dispatch + handler shutdown/join support to AbstractTransport, then migrate transports to use handleMessage(...), handleMessageInline(...), handleError(...), and shutdownHandlers().
  • Update transports (stdio, SSE, WebSocket, Streamable HTTP, channel/in-memory test transports) to use the new dispatch/error APIs and align close/shutdown behavior.
  • Adjust and add tests for concurrency (thread-safe message collection, order-independent assertions, new integration coverage).

Reviewed changes

Copilot reviewed 34 out of 35 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
test-utils/src/jvmMain/kotlin/io/modelcontextprotocol/kotlin/test/utils/TypeScriptRunner.kt Use Windows-friendly cmd.exe /c invocation for npx tsx.
kotlin-sdk-testing/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTest.kt Make tests resilient to concurrent callbacks (Mutex/Channel + order-independent assertions).
kotlin-sdk-testing/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransport.kt Route inbound messages through new concurrent handler pipeline and graceful shutdown.
kotlin-sdk-testing/api/kotlin-sdk-testing.api Update API surface for LinkedTransports.close().
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt Make server-side test message capture thread-safe under concurrent dispatch.
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt Update ordering expectations; add concurrency-oriented coverage.
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/AbstractKtorExtensionsTest.kt Add timeout guard to reduce SSE test hangs.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpServerTransport.kt Pass session coroutine context into base transport for better scoping.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt Use new handler/error helpers; ensure handler shutdown on close; process batch messages via jobs.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt Move to AbstractTransport scope + concurrent message handling + unified error handling.
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt Bind transport scope to call context; use inline handler path where required; unified error handling.
kotlin-sdk-core/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransportTest.kt Minor formatting adjustment.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt Improve JSON-RPC polymorphic deserialization error clarity for non-object payloads.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt Add coroutine naming for messages + RequestId.asString(); optimize fromJSON() conversion.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport.kt Migrate WebSocket transport to new handler/error model; shutdown handlers on close.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt Downgrade unknown progress-token handling from error to warning (concurrency-friendly).
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt Core change: add concurrent dispatch, handler tracking, safe error handling, and shutdown helpers.
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransport.kt Plumb transport/handler coroutine contexts into base; route send errors via handleError.
kotlin-sdk-core/api/kotlin-sdk-core.api Update published API due to new constructors/helpers.
kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt Make message capture thread-safe under concurrent dispatch.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt Route inbound messages/errors through new helpers; fix SSE multi-line data: concatenation.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt Remove custom scope in favor of base transport scope; concurrent message handling + unified shutdown.
kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport.kt Adopt new handler/error helpers; adjust close behavior in collector.
integration-test/src/jvmTest/typescript/package-lock.json Lockfile metadata update (hasInstallScript).
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/testing/ChannelTransportTestJvm.kt Rename JVM test class to avoid collisions/clarify intent.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/sse/KotlinServerForTsClientSse.kt Update custom HTTP transport to use new handler helpers and scoped context.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/channel/ToolIntegrationTestChannel.kt Add channel-based integration test variant for concurrency behavior.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt Add Channel transport option and lifecycle management in integration harness.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractToolIntegrationTest.kt Add parameterized concurrency test ensuring long tool execution doesn’t block other calls.
integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/AbstractAuthenticationTest.kt Increase connect timeout to reduce flakiness.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/InMemoryTransport.kt Update in-memory transport to use new handler helpers and shutdown semantics.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt Make base transport test thread-safe and order-independent under concurrent processing.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt Update in-memory tests for new transport construction and thread-safety.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt Update mock transport to use new base constructor + handleMessage(...).
AGENTS.md Document concurrency-safe testing guidance and SSE data: concatenation rule.
Files not reviewed (1)
  • integration-test/src/jvmTest/typescript/package-lock.json: Language not supported

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 132 to 134
} finally {
closeResources()
close()
}
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collectMessages() calls close() in finally. If the SSE connection fails before endpoint is received, the transport is still in Initializing state, and AbstractClientTransport.close() is a no-op for that state; this can leave initialize() stuck on endpoint.await() and leak the session/job. Prefer calling closeResources() (as before) and/or completing endpoint exceptionally on failure so initialize() can terminate and start() can clean up correctly.

Copilot uses AI. Check for mistakes.

protected fun handleMessage(message: JSONRPCMessage): Job {
val name = message.coroutineName
return scope.launch(handlerContext + name) {
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handlerContext is accepted as a full CoroutineContext, so callers can (and in tests do) pass a context that contains a Job. In handleMessage, scope.launch(handlerContext + name) will then override the transport scope’s Job, so handler coroutines may not be cancelled when the transport scope is cancelled (e.g., via invokeOnCloseCallback). Consider sanitizing handlerContext (e.g., handlerContext.minusKey(Job)) or narrowing the type to a CoroutineDispatcher to ensure handler jobs remain children of the transport scope.

Suggested change
return scope.launch(handlerContext + name) {
// Ensure handler jobs remain children of the transport scope by removing any Job from handlerContext
return scope.launch(handlerContext.minusKey(Job) + name) {

Copilot uses AI. Check for mistakes.
Comment on lines +333 to +340
// Simulate Protocol's new behavior: launch a coroutine for each message
launch {
if ((message as? JSONRPCRequest)?.method == "slow") {
firstMessageReceived.complete(Unit)
firstMessageProceed.await()
} else {
secondMessageHandled.complete(Unit)
}
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test intends to prove transport-level concurrent processing, but the handler immediately launches and returns, which allows sequential transports to keep reading the next message even if the launched coroutine is still suspended. To actually validate concurrent handling, the slow-path should suspend directly inside the onMessage handler (no extra launch), so a sequential implementation would block message2 while a concurrent one would not.

Suggested change
// Simulate Protocol's new behavior: launch a coroutine for each message
launch {
if ((message as? JSONRPCRequest)?.method == "slow") {
firstMessageReceived.complete(Unit)
firstMessageProceed.await()
} else {
secondMessageHandled.complete(Unit)
}
// Simulate a slow-path handler that actually suspends inside onMessage
if ((message as? JSONRPCRequest)?.method == "slow") {
firstMessageReceived.complete(Unit)
firstMessageProceed.await()
} else {
secondMessageHandled.complete(Unit)

Copilot uses AI. Check for mistakes.
Comment on lines 223 to 225
receiveChannel.send(JSONRPCRequest(RequestId.NumberId(2), "m2"))
delay(100)
receiveChannel.send(JSONRPCRequest(RequestId.NumberId(3), "m3"))
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fixed delay(100) makes this test timing-dependent and can still be flaky on slow/contended CI (or unnecessarily slow on fast machines). Prefer synchronizing deterministically (e.g., wait until message 2 has been observed/started via a CompletableDeferred, or use yield()/eventually around assertions) instead of a hard-coded sleep.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants