From e86169fcef0e055650c7a0aa13c910e16deaad2f Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Fri, 10 Apr 2026 13:07:03 +0200 Subject: [PATCH] fix(spring-jakarta): [Queue Instrumentation 12] Add Kafka retry count attribute Set messaging.message.retry.count on queue.process transactions when the Spring Kafka delivery attempt header is present. This keeps retry context on consumer traces without changing transaction lifecycle behavior. Co-Authored-By: Claude --- .../kafka/SentryKafkaRecordInterceptor.java | 26 +++++++++++++ .../kafka/SentryKafkaRecordInterceptorTest.kt | 37 ++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index e07f86fa26..ad4b87464a 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -11,6 +11,7 @@ import io.sentry.TransactionContext; import io.sentry.TransactionOptions; import io.sentry.util.SpanUtils; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; @@ -21,6 +22,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.support.KafkaHeaders; /** * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka @@ -161,6 +163,11 @@ private boolean isIgnored() { transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); } + final @Nullable Integer retryCount = retryCount(record); + if (retryCount != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); + } + final @Nullable String enqueuedTimeStr = headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr != null) { @@ -178,6 +185,25 @@ private boolean isIgnored() { return transaction; } + private @Nullable Integer retryCount(final @NotNull ConsumerRecord record) { + final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT); + if (header == null) { + return null; + } + + final byte[] value = header.value(); + if (value == null || value.length != Integer.BYTES) { + return null; + } + + final int attempt = ByteBuffer.wrap(value).getInt(); + if (attempt <= 0) { + return null; + } + + return attempt - 1; + } + private void finishStaleContext() { if (currentContext.get() != null) { finishSpan(SpanStatus.UNKNOWN, null); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 206a43298e..57aef26bc0 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -7,13 +7,16 @@ import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.SentryTraceHeader import io.sentry.SentryTracer +import io.sentry.SpanDataConvention import io.sentry.TransactionContext import io.sentry.test.initForTest +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertNull import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders @@ -24,6 +27,7 @@ import org.mockito.kotlin.never import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import org.springframework.kafka.listener.RecordInterceptor +import org.springframework.kafka.support.KafkaHeaders class SentryKafkaRecordInterceptorTest { @@ -32,6 +36,7 @@ class SentryKafkaRecordInterceptorTest { private lateinit var options: SentryOptions private lateinit var consumer: Consumer private lateinit var lifecycleToken: ISentryLifecycleToken + private lateinit var transaction: SentryTracer @BeforeTest fun setup() { @@ -52,8 +57,9 @@ class SentryKafkaRecordInterceptorTest { whenever(forkedScopes.options).thenReturn(options) whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) - val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) - whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())) + .thenReturn(transaction) } @AfterTest @@ -81,6 +87,7 @@ class SentryKafkaRecordInterceptorTest { sentryTrace: String? = null, baggage: String? = null, enqueuedTime: Long? = null, + deliveryAttempt: Int? = null, ): ConsumerRecord { val headers = RecordHeaders() sentryTrace?.let { @@ -95,6 +102,12 @@ class SentryKafkaRecordInterceptorTest { it.toString().toByteArray(StandardCharsets.UTF_8), ) } + deliveryAttempt?.let { + headers.add( + KafkaHeaders.DELIVERY_ATTEMPT, + ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(), + ) + } val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") headers.forEach { record.headers().add(it) } return record @@ -132,6 +145,26 @@ class SentryKafkaRecordInterceptorTest { verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) } + @Test + fun `sets retry count from delivery attempt header`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecordWithHeaders(deliveryAttempt = 3) + + withMockSentry { interceptor.intercept(record, consumer) } + + assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + + @Test + fun `does not set retry count when delivery attempt header is missing`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + withMockSentry { interceptor.intercept(record, consumer) } + + assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + @Test fun `does not create span when queue tracing is disabled`() { options.isEnableQueueTracing = false