From 007d27fbbc4be7e19e6b2b0f6c38ff06a78bf842 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 14:19:38 +0200 Subject: [PATCH 01/11] feat(kafka): Add no-arg producer interceptor for Kafka config Allow kafka-clients to instantiate SentryKafkaProducerInterceptor via interceptor.classes by adding a no-arg constructor that uses ScopesAdapter. This makes native Kafka interceptor wiring work out of the box in applications and samples.\n\nAlso add a Kafka tracing example to the console sample with a transaction-scoped producer send, and cover no-arg constructor behavior in sentry-kafka tests. Co-Authored-By: Claude --- sentry-kafka/api/sentry-kafka.api | 1 + .../kafka/SentryKafkaProducerInterceptor.java | 5 +++ .../SentryKafkaProducerInterceptorTest.kt | 30 ++++++++++++++- .../sentry-samples-console/build.gradle.kts | 2 + .../java/io/sentry/samples/console/Main.java | 37 +++++++++++++++++++ 5 files changed, 74 insertions(+), 1 deletion(-) diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index 30faaa1256..c5b58ecee5 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -15,6 +15,7 @@ public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/k public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; public static final field TRACE_ORIGIN Ljava/lang/String; + public fun ()V public fun (Lio/sentry/IScopes;)V public fun (Lio/sentry/IScopes;Ljava/lang/String;)V public fun close ()V diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java index c6b3184b39..923104427e 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -4,6 +4,7 @@ import io.sentry.DateUtils; import io.sentry.IScopes; import io.sentry.ISpan; +import io.sentry.ScopesAdapter; import io.sentry.SentryTraceHeader; import io.sentry.SpanDataConvention; import io.sentry.SpanOptions; @@ -28,6 +29,10 @@ public final class SentryKafkaProducerInterceptor implements ProducerInter private final @NotNull IScopes scopes; private final @NotNull String traceOrigin; + public SentryKafkaProducerInterceptor() { + this(ScopesAdapter.getInstance(), TRACE_ORIGIN); + } + public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) { this(scopes, TRACE_ORIGIN); } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt index 99b487c1c0..61ac1ab20e 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -1,6 +1,7 @@ package io.sentry.kafka import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.SentryTraceHeader @@ -26,7 +27,11 @@ class SentryKafkaProducerInterceptorTest { @BeforeTest fun setup() { - initForTest { it.dsn = "https://key@sentry.io/proj" } + initForTest { + it.dsn = "https://key@sentry.io/proj" + it.isEnableQueueTracing = true + it.tracesSampleRate = 1.0 + } scopes = mock() options = SentryOptions().apply { @@ -95,4 +100,27 @@ class SentryKafkaProducerInterceptorTest { assertSame(record, result) } + + @Test + fun `no-arg constructor uses current scopes`() { + val transaction = Sentry.startTransaction("tx", "op") + val record = ProducerRecord("my-topic", "key", "value") + + try { + val token: ISentryLifecycleToken = transaction.makeCurrent() + try { + val interceptor = SentryKafkaProducerInterceptor() + interceptor.onSend(record) + } finally { + token.close() + } + } finally { + transaction.finish() + } + + assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull( + record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) + ) + } } diff --git a/sentry-samples/sentry-samples-console/build.gradle.kts b/sentry-samples/sentry-samples-console/build.gradle.kts index 0dc6183b4f..010195c677 100644 --- a/sentry-samples/sentry-samples-console/build.gradle.kts +++ b/sentry-samples/sentry-samples-console/build.gradle.kts @@ -36,8 +36,10 @@ dependencies { implementation(projects.sentry) implementation(projects.sentryAsyncProfiler) implementation(projects.sentryJcache) + implementation(projects.sentryKafka) implementation(libs.jcache) implementation(libs.caffeine.jcache) + implementation(libs.kafka.clients) testImplementation(kotlin(Config.kotlinStdLib)) testImplementation(projects.sentry) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 0ed0646c7b..738ba2de55 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -3,13 +3,19 @@ import io.sentry.*; import io.sentry.clientreport.DiscardReason; import io.sentry.jcache.SentryJCacheWrapper; +import io.sentry.kafka.SentryKafkaProducerInterceptor; import io.sentry.protocol.Message; import io.sentry.protocol.User; import java.util.Collections; +import java.util.Properties; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; import javax.cache.configuration.MutableConfiguration; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; public class Main { @@ -95,6 +101,7 @@ public static void main(String[] args) throws InterruptedException { // Enable cache tracing to create spans for cache operations options.setEnableCacheTracing(true); + options.setEnableQueueTracing(true); // Determine traces sample rate based on the sampling context // options.setTracesSampler( @@ -178,6 +185,12 @@ public static void main(String[] args) throws InterruptedException { // cache.remove, and cache.flush spans as children of the active transaction. demonstrateCacheTracing(); + // Kafka queue tracing with kafka-clients interceptors. + // + // This uses the native producer interceptor from sentry-kafka. + // If no local Kafka broker is available, this block exits quietly. + demonstrateKafkaTracing(); + // Performance feature // // Transactions collect execution time of the piece of code that's executed between the start @@ -247,6 +260,30 @@ private static void captureMetrics() { Sentry.metrics().distribution("distributionMetric", 7.0); } + private static void demonstrateKafkaTracing() { + final Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + + final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); + try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { + try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { + producer.send(new ProducerRecord<>("sentry-topic", "sentry-kafka sample message")).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception ignoredException) { + // local broker may not be available when running the sample + } + } finally { + transaction.finish(); + } + } + private static class SomeEventProcessor implements EventProcessor { @Override public SentryEvent process(SentryEvent event, Hint hint) { From 82cfc3704f711ad30129b16b306201d1338ab021 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 14:56:28 +0200 Subject: [PATCH 02/11] feat(kafka): Add consumer demo to console sample Show end-to-end Kafka queue tracing in the console sample by running a background consumer thread, producing a message, and waiting for consume before exit.\n\nAdd a no-arg constructor to SentryKafkaConsumerInterceptor so kafka-clients can instantiate it from interceptor.classes, and add test coverage for that constructor. Co-Authored-By: Claude --- sentry-kafka/api/sentry-kafka.api | 1 + .../kafka/SentryKafkaConsumerInterceptor.java | 5 ++ .../SentryKafkaConsumerInterceptorTest.kt | 28 ++++++++ .../java/io/sentry/samples/console/Main.java | 68 ++++++++++++++++++- 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index c5b58ecee5..6fe7f41222 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -5,6 +5,7 @@ public final class io/sentry/kafka/BuildConfig { public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor { public static final field TRACE_ORIGIN Ljava/lang/String; + public fun ()V public fun (Lio/sentry/IScopes;)V public fun close ()V public fun configure (Ljava/util/Map;)V diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java index caa773352e..a37d01cd90 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java @@ -3,6 +3,7 @@ import io.sentry.BaggageHeader; import io.sentry.IScopes; import io.sentry.ITransaction; +import io.sentry.ScopesAdapter; import io.sentry.SentryTraceHeader; import io.sentry.SpanDataConvention; import io.sentry.SpanStatus; @@ -29,6 +30,10 @@ public final class SentryKafkaConsumerInterceptor implements ConsumerInter private final @NotNull IScopes scopes; + public SentryKafkaConsumerInterceptor() { + this(ScopesAdapter.getInstance()); + } + public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) { this.scopes = scopes; } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt index daee640793..f6786bc8f5 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt @@ -2,9 +2,13 @@ package io.sentry.kafka import io.sentry.IScopes import io.sentry.ITransaction +import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.TransactionContext import io.sentry.TransactionOptions +import io.sentry.test.initForTest +import kotlin.test.AfterTest +import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertSame import org.apache.kafka.clients.consumer.ConsumerRecord @@ -19,6 +23,20 @@ import org.mockito.kotlin.whenever class SentryKafkaConsumerInterceptorTest { + @BeforeTest + fun setup() { + initForTest { + it.dsn = "https://key@sentry.io/proj" + it.isEnableQueueTracing = true + it.tracesSampleRate = 1.0 + } + } + + @AfterTest + fun teardown() { + Sentry.close() + } + @Test fun `does nothing when queue tracing is disabled`() { val scopes = mock() @@ -64,6 +82,16 @@ class SentryKafkaConsumerInterceptorTest { interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1))) } + @Test + fun `no-arg constructor uses current scopes`() { + val interceptor = SentryKafkaConsumerInterceptor() + val records = singleRecordBatch() + + val result = interceptor.onConsume(records) + + assertSame(records, result) + } + private fun singleRecordBatch(): ConsumerRecords { val partition = TopicPartition("my-topic", 0) val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 738ba2de55..3b9e2476cb 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -3,18 +3,27 @@ import io.sentry.*; import io.sentry.clientreport.DiscardReason; import io.sentry.jcache.SentryJCacheWrapper; +import io.sentry.kafka.SentryKafkaConsumerInterceptor; import io.sentry.kafka.SentryKafkaProducerInterceptor; import io.sentry.protocol.Message; import io.sentry.protocol.User; +import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; import javax.cache.configuration.MutableConfiguration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; public class Main { @@ -261,6 +270,10 @@ private static void captureMetrics() { } private static void demonstrateKafkaTracing() { + final String topic = "sentry-topic-console-sample"; + final CountDownLatch consumedLatch = new CountDownLatch(1); + final Thread consumerThread = startKafkaConsumerThread(topic, consumedLatch); + final Properties producerProperties = new Properties(); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProperties.put( @@ -273,17 +286,70 @@ private static void demonstrateKafkaTracing() { final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { - producer.send(new ProducerRecord<>("sentry-topic", "sentry-kafka sample message")).get(); + Thread.sleep(500); + producer.send(new ProducerRecord<>(topic, "sentry-kafka sample message")).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception ignoredException) { // local broker may not be available when running the sample } + + try { + consumedLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } finally { + consumerThread.interrupt(); + try { + consumerThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } transaction.finish(); } } + private static Thread startKafkaConsumerThread( + final String topic, final CountDownLatch consumedLatch) { + final Thread consumerThread = + new Thread( + () -> { + final Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProperties.put( + ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + SentryKafkaConsumerInterceptor.class.getName()); + + try (KafkaConsumer consumer = + new KafkaConsumer<>(consumerProperties)) { + consumer.subscribe(Collections.singletonList(topic)); + + while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { + final ConsumerRecords records = + consumer.poll(Duration.ofMillis(500)); + if (!records.isEmpty()) { + consumedLatch.countDown(); + break; + } + } + } catch (Exception ignored) { + // local broker may not be available when running the sample + } + }, + "sentry-kafka-sample-consumer"); + consumerThread.start(); + return consumerThread; + } + private static class SomeEventProcessor implements EventProcessor { @Override public SentryEvent process(SentryEvent event, Hint hint) { From cb4d2acfb88995b308c405d2051ff5b38a32f5b8 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 15:31:46 +0200 Subject: [PATCH 03/11] ref(samples): Extract Kafka console showcase into dedicated class Move Kafka producer/consumer showcase logic out of Main into KafkaShowcase to make the sample easier to read and follow. Keep runtime behavior unchanged by preserving the same demo entry point and flow. Co-Authored-By: Claude --- .../sentry/samples/console/KafkaShowcase.java | 107 ++++++++++++++++++ .../java/io/sentry/samples/console/Main.java | 98 +--------------- 2 files changed, 108 insertions(+), 97 deletions(-) create mode 100644 sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java new file mode 100644 index 0000000000..aecc4f6b19 --- /dev/null +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java @@ -0,0 +1,107 @@ +package io.sentry.samples.console; + +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.Sentry; +import io.sentry.kafka.SentryKafkaConsumerInterceptor; +import io.sentry.kafka.SentryKafkaProducerInterceptor; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +final class KafkaShowcase { + + private KafkaShowcase() {} + + static void demonstrate() { + final String topic = "sentry-topic-console-sample"; + final CountDownLatch consumedLatch = new CountDownLatch(1); + final Thread consumerThread = startKafkaConsumerThread(topic, consumedLatch); + + final Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + + final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); + try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { + try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { + Thread.sleep(500); + producer.send(new ProducerRecord<>(topic, "sentry-kafka sample message")).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception ignoredException) { + // local broker may not be available when running the sample + } + + try { + consumedLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } finally { + consumerThread.interrupt(); + try { + consumerThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + transaction.finish(); + } + } + + private static Thread startKafkaConsumerThread( + final String topic, final CountDownLatch consumedLatch) { + final Thread consumerThread = + new Thread( + () -> { + final Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProperties.put( + ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + SentryKafkaConsumerInterceptor.class.getName()); + + try (KafkaConsumer consumer = + new KafkaConsumer<>(consumerProperties)) { + consumer.subscribe(Collections.singletonList(topic)); + + while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { + final ConsumerRecords records = + consumer.poll(Duration.ofMillis(500)); + if (!records.isEmpty()) { + consumedLatch.countDown(); + break; + } + } + } catch (Exception ignored) { + // local broker may not be available when running the sample + } + }, + "sentry-kafka-sample-consumer"); + consumerThread.start(); + return consumerThread; + } +} diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 3b9e2476cb..f42dee311f 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -3,28 +3,13 @@ import io.sentry.*; import io.sentry.clientreport.DiscardReason; import io.sentry.jcache.SentryJCacheWrapper; -import io.sentry.kafka.SentryKafkaConsumerInterceptor; -import io.sentry.kafka.SentryKafkaProducerInterceptor; import io.sentry.protocol.Message; import io.sentry.protocol.User; -import java.time.Duration; import java.util.Collections; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; import javax.cache.configuration.MutableConfiguration; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; public class Main { @@ -198,7 +183,7 @@ public static void main(String[] args) throws InterruptedException { // // This uses the native producer interceptor from sentry-kafka. // If no local Kafka broker is available, this block exits quietly. - demonstrateKafkaTracing(); + KafkaShowcase.demonstrate(); // Performance feature // @@ -269,87 +254,6 @@ private static void captureMetrics() { Sentry.metrics().distribution("distributionMetric", 7.0); } - private static void demonstrateKafkaTracing() { - final String topic = "sentry-topic-console-sample"; - final CountDownLatch consumedLatch = new CountDownLatch(1); - final Thread consumerThread = startKafkaConsumerThread(topic, consumedLatch); - - final Properties producerProperties = new Properties(); - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - producerProperties.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); - - final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); - try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { - try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { - Thread.sleep(500); - producer.send(new ProducerRecord<>(topic, "sentry-kafka sample message")).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception ignoredException) { - // local broker may not be available when running the sample - } - - try { - consumedLatch.await(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } finally { - consumerThread.interrupt(); - try { - consumerThread.join(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - transaction.finish(); - } - } - - private static Thread startKafkaConsumerThread( - final String topic, final CountDownLatch consumedLatch) { - final Thread consumerThread = - new Thread( - () -> { - final Properties consumerProperties = new Properties(); - consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - consumerProperties.put( - ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); - consumerProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, - SentryKafkaConsumerInterceptor.class.getName()); - - try (KafkaConsumer consumer = - new KafkaConsumer<>(consumerProperties)) { - consumer.subscribe(Collections.singletonList(topic)); - - while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { - final ConsumerRecords records = - consumer.poll(Duration.ofMillis(500)); - if (!records.isEmpty()) { - consumedLatch.countDown(); - break; - } - } - } catch (Exception ignored) { - // local broker may not be available when running the sample - } - }, - "sentry-kafka-sample-consumer"); - consumerThread.start(); - return consumerThread; - } - private static class SomeEventProcessor implements EventProcessor { @Override public SentryEvent process(SentryEvent event, Hint hint) { From 33c4c79793692834a74757ee1e41ae2cfebdb82a Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 16:03:05 +0200 Subject: [PATCH 04/11] feat(samples): Add opt-in Kafka console e2e coverage Gate the console Kafka showcase behind SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS so Kafka behavior is enabled only when configured. Keep the showcase isolated in KafkaShowcase and use fail-fast Kafka client timeouts for local runs.\n\nExtend console system tests to assert producer and consumer queue tracing when Kafka is enabled. Update system-test-runner to provision or reuse a local Kafka broker for the console module and clean up runner-managed resources. Co-Authored-By: Claude --- .../sentry/samples/console/KafkaShowcase.java | 15 ++-- .../java/io/sentry/samples/console/Main.java | 13 ++- .../ConsoleApplicationSystemTest.kt | 48 +++++++--- test/system-test-runner.py | 88 +++++++++++++++++++ 4 files changed, 142 insertions(+), 22 deletions(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java index aecc4f6b19..2467133d39 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java @@ -24,19 +24,22 @@ final class KafkaShowcase { private KafkaShowcase() {} - static void demonstrate() { + static void demonstrate(final String bootstrapServers) { final String topic = "sentry-topic-console-sample"; final CountDownLatch consumedLatch = new CountDownLatch(1); - final Thread consumerThread = startKafkaConsumerThread(topic, consumedLatch); + final Thread consumerThread = startKafkaConsumerThread(topic, bootstrapServers, consumedLatch); final Properties producerProperties = new Properties(); - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerProperties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { @@ -66,12 +69,12 @@ static void demonstrate() { } private static Thread startKafkaConsumerThread( - final String topic, final CountDownLatch consumedLatch) { + final String topic, final String bootstrapServers, final CountDownLatch consumedLatch) { final Thread consumerThread = new Thread( () -> { final Properties consumerProperties = new Properties(); - consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProperties.put( ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -83,6 +86,8 @@ private static Thread startKafkaConsumerThread( consumerProperties.put( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName()); + consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); + consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index f42dee311f..f13b5101d2 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -16,6 +16,10 @@ public class Main { private static long numberOfDiscardedSpansDueToOverflow = 0; public static void main(String[] args) throws InterruptedException { + final String kafkaBootstrapServers = System.getenv("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS"); + final boolean kafkaEnabled = + kafkaBootstrapServers != null && !kafkaBootstrapServers.trim().isEmpty(); + Sentry.init( options -> { // NOTE: Replace the test DSN below with YOUR OWN DSN to see the events from this app in @@ -95,7 +99,7 @@ public static void main(String[] args) throws InterruptedException { // Enable cache tracing to create spans for cache operations options.setEnableCacheTracing(true); - options.setEnableQueueTracing(true); + options.setEnableQueueTracing(kafkaEnabled); // Determine traces sample rate based on the sampling context // options.setTracesSampler( @@ -181,9 +185,10 @@ public static void main(String[] args) throws InterruptedException { // Kafka queue tracing with kafka-clients interceptors. // - // This uses the native producer interceptor from sentry-kafka. - // If no local Kafka broker is available, this block exits quietly. - KafkaShowcase.demonstrate(); + // Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092 + if (kafkaEnabled) { + KafkaShowcase.demonstrate(kafkaBootstrapServers); + } // Performance feature // diff --git a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt index 2b009167ac..1b512fdc48 100644 --- a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt +++ b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt @@ -19,19 +19,7 @@ class ConsoleApplicationSystemTest { @Test fun `console application sends expected events when run as JAR`() { - val jarFile = testHelper.findJar("sentry-samples-console") - val process = - testHelper.launch( - jarFile, - mapOf( - "SENTRY_DSN" to testHelper.dsn, - "SENTRY_TRACES_SAMPLE_RATE" to "1.0", - "SENTRY_ENABLE_PRETTY_SERIALIZATION_OUTPUT" to "false", - "SENTRY_DEBUG" to "true", - "SENTRY_PROFILE_SESSION_SAMPLE_RATE" to "1.0", - "SENTRY_PROFILE_LIFECYCLE" to "TRACE", - ), - ) + val process = launchConsoleProcess() process.waitFor(30, TimeUnit.SECONDS) assertEquals(0, process.exitValue()) @@ -40,6 +28,40 @@ class ConsoleApplicationSystemTest { verifyExpectedEvents() } + @Test + fun `console application sends kafka producer and consumer tracing when kafka is enabled`() { + val process = + launchConsoleProcess(mapOf("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS" to "localhost:9092")) + + process.waitFor(30, TimeUnit.SECONDS) + assertEquals(0, process.exitValue()) + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.transaction == "kafka-demo" && + testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish") + } + + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionHaveOp(transaction, "queue.receive") && + transaction.contexts.trace?.data?.get("messaging.system") == "kafka" + } + } + + private fun launchConsoleProcess(overrides: Map = emptyMap()): Process { + val jarFile = testHelper.findJar("sentry-samples-console") + val env = + mutableMapOf( + "SENTRY_DSN" to testHelper.dsn, + "SENTRY_TRACES_SAMPLE_RATE" to "1.0", + "SENTRY_ENABLE_PRETTY_SERIALIZATION_OUTPUT" to "false", + "SENTRY_DEBUG" to "true", + "SENTRY_PROFILE_SESSION_SAMPLE_RATE" to "1.0", + "SENTRY_PROFILE_LIFECYCLE" to "TRACE", + ) + env.putAll(overrides) + return testHelper.launch(jarFile, env) + } + private fun verifyExpectedEvents() { var profilerId: SentryId? = null // Verify we received a "Fatal message!" event diff --git a/test/system-test-runner.py b/test/system-test-runner.py index 70489c580a..64979b3e0e 100644 --- a/test/system-test-runner.py +++ b/test/system-test-runner.py @@ -42,6 +42,7 @@ import argparse import requests import threading +import socket from pathlib import Path from typing import Optional, List, Tuple from dataclasses import dataclass @@ -65,6 +66,9 @@ "SENTRY_ENABLE_CACHE_TRACING": "true" } +KAFKA_CONTAINER_NAME = "sentry-java-system-test-kafka" +KAFKA_BOOTSTRAP_SERVERS = "localhost:9092" + class ServerType(Enum): TOMCAT = 0 SPRING = 1 @@ -155,6 +159,7 @@ def __init__(self): self.mock_server = Server(name="Mock", pid_filepath="sentry-mock-server.pid") self.tomcat_server = Server(name="Tomcat", pid_filepath="tomcat-server.pid") self.spring_server = Server(name="Spring", pid_filepath="spring-server.pid") + self.kafka_started_by_runner = False # Load existing PIDs if available for server in (self.mock_server, self.tomcat_server, self.spring_server): @@ -196,7 +201,78 @@ def kill_process(self, pid: int, name: str) -> None: except (OSError, ProcessLookupError): print(f"Process {pid} was already dead") + def module_requires_kafka(self, sample_module: str) -> bool: + return sample_module == "sentry-samples-console" + + def wait_for_port(self, host: str, port: int, max_attempts: int = 20) -> bool: + for _ in range(max_attempts): + try: + with socket.create_connection((host, port), timeout=1): + return True + except OSError: + time.sleep(1) + return False + def start_kafka_broker(self) -> None: + if self.wait_for_port("localhost", 9092, max_attempts=1): + print("Kafka broker already running on localhost:9092, reusing it.") + self.kafka_started_by_runner = False + return + + self.stop_kafka_broker() + + print("Starting Kafka broker (Redpanda) for system tests...") + run_result = subprocess.run( + [ + "docker", + "run", + "-d", + "--name", + KAFKA_CONTAINER_NAME, + "-p", + "9092:9092", + "docker.redpanda.com/redpandadata/redpanda:v24.1.9", + "redpanda", + "start", + "--overprovisioned", + "--smp", + "1", + "--memory", + "1G", + "--reserve-memory", + "0M", + "--node-id", + "0", + "--check=false", + "--kafka-addr", + "PLAINTEXT://0.0.0.0:9092", + "--advertise-kafka-addr", + "PLAINTEXT://localhost:9092", + ], + check=False, + capture_output=True, + text=True, + ) + + if run_result.returncode != 0: + raise RuntimeError(f"Failed to start Kafka container: {run_result.stderr}") + + if not self.wait_for_port("localhost", 9092, max_attempts=30): + raise RuntimeError("Kafka broker did not become ready on localhost:9092") + + self.kafka_started_by_runner = True + + def stop_kafka_broker(self) -> None: + if not self.kafka_started_by_runner: + return + + subprocess.run( + ["docker", "rm", "-f", KAFKA_CONTAINER_NAME], + check=False, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + self.kafka_started_by_runner = False def start_sentry_mock_server(self) -> None: """Start the Sentry mock server.""" @@ -557,6 +633,12 @@ def setup_test_infrastructure(self, sample_module: str, java_agent: str, java_agent_auto_init: str, build_before_run: str, server_type: Optional[ServerType]) -> int: """Set up test infrastructure. Returns 0 on success, error code on failure.""" + if self.module_requires_kafka(sample_module): + self.start_kafka_broker() + os.environ["SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS"] = KAFKA_BOOTSTRAP_SERVERS + else: + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) + # Build if requested if build_before_run == "1": print("Building before test run") @@ -624,6 +706,8 @@ def run_single_test(self, sample_module: str, java_agent: str, elif server_type == ServerType.SPRING: self.stop_spring_server() self.stop_sentry_mock_server() + self.stop_kafka_broker() + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) def run_all_tests(self) -> int: """Run all system tests.""" @@ -954,6 +1038,8 @@ def cleanup_on_exit(self, signum, frame): self.stop_spring_server() self.stop_sentry_mock_server() self.stop_tomcat_server() + self.stop_kafka_broker() + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) sys.exit(1) def main(): @@ -1152,6 +1238,8 @@ def main(): runner.stop_spring_server() runner.stop_sentry_mock_server() runner.stop_tomcat_server() + runner.stop_kafka_broker() + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) if __name__ == "__main__": sys.exit(main()) From 58b67b2d8e5acdb22176b4308d49b76d58122315 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 18:33:44 +0200 Subject: [PATCH 05/11] ref(samples): Move KafkaShowcase to kafka subpackage Move KafkaShowcase under io.sentry.samples.console.kafka and update Main to import the relocated class. This keeps Kafka-specific sample code grouped in a dedicated package without changing runtime behavior. Co-Authored-By: Claude --- .../src/main/java/io/sentry/samples/console/Main.java | 1 + .../sentry/samples/console/{ => kafka}/KafkaShowcase.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) rename sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/{ => kafka}/KafkaShowcase.java (97%) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index f13b5101d2..3b93d3aed1 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -4,6 +4,7 @@ import io.sentry.clientreport.DiscardReason; import io.sentry.jcache.SentryJCacheWrapper; import io.sentry.protocol.Message; +import io.sentry.samples.console.kafka.KafkaShowcase; import io.sentry.protocol.User; import java.util.Collections; import javax.cache.Cache; diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java similarity index 97% rename from sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java rename to sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 2467133d39..0a33c7eed3 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -1,4 +1,4 @@ -package io.sentry.samples.console; +package io.sentry.samples.console.kafka; import io.sentry.ISentryLifecycleToken; import io.sentry.ITransaction; @@ -20,11 +20,11 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -final class KafkaShowcase { +public final class KafkaShowcase { private KafkaShowcase() {} - static void demonstrate(final String bootstrapServers) { + public static void demonstrate(final String bootstrapServers) { final String topic = "sentry-topic-console-sample"; final CountDownLatch consumedLatch = new CountDownLatch(1); final Thread consumerThread = startKafkaConsumerThread(topic, bootstrapServers, consumedLatch); From daeba534e7e1e87a8e2b8844c77cbaf16f4fa419 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 18:34:39 +0200 Subject: [PATCH 06/11] Update KafkaShowcase.java extract constant --- .../io/sentry/samples/console/kafka/KafkaShowcase.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 0a33c7eed3..bc5ee2074b 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -22,12 +22,13 @@ public final class KafkaShowcase { + public static final String TOPIC = "sentry-topic-console-sample"; + private KafkaShowcase() {} public static void demonstrate(final String bootstrapServers) { - final String topic = "sentry-topic-console-sample"; final CountDownLatch consumedLatch = new CountDownLatch(1); - final Thread consumerThread = startKafkaConsumerThread(topic, bootstrapServers, consumedLatch); + final Thread consumerThread = startKafkaConsumerThread(TOPIC, bootstrapServers, consumedLatch); final Properties producerProperties = new Properties(); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -45,7 +46,7 @@ public static void demonstrate(final String bootstrapServers) { try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { Thread.sleep(500); - producer.send(new ProducerRecord<>(topic, "sentry-kafka sample message")).get(); + producer.send(new ProducerRecord<>(TOPIC, "sentry-kafka sample message")).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception ignoredException) { From a22236259bc9435ea3e3e613c1ce0a10e745abaf Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 18:36:12 +0200 Subject: [PATCH 07/11] Update KafkaShowcase.java extract methods --- .../samples/console/kafka/KafkaShowcase.java | 66 +++++++++++-------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index bc5ee2074b..47e8e366e5 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -30,17 +30,7 @@ public static void demonstrate(final String bootstrapServers) { final CountDownLatch consumedLatch = new CountDownLatch(1); final Thread consumerThread = startKafkaConsumerThread(TOPIC, bootstrapServers, consumedLatch); - final Properties producerProperties = new Properties(); - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - producerProperties.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); - producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); - producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); - producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); + final Properties producerProperties = getProducerProperties(bootstrapServers); final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { @@ -74,21 +64,7 @@ private static Thread startKafkaConsumerThread( final Thread consumerThread = new Thread( () -> { - final Properties consumerProperties = new Properties(); - consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - consumerProperties.put( - ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); - consumerProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, - SentryKafkaConsumerInterceptor.class.getName()); - consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); - consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + final Properties consumerProperties = getConsumerProperties(bootstrapServers); try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { @@ -110,4 +86,42 @@ private static Thread startKafkaConsumerThread( consumerThread.start(); return consumerThread; } + + private static Properties getConsumerProperties(String bootstrapServers) { + final Properties consumerProperties = new Properties(); + + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerProperties.put( + ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + SentryKafkaConsumerInterceptor.class.getName()); + consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); + consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + + return consumerProperties; + } + + private static Properties getProducerProperties(String bootstrapServers) { + final Properties producerProperties = new Properties(); + + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); + + return producerProperties; + } } From 7661f6cfab701565917835e2e517054223cb04f6 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 18:39:10 +0200 Subject: [PATCH 08/11] Update KafkaShowcase.java refactor --- .../samples/console/kafka/KafkaShowcase.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 47e8e366e5..0d15c8d753 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -28,7 +28,7 @@ private KafkaShowcase() {} public static void demonstrate(final String bootstrapServers) { final CountDownLatch consumedLatch = new CountDownLatch(1); - final Thread consumerThread = startKafkaConsumerThread(TOPIC, bootstrapServers, consumedLatch); + final Thread consumerThread = startKafkaConsumerThread(bootstrapServers, consumedLatch); final Properties producerProperties = getProducerProperties(bootstrapServers); @@ -60,7 +60,7 @@ public static void demonstrate(final String bootstrapServers) { } private static Thread startKafkaConsumerThread( - final String topic, final String bootstrapServers, final CountDownLatch consumedLatch) { + final String bootstrapServers, final CountDownLatch consumedLatch) { final Thread consumerThread = new Thread( () -> { @@ -90,6 +90,10 @@ private static Thread startKafkaConsumerThread( private static Properties getConsumerProperties(String bootstrapServers) { final Properties consumerProperties = new Properties(); + consumerProperties.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + SentryKafkaConsumerInterceptor.class.getName()); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProperties.put( ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); @@ -99,9 +103,6 @@ private static Properties getConsumerProperties(String bootstrapServers) { consumerProperties.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, - SentryKafkaConsumerInterceptor.class.getName()); consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); @@ -111,13 +112,14 @@ private static Properties getConsumerProperties(String bootstrapServers) { private static Properties getProducerProperties(String bootstrapServers) { final Properties producerProperties = new Properties(); + producerProperties.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerProperties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); From e02a9079c426de20767050ecbb3a34252ef7a1c7 Mon Sep 17 00:00:00 2001 From: Sentry Github Bot Date: Mon, 13 Apr 2026 16:42:36 +0000 Subject: [PATCH 09/11] Format code --- .../main/java/io/sentry/samples/console/Main.java | 2 +- .../sentry/samples/console/kafka/KafkaShowcase.java | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 3b93d3aed1..90b9c9bd32 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -4,8 +4,8 @@ import io.sentry.clientreport.DiscardReason; import io.sentry.jcache.SentryJCacheWrapper; import io.sentry.protocol.Message; -import io.sentry.samples.console.kafka.KafkaShowcase; import io.sentry.protocol.User; +import io.sentry.samples.console.kafka.KafkaShowcase; import java.util.Collections; import javax.cache.Cache; import javax.cache.CacheManager; diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 0d15c8d753..33ee9c9d0f 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -91,8 +91,7 @@ private static Properties getConsumerProperties(String bootstrapServers) { final Properties consumerProperties = new Properties(); consumerProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, - SentryKafkaConsumerInterceptor.class.getName()); + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProperties.put( @@ -101,8 +100,7 @@ private static Properties getConsumerProperties(String bootstrapServers) { consumerProperties.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); @@ -113,13 +111,13 @@ private static Properties getProducerProperties(String bootstrapServers) { final Properties producerProperties = new Properties(); producerProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerProperties.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); From efd8727436b43aaa1bace1532f253a0815565578 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Tue, 14 Apr 2026 05:57:19 +0200 Subject: [PATCH 10/11] fix --- .../java/io/sentry/samples/console/kafka/KafkaShowcase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 0d15c8d753..205fb8d7ed 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -68,7 +68,7 @@ private static Thread startKafkaConsumerThread( try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { - consumer.subscribe(Collections.singletonList(topic)); + consumer.subscribe(Collections.singletonList(TOPIC)); while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { final ConsumerRecords records = From 540ea073b74c3575a25519c1d7047924958d4d8e Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Tue, 14 Apr 2026 06:17:48 +0200 Subject: [PATCH 11/11] ref(samples): Clarify Kafka setup in console showcase Restructure KafkaShowcase to highlight the required Sentry interceptor configuration for producer and consumer setups. Split property construction into explicit helper methods and rename the entrypoint to make customer integration requirements easier to follow without changing behavior. Co-Authored-By: Claude --- .../java/io/sentry/samples/console/Main.java | 4 +- .../samples/console/kafka/KafkaShowcase.java | 95 ++++++++++--------- 2 files changed, 51 insertions(+), 48 deletions(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 3b93d3aed1..4fee0a8374 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -4,8 +4,8 @@ import io.sentry.clientreport.DiscardReason; import io.sentry.jcache.SentryJCacheWrapper; import io.sentry.protocol.Message; -import io.sentry.samples.console.kafka.KafkaShowcase; import io.sentry.protocol.User; +import io.sentry.samples.console.kafka.KafkaShowcase; import java.util.Collections; import javax.cache.Cache; import javax.cache.CacheManager; @@ -188,7 +188,7 @@ public static void main(String[] args) throws InterruptedException { // // Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092 if (kafkaEnabled) { - KafkaShowcase.demonstrate(kafkaBootstrapServers); + KafkaShowcase.runKafkaWithSentryInterceptors(kafkaBootstrapServers); } // Performance feature diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 205fb8d7ed..9c84b6f004 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -26,11 +26,11 @@ public final class KafkaShowcase { private KafkaShowcase() {} - public static void demonstrate(final String bootstrapServers) { + public static void runKafkaWithSentryInterceptors(final String bootstrapServers) { final CountDownLatch consumedLatch = new CountDownLatch(1); - final Thread consumerThread = startKafkaConsumerThread(bootstrapServers, consumedLatch); - - final Properties producerProperties = getProducerProperties(bootstrapServers); + final Thread consumerThread = + startConsumerWithSentryInterceptor(bootstrapServers, consumedLatch); + final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers); final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { @@ -59,12 +59,55 @@ public static void demonstrate(final String bootstrapServers) { } } - private static Thread startKafkaConsumerThread( + public static Properties createProducerPropertiesWithSentry(final String bootstrapServers) { + final Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + // Required for Sentry queue tracing in kafka-clients producer setup. + producerProperties.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + + // Optional tuning for sample stability in CI/local runs. + producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); + + return producerProperties; + } + + public static Properties createConsumerPropertiesWithSentry(final String bootstrapServers) { + final Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerProperties.put( + ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + // Required for Sentry queue tracing in kafka-clients consumer setup. + consumerProperties.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName()); + + // Optional tuning for sample stability in CI/local runs. + consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); + consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + + return consumerProperties; + } + + private static Thread startConsumerWithSentryInterceptor( final String bootstrapServers, final CountDownLatch consumedLatch) { final Thread consumerThread = new Thread( () -> { - final Properties consumerProperties = getConsumerProperties(bootstrapServers); + final Properties consumerProperties = + createConsumerPropertiesWithSentry(bootstrapServers); try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { @@ -86,44 +129,4 @@ private static Thread startKafkaConsumerThread( consumerThread.start(); return consumerThread; } - - private static Properties getConsumerProperties(String bootstrapServers) { - final Properties consumerProperties = new Properties(); - - consumerProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, - SentryKafkaConsumerInterceptor.class.getName()); - - consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - consumerProperties.put( - ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); - consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); - consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); - - return consumerProperties; - } - - private static Properties getProducerProperties(String bootstrapServers) { - final Properties producerProperties = new Properties(); - - producerProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); - - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - producerProperties.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); - producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); - producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); - - return producerProperties; - } }