-
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(dlq): add dlq support rust side #277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,7 @@ | |
| //! The pipeline is built by adding RuntimeOperators to the consumer. | ||
|
|
||
| use crate::commit_policy::WatermarkCommitOffsets; | ||
| use crate::kafka_config::PyKafkaConsumerConfig; | ||
| use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig}; | ||
| use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; | ||
| use crate::metrics::configure_metrics; | ||
| use crate::metrics_config::PyMetricConfig; | ||
|
|
@@ -18,7 +18,9 @@ use crate::utils::traced_with_gil; | |
| use crate::watermark::WatermarkEmitter; | ||
| use pyo3::prelude::*; | ||
| use rdkafka::message::{Header, Headers, OwnedHeaders}; | ||
| use sentry_arroyo::backends::kafka::producer::KafkaProducer; | ||
| use sentry_arroyo::backends::kafka::types::KafkaPayload; | ||
| use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; | ||
| use sentry_arroyo::processing::strategies::healthcheck::HealthCheck; | ||
| use sentry_arroyo::processing::strategies::noop::Noop; | ||
| use sentry_arroyo::processing::strategies::run_task::RunTask; | ||
|
|
@@ -34,6 +36,31 @@ use std::sync::Arc; | |
| /// Matches Arroyo docs for Kubernetes liveness probes. | ||
| const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; | ||
|
|
||
| /// Configuration for Dead Letter Queue (DLQ). | ||
| /// When provided, invalid messages will be sent to the DLQ topic. | ||
| #[pyclass] | ||
| #[derive(Debug, Clone)] | ||
| pub struct PyDlqConfig { | ||
| /// The Kafka topic name to send invalid messages to | ||
| #[pyo3(get)] | ||
| pub topic: String, | ||
|
|
||
| /// The Kafka producer configuration for the DLQ | ||
| #[pyo3(get)] | ||
| pub producer_config: PyKafkaProducerConfig, | ||
| } | ||
|
|
||
| #[pymethods] | ||
| impl PyDlqConfig { | ||
| #[new] | ||
| fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { | ||
| PyDlqConfig { | ||
| topic, | ||
| producer_config, | ||
| } | ||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Python type stub missing new DLQ class and parameter — Medium Severity. The |
||
|
|
||
| /// The class that represent the consumer. | ||
| /// This class is exposed to python and it is the main entry point | ||
| /// used by the Python adapter to build a pipeline and run it. | ||
|
|
@@ -69,19 +96,25 @@ pub struct ArroyoConsumer { | |
|
|
||
| /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. | ||
| write_healthcheck: bool, | ||
|
|
||
| /// DLQ (Dead Letter Queue) configuration. | ||
| /// If provided, invalid messages will be sent to the DLQ topic. | ||
| /// Otherwise, invalid messages will cause the consumer to stop processing. | ||
| dlq_config: Option<PyDlqConfig>, | ||
| } | ||
|
|
||
| #[pymethods] | ||
| impl ArroyoConsumer { | ||
| #[new] | ||
| #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))] | ||
| #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))] | ||
| fn new( | ||
| source: String, | ||
| kafka_config: PyKafkaConsumerConfig, | ||
| topic: String, | ||
| schema: Option<String>, | ||
| metric_config: Option<PyMetricConfig>, | ||
| write_healthcheck: bool, | ||
| dlq_config: Option<PyDlqConfig>, | ||
| ) -> Self { | ||
| ArroyoConsumer { | ||
| consumer_config: kafka_config, | ||
|
|
@@ -93,6 +126,7 @@ impl ArroyoConsumer { | |
| concurrency_config: Arc::new(ConcurrencyConfig::new(1)), | ||
| metric_config, | ||
| write_healthcheck, | ||
| dlq_config, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -120,7 +154,12 @@ impl ArroyoConsumer { | |
| self.write_healthcheck, | ||
| ); | ||
| let config = self.consumer_config.clone().into(); | ||
| let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None); | ||
|
|
||
| // Build DLQ policy if configured | ||
| let dlq_policy = self.build_dlq_policy(); | ||
|
|
||
| let processor = | ||
| StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); | ||
| self.handle = Some(processor.get_handle()); | ||
|
|
||
| let mut handle = processor.get_handle(); | ||
|
|
@@ -143,6 +182,46 @@ impl ArroyoConsumer { | |
| } | ||
| } | ||
|
|
||
| // Internal implementation methods (not exposed to Python) | ||
| impl ArroyoConsumer { | ||
| /// Builds the DLQ policy if dlq_config is provided. | ||
| /// Returns None if DLQ is not configured. | ||
| fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> { | ||
| match &self.dlq_config { | ||
| Some(dlq_config) => { | ||
| tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); | ||
|
|
||
| // Create Kafka producer for DLQ | ||
| let producer_config = dlq_config.producer_config.clone().into(); | ||
| let kafka_producer = KafkaProducer::new(producer_config); | ||
| let dlq_producer = | ||
| KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); | ||
|
|
||
| // Get tokio runtime handle for async DLQ operations | ||
| let handle = self.concurrency_config.handle(); | ||
|
|
||
| // Use default DLQ limits (no limits) and no max buffered messages | ||
| // These can be made configurable in a future PR if needed | ||
| let dlq_limit = DlqLimit::default(); | ||
| let max_buffered_messages = None; | ||
|
|
||
| Some(DlqPolicy::new( | ||
| handle, | ||
| Box::new(dlq_producer), | ||
| dlq_limit, | ||
| max_buffered_messages, | ||
| )) | ||
| } | ||
| None => { | ||
| tracing::info!( | ||
| "DLQ not configured, invalid messages will cause processing to stop" | ||
| ); | ||
| None | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Converts a Message<KafkaPayload> to a Message<RoutedValue>. | ||
| /// | ||
| /// The messages we send around between steps in the pipeline contain | ||
|
|
@@ -429,4 +508,7 @@ mod tests { | |
| let _ = std::fs::remove_file(healthcheck_path); | ||
| }) | ||
| } | ||
|
|
||
| // Note: DLQ functionality is primarily tested through Python integration tests | ||
| // in tests/test_dlq.py, as Rust unit tests require the full Python module to be built. | ||
| } | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The Python adapter in
`rust_arroyo.py` doesn't read the `dlq` configuration or pass it to the Rust `ArroyoConsumer`, rendering the DLQ feature non-functional. Severity: CRITICAL
Suggested Fix
Update
`rust_arroyo.py` to read the `dlq` configuration from the source config. If present, construct the corresponding Rust `DlqConfig` object and pass it as the `dlq_config` argument when initializing the `ArroyoConsumer`. Add integration tests to verify the DLQ functionality. Prompt for AI Agent
Did we get this right? 👍 / 👎 to inform future reviews.