From 27603a940ff421b02bb9e1a03db1d7c5b85e066f Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 23 Mar 2026 11:52:26 +0300 Subject: [PATCH] Ported Set to asyncio --- hazelcast/internal/asyncio_client.py | 13 + hazelcast/internal/asyncio_proxy/manager.py | 3 + hazelcast/internal/asyncio_proxy/set.py | 285 ++++++++++++++++++++ tests/integration/asyncio/proxy/set_test.py | 174 ++++++++++++ 4 files changed, 475 insertions(+) create mode 100644 hazelcast/internal/asyncio_proxy/set.py create mode 100644 tests/integration/asyncio/proxy/set_test.py diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index f6e2d62954..3e132e7b8d 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -27,12 +27,14 @@ MAP_SERVICE, ProxyManager, REPLICATED_MAP_SERVICE, + SET_SERVICE, VECTOR_SERVICE, ) from hazelcast.internal.asyncio_proxy.base import Proxy from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap +from hazelcast.internal.asyncio_proxy.set import Set from hazelcast.internal.asyncio_reactor import AsyncioReactor from hazelcast.serialization import SerializationServiceV1 from hazelcast.internal.asyncio_statistics import Statistics @@ -274,6 +276,17 @@ async def get_map(self, name: str) -> Map[KeyType, ValueType]: """ return await self._proxy_manager.get_or_create(MAP_SERVICE, name) + async def get_set(self, name: str) -> Set[KeyType]: + """Returns the distributed set instance with the specified name. + + Args: + name: Name of the distributed set. + + Returns: + Distributed set instance with the specified name. + """ + return await self._proxy_manager.get_or_create(SET_SERVICE, name) + async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueType]: """Returns the distributed ReplicatedMap instance with the specified name. diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 812d8eeb72..8989003f1f 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -2,6 +2,7 @@ import typing from hazelcast.internal.asyncio_proxy.list import create_list_proxy +from hazelcast.internal.asyncio_proxy.set import create_set_proxy from hazelcast.internal.asyncio_proxy.vector_collection import ( VectorCollection, create_vector_collection_proxy, @@ -16,6 +17,7 @@ LIST_SERVICE = "hz:impl:listService" MAP_SERVICE = "hz:impl:mapService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" +SET_SERVICE = "hz:impl:setService" VECTOR_SERVICE = "hz:service:vector" _proxy_init: typing.Dict[ @@ -25,6 +27,7 @@ LIST_SERVICE: create_list_proxy, MAP_SERVICE: create_map_proxy, REPLICATED_MAP_SERVICE: create_replicated_map_proxy, + SET_SERVICE: create_set_proxy, VECTOR_SERVICE: create_vector_collection_proxy, } diff --git a/hazelcast/internal/asyncio_proxy/set.py b/hazelcast/internal/asyncio_proxy/set.py new file mode 100644 index 0000000000..92fd7ac7b1 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/set.py @@ -0,0 +1,285 @@ +import typing + +from hazelcast.protocol.codec import ( + set_add_all_codec, + set_add_codec, + set_add_listener_codec, + set_clear_codec, + set_compare_and_remove_all_codec, + set_compare_and_retain_all_codec, + set_contains_all_codec, + set_contains_codec, + set_get_all_codec, + set_is_empty_codec, + set_remove_codec, + set_remove_listener_codec, + set_size_codec, +) +from hazelcast.internal.asyncio_proxy.base import ( + PartitionSpecificProxy, + ItemEvent, + ItemEventType, +) +from hazelcast.types import ItemType +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import check_not_none, deserialize_list_in_place + + +class Set(PartitionSpecificProxy, typing.Generic[ItemType]): + """Concurrent, distributed implementation of Set. + + Example: + >>> my_set = await client.get_set("my_set") + >>> print("set.add", await my_set.add("item")) + >>> print("set.size", await my_set.size()) + + Warning: + Asyncio client set proxy is not thread-safe, do not access it from other threads. + """ + + async def add(self, item: ItemType) -> bool: + """Adds the specified item if it is not exists in this set. + + Args: + item: The specified item to be added. + + Returns: + ``True`` if this set is changed after call, ``False`` otherwise. + """ + check_not_none(item, "Value can't be None") + try: + element_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.add, item) + + request = set_add_codec.encode_request(self.name, element_data) + return await self._invoke(request, set_add_codec.decode_response) + + async def add_all(self, items: typing.Sequence[ItemType]) -> bool: + """Adds the elements in the specified collection if they're not exist + in this set. + + Args: + items: Collection which includes the items to be added. + + Returns: + ``True`` if this set is changed after call, ``False`` otherwise. + """ + check_not_none(items, "Value can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "Value can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.add_all, items) + + request = set_add_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, set_add_all_codec.decode_response) + + async def add_listener( + self, + include_value: bool = False, + item_added_func: typing.Callable[[ItemEvent[ItemType]], None] = None, + item_removed_func: typing.Callable[[ItemEvent[ItemType]], None] = None, + ) -> str: + """Adds an item listener for this container. + + Listener will be notified for all container add/remove events. + + Args: + include_value: Whether received events include the updated item or + not. + item_added_func: Function to be called when an item is added to + this set. + item_removed_func: Function to be called when an item is deleted + from this set. + + Returns: + A registration id which is used as a key to remove the listener. + """ + request = set_add_listener_codec.encode_request(self.name, include_value, self._is_smart) + + def handle_event_item(item_data, uuid, event_type): + item = self._to_object(item_data) if include_value else None + member = self._context.cluster_service.get_member(uuid) + + item_event = ItemEvent(self.name, item, event_type, member) + if event_type == ItemEventType.ADDED: + if item_added_func: + item_added_func(item_event) + else: + if item_removed_func: + item_removed_func(item_event) + + return await self._register_listener( + request, + lambda r: set_add_listener_codec.decode_response(r), + lambda reg_id: set_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: set_add_listener_codec.handle(m, handle_event_item), + ) + + async def clear(self) -> None: + """Clears the set. Set will be empty with this call.""" + request = set_clear_codec.encode_request(self.name) + return await self._invoke(request) + + async def contains(self, item: ItemType) -> bool: + """Determines whether this set contains the specified item or not. + + Args: + item: The specified item to be searched. + + Returns: + ``True`` if the specified item exists in this set, ``False`` + otherwise. + """ + check_not_none(item, "Value can't be None") + try: + item_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.contains, item) + + request = set_contains_codec.encode_request(self.name, item_data) + return await self._invoke(request, set_contains_codec.decode_response) + + async def contains_all(self, items: typing.Sequence[ItemType]) -> bool: + """Determines whether this set contains all items in the specified + collection or not. + + Args: + items: The specified collection which includes the items to be + searched. + + Returns: + ``True`` if all the items in the specified collection exist in + this set, ``False`` otherwise. + """ + check_not_none(items, "Value can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "Value can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.contains_all, items) + + request = set_contains_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, set_contains_all_codec.decode_response) + + async def get_all(self) -> typing.List[ItemType]: + """Returns all the items in the set. + + Returns: + List of the items in this set. + """ + + def handler(message): + data_list = set_get_all_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) + + request = set_get_all_codec.encode_request(self.name) + return await self._invoke(request, handler) + + async def is_empty(self) -> bool: + """Determines whether this set is empty or not. + + Returns: + ``True`` if this set is empty, ``False`` otherwise. + """ + request = set_is_empty_codec.encode_request(self.name) + return await self._invoke(request, set_is_empty_codec.decode_response) + + async def remove(self, item: ItemType) -> bool: + """Removes the specified element from the set if it exists. + + Args: + item: The specified element to be removed. + + Returns: + ``True`` if the specified element exists in this set, ``False`` + otherwise. + """ + check_not_none(item, "Value can't be None") + try: + item_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.remove, item) + + request = set_remove_codec.encode_request(self.name, item_data) + return await self._invoke(request, set_remove_codec.decode_response) + + async def remove_all(self, items: typing.Sequence[ItemType]) -> bool: + """Removes all of the elements of the specified collection from this + set. + + Args: + items: The specified collection. + + Returns: + ``True`` if the call changed this set, ``False`` otherwise. + """ + check_not_none(items, "Value can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "Value can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.remove_all, items) + + request = set_compare_and_remove_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, set_compare_and_remove_all_codec.decode_response) + + async def remove_listener(self, registration_id: str) -> bool: + """Removes the specified item listener. + + Returns silently if the specified listener was not added before. + + Args: + registration_id: Id of the listener to be deleted. + + Returns: + ``True`` if the item listener is removed, ``False`` otherwise. + """ + return await self._deregister_listener(registration_id) + + async def retain_all(self, items: typing.Sequence[ItemType]) -> bool: + """Removes the items which are not contained in the specified + collection. + + In other words, only the items that are contained in the specified + collection will be retained. + + Args: + items: Collection which includes the elements to be retained in + this set. + + Returns: + ``True`` if this set changed as a result of the call, ``False`` + otherwise. + """ + check_not_none(items, "Value can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "Value can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.retain_all, items) + + request = set_compare_and_retain_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, set_compare_and_retain_all_codec.decode_response) + + async def size(self) -> int: + """Returns the number of items in this set. + + Returns: + Number of items in this set. + """ + request = set_size_codec.encode_request(self.name) + return await self._invoke(request, set_size_codec.decode_response) + + +async def create_set_proxy(service_name, name, context): + return Set(service_name, name, context) diff --git a/tests/integration/asyncio/proxy/set_test.py b/tests/integration/asyncio/proxy/set_test.py new file mode 100644 index 0000000000..2f8eb14e18 --- /dev/null +++ b/tests/integration/asyncio/proxy/set_test.py @@ -0,0 +1,174 @@ +from hazelcast.internal.asyncio_proxy.base import ItemEventType +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import random_string, event_collector + + +class SetTest(SingleMemberTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.set = await self.client.get_set(random_string()) + + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + async def asyncTearDown(self): + await self.set.destroy() + await super().asyncTearDown() + + async def test_add_entry_listener_item_added(self): + collector = event_collector() + await self.set.add_listener(include_value=False, item_added_func=collector) + await self.set.add("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, None) + self.assertEqual(event.event_type, ItemEventType.ADDED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add_entry_listener_item_added_include_value(self): + collector = event_collector() + await self.set.add_listener(include_value=True, item_added_func=collector) + await self.set.add("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, "item-value") + self.assertEqual(event.event_type, ItemEventType.ADDED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add_entry_listener_item_removed(self): + collector = event_collector() + await self.set.add_listener(include_value=False, item_removed_func=collector) + await self.set.add("item-value") + await self.set.remove("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, None) + self.assertEqual(event.event_type, ItemEventType.REMOVED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add_entry_listener_item_removed_include_value(self): + collector = event_collector() + await self.set.add_listener(include_value=True, item_removed_func=collector) + await self.set.add("item-value") + await self.set.remove("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, "item-value") + self.assertEqual(event.event_type, ItemEventType.REMOVED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_remove_entry_listener_item_added(self): + collector = event_collector() + reg_id = await self.set.add_listener(include_value=False, item_added_func=collector) + await self.set.remove_listener(reg_id) + await self.set.add("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 0) + if len(collector.events) > 0: + event = collector.events[0] + self.assertEqual(event.item, None) + self.assertEqual(event.event_type, ItemEventType.ADDED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add(self): + add_resp = await self.set.add("Test") + result = await self.set.contains("Test") + self.assertTrue(add_resp) + self.assertTrue(result) + + async def test_add_null_element(self): + with self.assertRaises(AssertionError): + await self.set.add(None) + + async def test_add_all(self): + _all = ["1", "2", "3"] + add_resp = await self.set.add_all(_all) + set_all = await self.set.get_all() + self.assertCountEqual(_all, set_all) + self.assertTrue(add_resp) + + async def test_add_all_null_element(self): + _all = ["1", "2", "3", None] + with self.assertRaises(AssertionError): + await self.set.add_all(_all) + + async def test_add_all_null_elements(self): + with self.assertRaises(AssertionError): + await self.set.add_all(None) + + async def test_clear(self): + _all = ["1", "2", "3"] + await self.set.add_all(_all) + size = await self.set.size() + await self.set.clear() + size_cleared = await self.set.size() + self.assertEqual(size, len(_all)) + self.assertEqual(size_cleared, 0) + + async def test_contains(self): + _all = ["1", "2", "3"] + await self.set.add_all(_all) + contains_result = await self.set.contains("2") + self.assertTrue(contains_result) + + async def test_contains_all(self): + _all = ["1", "2", "3"] + await self.set.add_all(_all) + contains_result = await self.set.contains_all(_all) + self.assertTrue(contains_result) + + async def test_get_all(self): + _all = ["1", "2", "3"] + await self.set.add_all(_all) + all_result = await self.set.get_all() + self.assertCountEqual(all_result, _all) + + async def test_is_empty(self): + is_empty = await self.set.is_empty() + self.assertTrue(is_empty) + + async def test_remove(self): + await self.set.add("Test") + remove_result = await self.set.remove("Test") + size = await self.set.size() + self.assertTrue(remove_result) + self.assertEqual(size, 0) + + async def test_remove_all(self): + _all = ["1", "2", "3"] + await self.set.add_all(_all) + await self.set.remove_all(["2", "3"]) + result = await self.set.get_all() + self.assertEqual(result, ["1"]) + + async def test_retain_all(self): + _all = ["1", "2", "3"] + await self.set.add_all(_all) + await self.set.retain_all(["2", "3"]) + result = await self.set.get_all() + self.assertEqual(result, ["2", "3"]) + + async def test_size(self): + _all = ["1", "2", "3"] + await self.set.add_all(_all) + size = await self.set.size() + self.assertEqual(size, len(_all)) + + def test_str(self): + self.assertTrue(str(self.set).startswith("Set"))