From 3bde0ede82f59cabefdc60621d185de2fa4c13a5 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 1 Apr 2026 14:51:03 +0300 Subject: [PATCH 1/5] Added asyncio ringbuffer proxy --- hazelcast/internal/asyncio_client.py | 17 +- hazelcast/internal/asyncio_proxy/manager.py | 3 + .../internal/asyncio_proxy/ringbuffer.py | 299 ++++++++++++++++++ hazelcast/proxy/ringbuffer.py | 2 +- 4 files changed, 317 insertions(+), 4 deletions(-) create mode 100644 hazelcast/internal/asyncio_proxy/ringbuffer.py diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index f6e2d62954..b02d2a7222 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -7,7 +7,7 @@ from hazelcast.internal.asyncio_compact import CompactSchemaService from hazelcast.config import Config, IndexConfig from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAsyncioAddressProvider -from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo +from hazelcast.core import DistributedObjectEvent from hazelcast.discovery import HazelcastCloudAddressProvider from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation @@ -18,7 +18,6 @@ from hazelcast.internal.asyncio_partition import PartitionService, InternalPartitionService from hazelcast.protocol.codec import ( client_add_distributed_object_listener_codec, - client_get_distributed_objects_codec, client_remove_distributed_object_listener_codec, dynamic_config_add_vector_collection_config_codec, ) @@ -27,12 +26,13 @@ MAP_SERVICE, ProxyManager, REPLICATED_MAP_SERVICE, + RINGBUFFER_SERVICE, VECTOR_SERVICE, ) -from hazelcast.internal.asyncio_proxy.base import Proxy from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap +from hazelcast.internal.asyncio_proxy.ringbuffer import Ringbuffer from hazelcast.internal.asyncio_reactor import AsyncioReactor from hazelcast.serialization import SerializationServiceV1 from hazelcast.internal.asyncio_statistics import Statistics @@ -286,6 +286,17 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp """ return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name) + async def get_ringbuffer(self, name: str) -> Ringbuffer: + """Returns the distributed Ringbuffer instance with the specified name. + + Args: + name: Name of the distributed ringbuffer. + + Returns: + Distributed Ringbuffer instance with the specified name. + """ + return await self._proxy_manager.get_or_create(RINGBUFFER_SERVICE, name) + async def create_vector_collection_config( self, name: str, diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 812d8eeb72..c34a1659b4 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -11,11 +11,13 @@ from hazelcast.internal.asyncio_proxy.base import Proxy from hazelcast.internal.asyncio_proxy.map import create_map_proxy from hazelcast.internal.asyncio_proxy.replicated_map import create_replicated_map_proxy +from hazelcast.internal.asyncio_proxy.ringbuffer import create_ringbuffer_proxy from hazelcast.util import to_list LIST_SERVICE = "hz:impl:listService" MAP_SERVICE = "hz:impl:mapService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" +RINGBUFFER_SERVICE = "hz:impl:ringbufferService" VECTOR_SERVICE = "hz:service:vector" _proxy_init: typing.Dict[ @@ -25,6 +27,7 @@ LIST_SERVICE: create_list_proxy, MAP_SERVICE: create_map_proxy, REPLICATED_MAP_SERVICE: create_replicated_map_proxy, + RINGBUFFER_SERVICE: create_ringbuffer_proxy, VECTOR_SERVICE: create_vector_collection_proxy, } diff --git a/hazelcast/internal/asyncio_proxy/ringbuffer.py b/hazelcast/internal/asyncio_proxy/ringbuffer.py new file mode 100644 index 0000000000..77021e6162 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/ringbuffer.py @@ -0,0 +1,299 @@ +import typing + +from hazelcast.protocol.codec import ( + ringbuffer_add_all_codec, + ringbuffer_add_codec, + ringbuffer_capacity_codec, + ringbuffer_head_sequence_codec, + ringbuffer_read_many_codec, + ringbuffer_read_one_codec, + ringbuffer_remaining_capacity_codec, + ringbuffer_size_codec, + ringbuffer_tail_sequence_codec, +) +from hazelcast.internal.asyncio_proxy.base import PartitionSpecificProxy +from hazelcast.proxy.ringbuffer import ReadResult, OVERFLOW_POLICY_OVERWRITE, MAX_BATCH_SIZE +from hazelcast.types import ItemType +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import ( + check_not_negative, + check_not_none, + check_not_empty, + check_true, + deserialize_list_in_place, +) + + +class Ringbuffer(PartitionSpecificProxy, typing.Generic[ItemType]): + """A Ringbuffer is an append-only data-structure where the content is + stored in a ring like structure. + + A ringbuffer has a capacity so it won't grow beyond that capacity and + endanger the stability of the system. If that capacity is exceeded, then + the oldest item in the ringbuffer is overwritten. The ringbuffer has two + always incrementing sequences: + + - :func:`tail_sequence`: This is the side where the youngest item is found. + So the tail is the side of the ringbuffer where items are added to. + - :func:`head_sequence`: This is the side where the oldest items are found. + So the head is the side where items gets discarded. + + The items in the ringbuffer can be found by a sequence that is in between + (inclusive) the head and tail sequence. + + If data is read from a ringbuffer with a sequence that is smaller than the + head sequence, it means that the data is not available anymore and a + :class:`hazelcast.errors.StaleSequenceError` is thrown. + + A Ringbuffer currently is a replicated, but not partitioned data structure. + So all data is stored in a single partition, similarly to the + :class:`hazelcast.internal.asyncio_proxy.queue.Queue` implementation. + + A Ringbuffer can be used in a way similar to the Queue, but one of the key + differences is that a :func:`hazelcast.internal.asyncio_proxy.queue.Queue.take` + is destructive, meaning that only 1 consumer is able to take an item. + A :func:`read_one` is not destructive, so you can have multiple consumers reading the + same item multiple times. + + Example: + >>> rb = await client.get_ringbuffer("my_ringbuffer") + >>> await rb.add("item") + >>> print("read_one", await rb.read_one(0)) + """ + + def __init__(self, service_name, name, context): + super(Ringbuffer, self).__init__(service_name, name, context) + self._capacity = None + + async def capacity(self) -> int: + """Returns the capacity of this Ringbuffer. + + Returns: + The capacity of Ringbuffer. + """ + if not self._capacity: + + def handler(message): + self._capacity = ringbuffer_capacity_codec.decode_response(message) + return self._capacity + + request = ringbuffer_capacity_codec.encode_request(self.name) + return await self._invoke(request, handler) + + return self._capacity + + async def size(self) -> int: + """Returns number of items in the Ringbuffer. + + Returns: + The size of Ringbuffer. + """ + request = ringbuffer_size_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_size_codec.decode_response) + + async def tail_sequence(self) -> int: + """Returns the sequence of the tail. + + The tail is the side of the Ringbuffer where the items are added to. + The initial value of the tail is ``-1``. + + Returns: + The sequence of the tail. + """ + request = ringbuffer_tail_sequence_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_tail_sequence_codec.decode_response) + + async def head_sequence(self) -> int: + """Returns the sequence of the head. + + The head is the side of the Ringbuffer where the oldest items in the + Ringbuffer are found. If the Ringbuffer is empty, the head will be one + more than the tail. The initial value of the head is ``0`` (``1`` more + than tail). + + Returns: + The sequence of the head. + """ + request = ringbuffer_head_sequence_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_head_sequence_codec.decode_response) + + async def remaining_capacity(self) -> int: + """Returns the remaining capacity of the Ringbuffer. + + Returns: + The remaining capacity of Ringbuffer. + """ + request = ringbuffer_remaining_capacity_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_remaining_capacity_codec.decode_response) + + async def add(self, item, overflow_policy: int = OVERFLOW_POLICY_OVERWRITE) -> int: + """Adds the specified item to the tail of the Ringbuffer. + + If there is no space in the Ringbuffer, the action is determined by + ``overflow_policy``. + + Args: + item: The specified item to be added. + overflow_policy: the OverflowPolicy to be used when there is no + space. + + Returns: + The sequenceId of the added item, or ``-1`` if the add failed. + """ + try: + item_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.add, item, overflow_policy) + + request = ringbuffer_add_codec.encode_request(self.name, overflow_policy, item_data) + return await self._invoke(request, ringbuffer_add_codec.decode_response) + + async def add_all( + self, + items: typing.Sequence[ItemType], + overflow_policy: int = OVERFLOW_POLICY_OVERWRITE, + ) -> int: + """Adds all of the item in the specified collection to the tail of the + Ringbuffer. + + This is likely to outperform multiple calls to :func:`add` due + to better io utilization and a reduced number of executed operations. + The items are added in the order of the Iterator of the collection. + + If there is no space in the Ringbuffer, the action is determined by + ``overflow_policy``. + + Args: + items: The specified collection which contains the items to be + added. + overflow_policy: The OverflowPolicy to be used when there is no + space. + + Returns: + The sequenceId of the last written item, or ``-1`` of the last + write is failed. + """ + check_not_empty(items, "items can't be empty") + if len(items) > MAX_BATCH_SIZE: + raise AssertionError("Batch size can't be greater than %d" % MAX_BATCH_SIZE) + + try: + item_data_list = [] + for item in items: + check_not_none(item, "item can't be None") + item_data_list.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.add_all, items, overflow_policy) + + request = ringbuffer_add_all_codec.encode_request( + self.name, item_data_list, overflow_policy + ) + return await self._invoke(request, ringbuffer_add_all_codec.decode_response) + + async def read_one(self, sequence: int) -> ItemType: + """Reads one item from the Ringbuffer. + + If the sequence is one beyond the current tail, this call blocks until + an item is added. Currently it isn't possible to control how long + this call is going to block. + + Args: + sequence: The sequence of the item to read. + + Returns: + The read item. + """ + check_not_negative(sequence, "sequence can't be smaller than 0") + + def handler(message): + return self._to_object(ringbuffer_read_one_codec.decode_response(message)) + + request = ringbuffer_read_one_codec.encode_request(self.name, sequence) + return await self._invoke(request, handler) + + async def read_many( + self, start_sequence: int, min_count: int, max_count: int, filter: typing.Any = None + ) -> ReadResult: + """Reads a batch of items from the Ringbuffer. + + If the number of available items after the first read item is smaller + than the ``max_count``, these items are returned. So it could be the + number of items read is smaller than the ``max_count``. If there are + less items available than ``min_count``, then this call blocks. + + Warnings: + These blocking calls consume server memory and if there are many + calls, it can be possible to see leaking memory or + ``OutOfMemoryError`` s on the server. + + Reading a batch of items is likely to perform better because less + overhead is involved. + + A filter can be provided to only select items that need to be read. If + the filter is ``None``, all items are read. If the filter is not + ``None``, only items where the filter function returns true are + returned. Using filters is a good way to prevent getting items that + are of no value to the receiver. This reduces the amount of IO and the + number of operations being executed, and can result in a significant + performance improvement. Note that, filtering logic must be defined + on the server-side. + + If the ``start_sequence`` is smaller than the smallest sequence still + available in the Ringbuffer (:func:`head_sequence`), then the smallest + available sequence will be used as the start sequence and the + minimum/maximum number of items will be attempted to be read from there + on. + + If the ``start_sequence`` is bigger than the last available sequence + in the Ringbuffer (:func:`tail_sequence`), then the last available + sequence plus one will be used as the start sequence and the call will + block until further items become available and it can read at least the + minimum number of items. + + Args: + start_sequence: The start sequence of the first item to read. + min_count: The minimum number of items to read. + max_count: The maximum number of items to read. + filter: Filter to select returned elements. + + Returns: + The list of read items. + """ + check_not_negative(start_sequence, "sequence can't be smaller than 0") + check_not_negative(min_count, "min count can't be smaller than 0") + check_true(max_count >= min_count, "max count should be greater or equal to min count") + check_true( + max_count < MAX_BATCH_SIZE, "max count can't be greater than %d" % MAX_BATCH_SIZE + ) + try: + filter_data = self._to_data(filter) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry( + e, self.read_many, start_sequence, min_count, max_count, filter + ) + + # Since the first call to capacity is cached on the client-side, + # doing a capacity check each time should not be a problem. + capacity = await self.capacity() + check_true( + max_count <= capacity, + "max count: %d should be smaller or equal to capacity: %d" % (max_count, capacity), + ) + + request = ringbuffer_read_many_codec.encode_request( + self.name, start_sequence, min_count, max_count, filter_data + ) + + def handler(message): + response = ringbuffer_read_many_codec.decode_response(message) + items = deserialize_list_in_place(response["items"], self._to_object) + read_count = response["read_count"] + next_seq = response["next_seq"] + item_seqs = response["item_seqs"] + return ReadResult(read_count, next_seq, item_seqs, items) + + return await self._invoke(request, handler) + + +async def create_ringbuffer_proxy(service_name, name, context): + return Ringbuffer(service_name, name, context) diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index 73669b04e6..710281c68d 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -152,7 +152,7 @@ class Ringbuffer(PartitionSpecificProxy["BlockingRingbuffer"], typing.Generic[It stored in a ring like structure. A ringbuffer has a capacity so it won't grow beyond that capacity and - endanger the stability of the system. If that capacity is exceeded, than + endanger the stability of the system. If that capacity is exceeded, then the oldest item in the ringbuffer is overwritten. The ringbuffer has two always incrementing sequences: From 4cfaaaa208af1db71323d850d2fdcbd98f91fdc8 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 1 Apr 2026 16:15:06 +0300 Subject: [PATCH 2/5] Some doc improvements --- .../internal/asyncio_proxy/ringbuffer.py | 17 +++++++------- hazelcast/proxy/ringbuffer.py | 23 ++++++++----------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/hazelcast/internal/asyncio_proxy/ringbuffer.py b/hazelcast/internal/asyncio_proxy/ringbuffer.py index 77021e6162..da1dcf5471 100644 --- a/hazelcast/internal/asyncio_proxy/ringbuffer.py +++ b/hazelcast/internal/asyncio_proxy/ringbuffer.py @@ -153,7 +153,7 @@ async def add_all( items: typing.Sequence[ItemType], overflow_policy: int = OVERFLOW_POLICY_OVERWRITE, ) -> int: - """Adds all of the item in the specified collection to the tail of the + """Adds all items in the specified collection to the tail of the Ringbuffer. This is likely to outperform multiple calls to :func:`add` due @@ -194,7 +194,7 @@ async def read_one(self, sequence: int) -> ItemType: """Reads one item from the Ringbuffer. If the sequence is one beyond the current tail, this call blocks until - an item is added. Currently it isn't possible to control how long + an item is added. Currently, it isn't possible to control how long this call is going to block. Args: @@ -217,21 +217,20 @@ async def read_many( """Reads a batch of items from the Ringbuffer. If the number of available items after the first read item is smaller - than the ``max_count``, these items are returned. So it could be the - number of items read is smaller than the ``max_count``. If there are - less items available than ``min_count``, then this call blocks. + than ``max_count``, these items are returned. So, number of items + read may be smaller than ``max_count``. If there are + fewer items available than ``min_count``, then this call blocks. Warnings: These blocking calls consume server memory and if there are many - calls, it can be possible to see leaking memory or - ``OutOfMemoryError`` s on the server. + calls, an ``OutOfMemoryError`` may be thrown on server-side. Reading a batch of items is likely to perform better because less overhead is involved. - A filter can be provided to only select items that need to be read. If + A filter can be provided to select items that need to be read. If the filter is ``None``, all items are read. If the filter is not - ``None``, only items where the filter function returns true are + ``None``, items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index 710281c68d..94bcec17bd 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -271,7 +271,7 @@ def add_all( items: typing.Sequence[ItemType], overflow_policy: int = OVERFLOW_POLICY_OVERWRITE, ) -> Future[int]: - """Adds all of the item in the specified collection to the tail of the + """Adds all items in the specified collection to the tail of the Ringbuffer. This is likely to outperform multiple calls to :func:`add` due @@ -312,7 +312,7 @@ def read_one(self, sequence: int) -> Future[ItemType]: """Reads one item from the Ringbuffer. If the sequence is one beyond the current tail, this call blocks until - an item is added. Currently it isn't possible to control how long + an item is added. Currently, it isn't possible to control how long this call is going to block. Args: @@ -335,21 +335,20 @@ def read_many( """Reads a batch of items from the Ringbuffer. If the number of available items after the first read item is smaller - than the ``max_count``, these items are returned. So it could be the - number of items read is smaller than the ``max_count``. If there are - less items available than ``min_count``, then this call blocks. + than ``max_count``, these items are returned. So, number of items + read may be smaller than ``max_count``. If there are + fewer items available than ``min_count``, then this call blocks. Warnings: These blocking calls consume server memory and if there are many - calls, it can be possible to see leaking memory or - ``OutOfMemoryError`` s on the server. + calls, an ``OutOfMemoryError`` may be thrown on server-side. Reading a batch of items is likely to perform better because less overhead is involved. - A filter can be provided to only select items that need to be read. If + A filter can be provided to select items that need to be read. If the filter is ``None``, all items are read. If the filter is not - ``None``, only items where the filter function returns true are + ``None``, items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant @@ -404,10 +403,8 @@ def handler(message): return ReadResult(read_count, next_seq, item_seqs, items) def continuation(future): - # Since the first call to capacity - # is cached on the client-side, doing - # a capacity check each time should not - # be a problem + # Since the first call to capacity is cached on the client-side, + # doing a capacity check each time should not be a problem. capacity = future.result() check_true( From 14e751cac91a6a58ee03bc7825ce1e53b08cacca Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Thu, 2 Apr 2026 08:50:49 +0300 Subject: [PATCH 3/5] Ported ringbuffer test --- .../asyncio/proxy/ringbuffer_test.py | 307 ++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 tests/integration/asyncio/proxy/ringbuffer_test.py diff --git a/tests/integration/asyncio/proxy/ringbuffer_test.py b/tests/integration/asyncio/proxy/ringbuffer_test.py new file mode 100644 index 0000000000..8c5ea7ae3b --- /dev/null +++ b/tests/integration/asyncio/proxy/ringbuffer_test.py @@ -0,0 +1,307 @@ +import asyncio +import os +import unittest + +from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, MAX_BATCH_SIZE +from hazelcast.serialization.api import IdentifiedDataSerializable +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import random_string, compare_client_version + +CAPACITY = 10 + + +class RingBufferTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + xml_path = os.path.join( + dir_path, "../../backward_compatible/proxy/hazelcast.xml" + ) + with open(xml_path) as f: + return f.read() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.ringbuffer = await self.client.get_ringbuffer( + "ClientRingbufferTestWithTTL-" + random_string() + ) + + async def asyncTearDown(self): + await self.ringbuffer.destroy() + await super().asyncTearDown() + + async def test_capacity(self): + self.assertEqual(await self.ringbuffer.capacity(), CAPACITY) + + async def test_add_size(self): + self.assertEqual(0, await self.ringbuffer.add("value")) + self.assertEqual(1, await self.ringbuffer.add("value")) + self.assertEqual(2, await self.ringbuffer.add("value")) + + self.assertEqual(3, await self.ringbuffer.size()) + + async def test_add_when_full(self): + await self.fill_ringbuffer() + + self.assertEqual(-1, await self.ringbuffer.add(CAPACITY + 1, OVERFLOW_POLICY_FAIL)) + + async def test_add_all(self): + self.assertEqual(CAPACITY - 1, await self.ringbuffer.add_all(list(range(0, CAPACITY)))) + + async def test_add_all_when_full(self): + self.assertEqual( + -1, await self.ringbuffer.add_all(list(range(0, CAPACITY * 2)), OVERFLOW_POLICY_FAIL) + ) + + async def test_add_all_when_empty_list(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.add_all([]) + + async def test_add_all_when_too_large_batch(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.add_all(list(range(0, MAX_BATCH_SIZE + 1))) + + async def test_head_sequence(self): + await self.fill_ringbuffer(CAPACITY * 2) + + self.assertEqual(CAPACITY, await self.ringbuffer.head_sequence()) + + async def test_tail_sequence(self): + await self.fill_ringbuffer(CAPACITY * 2) + + self.assertEqual(CAPACITY * 2 - 1, await self.ringbuffer.tail_sequence()) + + async def test_remaining_capacity(self): + await self.fill_ringbuffer(CAPACITY // 2) + + self.assertEqual(CAPACITY // 2, await self.ringbuffer.remaining_capacity()) + + async def test_read_one(self): + await self.ringbuffer.add("item") + await self.ringbuffer.add("item-2") + await self.ringbuffer.add("item-3") + self.assertEqual("item", await self.ringbuffer.read_one(0)) + self.assertEqual("item-2", await self.ringbuffer.read_one(1)) + self.assertEqual("item-3", await self.ringbuffer.read_one(2)) + + async def test_read_one_negative_sequence(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_one(-1) + + async def test_read_many(self): + await self.fill_ringbuffer(CAPACITY) + items = await self.ringbuffer.read_many(0, 0, CAPACITY) + self.assertEqual(items, list(range(0, CAPACITY))) + + async def test_read_many_when_negative_start_seq(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_many(-1, 0, CAPACITY) + + async def test_read_many_when_min_count_greater_than_max_count(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_many(0, CAPACITY, 0) + + async def test_read_many_when_min_count_greater_than_capacity(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_many(0, CAPACITY + 1, CAPACITY + 1) + + async def test_read_many_when_max_count_greater_than_batch_size(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_many(0, 0, MAX_BATCH_SIZE + 1) + + async def fill_ringbuffer(self, n=CAPACITY): + for x in range(0, n): + await self.ringbuffer.add(x) + + async def test_str(self): + self.assertTrue(str(self.ringbuffer).startswith("Ringbuffer")) + + +@unittest.skipIf( + compare_client_version("4.1") < 0, "Tests the features added in 4.1 version of the client" +) +class RingbufferReadManyTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + xml_path = os.path.join( + dir_path, "../../backward_compatible/proxy/hazelcast.xml" + ) + with open(xml_path) as f: + return f.read() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.ringbuffer = await self.client.get_ringbuffer( + "ClientRingbufferTestWithTTL-" + random_string() + ) + + async def asyncTearDown(self): + await self.ringbuffer.destroy() + await super().asyncTearDown() + + async def test_when_start_sequence_is_no_longer_available_gets_clamped(self): + await self.fill_ringbuffer(item_count=CAPACITY + 1) + + result_set = await self.ringbuffer.read_many(0, 1, CAPACITY) + self.assertEqual(CAPACITY, result_set.read_count) + self.assertEqual(CAPACITY, result_set.size) + self.assertEqual(CAPACITY + 1, result_set.next_sequence_to_read_from) + + for i in range(1, CAPACITY + 1): + self.assertEqual(i, result_set[i - 1]) + self.assertEqual(i, result_set.get_sequence(i - 1)) + + async def test_when_start_sequence_is_equal_to_tail_sequence(self): + await self.fill_ringbuffer() + + result_set = await self.ringbuffer.read_many(CAPACITY - 1, 1, CAPACITY) + self.assertEqual(1, result_set.read_count) + self.assertEqual(1, result_set.size) + self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + self.assertEqual(CAPACITY - 1, result_set[0]) + self.assertEqual(CAPACITY - 1, result_set.get_sequence(0)) + + async def test_when_start_sequence_is_beyond_tail_sequence_then_blocks(self): + await self.fill_ringbuffer() + + task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY + 1, 1, CAPACITY)) + await asyncio.sleep(0.5) + self.assertFalse(task.done()) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + async def test_when_min_count_items_are_not_available_then_blocks(self): + await self.fill_ringbuffer() + + task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY - 1, 2, 3)) + await asyncio.sleep(0.5) + self.assertFalse(task.done()) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + async def test_when_some_waiting_needed(self): + await self.fill_ringbuffer() + + task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY - 1, 2, 3)) + await asyncio.sleep(0.5) + self.assertFalse(task.done()) + + await self.ringbuffer.add(CAPACITY) + + await self.assertTrueEventually(lambda: self.assertTrue(task.done())) + + result_set = task.result() + self.assertEqual(2, result_set.read_count) + self.assertEqual(2, result_set.size) + self.assertEqual(CAPACITY + 1, result_set.next_sequence_to_read_from) + self.assertEqual(CAPACITY - 1, result_set[0]) + self.assertEqual(CAPACITY - 1, result_set.get_sequence(0)) + self.assertEqual(CAPACITY, result_set[1]) + self.assertEqual(CAPACITY, result_set.get_sequence(1)) + + async def test_min_zero_when_item_available(self): + await self.fill_ringbuffer() + + result_set = await self.ringbuffer.read_many(0, 0, 1) + + self.assertEqual(1, result_set.read_count) + self.assertEqual(1, result_set.size) + + async def test_min_zero_when_no_item_available(self): + result_set = await self.ringbuffer.read_many(0, 0, 1) + + self.assertEqual(0, result_set.read_count) + self.assertEqual(0, result_set.size) + + async def test_max_count(self): + # If more results are available than needed, the surplus results + # should not be read. + await self.fill_ringbuffer() + + max_count = CAPACITY // 2 + result_set = await self.ringbuffer.read_many(0, 0, max_count) + self.assertEqual(max_count, result_set.read_count) + self.assertEqual(max_count, result_set.size) + self.assertEqual(max_count, result_set.next_sequence_to_read_from) + + for i in range(max_count): + self.assertEqual(i, result_set[i]) + self.assertEqual(i, result_set.get_sequence(i)) + + async def test_filter(self): + def item_factory(i): + if i % 2 == 0: + return "good%s" % i + return "bad%s" % i + + await self.fill_ringbuffer(item_factory) + + expected_size = CAPACITY // 2 + + result_set = await self.ringbuffer.read_many(0, 0, CAPACITY, PrefixFilter("good")) + self.assertEqual(CAPACITY, result_set.read_count) + self.assertEqual(expected_size, result_set.size) + self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + + for i in range(expected_size): + self.assertEqual(item_factory(i * 2), result_set[i]) + self.assertEqual(i * 2, result_set.get_sequence(i)) + + async def test_filter_with_max_count(self): + def item_factory(i): + if i % 2 == 0: + return "good%s" % i + return "bad%s" % i + + await self.fill_ringbuffer(item_factory) + + expected_size = 3 + + result_set = await self.ringbuffer.read_many(0, 0, expected_size, PrefixFilter("good")) + self.assertEqual(expected_size * 2 - 1, result_set.read_count) + self.assertEqual(expected_size, result_set.size) + self.assertEqual(expected_size * 2 - 1, result_set.next_sequence_to_read_from) + + for i in range(expected_size): + self.assertEqual(item_factory(i * 2), result_set[i]) + self.assertEqual(i * 2, result_set.get_sequence(i)) + + async def fill_ringbuffer(self, item_factory=lambda i: i, item_count=CAPACITY): + for i in range(0, item_count): + await self.ringbuffer.add(item_factory(i)) + + +class PrefixFilter(IdentifiedDataSerializable): + def __init__(self, prefix): + self.prefix = prefix + + def write_data(self, object_data_output): + object_data_output.write_string(self.prefix) + + def read_data(self, object_data_input): + self.prefix = object_data_input.read_string() + + def get_factory_id(self): + return 666 + + def get_class_id(self): + return 14 From bc258ca1667c0999f5d8fc4d54c90cdb8bdd589e Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Thu, 2 Apr 2026 08:56:01 +0300 Subject: [PATCH 4/5] black --- tests/integration/asyncio/proxy/ringbuffer_test.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/integration/asyncio/proxy/ringbuffer_test.py b/tests/integration/asyncio/proxy/ringbuffer_test.py index 8c5ea7ae3b..76cc72e31e 100644 --- a/tests/integration/asyncio/proxy/ringbuffer_test.py +++ b/tests/integration/asyncio/proxy/ringbuffer_test.py @@ -20,9 +20,7 @@ def configure_client(cls, config): def configure_cluster(cls): path = os.path.abspath(__file__) dir_path = os.path.dirname(path) - xml_path = os.path.join( - dir_path, "../../backward_compatible/proxy/hazelcast.xml" - ) + xml_path = os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast.xml") with open(xml_path) as f: return f.read() @@ -136,9 +134,7 @@ def configure_client(cls, config): def configure_cluster(cls): path = os.path.abspath(__file__) dir_path = os.path.dirname(path) - xml_path = os.path.join( - dir_path, "../../backward_compatible/proxy/hazelcast.xml" - ) + xml_path = os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast.xml") with open(xml_path) as f: return f.read() From 26973e6d0f223b1125d5e4555bf2c2f1063edeee Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Thu, 2 Apr 2026 09:21:33 +0300 Subject: [PATCH 5/5] deal with task cancelation in another PR --- tests/integration/asyncio/proxy/ringbuffer_test.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/integration/asyncio/proxy/ringbuffer_test.py b/tests/integration/asyncio/proxy/ringbuffer_test.py index 76cc72e31e..5b113b5a8b 100644 --- a/tests/integration/asyncio/proxy/ringbuffer_test.py +++ b/tests/integration/asyncio/proxy/ringbuffer_test.py @@ -176,11 +176,6 @@ async def test_when_start_sequence_is_beyond_tail_sequence_then_blocks(self): task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY + 1, 1, CAPACITY)) await asyncio.sleep(0.5) self.assertFalse(task.done()) - task.cancel() - try: - await task - except asyncio.CancelledError: - pass async def test_when_min_count_items_are_not_available_then_blocks(self): await self.fill_ringbuffer() @@ -188,11 +183,6 @@ async def test_when_min_count_items_are_not_available_then_blocks(self): task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY - 1, 2, 3)) await asyncio.sleep(0.5) self.assertFalse(task.done()) - task.cancel() - try: - await task - except asyncio.CancelledError: - pass async def test_when_some_waiting_needed(self): await self.fill_ringbuffer()