diff --git a/backend/app/core/container.py b/backend/app/core/container.py
index 43b737f1..42aef885 100644
--- a/backend/app/core/container.py
+++ b/backend/app/core/container.py
@@ -9,9 +9,7 @@
     CoordinatorProvider,
     CoreServicesProvider,
     DLQProvider,
-    DLQWorkerProvider,
     EventReplayProvider,
-    EventReplayWorkerProvider,
     K8sWorkerProvider,
     KafkaServicesProvider,
     KubernetesProvider,
@@ -24,7 +22,6 @@
     ResourceCleanerProvider,
     ResultProcessorProvider,
     SagaOrchestratorProvider,
-    SagaWorkerProvider,
     SettingsProvider,
     SSEProvider,
     UserServicesProvider,
@@ -140,10 +137,7 @@ def create_pod_monitor_container(settings: Settings) -> AsyncContainer:


 def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the SagaOrchestrator worker.
-
-    Uses SagaWorkerProvider which adds APScheduler-managed timeout checking.
-    """
+    """Create DI container for the SagaOrchestrator worker."""
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
@@ -154,16 +148,13 @@ def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
         RepositoryProvider(),
         MessagingProvider(),
         DLQProvider(),
-        SagaWorkerProvider(),
+        SagaOrchestratorProvider(),
         context={Settings: settings},
     )


 def create_event_replay_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the EventReplay worker.
-
-    Uses EventReplayWorkerProvider which adds APScheduler-managed session cleanup.
-    """
+    """Create DI container for the EventReplay worker."""
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
@@ -174,17 +165,13 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
         RepositoryProvider(),
         MessagingProvider(),
         DLQProvider(),
-        EventReplayWorkerProvider(),
+        EventReplayProvider(),
         context={Settings: settings},
     )


 def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the DLQ processor worker.
-
-    Uses DLQWorkerProvider which adds APScheduler-managed retry monitoring
-    and configures retry policies and filters.
-    """
+    """Create DI container for the DLQ processor worker."""
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
@@ -194,6 +181,6 @@ def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
         MetricsProvider(),
         RepositoryProvider(),
         MessagingProvider(),
-        DLQWorkerProvider(),
+        DLQProvider(),
         context={Settings: settings},
     )
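The factories above all share one dishka recipe: compose providers into an async container, seed it with Settings via context, and let each worker resolve only what it needs. A minimal self-contained sketch of that pattern follows; the ConfigProvider/ServiceProvider names are illustrative, not from this codebase.

# Minimal dishka container sketch; Config/ConfigProvider/ServiceProvider are
# illustrative stand-ins, not names from this codebase.
import asyncio

from dishka import Provider, Scope, make_async_container, provide


class Config:
    kafka_servers = "localhost:9092"  # stands in for Settings


class ConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> Config:
        return Config()


class ServiceProvider(Provider):
    scope = Scope.APP

    @provide
    def get_service(self, config: Config) -> str:
        # A real provider would build a SagaOrchestrator, DLQManager, etc.
        return f"service wired to {config.kafka_servers}"


async def main() -> None:
    container = make_async_container(ConfigProvider(), ServiceProvider())
    print(await container.get(str))  # resolves the whole dependency chain
    await container.close()          # runs any provider finalizers


asyncio.run(main())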
""" settings: Settings = app.state.settings @@ -76,14 +77,26 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: await broker.start() logger.info("Kafka broker started") - # Resolve NotificationScheduler — starts APScheduler via DI provider graph. - await container.get(NotificationScheduler) - logger.info("NotificationScheduler started") + # Set up APScheduler for NotificationScheduler + notification_scheduler: NotificationScheduler = await container.get(NotificationScheduler) + notification_apscheduler = AsyncIOScheduler() + notification_apscheduler.add_job( + notification_scheduler.process_due_notifications, + trigger="interval", + seconds=15, + id="process_due_notifications", + max_instances=1, + misfire_grace_time=60, + ) + notification_apscheduler.start() + logger.info("NotificationScheduler started (APScheduler interval=15s)") try: yield finally: - # Container close triggers BrokerProvider cleanup (closes broker) - # and all other async generators in providers + notification_apscheduler.shutdown(wait=False) + logger.info("NotificationScheduler stopped") + await broker.stop() + logger.info("Kafka broker stopped") await container.close() logger.info("DI container closed") diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index e9fcf64d..abab1a2d 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -4,13 +4,11 @@ import redis.asyncio as redis import structlog -from apscheduler.schedulers.asyncio import AsyncIOScheduler from dishka import Provider, Scope, from_context, provide from faststream.kafka import KafkaBroker from faststream.kafka.opentelemetry import KafkaTelemetryMiddleware from kubernetes_asyncio import client as k8s_client from kubernetes_asyncio import config as k8s_config -from kubernetes_asyncio.client.rest import ApiException from app.core.logging import setup_logger from app.core.metrics import ( @@ -63,7 +61,7 @@ from app.services.login_lockout import LoginLockoutService from app.services.notification_scheduler import NotificationScheduler from app.services.notification_service import NotificationService -from app.services.pod_monitor import ErrorType, PodEventMapper, PodMonitor, PodMonitorConfig +from app.services.pod_monitor import PodEventMapper, PodMonitor, PodMonitorConfig from app.services.rate_limit_service import RateLimitService from app.services.result_processor import ResourceCleaner, ResultProcessor from app.services.runtime_settings import RuntimeSettingsLoader @@ -75,23 +73,23 @@ class BrokerProvider(Provider): - """Provides KafkaBroker instance. + """Provides KafkaBroker instance (creation only, no lifecycle management). The broker is created here but NOT started. This is required because: 1. Subscribers must be registered before broker.start() 2. 
diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py
index e9fcf64d..abab1a2d 100644
--- a/backend/app/core/providers.py
+++ b/backend/app/core/providers.py
@@ -4,13 +4,11 @@

 import redis.asyncio as redis
 import structlog
-from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from dishka import Provider, Scope, from_context, provide
 from faststream.kafka import KafkaBroker
 from faststream.kafka.opentelemetry import KafkaTelemetryMiddleware
 from kubernetes_asyncio import client as k8s_client
 from kubernetes_asyncio import config as k8s_config
-from kubernetes_asyncio.client.rest import ApiException

 from app.core.logging import setup_logger
 from app.core.metrics import (
@@ -63,7 +61,7 @@
 from app.services.login_lockout import LoginLockoutService
 from app.services.notification_scheduler import NotificationScheduler
 from app.services.notification_service import NotificationService
-from app.services.pod_monitor import ErrorType, PodEventMapper, PodMonitor, PodMonitorConfig
+from app.services.pod_monitor import PodEventMapper, PodMonitor, PodMonitorConfig
 from app.services.rate_limit_service import RateLimitService
 from app.services.result_processor import ResourceCleaner, ResultProcessor
 from app.services.runtime_settings import RuntimeSettingsLoader
@@ -75,23 +73,23 @@
 class BrokerProvider(Provider):
-    """Provides KafkaBroker instance.
+    """Provides KafkaBroker instance (creation only, no lifecycle management).

     The broker is created here but NOT started. This is required because:
     1. Subscribers must be registered before broker.start()
     2. setup_dishka() must be called before broker.start()

     Lifecycle is managed externally:
-    - Workers with FastStream: FastStream(broker).run() handles start/stop
-    - Main app / event_replay: Manual broker.start(), provider handles stop
+    - Workers: FastStream(broker).run() handles start/stop
+    - Main app: dishka_lifespan handles broker.start() and broker.stop()
     """

     scope = Scope.APP

     @provide
-    async def get_broker(
+    def get_broker(
         self,
         settings: Settings,
         logger: structlog.stdlib.BoundLogger,
         _tracer: Tracer,
-    ) -> AsyncIterator[KafkaBroker]:
+    ) -> KafkaBroker:
         broker = KafkaBroker(
             settings.KAFKA_BOOTSTRAP_SERVERS,
             logger=logger,
@@ -100,11 +98,7 @@ def get_broker(
             middlewares=(KafkaTelemetryMiddleware(),),
         )
         logger.info("Kafka broker created")
-        try:
-            yield broker
-        finally:
-            await broker.stop()
-            logger.info("Kafka broker stopped")
+        return broker


 class SettingsProvider(Provider):
@@ -293,52 +287,6 @@ def get_dlq_manager(
     )


-class DLQWorkerProvider(Provider):
-    """Provides DLQManager with APScheduler-managed retry monitoring.
-
-    Used by the DLQ worker container only.
-    """
-
-    scope = Scope.APP
-
-    @provide
-    async def get_dlq_manager(
-        self,
-        broker: KafkaBroker,
-        settings: Settings,
-        logger: structlog.stdlib.BoundLogger,
-        dlq_metrics: DLQMetrics,
-        repository: DLQRepository,
-    ) -> AsyncIterator[DLQManager]:
-        manager = DLQManager(
-            settings=settings,
-            broker=broker,
-            logger=logger,
-            dlq_metrics=dlq_metrics,
-            repository=repository,
-            default_retry_policy=_default_retry_policy(),
-            retry_policies=_default_retry_policies(settings.KAFKA_TOPIC_PREFIX),
-        )
-
-        scheduler = AsyncIOScheduler()
-        scheduler.add_job(
-            manager.process_monitoring_cycle,
-            trigger="interval",
-            seconds=10,
-            id="dlq_monitor_retries",
-            max_instances=1,
-            misfire_grace_time=60,
-        )
-        scheduler.start()
-        logger.info("DLQManager retry monitor started (APScheduler interval=10s)")
-
-        try:
-            yield manager
-        finally:
-            scheduler.shutdown(wait=False)
-            logger.info("DLQManager retry monitor stopped")
-
-
 class KubernetesProvider(Provider):
     scope = Scope.APP

@@ -624,37 +572,18 @@ def get_notification_service(
         )

     @provide
-    async def get_notification_scheduler(
+    def get_notification_scheduler(
         self,
         notification_repository: NotificationRepository,
         notification_service: NotificationService,
         logger: structlog.stdlib.BoundLogger,
-    ) -> AsyncIterator[NotificationScheduler]:
-
-        scheduler_service = NotificationScheduler(
+    ) -> NotificationScheduler:
+        return NotificationScheduler(
             notification_repository=notification_repository,
             notification_service=notification_service,
             logger=logger,
         )

-        apscheduler = AsyncIOScheduler()
-        apscheduler.add_job(
-            scheduler_service.process_due_notifications,
-            trigger="interval",
-            seconds=15,
-            id="process_due_notifications",
-            max_instances=1,
-            misfire_grace_time=60,
-        )
-        apscheduler.start()
-        logger.info("NotificationScheduler started (APScheduler interval=15s)")
-
-        try:
-            yield scheduler_service
-        finally:
-            apscheduler.shutdown(wait=False)
-            logger.info("NotificationScheduler stopped")
-

 def _create_default_saga_config() -> SagaConfig:
     """Factory for default SagaConfig used by orchestrators.
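The pattern repeated throughout this file is the conversion of async-generator providers (which dishka finalizes on container.close()) into plain factories, now that schedulers live in the entry points. A side-by-side sketch of the two provider shapes, with a toy Client type standing in for the real services:

# Contrast of the two provider shapes; Client is an illustrative stand-in.
from typing import AsyncIterator

from dishka import Provider, Scope, provide


class Client:
    async def aclose(self) -> None: ...


class ManagedProvider(Provider):
    """Old shape: async generator; container.close() runs the finally block."""

    scope = Scope.APP

    @provide
    async def get_client(self) -> AsyncIterator[Client]:
        client = Client()
        try:
            yield client
        finally:
            await client.aclose()


class PlainProvider(Provider):
    """New shape: plain factory; any lifecycle belongs to the entry point."""

    scope = Scope.APP

    @provide
    def get_client(self) -> Client:
        return Client()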
@@ -798,7 +727,7 @@ def get_pod_monitor_config(self, settings: Settings) -> PodMonitorConfig:
         )

     @provide
-    async def get_pod_monitor(
+    def get_pod_monitor(
         self,
         config: PodMonitorConfig,
         kafka_event_service: KafkaEventService,
@@ -806,9 +735,8 @@
         logger: structlog.stdlib.BoundLogger,
         event_mapper: PodEventMapper,
         kubernetes_metrics: KubernetesMetrics,
-    ) -> AsyncIterator[PodMonitor]:
-
-        monitor = PodMonitor(
+    ) -> PodMonitor:
+        return PodMonitor(
             config=config,
             kafka_event_service=kafka_event_service,
             logger=logger,
@@ -817,40 +745,6 @@
             event_mapper=event_mapper,
             kubernetes_metrics=kubernetes_metrics,
         )
-
-        async def _watch_cycle() -> None:
-            try:
-                await monitor.watch_pod_events()
-            except ApiException as e:
-                if e.status == 410:
-                    logger.warning("Resource version expired, resetting watch cursor")
-                    monitor._last_resource_version = None
-                    kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.RESOURCE_VERSION_EXPIRED)
-                else:
-                    logger.error(f"API error in watch: {e}")
-                    kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.API_ERROR)
-                kubernetes_metrics.increment_pod_monitor_watch_reconnects()
-            except Exception as e:
-                logger.error(f"Unexpected error in watch: {e}", exc_info=True)
-                kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.UNEXPECTED)
-                kubernetes_metrics.increment_pod_monitor_watch_reconnects()
-
-        scheduler = AsyncIOScheduler()
-        scheduler.add_job(
-            _watch_cycle,
-            trigger="interval",
-            seconds=5,
-            id="pod_monitor_watch",
-            max_instances=1,
-            misfire_grace_time=60,
-        )
-        scheduler.start()
-        logger.info("PodMonitor scheduler started (list-then-watch)")
-
-        try:
-            yield monitor
-        finally:
-            scheduler.shutdown(wait=False)


 class SagaOrchestratorProvider(Provider):
     scope = Scope.APP

@@ -872,51 +766,6 @@ def get_saga_orchestrator(
         )


-class SagaWorkerProvider(Provider):
-    """Provides SagaOrchestrator with APScheduler-managed timeout checking.
-
-    Used by the saga worker container only. The main app container uses
-    SagaOrchestratorProvider (no scheduler needed).
-    """
-
-    scope = Scope.APP
-
-    @provide
-    async def get_saga_orchestrator(
-        self,
-        saga_repository: SagaRepository,
-        kafka_producer: UnifiedProducer,
-        resource_allocation_repository: ResourceAllocationRepository,
-        logger: structlog.stdlib.BoundLogger,
-    ) -> AsyncIterator[SagaOrchestrator]:
-
-        orchestrator = SagaOrchestrator(
-            config=_create_default_saga_config(),
-            saga_repository=saga_repository,
-            producer=kafka_producer,
-            resource_allocation_repository=resource_allocation_repository,
-            logger=logger,
-        )
-
-        scheduler = AsyncIOScheduler()
-        scheduler.add_job(
-            orchestrator.check_timeouts,
-            trigger="interval",
-            seconds=30,
-            id="saga_check_timeouts",
-            max_instances=1,
-            misfire_grace_time=60,
-        )
-        scheduler.start()
-        logger.info("SagaOrchestrator timeout scheduler started (APScheduler interval=30s)")
-
-        try:
-            yield orchestrator
-        finally:
-            scheduler.shutdown(wait=False)
-            logger.info("SagaOrchestrator timeout scheduler stopped")
-
-
 class ResultProcessorProvider(Provider):
     scope = Scope.APP

@@ -957,46 +806,3 @@ def get_event_replay_service(
         )


-class EventReplayWorkerProvider(Provider):
-    """Provides EventReplayService with APScheduler-managed session cleanup.
-
-    Used by the event replay worker container only. The main app container
-    uses EventReplayProvider (no scheduled cleanup needed).
-    """
-
-    scope = Scope.APP
-
-    @provide
-    async def get_event_replay_service(
-        self,
-        replay_repository: ReplayRepository,
-        kafka_producer: UnifiedProducer,
-        replay_metrics: ReplayMetrics,
-        logger: structlog.stdlib.BoundLogger,
-    ) -> AsyncIterator[EventReplayService]:
-
-        service = EventReplayService(
-            repository=replay_repository,
-            producer=kafka_producer,
-            replay_metrics=replay_metrics,
-            logger=logger,
-        )
-
-        scheduler = AsyncIOScheduler()
-        scheduler.add_job(
-            service.cleanup_old_sessions,
-            trigger="interval",
-            hours=6,
-            kwargs={"older_than_hours": 48},
-            id="replay_cleanup_old_sessions",
-            max_instances=1,
-            misfire_grace_time=300,
-        )
-        scheduler.start()
-        logger.info("EventReplayService cleanup scheduler started (APScheduler interval=6h)")
-
-        try:
-            yield service
-        finally:
-            scheduler.shutdown(wait=False)
-            logger.info("EventReplayService cleanup scheduler stopped")
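Every worker below follows the same FastStream skeleton: build the broker, register startup/shutdown hooks, and let app.run() block until a signal arrives, firing on_shutdown afterwards. A stripped-down sketch of that skeleton, assuming a reachable Kafka broker; the hook bodies are placeholders:

# Bare worker skeleton, assuming a Kafka broker at localhost:9092.
import asyncio

from faststream import FastStream
from faststream.kafka import KafkaBroker


async def main() -> None:
    broker = KafkaBroker("localhost:9092")

    async def on_start() -> None:
        print("resolve services and start schedulers here")

    async def on_stop() -> None:
        print("stop schedulers and close the DI container here")

    app = FastStream(broker, on_startup=[on_start], on_shutdown=[on_stop])
    await app.run()  # blocks until SIGINT/SIGTERM, then runs on_shutdown


asyncio.run(main())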
- """ - - scope = Scope.APP - - @provide - async def get_event_replay_service( - self, - replay_repository: ReplayRepository, - kafka_producer: UnifiedProducer, - replay_metrics: ReplayMetrics, - logger: structlog.stdlib.BoundLogger, - ) -> AsyncIterator[EventReplayService]: - - service = EventReplayService( - repository=replay_repository, - producer=kafka_producer, - replay_metrics=replay_metrics, - logger=logger, - ) - - scheduler = AsyncIOScheduler() - scheduler.add_job( - service.cleanup_old_sessions, - trigger="interval", - hours=6, - kwargs={"older_than_hours": 48}, - id="replay_cleanup_old_sessions", - max_instances=1, - misfire_grace_time=300, - ) - scheduler.start() - logger.info("EventReplayService cleanup scheduler started (APScheduler interval=6h)") - - try: - yield service - finally: - scheduler.shutdown(wait=False) - logger.info("EventReplayService cleanup scheduler stopped") diff --git a/backend/workers/run_dlq_processor.py b/backend/workers/run_dlq_processor.py index 609b884a..333f3d1b 100644 --- a/backend/workers/run_dlq_processor.py +++ b/backend/workers/run_dlq_processor.py @@ -7,6 +7,7 @@ from app.dlq.manager import DLQManager from app.events.handlers import register_dlq_subscriber from app.settings import Settings +from apscheduler.schedulers.asyncio import AsyncIOScheduler from beanie import init_beanie from dishka.integrations.faststream import setup_dishka from faststream import FastStream @@ -41,12 +42,26 @@ async def run() -> None: register_dlq_subscriber(broker, settings) setup_dishka(container, broker=broker, auto_inject=True) - # Resolving DLQManager starts APScheduler retry monitor (via provider) + scheduler = AsyncIOScheduler() + async def init_dlq() -> None: - await container.get(DLQManager) - logger.info("DLQ Processor initialized") + manager = await container.get(DLQManager) + scheduler.add_job( + manager.process_monitoring_cycle, + trigger="interval", + seconds=10, + id="dlq_monitor_retries", + max_instances=1, + misfire_grace_time=60, + ) + scheduler.start() + logger.info("DLQ Processor initialized (APScheduler interval=10s)") + + async def shutdown() -> None: + scheduler.shutdown(wait=False) + await container.close() - app = FastStream(broker, on_startup=[init_dlq], on_shutdown=[container.close]) + app = FastStream(broker, on_startup=[init_dlq], on_shutdown=[shutdown]) await app.run() logger.info("DLQ Processor shutdown complete") diff --git a/backend/workers/run_event_replay.py b/backend/workers/run_event_replay.py index 4c474a0f..97d02139 100644 --- a/backend/workers/run_event_replay.py +++ b/backend/workers/run_event_replay.py @@ -1,5 +1,4 @@ import asyncio -import signal from typing import Any from app.core.container import create_event_replay_container @@ -7,51 +6,14 @@ from app.db.docs import ALL_DOCUMENTS from app.services.event_replay import EventReplayService from app.settings import Settings +from apscheduler.schedulers.asyncio import AsyncIOScheduler from beanie import init_beanie from dishka.integrations.faststream import setup_dishka +from faststream import FastStream from faststream.kafka import KafkaBroker from pymongo import AsyncMongoClient -async def run_replay_service(settings: Settings) -> None: - """Run the event replay service with DI-managed cleanup scheduler.""" - logger = setup_logger(settings.LOG_LEVEL) - - # Initialize Beanie with tz_aware client (so MongoDB returns aware datetimes) - client: AsyncMongoClient[dict[str, Any]] = AsyncMongoClient(settings.MONGODB_URL, tz_aware=True) - await init_beanie( - 
diff --git a/backend/workers/run_event_replay.py b/backend/workers/run_event_replay.py
index 4c474a0f..97d02139 100644
--- a/backend/workers/run_event_replay.py
+++ b/backend/workers/run_event_replay.py
@@ -1,5 +1,4 @@
 import asyncio
-import signal
 from typing import Any

 from app.core.container import create_event_replay_container
@@ -7,51 +6,14 @@
 from app.db.docs import ALL_DOCUMENTS
 from app.services.event_replay import EventReplayService
 from app.settings import Settings
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from beanie import init_beanie
 from dishka.integrations.faststream import setup_dishka
+from faststream import FastStream
 from faststream.kafka import KafkaBroker
 from pymongo import AsyncMongoClient


-async def run_replay_service(settings: Settings) -> None:
-    """Run the event replay service with DI-managed cleanup scheduler."""
-    logger = setup_logger(settings.LOG_LEVEL)
-
-    # Initialize Beanie with tz_aware client (so MongoDB returns aware datetimes)
-    client: AsyncMongoClient[dict[str, Any]] = AsyncMongoClient(settings.MONGODB_URL, tz_aware=True)
-    await init_beanie(
-        database=client.get_default_database(default=settings.DATABASE_NAME),
-        document_models=ALL_DOCUMENTS,
-    )
-    logger.info("MongoDB initialized via Beanie")
-
-    # Create DI container
-    container = create_event_replay_container(settings)
-    logger.info("Starting EventReplayService with DI container...")
-
-    # Get broker, setup DI, start (no subscribers - this worker only publishes)
-    broker: KafkaBroker = await container.get(KafkaBroker)
-    setup_dishka(container, broker=broker, auto_inject=True)
-    await broker.start()
-    logger.info("Kafka broker started")
-
-    # Resolving EventReplayService starts the APScheduler cleanup scheduler
-    # (via EventReplayWorkerProvider).
-    await container.get(EventReplayService)
-    logger.info("Event replay service initialized")
-
-    shutdown_event = asyncio.Event()
-    loop = asyncio.get_running_loop()
-    for sig in (signal.SIGINT, signal.SIGTERM):
-        loop.add_signal_handler(sig, shutdown_event.set)
-
-    try:
-        await shutdown_event.wait()
-    finally:
-        logger.info("Initiating graceful shutdown...")
-        await container.close()
-
-
 def main() -> None:
     """Main entry point for event replay service"""
     settings = Settings(override_path="config.event-replay.toml")
@@ -60,7 +22,44 @@ def main() -> None:

     logger.info("Starting Event Replay Service...")

-    asyncio.run(run_replay_service(settings))
+    async def run() -> None:
+        # Initialize Beanie with tz_aware client (so MongoDB returns aware datetimes)
+        client: AsyncMongoClient[dict[str, Any]] = AsyncMongoClient(settings.MONGODB_URL, tz_aware=True)
+        await init_beanie(
+            database=client.get_default_database(default=settings.DATABASE_NAME),
+            document_models=ALL_DOCUMENTS,
+        )
+        logger.info("MongoDB initialized via Beanie")
+
+        container = create_event_replay_container(settings)
+        broker: KafkaBroker = await container.get(KafkaBroker)
+        setup_dishka(container, broker=broker, auto_inject=True)
+
+        scheduler = AsyncIOScheduler()
+
+        async def init_replay() -> None:
+            service = await container.get(EventReplayService)
+            scheduler.add_job(
+                service.cleanup_old_sessions,
+                trigger="interval",
+                hours=6,
+                kwargs={"older_than_hours": 48},
+                id="replay_cleanup_old_sessions",
+                max_instances=1,
+                misfire_grace_time=300,
+            )
+            scheduler.start()
+            logger.info("Event replay service initialized (APScheduler interval=6h)")
+
+        async def shutdown() -> None:
+            scheduler.shutdown(wait=False)
+            await container.close()
+
+        app = FastStream(broker, on_startup=[init_replay], on_shutdown=[shutdown])
+        await app.run()
+        logger.info("EventReplayService shutdown complete")
+
+    asyncio.run(run())


 if __name__ == "__main__":
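Each worker repeats the same Beanie bootstrap before touching any repository: a tz_aware Mongo client, so stored datetimes come back timezone-aware, then init_beanie with the document models. A sketch of that bootstrap, with a hypothetical ReplaySession document standing in for ALL_DOCUMENTS:

import asyncio
from typing import Any

from beanie import Document, init_beanie
from pymongo import AsyncMongoClient


class ReplaySession(Document):  # hypothetical document model
    status: str


async def main() -> None:
    # tz_aware=True makes MongoDB hand back timezone-aware datetimes
    client: AsyncMongoClient[dict[str, Any]] = AsyncMongoClient(
        "mongodb://localhost:27017/app", tz_aware=True
    )
    await init_beanie(
        database=client.get_default_database(),
        document_models=[ReplaySession],
    )


asyncio.run(main())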
diff --git a/backend/workers/run_pod_monitor.py b/backend/workers/run_pod_monitor.py
index 7ba1235b..7b439aa0 100644
--- a/backend/workers/run_pod_monitor.py
+++ b/backend/workers/run_pod_monitor.py
@@ -3,13 +3,16 @@
 from app.core.container import create_pod_monitor_container
 from app.core.logging import setup_logger
+from app.core.metrics import KubernetesMetrics
 from app.db.docs import ALL_DOCUMENTS
-from app.services.pod_monitor import PodMonitor
+from app.services.pod_monitor import ErrorType, PodMonitor
 from app.settings import Settings
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from beanie import init_beanie
 from dishka.integrations.faststream import setup_dishka
 from faststream import FastStream
 from faststream.kafka import KafkaBroker
+from kubernetes_asyncio.client.rest import ApiException
 from pymongo import AsyncMongoClient


@@ -39,12 +42,45 @@ async def run() -> None:
     # Set up DI integration (no subscribers for pod monitor - it only publishes)
     setup_dishka(container, broker=broker, auto_inject=True)

-    # Resolving PodMonitor starts K8s watch loop and reconciliation scheduler (via provider)
+    scheduler = AsyncIOScheduler()
+
     async def init_monitor() -> None:
-        await container.get(PodMonitor)
-        logger.info("PodMonitor initialized")
+        monitor = await container.get(PodMonitor)
+        kubernetes_metrics = await container.get(KubernetesMetrics)
+
+        async def _watch_cycle() -> None:
+            try:
+                await monitor.watch_pod_events()
+            except ApiException as e:
+                if e.status == 410:
+                    logger.warning("Resource version expired, resetting watch cursor")
+                    monitor._last_resource_version = None
+                    kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.RESOURCE_VERSION_EXPIRED)
+                else:
+                    logger.error(f"API error in watch: {e}")
+                    kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.API_ERROR)
+                kubernetes_metrics.increment_pod_monitor_watch_reconnects()
+            except Exception as e:
+                logger.error(f"Unexpected error in watch: {e}", exc_info=True)
+                kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.UNEXPECTED)
+                kubernetes_metrics.increment_pod_monitor_watch_reconnects()
+
+        scheduler.add_job(
+            _watch_cycle,
+            trigger="interval",
+            seconds=5,
+            id="pod_monitor_watch",
+            max_instances=1,
+            misfire_grace_time=60,
+        )
+        scheduler.start()
+        logger.info("PodMonitor initialized (APScheduler interval=5s)")
+
+    async def shutdown() -> None:
+        scheduler.shutdown(wait=False)
+        await container.close()

-    app = FastStream(broker, on_startup=[init_monitor], on_shutdown=[container.close])
+    app = FastStream(broker, on_startup=[init_monitor], on_shutdown=[shutdown])
     await app.run()
     logger.info("PodMonitor shutdown complete")
diff --git a/backend/workers/run_saga_orchestrator.py b/backend/workers/run_saga_orchestrator.py
index d2f999f8..f8f61821 100644
--- a/backend/workers/run_saga_orchestrator.py
+++ b/backend/workers/run_saga_orchestrator.py
@@ -7,6 +7,7 @@
 from app.events.handlers import register_saga_subscriber
 from app.services.saga import SagaOrchestrator
 from app.settings import Settings
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from beanie import init_beanie
 from dishka.integrations.faststream import setup_dishka
 from faststream import FastStream
@@ -41,12 +42,26 @@ async def run() -> None:
     register_saga_subscriber(broker, settings)
     setup_dishka(container, broker=broker, auto_inject=True)

-    # Resolving SagaOrchestrator starts APScheduler timeout checker (via provider)
+    scheduler = AsyncIOScheduler()
+
     async def init_saga() -> None:
-        await container.get(SagaOrchestrator)
-        logger.info("SagaOrchestrator initialized")
+        orchestrator = await container.get(SagaOrchestrator)
+        scheduler.add_job(
+            orchestrator.check_timeouts,
+            trigger="interval",
+            seconds=30,
+            id="saga_check_timeouts",
+            max_instances=1,
+            misfire_grace_time=60,
+        )
+        scheduler.start()
+        logger.info("SagaOrchestrator initialized (APScheduler interval=30s)")
+
+    async def shutdown() -> None:
+        scheduler.shutdown(wait=False)
+        await container.close()

-    app = FastStream(broker, on_startup=[init_saga], on_shutdown=[container.close])
+    app = FastStream(broker, on_startup=[init_saga], on_shutdown=[shutdown])
     await app.run()
     logger.info("SagaOrchestrator shutdown complete")
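For context on the 410 handling that moved into run_pod_monitor.py: a Kubernetes watch resumes from a resourceVersion, and once that version is compacted out of etcd the API server answers 410 Gone; the only recovery is to clear the cursor and fall back to a fresh list-then-watch. A reduced sketch of that cursor-reset logic; WatchCursor is illustrative, not the project's PodMonitor:

from typing import Awaitable, Callable

from kubernetes_asyncio.client.rest import ApiException

WatchFn = Callable[[str | None], Awaitable[str | None]]


class WatchCursor:
    """Illustrative cursor-reset wrapper; not the project's PodMonitor."""

    def __init__(self) -> None:
        self.resource_version: str | None = None

    async def run_once(self, watch_fn: WatchFn) -> None:
        try:
            # watch_fn resumes from the stored version and returns the new one
            self.resource_version = await watch_fn(self.resource_version)
        except ApiException as e:
            if e.status == 410:
                # Version compacted out of etcd: force a fresh list+watch
                self.resource_version = None
            else:
                raise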