From 2501e57c520929c8ddb7456297458999ff83afed Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Fri, 10 Apr 2026 10:28:18 +0200 Subject: [PATCH] fix(spring-jakarta): Clean up stale ThreadLocal context in Kafka consumer interceptor Implement clearThreadState() and defensive cleanup in intercept() to prevent ThreadLocal leaks of SentryRecordContext. Spring Kafka calls clearThreadState() in the poll loop's finally block, making it the most reliable cleanup hook for edge cases where success()/failure() callbacks are skipped (e.g. Error thrown by listener). Also add defensive cleanup at the start of intercept() to handle any stale context from a previous record that was not properly cleaned up. Co-Authored-By: Claude --- .../api/sentry-spring-jakarta.api | 1 + .../kafka/SentryKafkaRecordInterceptor.java | 13 ++++ .../kafka/SentryKafkaRecordInterceptorTest.kt | 60 +++++++++++++++++++ 3 files changed, 74 insertions(+) diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index 57d46f05bc..0ba6c77725 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -260,6 +260,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : public fun (Lio/sentry/IScopes;)V public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index e7b13f08dc..d11f7f8a67 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -52,6 +52,8 @@ public SentryKafkaRecordInterceptor( return delegateIntercept(record, consumer); } + finishStaleContext(); + final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor"); final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); @@ -98,6 +100,11 @@ public void afterRecord( } } + @Override + public void clearThreadState(final @NotNull Consumer consumer) { + finishStaleContext(); + } + private @Nullable ConsumerRecord delegateIntercept( final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { if (delegate != null) { @@ -165,6 +172,12 @@ public void afterRecord( return transaction; } + private void finishStaleContext() { + if (currentContext.get() != null) { + finishSpan(SpanStatus.UNKNOWN, null); + } + } + private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) { final @Nullable SentryRecordContext ctx = currentContext.get(); if (ctx == null) { diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 370da75585..0688af70db 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -208,4 +208,64 @@ class SentryKafkaRecordInterceptorTest { SentryKafkaRecordInterceptor.TRACE_ORIGIN, ) } + + @Test + fun `clearThreadState cleans up stale context`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + // intercept sets up context in ThreadLocal + interceptor.intercept(record, consumer) + + // clearThreadState should clean up without success/failure being called + interceptor.clearThreadState(consumer) + + // lifecycle token should have been closed + verify(lifecycleToken).close() + } + + @Test + fun `clearThreadState is no-op when no context exists`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + + // should not throw + interceptor.clearThreadState(consumer) + } + + @Test + fun `intercept cleans up stale context from previous record`() { + val lifecycleToken2 = mock() + val forkedScopes2 = mock() + whenever(forkedScopes2.options).thenReturn(options) + whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2) + val tx2 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2) + whenever(forkedScopes2.startTransaction(any(), any())).thenReturn(tx2) + + var callCount = 0 + whenever(scopes.forkedScopes(any())).thenAnswer { + callCount++ + if (callCount == 1) { + val forkedScopes1 = mock() + whenever(forkedScopes1.options).thenReturn(options) + whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken) + val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1) + whenever(forkedScopes1.startTransaction(any(), any())).thenReturn(tx1) + forkedScopes1 + } else { + forkedScopes2 + } + } + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + // First intercept sets up context + interceptor.intercept(record, consumer) + + // Second intercept without success/failure — should clean up stale context first + interceptor.intercept(record, consumer) + + // First lifecycle token should have been closed by the defensive cleanup + verify(lifecycleToken).close() + } }