From e8795b9cb4cf04ef59e27d04f2a3dba371e894bf Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Thu, 2 Apr 2026 09:12:34 +0300 Subject: [PATCH] ported Topic proxy to asyncio --- hazelcast/internal/asyncio_client.py | 15 ++- hazelcast/internal/asyncio_proxy/manager.py | 3 + hazelcast/internal/asyncio_proxy/topic.py | 116 ++++++++++++++++++ hazelcast/proxy/topic.py | 2 +- tests/integration/asyncio/proxy/topic_test.py | 83 +++++++++++++ 5 files changed, 217 insertions(+), 2 deletions(-) create mode 100644 hazelcast/internal/asyncio_proxy/topic.py create mode 100644 tests/integration/asyncio/proxy/topic_test.py diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index 4377234c42..2fbae2b7ae 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -27,16 +27,18 @@ MULTI_MAP_SERVICE, ProxyManager, REPLICATED_MAP_SERVICE, + TOPIC_SERVICE, VECTOR_SERVICE, ) from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.multi_map import MultiMap from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap +from hazelcast.internal.asyncio_proxy.topic import Topic from hazelcast.internal.asyncio_reactor import AsyncioReactor from hazelcast.serialization import SerializationServiceV1 from hazelcast.internal.asyncio_statistics import Statistics -from hazelcast.types import KeyType, ValueType +from hazelcast.types import KeyType, MessageType, ValueType from hazelcast.util import AtomicInteger, RoundRobinLB __all__ = ("HazelcastClient",) @@ -297,6 +299,17 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp """ return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name) + async def get_topic(self, name: str) -> Topic[MessageType]: + """Returns the distributed topic instance with the specified name. + + Args: + name: Name of the distributed topic. + + Returns: + Distributed topic instance with the specified name. + """ + return await self._proxy_manager.get_or_create(TOPIC_SERVICE, name) + async def create_vector_collection_config( self, name: str, diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 878bc934b2..f9b101cf15 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -3,6 +3,7 @@ from hazelcast.internal.asyncio_proxy.list import create_list_proxy from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy +from hazelcast.internal.asyncio_proxy.topic import create_topic_proxy from hazelcast.internal.asyncio_proxy.vector_collection import ( VectorCollection, create_vector_collection_proxy, @@ -18,6 +19,7 @@ MAP_SERVICE = "hz:impl:mapService" MULTI_MAP_SERVICE = "hz:impl:multiMapService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" +TOPIC_SERVICE = "hz:impl:topicService" VECTOR_SERVICE = "hz:service:vector" _proxy_init: typing.Dict[ @@ -28,6 +30,7 @@ MAP_SERVICE: create_map_proxy, MULTI_MAP_SERVICE: create_multi_map_proxy, REPLICATED_MAP_SERVICE: create_replicated_map_proxy, + TOPIC_SERVICE: create_topic_proxy, VECTOR_SERVICE: create_vector_collection_proxy, } diff --git a/hazelcast/internal/asyncio_proxy/topic.py b/hazelcast/internal/asyncio_proxy/topic.py new file mode 100644 index 0000000000..9994c4e136 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/topic.py @@ -0,0 +1,116 @@ +import typing + +from hazelcast.protocol.codec import ( + topic_add_message_listener_codec, + topic_publish_codec, + topic_publish_all_codec, + topic_remove_message_listener_codec, +) +from hazelcast.internal.asyncio_proxy.base import PartitionSpecificProxy +from hazelcast.proxy.base import TopicMessage +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.types import MessageType +from hazelcast.util import check_not_none + + +class Topic(PartitionSpecificProxy, typing.Generic[MessageType]): + """Hazelcast provides distribution mechanism for publishing messages that + are delivered to multiple subscribers, which is also known as a + publish/subscribe (pub/sub) messaging model. + + Publish and subscriptions are cluster-wide. When a member subscribes to + a topic, it is actually registering for messages published by any member + in the cluster, including the new members joined after you added the + listener. + + Messages are ordered, meaning that listeners(subscribers) will process the + messages in the order they are actually published. + + Example: + >>> my_topic = await client.get_topic("my_topic") + >>> await my_topic.publish("hello") + + Warning: + Asyncio client topic proxy is not thread-safe, do not access it from other threads. + """ + + async def add_listener( + self, on_message: typing.Callable[[TopicMessage[MessageType]], None] = None + ) -> str: + """Subscribes to this topic. + + When someone publishes a message on this topic, ``on_message`` function + is called if provided. + + Args: + on_message: Function to be called when a message is published. + + Returns: + A registration id which is used as a key to remove the listener. + """ + codec = topic_add_message_listener_codec + request = codec.encode_request(self.name, self._is_smart) + + def handle(item_data, publish_time, uuid): + member = self._context.cluster_service.get_member(uuid) + item_event = TopicMessage( + self.name, self._to_object(item_data), publish_time / 1000.0, member + ) + on_message(item_event) + + return await self._register_listener( + request, + lambda r: codec.decode_response(r), + lambda reg_id: topic_remove_message_listener_codec.encode_request(self.name, reg_id), + lambda m: codec.handle(m, handle), + ) + + async def publish(self, message: MessageType) -> None: + """Publishes the message to all subscribers of this topic. + + Args: + message: The message to be published. + """ + try: + message_data = self._to_data(message) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.publish, message) + + request = topic_publish_codec.encode_request(self.name, message_data) + return await self._invoke(request) + + async def publish_all(self, messages: typing.Sequence[MessageType]) -> None: + """Publishes the messages to all subscribers of this topic. + + Args: + messages: The messages to be published. + """ + check_not_none(messages, "Messages cannot be None") + try: + topic_messages = [] + for m in messages: + check_not_none(m, "Message cannot be None") + data = self._to_data(m) + topic_messages.append(data) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.publish_all, messages) + + request = topic_publish_all_codec.encode_request(self.name, topic_messages) + return await self._invoke(request) + + async def remove_listener(self, registration_id: str) -> bool: + """Stops receiving messages for the given message listener. + + If the given listener already removed, this method does nothing. + + Args: + registration_id: Registration id of the listener to be removed. + + Returns: + ``True`` if the listener is removed, ``False`` otherwise. + """ + return await self._deregister_listener(registration_id) + + +async def create_topic_proxy(service_name, name, context): + return Topic(service_name, name, context) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index a17df87435..4d98ec4e7d 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -18,7 +18,7 @@ class Topic(PartitionSpecificProxy["BlockingTopic"], typing.Generic[MessageType] are delivered to multiple subscribers, which is also known as a publish/subscribe (pub/sub) messaging model. - Publish and subscriptions are cluster-wide. When a member subscribes for + Publish and subscriptions are cluster-wide. When a member subscribes to a topic, it is actually registering for messages published by any member in the cluster, including the new members joined after you added the listener. diff --git a/tests/integration/asyncio/proxy/topic_test.py b/tests/integration/asyncio/proxy/topic_test.py new file mode 100644 index 0000000000..f9b5498f02 --- /dev/null +++ b/tests/integration/asyncio/proxy/topic_test.py @@ -0,0 +1,83 @@ +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import ( + random_string, + event_collector, + skip_if_client_version_older_than, + skip_if_server_version_older_than, +) + + +class TopicTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + async def asyncSetUp(self): + await super().asyncSetUp() + self.topic = await self.client.get_topic(random_string()) + + async def asyncTearDown(self): + await self.topic.destroy() + await super().asyncTearDown() + + async def test_add_listener(self): + collector = event_collector() + await self.topic.add_listener(on_message=collector) + await self.topic.publish("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, "item-value") + self.assertGreater(event.publish_time, 0) + + await self.assertTrueEventually(assert_event, 5) + + async def test_remove_listener(self): + collector = event_collector() + reg_id = await self.topic.add_listener(on_message=collector) + await self.topic.remove_listener(reg_id) + await self.topic.publish("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 0) + if len(collector.events) > 0: + event = collector.events[0] + self.assertEqual(event.message, "item-value") + self.assertGreater(event.publish_time, 0) + + await self.assertTrueEventually(assert_event, 5) + + async def test_str(self): + self.assertTrue(str(self.topic).startswith("Topic")) + + async def test_publish_all(self): + skip_if_client_version_older_than(self, "5.2") + skip_if_server_version_older_than(self, self.client, "4.1") + + collector = event_collector() + await self.topic.add_listener(on_message=collector) + + messages = ["message1", "message2", "message3"] + await self.topic.publish_all(messages) + + def assert_event(): + self.assertEqual(len(collector.events), 3) + + await self.assertTrueEventually(assert_event, 5) + + async def test_publish_all_none_messages(self): + skip_if_client_version_older_than(self, "5.2") + skip_if_server_version_older_than(self, self.client, "4.1") + + with self.assertRaises(AssertionError): + await self.topic.publish_all(None) + + async def test_publish_all_none_message(self): + skip_if_client_version_older_than(self, "5.2") + skip_if_server_version_older_than(self, self.client, "4.1") + + messages = ["message1", None, "message3"] + with self.assertRaises(AssertionError): + await self.topic.publish_all(messages)