diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index 4377234c42..a8f2aca03b 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -26,12 +26,14 @@ MAP_SERVICE, MULTI_MAP_SERVICE, ProxyManager, + QUEUE_SERVICE, REPLICATED_MAP_SERVICE, VECTOR_SERVICE, ) from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.multi_map import MultiMap +from hazelcast.internal.asyncio_proxy.queue import Queue from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap from hazelcast.internal.asyncio_reactor import AsyncioReactor from hazelcast.serialization import SerializationServiceV1 @@ -285,6 +287,17 @@ async def get_multi_map(self, name: str) -> MultiMap[KeyType, ValueType]: """ return await self._proxy_manager.get_or_create(MULTI_MAP_SERVICE, name) + async def get_queue(self, name: str) -> Queue[KeyType]: + """Returns the distributed queue instance with the specified name. + + Args: + name: Name of the distributed queue. + + Returns: + Distributed queue instance with the specified name. + """ + return await self._proxy_manager.get_or_create(QUEUE_SERVICE, name) + async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueType]: """Returns the distributed ReplicatedMap instance with the specified name. diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 878bc934b2..e2ec2a88a1 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -3,6 +3,7 @@ from hazelcast.internal.asyncio_proxy.list import create_list_proxy from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy +from hazelcast.internal.asyncio_proxy.queue import create_queue_proxy from hazelcast.internal.asyncio_proxy.vector_collection import ( VectorCollection, create_vector_collection_proxy, @@ -17,6 +18,7 @@ LIST_SERVICE = "hz:impl:listService" MAP_SERVICE = "hz:impl:mapService" MULTI_MAP_SERVICE = "hz:impl:multiMapService" +QUEUE_SERVICE = "hz:impl:queueService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" VECTOR_SERVICE = "hz:service:vector" @@ -27,6 +29,7 @@ LIST_SERVICE: create_list_proxy, MAP_SERVICE: create_map_proxy, MULTI_MAP_SERVICE: create_multi_map_proxy, + QUEUE_SERVICE: create_queue_proxy, REPLICATED_MAP_SERVICE: create_replicated_map_proxy, VECTOR_SERVICE: create_vector_collection_proxy, } diff --git a/hazelcast/internal/asyncio_proxy/queue.py b/hazelcast/internal/asyncio_proxy/queue.py new file mode 100644 index 0000000000..60f9bf3ee2 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/queue.py @@ -0,0 +1,427 @@ +import typing + +from hazelcast.errors import IllegalStateError +from hazelcast.protocol.codec import ( + queue_add_all_codec, + queue_add_listener_codec, + queue_clear_codec, + queue_compare_and_remove_all_codec, + queue_compare_and_retain_all_codec, + queue_contains_all_codec, + queue_contains_codec, + queue_drain_to_max_size_codec, + queue_is_empty_codec, + queue_iterator_codec, + queue_offer_codec, + queue_peek_codec, + queue_poll_codec, + queue_put_codec, + queue_remaining_capacity_codec, + queue_remove_codec, + queue_remove_listener_codec, + queue_size_codec, + queue_take_codec, +) +from hazelcast.internal.asyncio_proxy.base import ( + PartitionSpecificProxy, + ItemEvent, + ItemEventType, +) +from hazelcast.types import ItemType +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import check_not_none, to_millis, deserialize_list_in_place + + +class Queue(PartitionSpecificProxy, typing.Generic[ItemType]): + """Concurrent, blocking, distributed, observable queue. + + Queue is not a partitioned data-structure. All of the Queue content is + stored in a single machine (and in the backup). Queue will not scale by + adding more members in the cluster. + + Example: + >>> my_queue = await client.get_queue("my_queue") + >>> print("queue.offer", await my_queue.offer("item")) + >>> print("queue.size", await my_queue.size()) + + Warning: + Asyncio client queue proxy is not thread-safe, do not access it from other threads. + """ + + async def add(self, item: ItemType) -> bool: + """Adds the specified item to this queue if there is available space. + + Args: + item: The specified item. + + Returns: + ``True`` if element is successfully added, ``False`` otherwise. + + Raises: + IllegalStateError: If queue is full. + """ + if not await self.offer(item): + raise IllegalStateError("Queue is full!") + return True + + async def add_all(self, items: typing.Sequence[ItemType]) -> bool: + """Adds the elements in the specified collection to this queue. + + Args: + items: Collection which includes the items to be added. + + Returns: + ``True`` if this queue is changed after call, ``False`` otherwise. + """ + check_not_none(items, "Value can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "Value can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.add_all, items) + + request = queue_add_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, queue_add_all_codec.decode_response) + + async def add_listener( + self, + include_value: bool = False, + item_added_func: typing.Callable[[ItemEvent[ItemType]], None] = None, + item_removed_func: typing.Callable[[ItemEvent[ItemType]], None] = None, + ) -> str: + """Adds an item listener for this queue. Listener will be notified for + all queue add/remove events. + + Args: + include_value: Whether received events include the updated item or + not. + item_added_func: Function to be called when an item is added to + this queue. + item_removed_func: Function to be called when an item is deleted + from this queue. + + Returns: + A registration id which is used as a key to remove the listener. + """ + codec = queue_add_listener_codec + request = codec.encode_request(self.name, include_value, self._is_smart) + + def handle_event_item(item_data, uuid, event_type): + item = self._to_object(item_data) if include_value else None + member = self._context.cluster_service.get_member(uuid) + + item_event = ItemEvent(self.name, item, event_type, member) + if event_type == ItemEventType.ADDED: + if item_added_func: + item_added_func(item_event) + else: + if item_removed_func: + item_removed_func(item_event) + + return await self._register_listener( + request, + lambda r: codec.decode_response(r), + lambda reg_id: queue_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: codec.handle(m, handle_event_item), + ) + + async def clear(self) -> None: + """Clears this queue. Queue will be empty after this call.""" + request = queue_clear_codec.encode_request(self.name) + return await self._invoke(request) + + async def contains(self, item: ItemType) -> bool: + """Determines whether this queue contains the specified item or not. + + Args: + item: The specified item to be searched. + + Returns: + ``True`` if the specified item exists in this queue, ``False`` + otherwise. + """ + check_not_none(item, "Item can't be None") + try: + item_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.contains, item) + + request = queue_contains_codec.encode_request(self.name, item_data) + return await self._invoke(request, queue_contains_codec.decode_response) + + async def contains_all(self, items: typing.Sequence[ItemType]) -> bool: + """Determines whether this queue contains all of the items in the + specified collection or not. + + Args: + items: The specified collection which includes the items to be + searched. + + Returns: + ``True`` if all of the items in the specified collection exist in + this queue, ``False`` otherwise. + """ + check_not_none(items, "Items can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "item can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.contains_all, items) + + request = queue_contains_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, queue_contains_all_codec.decode_response) + + async def drain_to(self, target_list: typing.List[ItemType], max_size: int = -1) -> int: + """Transfers all available items to the given `target_list` and removes + these items from this queue. + + If a max_size is specified, it transfers at most the given number of + items. In case of a failure, an item can exist in both collections or + none of them. + + This operation may be more efficient than polling elements repeatedly + and putting into collection. + + Args: + target_list: the list where the items in this queue will be + transferred. + max_size: The maximum number items to transfer. + + Returns: + Number of transferred items. + """ + + def handler(message): + response = queue_drain_to_max_size_codec.decode_response(message) + items = [self._to_object(item) for item in response] + target_list.extend(items) + return len(response) + + request = queue_drain_to_max_size_codec.encode_request(self.name, max_size) + return await self._invoke(request, handler) + + async def iterator(self) -> typing.List[ItemType]: + """Returns all the items in this queue. + + Returns: + Collection of items in this queue. + """ + + def handler(message): + data_list = queue_iterator_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) + + request = queue_iterator_codec.encode_request(self.name) + return await self._invoke(request, handler) + + async def is_empty(self) -> bool: + """Determines whether this queue is empty or not. + + Returns: + ``True`` if this queue is empty, ``False`` otherwise. + """ + request = queue_is_empty_codec.encode_request(self.name) + return await self._invoke(request, queue_is_empty_codec.decode_response) + + async def offer(self, item: ItemType, timeout: float = 0) -> bool: + """Inserts the specified element into this queue if it is possible to + do so immediately without violating capacity restrictions. + + If there is no space currently available: + + - If the timeout is provided, it waits until this timeout elapses + and returns the result. + - If the timeout is not provided, returns ``False`` immediately. + + Args: + item: The item to be added. + timeout: Maximum time in seconds to wait for addition. + + Returns: + ``True`` if the element was added to this queue, ``False`` + otherwise. + """ + check_not_none(item, "Value can't be None") + try: + element_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.offer, item, timeout) + + request = queue_offer_codec.encode_request(self.name, element_data, to_millis(timeout)) + return await self._invoke(request, queue_offer_codec.decode_response) + + async def peek(self) -> typing.Optional[ItemType]: + """Retrieves the head of queue without removing it from the queue. + + Returns: + The head of this queue, or ``None`` if this queue is empty. + """ + + def handler(message): + return self._to_object(queue_peek_codec.decode_response(message)) + + request = queue_peek_codec.encode_request(self.name) + return await self._invoke(request, handler) + + async def poll(self, timeout: float = 0) -> typing.Optional[ItemType]: + """Retrieves and removes the head of this queue. + + If this queue is empty: + + - If the timeout is provided, it waits until this timeout elapses + and returns the result. + - If the timeout is not provided, returns ``None``. + + Args: + timeout: Maximum time in seconds to wait for addition. + + Returns: + The head of this queue, or ``None`` if this queue is empty or + specified timeout elapses before an item is added to the queue. + """ + + def handler(message): + return self._to_object(queue_poll_codec.decode_response(message)) + + request = queue_poll_codec.encode_request(self.name, to_millis(timeout)) + return await self._invoke(request, handler) + + async def put(self, item: ItemType) -> None: + """Adds the specified element into this queue. + + If there is no space, it waits until necessary space becomes available. + + Args: + item: The specified item. + """ + check_not_none(item, "Value can't be None") + try: + element_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.put, item) + + request = queue_put_codec.encode_request(self.name, element_data) + return await self._invoke(request) + + async def remaining_capacity(self) -> int: + """Returns the remaining capacity of this queue. + + Returns: + Remaining capacity of this queue. + """ + request = queue_remaining_capacity_codec.encode_request(self.name) + return await self._invoke(request, queue_remaining_capacity_codec.decode_response) + + async def remove(self, item: ItemType) -> bool: + """Removes the specified element from the queue if it exists. + + Args: + item: The specified element to be removed. + + Returns: + ``True`` if the specified element exists in this queue, ``False`` + otherwise. + """ + check_not_none(item, "Value can't be None") + try: + item_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.remove, item) + + request = queue_remove_codec.encode_request(self.name, item_data) + return await self._invoke(request, queue_remove_codec.decode_response) + + async def remove_all(self, items: typing.Sequence[ItemType]) -> bool: + """Removes all of the elements of the specified collection from this + queue. + + Args: + items: The specified collection. + + Returns: + ``True`` if the call changed this queue, ``False`` otherwise. + """ + check_not_none(items, "Value can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "Value can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.remove_all, items) + + request = queue_compare_and_remove_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, queue_compare_and_remove_all_codec.decode_response) + + async def remove_listener(self, registration_id: str) -> bool: + """Removes the specified item listener. + + Returns silently if the specified listener was not added before. + + Args: + registration_id: Id of the listener to be deleted. + + Returns: + ``True`` if the item listener is removed, ``False`` otherwise. + """ + return await self._deregister_listener(registration_id) + + async def retain_all(self, items: typing.Sequence[ItemType]) -> bool: + """Removes the items which are not contained in the specified + collection. + + In other words, only the items that are contained in the specified + collection will be retained. + + Args: + items: Collection which includes the elements to be retained in + this queue. + + Returns: + ``True`` if this queue changed as a result of the call, ``False`` + otherwise. + """ + check_not_none(items, "Value can't be None") + try: + data_items = [] + for item in items: + check_not_none(item, "Value can't be None") + data_items.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.retain_all, items) + + request = queue_compare_and_retain_all_codec.encode_request(self.name, data_items) + return await self._invoke(request, queue_compare_and_retain_all_codec.decode_response) + + async def size(self) -> int: + """Returns the number of elements in this collection. + + If the size is greater than ``2**31 - 1``, it returns ``2**31 - 1``. + + Returns: + Size of the queue. + """ + request = queue_size_codec.encode_request(self.name) + return await self._invoke(request, queue_size_codec.decode_response) + + async def take(self) -> ItemType: + """Retrieves and removes the head of this queue, if necessary, waits + until an item becomes available. + + Returns: + The head of this queue. + """ + + def handler(message): + return self._to_object(queue_take_codec.decode_response(message)) + + request = queue_take_codec.encode_request(self.name) + return await self._invoke(request, handler) + + +async def create_queue_proxy(service_name, name, context): + return Queue(service_name, name, context) diff --git a/tests/integration/asyncio/proxy/queue_test.py b/tests/integration/asyncio/proxy/queue_test.py new file mode 100644 index 0000000000..ac0aa7f267 --- /dev/null +++ b/tests/integration/asyncio/proxy/queue_test.py @@ -0,0 +1,229 @@ +import os + +from hazelcast.errors import IllegalStateError +from hazelcast.internal.asyncio_proxy.base import ItemEventType +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import random_string, event_collector + + +class QueueTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open(os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast.xml")) as f: + return f.read() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.queue = await self.client.get_queue("ClientQueueTest_" + random_string()) + + async def asyncTearDown(self): + await self.queue.destroy() + await super().asyncTearDown() + + async def test_add_entry_listener_item_added(self): + collector = event_collector() + await self.queue.add_listener(include_value=False, item_added_func=collector) + await self.queue.add("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, None) + self.assertEqual(event.event_type, ItemEventType.ADDED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add_entry_listener_item_added_include_value(self): + collector = event_collector() + await self.queue.add_listener(include_value=True, item_added_func=collector) + await self.queue.add("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, "item-value") + self.assertEqual(event.event_type, ItemEventType.ADDED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add_entry_listener_item_removed(self): + collector = event_collector() + await self.queue.add_listener(include_value=False, item_removed_func=collector) + await self.queue.add("item-value") + await self.queue.remove("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, None) + self.assertEqual(event.event_type, ItemEventType.REMOVED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add_entry_listener_item_removed_include_value(self): + collector = event_collector() + await self.queue.add_listener(include_value=True, item_removed_func=collector) + await self.queue.add("item-value") + await self.queue.remove("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.item, "item-value") + self.assertEqual(event.event_type, ItemEventType.REMOVED) + + await self.assertTrueEventually(assert_event, 5) + + async def test_remove_entry_listener_item_added(self): + collector = event_collector() + reg_id = await self.queue.add_listener(include_value=False, item_added_func=collector) + await self.queue.remove_listener(reg_id) + await self.queue.add("item-value") + + def assert_event(): + self.assertEqual(len(collector.events), 0) + + await self.assertTrueEventually(assert_event, 5) + + async def test_add(self): + add_resp = await self.queue.add("Test") + result = await self.queue.contains("Test") + self.assertTrue(add_resp) + self.assertTrue(result) + + async def test_add_full(self): + _all = ["1", "2", "3", "4", "5", "6"] + await self.queue.add_all(_all) + with self.assertRaises(IllegalStateError): + await self.queue.add("cannot add this one") + + async def test_add_null_element(self): + with self.assertRaises(AssertionError): + await self.queue.add(None) + + async def test_add_all(self): + _all = ["1", "2", "3"] + add_resp = await self.queue.add_all(_all) + q_all = await self.queue.iterator() + self.assertCountEqual(_all, q_all) + self.assertTrue(add_resp) + + async def test_add_all_null_element(self): + _all = ["1", "2", "3", None] + with self.assertRaises(AssertionError): + await self.queue.add_all(_all) + + async def test_add_all_null_elements(self): + with self.assertRaises(AssertionError): + await self.queue.add_all(None) + + async def test_clear(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + size = await self.queue.size() + await self.queue.clear() + size_cleared = await self.queue.size() + self.assertEqual(size, len(_all)) + self.assertEqual(size_cleared, 0) + + async def test_contains(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + contains_result = await self.queue.contains("2") + self.assertTrue(contains_result) + + async def test_contains_all(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + contains_result = await self.queue.contains_all(_all) + self.assertTrue(contains_result) + + async def test_iterator(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + all_result = await self.queue.iterator() + self.assertCountEqual(all_result, _all) + + async def test_is_empty(self): + is_empty = await self.queue.is_empty() + self.assertTrue(is_empty) + + async def test_remaining_capacity(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + capacity = await self.queue.remaining_capacity() + self.assertEqual(capacity, 3) + + async def test_remove(self): + await self.queue.add("Test") + remove_result = await self.queue.remove("Test") + size = await self.queue.size() + self.assertTrue(remove_result) + self.assertEqual(size, 0) + + async def test_remove_all(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + await self.queue.remove_all(["2", "3"]) + result = await self.queue.iterator() + self.assertEqual(result, ["1"]) + + async def test_retain_all(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + await self.queue.retain_all(["2", "3"]) + result = await self.queue.iterator() + self.assertEqual(result, ["2", "3"]) + + async def test_size(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + size = await self.queue.size() + self.assertEqual(size, len(_all)) + + async def test_drain_to(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + drain = [] + size = await self.queue.drain_to(drain) + self.assertCountEqual(drain, _all) + self.assertEqual(size, 3) + + async def test_peek(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + peek_result = await self.queue.peek() + self.assertEqual(peek_result, "1") + + async def test_put(self): + await self.queue.put("Test") + result = await self.queue.contains("Test") + self.assertTrue(result) + + async def test_take(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + take_result = await self.queue.take() + self.assertEqual(take_result, "1") + + async def test_poll(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + poll_result = await self.queue.poll() + self.assertEqual(poll_result, "1") + + async def test_poll_timeout(self): + _all = ["1", "2", "3"] + await self.queue.add_all(_all) + poll_result = await self.queue.poll(1) + self.assertEqual(poll_result, "1") + + def test_str(self): + self.assertTrue(str(self.queue).startswith("Queue"))