Skip to content

feat(streams): Add topic name override for StreamSource and StreamSink#257

Merged
fpacifici merged 2 commits intomainfrom
fpacifici/override_topic
Mar 18, 2026
Merged

feat(streams): Add topic name override for StreamSource and StreamSink#257
fpacifici merged 2 commits intomainfrom
fpacifici/override_topic

Conversation

@fpacifici
Copy link
Collaborator

Summary

Add the ability to override topic names for StreamSource and StreamSink steps via deployment configuration using the "topic" key. The override is optional and falls back to the hardcoded stream_name if not provided.

Changes

  • Add override_config() method to StreamSource class
  • Add override_config() method to StreamSink class
  • Update Arroyo Rust adapter to call override_config() for sources
  • Update Arroyo legacy adapter to call override_config() for sources
  • Add comprehensive test coverage (4 new tests)

Implementation Details

This follows the existing override_config() pattern used by GCSSink, ensuring consistency across the codebase. The implementation maintains full backwards compatibility with existing pipelines.

Usage Example

Pipeline Code (remains unchanged):

pipeline = (
    streaming_source(name="my_source", stream_name="events")
    .apply(Map(name="transform", function=transform_func))
    .sink(StreamSink(name="my_sink", stream_name="output"))
)

Deployment YAML (new optional configuration):

pipeline:
  segments:
    - steps_config:
        my_source:
          starts_segment: true
          bootstrap_servers: ["kafka:9092"]
          topic: "production-events-v2"  # Override source topic

        my_sink:
          bootstrap_servers: ["kafka:9092"]
          topic: "production-output-v2"  # Override sink topic

Result:

  • Source reads from "production-events-v2" (not "events")
  • Sink writes to "production-output-v2" (not "output")

Test plan

  • All 4 new unit tests pass
  • All 29 pipeline tests pass (no regressions)
  • Type checking passes with no errors
  • Pre-commit hooks pass

🤖 Generated with Claude Code

Add the ability to override topic names for StreamSource and StreamSink
steps via deployment configuration using the "topic" key. The override is
optional and falls back to the hardcoded stream_name if not provided.

Changes:
- Add override_config() method to StreamSource class
- Add override_config() method to StreamSink class
- Update Arroyo Rust adapter to call override_config() for sources
- Update Arroyo legacy adapter to call override_config() for sources
- Add comprehensive test coverage (4 new tests)

This follows the existing override_config() pattern used by GCSSink and
maintains full backwards compatibility with existing pipelines.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@fpacifici fpacifici requested a review from a team as a code owner March 13, 2026 02:34
@github-actions
Copy link

github-actions bot commented Mar 13, 2026

Semver Impact of This PR

🟡 Minor (new features)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (streams) Add topic name override for StreamSource and StreamSink by fpacifici in #257

🤖 This preview updates automatically when you update the PR.

Comment on lines +107 to +110
# Apply config overrides and validate
step_config: Mapping[str, Any] = self.config.get(step.name, {})
step.override_config(step_config)
step.validate()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The legacy ArroyoAdapter.sink() method doesn't call override_config(), causing topic overrides for StreamSink to be silently ignored, unlike the new Rust adapter.
Severity: HIGH

Suggested Fix

In sentry_streams/sentry_streams/adapters/arroyo/adapter.py, before adding the StreamSinkStep, call step.override_config() to apply any configuration overrides. This can be done by fetching the step_config from self.steps_config and passing it to step.override_config(step_config), mirroring the implementation in add_source() and the rust_arroyo.py adapter's sink() method.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/adapters/arroyo/adapter.py#L107-L110

Potential issue: The `ArroyoAdapter.sink()` method in `adapter.py` fails to call
`step.override_config()` before adding a `StreamSinkStep`. As a result, if a deployment
configuration provides a `topic` override for a `StreamSink`, it will be silently
ignored. The sink will continue to write to its hardcoded `stream_name` instead of the
intended, overridden topic. This creates an inconsistency with the `rust_arroyo.py`
adapter, which correctly handles this override, leading to different behavior depending
on which adapter is used.

Did we get this right? 👍 / 👎 to inform future reviews.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

# Apply config overrides and validate
step_config: Mapping[str, Any] = self.steps_config.get(step.name, {})
step.override_config(step_config)
step.validate()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant config fetch in rust_arroyo source method

Low Severity

step_config on line 252 fetches the same value from self.steps_config as source_config on line 248, since source_name == step.name. The source_config is already asserted non-None on line 249, making the {} default on step_config unreachable dead code. The existing source_config variable could be passed directly to override_config() instead of performing a redundant dictionary lookup.

Fix in Cursor Fix in Web

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
"""Override topic name from deployment configuration."""
if loaded_config.get("topic"):
self.stream_name = str(loaded_config.get("topic"))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Falsy topic values silently skip the override

Low Severity

Both StreamSource.override_config() and StreamSink.override_config() use if loaded_config.get("topic"): as a truthy check. If a deployment config explicitly sets topic to an empty string "", the override silently does not apply and the original stream_name is kept, with no warning. Using if "topic" in loaded_config: or if loaded_config.get("topic") is not None: would be more precise and consistent with the DevNullSink pattern (which uses is not None checks).

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't care about falsey topics.

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
"""Override topic name from deployment configuration."""
if loaded_config.get("topic"):
self.stream_name = str(loaded_config.get("topic"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't care about falsey topics.

@fpacifici fpacifici enabled auto-merge (squash) March 18, 2026 20:22
@fpacifici fpacifici merged commit 4c828db into main Mar 18, 2026
22 checks passed
step.validate()

assert isinstance(self.__write_healthcheck, bool)
self.__consumers[source_name] = ArroyoConsumer(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: When a topic name is overridden, the new topic name is incorrectly used as the schema for the ArroyoConsumer, which will cause schema lookup failures downstream.
Severity: HIGH

Suggested Fix

Preserve the original logical stream name before any overrides are applied. Pass this original name to the ArroyoConsumer as the schema parameter, while continuing to use the potentially-overridden step.stream_name for the topic parameter.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L257

Potential issue: When a stream's topic name is overridden in the deployment
configuration (e.g., from "events" to "production-events-v2"), the `override_config`
function mutates the `step.stream_name`. This overridden name is then passed to the
`ArroyoConsumer` constructor as both the `topic` and the `schema`. While correct for the
`topic`, using the overridden name for the `schema` is incorrect. Downstream `Parser`
steps use the `schema` value to look up a message codec. Since the overridden topic name
(e.g., "production-events-v2") does not exist in the schema registry, the codec lookup
will fail, causing a `KeyError` at runtime.

fpacifici added a commit that referenced this pull request Mar 18, 2026
Allow deployment config to override Kafka consumer group for streaming sources (same pattern as topic override in PR #257).

Made-with: Cursor
fpacifici added a commit that referenced this pull request Mar 19, 2026
Allow deployment configuration to override the Kafka consumer group name
for streaming sources, following the same pattern as topic override in
[PR #257](#257).

**Changes**
- **StreamSource**: Added `consumer_group` field (optional) and
`override_config()` to apply `topic` and `consumer_group` from
deployment config.
- **StreamSink**: Added `override_config()` to apply `topic` from
deployment config (for parity with the Rust adapter).
- **Rust adapter**: Calls `override_config()` for sources before
building the consumer; preserves logical stream name for schema/codec
lookup when topic is overridden; passes `consumer_group` into
`build_kafka_consumer_config()`.
- **Legacy adapter**: Calls `override_config()` in `add_source()` and
`sink()`; uses `step.consumer_group` or config `consumer_group` for
`group_id` (fallback: `pipeline-{source_name}`).
- **Config**: `KafkaConsumerConfig.consumer_group` is now optional
(`NotRequired`) so deployment YAML can omit it.
- **Tests**: Added `test_streamsource_override_config` and
`test_streamsink_override_config` in `test_pipeline.py`.

**Usage (deployment YAML)**

```yaml
pipeline:
  segments:
    - steps_config:
        my_source:
          starts_segment: true
          bootstrap_servers: ["kafka:9092"]
          topic: "production-events-v2"      # optional topic override
          consumer_group: "my-consumer-group"  # optional consumer group override
```

Made with [Cursor](https://cursor.com)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants