From c3d6ede5bd5e1690cf1ae8e411b6cda55dfb6f8d Mon Sep 17 00:00:00 2001
From: Chandra
Date: Thu, 12 Feb 2026 16:25:03 +0000
Subject: [PATCH 1/2] fix: init mp pool & grpc client once, pin worker
 processes with os.sched_setaffinity

---
 .../microbenchmarks/writes/test_writes.py | 62 ++++++++++++-------
 1 file changed, 38 insertions(+), 24 deletions(-)

diff --git a/tests/perf/microbenchmarks/writes/test_writes.py b/tests/perf/microbenchmarks/writes/test_writes.py
index e2a2d6f63..15e43bc0c 100644
--- a/tests/perf/microbenchmarks/writes/test_writes.py
+++ b/tests/perf/microbenchmarks/writes/test_writes.py
@@ -318,10 +318,30 @@ def target_wrapper(*args, **kwargs):
 )
 
 
+# --- Global Variables for Worker Process ---
+worker_loop = None
+worker_client = None
+worker_json_client = None
+
+
+def _worker_init(bucket_type):
+    """Initializes a persistent event loop and client for each worker process."""
+    os.sched_setaffinity(0, {i for i in range(20, 190)})  # Pin to cores 20-189
+    global worker_loop, worker_client, worker_json_client
+    if bucket_type == "zonal":
+        worker_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(worker_loop)
+        worker_client = worker_loop.run_until_complete(create_client())
+    else:  # regional
+        from google.cloud import storage
+
+        worker_json_client = storage.Client()
+
+
 def _upload_files_worker(files_to_upload, other_params, bucket_type):
     """A worker function for multi-processing uploads.
 
-    Initializes a client and calls the appropriate multi-coroutine upload function.
+    Calls the appropriate multi-coroutine upload function using the global client.
     This function is intended to be called in a separate process.
 
     Args:
@@ -333,41 +353,28 @@ def _upload_files_worker(files_to_upload, other_params, bucket_type):
         float: The maximum latency from the uploads performed by this worker.
     """
     if bucket_type == "zonal":
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        client = loop.run_until_complete(create_client())
-        try:
-            result = upload_files_using_grpc_multi_coro(
-                loop, client, files_to_upload, other_params
-            )
-        finally:
-            # cleanup loop
-            tasks = asyncio.all_tasks(loop=loop)
-            for task in tasks:
-                task.cancel()
-            loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
-            loop.close()
-        return result
+        return upload_files_using_grpc_multi_coro(
+            worker_loop, worker_client, files_to_upload, other_params
+        )
     else:  # regional
-        json_client = storage.Client()
         return upload_files_using_json_multi_threaded(
-            None, json_client, files_to_upload, other_params
+            None, worker_json_client, files_to_upload, other_params
         )
 
 
-def upload_files_mp_mc_wrapper(files_names, params):
+def upload_files_mp_mc_wrapper(pool, files_names, params):
     """Wrapper for multi-process, multi-coroutine uploads.
 
     Distributes files among a pool of processes and calls the worker function.
 
     Args:
+        pool: The multiprocessing pool.
         files_names (list): The full list of filenames to upload.
-        params: An object containing benchmark parameters (num_processes, num_coros).
+        params: An object containing benchmark parameters (num_coros).
 
     Returns:
         float: The maximum latency observed across all processes.
""" - num_processes = params.num_processes num_coros = params.num_coros filenames_per_process = [ @@ -383,9 +390,7 @@ def upload_files_mp_mc_wrapper(files_names, params): for filenames in filenames_per_process ] - ctx = multiprocessing.get_context("spawn") - with ctx.Pool(processes=num_processes) as pool: - results = pool.starmap(_upload_files_worker, args) + results = pool.starmap(_upload_files_worker, args) return max(results) @@ -414,6 +419,12 @@ def target_wrapper(*args, **kwargs): output_times.append(result) return output_times + ctx = multiprocessing.get_context("spawn") + pool = ctx.Pool( + processes=params.num_processes, + initializer=_worker_init, + initargs=(params.bucket_type,), + ) try: with monitor() as m: output_times = benchmark.pedantic( @@ -421,11 +432,14 @@ def target_wrapper(*args, **kwargs): iterations=1, rounds=params.rounds, args=( + pool, files_names, params, ), ) finally: + pool.close() + pool.join() publish_benchmark_extra_info( benchmark, params, benchmark_group="write", true_times=output_times ) From 50b8105a3099f0030ef5f28c6ce92586b9554ea8 Mon Sep 17 00:00:00 2001 From: Chandra Date: Fri, 13 Feb 2026 05:36:52 +0000 Subject: [PATCH 2/2] remove unused import --- tests/perf/microbenchmarks/writes/test_writes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/perf/microbenchmarks/writes/test_writes.py b/tests/perf/microbenchmarks/writes/test_writes.py index 15e43bc0c..0ce2f4ef8 100644 --- a/tests/perf/microbenchmarks/writes/test_writes.py +++ b/tests/perf/microbenchmarks/writes/test_writes.py @@ -41,7 +41,6 @@ ) from tests.perf.microbenchmarks.conftest import publish_resource_metrics import tests.perf.microbenchmarks.writes.config as config -from google.cloud import storage # Get write parameters all_params = config.get_write_params()