diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index ca67c497..6f505cf2 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -7,7 +7,7 @@ from typing import Any import sentry_sdk -from arroyo.backends.kafka import KafkaPayload, KafkaProducer +from arroyo.backends.kafka import KafkaPayload from arroyo.types import BrokerValue, Topic from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation from sentry_sdk.consts import OP, SPANDATA @@ -17,7 +17,7 @@ from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter from taskbroker_client.task import ExternalTask, P, R, Task -from taskbroker_client.types import ProducerFactory +from taskbroker_client.types import ProducerFactory, ProducerProtocol logger = logging.getLogger(__name__) @@ -51,7 +51,7 @@ def __init__( self.default_processing_deadline_duration = processing_deadline_duration # seconds self.app_feature = app_feature or name self._registered_tasks: dict[str, Task[Any, Any]] = {} - self._producers: dict[str, KafkaProducer] = {} + self._producers: dict[str, ProducerProtocol] = {} self._producer_factory = producer_factory self.metrics = metrics @@ -191,7 +191,7 @@ def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) except Exception: logger.exception("Failed to wait for delivery") - def _producer(self, topic: str) -> KafkaProducer: + def _producer(self, topic: str) -> ProducerProtocol: if topic not in self._producers: self._producers[topic] = self._producer_factory(topic) return self._producers[topic] diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index 7bb59dad..6ab5058e 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -1,7 +1,9 @@ import dataclasses from typing import Callable, Protocol -from arroyo.backends.kafka import KafkaProducer +from arroyo.backends.abstract import ProducerFuture +from arroyo.backends.kafka import KafkaPayload +from arroyo.types import BrokerValue, Topic from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus @@ -13,7 +15,15 @@ class AtMostOnceStore(Protocol): def add(self, key: str, value: str, timeout: int) -> bool: ... -ProducerFactory = Callable[[str], KafkaProducer] +class ProducerProtocol(Protocol): + """Interface for producers that tasks depend on.""" + + def produce( + self, topic: Topic, payload: KafkaPayload + ) -> ProducerFuture[BrokerValue[KafkaPayload]]: ... + + +ProducerFactory = Callable[[str], ProducerProtocol] """ A factory interface for resolving topics into a KafkaProducer that can produce on the provided topic.