Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ def add_source(self, step: Source[Any]) -> None:
)
)

# Apply config overrides and validate
step_config: Mapping[str, Any] = self.config.get(step.name, {})
step.override_config(step_config)
step.validate()
Comment on lines +107 to +110
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The legacy ArroyoAdapter.sink() method doesn't call override_config(), causing topic overrides for StreamSink to be silently ignored, unlike the new Rust adapter.
Severity: HIGH

Suggested Fix

In sentry_streams/sentry_streams/adapters/arroyo/adapter.py, before adding the StreamSinkStep, call step.override_config() to apply any configuration overrides. This can be done by fetching the step_config from self.steps_config and passing it to step.override_config(step_config), mirroring the implementation in add_source() and the rust_arroyo.py adapter's sink() method.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/adapters/arroyo/adapter.py#L107-L110

Potential issue: The `ArroyoAdapter.sink()` method in `adapter.py` fails to call
`step.override_config()` before adding a `StreamSinkStep`. As a result, if a deployment
configuration provides a `topic` override for a `StreamSink`, it will be silently
ignored. The sink will continue to write to its hardcoded `stream_name` instead of the
intended, overridden topic. This creates an inconsistency with the `rust_arroyo.py`
adapter, which correctly handles this override, leading to different behavior depending
on which adapter is used.

Did we get this right? 👍 / 👎 to inform future reviews.


self.__source_topics[source_name] = Topic(step.stream_name)

def get_topic(self, source: str) -> Topic:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ def source(self, step: Source[Any]) -> Route:
source_config = self.steps_config.get(source_name)
assert source_config is not None, f"Config not provided for source {source_name}"

# Apply config overrides and validate
step_config: Mapping[str, Any] = self.steps_config.get(step.name, {})
step.override_config(step_config)
step.validate()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant config fetch in rust_arroyo source method

Low Severity

step_config on line 252 fetches the same value from self.steps_config as source_config on line 248, since source_name == step.name. The source_config is already asserted non-None on line 249, making the {} default on step_config unreachable dead code. The existing source_config variable could be passed directly to override_config() instead of performing a redundant dictionary lookup.

Fix in Cursor Fix in Web


assert isinstance(self.__write_healthcheck, bool)
self.__consumers[source_name] = ArroyoConsumer(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: When a topic name is overridden, the new topic name is incorrectly used as the schema for the ArroyoConsumer, which will cause schema lookup failures downstream.
Severity: HIGH

Suggested Fix

Preserve the original logical stream name before any overrides are applied. Pass this original name to the ArroyoConsumer as the schema parameter, while continuing to use the potentially-overridden step.stream_name for the topic parameter.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L257

Potential issue: When a stream's topic name is overridden in the deployment
configuration (e.g., from "events" to "production-events-v2"), the `override_config`
function mutates the `step.stream_name`. This overridden name is then passed to the
`ArroyoConsumer` constructor as both the `topic` and the `schema`. While correct for the
`topic`, using the overridden name for the `schema` is incorrect. Downstream `Parser`
steps use the `schema` value to look up a message codec. Since the overridden topic name
(e.g., "production-events-v2") does not exist in the schema registry, the codec lookup
will fail, causing a `KeyError` at runtime.

source=source_name,
Expand Down
10 changes: 10 additions & 0 deletions sentry_streams/sentry_streams/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ class StreamSource(Source[bytes]):
def register(self, ctx: Pipeline[bytes], previous: Step) -> None:
super().register(ctx, previous)

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
"""Override topic name from deployment configuration."""
if loaded_config.get("topic"):
self.stream_name = str(loaded_config.get("topic"))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Falsy topic values silently skip the override

Low Severity

Both StreamSource.override_config() and StreamSink.override_config() use if loaded_config.get("topic"): as a truthy check. If a deployment config explicitly sets topic to an empty string "", the override silently does not apply and the original stream_name is kept, with no warning. Using if "topic" in loaded_config: or if loaded_config.get("topic") is not None: would be more precise and consistent with the DevNullSink pattern (which uses is not None checks).

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't care about falsey topics.



@dataclass
class WithInput(Step, Generic[TIn]):
Expand Down Expand Up @@ -309,6 +314,11 @@ class StreamSink(Sink[TIn]):
stream_name: str
step_type: StepType = StepType.SINK

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
"""Override topic name from deployment configuration."""
if loaded_config.get("topic"):
self.stream_name = str(loaded_config.get("topic"))


@dataclass
class DevNullSink(Sink[TIn]):
Expand Down
48 changes: 48 additions & 0 deletions sentry_streams/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,51 @@ def test_gcssink_override_config_empty() -> None:

assert sink.bucket == "original-bucket"
assert sink.thread_count == 3


def test_streamsource_override_config() -> None:
"""Test that StreamSource topic can be overridden from deployment config."""
from sentry_streams.pipeline.pipeline import StreamSource

source = StreamSource(name="my_source", stream_name="original-topic")

config = {"topic": "overridden-topic"}
source.override_config(config)

assert source.stream_name == "overridden-topic"


def test_streamsource_override_config_empty() -> None:
"""Test that StreamSource handles empty config correctly."""
from sentry_streams.pipeline.pipeline import StreamSource

source = StreamSource(name="my_source", stream_name="original-topic")

config: Mapping[str, Any] = {}
source.override_config(config)

assert source.stream_name == "original-topic"


def test_streamsink_override_config() -> None:
"""Test that StreamSink topic can be overridden from deployment config."""
from sentry_streams.pipeline.pipeline import StreamSink

sink = StreamSink[str](name="my_sink", stream_name="original-topic")

config = {"topic": "overridden-topic"}
sink.override_config(config)

assert sink.stream_name == "overridden-topic"


def test_streamsink_override_config_empty() -> None:
"""Test that StreamSink handles empty config correctly."""
from sentry_streams.pipeline.pipeline import StreamSink

sink = StreamSink[str](name="my_sink", stream_name="original-topic")

config: Mapping[str, Any] = {}
sink.override_config(config)

assert sink.stream_name == "original-topic"
2 changes: 1 addition & 1 deletion sentry_streams/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading