-
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(streams): Add topic name override for StreamSource and StreamSink #257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -248,6 +248,11 @@ def source(self, step: Source[Any]) -> Route: | |
| source_config = self.steps_config.get(source_name) | ||
| assert source_config is not None, f"Config not provided for source {source_name}" | ||
|
|
||
| # Apply config overrides and validate | ||
| step_config: Mapping[str, Any] = self.steps_config.get(step.name, {}) | ||
| step.override_config(step_config) | ||
| step.validate() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant config fetch in rust_arroyo source methodLow Severity
|
||
|
|
||
| assert isinstance(self.__write_healthcheck, bool) | ||
| self.__consumers[source_name] = ArroyoConsumer( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: When a topic name is overridden, the new topic name is incorrectly used as the Suggested FixPreserve the original logical stream name before any overrides are applied. Pass this original name to the Prompt for AI Agent |
||
| source=source_name, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -237,6 +237,11 @@ class StreamSource(Source[bytes]): | |
| def register(self, ctx: Pipeline[bytes], previous: Step) -> None: | ||
| super().register(ctx, previous) | ||
|
|
||
| def override_config(self, loaded_config: Mapping[str, Any]) -> None: | ||
| """Override topic name from deployment configuration.""" | ||
| if loaded_config.get("topic"): | ||
| self.stream_name = str(loaded_config.get("topic")) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Falsy topic values silently skip the overrideLow Severity Both Additional Locations (1)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't care about falsey topics. |
||
|
|
||
|
|
||
| @dataclass | ||
| class WithInput(Step, Generic[TIn]): | ||
|
|
@@ -309,6 +314,11 @@ class StreamSink(Sink[TIn]): | |
| stream_name: str | ||
| step_type: StepType = StepType.SINK | ||
|
|
||
| def override_config(self, loaded_config: Mapping[str, Any]) -> None: | ||
| """Override topic name from deployment configuration.""" | ||
| if loaded_config.get("topic"): | ||
| self.stream_name = str(loaded_config.get("topic")) | ||
|
|
||
|
|
||
| @dataclass | ||
| class DevNullSink(Sink[TIn]): | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The legacy
ArroyoAdapter.sink()method doesn't calloverride_config(), causing topic overrides forStreamSinkto be silently ignored, unlike the new Rust adapter.Severity: HIGH
Suggested Fix
In
sentry_streams/sentry_streams/adapters/arroyo/adapter.py, before adding theStreamSinkStep, callstep.override_config()to apply any configuration overrides. This can be done by fetching thestep_configfromself.steps_configand passing it tostep.override_config(step_config), mirroring the implementation inadd_source()and therust_arroyo.pyadapter'ssink()method.Prompt for AI Agent
Did we get this right? 👍 / 👎 to inform future reviews.