Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sentry-spring-jakarta/api/sentry-spring-jakarta.api
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor :
public fun <init> (Lio/sentry/IScopes;)V
public fun <init> (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V
public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public SentryKafkaRecordInterceptor(
return delegateIntercept(record, consumer);
}

finishStaleContext();

final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();

Expand Down Expand Up @@ -98,6 +100,11 @@ public void afterRecord(
}
}

@Override
public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
finishStaleContext();
}

private @Nullable ConsumerRecord<K, V> delegateIntercept(
final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
if (delegate != null) {
Expand Down Expand Up @@ -165,6 +172,12 @@ public void afterRecord(
return transaction;
}

private void finishStaleContext() {
if (currentContext.get() != null) {
finishSpan(SpanStatus.UNKNOWN, null);
}
}

private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) {
final @Nullable SentryRecordContext ctx = currentContext.get();
if (ctx == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,64 @@ class SentryKafkaRecordInterceptorTest {
SentryKafkaRecordInterceptor.TRACE_ORIGIN,
)
}

@Test
fun `clearThreadState cleans up stale context`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

// intercept sets up context in ThreadLocal
interceptor.intercept(record, consumer)

// clearThreadState should clean up without success/failure being called
interceptor.clearThreadState(consumer)

// lifecycle token should have been closed
verify(lifecycleToken).close()
}

@Test
fun `clearThreadState is no-op when no context exists`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)

// should not throw
interceptor.clearThreadState(consumer)
}

@Test
fun `intercept cleans up stale context from previous record`() {
val lifecycleToken2 = mock<ISentryLifecycleToken>()
val forkedScopes2 = mock<IScopes>()
whenever(forkedScopes2.options).thenReturn(options)
whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2)
val tx2 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2)
whenever(forkedScopes2.startTransaction(any<TransactionContext>(), any())).thenReturn(tx2)

var callCount = 0
whenever(scopes.forkedScopes(any())).thenAnswer {
callCount++
if (callCount == 1) {
val forkedScopes1 = mock<IScopes>()
whenever(forkedScopes1.options).thenReturn(options)
whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken)
val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1)
whenever(forkedScopes1.startTransaction(any<TransactionContext>(), any())).thenReturn(tx1)
forkedScopes1
} else {
forkedScopes2
}
}

val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

// First intercept sets up context
interceptor.intercept(record, consumer)

// Second intercept without success/failure โ€” should clean up stale context first
interceptor.intercept(record, consumer)

// First lifecycle token should have been closed by the defensive cleanup
verify(lifecycleToken).close()
}
}
Loading