diff --git a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py index dfa776a4..e7aaa14d 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py @@ -88,27 +88,29 @@ def add_source(self, step: Source[Any]) -> None: assert isinstance(step, StreamSource), "Only Stream Sources are supported" source_name = step.name + step_config: Mapping[str, Any] = self.config.get(source_name, {}) + step.override_config(step_config) + step.validate() + if source_name not in self.__sources: source_config = self.config.get(source_name) assert source_config is not None, f"Config not provided for source {source_name}" source_config = cast(KafkaConsumerConfig, source_config) + group_id = step.consumer_group or source_config.get( + "consumer_group", f"pipeline-{source_name}" + ) self.__sources[source_name] = KafkaConsumer( build_kafka_consumer_configuration( default_config=source_config.get("additional_settings", {}), bootstrap_servers=source_config.get("bootstrap_servers", ["localhost: 9092"]), auto_offset_reset=(source_config.get("auto_offset_reset", "latest")), - group_id=f"pipeline-{source_name}", + group_id=group_id, ) ) - # Apply config overrides and validate - step_config: Mapping[str, Any] = self.config.get(step.name, {}) - step.override_config(step_config) - step.validate() - self.__source_topics[source_name] = Topic(step.stream_name) def get_topic(self, source: str) -> Topic: @@ -165,6 +167,7 @@ def source(self, step: Source[Any]) -> Route: # This is the Arroyo adapter, and it only supports consuming from StreamSource anyways assert isinstance(step, StreamSource) + # schema is logical stream name for codec lookup (StreamSources.add_source already applied overrides) self.__consumers[source_name] = ArroyoConsumer( source_name, step.stream_name, step.stream_name, step.header_filter ) @@ -184,17 +187,20 @@ def sink(self, step: Sink[Any], stream: Route) -> Route: assert isinstance(step, StreamSink), "Only Stream Sinks are supported" sink_name = step.name + step_override_config: Mapping[str, Any] = self.steps_config.get(sink_name, {}) + step.override_config(step_override_config) + if sink_name not in self.__sinks: - sink_config = self.steps_config.get(sink_name) - assert sink_config is not None, f"Config not provided for sink {sink_name}" + producer_config = self.steps_config.get(sink_name) + assert producer_config is not None, f"Config not provided for sink {sink_name}" - sink_config = cast(KafkaProducerConfig, sink_config) + producer_config = cast(KafkaProducerConfig, producer_config) producer = KafkaProducer( build_kafka_configuration( - default_config=sink_config.get("additional_settings", {}), - bootstrap_servers=sink_config.get("bootstrap_servers", "localhost:9092"), + default_config=producer_config.get("additional_settings", {}), + bootstrap_servers=producer_config.get("bootstrap_servers", "localhost:9092"), ) ) else: diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 038218e4..8fc4a2d9 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -131,13 +131,19 @@ def build_initial_offset(offset_reset: str) -> InitialOffset: raise ValueError(f"Invalid offset reset value: {offset_reset}") -def build_kafka_consumer_config(source: str, source_config: StepConfig) -> PyKafkaConsumerConfig: +def build_kafka_consumer_config( + source: str, + source_config: StepConfig, + consumer_group_override: str | None = None, +) -> PyKafkaConsumerConfig: """ Build the Kafka consumer configuration for the source. """ consumer_config = cast(KafkaConsumerConfig, source_config) bootstrap_servers = consumer_config["bootstrap_servers"] - group_id = f"pipeline-{source}" + group_id = ( + consumer_group_override or consumer_config.get("consumer_group") or f"pipeline-{source}" + ) auto_offset_reset = build_initial_offset(consumer_config.get("auto_offset_reset", "latest")) strict_offset_reset = bool(consumer_config.get("strict_offset_reset", False)) override_params = cast(Mapping[str, str], consumer_config.get("override_params", {})) @@ -248,17 +254,20 @@ def source(self, step: Source[Any]) -> Route: source_config = self.steps_config.get(source_name) assert source_config is not None, f"Config not provided for source {source_name}" - # Apply config overrides and validate - step_config: Mapping[str, Any] = self.steps_config.get(step.name, {}) + # Apply config overrides (topic, consumer_group); preserve logical stream for schema + step_config: Mapping[str, Any] = self.steps_config.get(source_name, {}) + schema_name = step.stream_name # Logical name for schema/codec lookup (before override) step.override_config(step_config) step.validate() assert isinstance(self.__write_healthcheck, bool) self.__consumers[source_name] = ArroyoConsumer( source=source_name, - kafka_config=build_kafka_consumer_config(source_name, source_config), + kafka_config=build_kafka_consumer_config( + source_name, source_config, step.consumer_group + ), topic=step.stream_name, - schema=step.stream_name, + schema=schema_name, metric_config=self.__metric_config, write_healthcheck=self.__write_healthcheck, ) diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index a9abc55a..110804c9 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -1,4 +1,4 @@ -from typing import Any, Mapping, Optional, Sequence, TypedDict +from typing import Any, Mapping, NotRequired, Optional, Sequence, TypedDict class StepConfig(TypedDict): @@ -12,7 +12,7 @@ class StepConfig(TypedDict): class KafkaConsumerConfig(TypedDict, StepConfig): bootstrap_servers: Sequence[str] auto_offset_reset: str - consumer_group: str + consumer_group: NotRequired[str] additional_settings: Mapping[str, Any] diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index ad95056e..a52994f6 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -232,15 +232,18 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None + consumer_group: Optional[str] = None step_type: StepType = StepType.SOURCE def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) def override_config(self, loaded_config: Mapping[str, Any]) -> None: - """Override topic name from deployment configuration.""" + """Override topic and consumer_group from deployment configuration.""" if loaded_config.get("topic"): self.stream_name = str(loaded_config.get("topic")) + if loaded_config.get("consumer_group"): + self.consumer_group = str(loaded_config.get("consumer_group")) @dataclass diff --git a/sentry_streams/tests/test_pipeline.py b/sentry_streams/tests/test_pipeline.py index 61b91cb2..a6fcf59d 100644 --- a/sentry_streams/tests/test_pipeline.py +++ b/sentry_streams/tests/test_pipeline.py @@ -527,15 +527,20 @@ def test_gcssink_override_config_empty() -> None: def test_streamsource_override_config() -> None: - """Test that StreamSource topic can be overridden from deployment config.""" + """Test that StreamSource topic and consumer_group can be overridden from deployment config.""" from sentry_streams.pipeline.pipeline import StreamSource - source = StreamSource(name="my_source", stream_name="original-topic") + source = StreamSource(name="my_source", stream_name="events") + assert source.stream_name == "events" + assert source.consumer_group is None - config = {"topic": "overridden-topic"} - source.override_config(config) + source.override_config({"topic": "production-events-v2", "consumer_group": "my-consumer-group"}) + assert source.stream_name == "production-events-v2" + assert source.consumer_group == "my-consumer-group" - assert source.stream_name == "overridden-topic" + source.override_config({}) + assert source.stream_name == "production-events-v2" + assert source.consumer_group == "my-consumer-group" def test_streamsource_override_config_empty() -> None: @@ -554,12 +559,14 @@ def test_streamsink_override_config() -> None: """Test that StreamSink topic can be overridden from deployment config.""" from sentry_streams.pipeline.pipeline import StreamSink - sink = StreamSink[str](name="my_sink", stream_name="original-topic") + sink = StreamSink[str](name="my_sink", stream_name="output") + assert sink.stream_name == "output" - config = {"topic": "overridden-topic"} - sink.override_config(config) + sink.override_config({"topic": "production-output-v2"}) + assert sink.stream_name == "production-output-v2" - assert sink.stream_name == "overridden-topic" + sink.override_config({}) + assert sink.stream_name == "production-output-v2" def test_streamsink_override_config_empty() -> None: