feat(streams): Add topic name override for StreamSource and StreamSink#257
feat(streams): Add topic name override for StreamSource and StreamSink#257
Conversation
Add the ability to override topic names for StreamSource and StreamSink steps via deployment configuration using the "topic" key. The override is optional and falls back to the hardcoded stream_name if not provided. Changes: - Add override_config() method to StreamSource class - Add override_config() method to StreamSink class - Update Arroyo Rust adapter to call override_config() for sources - Update Arroyo legacy adapter to call override_config() for sources - Add comprehensive test coverage (4 new tests) This follows the existing override_config() pattern used by GCSSink and maintains full backwards compatibility with existing pipelines. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Semver Impact of This PR🟡 Minor (new features) 📋 Changelog PreviewThis is how your changes will appear in the changelog. New Features ✨
🤖 This preview updates automatically when you update the PR. |
| # Apply config overrides and validate | ||
| step_config: Mapping[str, Any] = self.config.get(step.name, {}) | ||
| step.override_config(step_config) | ||
| step.validate() |
There was a problem hiding this comment.
Bug: The legacy ArroyoAdapter.sink() method doesn't call override_config(), causing topic overrides for StreamSink to be silently ignored, unlike the new Rust adapter.
Severity: HIGH
Suggested Fix
In sentry_streams/sentry_streams/adapters/arroyo/adapter.py, before adding the StreamSinkStep, call step.override_config() to apply any configuration overrides. This can be done by fetching the step_config from self.steps_config and passing it to step.override_config(step_config), mirroring the implementation in add_source() and the rust_arroyo.py adapter's sink() method.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: sentry_streams/sentry_streams/adapters/arroyo/adapter.py#L107-L110
Potential issue: The `ArroyoAdapter.sink()` method in `adapter.py` fails to call
`step.override_config()` before adding a `StreamSinkStep`. As a result, if a deployment
configuration provides a `topic` override for a `StreamSink`, it will be silently
ignored. The sink will continue to write to its hardcoded `stream_name` instead of the
intended, overridden topic. This creates an inconsistency with the `rust_arroyo.py`
adapter, which correctly handles this override, leading to different behavior depending
on which adapter is used.
Did we get this right? 👍 / 👎 to inform future reviews.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
| # Apply config overrides and validate | ||
| step_config: Mapping[str, Any] = self.steps_config.get(step.name, {}) | ||
| step.override_config(step_config) | ||
| step.validate() |
There was a problem hiding this comment.
Redundant config fetch in rust_arroyo source method
Low Severity
step_config on line 252 fetches the same value from self.steps_config as source_config on line 248, since source_name == step.name. The source_config is already asserted non-None on line 249, making the {} default on step_config unreachable dead code. The existing source_config variable could be passed directly to override_config() instead of performing a redundant dictionary lookup.
| def override_config(self, loaded_config: Mapping[str, Any]) -> None: | ||
| """Override topic name from deployment configuration.""" | ||
| if loaded_config.get("topic"): | ||
| self.stream_name = str(loaded_config.get("topic")) |
There was a problem hiding this comment.
Falsy topic values silently skip the override
Low Severity
Both StreamSource.override_config() and StreamSink.override_config() use if loaded_config.get("topic"): as a truthy check. If a deployment config explicitly sets topic to an empty string "", the override silently does not apply and the original stream_name is kept, with no warning. Using if "topic" in loaded_config: or if loaded_config.get("topic") is not None: would be more precise and consistent with the DevNullSink pattern (which uses is not None checks).
Additional Locations (1)
There was a problem hiding this comment.
We don't care about falsey topics.
| def override_config(self, loaded_config: Mapping[str, Any]) -> None: | ||
| """Override topic name from deployment configuration.""" | ||
| if loaded_config.get("topic"): | ||
| self.stream_name = str(loaded_config.get("topic")) |
There was a problem hiding this comment.
We don't care about falsey topics.
| step.validate() | ||
|
|
||
| assert isinstance(self.__write_healthcheck, bool) | ||
| self.__consumers[source_name] = ArroyoConsumer( |
There was a problem hiding this comment.
Bug: When a topic name is overridden, the new topic name is incorrectly used as the schema for the ArroyoConsumer, which will cause schema lookup failures downstream.
Severity: HIGH
Suggested Fix
Preserve the original logical stream name before any overrides are applied. Pass this original name to the ArroyoConsumer as the schema parameter, while continuing to use the potentially-overridden step.stream_name for the topic parameter.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L257
Potential issue: When a stream's topic name is overridden in the deployment
configuration (e.g., from "events" to "production-events-v2"), the `override_config`
function mutates the `step.stream_name`. This overridden name is then passed to the
`ArroyoConsumer` constructor as both the `topic` and the `schema`. While correct for the
`topic`, using the overridden name for the `schema` is incorrect. Downstream `Parser`
steps use the `schema` value to look up a message codec. Since the overridden topic name
(e.g., "production-events-v2") does not exist in the schema registry, the codec lookup
will fail, causing a `KeyError` at runtime.
Allow deployment config to override Kafka consumer group for streaming sources (same pattern as topic override in PR #257). Made-with: Cursor
Allow deployment configuration to override the Kafka consumer group name for streaming sources, following the same pattern as topic override in [PR #257](#257). **Changes** - **StreamSource**: Added `consumer_group` field (optional) and `override_config()` to apply `topic` and `consumer_group` from deployment config. - **StreamSink**: Added `override_config()` to apply `topic` from deployment config (for parity with the Rust adapter). - **Rust adapter**: Calls `override_config()` for sources before building the consumer; preserves logical stream name for schema/codec lookup when topic is overridden; passes `consumer_group` into `build_kafka_consumer_config()`. - **Legacy adapter**: Calls `override_config()` in `add_source()` and `sink()`; uses `step.consumer_group` or config `consumer_group` for `group_id` (fallback: `pipeline-{source_name}`). - **Config**: `KafkaConsumerConfig.consumer_group` is now optional (`NotRequired`) so deployment YAML can omit it. - **Tests**: Added `test_streamsource_override_config` and `test_streamsink_override_config` in `test_pipeline.py`. **Usage (deployment YAML)** ```yaml pipeline: segments: - steps_config: my_source: starts_segment: true bootstrap_servers: ["kafka:9092"] topic: "production-events-v2" # optional topic override consumer_group: "my-consumer-group" # optional consumer group override ``` Made with [Cursor](https://cursor.com)


Summary
Add the ability to override topic names for
StreamSourceandStreamSinksteps via deployment configuration using the"topic"key. The override is optional and falls back to the hardcodedstream_nameif not provided.Changes
override_config()method toStreamSourceclassoverride_config()method toStreamSinkclassoverride_config()for sourcesoverride_config()for sourcesImplementation Details
This follows the existing
override_config()pattern used byGCSSink, ensuring consistency across the codebase. The implementation maintains full backwards compatibility with existing pipelines.Usage Example
Pipeline Code (remains unchanged):
Deployment YAML (new optional configuration):
Result:
Test plan
🤖 Generated with Claude Code