From dab81a197a94575dff61383233d9a5ed73930968 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Tue, 10 Mar 2026 01:21:01 +0100 Subject: [PATCH 1/6] introduce configurable SSE reconnection options and handle server-driven retry delays in StreamableHttpClientTransport --- .../kotlin/sdk/client/ReconnectionOptions.kt | 44 ++++ .../client/StreamableHttpClientTransport.kt | 153 +++++++++++--- .../StreamableHttpMcpKtorClientExtensions.kt | 60 +++++- .../http/StreamableHttpClientTransportTest.kt | 198 ++++++++++++++++++ .../server/StreamableHttpServerTransport.kt | 11 +- 5 files changed, 432 insertions(+), 34 deletions(-) create mode 100644 kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt new file mode 100644 index 000000000..b22ad6cfe --- /dev/null +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt @@ -0,0 +1,44 @@ +package io.modelcontextprotocol.kotlin.sdk.client + +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +/** + * Options for controlling SSE reconnection behavior. + * + * @property initialReconnectionDelay The initial delay before the first reconnection attempt. + * @property maxReconnectionDelay The maximum delay between reconnection attempts. + * @property reconnectionDelayGrowFactor The factor by which the delay grows on each attempt. + * @property maxRetries The maximum number of reconnection attempts per disconnect. + */ +public class ReconnectionOptions( + public val initialReconnectionDelay: Duration = 1.seconds, + public val maxReconnectionDelay: Duration = 30.seconds, + public val reconnectionDelayGrowFactor: Double = 1.5, + public val maxRetries: Int = 2, +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other == null || this::class != other::class) return false + + other as ReconnectionOptions + + if (reconnectionDelayGrowFactor != other.reconnectionDelayGrowFactor) return false + if (maxRetries != other.maxRetries) return false + if (initialReconnectionDelay != other.initialReconnectionDelay) return false + if (maxReconnectionDelay != other.maxReconnectionDelay) return false + + return true + } + + override fun hashCode(): Int { + var result = reconnectionDelayGrowFactor.hashCode() + result = 31 * result + maxRetries + result = 31 * result + initialReconnectionDelay.hashCode() + result = 31 * result + maxReconnectionDelay.hashCode() + return result + } + + override fun toString(): String = + "ReconnectionOptions(initialReconnectionDelay=$initialReconnectionDelay, maxReconnectionDelay=$maxReconnectionDelay, reconnectionDelayGrowFactor=$reconnectionDelayGrowFactor, maxRetries=$maxRetries)" +} diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt index 8526042cf..f1359b38e 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt @@ -38,9 +38,15 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.math.min +import kotlin.math.pow import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds private const val MCP_SESSION_ID_HEADER = "mcp-session-id" private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version" @@ -61,10 +67,23 @@ public class StreamableHttpError(public val code: Int? = null, message: String? public class StreamableHttpClientTransport( private val client: HttpClient, private val url: String, - private val reconnectionTime: Duration? = null, + private val reconnectionOptions: ReconnectionOptions = ReconnectionOptions(), private val requestBuilder: HttpRequestBuilder.() -> Unit = {}, ) : AbstractClientTransport() { + @Deprecated( + "Use constructor with ReconnectionOptions", + replaceWith = ReplaceWith( + "StreamableHttpClientTransport(client, url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", + ), + ) + public constructor( + client: HttpClient, + url: String, + reconnectionTime: Duration?, + requestBuilder: HttpRequestBuilder.() -> Unit = {}, + ) : this(client, url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder) + override val logger: KLogger = KotlinLogging.logger {} public var sessionId: String? = null @@ -77,6 +96,37 @@ public class StreamableHttpClientTransport( private val scope by lazy { CoroutineScope(SupervisorJob() + Dispatchers.Default) } private var lastEventId: String? = null + private var serverRetryDelay: Duration? = null + + private data class SseStreamResult(val hasPrimingEvent: Boolean, val receivedResponse: Boolean) + + private fun getNextReconnectionDelay(attempt: Int): Duration { + serverRetryDelay?.let { return it } + val delayMs = reconnectionOptions.initialReconnectionDelay.inWholeMilliseconds * + reconnectionOptions.reconnectionDelayGrowFactor.pow(attempt) + return min(delayMs, reconnectionOptions.maxReconnectionDelay.inWholeMilliseconds.toDouble()) + .toLong().milliseconds + } + + /** + * Checks if an SSE session error is non-retryable (404, 405, JSON-only). + * Returns `true` if non-retryable (should stop trying), `false` otherwise. + */ + private fun isNonRetryableSseError(e: SSEClientException): Boolean { + val responseStatus = e.response?.status + val responseContentType = e.response?.contentType() + + if (responseStatus == HttpStatusCode.NotFound || responseStatus == HttpStatusCode.MethodNotAllowed) { + logger.info { "Server returned ${responseStatus.value} for GET/SSE, stream disabled." } + return true + } + if (responseContentType?.match(ContentType.Application.Json) == true) { + logger.info { "Server returned application/json for GET/SSE, using JSON-only mode." } + return true + } + + return false + } override suspend fun initialize() { logger.debug { "Client transport is starting..." } @@ -133,11 +183,27 @@ public class StreamableHttpClientTransport( } } - ContentType.Text.EventStream -> handleInlineSse( - response, - onResumptionToken = options?.onResumptionToken, - replayMessageId = if (message is JSONRPCRequest) message.id else null, - ) + ContentType.Text.EventStream -> { + val result = handleInlineSse( + response, + onResumptionToken = options?.onResumptionToken, + replayMessageId = if (message is JSONRPCRequest) message.id else null, + ) + if (result.hasPrimingEvent && !result.receivedResponse) { + scope.launch { + try { + serverRetryDelay?.let { delay(it) } + startSseSession( + resumptionToken = lastEventId, + replayMessageId = if (message is JSONRPCRequest) message.id else null, + onResumptionToken = options?.onResumptionToken, + ) + } catch (e: Exception) { + logger.debug { "POST-to-GET SSE reconnection failed: ${e.message}" } + } + } + } + } else -> { val body = response.bodyAsText() @@ -205,6 +271,7 @@ public class StreamableHttpClientTransport( logger.debug { "Session terminated successfully" } } + @Suppress("CyclomaticComplexMethod", "NestedBlockDepth", "LongMethod") private suspend fun startSseSession( resumptionToken: String? = null, replayMessageId: RequestId? = null, @@ -217,7 +284,7 @@ public class StreamableHttpClientTransport( try { sseSession = client.sseSession( urlString = url, - reconnectionTime = reconnectionTime, + showRetryEvents = true, ) { method = HttpMethod.Get applyCommonHeaders(this) @@ -228,28 +295,43 @@ public class StreamableHttpClientTransport( } logger.debug { "Client SSE session started successfully." } } catch (e: SSEClientException) { - val responseStatus = e.response?.status - val responseContentType = e.response?.contentType() - - // 404 or 405 means server doesn't support SSE at GET endpoint - this is expected and valid - if (responseStatus == HttpStatusCode.NotFound || responseStatus == HttpStatusCode.MethodNotAllowed) { - logger.info { "Server returned ${responseStatus.value} for GET/SSE, stream disabled." } - return - } - - // If server returns application/json, it means it doesn't support SSE for this session - // This is valid per spec - server can choose to only use JSON responses - if (responseContentType?.match(ContentType.Application.Json) == true) { - logger.info { "Server returned application/json for GET/SSE, using JSON-only mode." } - return - } - - _onError(e) + if (isNonRetryableSseError(e)) return throw e } sseJob = scope.launch(CoroutineName("StreamableHttpTransport.collect#${hashCode()}")) { - sseSession?.let { collectSse(it, replayMessageId, onResumptionToken) } + while (isActive) { + val result = sseSession?.let { collectSse(it, replayMessageId, onResumptionToken) } ?: break + if (result.receivedResponse) break + + var attempts = 0 + var reconnected = false + while (isActive && attempts < reconnectionOptions.maxRetries) { + delay(getNextReconnectionDelay(attempts)) + attempts++ + try { + sseSession = client.sseSession( + urlString = url, + showRetryEvents = true, + ) { + method = HttpMethod.Get + applyCommonHeaders(this) + accept(ContentType.Application.Json) + lastEventId?.let { headers.append(MCP_RESUMPTION_TOKEN_HEADER, it) } + requestBuilder() + } + reconnected = true + break + } catch (e: SSEClientException) { + if (isNonRetryableSseError(e)) return@launch + } + } + + if (!reconnected) { + _onError(StreamableHttpError(null, "Maximum reconnection attempts exceeded")) + break + } + } } } @@ -265,11 +347,15 @@ public class StreamableHttpClientTransport( session: ClientSSESession, replayMessageId: RequestId?, onResumptionToken: ((String) -> Unit)?, - ) { + ): SseStreamResult { + var hasPrimingEvent = false + var receivedResponse = false try { session.incoming.collect { event -> + event.retry?.let { serverRetryDelay = it.milliseconds } event.id?.let { lastEventId = it + hasPrimingEvent = true onResumptionToken?.invoke(it) } logger.trace { "Client received SSE event: event=${event.event}, data=${event.data}, id=${event.id}" } @@ -278,6 +364,7 @@ public class StreamableHttpClientTransport( event.data?.takeIf { it.isNotEmpty() }?.let { json -> runCatching { McpJson.decodeFromString(json) } .onSuccess { msg -> + if (msg is JSONRPCResponse) receivedResponse = true if (replayMessageId != null && msg is JSONRPCResponse) { _onMessage(msg.copy(id = replayMessageId)) } else { @@ -295,6 +382,7 @@ public class StreamableHttpClientTransport( } catch (t: Throwable) { _onError(t) } + return SseStreamResult(hasPrimingEvent, receivedResponse) } @Suppress("CyclomaticComplexMethod") @@ -302,10 +390,12 @@ public class StreamableHttpClientTransport( response: HttpResponse, replayMessageId: RequestId?, onResumptionToken: ((String) -> Unit)?, - ) { + ): SseStreamResult { logger.trace { "Handling inline SSE from POST response" } val channel = response.bodyAsChannel() + var hasPrimingEvent = false + var receivedResponse = false val sb = StringBuilder() var id: String? = null var eventName: String? = null @@ -313,6 +403,7 @@ public class StreamableHttpClientTransport( suspend fun dispatch(id: String?, eventName: String?, data: String) { id?.let { lastEventId = it + hasPrimingEvent = true onResumptionToken?.invoke(it) } if (data.isBlank()) { @@ -321,6 +412,7 @@ public class StreamableHttpClientTransport( if (eventName == null || eventName == "message") { runCatching { McpJson.decodeFromString(data) } .onSuccess { msg -> + if (msg is JSONRPCResponse) receivedResponse = true if (replayMessageId != null && msg is JSONRPCResponse) { _onMessage(msg.copy(id = replayMessageId)) } else { @@ -351,9 +443,16 @@ public class StreamableHttpClientTransport( } when { line.startsWith("id:") -> id = line.substringAfter("id:").trim() + line.startsWith("event:") -> eventName = line.substringAfter("event:").trim() + line.startsWith("data:") -> sb.append(line.substringAfter("data:").trim()) + + line.startsWith("retry:") -> line.substringAfter("retry:").trim().toLongOrNull()?.let { + serverRetryDelay = it.milliseconds + } } } + return SseStreamResult(hasPrimingEvent, receivedResponse) } } diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt index b64a22062..880820adf 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt @@ -6,21 +6,66 @@ import io.modelcontextprotocol.kotlin.sdk.LIB_VERSION import io.modelcontextprotocol.kotlin.sdk.shared.IMPLEMENTATION_NAME import io.modelcontextprotocol.kotlin.sdk.types.Implementation import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds /** * Returns a new Streamable HTTP transport for the Model Context Protocol using the provided HttpClient. * * @param url URL of the MCP server. - * @param reconnectionTime Optional duration to wait before attempting to reconnect. + * @param reconnectionOptions Options for controlling SSE reconnection behavior. * @param requestBuilder Optional lambda to configure the HTTP request. * @return A [StreamableHttpClientTransport] configured for MCP communication. */ public fun HttpClient.mcpStreamableHttpTransport( url: String, - reconnectionTime: Duration? = null, + reconnectionOptions: ReconnectionOptions = ReconnectionOptions(), requestBuilder: HttpRequestBuilder.() -> Unit = {}, ): StreamableHttpClientTransport = - StreamableHttpClientTransport(this, url, reconnectionTime, requestBuilder = requestBuilder) + StreamableHttpClientTransport(this, url, reconnectionOptions, requestBuilder = requestBuilder) + +/** + * Returns a new Streamable HTTP transport for the Model Context Protocol using the provided HttpClient. + * + * @param url URL of the MCP server. + * @param reconnectionTime Optional duration to wait before attempting to reconnect. + * @param requestBuilder Optional lambda to configure the HTTP request. + * @return A [StreamableHttpClientTransport] configured for MCP communication. + */ +@Deprecated( + "Use overload with ReconnectionOptions", + replaceWith = ReplaceWith( + "mcpStreamableHttpTransport(url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", + ), +) +public fun HttpClient.mcpStreamableHttpTransport( + url: String, + reconnectionTime: Duration?, + requestBuilder: HttpRequestBuilder.() -> Unit = {}, +): StreamableHttpClientTransport = StreamableHttpClientTransport( + this, + url, + ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), + requestBuilder = requestBuilder, +) + +/** + * Creates and connects an MCP client over Streamable HTTP using the provided HttpClient. + * + * @param url URL of the MCP server. + * @param reconnectionOptions Options for controlling SSE reconnection behavior. + * @param requestBuilder Optional lambda to configure the HTTP request. + * @return A connected [Client] ready for MCP communication. + */ +public suspend fun HttpClient.mcpStreamableHttp( + url: String, + reconnectionOptions: ReconnectionOptions = ReconnectionOptions(), + requestBuilder: HttpRequestBuilder.() -> Unit = {}, +): Client { + val transport = mcpStreamableHttpTransport(url, reconnectionOptions, requestBuilder) + val client = Client(Implementation(name = IMPLEMENTATION_NAME, version = LIB_VERSION)) + client.connect(transport) + return client +} /** * Creates and connects an MCP client over Streamable HTTP using the provided HttpClient. @@ -30,11 +75,18 @@ public fun HttpClient.mcpStreamableHttpTransport( * @param requestBuilder Optional lambda to configure the HTTP request. * @return A connected [Client] ready for MCP communication. */ +@Deprecated( + "Use overload with ReconnectionOptions", + replaceWith = ReplaceWith( + "mcpStreamableHttp(url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", + ), +) public suspend fun HttpClient.mcpStreamableHttp( url: String, - reconnectionTime: Duration? = null, + reconnectionTime: Duration?, requestBuilder: HttpRequestBuilder.() -> Unit = {}, ): Client { + @Suppress("DEPRECATION") val transport = mcpStreamableHttpTransport(url, reconnectionTime, requestBuilder) val client = Client(Implementation(name = IMPLEMENTATION_NAME, version = LIB_VERSION)) client.connect(transport) diff --git a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt index e303326ff..26f0a6a9f 100644 --- a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt +++ b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt @@ -23,9 +23,11 @@ import io.modelcontextprotocol.kotlin.sdk.types.Implementation import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCNotification import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest +import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse import io.modelcontextprotocol.kotlin.sdk.types.McpException import io.modelcontextprotocol.kotlin.sdk.types.McpJson import io.modelcontextprotocol.kotlin.sdk.types.RPCError +import io.modelcontextprotocol.kotlin.sdk.types.RequestId import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay @@ -657,6 +659,202 @@ class StreamableHttpClientTransportTest { receivedErrors shouldHaveSize 0 } + @Test + fun testInlineSseRetryParsing() = runTest { + val transport = createTransport { request -> + if (request.method == HttpMethod.Post) { + val sseContent = buildString { + appendLine("retry: 5000") + appendLine("id: ev-1") + appendLine("event: message") + appendLine("""data: {"jsonrpc":"2.0","id":"req-1","result":{"tools":[]}}""") + appendLine() + } + + respond( + content = ByteReadChannel(sseContent), + status = HttpStatusCode.OK, + headers = headersOf( + HttpHeaders.ContentType, + ContentType.Text.EventStream.toString(), + ), + ) + } else { + respond("", HttpStatusCode.OK) + } + } + + val receivedMessages = mutableListOf() + val responseReceived = CompletableDeferred() + + transport.onMessage { message -> + receivedMessages.add(message) + if (message is JSONRPCResponse && !responseReceived.isCompleted) { + responseReceived.complete(Unit) + } + } + + transport.start() + + transport.send( + JSONRPCRequest( + id = "req-1", + method = "test", + params = buildJsonObject { }, + ), + ) + + eventually { + responseReceived.await() + } + + receivedMessages shouldHaveSize 1 + val response = receivedMessages[0] as JSONRPCResponse + response.id shouldBe RequestId.StringId("req-1") + + transport.close() + } + + @Test + fun testInlineSseHasPrimingEventTracking() = runTest { + val transport = createTransport { request -> + if (request.method == HttpMethod.Post) { + val sseContent = buildString { + // Event with id = priming event + appendLine("id: priming-1") + appendLine("event: message") + appendLine( + """data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"t1","progress":50}}""", + ) + appendLine() + // Notification without id + appendLine("event: message") + appendLine("""data: {"jsonrpc":"2.0","method":"notifications/tools/list_changed"}""") + appendLine() + } + + respond( + content = ByteReadChannel(sseContent), + status = HttpStatusCode.OK, + headers = headersOf( + HttpHeaders.ContentType, + ContentType.Text.EventStream.toString(), + ), + ) + } else { + respond("", HttpStatusCode.OK) + } + } + + val receivedMessages = mutableListOf() + val twoMessagesReceived = CompletableDeferred() + + transport.onMessage { message -> + receivedMessages.add(message) + if (receivedMessages.size >= 2 && !twoMessagesReceived.isCompleted) { + twoMessagesReceived.complete(Unit) + } + } + + transport.start() + + transport.send( + JSONRPCRequest( + id = "test-1", + method = "test", + params = buildJsonObject { }, + ), + ) + + eventually { + twoMessagesReceived.await() + } + + receivedMessages shouldHaveSize 2 + // Both should be notifications (no JSONRPCResponse → POST-to-GET reconnect would be triggered) + receivedMessages[0].shouldBeInstanceOf() + receivedMessages[1].shouldBeInstanceOf() + + transport.close() + } + + @Test + fun testInlineSseResponseStopsReconnection() = runTest { + val transport = createTransport { request -> + if (request.method == HttpMethod.Post) { + val sseContent = buildString { + appendLine("id: ev-1") + appendLine("event: message") + appendLine("""data: {"jsonrpc":"2.0","id":"req-1","result":{"tools":[]}}""") + appendLine() + } + + respond( + content = ByteReadChannel(sseContent), + status = HttpStatusCode.OK, + headers = headersOf( + HttpHeaders.ContentType, + ContentType.Text.EventStream.toString(), + ), + ) + } else { + respond("", HttpStatusCode.OK) + } + } + + val receivedMessages = mutableListOf() + val responseReceived = CompletableDeferred() + + transport.onMessage { message -> + receivedMessages.add(message) + if (message is JSONRPCResponse && !responseReceived.isCompleted) { + responseReceived.complete(Unit) + } + } + + transport.start() + + transport.send( + JSONRPCRequest( + id = "req-1", + method = "tools/list", + params = buildJsonObject { }, + ), + ) + + eventually { + responseReceived.await() + } + + receivedMessages shouldHaveSize 1 + // Response received → no reconnection triggered (hasPrimingEvent=true, receivedResponse=true) + val response = receivedMessages[0] as JSONRPCResponse + response.id shouldBe RequestId.StringId("req-1") + + transport.close() + } + + @Suppress("DEPRECATION") + @Test + fun testDeprecatedConstructorStillWorks() = runTest { + val mockEngine = MockEngine { _ -> + respond( + content = "", + status = HttpStatusCode.Accepted, + ) + } + val httpClient = HttpClient(mockEngine) { + install(SSE) + } + + val transport = + StreamableHttpClientTransport(httpClient, url = "http://localhost:8080/mcp", reconnectionTime = 2.seconds) + + transport.start() + transport.send(JSONRPCNotification(method = "test")) + transport.close() + } + private suspend fun setupTransportAndCollectMessages( transport: StreamableHttpClientTransport, ): Pair, MutableList> { diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index 580fcc0e5..a66c17b29 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -388,7 +388,7 @@ public class StreamableHttpServerTransport(private val configuration: Configurat if (!configuration.enableJsonResponse) { call.appendSseHeaders() flushSse(session) // flush headers immediately - maybeSendPrimingEvent(streamId, session) + maybeSendPrimingEvent(streamId, session, call.request.header(MCP_PROTOCOL_VERSION_HEADER)) } streamMutex.withLock { @@ -451,7 +451,7 @@ public class StreamableHttpServerTransport(private val configuration: Configurat call.appendSseHeaders() flushSse(sseSession) // flush headers immediately streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(sseSession, call) - maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, sseSession) + maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, sseSession, call.request.header(MCP_PROTOCOL_VERSION_HEADER)) sseSession.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) } @@ -702,9 +702,14 @@ public class StreamableHttpServerTransport(private val configuration: Configurat } @Suppress("TooGenericExceptionCaught") - private suspend fun maybeSendPrimingEvent(streamId: String, session: ServerSSESession?) { + private suspend fun maybeSendPrimingEvent( + streamId: String, + session: ServerSSESession?, + clientProtocolVersion: String? = null, + ) { val store = configuration.eventStore ?: return val sseSession = session ?: return + if (clientProtocolVersion != null && clientProtocolVersion < "2025-11-25") return try { val primingEventId = store.storeEvent(streamId, JSONRPCEmptyMessage) sseSession.send( From cf3dad4d90cdf06a1189c5f03965fe68c7c46bdc Mon Sep 17 00:00:00 2001 From: devcrocod Date: Tue, 10 Mar 2026 02:28:20 +0100 Subject: [PATCH 2/6] refactor StreamableHttpClientTransport to improve SSE reconnection logic, address retry behavior, and ensure compatibility with protocol updates --- build.gradle.kts | 2 +- .../client/StreamableHttpClientTransport.kt | 123 +++++++++--------- .../StreamableHttpMcpKtorClientExtensions.kt | 13 +- .../http/StreamableHttpClientTransportTest.kt | 3 +- .../server/StreamableHttpServerTransport.kt | 12 +- 5 files changed, 85 insertions(+), 68 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 4fe64dd1a..72c55539b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -24,7 +24,7 @@ subprojects { apply(plugin = "org.jlleitschuh.gradle.ktlint") apply(plugin = "org.jetbrains.kotlinx.kover") - if (name != "conformance-test") { + if (name != "conformance-test" && name != "docs") { apply(plugin = "dev.detekt") detekt { diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt index f1359b38e..711b0f7a2 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt @@ -41,8 +41,8 @@ import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlin.concurrent.Volatile import kotlin.concurrent.atomics.ExperimentalAtomicApi -import kotlin.math.min import kotlin.math.pow import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds @@ -64,6 +64,7 @@ public class StreamableHttpError(public val code: Int? = null, message: String? * for receiving messages. */ @OptIn(ExperimentalAtomicApi::class) +@Suppress("TooManyFunctions") public class StreamableHttpClientTransport( private val client: HttpClient, private val url: String, @@ -74,7 +75,8 @@ public class StreamableHttpClientTransport( @Deprecated( "Use constructor with ReconnectionOptions", replaceWith = ReplaceWith( - "StreamableHttpClientTransport(client, url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", + "StreamableHttpClientTransport(client, url, " + + "ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", ), ) public constructor( @@ -95,39 +97,15 @@ public class StreamableHttpClientTransport( private val scope by lazy { CoroutineScope(SupervisorJob() + Dispatchers.Default) } + @Volatile private var lastEventId: String? = null + + @Volatile private var serverRetryDelay: Duration? = null + /** Result of SSE stream collection. Reconnect when [hasPrimingEvent] is true and [receivedResponse] is false. */ private data class SseStreamResult(val hasPrimingEvent: Boolean, val receivedResponse: Boolean) - private fun getNextReconnectionDelay(attempt: Int): Duration { - serverRetryDelay?.let { return it } - val delayMs = reconnectionOptions.initialReconnectionDelay.inWholeMilliseconds * - reconnectionOptions.reconnectionDelayGrowFactor.pow(attempt) - return min(delayMs, reconnectionOptions.maxReconnectionDelay.inWholeMilliseconds.toDouble()) - .toLong().milliseconds - } - - /** - * Checks if an SSE session error is non-retryable (404, 405, JSON-only). - * Returns `true` if non-retryable (should stop trying), `false` otherwise. - */ - private fun isNonRetryableSseError(e: SSEClientException): Boolean { - val responseStatus = e.response?.status - val responseContentType = e.response?.contentType() - - if (responseStatus == HttpStatusCode.NotFound || responseStatus == HttpStatusCode.MethodNotAllowed) { - logger.info { "Server returned ${responseStatus.value} for GET/SSE, stream disabled." } - return true - } - if (responseContentType?.match(ContentType.Application.Json) == true) { - logger.info { "Server returned application/json for GET/SSE, using JSON-only mode." } - return true - } - - return false - } - override suspend fun initialize() { logger.debug { "Client transport is starting..." } } @@ -135,7 +113,7 @@ public class StreamableHttpClientTransport( /** * Sends a single message with optional resumption support */ - @Suppress("ReturnCount", "CyclomaticComplexMethod") + @Suppress("ReturnCount", "CyclomaticComplexMethod", "LongMethod", "TooGenericExceptionCaught", "ThrowsCount") override suspend fun performSend(message: JSONRPCMessage, options: TransportSendOptions?) { logger.debug { "Client sending message via POST to $url: ${McpJson.encodeToString(message)}" } @@ -210,7 +188,7 @@ public class StreamableHttpClientTransport( if (response.contentType() == null && body.isBlank()) return val ct = response.contentType()?.toString() ?: "" - val error = StreamableHttpError(-1, "Unexpected content type: $$ct") + val error = StreamableHttpError(-1, "Unexpected content type: $ct") _onError(error) throw error } @@ -271,7 +249,6 @@ public class StreamableHttpClientTransport( logger.debug { "Session terminated successfully" } } - @Suppress("CyclomaticComplexMethod", "NestedBlockDepth", "LongMethod") private suspend fun startSseSession( resumptionToken: String? = null, replayMessageId: RequestId? = null, @@ -300,34 +277,11 @@ public class StreamableHttpClientTransport( } sseJob = scope.launch(CoroutineName("StreamableHttpTransport.collect#${hashCode()}")) { + @Suppress("LoopWithTooManyJumpStatements") while (isActive) { val result = sseSession?.let { collectSse(it, replayMessageId, onResumptionToken) } ?: break if (result.receivedResponse) break - - var attempts = 0 - var reconnected = false - while (isActive && attempts < reconnectionOptions.maxRetries) { - delay(getNextReconnectionDelay(attempts)) - attempts++ - try { - sseSession = client.sseSession( - urlString = url, - showRetryEvents = true, - ) { - method = HttpMethod.Get - applyCommonHeaders(this) - accept(ContentType.Application.Json) - lastEventId?.let { headers.append(MCP_RESUMPTION_TOKEN_HEADER, it) } - requestBuilder() - } - reconnected = true - break - } catch (e: SSEClientException) { - if (isNonRetryableSseError(e)) return@launch - } - } - - if (!reconnected) { + if (!reconnectSseSession()) { _onError(StreamableHttpError(null, "Maximum reconnection attempts exceeded")) break } @@ -335,6 +289,59 @@ public class StreamableHttpClientTransport( } } + @Suppress("ReturnCount") + private suspend fun reconnectSseSession(): Boolean { + for (attempt in 0 until reconnectionOptions.maxRetries) { + delay(getNextReconnectionDelay(attempt)) + try { + sseSession = client.sseSession( + urlString = url, + showRetryEvents = true, + ) { + method = HttpMethod.Get + applyCommonHeaders(this) + accept(ContentType.Application.Json) + lastEventId?.let { headers.append(MCP_RESUMPTION_TOKEN_HEADER, it) } + requestBuilder() + } + return true + } catch (e: SSEClientException) { + if (isNonRetryableSseError(e)) return false + } + } + return false + } + + private fun getNextReconnectionDelay(attempt: Int): Duration { + serverRetryDelay?.let { return it } + val delay = reconnectionOptions.initialReconnectionDelay * + reconnectionOptions.reconnectionDelayGrowFactor.pow(attempt) + return delay.coerceAtMost(reconnectionOptions.maxReconnectionDelay) + } + + /** + * Checks if an SSE session error is non-retryable (404, 405, JSON-only). + * Returns `true` if non-retryable (should stop trying), `false` otherwise. + */ + private fun isNonRetryableSseError(e: SSEClientException): Boolean { + val responseStatus = e.response?.status + val responseContentType = e.response?.contentType() + + return when { + responseStatus == HttpStatusCode.NotFound || responseStatus == HttpStatusCode.MethodNotAllowed -> { + logger.info { "Server returned ${responseStatus.value} for GET/SSE, stream disabled." } + true + } + + responseContentType?.match(ContentType.Application.Json) == true -> { + logger.info { "Server returned application/json for GET/SSE, using JSON-only mode." } + true + } + + else -> false + } + } + private fun applyCommonHeaders(builder: HttpRequestBuilder) { builder.headers { sessionId?.let { append(MCP_SESSION_ID_HEADER, it) } diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt index 880820adf..a618f1823 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensions.kt @@ -34,7 +34,8 @@ public fun HttpClient.mcpStreamableHttpTransport( @Deprecated( "Use overload with ReconnectionOptions", replaceWith = ReplaceWith( - "mcpStreamableHttpTransport(url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", + "mcpStreamableHttpTransport(url, " + + "ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", ), ) public fun HttpClient.mcpStreamableHttpTransport( @@ -78,7 +79,8 @@ public suspend fun HttpClient.mcpStreamableHttp( @Deprecated( "Use overload with ReconnectionOptions", replaceWith = ReplaceWith( - "mcpStreamableHttp(url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", + "mcpStreamableHttp(url, " + + "ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", ), ) public suspend fun HttpClient.mcpStreamableHttp( @@ -86,8 +88,11 @@ public suspend fun HttpClient.mcpStreamableHttp( reconnectionTime: Duration?, requestBuilder: HttpRequestBuilder.() -> Unit = {}, ): Client { - @Suppress("DEPRECATION") - val transport = mcpStreamableHttpTransport(url, reconnectionTime, requestBuilder) + val transport = mcpStreamableHttpTransport( + url, + ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), + requestBuilder, + ) val client = Client(Implementation(name = IMPLEMENTATION_NAME, version = LIB_VERSION)) client.connect(transport) return client diff --git a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt index 26f0a6a9f..55df3e06e 100644 --- a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt +++ b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt @@ -724,7 +724,8 @@ class StreamableHttpClientTransportTest { appendLine("id: priming-1") appendLine("event: message") appendLine( - """data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"t1","progress":50}}""", + """data: {"jsonrpc":"2.0","method":"notifications/progress",""" + + """"params":{"progressToken":"t1","progress":50}}""", ) appendLine() // Notification without id diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index a66c17b29..9074c4c26 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -45,6 +45,7 @@ internal const val MCP_SESSION_ID_HEADER = "mcp-session-id" private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version" private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID" private const val MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024 // 4 MB +private const val MIN_PRIMING_EVENT_PROTOCOL_VERSION = "2025-11-25" /** * A holder for an active request call. @@ -707,12 +708,15 @@ public class StreamableHttpServerTransport(private val configuration: Configurat session: ServerSSESession?, clientProtocolVersion: String? = null, ) { - val store = configuration.eventStore ?: return - val sseSession = session ?: return - if (clientProtocolVersion != null && clientProtocolVersion < "2025-11-25") return + val store = configuration.eventStore + if (store == null || session == null) return + // Priming events have empty data which older clients cannot handle. + // Only send priming events to clients with protocol version >= 2025-11-25 + // which includes the fix for handling empty SSE data. + if (clientProtocolVersion != null && clientProtocolVersion < MIN_PRIMING_EVENT_PROTOCOL_VERSION) return try { val primingEventId = store.storeEvent(streamId, JSONRPCEmptyMessage) - sseSession.send( + session.send( id = primingEventId, retry = configuration.retryInterval?.inWholeMilliseconds, data = "", From 6aafb56318d90d634f4ccfb9a23745fe55c410e6 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Tue, 10 Mar 2026 02:31:49 +0100 Subject: [PATCH 3/6] update README and test baseline to reflect removal of `sse-retry` SDK limitation in conformance tests --- conformance-test/README.md | 3 +-- conformance-test/conformance-baseline.yml | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/conformance-test/README.md b/conformance-test/README.md index 0f08d751f..58d0d1a3a 100644 --- a/conformance-test/README.md +++ b/conformance-test/README.md @@ -110,7 +110,7 @@ Tests the conformance server against all server scenarios: ## Known SDK Limitations -9 scenarios are expected to fail due to current SDK limitations (tracked in [ +8 scenarios are expected to fail due to current SDK limitations (tracked in [ `conformance-baseline.yml`](conformance-baseline.yml). | Scenario | Suite | Root Cause | @@ -123,6 +123,5 @@ Tests the conformance server against all server scenarios: | `elicitation-sep1330-enums` | server | *(same as above)* | | `resources-templates-read` | server | SDK does not implement `addResourceTemplate()` with URI pattern matching; resources are looked up by exact URI | | `elicitation-sep1034-client-defaults` | client | SDK does not fill in `default` values from the elicitation request schema before sending the response | -| `sse-retry` | client | Transport does not respect the SSE `retry` field timing or send `Last-Event-ID` on reconnection | These failures reveal SDK gaps and are intentionally not fixed in this module. diff --git a/conformance-test/conformance-baseline.yml b/conformance-test/conformance-baseline.yml index 9126f0d34..cc06a389b 100644 --- a/conformance-test/conformance-baseline.yml +++ b/conformance-test/conformance-baseline.yml @@ -11,4 +11,3 @@ server: client: - elicitation-sep1034-client-defaults - - sse-retry From cee5c9c5306b3011ba1c7b6624fdb15317641f5f Mon Sep 17 00:00:00 2001 From: devcrocod Date: Tue, 10 Mar 2026 18:40:17 +0100 Subject: [PATCH 4/6] Fix based on comments --- .../kotlin/sdk/client/ReconnectionOptions.kt | 10 +++++----- .../sdk/client/StreamableHttpClientTransport.kt | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt index b22ad6cfe..95c5bdefa 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt @@ -8,13 +8,13 @@ import kotlin.time.Duration.Companion.seconds * * @property initialReconnectionDelay The initial delay before the first reconnection attempt. * @property maxReconnectionDelay The maximum delay between reconnection attempts. - * @property reconnectionDelayGrowFactor The factor by which the delay grows on each attempt. + * @property reconnectionDelayMultiplier The factor by which the delay grows on each attempt. * @property maxRetries The maximum number of reconnection attempts per disconnect. */ public class ReconnectionOptions( public val initialReconnectionDelay: Duration = 1.seconds, public val maxReconnectionDelay: Duration = 30.seconds, - public val reconnectionDelayGrowFactor: Double = 1.5, + public val reconnectionDelayMultiplier: Double = 1.5, public val maxRetries: Int = 2, ) { override fun equals(other: Any?): Boolean { @@ -23,7 +23,7 @@ public class ReconnectionOptions( other as ReconnectionOptions - if (reconnectionDelayGrowFactor != other.reconnectionDelayGrowFactor) return false + if (reconnectionDelayMultiplier != other.reconnectionDelayMultiplier) return false if (maxRetries != other.maxRetries) return false if (initialReconnectionDelay != other.initialReconnectionDelay) return false if (maxReconnectionDelay != other.maxReconnectionDelay) return false @@ -32,7 +32,7 @@ public class ReconnectionOptions( } override fun hashCode(): Int { - var result = reconnectionDelayGrowFactor.hashCode() + var result = reconnectionDelayMultiplier.hashCode() result = 31 * result + maxRetries result = 31 * result + initialReconnectionDelay.hashCode() result = 31 * result + maxReconnectionDelay.hashCode() @@ -40,5 +40,5 @@ public class ReconnectionOptions( } override fun toString(): String = - "ReconnectionOptions(initialReconnectionDelay=$initialReconnectionDelay, maxReconnectionDelay=$maxReconnectionDelay, reconnectionDelayGrowFactor=$reconnectionDelayGrowFactor, maxRetries=$maxRetries)" + "ReconnectionOptions(initialReconnectionDelay=$initialReconnectionDelay, maxReconnectionDelay=$maxReconnectionDelay, reconnectionDelayMultiplier=$reconnectionDelayMultiplier, maxRetries=$maxRetries)" } diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt index 711b0f7a2..9d58045c2 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt @@ -77,6 +77,8 @@ public class StreamableHttpClientTransport( replaceWith = ReplaceWith( "StreamableHttpClientTransport(client, url, " + "ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)", + "kotlin.time.Duration.Companion.seconds", + "io.modelcontextprotocol.kotlin.sdk.client.ReconnectionOptions", ), ) public constructor( @@ -94,6 +96,7 @@ public class StreamableHttpClientTransport( private var sseSession: ClientSSESession? = null private var sseJob: Job? = null + private var reconnectJob: Job? = null private val scope by lazy { CoroutineScope(SupervisorJob() + Dispatchers.Default) } @@ -168,7 +171,7 @@ public class StreamableHttpClientTransport( replayMessageId = if (message is JSONRPCRequest) message.id else null, ) if (result.hasPrimingEvent && !result.receivedResponse) { - scope.launch { + reconnectJob = scope.launch { try { serverRetryDelay?.let { delay(it) } startSseSession( @@ -176,6 +179,8 @@ public class StreamableHttpClientTransport( replayMessageId = if (message is JSONRPCRequest) message.id else null, onResumptionToken = options?.onResumptionToken, ) + } catch (e: CancellationException) { + throw e } catch (e: Exception) { logger.debug { "POST-to-GET SSE reconnection failed: ${e.message}" } } @@ -219,6 +224,7 @@ public class StreamableHttpClientTransport( sseSession?.cancel() sseJob?.cancelAndJoin() + reconnectJob?.cancelAndJoin() scope.cancel() } @@ -305,8 +311,12 @@ public class StreamableHttpClientTransport( requestBuilder() } return true + } catch (e: CancellationException) { + throw e } catch (e: SSEClientException) { if (isNonRetryableSseError(e)) return false + } catch (e: Exception) { + logger.debug { "SSE reconnection attempt $attempt failed: ${e.message}" } } } return false @@ -315,7 +325,7 @@ public class StreamableHttpClientTransport( private fun getNextReconnectionDelay(attempt: Int): Duration { serverRetryDelay?.let { return it } val delay = reconnectionOptions.initialReconnectionDelay * - reconnectionOptions.reconnectionDelayGrowFactor.pow(attempt) + reconnectionOptions.reconnectionDelayMultiplier.pow(attempt) return delay.coerceAtMost(reconnectionOptions.maxReconnectionDelay) } From bd9a0f91a00acda3660fe96fe5ebc66fe130ad85 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Tue, 10 Mar 2026 18:48:49 +0100 Subject: [PATCH 5/6] suppress "TooGenericExceptionCaught" warning in reconnectSseSession method --- .../kotlin/sdk/client/StreamableHttpClientTransport.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt index 9d58045c2..0aba76d00 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt @@ -295,7 +295,7 @@ public class StreamableHttpClientTransport( } } - @Suppress("ReturnCount") + @Suppress("ReturnCount", "TooGenericExceptionCaught") private suspend fun reconnectSseSession(): Boolean { for (attempt in 0 until reconnectionOptions.maxRetries) { delay(getNextReconnectionDelay(attempt)) From 4491632223e8b4b3e3c7c1f33b8cf45fc104cb29 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Tue, 10 Mar 2026 22:54:35 +0100 Subject: [PATCH 6/6] refactor StreamableHttpClientTransport to eliminate shared mutable state --- kotlin-sdk-client/api/kotlin-sdk-client.api | 18 ++ .../client/StreamableHttpClientTransport.kt | 195 +++++++++--------- .../sdk/client/StreamableHttpClientTest.kt | 27 ++- 3 files changed, 142 insertions(+), 98 deletions(-) diff --git a/kotlin-sdk-client/api/kotlin-sdk-client.api b/kotlin-sdk-client/api/kotlin-sdk-client.api index 268a2b8df..da0e752a7 100644 --- a/kotlin-sdk-client/api/kotlin-sdk-client.api +++ b/kotlin-sdk-client/api/kotlin-sdk-client.api @@ -62,6 +62,18 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/KtorClientKt { public static synthetic fun mcpSseTransport-5_5nbZA$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/client/SseClientTransport; } +public final class io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions { + public synthetic fun (JJDIILkotlin/jvm/internal/DefaultConstructorMarker;)V + public synthetic fun (JJDILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun equals (Ljava/lang/Object;)Z + public final fun getInitialReconnectionDelay-UwyO8pc ()J + public final fun getMaxReconnectionDelay-UwyO8pc ()J + public final fun getMaxRetries ()I + public final fun getReconnectionDelayMultiplier ()D + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransport { public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/internal/DefaultConstructorMarker;)V @@ -88,6 +100,8 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/StdioClientTranspor } public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransport { + public fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;)V + public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun getProtocolVersion ()Ljava/lang/String; @@ -106,8 +120,12 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpError } public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensionsKt { + public static final fun mcpStreamableHttp (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun mcpStreamableHttp$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun mcpStreamableHttp-BZiP2OM (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun mcpStreamableHttp-BZiP2OM$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun mcpStreamableHttpTransport (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;)Lio/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport; + public static synthetic fun mcpStreamableHttpTransport$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport; public static final fun mcpStreamableHttpTransport-5_5nbZA (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;)Lio/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport; public static synthetic fun mcpStreamableHttpTransport-5_5nbZA$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport; } diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt index 0aba76d00..80f37aedc 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt @@ -41,8 +41,6 @@ import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import kotlin.concurrent.Volatile -import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.math.pow import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds @@ -58,12 +56,17 @@ private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID" public class StreamableHttpError(public val code: Int? = null, message: String? = null) : Exception("Streamable HTTP error: $message") +private sealed interface ConnectResult { + data class Success(val session: ClientSSESession) : ConnectResult + data object NonRetryable : ConnectResult + data object Failed : ConnectResult +} + /** * Client transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification. * It will connect to a server using HTTP POST for sending messages and HTTP GET with Server-Sent Events * for receiving messages. */ -@OptIn(ExperimentalAtomicApi::class) @Suppress("TooManyFunctions") public class StreamableHttpClientTransport( private val client: HttpClient, @@ -94,20 +97,17 @@ public class StreamableHttpClientTransport( private set public var protocolVersion: String? = null - private var sseSession: ClientSSESession? = null private var sseJob: Job? = null - private var reconnectJob: Job? = null private val scope by lazy { CoroutineScope(SupervisorJob() + Dispatchers.Default) } - @Volatile - private var lastEventId: String? = null - - @Volatile - private var serverRetryDelay: Duration? = null - - /** Result of SSE stream collection. Reconnect when [hasPrimingEvent] is true and [receivedResponse] is false. */ - private data class SseStreamResult(val hasPrimingEvent: Boolean, val receivedResponse: Boolean) + /** Result of an SSE stream collection. Reconnect when [hasPrimingEvent] is true and [receivedResponse] is false. */ + private data class SseStreamResult( + val hasPrimingEvent: Boolean, + val receivedResponse: Boolean, + val lastEventId: String? = null, + val serverRetryDelay: Duration? = null, + ) override suspend fun initialize() { logger.debug { "Client transport is starting..." } @@ -165,26 +165,15 @@ public class StreamableHttpClientTransport( } ContentType.Text.EventStream -> { - val result = handleInlineSse( - response, - onResumptionToken = options?.onResumptionToken, - replayMessageId = if (message is JSONRPCRequest) message.id else null, - ) + val replayMessageId = if (message is JSONRPCRequest) message.id else null + val result = handleInlineSse(response, replayMessageId, options?.onResumptionToken) if (result.hasPrimingEvent && !result.receivedResponse) { - reconnectJob = scope.launch { - try { - serverRetryDelay?.let { delay(it) } - startSseSession( - resumptionToken = lastEventId, - replayMessageId = if (message is JSONRPCRequest) message.id else null, - onResumptionToken = options?.onResumptionToken, - ) - } catch (e: CancellationException) { - throw e - } catch (e: Exception) { - logger.debug { "POST-to-GET SSE reconnection failed: ${e.message}" } - } - } + startSseSession( + resumptionToken = result.lastEventId, + replayMessageId = replayMessageId, + onResumptionToken = options?.onResumptionToken, + initialServerRetryDelay = result.serverRetryDelay, + ) } } @@ -218,13 +207,7 @@ public class StreamableHttpClientTransport( override suspend fun closeResources() { logger.debug { "Client transport closing." } - - // Try to terminate session if we have one - terminateSession() - - sseSession?.cancel() sseJob?.cancelAndJoin() - reconnectJob?.cancelAndJoin() scope.cancel() } @@ -251,78 +234,94 @@ public class StreamableHttpClientTransport( } sessionId = null - lastEventId = null logger.debug { "Session terminated successfully" } } - private suspend fun startSseSession( + private fun startSseSession( resumptionToken: String? = null, replayMessageId: RequestId? = null, onResumptionToken: ((String) -> Unit)? = null, + initialServerRetryDelay: Duration? = null, ) { - sseSession?.cancel() - sseJob?.cancelAndJoin() + // Cancel-and-replace: cancel() signals the previous job, join() inside + // the new coroutine ensures it completes before we start collecting. + // This is intentionally non-suspend to avoid blocking performSend. + val previousJob = sseJob + previousJob?.cancel() + sseJob = scope.launch(CoroutineName("StreamableHttpTransport.collect#${hashCode()}")) { + previousJob?.join() + var lastEventId = resumptionToken + var serverRetryDelay = initialServerRetryDelay + var attempt = 0 + var needsDelay = initialServerRetryDelay != null + + @Suppress("LoopWithTooManyJumpStatements") + while (isActive) { + // Delay before (re)connection: skip only for first fresh SSE connection + if (needsDelay) { + delay(getNextReconnectionDelay(attempt, serverRetryDelay)) + } + needsDelay = true + + // Connect + val session = when (val cr = connectSse(lastEventId)) { + is ConnectResult.Success -> { + attempt = 0 + cr.session + } + + ConnectResult.NonRetryable -> return@launch + + ConnectResult.Failed -> { + // Give up after maxRetries consecutive failed connection attempts + if (++attempt >= reconnectionOptions.maxRetries) { + _onError(StreamableHttpError(null, "Maximum reconnection attempts exceeded")) + return@launch + } + continue + } + } + // Collect + val result = collectSse(session, replayMessageId, onResumptionToken) + lastEventId = result.lastEventId ?: lastEventId + serverRetryDelay = result.serverRetryDelay ?: serverRetryDelay + if (result.receivedResponse) break + } + } + } + + @Suppress("TooGenericExceptionCaught") + private suspend fun connectSse(lastEventId: String?): ConnectResult { logger.debug { "Client attempting to start SSE session at url: $url" } - try { - sseSession = client.sseSession( - urlString = url, - showRetryEvents = true, - ) { + return try { + val session = client.sseSession(urlString = url, showRetryEvents = true) { method = HttpMethod.Get applyCommonHeaders(this) - // sseSession will add ContentType.Text.EventStream automatically accept(ContentType.Application.Json) - (resumptionToken ?: lastEventId)?.let { headers.append(MCP_RESUMPTION_TOKEN_HEADER, it) } + lastEventId?.let { headers.append(MCP_RESUMPTION_TOKEN_HEADER, it) } requestBuilder() } logger.debug { "Client SSE session started successfully." } - } catch (e: SSEClientException) { - if (isNonRetryableSseError(e)) return + ConnectResult.Success(session) + } catch (e: CancellationException) { throw e - } - - sseJob = scope.launch(CoroutineName("StreamableHttpTransport.collect#${hashCode()}")) { - @Suppress("LoopWithTooManyJumpStatements") - while (isActive) { - val result = sseSession?.let { collectSse(it, replayMessageId, onResumptionToken) } ?: break - if (result.receivedResponse) break - if (!reconnectSseSession()) { - _onError(StreamableHttpError(null, "Maximum reconnection attempts exceeded")) - break - } - } - } - } - - @Suppress("ReturnCount", "TooGenericExceptionCaught") - private suspend fun reconnectSseSession(): Boolean { - for (attempt in 0 until reconnectionOptions.maxRetries) { - delay(getNextReconnectionDelay(attempt)) - try { - sseSession = client.sseSession( - urlString = url, - showRetryEvents = true, - ) { - method = HttpMethod.Get - applyCommonHeaders(this) - accept(ContentType.Application.Json) - lastEventId?.let { headers.append(MCP_RESUMPTION_TOKEN_HEADER, it) } - requestBuilder() - } - return true - } catch (e: CancellationException) { - throw e - } catch (e: SSEClientException) { - if (isNonRetryableSseError(e)) return false - } catch (e: Exception) { - logger.debug { "SSE reconnection attempt $attempt failed: ${e.message}" } + } catch (e: SSEClientException) { + if (isNonRetryableSseError(e)) { + ConnectResult.NonRetryable + } else { + logger.debug { "SSE connection failed: ${e.message}" } + ConnectResult.Failed } + } catch (e: Exception) { + logger.debug { "SSE connection failed: ${e.message}" } + ConnectResult.Failed } - return false } - private fun getNextReconnectionDelay(attempt: Int): Duration { + private fun getNextReconnectionDelay(attempt: Int, serverRetryDelay: Duration?): Duration { + // Per SSE specification, the server-sent `retry` field sets the reconnection time + // for all subsequent attempts, taking priority over exponential backoff. serverRetryDelay?.let { return it } val delay = reconnectionOptions.initialReconnectionDelay * reconnectionOptions.reconnectionDelayMultiplier.pow(attempt) @@ -367,11 +366,13 @@ public class StreamableHttpClientTransport( ): SseStreamResult { var hasPrimingEvent = false var receivedResponse = false + var localLastEventId: String? = null + var localServerRetryDelay: Duration? = null try { session.incoming.collect { event -> - event.retry?.let { serverRetryDelay = it.milliseconds } + event.retry?.let { localServerRetryDelay = it.milliseconds } event.id?.let { - lastEventId = it + localLastEventId = it hasPrimingEvent = true onResumptionToken?.invoke(it) } @@ -399,7 +400,7 @@ public class StreamableHttpClientTransport( } catch (t: Throwable) { _onError(t) } - return SseStreamResult(hasPrimingEvent, receivedResponse) + return SseStreamResult(hasPrimingEvent, receivedResponse, localLastEventId, localServerRetryDelay) } @Suppress("CyclomaticComplexMethod") @@ -413,13 +414,15 @@ public class StreamableHttpClientTransport( var hasPrimingEvent = false var receivedResponse = false + var localLastEventId: String? = null + var localServerRetryDelay: Duration? = null val sb = StringBuilder() var id: String? = null var eventName: String? = null suspend fun dispatch(id: String?, eventName: String?, data: String) { id?.let { - lastEventId = it + localLastEventId = it hasPrimingEvent = true onResumptionToken?.invoke(it) } @@ -466,10 +469,10 @@ public class StreamableHttpClientTransport( line.startsWith("data:") -> sb.append(line.substringAfter("data:").trim()) line.startsWith("retry:") -> line.substringAfter("retry:").trim().toLongOrNull()?.let { - serverRetryDelay = it.milliseconds + localServerRetryDelay = it.milliseconds } } } - return SseStreamResult(hasPrimingEvent, receivedResponse) + return SseStreamResult(hasPrimingEvent, receivedResponse, localLastEventId, localServerRetryDelay) } } diff --git a/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt index 6513b469b..a1fe89e66 100644 --- a/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt +++ b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt @@ -1,6 +1,7 @@ package io.modelcontextprotocol.kotlin.sdk.client import io.kotest.matchers.collections.shouldContain +import io.kotest.matchers.shouldBe import io.ktor.http.ContentType import io.ktor.http.HttpMethod import io.ktor.http.HttpStatusCode @@ -203,7 +204,31 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() { meta = EmptyJsonObject, ) + client.close() + } + + @Test + fun `terminateSession sends DELETE request`() = runBlocking { + val client = Client( + clientInfo = Implementation(name = "client1", version = "1.0.0"), + options = ClientOptions(capabilities = ClientCapabilities()), + ) + val sessionId = Uuid.random().toString() + + mockMcp.onInitialize(clientName = "client1", sessionId = sessionId) + mockMcp.handleJSONRPCRequest( + jsonRpcMethod = "notifications/initialized", + expectedSessionId = sessionId, + sessionId = sessionId, + statusCode = HttpStatusCode.Accepted, + ) + mockMcp.handleSubscribeWithGet(sessionId) { emptyFlow() } + + connect(client) + mockMcp.mockUnsubscribeRequest(sessionId = sessionId) + (client.transport as StreamableHttpClientTransport).terminateSession() + (client.transport as StreamableHttpClientTransport).sessionId shouldBe null client.close() } @@ -257,8 +282,6 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() { buildJsonObject {} } - mockMcp.mockUnsubscribeRequest(sessionId = sessionId) - connect(client) delay(1.seconds)