Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions clients/python/src/taskbroker_client/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Any

import sentry_sdk
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Topic
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation
from sentry_sdk.consts import OP, SPANDATA
Expand All @@ -17,7 +17,7 @@
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
from taskbroker_client.task import ExternalTask, P, R, Task
from taskbroker_client.types import ProducerFactory
from taskbroker_client.types import ProducerFactory, ProducerProtocol

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,7 +51,7 @@ def __init__(
self.default_processing_deadline_duration = processing_deadline_duration # seconds
self.app_feature = app_feature or name
self._registered_tasks: dict[str, Task[Any, Any]] = {}
self._producers: dict[str, KafkaProducer] = {}
self._producers: dict[str, ProducerProtocol] = {}
self._producer_factory = producer_factory
self.metrics = metrics

Expand Down Expand Up @@ -191,7 +191,7 @@ def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False)
except Exception:
logger.exception("Failed to wait for delivery")

def _producer(self, topic: str) -> KafkaProducer:
def _producer(self, topic: str) -> ProducerProtocol:
if topic not in self._producers:
self._producers[topic] = self._producer_factory(topic)
return self._producers[topic]
Expand Down
14 changes: 12 additions & 2 deletions clients/python/src/taskbroker_client/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import dataclasses
from typing import Callable, Protocol

from arroyo.backends.kafka import KafkaProducer
from arroyo.backends.abstract import ProducerFuture
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Topic
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus


Expand All @@ -13,7 +15,15 @@ class AtMostOnceStore(Protocol):
def add(self, key: str, value: str, timeout: int) -> bool: ...


ProducerFactory = Callable[[str], KafkaProducer]
class ProducerProtocol(Protocol):
"""Interface for producers that tasks depend on."""

def produce(
self, topic: Topic, payload: KafkaPayload
) -> ProducerFuture[BrokerValue[KafkaPayload]]: ...


ProducerFactory = Callable[[str], ProducerProtocol]
"""
A factory interface for resolving topics into a KafkaProducer
that can produce on the provided topic.
Expand Down
Loading