feat: moved lifecycle handling away from providers to workers, updated corresponding files #176

Conversation
📝 Walkthrough

This PR shifts lifecycle management from DI providers to explicit application-level APScheduler orchestration, replacing worker-specific providers with higher-level orchestrator providers and converting async lifecycle-managed provider methods to synchronous instance-creation methods.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant App as FastStream App
participant Scheduler as APScheduler
participant DI as DI Container
participant Service as Worker Service
rect rgba(100, 150, 200, 0.5)
Note over App,Service: Startup Flow (New Pattern)
App->>DI: Resolve service instance
DI-->>App: Service created (no internal scheduling)
App->>Scheduler: Create AsyncIOScheduler
App->>Scheduler: Register periodic job
App->>Scheduler: Start scheduler
Scheduler-->>App: Running with intervals
end
rect rgba(200, 100, 100, 0.5)
Note over Scheduler,Service: Runtime (Explicit Scheduling)
loop Every N seconds
Scheduler->>Service: Call task (check_timeouts, process_monitoring, etc.)
Service-->>Scheduler: Task complete
end
end
rect rgba(100, 200, 100, 0.5)
Note over App,Scheduler: Shutdown Flow
App->>Scheduler: Stop scheduler
Scheduler-->>App: Stopped
App->>DI: Close container
DI-->>App: Closed
end
```
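To make the diagram concrete, here is a minimal sketch of the worker-owned scheduling pattern, assuming APScheduler's `AsyncIOScheduler`; the `DLQProcessor` class, its `process_pending` job, and the 30-second interval are illustrative placeholders rather than code from this PR.

```python
import asyncio
import logging

from apscheduler.schedulers.asyncio import AsyncIOScheduler

logger = logging.getLogger(__name__)


class DLQProcessor:
    """Placeholder for a worker service resolved from the DI container."""

    async def process_pending(self) -> None:
        logger.info("Processing pending DLQ messages")


async def main() -> None:
    # Provider-side creation is now synchronous and does no scheduling;
    # here the service is simply constructed directly.
    processor = DLQProcessor()

    # The worker owns the scheduler explicitly (startup flow in the diagram).
    scheduler = AsyncIOScheduler()
    scheduler.add_job(processor.process_pending, "interval", seconds=30, max_instances=1)
    scheduler.start()

    try:
        await asyncio.Event().wait()  # run until the process is cancelled
    finally:
        # Shutdown flow: stop the scheduler before tearing anything else down.
        scheduler.shutdown(wait=False)
        logger.info("Scheduler stopped")


if __name__ == "__main__":
    asyncio.run(main())
```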
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
backend/app/core/container.py (1)
173-186: ⚠️ Potential issue | 🟡 Minor

Stale docstring in `DLQProvider`, which is now used by the DLQ worker container too. `create_dlq_processor_container` now uses `DLQProvider()` (line 184), but `DLQProvider`'s docstring in `providers.py` (line 266) says "Used by all containers except the DLQ worker." This is no longer accurate since the worker-specific provider was removed.

backend/app/core/providers.py (1)
265-266: ⚠️ Potential issue | 🟡 Minor

Stale docstring: `DLQProvider` is now used by the DLQ worker too. The docstring says "Used by all containers except the DLQ worker" but `create_dlq_processor_container` now uses `DLQProvider()` directly.

Proposed fix:
```diff
 class DLQProvider(Provider):
-    """Provides DLQManager without scheduling. Used by all containers except the DLQ worker."""
+    """Provides DLQManager without scheduling. Used by all containers."""
```

backend/app/core/dishka_lifespan.py (1)
46-48: ⚠️ Potential issue | 🟠 Major

`AsyncMongoClient` is never closed on shutdown (resource leak). The `client` created on line 46 is not closed in the `finally` block (lines 94-102). When the application shuts down, the MongoDB connection pool leaks. This pattern affects every worker file in this PR: `run_dlq_processor.py`, `run_pod_monitor.py`, `run_saga_orchestrator.py`, `run_event_replay.py`, `run_result_processor.py`, `run_coordinator.py`, and `run_k8s_worker.py`.

Proposed fix for dishka_lifespan.py:
```diff
     try:
         yield
     finally:
         notification_apscheduler.shutdown(wait=False)
         logger.info("NotificationScheduler stopped")
         await broker.stop()
         logger.info("Kafka broker stopped")
         await container.close()
         logger.info("DI container closed")
+        await client.close()
+        logger.info("MongoDB client closed")
```

Apply the same pattern to all worker files' shutdown callbacks.
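For the worker files, the same idea could look roughly like the sketch below; the module-level `client`, the connection string, and the `on_shutdown` callback name are illustrative placeholders, assuming PyMongo's async `AsyncMongoClient` (whose `close()` is awaitable), not the PR's actual wiring.

```python
from pymongo import AsyncMongoClient

# Hypothetical worker wiring: the real workers obtain the client through the
# DI container, so treat these names as placeholders.
client = AsyncMongoClient("mongodb://localhost:27017")


async def on_shutdown() -> None:
    """Shutdown callback: release resources in reverse order of startup."""
    # ... stop the scheduler and the Kafka broker first, as in dishka_lifespan.py ...
    await client.close()  # explicitly close the MongoDB connection pool
```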
🧹 Nitpick comments (2)
backend/workers/run_pod_monitor.py (2)
51-66: Accessing private `_last_resource_version` breaks encapsulation.

Line 57 directly mutates `monitor._last_resource_version = None`. This couples the worker to `PodMonitor`'s internal state management. Consider adding a public `reset_watch_cursor()` method to `PodMonitor` instead.

Proposed approach: in `PodMonitor` (`backend/app/services/pod_monitor/monitor.py`), add:

```python
def reset_watch_cursor(self) -> None:
    """Reset the resource version cursor, forcing a full LIST on next watch."""
    self._last_resource_version = None
```

Then in this file:
```diff
 if e.status == 410:
     logger.warning("Resource version expired, resetting watch cursor")
-    monitor._last_resource_version = None
+    monitor.reset_watch_cursor()
     kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.RESOURCE_VERSION_EXPIRED)
```
68-77: APScheduler interval of 5s with a long-running watch: verify intent.

`watch_pod_events()` blocks for `watch_timeout_seconds` (server-side timeout). With `max_instances=1`, APScheduler will skip firings while a watch is active, effectively re-launching within 5s of the previous watch completing. This is fine as a reconnect delay, but the 5s interval isn't the actual watch frequency; it's the max idle gap between watch streams. A brief comment clarifying this would help future readers.
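For example, the job registration could carry that clarification directly. This is only a sketch: `run_watch_cycle` and the scheduler wiring are placeholders, and only the 5-second interval and `max_instances=1` come from the review.

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def run_watch_cycle() -> None:
    """Placeholder standing in for the worker's watch_pod_events() call."""
    ...


scheduler = AsyncIOScheduler()
scheduler.add_job(
    run_watch_cycle,
    "interval",
    seconds=5,
    max_instances=1,
    # watch_pod_events() blocks for the server-side watch timeout, and
    # max_instances=1 skips firings while a watch is active, so this 5s
    # interval is the maximum idle gap between watch streams (a reconnect
    # delay), not the actual watch frequency.
)
```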



Summary by cubic
Moved scheduler and Kafka lifecycle management out of DI providers and into workers and the app lifespan. This simplifies providers and makes startup/shutdown explicit and predictable.
Written for commit 8c4db86.