Conversation
📝 Walkthrough
Replaces KafkaTopic-based routing with EventType/string topics, removes the KafkaTopic enum and topic config helpers, consolidates DLQ logic into a DLQWorkerProvider with APScheduler-managed retry monitoring, and removes the standalone DLQ processor worker; many consumers/subscribers and tests are updated accordingly.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Container as Container (create_*)
    participant Provider as DLQWorkerProvider
    participant Scheduler as APScheduler
    participant DLQMgr as DLQManager
    participant Broker as KafkaBroker
    Container->>Provider: resolve DLQWorkerProvider
    Provider->>Scheduler: start scheduler (on provider start)
    Provider->>DLQMgr: instantiate DLQManager (get_dlq_manager)
    DLQMgr->>Broker: publish(received_event) -> topic = prefix + EventType
    Scheduler->>DLQMgr: trigger retry check (periodic)
    DLQMgr->>Broker: publish(retried_event/discarded_event) -> topic = prefix + EventType
    Provider->>Scheduler: stop scheduler (on provider stop)
```
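The convention the diagram summarizes is that each event is published to a topic derived from its own event type. A minimal sketch of that naming scheme (the prefix value and helper name here are illustrative, not taken from the PR):

```python
# Minimal sketch of per-event-type topic naming; names and prefix are illustrative.
def topic_for(prefix: str, event_type: str) -> str:
    # e.g. topic_for("myapp.", "execution_requested") == "myapp.execution_requested"
    return f"{prefix}{event_type}"


# Publishing then targets the event's own topic rather than a shared-topic enum value:
# await broker.publish(event, topic=topic_for(prefix, event.event_type))
```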
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 1 failed (warning)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/dlq/manager.py`:
- Around line 145-161: The publish of DLQMessageRetriedEvent via
self._broker.publish in retry_message can raise and cause retry_messages_batch
to mark a message as "failed" even though retry and DB update succeeded; wrap
the publish call in a try/except that catches exceptions from
self._broker.publish, logs the failure (or records a non-fatal notification
error) and does not re-raise so the successful retry/discard state remains;
apply the same pattern to the publish calls in handle_message and
discard_message to ensure notification failures never overwrite successful
operations (reference functions: retry_message, handle_message, discard_message;
event class: DLQMessageRetriedEvent; method: self._broker.publish; surrounding
caller: retry_messages_batch).
In `@backend/tests/e2e/dlq/test_dlq_discard.py`:
- Line 35: The test DLQ document uses
original_topic=str(EventType.EXECUTION_REQUESTED) which omits the
KAFKA_TOPIC_PREFIX and creates inconsistent, non-production-like topic names;
update the creation to include the prefix (e.g.,
f"{KAFKA_TOPIC_PREFIX}{EventType.EXECUTION_REQUESTED}") or change the helper
_create_dlq_document to accept a prefix parameter and use it when building
original_topic so tests (including where _create_dlq_document is called) produce
prefixed topics consistent with production and with the behavior in
test_dlq_manager.py.
🧹 Nitpick comments (5)
backend/tests/unit/events/test_mappings_and_types.py (1)
5-10: Consider testing a negative case (unknown/unmapped event type). The test only verifies two happy-path lookups. A test for an event type that has no class mapping (returning None) would increase confidence in the function's behavior.
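A minimal sketch of such a negative-case test follows; the import path and the assumption that unmapped types return None are inferred from this review, not verified against the code:

```python
# Hypothetical negative-case test; adjust the import to whatever the existing
# tests in this module already use.
from app.infrastructure.kafka.mappings import get_event_class_for_type


def test_unmapped_event_type_returns_none() -> None:
    # A value deliberately absent from the event-type -> event-class mapping.
    assert get_event_class_for_type("nonexistent.event.type") is None
```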
backend/app/infrastructure/kafka/mappings.py (1)

46-49: Minor: redundant lru_cache on get_event_class_for_type. Since _get_event_type_to_class() is already cached (returns the same dict every time), caching the .get() lookup on top adds negligible value. Not a problem, just unnecessary layering.
backend/tests/e2e/dlq/test_dlq_retry.py (1)

19-44: Duplicated _create_dlq_document helper across DLQ test files. This helper is nearly identical to the one in test_dlq_discard.py (lines 19–44). Both also share the same original_topic prefix inconsistency. Consider extracting it into a shared DLQ test fixture or conftest to reduce duplication and fix the topic format in one place.
backend/tests/e2e/events/test_producer_roundtrip.py (1)

15-22: Test has no assertions — consider adding a basic check. The test fires produce() but never asserts on the outcome. Even for an e2e smoke test, verifying that produce completes without exception (or returns a non-None result) would add a small safety net. Consider at minimum an explicit assertion or a comment explaining that "no exception == pass" is intentional.
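A minimal hardening might look like the sketch below; the fixture names and the assumption that produce() returns delivery metadata are illustrative:

```python
# Hypothetical sketch: `producer` and `sample_event` stand in for whatever
# fixtures the test module already defines.
import pytest


@pytest.mark.asyncio
async def test_producer_roundtrip(producer, sample_event) -> None:
    result = await producer.produce(sample_event)
    # If produce() returns delivery metadata, make success explicit; otherwise,
    # reaching this line without an exception is the check.
    assert result is not None
```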
87-105: DuplicatedEventMetadataconstruction across three methods.The same
EventMetadata(service_name="dlq-manager", service_version="1.0.0", ...)pattern is repeated inhandle_message(Line 95),retry_message(Line 151), anddiscard_message(Line 183). Consider extracting a small helper to reduce duplication and ensure consistency if the service name/version ever changes.♻️ Example helper extraction
+ def _event_metadata(self, message: DLQMessage) -> EventMetadata: + return EventMetadata( + service_name="dlq-manager", + service_version="1.0.0", + correlation_id=message.event.metadata.correlation_id, + user_id=message.event.metadata.user_id, + )Then replace each inline
EventMetadata(...)withself._event_metadata(message).
```python
        retried_event = DLQMessageRetriedEvent(
            dlq_event_id=message.event.event_id,
            original_topic=message.original_topic,
            original_event_type=message.event.event_type,
            retry_count=new_retry_count,
            retry_topic=message.original_topic,
            metadata=EventMetadata(
                service_name="dlq-manager",
                service_version="1.0.0",
                correlation_id=message.event.metadata.correlation_id,
                user_id=message.event.metadata.user_id,
            ),
            topic=self._dlq_events_topic,
        )
        await self._broker.publish(
            retried_event,
            topic=f"{self._topic_prefix}{retried_event.event_type}",
        )
```
Notification publish failure after successful retry can corrupt batch results.
In retry_message, the core retry (Line 127-131) and DB update (Line 136-143) complete before the DLQMessageRetriedEvent is published (Line 158-161). If this notification publish fails, the exception propagates to retry_messages_batch (Line 246), which catches it and marks the message as "failed" — even though the retry actually succeeded.
Consider wrapping the notification publishes in a try/except so that a notification failure doesn't mask a successful retry/discard.
🛡️ Proposed fix
```diff
-        await self._broker.publish(
-            retried_event,
-            topic=f"{self._topic_prefix}{retried_event.event_type}",
-        )
+        try:
+            await self._broker.publish(
+                retried_event,
+                topic=f"{self._topic_prefix}{retried_event.event_type}",
+            )
+        except Exception:
+            self.logger.warning(
+                "Failed to publish DLQ retried notification event",
+                extra={"event_id": message.event.event_id},
+                exc_info=True,
+            )
```

Apply the same pattern to handle_message (Line 102-105) and discard_message (Line 190-193).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
        retried_event = DLQMessageRetriedEvent(
            dlq_event_id=message.event.event_id,
            original_topic=message.original_topic,
            original_event_type=message.event.event_type,
            retry_count=new_retry_count,
            retry_topic=message.original_topic,
            metadata=EventMetadata(
                service_name="dlq-manager",
                service_version="1.0.0",
                correlation_id=message.event.metadata.correlation_id,
                user_id=message.event.metadata.user_id,
            ),
        )
        try:
            await self._broker.publish(
                retried_event,
                topic=f"{self._topic_prefix}{retried_event.event_type}",
            )
        except Exception:
            self.logger.warning(
                "Failed to publish DLQ retried notification event",
                extra={"event_id": message.event.event_id},
                exc_info=True,
            )
```
🤖 Prompt for AI Agents
In `@backend/app/dlq/manager.py` around lines 145 - 161, The publish of
DLQMessageRetriedEvent via self._broker.publish in retry_message can raise and
cause retry_messages_batch to mark a message as "failed" even though retry and
DB update succeeded; wrap the publish call in a try/except that catches
exceptions from self._broker.publish, logs the failure (or records a non-fatal
notification error) and does not re-raise so the successful retry/discard state
remains; apply the same pattern to the publish calls in handle_message and
discard_message to ensure notification failures never overwrite successful
operations (reference functions: retry_message, handle_message, discard_message;
event class: DLQMessageRetriedEvent; method: self._broker.publish; surrounding
caller: retry_messages_batch).
```diff
     doc = DLQMessageDocument(
         event=event_dict,
-        original_topic=KafkaTopic.EXECUTION_EVENTS,
+        original_topic=str(EventType.EXECUTION_REQUESTED),
```
original_topic is missing the KAFKA_TOPIC_PREFIX, inconsistent with production topic names.
In production, topics are always prefixed (e.g., f"{prefix}{EventType.EXECUTION_REQUESTED}"). In test_dlq_manager.py:89, the same field uses the prefix. Here, str(EventType.EXECUTION_REQUESTED) yields just "execution_requested".
While this doesn't affect these repository-level tests (they don't route by topic), it creates unrealistic test data and inconsistency across the DLQ test suite.
♻️ Suggested fix

```diff
-        original_topic=str(EventType.EXECUTION_REQUESTED),
+        original_topic=f"test-prefix-{EventType.EXECUTION_REQUESTED}",
```

Or better, accept prefix as a parameter for _create_dlq_document.
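One possible shape for that parameterization, shown as a small topic-building helper; the name, default prefix, and call site are illustrative only:

```python
# Hypothetical helper: build a production-like, prefixed topic name for test data.
# `event_type` is expected to be an EventType member, imported as the test module
# already does.
def _build_original_topic(event_type, prefix: str = "test-prefix-") -> str:
    return f"{prefix}{event_type}"


# Inside _create_dlq_document(...), the field would then read:
#     original_topic=_build_original_topic(EventType.EXECUTION_REQUESTED, prefix)
```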
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
        original_topic=f"test-prefix-{EventType.EXECUTION_REQUESTED}",
```
🤖 Prompt for AI Agents
In `@backend/tests/e2e/dlq/test_dlq_discard.py` at line 35, The test DLQ document
uses original_topic=str(EventType.EXECUTION_REQUESTED) which omits the
KAFKA_TOPIC_PREFIX and creates inconsistent, non-production-like topic names;
update the creation to include the prefix (e.g.,
f"{KAFKA_TOPIC_PREFIX}{EventType.EXECUTION_REQUESTED}") or change the helper
_create_dlq_document to accept a prefix parameter and use it when building
original_topic so tests (including where _create_dlq_document is called) produce
prefixed topics consistent with production and with the behavior in
test_dlq_manager.py.
4 issues found across 27 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/core/providers.py">
<violation number="1" location="backend/app/core/providers.py:227">
P2: Execution event retry policies now exclude several execution lifecycle topics (accepted/queued/started/running), so those topics fall back to the default policy instead of the execution-specific policy. Include all execution lifecycle event types to preserve prior retry behavior after the per-topic split.</violation>
<violation number="2" location="backend/app/core/providers.py:240">
P2: Pod retry policies now omit several pod lifecycle topics (scheduled/running/terminated/deleted), so those topics fall back to the default policy instead of the pod-specific policy. Add the missing pod event types to keep retry behavior consistent after the topic split.</violation>
<violation number="3" location="backend/app/core/providers.py:251">
P2: Saga command retry policies now omit allocate/release resource commands, so those topics fall back to the default policy instead of the saga-specific policy. Include the missing saga command types to keep retry behavior consistent after the topic split.</violation>
</file>
<file name="backend/app/infrastructure/kafka/mappings.py">
<violation number="1" location="backend/app/infrastructure/kafka/mappings.py:6">
P1: Missing subscription entries for active consumer groups. `POD_MONITOR`, `WEBSOCKET_GATEWAY`, and `DLQ_MANAGER` still exist in the codebase (e.g., `workers/run_pod_monitor.py`, `services/pod_monitor/event_mapper.py`) but are absent from the new `CONSUMER_GROUP_SUBSCRIPTIONS`. If this map is used to determine topic subscriptions, these services will have no subscriptions or fail with a `KeyError`.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
```diff
-    EventType.DLQ_MESSAGE_RECEIVED: KafkaTopic.DLQ_EVENTS,
-    EventType.DLQ_MESSAGE_RETRIED: KafkaTopic.DLQ_EVENTS,
-    EventType.DLQ_MESSAGE_DISCARDED: KafkaTopic.DLQ_EVENTS,
+CONSUMER_GROUP_SUBSCRIPTIONS: dict[GroupId, set[EventType]] = {
```
P1: Missing subscription entries for active consumer groups. POD_MONITOR, WEBSOCKET_GATEWAY, and DLQ_MANAGER still exist in the codebase (e.g., workers/run_pod_monitor.py, services/pod_monitor/event_mapper.py) but are absent from the new CONSUMER_GROUP_SUBSCRIPTIONS. If this map is used to determine topic subscriptions, these services will have no subscriptions or fail with a KeyError.
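If these groups do still need entries, the fix itself is mechanical. A hedged sketch of what the added entries might look like; the event-type sets below are placeholders and must be derived from what each worker actually consumes:

```python
# Placeholder entries only: the correct EventType sets have to come from each
# worker's handlers, not from this sketch.
CONSUMER_GROUP_SUBSCRIPTIONS: dict[GroupId, set[EventType]] = {
    # ... existing groups (EXECUTION_COORDINATOR, ...) ...
    GroupId.POD_MONITOR: {EventType.POD_CREATED, EventType.POD_RUNNING, EventType.POD_FAILED},
    GroupId.WEBSOCKET_GATEWAY: {EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED},
    GroupId.DLQ_MANAGER: {EventType.DLQ_MESSAGE_RECEIVED},
}
```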
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/infrastructure/kafka/mappings.py, line 6:
<comment>Missing subscription entries for active consumer groups. `POD_MONITOR`, `WEBSOCKET_GATEWAY`, and `DLQ_MANAGER` still exist in the codebase (e.g., `workers/run_pod_monitor.py`, `services/pod_monitor/event_mapper.py`) but are absent from the new `CONSUMER_GROUP_SUBSCRIPTIONS`. If this map is used to determine topic subscriptions, these services will have no subscriptions or fail with a `KeyError`.</comment>
<file context>
@@ -1,78 +1,35 @@
- EventType.DLQ_MESSAGE_RECEIVED: KafkaTopic.DLQ_EVENTS,
- EventType.DLQ_MESSAGE_RETRIED: KafkaTopic.DLQ_EVENTS,
- EventType.DLQ_MESSAGE_DISCARDED: KafkaTopic.DLQ_EVENTS,
+CONSUMER_GROUP_SUBSCRIPTIONS: dict[GroupId, set[EventType]] = {
+ GroupId.EXECUTION_COORDINATOR: {
+ EventType.EXECUTION_REQUESTED,
</file context>
```diff
-            topic=saga_commands,
+    )
+
+    for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND):
```
P2: Saga command retry policies now omit allocate/release resource commands, so those topics fall back to the default policy instead of the saga-specific policy. Include the missing saga command types to keep retry behavior consistent after the topic split.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/core/providers.py, line 251:
<comment>Saga command retry policies now omit allocate/release resource commands, so those topics fall back to the default policy instead of the saga-specific policy. Include the missing saga command types to keep retry behavior consistent after the topic split.</comment>
<file context>
@@ -222,74 +222,56 @@ def _default_retry_policies(prefix: str) -> dict[str, RetryPolicy]:
- topic=saga_commands,
+ )
+
+ for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND):
+ topic = f"{prefix}{et}"
+ policies[topic] = RetryPolicy(
</file context>
```diff
-    for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND):
+    for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND,
+               EventType.ALLOCATE_RESOURCES_COMMAND, EventType.RELEASE_RESOURCES_COMMAND):
```
```diff
-            topic=pod_events,
+    )
+
+    for et in (EventType.POD_CREATED, EventType.POD_FAILED, EventType.POD_SUCCEEDED):
```
P2: Pod retry policies now omit several pod lifecycle topics (scheduled/running/terminated/deleted), so those topics fall back to the default policy instead of the pod-specific policy. Add the missing pod event types to keep retry behavior consistent after the topic split.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/core/providers.py, line 240:
<comment>Pod retry policies now omit several pod lifecycle topics (scheduled/running/terminated/deleted), so those topics fall back to the default policy instead of the pod-specific policy. Add the missing pod event types to keep retry behavior consistent after the topic split.</comment>
<file context>
@@ -222,74 +222,56 @@ def _default_retry_policies(prefix: str) -> dict[str, RetryPolicy]:
- topic=pod_events,
+ )
+
+ for et in (EventType.POD_CREATED, EventType.POD_FAILED, EventType.POD_SUCCEEDED):
+ topic = f"{prefix}{et}"
+ policies[topic] = RetryPolicy(
</file context>
```diff
-    for et in (EventType.POD_CREATED, EventType.POD_FAILED, EventType.POD_SUCCEEDED):
+    for et in (EventType.POD_CREATED, EventType.POD_SCHEDULED, EventType.POD_RUNNING,
+               EventType.POD_SUCCEEDED, EventType.POD_FAILED, EventType.POD_TERMINATED,
+               EventType.POD_DELETED):
```
```diff
+    for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED,
+               EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT,
+               EventType.EXECUTION_CANCELLED):
```
P2: Execution event retry policies now exclude several execution lifecycle topics (accepted/queued/started/running), so those topics fall back to the default policy instead of the execution-specific policy. Include all execution lifecycle event types to preserve prior retry behavior after the per-topic split.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/core/providers.py, line 227:
<comment>Execution event retry policies now exclude several execution lifecycle topics (accepted/queued/started/running), so those topics fall back to the default policy instead of the execution-specific policy. Include all execution lifecycle event types to preserve prior retry behavior after the per-topic split.</comment>
<file context>
@@ -222,74 +222,56 @@ def _default_retry_policies(prefix: str) -> dict[str, RetryPolicy]:
- topic=execution_events,
+ policies: dict[str, RetryPolicy] = {}
+
+ for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED,
+ EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT,
+ EventType.EXECUTION_CANCELLED):
</file context>
```diff
-    for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED,
-               EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT,
-               EventType.EXECUTION_CANCELLED):
+    for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_ACCEPTED,
+               EventType.EXECUTION_QUEUED, EventType.EXECUTION_STARTED,
+               EventType.EXECUTION_RUNNING, EventType.EXECUTION_COMPLETED,
+               EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT,
+               EventType.EXECUTION_CANCELLED):
```
|



Summary by cubic
Switches to one Kafka topic per event type across the backend, with producers and consumers publishing and subscribing directly to event-type topics, and cleans up the deployment by removing the DLQ processor service.
Written for commit 0b220b5. Summary will update on new commits.