From 5054112f16b2a513802c9ccf61b7e55628ae7a10 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 13 Feb 2026 22:14:12 +0100 Subject: [PATCH 1/7] feat: vulture for checking dead code in backend --- .github/workflows/vulture.yml | 32 ++++++++++++++++++++++++++++++++ .pre-commit-config.yaml | 8 ++++++++ backend/pyproject.toml | 11 ++++++++++- backend/uv.lock | 11 +++++++++++ backend/vulture_whitelist.py | 8 ++++++++ 5 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/vulture.yml create mode 100644 backend/vulture_whitelist.py diff --git a/.github/workflows/vulture.yml b/.github/workflows/vulture.yml new file mode 100644 index 00000000..9d4c1e56 --- /dev/null +++ b/.github/workflows/vulture.yml @@ -0,0 +1,32 @@ +name: Dead Code Detection + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: + +jobs: + vulture: + name: Vulture Dead Code Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - name: Set up uv + uses: astral-sh/setup-uv@v7 + with: + enable-cache: true + cache-dependency-glob: "backend/uv.lock" + + - name: Install Python and dependencies + run: | + cd backend + uv python install 3.12 + uv sync --frozen + + - name: Run vulture + run: | + cd backend + uv run vulture app/ vulture_whitelist.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a31b2e04..af38e0b0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,6 +22,14 @@ repos: files: ^backend/.*\.py$ pass_filenames: false + # Vulture - matches CI: cd backend && uv run vulture app/ vulture_whitelist.py + - id: vulture-backend + name: vulture dead code (backend) + entry: bash -c 'cd backend && uv run vulture app/ vulture_whitelist.py' + language: system + files: ^backend/.*\.py$ + pass_filenames: false + # ESLint - matches CI: cd frontend && npx eslint src - id: eslint-frontend name: eslint (frontend) diff --git a/backend/pyproject.toml b/backend/pyproject.toml index ba26ca9b..b1a96720 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -150,6 +150,7 @@ dev = [ "pytest-xdist==3.6.1", "ruff==0.14.10", "types-cachetools==6.2.0.20250827", + "vulture==2.14", ] # Ruff configuration @@ -166,7 +167,8 @@ exclude = [ "**/venv/**", "**/.venv/**", "**/site-packages/**", - "**/.claude/**" + "**/.claude/**", + "vulture_whitelist.py" ] [tool.ruff.lint.flake8-bugbear] @@ -183,6 +185,7 @@ warn_unused_configs = true disallow_untyped_defs = true disallow_incomplete_defs = true disable_error_code = ["import-untyped", "import-not-found"] +exclude = ["vulture_whitelist\\.py$"] plugins = ["pydantic.mypy"] [tool.pydantic-mypy] @@ -224,3 +227,9 @@ OTEL_SDK_DISABLED = "true" # Prevents teardown delays from OTLP exporter retrie [tool.coverage.run] # Use sysmon for faster coverage (requires Python 3.12+) core = "sysmon" + +# Vulture dead code detection +[tool.vulture] +min_confidence = 80 +paths = ["app"] +exclude = ["tests"] diff --git a/backend/uv.lock b/backend/uv.lock index 6576b413..7a8464c6 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -1176,6 +1176,7 @@ dev = [ { name = "pytest-xdist" }, { name = "ruff" }, { name = "types-cachetools" }, + { name = "vulture" }, ] [package.metadata] @@ -1319,6 +1320,7 @@ dev = [ { name = "pytest-xdist", specifier = "==3.6.1" }, { name = "ruff", specifier = "==0.14.10" }, { name = "types-cachetools", specifier = "==6.2.0.20250827" }, + { name = "vulture", specifier = "==2.14" }, ] [[package]] @@ -3029,6 +3031,15 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/b1/4b/4cef6ce21a2aaca9d852a6e84ef4f135d99fcd74fa75105e2fc0c8308acd/uvicorn-0.34.2-py3-none-any.whl", hash = "sha256:deb49af569084536d269fe0a6d67e3754f104cf03aba7c11c40f01aadf33c403", size = 62483, upload-time = "2025-04-19T06:02:48.42Z" }, ] +[[package]] +name = "vulture" +version = "2.14" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/8e/25/925f35db758a0f9199113aaf61d703de891676b082bd7cf73ea01d6000f7/vulture-2.14.tar.gz", hash = "sha256:cb8277902a1138deeab796ec5bef7076a6e0248ca3607a3f3dee0b6d9e9b8415", size = 58823, upload-time = "2024-12-08T17:39:43.319Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a0/56/0cc15b8ff2613c1d5c3dc1f3f576ede1c43868c1bc2e5ccaa2d4bcd7974d/vulture-2.14-py2.py3-none-any.whl", hash = "sha256:d9a90dba89607489548a49d557f8bac8112bd25d3cbc8aeef23e860811bd5ed9", size = 28915, upload-time = "2024-12-08T17:39:40.573Z" }, +] + [[package]] name = "websocket-client" version = "1.8.0" diff --git a/backend/vulture_whitelist.py b/backend/vulture_whitelist.py new file mode 100644 index 00000000..09e7763e --- /dev/null +++ b/backend/vulture_whitelist.py @@ -0,0 +1,8 @@ +"""Vulture whitelist — framework-required code that appears unused at the AST level. + +Structlog processors must accept (logger, method_name, event_dict) by convention, +even when `method_name` is unused in the body. +""" + +method_name # unused variable (app/core/logging.py:36) +method_name # unused variable (app/core/logging.py:53) From 5c249718a920b545b6d2b68b513934d3c6118692 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 13 Feb 2026 22:18:55 +0100 Subject: [PATCH 2/7] fix: removed useless code from mappings --- backend/app/infrastructure/kafka/mappings.py | 4 ---- backend/tests/unit/events/test_mappings_and_types.py | 4 ---- 2 files changed, 8 deletions(-) diff --git a/backend/app/infrastructure/kafka/mappings.py b/backend/app/infrastructure/kafka/mappings.py index c7597db2..dc339728 100644 --- a/backend/app/infrastructure/kafka/mappings.py +++ b/backend/app/infrastructure/kafka/mappings.py @@ -98,10 +98,6 @@ def get_topic_for_event(event_type: EventType) -> KafkaTopic: return EVENT_TYPE_TO_TOPIC.get(event_type, KafkaTopic.SYSTEM_EVENTS) -def get_event_types_for_topic(topic: KafkaTopic) -> list[EventType]: - """Get all event types that publish to a given topic.""" - return [et for et, t in EVENT_TYPE_TO_TOPIC.items() if t == topic] - CONSUMER_GROUP_SUBSCRIPTIONS: dict[GroupId, set[KafkaTopic]] = { GroupId.EXECUTION_COORDINATOR: { diff --git a/backend/tests/unit/events/test_mappings_and_types.py b/backend/tests/unit/events/test_mappings_and_types.py index bc7cfb10..ce32da44 100644 --- a/backend/tests/unit/events/test_mappings_and_types.py +++ b/backend/tests/unit/events/test_mappings_and_types.py @@ -1,7 +1,6 @@ from app.domain.enums import EventType, KafkaTopic from app.infrastructure.kafka.mappings import ( get_event_class_for_type, - get_event_types_for_topic, get_topic_for_event, ) @@ -11,6 +10,3 @@ def test_event_mappings_topics() -> None: assert get_topic_for_event(EventType.EXECUTION_REQUESTED) == KafkaTopic.EXECUTION_EVENTS cls = get_event_class_for_type(EventType.CREATE_POD_COMMAND) assert cls is not None - # All event types for a topic include at least one of the checked types - ev_types = get_event_types_for_topic(KafkaTopic.EXECUTION_EVENTS) - assert EventType.EXECUTION_REQUESTED in ev_types From d3f7d6a52fddb35e128cc15531869a7904e9bd79 Mon Sep 17 00:00:00 2001 From: 
HardMax71 Date: Fri, 13 Feb 2026 22:47:01 +0100 Subject: [PATCH 3/7] fix: removed (allegedly) unused code --- backend/app/core/dishka_lifespan.py | 1 - backend/app/db/docs/replay.py | 12 - .../admin/admin_events_repository.py | 13 +- backend/app/db/repositories/dlq_repository.py | 22 -- .../db/repositories/execution_repository.py | 12 - .../repositories/user_settings_repository.py | 3 - backend/app/dlq/manager.py | 3 - backend/app/domain/events/event_models.py | 29 -- backend/app/domain/saga/exceptions.py | 9 - backend/app/domain/saga/models.py | 14 - backend/app/domain/user/settings_models.py | 13 - backend/app/domain/user/user_models.py | 10 - backend/app/events/consumer_group_monitor.py | 359 ------------------ backend/app/events/core/producer.py | 44 --- backend/app/schemas_pydantic/execution.py | 17 - backend/app/schemas_pydantic/notification.py | 52 --- backend/app/schemas_pydantic/replay.py | 6 - backend/app/schemas_pydantic/saved_script.py | 10 - backend/app/schemas_pydantic/user.py | 15 +- backend/app/schemas_pydantic/user_settings.py | 16 - backend/app/services/event_service.py | 64 ---- .../services/idempotency/redis_repository.py | 6 - backend/app/services/k8s_worker/worker.py | 13 - .../app/services/pod_monitor/event_mapper.py | 3 - .../result_processor/resource_cleaner.py | 107 +----- backend/app/services/saga/saga_step.py | 27 -- backend/app/services/user_settings_service.py | 5 - .../db/repositories/test_dlq_repository.py | 3 - .../repositories/test_execution_repository.py | 9 +- backend/tests/e2e/dlq/test_dlq_discard.py | 141 ------- backend/tests/e2e/dlq/test_dlq_retry.py | 205 ---------- .../e2e/events/test_consumer_group_monitor.py | 20 - .../test_consumer_group_monitor_real.py | 106 ------ .../e2e/events/test_producer_roundtrip.py | 10 +- .../idempotency/test_redis_repository.py | 7 - .../test_user_settings_service.py | 21 - backend/tests/e2e/test_resource_cleaner.py | 84 ---- .../test_notification_schemas.py | 15 +- .../services/saga/test_saga_comprehensive.py | 8 - .../services/saga/test_saga_step_and_base.py | 22 -- 40 files changed, 6 insertions(+), 1530 deletions(-) delete mode 100644 backend/tests/e2e/dlq/test_dlq_discard.py delete mode 100644 backend/tests/e2e/dlq/test_dlq_retry.py delete mode 100644 backend/tests/e2e/events/test_consumer_group_monitor.py delete mode 100644 backend/tests/e2e/events/test_consumer_group_monitor_real.py delete mode 100644 backend/tests/e2e/test_resource_cleaner.py diff --git a/backend/app/core/dishka_lifespan.py b/backend/app/core/dishka_lifespan.py index 76f6f4b2..3eef6169 100644 --- a/backend/app/core/dishka_lifespan.py +++ b/backend/app/core/dishka_lifespan.py @@ -62,7 +62,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Get unstarted broker from DI (BrokerProvider yields without starting) broker: KafkaBroker = await container.get(KafkaBroker) - app.state.kafka_broker = broker # Register subscribers BEFORE broker.start() - FastStream requirement register_sse_subscriber(broker, settings) diff --git a/backend/app/db/docs/replay.py b/backend/app/db/docs/replay.py index eb784dc1..6f923a5b 100644 --- a/backend/app/db/docs/replay.py +++ b/backend/app/db/docs/replay.py @@ -69,18 +69,6 @@ class ReplaySessionDocument(Document): model_config = ConfigDict(from_attributes=True) - @property - def progress_percentage(self) -> float: - """Calculate progress percentage.""" - if self.total_events == 0: - return 0.0 - return round((self.replayed_events / self.total_events) * 100, 2) - - @property - def is_completed(self) -> 
bool: - """Check if session is completed.""" - return self.status in [ReplayStatus.COMPLETED, ReplayStatus.FAILED, ReplayStatus.CANCELLED] - @property def is_running(self) -> bool: """Check if session is running.""" diff --git a/backend/app/db/repositories/admin/admin_events_repository.py b/backend/app/db/repositories/admin/admin_events_repository.py index 5b4846d5..0dfaa715 100644 --- a/backend/app/db/repositories/admin/admin_events_repository.py +++ b/backend/app/db/repositories/admin/admin_events_repository.py @@ -25,7 +25,7 @@ HourlyEventCount, UserEventCount, ) -from app.domain.replay import ReplayFilter, ReplaySessionState +from app.domain.replay import ReplayFilter class AdminEventsRepository: @@ -210,17 +210,6 @@ async def archive_event(self, event: DomainEvent, deleted_by: str) -> bool: await archive_doc.insert() return True - async def create_replay_session(self, session: ReplaySessionState) -> str: - doc = ReplaySessionDocument(**session.model_dump()) - await doc.insert() - return session.session_id - - async def get_replay_session(self, session_id: str) -> ReplaySessionState | None: - doc = await ReplaySessionDocument.find_one(ReplaySessionDocument.session_id == session_id) - if not doc: - return None - return ReplaySessionState.model_validate(doc) - async def update_replay_session(self, session_id: str, updates: ReplaySessionUpdate) -> bool: update_dict = updates.model_dump(exclude_none=True) if not update_dict: diff --git a/backend/app/db/repositories/dlq_repository.py b/backend/app/db/repositories/dlq_repository.py index f697685d..b228ce51 100644 --- a/backend/app/db/repositories/dlq_repository.py +++ b/backend/app/db/repositories/dlq_repository.py @@ -149,25 +149,3 @@ async def get_queue_sizes_by_topic(self) -> dict[str, int]: result[row["_id"]] = row["count"] return result - async def mark_message_retried(self, event_id: str) -> bool: - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - if not doc: - return False - now = datetime.now(timezone.utc) - doc.status = DLQMessageStatus.RETRIED - doc.retried_at = now - doc.last_updated = now - await doc.save() - return True - - async def mark_message_discarded(self, event_id: str, reason: str) -> bool: - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - if not doc: - return False - now = datetime.now(timezone.utc) - doc.status = DLQMessageStatus.DISCARDED - doc.discarded_at = now - doc.discard_reason = reason - doc.last_updated = now - await doc.save() - return True diff --git a/backend/app/db/repositories/execution_repository.py b/backend/app/db/repositories/execution_repository.py index f8004ded..a90af22c 100644 --- a/backend/app/db/repositories/execution_repository.py +++ b/backend/app/db/repositories/execution_repository.py @@ -8,7 +8,6 @@ from app.domain.execution import ( DomainExecution, DomainExecutionCreate, - DomainExecutionUpdate, ExecutionResultDomain, ) @@ -34,17 +33,6 @@ async def get_execution(self, execution_id: str) -> DomainExecution | None: self.logger.info("Found execution in MongoDB", execution_id=execution_id) return DomainExecution.model_validate(doc) - async def update_execution(self, execution_id: str, update_data: DomainExecutionUpdate) -> bool: - doc = await ExecutionDocument.find_one(ExecutionDocument.execution_id == execution_id) - if not doc: - return False - - update_dict = update_data.model_dump(exclude_none=True) - if update_dict: - update_dict["updated_at"] = datetime.now(timezone.utc) - await doc.set(update_dict) - return True - async def 
write_terminal_result(self, result: ExecutionResultDomain) -> bool: doc = await ExecutionDocument.find_one(ExecutionDocument.execution_id == result.execution_id) if not doc: diff --git a/backend/app/db/repositories/user_settings_repository.py b/backend/app/db/repositories/user_settings_repository.py index 6002c26c..08639ba8 100644 --- a/backend/app/db/repositories/user_settings_repository.py +++ b/backend/app/db/repositories/user_settings_repository.py @@ -65,9 +65,6 @@ async def count_events_since_snapshot(self, user_id: str) -> int: GT(EventDocument.timestamp, snapshot.updated_at), ).count() - async def count_events_for_user(self, user_id: str) -> int: - return await EventDocument.find(EventDocument.aggregate_id == f"user_settings_{user_id}").count() - async def delete_user_settings(self, user_id: str) -> None: doc = await UserSettingsSnapshotDocument.find_one(UserSettingsSnapshotDocument.user_id == user_id) if doc: diff --git a/backend/app/dlq/manager.py b/backend/app/dlq/manager.py index 05ef2284..6ff600a2 100644 --- a/backend/app/dlq/manager.py +++ b/backend/app/dlq/manager.py @@ -209,9 +209,6 @@ async def update_queue_metrics(self) -> None: def set_retry_policy(self, topic: str, policy: RetryPolicy) -> None: self._retry_policies[topic] = policy - def add_filter(self, filter_func: Callable[[DLQMessage], bool]) -> None: - self._filters.append(filter_func) - async def retry_message_manually(self, event_id: str) -> bool: message = await self.repository.get_message_by_id(event_id) if not message: diff --git a/backend/app/domain/events/event_models.py b/backend/app/domain/events/event_models.py index e1f5c7a7..7561054f 100644 --- a/backend/app/domain/events/event_models.py +++ b/backend/app/domain/events/event_models.py @@ -3,7 +3,6 @@ from pydantic import BaseModel, ConfigDict, Field -from app.core.utils import StringEnum from app.domain.enums import EventType from app.domain.events.typed import DomainEvent, EventMetadata @@ -11,24 +10,6 @@ MongoQuery = dict[str, MongoQueryValue] -class CollectionNames(StringEnum): - EVENTS = "events" - EVENT_STORE = "event_store" - REPLAY_SESSIONS = "replay_sessions" - EVENTS_ARCHIVE = "events_archive" - RESOURCE_ALLOCATIONS = "resource_allocations" - USERS = "users" - EXECUTIONS = "executions" - EXECUTION_RESULTS = "execution_results" - SAVED_SCRIPTS = "saved_scripts" - NOTIFICATIONS = "notifications" - NOTIFICATION_SUBSCRIPTIONS = "notification_subscriptions" - USER_SETTINGS = "user_settings" - USER_SETTINGS_SNAPSHOTS = "user_settings_snapshots" - SAGAS = "sagas" - DLQ_MESSAGES = "dlq_messages" - - class EventSummary(BaseModel): """Lightweight event summary for lists and previews.""" @@ -167,16 +148,6 @@ class ExecutionEventsResult(BaseModel): access_allowed: bool include_system_events: bool - def get_filtered_events(self) -> list[DomainEvent]: - """Get events filtered based on access and system event settings.""" - if not self.access_allowed: - return [] - - events = self.events - if not self.include_system_events: - events = [e for e in events if not e.metadata.service_name.startswith("system-")] - - return events class EventExportRow(BaseModel): diff --git a/backend/app/domain/saga/exceptions.py b/backend/app/domain/saga/exceptions.py index ebb0788c..edcd1943 100644 --- a/backend/app/domain/saga/exceptions.py +++ b/backend/app/domain/saga/exceptions.py @@ -28,15 +28,6 @@ def __init__(self, saga_id: str, current_state: SagaState, operation: str) -> No super().__init__(f"Cannot {operation} saga '{saga_id}' in state '{current_state}'") -class 
SagaCompensationError(InfrastructureError): - """Raised when saga compensation fails.""" - - def __init__(self, saga_id: str, step: str, reason: str) -> None: - self.saga_id = saga_id - self.step = step - super().__init__(f"Compensation failed for saga '{saga_id}' at step '{step}': {reason}") - - class SagaTimeoutError(InfrastructureError): """Raised when a saga times out.""" diff --git a/backend/app/domain/saga/models.py b/backend/app/domain/saga/models.py index 86303031..e0ede7f6 100644 --- a/backend/app/domain/saga/models.py +++ b/backend/app/domain/saga/models.py @@ -94,20 +94,6 @@ class SagaCancellationResult(BaseModel): saga_id: str -class SagaStatistics(BaseModel): - """Saga statistics.""" - - model_config = ConfigDict(from_attributes=True) - - total_sagas: int - sagas_by_state: dict[str, int] = Field(default_factory=dict) - sagas_by_name: dict[str, int] = Field(default_factory=dict) - average_duration_seconds: float = 0.0 - success_rate: float = 0.0 - failure_rate: float = 0.0 - compensation_rate: float = 0.0 - - class SagaConfig(BaseModel): """Configuration for saga orchestration (domain).""" diff --git a/backend/app/domain/user/settings_models.py b/backend/app/domain/user/settings_models.py index 9c7aef69..cc16917b 100644 --- a/backend/app/domain/user/settings_models.py +++ b/backend/app/domain/user/settings_models.py @@ -57,16 +57,6 @@ class DomainUserSettingsUpdate(BaseModel): custom_settings: dict[str, Any] | None = None -class DomainSettingChange(BaseModel): - model_config = ConfigDict(from_attributes=True) - - field_path: str - old_value: Any - new_value: Any - changed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - change_reason: str | None = None - - class DomainUserSettingsChangedEvent(BaseModel): """Well-typed domain event for user settings changes.""" @@ -105,6 +95,3 @@ class CachedSettings(BaseModel): settings: DomainUserSettings expires_at: datetime - - def is_expired(self) -> bool: - return datetime.now(timezone.utc) > self.expires_at diff --git a/backend/app/domain/user/user_models.py b/backend/app/domain/user/user_models.py index 1f9b16ce..457adb44 100644 --- a/backend/app/domain/user/user_models.py +++ b/backend/app/domain/user/user_models.py @@ -71,16 +71,6 @@ class UserUpdate(BaseModel): is_active: bool | None = None password: str | None = None - def has_updates(self) -> bool: - return any( - [ - self.username is not None, - self.email is not None, - self.role is not None, - self.is_active is not None, - self.password is not None, - ] - ) class UserListResult(BaseModel): diff --git a/backend/app/events/consumer_group_monitor.py b/backend/app/events/consumer_group_monitor.py index d6da44f9..5f81d27f 100644 --- a/backend/app/events/consumer_group_monitor.py +++ b/backend/app/events/consumer_group_monitor.py @@ -1,17 +1,12 @@ -import asyncio import logging from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any -from aiokafka import AIOKafkaConsumer, TopicPartition -from aiokafka.admin import AIOKafkaAdminClient from aiokafka.protocol.api import Response from aiokafka.protocol.group import MemberAssignment -from aiokafka.structs import OffsetAndMetadata from app.core.utils import StringEnum -from app.settings import Settings class ConsumerGroupHealth(StringEnum): @@ -128,358 +123,4 @@ def _state_from_string(state_str: str) -> ConsumerGroupState: return ConsumerGroupState.UNKNOWN -class NativeConsumerGroupMonitor: - """ - Enhanced consumer group monitoring using aiokafka. 
- - Provides detailed consumer group health monitoring, lag tracking, and - rebalancing detection using AIOKafkaAdminClient's native capabilities. - """ - - def __init__( - self, - settings: Settings, - logger: logging.Logger, - client_id: str = "integr8scode-consumer-group-monitor", - # Health thresholds - critical_lag_threshold: int = 10000, - warning_lag_threshold: int = 1000, - min_members_threshold: int = 1, - ): - self.logger = logger - self._settings = settings - self._bootstrap_servers = settings.KAFKA_BOOTSTRAP_SERVERS - self._client_id = client_id - - self._admin: AIOKafkaAdminClient | None = None - - # Health thresholds - self.critical_lag_threshold = critical_lag_threshold - self.warning_lag_threshold = warning_lag_threshold - self.min_members_threshold = min_members_threshold - - # Monitoring state - self._group_status_cache: dict[str, ConsumerGroupStatus] = {} - self._cache_ttl_seconds = 30 - - async def _get_admin(self) -> AIOKafkaAdminClient: - """Get or create the admin client.""" - if self._admin is None: - self._admin = AIOKafkaAdminClient( - bootstrap_servers=self._bootstrap_servers, - client_id=self._client_id, - ) - await self._admin.start() - return self._admin - - async def close(self) -> None: - """Close the admin client.""" - if self._admin is not None: - await self._admin.close() - self._admin = None - - async def get_consumer_group_status( - self, group_id: str, include_lag: bool = True - ) -> ConsumerGroupStatus: - """Get comprehensive status for a consumer group.""" - try: - # Check cache first - cached = self._group_status_cache.get(group_id) - if cached is not None: - cache_age = (datetime.now(timezone.utc) - cached.timestamp).total_seconds() - if cache_age < self._cache_ttl_seconds: - return cached - - # Get group description from AdminClient - described_group = await self._describe_consumer_group(group_id) - - # Build member information - members: list[ConsumerGroupMember] = [] - partition_distribution: dict[str, int] = {} - total_assigned_partitions = 0 - - for member_data in described_group.members: - member_id: str = member_data["member_id"] - client_id: str = member_data["client_id"] - client_host: str = member_data["client_host"] - assignment_bytes: bytes = member_data["member_assignment"] - - # Parse assigned partitions from assignment bytes - assigned_partitions: list[str] = [] - topic_partitions = _parse_member_assignment(assignment_bytes) - for topic, partitions in topic_partitions: - for partition in partitions: - assigned_partitions.append(f"{topic}:{partition}") - - members.append( - ConsumerGroupMember( - member_id=member_id, - client_id=client_id, - host=client_host, - assigned_partitions=assigned_partitions, - ) - ) - - partition_distribution[member_id] = len(assigned_partitions) - total_assigned_partitions += len(assigned_partitions) - - # Get coordinator information - admin = await self._get_admin() - coordinator_id = await admin.find_coordinator(group_id) - coordinator = f"node:{coordinator_id}" - - # Parse state - state = _state_from_string(described_group.state) - - # Get lag information if requested - total_lag = 0 - partition_lags: dict[str, int] = {} - if include_lag and state == ConsumerGroupState.STABLE: - try: - lag_info = await self._get_consumer_group_lag(group_id) - total_lag = lag_info["total_lag"] - partition_lags = lag_info["partition_lags"] - except Exception as e: - self.logger.warning(f"Failed to get lag info for group {group_id}: {e}") - - # Create status object - status = ConsumerGroupStatus( - group_id=group_id, - 
state=state, - protocol=described_group.protocol, - protocol_type=described_group.protocol_type, - coordinator=coordinator, - members=members, - member_count=len(members), - assigned_partitions=total_assigned_partitions, - partition_distribution=partition_distribution, - total_lag=total_lag, - partition_lags=partition_lags, - ) - - # Assess health - status.health, status.health_message = self._assess_group_health(status) - - # Cache the result - self._group_status_cache[group_id] = status - - return status - - except Exception as e: - self.logger.error(f"Failed to get consumer group status for {group_id}: {e}") - - # Return minimal status with error - return ConsumerGroupStatus( - group_id=group_id, - state=ConsumerGroupState.UNKNOWN, - protocol="unknown", - protocol_type="unknown", - coordinator="unknown", - members=[], - member_count=0, - assigned_partitions=0, - partition_distribution={}, - health=ConsumerGroupHealth.UNHEALTHY, - health_message=f"Failed to get group status: {e}", - ) - - async def get_multiple_group_status( - self, group_ids: list[str], include_lag: bool = True - ) -> dict[str, ConsumerGroupStatus]: - """Get status for multiple consumer groups efficiently.""" - results: dict[str, ConsumerGroupStatus] = {} - - # Process groups concurrently - tasks = [self.get_consumer_group_status(group_id, include_lag) for group_id in group_ids] - - try: - statuses = await asyncio.gather(*tasks, return_exceptions=True) - - for group_id, status in zip(group_ids, statuses, strict=False): - if isinstance(status, ConsumerGroupStatus): - results[group_id] = status - else: - # status is BaseException - self.logger.error(f"Failed to get status for group {group_id}: {status}") - results[group_id] = ConsumerGroupStatus( - group_id=group_id, - state=ConsumerGroupState.UNKNOWN, - protocol="unknown", - protocol_type="unknown", - coordinator="unknown", - members=[], - member_count=0, - assigned_partitions=0, - partition_distribution={}, - health=ConsumerGroupHealth.UNHEALTHY, - health_message=str(status), - ) - - except Exception as e: - self.logger.error(f"Failed to get multiple group status: {e}") - # Return error status for all groups - for group_id in group_ids: - if group_id not in results: - results[group_id] = ConsumerGroupStatus( - group_id=group_id, - state=ConsumerGroupState.UNKNOWN, - protocol="unknown", - protocol_type="unknown", - coordinator="unknown", - members=[], - member_count=0, - assigned_partitions=0, - partition_distribution={}, - health=ConsumerGroupHealth.UNHEALTHY, - health_message=str(e), - ) - - return results - - async def list_consumer_groups(self) -> list[str]: - """List all consumer groups in the cluster.""" - try: - admin = await self._get_admin() - # Returns list of tuples: (group_id, protocol_type) - groups: list[tuple[Any, ...]] = await admin.list_consumer_groups() - return [str(g[0]) for g in groups] - except Exception as e: - self.logger.error(f"Failed to list consumer groups: {e}") - return [] - - async def _describe_consumer_group(self, group_id: str) -> DescribedGroup: - """Describe a single consumer group using native AdminClient.""" - admin = await self._get_admin() - responses: list[Response] = await admin.describe_consumer_groups([group_id]) - - if not responses: - raise ValueError(f"No response for group {group_id}") - - # Parse the response - groups = _parse_describe_groups_response(responses[0]) - - # Find our group in the response - for group in groups: - if group.group_id == group_id: - if group.error_code != 0: - raise ValueError(f"Error describing 
group {group_id}: error_code={group.error_code}") - return group - - raise ValueError(f"Group {group_id} not found in response") - - async def _get_consumer_group_lag(self, group_id: str) -> dict[str, Any]: - """Get consumer group lag information.""" - try: - admin = await self._get_admin() - - # Get committed offsets for the group - offsets: dict[TopicPartition, OffsetAndMetadata] = await admin.list_consumer_group_offsets(group_id) - - if not offsets: - return {"total_lag": 0, "partition_lags": {}} - - # Create a temporary consumer to get high watermarks - consumer = AIOKafkaConsumer( - bootstrap_servers=self._bootstrap_servers, - group_id=f"{group_id}-lag-monitor-{datetime.now().timestamp()}", - enable_auto_commit=False, - auto_offset_reset="earliest", - session_timeout_ms=self._settings.KAFKA_SESSION_TIMEOUT_MS, - heartbeat_interval_ms=self._settings.KAFKA_HEARTBEAT_INTERVAL_MS, - request_timeout_ms=self._settings.KAFKA_REQUEST_TIMEOUT_MS, - ) - - try: - await consumer.start() - - total_lag = 0 - partition_lags: dict[str, int] = {} - - # Get end offsets for all partitions - tps = list(offsets.keys()) - if tps: - end_offsets: dict[TopicPartition, int] = await consumer.end_offsets(tps) - - for tp, offset_meta in offsets.items(): - committed_offset = offset_meta.offset - high = end_offsets.get(tp, 0) - - if committed_offset >= 0: - lag = max(0, high - committed_offset) - partition_lags[f"{tp.topic}:{tp.partition}"] = lag - total_lag += lag - - return {"total_lag": total_lag, "partition_lags": partition_lags} - - finally: - await consumer.stop() - - except Exception as e: - self.logger.warning(f"Failed to get consumer group lag for {group_id}: {e}") - return {"total_lag": 0, "partition_lags": {}} - - def _assess_group_health(self, status: ConsumerGroupStatus) -> tuple[ConsumerGroupHealth, str]: - """Assess the health of a consumer group based on its status.""" - - # Check for error/unknown state - if status.state == ConsumerGroupState.UNKNOWN: - return ConsumerGroupHealth.UNHEALTHY, "Group is in unknown state" - - if status.state == ConsumerGroupState.DEAD: - return ConsumerGroupHealth.UNHEALTHY, "Group is dead" - - if status.member_count < self.min_members_threshold: - return ConsumerGroupHealth.UNHEALTHY, f"Insufficient members: {status.member_count}" - - # Check for rebalancing issues - if status.state in (ConsumerGroupState.PREPARING_REBALANCE, ConsumerGroupState.COMPLETING_REBALANCE): - return ConsumerGroupHealth.DEGRADED, f"Group is rebalancing: {status.state}" - - # Check for empty group - if status.state == ConsumerGroupState.EMPTY: - return ConsumerGroupHealth.DEGRADED, "Group is empty (no active members)" - - # Check lag if available - if status.total_lag >= self.critical_lag_threshold: - return ConsumerGroupHealth.UNHEALTHY, f"Critical lag: {status.total_lag} messages" - - if status.total_lag >= self.warning_lag_threshold: - return ConsumerGroupHealth.DEGRADED, f"High lag: {status.total_lag} messages" - - # Check partition distribution - if status.partition_distribution: - values = list(status.partition_distribution.values()) - max_partitions = max(values) - min_partitions = min(values) - - # Warn if partition distribution is very uneven - if max_partitions > 0 and (max_partitions - min_partitions) > max_partitions * 0.5: - return ConsumerGroupHealth.DEGRADED, "Uneven partition distribution" - - # Check if group is stable and consuming - if status.state == ConsumerGroupState.STABLE and status.assigned_partitions > 0: - return ConsumerGroupHealth.HEALTHY, f"Group is stable with 
{status.member_count} members" - - # Default case - return ConsumerGroupHealth.UNKNOWN, f"Group state: {status.state}" - - def get_health_summary(self, status: ConsumerGroupStatus) -> dict[str, Any]: - """Get a health summary for a consumer group.""" - return { - "group_id": status.group_id, - "health": status.health, - "health_message": status.health_message, - "state": status.state, - "members": status.member_count, - "assigned_partitions": status.assigned_partitions, - "total_lag": status.total_lag, - "coordinator": status.coordinator, - "timestamp": status.timestamp.isoformat(), - "partition_distribution": status.partition_distribution, - } - - def clear_cache(self) -> None: - """Clear the status cache.""" - self._group_status_cache.clear() - diff --git a/backend/app/events/core/producer.py b/backend/app/events/core/producer.py index 1946f479..aab668bc 100644 --- a/backend/app/events/core/producer.py +++ b/backend/app/events/core/producer.py @@ -1,14 +1,8 @@ -import asyncio -import socket -from datetime import datetime, timezone - import structlog from faststream.kafka import KafkaBroker from app.core.metrics import EventMetrics from app.db.repositories import EventRepository -from app.dlq.models import DLQMessage, DLQMessageStatus -from app.domain.enums import KafkaTopic from app.domain.events import DomainEvent from app.infrastructure.kafka.mappings import EVENT_TYPE_TO_TOPIC from app.settings import Settings @@ -54,41 +48,3 @@ async def produce(self, event_to_produce: DomainEvent, key: str) -> None: self.logger.error(f"Failed to produce message: {e}") raise - async def send_to_dlq( - self, original_event: DomainEvent, original_topic: str, error: Exception, retry_count: int = 0 - ) -> None: - """Send a failed event to the Dead Letter Queue.""" - try: - current_task = asyncio.current_task() - task_name = current_task.get_name() if current_task else "main" - producer_id = f"{socket.gethostname()}-{task_name}" - - dlq_topic = f"{self._topic_prefix}{KafkaTopic.DEAD_LETTER_QUEUE}" - - dlq_msg = DLQMessage( - event=original_event, - original_topic=original_topic, - error=str(error), - retry_count=retry_count, - failed_at=datetime.now(timezone.utc), - status=DLQMessageStatus.PENDING, - producer_id=producer_id, - ) - - await self._broker.publish( - message=dlq_msg, - topic=dlq_topic, - key=original_event.event_id.encode(), - ) - - self._event_metrics.record_kafka_message_produced(dlq_topic) - self.logger.warning( - f"Event {original_event.event_id} sent to DLQ. " - f"Original topic: {original_topic}, Error: {error}, " - f"Retry count: {retry_count}" - ) - - except Exception as e: - self.logger.critical( - f"Failed to send event {original_event.event_id} to DLQ: {e}. 
Original error: {error}", exc_info=True - ) diff --git a/backend/app/schemas_pydantic/execution.py b/backend/app/schemas_pydantic/execution.py index 2fc60a25..4b16d8e4 100644 --- a/backend/app/schemas_pydantic/execution.py +++ b/backend/app/schemas_pydantic/execution.py @@ -20,12 +20,6 @@ class ExecutionBase(BaseModel): lang_version: str = "3.11" -class ExecutionCreate(ExecutionBase): - """Model for creating a new execution.""" - - pass - - class ExecutionInDB(ExecutionBase): """Model for execution as stored in database.""" @@ -40,17 +34,6 @@ class ExecutionInDB(ExecutionBase): model_config = ConfigDict(from_attributes=True) -class ExecutionUpdate(BaseModel): - """Model for updating an execution.""" - - status: ExecutionStatus | None = None - stdout: str | None = None - stderr: str | None = None - resource_usage: ResourceUsage | None = None - exit_code: int | None = None - error_type: ExecutionErrorType | None = None - - class ResourceUsage(BaseModel): """Model for execution resource usage.""" diff --git a/backend/app/schemas_pydantic/notification.py b/backend/app/schemas_pydantic/notification.py index 05491c3d..76805209 100644 --- a/backend/app/schemas_pydantic/notification.py +++ b/backend/app/schemas_pydantic/notification.py @@ -56,27 +56,6 @@ def validate_scheduled_for(cls, v: datetime | None) -> datetime | None: model_config = ConfigDict(from_attributes=True) -class NotificationBatch(BaseModel): - """Batch of notifications for bulk processing""" - - batch_id: str = Field(default_factory=lambda: str(uuid4())) - notifications: list[Notification] - created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) - processed_count: int = 0 - failed_count: int = 0 - - @field_validator("notifications") - @classmethod - def validate_notifications(cls, v: list[Notification]) -> list[Notification]: - if not v: - raise ValueError("Batch must contain at least one notification") - if len(v) > 1000: - raise ValueError("Batch cannot exceed 1000 notifications") - return v - - model_config = ConfigDict(from_attributes=True) - - # Rules removed in unified model @@ -109,37 +88,6 @@ class NotificationSubscription(BaseModel): model_config = ConfigDict(from_attributes=True) -class NotificationStats(BaseModel): - """Statistics for notification delivery""" - - user_id: str | None = None - channel: NotificationChannel | None = None - tags: list[str] | None = None - severity: NotificationSeverity | None = None - - # Time range - start_date: datetime - end_date: datetime - - # Counts - total_sent: int = 0 - total_delivered: int = 0 - total_failed: int = 0 - total_read: int = 0 - total_clicked: int = 0 - - # Rates - delivery_rate: float = 0.0 - read_rate: float = 0.0 - click_rate: float = 0.0 - - # Performance - avg_delivery_time_seconds: float = 0.0 - avg_read_time_seconds: float = 0.0 - - model_config = ConfigDict(from_attributes=True) - - class NotificationResponse(BaseModel): """Response schema for notification endpoints""" diff --git a/backend/app/schemas_pydantic/replay.py b/backend/app/schemas_pydantic/replay.py index 95685b45..19d77b1e 100644 --- a/backend/app/schemas_pydantic/replay.py +++ b/backend/app/schemas_pydantic/replay.py @@ -67,12 +67,6 @@ def duration_seconds(self) -> float | None: return (self.completed_at - self.started_at).total_seconds() return None - @computed_field # type: ignore[prop-decorator] - @property - def throughput_events_per_second(self) -> float | None: - if self.duration_seconds and self.duration_seconds > 0 and self.replayed_events > 0: - return self.replayed_events 
/ self.duration_seconds - return None class CleanupResponse(BaseModel): diff --git a/backend/app/schemas_pydantic/saved_script.py b/backend/app/schemas_pydantic/saved_script.py index e315c656..e9cd0c84 100644 --- a/backend/app/schemas_pydantic/saved_script.py +++ b/backend/app/schemas_pydantic/saved_script.py @@ -1,5 +1,4 @@ from datetime import datetime, timezone -from uuid import uuid4 from pydantic import BaseModel, ConfigDict, Field @@ -18,15 +17,6 @@ class SavedScriptCreate(SavedScriptBase): pass -class SavedScriptInDB(SavedScriptBase): - script_id: str = Field(default_factory=lambda: str(uuid4())) - user_id: str - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - - model_config = ConfigDict(from_attributes=True) - - class SavedScriptUpdate(BaseModel): name: str | None = None script: str | None = None diff --git a/backend/app/schemas_pydantic/user.py b/backend/app/schemas_pydantic/user.py index 268b7aaa..29b82f9e 100644 --- a/backend/app/schemas_pydantic/user.py +++ b/backend/app/schemas_pydantic/user.py @@ -1,5 +1,4 @@ -from datetime import datetime, timezone -from uuid import uuid4 +from datetime import datetime from pydantic import BaseModel, ConfigDict, EmailStr, Field @@ -26,18 +25,6 @@ class UserCreate(UserBase): model_config = ConfigDict(from_attributes=True) -class UserInDB(UserBase): - """User model as stored in database (with hashed password)""" - - user_id: str = Field(default_factory=lambda: str(uuid4())) - hashed_password: str - is_superuser: bool = False - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - - model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True) - - class UserUpdate(BaseModel): """Model for updating a user""" diff --git a/backend/app/schemas_pydantic/user_settings.py b/backend/app/schemas_pydantic/user_settings.py index 76140933..6cba84b6 100644 --- a/backend/app/schemas_pydantic/user_settings.py +++ b/backend/app/schemas_pydantic/user_settings.py @@ -77,16 +77,6 @@ class UserSettingsUpdate(BaseModel): custom_settings: dict[str, Any] | None = None -class SettingChange(BaseModel): - """Represents a single setting change for event sourcing""" - - field_path: str # e.g., "theme", "editor.font_size", "notifications.channels" - old_value: Any - new_value: Any - changed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - change_reason: str | None = None - - class ThemeUpdateRequest(BaseModel): """Request model for theme update""" @@ -122,9 +112,3 @@ class RestoreSettingsRequest(BaseModel): reason: str | None = None -class SettingsEvent(BaseModel): - """Minimal event model for user settings service consumption.""" - - event_type: EventType - timestamp: datetime - payload: dict[str, Any] diff --git a/backend/app/services/event_service.py b/backend/app/services/event_service.py index 596aa4bd..ea0aa65f 100644 --- a/backend/app/services/event_service.py +++ b/backend/app/services/event_service.py @@ -1,47 +1,16 @@ from datetime import datetime -from typing import Any from app.db.repositories import EventRepository from app.domain.enums import EventType, UserRole from app.domain.events import ( ArchivedEvent, DomainEvent, - EventFilter, EventListResult, EventReplayInfo, EventStatistics, ) -def _filter_to_mongo_query(flt: EventFilter) -> dict[str, Any]: - """Convert EventFilter to MongoDB query 
dict.""" - query: dict[str, Any] = {} - - if flt.event_types: - query["event_type"] = {"$in": flt.event_types} - if flt.aggregate_id: - query["aggregate_id"] = flt.aggregate_id - if flt.user_id: - query["metadata.user_id"] = flt.user_id - if flt.service_name: - query["metadata.service_name"] = flt.service_name - if getattr(flt, "status", None): - query["status"] = flt.status - - if flt.start_time or flt.end_time: - time_query: dict[str, Any] = {} - if flt.start_time: - time_query["$gte"] = flt.start_time - if flt.end_time: - time_query["$lte"] = flt.end_time - query["timestamp"] = time_query - - if flt.search_text: - query["$text"] = {"$search": flt.search_text} - - return query - - class EventService: def __init__(self, repository: EventRepository): self.repository = repository @@ -102,39 +71,6 @@ async def get_user_events_paginated( sort_order=sort_order, ) - async def query_events_advanced( - self, - user_id: str, - user_role: UserRole, - filters: EventFilter, - sort_by: str = "timestamp", - limit: int = 100, - skip: int = 0, - ) -> EventListResult | None: - # Access control - if filters.user_id and filters.user_id != user_id and user_role != UserRole.ADMIN: - return None - - query = _filter_to_mongo_query(filters) - if not filters.user_id and user_role != UserRole.ADMIN: - query["metadata.user_id"] = user_id - - # Sort field mapping - field_map = { - "timestamp": "timestamp", - "event_type": "event_type", - "aggregate_id": "aggregate_id", - "stored_at": "stored_at", - } - sort_field = field_map.get(sort_by, "timestamp") - - return await self.repository.query_events( - query=query, - sort_field=sort_field, - skip=skip, - limit=limit, - ) - async def get_event_statistics( self, user_id: str, diff --git a/backend/app/services/idempotency/redis_repository.py b/backend/app/services/idempotency/redis_repository.py index 65ba91a1..f15471a0 100644 --- a/backend/app/services/idempotency/redis_repository.py +++ b/backend/app/services/idempotency/redis_repository.py @@ -118,9 +118,3 @@ async def update_record(self, record: IdempotencyRecord) -> int: await self._r.set(k, payload) return 1 - async def delete_key(self, key: str) -> int: - k = self._full_key(key) - return int(await self._r.delete(k) or 0) - - async def health_check(self) -> None: - await self._r.ping() # type: ignore[misc] # redis-py returns Awaitable[bool] | bool diff --git a/backend/app/services/k8s_worker/worker.py b/backend/app/services/k8s_worker/worker.py index 591a27c1..53f6cd4e 100644 --- a/backend/app/services/k8s_worker/worker.py +++ b/backend/app/services/k8s_worker/worker.py @@ -13,7 +13,6 @@ CreatePodCommandEvent, DeletePodCommandEvent, ExecutionFailedEvent, - ExecutionStartedEvent, PodCreatedEvent, ) from app.events.core import UnifiedProducer @@ -201,18 +200,6 @@ async def _create_pod(self, pod: k8s_client.V1Pod) -> None: else: raise - async def _publish_execution_started(self, command: CreatePodCommandEvent, pod: k8s_client.V1Pod) -> None: - """Publish execution started event""" - event = ExecutionStartedEvent( - execution_id=command.execution_id, - aggregate_id=command.execution_id, - pod_name=pod.metadata.name, - node_name=pod.spec.node_name, - container_id=None, - metadata=command.metadata, - ) - await self.producer.produce(event_to_produce=event, key=command.execution_id) - async def _publish_pod_created(self, command: CreatePodCommandEvent, pod: k8s_client.V1Pod) -> None: """Publish pod created event""" event = PodCreatedEvent( diff --git a/backend/app/services/pod_monitor/event_mapper.py 
b/backend/app/services/pod_monitor/event_mapper.py index a2615b7c..e8e73943 100644 --- a/backend/app/services/pod_monitor/event_mapper.py +++ b/backend/app/services/pod_monitor/event_mapper.py @@ -519,6 +519,3 @@ def _log_extraction_error(self, pod_name: str, error: str) -> None: else: self.logger.warning(f"Failed to extract logs from pod {pod_name}: {error}") - def clear_cache(self) -> None: - """Clear event cache""" - self._event_cache.clear() diff --git a/backend/app/services/result_processor/resource_cleaner.py b/backend/app/services/result_processor/resource_cleaner.py index 4734e0db..92b57a06 100644 --- a/backend/app/services/result_processor/resource_cleaner.py +++ b/backend/app/services/result_processor/resource_cleaner.py @@ -1,16 +1,12 @@ -import asyncio -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from typing import Any import structlog from kubernetes_asyncio import client as k8s_client from kubernetes_asyncio.client.rest import ApiException -from app.domain.exceptions import InfrastructureError - # Python 3.12 type aliases type ResourceDict = dict[str, list[str]] -type CountDict = dict[str, int] class ResourceCleaner: @@ -24,41 +20,6 @@ def __init__(self, api_client: k8s_client.ApiClient, logger: structlog.stdlib.Bo self.networking_v1 = k8s_client.NetworkingV1Api(api_client) self.logger = logger - async def cleanup_pod_resources( - self, - pod_name: str, - namespace: str = "integr8scode", - execution_id: str | None = None, - timeout: int = 60, - delete_pvcs: bool = False, - ) -> None: - """Clean up all resources associated with a pod""" - self.logger.info(f"Cleaning up resources for pod: {pod_name}") - - try: - tasks = [ - self._delete_pod(pod_name, namespace), - *( - [ - self._delete_configmaps(execution_id, namespace), - *([self._delete_pvcs(execution_id, namespace)] if delete_pvcs else []), - ] - if execution_id - else [] - ), - ] - - await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=timeout) - - self.logger.info(f"Successfully cleaned up resources for pod: {pod_name}") - - except asyncio.TimeoutError as e: - self.logger.error(f"Timeout cleaning up resources for pod: {pod_name}") - raise InfrastructureError("Resource cleanup timed out") from e - except Exception as e: - self.logger.error(f"Failed to cleanup resources: {e}") - raise InfrastructureError(f"Resource cleanup failed: {e}") from e - async def _delete_pod(self, pod_name: str, namespace: str) -> None: """Delete a pod""" try: @@ -108,30 +69,6 @@ async def _delete_labeled_resources( except ApiException as e: self.logger.error(f"Failed to delete {resource_type}s: {e}") - async def cleanup_orphaned_resources( - self, - namespace: str = "integr8scode", - max_age_hours: int = 24, - dry_run: bool = False, - ) -> ResourceDict: - """Clean up orphaned resources older than specified age""" - cutoff_time = datetime.now(timezone.utc) - timedelta(hours=max_age_hours) - cleaned: ResourceDict = { - "pods": [], - "configmaps": [], - "pvcs": [], - } - - try: - await self._cleanup_orphaned_pods(namespace, cutoff_time, cleaned, dry_run) - await self._cleanup_orphaned_configmaps(namespace, cutoff_time, cleaned, dry_run) - - return cleaned - - except Exception as e: - self.logger.error(f"Failed to cleanup orphaned resources: {e}") - raise InfrastructureError(f"Orphaned resource cleanup failed: {e}") from e - async def _cleanup_orphaned_pods( self, namespace: str, cutoff_time: datetime, cleaned: ResourceDict, dry_run: bool ) -> None: @@ -169,45 +106,3 @@ async 
def _cleanup_orphaned_configmaps( except Exception as e: self.logger.error(f"Failed to delete orphaned ConfigMap {cm.metadata.name}: {e}") - async def get_resource_usage(self, namespace: str = "default") -> CountDict: - """Get current resource usage counts""" - label_selector = "app=integr8s" - - default_counts = {"pods": 0, "configmaps": 0, "network_policies": 0} - - try: - # Get pods count - try: - pods = await self.v1.list_namespaced_pod(namespace, label_selector=label_selector) - pod_count = len(pods.items) - except Exception as e: - self.logger.warning(f"Failed to get pods: {e}") - pod_count = 0 - - # Get configmaps count - try: - configmaps = await self.v1.list_namespaced_config_map(namespace, label_selector=label_selector) - configmap_count = len(configmaps.items) - except Exception as e: - self.logger.warning(f"Failed to get configmaps: {e}") - configmap_count = 0 - - # Get network policies count - try: - policies = await self.networking_v1.list_namespaced_network_policy( - namespace, label_selector=label_selector - ) - policy_count = len(policies.items) - except Exception as e: - self.logger.warning(f"Failed to get network policies: {e}") - policy_count = 0 - - return { - "pods": pod_count, - "configmaps": configmap_count, - "network_policies": policy_count, - } - - except Exception as e: - self.logger.error(f"Failed to get resource usage: {e}") - return default_counts diff --git a/backend/app/services/saga/saga_step.py b/backend/app/services/saga/saga_step.py index fe3ab191..c0f560aa 100644 --- a/backend/app/services/saga/saga_step.py +++ b/backend/app/services/saga/saga_step.py @@ -1,8 +1,6 @@ from abc import ABC, abstractmethod from typing import Any, Generic, TypeVar -from fastapi.encoders import jsonable_encoder - from app.domain.events import DomainEvent T = TypeVar("T", bound=DomainEvent) @@ -29,31 +27,6 @@ def add_compensation(self, compensation: "CompensationStep") -> None: """Add compensation step""" self.compensations.append(compensation) - def to_public_dict(self) -> dict[str, Any]: - """Return a safe, persistable snapshot of context data. 
- - - Excludes private/ephemeral keys (prefixed with "_") - - Encodes values to JSON-friendly types using FastAPI's jsonable_encoder - """ - - def _is_simple(val: Any) -> bool: - if isinstance(val, (str, int, float, bool)) or val is None: - return True - if isinstance(val, dict): - return all(isinstance(k, str) and _is_simple(v) for k, v in val.items()) - if isinstance(val, (list, tuple)): - return all(_is_simple(i) for i in val) - return False - - public: dict[str, Any] = {} - for k, v in self.data.items(): - if isinstance(k, str) and k.startswith("_"): - continue - encoded = jsonable_encoder(v, exclude_none=False) - if _is_simple(encoded): - public[k] = encoded - # else: drop complex/unknown types - return public class SagaStep(ABC, Generic[T]): diff --git a/backend/app/services/user_settings_service.py b/backend/app/services/user_settings_service.py index d7f21584..d181e4b1 100644 --- a/backend/app/services/user_settings_service.py +++ b/backend/app/services/user_settings_service.py @@ -205,8 +205,3 @@ def _add_to_cache(self, user_id: str, settings: DomainUserSettings) -> None: self._cache[user_id] = settings self.logger.debug(f"Cached settings for user {user_id}", cache_size=len(self._cache)) - async def reset_user_settings(self, user_id: str) -> None: - """Reset user settings by deleting all data and cache.""" - await self.invalidate_cache(user_id) - await self.repository.delete_user_settings(user_id) - self.logger.info(f"Reset settings for user {user_id}") diff --git a/backend/tests/e2e/db/repositories/test_dlq_repository.py b/backend/tests/e2e/db/repositories/test_dlq_repository.py index 4238b7c6..7d1f048d 100644 --- a/backend/tests/e2e/db/repositories/test_dlq_repository.py +++ b/backend/tests/e2e/db/repositories/test_dlq_repository.py @@ -81,8 +81,5 @@ async def test_list_get_and_updates(repo: DLQRepository) -> None: assert res.total >= 3 and len(res.messages) <= 2 msg = await repo.get_message_by_id("id1") assert msg and msg.event.event_id == "id1" - assert await repo.mark_message_retried("id1") in (True, False) - assert await repo.mark_message_discarded("id1", "r") in (True, False) - topics = await repo.get_topics_summary() assert any(t.topic == "t1" for t in topics) diff --git a/backend/tests/e2e/db/repositories/test_execution_repository.py b/backend/tests/e2e/db/repositories/test_execution_repository.py index 083ebf82..c1396020 100644 --- a/backend/tests/e2e/db/repositories/test_execution_repository.py +++ b/backend/tests/e2e/db/repositories/test_execution_repository.py @@ -4,7 +4,7 @@ import pytest from app.db.repositories import ExecutionRepository from app.domain.enums import ExecutionStatus -from app.domain.execution import DomainExecutionCreate, DomainExecutionUpdate +from app.domain.execution import DomainExecutionCreate _test_logger = structlog.get_logger("test.db.repositories.execution_repository") @@ -30,13 +30,6 @@ async def test_execution_crud_and_query() -> None: got = await repo.get_execution(created.execution_id) assert got and got.script.startswith("print") and got.status == ExecutionStatus.QUEUED - # Update - update = DomainExecutionUpdate(status=ExecutionStatus.RUNNING, stdout="ok") - ok = await repo.update_execution(created.execution_id, update) - assert ok is True - got2 = await repo.get_execution(created.execution_id) - assert got2 and got2.status == ExecutionStatus.RUNNING - # List items = await repo.get_executions({"user_id": user_id}, limit=10, skip=0, sort=[("created_at", 1)]) assert any(x.execution_id == created.execution_id for x in items) diff --git 
a/backend/tests/e2e/dlq/test_dlq_discard.py b/backend/tests/e2e/dlq/test_dlq_discard.py deleted file mode 100644 index e9a89e03..00000000 --- a/backend/tests/e2e/dlq/test_dlq_discard.py +++ /dev/null @@ -1,141 +0,0 @@ -import logging -import uuid -from datetime import datetime, timezone - -import pytest -from app.db.docs import DLQMessageDocument -from app.db.repositories import DLQRepository -from app.dlq.models import DLQMessageStatus -from app.domain.enums import KafkaTopic -from dishka import AsyncContainer - -from tests.conftest import make_execution_requested_event - -pytestmark = [pytest.mark.e2e, pytest.mark.mongodb] - -_test_logger = logging.getLogger("test.dlq.discard") - - -async def _create_dlq_document( - event_id: str | None = None, - status: DLQMessageStatus = DLQMessageStatus.PENDING, -) -> DLQMessageDocument: - """Helper to create a DLQ document directly in MongoDB.""" - if event_id is None: - event_id = str(uuid.uuid4()) - - event = make_execution_requested_event(execution_id=f"exec-{uuid.uuid4().hex[:8]}") - # Override event_id for test predictability - event_dict = event.model_dump() - event_dict["event_id"] = event_id - now = datetime.now(timezone.utc) - - doc = DLQMessageDocument( - event=event_dict, - original_topic=KafkaTopic.EXECUTION_EVENTS, - error="Test error", - retry_count=0, - failed_at=now, - status=status, - producer_id="test-producer", - created_at=now, - ) - await doc.insert() - return doc - - -@pytest.mark.asyncio -async def test_dlq_repository_marks_message_discarded(scope: AsyncContainer) -> None: - """Test that DLQRepository.mark_message_discarded() updates status correctly.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a DLQ document - event_id = f"dlq-discard-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.PENDING) - - # Discard the message - reason = "max_retries_exceeded" - result = await repository.mark_message_discarded(event_id, reason) - - assert result is True - - # Verify the status changed - updated_doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert updated_doc is not None - assert updated_doc.status == DLQMessageStatus.DISCARDED - assert updated_doc.discard_reason == reason - assert updated_doc.discarded_at is not None - - -@pytest.mark.asyncio -async def test_dlq_discard_nonexistent_message_returns_false(scope: AsyncContainer) -> None: - """Test that discarding a nonexistent message returns False.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Try to discard a message that doesn't exist - result = await repository.mark_message_discarded( - f"nonexistent-{uuid.uuid4().hex[:8]}", - "test_reason", - ) - - assert result is False - - -@pytest.mark.asyncio -async def test_dlq_discard_sets_timestamp(scope: AsyncContainer) -> None: - """Test that discarding sets the discarded_at timestamp.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a DLQ document - event_id = f"dlq-ts-{uuid.uuid4().hex[:8]}" - before_discard = datetime.now(timezone.utc) - await _create_dlq_document(event_id=event_id) - - # Discard the message - await repository.mark_message_discarded(event_id, "manual_discard") - after_discard = datetime.now(timezone.utc) - - # Verify timestamp is set correctly - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.discarded_at is not None - assert before_discard <= doc.discarded_at <= after_discard - - -@pytest.mark.asyncio -async 
def test_dlq_discard_with_custom_reason(scope: AsyncContainer) -> None: - """Test that custom discard reasons are stored.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a DLQ document - event_id = f"dlq-reason-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id) - - # Discard with custom reason - custom_reason = "manual: User requested deletion due to invalid payload" - await repository.mark_message_discarded(event_id, custom_reason) - - # Verify the reason is stored - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.discard_reason == custom_reason - - -@pytest.mark.asyncio -async def test_dlq_discard_from_scheduled_status(scope: AsyncContainer) -> None: - """Test that scheduled messages can be discarded.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a SCHEDULED DLQ document - event_id = f"dlq-scheduled-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.SCHEDULED) - - # Discard the message - result = await repository.mark_message_discarded(event_id, "policy_change") - - assert result is True - - # Verify status transition - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.status == DLQMessageStatus.DISCARDED diff --git a/backend/tests/e2e/dlq/test_dlq_retry.py b/backend/tests/e2e/dlq/test_dlq_retry.py deleted file mode 100644 index 931b6565..00000000 --- a/backend/tests/e2e/dlq/test_dlq_retry.py +++ /dev/null @@ -1,205 +0,0 @@ -import logging -import uuid -from datetime import datetime, timezone - -import pytest -from app.db.docs import DLQMessageDocument -from app.db.repositories import DLQRepository -from app.dlq.models import DLQMessageStatus -from app.domain.enums import KafkaTopic -from dishka import AsyncContainer - -from tests.conftest import make_execution_requested_event - -pytestmark = [pytest.mark.e2e, pytest.mark.mongodb] - -_test_logger = logging.getLogger("test.dlq.retry") - - -async def _create_dlq_document( - event_id: str | None = None, - status: DLQMessageStatus = DLQMessageStatus.PENDING, -) -> DLQMessageDocument: - """Helper to create a DLQ document directly in MongoDB.""" - if event_id is None: - event_id = str(uuid.uuid4()) - - event = make_execution_requested_event(execution_id=f"exec-{uuid.uuid4().hex[:8]}") - # Override event_id for test predictability - event_dict = event.model_dump() - event_dict["event_id"] = event_id - now = datetime.now(timezone.utc) - - doc = DLQMessageDocument( - event=event_dict, - original_topic=KafkaTopic.EXECUTION_EVENTS, - error="Test error", - retry_count=0, - failed_at=now, - status=status, - producer_id="test-producer", - created_at=now, - ) - await doc.insert() - return doc - - -@pytest.mark.asyncio -async def test_dlq_repository_marks_message_retried(scope: AsyncContainer) -> None: - """Test that DLQRepository.mark_message_retried() updates status correctly.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a DLQ document - event_id = f"dlq-retry-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.SCHEDULED) - - # Mark as retried - result = await repository.mark_message_retried(event_id) - - assert result is True - - # Verify the status changed - updated_doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert updated_doc is not None - assert updated_doc.status == DLQMessageStatus.RETRIED - assert 
updated_doc.retried_at is not None - - -@pytest.mark.asyncio -async def test_dlq_retry_nonexistent_message_returns_false(scope: AsyncContainer) -> None: - """Test that retrying a nonexistent message returns False.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Try to retry a message that doesn't exist - result = await repository.mark_message_retried(f"nonexistent-{uuid.uuid4().hex[:8]}") - - assert result is False - - -@pytest.mark.asyncio -async def test_dlq_retry_sets_timestamp(scope: AsyncContainer) -> None: - """Test that retrying sets the retried_at timestamp.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a DLQ document - event_id = f"dlq-retry-ts-{uuid.uuid4().hex[:8]}" - before_retry = datetime.now(timezone.utc) - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.SCHEDULED) - - # Retry the message - await repository.mark_message_retried(event_id) - after_retry = datetime.now(timezone.utc) - - # Verify timestamp is set correctly - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.retried_at is not None - assert before_retry <= doc.retried_at <= after_retry - - -@pytest.mark.asyncio -async def test_dlq_retry_from_pending_status(scope: AsyncContainer) -> None: - """Test that pending messages can be retried.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a PENDING DLQ document - event_id = f"dlq-pending-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.PENDING) - - # Retry the message - result = await repository.mark_message_retried(event_id) - - assert result is True - - # Verify status transition - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.status == DLQMessageStatus.RETRIED - - -@pytest.mark.asyncio -async def test_dlq_retry_already_retried_message(scope: AsyncContainer) -> None: - """Test that retrying an already RETRIED message still succeeds at repository level. - - Note: The DLQManager.retry_message_manually guards against this, but the - repository method doesn't - it's a low-level operation that always succeeds. - """ - repository: DLQRepository = await scope.get(DLQRepository) - - # Create an already RETRIED document - event_id = f"dlq-already-retried-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.RETRIED) - - # Repository method still succeeds (no guard at this level) - result = await repository.mark_message_retried(event_id) - assert result is True - - # Status remains RETRIED - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.status == DLQMessageStatus.RETRIED - - -@pytest.mark.asyncio -async def test_dlq_retry_discarded_message(scope: AsyncContainer) -> None: - """Test that retrying a DISCARDED message still succeeds at repository level. - - Note: The DLQManager.retry_message_manually guards against this and returns False, - but the repository method is a low-level operation that doesn't validate transitions. 
- """ - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a DISCARDED document - event_id = f"dlq-discarded-retry-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.DISCARDED) - - # Repository method succeeds (transitions status back to RETRIED) - result = await repository.mark_message_retried(event_id) - assert result is True - - # Status is now RETRIED (repository doesn't guard transitions) - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.status == DLQMessageStatus.RETRIED - - -@pytest.mark.asyncio -async def test_dlq_discard_already_discarded_message(scope: AsyncContainer) -> None: - """Test that discarding an already DISCARDED message updates the reason.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create an already DISCARDED document - event_id = f"dlq-already-discarded-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.DISCARDED) - - # Discard again with a new reason - new_reason = "updated_discard_reason" - result = await repository.mark_message_discarded(event_id, new_reason) - assert result is True - - # Reason is updated - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.status == DLQMessageStatus.DISCARDED - assert doc.discard_reason == new_reason - - -@pytest.mark.asyncio -async def test_dlq_discard_retried_message(scope: AsyncContainer) -> None: - """Test that discarding a RETRIED message transitions to DISCARDED.""" - repository: DLQRepository = await scope.get(DLQRepository) - - # Create a RETRIED document - event_id = f"dlq-retried-discard-{uuid.uuid4().hex[:8]}" - await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.RETRIED) - - # Discard it - reason = "manual_cleanup" - result = await repository.mark_message_discarded(event_id, reason) - assert result is True - - # Status is now DISCARDED - doc = await DLQMessageDocument.find_one({"event.event_id": event_id}) - assert doc is not None - assert doc.status == DLQMessageStatus.DISCARDED - assert doc.discard_reason == reason diff --git a/backend/tests/e2e/events/test_consumer_group_monitor.py b/backend/tests/e2e/events/test_consumer_group_monitor.py deleted file mode 100644 index 97d45c21..00000000 --- a/backend/tests/e2e/events/test_consumer_group_monitor.py +++ /dev/null @@ -1,20 +0,0 @@ -import logging - -import pytest -from app.events.consumer_group_monitor import ConsumerGroupHealth, NativeConsumerGroupMonitor -from app.settings import Settings - -_test_logger = logging.getLogger("test.events.consumer_group_monitor") - - -@pytest.mark.e2e -@pytest.mark.kafka -@pytest.mark.asyncio -async def test_list_groups_and_error_status(test_settings: Settings) -> None: - mon = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger) - groups = await mon.list_consumer_groups() - assert isinstance(groups, list) - - # Query a non-existent group to exercise error handling with real AdminClient - status = await mon.get_consumer_group_status("nonexistent-group-for-tests") - assert status.health in {ConsumerGroupHealth.UNHEALTHY, ConsumerGroupHealth.UNKNOWN} diff --git a/backend/tests/e2e/events/test_consumer_group_monitor_real.py b/backend/tests/e2e/events/test_consumer_group_monitor_real.py deleted file mode 100644 index bc53591f..00000000 --- a/backend/tests/e2e/events/test_consumer_group_monitor_real.py +++ /dev/null @@ -1,106 +0,0 @@ -import logging 
-from uuid import uuid4 - -import pytest -from app.events.consumer_group_monitor import ( - ConsumerGroupHealth, - ConsumerGroupState, - ConsumerGroupStatus, - NativeConsumerGroupMonitor, -) -from app.settings import Settings - -pytestmark = [pytest.mark.e2e, pytest.mark.kafka] - -_test_logger = logging.getLogger("test.events.consumer_group_monitor_real") - - -@pytest.mark.asyncio -async def test_consumer_group_status_error_path_and_summary(test_settings: Settings) -> None: - monitor = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger) - # Non-existent group triggers error-handling path and returns minimal status - gid = f"does-not-exist-{uuid4().hex[:8]}" - status = await monitor.get_consumer_group_status(gid, include_lag=False) - assert status.group_id == gid - # Some clusters report non-existent groups as DEAD/UNKNOWN rather than raising - assert status.state in (ConsumerGroupState.DEAD, ConsumerGroupState.UNKNOWN) - assert status.health is ConsumerGroupHealth.UNHEALTHY - summary = monitor.get_health_summary(status) - assert summary["group_id"] == gid and summary["health"] == ConsumerGroupHealth.UNHEALTHY - - -def test_assess_group_health_branches(test_settings: Settings) -> None: - m = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger) - # Unknown state (triggers unhealthy) - s = ConsumerGroupStatus( - group_id="g", - state=ConsumerGroupState.UNKNOWN, - protocol="p", - protocol_type="ptype", - coordinator="c", - members=[], - member_count=0, - assigned_partitions=0, - partition_distribution={}, - total_lag=0, - ) - h, msg = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.UNHEALTHY and "unknown" in msg.lower() - - # Dead state - s.state = ConsumerGroupState.DEAD - h, msg = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.UNHEALTHY and "dead" in msg.lower() - - # Insufficient members - s.state = ConsumerGroupState.STABLE - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.UNHEALTHY - - # Rebalancing (preparing) - s.member_count = 1 - s.state = ConsumerGroupState.PREPARING_REBALANCE - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.DEGRADED - - # Rebalancing (completing) - s.state = ConsumerGroupState.COMPLETING_REBALANCE - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.DEGRADED - - # Empty group - s.state = ConsumerGroupState.EMPTY - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.DEGRADED - - # Critical lag - s.state = ConsumerGroupState.STABLE - s.total_lag = m.critical_lag_threshold + 1 - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.UNHEALTHY - - # Warning lag - s.total_lag = m.warning_lag_threshold + 1 - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.DEGRADED - - # Uneven partition distribution - s.total_lag = 0 - s.partition_distribution = {"m1": 10, "m2": 1} - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.DEGRADED - - # Healthy stable - s.partition_distribution = {"m1": 1, "m2": 1} - s.assigned_partitions = 2 - h, _ = m._assess_group_health(s) # noqa: SLF001 - assert h is ConsumerGroupHealth.HEALTHY - - -@pytest.mark.asyncio -async def test_multiple_group_status_mixed_errors(test_settings: Settings) -> None: - m = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger) - gids = [f"none-{uuid4().hex[:6]}", f"none-{uuid4().hex[:6]}"] - 
res = await m.get_multiple_group_status(gids, include_lag=False) - assert set(res.keys()) == set(gids) - assert all(v.health is ConsumerGroupHealth.UNHEALTHY for v in res.values()) diff --git a/backend/tests/e2e/events/test_producer_roundtrip.py b/backend/tests/e2e/events/test_producer_roundtrip.py index 773c7f3a..3a54f7b7 100644 --- a/backend/tests/e2e/events/test_producer_roundtrip.py +++ b/backend/tests/e2e/events/test_producer_roundtrip.py @@ -1,27 +1,19 @@ -import logging from uuid import uuid4 import pytest from app.events.core import UnifiedProducer -from app.infrastructure.kafka.mappings import get_topic_for_event from dishka import AsyncContainer from tests.conftest import make_execution_requested_event pytestmark = [pytest.mark.e2e, pytest.mark.kafka] -_test_logger = logging.getLogger("test.events.producer_roundtrip") - @pytest.mark.asyncio -async def test_unified_producer_produce_and_send_to_dlq( +async def test_unified_producer_produce( scope: AsyncContainer, ) -> None: prod: UnifiedProducer = await scope.get(UnifiedProducer) ev = make_execution_requested_event(execution_id=f"exec-{uuid4().hex[:8]}") await prod.produce(ev, key=ev.execution_id) - - # Exercise send_to_dlq path — should not raise - topic = str(get_topic_for_event(ev.event_type)) - await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1) diff --git a/backend/tests/e2e/services/idempotency/test_redis_repository.py b/backend/tests/e2e/services/idempotency/test_redis_repository.py index 79535328..71394494 100644 --- a/backend/tests/e2e/services/idempotency/test_redis_repository.py +++ b/backend/tests/e2e/services/idempotency/test_redis_repository.py @@ -121,10 +121,6 @@ async def test_insert_find_update_delete_flow( ttl_after = await redis_client.ttl(key) assert ttl_after == ttl or ttl_after <= ttl # ttl should not increase - # Delete - deleted = await repository.delete_key(sample_record.key) - assert deleted == 1 - assert await repository.find_by_key(sample_record.key) is None @pytest.mark.asyncio @@ -136,6 +132,3 @@ async def test_update_record_when_missing( assert res == 0 -@pytest.mark.asyncio -async def test_health_check(repository: RedisIdempotencyRepository) -> None: - await repository.health_check() # should not raise diff --git a/backend/tests/e2e/services/user_settings/test_user_settings_service.py b/backend/tests/e2e/services/user_settings/test_user_settings_service.py index 11a2dda9..c385b8b9 100644 --- a/backend/tests/e2e/services/user_settings/test_user_settings_service.py +++ b/backend/tests/e2e/services/user_settings/test_user_settings_service.py @@ -460,27 +460,6 @@ async def test_invalidate_cache(self, scope: AsyncContainer) -> None: settings = await svc.get_user_settings(user_id) assert settings.user_id == user_id -class TestResetUserSettings: - """Tests for reset_user_settings method.""" - - @pytest.mark.asyncio - async def test_reset_user_settings(self, scope: AsyncContainer) -> None: - """Reset user settings clears all data.""" - svc: UserSettingsService = await scope.get(UserSettingsService) - user_id = _unique_user_id() - - # Set some custom settings - await svc.update_theme(user_id, Theme.DARK) - await svc.update_custom_setting(user_id, "custom_key", "custom_value") - - # Reset - await svc.reset_user_settings(user_id) - - # Get fresh - should be defaults - settings = await svc.get_user_settings_fresh(user_id) - assert settings.theme == Theme.AUTO # Default - - class TestSettingsIntegration: """Integration tests for settings workflow.""" diff --git 
a/backend/tests/e2e/test_resource_cleaner.py b/backend/tests/e2e/test_resource_cleaner.py deleted file mode 100644 index 43086327..00000000 --- a/backend/tests/e2e/test_resource_cleaner.py +++ /dev/null @@ -1,84 +0,0 @@ -import asyncio -from datetime import datetime - -import pytest -from app.services.result_processor import ResourceCleaner -from app.settings import Settings -from dishka import AsyncContainer -from kubernetes_asyncio import client as k8s_client - -pytestmark = [pytest.mark.e2e, pytest.mark.k8s] - - -@pytest.mark.asyncio -async def test_get_resource_usage(scope: AsyncContainer, test_settings: Settings) -> None: - resource_cleaner = await scope.get(ResourceCleaner) - usage = await resource_cleaner.get_resource_usage(namespace=test_settings.K8S_NAMESPACE) - assert set(usage.keys()) >= {"pods", "configmaps", "network_policies"} - - -@pytest.mark.asyncio -async def test_cleanup_orphaned_resources_dry_run(scope: AsyncContainer, test_settings: Settings) -> None: - resource_cleaner = await scope.get(ResourceCleaner) - cleaned = await resource_cleaner.cleanup_orphaned_resources( - namespace=test_settings.K8S_NAMESPACE, - max_age_hours=0, - dry_run=True, - ) - assert set(cleaned.keys()) >= {"pods", "configmaps", "pvcs"} - - -@pytest.mark.asyncio -async def test_cleanup_nonexistent_pod(scope: AsyncContainer, test_settings: Settings) -> None: - resource_cleaner = await scope.get(ResourceCleaner) - namespace = test_settings.K8S_NAMESPACE - nonexistent_pod = "integr8s-test-nonexistent-pod" - - # Use a local timeout variable with buffer for scheduler jitter - timeout = 2 # Reduced from 5s since non-existent resources return immediately (404) - jitter_buffer = 0.5 # Account for scheduler/GC pauses - - start_time = asyncio.get_running_loop().time() - await resource_cleaner.cleanup_pod_resources( - pod_name=nonexistent_pod, - namespace=namespace, - execution_id="test-exec-nonexistent", - timeout=timeout, - ) - elapsed = asyncio.get_running_loop().time() - start_time - - assert elapsed < timeout + jitter_buffer, ( - f"Cleanup took {elapsed:.2f}s, expected < {timeout + jitter_buffer}s for non-existent resources" - ) - - usage = await resource_cleaner.get_resource_usage(namespace=namespace) - assert isinstance(usage.get("pods", 0), int) - assert isinstance(usage.get("configmaps", 0), int) - - -@pytest.mark.asyncio -async def test_cleanup_orphaned_configmaps_dry_run(scope: AsyncContainer, test_settings: Settings) -> None: - api_client = await scope.get(k8s_client.ApiClient) - resource_cleaner = await scope.get(ResourceCleaner) - - v1 = k8s_client.CoreV1Api(api_client) - ns = test_settings.K8S_NAMESPACE - name = f"int-test-cm-{int(datetime.now().timestamp())}" - - metadata = k8s_client.V1ObjectMeta( - name=name, - labels={"app": "integr8s", "execution-id": "e-int-test"}, - ) - body = k8s_client.V1ConfigMap(metadata=metadata, data={"k": "v"}) - await v1.create_namespaced_config_map(namespace=ns, body=body) - - try: - res = await resource_cleaner.cleanup_orphaned_resources(namespace=ns, max_age_hours=0, dry_run=True) - assert any(name == cm for cm in res.get("configmaps", [])), ( - f"Expected ConfigMap '{name}' to be detected as orphan candidate" - ) - finally: - try: - await v1.delete_namespaced_config_map(name=name, namespace=ns) - except Exception: - pass diff --git a/backend/tests/unit/schemas_pydantic/test_notification_schemas.py b/backend/tests/unit/schemas_pydantic/test_notification_schemas.py index 16b58e3a..8971a09e 100644 --- 
a/backend/tests/unit/schemas_pydantic/test_notification_schemas.py +++ b/backend/tests/unit/schemas_pydantic/test_notification_schemas.py @@ -2,7 +2,7 @@ import pytest from app.domain.enums import NotificationChannel, NotificationSeverity, NotificationStatus -from app.schemas_pydantic.notification import Notification, NotificationBatch +from app.schemas_pydantic.notification import Notification def test_notification_scheduled_for_must_be_future() -> None: @@ -25,16 +25,3 @@ def test_notification_scheduled_for_must_be_future() -> None: body="y", scheduled_for=datetime.now(UTC) - timedelta(seconds=1), ) - - -def test_notification_batch_validation_limits() -> None: - n1 = Notification(user_id="u1", channel=NotificationChannel.IN_APP, subject="a", body="b") - ok = NotificationBatch(notifications=[n1]) - assert ok.processed_count == 0 - - with pytest.raises(ValueError): - NotificationBatch(notifications=[]) - - many = [n1.model_copy() for _ in range(1001)] - with pytest.raises(ValueError): - NotificationBatch(notifications=many) diff --git a/backend/tests/unit/services/saga/test_saga_comprehensive.py b/backend/tests/unit/services/saga/test_saga_comprehensive.py index a532e5be..180475dd 100644 --- a/backend/tests/unit/services/saga/test_saga_comprehensive.py +++ b/backend/tests/unit/services/saga/test_saga_comprehensive.py @@ -37,14 +37,6 @@ def _req_event() -> ExecutionRequestedEvent: return make_execution_requested_event(execution_id="e1", script="print('x')") -def test_saga_context_public_filtering() -> None: - ctx = SagaContext("s1", "e1") - ctx.set("public", 1) - ctx.set("_private", 2) - out = ctx.to_public_dict() - assert "public" in out and "_private" not in out - - @pytest.mark.asyncio async def test_step_success_and_compensation_chain() -> None: ctx = SagaContext("s1", "e1") diff --git a/backend/tests/unit/services/saga/test_saga_step_and_base.py b/backend/tests/unit/services/saga/test_saga_step_and_base.py index 33eea711..a7de8a51 100644 --- a/backend/tests/unit/services/saga/test_saga_step_and_base.py +++ b/backend/tests/unit/services/saga/test_saga_step_and_base.py @@ -5,28 +5,6 @@ pytestmark = pytest.mark.unit -def test_saga_context_public_dict_filters_and_encodes() -> None: - ctx = SagaContext("s1", "e1") - ctx.set("a", 1) - ctx.set("b", {"x": 2}) - ctx.set("c", [1, 2, 3]) - ctx.set("_private", {"won't": "leak"}) - - # Complex non-JSON object -> should be dropped - class X: - pass - - ctx.set("complex", X()) - # Nested complex objects get encoded by jsonable_encoder - # The nested dict with a complex object gets partially encoded - ctx.set("nested", {"ok": 1, "bad": X()}) - - d = ctx.to_public_dict() - # jsonable_encoder converts unknown objects to {}, which is still considered "simple" - # so they pass through the filter - assert d == {"a": 1, "b": {"x": 2}, "c": [1, 2, 3], "complex": {}, "nested": {"ok": 1, "bad": {}}} - - class _DummyComp(CompensationStep): def __init__(self) -> None: super().__init__("dummy") From 2331ef6dc5ef590f3025f8c7a73592bf3256a017 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 13 Feb 2026 23:02:47 +0100 Subject: [PATCH 4/7] fix: tests issue --- .../tests/e2e/services/idempotency/test_redis_repository.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/tests/e2e/services/idempotency/test_redis_repository.py b/backend/tests/e2e/services/idempotency/test_redis_repository.py index 71394494..40ebe0a4 100644 --- a/backend/tests/e2e/services/idempotency/test_redis_repository.py +++ 
b/backend/tests/e2e/services/idempotency/test_redis_repository.py @@ -121,6 +121,9 @@ async def test_insert_find_update_delete_flow( ttl_after = await redis_client.ttl(key) assert ttl_after == ttl or ttl_after <= ttl # ttl should not increase + # Clean up + await redis_client.delete(key) + assert await repository.find_by_key(sample_record.key) is None @pytest.mark.asyncio From a24205215e481122b0cfe236d121f48e089e4d80 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 13 Feb 2026 23:27:16 +0100 Subject: [PATCH 5/7] =?UTF-8?q?=20=201.=20ownerReference=20on=20ConfigMap?= =?UTF-8?q?=20=E2=86=92=20Pod=20=E2=80=94=20after=20creating=20the=20pod,?= =?UTF-8?q?=20patches=20the=20ConfigMap=20with=20an=20ownerReference=20poi?= =?UTF-8?q?nting=20to=20the=20pod's=20UID.=20K8s=20garbage=20collector=20a?= =?UTF-8?q?uto-deletes=20the=20ConfigMap=20when=20the=20pod=20is=20deleted?= =?UTF-8?q?=20(by=20anyone=20=E2=80=94=20saga=20=20=20compensation,=20manu?= =?UTF-8?q?al=20kubectl=20delete,=20namespace=20cleanup,=20etc.)=20=20=202?= =?UTF-8?q?.=20Simplified=20handle=5Fdelete=5Fpod=5Fcommand=20=E2=80=94=20?= =?UTF-8?q?only=20deletes=20the=20pod=20now;=20ConfigMap=20cleanup=20is=20?= =?UTF-8?q?handled=20by=20K8s=20GC=20=20=203.=20Removed=20networking=5Fv1?= =?UTF-8?q?=20=E2=80=94=20unused=20client=20(PR=20review=20comment)=20=20?= =?UTF-8?q?=204.=20=5Fcreate=5Fpod=20returns=20the=20created=20pod=20?= =?UTF-8?q?=E2=80=94=20needed=20to=20get=20the=20pod=20UID=20for=20the=20o?= =?UTF-8?q?wnerReference?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/core/container.py | 2 - backend/app/core/providers.py | 12 +- backend/app/services/k8s_worker/worker.py | 62 +++++++--- .../app/services/result_processor/__init__.py | 2 - .../result_processor/resource_cleaner.py | 108 ------------------ 5 files changed, 48 insertions(+), 138 deletions(-) delete mode 100644 backend/app/services/result_processor/resource_cleaner.py diff --git a/backend/app/core/container.py b/backend/app/core/container.py index 42aef885..6f8bd90f 100644 --- a/backend/app/core/container.py +++ b/backend/app/core/container.py @@ -19,7 +19,6 @@ PodMonitorProvider, RedisProvider, RepositoryProvider, - ResourceCleanerProvider, ResultProcessorProvider, SagaOrchestratorProvider, SettingsProvider, @@ -59,7 +58,6 @@ def create_app_container(settings: Settings) -> AsyncContainer: BusinessServicesProvider(), CoordinatorProvider(), KubernetesProvider(), - ResourceCleanerProvider(), FastapiProvider(), context={Settings: settings}, ) diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index d67f3c23..c07885fe 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -62,7 +62,7 @@ from app.services.notification_service import NotificationService from app.services.pod_monitor import PodEventMapper, PodMonitor, PodMonitorConfig from app.services.rate_limit_service import RateLimitService -from app.services.result_processor import ResourceCleaner, ResultProcessor +from app.services.result_processor import ResultProcessor from app.services.runtime_settings import RuntimeSettingsLoader from app.services.saga import SagaOrchestrator, SagaService from app.services.saved_script_service import SavedScriptService @@ -291,16 +291,6 @@ async def get_api_client( await api_client.close() -class ResourceCleanerProvider(Provider): - scope = Scope.APP - - @provide - def get_resource_cleaner( - self, api_client: k8s_client.ApiClient, logger: structlog.stdlib.BoundLogger - ) -> 
ResourceCleaner: - return ResourceCleaner(api_client=api_client, logger=logger) - - class MetricsProvider(Provider): """Provides all metrics instances via DI (no contextvars needed).""" diff --git a/backend/app/services/k8s_worker/worker.py b/backend/app/services/k8s_worker/worker.py index 53f6cd4e..e5429eab 100644 --- a/backend/app/services/k8s_worker/worker.py +++ b/backend/app/services/k8s_worker/worker.py @@ -57,7 +57,6 @@ def __init__( # Kubernetes clients created from ApiClient self.v1 = k8s_client.CoreV1Api(api_client) - self.networking_v1 = k8s_client.NetworkingV1Api(api_client) self.apps_v1 = k8s_client.AppsV1Api(api_client) # Components @@ -82,30 +81,28 @@ async def handle_create_pod_command(self, command: CreatePodCommandEvent) -> Non await self._create_pod_for_execution(command) async def handle_delete_pod_command(self, command: DeletePodCommandEvent) -> None: - """Handle delete pod command from saga orchestrator (compensation)""" + """Handle delete pod command from saga orchestrator (compensation). + + Deleting the pod is sufficient — the ConfigMap has an ownerReference pointing + to the pod, so K8s garbage-collects it automatically. + """ execution_id = command.execution_id self.logger.info(f"Deleting pod for execution {execution_id} due to: {command.reason}") try: - # Delete the pod pod_name = f"executor-{execution_id}" await self.v1.delete_namespaced_pod( name=pod_name, namespace=self._settings.K8S_NAMESPACE, grace_period_seconds=30, ) - self.logger.info(f"Successfully deleted pod {pod_name}") - - # Delete associated ConfigMap - configmap_name = f"script-{execution_id}" - await self.v1.delete_namespaced_config_map(name=configmap_name, namespace=self._settings.K8S_NAMESPACE) - self.logger.info(f"Successfully deleted ConfigMap {configmap_name}") + self.logger.info(f"Successfully deleted pod {pod_name} (ConfigMap will be GC'd by K8s)") except ApiException as e: if e.status == 404: - self.logger.warning(f"Resources for execution {execution_id} not found (may have already been deleted)") + self.logger.warning(f"Pod for execution {execution_id} not found (may have already been deleted)") else: - self.logger.error(f"Failed to delete resources for execution {execution_id}: {e}") + self.logger.error(f"Failed to delete pod for execution {execution_id}: {e}") async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> None: """Create pod for execution""" @@ -128,7 +125,11 @@ async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> Non await self._create_config_map(config_map) pod = self.pod_builder.build_pod_manifest(command=command) - await self._create_pod(pod) + created_pod = await self._create_pod(pod) + + # Set ownerReference so K8s garbage-collects the ConfigMap when the pod is deleted + if created_pod and created_pod.metadata and created_pod.metadata.uid: + await self._set_configmap_owner(config_map, created_pod) # Publish PodCreated event await self._publish_pod_created(command, pod) @@ -189,17 +190,48 @@ async def _create_config_map(self, config_map: k8s_client.V1ConfigMap) -> None: self.metrics.record_k8s_config_map_created("failed") raise - async def _create_pod(self, pod: k8s_client.V1Pod) -> None: - """Create Pod in Kubernetes""" + async def _create_pod(self, pod: k8s_client.V1Pod) -> k8s_client.V1Pod | None: + """Create Pod in Kubernetes. 
Returns the created pod (with UID) or None if it already existed.""" try: - await self.v1.create_namespaced_pod(namespace=self._settings.K8S_NAMESPACE, body=pod) + created: k8s_client.V1Pod = await self.v1.create_namespaced_pod( + namespace=self._settings.K8S_NAMESPACE, body=pod + ) self.logger.debug(f"Created Pod {pod.metadata.name}") + return created except ApiException as e: if e.status == 409: # Already exists self.logger.warning(f"Pod {pod.metadata.name} already exists") + return None else: raise + async def _set_configmap_owner( + self, config_map: k8s_client.V1ConfigMap, owner_pod: k8s_client.V1Pod + ) -> None: + """Patch the ConfigMap with an ownerReference pointing to the pod. + + This makes K8s garbage-collect the ConfigMap automatically when the pod is deleted. + """ + owner_ref = k8s_client.V1OwnerReference( + api_version="v1", + kind="Pod", + name=owner_pod.metadata.name, + uid=owner_pod.metadata.uid, + block_owner_deletion=False, + ) + patch_body = {"metadata": {"ownerReferences": [owner_ref]}} + try: + await self.v1.patch_namespaced_config_map( + name=config_map.metadata.name, + namespace=self._settings.K8S_NAMESPACE, + body=patch_body, + ) + self.logger.debug( + f"Set ownerReference on ConfigMap {config_map.metadata.name} -> Pod {owner_pod.metadata.name}" + ) + except ApiException as e: + self.logger.warning(f"Failed to set ownerReference on ConfigMap: {e.reason}") + async def _publish_pod_created(self, command: CreatePodCommandEvent, pod: k8s_client.V1Pod) -> None: """Publish pod created event""" event = PodCreatedEvent( diff --git a/backend/app/services/result_processor/__init__.py b/backend/app/services/result_processor/__init__.py index 278aec0a..58e6d26f 100644 --- a/backend/app/services/result_processor/__init__.py +++ b/backend/app/services/result_processor/__init__.py @@ -1,7 +1,5 @@ from app.services.result_processor.processor import ResultProcessor -from app.services.result_processor.resource_cleaner import ResourceCleaner __all__ = [ "ResultProcessor", - "ResourceCleaner", ] diff --git a/backend/app/services/result_processor/resource_cleaner.py b/backend/app/services/result_processor/resource_cleaner.py deleted file mode 100644 index 92b57a06..00000000 --- a/backend/app/services/result_processor/resource_cleaner.py +++ /dev/null @@ -1,108 +0,0 @@ -from datetime import datetime, timezone -from typing import Any - -import structlog -from kubernetes_asyncio import client as k8s_client -from kubernetes_asyncio.client.rest import ApiException - -# Python 3.12 type aliases -type ResourceDict = dict[str, list[str]] - - -class ResourceCleaner: - """Service for cleaning up Kubernetes resources. - - Accepts ApiClient via dependency injection for proper configuration management. 
- """ - - def __init__(self, api_client: k8s_client.ApiClient, logger: structlog.stdlib.BoundLogger) -> None: - self.v1 = k8s_client.CoreV1Api(api_client) - self.networking_v1 = k8s_client.NetworkingV1Api(api_client) - self.logger = logger - - async def _delete_pod(self, pod_name: str, namespace: str) -> None: - """Delete a pod""" - try: - await self.v1.read_namespaced_pod(pod_name, namespace) - await self.v1.delete_namespaced_pod(pod_name, namespace, grace_period_seconds=30) - self.logger.info(f"Deleted pod: {pod_name}") - - except ApiException as e: - if e.status == 404: - self.logger.info(f"Pod {pod_name} already deleted") - else: - self.logger.error(f"Failed to delete pod: {e}") - raise - - async def _delete_configmaps(self, execution_id: str, namespace: str) -> None: - """Delete ConfigMaps for an execution""" - await self._delete_labeled_resources( - execution_id, - namespace, - self.v1.list_namespaced_config_map, - self.v1.delete_namespaced_config_map, - "ConfigMap", - ) - - async def _delete_pvcs(self, execution_id: str, namespace: str) -> None: - """Delete PersistentVolumeClaims for an execution""" - await self._delete_labeled_resources( - execution_id, - namespace, - self.v1.list_namespaced_persistent_volume_claim, - self.v1.delete_namespaced_persistent_volume_claim, - "PVC", - ) - - async def _delete_labeled_resources( - self, execution_id: str, namespace: str, list_func: Any, delete_func: Any, resource_type: str - ) -> None: - """Generic function to delete labeled resources""" - try: - label_selector = f"execution-id={execution_id}" - resources = await list_func(namespace, label_selector=label_selector) - - for resource in resources.items: - await delete_func(resource.metadata.name, namespace) - self.logger.info(f"Deleted {resource_type}: {resource.metadata.name}") - - except ApiException as e: - self.logger.error(f"Failed to delete {resource_type}s: {e}") - - async def _cleanup_orphaned_pods( - self, namespace: str, cutoff_time: datetime, cleaned: ResourceDict, dry_run: bool - ) -> None: - """Clean up orphaned pods""" - pods = await self.v1.list_namespaced_pod(namespace, label_selector="app=integr8s") - - terminal_phases = {"Succeeded", "Failed", "Unknown"} - - for pod in pods.items: - if ( - pod.metadata.creation_timestamp.replace(tzinfo=timezone.utc) < cutoff_time - and pod.status.phase in terminal_phases - ): - cleaned["pods"].append(pod.metadata.name) - - if not dry_run: - try: - await self._delete_pod(pod.metadata.name, namespace) - except Exception as e: - self.logger.error(f"Failed to delete orphaned pod {pod.metadata.name}: {e}") - - async def _cleanup_orphaned_configmaps( - self, namespace: str, cutoff_time: datetime, cleaned: ResourceDict, dry_run: bool - ) -> None: - """Clean up orphaned ConfigMaps""" - configmaps = await self.v1.list_namespaced_config_map(namespace, label_selector="app=integr8s") - - for cm in configmaps.items: - if cm.metadata.creation_timestamp.replace(tzinfo=timezone.utc) < cutoff_time: - cleaned["configmaps"].append(cm.metadata.name) - - if not dry_run: - try: - await self.v1.delete_namespaced_config_map(cm.metadata.name, namespace) - except Exception as e: - self.logger.error(f"Failed to delete orphaned ConfigMap {cm.metadata.name}: {e}") - From 9761f90ac91a53bade49341d852fbf9d1caad899 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 13 Feb 2026 23:28:22 +0100 Subject: [PATCH 6/7] =?UTF-8?q?Deleted=20test=5Fproducer=5Froundtrip.py=20?= =?UTF-8?q?=E2=80=94=20it=20was=20just=20await=20produce()=20with=20zero?= 
=?UTF-8?q?=20assertions=20after=20the=20DLQ=20path=20was=20removed.=20No?= =?UTF-8?q?=20roundtrip,=20no=20verification,=20no=20value.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../e2e/events/test_producer_roundtrip.py | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 backend/tests/e2e/events/test_producer_roundtrip.py diff --git a/backend/tests/e2e/events/test_producer_roundtrip.py b/backend/tests/e2e/events/test_producer_roundtrip.py deleted file mode 100644 index 3a54f7b7..00000000 --- a/backend/tests/e2e/events/test_producer_roundtrip.py +++ /dev/null @@ -1,19 +0,0 @@ -from uuid import uuid4 - -import pytest -from app.events.core import UnifiedProducer -from dishka import AsyncContainer - -from tests.conftest import make_execution_requested_event - -pytestmark = [pytest.mark.e2e, pytest.mark.kafka] - - -@pytest.mark.asyncio -async def test_unified_producer_produce( - scope: AsyncContainer, -) -> None: - prod: UnifiedProducer = await scope.get(UnifiedProducer) - - ev = make_execution_requested_event(execution_id=f"exec-{uuid4().hex[:8]}") - await prod.produce(ev, key=ev.execution_id) From 153392ef69e947eeb49428c258e78033a7484985 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 13 Feb 2026 23:49:02 +0100 Subject: [PATCH 7/7] readme and docs update --- README.md | 58 ++++++++++++++++----- docs/architecture/domain-exceptions.md | 3 +- docs/architecture/model-conversion.md | 3 +- docs/architecture/services-overview.md | 6 +-- docs/components/dead-letter-queue.md | 54 +++++-------------- docs/components/workers/k8s_worker.md | 4 +- docs/components/workers/result_processor.md | 4 +- docs/operations/cicd.md | 8 ++- docs/operations/tracing.md | 2 +- 9 files changed, 75 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index 4ed1a1ab..7b870e78 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,9 @@ Security Scan Status + + Dead Code Check + Docker Scan Status @@ -57,29 +60,56 @@ things safe and efficient. You'll get the results back in no time.
How to deploy -1. Clone this repository -2. Check if docker is enabled, kubernetes is running and kubectl is installed -3. `docker-compose up --build` +### Prerequisites + +- Docker and Docker Compose +- Kubernetes cluster (k3s, Docker Desktop K8s, or minikube) with `kubectl` configured + +### Quick start + +```bash +git clone https://github.com/HardMax71/Integr8sCode.git +cd Integr8sCode +cp backend/secrets.example.toml backend/secrets.toml +./deploy.sh dev +``` -- Frontend: `https://127.0.0.1:5001/` -- Backend: `https://127.0.0.1:443/` - - To check if it works, you can use `curl -k https://127.0.0.1/api/v1/k8s-limits`, should return JSON with current limits -- Grafana: `http://127.0.0.1:3000` (login - `admin`, pw - `admin123`) +The `secrets.toml` file holds credentials and is gitignored. The example template has working development defaults. +### Verify -You may also find out that k8s doesn't capture metrics (`CPU` and `Memory` params are `null`), it may well be that metrics server -for k8s is turned off/not enabled. To enable, execute: ```bash -kubectl create -f https://raw.githubusercontent.com/pythianarora/total-practice/master/sample-kubernetes-code/metrics-server.yaml +curl -k https://localhost/api/v1/health/live ``` -and test output by writing `kubectl top node` in console, should output sth like: +### Access + +| Service | URL | +|--------------------|--------------------------------------------------------| +| Frontend | [https://localhost:5001](https://localhost:5001) | +| Backend API | [https://localhost:443](https://localhost:443) | +| Kafdrop (Kafka UI) | [http://localhost:9000](http://localhost:9000) | +| Grafana | [http://localhost:3000](http://localhost:3000) | +| Jaeger (Tracing) | [http://localhost:16686](http://localhost:16686) | + +Default credentials: `user` / `user123` (regular), `admin` / `admin123` (admin). + +Self-signed TLS certs are generated automatically — accept the browser warning. + +### Run tests + +```bash +./deploy.sh test ``` -PS C:\Users\User\Desktop\Integr8sCode> kubectl top node -NAME CPU(cores) CPU% MEMORY(bytes) MEMORY% -docker-desktop 267m 3% 4732Mi 29% + +### Stop + +```bash +./deploy.sh down ``` +See the [full deployment guide](https://hardmax71.github.io/Integr8sCode/operations/deployment/) for Docker build strategy, troubleshooting, pre-built images, and more. +
diff --git a/docs/architecture/domain-exceptions.md b/docs/architecture/domain-exceptions.md index 4b10cfb2..87b4dd44 100644 --- a/docs/architecture/domain-exceptions.md +++ b/docs/architecture/domain-exceptions.md @@ -61,7 +61,6 @@ DomainError │ └── SagaInvalidStateError └── InfrastructureError ├── EventPublishError - ├── SagaCompensationError ├── SagaTimeoutError └── ReplayOperationError ``` @@ -74,7 +73,7 @@ Domain exceptions live in their respective domain modules: |--------------|-----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| | Base | `app/domain/exceptions.py` | `DomainError`, `NotFoundError`, `ValidationError`, etc. | | Execution | `app/domain/execution/exceptions.py` | `ExecutionNotFoundError`, `RuntimeNotSupportedError`, `EventPublishError` | -| Saga | `app/domain/saga/exceptions.py` | `SagaNotFoundError`, `SagaAccessDeniedError`, `SagaInvalidStateError`, `SagaCompensationError`, `SagaTimeoutError`, `SagaConcurrencyError` | +| Saga | `app/domain/saga/exceptions.py` | `SagaNotFoundError`, `SagaAccessDeniedError`, `SagaInvalidStateError`, `SagaTimeoutError`, `SagaConcurrencyError` | | Notification | `app/domain/notification/exceptions.py` | `NotificationNotFoundError`, `NotificationThrottledError`, `NotificationValidationError` | | Saved Script | `app/domain/saved_script/exceptions.py` | `SavedScriptNotFoundError` | | Replay | `app/domain/replay/exceptions.py` | `ReplaySessionNotFoundError`, `ReplayOperationError` | diff --git a/docs/architecture/model-conversion.md b/docs/architecture/model-conversion.md index a9a1e03d..2b23486d 100644 --- a/docs/architecture/model-conversion.md +++ b/docs/architecture/model-conversion.md @@ -160,8 +160,7 @@ Avoid approaches that scatter conversion logic or couple layers incorrectly. | Conversion logic in models | Scatters boundary logic; keep it in repositories/services | Thin wrappers that delegate to `model_dump()` with specific options are fine. For example, `BaseEvent.to_dict()` applies -`by_alias=True, mode="json"` consistently across all events. Methods with additional behavior like filtering private -keys (`to_public_dict()`) are also acceptable—the anti-pattern is manually listing fields. +`by_alias=True, mode="json"` consistently across all events—the anti-pattern is manually listing fields. ## Quick reference diff --git a/docs/architecture/services-overview.md b/docs/architecture/services-overview.md index 2a26bedb..5892dace 100644 --- a/docs/architecture/services-overview.md +++ b/docs/architecture/services-overview.md @@ -20,7 +20,7 @@ The k8s_worker/ module runs worker.py, a long-running service that consumes SAGA The pod_monitor/ module has monitor.py and event_mapper.py which watch K8s Pod/Container status, map them into domain events with helpful metadata like exit codes, failure reasons, and stdout/stderr slices, then publish into EXECUTION_EVENTS. This decouples what the cluster did from what the system emits so clients always see consistent event shapes. See [Pod Monitor](../components/workers/pod_monitor.md) for details. -The result_processor/ module runs processor.py which consumes terminal events, persists results, normalizes error types, and always records metrics by error type. The resource_cleaner.py deletes the per-execution pod and ConfigMap after completion. See [Result Processor](../components/workers/result_processor.md) for details. 
+The result_processor/ module runs processor.py which consumes terminal events, persists results, normalizes error types, and always records metrics by error type. See [Result Processor](../components/workers/result_processor.md) for details. ## Event and streaming services @@ -56,7 +56,7 @@ The Saga Orchestrator is a stateful choreographer for execution lifecycle. It su The K8s Worker materializes saga commands into K8s resources. It consumes SAGA_COMMANDS and creates ConfigMap (script, entrypoint) and Pod (hardened), relying on CiliumNetworkPolicy deny-all applied to the namespace rather than per-exec policies. Pod spec disables DNS, drops caps, runs non-root, no SA token. It publishes PodCreated and ExecutionStarted events, or errors when creation fails. -The Result Processor persists terminal execution outcomes, updates metrics, and triggers cleanup. It consumes EXECUTION_COMPLETED, EXECUTION_FAILED, EXECUTION_TIMEOUT, writes DB records for status, outputs, errors, and usage, records metrics for errors by type and durations, and invokes ResourceCleaner to delete pods and configmaps. +The Result Processor persists terminal execution outcomes and updates metrics. It consumes EXECUTION_COMPLETED, EXECUTION_FAILED, EXECUTION_TIMEOUT, writes DB records for status, outputs, errors, and usage, and records metrics for errors by type and durations. Kubernetes resource cleanup (pods and ConfigMaps) is handled automatically via ownerReference — the ConfigMap is owned by the pod, so K8s garbage-collects both when the pod is deleted (by saga compensation or manual cleanup). The Pod Monitor observes K8s pod state and translates to domain events. It watches CoreV1 Pod events and publishes EXECUTION_EVENTS for running, container started, logs tail, etc., adding useful metadata and best-effort failure analysis. @@ -64,7 +64,7 @@ The Coordinator owns the admission/queuing policy and sets priorities. It intera The Event Replay worker re-emits stored events to debug or rebuild projections, taking DB/event store and filters as inputs and outputting replayed events on regular topics with provenance markers. -The DLQ Processor drains and retries dead-lettered messages with backoff and visibility, taking DLQ topic and retry policies as inputs and outputting successful re-publishes or parked messages with audit trail. See [Dead Letter Queue](../components/dead-letter-queue.md) for more on DLQ handling. +The DLQ Processor retries dead-lettered messages with backoff and visibility. Failed messages are persisted directly to MongoDB (no DLQ Kafka topic). The processor is APScheduler-based, periodically checking for retryable messages and republishing them via a manually started broker. See [Dead Letter Queue](../components/dead-letter-queue.md) for more on DLQ handling. ## Operational notes diff --git a/docs/components/dead-letter-queue.md b/docs/components/dead-letter-queue.md index 9881594a..4201c3fc 100644 --- a/docs/components/dead-letter-queue.md +++ b/docs/components/dead-letter-queue.md @@ -4,46 +4,25 @@ Picture this: your Kafka consumer is happily processing events when suddenly it hits a poison pill - maybe a malformed event, a database outage, or just a bug in your code. Without a Dead Letter Queue (DLQ), that event would either block your entire consumer (if you keep retrying forever) or get lost forever (if you skip it). Neither option is great for an event-sourced system where events are your source of truth. -The DLQ acts as a safety net. 
When an event fails processing after a reasonable number of retries, instead of losing it, we send it to a special "dead letter" topic where it can be examined, fixed, and potentially replayed later. +The DLQ acts as a safety net. When an event fails processing after a reasonable number of retries, instead of losing it, we persist it to MongoDB where it can be examined, fixed, and potentially replayed later. ## How it works -The DLQ implementation in Integr8sCode follows a producer-agnostic pattern. Producers can route failed events to the DLQ; a dedicated DLQ manager/processor consumes DLQ messages, persists them, and applies retry/discard policies. Here's how the pieces fit together: +The DLQ implementation in Integr8sCode persists failed messages directly to MongoDB (no DLQ Kafka topic). The DLQ manager handles persistence and the DLQ processor worker retries messages on a schedule. -### Producer side +### Failure handling -Every `UnifiedProducer` instance has a `send_to_dlq()` method that knows how to package up a failed event with all its context - the original topic, error message, retry count, and metadata about when and where it failed. When called, it creates a special DLQ message and sends it to the `dead_letter_queue` topic in Kafka. - -The beauty here is that the producer doesn't make decisions about *when* to send something to DLQ - it just provides the mechanism. The decision-making happens at a higher level. - -### Consumer side - -When event handling fails in normal consumers, producers may call `send_to_dlq()` to persist failure context. The DLQ manager is the single component that reads the DLQ topic and orchestrates retries according to policy. - -For example, the event store consumer sets up its error handling like this: - -```python -if self.producer: - dlq_handler = create_dlq_error_handler( - producer=self.producer, - original_topic="event-store", - max_retries=3 - ) - self.consumer.register_error_callback(dlq_handler) -``` - -This handler tracks retry counts per event. If an event fails 3 times, it gets sent to DLQ. The consumer itself doesn't know about any of this - it just calls the error callback and moves on. +When event handling fails in consumers, the DLQ manager (`app/dlq/manager.py`) packages the failed event with all its context — the original topic, error message, retry count, and metadata about when and where it failed — and persists it directly to MongoDB. ### DLQ processor -The `run_dlq_processor` is a separate service that monitors the dead letter queue topic. It's responsible for the retry orchestration. When it sees a message in the DLQ, it applies topic-specific retry policies to determine when (or if) to retry sending that message back to its original topic. +The `run_dlq_processor` is a separate APScheduler-based worker that periodically checks MongoDB for retryable DLQ messages. When it finds eligible messages, it republishes them to their original Kafka topics via a manually started broker. 
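To make the retry mechanics concrete, here is a minimal, self-contained sketch of the eligibility check such a scheduled job could apply to each stored message. The class, field, and constant names are illustrative only (they are not the real DLQ document model); the values mirror the documented defaults.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Illustrative constants mirroring the documented defaults
MAX_RETRY_ATTEMPTS = 5                   # DLQ_MAX_RETRY_ATTEMPTS
BASE_RETRY_DELAY = timedelta(hours=1)    # DLQ_RETRY_DELAY_HOURS
MAX_MESSAGE_AGE = timedelta(days=7)      # older messages are discarded


@dataclass
class StoredDLQMessage:
    original_topic: str
    failed_at: datetime
    retry_count: int


def is_expired(msg: StoredDLQMessage, now: datetime) -> bool:
    """Too old or too many attempts: discard instead of retrying."""
    return (now - msg.failed_at) > MAX_MESSAGE_AGE or msg.retry_count >= MAX_RETRY_ATTEMPTS


def next_retry_at(msg: StoredDLQMessage) -> datetime:
    """Exponential backoff: 1h, 2h, 4h, ... after each failed attempt."""
    return msg.failed_at + BASE_RETRY_DELAY * (2 ** msg.retry_count)


def is_due_for_retry(msg: StoredDLQMessage, now: datetime) -> bool:
    return not is_expired(msg, now) and now >= next_retry_at(msg)


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    msg = StoredDLQMessage("execution_events", failed_at=now - timedelta(hours=3), retry_count=1)
    print(is_due_for_retry(msg, now))  # True: the 2h backoff window has already elapsed
```

Messages that pass the check are republished to their original topic; those that hit the age or attempt limit are marked discarded with a reason instead.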
Different topics have different retry strategies configured: - **Execution requests** get aggressive retries with exponential backoff - these are critical user operations -- **Pod events** get fewer retries with longer delays - these are less critical monitoring events +- **Pod events** get fewer retries with longer delays - these are less critical monitoring events - **Resource allocation** events get immediate retries - these need quick resolution -- **WebSocket events** use fixed intervals - these are real-time updates that become stale quickly The processor also implements safety features like: - Maximum age checks (messages older than 7 days are discarded) @@ -56,22 +35,20 @@ The processor also implements safety features like: graph TD Consumer[Consumer] -->|event fails| Handler[Error Handler] Handler -->|retries < limit| Kafka[(Kafka redeliver)] - Handler -->|retries >= limit| Producer[Producer] - Producer -->|send_to_dlq| DLQTopic[(dead_letter_queue)] - DLQTopic --> Processor[DLQ Processor] - Processor -->|check policy| Decision{retry?} - Decision -->|yes, after delay| Original[(Original Topic)] - Decision -->|max attempts| Archive[(Archive)] + Handler -->|retries >= limit| DLQManager[DLQ Manager] + DLQManager -->|persist| MongoDB[(MongoDB)] + Processor[DLQ Processor
APScheduler] -->|poll| MongoDB + Processor -->|retry via broker| Original[(Original Topic)] Original -->|reprocess| Consumer ``` -When a consumer fails to process an event, it invokes the registered error callback. The DLQ handler tracks how many times this specific event has failed. If the count is under the retry limit, the handler simply logs and returns, letting Kafka redeliver the message on its next poll. Once the retry limit is exceeded, the handler calls `producer.send_to_dlq()`, which packages the event together with failure context (original topic, error message, retry count, timestamps) and publishes it to the `dead_letter_queue` topic. +When a consumer fails to process an event, it invokes the registered error callback. The DLQ handler tracks how many times this specific event has failed. If the count is under the retry limit, the handler simply logs and returns, letting Kafka redeliver the message on its next poll. Once the retry limit is exceeded, the DLQ manager persists the message with full failure context (original topic, error message, retry count, timestamps) directly to MongoDB. -The DLQ processor service consumes from this topic and applies topic-specific retry policies. Depending on the policy, it either schedules the message for redelivery to its original topic after an appropriate delay, or archives it if maximum attempts have been exhausted. When redelivered, the message goes back through normal consumer processing. If it fails again, the cycle repeats until either success or final archival +The DLQ processor worker runs on an APScheduler schedule, querying MongoDB for retryable messages. Depending on the retry policy, it either republishes the message to its original Kafka topic via a manually started broker, or archives it if maximum attempts have been exhausted. ## Configuration -The DLQ system is configured through environment variables in the `dlq-processor` service: +The DLQ system is configured through settings: - `DLQ_MAX_RETRY_ATTEMPTS`: Global maximum retries (default: 5) - `DLQ_RETRY_DELAY_HOURS`: Base delay between retries (default: 1 hour) @@ -82,9 +59,7 @@ Each topic can override these with custom retry policies in the DLQ processor co ## Failure modes -If the DLQ processor itself fails, messages stay safely in the `dead_letter_queue` topic - Kafka acts as the durable buffer. When the processor restarts, it picks up where it left off. - -If sending to DLQ fails (extremely rare - would mean Kafka is down), the producer logs a critical error but doesn't crash the consumer. This follows the principle that it's better to lose one message than to stop processing everything. +If the DLQ processor itself fails, messages stay safely in MongoDB. When the processor restarts, it picks up where it left off. The system is designed to be resilient but not perfect. In catastrophic scenarios, you still have Kafka's built-in durability and the ability to replay topics from the beginning if needed. @@ -94,5 +69,4 @@ The system is designed to be resilient but not perfect. 
In catastrophic scenario |--------------------------------------------------------------------------------------------------------------------------|------------------------| | [`run_dlq_processor.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_dlq_processor.py) | DLQ processor worker | | [`manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/dlq/manager.py) | DLQ management logic | -| [`unified_producer.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/unified_producer.py) | `send_to_dlq()` method | | [`dlq.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/api/routes/dlq.py) | Admin API routes | diff --git a/docs/components/workers/k8s_worker.md b/docs/components/workers/k8s_worker.md index 4c44124c..f2ef2d69 100644 --- a/docs/components/workers/k8s_worker.md +++ b/docs/components/workers/k8s_worker.md @@ -19,9 +19,11 @@ it builds a complete pod specification including: - A ConfigMap containing the user's script and an entrypoint script - A Pod manifest with hardened security context - Proper labels for tracking and network policy matching +- An ownerReference on the ConfigMap pointing to the pod, so K8s garbage-collects the ConfigMap when the pod is deleted After creating resources, it publishes `PodCreated` and `ExecutionStarted` events so the rest of the system knows -the execution has begun. +the execution has begun. For `DeletePodCommand` (saga compensation), only the pod needs to be deleted — K8s +automatically cleans up the owned ConfigMap. ## Pod security diff --git a/docs/components/workers/result_processor.md b/docs/components/workers/result_processor.md index b94fb005..13c37a74 100644 --- a/docs/components/workers/result_processor.md +++ b/docs/components/workers/result_processor.md @@ -24,8 +24,7 @@ execution has finished and results are available. ## Resource cleanup -The processor also handles cleanup after executions complete. The [`resource_cleaner.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/result_processor/resource_cleaner.py) -module deletes ConfigMaps and pods that are no longer needed, keeping the Kubernetes namespace tidy. +Kubernetes resource cleanup is handled via ownerReference — the K8s worker sets an ownerReference on each ConfigMap pointing to its pod. When the pod is deleted (by saga compensation or manual cleanup), K8s garbage-collects the ConfigMap automatically. The result processor itself does not perform resource cleanup. 
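The metadata patch that establishes this ownership (applied by the K8s worker right after pod creation) is small. A condensed sketch with `kubernetes_asyncio`, with placeholder parameter names:

```python
from kubernetes_asyncio import client as k8s_client


async def set_configmap_owner(
    v1: k8s_client.CoreV1Api,
    configmap_name: str,
    pod: k8s_client.V1Pod,
    namespace: str,
) -> None:
    """Make the ConfigMap a child of the pod so K8s GC removes it with the pod."""
    owner_ref = k8s_client.V1OwnerReference(
        api_version="v1",
        kind="Pod",
        name=pod.metadata.name,
        uid=pod.metadata.uid,  # the UID is only known after the pod has been created
        block_owner_deletion=False,
    )
    await v1.patch_namespaced_config_map(
        name=configmap_name,
        namespace=namespace,
        body={"metadata": {"ownerReferences": [owner_ref]}},
    )
```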
## Key files @@ -33,7 +32,6 @@ module deletes ConfigMaps and pods that are no longer needed, keeping the Kubern |--------------------------------------------------------------------------------------------------------------------------------|------------------| | [`run_result_processor.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_result_processor.py) | Entry point | | [`processor.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/result_processor/processor.py) | Result handling | -| [`resource_cleaner.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/result_processor/resource_cleaner.py) | K8s cleanup | ## Deployment diff --git a/docs/operations/cicd.md b/docs/operations/cicd.md index 53a63e22..745f7966 100644 --- a/docs/operations/cicd.md +++ b/docs/operations/cicd.md @@ -11,6 +11,7 @@ graph LR subgraph "Code Quality (lightweight)" Ruff["Ruff Linting"] MyPy["MyPy Type Check"] + Vulture["Vulture Dead Code"] ESLint["ESLint + Svelte Check"] end @@ -42,7 +43,7 @@ graph LR Pages["GitHub Pages"] end - Push["Push / PR"] --> Ruff & MyPy & ESLint & Bandit & SBOM & UnitBE & UnitFE & Docs + Push["Push / PR"] --> Ruff & MyPy & Vulture & ESLint & Bandit & SBOM & UnitBE & UnitFE & Docs Build -->|main, all tests pass| Scan Docs -->|main only| Pages ``` @@ -63,6 +64,7 @@ forward when everything passes. | MyPy Type Checking | `.github/workflows/mypy.yml` | Push/PR to `main` | Python static type analysis | | Frontend CI | `.github/workflows/frontend-ci.yml` | Push/PR to `main` (frontend changes) | ESLint + Svelte type check | | Security Scanning | `.github/workflows/security.yml` | Push/PR to `main` | Bandit SAST | +| Dead Code Detection | `.github/workflows/vulture.yml` | Push/PR to `main` | Vulture dead code analysis | | Documentation | `.github/workflows/docs.yml` | Push/PR (`docs/`, `mkdocs.yml`) | MkDocs build and GitHub Pages deploy | ## Composite actions @@ -307,6 +309,7 @@ Three lightweight workflows run independently since they catch obvious issues qu - [Ruff](https://docs.astral.sh/ruff/) checks for style violations, import ordering, and common bugs - [mypy](https://mypy.readthedocs.io/) with strict settings catches type mismatches and missing return types +- [Vulture](https://github.com/jendrikseipp/vulture) detects unused functions, classes, methods, imports, and variables. A whitelist file (`backend/vulture_whitelist.py`) excludes framework patterns (Dishka providers, FastAPI routes, Beanie documents, Pydantic models) that look unused but are called at runtime **Frontend (TypeScript/Svelte):** @@ -385,6 +388,9 @@ uv run ruff check . --config pyproject.toml # Type checking uv run mypy --config-file pyproject.toml --strict . +# Dead code detection +uv run vulture app/ vulture_whitelist.py + # Security scan uv tool run bandit -r . -x tests/ -ll diff --git a/docs/operations/tracing.md b/docs/operations/tracing.md index c8e51ccd..9281e5fc 100644 --- a/docs/operations/tracing.md +++ b/docs/operations/tracing.md @@ -63,7 +63,7 @@ sequenceDiagram Note over Worker: re-injects trace context alt on failure - Kafka->>DLQ: message to dead_letter_queue + Worker->>DLQ: persist to MongoDB Note over DLQ: dlq.consume span
preserves original context DLQ->>Kafka: retry with same context end
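To illustrate how the "retry with same context" step can work, here is a minimal OpenTelemetry sketch (span and function names are illustrative, not the actual worker code): the trace headers are captured when the failed message is persisted and re-extracted when the processor republishes it, so the retry span joins the original trace.

```python
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

tracer = trace.get_tracer("dlq")


def capture_trace_headers() -> dict[str, str]:
    """Serialize the active trace context (W3C traceparent) for storage alongside the DLQ message."""
    carrier: dict[str, str] = {}
    inject(carrier)
    return carrier


def republish_with_original_context(stored_headers: dict[str, str]) -> None:
    """Run the retry under a span parented to the original processing trace."""
    ctx = extract(stored_headers)
    with tracer.start_as_current_span("dlq.retry", context=ctx):
        ...  # produce the message back to its original Kafka topic here
```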