2 changes: 1 addition & 1 deletion .github/workflows/stack-tests.yml
@@ -299,7 +299,7 @@ jobs:
docker compose logs --timestamps > logs/docker-compose.log 2>&1
for svc in backend mongo redis kafka zookeeper \
coordinator k8s-worker pod-monitor result-processor \
saga-orchestrator event-replay dlq-processor; do
saga-orchestrator event-replay; do
docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true
done
kubectl get events --sort-by='.metadata.creationTimestamp' -A > logs/k8s-events.log 2>&1 || true
29 changes: 1 addition & 28 deletions backend/app/core/container.py
@@ -8,7 +8,6 @@
BusinessServicesProvider,
CoordinatorProvider,
CoreServicesProvider,
DLQProvider,
DLQWorkerProvider,
EventReplayProvider,
EventReplayWorkerProvider,
@@ -51,7 +50,7 @@ def create_app_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
DLQWorkerProvider(),
SagaOrchestratorProvider(),
KafkaServicesProvider(),
SSEProvider(),
@@ -79,7 +78,6 @@ def create_result_processor_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
ResultProcessorProvider(),
context={Settings: settings},
)
@@ -96,7 +94,6 @@ def create_coordinator_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
CoordinatorProvider(),
context={Settings: settings},
)
@@ -113,7 +110,6 @@ def create_k8s_worker_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
KubernetesProvider(),
K8sWorkerProvider(),
context={Settings: settings},
@@ -131,7 +127,6 @@ def create_pod_monitor_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
KafkaServicesProvider(),
KubernetesProvider(),
PodMonitorProvider(),
@@ -153,7 +148,6 @@ def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
SagaWorkerProvider(),
context={Settings: settings},
)
@@ -173,27 +167,6 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
EventReplayWorkerProvider(),
context={Settings: settings},
)


def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the DLQ processor worker.

Uses DLQWorkerProvider which adds APScheduler-managed retry monitoring
and configures retry policies and filters.
"""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
BrokerProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQWorkerProvider(),
context={Settings: settings},
)
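
With create_dlq_processor_container removed, the backend app container is the only container that wires DLQWorkerProvider, so the scheduler-backed DLQManager is now resolved from the app container rather than from a dedicated worker container. A minimal usage sketch, assuming Settings lives at app.settings (the import path is not shown in this diff):

# Sketch only: build the app DI container, which now carries
# DLQWorkerProvider (APScheduler-managed DLQ retry monitoring).
from app.core.container import create_app_container  # factory shown above
from app.settings import Settings  # assumed path; adjust to the real module

settings = Settings()
container = create_app_container(settings)
# Inside an async context, resolving DLQManager from this container yields
# the scheduler-backed instance, e.g.:
#   dlq_manager = await container.get(DLQManager)
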
78 changes: 30 additions & 48 deletions backend/app/core/providers.py
@@ -47,7 +47,7 @@
)
from app.dlq.manager import DLQManager
from app.dlq.models import RetryPolicy, RetryStrategy
from app.domain.enums import KafkaTopic
from app.domain.enums import EventType
from app.domain.rate_limit import RateLimitConfig
from app.domain.saga import SagaConfig
from app.events.core import UnifiedProducer
@@ -222,74 +222,56 @@ def _default_retry_policies(prefix: str) -> dict[str, RetryPolicy]:

Keys must match message.original_topic (full prefixed topic name).
"""
execution_events = f"{prefix}{KafkaTopic.EXECUTION_EVENTS}"
pod_events = f"{prefix}{KafkaTopic.POD_EVENTS}"
saga_commands = f"{prefix}{KafkaTopic.SAGA_COMMANDS}"
execution_results = f"{prefix}{KafkaTopic.EXECUTION_RESULTS}"

return {
execution_events: RetryPolicy(
topic=execution_events,
policies: dict[str, RetryPolicy] = {}

for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED,
EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT,
EventType.EXECUTION_CANCELLED):
Comment on lines +227 to +229
@cubic-dev-ai bot · Feb 12, 2026

P2: Execution event retry policies now exclude several execution lifecycle topics (accepted/queued/started/running), so those topics fall back to the default policy instead of the execution-specific policy. Include all execution lifecycle event types to preserve prior retry behavior after the per-topic split.

Suggested change
-    for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED,
-               EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT,
-               EventType.EXECUTION_CANCELLED):
+    for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_ACCEPTED,
+               EventType.EXECUTION_QUEUED, EventType.EXECUTION_STARTED,
+               EventType.EXECUTION_RUNNING, EventType.EXECUTION_COMPLETED,
+               EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT,
+               EventType.EXECUTION_CANCELLED):

topic = f"{prefix}{et}"
policies[topic] = RetryPolicy(
topic=topic,
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
max_retries=5,
base_delay_seconds=30,
max_delay_seconds=300,
retry_multiplier=2.0,
),
pod_events: RetryPolicy(
topic=pod_events,
)

for et in (EventType.POD_CREATED, EventType.POD_FAILED, EventType.POD_SUCCEEDED):
@cubic-dev-ai bot · Feb 12, 2026 · backend/app/core/providers.py, line 240

P2: Pod retry policies now omit several pod lifecycle topics (scheduled/running/terminated/deleted), so those topics fall back to the default policy instead of the pod-specific policy. Add the missing pod event types to keep retry behavior consistent after the topic split.

Suggested change
-    for et in (EventType.POD_CREATED, EventType.POD_FAILED, EventType.POD_SUCCEEDED):
+    for et in (EventType.POD_CREATED, EventType.POD_SCHEDULED, EventType.POD_RUNNING,
+               EventType.POD_SUCCEEDED, EventType.POD_FAILED, EventType.POD_TERMINATED,
+               EventType.POD_DELETED):

topic = f"{prefix}{et}"
policies[topic] = RetryPolicy(
topic=topic,
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
max_retries=3,
base_delay_seconds=60,
max_delay_seconds=600,
retry_multiplier=3.0,
),
saga_commands: RetryPolicy(
topic=saga_commands,
)

for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND):
@cubic-dev-ai bot · Feb 12, 2026 · backend/app/core/providers.py, line 251

P2: Saga command retry policies now omit allocate/release resource commands, so those topics fall back to the default policy instead of the saga-specific policy. Include the missing saga command types to keep retry behavior consistent after the topic split.

Suggested change
-    for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND):
+    for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND,
+               EventType.ALLOCATE_RESOURCES_COMMAND, EventType.RELEASE_RESOURCES_COMMAND):

topic = f"{prefix}{et}"
policies[topic] = RetryPolicy(
topic=topic,
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
max_retries=5,
base_delay_seconds=30,
max_delay_seconds=300,
retry_multiplier=2.0,
),
execution_results: RetryPolicy(
topic=execution_results,
)

for et in (EventType.RESULT_STORED, EventType.RESULT_FAILED):
topic = f"{prefix}{et}"
policies[topic] = RetryPolicy(
topic=topic,
strategy=RetryStrategy.IMMEDIATE,
max_retries=3,
),
}


class DLQProvider(Provider):
"""Provides DLQManager without scheduling. Used by all containers except the DLQ worker."""

scope = Scope.APP

@provide
def get_dlq_manager(
self,
broker: KafkaBroker,
settings: Settings,
logger: logging.Logger,
dlq_metrics: DLQMetrics,
repository: DLQRepository,
) -> DLQManager:
return DLQManager(
settings=settings,
broker=broker,
logger=logger,
dlq_metrics=dlq_metrics,
repository=repository,
default_retry_policy=_default_retry_policy(),
retry_policies=_default_retry_policies(settings.KAFKA_TOPIC_PREFIX),
)

return policies

class DLQWorkerProvider(Provider):
"""Provides DLQManager with APScheduler-managed retry monitoring.

Used by the DLQ worker container only.
"""
class DLQWorkerProvider(Provider):
"""Provides DLQManager with APScheduler-managed retry monitoring."""

scope = Scope.APP

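For readers following the review threads above: the rewritten _default_retry_policies keys its policies by fully prefixed per-event-type topic names, and DLQManager resolves a message's policy with a plain dict lookup that falls back to the default policy when original_topic has no entry (see the .get call in manager.py below). A self-contained sketch of that shape, using hypothetical stand-ins (PolicySketch, build_policies) and illustrative string values rather than the real RetryPolicy and EventType:

from dataclasses import dataclass

@dataclass
class PolicySketch:
    # Stand-in for app.dlq.models.RetryPolicy; only what the lookup needs.
    topic: str
    max_retries: int

def build_policies(prefix: str) -> dict[str, PolicySketch]:
    # One entry per prefixed per-event-type topic, mirroring the loops above.
    execution_like = ("execution_requested", "execution_completed", "execution_failed")
    return {f"{prefix}{et}": PolicySketch(topic=f"{prefix}{et}", max_retries=5)
            for et in execution_like}

policies = build_policies("dev.")                       # "dev." is an illustrative prefix
default_policy = PolicySketch(topic="*", max_retries=3)

# DLQManager does the equivalent of:
#   retry_policy = self._retry_policies.get(message.original_topic, self.default_retry_policy)
# so any topic without a dedicated entry silently falls back to the default,
# which is the gap the three review comments above point out.
print(policies.get("dev.execution_requested", default_policy).max_retries)  # 5
print(policies.get("dev.execution_started", default_policy).max_retries)    # 3 (fallback)
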
3 changes: 1 addition & 2 deletions backend/app/db/docs/replay.py
@@ -5,7 +5,7 @@
from pydantic import BaseModel, ConfigDict, Field
from pymongo import IndexModel

from app.domain.enums import EventType, KafkaTopic, ReplayStatus, ReplayTarget, ReplayType
from app.domain.enums import ReplayStatus, ReplayTarget, ReplayType
from app.domain.replay import ReplayError, ReplayFilter


@@ -24,7 +24,6 @@ class ReplayConfig(BaseModel):
batch_size: int = Field(default=100, ge=1, le=1000)
max_events: int | None = Field(default=None, ge=1)

target_topics: dict[EventType, KafkaTopic] | None = None
target_file_path: str | None = None

skip_errors: bool = True
96 changes: 48 additions & 48 deletions backend/app/dlq/manager.py
@@ -15,7 +15,6 @@
RetryPolicy,
RetryStrategy,
)
from app.domain.enums import KafkaTopic
from app.domain.events import (
DLQMessageDiscardedEvent,
DLQMessageReceivedEvent,
@@ -42,15 +41,13 @@ def __init__(
repository: DLQRepository,
default_retry_policy: RetryPolicy,
retry_policies: dict[str, RetryPolicy],
dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE,
filters: list[Callable[[DLQMessage], bool]] | None = None,
):
self.settings = settings
self._broker = broker
self.logger = logger
self.metrics = dlq_metrics
self.repository = repository
self.dlq_topic = dlq_topic
self.default_retry_policy = default_retry_policy
self._retry_policies = retry_policies

@@ -61,7 +58,7 @@ def __init__(
] if f is not None
]

self._dlq_events_topic = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DLQ_EVENTS}"
self._topic_prefix = settings.KAFKA_TOPIC_PREFIX

def _filter_test_events(self, message: DLQMessage) -> bool:
return not message.event.event_id.startswith("test-")
@@ -87,23 +84,24 @@ async def handle_message(self, message: DLQMessage) -> None:
message.last_updated = datetime.now(timezone.utc)
await self.repository.save_message(message)

await self._broker.publish(
DLQMessageReceivedEvent(
dlq_event_id=message.event.event_id,
original_topic=message.original_topic,
original_event_type=message.event.event_type,
error=message.error,
retry_count=message.retry_count,
producer_id=message.producer_id,
failed_at=message.failed_at,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
user_id=message.event.metadata.user_id,
),
received_event = DLQMessageReceivedEvent(
dlq_event_id=message.event.event_id,
original_topic=message.original_topic,
original_event_type=message.event.event_type,
error=message.error,
retry_count=message.retry_count,
producer_id=message.producer_id,
failed_at=message.failed_at,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
user_id=message.event.metadata.user_id,
),
topic=self._dlq_events_topic,
)
await self._broker.publish(
received_event,
topic=f"{self._topic_prefix}{received_event.event_type}",
)

retry_policy = self._retry_policies.get(message.original_topic, self.default_retry_policy)
@@ -144,21 +142,22 @@ async def retry_message(self, message: DLQMessage) -> None:
),
)

await self._broker.publish(
DLQMessageRetriedEvent(
dlq_event_id=message.event.event_id,
original_topic=message.original_topic,
original_event_type=message.event.event_type,
retry_count=new_retry_count,
retry_topic=message.original_topic,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
user_id=message.event.metadata.user_id,
),
retried_event = DLQMessageRetriedEvent(
dlq_event_id=message.event.event_id,
original_topic=message.original_topic,
original_event_type=message.event.event_type,
retry_count=new_retry_count,
retry_topic=message.original_topic,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
user_id=message.event.metadata.user_id,
),
topic=self._dlq_events_topic,
)
await self._broker.publish(
retried_event,
topic=f"{self._topic_prefix}{retried_event.event_type}",
)
Comment on lines +145 to 161

⚠️ Potential issue | 🟡 Minor

Notification publish failure after successful retry can corrupt batch results.

In retry_message, the core retry (lines 127-131) and the DB update (lines 136-143) complete before the DLQMessageRetriedEvent is published (lines 158-161). If that notification publish fails, the exception propagates to retry_messages_batch (line 246), which catches it and marks the message as "failed" even though the retry actually succeeded.

Consider wrapping the notification publishes in a try/except so that a notification failure doesn't mask a successful retry/discard.

🛡️ Proposed fix
-        await self._broker.publish(
-            retried_event,
-            topic=f"{self._topic_prefix}{retried_event.event_type}",
-        )
+        try:
+            await self._broker.publish(
+                retried_event,
+                topic=f"{self._topic_prefix}{retried_event.event_type}",
+            )
+        except Exception:
+            self.logger.warning(
+                "Failed to publish DLQ retried notification event",
+                extra={"event_id": message.event.event_id},
+                exc_info=True,
+            )

Apply the same pattern to handle_message (lines 102-105) and discard_message (lines 190-193).


self.logger.info("Successfully retried message", extra={"event_id": message.event.event_id})

@@ -175,21 +174,22 @@ async def discard_message(self, message: DLQMessage, reason: str) -> None:
),
)

await self._broker.publish(
DLQMessageDiscardedEvent(
dlq_event_id=message.event.event_id,
original_topic=message.original_topic,
original_event_type=message.event.event_type,
reason=reason,
retry_count=message.retry_count,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
user_id=message.event.metadata.user_id,
),
discarded_event = DLQMessageDiscardedEvent(
dlq_event_id=message.event.event_id,
original_topic=message.original_topic,
original_event_type=message.event.event_type,
reason=reason,
retry_count=message.retry_count,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
user_id=message.event.metadata.user_id,
),
topic=self._dlq_events_topic,
)
await self._broker.publish(
discarded_event,
topic=f"{self._topic_prefix}{discarded_event.event_type}",
)
self.logger.warning("Discarded message", extra={"event_id": message.event.event_id, "reason": reason})

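Taken together, the manager changes stop publishing all DLQ notifications to a single dlq-events topic; each notification now goes to a topic built from the configured prefix plus the event's own event_type, the same prefix-plus-event-type convention the retry policies above are keyed on. A small sketch of that routing, using a hypothetical helper (dlq_notification_topic) and the DLQ event-type string values shown in the EventType diff further down:

# Routing convention used by the publish calls above:
#   topic = f"{self._topic_prefix}{event.event_type}"
# EventType is a string enum, and the PR relies on f-string interpolation
# yielding its value; plain strings stand in for it here.
DLQ_MESSAGE_RECEIVED = "dlq_message_received"
DLQ_MESSAGE_RETRIED = "dlq_message_retried"
DLQ_MESSAGE_DISCARDED = "dlq_message_discarded"

def dlq_notification_topic(prefix: str, event_type_value: str) -> str:
    return f"{prefix}{event_type_value}"

# With an illustrative "staging." prefix, the three notifications land on:
assert dlq_notification_topic("staging.", DLQ_MESSAGE_RECEIVED) == "staging.dlq_message_received"
assert dlq_notification_topic("staging.", DLQ_MESSAGE_RETRIED) == "staging.dlq_message_retried"
assert dlq_notification_topic("staging.", DLQ_MESSAGE_DISCARDED) == "staging.dlq_message_discarded"
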
3 changes: 1 addition & 2 deletions backend/app/domain/enums/__init__.py
@@ -2,7 +2,7 @@
from app.domain.enums.common import Environment, ErrorType, ExportFormat, SortOrder, Theme
from app.domain.enums.events import EventType
from app.domain.enums.execution import CancelStatus, ExecutionStatus, QueuePriority
from app.domain.enums.kafka import GroupId, KafkaTopic
from app.domain.enums.kafka import GroupId
from app.domain.enums.notification import (
NotificationChannel,
NotificationSeverity,
@@ -32,7 +32,6 @@
"QueuePriority",
# Kafka
"GroupId",
"KafkaTopic",
# Notification
"NotificationChannel",
"NotificationSeverity",
2 changes: 1 addition & 1 deletion backend/app/domain/enums/events.py
@@ -82,7 +82,7 @@ class EventType(StringEnum):
ALLOCATE_RESOURCES_COMMAND = "allocate_resources_command"
RELEASE_RESOURCES_COMMAND = "release_resources_command"

# DLQ events
# DLQ
DLQ_MESSAGE_RECEIVED = "dlq_message_received"
DLQ_MESSAGE_RETRIED = "dlq_message_retried"
DLQ_MESSAGE_DISCARDED = "dlq_message_discarded"