From f8418961b5c3e5c3f3cda91e16002d516227c09c Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 20 Mar 2026 15:48:58 -0400 Subject: [PATCH 1/2] add --- sentry_streams/sentry_streams/config_types.py | 12 +++ sentry_streams/src/consumer.rs | 88 ++++++++++++++++++- sentry_streams/src/lib.rs | 1 + 3 files changed, 98 insertions(+), 3 deletions(-) diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index 110804c9..353e0160 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -9,11 +9,23 @@ class StepConfig(TypedDict): starts_segment: Optional[bool] +class DlqConfig(TypedDict, total=False): + """ + Dead Letter Queue configuration for a StreamSource. + All fields are optional to allow for default behavior. + """ + + enabled: bool + topic: str + producer_config: "KafkaProducerConfig" + + class KafkaConsumerConfig(TypedDict, StepConfig): bootstrap_servers: Sequence[str] auto_offset_reset: str consumer_group: NotRequired[str] additional_settings: Mapping[str, Any] + dlq: NotRequired[DlqConfig] class KafkaProducerConfig(TypedDict, StepConfig): diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 3b77dee7..913eaa6c 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -6,7 +6,7 @@ //! The pipeline is built by adding RuntimeOperators to the consumer. 
use crate::commit_policy::WatermarkCommitOffsets; -use crate::kafka_config::PyKafkaConsumerConfig; +use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig}; use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; use crate::metrics::configure_metrics; use crate::metrics_config::PyMetricConfig; @@ -18,7 +18,9 @@ use crate::utils::traced_with_gil; use crate::watermark::WatermarkEmitter; use pyo3::prelude::*; use rdkafka::message::{Header, Headers, OwnedHeaders}; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; use sentry_arroyo::processing::strategies::healthcheck::HealthCheck; use sentry_arroyo::processing::strategies::noop::Noop; use sentry_arroyo::processing::strategies::run_task::RunTask; @@ -34,6 +36,31 @@ use std::sync::Arc; /// Matches Arroyo docs for Kubernetes liveness probes. const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; +/// Configuration for Dead Letter Queue (DLQ). +/// When provided, invalid messages will be sent to the DLQ topic. +#[pyclass] +#[derive(Debug, Clone)] +pub struct DlqConfig { + /// The Kafka topic name to send invalid messages to + #[pyo3(get)] + pub topic: String, + + /// The Kafka producer configuration for the DLQ + #[pyo3(get)] + pub producer_config: PyKafkaProducerConfig, +} + +#[pymethods] +impl DlqConfig { + #[new] + fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { + DlqConfig { + topic, + producer_config, + } + } +} + /// The class that represent the consumer. /// This class is exposed to python and it is the main entry point /// used by the Python adapter to build a pipeline and run it. @@ -69,12 +96,17 @@ pub struct ArroyoConsumer { /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. write_healthcheck: bool, + + /// DLQ (Dead Letter Queue) configuration. 
+ /// If provided, invalid messages will be sent to the DLQ topic. + /// Otherwise, invalid messages will cause the consumer to stop processing. + dlq_config: Option<DlqConfig>, } #[pymethods] impl ArroyoConsumer { #[new] - #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))] + #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))] fn new( source: String, kafka_config: PyKafkaConsumerConfig, @@ -82,6 +114,7 @@ impl ArroyoConsumer { schema: Option<String>, metric_config: Option<PyMetricConfig>, write_healthcheck: bool, + dlq_config: Option<DlqConfig>, ) -> Self { ArroyoConsumer { consumer_config: kafka_config, @@ -93,6 +126,7 @@ impl ArroyoConsumer { concurrency_config: Arc::new(ConcurrencyConfig::new(1)), metric_config, write_healthcheck, + dlq_config, } } @@ -120,7 +154,12 @@ impl ArroyoConsumer { self.write_healthcheck, ); let config = self.consumer_config.clone().into(); - let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None); + + // Build DLQ policy if configured + let dlq_policy = self.build_dlq_policy(); + + let processor = + StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); self.handle = Some(processor.get_handle()); let mut handle = processor.get_handle(); @@ -143,6 +182,46 @@ impl ArroyoConsumer { } } +// Internal implementation methods (not exposed to Python) +impl ArroyoConsumer { + /// Builds the DLQ policy if dlq_config is provided. + /// Returns None if DLQ is not configured. 
+ fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> { + match &self.dlq_config { + Some(dlq_config) => { + tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); + + // Create Kafka producer for DLQ + let producer_config = dlq_config.producer_config.clone().into(); + let kafka_producer = KafkaProducer::new(producer_config); + let dlq_producer = + KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); + + // Get tokio runtime handle for async DLQ operations + let handle = self.concurrency_config.handle(); + + // Use default DLQ limits (no limits) and no max buffered messages + // These can be made configurable in a future PR if needed + let dlq_limit = DlqLimit::default(); + let max_buffered_messages = None; + + Some(DlqPolicy::new( + handle, + Box::new(dlq_producer), + dlq_limit, + max_buffered_messages, + )) + } + None => { + tracing::info!( + "DLQ not configured, invalid messages will cause processing to stop" + ); + None + } + } + } +} + /// Converts a Message to a Message. /// /// The messages we send around between steps in the pipeline contain @@ -429,4 +508,7 @@ mod tests { let _ = std::fs::remove_file(healthcheck_path); }) } + + // Note: DLQ functionality is primarily tested through Python integration tests + // in tests/test_dlq.py, as Rust unit tests require the full Python module to be built. 
} diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index c8913a39..d0171ab6 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -42,6 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::<DlqConfig>()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; From 72716350adf04e8cf1d600d79ded4b67ecbd2031 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 20 Mar 2026 16:17:16 -0400 Subject: [PATCH 2/2] make it part of StreamSource --- sentry_streams/sentry_streams/pipeline/pipeline.py | 14 ++++++++++++++ sentry_streams/src/consumer.rs | 10 +++++----- sentry_streams/src/lib.rs | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index a52994f6..edd4a4bc 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -224,6 +224,19 @@ class Source(Step, Generic[TOut]): """ +@dataclass +class DlqConfig: + """ + Configuration for Dead Letter Queue (DLQ). + + When provided, invalid messages will be sent to the DLQ topic + instead of causing the consumer to stop processing. 
+ """ + + topic: str + bootstrap_servers: Sequence[str] + + @dataclass class StreamSource(Source[bytes]): """ @@ -233,6 +246,7 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None consumer_group: Optional[str] = None + dlq_config: Optional[DlqConfig] = None step_type: StepType = StepType.SOURCE def register(self, ctx: Pipeline[bytes], previous: Step) -> None: diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 913eaa6c..26667351 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -40,7 +40,7 @@ const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; /// When provided, invalid messages will be sent to the DLQ topic. #[pyclass] #[derive(Debug, Clone)] -pub struct DlqConfig { +pub struct PyDlqConfig { /// The Kafka topic name to send invalid messages to #[pyo3(get)] pub topic: String, @@ -51,10 +51,10 @@ pub struct DlqConfig { } #[pymethods] -impl DlqConfig { +impl PyDlqConfig { #[new] fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { - DlqConfig { + PyDlqConfig { topic, producer_config, } @@ -100,7 +100,7 @@ pub struct ArroyoConsumer { /// DLQ (Dead Letter Queue) configuration. /// If provided, invalid messages will be sent to the DLQ topic. /// Otherwise, invalid messages will cause the consumer to stop processing. 
- dlq_config: Option<DlqConfig>, + dlq_config: Option<PyDlqConfig>, } #[pymethods] @@ -114,7 +114,7 @@ impl ArroyoConsumer { schema: Option<String>, metric_config: Option<PyMetricConfig>, write_healthcheck: bool, - dlq_config: Option<DlqConfig>, + dlq_config: Option<PyDlqConfig>, ) -> Self { ArroyoConsumer { consumer_config: kafka_config, diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index d0171ab6..d5131492 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -42,7 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::<DlqConfig>()?; + m.add_class::<PyDlqConfig>()?; m.add_class::()?; m.add_class::()?; m.add_class::()?;