diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index 110804c9..353e0160 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -9,11 +9,23 @@ class StepConfig(TypedDict): starts_segment: Optional[bool] +class DlqConfig(TypedDict, total=False): + """ + Dead Letter Queue configuration for a StreamSource. + All fields are optional to allow for default behavior. + """ + + enabled: bool + topic: str + producer_config: "KafkaProducerConfig" + + class KafkaConsumerConfig(TypedDict, StepConfig): bootstrap_servers: Sequence[str] auto_offset_reset: str consumer_group: NotRequired[str] additional_settings: Mapping[str, Any] + dlq: NotRequired[DlqConfig] class KafkaProducerConfig(TypedDict, StepConfig): diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index a52994f6..edd4a4bc 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -224,6 +224,19 @@ class Source(Step, Generic[TOut]): """ +@dataclass +class DlqConfig: + """ + Configuration for Dead Letter Queue (DLQ). + + When provided, invalid messages will be sent to the DLQ topic + instead of causing the consumer to stop processing. + """ + + topic: str + bootstrap_servers: Sequence[str] + + @dataclass class StreamSource(Source[bytes]): """ @@ -233,6 +246,7 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None consumer_group: Optional[str] = None + dlq_config: Optional[DlqConfig] = None step_type: StepType = StepType.SOURCE def register(self, ctx: Pipeline[bytes], previous: Step) -> None: diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 3b77dee7..26667351 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -6,7 +6,7 @@ //! 
The pipeline is built by adding RuntimeOperators to the consumer. use crate::commit_policy::WatermarkCommitOffsets; -use crate::kafka_config::PyKafkaConsumerConfig; +use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig}; use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; use crate::metrics::configure_metrics; use crate::metrics_config::PyMetricConfig; @@ -18,7 +18,9 @@ use crate::utils::traced_with_gil; use crate::watermark::WatermarkEmitter; use pyo3::prelude::*; use rdkafka::message::{Header, Headers, OwnedHeaders}; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; use sentry_arroyo::processing::strategies::healthcheck::HealthCheck; use sentry_arroyo::processing::strategies::noop::Noop; use sentry_arroyo::processing::strategies::run_task::RunTask; @@ -34,6 +36,31 @@ use std::sync::Arc; /// Matches Arroyo docs for Kubernetes liveness probes. const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; +/// Configuration for Dead Letter Queue (DLQ). +/// When provided, invalid messages will be sent to the DLQ topic. +#[pyclass] +#[derive(Debug, Clone)] +pub struct PyDlqConfig { + /// The Kafka topic name to send invalid messages to + #[pyo3(get)] + pub topic: String, + + /// The Kafka producer configuration for the DLQ + #[pyo3(get)] + pub producer_config: PyKafkaProducerConfig, +} + +#[pymethods] +impl PyDlqConfig { + #[new] + fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { + PyDlqConfig { + topic, + producer_config, + } + } +} + /// The class that represent the consumer. /// This class is exposed to python and it is the main entry point /// used by the Python adapter to build a pipeline and run it. @@ -69,12 +96,17 @@ pub struct ArroyoConsumer { /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. 
     write_healthcheck: bool,
+
+    /// DLQ (Dead Letter Queue) configuration.
+    /// If provided, invalid messages will be sent to the DLQ topic.
+    /// Otherwise, invalid messages will cause the consumer to stop processing.
+    dlq_config: Option<PyDlqConfig>,
 }
 
 #[pymethods]
 impl ArroyoConsumer {
     #[new]
-    #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))]
+    #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))]
     fn new(
         source: String,
         kafka_config: PyKafkaConsumerConfig,
@@ -82,6 +114,7 @@ impl ArroyoConsumer {
         schema: Option<String>,
         metric_config: Option<PyMetricConfig>,
         write_healthcheck: bool,
+        dlq_config: Option<PyDlqConfig>,
     ) -> Self {
         ArroyoConsumer {
             consumer_config: kafka_config,
@@ -93,6 +126,7 @@ impl ArroyoConsumer {
             concurrency_config: Arc::new(ConcurrencyConfig::new(1)),
             metric_config,
             write_healthcheck,
+            dlq_config,
         }
     }
 
@@ -120,7 +154,12 @@ impl ArroyoConsumer {
             self.write_healthcheck,
         );
         let config = self.consumer_config.clone().into();
-        let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None);
+
+        // Build DLQ policy if configured
+        let dlq_policy = self.build_dlq_policy();
+
+        let processor =
+            StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy);
 
         self.handle = Some(processor.get_handle());
         let mut handle = processor.get_handle();
@@ -143,6 +182,46 @@ impl ArroyoConsumer {
     }
 }
 
+// Internal implementation methods (not exposed to Python)
+impl ArroyoConsumer {
+    /// Builds the DLQ policy if dlq_config is provided.
+    /// Returns None if DLQ is not configured.
+    fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> {
+        match &self.dlq_config {
+            Some(dlq_config) => {
+                tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic);
+
+                // Create Kafka producer for DLQ
+                let producer_config = dlq_config.producer_config.clone().into();
+                let kafka_producer = KafkaProducer::new(producer_config);
+                let dlq_producer =
+                    KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic));
+
+                // Get tokio runtime handle for async DLQ operations
+                let handle = self.concurrency_config.handle();
+
+                // Use default DLQ limits (no limits) and no max buffered messages
+                // These can be made configurable in a future PR if needed
+                let dlq_limit = DlqLimit::default();
+                let max_buffered_messages = None;
+
+                Some(DlqPolicy::new(
+                    handle,
+                    Box::new(dlq_producer),
+                    dlq_limit,
+                    max_buffered_messages,
+                ))
+            }
+            None => {
+                tracing::info!(
+                    "DLQ not configured, invalid messages will cause processing to stop"
+                );
+                None
+            }
+        }
+    }
+}
+
 /// Converts a Message<KafkaPayload> to a Message<RoutedValuePayload>.
 ///
 /// The messages we send around between steps in the pipeline contain
@@ -429,4 +508,7 @@ mod tests {
         let _ = std::fs::remove_file(healthcheck_path);
     })
 }
+
+    // Note: DLQ functionality is primarily tested through Python integration tests
+    // in tests/test_dlq.py, as Rust unit tests require the full Python module to be built.
 }
diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs
index c8913a39..d5131492 100644
--- a/sentry_streams/src/lib.rs
+++ b/sentry_streams/src/lib.rs
@@ -42,6 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<PyDlqConfig>()?;
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;