From 9f996d3504131cea5ce919f0ce22d11659e1eb05 Mon Sep 17 00:00:00 2001 From: davidliu Date: Mon, 16 Feb 2026 20:07:21 +0900 Subject: [PATCH] SignalClient fixes --- .changeset/popular-actors-rhyme.md | 5 +++ .changeset/silent-kids-compare.md | 5 +++ .../io/livekit/android/room/SignalClient.kt | 33 ++++++++++--------- 3 files changed, 27 insertions(+), 16 deletions(-) create mode 100644 .changeset/popular-actors-rhyme.md create mode 100644 .changeset/silent-kids-compare.md diff --git a/.changeset/popular-actors-rhyme.md b/.changeset/popular-actors-rhyme.md new file mode 100644 index 000000000..bc507e37a --- /dev/null +++ b/.changeset/popular-actors-rhyme.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Cancel websocket when join coroutine is cancelled diff --git a/.changeset/silent-kids-compare.md b/.changeset/silent-kids-compare.md new file mode 100644 index 000000000..0627bb43f --- /dev/null +++ b/.changeset/silent-kids-compare.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Concurrency fixes for SignalClient connection diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt index 8a0acac1d..7c8889b0a 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt @@ -40,7 +40,6 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withTimeout import kotlinx.serialization.decodeFromString import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json @@ -63,7 +62,6 @@ import java.util.Date import javax.inject.Inject import javax.inject.Named import javax.inject.Singleton -import kotlin.time.Duration.Companion.seconds /** * SignalClient to LiveKit WS servers @@ -99,6 +97,7 @@ constructor( // join will always return a JoinResponse. // reconnect will return a ReconnectResponse or a Unit if a different response was received. + @Volatile private var joinContinuation: CancellableContinuation< Either< JoinResponse, @@ -186,19 +185,17 @@ constructor( .addHeader("Authorization", "Bearer $token") .build() - return withTimeout(5.seconds) { - suspendCancellableCoroutine { cont -> - // Wait for join response through WebSocketListener - joinContinuation = cont - // When a coroutine is canceled, WebSocket must be interrupted. - cont.invokeOnCancellation { - LKLog.w { "connect cancelled, abort websocket" } - currentWs?.cancel() - currentWs = null - joinContinuation = null - } - currentWs = websocketFactory.newWebSocket(request, this@SignalClient) - } + return suspendCancellableCoroutine { cont -> + // Wait for join response through WebSocketListener + joinContinuation = cont + cont.invokeOnCancellation { + // If the coroutine is cancelled, websocket needs to be cancelled. + // onFailure will handle cleanup. + LKLog.v { "connect cancelled, abort websocket" } + currentWs?.cancel() + joinContinuation = null + } + currentWs = websocketFactory.newWebSocket(request, this@SignalClient) } } @@ -362,6 +359,7 @@ constructor( listener?.onError(t) joinContinuation?.cancel(t) } + joinContinuation = null val wasConnected = isConnected @@ -671,6 +669,7 @@ constructor( version = serverVersion ) joinContinuation?.resumeWith(Result.success(Either.Left(response.join))) + joinContinuation = null } else if (response.hasLeave()) { // Some reconnects may immediately send leave back without a join response first. handleSignalResponseImpl(ws, response) @@ -685,8 +684,10 @@ constructor( if (response.hasReconnect()) { joinContinuation?.resumeWith(Result.success(Either.Right(Either.Left(response.reconnect)))) + joinContinuation = null } else { joinContinuation?.resumeWith(Result.success(Either.Right(Either.Right(Unit)))) + joinContinuation = null // Non-reconnect response, handle normally shouldProcessMessage = true } @@ -879,7 +880,7 @@ constructor( pingJob = null pongJob?.cancel() pongJob = null - currentWs?.cancel() + currentWs?.close(code, reason) currentWs = null joinContinuation?.cancel() joinContinuation = null