From ba33ca8e0f7c7e591c98664b172202de45c0bbdf Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 12 Mar 2026 19:34:07 -0700 Subject: [PATCH] feat(streams): Add topic name override for StreamSource and StreamSink Add the ability to override topic names for StreamSource and StreamSink steps via deployment configuration using the "topic" key. The override is optional and falls back to the hardcoded stream_name if not provided. Changes: - Add override_config() method to StreamSource class - Add override_config() method to StreamSink class - Update Arroyo Rust adapter to call override_config() for sources - Update Arroyo legacy adapter to call override_config() for sources - Add comprehensive test coverage (4 new tests) This follows the existing override_config() pattern used by GCSSink and maintains full backwards compatibility with existing pipelines. Co-Authored-By: Claude Sonnet 4.5 --- .../sentry_streams/adapters/arroyo/adapter.py | 5 ++ .../adapters/arroyo/rust_arroyo.py | 5 ++ .../sentry_streams/pipeline/pipeline.py | 10 ++++ sentry_streams/tests/test_pipeline.py | 48 +++++++++++++++++++ sentry_streams/uv.lock | 4 +- 5 files changed, 70 insertions(+), 2 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py index c4bb3be2..dfa776a4 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py @@ -104,6 +104,11 @@ def add_source(self, step: Source[Any]) -> None: ) ) + # Apply config overrides and validate + step_config: Mapping[str, Any] = self.config.get(step.name, {}) + step.override_config(step_config) + step.validate() + self.__source_topics[source_name] = Topic(step.stream_name) def get_topic(self, source: str) -> Topic: diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 1989994a..038218e4 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -248,6 +248,11 @@ def source(self, step: Source[Any]) -> Route: source_config = self.steps_config.get(source_name) assert source_config is not None, f"Config not provided for source {source_name}" + # Apply config overrides and validate + step_config: Mapping[str, Any] = self.steps_config.get(step.name, {}) + step.override_config(step_config) + step.validate() + assert isinstance(self.__write_healthcheck, bool) self.__consumers[source_name] = ArroyoConsumer( source=source_name, diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 69b5b224..ad95056e 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -237,6 +237,11 @@ class StreamSource(Source[bytes]): def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) + def override_config(self, loaded_config: Mapping[str, Any]) -> None: + """Override topic name from deployment configuration.""" + if loaded_config.get("topic"): + self.stream_name = str(loaded_config.get("topic")) + @dataclass class WithInput(Step, Generic[TIn]): @@ -309,6 +314,11 @@ class StreamSink(Sink[TIn]): stream_name: str step_type: StepType = StepType.SINK + def override_config(self, loaded_config: Mapping[str, Any]) -> None: + """Override topic name from deployment configuration.""" + if loaded_config.get("topic"): + self.stream_name = str(loaded_config.get("topic")) + @dataclass class DevNullSink(Sink[TIn]): diff --git a/sentry_streams/tests/test_pipeline.py b/sentry_streams/tests/test_pipeline.py index 94d917bb..61b91cb2 100644 --- a/sentry_streams/tests/test_pipeline.py +++ b/sentry_streams/tests/test_pipeline.py @@ -524,3 +524,51 @@ def test_gcssink_override_config_empty() -> None: assert sink.bucket == "original-bucket" assert sink.thread_count == 3 + + +def test_streamsource_override_config() -> None: + """Test that StreamSource topic can be overridden from deployment config.""" + from sentry_streams.pipeline.pipeline import StreamSource + + source = StreamSource(name="my_source", stream_name="original-topic") + + config = {"topic": "overridden-topic"} + source.override_config(config) + + assert source.stream_name == "overridden-topic" + + +def test_streamsource_override_config_empty() -> None: + """Test that StreamSource handles empty config correctly.""" + from sentry_streams.pipeline.pipeline import StreamSource + + source = StreamSource(name="my_source", stream_name="original-topic") + + config: Mapping[str, Any] = {} + source.override_config(config) + + assert source.stream_name == "original-topic" + + +def test_streamsink_override_config() -> None: + """Test that StreamSink topic can be overridden from deployment config.""" + from sentry_streams.pipeline.pipeline import StreamSink + + sink = StreamSink[str](name="my_sink", stream_name="original-topic") + + config = {"topic": "overridden-topic"} + sink.override_config(config) + + assert sink.stream_name == "overridden-topic" + + +def test_streamsink_override_config_empty() -> None: + """Test that StreamSink handles empty config correctly.""" + from sentry_streams.pipeline.pipeline import StreamSink + + sink = StreamSink[str](name="my_sink", stream_name="original-topic") + + config: Mapping[str, Any] = {} + sink.override_config(config) + + assert sink.stream_name == "original-topic" diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index 01ea3e23..f78a03ae 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11" [[package]] @@ -893,7 +893,7 @@ wheels = [ [[package]] name = "sentry-streams" -version = "0.0.37" +version = "0.0.38" source = { editable = "." } dependencies = [ { name = "click" },