
feat: moved lifecycle handling away from providers to workers, update… #176

Merged
HardMax71 merged 1 commit into main from feat/providers on Feb 13, 2026
Conversation

HardMax71 (Owner) commented on Feb 13, 2026

Summary by cubic

Moved scheduler and Kafka lifecycle management out of DI providers and into workers and the app lifespan. This simplifies providers and makes startup/shutdown explicit and predictable.

  • Refactors
    • Providers now return plain instances; no APScheduler or broker start/stop in providers.
    • Removed DLQWorkerProvider, SagaWorkerProvider, and EventReplayWorkerProvider; containers use DLQProvider, SagaOrchestratorProvider, and EventReplayProvider.
    • Workers create and manage AsyncIOScheduler jobs for DLQ retries, event replay cleanup, pod monitoring, and saga timeouts; shutdown stops schedulers and closes the container.
    • FastAPI lifespan starts/stops the Kafka broker and sets up/stops NotificationScheduler via APScheduler, as sketched below.
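
For orientation, here is a minimal sketch of that lifespan pattern. The NotificationScheduler stub, its process_due method, and the broker address are illustrative assumptions, not the PR's actual code:

from contextlib import asynccontextmanager

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI
from faststream.kafka import KafkaBroker


class NotificationScheduler:  # stand-in for the real service from the DI container
    async def process_due(self) -> None:
        ...  # deliver due notifications; the real logic lives in the service


broker = KafkaBroker("localhost:9092")  # assumed bootstrap address


@asynccontextmanager
async def lifespan(app: FastAPI):
    notifications = NotificationScheduler()
    await broker.start()  # broker lifecycle now lives in the lifespan, not in a provider
    scheduler = AsyncIOScheduler()
    scheduler.add_job(notifications.process_due, "interval",
                      seconds=15, max_instances=1)  # 15-second interval, per this summary
    scheduler.start()
    try:
        yield
    finally:
        scheduler.shutdown(wait=False)  # stop scheduled jobs first
        await broker.stop()  # then the broker


app = FastAPI(lifespan=lifespan)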

Written for commit 8c4db86. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor

    • Restructured background job lifecycle management to use explicit interval-based scheduling for core worker processes.
    • Enhanced error handling and recovery in monitoring and event processing workflows.
    • Improved startup/shutdown coordination for scheduled tasks across the system.
  • Chores

    • Simplified internal scheduler configuration and dependency injection patterns.

coderabbitai bot commented on Feb 13, 2026

📝 Walkthrough

This PR shifts lifecycle management from DI providers to explicit application-level APScheduler orchestration, replacing worker-specific providers with higher-level orchestrator providers and converting async lifecycle-managed provider methods to synchronous instance creation methods.

Changes

• DI Container Provider Wiring — backend/app/core/container.py
  Replaced worker-specific provider imports (SagaWorkerProvider, EventReplayWorkerProvider, DLQWorkerProvider) with higher-level orchestrator providers (SagaOrchestratorProvider, EventReplayProvider, DLQProvider) in the container creation functions.
• Provider Method Signature Changes — backend/app/core/providers.py
  Converted async lifecycle-managed provider methods to synchronous instance creation: BrokerProvider.get_broker, KafkaServicesProvider.get_notification_scheduler, PodMonitorProvider.get_pod_monitor. Removed the SagaWorkerProvider, DLQWorkerProvider, and EventReplayWorkerProvider classes and their internal APScheduler logic.
• Application Lifespan Orchestration — backend/app/core/dishka_lifespan.py
  Introduced an explicit AsyncIOScheduler for NotificationScheduler lifecycle management (15-second interval). Moved scheduler startup/shutdown into lifespan hooks with explicit container lifecycle control.
• Worker-Level Explicit Scheduling — backend/workers/run_dlq_processor.py, backend/workers/run_saga_orchestrator.py
  Added explicit APScheduler integration with interval-based jobs (DLQ: process_monitoring_cycle every 10s; saga: check_timeouts every 30s). Implemented FastStream startup/shutdown hooks to manage the scheduler lifecycle.
• Worker Event Processing & Replay — backend/workers/run_event_replay.py, backend/workers/run_pod_monitor.py
  Replaced the provider-driven lifecycle with FastStream app orchestration and explicit APScheduler. Event replay schedules cleanup every 6 hours; the pod monitor schedules watch_cycle every 5 seconds with ApiException error handling.
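
The worker-level rows above translate to roughly the following shape (DLQ processor shown). The DLQManager stub and broker address are illustrative assumptions; only the 10-second process_monitoring_cycle job mirrors the table:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from faststream import FastStream
from faststream.kafka import KafkaBroker


class DLQManager:  # stand-in for the service resolved via DLQProvider
    async def process_monitoring_cycle(self) -> None:
        ...  # retry dead-lettered messages; the real logic lives in the service


broker = KafkaBroker("localhost:9092")  # assumed bootstrap address
app = FastStream(broker)
scheduler = AsyncIOScheduler()


@app.on_startup
async def startup() -> None:
    dlq = DLQManager()  # in the PR this comes from the DI container
    scheduler.add_job(dlq.process_monitoring_cycle, "interval",
                      seconds=10, max_instances=1)  # DLQ: 10s, per the table
    scheduler.start()


@app.on_shutdown
async def shutdown() -> None:
    scheduler.shutdown(wait=False)  # the PR also closes the DI container here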

Sequence Diagram(s)

sequenceDiagram
    participant App as FastStream App
    participant Scheduler as APScheduler
    participant DI as DI Container
    participant Service as Worker Service
    
    rect rgba(100, 150, 200, 0.5)
    Note over App,Service: Startup Flow (New Pattern)
    App->>DI: Resolve service instance
    DI-->>App: Service created (no internal scheduling)
    App->>Scheduler: Create AsyncIOScheduler
    App->>Scheduler: Register periodic job
    App->>Scheduler: Start scheduler
    Scheduler-->>App: Running with intervals
    end
    
    rect rgba(200, 100, 100, 0.5)
    Note over Scheduler,Service: Runtime (Explicit Scheduling)
    loop Every N seconds
        Scheduler->>Service: Call task (check_timeouts, process_monitoring, etc.)
        Service-->>Scheduler: Task complete
    end
    end
    
    rect rgba(100, 200, 100, 0.5)
    Note over App,Scheduler: Shutdown Flow
    App->>Scheduler: Stop scheduler
    Scheduler-->>App: Stopped
    App->>DI: Close container
    DI-->>App: Closed
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 Once lived scheduling in providers deep,
Now APSchedulers their vigil keep,
Async yields gave way to returns so clean,
Lifecycle bound to where we've been,
Simpler code, the workers dance,
✨ Explicit flows, a second chance!

🚥 Pre-merge checks: ✅ 3 passed | ❌ 1 failed

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 30.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (3 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: the PR title accurately describes the main objective, moving lifecycle handling from providers to workers, which is reflected in all modified files.
  • Merge Conflict Detection — ✅ Passed: no merge conflicts detected when merging into main.


cubic-dev-ai bot left a comment

No issues found across 7 files.

coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
backend/app/core/container.py (1)

173-186: ⚠️ Potential issue | 🟡 Minor

Stale docstring in DLQProvider — now used by the DLQ worker container too.

create_dlq_processor_container now uses DLQProvider() (line 184), but DLQProvider's docstring in providers.py (line 266) says "Used by all containers except the DLQ worker." This is no longer accurate since the worker-specific provider was removed.

backend/app/core/providers.py (1)

265-266: ⚠️ Potential issue | 🟡 Minor

Stale docstring: DLQProvider is now used by the DLQ worker too.

The docstring says "Used by all containers except the DLQ worker" but create_dlq_processor_container now uses DLQProvider() directly.

Proposed fix
 class DLQProvider(Provider):
-    """Provides DLQManager without scheduling. Used by all containers except the DLQ worker."""
+    """Provides DLQManager without scheduling. Used by all containers."""
backend/app/core/dishka_lifespan.py (1)

46-48: ⚠️ Potential issue | 🟠 Major

AsyncMongoClient is never closed on shutdown — resource leak.

The client created on line 46 is not closed in the finally block (lines 94-102). When the application shuts down, the MongoDB connection pool leaks. This pattern affects every worker file in this PR: run_dlq_processor.py, run_pod_monitor.py, run_saga_orchestrator.py, run_event_replay.py, run_result_processor.py, run_coordinator.py, and run_k8s_worker.py.

Proposed fix for dishka_lifespan.py
     try:
         yield
     finally:
         notification_apscheduler.shutdown(wait=False)
         logger.info("NotificationScheduler stopped")
         await broker.stop()
         logger.info("Kafka broker stopped")
         await container.close()
         logger.info("DI container closed")
+        await client.close()
+        logger.info("MongoDB client closed")

Apply the same pattern to all worker files' shutdown callbacks.

🧹 Nitpick comments (2)
backend/workers/run_pod_monitor.py (2)

51-66: Accessing private _last_resource_version breaks encapsulation.

Line 57 directly mutates monitor._last_resource_version = None. This couples the worker to PodMonitor's internal state management. Consider adding a public reset_watch_cursor() method to PodMonitor instead.

Proposed approach

In PodMonitor (backend/app/services/pod_monitor/monitor.py), add:

def reset_watch_cursor(self) -> None:
    """Reset the resource version cursor, forcing a full LIST on next watch."""
    self._last_resource_version = None

Then in this file:

                    if e.status == 410:
                        logger.warning("Resource version expired, resetting watch cursor")
-                       monitor._last_resource_version = None
+                       monitor.reset_watch_cursor()
                        kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.RESOURCE_VERSION_EXPIRED)

68-77: APScheduler interval of 5s with long-running watch — verify intent.

watch_pod_events() blocks for watch_timeout_seconds (server-side timeout). With max_instances=1, APScheduler will skip firings while a watch is active, effectively re-launching within 5s of the previous watch completing. This is fine as a reconnect delay, but the 5s interval isn't the actual watch frequency — it's the max idle gap between watch streams. A brief comment clarifying this would help future readers.
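
An illustrative version of that comment at the job registration site (the scheduler and watch_cycle names are assumed from this file):

# NOTE: watch_pod_events() blocks for watch_timeout_seconds, and with
# max_instances=1 APScheduler skips firings while a watch is active. The
# 5s interval is therefore the maximum idle gap between successive watch
# streams (a reconnect delay), not the watch frequency.
scheduler.add_job(watch_cycle, "interval", seconds=5, max_instances=1)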

HardMax71 merged commit 5a0bb99 into main on Feb 13, 2026
14 of 15 checks passed
HardMax71 deleted the feat/providers branch on February 13, 2026 at 20:05