-
Notifications
You must be signed in to change notification settings - Fork 0
feat: 1 kafka topic = 1 msg type, everywhere #174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -47,7 +47,7 @@ | |||||||||
| ) | ||||||||||
| from app.dlq.manager import DLQManager | ||||||||||
| from app.dlq.models import RetryPolicy, RetryStrategy | ||||||||||
| from app.domain.enums import KafkaTopic | ||||||||||
| from app.domain.enums import EventType | ||||||||||
| from app.domain.rate_limit import RateLimitConfig | ||||||||||
| from app.domain.saga import SagaConfig | ||||||||||
| from app.events.core import UnifiedProducer | ||||||||||
|
|
@@ -222,74 +222,56 @@ def _default_retry_policies(prefix: str) -> dict[str, RetryPolicy]: | |||||||||
|
|
||||||||||
| Keys must match message.original_topic (full prefixed topic name). | ||||||||||
| """ | ||||||||||
| execution_events = f"{prefix}{KafkaTopic.EXECUTION_EVENTS}" | ||||||||||
| pod_events = f"{prefix}{KafkaTopic.POD_EVENTS}" | ||||||||||
| saga_commands = f"{prefix}{KafkaTopic.SAGA_COMMANDS}" | ||||||||||
| execution_results = f"{prefix}{KafkaTopic.EXECUTION_RESULTS}" | ||||||||||
|
|
||||||||||
| return { | ||||||||||
| execution_events: RetryPolicy( | ||||||||||
| topic=execution_events, | ||||||||||
| policies: dict[str, RetryPolicy] = {} | ||||||||||
|
|
||||||||||
| for et in (EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED, | ||||||||||
| EventType.EXECUTION_FAILED, EventType.EXECUTION_TIMEOUT, | ||||||||||
| EventType.EXECUTION_CANCELLED): | ||||||||||
| topic = f"{prefix}{et}" | ||||||||||
| policies[topic] = RetryPolicy( | ||||||||||
| topic=topic, | ||||||||||
| strategy=RetryStrategy.EXPONENTIAL_BACKOFF, | ||||||||||
| max_retries=5, | ||||||||||
| base_delay_seconds=30, | ||||||||||
| max_delay_seconds=300, | ||||||||||
| retry_multiplier=2.0, | ||||||||||
| ), | ||||||||||
| pod_events: RetryPolicy( | ||||||||||
| topic=pod_events, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| for et in (EventType.POD_CREATED, EventType.POD_FAILED, EventType.POD_SUCCEEDED): | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Pod retry policies now omit several pod lifecycle topics (scheduled/running/terminated/deleted), so those topics fall back to the default policy instead of the pod-specific policy. Add the missing pod event types to keep retry behavior consistent after the topic split. Prompt for AI agents
Suggested change
|
||||||||||
| topic = f"{prefix}{et}" | ||||||||||
| policies[topic] = RetryPolicy( | ||||||||||
| topic=topic, | ||||||||||
| strategy=RetryStrategy.EXPONENTIAL_BACKOFF, | ||||||||||
| max_retries=3, | ||||||||||
| base_delay_seconds=60, | ||||||||||
| max_delay_seconds=600, | ||||||||||
| retry_multiplier=3.0, | ||||||||||
| ), | ||||||||||
| saga_commands: RetryPolicy( | ||||||||||
| topic=saga_commands, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| for et in (EventType.CREATE_POD_COMMAND, EventType.DELETE_POD_COMMAND): | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Saga command retry policies now omit allocate/release resource commands, so those topics fall back to the default policy instead of the saga-specific policy. Include the missing saga command types to keep retry behavior consistent after the topic split. Prompt for AI agents
Suggested change
|
||||||||||
| topic = f"{prefix}{et}" | ||||||||||
| policies[topic] = RetryPolicy( | ||||||||||
| topic=topic, | ||||||||||
| strategy=RetryStrategy.EXPONENTIAL_BACKOFF, | ||||||||||
| max_retries=5, | ||||||||||
| base_delay_seconds=30, | ||||||||||
| max_delay_seconds=300, | ||||||||||
| retry_multiplier=2.0, | ||||||||||
| ), | ||||||||||
| execution_results: RetryPolicy( | ||||||||||
| topic=execution_results, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| for et in (EventType.RESULT_STORED, EventType.RESULT_FAILED): | ||||||||||
| topic = f"{prefix}{et}" | ||||||||||
| policies[topic] = RetryPolicy( | ||||||||||
| topic=topic, | ||||||||||
| strategy=RetryStrategy.IMMEDIATE, | ||||||||||
| max_retries=3, | ||||||||||
| ), | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class DLQProvider(Provider): | ||||||||||
| """Provides DLQManager without scheduling. Used by all containers except the DLQ worker.""" | ||||||||||
|
|
||||||||||
| scope = Scope.APP | ||||||||||
|
|
||||||||||
| @provide | ||||||||||
| def get_dlq_manager( | ||||||||||
| self, | ||||||||||
| broker: KafkaBroker, | ||||||||||
| settings: Settings, | ||||||||||
| logger: logging.Logger, | ||||||||||
| dlq_metrics: DLQMetrics, | ||||||||||
| repository: DLQRepository, | ||||||||||
| ) -> DLQManager: | ||||||||||
| return DLQManager( | ||||||||||
| settings=settings, | ||||||||||
| broker=broker, | ||||||||||
| logger=logger, | ||||||||||
| dlq_metrics=dlq_metrics, | ||||||||||
| repository=repository, | ||||||||||
| default_retry_policy=_default_retry_policy(), | ||||||||||
| retry_policies=_default_retry_policies(settings.KAFKA_TOPIC_PREFIX), | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| return policies | ||||||||||
|
|
||||||||||
| class DLQWorkerProvider(Provider): | ||||||||||
| """Provides DLQManager with APScheduler-managed retry monitoring. | ||||||||||
|
|
||||||||||
| Used by the DLQ worker container only. | ||||||||||
| """ | ||||||||||
| class DLQWorkerProvider(Provider): | ||||||||||
| """Provides DLQManager with APScheduler-managed retry monitoring.""" | ||||||||||
|
|
||||||||||
| scope = Scope.APP | ||||||||||
|
|
||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,7 +15,6 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RetryPolicy, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RetryStrategy, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.domain.enums import KafkaTopic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.domain.events import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DLQMessageDiscardedEvent, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DLQMessageReceivedEvent, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -42,15 +41,13 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| repository: DLQRepository, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default_retry_policy: RetryPolicy, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_policies: dict[str, RetryPolicy], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filters: list[Callable[[DLQMessage], bool]] | None = None, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.settings = settings | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._broker = broker | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger = logger | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.metrics = dlq_metrics | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.repository = repository | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.dlq_topic = dlq_topic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.default_retry_policy = default_retry_policy | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._retry_policies = retry_policies | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -61,7 +58,7 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ] if f is not None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._dlq_events_topic = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DLQ_EVENTS}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._topic_prefix = settings.KAFKA_TOPIC_PREFIX | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _filter_test_events(self, message: DLQMessage) -> bool: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return not message.event.event_id.startswith("test-") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -87,23 +84,24 @@ async def handle_message(self, message: DLQMessage) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| message.last_updated = datetime.now(timezone.utc) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self.repository.save_message(message) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._broker.publish( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DLQMessageReceivedEvent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dlq_event_id=message.event.event_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_event_type=message.event.event_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| error=message.error, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_count=message.retry_count, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| producer_id=message.producer_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| failed_at=message.failed_at, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata=EventMetadata( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_name="dlq-manager", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_version="1.0.0", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| correlation_id=message.event.metadata.correlation_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| user_id=message.event.metadata.user_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| received_event = DLQMessageReceivedEvent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dlq_event_id=message.event.event_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_event_type=message.event.event_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| error=message.error, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_count=message.retry_count, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| producer_id=message.producer_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| failed_at=message.failed_at, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata=EventMetadata( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_name="dlq-manager", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_version="1.0.0", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| correlation_id=message.event.metadata.correlation_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| user_id=message.event.metadata.user_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic=self._dlq_events_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._broker.publish( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| received_event, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic=f"{self._topic_prefix}{received_event.event_type}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_policy = self._retry_policies.get(message.original_topic, self.default_retry_policy) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -144,21 +142,22 @@ async def retry_message(self, message: DLQMessage) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._broker.publish( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DLQMessageRetriedEvent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dlq_event_id=message.event.event_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_event_type=message.event.event_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_count=new_retry_count, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata=EventMetadata( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_name="dlq-manager", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_version="1.0.0", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| correlation_id=message.event.metadata.correlation_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| user_id=message.event.metadata.user_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retried_event = DLQMessageRetriedEvent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dlq_event_id=message.event.event_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_event_type=message.event.event_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_count=new_retry_count, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata=EventMetadata( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_name="dlq-manager", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_version="1.0.0", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| correlation_id=message.event.metadata.correlation_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| user_id=message.event.metadata.user_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic=self._dlq_events_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._broker.publish( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retried_event, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic=f"{self._topic_prefix}{retried_event.event_type}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+145
to
161
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Notification publish failure after successful retry can corrupt batch results. In Consider wrapping the notification publishes in a try/except so that a notification failure doesn't mask a successful retry/discard. 🛡️ Proposed fix- await self._broker.publish(
- retried_event,
- topic=f"{self._topic_prefix}{retried_event.event_type}",
- )
+ try:
+ await self._broker.publish(
+ retried_event,
+ topic=f"{self._topic_prefix}{retried_event.event_type}",
+ )
+ except Exception:
+ self.logger.warning(
+ "Failed to publish DLQ retried notification event",
+ extra={"event_id": message.event.event_id},
+ exc_info=True,
+ )Apply the same pattern to 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.info("Successfully retried message", extra={"event_id": message.event.event_id}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -175,21 +174,22 @@ async def discard_message(self, message: DLQMessage, reason: str) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._broker.publish( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DLQMessageDiscardedEvent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dlq_event_id=message.event.event_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_event_type=message.event.event_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| reason=reason, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_count=message.retry_count, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata=EventMetadata( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_name="dlq-manager", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_version="1.0.0", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| correlation_id=message.event.metadata.correlation_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| user_id=message.event.metadata.user_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| discarded_event = DLQMessageDiscardedEvent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dlq_event_id=message.event.event_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_topic=message.original_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| original_event_type=message.event.event_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| reason=reason, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_count=message.retry_count, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata=EventMetadata( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_name="dlq-manager", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| service_version="1.0.0", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| correlation_id=message.event.metadata.correlation_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| user_id=message.event.metadata.user_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic=self._dlq_events_topic, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._broker.publish( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| discarded_event, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic=f"{self._topic_prefix}{discarded_event.event_type}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.warning("Discarded message", extra={"event_id": message.event.event_id, "reason": reason}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Execution event retry policies now exclude several execution lifecycle topics (accepted/queued/started/running), so those topics fall back to the default policy instead of the execution-specific policy. Include all execution lifecycle event types to preserve prior retry behavior after the per-topic split.
Prompt for AI agents