Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.sentry.TransactionContext;
import io.sentry.TransactionOptions;
import io.sentry.util.SpanUtils;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
Expand All @@ -21,6 +22,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.KafkaHeaders;

/**
* A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka
Expand Down Expand Up @@ -161,6 +163,11 @@ private boolean isIgnored() {
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
}

final @Nullable Integer retryCount = retryCount(record);
if (retryCount != null) {
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount);
}

final @Nullable String enqueuedTimeStr =
headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
if (enqueuedTimeStr != null) {
Expand All @@ -178,6 +185,25 @@ private boolean isIgnored() {
return transaction;
}

private @Nullable Integer retryCount(final @NotNull ConsumerRecord<K, V> record) {
final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
if (header == null) {
return null;
}

final byte[] value = header.value();
if (value == null || value.length != Integer.BYTES) {
return null;
}

final int attempt = ByteBuffer.wrap(value).getInt();
if (attempt <= 0) {
return null;
}

return attempt - 1;
}

private void finishStaleContext() {
if (currentContext.get() != null) {
finishSpan(SpanStatus.UNKNOWN, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import io.sentry.Sentry
import io.sentry.SentryOptions
import io.sentry.SentryTraceHeader
import io.sentry.SentryTracer
import io.sentry.SpanDataConvention
import io.sentry.TransactionContext
import io.sentry.test.initForTest
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNull
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.internals.RecordHeaders
Expand All @@ -24,6 +27,7 @@ import org.mockito.kotlin.never
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.springframework.kafka.listener.RecordInterceptor
import org.springframework.kafka.support.KafkaHeaders

class SentryKafkaRecordInterceptorTest {

Expand All @@ -32,6 +36,7 @@ class SentryKafkaRecordInterceptorTest {
private lateinit var options: SentryOptions
private lateinit var consumer: Consumer<String, String>
private lateinit var lifecycleToken: ISentryLifecycleToken
private lateinit var transaction: SentryTracer

@BeforeTest
fun setup() {
Expand All @@ -52,8 +57,9 @@ class SentryKafkaRecordInterceptorTest {
whenever(forkedScopes.options).thenReturn(options)
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)

val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)
transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any()))
.thenReturn(transaction)
}

@AfterTest
Expand Down Expand Up @@ -81,6 +87,7 @@ class SentryKafkaRecordInterceptorTest {
sentryTrace: String? = null,
baggage: String? = null,
enqueuedTime: Long? = null,
deliveryAttempt: Int? = null,
): ConsumerRecord<String, String> {
val headers = RecordHeaders()
sentryTrace?.let {
Expand All @@ -95,6 +102,12 @@ class SentryKafkaRecordInterceptorTest {
it.toString().toByteArray(StandardCharsets.UTF_8),
)
}
deliveryAttempt?.let {
headers.add(
KafkaHeaders.DELIVERY_ATTEMPT,
ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(),
)
}
val record = ConsumerRecord<String, String>("my-topic", 0, 0L, "key", "value")
headers.forEach { record.headers().add(it) }
return record
Expand Down Expand Up @@ -132,6 +145,26 @@ class SentryKafkaRecordInterceptorTest {
verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
}

@Test
fun `sets retry count from delivery attempt header`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecordWithHeaders(deliveryAttempt = 3)

withMockSentry { interceptor.intercept(record, consumer) }

assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
}

@Test
fun `does not set retry count when delivery attempt header is missing`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

withMockSentry { interceptor.intercept(record, consumer) }

assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
}

@Test
fun `does not create span when queue tracing is disabled`() {
options.isEnableQueueTracing = false
Expand Down
Loading