25 changes: 6 additions & 19 deletions backend/app/core/container.py
@@ -9,9 +9,7 @@
     CoordinatorProvider,
     CoreServicesProvider,
     DLQProvider,
-    DLQWorkerProvider,
     EventReplayProvider,
-    EventReplayWorkerProvider,
     K8sWorkerProvider,
     KafkaServicesProvider,
     KubernetesProvider,
@@ -24,7 +22,6 @@
     ResourceCleanerProvider,
     ResultProcessorProvider,
     SagaOrchestratorProvider,
-    SagaWorkerProvider,
     SettingsProvider,
     SSEProvider,
     UserServicesProvider,
@@ -140,10 +137,7 @@ def create_pod_monitor_container(settings: Settings) -> AsyncContainer:
 
 
 def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the SagaOrchestrator worker.
-
-    Uses SagaWorkerProvider which adds APScheduler-managed timeout checking.
-    """
+    """Create DI container for the SagaOrchestrator worker."""
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
@@ -154,16 +148,13 @@ def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
         RepositoryProvider(),
         MessagingProvider(),
         DLQProvider(),
-        SagaWorkerProvider(),
         SagaOrchestratorProvider(),
         context={Settings: settings},
     )
 
 
 def create_event_replay_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the EventReplay worker.
-
-    Uses EventReplayWorkerProvider which adds APScheduler-managed session cleanup.
-    """
+    """Create DI container for the EventReplay worker."""
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
@@ -174,17 +165,13 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
         RepositoryProvider(),
         MessagingProvider(),
         DLQProvider(),
-        EventReplayWorkerProvider(),
         EventReplayProvider(),
         context={Settings: settings},
     )
 
 
 def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the DLQ processor worker.
-
-    Uses DLQWorkerProvider which adds APScheduler-managed retry monitoring
-    and configures retry policies and filters.
-    """
+    """Create DI container for the DLQ processor worker."""
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
@@ -194,6 +181,6 @@ def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
         MetricsProvider(),
         RepositoryProvider(),
         MessagingProvider(),
-        DLQWorkerProvider(),
         DLQProvider(),
         context={Settings: settings},
     )
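
Note on the removed providers: per the deleted docstrings, SagaWorkerProvider, EventReplayWorkerProvider, and DLQWorkerProvider each started APScheduler-managed background jobs (timeout checking, session cleanup, retry monitoring) as a side effect of building the DI graph. After this change the containers only wire dependencies, and scheduling is owned by the process entrypoint, mirroring what dishka_lifespan.py below does for the API process. A minimal sketch of what a worker entrypoint could look like under the new layout; run_saga_worker, the import paths, and the 30-second interval are illustrative assumptions, not code from this PR:

import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler

from app.core.container import create_saga_orchestrator_container
from app.domain.sagas import SagaOrchestrator  # assumed import path
from app.settings import Settings              # assumed import path


async def run_saga_worker() -> None:
    settings = Settings()
    container = create_saga_orchestrator_container(settings)
    # Resolving the orchestrator no longer starts any jobs as a side effect.
    orchestrator = await container.get(SagaOrchestrator)

    # The entrypoint, not a DI provider, owns the scheduler lifecycle.
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        orchestrator.check_timeouts,  # hypothetical job method
        trigger="interval",
        seconds=30,  # assumed interval
        max_instances=1,
    )
    scheduler.start()
    try:
        await asyncio.Event().wait()  # block until the task is cancelled
    finally:
        scheduler.shutdown(wait=False)
        await container.close()


if __name__ == "__main__":
    asyncio.run(run_saga_worker())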
25 changes: 19 additions & 6 deletions backend/app/core/dishka_lifespan.py
@@ -4,6 +4,7 @@
 from typing import Any, AsyncGenerator
 
 import structlog
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from beanie import init_beanie
 from dishka import AsyncContainer
 from dishka.integrations.faststream import setup_dishka as setup_dishka_faststream
@@ -32,7 +33,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     init_beanie() is called here BEFORE any providers are resolved, so that
     Beanie document classes are initialized before repositories use them.
 
-    KafkaBroker lifecycle (start/stop) is managed by BrokerProvider.
+    KafkaBroker lifecycle (start/stop) is managed here explicitly.
     Subscriber registration and FastStream integration are set up here.
     """
     settings: Settings = app.state.settings
@@ -76,14 +77,26 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     await broker.start()
     logger.info("Kafka broker started")
 
-    # Resolve NotificationScheduler — starts APScheduler via DI provider graph.
-    await container.get(NotificationScheduler)
-    logger.info("NotificationScheduler started")
+    # Set up APScheduler for NotificationScheduler
+    notification_scheduler: NotificationScheduler = await container.get(NotificationScheduler)
+    notification_apscheduler = AsyncIOScheduler()
+    notification_apscheduler.add_job(
+        notification_scheduler.process_due_notifications,
+        trigger="interval",
+        seconds=15,
+        id="process_due_notifications",
+        max_instances=1,
+        misfire_grace_time=60,
+    )
+    notification_apscheduler.start()
+    logger.info("NotificationScheduler started (APScheduler interval=15s)")
 
     try:
         yield
     finally:
-        # Container close triggers BrokerProvider cleanup (closes broker)
-        # and all other async generators in providers
+        notification_apscheduler.shutdown(wait=False)
+        logger.info("NotificationScheduler stopped")
         await broker.stop()
         logger.info("Kafka broker stopped")
         await container.close()
         logger.info("DI container closed")