diff --git a/sentry_streams/sentry_streams/adapters/arroyo/dlq.py b/sentry_streams/sentry_streams/adapters/arroyo/dlq.py
new file mode 100644
index 00000000..8ca86c2c
--- /dev/null
+++ b/sentry_streams/sentry_streams/adapters/arroyo/dlq.py
@@ -0,0 +1,300 @@
+"""
+Dead Letter Queue (DLQ) implementation for the Rust Arroyo adapter.
+
+This module provides DLQ functionality that bypasses Arroyo's built-in DLQ system
+to avoid memory buffering of raw messages. Instead, it produces only metadata
+(topic, partition, offset, error info) to a separate Kafka topic.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import time
+from dataclasses import dataclass
+from typing import Any, Callable, Mapping, Sequence
+
+from confluent_kafka import Producer
+
+from sentry_streams.config_types import DlqPipelineConfig, DlqStepConfig
+from sentry_streams.pipeline.exception import DlqHandledError
+from sentry_streams.pipeline.message import Message
+
+logger = logging.getLogger(__name__)
+
+
+def _log_delivery_failure(err: Any, msg: Any) -> None:
+    """
+    Delivery report callback that logs DLQ records Kafka failed to deliver.
+
+    ``err`` is ``None`` when delivery succeeded; otherwise it describes why the
+    record could not be delivered. ``msg`` is the record the report refers to.
+    """
+    if err is not None:
+        logger.error("Failed to deliver DLQ message to topic %s: %s", msg.topic(), err)
+
+
+@dataclass
+class DlqMetadata:
+    """Metadata about a failed message that gets sent to the DLQ topic."""
+
+    original_topic: str
+    original_partition: int | None
+    original_offset: int | None
+    original_key: str | None
+    step_name: str
+    consumer_group: str
+    error: str
+    error_type: str
+    timestamp: float
+
+    def to_json(self) -> bytes:
+        """Serialize the metadata to JSON bytes."""
+        return json.dumps(
+            {
+                "original_topic": self.original_topic,
+                "original_partition": self.original_partition,
+                "original_offset": self.original_offset,
+                "original_key": self.original_key,
+                "step_name": self.step_name,
+                "consumer_group": self.consumer_group,
+                "error": self.error,
+                "error_type": self.error_type,
+                "timestamp": self.timestamp,
+            }
+        ).encode("utf-8")
+
+
+class DlqProducer:
+    """
+    A wrapper around a Kafka producer specifically for DLQ messages.
+
+    This producer is used to send metadata about failed messages to a DLQ topic.
+    It does NOT buffer or store the original message content, only metadata.
+    """
+
+    def __init__(
+        self,
+        bootstrap_servers: Sequence[str],
+        default_topic: str,
+    ) -> None:
+        self._producer = Producer(
+            {
+                "bootstrap.servers": ",".join(bootstrap_servers),
+                "enable.idempotence": "false",  # DLQ doesn't need exactly-once
+            }
+        )
+        self._default_topic = default_topic
+
+    def produce(self, metadata: DlqMetadata, topic: str | None = None) -> bool:
+        """
+        Produce a DLQ metadata message to the specified topic.
+
+        Production is best-effort: failures are logged and reported through the
+        return value, never raised, so the DLQ cannot block processing.
+
+        Args:
+            metadata: The metadata about the failed message
+            topic: Optional topic override; uses default if not specified
+
+        Returns:
+            True if the message was handed to the Kafka client, False if it
+            could not be enqueued. Delivery itself is asynchronous; later
+            delivery failures are logged by the delivery report callback.
+        """
+        target_topic = topic or self._default_topic
+        try:
+            self._producer.produce(
+                topic=target_topic,
+                value=metadata.to_json(),
+                on_delivery=_log_delivery_failure,
+            )
+            # Serve delivery reports for earlier messages without blocking
+            self._producer.poll(0)
+        except Exception:
+            # Log (with traceback) but don't fail - DLQ should not block processing
+            logger.exception("Failed to produce to DLQ topic %s", target_topic)
+            return False
+        return True
+
+    def flush(self, timeout: float = 10.0) -> int:
+        """
+        Flush pending DLQ messages.
+
+        Returns the number of messages still in queue (0 if all flushed).
+        """
+        return self._producer.flush(timeout)
+
+
+class DlqStepWrapper:
+    """
+    Wraps a step function with DLQ error handling.
+
+    When an exception occurs during step execution:
+    1. If DLQ is enabled for this step, produce metadata to the DLQ topic
+    2. Raise DlqHandledError to signal the Rust layer to skip the message
+    3. Processing continues normally with the next message
+
+    If DLQ is not enabled, exceptions propagate normally (crash/retry behavior).
+    """
+
+    def __init__(
+        self,
+        step_name: str,
+        func: Callable[[Message[Any]], Any],
+        dlq_config: DlqStepConfig | None,
+        dlq_producer: DlqProducer | None,
+        consumer_group: str,
+        source_topic: str,
+    ) -> None:
+        self._step_name = step_name
+        self._func = func
+        self._dlq_config = dlq_config
+        self._dlq_producer = dlq_producer
+        self._consumer_group = consumer_group
+        self._source_topic = source_topic
+
+        # Check if DLQ is actually enabled (coerced so the flag is always a bool)
+        self._dlq_enabled = bool(
+            dlq_config is not None and dlq_config.get("enabled", False) and dlq_producer is not None
+        )
+
+    def __call__(self, msg: Message[Any]) -> Any:
+        """
+        Execute the wrapped function with DLQ error handling.
+
+        Returns whatever the wrapped function returns. If DLQ is enabled and the
+        function raises, metadata is produced to the DLQ and DlqHandledError is
+        raised (chained to the original error) so the message is skipped.
+        """
+        if not self._dlq_enabled:
+            # No DLQ configured - let exceptions propagate normally
+            return self._func(msg)
+
+        try:
+            return self._func(msg)
+        except Exception as e:
+            # Log the original error with full traceback before sending to DLQ
+            logger.exception(
+                "Error in step %s, sending to DLQ: partition=%s, offset=%s",
+                self._step_name,
+                msg.partition,
+                msg.offset,
+            )
+            self._produce_to_dlq(msg, e)
+            raise DlqHandledError() from e
+
+    def _produce_to_dlq(self, msg: Message[Any], error: Exception) -> None:
+        """
+        Produce metadata about the failed message to the DLQ topic.
+        """
+        assert self._dlq_producer is not None
+
+        # Extract key from headers if available
+        original_key: str | None = None
+        for header_name, header_value in msg.headers:
+            if header_name.lower() == "key":
+                try:
+                    original_key = header_value.decode("utf-8")
+                except (UnicodeDecodeError, AttributeError):
+                    original_key = str(header_value)
+                break
+
+        metadata = DlqMetadata(
+            original_topic=self._source_topic,
+            original_partition=msg.partition,
+            original_offset=msg.offset,
+            original_key=original_key,
+            step_name=self._step_name,
+            consumer_group=self._consumer_group,
+            error=str(error),
+            error_type=type(error).__name__,
+            timestamp=time.time(),
+        )
+
+        # Use step-specific topic override if configured
+        topic_override = None
+        if self._dlq_config is not None:
+            topic_override = self._dlq_config.get("topic")
+
+        self._dlq_producer.produce(metadata, topic_override)
+
+
+def create_dlq_producer(
+    dlq_config: DlqPipelineConfig | None,
+    fallback_bootstrap_servers: Sequence[str] | None = None,
+) -> DlqProducer | None:
+    """
+    Create a DLQ producer from pipeline configuration.
+
+    Returns None if DLQ is not configured at the pipeline level.
+    """
+    if dlq_config is None:
+        return None
+
+    topic = dlq_config.get("topic")
+    if not topic:
+        logger.warning("DLQ config provided but no topic specified")
+        return None
+
+    # Use DLQ-specific bootstrap servers, or fall back to source config
+    bootstrap_servers = dlq_config.get("bootstrap_servers")
+    if not bootstrap_servers:
+        if fallback_bootstrap_servers:
+            bootstrap_servers = fallback_bootstrap_servers
+        else:
+            logger.warning(
+                "DLQ config provided but no bootstrap_servers specified and no fallback available"
+            )
+            return None
+
+    return DlqProducer(
+        bootstrap_servers=bootstrap_servers,
+        default_topic=topic,
+    )
+
+
+def wrap_step_with_dlq(
+    step_name: str,
+    func: Callable[[Message[Any]], Any],
+    step_config: Mapping[str, Any] | None,
+    dlq_producer: DlqProducer | None,
+    consumer_group: str,
+    source_topic: str,
+) -> Callable[[Message[Any]], Any]:
+    """
+    Wrap a step function with DLQ error handling if configured.
+
+    Args:
+        step_name: Name of the step (for logging/metadata)
+        func: The step function to wrap
+        step_config: The step's configuration dict (may contain 'dlq' key)
+        dlq_producer: The DLQ producer (or None if not configured)
+        consumer_group: The Kafka consumer group ID
+        source_topic: The source Kafka topic name
+
+    Returns:
+        The wrapped function if DLQ is enabled, otherwise the original function.
+    """
+    dlq_step_config: DlqStepConfig | None = None
+    if step_config is not None:
+        dlq_step_config = step_config.get("dlq")  # type: ignore[assignment]
+
+    # If no DLQ config or not enabled, return the original function
+    if dlq_step_config is None or not dlq_step_config.get("enabled", False):
+        return func
+
+    if dlq_producer is None:
+        logger.warning(
+            f"Step {step_name} has DLQ enabled but no DLQ producer is configured at pipeline level"
+        )
+        return func
+
+    return DlqStepWrapper(
+        step_name=step_name,
+        func=func,
+        dlq_config=dlq_step_config,
+        dlq_producer=dlq_producer,
+        consumer_group=consumer_group,
+        source_topic=source_topic,
+    )
diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 76e52a0b..3f928d5a 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -19,6 +19,11 @@ MultiprocessingPool, ) +from sentry_streams.adapters.arroyo.dlq import ( + DlqProducer, + create_dlq_producer, + wrap_step_with_dlq, +) from sentry_streams.adapters.arroyo.multi_process_delegate import ( MultiprocessDelegateFactory, ) @@ -29,6 +34,7 @@ from sentry_streams.adapters.arroyo.steps_chain import TransformChains from sentry_streams.adapters.stream_adapter import PipelineConfig, StreamAdapter from sentry_streams.config_types import ( + DlqPipelineConfig, KafkaConsumerConfig, KafkaProducerConfig, MultiProcessConfig, @@ -199,12 +205,16 @@ def __init__( self, steps_config: Mapping[str, StepConfig], metric_config: PyMetricConfig | None = None, + dlq_producer: DlqProducer | None = None, ) -> None: super().__init__() self.steps_config = steps_config self.__metric_config = metric_config + self.__dlq_producer = dlq_producer self.__consumers: MutableMapping[str, ArroyoConsumer] = {} self.__chains = TransformChains() + # Maps source name to (topic, consumer_group) + self.__source_info: MutableMapping[str, 
tuple[str, str]] = {} @classmethod def build( @@ -214,13 +224,46 @@ def build( ) -> Self: steps_config = config["steps_config"] - return cls(steps_config, metric_config) + # Extract DLQ config and create producer + dlq_config: DlqPipelineConfig | None = config.get("dlq") + dlq_producer: DlqProducer | None = None + + if dlq_config: + # Try to find fallback bootstrap servers from any source config + fallback_servers: Sequence[str] | None = None + for step_cfg in steps_config.values(): + if "bootstrap_servers" in step_cfg: + fallback_servers = step_cfg["bootstrap_servers"] # type: ignore[assignment] + break + + dlq_producer = create_dlq_producer(dlq_config, fallback_servers) + + return cls(steps_config, metric_config, dlq_producer) def __close_chain(self, stream: Route) -> None: if self.__chains.exists(stream): logger.info(f"Closing transformation chain: {stream} and adding to pipeline") self.__consumers[stream.source].add_step(finalize_chain(self.__chains, stream)) + def __wrap_with_dlq( + self, + step_name: str, + func: Callable[[Message[Any]], Any], + stream: Route, + ) -> Callable[[Message[Any]], Any]: + """Wrap a step function with DLQ error handling if configured.""" + step_config: Mapping[str, Any] = self.steps_config.get(step_name, {}) + source_topic, consumer_group = self.__source_info.get(stream.source, ("", "")) + + return wrap_step_with_dlq( + step_name=step_name, + func=func, + step_config=step_config, + dlq_producer=self.__dlq_producer, + consumer_group=consumer_group, + source_topic=source_topic, + ) + def get_consumer(self, source: str) -> ArroyoConsumer: return self.__consumers[source] @@ -242,6 +285,9 @@ def source(self, step: Source[Any]) -> Route: source_config = self.steps_config.get(source_name) assert source_config is not None, f"Config not provided for source {source_name}" + consumer_group = f"pipeline-{source_name}" + self.__source_info[source_name] = (step.stream_name, consumer_group) + self.__consumers[source_name] = ArroyoConsumer( 
source=source_name, kafka_config=build_kafka_consumer_config(source_name, source_config), @@ -327,9 +373,9 @@ def map(self, step: Map[Any, Any], stream: Route) -> Route: application_function = step.resolved_function - wrapped_function = functools.partial( - _metrics_wrapped_function, step.name, application_function - ) + # Wrap with DLQ handling first, then metrics + dlq_wrapped = self.__wrap_with_dlq(step.name, application_function, stream) + wrapped_function = functools.partial(_metrics_wrapped_function, step.name, dlq_wrapped) step = replace(step, function=wrapped_function) @@ -381,12 +427,15 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: step.override_config(step_config) step.validate() + # Wrap with DLQ handling + dlq_wrapped = self.__wrap_with_dlq(step.name, step.resolved_function, stream) + def filter_msg(msg: Message[Any]) -> bool: msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None start_time = input_metrics(step.name, msg_size) has_error = output_size = None try: - result = step.resolved_function(msg) + result = dlq_wrapped(msg) output_size = get_size(result) return result except Exception as e: @@ -466,12 +515,15 @@ def router( route = RustRoute(stream.source, stream.waypoints) + # Wrap the routing function with DLQ handling + dlq_wrapped_routing = self.__wrap_with_dlq(step.name, step.routing_function, stream) + def routing_function(msg: Message[Any]) -> str: msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None start_time = input_metrics(step.name, msg_size) has_error = None try: - waypoint = step.routing_function(msg) + waypoint = dlq_wrapped_routing(msg) branch = step.routing_table[waypoint] return branch.root.name except Exception as e: diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index b5486543..c735e6a7 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -21,6 +21,9 @@ "title": "pipeline", 
"additionalProperties": true, "properties": { + "dlq": { + "$ref": "#/definitions/DlqPipelineConfig" + }, "segments": { "type": "array", "items": { @@ -38,6 +41,40 @@ "segments" ] }, + "DlqPipelineConfig": { + "type": "object", + "title": "DLQ Pipeline Configuration", + "description": "Pipeline-level DLQ configuration. Provides defaults for all steps.", + "properties": { + "topic": { + "type": "string", + "description": "Default DLQ topic for all opted-in steps" + }, + "bootstrap_servers": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Kafka bootstrap servers for DLQ producer" + } + }, + "required": ["topic"] + }, + "DlqStepConfig": { + "type": "object", + "title": "DLQ Step Configuration", + "description": "Per-step DLQ configuration. Steps must explicitly opt-in.", + "properties": { + "enabled": { + "type": "boolean", + "description": "Enable DLQ for this step" + }, + "topic": { + "type": "string", + "description": "Override the pipeline default DLQ topic" + } + } + }, "Env": { "type": "object", "additionalProperties": true diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index a9abc55a..25b08f7a 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -34,3 +34,22 @@ class MultiProcessConfig(TypedDict): output_block_size: int | None max_input_block_size: int | None max_output_block_size: int | None + + +class DlqStepConfig(TypedDict, total=False): + """ + Per-step DLQ configuration. Steps must explicitly opt-in + with enabled=True to have their errors sent to the DLQ. + """ + + enabled: bool # Required to be True for DLQ to be active + topic: str # Optional: override the pipeline default DLQ topic + + +class DlqPipelineConfig(TypedDict, total=False): + """ + Pipeline-level DLQ configuration. Provides defaults for all steps. 
+ """ + + topic: str # Default DLQ topic for all opted-in steps + bootstrap_servers: Sequence[str] # Optional: defaults to source config diff --git a/sentry_streams/sentry_streams/pipeline/exception.py b/sentry_streams/sentry_streams/pipeline/exception.py index 8838f4a7..5c868983 100644 --- a/sentry_streams/sentry_streams/pipeline/exception.py +++ b/sentry_streams/sentry_streams/pipeline/exception.py @@ -8,3 +8,16 @@ class InvalidMessageError(Exception): """ pass + + +class DlqHandledError(Exception): + """ + Exception raised after a message has been successfully sent to the DLQ. + + This exception signals to the Rust runtime that the message has been + handled (sent to DLQ) and should be skipped rather than retried or + causing a crash. The Rust layer will treat this like a filter returning + False - the message is dropped but processing continues normally. + """ + + pass diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py index 81a15d8a..730a374a 100644 --- a/sentry_streams/sentry_streams/pipeline/message.py +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -62,6 +62,18 @@ def timestamp(self) -> float: def schema(self) -> str | None: raise NotImplementedError + @property + @abstractmethod + def partition(self) -> int | None: + """The original Kafka partition this message came from (if available).""" + raise NotImplementedError + + @property + @abstractmethod + def offset(self) -> int | None: + """The original Kafka offset this message came from (if available).""" + raise NotImplementedError + @abstractmethod def deepcopy(self) -> Message[TPayload]: raise NotImplementedError @@ -82,6 +94,8 @@ def __eq__(self, other: object) -> bool: and self.headers == other.headers and self.timestamp == other.timestamp and self.schema == other.schema + and self.partition == other.partition + and self.offset == other.offset ) @@ -123,6 +137,16 @@ def timestamp(self) -> float: def schema(self) -> str | None: 
return self.inner.schema + @property + def partition(self) -> int | None: + """PyAnyMessage does not have partition info.""" + return None + + @property + def offset(self) -> int | None: + """PyAnyMessage does not have offset info.""" + return None + def size(self) -> int | None: if isinstance(self.inner.payload, (str, bytes)): return len(self.inner.payload) @@ -173,8 +197,10 @@ def __init__( headers: Sequence[Tuple[str, bytes]], timestamp: float, schema: Optional[str] = None, + partition: Optional[int] = None, + offset: Optional[int] = None, ) -> None: - self.inner = RawMessage(payload, headers, timestamp, schema) + self.inner = RawMessage(payload, headers, timestamp, schema, partition, offset) @property def payload(self) -> bytes: @@ -192,6 +218,16 @@ def timestamp(self) -> float: def schema(self) -> str | None: return self.inner.schema + @property + def partition(self) -> int | None: + """The original Kafka partition this message came from (if available).""" + return self.inner.partition + + @property + def offset(self) -> int | None: + """The original Kafka offset this message came from (if available).""" + return self.inner.offset + def size(self) -> int | None: return len(self.inner.payload) @@ -210,6 +246,8 @@ def deepcopy(self) -> PyRawMessage: deepcopy(self.inner.headers), self.inner.timestamp, self.inner.schema, + self.inner.partition, + self.inner.offset, ) def __getstate__(self) -> Mapping[str, Any]: @@ -218,6 +256,8 @@ def __getstate__(self) -> Mapping[str, Any]: "headers": self.headers, "timestamp": self.timestamp, "schema": self.schema, + "partition": self.partition, + "offset": self.offset, } def __setstate__(self, state: Mapping[str, Any]) -> None: @@ -226,6 +266,8 @@ def __setstate__(self, state: Mapping[str, Any]) -> None: state["headers"], state["timestamp"], state.get("schema"), + state.get("partition"), + state.get("offset"), ) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 
4b039a68..04c51bde 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -124,6 +124,8 @@ class RawMessage: headers: Sequence[Tuple[str, bytes]], timestamp: float, schema: str | None, + partition: int | None = None, + offset: int | None = None, ) -> None: ... @property def payload(self) -> bytes: ... @@ -133,6 +135,10 @@ class RawMessage: def timestamp(self) -> float: ... @property def schema(self) -> str | None: ... + @property + def partition(self) -> int | None: ... + @property + def offset(self) -> int | None: ... class PyWatermark: def __init__( diff --git a/sentry_streams/src/callers.rs b/sentry_streams/src/callers.rs index 867b4305..ee248cc2 100644 --- a/sentry_streams/src/callers.rs +++ b/sentry_streams/src/callers.rs @@ -1,6 +1,7 @@ use pyo3::{import_exception, prelude::*, types::PyTuple}; import_exception!(sentry_streams.pipeline.exception, InvalidMessageError); +import_exception!(sentry_streams.pipeline.exception, DlqHandledError); pub type ApplyResult = Result; @@ -8,6 +9,10 @@ pub type ApplyResult = Result; pub enum ApplyError { InvalidMessage, ApplyFailed, + /// The message was handled (e.g., sent to DLQ) and should be skipped. + /// This is similar to a filter returning false - the message is dropped + /// but processing continues normally. 
+ Skipped, } pub fn try_apply_py<'py, N>( @@ -22,6 +27,8 @@ where py_err.print(py); if py_err.is_instance(py, &py.get_type::()) { ApplyError::InvalidMessage + } else if py_err.is_instance(py, &py.get_type::()) { + ApplyError::Skipped } else { ApplyError::ApplyFailed } @@ -61,6 +68,25 @@ mod tests { }); } + #[test] + fn test_apply_py_dlq_handled_err() { + crate::testutils::initialize_python(); + + import_py_dep("sentry_streams.pipeline.exception", "DlqHandledError"); + + traced_with_gil!(|py| { + let callable = make_lambda( + py, + c_str!("lambda: (_ for _ in ()).throw(DlqHandledError())"), + ); + + assert!(matches!( + try_apply_py(py, &callable, ()), + Err(ApplyError::Skipped) + )); + }); + } + #[test] fn test_apply_py_throws_other_exception() { crate::testutils::initialize_python(); diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index a8e30a5a..5ab7023d 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -26,7 +26,7 @@ use sentry_arroyo::processing::strategies::ProcessingStrategy; use sentry_arroyo::processing::strategies::ProcessingStrategyFactory; use sentry_arroyo::processing::ProcessorHandle; use sentry_arroyo::processing::StreamProcessor; -use sentry_arroyo::types::{Message, Topic}; +use sentry_arroyo::types::{InnerMessage, Message, Topic}; use std::sync::Arc; /// The class that represent the consumer. 
@@ -175,11 +175,23 @@ fn to_routed_value( Some(ts) => ts.timestamp_millis() as f64 / 1_000_000_000.0, None => 0.0, // Default to 0 if no timestamp is available }; + + // Extract partition and offset from the message for DLQ metadata + let (partition, offset) = match &message.inner_message { + InnerMessage::BrokerMessage(broker_msg) => ( + Some(broker_msg.partition.index as i32), + Some(broker_msg.offset), + ), + InnerMessage::AnyMessage(_) => (None, None), + }; + let raw_message = RawMessage { payload: raw_payload.to_vec(), headers: transformed_headers, timestamp, schema: schema.clone(), + partition, + offset, }; let py_msg = traced_with_gil!(|py| PyStreamingMessage::RawMessage { content: into_pyraw(py, raw_message).unwrap(), diff --git a/sentry_streams/src/filter_step.rs b/sentry_streams/src/filter_step.rs index 846e7572..e5f8da40 100644 --- a/sentry_streams/src/filter_step.rs +++ b/sentry_streams/src/filter_step.rs @@ -64,7 +64,9 @@ impl ProcessingStrategy for Filter { match (res, &message.inner_message) { (Ok(true), _) => self.next_step.submit(message), (Ok(false), _) => Ok(()), - (Err(ApplyError::ApplyFailed), _) => panic!("Python filter function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError"), + // DLQ handled - skip the message and continue processing + (Err(ApplyError::Skipped), _) => Ok(()), + (Err(ApplyError::ApplyFailed), _) => panic!("Python filter function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError or DlqHandledError"), (Err(ApplyError::InvalidMessage), InnerMessage::AnyMessage(..)) => panic!("Got exception while processing AnyMessage, Arroyo cannot handle error on AnyMessage"), (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => Err(SubmitError::InvalidMessage(broker_message.into())), } diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index e3a6492a..bc6b787a 100644 --- a/sentry_streams/src/messages.rs +++ 
b/sentry_streams/src/messages.rs @@ -258,16 +258,29 @@ pub struct RawMessage { #[pyo3(get, set)] pub schema: Option, + + /// The original Kafka partition this message came from (if available). + /// Used for DLQ metadata. + #[pyo3(get)] + pub partition: Option, + + /// The original Kafka offset this message came from (if available). + /// Used for DLQ metadata. + #[pyo3(get)] + pub offset: Option, } #[pymethods] impl RawMessage { #[new] + #[pyo3(signature = (payload, headers, timestamp, schema, partition=None, offset=None))] pub fn new( payload: Py, headers: Py, timestamp: f64, schema: Option, + partition: Option, + offset: Option, py: Python, ) -> PyResult { Ok(Self { @@ -275,6 +288,8 @@ impl RawMessage { headers: headers_to_vec(py, headers)?, timestamp, schema, + partition, + offset, }) } @@ -294,13 +309,15 @@ impl RawMessage { headers: self.headers.clone(), timestamp: self.timestamp, schema: self.schema.clone(), + partition: self.partition, + offset: self.offset, } } fn __repr__(&self) -> PyResult { Ok(format!( - "RawMessage(payload={:?}, headers={:?}, timestamp={}, schema={:?})", - self.payload, self.headers, self.timestamp, self.schema + "RawMessage(payload={:?}, headers={:?}, timestamp={}, schema={:?}, partition={:?}, offset={:?})", + self.payload, self.headers, self.timestamp, self.schema, self.partition, self.offset )) } @@ -318,6 +335,8 @@ pub fn replace_raw_payload(message: RawMessage, new_payload: Vec) -> RawMess headers: message.headers, timestamp: message.timestamp, schema: message.schema, + partition: message.partition, + offset: message.offset, } } @@ -639,6 +658,8 @@ mod tests { py_headers.clone_ref(py), timestamp, schema.clone(), + Some(5), // partition + Some(100), // offset py, ) .unwrap(); @@ -677,8 +698,8 @@ mod tests { let repr = pymsg.call_method0(py, "__repr__").unwrap(); let expected_repr = format!( - "RawMessage(payload={:?}, headers={:?}, timestamp={}, schema={:?})", - payload_bytes, headers, timestamp, schema + "RawMessage(payload={:?}, 
headers={:?}, timestamp={}, schema={:?}, partition={:?}, offset={:?})", + payload_bytes, headers, timestamp, schema, Some(5), Some(100u64) ); assert_eq!(repr.extract::(py).unwrap(), expected_repr); }); @@ -730,9 +751,16 @@ mod tests { let py_headers = headers_to_sequence(py, &headers).unwrap(); let payload_bytes = vec![100, 101, 102, 103]; let py_payload = PyBytes::new(py, &payload_bytes); - let raw_msg = - RawMessage::new(py_payload.unbind(), py_headers.clone_ref(py), 0., None, py) - .unwrap(); + let raw_msg = RawMessage::new( + py_payload.unbind(), + py_headers.clone_ref(py), + 0., + None, + None, + None, + py, + ) + .unwrap(); let py_raw_msg = raw_msg.into_pyobject(py).unwrap().unbind(); let msg = PyStreamingMessage::RawMessage { content: py_raw_msg, diff --git a/sentry_streams/src/routers.rs b/sentry_streams/src/routers.rs index 54de5552..98f71eb6 100644 --- a/sentry_streams/src/routers.rs +++ b/sentry_streams/src/routers.rs @@ -3,43 +3,92 @@ use crate::messages::RoutedValuePayload; use crate::routes::{Route, RoutedValue}; use crate::utils::traced_with_gil; use pyo3::prelude::*; -use sentry_arroyo::processing::strategies::run_task::RunTask; -use sentry_arroyo::processing::strategies::{ProcessingStrategy, SubmitError}; +use sentry_arroyo::processing::strategies::{ + CommitRequest, ProcessingStrategy, StrategyError, SubmitError, +}; use sentry_arroyo::types::{InnerMessage, Message}; +use std::time::Duration; -#[allow(clippy::result_large_err)] -fn route_message( - route: &Route, - callable: &Py, - message: Message, -) -> Result, SubmitError> { - if message.payload().route != *route { - return Ok(message); +pub struct RouterStep { + pub callable: Py, + pub next_step: Box>, + pub route: Route, +} + +impl RouterStep { + /// A strategy that routes a message to a single route downstream. + /// The route is picked by a Python function passed as PyAny. The python function + /// is expected to return a string that represents the waypoint to add to the + /// route. 
+ /// The strategy also handles messages arriving on different routes; + /// it simply forwards them as-is to the next step. + pub fn new( + callable: Py, + next_step: Box>, + route: Route, + ) -> Self { + Self { + callable, + next_step, + route, + } + } +} + +impl ProcessingStrategy for RouterStep { + fn poll(&mut self) -> Result, StrategyError> { + self.next_step.poll() } - let RoutedValuePayload::PyStreamingMessage(ref py_streaming_msg) = message.payload().payload - else { - // TODO: a future PR will remove this gate on WatermarkMessage and duplicate it for each downstream route. - return Ok(message); - }; + fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + if message.payload().route != self.route { + return self.next_step.submit(message); + } - let res = traced_with_gil!(|py| { - try_apply_py(py, callable, (Into::>::into(py_streaming_msg),)).and_then( - |py_res| { + let RoutedValuePayload::PyStreamingMessage(ref py_streaming_msg) = + message.payload().payload + else { + // TODO: a future PR will remove this gate on WatermarkMessage and duplicate it for each downstream route. 
+ return self.next_step.submit(message); + }; + + let res = traced_with_gil!(|py| { + try_apply_py( + py, + &self.callable, + (Into::>::into(py_streaming_msg),), + ) + .and_then(|py_res| { py_res .extract::(py) .map_err(|_| ApplyError::ApplyFailed) - }, - ) - }); - - match (res, &message.inner_message) { - (Ok(new_waypoint), _) => { - message.try_map(|payload| Ok(payload.add_waypoint(new_waypoint.clone()))) - }, - (Err(ApplyError::ApplyFailed), _) => panic!("Python route function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError"), - (Err(ApplyError::InvalidMessage), InnerMessage::AnyMessage(..)) => panic!("Got exception while processing AnyMessage, Arroyo cannot handle error on AnyMessage"), - (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => Err(SubmitError::InvalidMessage(broker_message.into())) + }) + }); + + match (res, &message.inner_message) { + (Ok(new_waypoint), _) => { + let new_message = message.try_map(|payload| { + Ok::>( + payload.add_waypoint(new_waypoint.clone()), + ) + })?; + self.next_step.submit(new_message) + } + // DLQ handled - skip the message and continue processing + (Err(ApplyError::Skipped), _) => Ok(()), + (Err(ApplyError::ApplyFailed), _) => panic!("Python route function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError or DlqHandledError"), + (Err(ApplyError::InvalidMessage), InnerMessage::AnyMessage(..)) => panic!("Got exception while processing AnyMessage, Arroyo cannot handle error on AnyMessage"), + (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => Err(SubmitError::InvalidMessage(broker_message.into())), + } + } + + fn terminate(&mut self) { + self.next_step.terminate() + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + self.next_step.join(timeout)?; + Ok(None) } } @@ -52,11 +101,7 @@ pub fn build_router( callable: Py, next: Box>, ) -> Box> { - let copied_route = route.clone(); - 
let mapper = - move |message: Message| route_message(&copied_route, &callable, message); - - Box::new(RunTask::new(mapper, next)) + Box::new(RouterStep::new(callable, next, route.clone())) } #[cfg(test)] @@ -147,13 +192,15 @@ mod tests { } #[test] - #[should_panic( - expected = "Python route function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError" - )] fn test_router_handles_invalid_msg_exception() { crate::testutils::initialize_python(); - let mut router = create_simple_router(c_str!("lambda x: {}[0]"), Noop {}); + import_py_dep("sentry_streams.pipeline.exception", "InvalidMessageError"); + + let mut router = create_simple_router( + c_str!("lambda x: (_ for _ in ()).throw(InvalidMessageError())"), + Noop {}, + ); traced_with_gil!(|py| { let message = Message::new_broker_message( @@ -180,10 +227,25 @@ mod tests { #[test] fn test_route_msg() { + use crate::fake_strategy::assert_messages_match; + use crate::fake_strategy::FakeStrategy; + use std::sync::{Arc, Mutex}; + crate::testutils::initialize_python(); traced_with_gil!(|py| { let callable = make_lambda(py, c_str!("lambda x: 'waypoint2'")); + let submitted_messages = Arc::new(Mutex::new(Vec::new())); + let submitted_messages_clone = submitted_messages.clone(); + let submitted_watermarks = Arc::new(Mutex::new(Vec::new())); + let next_step = FakeStrategy::new(submitted_messages, submitted_watermarks, false); + + let mut router = build_router( + &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), + callable, + Box::new(next_step), + ); + // Message on matching route - should be routed with waypoint added let message = Message::new_any_message( build_routed_value( py, @@ -193,36 +255,32 @@ mod tests { ), BTreeMap::new(), ); + let result = router.submit(message); + assert!(result.is_ok()); - let routed = route_message( - &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), - &callable, - message, + // Message on different route - should pass through 
unchanged + let message2 = Message::new_any_message( + build_routed_value( + py, + "test_message2".into_py_any(py).unwrap(), + "source3", + vec!["waypoint1".to_string()], + ), + BTreeMap::new(), ); + let result2 = router.submit(message2); + assert!(result2.is_ok()); - let routed = routed.unwrap(); - - assert_eq!( - routed.payload().route, - Route::new( - "source1".to_string(), - vec!["waypoint1".to_string(), "waypoint2".to_string()] - ) - ); + // Check that both messages were submitted + let messages = submitted_messages_clone.lock().unwrap(); + assert_eq!(messages.len(), 2); - let through = route_message( - &Route::new("source3".to_string(), vec!["waypoint1".to_string()]), - &callable, - routed, - ); - let through = through.unwrap(); - assert_eq!( - through.payload().route, - Route::new( - "source1".to_string(), - vec!["waypoint1".to_string(), "waypoint2".to_string()] - ) - ); + // Verify the payloads are correct (FakeStrategy stores payloads, not routes) + let expected_messages = vec![ + "test_message".into_py_any(py).unwrap(), + "test_message2".into_py_any(py).unwrap(), + ]; + assert_messages_match(py, expected_messages, &messages); }); } } diff --git a/sentry_streams/src/testutils.rs b/sentry_streams/src/testutils.rs index e3aadc4e..e8682a07 100644 --- a/sentry_streams/src/testutils.rs +++ b/sentry_streams/src/testutils.rs @@ -89,6 +89,8 @@ pub fn build_raw_routed_value( headers: vec![], timestamp: 0.0, schema: None, + partition: None, + offset: None, }, ) .unwrap(), diff --git a/sentry_streams/src/transformer.rs b/sentry_streams/src/transformer.rs index 4e500e17..691f751f 100644 --- a/sentry_streams/src/transformer.rs +++ b/sentry_streams/src/transformer.rs @@ -4,50 +4,101 @@ use crate::messages::RoutedValuePayload; use crate::routes::{Route, RoutedValue}; use crate::utils::traced_with_gil; use pyo3::prelude::*; -use sentry_arroyo::processing::strategies::run_task::RunTask; -use sentry_arroyo::processing::strategies::{ProcessingStrategy, SubmitError}; +use 
sentry_arroyo::processing::strategies::{ + CommitRequest, ProcessingStrategy, StrategyError, SubmitError, +}; use sentry_arroyo::types::{InnerMessage, Message}; +use std::time::Duration; -/// Creates an Arroyo transformer strategy that uses a Python callable to -/// transform messages. The callable is expected to take a Message -/// as input and return a transformed message. The strategy is built on top of -/// the `RunTask` Arroyo strategy. -/// -/// This function takes a `next` step to wire the Arroyo strategy to. -pub fn build_map( - route: &Route, - callable: Py, - next: Box>, -) -> Box> { - let copied_route = route.clone(); - let mapper = move |message: Message| { - if message.payload().route != copied_route { - return Ok(message); +pub struct MapStep { + pub callable: Py, + pub next_step: Box>, + pub route: Route, +} + +impl MapStep { + /// A strategy that takes a callable and applies it to messages to transform them. + /// The callable is expected to take a Message as input and return + /// a transformed message. + /// The strategy also handles messages arriving on different routes; + /// it simply forwards them as-is to the next step. 
+ pub fn new( + callable: Py, + next_step: Box>, + route: Route, + ) -> Self { + Self { + callable, + next_step, + route, + } + } +} + +impl ProcessingStrategy for MapStep { + fn poll(&mut self) -> Result, StrategyError> { + self.next_step.poll() + } + + fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + // Messages on different routes or watermark messages are forwarded as-is + if self.route != message.payload().route || message.payload().payload.is_watermark_msg() { + return self.next_step.submit(message); } let RoutedValuePayload::PyStreamingMessage(ref py_streaming_msg) = message.payload().payload else { - return Ok(message); + return self.next_step.submit(message); }; let route = message.payload().route.clone(); let res = traced_with_gil!(|py| { - try_apply_py(py, &callable, (Into::>::into(py_streaming_msg),)) + try_apply_py( + py, + &self.callable, + (Into::>::into(py_streaming_msg),), + ) }); match (res, &message.inner_message) { - (Ok(transformed), _) => Ok(message.replace(RoutedValue { - route, - payload: RoutedValuePayload::PyStreamingMessage(transformed.into()), - })), - (Err(ApplyError::ApplyFailed), _) => panic!("Python map function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError"), + (Ok(transformed), _) => { + let new_message = message.replace(RoutedValue { + route, + payload: RoutedValuePayload::PyStreamingMessage(transformed.into()), + }); + self.next_step.submit(new_message) + } + // DLQ handled - skip the message and continue processing + (Err(ApplyError::Skipped), _) => Ok(()), + (Err(ApplyError::ApplyFailed), _) => panic!("Python map function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError or DlqHandledError"), (Err(ApplyError::InvalidMessage), InnerMessage::AnyMessage(..)) => panic!("Got exception while processing AnyMessage, Arroyo cannot handle error on AnyMessage"), - (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => 
Err(SubmitError::InvalidMessage(broker_message.into())) + (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => Err(SubmitError::InvalidMessage(broker_message.into())), } - }; - Box::new(RunTask::new(mapper, next)) + } + + fn terminate(&mut self) { + self.next_step.terminate() + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + self.next_step.join(timeout)?; + Ok(None) + } +} + +/// Creates an Arroyo transformer strategy that uses a Python callable to +/// transform messages. The callable is expected to take a Message +/// as input and return a transformed message. +/// +/// This function takes a `next` step to wire the Arroyo strategy to. +pub fn build_map( + route: &Route, + callable: Py, + next: Box>, +) -> Box> { + Box::new(MapStep::new(callable, next, route.clone())) } /// Creates an Arroyo-based filter step strategy that uses a Python callable to diff --git a/sentry_streams/tests/rust_test_functions/Cargo.lock b/sentry_streams/tests/rust_test_functions/Cargo.lock index 14488a6a..e91a15d7 100644 --- a/sentry_streams/tests/rust_test_functions/Cargo.lock +++ b/sentry_streams/tests/rust_test_functions/Cargo.lock @@ -29,6 +29,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -112,15 +121,6 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" -[[package]] -name = "cadence" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3075f133bee430b7644c54fb629b9b4420346ffa275a45c81a6babe8b09b4f51" -dependencies = [ - "crossbeam-channel", -] - [[package]] name = "cc" version = "1.2.29" @@ -204,10 
+204,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] -name = "crossbeam-channel" -version = "0.5.15" +name = "crossbeam-epoch" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ "crossbeam-utils", ] @@ -267,6 +267,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "equivalent" version = "1.0.2" @@ -295,6 +301,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -438,6 +450,15 @@ version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash", +] + [[package]] name = "heck" version = "0.5.0" @@ -707,7 +728,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.4", ] [[package]] @@ -837,14 +858,38 @@ dependencies = 
[ ] [[package]] -name = "metrics-exporter-statsd" -version = "0.9.0" +name = "metrics-exporter-dogstatsd" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57971e32cdee88619fb6c673fd89cfd37c12f2dd7c187fb400b7aae0ede0ed9e" +checksum = "961f3712d8a7cfe14caaf74c3af503fe701cee6439ff49a7a3ebd04bf49c0502" dependencies = [ - "cadence", + "bytes", + "itoa", "metrics", - "thiserror 1.0.69", + "metrics-util", + "ryu", + "thiserror 2.0.12", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "rand 0.9.1", + "rand_xoshiro", + "sketches-ddsketch", ] [[package]] @@ -890,6 +935,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.30.1" @@ -1008,6 +1062,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "5.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4779c6901a562440c3786d08192c6fbda7c1c2060edd10006b05ee35d10f2d" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -1208,6 +1271,21 @@ dependencies = [ "syn", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.1+wasi-snapshot-preview1", + "web-sys", + 
"winapi", +] + [[package]] name = "quote" version = "1.0.40" @@ -1223,6 +1301,16 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -1282,6 +1370,24 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.3", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "rdkafka" version = "0.37.0" @@ -1387,7 +1493,7 @@ dependencies = [ "gcp_auth", "log", "metrics", - "metrics-exporter-statsd", + "metrics-exporter-dogstatsd", "pyo3", "pyo3-build-config 0.25.1", "rdkafka", @@ -1657,6 +1763,12 @@ dependencies = [ "libc", ] +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.10"