From b44040de8fb08e0900bf952c898b5ed8f795fef3 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 23 Mar 2026 15:29:02 +0300 Subject: [PATCH] Ported Executor to asyncio --- hazelcast/internal/asyncio_client.py | 13 ++ hazelcast/internal/asyncio_proxy/base.py | 6 + hazelcast/internal/asyncio_proxy/executor.py | 136 ++++++++++++++++++ hazelcast/internal/asyncio_proxy/manager.py | 4 +- .../asyncio/proxy/executor_test.py | 81 +++++++++++ 5 files changed, 239 insertions(+), 1 deletion(-) create mode 100644 hazelcast/internal/asyncio_proxy/executor.py create mode 100644 tests/integration/asyncio/proxy/executor_test.py diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index f6e2d62954..2132da6a9d 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -23,6 +23,7 @@ dynamic_config_add_vector_collection_config_codec, ) from hazelcast.internal.asyncio_proxy.manager import ( + EXECUTOR_SERVICE, LIST_SERVICE, MAP_SERVICE, ProxyManager, @@ -30,6 +31,7 @@ VECTOR_SERVICE, ) from hazelcast.internal.asyncio_proxy.base import Proxy +from hazelcast.internal.asyncio_proxy.executor import Executor from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap @@ -252,6 +254,17 @@ async def _start(self): raise _logger.info("Client started") + async def get_executor(self, name: str) -> Executor: + """Returns the executor instance with the specified name. + + Args: + name: Name of the executor. + + Returns: + Executor instance with the specified name. + """ + return await self._proxy_manager.get_or_create(EXECUTOR_SERVICE, name) + async def get_list(self, name: str) -> List[KeyType]: """Returns the distributed list instance with the specified name. diff --git a/hazelcast/internal/asyncio_proxy/base.py b/hazelcast/internal/asyncio_proxy/base.py index 03c3656b5b..136a6b8200 100644 --- a/hazelcast/internal/asyncio_proxy/base.py +++ b/hazelcast/internal/asyncio_proxy/base.py @@ -67,6 +67,12 @@ def _invoke_on_target( self._invocation_service.invoke(invocation) return invocation.future + async def _ainvoke_on_target( + self, request, uuid, response_handler=_no_op_response_handler + ) -> typing.Any: + fut = self._invoke_on_target(request, uuid, response_handler) + return await fut + async def _invoke_on_key( self, request, key_data, response_handler=_no_op_response_handler ) -> typing.Any: diff --git a/hazelcast/internal/asyncio_proxy/executor.py b/hazelcast/internal/asyncio_proxy/executor.py new file mode 100644 index 0000000000..3cdd493803 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/executor.py @@ -0,0 +1,136 @@ +import asyncio +import typing +from uuid import uuid4 + +from hazelcast.core import MemberInfo +from hazelcast.protocol.codec import ( + executor_service_shutdown_codec, + executor_service_is_shutdown_codec, + executor_service_submit_to_partition_codec, + executor_service_submit_to_member_codec, +) +from hazelcast.internal.asyncio_proxy.base import Proxy +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import check_not_none + + +class Executor(Proxy): + """An object that executes submitted executable tasks.""" + + async def execute_on_key_owner(self, key: typing.Any, task: typing.Any) -> typing.Any: + """Executes a task on the owner of the specified key. + + Args: + key: The specified key. + task: A task executed on the owner of the specified key. + + Returns: + The result of the task. + """ + check_not_none(key, "key can't be None") + check_not_none(task, "task can't be None") + + try: + key_data = self._to_data(key) + task_data = self._to_data(task) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.execute_on_key_owner, key, task) + + partition_id = self._partition_service.get_partition_id(key_data) + uuid = uuid4() + + def handler(message): + return self._to_object( + executor_service_submit_to_partition_codec.decode_response(message) + ) + + request = executor_service_submit_to_partition_codec.encode_request( + self.name, uuid, task_data + ) + return await self._ainvoke_on_partition(request, partition_id, handler) + + async def execute_on_member(self, member: MemberInfo, task: typing.Any) -> typing.Any: + """Executes a task on the specified member. + + Args: + member: The specified member. + task: The task executed on the specified member. + + Returns: + The result of the task. + """ + check_not_none(task, "task can't be None") + try: + task_data = self._to_data(task) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.execute_on_member, member, task) + + uuid = uuid4() + return await self._execute_on_member(uuid, task_data, member.uuid) + + async def execute_on_members( + self, members: typing.Sequence[MemberInfo], task: typing.Any + ) -> typing.List[typing.Any]: + """Executes a task on each of the specified members. + + Args: + members: The specified members. + task: The task executed on the specified members. + + Returns: + The list of results of the tasks on each member. + """ + try: + task_data = self._to_data(task) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.execute_on_members, members, task) + + uuid = uuid4() + tasks = [] + async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined] + tasks = [ + tg.create_task(self._execute_on_member(uuid, task_data, member.uuid)) + for member in members + ] + return [task.result() for task in tasks] + + async def execute_on_all_members(self, task: typing.Any) -> typing.List[typing.Any]: + """Executes a task on all the known cluster members. + + Args: + task: The task executed on the all the members. + + Returns: + The list of results of the tasks on each member. + """ + return await self.execute_on_members(self._context.cluster_service.get_members(), task) + + async def is_shutdown(self) -> bool: + """Determines whether this executor has been shutdown or not. + + Returns: + ``True`` if the executor has been shutdown, ``False`` otherwise. + """ + request = executor_service_is_shutdown_codec.encode_request(self.name) + return await self._invoke(request, executor_service_is_shutdown_codec.decode_response) + + async def shutdown(self) -> None: + """Initiates a shutdown process which works orderly. Tasks that were + submitted before shutdown are executed but new task will not be + accepted. + """ + request = executor_service_shutdown_codec.encode_request(self.name) + return await self._invoke(request) + + async def _execute_on_member(self, uuid, task_data, member_uuid) -> typing.Any: + def handler(message): + return self._to_object(executor_service_submit_to_member_codec.decode_response(message)) + + request = executor_service_submit_to_member_codec.encode_request( + self.name, uuid, task_data, member_uuid + ) + return await self._ainvoke_on_target(request, member_uuid, handler) + + +async def create_executor_proxy(service_name, name, context): + return Executor(service_name, name, context) diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 812d8eeb72..187246a630 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -1,9 +1,9 @@ import asyncio import typing +from hazelcast.internal.asyncio_proxy.executor import create_executor_proxy from hazelcast.internal.asyncio_proxy.list import create_list_proxy from hazelcast.internal.asyncio_proxy.vector_collection import ( - VectorCollection, create_vector_collection_proxy, ) from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec @@ -13,6 +13,7 @@ from hazelcast.internal.asyncio_proxy.replicated_map import create_replicated_map_proxy from hazelcast.util import to_list +EXECUTOR_SERVICE = "hz:impl:executorService" LIST_SERVICE = "hz:impl:listService" MAP_SERVICE = "hz:impl:mapService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" @@ -22,6 +23,7 @@ str, typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]], ] = { + EXECUTOR_SERVICE: create_executor_proxy, LIST_SERVICE: create_list_proxy, MAP_SERVICE: create_map_proxy, REPLICATED_MAP_SERVICE: create_replicated_map_proxy, diff --git a/tests/integration/asyncio/proxy/executor_test.py b/tests/integration/asyncio/proxy/executor_test.py new file mode 100644 index 0000000000..d37b850c23 --- /dev/null +++ b/tests/integration/asyncio/proxy/executor_test.py @@ -0,0 +1,81 @@ +import os + +from hazelcast.serialization.api import IdentifiedDataSerializable +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.integration.backward_compatible.util import ( + read_string_from_input, + write_string_to_output, +) +from tests.util import random_string + + +class AppendTask(IdentifiedDataSerializable): + """Client side version of com.hazelcast.client.test.executor.tasks.AppendCallable""" + + def __init__(self, message): + self.message = message + + def write_data(self, object_data_output): + write_string_to_output(object_data_output, self.message) + + def read_data(self, object_data_input): + self.message = read_string_from_input(object_data_input) + + def get_factory_id(self): + return 66 + + def get_class_id(self): + return 5 + + +APPENDAGE = ":CallableResult" # defined on the server side + + +class ExecutorTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open(os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast.xml")) as f: + return f.read() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.executor = await self.client.get_executor(random_string()) + self.message = random_string() + self.task = AppendTask(self.message) + + async def asyncTearDown(self): + await self.executor.shutdown() + await self.executor.destroy() + await super().asyncTearDown() + + async def test_execute_on_key_owner(self): + result = await self.executor.execute_on_key_owner("key", self.task) + self.assertEqual(self.message + APPENDAGE, result) + + async def test_execute_on_member(self): + member = self.client.cluster_service.get_members()[0] + result = await self.executor.execute_on_member(member, self.task) + self.assertEqual(self.message + APPENDAGE, result) + + async def test_execute_on_members(self): + members = self.client.cluster_service.get_members() + result = await self.executor.execute_on_members(members, self.task) + self.assertEqual([self.message + APPENDAGE], result) + + async def test_execute_on_all_members(self): + result = await self.executor.execute_on_all_members(self.task) + self.assertEqual([self.message + APPENDAGE], result) + + async def test_shutdown(self): + await self.executor.shutdown() + self.assertTrue(await self.executor.is_shutdown()) + + async def test_str(self): + self.assertTrue(str(self.executor).startswith("Executor"))