diff --git a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py index c4bb3be2..dfa776a4 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py @@ -104,6 +104,11 @@ def add_source(self, step: Source[Any]) -> None: ) ) + # Apply config overrides and validate + step_config: Mapping[str, Any] = self.config.get(step.name, {}) + step.override_config(step_config) + step.validate() + self.__source_topics[source_name] = Topic(step.stream_name) def get_topic(self, source: str) -> Topic: diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 1989994a..038218e4 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -248,6 +248,11 @@ def source(self, step: Source[Any]) -> Route: source_config = self.steps_config.get(source_name) assert source_config is not None, f"Config not provided for source {source_name}" + # Apply config overrides and validate + step_config: Mapping[str, Any] = self.steps_config.get(step.name, {}) + step.override_config(step_config) + step.validate() + assert isinstance(self.__write_healthcheck, bool) self.__consumers[source_name] = ArroyoConsumer( source=source_name, diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 69b5b224..ad95056e 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -237,6 +237,11 @@ class StreamSource(Source[bytes]): def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) + def override_config(self, loaded_config: Mapping[str, Any]) -> None: + """Override topic name from deployment configuration.""" + if loaded_config.get("topic"): + self.stream_name = str(loaded_config.get("topic")) + @dataclass class WithInput(Step, Generic[TIn]): @@ -309,6 +314,11 @@ class StreamSink(Sink[TIn]): stream_name: str step_type: StepType = StepType.SINK + def override_config(self, loaded_config: Mapping[str, Any]) -> None: + """Override topic name from deployment configuration.""" + if loaded_config.get("topic"): + self.stream_name = str(loaded_config.get("topic")) + @dataclass class DevNullSink(Sink[TIn]): diff --git a/sentry_streams/tests/test_pipeline.py b/sentry_streams/tests/test_pipeline.py index 94d917bb..61b91cb2 100644 --- a/sentry_streams/tests/test_pipeline.py +++ b/sentry_streams/tests/test_pipeline.py @@ -524,3 +524,51 @@ def test_gcssink_override_config_empty() -> None: assert sink.bucket == "original-bucket" assert sink.thread_count == 3 + + +def test_streamsource_override_config() -> None: + """Test that StreamSource topic can be overridden from deployment config.""" + from sentry_streams.pipeline.pipeline import StreamSource + + source = StreamSource(name="my_source", stream_name="original-topic") + + config = {"topic": "overridden-topic"} + source.override_config(config) + + assert source.stream_name == "overridden-topic" + + +def test_streamsource_override_config_empty() -> None: + """Test that StreamSource handles empty config correctly.""" + from sentry_streams.pipeline.pipeline import StreamSource + + source = StreamSource(name="my_source", stream_name="original-topic") + + config: Mapping[str, Any] = {} + source.override_config(config) + + assert source.stream_name == "original-topic" + + +def test_streamsink_override_config() -> None: + """Test that StreamSink topic can be overridden from deployment config.""" + from sentry_streams.pipeline.pipeline import StreamSink + + sink = StreamSink[str](name="my_sink", stream_name="original-topic") + + config = {"topic": "overridden-topic"} + sink.override_config(config) + + assert sink.stream_name == "overridden-topic" + + +def test_streamsink_override_config_empty() -> None: + """Test that StreamSink handles empty config correctly.""" + from sentry_streams.pipeline.pipeline import StreamSink + + sink = StreamSink[str](name="my_sink", stream_name="original-topic") + + config: Mapping[str, Any] = {} + sink.override_config(config) + + assert sink.stream_name == "original-topic" diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index 1925241c..890ee6fb 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -893,7 +893,7 @@ wheels = [ [[package]] name = "sentry-streams" -version = "0.0.39" +version = "0.0.40" source = { editable = "." } dependencies = [ { name = "click" },