From 2dfb04453275742a1ec3a6120ed9885a8b0a469f Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 20 Mar 2026 12:51:50 -0400 Subject: [PATCH 1/2] feat(client) Make the ProducerFactory depend on a protocol Using a concrete return type for ProducerFactory is blocking the adoption of a singleton producer in sentry. Refs getsentry/sentry#111191 --- clients/python/src/taskbroker_client/types.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index 7bb59dad..6ab5058e 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -1,7 +1,9 @@ import dataclasses from typing import Callable, Protocol -from arroyo.backends.kafka import KafkaProducer +from arroyo.backends.abstract import ProducerFuture +from arroyo.backends.kafka import KafkaPayload +from arroyo.types import BrokerValue, Topic from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus @@ -13,7 +15,15 @@ class AtMostOnceStore(Protocol): def add(self, key: str, value: str, timeout: int) -> bool: ... -ProducerFactory = Callable[[str], KafkaProducer] +class ProducerProtocol(Protocol): + """Interface for producers that tasks depend on.""" + + def produce( + self, topic: Topic, payload: KafkaPayload + ) -> ProducerFuture[BrokerValue[KafkaPayload]]: ... + + +ProducerFactory = Callable[[str], ProducerProtocol] """ A factory interface for resolving topics into a KafkaProducer that can produce on the provided topic. From 3fb520785d8e243a1a8f2256c9cadd52804dc409 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 20 Mar 2026 16:13:14 -0400 Subject: [PATCH 2/2] Align types --- clients/python/src/taskbroker_client/registry.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index ca67c497..6f505cf2 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -7,7 +7,7 @@ from typing import Any import sentry_sdk -from arroyo.backends.kafka import KafkaPayload, KafkaProducer +from arroyo.backends.kafka import KafkaPayload from arroyo.types import BrokerValue, Topic from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation from sentry_sdk.consts import OP, SPANDATA @@ -17,7 +17,7 @@ from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter from taskbroker_client.task import ExternalTask, P, R, Task -from taskbroker_client.types import ProducerFactory +from taskbroker_client.types import ProducerFactory, ProducerProtocol logger = logging.getLogger(__name__) @@ -51,7 +51,7 @@ def __init__( self.default_processing_deadline_duration = processing_deadline_duration # seconds self.app_feature = app_feature or name self._registered_tasks: dict[str, Task[Any, Any]] = {} - self._producers: dict[str, KafkaProducer] = {} + self._producers: dict[str, ProducerProtocol] = {} self._producer_factory = producer_factory self.metrics = metrics @@ -191,7 +191,7 @@ def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) except Exception: logger.exception("Failed to wait for delivery") - def _producer(self, topic: str) -> KafkaProducer: + def _producer(self, topic: str) -> ProducerProtocol: if topic not in self._producers: self._producers[topic] = self._producer_factory(topic) return self._producers[topic]