From 8e34b590ceed094fa8a33744b6f3b48a93ac1c64 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 12 Feb 2026 20:22:18 +0100 Subject: [PATCH 1/2] feat: 1 kafka topic = 1 msg type, everywhere --- backend/app/core/container.py | 29 +-- backend/app/core/providers.py | 78 +++---- backend/app/db/docs/replay.py | 3 +- backend/app/dlq/manager.py | 96 ++++----- backend/app/domain/enums/__init__.py | 3 +- backend/app/domain/enums/events.py | 2 +- backend/app/domain/enums/kafka.py | 50 ----- backend/app/domain/replay/models.py | 3 +- backend/app/events/core/producer.py | 47 +--- backend/app/events/handlers.py | 189 +++++++--------- backend/app/infrastructure/kafka/__init__.py | 6 +- backend/app/infrastructure/kafka/mappings.py | 145 +++---------- backend/app/infrastructure/kafka/topics.py | 201 ------------------ backend/app/schemas_pydantic/replay.py | 3 +- backend/app/schemas_pydantic/replay_models.py | 3 +- backend/app/services/pod_monitor/config.py | 13 +- backend/config.dlq-processor.toml | 2 - backend/scripts/create_topics.py | 41 +--- backend/tests/e2e/conftest.py | 10 +- backend/tests/e2e/dlq/test_dlq_discard.py | 4 +- backend/tests/e2e/dlq/test_dlq_manager.py | 6 +- backend/tests/e2e/dlq/test_dlq_retry.py | 4 +- .../e2e/events/test_producer_roundtrip.py | 7 +- .../events/test_event_schema_coverage.py | 16 +- .../unit/events/test_mappings_and_types.py | 18 +- .../pod_monitor/test_config_and_init.py | 5 +- backend/workers/run_dlq_processor.py | 70 ------ 27 files changed, 238 insertions(+), 816 deletions(-) delete mode 100644 backend/app/infrastructure/kafka/topics.py delete mode 100644 backend/config.dlq-processor.toml delete mode 100644 backend/workers/run_dlq_processor.py diff --git a/backend/app/core/container.py b/backend/app/core/container.py index 43b737f1..5b08538e 100644 --- a/backend/app/core/container.py +++ b/backend/app/core/container.py @@ -8,7 +8,6 @@ BusinessServicesProvider, CoordinatorProvider, CoreServicesProvider, - DLQProvider, DLQWorkerProvider, EventReplayProvider, EventReplayWorkerProvider, @@ -51,7 +50,7 @@ def create_app_container(settings: Settings) -> AsyncContainer: MetricsProvider(), RepositoryProvider(), MessagingProvider(), - DLQProvider(), + DLQWorkerProvider(), SagaOrchestratorProvider(), KafkaServicesProvider(), SSEProvider(), @@ -79,7 +78,6 @@ def create_result_processor_container(settings: Settings) -> AsyncContainer: MetricsProvider(), RepositoryProvider(), MessagingProvider(), - DLQProvider(), ResultProcessorProvider(), context={Settings: settings}, ) @@ -96,7 +94,6 @@ def create_coordinator_container(settings: Settings) -> AsyncContainer: MetricsProvider(), RepositoryProvider(), MessagingProvider(), - DLQProvider(), CoordinatorProvider(), context={Settings: settings}, ) @@ -113,7 +110,6 @@ def create_k8s_worker_container(settings: Settings) -> AsyncContainer: MetricsProvider(), RepositoryProvider(), MessagingProvider(), - DLQProvider(), KubernetesProvider(), K8sWorkerProvider(), context={Settings: settings}, @@ -131,7 +127,6 @@ def create_pod_monitor_container(settings: Settings) -> AsyncContainer: MetricsProvider(), RepositoryProvider(), MessagingProvider(), - DLQProvider(), KafkaServicesProvider(), KubernetesProvider(), PodMonitorProvider(), @@ -153,7 +148,6 @@ def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer: MetricsProvider(), RepositoryProvider(), MessagingProvider(), - DLQProvider(), SagaWorkerProvider(), context={Settings: settings}, ) @@ -173,27 +167,6 @@ def create_event_replay_container(settings: Settings) -> 
AsyncContainer: MetricsProvider(), RepositoryProvider(), MessagingProvider(), - DLQProvider(), EventReplayWorkerProvider(), context={Settings: settings}, ) - - -def create_dlq_processor_container(settings: Settings) -> AsyncContainer: - """Create DI container for the DLQ processor worker. - - Uses DLQWorkerProvider which adds APScheduler-managed retry monitoring - and configures retry policies and filters. - """ - return make_async_container( - SettingsProvider(), - LoggingProvider(), - BrokerProvider(), - RedisProvider(), - CoreServicesProvider(), - MetricsProvider(), - RepositoryProvider(), - MessagingProvider(), - DLQWorkerProvider(), - context={Settings: settings}, - ) diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index 033132a9..0e1bc7ac 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -47,7 +47,7 @@ ) from app.dlq.manager import DLQManager from app.dlq.models import RetryPolicy, RetryStrategy -from app.domain.enums import KafkaTopic +from app.domain.enums import EventType from app.domain.rate_limit import RateLimitConfig from app.domain.saga import SagaConfig from app.events.core import UnifiedProducer @@ -222,74 +222,56 @@ def _default_retry_policies(prefix: str) -> dict[str, RetryPolicy]: Keys must match message.original_topic (full prefixed topic name). """ - execution_events = f"{prefix}{KafkaTopic.EXECUTION_EVENTS}" - pod_events = f"{prefix}{KafkaTopic.POD_EVENTS}" - saga_commands = f"{prefix}{KafkaTopic.SAGA_COMMANDS}" - execution_results = f"{prefix}{KafkaTopic.EXECUTION_RESULTS}" - - return { - execution_events: RetryPolicy( - topic=execution_events, + policies: dict[str, RetryPolicy] = {} + + for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED, + EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT, + EventType.EXECUTION_CANCELLED): + topic = f"{prefix}{et}" + policies[topic] = RetryPolicy( + topic=topic, strategy=RetryStrategy.EXPONENTIAL_BACKOFF, max_retries=5, base_delay_seconds=30, max_delay_seconds=300, retry_multiplier=2.0, - ), - pod_events: RetryPolicy( - topic=pod_events, + ) + + for et in (EventType.POD_CREATED, EventType.POD_FAILED, EventType.POD_SUCCEEDED): + topic = f"{prefix}{et}" + policies[topic] = RetryPolicy( + topic=topic, strategy=RetryStrategy.EXPONENTIAL_BACKOFF, max_retries=3, base_delay_seconds=60, max_delay_seconds=600, retry_multiplier=3.0, - ), - saga_commands: RetryPolicy( - topic=saga_commands, + ) + + for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND): + topic = f"{prefix}{et}" + policies[topic] = RetryPolicy( + topic=topic, strategy=RetryStrategy.EXPONENTIAL_BACKOFF, max_retries=5, base_delay_seconds=30, max_delay_seconds=300, retry_multiplier=2.0, - ), - execution_results: RetryPolicy( - topic=execution_results, + ) + + for et in (EventType.RESULT_STORED, EventType.RESULT_FAILED): + topic = f"{prefix}{et}" + policies[topic] = RetryPolicy( + topic=topic, strategy=RetryStrategy.IMMEDIATE, max_retries=3, - ), - } - - -class DLQProvider(Provider): - """Provides DLQManager without scheduling. 
Used by all containers except the DLQ worker.""" - - scope = Scope.APP - - @provide - def get_dlq_manager( - self, - broker: KafkaBroker, - settings: Settings, - logger: logging.Logger, - dlq_metrics: DLQMetrics, - repository: DLQRepository, - ) -> DLQManager: - return DLQManager( - settings=settings, - broker=broker, - logger=logger, - dlq_metrics=dlq_metrics, - repository=repository, - default_retry_policy=_default_retry_policy(), - retry_policies=_default_retry_policies(settings.KAFKA_TOPIC_PREFIX), ) + return policies -class DLQWorkerProvider(Provider): - """Provides DLQManager with APScheduler-managed retry monitoring. - Used by the DLQ worker container only. - """ +class DLQWorkerProvider(Provider): + """Provides DLQManager with APScheduler-managed retry monitoring.""" scope = Scope.APP diff --git a/backend/app/db/docs/replay.py b/backend/app/db/docs/replay.py index 37ec9441..f87fbeff 100644 --- a/backend/app/db/docs/replay.py +++ b/backend/app/db/docs/replay.py @@ -5,7 +5,7 @@ from pydantic import BaseModel, ConfigDict, Field from pymongo import IndexModel -from app.domain.enums import EventType, KafkaTopic, ReplayStatus, ReplayTarget, ReplayType +from app.domain.enums import ReplayStatus, ReplayTarget, ReplayType from app.domain.replay import ReplayError, ReplayFilter @@ -24,7 +24,6 @@ class ReplayConfig(BaseModel): batch_size: int = Field(default=100, ge=1, le=1000) max_events: int | None = Field(default=None, ge=1) - target_topics: dict[EventType, KafkaTopic] | None = None target_file_path: str | None = None skip_errors: bool = True diff --git a/backend/app/dlq/manager.py b/backend/app/dlq/manager.py index 323a12c4..d41df790 100644 --- a/backend/app/dlq/manager.py +++ b/backend/app/dlq/manager.py @@ -15,7 +15,6 @@ RetryPolicy, RetryStrategy, ) -from app.domain.enums import KafkaTopic from app.domain.events import ( DLQMessageDiscardedEvent, DLQMessageReceivedEvent, @@ -42,7 +41,6 @@ def __init__( repository: DLQRepository, default_retry_policy: RetryPolicy, retry_policies: dict[str, RetryPolicy], - dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE, filters: list[Callable[[DLQMessage], bool]] | None = None, ): self.settings = settings @@ -50,7 +48,6 @@ def __init__( self.logger = logger self.metrics = dlq_metrics self.repository = repository - self.dlq_topic = dlq_topic self.default_retry_policy = default_retry_policy self._retry_policies = retry_policies @@ -61,7 +58,7 @@ def __init__( ] if f is not None ] - self._dlq_events_topic = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DLQ_EVENTS}" + self._topic_prefix = settings.KAFKA_TOPIC_PREFIX def _filter_test_events(self, message: DLQMessage) -> bool: return not message.event.event_id.startswith("test-") @@ -87,23 +84,24 @@ async def handle_message(self, message: DLQMessage) -> None: message.last_updated = datetime.now(timezone.utc) await self.repository.save_message(message) - await self._broker.publish( - DLQMessageReceivedEvent( - dlq_event_id=message.event.event_id, - original_topic=message.original_topic, - original_event_type=message.event.event_type, - error=message.error, - retry_count=message.retry_count, - producer_id=message.producer_id, - failed_at=message.failed_at, - metadata=EventMetadata( - service_name="dlq-manager", - service_version="1.0.0", - correlation_id=message.event.metadata.correlation_id, - user_id=message.event.metadata.user_id, - ), + received_event = DLQMessageReceivedEvent( + dlq_event_id=message.event.event_id, + original_topic=message.original_topic, + original_event_type=message.event.event_type, + 
error=message.error, + retry_count=message.retry_count, + producer_id=message.producer_id, + failed_at=message.failed_at, + metadata=EventMetadata( + service_name="dlq-manager", + service_version="1.0.0", + correlation_id=message.event.metadata.correlation_id, + user_id=message.event.metadata.user_id, ), - topic=self._dlq_events_topic, + ) + await self._broker.publish( + received_event, + topic=f"{self._topic_prefix}{received_event.event_type}", ) retry_policy = self._retry_policies.get(message.original_topic, self.default_retry_policy) @@ -144,21 +142,22 @@ async def retry_message(self, message: DLQMessage) -> None: ), ) - await self._broker.publish( - DLQMessageRetriedEvent( - dlq_event_id=message.event.event_id, - original_topic=message.original_topic, - original_event_type=message.event.event_type, - retry_count=new_retry_count, - retry_topic=message.original_topic, - metadata=EventMetadata( - service_name="dlq-manager", - service_version="1.0.0", - correlation_id=message.event.metadata.correlation_id, - user_id=message.event.metadata.user_id, - ), + retried_event = DLQMessageRetriedEvent( + dlq_event_id=message.event.event_id, + original_topic=message.original_topic, + original_event_type=message.event.event_type, + retry_count=new_retry_count, + retry_topic=message.original_topic, + metadata=EventMetadata( + service_name="dlq-manager", + service_version="1.0.0", + correlation_id=message.event.metadata.correlation_id, + user_id=message.event.metadata.user_id, ), - topic=self._dlq_events_topic, + ) + await self._broker.publish( + retried_event, + topic=f"{self._topic_prefix}{retried_event.event_type}", ) self.logger.info("Successfully retried message", extra={"event_id": message.event.event_id}) @@ -175,21 +174,22 @@ async def discard_message(self, message: DLQMessage, reason: str) -> None: ), ) - await self._broker.publish( - DLQMessageDiscardedEvent( - dlq_event_id=message.event.event_id, - original_topic=message.original_topic, - original_event_type=message.event.event_type, - reason=reason, - retry_count=message.retry_count, - metadata=EventMetadata( - service_name="dlq-manager", - service_version="1.0.0", - correlation_id=message.event.metadata.correlation_id, - user_id=message.event.metadata.user_id, - ), + discarded_event = DLQMessageDiscardedEvent( + dlq_event_id=message.event.event_id, + original_topic=message.original_topic, + original_event_type=message.event.event_type, + reason=reason, + retry_count=message.retry_count, + metadata=EventMetadata( + service_name="dlq-manager", + service_version="1.0.0", + correlation_id=message.event.metadata.correlation_id, + user_id=message.event.metadata.user_id, ), - topic=self._dlq_events_topic, + ) + await self._broker.publish( + discarded_event, + topic=f"{self._topic_prefix}{discarded_event.event_type}", ) self.logger.warning("Discarded message", extra={"event_id": message.event.event_id, "reason": reason}) diff --git a/backend/app/domain/enums/__init__.py b/backend/app/domain/enums/__init__.py index dac9a236..7cd101a5 100644 --- a/backend/app/domain/enums/__init__.py +++ b/backend/app/domain/enums/__init__.py @@ -2,7 +2,7 @@ from app.domain.enums.common import Environment, ErrorType, ExportFormat, SortOrder, Theme from app.domain.enums.events import EventType from app.domain.enums.execution import CancelStatus, ExecutionStatus, QueuePriority -from app.domain.enums.kafka import GroupId, KafkaTopic +from app.domain.enums.kafka import GroupId from app.domain.enums.notification import ( NotificationChannel, NotificationSeverity, @@ 
-32,7 +32,6 @@ "QueuePriority", # Kafka "GroupId", - "KafkaTopic", # Notification "NotificationChannel", "NotificationSeverity", diff --git a/backend/app/domain/enums/events.py b/backend/app/domain/enums/events.py index 8927ca91..d3aaf334 100644 --- a/backend/app/domain/enums/events.py +++ b/backend/app/domain/enums/events.py @@ -82,7 +82,7 @@ class EventType(StringEnum): ALLOCATE_RESOURCES_COMMAND = "allocate_resources_command" RELEASE_RESOURCES_COMMAND = "release_resources_command" - # DLQ events + # DLQ DLQ_MESSAGE_RECEIVED = "dlq_message_received" DLQ_MESSAGE_RETRIED = "dlq_message_retried" DLQ_MESSAGE_DISCARDED = "dlq_message_discarded" diff --git a/backend/app/domain/enums/kafka.py b/backend/app/domain/enums/kafka.py index fbb5d692..8df0c3ec 100644 --- a/backend/app/domain/enums/kafka.py +++ b/backend/app/domain/enums/kafka.py @@ -1,55 +1,6 @@ from app.core.utils import StringEnum -class KafkaTopic(StringEnum): - """Kafka topic names used throughout the system.""" - - EXECUTION_EVENTS = "execution_events" - EXECUTION_COMPLETED = "execution_completed" - EXECUTION_FAILED = "execution_failed" - EXECUTION_TIMEOUT = "execution_timeout" - EXECUTION_REQUESTS = "execution_requests" - EXECUTION_COMMANDS = "execution_commands" - EXECUTION_TASKS = "execution_tasks" - - # Pod topics - POD_EVENTS = "pod_events" - POD_STATUS_UPDATES = "pod_status_updates" - POD_RESULTS = "pod_results" - - # Result topics - EXECUTION_RESULTS = "execution_results" - - # User topics - USER_EVENTS = "user_events" - USER_NOTIFICATIONS = "user_notifications" - USER_SETTINGS_EVENTS = "user_settings_events" - - # Script topics - SCRIPT_EVENTS = "script_events" - - # Security topics - SECURITY_EVENTS = "security_events" - - # Resource topics - RESOURCE_EVENTS = "resource_events" - - # Notification topics - NOTIFICATION_EVENTS = "notification_events" - - # System topics - SYSTEM_EVENTS = "system_events" - - # Saga topics - SAGA_EVENTS = "saga_events" - SAGA_COMMANDS = "saga_commands" - - # Infrastructure topics - DEAD_LETTER_QUEUE = "dead_letter_queue" - DLQ_EVENTS = "dlq_events" - WEBSOCKET_EVENTS = "websocket_events" - - class GroupId(StringEnum): """Kafka consumer group IDs.""" @@ -61,4 +12,3 @@ class GroupId(StringEnum): EVENT_STORE_CONSUMER = "event-store-consumer" WEBSOCKET_GATEWAY = "websocket-gateway" NOTIFICATION_SERVICE = "notification-service" - DLQ_MANAGER = "dlq-manager" diff --git a/backend/app/domain/replay/models.py b/backend/app/domain/replay/models.py index 5f1642a7..857b2084 100644 --- a/backend/app/domain/replay/models.py +++ b/backend/app/domain/replay/models.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, ConfigDict, Field -from app.domain.enums import EventType, KafkaTopic, ReplayStatus, ReplayTarget, ReplayType +from app.domain.enums import EventType, ReplayStatus, ReplayTarget, ReplayType class ReplayError(BaseModel): @@ -114,7 +114,6 @@ class ReplayConfig(BaseModel): batch_size: int = Field(default=100, ge=1, le=1000) max_events: int | None = Field(default=None, ge=1) - target_topics: dict[EventType, KafkaTopic] | None = None target_file_path: str | None = None skip_errors: bool = True diff --git a/backend/app/events/core/producer.py b/backend/app/events/core/producer.py index a1e5caf7..418f3722 100644 --- a/backend/app/events/core/producer.py +++ b/backend/app/events/core/producer.py @@ -1,16 +1,10 @@ -import asyncio import logging -import socket -from datetime import datetime, timezone from faststream.kafka import KafkaBroker from app.core.metrics import EventMetrics from app.db.repositories 
import EventRepository -from app.dlq.models import DLQMessage, DLQMessageStatus -from app.domain.enums import KafkaTopic from app.domain.events import DomainEvent -from app.infrastructure.kafka.mappings import EVENT_TYPE_TO_TOPIC from app.settings import Settings @@ -38,7 +32,7 @@ def __init__( async def produce(self, event_to_produce: DomainEvent, key: str) -> None: """Persist event to MongoDB, then publish to Kafka.""" await self._event_repository.store_event(event_to_produce) - topic = f"{self._topic_prefix}{EVENT_TYPE_TO_TOPIC[event_to_produce.event_type]}" + topic = f"{self._topic_prefix}{event_to_produce.event_type}" try: await self._broker.publish( message=event_to_produce, @@ -53,42 +47,3 @@ async def produce(self, event_to_produce: DomainEvent, key: str) -> None: self._event_metrics.record_kafka_production_error(topic=topic, error_type=type(e).__name__) self.logger.error(f"Failed to produce message: {e}") raise - - async def send_to_dlq( - self, original_event: DomainEvent, original_topic: str, error: Exception, retry_count: int = 0 - ) -> None: - """Send a failed event to the Dead Letter Queue.""" - try: - current_task = asyncio.current_task() - task_name = current_task.get_name() if current_task else "main" - producer_id = f"{socket.gethostname()}-{task_name}" - - dlq_topic = f"{self._topic_prefix}{KafkaTopic.DEAD_LETTER_QUEUE}" - - dlq_msg = DLQMessage( - event=original_event, - original_topic=original_topic, - error=str(error), - retry_count=retry_count, - failed_at=datetime.now(timezone.utc), - status=DLQMessageStatus.PENDING, - producer_id=producer_id, - ) - - await self._broker.publish( - message=dlq_msg, - topic=dlq_topic, - key=original_event.event_id.encode(), - ) - - self._event_metrics.record_kafka_message_produced(dlq_topic) - self.logger.warning( - f"Event {original_event.event_id} sent to DLQ. " - f"Original topic: {original_topic}, Error: {error}, " - f"Retry count: {retry_count}" - ) - - except Exception as e: - self.logger.critical( - f"Failed to send event {original_event.event_id} to DLQ: {e}. 
Original error: {error}", exc_info=True - ) diff --git a/backend/app/events/handlers.py b/backend/app/events/handlers.py index c8bfc4e7..13584470 100644 --- a/backend/app/events/handlers.py +++ b/backend/app/events/handlers.py @@ -1,16 +1,11 @@ -import asyncio import logging -from datetime import datetime, timezone from typing import Any from dishka.integrations.faststream import FromDishka from faststream import AckPolicy -from faststream.kafka import KafkaBroker, KafkaMessage -from faststream.message import decode_message +from faststream.kafka import KafkaBroker -from app.dlq.manager import DLQManager -from app.dlq.models import DLQMessage -from app.domain.enums import EventType, GroupId, KafkaTopic +from app.domain.enums import EventType, GroupId from app.domain.events import ( CreatePodCommandEvent, DeletePodCommandEvent, @@ -22,7 +17,6 @@ ExecutionTimeoutEvent, ) from app.domain.idempotency import KeyStrategy -from app.infrastructure.kafka.mappings import CONSUMER_GROUP_SUBSCRIPTIONS from app.services.coordinator import ExecutionCoordinator from app.services.idempotency import IdempotencyManager from app.services.k8s_worker import KubernetesWorker @@ -58,26 +52,16 @@ async def with_idempotency( raise -def _topics(settings: Settings, group_id: GroupId) -> list[str]: - return [ - f"{settings.KAFKA_TOPIC_PREFIX}{t}" - for t in CONSUMER_GROUP_SUBSCRIPTIONS[group_id] - ] - - -def _event_type_filter(msg: Any, expected: str) -> bool: - """Body-based event_type filter for @sub(filter=...) lambdas.""" - return decode_message(msg).get("event_type") == expected # type: ignore[union-attr] +def _topic(settings: Settings, event_type: EventType) -> str: + return f"{settings.KAFKA_TOPIC_PREFIX}{event_type}" def register_coordinator_subscriber(broker: KafkaBroker, settings: Settings) -> None: - sub = broker.subscriber( - *_topics(settings, GroupId.EXECUTION_COORDINATOR), + @broker.subscriber( + _topic(settings, EventType.EXECUTION_REQUESTED), group_id=GroupId.EXECUTION_COORDINATOR, ack_policy=AckPolicy.ACK, ) - - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_REQUESTED)) async def on_execution_requested( body: ExecutionRequestedEvent, coordinator: FromDishka[ExecutionCoordinator], @@ -88,7 +72,11 @@ async def on_execution_requested( body, coordinator.handle_execution_requested, idem, KeyStrategy.EVENT_BASED, 7200, logger, ) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_COMPLETED)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_COMPLETED), + group_id=GroupId.EXECUTION_COORDINATOR, + ack_policy=AckPolicy.ACK, + ) async def on_execution_completed( body: ExecutionCompletedEvent, coordinator: FromDishka[ExecutionCoordinator], @@ -99,7 +87,11 @@ async def on_execution_completed( body, coordinator.handle_execution_completed, idem, KeyStrategy.EVENT_BASED, 7200, logger, ) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_FAILED)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_FAILED), + group_id=GroupId.EXECUTION_COORDINATOR, + ack_policy=AckPolicy.ACK, + ) async def on_execution_failed( body: ExecutionFailedEvent, coordinator: FromDishka[ExecutionCoordinator], @@ -110,7 +102,11 @@ async def on_execution_failed( body, coordinator.handle_execution_failed, idem, KeyStrategy.EVENT_BASED, 7200, logger, ) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_CANCELLED)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_CANCELLED), + group_id=GroupId.EXECUTION_COORDINATOR, + 
ack_policy=AckPolicy.ACK, + ) async def on_execution_cancelled( body: ExecutionCancelledEvent, coordinator: FromDishka[ExecutionCoordinator], @@ -121,19 +117,13 @@ async def on_execution_cancelled( body, coordinator.handle_execution_cancelled, idem, KeyStrategy.EVENT_BASED, 7200, logger, ) - @sub - async def on_unhandled(body: DomainEvent) -> None: - pass - def register_k8s_worker_subscriber(broker: KafkaBroker, settings: Settings) -> None: - sub = broker.subscriber( - *_topics(settings, GroupId.K8S_WORKER), + @broker.subscriber( + _topic(settings, EventType.CREATE_POD_COMMAND), group_id=GroupId.K8S_WORKER, ack_policy=AckPolicy.ACK, ) - - @sub(filter=lambda msg: _event_type_filter(msg, EventType.CREATE_POD_COMMAND)) async def on_create_pod( body: CreatePodCommandEvent, worker: FromDishka[KubernetesWorker], @@ -144,7 +134,11 @@ async def on_create_pod( body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger, ) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.DELETE_POD_COMMAND)) + @broker.subscriber( + _topic(settings, EventType.DELETE_POD_COMMAND), + group_id=GroupId.K8S_WORKER, + ack_policy=AckPolicy.ACK, + ) async def on_delete_pod( body: DeletePodCommandEvent, worker: FromDishka[KubernetesWorker], @@ -155,21 +149,15 @@ async def on_delete_pod( body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger, ) - @sub - async def on_unhandled(body: DomainEvent) -> None: - pass - def register_result_processor_subscriber(broker: KafkaBroker, settings: Settings) -> None: - sub = broker.subscriber( - *_topics(settings, GroupId.RESULT_PROCESSOR), + @broker.subscriber( + _topic(settings, EventType.EXECUTION_COMPLETED), group_id=GroupId.RESULT_PROCESSOR, ack_policy=AckPolicy.ACK, max_poll_records=1, auto_offset_reset="earliest", ) - - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_COMPLETED)) async def on_execution_completed( body: ExecutionCompletedEvent, processor: FromDishka[ResultProcessor], @@ -180,7 +168,13 @@ async def on_execution_completed( body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger, ) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_FAILED)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_FAILED), + group_id=GroupId.RESULT_PROCESSOR, + ack_policy=AckPolicy.ACK, + max_poll_records=1, + auto_offset_reset="earliest", + ) async def on_execution_failed( body: ExecutionFailedEvent, processor: FromDishka[ResultProcessor], @@ -191,7 +185,13 @@ async def on_execution_failed( body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger, ) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_TIMEOUT)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_TIMEOUT), + group_id=GroupId.RESULT_PROCESSOR, + ack_policy=AckPolicy.ACK, + max_poll_records=1, + auto_offset_reset="earliest", + ) async def on_execution_timeout( body: ExecutionTimeoutEvent, processor: FromDishka[ResultProcessor], @@ -202,55 +202,58 @@ async def on_execution_timeout( body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger, ) - @sub - async def on_unhandled(body: DomainEvent) -> None: - pass - def register_saga_subscriber(broker: KafkaBroker, settings: Settings) -> None: - sub = broker.subscriber( - *_topics(settings, GroupId.SAGA_ORCHESTRATOR), + @broker.subscriber( + _topic(settings, EventType.EXECUTION_REQUESTED), group_id=GroupId.SAGA_ORCHESTRATOR, ack_policy=AckPolicy.ACK, ) - - 
@sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_REQUESTED)) async def on_execution_requested( body: ExecutionRequestedEvent, orchestrator: FromDishka[SagaOrchestrator], ) -> None: await orchestrator.handle_execution_requested(body) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_COMPLETED)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_COMPLETED), + group_id=GroupId.SAGA_ORCHESTRATOR, + ack_policy=AckPolicy.ACK, + ) async def on_execution_completed( body: ExecutionCompletedEvent, orchestrator: FromDishka[SagaOrchestrator], ) -> None: await orchestrator.handle_execution_completed(body) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_FAILED)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_FAILED), + group_id=GroupId.SAGA_ORCHESTRATOR, + ack_policy=AckPolicy.ACK, + ) async def on_execution_failed( body: ExecutionFailedEvent, orchestrator: FromDishka[SagaOrchestrator], ) -> None: await orchestrator.handle_execution_failed(body) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_TIMEOUT)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_TIMEOUT), + group_id=GroupId.SAGA_ORCHESTRATOR, + ack_policy=AckPolicy.ACK, + ) async def on_execution_timeout( body: ExecutionTimeoutEvent, orchestrator: FromDishka[SagaOrchestrator], ) -> None: await orchestrator.handle_execution_timeout(body) - @sub - async def on_unhandled(body: DomainEvent) -> None: - pass - - def register_sse_subscriber(broker: KafkaBroker, settings: Settings) -> None: + sse_topics = [_topic(settings, et) for et in SSERedisBus.SSE_ROUTED_EVENTS] + @broker.subscriber( - *_topics(settings, GroupId.WEBSOCKET_GATEWAY), + *sse_topics, group_id="sse-bridge-pool", ack_policy=AckPolicy.ACK_FIRST, auto_offset_reset="latest", @@ -260,77 +263,47 @@ async def on_sse_event( body: DomainEvent, sse_bus: FromDishka[SSERedisBus], ) -> None: - if body.event_type in SSERedisBus.SSE_ROUTED_EVENTS: - await sse_bus.route_domain_event(body) + await sse_bus.route_domain_event(body) def register_notification_subscriber(broker: KafkaBroker, settings: Settings) -> None: - sub = broker.subscriber( - *_topics(settings, GroupId.NOTIFICATION_SERVICE), + @broker.subscriber( + _topic(settings, EventType.EXECUTION_COMPLETED), group_id=GroupId.NOTIFICATION_SERVICE, ack_policy=AckPolicy.ACK, max_poll_records=10, auto_offset_reset="latest", ) - - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_COMPLETED)) async def on_execution_completed( body: ExecutionCompletedEvent, service: FromDishka[NotificationService], ) -> None: await service.handle_execution_completed(body) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_FAILED)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_FAILED), + group_id=GroupId.NOTIFICATION_SERVICE, + ack_policy=AckPolicy.ACK, + max_poll_records=10, + auto_offset_reset="latest", + ) async def on_execution_failed( body: ExecutionFailedEvent, service: FromDishka[NotificationService], ) -> None: await service.handle_execution_failed(body) - @sub(filter=lambda msg: _event_type_filter(msg, EventType.EXECUTION_TIMEOUT)) + @broker.subscriber( + _topic(settings, EventType.EXECUTION_TIMEOUT), + group_id=GroupId.NOTIFICATION_SERVICE, + ack_policy=AckPolicy.ACK, + max_poll_records=10, + auto_offset_reset="latest", + ) async def on_execution_timeout( body: ExecutionTimeoutEvent, service: FromDishka[NotificationService], ) -> None: await service.handle_execution_timeout(body) - @sub - async 
def on_unhandled(body: DomainEvent) -> None: - pass - - -def register_dlq_subscriber(broker: KafkaBroker, settings: Settings) -> None: - """Register a DLQ subscriber that consumes dead-letter messages. - - DLQ messages are JSON-encoded DLQMessage models (Pydantic serialization via FastStream). - All DLQ metadata is in the message body — no Kafka headers needed. - """ - topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DEAD_LETTER_QUEUE}" - @broker.subscriber( - topic_name, - group_id=GroupId.DLQ_MANAGER, - ack_policy=AckPolicy.ACK, - auto_offset_reset="earliest", - ) - async def on_dlq_message( - body: DLQMessage, - msg: KafkaMessage, - manager: FromDishka[DLQManager], - logger: FromDishka[logging.Logger], - ) -> None: - start = asyncio.get_running_loop().time() - raw = msg.raw_message - assert not isinstance(raw, tuple) - body.dlq_offset = raw.offset - body.dlq_partition = raw.partition - - await manager.handle_message(body) - - manager.metrics.record_dlq_message_received(body.original_topic, body.event.event_type) - manager.metrics.record_dlq_message_age( - (datetime.now(timezone.utc) - body.failed_at).total_seconds() - ) - manager.metrics.record_dlq_processing_duration( - asyncio.get_running_loop().time() - start, "process" - ) diff --git a/backend/app/infrastructure/kafka/__init__.py b/backend/app/infrastructure/kafka/__init__.py index fae49311..1a513861 100644 --- a/backend/app/infrastructure/kafka/__init__.py +++ b/backend/app/infrastructure/kafka/__init__.py @@ -1,12 +1,8 @@ from app.domain.events import DomainEvent, EventMetadata -from app.infrastructure.kafka.mappings import get_event_class_for_type, get_topic_for_event -from app.infrastructure.kafka.topics import get_all_topics, get_topic_configs +from app.infrastructure.kafka.mappings import get_event_class_for_type __all__ = [ "DomainEvent", "EventMetadata", - "get_all_topics", - "get_topic_configs", "get_event_class_for_type", - "get_topic_for_event", ] diff --git a/backend/app/infrastructure/kafka/mappings.py b/backend/app/infrastructure/kafka/mappings.py index c7597db2..0ee7e571 100644 --- a/backend/app/infrastructure/kafka/mappings.py +++ b/backend/app/infrastructure/kafka/mappings.py @@ -1,78 +1,35 @@ from functools import lru_cache from typing import get_args, get_origin -from app.domain.enums import EventType, GroupId, KafkaTopic +from app.domain.enums import EventType, GroupId -# EventType -> KafkaTopic routing -EVENT_TYPE_TO_TOPIC: dict[EventType, KafkaTopic] = { - # Execution events - EventType.EXECUTION_REQUESTED: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_ACCEPTED: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_QUEUED: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_STARTED: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_RUNNING: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_COMPLETED: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_FAILED: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_TIMEOUT: KafkaTopic.EXECUTION_EVENTS, - EventType.EXECUTION_CANCELLED: KafkaTopic.EXECUTION_EVENTS, - # Pod events - EventType.POD_CREATED: KafkaTopic.POD_EVENTS, - EventType.POD_SCHEDULED: KafkaTopic.POD_EVENTS, - EventType.POD_RUNNING: KafkaTopic.POD_EVENTS, - EventType.POD_SUCCEEDED: KafkaTopic.POD_EVENTS, - EventType.POD_FAILED: KafkaTopic.POD_EVENTS, - EventType.POD_TERMINATED: KafkaTopic.POD_EVENTS, - EventType.POD_DELETED: KafkaTopic.POD_EVENTS, - # Result events - EventType.RESULT_STORED: KafkaTopic.EXECUTION_RESULTS, - EventType.RESULT_FAILED: KafkaTopic.EXECUTION_RESULTS, - # User events 
- EventType.USER_REGISTERED: KafkaTopic.USER_EVENTS, - EventType.USER_LOGIN: KafkaTopic.USER_EVENTS, - EventType.USER_LOGGED_IN: KafkaTopic.USER_EVENTS, - EventType.USER_LOGGED_OUT: KafkaTopic.USER_EVENTS, - EventType.USER_UPDATED: KafkaTopic.USER_EVENTS, - EventType.USER_DELETED: KafkaTopic.USER_EVENTS, - EventType.USER_SETTINGS_UPDATED: KafkaTopic.USER_SETTINGS_EVENTS, - # Notification events - EventType.NOTIFICATION_CREATED: KafkaTopic.NOTIFICATION_EVENTS, - EventType.NOTIFICATION_SENT: KafkaTopic.NOTIFICATION_EVENTS, - EventType.NOTIFICATION_DELIVERED: KafkaTopic.NOTIFICATION_EVENTS, - EventType.NOTIFICATION_FAILED: KafkaTopic.NOTIFICATION_EVENTS, - EventType.NOTIFICATION_READ: KafkaTopic.NOTIFICATION_EVENTS, - EventType.NOTIFICATION_CLICKED: KafkaTopic.NOTIFICATION_EVENTS, - EventType.NOTIFICATION_PREFERENCES_UPDATED: KafkaTopic.NOTIFICATION_EVENTS, - # Script events - EventType.SCRIPT_SAVED: KafkaTopic.SCRIPT_EVENTS, - EventType.SCRIPT_DELETED: KafkaTopic.SCRIPT_EVENTS, - EventType.SCRIPT_SHARED: KafkaTopic.SCRIPT_EVENTS, - # Security events - EventType.SECURITY_VIOLATION: KafkaTopic.SECURITY_EVENTS, - EventType.RATE_LIMIT_EXCEEDED: KafkaTopic.SECURITY_EVENTS, - EventType.AUTH_FAILED: KafkaTopic.SECURITY_EVENTS, - # Resource events - EventType.RESOURCE_LIMIT_EXCEEDED: KafkaTopic.RESOURCE_EVENTS, - EventType.QUOTA_EXCEEDED: KafkaTopic.RESOURCE_EVENTS, - # System events - EventType.SYSTEM_ERROR: KafkaTopic.SYSTEM_EVENTS, - EventType.SERVICE_UNHEALTHY: KafkaTopic.SYSTEM_EVENTS, - EventType.SERVICE_RECOVERED: KafkaTopic.SYSTEM_EVENTS, - # Saga events - EventType.SAGA_STARTED: KafkaTopic.SAGA_EVENTS, - EventType.SAGA_COMPLETED: KafkaTopic.SAGA_EVENTS, - EventType.SAGA_FAILED: KafkaTopic.SAGA_EVENTS, - EventType.SAGA_CANCELLED: KafkaTopic.SAGA_EVENTS, - EventType.SAGA_COMPENSATING: KafkaTopic.SAGA_EVENTS, - EventType.SAGA_COMPENSATED: KafkaTopic.SAGA_EVENTS, - # Saga command events - EventType.CREATE_POD_COMMAND: KafkaTopic.SAGA_COMMANDS, - EventType.DELETE_POD_COMMAND: KafkaTopic.SAGA_COMMANDS, - EventType.ALLOCATE_RESOURCES_COMMAND: KafkaTopic.SAGA_COMMANDS, - EventType.RELEASE_RESOURCES_COMMAND: KafkaTopic.SAGA_COMMANDS, - # DLQ events - EventType.DLQ_MESSAGE_RECEIVED: KafkaTopic.DLQ_EVENTS, - EventType.DLQ_MESSAGE_RETRIED: KafkaTopic.DLQ_EVENTS, - EventType.DLQ_MESSAGE_DISCARDED: KafkaTopic.DLQ_EVENTS, +CONSUMER_GROUP_SUBSCRIPTIONS: dict[GroupId, set[EventType]] = { + GroupId.EXECUTION_COORDINATOR: { + EventType.EXECUTION_REQUESTED, + EventType.EXECUTION_COMPLETED, + EventType.EXECUTION_FAILED, + EventType.EXECUTION_CANCELLED, + }, + GroupId.K8S_WORKER: { + EventType.CREATE_POD_COMMAND, + EventType.DELETE_POD_COMMAND, + }, + GroupId.RESULT_PROCESSOR: { + EventType.EXECUTION_COMPLETED, + EventType.EXECUTION_FAILED, + EventType.EXECUTION_TIMEOUT, + }, + GroupId.SAGA_ORCHESTRATOR: { + EventType.EXECUTION_REQUESTED, + EventType.EXECUTION_COMPLETED, + EventType.EXECUTION_FAILED, + EventType.EXECUTION_TIMEOUT, + }, + GroupId.NOTIFICATION_SERVICE: { + EventType.EXECUTION_COMPLETED, + EventType.EXECUTION_FAILED, + EventType.EXECUTION_TIMEOUT, + }, } @@ -90,49 +47,3 @@ def _get_event_type_to_class() -> dict[EventType, type]: def get_event_class_for_type(event_type: EventType) -> type | None: """Get the event class for a given event type.""" return _get_event_type_to_class().get(event_type) - - -@lru_cache(maxsize=128) -def get_topic_for_event(event_type: EventType) -> KafkaTopic: - """Get the Kafka topic for a given event type.""" - return EVENT_TYPE_TO_TOPIC.get(event_type, 
KafkaTopic.SYSTEM_EVENTS) - - -def get_event_types_for_topic(topic: KafkaTopic) -> list[EventType]: - """Get all event types that publish to a given topic.""" - return [et for et, t in EVENT_TYPE_TO_TOPIC.items() if t == topic] - - -CONSUMER_GROUP_SUBSCRIPTIONS: dict[GroupId, set[KafkaTopic]] = { - GroupId.EXECUTION_COORDINATOR: { - KafkaTopic.EXECUTION_EVENTS, - KafkaTopic.EXECUTION_RESULTS, - }, - GroupId.K8S_WORKER: { - KafkaTopic.SAGA_COMMANDS, - }, - GroupId.POD_MONITOR: { - KafkaTopic.POD_EVENTS, - KafkaTopic.POD_STATUS_UPDATES, - }, - GroupId.RESULT_PROCESSOR: { - KafkaTopic.EXECUTION_EVENTS, - }, - GroupId.SAGA_ORCHESTRATOR: { - KafkaTopic.EXECUTION_EVENTS, - KafkaTopic.SAGA_COMMANDS, - }, - GroupId.WEBSOCKET_GATEWAY: { - KafkaTopic.EXECUTION_EVENTS, - KafkaTopic.EXECUTION_RESULTS, - KafkaTopic.POD_EVENTS, - KafkaTopic.POD_STATUS_UPDATES, - }, - GroupId.NOTIFICATION_SERVICE: { - KafkaTopic.NOTIFICATION_EVENTS, - KafkaTopic.EXECUTION_EVENTS, - }, - GroupId.DLQ_MANAGER: { - KafkaTopic.DEAD_LETTER_QUEUE, - }, -} diff --git a/backend/app/infrastructure/kafka/topics.py b/backend/app/infrastructure/kafka/topics.py deleted file mode 100644 index a664bb19..00000000 --- a/backend/app/infrastructure/kafka/topics.py +++ /dev/null @@ -1,201 +0,0 @@ -from typing import Any - -from app.domain.enums import KafkaTopic - - -def get_all_topics() -> set[KafkaTopic]: - """Get all Kafka topics.""" - return set(KafkaTopic) - - -def get_topic_configs() -> dict[KafkaTopic, dict[str, Any]]: - """Get configuration for all Kafka topics.""" - return { - # High-volume execution topics - KafkaTopic.EXECUTION_EVENTS: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - KafkaTopic.EXECUTION_COMPLETED: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - KafkaTopic.EXECUTION_FAILED: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - KafkaTopic.EXECUTION_TIMEOUT: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - KafkaTopic.EXECUTION_REQUESTS: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - KafkaTopic.EXECUTION_COMMANDS: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "86400000", # 1 day - "compression.type": "gzip", - }, - }, - KafkaTopic.EXECUTION_TASKS: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "86400000", # 1 day - "compression.type": "gzip", - }, - }, - # Pod lifecycle topics - KafkaTopic.POD_EVENTS: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "86400000", # 1 day - "compression.type": "gzip", - }, - }, - KafkaTopic.POD_STATUS_UPDATES: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "86400000", # 1 day - "compression.type": "gzip", - }, - }, - KafkaTopic.POD_RESULTS: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - # Result topics - KafkaTopic.EXECUTION_RESULTS: { - "num_partitions": 10, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - 
"compression.type": "gzip", - }, - }, - # User topics - KafkaTopic.USER_EVENTS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "2592000000", # 30 days - "compression.type": "gzip", - }, - }, - KafkaTopic.USER_NOTIFICATIONS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - KafkaTopic.USER_SETTINGS_EVENTS: { - "num_partitions": 3, - "replication_factor": 1, - "config": { - "retention.ms": "2592000000", # 30 days - "compression.type": "gzip", - }, - }, - # Script topics - KafkaTopic.SCRIPT_EVENTS: { - "num_partitions": 3, - "replication_factor": 1, - "config": { - "retention.ms": "2592000000", # 30 days - "compression.type": "gzip", - }, - }, - # Security topics - KafkaTopic.SECURITY_EVENTS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "2592000000", # 30 days - "compression.type": "gzip", - }, - }, - # Resource topics - KafkaTopic.RESOURCE_EVENTS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - # Notification topics - KafkaTopic.NOTIFICATION_EVENTS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - # System topics - KafkaTopic.SYSTEM_EVENTS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - # Saga topics - KafkaTopic.SAGA_EVENTS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - # Infrastructure topics - KafkaTopic.DEAD_LETTER_QUEUE: { - "num_partitions": 3, - "replication_factor": 1, - "config": { - "retention.ms": "1209600000", # 14 days - "compression.type": "gzip", - }, - }, - KafkaTopic.WEBSOCKET_EVENTS: { - "num_partitions": 5, - "replication_factor": 1, - "config": { - "retention.ms": "86400000", # 1 day - "compression.type": "gzip", - }, - }, - } diff --git a/backend/app/schemas_pydantic/replay.py b/backend/app/schemas_pydantic/replay.py index 95685b45..4cc8d59a 100644 --- a/backend/app/schemas_pydantic/replay.py +++ b/backend/app/schemas_pydantic/replay.py @@ -2,7 +2,7 @@ from pydantic import BaseModel, ConfigDict, Field, computed_field -from app.domain.enums import EventType, KafkaTopic, ReplayStatus, ReplayTarget, ReplayType +from app.domain.enums import ReplayStatus, ReplayTarget, ReplayType from app.domain.replay import ReplayFilter @@ -19,7 +19,6 @@ class ReplayRequest(BaseModel): max_events: int | None = Field(default=None, ge=1) skip_errors: bool = True target_file_path: str | None = None - target_topics: dict[EventType, KafkaTopic] | None = None retry_failed: bool = False retry_attempts: int = Field(default=3, ge=1, le=10) enable_progress_tracking: bool = True diff --git a/backend/app/schemas_pydantic/replay_models.py b/backend/app/schemas_pydantic/replay_models.py index 8f2769b6..bb83711b 100644 --- a/backend/app/schemas_pydantic/replay_models.py +++ b/backend/app/schemas_pydantic/replay_models.py @@ -3,7 +3,7 @@ from pydantic import BaseModel, ConfigDict, Field -from app.domain.enums import EventType, KafkaTopic, ReplayStatus, ReplayTarget, ReplayType +from app.domain.enums import EventType, ReplayStatus, ReplayTarget, ReplayType from app.domain.replay import ReplayError @@ -31,7 +31,6 @@ class 
ReplayConfigSchema(BaseModel): batch_size: int = Field(default=100, ge=1, le=1000) max_events: int | None = Field(default=None, ge=1) - target_topics: dict[EventType, KafkaTopic] | None = None target_file_path: str | None = None skip_errors: bool = True diff --git a/backend/app/services/pod_monitor/config.py b/backend/app/services/pod_monitor/config.py index 04af8524..68b75c36 100644 --- a/backend/app/services/pod_monitor/config.py +++ b/backend/app/services/pod_monitor/config.py @@ -1,8 +1,7 @@ import os from dataclasses import dataclass, field -from app.domain.enums import EventType, KafkaTopic -from app.infrastructure.kafka import get_topic_for_event +from app.domain.enums import EventType from app.services.pod_monitor.event_mapper import PodPhase @@ -10,11 +9,11 @@ class PodMonitorConfig: """Configuration for PodMonitor service""" - # Kafka settings - pod_events_topic: KafkaTopic = get_topic_for_event(EventType.POD_CREATED) - execution_events_topic: KafkaTopic = get_topic_for_event(EventType.EXECUTION_REQUESTED) - execution_completed_topic: KafkaTopic = get_topic_for_event(EventType.EXECUTION_COMPLETED) - execution_failed_topic: KafkaTopic = get_topic_for_event(EventType.EXECUTION_FAILED) + # Kafka settings (topic names derived from EventType) + pod_events_topic: str = str(EventType.POD_CREATED) + execution_events_topic: str = str(EventType.EXECUTION_REQUESTED) + execution_completed_topic: str = str(EventType.EXECUTION_COMPLETED) + execution_failed_topic: str = str(EventType.EXECUTION_FAILED) # Kubernetes settings namespace: str = os.environ.get("K8S_NAMESPACE", "integr8scode") diff --git a/backend/config.dlq-processor.toml b/backend/config.dlq-processor.toml deleted file mode 100644 index 5d8447fc..00000000 --- a/backend/config.dlq-processor.toml +++ /dev/null @@ -1,2 +0,0 @@ -TRACING_SERVICE_NAME = "dlq-processor" -KAFKA_CONSUMER_GROUP_ID = "dlq-processor" diff --git a/backend/scripts/create_topics.py b/backend/scripts/create_topics.py index 2cf81e21..49d90715 100755 --- a/backend/scripts/create_topics.py +++ b/backend/scripts/create_topics.py @@ -1,6 +1,9 @@ #!/usr/bin/env python3 """ Create all required Kafka topics for the Integr8sCode backend. + +Topics are created with Kafka broker defaults (partitions, replication, retention). +Configure these via broker-level settings, not in application code. 
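+
+With aiokafka's NewTopic, num_partitions=-1 and replication_factor=-1 defer to
+broker-side defaults (supported since Kafka 2.4). For example, to mirror the old
+fallback config (3 partitions, replication factor 1, 7-day retention, gzip),
+set in server.properties:
+
+    num.partitions=3
+    default.replication.factor=1
+    log.retention.ms=604800000
+    compression.type=gzip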
""" import asyncio @@ -10,16 +13,15 @@ from aiokafka.admin import AIOKafkaAdminClient, NewTopic from aiokafka.errors import TopicAlreadyExistsError from app.core.logging import setup_logger -from app.infrastructure.kafka.topics import get_all_topics, get_topic_configs +from app.domain.enums import EventType from app.settings import Settings logger = setup_logger(os.environ.get("LOG_LEVEL", "INFO")) async def create_topics(settings: Settings) -> None: - """Create all required Kafka topics using provided settings.""" + """Create all required Kafka topics using broker defaults.""" - # Create admin client admin_client = AIOKafkaAdminClient( bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, client_id="topic-creator", @@ -29,44 +31,19 @@ async def create_topics(settings: Settings) -> None: await admin_client.start() logger.info(f"Connected to Kafka brokers: {settings.KAFKA_BOOTSTRAP_SERVERS}") - # Get existing topics existing_topics: list[str] = await admin_client.list_topics() existing_topics_set = set(existing_topics) - logger.info(f"Existing topics: {existing_topics_set}") - # Get all required topics and their configs - all_topics = get_all_topics() - topic_configs = get_topic_configs() + all_topics = {str(et) for et in EventType} topic_prefix = settings.KAFKA_TOPIC_PREFIX logger.info(f"Total required topics: {len(all_topics)} (prefix: '{topic_prefix}')") - # Create topics topics_to_create: list[NewTopic] = [] for topic in all_topics: - # Apply topic prefix for consistency with consumers/producers topic_name = f"{topic_prefix}{topic}" if topic_name not in existing_topics_set: - # Get config from topic_configs - config = topic_configs.get( - topic, - { - "num_partitions": 3, - "replication_factor": 1, - "config": { - "retention.ms": "604800000", # 7 days - "compression.type": "gzip", - }, - }, - ) - - new_topic = NewTopic( - name=topic_name, - num_partitions=config.get("num_partitions", 3), - replication_factor=config.get("replication_factor", 1), - topic_configs=config.get("config", {}), - ) - topics_to_create.append(new_topic) + topics_to_create.append(NewTopic(name=topic_name, num_partitions=-1, replication_factor=-1)) logger.info(f"Will create topic: {topic_name}") else: logger.info(f"Topic already exists: {topic_name}") @@ -84,11 +61,10 @@ async def create_topics(settings: Settings) -> None: else: logger.info("All topics already exist") - # List final topics final_topics: list[str] = await admin_client.list_topics() logger.info(f"Final topics count: {len(final_topics)}") for topic_name in sorted(final_topics): - if not topic_name.startswith("__"): # Skip internal topics + if not topic_name.startswith("__"): logger.info(f" - {topic_name}") finally: @@ -108,5 +84,4 @@ async def main() -> None: if __name__ == "__main__": - # Run with proper event loop asyncio.run(main()) diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py index 57fbd95e..423ed4fa 100644 --- a/backend/tests/e2e/conftest.py +++ b/backend/tests/e2e/conftest.py @@ -9,7 +9,7 @@ import pytest_asyncio from aiokafka import AIOKafkaConsumer from app.db.docs.saga import SagaDocument -from app.domain.enums import EventType, KafkaTopic, UserRole +from app.domain.enums import EventType, UserRole from app.domain.events import DomainEvent, DomainEventAdapter from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse from app.schemas_pydantic.notification import NotificationListResponse, NotificationResponse @@ -136,13 +136,7 @@ async def wait_for_notification_created(self, execution_id: str, 
timeout: float async def event_waiter(test_settings: Settings) -> AsyncGenerator[EventWaiter, None]: """Session-scoped Kafka event waiter. Starts before any test produces events.""" prefix = test_settings.KAFKA_TOPIC_PREFIX - topics = [ - f"{prefix}{KafkaTopic.EXECUTION_EVENTS}", - f"{prefix}{KafkaTopic.EXECUTION_RESULTS}", - f"{prefix}{KafkaTopic.SAGA_EVENTS}", - f"{prefix}{KafkaTopic.SAGA_COMMANDS}", - f"{prefix}{KafkaTopic.NOTIFICATION_EVENTS}", - ] + topics = [f"{prefix}{et}" for et in EventType] waiter = EventWaiter(test_settings.KAFKA_BOOTSTRAP_SERVERS, topics) await waiter.start() _logger.info("EventWaiter started on %s", topics) diff --git a/backend/tests/e2e/dlq/test_dlq_discard.py b/backend/tests/e2e/dlq/test_dlq_discard.py index e9a89e03..2efa0f89 100644 --- a/backend/tests/e2e/dlq/test_dlq_discard.py +++ b/backend/tests/e2e/dlq/test_dlq_discard.py @@ -6,7 +6,7 @@ from app.db.docs import DLQMessageDocument from app.db.repositories import DLQRepository from app.dlq.models import DLQMessageStatus -from app.domain.enums import KafkaTopic +from app.domain.enums import EventType from dishka import AsyncContainer from tests.conftest import make_execution_requested_event @@ -32,7 +32,7 @@ async def _create_dlq_document( doc = DLQMessageDocument( event=event_dict, - original_topic=KafkaTopic.EXECUTION_EVENTS, + original_topic=str(EventType.EXECUTION_REQUESTED), error="Test error", retry_count=0, failed_at=now, diff --git a/backend/tests/e2e/dlq/test_dlq_manager.py b/backend/tests/e2e/dlq/test_dlq_manager.py index 90f96865..f4988957 100644 --- a/backend/tests/e2e/dlq/test_dlq_manager.py +++ b/backend/tests/e2e/dlq/test_dlq_manager.py @@ -11,7 +11,7 @@ from app.db.repositories import DLQRepository from app.dlq.manager import DLQManager from app.dlq.models import DLQMessage -from app.domain.enums import EventType, KafkaTopic +from app.domain.enums import EventType from app.domain.events import DLQMessageReceivedEvent, DomainEventAdapter from app.settings import Settings from dishka import AsyncContainer @@ -39,7 +39,7 @@ async def test_dlq_manager_persists_and_emits_event(scope: AsyncContainer, test_ received_future: asyncio.Future[DLQMessageReceivedEvent] = asyncio.get_running_loop().create_future() # Create consumer for DLQ events topic - dlq_events_topic = f"{prefix}{KafkaTopic.DLQ_EVENTS}" + dlq_events_topic = f"{prefix}{EventType.DLQ_MESSAGE_RECEIVED}" events_consumer = AIOKafkaConsumer( dlq_events_topic, bootstrap_servers=test_settings.KAFKA_BOOTSTRAP_SERVERS, @@ -86,7 +86,7 @@ async def consume_dlq_events() -> None: # Build a DLQMessage directly and call handle_message (no internal consumer loop) dlq_msg = DLQMessage( event=ev, - original_topic=f"{prefix}{KafkaTopic.EXECUTION_EVENTS}", + original_topic=f"{prefix}{EventType.EXECUTION_REQUESTED}", error="handler failed", retry_count=0, failed_at=datetime.now(timezone.utc), diff --git a/backend/tests/e2e/dlq/test_dlq_retry.py b/backend/tests/e2e/dlq/test_dlq_retry.py index 931b6565..e8e629bf 100644 --- a/backend/tests/e2e/dlq/test_dlq_retry.py +++ b/backend/tests/e2e/dlq/test_dlq_retry.py @@ -6,7 +6,7 @@ from app.db.docs import DLQMessageDocument from app.db.repositories import DLQRepository from app.dlq.models import DLQMessageStatus -from app.domain.enums import KafkaTopic +from app.domain.enums import EventType from dishka import AsyncContainer from tests.conftest import make_execution_requested_event @@ -32,7 +32,7 @@ async def _create_dlq_document( doc = DLQMessageDocument( event=event_dict, - 
original_topic=KafkaTopic.EXECUTION_EVENTS, + original_topic=str(EventType.EXECUTION_REQUESTED), error="Test error", retry_count=0, failed_at=now, diff --git a/backend/tests/e2e/events/test_producer_roundtrip.py b/backend/tests/e2e/events/test_producer_roundtrip.py index 773c7f3a..ee92a116 100644 --- a/backend/tests/e2e/events/test_producer_roundtrip.py +++ b/backend/tests/e2e/events/test_producer_roundtrip.py @@ -3,7 +3,6 @@ import pytest from app.events.core import UnifiedProducer -from app.infrastructure.kafka.mappings import get_topic_for_event from dishka import AsyncContainer from tests.conftest import make_execution_requested_event @@ -14,14 +13,10 @@ @pytest.mark.asyncio -async def test_unified_producer_produce_and_send_to_dlq( +async def test_unified_producer_produce( scope: AsyncContainer, ) -> None: prod: UnifiedProducer = await scope.get(UnifiedProducer) ev = make_execution_requested_event(execution_id=f"exec-{uuid4().hex[:8]}") await prod.produce(ev, key=ev.execution_id) - - # Exercise send_to_dlq path — should not raise - topic = str(get_topic_for_event(ev.event_type)) - await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1) diff --git a/backend/tests/unit/domain/events/test_event_schema_coverage.py b/backend/tests/unit/domain/events/test_event_schema_coverage.py index c1616a7e..84e0c4bf 100644 --- a/backend/tests/unit/domain/events/test_event_schema_coverage.py +++ b/backend/tests/unit/domain/events/test_event_schema_coverage.py @@ -53,13 +53,17 @@ def get_kafka_event_classes() -> dict[EventType, type]: return get_domain_event_classes() +# EventType values that represent topics, not domain events (no event class expected) +TOPIC_ONLY_TYPES: set[EventType] = set() + + class TestEventSchemaCoverage: """Ensure complete correspondence between EventType and event classes.""" def test_all_event_types_have_domain_event_class(self) -> None: - """Every EventType must have a corresponding domain event class.""" + """Every EventType (except topic-only types) must have a corresponding domain event class.""" domain_mapping = get_domain_event_classes() - all_types = set(EventType) + all_types = set(EventType) - TOPIC_ONLY_TYPES covered_types = set(domain_mapping.keys()) missing = all_types - covered_types @@ -69,9 +73,9 @@ def test_all_event_types_have_domain_event_class(self) -> None: ) def test_all_event_types_have_kafka_event_class(self) -> None: - """Every EventType must have a corresponding Kafka event class.""" + """Every EventType (except topic-only types) must have a corresponding Kafka event class.""" kafka_mapping = get_kafka_event_classes() - all_types = set(EventType) + all_types = set(EventType) - TOPIC_ONLY_TYPES covered_types = set(kafka_mapping.keys()) missing = all_types - covered_types @@ -87,7 +91,7 @@ def test_DomainEventAdapter_covers_all_types(self) -> None: """The DomainEventAdapter TypeAdapter must handle all EventTypes.""" errors: list[str] = [] - for et in EventType: + for et in set(EventType) - TOPIC_ONLY_TYPES: try: # Validation will fail due to missing required fields, but that's OK # We just want to confirm the type IS in the union (not "unknown discriminator") @@ -147,7 +151,7 @@ def test_domain_and_kafka_event_names_match(self) -> None: kafka_mapping = get_kafka_event_classes() mismatches: list[str] = [] - for et in EventType: + for et in set(EventType) - TOPIC_ONLY_TYPES: domain_cls = domain_mapping.get(et) kafka_cls = kafka_mapping.get(et) diff --git a/backend/tests/unit/events/test_mappings_and_types.py 
b/backend/tests/unit/events/test_mappings_and_types.py index bc7cfb10..94803d90 100644 --- a/backend/tests/unit/events/test_mappings_and_types.py +++ b/backend/tests/unit/events/test_mappings_and_types.py @@ -1,16 +1,10 @@ -from app.domain.enums import EventType, KafkaTopic -from app.infrastructure.kafka.mappings import ( - get_event_class_for_type, - get_event_types_for_topic, - get_topic_for_event, -) +from app.domain.enums import EventType +from app.infrastructure.kafka.mappings import get_event_class_for_type -def test_event_mappings_topics() -> None: - # A few spot checks - assert get_topic_for_event(EventType.EXECUTION_REQUESTED) == KafkaTopic.EXECUTION_EVENTS +def test_event_class_for_type() -> None: cls = get_event_class_for_type(EventType.CREATE_POD_COMMAND) assert cls is not None - # All event types for a topic include at least one of the checked types - ev_types = get_event_types_for_topic(KafkaTopic.EXECUTION_EVENTS) - assert EventType.EXECUTION_REQUESTED in ev_types + + cls2 = get_event_class_for_type(EventType.EXECUTION_REQUESTED) + assert cls2 is not None diff --git a/backend/tests/unit/services/pod_monitor/test_config_and_init.py b/backend/tests/unit/services/pod_monitor/test_config_and_init.py index 57fd710a..e60c686b 100644 --- a/backend/tests/unit/services/pod_monitor/test_config_and_init.py +++ b/backend/tests/unit/services/pod_monitor/test_config_and_init.py @@ -1,7 +1,6 @@ import importlib import pytest -from app.domain.enums import KafkaTopic from app.services.pod_monitor import PodMonitorConfig pytestmark = pytest.mark.unit @@ -10,8 +9,8 @@ def test_pod_monitor_config_defaults() -> None: cfg = PodMonitorConfig() assert cfg.namespace in {"integr8scode", "default"} - assert isinstance(cfg.pod_events_topic, KafkaTopic) and cfg.pod_events_topic - assert isinstance(cfg.execution_completed_topic, KafkaTopic) + assert isinstance(cfg.pod_events_topic, str) and cfg.pod_events_topic + assert isinstance(cfg.execution_completed_topic, str) assert cfg.ignored_pod_phases == [] diff --git a/backend/workers/run_dlq_processor.py b/backend/workers/run_dlq_processor.py deleted file mode 100644 index 8ae604ed..00000000 --- a/backend/workers/run_dlq_processor.py +++ /dev/null @@ -1,70 +0,0 @@ -import asyncio -from typing import Any - -from app.core.container import create_dlq_processor_container -from app.core.logging import setup_logger -from app.core.tracing import init_tracing -from app.db.docs import ALL_DOCUMENTS -from app.dlq.manager import DLQManager -from app.domain.enums import GroupId -from app.events.handlers import register_dlq_subscriber -from app.settings import Settings -from beanie import init_beanie -from dishka.integrations.faststream import setup_dishka -from faststream import FastStream -from faststream.kafka import KafkaBroker -from pymongo import AsyncMongoClient - - -def main() -> None: - """Main entry point for DLQ processor worker.""" - settings = Settings(override_path="config.dlq-processor.toml") - - logger = setup_logger(settings.LOG_LEVEL) - - logger.info("Starting DLQ Processor worker...") - - if settings.ENABLE_TRACING: - init_tracing( - service_name=GroupId.DLQ_MANAGER, - settings=settings, - logger=logger, - service_version=settings.TRACING_SERVICE_VERSION, - enable_console_exporter=False, - sampling_rate=settings.TRACING_SAMPLING_RATE, - ) - logger.info("Tracing initialized for DLQ Processor") - - async def run() -> None: - # Initialize Beanie with tz_aware client (so MongoDB returns aware datetimes) - client: AsyncMongoClient[dict[str, Any]] = 
AsyncMongoClient(settings.MONGODB_URL, tz_aware=True) - await init_beanie( - database=client.get_default_database(default=settings.DATABASE_NAME), - document_models=ALL_DOCUMENTS, - ) - logger.info("MongoDB initialized via Beanie") - - # Create DI container - container = create_dlq_processor_container(settings) - - # Get broker from DI - broker: KafkaBroker = await container.get(KafkaBroker) - - # Register DLQ subscriber and set up DI integration - register_dlq_subscriber(broker, settings) - setup_dishka(container, broker=broker, auto_inject=True) - - # Resolving DLQManager starts APScheduler retry monitor (via provider) - async def init_dlq() -> None: - await container.get(DLQManager) - logger.info("DLQ Processor initialized") - - app = FastStream(broker, on_startup=[init_dlq], on_shutdown=[container.close]) - await app.run() - logger.info("DLQ Processor shutdown complete") - - asyncio.run(run()) - - -if __name__ == "__main__": - main() From 0b220b5776e13cb9e54dcdaed18a30670c28b776 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 12 Feb 2026 20:37:17 +0100 Subject: [PATCH 2/2] fix: deploy (no dlq processor) --- .github/workflows/stack-tests.yml | 2 +- docker-compose.yaml | 22 ++-------------------- 2 files changed, 3 insertions(+), 21 deletions(-) diff --git a/.github/workflows/stack-tests.yml b/.github/workflows/stack-tests.yml index 7ff6aac4..af6cb22f 100644 --- a/.github/workflows/stack-tests.yml +++ b/.github/workflows/stack-tests.yml @@ -299,7 +299,7 @@ jobs: docker compose logs --timestamps > logs/docker-compose.log 2>&1 for svc in backend mongo redis kafka zookeeper \ coordinator k8s-worker pod-monitor result-processor \ - saga-orchestrator event-replay dlq-processor; do + saga-orchestrator event-replay; do docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true done kubectl get events --sort-by='.metadata.creationTimestamp' -A > logs/k8s-events.log 2>&1 || true diff --git a/docker-compose.yaml b/docker-compose.yaml index 1108b554..e28edf62 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -349,6 +349,8 @@ services: environment: - KAFKA_BOOTSTRAP_SERVERS=kafka:29092 volumes: + - ./backend/app:/app/app:ro + - ./backend/scripts:/app/scripts:ro - ./backend/config.toml:/app/config.toml:ro - ./backend/secrets.toml:/app/secrets.toml:ro command: ["python", "-m", "scripts.create_topics"] @@ -517,26 +519,6 @@ services: - app-network restart: unless-stopped - # DLQ Processor Service - dlq-processor: - image: ghcr.io/hardmax71/integr8scode/backend:${IMAGE_TAG:-latest} - container_name: dlq-processor - command: ["python", "workers/run_dlq_processor.py"] - depends_on: - kafka-init: - condition: service_completed_successfully - mongo: - condition: service_started - volumes: - - ./backend/app:/app/app:ro - - ./backend/workers:/app/workers:ro - - ./backend/config.toml:/app/config.toml:ro - - ./backend/secrets.toml:/app/secrets.toml:ro - - ./backend/config.dlq-processor.toml:/app/config.dlq-processor.toml:ro - networks: - - app-network - restart: unless-stopped - # Monitoring Stack # Victoria Metrics - Time series database victoria-metrics: