From 25405a71bc715b0802e44582f3927a0dad40e61e Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 12 Mar 2026 19:21:41 -0700 Subject: [PATCH] feat(runner): add override config support with deep-merge - Add `deepmerge` utility to `pipeline/config.py` (ported from sentry_streams_k8s) - Update `load_config` to accept optional `override_path`; loads override YAML and deep-merges it into the base config before schema validation - Update `load_runtime_with_config_file` and `run_with_config_file` to accept `override_config: Optional[str] = None` (Rust CLI callers unaffected by default) - Replace `--config` (required) CLI option with `--config-path` (directory) and `--config-file` (direct path), mutually exclusive; add `--override-config` - Add tests and fixture for override config path Co-Authored-By: Claude Sonnet 4.6 --- .../sentry_streams/pipeline/config.py | 77 ++++++++++++++++++- sentry_streams/sentry_streams/runner.py | 47 ++++++++--- .../tests/fixtures/override_config.yaml | 7 ++ sentry_streams/tests/pipeline/test_config.py | 58 +++++++++++++- 4 files changed, 176 insertions(+), 13 deletions(-) create mode 100644 sentry_streams/tests/fixtures/override_config.yaml diff --git a/sentry_streams/sentry_streams/pipeline/config.py b/sentry_streams/sentry_streams/pipeline/config.py index 52d6611b..126a3c42 100644 --- a/sentry_streams/sentry_streams/pipeline/config.py +++ b/sentry_streams/sentry_streams/pipeline/config.py @@ -1,8 +1,9 @@ +import copy import importlib.resources import json import os import re -from typing import Any, cast +from typing import Any, Optional, cast import jsonschema import yaml @@ -13,6 +14,70 @@ _ENVVAR_PATTERN = re.compile(r"^\$\{envvar:([A-Za-z_][A-Za-z0-9_]*)\}$") +class TypeMismatchError(TypeError): + """Raised when attempting to merge incompatible types in deepmerge.""" + + pass + + +class ScalarOverwriteError(ValueError): + """Raised when attempting to overwrite a scalar value during deepmerge.""" + + pass + + +def deepmerge( + base: dict[str, Any], + override: dict[str, Any], + fail_on_scalar_overwrite: bool = False, + _path: list[str] | None = None, +) -> dict[str, Any]: + """ + Deep merge two dictionaries. + + Merge semantics: + - Simple types (str, int, bool, None): override replaces base + - Dictionaries: recursively merge + - Lists: concatenate (append override elements to base) + - Type mismatches (e.g., dict + list, dict + str): raises TypeMismatchError + """ + if _path is None: + _path = [] + + result = copy.deepcopy(base) + + for key, override_value in override.items(): + current_path = _path + [key] + path_str = ".".join(current_path) + + if key not in result: + result[key] = copy.deepcopy(override_value) + else: + base_value = result[key] + + if isinstance(base_value, dict) and isinstance(override_value, dict): + result[key] = deepmerge( + base_value, + override_value, + fail_on_scalar_overwrite=fail_on_scalar_overwrite, + _path=current_path, + ) + elif isinstance(base_value, list) and isinstance(override_value, list): + result[key] = base_value + copy.deepcopy(override_value) + elif type(base_value) is not type(override_value): + raise TypeMismatchError( + f"Cannot merge key '{key}': base type is {type(base_value)} but override type is {type(override_value)}" + ) + else: + if fail_on_scalar_overwrite and base_value != override_value: + raise ScalarOverwriteError( + f"Cannot overwrite scalar at '{path_str}': would change {base_value!r} to {override_value!r}" + ) + result[key] = copy.deepcopy(override_value) + + return result + + class ConfigEnvError(Exception): """Raised when a referenced environment variable is not set.""" @@ -60,13 +125,21 @@ def _resolve_string(value: str) -> str | None: return os.environ[var_name] -def load_config(config_path: str) -> PipelineConfig: +def load_config(config_path: str, override_path: Optional[str] = None) -> PipelineConfig: """ Load a pipeline config file: read YAML, resolve ${envvar:...}, validate against schema. + + If override_path is provided, load the override YAML and deep-merge it into the base + config before schema validation. """ with open(config_path, "r") as f: config = yaml.safe_load(f) + if override_path is not None: + with open(override_path, "r") as f: + override = yaml.safe_load(f) + config = deepmerge(config, override) + resolve_envvars(config) schema_path = importlib.resources.files("sentry_streams") / "config.json" diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index be059d12..766aade6 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -1,7 +1,7 @@ -import importlib import logging import multiprocessing import sys +from pathlib import Path from typing import Any, Mapping, Optional, cast import click @@ -137,12 +137,13 @@ def load_runtime_with_config_file( name: str, log_level: str, adapter: str, - config: str, + config_file: str, segment_id: Optional[str], application: str, + override_config: Optional[str] = None, ) -> Any: """Load runtime from a config file path, returning the runtime object without calling run().""" - environment_config = load_config(config) + environment_config = load_config(config_file, override_config) sentry_sdk_config = environment_config.get("sentry_sdk_config") if sentry_sdk_config: @@ -155,9 +156,10 @@ def run_with_config_file( name: str, log_level: str, adapter: str, - config: str, + config_file: str, segment_id: Optional[str], application: str, + override_config: Optional[str] = None, ) -> None: """ Load runtime from config file and run it. Used by the Python CLI. @@ -169,7 +171,7 @@ def run_with_config_file( control when .run() is called """ runtime = load_runtime_with_config_file( - name, log_level, adapter, config, segment_id, application + name, log_level, adapter, config_file, segment_id, application, override_config ) runtime.run() @@ -205,12 +207,23 @@ def run_with_config_file( ), ) @click.option( - "--config", - required=True, + "--config-path", + default=None, help=( - "The deployment config file path. Each config file currently corresponds to a specific pipeline." + "Directory containing config files. The config file is resolved as " + "/.yaml. Mutually exclusive with --config-file." ), ) +@click.option( + "--config-file", + default=None, + help=("Direct path to the deployment config file. " "Mutually exclusive with --config-path."), +) +@click.option( + "--override-config", + default=None, + help="Path to an override YAML file to deep-merge on top of the base config.", +) @click.option( "--segment-id", "-s", @@ -225,11 +238,25 @@ def main( name: str, log_level: str, adapter: str, - config: str, + config_path: Optional[str], + config_file: Optional[str], + override_config: Optional[str], segment_id: Optional[str], application: str, ) -> None: - run_with_config_file(name, log_level, adapter, config, segment_id, application) + if config_path is None and config_file is None: + raise click.UsageError("One of --config-path or --config-file must be provided.") + if config_path is not None and config_file is not None: + raise click.UsageError("--config-path and --config-file are mutually exclusive.") + + if config_path is not None: + resolved_config_file = str(Path(config_path) / (Path(application).stem + ".yaml")) + else: + resolved_config_file = config_file # type: ignore[assignment] + + run_with_config_file( + name, log_level, adapter, resolved_config_file, segment_id, application, override_config + ) if __name__ == "__main__": diff --git a/sentry_streams/tests/fixtures/override_config.yaml b/sentry_streams/tests/fixtures/override_config.yaml new file mode 100644 index 00000000..7342e4b4 --- /dev/null +++ b/sentry_streams/tests/fixtures/override_config.yaml @@ -0,0 +1,7 @@ +metrics: + type: dummy +pipeline: + segments: + - steps_config: + myinput: + bootstrap_servers: ["override-broker:9092"] diff --git a/sentry_streams/tests/pipeline/test_config.py b/sentry_streams/tests/pipeline/test_config.py index 939c4f6f..608289cf 100644 --- a/sentry_streams/tests/pipeline/test_config.py +++ b/sentry_streams/tests/pipeline/test_config.py @@ -4,7 +4,12 @@ import pytest -from sentry_streams.pipeline.config import ConfigEnvError, load_config, resolve_envvars +from sentry_streams.pipeline.config import ( + ConfigEnvError, + TypeMismatchError, + load_config, + resolve_envvars, +) FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" @@ -105,3 +110,54 @@ def test_load_config_returns_pipeline_config() -> None: "127.0.0.1:9092", ] assert config.get("metrics", {}).get("type") == "dummy" + + +def test_load_config_with_override_merges_and_validates() -> None: + """load_config with override_path deep-merges the override and passes schema validation.""" + config_file = FIXTURES_DIR / "config_with_envvar.yaml" + override_file = FIXTURES_DIR / "override_config.yaml" + with mock.patch.dict(os.environ, {"BOOTSTRAP_SERVERS": "127.0.0.1:9092"}, clear=True): + config = load_config(str(config_file), override_path=str(override_file)) + assert "pipeline" in config + # The override appends its segment to the base segment list (list concatenation) + segments = config["pipeline"]["segments"] + assert len(segments) == 2 + # Second segment comes from the override + assert segments[1]["steps_config"]["myinput"]["bootstrap_servers"] == ["override-broker:9092"] + assert config.get("metrics", {}).get("type") == "dummy" + + +def test_load_config_without_override_no_regression() -> None: + """load_config without override_path behaves exactly as before.""" + config_file = FIXTURES_DIR / "config_with_envvar.yaml" + with mock.patch.dict(os.environ, {"BOOTSTRAP_SERVERS": "10.0.0.1:9092"}, clear=True): + config = load_config(str(config_file)) + segments = config["pipeline"]["segments"] + assert len(segments) == 1 + assert segments[0]["steps_config"]["myinput"]["bootstrap_servers"] == ["10.0.0.1:9092"] + + +def test_load_config_override_type_mismatch_raises() -> None: + """load_config raises TypeMismatchError when override has incompatible type.""" + import tempfile + + import yaml + + base_content = { + "metrics": {"type": "dummy"}, + "pipeline": {"segments": []}, + "env": {"key": {"nested": "value"}}, + } + override_content = {"env": {"key": "string_not_dict"}} + + with ( + tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as base_f, + tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as override_f, + ): + yaml.dump(base_content, base_f) + yaml.dump(override_content, override_f) + base_path = base_f.name + override_path = override_f.name + + with pytest.raises(TypeMismatchError): + load_config(base_path, override_path=override_path)