diff --git a/.github/workflows/vulture.yml b/.github/workflows/vulture.yml
new file mode 100644
index 00000000..9d4c1e56
--- /dev/null
+++ b/.github/workflows/vulture.yml
@@ -0,0 +1,32 @@
+name: Dead Code Detection
+
+on:
+ push:
+ branches: [ main ]
+ pull_request:
+ branches: [ main ]
+ workflow_dispatch:
+
+jobs:
+ vulture:
+ name: Vulture Dead Code Check
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v6
+
+ - name: Set up uv
+ uses: astral-sh/setup-uv@v7
+ with:
+ enable-cache: true
+ cache-dependency-glob: "backend/uv.lock"
+
+ - name: Install Python and dependencies
+ run: |
+ cd backend
+ uv python install 3.12
+ uv sync --frozen
+
+ - name: Run vulture
+ run: |
+ cd backend
+ uv run vulture app/ vulture_whitelist.py
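The workflow runs vulture over `app/` together with `vulture_whitelist.py`. The whitelist itself is not included in this diff; as a rough illustration (names below are hypothetical), a vulture whitelist is just a Python file that references the names vulture should treat as used. Vulture only parses the file, so bare references are enough, which is also why the file is excluded from ruff and mypy further down in this diff. Entries like these are typically generated with vulture's `--make-whitelist` flag and then curated by hand.

```python
# vulture_whitelist.py (illustrative sketch only; the real file is not shown here)
# A bare reference marks a name as "used" so vulture stops reporting it.
is_running        # hypothetical: property read only during serialization
handle_shutdown   # hypothetical: handler registered dynamically at runtime
```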
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a31b2e04..af38e0b0 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -22,6 +22,14 @@ repos:
files: ^backend/.*\.py$
pass_filenames: false
+ # Vulture - matches CI: cd backend && uv run vulture app/ vulture_whitelist.py
+ - id: vulture-backend
+ name: vulture dead code (backend)
+ entry: bash -c 'cd backend && uv run vulture app/ vulture_whitelist.py'
+ language: system
+ files: ^backend/.*\.py$
+ pass_filenames: false
+
# ESLint - matches CI: cd frontend && npx eslint src
- id: eslint-frontend
name: eslint (frontend)
diff --git a/README.md b/README.md
index 4ed1a1ab..7b870e78 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,9 @@
+
+
+
@@ -57,29 +60,56 @@ things safe and efficient. You'll get the results back in no time.
How to deploy
-1. Clone this repository
-2. Check if docker is enabled, kubernetes is running and kubectl is installed
-3. `docker-compose up --build`
+### Prerequisites
+
+- Docker and Docker Compose
+- Kubernetes cluster (k3s, Docker Desktop K8s, or minikube) with `kubectl` configured
+
+### Quick start
+
+```bash
+git clone https://github.com/HardMax71/Integr8sCode.git
+cd Integr8sCode
+cp backend/secrets.example.toml backend/secrets.toml
+./deploy.sh dev
+```
-- Frontend: `https://127.0.0.1:5001/`
-- Backend: `https://127.0.0.1:443/`
- - To check if it works, you can use `curl -k https://127.0.0.1/api/v1/k8s-limits`, should return JSON with current limits
-- Grafana: `http://127.0.0.1:3000` (login - `admin`, pw - `admin123`)
+The `secrets.toml` file holds credentials and is gitignored. The example template has working development defaults.
+### Verify
-You may also find out that k8s doesn't capture metrics (`CPU` and `Memory` params are `null`), it may well be that metrics server
-for k8s is turned off/not enabled. To enable, execute:
```bash
-kubectl create -f https://raw.githubusercontent.com/pythianarora/total-practice/master/sample-kubernetes-code/metrics-server.yaml
+curl -k https://localhost/api/v1/health/live
```
-and test output by writing `kubectl top node` in console, should output sth like:
+### Access
+
+| Service | URL |
+|--------------------|--------------------------------------------------------|
+| Frontend | [https://localhost:5001](https://localhost:5001) |
+| Backend API | [https://localhost:443](https://localhost:443) |
+| Kafdrop (Kafka UI) | [http://localhost:9000](http://localhost:9000) |
+| Grafana | [http://localhost:3000](http://localhost:3000) |
+| Jaeger (Tracing) | [http://localhost:16686](http://localhost:16686) |
+
+Default credentials: `user` / `user123` (regular user), `admin` / `admin123` (admin).
+
+Self-signed TLS certificates are generated automatically; accept the browser warning.
+
+### Run tests
+
+```bash
+./deploy.sh test
```
-PS C:\Users\User\Desktop\Integr8sCode> kubectl top node
-NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
-docker-desktop 267m 3% 4732Mi 29%
+
+### Stop
+
+```bash
+./deploy.sh down
```
+See the [full deployment guide](https://hardmax71.github.io/Integr8sCode/operations/deployment/) for Docker build strategy, troubleshooting, pre-built images, and more.
+
diff --git a/backend/app/core/container.py b/backend/app/core/container.py
index 42aef885..6f8bd90f 100644
--- a/backend/app/core/container.py
+++ b/backend/app/core/container.py
@@ -19,7 +19,6 @@
PodMonitorProvider,
RedisProvider,
RepositoryProvider,
- ResourceCleanerProvider,
ResultProcessorProvider,
SagaOrchestratorProvider,
SettingsProvider,
@@ -59,7 +58,6 @@ def create_app_container(settings: Settings) -> AsyncContainer:
BusinessServicesProvider(),
CoordinatorProvider(),
KubernetesProvider(),
- ResourceCleanerProvider(),
FastapiProvider(),
context={Settings: settings},
)
diff --git a/backend/app/core/dishka_lifespan.py b/backend/app/core/dishka_lifespan.py
index 76f6f4b2..3eef6169 100644
--- a/backend/app/core/dishka_lifespan.py
+++ b/backend/app/core/dishka_lifespan.py
@@ -62,7 +62,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# Get unstarted broker from DI (BrokerProvider yields without starting)
broker: KafkaBroker = await container.get(KafkaBroker)
- app.state.kafka_broker = broker
# Register subscribers BEFORE broker.start() - FastStream requirement
register_sse_subscriber(broker, settings)
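The comment above notes that subscribers must be registered before `broker.start()`. A minimal standalone sketch of that FastStream pattern, assuming a local broker and a hypothetical topic name (not code from this repo):

```python
# Minimal sketch of the "register subscribers before start" constraint.
import asyncio

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.subscriber("sse-events")  # hypothetical topic; registered before start
async def on_event(message: dict) -> None:
    print(message)

async def main() -> None:
    await broker.start()   # subscribers declared above begin consuming
    await asyncio.sleep(60)
    await broker.close()

asyncio.run(main())
```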
diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py
index d67f3c23..c07885fe 100644
--- a/backend/app/core/providers.py
+++ b/backend/app/core/providers.py
@@ -62,7 +62,7 @@
from app.services.notification_service import NotificationService
from app.services.pod_monitor import PodEventMapper, PodMonitor, PodMonitorConfig
from app.services.rate_limit_service import RateLimitService
-from app.services.result_processor import ResourceCleaner, ResultProcessor
+from app.services.result_processor import ResultProcessor
from app.services.runtime_settings import RuntimeSettingsLoader
from app.services.saga import SagaOrchestrator, SagaService
from app.services.saved_script_service import SavedScriptService
@@ -291,16 +291,6 @@ async def get_api_client(
await api_client.close()
-class ResourceCleanerProvider(Provider):
- scope = Scope.APP
-
- @provide
- def get_resource_cleaner(
- self, api_client: k8s_client.ApiClient, logger: structlog.stdlib.BoundLogger
- ) -> ResourceCleaner:
- return ResourceCleaner(api_client=api_client, logger=logger)
-
-
class MetricsProvider(Provider):
"""Provides all metrics instances via DI (no contextvars needed)."""
diff --git a/backend/app/db/docs/replay.py b/backend/app/db/docs/replay.py
index eb784dc1..6f923a5b 100644
--- a/backend/app/db/docs/replay.py
+++ b/backend/app/db/docs/replay.py
@@ -69,18 +69,6 @@ class ReplaySessionDocument(Document):
model_config = ConfigDict(from_attributes=True)
- @property
- def progress_percentage(self) -> float:
- """Calculate progress percentage."""
- if self.total_events == 0:
- return 0.0
- return round((self.replayed_events / self.total_events) * 100, 2)
-
- @property
- def is_completed(self) -> bool:
- """Check if session is completed."""
- return self.status in [ReplayStatus.COMPLETED, ReplayStatus.FAILED, ReplayStatus.CANCELLED]
-
@property
def is_running(self) -> bool:
"""Check if session is running."""
diff --git a/backend/app/db/repositories/admin/admin_events_repository.py b/backend/app/db/repositories/admin/admin_events_repository.py
index 5b4846d5..0dfaa715 100644
--- a/backend/app/db/repositories/admin/admin_events_repository.py
+++ b/backend/app/db/repositories/admin/admin_events_repository.py
@@ -25,7 +25,7 @@
HourlyEventCount,
UserEventCount,
)
-from app.domain.replay import ReplayFilter, ReplaySessionState
+from app.domain.replay import ReplayFilter
class AdminEventsRepository:
@@ -210,17 +210,6 @@ async def archive_event(self, event: DomainEvent, deleted_by: str) -> bool:
await archive_doc.insert()
return True
- async def create_replay_session(self, session: ReplaySessionState) -> str:
- doc = ReplaySessionDocument(**session.model_dump())
- await doc.insert()
- return session.session_id
-
- async def get_replay_session(self, session_id: str) -> ReplaySessionState | None:
- doc = await ReplaySessionDocument.find_one(ReplaySessionDocument.session_id == session_id)
- if not doc:
- return None
- return ReplaySessionState.model_validate(doc)
-
async def update_replay_session(self, session_id: str, updates: ReplaySessionUpdate) -> bool:
update_dict = updates.model_dump(exclude_none=True)
if not update_dict:
diff --git a/backend/app/db/repositories/dlq_repository.py b/backend/app/db/repositories/dlq_repository.py
index f697685d..b228ce51 100644
--- a/backend/app/db/repositories/dlq_repository.py
+++ b/backend/app/db/repositories/dlq_repository.py
@@ -149,25 +149,3 @@ async def get_queue_sizes_by_topic(self) -> dict[str, int]:
result[row["_id"]] = row["count"]
return result
- async def mark_message_retried(self, event_id: str) -> bool:
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- if not doc:
- return False
- now = datetime.now(timezone.utc)
- doc.status = DLQMessageStatus.RETRIED
- doc.retried_at = now
- doc.last_updated = now
- await doc.save()
- return True
-
- async def mark_message_discarded(self, event_id: str, reason: str) -> bool:
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- if not doc:
- return False
- now = datetime.now(timezone.utc)
- doc.status = DLQMessageStatus.DISCARDED
- doc.discarded_at = now
- doc.discard_reason = reason
- doc.last_updated = now
- await doc.save()
- return True
diff --git a/backend/app/db/repositories/execution_repository.py b/backend/app/db/repositories/execution_repository.py
index f8004ded..a90af22c 100644
--- a/backend/app/db/repositories/execution_repository.py
+++ b/backend/app/db/repositories/execution_repository.py
@@ -8,7 +8,6 @@
from app.domain.execution import (
DomainExecution,
DomainExecutionCreate,
- DomainExecutionUpdate,
ExecutionResultDomain,
)
@@ -34,17 +33,6 @@ async def get_execution(self, execution_id: str) -> DomainExecution | None:
self.logger.info("Found execution in MongoDB", execution_id=execution_id)
return DomainExecution.model_validate(doc)
- async def update_execution(self, execution_id: str, update_data: DomainExecutionUpdate) -> bool:
- doc = await ExecutionDocument.find_one(ExecutionDocument.execution_id == execution_id)
- if not doc:
- return False
-
- update_dict = update_data.model_dump(exclude_none=True)
- if update_dict:
- update_dict["updated_at"] = datetime.now(timezone.utc)
- await doc.set(update_dict)
- return True
-
async def write_terminal_result(self, result: ExecutionResultDomain) -> bool:
doc = await ExecutionDocument.find_one(ExecutionDocument.execution_id == result.execution_id)
if not doc:
diff --git a/backend/app/db/repositories/user_settings_repository.py b/backend/app/db/repositories/user_settings_repository.py
index 6002c26c..08639ba8 100644
--- a/backend/app/db/repositories/user_settings_repository.py
+++ b/backend/app/db/repositories/user_settings_repository.py
@@ -65,9 +65,6 @@ async def count_events_since_snapshot(self, user_id: str) -> int:
GT(EventDocument.timestamp, snapshot.updated_at),
).count()
- async def count_events_for_user(self, user_id: str) -> int:
- return await EventDocument.find(EventDocument.aggregate_id == f"user_settings_{user_id}").count()
-
async def delete_user_settings(self, user_id: str) -> None:
doc = await UserSettingsSnapshotDocument.find_one(UserSettingsSnapshotDocument.user_id == user_id)
if doc:
diff --git a/backend/app/dlq/manager.py b/backend/app/dlq/manager.py
index 05ef2284..6ff600a2 100644
--- a/backend/app/dlq/manager.py
+++ b/backend/app/dlq/manager.py
@@ -209,9 +209,6 @@ async def update_queue_metrics(self) -> None:
def set_retry_policy(self, topic: str, policy: RetryPolicy) -> None:
self._retry_policies[topic] = policy
- def add_filter(self, filter_func: Callable[[DLQMessage], bool]) -> None:
- self._filters.append(filter_func)
-
async def retry_message_manually(self, event_id: str) -> bool:
message = await self.repository.get_message_by_id(event_id)
if not message:
diff --git a/backend/app/domain/events/event_models.py b/backend/app/domain/events/event_models.py
index e1f5c7a7..7561054f 100644
--- a/backend/app/domain/events/event_models.py
+++ b/backend/app/domain/events/event_models.py
@@ -3,7 +3,6 @@
from pydantic import BaseModel, ConfigDict, Field
-from app.core.utils import StringEnum
from app.domain.enums import EventType
from app.domain.events.typed import DomainEvent, EventMetadata
@@ -11,24 +10,6 @@
MongoQuery = dict[str, MongoQueryValue]
-class CollectionNames(StringEnum):
- EVENTS = "events"
- EVENT_STORE = "event_store"
- REPLAY_SESSIONS = "replay_sessions"
- EVENTS_ARCHIVE = "events_archive"
- RESOURCE_ALLOCATIONS = "resource_allocations"
- USERS = "users"
- EXECUTIONS = "executions"
- EXECUTION_RESULTS = "execution_results"
- SAVED_SCRIPTS = "saved_scripts"
- NOTIFICATIONS = "notifications"
- NOTIFICATION_SUBSCRIPTIONS = "notification_subscriptions"
- USER_SETTINGS = "user_settings"
- USER_SETTINGS_SNAPSHOTS = "user_settings_snapshots"
- SAGAS = "sagas"
- DLQ_MESSAGES = "dlq_messages"
-
-
class EventSummary(BaseModel):
"""Lightweight event summary for lists and previews."""
@@ -167,16 +148,6 @@ class ExecutionEventsResult(BaseModel):
access_allowed: bool
include_system_events: bool
- def get_filtered_events(self) -> list[DomainEvent]:
- """Get events filtered based on access and system event settings."""
- if not self.access_allowed:
- return []
-
- events = self.events
- if not self.include_system_events:
- events = [e for e in events if not e.metadata.service_name.startswith("system-")]
-
- return events
class EventExportRow(BaseModel):
diff --git a/backend/app/domain/saga/exceptions.py b/backend/app/domain/saga/exceptions.py
index ebb0788c..edcd1943 100644
--- a/backend/app/domain/saga/exceptions.py
+++ b/backend/app/domain/saga/exceptions.py
@@ -28,15 +28,6 @@ def __init__(self, saga_id: str, current_state: SagaState, operation: str) -> No
super().__init__(f"Cannot {operation} saga '{saga_id}' in state '{current_state}'")
-class SagaCompensationError(InfrastructureError):
- """Raised when saga compensation fails."""
-
- def __init__(self, saga_id: str, step: str, reason: str) -> None:
- self.saga_id = saga_id
- self.step = step
- super().__init__(f"Compensation failed for saga '{saga_id}' at step '{step}': {reason}")
-
-
class SagaTimeoutError(InfrastructureError):
"""Raised when a saga times out."""
diff --git a/backend/app/domain/saga/models.py b/backend/app/domain/saga/models.py
index 86303031..e0ede7f6 100644
--- a/backend/app/domain/saga/models.py
+++ b/backend/app/domain/saga/models.py
@@ -94,20 +94,6 @@ class SagaCancellationResult(BaseModel):
saga_id: str
-class SagaStatistics(BaseModel):
- """Saga statistics."""
-
- model_config = ConfigDict(from_attributes=True)
-
- total_sagas: int
- sagas_by_state: dict[str, int] = Field(default_factory=dict)
- sagas_by_name: dict[str, int] = Field(default_factory=dict)
- average_duration_seconds: float = 0.0
- success_rate: float = 0.0
- failure_rate: float = 0.0
- compensation_rate: float = 0.0
-
-
class SagaConfig(BaseModel):
"""Configuration for saga orchestration (domain)."""
diff --git a/backend/app/domain/user/settings_models.py b/backend/app/domain/user/settings_models.py
index 9c7aef69..cc16917b 100644
--- a/backend/app/domain/user/settings_models.py
+++ b/backend/app/domain/user/settings_models.py
@@ -57,16 +57,6 @@ class DomainUserSettingsUpdate(BaseModel):
custom_settings: dict[str, Any] | None = None
-class DomainSettingChange(BaseModel):
- model_config = ConfigDict(from_attributes=True)
-
- field_path: str
- old_value: Any
- new_value: Any
- changed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
- change_reason: str | None = None
-
-
class DomainUserSettingsChangedEvent(BaseModel):
"""Well-typed domain event for user settings changes."""
@@ -105,6 +95,3 @@ class CachedSettings(BaseModel):
settings: DomainUserSettings
expires_at: datetime
-
- def is_expired(self) -> bool:
- return datetime.now(timezone.utc) > self.expires_at
diff --git a/backend/app/domain/user/user_models.py b/backend/app/domain/user/user_models.py
index 1f9b16ce..457adb44 100644
--- a/backend/app/domain/user/user_models.py
+++ b/backend/app/domain/user/user_models.py
@@ -71,16 +71,6 @@ class UserUpdate(BaseModel):
is_active: bool | None = None
password: str | None = None
- def has_updates(self) -> bool:
- return any(
- [
- self.username is not None,
- self.email is not None,
- self.role is not None,
- self.is_active is not None,
- self.password is not None,
- ]
- )
class UserListResult(BaseModel):
diff --git a/backend/app/events/consumer_group_monitor.py b/backend/app/events/consumer_group_monitor.py
index d6da44f9..5f81d27f 100644
--- a/backend/app/events/consumer_group_monitor.py
+++ b/backend/app/events/consumer_group_monitor.py
@@ -1,17 +1,12 @@
-import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
-from aiokafka import AIOKafkaConsumer, TopicPartition
-from aiokafka.admin import AIOKafkaAdminClient
from aiokafka.protocol.api import Response
from aiokafka.protocol.group import MemberAssignment
-from aiokafka.structs import OffsetAndMetadata
from app.core.utils import StringEnum
-from app.settings import Settings
class ConsumerGroupHealth(StringEnum):
@@ -128,358 +123,4 @@ def _state_from_string(state_str: str) -> ConsumerGroupState:
return ConsumerGroupState.UNKNOWN
-class NativeConsumerGroupMonitor:
- """
- Enhanced consumer group monitoring using aiokafka.
-
- Provides detailed consumer group health monitoring, lag tracking, and
- rebalancing detection using AIOKafkaAdminClient's native capabilities.
- """
-
- def __init__(
- self,
- settings: Settings,
- logger: logging.Logger,
- client_id: str = "integr8scode-consumer-group-monitor",
- # Health thresholds
- critical_lag_threshold: int = 10000,
- warning_lag_threshold: int = 1000,
- min_members_threshold: int = 1,
- ):
- self.logger = logger
- self._settings = settings
- self._bootstrap_servers = settings.KAFKA_BOOTSTRAP_SERVERS
- self._client_id = client_id
-
- self._admin: AIOKafkaAdminClient | None = None
-
- # Health thresholds
- self.critical_lag_threshold = critical_lag_threshold
- self.warning_lag_threshold = warning_lag_threshold
- self.min_members_threshold = min_members_threshold
-
- # Monitoring state
- self._group_status_cache: dict[str, ConsumerGroupStatus] = {}
- self._cache_ttl_seconds = 30
-
- async def _get_admin(self) -> AIOKafkaAdminClient:
- """Get or create the admin client."""
- if self._admin is None:
- self._admin = AIOKafkaAdminClient(
- bootstrap_servers=self._bootstrap_servers,
- client_id=self._client_id,
- )
- await self._admin.start()
- return self._admin
-
- async def close(self) -> None:
- """Close the admin client."""
- if self._admin is not None:
- await self._admin.close()
- self._admin = None
-
- async def get_consumer_group_status(
- self, group_id: str, include_lag: bool = True
- ) -> ConsumerGroupStatus:
- """Get comprehensive status for a consumer group."""
- try:
- # Check cache first
- cached = self._group_status_cache.get(group_id)
- if cached is not None:
- cache_age = (datetime.now(timezone.utc) - cached.timestamp).total_seconds()
- if cache_age < self._cache_ttl_seconds:
- return cached
-
- # Get group description from AdminClient
- described_group = await self._describe_consumer_group(group_id)
-
- # Build member information
- members: list[ConsumerGroupMember] = []
- partition_distribution: dict[str, int] = {}
- total_assigned_partitions = 0
-
- for member_data in described_group.members:
- member_id: str = member_data["member_id"]
- client_id: str = member_data["client_id"]
- client_host: str = member_data["client_host"]
- assignment_bytes: bytes = member_data["member_assignment"]
-
- # Parse assigned partitions from assignment bytes
- assigned_partitions: list[str] = []
- topic_partitions = _parse_member_assignment(assignment_bytes)
- for topic, partitions in topic_partitions:
- for partition in partitions:
- assigned_partitions.append(f"{topic}:{partition}")
-
- members.append(
- ConsumerGroupMember(
- member_id=member_id,
- client_id=client_id,
- host=client_host,
- assigned_partitions=assigned_partitions,
- )
- )
-
- partition_distribution[member_id] = len(assigned_partitions)
- total_assigned_partitions += len(assigned_partitions)
-
- # Get coordinator information
- admin = await self._get_admin()
- coordinator_id = await admin.find_coordinator(group_id)
- coordinator = f"node:{coordinator_id}"
-
- # Parse state
- state = _state_from_string(described_group.state)
-
- # Get lag information if requested
- total_lag = 0
- partition_lags: dict[str, int] = {}
- if include_lag and state == ConsumerGroupState.STABLE:
- try:
- lag_info = await self._get_consumer_group_lag(group_id)
- total_lag = lag_info["total_lag"]
- partition_lags = lag_info["partition_lags"]
- except Exception as e:
- self.logger.warning(f"Failed to get lag info for group {group_id}: {e}")
-
- # Create status object
- status = ConsumerGroupStatus(
- group_id=group_id,
- state=state,
- protocol=described_group.protocol,
- protocol_type=described_group.protocol_type,
- coordinator=coordinator,
- members=members,
- member_count=len(members),
- assigned_partitions=total_assigned_partitions,
- partition_distribution=partition_distribution,
- total_lag=total_lag,
- partition_lags=partition_lags,
- )
-
- # Assess health
- status.health, status.health_message = self._assess_group_health(status)
-
- # Cache the result
- self._group_status_cache[group_id] = status
-
- return status
-
- except Exception as e:
- self.logger.error(f"Failed to get consumer group status for {group_id}: {e}")
-
- # Return minimal status with error
- return ConsumerGroupStatus(
- group_id=group_id,
- state=ConsumerGroupState.UNKNOWN,
- protocol="unknown",
- protocol_type="unknown",
- coordinator="unknown",
- members=[],
- member_count=0,
- assigned_partitions=0,
- partition_distribution={},
- health=ConsumerGroupHealth.UNHEALTHY,
- health_message=f"Failed to get group status: {e}",
- )
-
- async def get_multiple_group_status(
- self, group_ids: list[str], include_lag: bool = True
- ) -> dict[str, ConsumerGroupStatus]:
- """Get status for multiple consumer groups efficiently."""
- results: dict[str, ConsumerGroupStatus] = {}
-
- # Process groups concurrently
- tasks = [self.get_consumer_group_status(group_id, include_lag) for group_id in group_ids]
-
- try:
- statuses = await asyncio.gather(*tasks, return_exceptions=True)
-
- for group_id, status in zip(group_ids, statuses, strict=False):
- if isinstance(status, ConsumerGroupStatus):
- results[group_id] = status
- else:
- # status is BaseException
- self.logger.error(f"Failed to get status for group {group_id}: {status}")
- results[group_id] = ConsumerGroupStatus(
- group_id=group_id,
- state=ConsumerGroupState.UNKNOWN,
- protocol="unknown",
- protocol_type="unknown",
- coordinator="unknown",
- members=[],
- member_count=0,
- assigned_partitions=0,
- partition_distribution={},
- health=ConsumerGroupHealth.UNHEALTHY,
- health_message=str(status),
- )
-
- except Exception as e:
- self.logger.error(f"Failed to get multiple group status: {e}")
- # Return error status for all groups
- for group_id in group_ids:
- if group_id not in results:
- results[group_id] = ConsumerGroupStatus(
- group_id=group_id,
- state=ConsumerGroupState.UNKNOWN,
- protocol="unknown",
- protocol_type="unknown",
- coordinator="unknown",
- members=[],
- member_count=0,
- assigned_partitions=0,
- partition_distribution={},
- health=ConsumerGroupHealth.UNHEALTHY,
- health_message=str(e),
- )
-
- return results
-
- async def list_consumer_groups(self) -> list[str]:
- """List all consumer groups in the cluster."""
- try:
- admin = await self._get_admin()
- # Returns list of tuples: (group_id, protocol_type)
- groups: list[tuple[Any, ...]] = await admin.list_consumer_groups()
- return [str(g[0]) for g in groups]
- except Exception as e:
- self.logger.error(f"Failed to list consumer groups: {e}")
- return []
-
- async def _describe_consumer_group(self, group_id: str) -> DescribedGroup:
- """Describe a single consumer group using native AdminClient."""
- admin = await self._get_admin()
- responses: list[Response] = await admin.describe_consumer_groups([group_id])
-
- if not responses:
- raise ValueError(f"No response for group {group_id}")
-
- # Parse the response
- groups = _parse_describe_groups_response(responses[0])
-
- # Find our group in the response
- for group in groups:
- if group.group_id == group_id:
- if group.error_code != 0:
- raise ValueError(f"Error describing group {group_id}: error_code={group.error_code}")
- return group
-
- raise ValueError(f"Group {group_id} not found in response")
-
- async def _get_consumer_group_lag(self, group_id: str) -> dict[str, Any]:
- """Get consumer group lag information."""
- try:
- admin = await self._get_admin()
-
- # Get committed offsets for the group
- offsets: dict[TopicPartition, OffsetAndMetadata] = await admin.list_consumer_group_offsets(group_id)
-
- if not offsets:
- return {"total_lag": 0, "partition_lags": {}}
-
- # Create a temporary consumer to get high watermarks
- consumer = AIOKafkaConsumer(
- bootstrap_servers=self._bootstrap_servers,
- group_id=f"{group_id}-lag-monitor-{datetime.now().timestamp()}",
- enable_auto_commit=False,
- auto_offset_reset="earliest",
- session_timeout_ms=self._settings.KAFKA_SESSION_TIMEOUT_MS,
- heartbeat_interval_ms=self._settings.KAFKA_HEARTBEAT_INTERVAL_MS,
- request_timeout_ms=self._settings.KAFKA_REQUEST_TIMEOUT_MS,
- )
-
- try:
- await consumer.start()
-
- total_lag = 0
- partition_lags: dict[str, int] = {}
-
- # Get end offsets for all partitions
- tps = list(offsets.keys())
- if tps:
- end_offsets: dict[TopicPartition, int] = await consumer.end_offsets(tps)
-
- for tp, offset_meta in offsets.items():
- committed_offset = offset_meta.offset
- high = end_offsets.get(tp, 0)
-
- if committed_offset >= 0:
- lag = max(0, high - committed_offset)
- partition_lags[f"{tp.topic}:{tp.partition}"] = lag
- total_lag += lag
-
- return {"total_lag": total_lag, "partition_lags": partition_lags}
-
- finally:
- await consumer.stop()
-
- except Exception as e:
- self.logger.warning(f"Failed to get consumer group lag for {group_id}: {e}")
- return {"total_lag": 0, "partition_lags": {}}
-
- def _assess_group_health(self, status: ConsumerGroupStatus) -> tuple[ConsumerGroupHealth, str]:
- """Assess the health of a consumer group based on its status."""
-
- # Check for error/unknown state
- if status.state == ConsumerGroupState.UNKNOWN:
- return ConsumerGroupHealth.UNHEALTHY, "Group is in unknown state"
-
- if status.state == ConsumerGroupState.DEAD:
- return ConsumerGroupHealth.UNHEALTHY, "Group is dead"
-
- if status.member_count < self.min_members_threshold:
- return ConsumerGroupHealth.UNHEALTHY, f"Insufficient members: {status.member_count}"
-
- # Check for rebalancing issues
- if status.state in (ConsumerGroupState.PREPARING_REBALANCE, ConsumerGroupState.COMPLETING_REBALANCE):
- return ConsumerGroupHealth.DEGRADED, f"Group is rebalancing: {status.state}"
-
- # Check for empty group
- if status.state == ConsumerGroupState.EMPTY:
- return ConsumerGroupHealth.DEGRADED, "Group is empty (no active members)"
-
- # Check lag if available
- if status.total_lag >= self.critical_lag_threshold:
- return ConsumerGroupHealth.UNHEALTHY, f"Critical lag: {status.total_lag} messages"
-
- if status.total_lag >= self.warning_lag_threshold:
- return ConsumerGroupHealth.DEGRADED, f"High lag: {status.total_lag} messages"
-
- # Check partition distribution
- if status.partition_distribution:
- values = list(status.partition_distribution.values())
- max_partitions = max(values)
- min_partitions = min(values)
-
- # Warn if partition distribution is very uneven
- if max_partitions > 0 and (max_partitions - min_partitions) > max_partitions * 0.5:
- return ConsumerGroupHealth.DEGRADED, "Uneven partition distribution"
-
- # Check if group is stable and consuming
- if status.state == ConsumerGroupState.STABLE and status.assigned_partitions > 0:
- return ConsumerGroupHealth.HEALTHY, f"Group is stable with {status.member_count} members"
-
- # Default case
- return ConsumerGroupHealth.UNKNOWN, f"Group state: {status.state}"
-
- def get_health_summary(self, status: ConsumerGroupStatus) -> dict[str, Any]:
- """Get a health summary for a consumer group."""
- return {
- "group_id": status.group_id,
- "health": status.health,
- "health_message": status.health_message,
- "state": status.state,
- "members": status.member_count,
- "assigned_partitions": status.assigned_partitions,
- "total_lag": status.total_lag,
- "coordinator": status.coordinator,
- "timestamp": status.timestamp.isoformat(),
- "partition_distribution": status.partition_distribution,
- }
-
- def clear_cache(self) -> None:
- """Clear the status cache."""
- self._group_status_cache.clear()
-
diff --git a/backend/app/events/core/producer.py b/backend/app/events/core/producer.py
index 1946f479..aab668bc 100644
--- a/backend/app/events/core/producer.py
+++ b/backend/app/events/core/producer.py
@@ -1,14 +1,8 @@
-import asyncio
-import socket
-from datetime import datetime, timezone
-
import structlog
from faststream.kafka import KafkaBroker
from app.core.metrics import EventMetrics
from app.db.repositories import EventRepository
-from app.dlq.models import DLQMessage, DLQMessageStatus
-from app.domain.enums import KafkaTopic
from app.domain.events import DomainEvent
from app.infrastructure.kafka.mappings import EVENT_TYPE_TO_TOPIC
from app.settings import Settings
@@ -54,41 +48,3 @@ async def produce(self, event_to_produce: DomainEvent, key: str) -> None:
self.logger.error(f"Failed to produce message: {e}")
raise
- async def send_to_dlq(
- self, original_event: DomainEvent, original_topic: str, error: Exception, retry_count: int = 0
- ) -> None:
- """Send a failed event to the Dead Letter Queue."""
- try:
- current_task = asyncio.current_task()
- task_name = current_task.get_name() if current_task else "main"
- producer_id = f"{socket.gethostname()}-{task_name}"
-
- dlq_topic = f"{self._topic_prefix}{KafkaTopic.DEAD_LETTER_QUEUE}"
-
- dlq_msg = DLQMessage(
- event=original_event,
- original_topic=original_topic,
- error=str(error),
- retry_count=retry_count,
- failed_at=datetime.now(timezone.utc),
- status=DLQMessageStatus.PENDING,
- producer_id=producer_id,
- )
-
- await self._broker.publish(
- message=dlq_msg,
- topic=dlq_topic,
- key=original_event.event_id.encode(),
- )
-
- self._event_metrics.record_kafka_message_produced(dlq_topic)
- self.logger.warning(
- f"Event {original_event.event_id} sent to DLQ. "
- f"Original topic: {original_topic}, Error: {error}, "
- f"Retry count: {retry_count}"
- )
-
- except Exception as e:
- self.logger.critical(
- f"Failed to send event {original_event.event_id} to DLQ: {e}. Original error: {error}", exc_info=True
- )
diff --git a/backend/app/infrastructure/kafka/mappings.py b/backend/app/infrastructure/kafka/mappings.py
index c7597db2..dc339728 100644
--- a/backend/app/infrastructure/kafka/mappings.py
+++ b/backend/app/infrastructure/kafka/mappings.py
@@ -98,10 +98,6 @@ def get_topic_for_event(event_type: EventType) -> KafkaTopic:
return EVENT_TYPE_TO_TOPIC.get(event_type, KafkaTopic.SYSTEM_EVENTS)
-def get_event_types_for_topic(topic: KafkaTopic) -> list[EventType]:
- """Get all event types that publish to a given topic."""
- return [et for et, t in EVENT_TYPE_TO_TOPIC.items() if t == topic]
-
CONSUMER_GROUP_SUBSCRIPTIONS: dict[GroupId, set[KafkaTopic]] = {
GroupId.EXECUTION_COORDINATOR: {
diff --git a/backend/app/schemas_pydantic/execution.py b/backend/app/schemas_pydantic/execution.py
index 2fc60a25..4b16d8e4 100644
--- a/backend/app/schemas_pydantic/execution.py
+++ b/backend/app/schemas_pydantic/execution.py
@@ -20,12 +20,6 @@ class ExecutionBase(BaseModel):
lang_version: str = "3.11"
-class ExecutionCreate(ExecutionBase):
- """Model for creating a new execution."""
-
- pass
-
-
class ExecutionInDB(ExecutionBase):
"""Model for execution as stored in database."""
@@ -40,17 +34,6 @@ class ExecutionInDB(ExecutionBase):
model_config = ConfigDict(from_attributes=True)
-class ExecutionUpdate(BaseModel):
- """Model for updating an execution."""
-
- status: ExecutionStatus | None = None
- stdout: str | None = None
- stderr: str | None = None
- resource_usage: ResourceUsage | None = None
- exit_code: int | None = None
- error_type: ExecutionErrorType | None = None
-
-
class ResourceUsage(BaseModel):
"""Model for execution resource usage."""
diff --git a/backend/app/schemas_pydantic/notification.py b/backend/app/schemas_pydantic/notification.py
index 05491c3d..76805209 100644
--- a/backend/app/schemas_pydantic/notification.py
+++ b/backend/app/schemas_pydantic/notification.py
@@ -56,27 +56,6 @@ def validate_scheduled_for(cls, v: datetime | None) -> datetime | None:
model_config = ConfigDict(from_attributes=True)
-class NotificationBatch(BaseModel):
- """Batch of notifications for bulk processing"""
-
- batch_id: str = Field(default_factory=lambda: str(uuid4()))
- notifications: list[Notification]
- created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
- processed_count: int = 0
- failed_count: int = 0
-
- @field_validator("notifications")
- @classmethod
- def validate_notifications(cls, v: list[Notification]) -> list[Notification]:
- if not v:
- raise ValueError("Batch must contain at least one notification")
- if len(v) > 1000:
- raise ValueError("Batch cannot exceed 1000 notifications")
- return v
-
- model_config = ConfigDict(from_attributes=True)
-
-
# Rules removed in unified model
@@ -109,37 +88,6 @@ class NotificationSubscription(BaseModel):
model_config = ConfigDict(from_attributes=True)
-class NotificationStats(BaseModel):
- """Statistics for notification delivery"""
-
- user_id: str | None = None
- channel: NotificationChannel | None = None
- tags: list[str] | None = None
- severity: NotificationSeverity | None = None
-
- # Time range
- start_date: datetime
- end_date: datetime
-
- # Counts
- total_sent: int = 0
- total_delivered: int = 0
- total_failed: int = 0
- total_read: int = 0
- total_clicked: int = 0
-
- # Rates
- delivery_rate: float = 0.0
- read_rate: float = 0.0
- click_rate: float = 0.0
-
- # Performance
- avg_delivery_time_seconds: float = 0.0
- avg_read_time_seconds: float = 0.0
-
- model_config = ConfigDict(from_attributes=True)
-
-
class NotificationResponse(BaseModel):
"""Response schema for notification endpoints"""
diff --git a/backend/app/schemas_pydantic/replay.py b/backend/app/schemas_pydantic/replay.py
index 95685b45..19d77b1e 100644
--- a/backend/app/schemas_pydantic/replay.py
+++ b/backend/app/schemas_pydantic/replay.py
@@ -67,12 +67,6 @@ def duration_seconds(self) -> float | None:
return (self.completed_at - self.started_at).total_seconds()
return None
- @computed_field # type: ignore[prop-decorator]
- @property
- def throughput_events_per_second(self) -> float | None:
- if self.duration_seconds and self.duration_seconds > 0 and self.replayed_events > 0:
- return self.replayed_events / self.duration_seconds
- return None
class CleanupResponse(BaseModel):
diff --git a/backend/app/schemas_pydantic/saved_script.py b/backend/app/schemas_pydantic/saved_script.py
index e315c656..e9cd0c84 100644
--- a/backend/app/schemas_pydantic/saved_script.py
+++ b/backend/app/schemas_pydantic/saved_script.py
@@ -1,5 +1,4 @@
from datetime import datetime, timezone
-from uuid import uuid4
from pydantic import BaseModel, ConfigDict, Field
@@ -18,15 +17,6 @@ class SavedScriptCreate(SavedScriptBase):
pass
-class SavedScriptInDB(SavedScriptBase):
- script_id: str = Field(default_factory=lambda: str(uuid4()))
- user_id: str
- created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
- updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
-
- model_config = ConfigDict(from_attributes=True)
-
-
class SavedScriptUpdate(BaseModel):
name: str | None = None
script: str | None = None
diff --git a/backend/app/schemas_pydantic/user.py b/backend/app/schemas_pydantic/user.py
index 268b7aaa..29b82f9e 100644
--- a/backend/app/schemas_pydantic/user.py
+++ b/backend/app/schemas_pydantic/user.py
@@ -1,5 +1,4 @@
-from datetime import datetime, timezone
-from uuid import uuid4
+from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field
@@ -26,18 +25,6 @@ class UserCreate(UserBase):
model_config = ConfigDict(from_attributes=True)
-class UserInDB(UserBase):
- """User model as stored in database (with hashed password)"""
-
- user_id: str = Field(default_factory=lambda: str(uuid4()))
- hashed_password: str
- is_superuser: bool = False
- created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
- updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
-
- model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
-
-
class UserUpdate(BaseModel):
"""Model for updating a user"""
diff --git a/backend/app/schemas_pydantic/user_settings.py b/backend/app/schemas_pydantic/user_settings.py
index 76140933..6cba84b6 100644
--- a/backend/app/schemas_pydantic/user_settings.py
+++ b/backend/app/schemas_pydantic/user_settings.py
@@ -77,16 +77,6 @@ class UserSettingsUpdate(BaseModel):
custom_settings: dict[str, Any] | None = None
-class SettingChange(BaseModel):
- """Represents a single setting change for event sourcing"""
-
- field_path: str # e.g., "theme", "editor.font_size", "notifications.channels"
- old_value: Any
- new_value: Any
- changed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
- change_reason: str | None = None
-
-
class ThemeUpdateRequest(BaseModel):
"""Request model for theme update"""
@@ -122,9 +112,3 @@ class RestoreSettingsRequest(BaseModel):
reason: str | None = None
-class SettingsEvent(BaseModel):
- """Minimal event model for user settings service consumption."""
-
- event_type: EventType
- timestamp: datetime
- payload: dict[str, Any]
diff --git a/backend/app/services/event_service.py b/backend/app/services/event_service.py
index 596aa4bd..ea0aa65f 100644
--- a/backend/app/services/event_service.py
+++ b/backend/app/services/event_service.py
@@ -1,47 +1,16 @@
from datetime import datetime
-from typing import Any
from app.db.repositories import EventRepository
from app.domain.enums import EventType, UserRole
from app.domain.events import (
ArchivedEvent,
DomainEvent,
- EventFilter,
EventListResult,
EventReplayInfo,
EventStatistics,
)
-def _filter_to_mongo_query(flt: EventFilter) -> dict[str, Any]:
- """Convert EventFilter to MongoDB query dict."""
- query: dict[str, Any] = {}
-
- if flt.event_types:
- query["event_type"] = {"$in": flt.event_types}
- if flt.aggregate_id:
- query["aggregate_id"] = flt.aggregate_id
- if flt.user_id:
- query["metadata.user_id"] = flt.user_id
- if flt.service_name:
- query["metadata.service_name"] = flt.service_name
- if getattr(flt, "status", None):
- query["status"] = flt.status
-
- if flt.start_time or flt.end_time:
- time_query: dict[str, Any] = {}
- if flt.start_time:
- time_query["$gte"] = flt.start_time
- if flt.end_time:
- time_query["$lte"] = flt.end_time
- query["timestamp"] = time_query
-
- if flt.search_text:
- query["$text"] = {"$search": flt.search_text}
-
- return query
-
-
class EventService:
def __init__(self, repository: EventRepository):
self.repository = repository
@@ -102,39 +71,6 @@ async def get_user_events_paginated(
sort_order=sort_order,
)
- async def query_events_advanced(
- self,
- user_id: str,
- user_role: UserRole,
- filters: EventFilter,
- sort_by: str = "timestamp",
- limit: int = 100,
- skip: int = 0,
- ) -> EventListResult | None:
- # Access control
- if filters.user_id and filters.user_id != user_id and user_role != UserRole.ADMIN:
- return None
-
- query = _filter_to_mongo_query(filters)
- if not filters.user_id and user_role != UserRole.ADMIN:
- query["metadata.user_id"] = user_id
-
- # Sort field mapping
- field_map = {
- "timestamp": "timestamp",
- "event_type": "event_type",
- "aggregate_id": "aggregate_id",
- "stored_at": "stored_at",
- }
- sort_field = field_map.get(sort_by, "timestamp")
-
- return await self.repository.query_events(
- query=query,
- sort_field=sort_field,
- skip=skip,
- limit=limit,
- )
-
async def get_event_statistics(
self,
user_id: str,
diff --git a/backend/app/services/idempotency/redis_repository.py b/backend/app/services/idempotency/redis_repository.py
index 65ba91a1..f15471a0 100644
--- a/backend/app/services/idempotency/redis_repository.py
+++ b/backend/app/services/idempotency/redis_repository.py
@@ -118,9 +118,3 @@ async def update_record(self, record: IdempotencyRecord) -> int:
await self._r.set(k, payload)
return 1
- async def delete_key(self, key: str) -> int:
- k = self._full_key(key)
- return int(await self._r.delete(k) or 0)
-
- async def health_check(self) -> None:
- await self._r.ping() # type: ignore[misc] # redis-py returns Awaitable[bool] | bool
diff --git a/backend/app/services/k8s_worker/worker.py b/backend/app/services/k8s_worker/worker.py
index 591a27c1..e5429eab 100644
--- a/backend/app/services/k8s_worker/worker.py
+++ b/backend/app/services/k8s_worker/worker.py
@@ -13,7 +13,6 @@
CreatePodCommandEvent,
DeletePodCommandEvent,
ExecutionFailedEvent,
- ExecutionStartedEvent,
PodCreatedEvent,
)
from app.events.core import UnifiedProducer
@@ -58,7 +57,6 @@ def __init__(
# Kubernetes clients created from ApiClient
self.v1 = k8s_client.CoreV1Api(api_client)
- self.networking_v1 = k8s_client.NetworkingV1Api(api_client)
self.apps_v1 = k8s_client.AppsV1Api(api_client)
# Components
@@ -83,30 +81,28 @@ async def handle_create_pod_command(self, command: CreatePodCommandEvent) -> Non
await self._create_pod_for_execution(command)
async def handle_delete_pod_command(self, command: DeletePodCommandEvent) -> None:
- """Handle delete pod command from saga orchestrator (compensation)"""
+ """Handle delete pod command from saga orchestrator (compensation).
+
+ Deleting the pod is sufficient — the ConfigMap has an ownerReference pointing
+ to the pod, so K8s garbage-collects it automatically.
+ """
execution_id = command.execution_id
self.logger.info(f"Deleting pod for execution {execution_id} due to: {command.reason}")
try:
- # Delete the pod
pod_name = f"executor-{execution_id}"
await self.v1.delete_namespaced_pod(
name=pod_name,
namespace=self._settings.K8S_NAMESPACE,
grace_period_seconds=30,
)
- self.logger.info(f"Successfully deleted pod {pod_name}")
-
- # Delete associated ConfigMap
- configmap_name = f"script-{execution_id}"
- await self.v1.delete_namespaced_config_map(name=configmap_name, namespace=self._settings.K8S_NAMESPACE)
- self.logger.info(f"Successfully deleted ConfigMap {configmap_name}")
+ self.logger.info(f"Successfully deleted pod {pod_name} (ConfigMap will be GC'd by K8s)")
except ApiException as e:
if e.status == 404:
- self.logger.warning(f"Resources for execution {execution_id} not found (may have already been deleted)")
+ self.logger.warning(f"Pod for execution {execution_id} not found (may have already been deleted)")
else:
- self.logger.error(f"Failed to delete resources for execution {execution_id}: {e}")
+ self.logger.error(f"Failed to delete pod for execution {execution_id}: {e}")
async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> None:
"""Create pod for execution"""
@@ -129,7 +125,11 @@ async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> Non
await self._create_config_map(config_map)
pod = self.pod_builder.build_pod_manifest(command=command)
- await self._create_pod(pod)
+ created_pod = await self._create_pod(pod)
+
+ # Set ownerReference so K8s garbage-collects the ConfigMap when the pod is deleted
+ if created_pod and created_pod.metadata and created_pod.metadata.uid:
+ await self._set_configmap_owner(config_map, created_pod)
# Publish PodCreated event
await self._publish_pod_created(command, pod)
@@ -190,28 +190,47 @@ async def _create_config_map(self, config_map: k8s_client.V1ConfigMap) -> None:
self.metrics.record_k8s_config_map_created("failed")
raise
- async def _create_pod(self, pod: k8s_client.V1Pod) -> None:
- """Create Pod in Kubernetes"""
+ async def _create_pod(self, pod: k8s_client.V1Pod) -> k8s_client.V1Pod | None:
+ """Create Pod in Kubernetes. Returns the created pod (with UID) or None if it already existed."""
try:
- await self.v1.create_namespaced_pod(namespace=self._settings.K8S_NAMESPACE, body=pod)
+ created: k8s_client.V1Pod = await self.v1.create_namespaced_pod(
+ namespace=self._settings.K8S_NAMESPACE, body=pod
+ )
self.logger.debug(f"Created Pod {pod.metadata.name}")
+ return created
except ApiException as e:
if e.status == 409: # Already exists
self.logger.warning(f"Pod {pod.metadata.name} already exists")
+ return None
else:
raise
- async def _publish_execution_started(self, command: CreatePodCommandEvent, pod: k8s_client.V1Pod) -> None:
- """Publish execution started event"""
- event = ExecutionStartedEvent(
- execution_id=command.execution_id,
- aggregate_id=command.execution_id,
- pod_name=pod.metadata.name,
- node_name=pod.spec.node_name,
- container_id=None,
- metadata=command.metadata,
+ async def _set_configmap_owner(
+ self, config_map: k8s_client.V1ConfigMap, owner_pod: k8s_client.V1Pod
+ ) -> None:
+ """Patch the ConfigMap with an ownerReference pointing to the pod.
+
+ This makes K8s garbage-collect the ConfigMap automatically when the pod is deleted.
+ """
+ owner_ref = k8s_client.V1OwnerReference(
+ api_version="v1",
+ kind="Pod",
+ name=owner_pod.metadata.name,
+ uid=owner_pod.metadata.uid,
+ block_owner_deletion=False,
)
- await self.producer.produce(event_to_produce=event, key=command.execution_id)
+ patch_body = {"metadata": {"ownerReferences": [owner_ref]}}
+ try:
+ await self.v1.patch_namespaced_config_map(
+ name=config_map.metadata.name,
+ namespace=self._settings.K8S_NAMESPACE,
+ body=patch_body,
+ )
+ self.logger.debug(
+ f"Set ownerReference on ConfigMap {config_map.metadata.name} -> Pod {owner_pod.metadata.name}"
+ )
+ except ApiException as e:
+ self.logger.warning(f"Failed to set ownerReference on ConfigMap: {e.reason}")
async def _publish_pod_created(self, command: CreatePodCommandEvent, pod: k8s_client.V1Pod) -> None:
"""Publish pod created event"""
diff --git a/backend/app/services/pod_monitor/event_mapper.py b/backend/app/services/pod_monitor/event_mapper.py
index a2615b7c..e8e73943 100644
--- a/backend/app/services/pod_monitor/event_mapper.py
+++ b/backend/app/services/pod_monitor/event_mapper.py
@@ -519,6 +519,3 @@ def _log_extraction_error(self, pod_name: str, error: str) -> None:
else:
self.logger.warning(f"Failed to extract logs from pod {pod_name}: {error}")
- def clear_cache(self) -> None:
- """Clear event cache"""
- self._event_cache.clear()
diff --git a/backend/app/services/result_processor/__init__.py b/backend/app/services/result_processor/__init__.py
index 278aec0a..58e6d26f 100644
--- a/backend/app/services/result_processor/__init__.py
+++ b/backend/app/services/result_processor/__init__.py
@@ -1,7 +1,5 @@
from app.services.result_processor.processor import ResultProcessor
-from app.services.result_processor.resource_cleaner import ResourceCleaner
__all__ = [
"ResultProcessor",
- "ResourceCleaner",
]
diff --git a/backend/app/services/result_processor/resource_cleaner.py b/backend/app/services/result_processor/resource_cleaner.py
deleted file mode 100644
index 4734e0db..00000000
--- a/backend/app/services/result_processor/resource_cleaner.py
+++ /dev/null
@@ -1,213 +0,0 @@
-import asyncio
-from datetime import datetime, timedelta, timezone
-from typing import Any
-
-import structlog
-from kubernetes_asyncio import client as k8s_client
-from kubernetes_asyncio.client.rest import ApiException
-
-from app.domain.exceptions import InfrastructureError
-
-# Python 3.12 type aliases
-type ResourceDict = dict[str, list[str]]
-type CountDict = dict[str, int]
-
-
-class ResourceCleaner:
- """Service for cleaning up Kubernetes resources.
-
- Accepts ApiClient via dependency injection for proper configuration management.
- """
-
- def __init__(self, api_client: k8s_client.ApiClient, logger: structlog.stdlib.BoundLogger) -> None:
- self.v1 = k8s_client.CoreV1Api(api_client)
- self.networking_v1 = k8s_client.NetworkingV1Api(api_client)
- self.logger = logger
-
- async def cleanup_pod_resources(
- self,
- pod_name: str,
- namespace: str = "integr8scode",
- execution_id: str | None = None,
- timeout: int = 60,
- delete_pvcs: bool = False,
- ) -> None:
- """Clean up all resources associated with a pod"""
- self.logger.info(f"Cleaning up resources for pod: {pod_name}")
-
- try:
- tasks = [
- self._delete_pod(pod_name, namespace),
- *(
- [
- self._delete_configmaps(execution_id, namespace),
- *([self._delete_pvcs(execution_id, namespace)] if delete_pvcs else []),
- ]
- if execution_id
- else []
- ),
- ]
-
- await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=timeout)
-
- self.logger.info(f"Successfully cleaned up resources for pod: {pod_name}")
-
- except asyncio.TimeoutError as e:
- self.logger.error(f"Timeout cleaning up resources for pod: {pod_name}")
- raise InfrastructureError("Resource cleanup timed out") from e
- except Exception as e:
- self.logger.error(f"Failed to cleanup resources: {e}")
- raise InfrastructureError(f"Resource cleanup failed: {e}") from e
-
- async def _delete_pod(self, pod_name: str, namespace: str) -> None:
- """Delete a pod"""
- try:
- await self.v1.read_namespaced_pod(pod_name, namespace)
- await self.v1.delete_namespaced_pod(pod_name, namespace, grace_period_seconds=30)
- self.logger.info(f"Deleted pod: {pod_name}")
-
- except ApiException as e:
- if e.status == 404:
- self.logger.info(f"Pod {pod_name} already deleted")
- else:
- self.logger.error(f"Failed to delete pod: {e}")
- raise
-
- async def _delete_configmaps(self, execution_id: str, namespace: str) -> None:
- """Delete ConfigMaps for an execution"""
- await self._delete_labeled_resources(
- execution_id,
- namespace,
- self.v1.list_namespaced_config_map,
- self.v1.delete_namespaced_config_map,
- "ConfigMap",
- )
-
- async def _delete_pvcs(self, execution_id: str, namespace: str) -> None:
- """Delete PersistentVolumeClaims for an execution"""
- await self._delete_labeled_resources(
- execution_id,
- namespace,
- self.v1.list_namespaced_persistent_volume_claim,
- self.v1.delete_namespaced_persistent_volume_claim,
- "PVC",
- )
-
- async def _delete_labeled_resources(
- self, execution_id: str, namespace: str, list_func: Any, delete_func: Any, resource_type: str
- ) -> None:
- """Generic function to delete labeled resources"""
- try:
- label_selector = f"execution-id={execution_id}"
- resources = await list_func(namespace, label_selector=label_selector)
-
- for resource in resources.items:
- await delete_func(resource.metadata.name, namespace)
- self.logger.info(f"Deleted {resource_type}: {resource.metadata.name}")
-
- except ApiException as e:
- self.logger.error(f"Failed to delete {resource_type}s: {e}")
-
- async def cleanup_orphaned_resources(
- self,
- namespace: str = "integr8scode",
- max_age_hours: int = 24,
- dry_run: bool = False,
- ) -> ResourceDict:
- """Clean up orphaned resources older than specified age"""
- cutoff_time = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
- cleaned: ResourceDict = {
- "pods": [],
- "configmaps": [],
- "pvcs": [],
- }
-
- try:
- await self._cleanup_orphaned_pods(namespace, cutoff_time, cleaned, dry_run)
- await self._cleanup_orphaned_configmaps(namespace, cutoff_time, cleaned, dry_run)
-
- return cleaned
-
- except Exception as e:
- self.logger.error(f"Failed to cleanup orphaned resources: {e}")
- raise InfrastructureError(f"Orphaned resource cleanup failed: {e}") from e
-
- async def _cleanup_orphaned_pods(
- self, namespace: str, cutoff_time: datetime, cleaned: ResourceDict, dry_run: bool
- ) -> None:
- """Clean up orphaned pods"""
- pods = await self.v1.list_namespaced_pod(namespace, label_selector="app=integr8s")
-
- terminal_phases = {"Succeeded", "Failed", "Unknown"}
-
- for pod in pods.items:
- if (
- pod.metadata.creation_timestamp.replace(tzinfo=timezone.utc) < cutoff_time
- and pod.status.phase in terminal_phases
- ):
- cleaned["pods"].append(pod.metadata.name)
-
- if not dry_run:
- try:
- await self._delete_pod(pod.metadata.name, namespace)
- except Exception as e:
- self.logger.error(f"Failed to delete orphaned pod {pod.metadata.name}: {e}")
-
- async def _cleanup_orphaned_configmaps(
- self, namespace: str, cutoff_time: datetime, cleaned: ResourceDict, dry_run: bool
- ) -> None:
- """Clean up orphaned ConfigMaps"""
- configmaps = await self.v1.list_namespaced_config_map(namespace, label_selector="app=integr8s")
-
- for cm in configmaps.items:
- if cm.metadata.creation_timestamp.replace(tzinfo=timezone.utc) < cutoff_time:
- cleaned["configmaps"].append(cm.metadata.name)
-
- if not dry_run:
- try:
- await self.v1.delete_namespaced_config_map(cm.metadata.name, namespace)
- except Exception as e:
- self.logger.error(f"Failed to delete orphaned ConfigMap {cm.metadata.name}: {e}")
-
- async def get_resource_usage(self, namespace: str = "default") -> CountDict:
- """Get current resource usage counts"""
- label_selector = "app=integr8s"
-
- default_counts = {"pods": 0, "configmaps": 0, "network_policies": 0}
-
- try:
- # Get pods count
- try:
- pods = await self.v1.list_namespaced_pod(namespace, label_selector=label_selector)
- pod_count = len(pods.items)
- except Exception as e:
- self.logger.warning(f"Failed to get pods: {e}")
- pod_count = 0
-
- # Get configmaps count
- try:
- configmaps = await self.v1.list_namespaced_config_map(namespace, label_selector=label_selector)
- configmap_count = len(configmaps.items)
- except Exception as e:
- self.logger.warning(f"Failed to get configmaps: {e}")
- configmap_count = 0
-
- # Get network policies count
- try:
- policies = await self.networking_v1.list_namespaced_network_policy(
- namespace, label_selector=label_selector
- )
- policy_count = len(policies.items)
- except Exception as e:
- self.logger.warning(f"Failed to get network policies: {e}")
- policy_count = 0
-
- return {
- "pods": pod_count,
- "configmaps": configmap_count,
- "network_policies": policy_count,
- }
-
- except Exception as e:
- self.logger.error(f"Failed to get resource usage: {e}")
- return default_counts
diff --git a/backend/app/services/saga/saga_step.py b/backend/app/services/saga/saga_step.py
index fe3ab191..c0f560aa 100644
--- a/backend/app/services/saga/saga_step.py
+++ b/backend/app/services/saga/saga_step.py
@@ -1,8 +1,6 @@
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar
-from fastapi.encoders import jsonable_encoder
-
from app.domain.events import DomainEvent
T = TypeVar("T", bound=DomainEvent)
@@ -29,31 +27,6 @@ def add_compensation(self, compensation: "CompensationStep") -> None:
"""Add compensation step"""
self.compensations.append(compensation)
- def to_public_dict(self) -> dict[str, Any]:
- """Return a safe, persistable snapshot of context data.
-
- - Excludes private/ephemeral keys (prefixed with "_")
- - Encodes values to JSON-friendly types using FastAPI's jsonable_encoder
- """
-
- def _is_simple(val: Any) -> bool:
- if isinstance(val, (str, int, float, bool)) or val is None:
- return True
- if isinstance(val, dict):
- return all(isinstance(k, str) and _is_simple(v) for k, v in val.items())
- if isinstance(val, (list, tuple)):
- return all(_is_simple(i) for i in val)
- return False
-
- public: dict[str, Any] = {}
- for k, v in self.data.items():
- if isinstance(k, str) and k.startswith("_"):
- continue
- encoded = jsonable_encoder(v, exclude_none=False)
- if _is_simple(encoded):
- public[k] = encoded
- # else: drop complex/unknown types
- return public
class SagaStep(ABC, Generic[T]):
diff --git a/backend/app/services/user_settings_service.py b/backend/app/services/user_settings_service.py
index d7f21584..d181e4b1 100644
--- a/backend/app/services/user_settings_service.py
+++ b/backend/app/services/user_settings_service.py
@@ -205,8 +205,3 @@ def _add_to_cache(self, user_id: str, settings: DomainUserSettings) -> None:
self._cache[user_id] = settings
self.logger.debug(f"Cached settings for user {user_id}", cache_size=len(self._cache))
- async def reset_user_settings(self, user_id: str) -> None:
- """Reset user settings by deleting all data and cache."""
- await self.invalidate_cache(user_id)
- await self.repository.delete_user_settings(user_id)
- self.logger.info(f"Reset settings for user {user_id}")
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index ba26ca9b..b1a96720 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -150,6 +150,7 @@ dev = [
"pytest-xdist==3.6.1",
"ruff==0.14.10",
"types-cachetools==6.2.0.20250827",
+ "vulture==2.14",
]
# Ruff configuration
@@ -166,7 +167,8 @@ exclude = [
"**/venv/**",
"**/.venv/**",
"**/site-packages/**",
- "**/.claude/**"
+ "**/.claude/**",
+ "vulture_whitelist.py"
]
[tool.ruff.lint.flake8-bugbear]
@@ -183,6 +185,7 @@ warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
disable_error_code = ["import-untyped", "import-not-found"]
+exclude = ["vulture_whitelist\\.py$"]
plugins = ["pydantic.mypy"]
[tool.pydantic-mypy]
@@ -224,3 +227,9 @@ OTEL_SDK_DISABLED = "true" # Prevents teardown delays from OTLP exporter retrie
[tool.coverage.run]
# Use sysmon for faster coverage (requires Python 3.12+)
core = "sysmon"
+
+# Vulture dead code detection
+[tool.vulture]
+min_confidence = 80
+paths = ["app"]
+exclude = ["tests"]
diff --git a/backend/tests/e2e/db/repositories/test_dlq_repository.py b/backend/tests/e2e/db/repositories/test_dlq_repository.py
index 4238b7c6..7d1f048d 100644
--- a/backend/tests/e2e/db/repositories/test_dlq_repository.py
+++ b/backend/tests/e2e/db/repositories/test_dlq_repository.py
@@ -81,8 +81,5 @@ async def test_list_get_and_updates(repo: DLQRepository) -> None:
assert res.total >= 3 and len(res.messages) <= 2
msg = await repo.get_message_by_id("id1")
assert msg and msg.event.event_id == "id1"
- assert await repo.mark_message_retried("id1") in (True, False)
- assert await repo.mark_message_discarded("id1", "r") in (True, False)
-
topics = await repo.get_topics_summary()
assert any(t.topic == "t1" for t in topics)
diff --git a/backend/tests/e2e/db/repositories/test_execution_repository.py b/backend/tests/e2e/db/repositories/test_execution_repository.py
index 083ebf82..c1396020 100644
--- a/backend/tests/e2e/db/repositories/test_execution_repository.py
+++ b/backend/tests/e2e/db/repositories/test_execution_repository.py
@@ -4,7 +4,7 @@
import pytest
from app.db.repositories import ExecutionRepository
from app.domain.enums import ExecutionStatus
-from app.domain.execution import DomainExecutionCreate, DomainExecutionUpdate
+from app.domain.execution import DomainExecutionCreate
_test_logger = structlog.get_logger("test.db.repositories.execution_repository")
@@ -30,13 +30,6 @@ async def test_execution_crud_and_query() -> None:
got = await repo.get_execution(created.execution_id)
assert got and got.script.startswith("print") and got.status == ExecutionStatus.QUEUED
- # Update
- update = DomainExecutionUpdate(status=ExecutionStatus.RUNNING, stdout="ok")
- ok = await repo.update_execution(created.execution_id, update)
- assert ok is True
- got2 = await repo.get_execution(created.execution_id)
- assert got2 and got2.status == ExecutionStatus.RUNNING
-
# List
items = await repo.get_executions({"user_id": user_id}, limit=10, skip=0, sort=[("created_at", 1)])
assert any(x.execution_id == created.execution_id for x in items)
diff --git a/backend/tests/e2e/dlq/test_dlq_discard.py b/backend/tests/e2e/dlq/test_dlq_discard.py
deleted file mode 100644
index e9a89e03..00000000
--- a/backend/tests/e2e/dlq/test_dlq_discard.py
+++ /dev/null
@@ -1,141 +0,0 @@
-import logging
-import uuid
-from datetime import datetime, timezone
-
-import pytest
-from app.db.docs import DLQMessageDocument
-from app.db.repositories import DLQRepository
-from app.dlq.models import DLQMessageStatus
-from app.domain.enums import KafkaTopic
-from dishka import AsyncContainer
-
-from tests.conftest import make_execution_requested_event
-
-pytestmark = [pytest.mark.e2e, pytest.mark.mongodb]
-
-_test_logger = logging.getLogger("test.dlq.discard")
-
-
-async def _create_dlq_document(
- event_id: str | None = None,
- status: DLQMessageStatus = DLQMessageStatus.PENDING,
-) -> DLQMessageDocument:
- """Helper to create a DLQ document directly in MongoDB."""
- if event_id is None:
- event_id = str(uuid.uuid4())
-
- event = make_execution_requested_event(execution_id=f"exec-{uuid.uuid4().hex[:8]}")
- # Override event_id for test predictability
- event_dict = event.model_dump()
- event_dict["event_id"] = event_id
- now = datetime.now(timezone.utc)
-
- doc = DLQMessageDocument(
- event=event_dict,
- original_topic=KafkaTopic.EXECUTION_EVENTS,
- error="Test error",
- retry_count=0,
- failed_at=now,
- status=status,
- producer_id="test-producer",
- created_at=now,
- )
- await doc.insert()
- return doc
-
-
-@pytest.mark.asyncio
-async def test_dlq_repository_marks_message_discarded(scope: AsyncContainer) -> None:
- """Test that DLQRepository.mark_message_discarded() updates status correctly."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a DLQ document
- event_id = f"dlq-discard-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.PENDING)
-
- # Discard the message
- reason = "max_retries_exceeded"
- result = await repository.mark_message_discarded(event_id, reason)
-
- assert result is True
-
- # Verify the status changed
- updated_doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert updated_doc is not None
- assert updated_doc.status == DLQMessageStatus.DISCARDED
- assert updated_doc.discard_reason == reason
- assert updated_doc.discarded_at is not None
-
-
-@pytest.mark.asyncio
-async def test_dlq_discard_nonexistent_message_returns_false(scope: AsyncContainer) -> None:
- """Test that discarding a nonexistent message returns False."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Try to discard a message that doesn't exist
- result = await repository.mark_message_discarded(
- f"nonexistent-{uuid.uuid4().hex[:8]}",
- "test_reason",
- )
-
- assert result is False
-
-
-@pytest.mark.asyncio
-async def test_dlq_discard_sets_timestamp(scope: AsyncContainer) -> None:
- """Test that discarding sets the discarded_at timestamp."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a DLQ document
- event_id = f"dlq-ts-{uuid.uuid4().hex[:8]}"
- before_discard = datetime.now(timezone.utc)
- await _create_dlq_document(event_id=event_id)
-
- # Discard the message
- await repository.mark_message_discarded(event_id, "manual_discard")
- after_discard = datetime.now(timezone.utc)
-
- # Verify timestamp is set correctly
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.discarded_at is not None
- assert before_discard <= doc.discarded_at <= after_discard
-
-
-@pytest.mark.asyncio
-async def test_dlq_discard_with_custom_reason(scope: AsyncContainer) -> None:
- """Test that custom discard reasons are stored."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a DLQ document
- event_id = f"dlq-reason-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id)
-
- # Discard with custom reason
- custom_reason = "manual: User requested deletion due to invalid payload"
- await repository.mark_message_discarded(event_id, custom_reason)
-
- # Verify the reason is stored
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.discard_reason == custom_reason
-
-
-@pytest.mark.asyncio
-async def test_dlq_discard_from_scheduled_status(scope: AsyncContainer) -> None:
- """Test that scheduled messages can be discarded."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a SCHEDULED DLQ document
- event_id = f"dlq-scheduled-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.SCHEDULED)
-
- # Discard the message
- result = await repository.mark_message_discarded(event_id, "policy_change")
-
- assert result is True
-
- # Verify status transition
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.status == DLQMessageStatus.DISCARDED
diff --git a/backend/tests/e2e/dlq/test_dlq_retry.py b/backend/tests/e2e/dlq/test_dlq_retry.py
deleted file mode 100644
index 931b6565..00000000
--- a/backend/tests/e2e/dlq/test_dlq_retry.py
+++ /dev/null
@@ -1,205 +0,0 @@
-import logging
-import uuid
-from datetime import datetime, timezone
-
-import pytest
-from app.db.docs import DLQMessageDocument
-from app.db.repositories import DLQRepository
-from app.dlq.models import DLQMessageStatus
-from app.domain.enums import KafkaTopic
-from dishka import AsyncContainer
-
-from tests.conftest import make_execution_requested_event
-
-pytestmark = [pytest.mark.e2e, pytest.mark.mongodb]
-
-_test_logger = logging.getLogger("test.dlq.retry")
-
-
-async def _create_dlq_document(
- event_id: str | None = None,
- status: DLQMessageStatus = DLQMessageStatus.PENDING,
-) -> DLQMessageDocument:
- """Helper to create a DLQ document directly in MongoDB."""
- if event_id is None:
- event_id = str(uuid.uuid4())
-
- event = make_execution_requested_event(execution_id=f"exec-{uuid.uuid4().hex[:8]}")
- # Override event_id for test predictability
- event_dict = event.model_dump()
- event_dict["event_id"] = event_id
- now = datetime.now(timezone.utc)
-
- doc = DLQMessageDocument(
- event=event_dict,
- original_topic=KafkaTopic.EXECUTION_EVENTS,
- error="Test error",
- retry_count=0,
- failed_at=now,
- status=status,
- producer_id="test-producer",
- created_at=now,
- )
- await doc.insert()
- return doc
-
-
-@pytest.mark.asyncio
-async def test_dlq_repository_marks_message_retried(scope: AsyncContainer) -> None:
- """Test that DLQRepository.mark_message_retried() updates status correctly."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a DLQ document
- event_id = f"dlq-retry-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.SCHEDULED)
-
- # Mark as retried
- result = await repository.mark_message_retried(event_id)
-
- assert result is True
-
- # Verify the status changed
- updated_doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert updated_doc is not None
- assert updated_doc.status == DLQMessageStatus.RETRIED
- assert updated_doc.retried_at is not None
-
-
-@pytest.mark.asyncio
-async def test_dlq_retry_nonexistent_message_returns_false(scope: AsyncContainer) -> None:
- """Test that retrying a nonexistent message returns False."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Try to retry a message that doesn't exist
- result = await repository.mark_message_retried(f"nonexistent-{uuid.uuid4().hex[:8]}")
-
- assert result is False
-
-
-@pytest.mark.asyncio
-async def test_dlq_retry_sets_timestamp(scope: AsyncContainer) -> None:
- """Test that retrying sets the retried_at timestamp."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a DLQ document
- event_id = f"dlq-retry-ts-{uuid.uuid4().hex[:8]}"
- before_retry = datetime.now(timezone.utc)
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.SCHEDULED)
-
- # Retry the message
- await repository.mark_message_retried(event_id)
- after_retry = datetime.now(timezone.utc)
-
- # Verify timestamp is set correctly
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.retried_at is not None
- assert before_retry <= doc.retried_at <= after_retry
-
-
-@pytest.mark.asyncio
-async def test_dlq_retry_from_pending_status(scope: AsyncContainer) -> None:
- """Test that pending messages can be retried."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a PENDING DLQ document
- event_id = f"dlq-pending-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.PENDING)
-
- # Retry the message
- result = await repository.mark_message_retried(event_id)
-
- assert result is True
-
- # Verify status transition
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.status == DLQMessageStatus.RETRIED
-
-
-@pytest.mark.asyncio
-async def test_dlq_retry_already_retried_message(scope: AsyncContainer) -> None:
- """Test that retrying an already RETRIED message still succeeds at repository level.
-
- Note: The DLQManager.retry_message_manually guards against this, but the
- repository method doesn't - it's a low-level operation that always succeeds.
- """
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create an already RETRIED document
- event_id = f"dlq-already-retried-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.RETRIED)
-
- # Repository method still succeeds (no guard at this level)
- result = await repository.mark_message_retried(event_id)
- assert result is True
-
- # Status remains RETRIED
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.status == DLQMessageStatus.RETRIED
-
-
-@pytest.mark.asyncio
-async def test_dlq_retry_discarded_message(scope: AsyncContainer) -> None:
- """Test that retrying a DISCARDED message still succeeds at repository level.
-
- Note: The DLQManager.retry_message_manually guards against this and returns False,
- but the repository method is a low-level operation that doesn't validate transitions.
- """
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a DISCARDED document
- event_id = f"dlq-discarded-retry-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.DISCARDED)
-
- # Repository method succeeds (transitions status back to RETRIED)
- result = await repository.mark_message_retried(event_id)
- assert result is True
-
- # Status is now RETRIED (repository doesn't guard transitions)
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.status == DLQMessageStatus.RETRIED
-
-
-@pytest.mark.asyncio
-async def test_dlq_discard_already_discarded_message(scope: AsyncContainer) -> None:
- """Test that discarding an already DISCARDED message updates the reason."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create an already DISCARDED document
- event_id = f"dlq-already-discarded-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.DISCARDED)
-
- # Discard again with a new reason
- new_reason = "updated_discard_reason"
- result = await repository.mark_message_discarded(event_id, new_reason)
- assert result is True
-
- # Reason is updated
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.status == DLQMessageStatus.DISCARDED
- assert doc.discard_reason == new_reason
-
-
-@pytest.mark.asyncio
-async def test_dlq_discard_retried_message(scope: AsyncContainer) -> None:
- """Test that discarding a RETRIED message transitions to DISCARDED."""
- repository: DLQRepository = await scope.get(DLQRepository)
-
- # Create a RETRIED document
- event_id = f"dlq-retried-discard-{uuid.uuid4().hex[:8]}"
- await _create_dlq_document(event_id=event_id, status=DLQMessageStatus.RETRIED)
-
- # Discard it
- reason = "manual_cleanup"
- result = await repository.mark_message_discarded(event_id, reason)
- assert result is True
-
- # Status is now DISCARDED
- doc = await DLQMessageDocument.find_one({"event.event_id": event_id})
- assert doc is not None
- assert doc.status == DLQMessageStatus.DISCARDED
- assert doc.discard_reason == reason
diff --git a/backend/tests/e2e/events/test_consumer_group_monitor.py b/backend/tests/e2e/events/test_consumer_group_monitor.py
deleted file mode 100644
index 97d45c21..00000000
--- a/backend/tests/e2e/events/test_consumer_group_monitor.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import logging
-
-import pytest
-from app.events.consumer_group_monitor import ConsumerGroupHealth, NativeConsumerGroupMonitor
-from app.settings import Settings
-
-_test_logger = logging.getLogger("test.events.consumer_group_monitor")
-
-
-@pytest.mark.e2e
-@pytest.mark.kafka
-@pytest.mark.asyncio
-async def test_list_groups_and_error_status(test_settings: Settings) -> None:
- mon = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger)
- groups = await mon.list_consumer_groups()
- assert isinstance(groups, list)
-
- # Query a non-existent group to exercise error handling with real AdminClient
- status = await mon.get_consumer_group_status("nonexistent-group-for-tests")
- assert status.health in {ConsumerGroupHealth.UNHEALTHY, ConsumerGroupHealth.UNKNOWN}
diff --git a/backend/tests/e2e/events/test_consumer_group_monitor_real.py b/backend/tests/e2e/events/test_consumer_group_monitor_real.py
deleted file mode 100644
index bc53591f..00000000
--- a/backend/tests/e2e/events/test_consumer_group_monitor_real.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import logging
-from uuid import uuid4
-
-import pytest
-from app.events.consumer_group_monitor import (
- ConsumerGroupHealth,
- ConsumerGroupState,
- ConsumerGroupStatus,
- NativeConsumerGroupMonitor,
-)
-from app.settings import Settings
-
-pytestmark = [pytest.mark.e2e, pytest.mark.kafka]
-
-_test_logger = logging.getLogger("test.events.consumer_group_monitor_real")
-
-
-@pytest.mark.asyncio
-async def test_consumer_group_status_error_path_and_summary(test_settings: Settings) -> None:
- monitor = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger)
- # Non-existent group triggers error-handling path and returns minimal status
- gid = f"does-not-exist-{uuid4().hex[:8]}"
- status = await monitor.get_consumer_group_status(gid, include_lag=False)
- assert status.group_id == gid
- # Some clusters report non-existent groups as DEAD/UNKNOWN rather than raising
- assert status.state in (ConsumerGroupState.DEAD, ConsumerGroupState.UNKNOWN)
- assert status.health is ConsumerGroupHealth.UNHEALTHY
- summary = monitor.get_health_summary(status)
- assert summary["group_id"] == gid and summary["health"] == ConsumerGroupHealth.UNHEALTHY
-
-
-def test_assess_group_health_branches(test_settings: Settings) -> None:
- m = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger)
- # Unknown state (triggers unhealthy)
- s = ConsumerGroupStatus(
- group_id="g",
- state=ConsumerGroupState.UNKNOWN,
- protocol="p",
- protocol_type="ptype",
- coordinator="c",
- members=[],
- member_count=0,
- assigned_partitions=0,
- partition_distribution={},
- total_lag=0,
- )
- h, msg = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.UNHEALTHY and "unknown" in msg.lower()
-
- # Dead state
- s.state = ConsumerGroupState.DEAD
- h, msg = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.UNHEALTHY and "dead" in msg.lower()
-
- # Insufficient members
- s.state = ConsumerGroupState.STABLE
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.UNHEALTHY
-
- # Rebalancing (preparing)
- s.member_count = 1
- s.state = ConsumerGroupState.PREPARING_REBALANCE
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.DEGRADED
-
- # Rebalancing (completing)
- s.state = ConsumerGroupState.COMPLETING_REBALANCE
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.DEGRADED
-
- # Empty group
- s.state = ConsumerGroupState.EMPTY
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.DEGRADED
-
- # Critical lag
- s.state = ConsumerGroupState.STABLE
- s.total_lag = m.critical_lag_threshold + 1
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.UNHEALTHY
-
- # Warning lag
- s.total_lag = m.warning_lag_threshold + 1
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.DEGRADED
-
- # Uneven partition distribution
- s.total_lag = 0
- s.partition_distribution = {"m1": 10, "m2": 1}
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.DEGRADED
-
- # Healthy stable
- s.partition_distribution = {"m1": 1, "m2": 1}
- s.assigned_partitions = 2
- h, _ = m._assess_group_health(s) # noqa: SLF001
- assert h is ConsumerGroupHealth.HEALTHY
-
-
-@pytest.mark.asyncio
-async def test_multiple_group_status_mixed_errors(test_settings: Settings) -> None:
- m = NativeConsumerGroupMonitor(settings=test_settings, logger=_test_logger)
- gids = [f"none-{uuid4().hex[:6]}", f"none-{uuid4().hex[:6]}"]
- res = await m.get_multiple_group_status(gids, include_lag=False)
- assert set(res.keys()) == set(gids)
- assert all(v.health is ConsumerGroupHealth.UNHEALTHY for v in res.values())
diff --git a/backend/tests/e2e/events/test_producer_roundtrip.py b/backend/tests/e2e/events/test_producer_roundtrip.py
deleted file mode 100644
index 773c7f3a..00000000
--- a/backend/tests/e2e/events/test_producer_roundtrip.py
+++ /dev/null
@@ -1,27 +0,0 @@
-import logging
-from uuid import uuid4
-
-import pytest
-from app.events.core import UnifiedProducer
-from app.infrastructure.kafka.mappings import get_topic_for_event
-from dishka import AsyncContainer
-
-from tests.conftest import make_execution_requested_event
-
-pytestmark = [pytest.mark.e2e, pytest.mark.kafka]
-
-_test_logger = logging.getLogger("test.events.producer_roundtrip")
-
-
-@pytest.mark.asyncio
-async def test_unified_producer_produce_and_send_to_dlq(
- scope: AsyncContainer,
-) -> None:
- prod: UnifiedProducer = await scope.get(UnifiedProducer)
-
- ev = make_execution_requested_event(execution_id=f"exec-{uuid4().hex[:8]}")
- await prod.produce(ev, key=ev.execution_id)
-
- # Exercise send_to_dlq path — should not raise
- topic = str(get_topic_for_event(ev.event_type))
- await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1)
diff --git a/backend/tests/e2e/services/idempotency/test_redis_repository.py b/backend/tests/e2e/services/idempotency/test_redis_repository.py
index 79535328..40ebe0a4 100644
--- a/backend/tests/e2e/services/idempotency/test_redis_repository.py
+++ b/backend/tests/e2e/services/idempotency/test_redis_repository.py
@@ -121,9 +121,8 @@ async def test_insert_find_update_delete_flow(
ttl_after = await redis_client.ttl(key)
assert ttl_after == ttl or ttl_after <= ttl # ttl should not increase
- # Delete
- deleted = await repository.delete_key(sample_record.key)
- assert deleted == 1
+ # Clean up
+ await redis_client.delete(key)
assert await repository.find_by_key(sample_record.key) is None
@@ -136,6 +135,3 @@ async def test_update_record_when_missing(
assert res == 0
-@pytest.mark.asyncio
-async def test_health_check(repository: RedisIdempotencyRepository) -> None:
- await repository.health_check() # should not raise
diff --git a/backend/tests/e2e/services/user_settings/test_user_settings_service.py b/backend/tests/e2e/services/user_settings/test_user_settings_service.py
index 11a2dda9..c385b8b9 100644
--- a/backend/tests/e2e/services/user_settings/test_user_settings_service.py
+++ b/backend/tests/e2e/services/user_settings/test_user_settings_service.py
@@ -460,27 +460,6 @@ async def test_invalidate_cache(self, scope: AsyncContainer) -> None:
settings = await svc.get_user_settings(user_id)
assert settings.user_id == user_id
-class TestResetUserSettings:
- """Tests for reset_user_settings method."""
-
- @pytest.mark.asyncio
- async def test_reset_user_settings(self, scope: AsyncContainer) -> None:
- """Reset user settings clears all data."""
- svc: UserSettingsService = await scope.get(UserSettingsService)
- user_id = _unique_user_id()
-
- # Set some custom settings
- await svc.update_theme(user_id, Theme.DARK)
- await svc.update_custom_setting(user_id, "custom_key", "custom_value")
-
- # Reset
- await svc.reset_user_settings(user_id)
-
- # Get fresh - should be defaults
- settings = await svc.get_user_settings_fresh(user_id)
- assert settings.theme == Theme.AUTO # Default
-
-
class TestSettingsIntegration:
"""Integration tests for settings workflow."""
diff --git a/backend/tests/e2e/test_resource_cleaner.py b/backend/tests/e2e/test_resource_cleaner.py
deleted file mode 100644
index 43086327..00000000
--- a/backend/tests/e2e/test_resource_cleaner.py
+++ /dev/null
@@ -1,84 +0,0 @@
-import asyncio
-from datetime import datetime
-
-import pytest
-from app.services.result_processor import ResourceCleaner
-from app.settings import Settings
-from dishka import AsyncContainer
-from kubernetes_asyncio import client as k8s_client
-
-pytestmark = [pytest.mark.e2e, pytest.mark.k8s]
-
-
-@pytest.mark.asyncio
-async def test_get_resource_usage(scope: AsyncContainer, test_settings: Settings) -> None:
- resource_cleaner = await scope.get(ResourceCleaner)
- usage = await resource_cleaner.get_resource_usage(namespace=test_settings.K8S_NAMESPACE)
- assert set(usage.keys()) >= {"pods", "configmaps", "network_policies"}
-
-
-@pytest.mark.asyncio
-async def test_cleanup_orphaned_resources_dry_run(scope: AsyncContainer, test_settings: Settings) -> None:
- resource_cleaner = await scope.get(ResourceCleaner)
- cleaned = await resource_cleaner.cleanup_orphaned_resources(
- namespace=test_settings.K8S_NAMESPACE,
- max_age_hours=0,
- dry_run=True,
- )
- assert set(cleaned.keys()) >= {"pods", "configmaps", "pvcs"}
-
-
-@pytest.mark.asyncio
-async def test_cleanup_nonexistent_pod(scope: AsyncContainer, test_settings: Settings) -> None:
- resource_cleaner = await scope.get(ResourceCleaner)
- namespace = test_settings.K8S_NAMESPACE
- nonexistent_pod = "integr8s-test-nonexistent-pod"
-
- # Use a local timeout variable with buffer for scheduler jitter
- timeout = 2 # Reduced from 5s since non-existent resources return immediately (404)
- jitter_buffer = 0.5 # Account for scheduler/GC pauses
-
- start_time = asyncio.get_running_loop().time()
- await resource_cleaner.cleanup_pod_resources(
- pod_name=nonexistent_pod,
- namespace=namespace,
- execution_id="test-exec-nonexistent",
- timeout=timeout,
- )
- elapsed = asyncio.get_running_loop().time() - start_time
-
- assert elapsed < timeout + jitter_buffer, (
- f"Cleanup took {elapsed:.2f}s, expected < {timeout + jitter_buffer}s for non-existent resources"
- )
-
- usage = await resource_cleaner.get_resource_usage(namespace=namespace)
- assert isinstance(usage.get("pods", 0), int)
- assert isinstance(usage.get("configmaps", 0), int)
-
-
-@pytest.mark.asyncio
-async def test_cleanup_orphaned_configmaps_dry_run(scope: AsyncContainer, test_settings: Settings) -> None:
- api_client = await scope.get(k8s_client.ApiClient)
- resource_cleaner = await scope.get(ResourceCleaner)
-
- v1 = k8s_client.CoreV1Api(api_client)
- ns = test_settings.K8S_NAMESPACE
- name = f"int-test-cm-{int(datetime.now().timestamp())}"
-
- metadata = k8s_client.V1ObjectMeta(
- name=name,
- labels={"app": "integr8s", "execution-id": "e-int-test"},
- )
- body = k8s_client.V1ConfigMap(metadata=metadata, data={"k": "v"})
- await v1.create_namespaced_config_map(namespace=ns, body=body)
-
- try:
- res = await resource_cleaner.cleanup_orphaned_resources(namespace=ns, max_age_hours=0, dry_run=True)
- assert any(name == cm for cm in res.get("configmaps", [])), (
- f"Expected ConfigMap '{name}' to be detected as orphan candidate"
- )
- finally:
- try:
- await v1.delete_namespaced_config_map(name=name, namespace=ns)
- except Exception:
- pass
diff --git a/backend/tests/unit/events/test_mappings_and_types.py b/backend/tests/unit/events/test_mappings_and_types.py
index bc7cfb10..ce32da44 100644
--- a/backend/tests/unit/events/test_mappings_and_types.py
+++ b/backend/tests/unit/events/test_mappings_and_types.py
@@ -1,7 +1,6 @@
from app.domain.enums import EventType, KafkaTopic
from app.infrastructure.kafka.mappings import (
get_event_class_for_type,
- get_event_types_for_topic,
get_topic_for_event,
)
@@ -11,6 +10,3 @@ def test_event_mappings_topics() -> None:
assert get_topic_for_event(EventType.EXECUTION_REQUESTED) == KafkaTopic.EXECUTION_EVENTS
cls = get_event_class_for_type(EventType.CREATE_POD_COMMAND)
assert cls is not None
- # All event types for a topic include at least one of the checked types
- ev_types = get_event_types_for_topic(KafkaTopic.EXECUTION_EVENTS)
- assert EventType.EXECUTION_REQUESTED in ev_types
diff --git a/backend/tests/unit/schemas_pydantic/test_notification_schemas.py b/backend/tests/unit/schemas_pydantic/test_notification_schemas.py
index 16b58e3a..8971a09e 100644
--- a/backend/tests/unit/schemas_pydantic/test_notification_schemas.py
+++ b/backend/tests/unit/schemas_pydantic/test_notification_schemas.py
@@ -2,7 +2,7 @@
import pytest
from app.domain.enums import NotificationChannel, NotificationSeverity, NotificationStatus
-from app.schemas_pydantic.notification import Notification, NotificationBatch
+from app.schemas_pydantic.notification import Notification
def test_notification_scheduled_for_must_be_future() -> None:
@@ -25,16 +25,3 @@ def test_notification_scheduled_for_must_be_future() -> None:
body="y",
scheduled_for=datetime.now(UTC) - timedelta(seconds=1),
)
-
-
-def test_notification_batch_validation_limits() -> None:
- n1 = Notification(user_id="u1", channel=NotificationChannel.IN_APP, subject="a", body="b")
- ok = NotificationBatch(notifications=[n1])
- assert ok.processed_count == 0
-
- with pytest.raises(ValueError):
- NotificationBatch(notifications=[])
-
- many = [n1.model_copy() for _ in range(1001)]
- with pytest.raises(ValueError):
- NotificationBatch(notifications=many)
diff --git a/backend/tests/unit/services/saga/test_saga_comprehensive.py b/backend/tests/unit/services/saga/test_saga_comprehensive.py
index a532e5be..180475dd 100644
--- a/backend/tests/unit/services/saga/test_saga_comprehensive.py
+++ b/backend/tests/unit/services/saga/test_saga_comprehensive.py
@@ -37,14 +37,6 @@ def _req_event() -> ExecutionRequestedEvent:
return make_execution_requested_event(execution_id="e1", script="print('x')")
-def test_saga_context_public_filtering() -> None:
- ctx = SagaContext("s1", "e1")
- ctx.set("public", 1)
- ctx.set("_private", 2)
- out = ctx.to_public_dict()
- assert "public" in out and "_private" not in out
-
-
@pytest.mark.asyncio
async def test_step_success_and_compensation_chain() -> None:
ctx = SagaContext("s1", "e1")
diff --git a/backend/tests/unit/services/saga/test_saga_step_and_base.py b/backend/tests/unit/services/saga/test_saga_step_and_base.py
index 33eea711..a7de8a51 100644
--- a/backend/tests/unit/services/saga/test_saga_step_and_base.py
+++ b/backend/tests/unit/services/saga/test_saga_step_and_base.py
@@ -5,28 +5,6 @@
pytestmark = pytest.mark.unit
-def test_saga_context_public_dict_filters_and_encodes() -> None:
- ctx = SagaContext("s1", "e1")
- ctx.set("a", 1)
- ctx.set("b", {"x": 2})
- ctx.set("c", [1, 2, 3])
- ctx.set("_private", {"won't": "leak"})
-
- # Complex non-JSON object -> should be dropped
- class X:
- pass
-
- ctx.set("complex", X())
- # Nested complex objects get encoded by jsonable_encoder
- # The nested dict with a complex object gets partially encoded
- ctx.set("nested", {"ok": 1, "bad": X()})
-
- d = ctx.to_public_dict()
- # jsonable_encoder converts unknown objects to {}, which is still considered "simple"
- # so they pass through the filter
- assert d == {"a": 1, "b": {"x": 2}, "c": [1, 2, 3], "complex": {}, "nested": {"ok": 1, "bad": {}}}
-
-
class _DummyComp(CompensationStep):
def __init__(self) -> None:
super().__init__("dummy")
diff --git a/backend/uv.lock b/backend/uv.lock
index 6576b413..7a8464c6 100644
--- a/backend/uv.lock
+++ b/backend/uv.lock
@@ -1176,6 +1176,7 @@ dev = [
{ name = "pytest-xdist" },
{ name = "ruff" },
{ name = "types-cachetools" },
+ { name = "vulture" },
]
[package.metadata]
@@ -1319,6 +1320,7 @@ dev = [
{ name = "pytest-xdist", specifier = "==3.6.1" },
{ name = "ruff", specifier = "==0.14.10" },
{ name = "types-cachetools", specifier = "==6.2.0.20250827" },
+ { name = "vulture", specifier = "==2.14" },
]
[[package]]
@@ -3029,6 +3031,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b1/4b/4cef6ce21a2aaca9d852a6e84ef4f135d99fcd74fa75105e2fc0c8308acd/uvicorn-0.34.2-py3-none-any.whl", hash = "sha256:deb49af569084536d269fe0a6d67e3754f104cf03aba7c11c40f01aadf33c403", size = 62483, upload-time = "2025-04-19T06:02:48.42Z" },
]
+[[package]]
+name = "vulture"
+version = "2.14"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/8e/25/925f35db758a0f9199113aaf61d703de891676b082bd7cf73ea01d6000f7/vulture-2.14.tar.gz", hash = "sha256:cb8277902a1138deeab796ec5bef7076a6e0248ca3607a3f3dee0b6d9e9b8415", size = 58823, upload-time = "2024-12-08T17:39:43.319Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/a0/56/0cc15b8ff2613c1d5c3dc1f3f576ede1c43868c1bc2e5ccaa2d4bcd7974d/vulture-2.14-py2.py3-none-any.whl", hash = "sha256:d9a90dba89607489548a49d557f8bac8112bd25d3cbc8aeef23e860811bd5ed9", size = 28915, upload-time = "2024-12-08T17:39:40.573Z" },
+]
+
[[package]]
name = "websocket-client"
version = "1.8.0"
diff --git a/backend/vulture_whitelist.py b/backend/vulture_whitelist.py
new file mode 100644
index 00000000..09e7763e
--- /dev/null
+++ b/backend/vulture_whitelist.py
@@ -0,0 +1,8 @@
+"""Vulture whitelist — framework-required code that appears unused at the AST level.
+
+Structlog processors must accept (logger, method_name, event_dict) by convention,
+even when `method_name` is unused in the body.
+"""
+
+method_name # unused variable (app/core/logging.py:36)
+method_name # unused variable (app/core/logging.py:53)
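
For context on why these two entries exist: structlog processors are plain callables with a fixed `(logger, method_name, event_dict)` signature, so the middle argument can be required by the framework yet never read in the body. A minimal illustrative processor (not the actual code in `app/core/logging.py`, which this diff does not show):

```python
import structlog


def add_service_name(logger, method_name, event_dict):
    # `method_name` (e.g. "info", "error") is part of the mandatory processor
    # signature but is never used here, which is exactly what vulture flags.
    event_dict["service"] = "integr8scode-backend"
    return event_dict


structlog.configure(processors=[add_service_name, structlog.processors.JSONRenderer()])
structlog.get_logger().info("ping")  # emits {"service": "integr8scode-backend", "event": "ping"}
```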
diff --git a/docs/architecture/domain-exceptions.md b/docs/architecture/domain-exceptions.md
index 4b10cfb2..87b4dd44 100644
--- a/docs/architecture/domain-exceptions.md
+++ b/docs/architecture/domain-exceptions.md
@@ -61,7 +61,6 @@ DomainError
│ └── SagaInvalidStateError
└── InfrastructureError
├── EventPublishError
- ├── SagaCompensationError
├── SagaTimeoutError
└── ReplayOperationError
```
@@ -74,7 +73,7 @@ Domain exceptions live in their respective domain modules:
|--------------|-----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Base | `app/domain/exceptions.py` | `DomainError`, `NotFoundError`, `ValidationError`, etc. |
| Execution | `app/domain/execution/exceptions.py` | `ExecutionNotFoundError`, `RuntimeNotSupportedError`, `EventPublishError` |
-| Saga | `app/domain/saga/exceptions.py` | `SagaNotFoundError`, `SagaAccessDeniedError`, `SagaInvalidStateError`, `SagaCompensationError`, `SagaTimeoutError`, `SagaConcurrencyError` |
+| Saga | `app/domain/saga/exceptions.py` | `SagaNotFoundError`, `SagaAccessDeniedError`, `SagaInvalidStateError`, `SagaTimeoutError`, `SagaConcurrencyError` |
| Notification | `app/domain/notification/exceptions.py` | `NotificationNotFoundError`, `NotificationThrottledError`, `NotificationValidationError` |
| Saved Script | `app/domain/saved_script/exceptions.py` | `SavedScriptNotFoundError` |
| Replay | `app/domain/replay/exceptions.py` | `ReplaySessionNotFoundError`, `ReplayOperationError` |
diff --git a/docs/architecture/model-conversion.md b/docs/architecture/model-conversion.md
index a9a1e03d..2b23486d 100644
--- a/docs/architecture/model-conversion.md
+++ b/docs/architecture/model-conversion.md
@@ -160,8 +160,7 @@ Avoid approaches that scatter conversion logic or couple layers incorrectly.
| Conversion logic in models | Scatters boundary logic; keep it in repositories/services |
Thin wrappers that delegate to `model_dump()` with specific options are fine. For example, `BaseEvent.to_dict()` applies
-`by_alias=True, mode="json"` consistently across all events. Methods with additional behavior like filtering private
-keys (`to_public_dict()`) are also acceptable—the anti-pattern is manually listing fields.
+`by_alias=True, mode="json"` consistently across all events—the anti-pattern is manually listing fields.
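
A minimal sketch of that thin-wrapper pattern (the alias name and field here are illustrative; the project's actual `BaseEvent` is not shown in this diff):

```python
from pydantic import BaseModel, ConfigDict, Field


class BaseEvent(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    event_id: str = Field(alias="eventId")

    def to_dict(self) -> dict:
        # One place that pins the serialization options for every event.
        return self.model_dump(by_alias=True, mode="json")


print(BaseEvent(event_id="abc").to_dict())  # {'eventId': 'abc'}
```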
## Quick reference
diff --git a/docs/architecture/services-overview.md b/docs/architecture/services-overview.md
index 2a26bedb..5892dace 100644
--- a/docs/architecture/services-overview.md
+++ b/docs/architecture/services-overview.md
@@ -20,7 +20,7 @@ The k8s_worker/ module runs worker.py, a long-running service that consumes SAGA
The pod_monitor/ module has monitor.py and event_mapper.py which watch K8s Pod/Container status, map them into domain events with helpful metadata like exit codes, failure reasons, and stdout/stderr slices, then publish into EXECUTION_EVENTS. This decouples what the cluster did from what the system emits so clients always see consistent event shapes. See [Pod Monitor](../components/workers/pod_monitor.md) for details.
-The result_processor/ module runs processor.py which consumes terminal events, persists results, normalizes error types, and always records metrics by error type. The resource_cleaner.py deletes the per-execution pod and ConfigMap after completion. See [Result Processor](../components/workers/result_processor.md) for details.
+The result_processor/ module runs processor.py which consumes terminal events, persists results, normalizes error types, and always records metrics by error type. See [Result Processor](../components/workers/result_processor.md) for details.
## Event and streaming services
@@ -56,7 +56,7 @@ The Saga Orchestrator is a stateful choreographer for execution lifecycle. It su
The K8s Worker materializes saga commands into K8s resources. It consumes SAGA_COMMANDS and creates ConfigMap (script, entrypoint) and Pod (hardened), relying on CiliumNetworkPolicy deny-all applied to the namespace rather than per-exec policies. Pod spec disables DNS, drops caps, runs non-root, no SA token. It publishes PodCreated and ExecutionStarted events, or errors when creation fails.
-The Result Processor persists terminal execution outcomes, updates metrics, and triggers cleanup. It consumes EXECUTION_COMPLETED, EXECUTION_FAILED, EXECUTION_TIMEOUT, writes DB records for status, outputs, errors, and usage, records metrics for errors by type and durations, and invokes ResourceCleaner to delete pods and configmaps.
+The Result Processor persists terminal execution outcomes and updates metrics. It consumes EXECUTION_COMPLETED, EXECUTION_FAILED, EXECUTION_TIMEOUT, writes DB records for status, outputs, errors, and usage, and records metrics for errors by type and durations. Kubernetes resource cleanup (pods and ConfigMaps) happens automatically via ownerReference: the ConfigMap is owned by the pod, so K8s garbage-collects it when the pod is deleted (by saga compensation or manual cleanup).
The Pod Monitor observes K8s pod state and translates to domain events. It watches CoreV1 Pod events and publishes EXECUTION_EVENTS for running, container started, logs tail, etc., adding useful metadata and best-effort failure analysis.
@@ -64,7 +64,7 @@ The Coordinator owns the admission/queuing policy and sets priorities. It intera
The Event Replay worker re-emits stored events to debug or rebuild projections, taking DB/event store and filters as inputs and outputting replayed events on regular topics with provenance markers.
-The DLQ Processor drains and retries dead-lettered messages with backoff and visibility, taking DLQ topic and retry policies as inputs and outputting successful re-publishes or parked messages with audit trail. See [Dead Letter Queue](../components/dead-letter-queue.md) for more on DLQ handling.
+The DLQ Processor retries dead-lettered messages with backoff and visibility. Failed messages are persisted directly to MongoDB (no DLQ Kafka topic). The processor is APScheduler-based, periodically checking for retryable messages and republishing them via a manually started broker. See [Dead Letter Queue](../components/dead-letter-queue.md) for more on DLQ handling.
## Operational notes
diff --git a/docs/components/dead-letter-queue.md b/docs/components/dead-letter-queue.md
index 9881594a..4201c3fc 100644
--- a/docs/components/dead-letter-queue.md
+++ b/docs/components/dead-letter-queue.md
@@ -4,46 +4,25 @@
Picture this: your Kafka consumer is happily processing events when suddenly it hits a poison pill - maybe a malformed event, a database outage, or just a bug in your code. Without a Dead Letter Queue (DLQ), that event would either block your entire consumer (if you keep retrying forever) or get lost forever (if you skip it). Neither option is great for an event-sourced system where events are your source of truth.
-The DLQ acts as a safety net. When an event fails processing after a reasonable number of retries, instead of losing it, we send it to a special "dead letter" topic where it can be examined, fixed, and potentially replayed later.
+The DLQ acts as a safety net. When an event fails processing after a reasonable number of retries, instead of losing it, we persist it to MongoDB where it can be examined, fixed, and potentially replayed later.
## How it works
-The DLQ implementation in Integr8sCode follows a producer-agnostic pattern. Producers can route failed events to the DLQ; a dedicated DLQ manager/processor consumes DLQ messages, persists them, and applies retry/discard policies. Here's how the pieces fit together:
+The DLQ implementation in Integr8sCode persists failed messages directly to MongoDB (no DLQ Kafka topic). The DLQ manager handles persistence and the DLQ processor worker retries messages on a schedule.
-### Producer side
+### Failure handling
-Every `UnifiedProducer` instance has a `send_to_dlq()` method that knows how to package up a failed event with all its context - the original topic, error message, retry count, and metadata about when and where it failed. When called, it creates a special DLQ message and sends it to the `dead_letter_queue` topic in Kafka.
-
-The beauty here is that the producer doesn't make decisions about *when* to send something to DLQ - it just provides the mechanism. The decision-making happens at a higher level.
-
-### Consumer side
-
-When event handling fails in normal consumers, producers may call `send_to_dlq()` to persist failure context. The DLQ manager is the single component that reads the DLQ topic and orchestrates retries according to policy.
-
-For example, the event store consumer sets up its error handling like this:
-
-```python
-if self.producer:
- dlq_handler = create_dlq_error_handler(
- producer=self.producer,
- original_topic="event-store",
- max_retries=3
- )
- self.consumer.register_error_callback(dlq_handler)
-```
-
-This handler tracks retry counts per event. If an event fails 3 times, it gets sent to DLQ. The consumer itself doesn't know about any of this - it just calls the error callback and moves on.
+When event handling fails in consumers, the DLQ manager (`app/dlq/manager.py`) packages the failed event with all its context — the original topic, error message, retry count, and metadata about when and where it failed — and persists it directly to MongoDB.
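
The removed e2e tests above built `DLQMessageDocument` records directly, which shows roughly what the manager writes. A hedged sketch of that persistence step, with field values borrowed from the deleted test helper rather than from the manager's real code:

```python
from datetime import datetime, timezone

from app.db.docs import DLQMessageDocument
from app.dlq.models import DLQMessageStatus
from app.domain.enums import KafkaTopic


async def persist_failed_event(event_dict: dict, error: Exception, retry_count: int) -> None:
    """Store a failed event in MongoDB, mirroring the shape used by the removed tests."""
    now = datetime.now(timezone.utc)
    doc = DLQMessageDocument(
        event=event_dict,                       # full original event payload
        original_topic=KafkaTopic.EXECUTION_EVENTS,
        error=str(error),
        retry_count=retry_count,
        failed_at=now,
        status=DLQMessageStatus.PENDING,        # eligible for the next scheduled retry pass
        producer_id="dlq-manager",              # illustrative; real producer id not shown here
        created_at=now,
    )
    await doc.insert()                          # Beanie persists to the DLQ collection
```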
### DLQ processor
-The `run_dlq_processor` is a separate service that monitors the dead letter queue topic. It's responsible for the retry orchestration. When it sees a message in the DLQ, it applies topic-specific retry policies to determine when (or if) to retry sending that message back to its original topic.
+The `run_dlq_processor` is a separate APScheduler-based worker that periodically checks MongoDB for retryable DLQ messages. When it finds eligible messages, it republishes them to their original Kafka topics via a manually started broker.
Different topics have different retry strategies configured:
- **Execution requests** get aggressive retries with exponential backoff - these are critical user operations
-- **Pod events** get fewer retries with longer delays - these are less critical monitoring events
+- **Pod events** get fewer retries with longer delays - these are less critical monitoring events
- **Resource allocation** events get immediate retries - these need quick resolution
-- **WebSocket events** use fixed intervals - these are real-time updates that become stale quickly
The processor also implements safety features like:
- Maximum age checks (messages older than 7 days are discarded)
@@ -56,22 +35,20 @@ The processor also implements safety features like:
graph TD
Consumer[Consumer] -->|event fails| Handler[Error Handler]
Handler -->|retries < limit| Kafka[(Kafka redeliver)]
- Handler -->|retries >= limit| Producer[Producer]
- Producer -->|send_to_dlq| DLQTopic[(dead_letter_queue)]
- DLQTopic --> Processor[DLQ Processor]
- Processor -->|check policy| Decision{retry?}
- Decision -->|yes, after delay| Original[(Original Topic)]
- Decision -->|max attempts| Archive[(Archive)]
+ Handler -->|retries >= limit| DLQManager[DLQ Manager]
+ DLQManager -->|persist| MongoDB[(MongoDB)]
+ Processor[DLQ Processor<br/>APScheduler] -->|poll| MongoDB
+ Processor -->|retry via broker| Original[(Original Topic)]
Original -->|reprocess| Consumer
```
-When a consumer fails to process an event, it invokes the registered error callback. The DLQ handler tracks how many times this specific event has failed. If the count is under the retry limit, the handler simply logs and returns, letting Kafka redeliver the message on its next poll. Once the retry limit is exceeded, the handler calls `producer.send_to_dlq()`, which packages the event together with failure context (original topic, error message, retry count, timestamps) and publishes it to the `dead_letter_queue` topic.
+When a consumer fails to process an event, it invokes the registered error callback. The DLQ handler tracks how many times this specific event has failed. If the count is under the retry limit, the handler simply logs and returns, letting Kafka redeliver the message on its next poll. Once the retry limit is exceeded, the DLQ manager persists the message with full failure context (original topic, error message, retry count, timestamps) directly to MongoDB.
-The DLQ processor service consumes from this topic and applies topic-specific retry policies. Depending on the policy, it either schedules the message for redelivery to its original topic after an appropriate delay, or archives it if maximum attempts have been exhausted. When redelivered, the message goes back through normal consumer processing. If it fails again, the cycle repeats until either success or final archival
+The DLQ processor worker runs on an APScheduler schedule, querying MongoDB for retryable messages. Depending on the retry policy, it either republishes the message to its original Kafka topic via a manually started broker, or archives it if maximum attempts have been exhausted.
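
A rough sketch of how such a poll-and-republish pass can be wired with APScheduler. The query, the status transition, and the `republish` callable are simplified assumptions; the actual worker applies per-topic retry policies and uses the manually started broker mentioned above:

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from app.db.docs import DLQMessageDocument
from app.dlq.models import DLQMessageStatus


async def retry_pending_messages(republish) -> None:
    """Find retryable DLQ messages and push them back to their original topics."""
    pending = await DLQMessageDocument.find(
        DLQMessageDocument.status == DLQMessageStatus.PENDING
    ).to_list()
    for msg in pending:
        await republish(topic=str(msg.original_topic), payload=msg.event)
        msg.status = DLQMessageStatus.RETRIED
        await msg.save()


def schedule_dlq_retries(republish, interval_minutes: int = 5) -> AsyncIOScheduler:
    scheduler = AsyncIOScheduler()
    # Poll MongoDB on a fixed interval; a real policy would also check per-topic
    # backoff and message age before republishing or discarding.
    scheduler.add_job(retry_pending_messages, "interval",
                      minutes=interval_minutes, args=[republish])
    scheduler.start()
    return scheduler
```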
## Configuration
-The DLQ system is configured through environment variables in the `dlq-processor` service:
+The DLQ system is configured through settings:
- `DLQ_MAX_RETRY_ATTEMPTS`: Global maximum retries (default: 5)
- `DLQ_RETRY_DELAY_HOURS`: Base delay between retries (default: 1 hour)
@@ -82,9 +59,7 @@ Each topic can override these with custom retry policies in the DLQ processor co
## Failure modes
-If the DLQ processor itself fails, messages stay safely in the `dead_letter_queue` topic - Kafka acts as the durable buffer. When the processor restarts, it picks up where it left off.
-
-If sending to DLQ fails (extremely rare - would mean Kafka is down), the producer logs a critical error but doesn't crash the consumer. This follows the principle that it's better to lose one message than to stop processing everything.
+If the DLQ processor itself fails, messages stay safely in MongoDB. When the processor restarts, it picks up where it left off.
The system is designed to be resilient but not perfect. In catastrophic scenarios, you still have Kafka's built-in durability and the ability to replay topics from the beginning if needed.
@@ -94,5 +69,4 @@ The system is designed to be resilient but not perfect. In catastrophic scenario
|--------------------------------------------------------------------------------------------------------------------------|------------------------|
| [`run_dlq_processor.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_dlq_processor.py) | DLQ processor worker |
| [`manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/dlq/manager.py) | DLQ management logic |
-| [`unified_producer.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/unified_producer.py) | `send_to_dlq()` method |
| [`dlq.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/api/routes/dlq.py) | Admin API routes |
diff --git a/docs/components/workers/k8s_worker.md b/docs/components/workers/k8s_worker.md
index 4c44124c..f2ef2d69 100644
--- a/docs/components/workers/k8s_worker.md
+++ b/docs/components/workers/k8s_worker.md
@@ -19,9 +19,11 @@ it builds a complete pod specification including:
- A ConfigMap containing the user's script and an entrypoint script
- A Pod manifest with hardened security context
- Proper labels for tracking and network policy matching
+- An ownerReference on the ConfigMap pointing to the pod, so K8s garbage-collects the ConfigMap when the pod is deleted
After creating resources, it publishes `PodCreated` and `ExecutionStarted` events so the rest of the system knows
-the execution has begun.
+the execution has begun. For `DeletePodCommand` (saga compensation), only the pod needs to be deleted — K8s
+automatically cleans up the owned ConfigMap.
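
A hedged sketch of attaching such an ownerReference with `kubernetes_asyncio` (the ConfigMap name, labels, and data key are placeholders; the worker's real manifest builder is not part of this diff):

```python
from kubernetes_asyncio import client as k8s_client


def configmap_owned_by_pod(pod: k8s_client.V1Pod, name: str, script: str) -> k8s_client.V1ConfigMap:
    """Build a ConfigMap that Kubernetes garbage-collects together with its pod."""
    owner = k8s_client.V1OwnerReference(
        api_version="v1",
        kind="Pod",
        name=pod.metadata.name,
        uid=pod.metadata.uid,  # the uid is only known after the pod has been created
    )
    metadata = k8s_client.V1ObjectMeta(
        name=name,
        labels={"app": "integr8s"},
        owner_references=[owner],
    )
    return k8s_client.V1ConfigMap(metadata=metadata, data={"script.py": script})
```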
## Pod security
diff --git a/docs/components/workers/result_processor.md b/docs/components/workers/result_processor.md
index b94fb005..13c37a74 100644
--- a/docs/components/workers/result_processor.md
+++ b/docs/components/workers/result_processor.md
@@ -24,8 +24,7 @@ execution has finished and results are available.
## Resource cleanup
-The processor also handles cleanup after executions complete. The [`resource_cleaner.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/result_processor/resource_cleaner.py)
-module deletes ConfigMaps and pods that are no longer needed, keeping the Kubernetes namespace tidy.
+Kubernetes resource cleanup is handled via ownerReference — the K8s worker sets an ownerReference on each ConfigMap pointing to its pod. When the pod is deleted (by saga compensation or manual cleanup), K8s garbage-collects the ConfigMap automatically. The result processor itself does not perform resource cleanup.
## Key files
@@ -33,7 +32,6 @@ module deletes ConfigMaps and pods that are no longer needed, keeping the Kubern
|--------------------------------------------------------------------------------------------------------------------------------|------------------|
| [`run_result_processor.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_result_processor.py) | Entry point |
| [`processor.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/result_processor/processor.py) | Result handling |
-| [`resource_cleaner.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/result_processor/resource_cleaner.py) | K8s cleanup |
## Deployment
diff --git a/docs/operations/cicd.md b/docs/operations/cicd.md
index 53a63e22..745f7966 100644
--- a/docs/operations/cicd.md
+++ b/docs/operations/cicd.md
@@ -11,6 +11,7 @@ graph LR
subgraph "Code Quality (lightweight)"
Ruff["Ruff Linting"]
MyPy["MyPy Type Check"]
+ Vulture["Vulture Dead Code"]
ESLint["ESLint + Svelte Check"]
end
@@ -42,7 +43,7 @@ graph LR
Pages["GitHub Pages"]
end
- Push["Push / PR"] --> Ruff & MyPy & ESLint & Bandit & SBOM & UnitBE & UnitFE & Docs
+ Push["Push / PR"] --> Ruff & MyPy & Vulture & ESLint & Bandit & SBOM & UnitBE & UnitFE & Docs
Build -->|main, all tests pass| Scan
Docs -->|main only| Pages
```
@@ -63,6 +64,7 @@ forward when everything passes.
| MyPy Type Checking | `.github/workflows/mypy.yml` | Push/PR to `main` | Python static type analysis |
| Frontend CI | `.github/workflows/frontend-ci.yml` | Push/PR to `main` (frontend changes) | ESLint + Svelte type check |
| Security Scanning | `.github/workflows/security.yml` | Push/PR to `main` | Bandit SAST |
+| Dead Code Detection | `.github/workflows/vulture.yml` | Push/PR to `main` | Vulture dead code analysis |
| Documentation | `.github/workflows/docs.yml` | Push/PR (`docs/`, `mkdocs.yml`) | MkDocs build and GitHub Pages deploy |
## Composite actions
@@ -307,6 +309,7 @@ Three lightweight workflows run independently since they catch obvious issues qu
- [Ruff](https://docs.astral.sh/ruff/) checks for style violations, import ordering, and common bugs
- [mypy](https://mypy.readthedocs.io/) with strict settings catches type mismatches and missing return types
+- [Vulture](https://github.com/jendrikseipp/vulture) detects unused functions, classes, methods, imports, and variables. A whitelist file (`backend/vulture_whitelist.py`) excludes framework patterns (Dishka providers, FastAPI routes, Beanie documents, Pydantic models) that look unused but are called at runtime
**Frontend (TypeScript/Svelte):**
@@ -385,6 +388,9 @@ uv run ruff check . --config pyproject.toml
# Type checking
uv run mypy --config-file pyproject.toml --strict .
+# Dead code detection
+uv run vulture app/ vulture_whitelist.py
+
# Security scan
uv tool run bandit -r . -x tests/ -ll
diff --git a/docs/operations/tracing.md b/docs/operations/tracing.md
index c8e51ccd..9281e5fc 100644
--- a/docs/operations/tracing.md
+++ b/docs/operations/tracing.md
@@ -63,7 +63,7 @@ sequenceDiagram
Note over Worker: re-injects trace context
alt on failure
- Kafka->>DLQ: message to dead_letter_queue
+ Worker->>DLQ: persist to MongoDB
Note over DLQ: dlq.consume span<br/>preserves original context
DLQ->>Kafka: retry with same context
end