Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions sentry_streams/sentry_streams/adapters/arroyo/adapter.py
Original file line number Diff line number Diff line change
def add_source(self, step: Source[Any]) -> None:
    """Register a stream source, creating its Kafka consumer on first use.

    Deployment config overrides (topic, consumer_group) are applied to the
    step *before* the consumer is built, so the consumer group honors any
    override. The group id is resolved in priority order: step-level
    override, the source's ``consumer_group`` config key, then the
    ``pipeline-<source_name>`` default.
    """
    assert isinstance(step, StreamSource), "Only Stream Sources are supported"
    source_name = step.name

    # Apply config overrides and validate before building the consumer,
    # so group_id below sees any consumer_group override.
    step_config: Mapping[str, Any] = self.config.get(source_name, {})
    step.override_config(step_config)
    step.validate()

    if source_name not in self.__sources:
        source_config = self.config.get(source_name)
        assert source_config is not None, f"Config not provided for source {source_name}"

        source_config = cast(KafkaConsumerConfig, source_config)
        group_id = step.consumer_group or source_config.get(
            "consumer_group", f"pipeline-{source_name}"
        )

        self.__sources[source_name] = KafkaConsumer(
            build_kafka_consumer_configuration(
                default_config=source_config.get("additional_settings", {}),
                # NOTE(review): "localhost: 9092" contains a space — confirm
                # this default is intentional before relying on it.
                bootstrap_servers=source_config.get("bootstrap_servers", ["localhost: 9092"]),
                auto_offset_reset=(source_config.get("auto_offset_reset", "latest")),
                group_id=group_id,
            )
        )

    # Topic lookup uses the (possibly overridden) physical stream name.
    self.__source_topics[source_name] = Topic(step.stream_name)
def get_topic(self, source: str) -> Topic:
Expand Down Expand Up @@ -165,6 +167,7 @@ def source(self, step: Source[Any]) -> Route:
# This is the Arroyo adapter, and it only supports consuming from StreamSource anyways
assert isinstance(step, StreamSource)

# schema is logical stream name for codec lookup (StreamSources.add_source already applied overrides)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legacy adapter loses logical schema name after topic override

High Severity

When a topic override is configured, add_source() calls step.override_config() which mutates step.stream_name to the overridden topic. Back in source(), ArroyoConsumer receives step.stream_name for both stream_name and schema, so the schema becomes the overridden topic name (e.g., "production-events-v2") instead of the original logical name (e.g., "events"). This breaks codec lookups in _get_codec_from_msg(), which calls get_codec(stream_schema) with the wrong name. The Rust adapter correctly captures schema_name = step.stream_name before calling override_config, but this legacy adapter does not.

Additional Locations (1)
Fix in Cursor Fix in Web

self.__consumers[source_name] = ArroyoConsumer(
source_name, step.stream_name, step.stream_name, step.header_filter
)
Expand All @@ -184,17 +187,20 @@ def sink(self, step: Sink[Any], stream: Route) -> Route:
assert isinstance(step, StreamSink), "Only Stream Sinks are supported"

sink_name = step.name
step_override_config: Mapping[str, Any] = self.steps_config.get(sink_name, {})
step.override_config(step_override_config)

if sink_name not in self.__sinks:

sink_config = self.steps_config.get(sink_name)
assert sink_config is not None, f"Config not provided for sink {sink_name}"
producer_config = self.steps_config.get(sink_name)
assert producer_config is not None, f"Config not provided for sink {sink_name}"

sink_config = cast(KafkaProducerConfig, sink_config)
producer_config = cast(KafkaProducerConfig, producer_config)

producer = KafkaProducer(
build_kafka_configuration(
default_config=sink_config.get("additional_settings", {}),
bootstrap_servers=sink_config.get("bootstrap_servers", "localhost:9092"),
default_config=producer_config.get("additional_settings", {}),
bootstrap_servers=producer_config.get("bootstrap_servers", "localhost:9092"),
)
)
else:
Expand Down
21 changes: 15 additions & 6 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,19 @@ def build_initial_offset(offset_reset: str) -> InitialOffset:
raise ValueError(f"Invalid offset reset value: {offset_reset}")


def build_kafka_consumer_config(source: str, source_config: StepConfig) -> PyKafkaConsumerConfig:
def build_kafka_consumer_config(
source: str,
source_config: StepConfig,
consumer_group_override: str | None = None,
) -> PyKafkaConsumerConfig:
"""
Build the Kafka consumer configuration for the source.
"""
consumer_config = cast(KafkaConsumerConfig, source_config)
bootstrap_servers = consumer_config["bootstrap_servers"]
group_id = f"pipeline-{source}"
group_id = (
consumer_group_override or consumer_config.get("consumer_group") or f"pipeline-{source}"
)
auto_offset_reset = build_initial_offset(consumer_config.get("auto_offset_reset", "latest"))
strict_offset_reset = bool(consumer_config.get("strict_offset_reset", False))
override_params = cast(Mapping[str, str], consumer_config.get("override_params", {}))
Expand Down Expand Up @@ -248,17 +254,20 @@ def source(self, step: Source[Any]) -> Route:
source_config = self.steps_config.get(source_name)
assert source_config is not None, f"Config not provided for source {source_name}"

# Apply config overrides and validate
step_config: Mapping[str, Any] = self.steps_config.get(step.name, {})
# Apply config overrides (topic, consumer_group); preserve logical stream for schema
step_config: Mapping[str, Any] = self.steps_config.get(source_name, {})
schema_name = step.stream_name # Logical name for schema/codec lookup (before override)
step.override_config(step_config)
step.validate()

assert isinstance(self.__write_healthcheck, bool)
self.__consumers[source_name] = ArroyoConsumer(
source=source_name,
kafka_config=build_kafka_consumer_config(source_name, source_config),
kafka_config=build_kafka_consumer_config(
source_name, source_config, step.consumer_group
),
topic=step.stream_name,
schema=step.stream_name,
schema=schema_name,
metric_config=self.__metric_config,
write_healthcheck=self.__write_healthcheck,
)
Expand Down
4 changes: 2 additions & 2 deletions sentry_streams/sentry_streams/config_types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Mapping, Optional, Sequence, TypedDict
from typing import Any, Mapping, NotRequired, Optional, Sequence, TypedDict


class StepConfig(TypedDict):
Expand All @@ -12,7 +12,7 @@ class StepConfig(TypedDict):
class KafkaConsumerConfig(TypedDict, StepConfig):
    """Deployment configuration for a Kafka consumer (stream source).

    ``consumer_group`` is optional (``NotRequired``); when absent, callers
    fall back to a per-pipeline default group id.
    """

    bootstrap_servers: Sequence[str]
    auto_offset_reset: str
    consumer_group: NotRequired[str]
    additional_settings: Mapping[str, Any]


Expand Down
5 changes: 4 additions & 1 deletion sentry_streams/sentry_streams/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,18 @@ class StreamSource(Source[bytes]):

stream_name: str
header_filter: Optional[Tuple[str, bytes]] = None
consumer_group: Optional[str] = None
step_type: StepType = StepType.SOURCE

def register(self, ctx: Pipeline[bytes], previous: Step) -> None:
super().register(ctx, previous)

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
    """Override topic and consumer_group from deployment configuration.

    Only keys that are present *and truthy* in ``loaded_config`` are
    applied, so an empty or partial config leaves current values intact.
    Values are coerced to ``str`` to normalize whatever the config loader
    produced.
    """
    # Fetch each key once instead of twice (check + use).
    topic = loaded_config.get("topic")
    if topic:
        self.stream_name = str(topic)
    consumer_group = loaded_config.get("consumer_group")
    if consumer_group:
        self.consumer_group = str(consumer_group)


@dataclass
Expand Down
25 changes: 16 additions & 9 deletions sentry_streams/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,20 @@ def test_gcssink_override_config_empty() -> None:


def test_streamsource_override_config() -> None:
    """StreamSource topic and consumer_group can be overridden from deployment config."""
    from sentry_streams.pipeline.pipeline import StreamSource

    source = StreamSource(name="my_source", stream_name="events")
    assert source.stream_name == "events"
    assert source.consumer_group is None

    source.override_config(
        {"topic": "production-events-v2", "consumer_group": "my-consumer-group"}
    )
    assert source.stream_name == "production-events-v2"
    assert source.consumer_group == "my-consumer-group"

    # An empty override config must leave previously applied values untouched.
    source.override_config({})
    assert source.stream_name == "production-events-v2"
    assert source.consumer_group == "my-consumer-group"


def test_streamsource_override_config_empty() -> None:
def test_streamsink_override_config() -> None:
    """StreamSink topic can be overridden from deployment config."""
    from sentry_streams.pipeline.pipeline import StreamSink

    sink = StreamSink[str](name="my_sink", stream_name="output")
    assert sink.stream_name == "output"

    sink.override_config({"topic": "production-output-v2"})
    assert sink.stream_name == "production-output-v2"

    # An empty override config must leave the previous override in place.
    sink.override_config({})
    assert sink.stream_name == "production-output-v2"


def test_streamsink_override_config_empty() -> None:
Expand Down
Loading