12 changes: 12 additions & 0 deletions sentry_streams/sentry_streams/config_types.py
@@ -9,11 +9,23 @@ class StepConfig(TypedDict):
starts_segment: Optional[bool]


class DlqConfig(TypedDict, total=False):
"""
Dead Letter Queue configuration for a StreamSource.
All fields are optional to allow for default behavior.
"""

enabled: bool
topic: str
producer_config: "KafkaProducerConfig"

Comment on lines +17 to +21
Bug: The Python adapter in rust_arroyo.py doesn't read the dlq configuration or pass it to the Rust ArroyoConsumer, rendering the DLQ feature non-functional.
Severity: CRITICAL

Suggested Fix

Update rust_arroyo.py to read the dlq configuration from the source config. If present, construct the corresponding Rust DlqConfig object and pass it as the dlq_config argument when initializing the ArroyoConsumer. Add integration tests to verify the DLQ functionality.

Location: sentry_streams/sentry_streams/config_types.py#L17-L21

Potential issue: The pull request adds Dead Letter Queue (DLQ) support, with a
`DlqConfig` defined in Python and the Rust consumer expecting a `dlq_config` parameter.
However, the Python adapter in `rust_arroyo.py` that instantiates the `ArroyoConsumer`
never reads the `dlq` key from the configuration dictionary and does not pass it during
initialization. As a result, any user-provided DLQ configuration will be silently
ignored, and the DLQ functionality will not work. Invalid messages will be dropped
instead of being routed to the configured dead-letter topic.

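The suggested fix can be sketched as a small translation step in `rust_arroyo.py`. This is a hedged sketch only: the stand-in `PyDlqConfig` class and the helper name `build_dlq_config` are assumptions based on this diff, not verified library API.

```python
# Sketch of the suggested fix: read the optional `dlq` key from the source
# config and translate it into the object the Rust ArroyoConsumer expects.
# PyDlqConfig here is a stand-in for the Rust-exposed rust_streams.PyDlqConfig.
from typing import Any, Mapping, Optional


class PyDlqConfig:
    """Stand-in mirroring the Rust PyDlqConfig added in this PR."""

    def __init__(self, topic: str, producer_config: Mapping[str, Any]) -> None:
        self.topic = topic
        self.producer_config = producer_config


def build_dlq_config(source_config: Mapping[str, Any]) -> Optional[PyDlqConfig]:
    """Return a PyDlqConfig from the `dlq` key, or None when absent/disabled."""
    dlq = source_config.get("dlq")
    if not dlq or not dlq.get("enabled", True):
        return None
    return PyDlqConfig(
        topic=dlq["topic"],
        producer_config=dlq.get("producer_config", {}),
    )
```

The result would then be passed as the `dlq_config` keyword argument when instantiating `ArroyoConsumer`, closing the gap described above.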


class KafkaConsumerConfig(TypedDict, StepConfig):
bootstrap_servers: Sequence[str]
auto_offset_reset: str
consumer_group: NotRequired[str]
additional_settings: Mapping[str, Any]
dlq: NotRequired[DlqConfig]


class KafkaProducerConfig(TypedDict, StepConfig):
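Since `DlqConfig` is declared with `total=False`, a consumer config may carry any subset of its fields. A minimal sketch of what a config dict with the new optional `dlq` key might look like (topic and broker values are made-up examples, not library defaults):

```python
# Illustrative KafkaConsumerConfig dict using the new optional `dlq` key.
# All DlqConfig fields are optional, so producer_config can be omitted.
consumer_config = {
    "bootstrap_servers": ["localhost:9092"],
    "auto_offset_reset": "earliest",
    "additional_settings": {},
    "dlq": {
        "enabled": True,
        "topic": "ingest-dlq",
        # producer_config omitted: DlqConfig is total=False
    },
}
```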
14 changes: 14 additions & 0 deletions sentry_streams/sentry_streams/pipeline/pipeline.py
@@ -224,6 +224,19 @@ class Source(Step, Generic[TOut]):
"""


@dataclass
class DlqConfig:
"""
Configuration for Dead Letter Queue (DLQ).

When provided, invalid messages will be sent to the DLQ topic
instead of causing the consumer to stop processing.
"""

topic: str
bootstrap_servers: Sequence[str]


@dataclass
class StreamSource(Source[bytes]):
"""
@@ -233,6 +246,7 @@ class StreamSource(Source[bytes]):
stream_name: str
header_filter: Optional[Tuple[str, bytes]] = None
consumer_group: Optional[str] = None
dlq_config: Optional[DlqConfig] = None
step_type: StepType = StepType.SOURCE

def register(self, ctx: Pipeline[bytes], previous: Step) -> None:
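Wiring the new `dlq_config` into a `StreamSource` can be sketched as follows. The dataclasses below mirror the definitions added in `pipeline.py` rather than importing `sentry_streams`, so the snippet is self-contained; the stream and topic names are invented examples.

```python
# Local mirrors of the DlqConfig and StreamSource dataclasses from this diff,
# trimmed to the fields relevant to DLQ wiring.
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class DlqConfig:
    topic: str
    bootstrap_servers: Sequence[str]


@dataclass
class StreamSource:
    stream_name: str
    dlq_config: Optional[DlqConfig] = None


# A source whose invalid messages are routed to a dead-letter topic
# instead of halting the consumer.
source = StreamSource(
    stream_name="ingest-events",
    dlq_config=DlqConfig(
        topic="ingest-events-dlq",
        bootstrap_servers=["localhost:9092"],
    ),
)
```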
88 changes: 85 additions & 3 deletions sentry_streams/src/consumer.rs
@@ -6,7 +6,7 @@
//! The pipeline is built by adding RuntimeOperators to the consumer.

use crate::commit_policy::WatermarkCommitOffsets;
use crate::kafka_config::PyKafkaConsumerConfig;
use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig};
use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload};
use crate::metrics::configure_metrics;
use crate::metrics_config::PyMetricConfig;
@@ -18,7 +18,9 @@ use crate::utils::traced_with_gil;
use crate::watermark::WatermarkEmitter;
use pyo3::prelude::*;
use rdkafka::message::{Header, Headers, OwnedHeaders};
use sentry_arroyo::backends::kafka::producer::KafkaProducer;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer};
use sentry_arroyo::processing::strategies::healthcheck::HealthCheck;
use sentry_arroyo::processing::strategies::noop::Noop;
use sentry_arroyo::processing::strategies::run_task::RunTask;
@@ -34,6 +36,31 @@ use std::sync::Arc;
/// Matches Arroyo docs for Kubernetes liveness probes.
const HEALTHCHECK_PATH: &str = "/tmp/health.txt";

/// Configuration for Dead Letter Queue (DLQ).
/// When provided, invalid messages will be sent to the DLQ topic.
#[pyclass]
#[derive(Debug, Clone)]
pub struct PyDlqConfig {
/// The Kafka topic name to send invalid messages to
#[pyo3(get)]
pub topic: String,

/// The Kafka producer configuration for the DLQ
#[pyo3(get)]
pub producer_config: PyKafkaProducerConfig,
}

#[pymethods]
impl PyDlqConfig {
#[new]
fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self {
PyDlqConfig {
topic,
producer_config,
}
}
}

Python type stub missing new DLQ class and parameter

Medium Severity

The rust_streams.pyi type stub is not updated to reflect the new Rust-side changes. The PyDlqConfig class exposed via m.add_class and the new dlq_config parameter on ArroyoConsumer.__init__ are missing from the stub. This file is the typed interface used by mypy and IDEs for the Rust extension module, and the project has mypy integration tests that rely on it. When PR3 wires the DLQ feature through Python, the missing stub definitions will cause type-checking failures.



/// The class that represent the consumer.
/// This class is exposed to python and it is the main entry point
/// used by the Python adapter to build a pipeline and run it.
@@ -69,19 +96,25 @@ pub struct ArroyoConsumer {

/// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness.
write_healthcheck: bool,

/// DLQ (Dead Letter Queue) configuration.
/// If provided, invalid messages will be sent to the DLQ topic.
/// Otherwise, invalid messages will cause the consumer to stop processing.
dlq_config: Option<PyDlqConfig>,
}

#[pymethods]
impl ArroyoConsumer {
#[new]
#[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))]
#[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))]
fn new(
source: String,
kafka_config: PyKafkaConsumerConfig,
topic: String,
schema: Option<String>,
metric_config: Option<PyMetricConfig>,
write_healthcheck: bool,
dlq_config: Option<PyDlqConfig>,
) -> Self {
ArroyoConsumer {
consumer_config: kafka_config,
@@ -93,6 +126,7 @@ impl ArroyoConsumer {
concurrency_config: Arc::new(ConcurrencyConfig::new(1)),
metric_config,
write_healthcheck,
dlq_config,
}
}

@@ -120,7 +154,12 @@ impl ArroyoConsumer {
self.write_healthcheck,
);
let config = self.consumer_config.clone().into();
let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None);

// Build DLQ policy if configured
let dlq_policy = self.build_dlq_policy();

let processor =
StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy);
self.handle = Some(processor.get_handle());

let mut handle = processor.get_handle();
@@ -143,6 +182,46 @@ impl ArroyoConsumer {
}
}

// Internal implementation methods (not exposed to Python)
impl ArroyoConsumer {
/// Builds the DLQ policy if dlq_config is provided.
/// Returns None if DLQ is not configured.
fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> {
match &self.dlq_config {
Some(dlq_config) => {
tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic);

// Create Kafka producer for DLQ
let producer_config = dlq_config.producer_config.clone().into();
let kafka_producer = KafkaProducer::new(producer_config);
let dlq_producer =
KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic));

// Get tokio runtime handle for async DLQ operations
let handle = self.concurrency_config.handle();

// Use default DLQ limits (no limits) and no max buffered messages
// These can be made configurable in a future PR if needed
let dlq_limit = DlqLimit::default();
let max_buffered_messages = None;

Some(DlqPolicy::new(
handle,
Box::new(dlq_producer),
dlq_limit,
max_buffered_messages,
))
}
None => {
tracing::info!(
"DLQ not configured, invalid messages will cause processing to stop"
);
None
}
}
}
}

/// Converts a Message<KafkaPayload> to a Message<RoutedValue>.
///
/// The messages we send around between steps in the pipeline contain
@@ -429,4 +508,7 @@ mod tests {
let _ = std::fs::remove_file(healthcheck_path);
})
}

// Note: DLQ functionality is primarily tested through Python integration tests
// in tests/test_dlq.py, as Rust unit tests require the full Python module to be built.
}
1 change: 1 addition & 0 deletions sentry_streams/src/lib.rs
@@ -42,6 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<kafka_config::PyKafkaProducerConfig>()?;
m.add_class::<kafka_config::InitialOffset>()?;
m.add_class::<consumer::ArroyoConsumer>()?;
m.add_class::<consumer::PyDlqConfig>()?;
m.add_class::<metrics_config::PyMetricConfig>()?;
m.add_class::<messages::PyAnyMessage>()?;
m.add_class::<messages::RawMessage>()?;