Skip to content

feat(client) Make the ProducerFactory depend on a protocol#578

Merged
markstory merged 2 commits intomainfrom
chore-producer-protocol
Mar 20, 2026
Merged

feat(client) Make the ProducerFactory depend on a protocol#578
markstory merged 2 commits intomainfrom
chore-producer-protocol

Conversation

@markstory
Copy link
Member

Using a concrete return type for ProducerFactory is blocking the adoption of a singleton producer in sentry.

Refs getsentry/sentry#111191

Using a concrete return type for ProducerFactory is blocking the
adoption of a singleton producer in sentry.

Refs getsentry/sentry#111191
@markstory markstory requested a review from a team as a code owner March 20, 2026 16:53
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Incomplete protocol migration leaves registry using concrete type
    • Updated TaskNamespace to use ProducerProtocol instead of KafkaProducer in _producers dict and _producer method return type, completing the protocol migration.

Create PR

Or push these changes by commenting:

@cursor push e9348a2be7
Preview (e9348a2be7)
diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py
--- a/clients/python/src/taskbroker_client/registry.py
+++ b/clients/python/src/taskbroker_client/registry.py
@@ -17,7 +17,7 @@
 from taskbroker_client.retry import Retry
 from taskbroker_client.router import TaskRouter
 from taskbroker_client.task import ExternalTask, P, R, Task
-from taskbroker_client.types import ProducerFactory
+from taskbroker_client.types import ProducerFactory, ProducerProtocol
 
 logger = logging.getLogger(__name__)
 
@@ -51,7 +51,7 @@
         self.default_processing_deadline_duration = processing_deadline_duration  # seconds
         self.app_feature = app_feature or name
         self._registered_tasks: dict[str, Task[Any, Any]] = {}
-        self._producers: dict[str, KafkaProducer] = {}
+        self._producers: dict[str, ProducerProtocol] = {}
         self._producer_factory = producer_factory
         self.metrics = metrics
 
@@ -191,7 +191,7 @@
             except Exception:
                 logger.exception("Failed to wait for delivery")
 
-    def _producer(self, topic: str) -> KafkaProducer:
+    def _producer(self, topic: str) -> ProducerProtocol:
         if topic not in self._producers:
             self._producers[topic] = self._producer_factory(topic)
         return self._producers[topic]

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

@markstory markstory merged commit 2757233 into main Mar 20, 2026
23 checks passed
@markstory markstory deleted the chore-producer-protocol branch March 20, 2026 20:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants