63 changes: 38 additions & 25 deletions tests/perf/microbenchmarks/writes/test_writes.py
@@ -41,7 +41,6 @@
 )
 from tests.perf.microbenchmarks.conftest import publish_resource_metrics
 import tests.perf.microbenchmarks.writes.config as config
-from google.cloud import storage

 # Get write parameters
 all_params = config.get_write_params()
@@ -318,10 +317,30 @@ def target_wrapper(*args, **kwargs):
     )


+# --- Global Variables for Worker Process ---
+worker_loop = None
+worker_client = None
+worker_json_client = None
+
+
+def _worker_init(bucket_type):
+    """Initializes a persistent event loop and client for each worker process."""
+    os.sched_setaffinity(0, {i for i in range(20, 190)})  # Pin to cores 20-189
[Review comment — Contributor, severity: high]
The hardcoded core range range(20, 190) passed to os.sched_setaffinity is not portable and will raise a ValueError on systems with fewer than 190 cores, causing the benchmark to crash. Please consider making this more robust by checking against the number of cores actually available, for example by using os.cpu_count().
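A minimal sketch of the reviewer's suggestion (the helper name and defaults are illustrative, not from the PR): intersect the desired core range with the cores the process is actually allowed to use, and skip pinning when the intersection is empty.

```python
import os

def _pin_cores(start=20, end=190):
    """Pin the current process to the subset of [start, end) cores that exist."""
    if not hasattr(os, "sched_setaffinity"):  # Linux-only API
        return
    allowed = os.sched_getaffinity(0)   # cores this process may currently use
    desired = set(range(start, end)) & allowed
    if desired:                         # on small hosts, leave affinity unchanged
        os.sched_setaffinity(0, desired)
```

This uses os.sched_getaffinity(0) rather than os.cpu_count() so that cgroup and pre-existing affinity limits are respected, but either check avoids the ValueError the reviewer describes.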

+    global worker_loop, worker_client, worker_json_client
+    if bucket_type == "zonal":
+        worker_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(worker_loop)
+        worker_client = worker_loop.run_until_complete(create_client())
+    else:  # regional
+        from google.cloud import storage
+
+        worker_json_client = storage.Client()
[Review comment on lines +326 to +337 — Contributor, severity: medium]
The _worker_init function initializes an event loop and clients for each worker process, but there is no corresponding cleanup logic, which can lead to resource leaks. While process termination will reclaim these resources, a graceful shutdown is better practice. Consider using atexit.register() in _worker_init to call a cleanup function that closes the clients and the event loop before the worker process exits.
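A sketch of the suggested atexit-based teardown. The async close() on the gRPC client is an assumption about whatever create_client() returns; storage.Client.close() does exist and shuts down that client's transport.

```python
import atexit

def _worker_cleanup():
    """Best-effort teardown for the per-worker globals (illustrative)."""
    if worker_json_client is not None:
        worker_json_client.close()  # storage.Client.close() releases the transport
    if worker_loop is not None:
        if worker_client is not None:
            # Assumes the client built by create_client() exposes an async close().
            worker_loop.run_until_complete(worker_client.close())
        worker_loop.close()

# and at the end of _worker_init:
# atexit.register(_worker_cleanup)
```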



 def _upload_files_worker(files_to_upload, other_params, bucket_type):
     """A worker function for multi-processing uploads.

-    Initializes a client and calls the appropriate multi-coroutine upload function.
+    Calls the appropriate multi-coroutine upload function using the global client.
     This function is intended to be called in a separate process.

     Args:
@@ -333,41 +352,28 @@ def _upload_files_worker(files_to_upload, other_params, bucket_type):
         float: The maximum latency from the uploads performed by this worker.
     """
     if bucket_type == "zonal":
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        client = loop.run_until_complete(create_client())
-        try:
-            result = upload_files_using_grpc_multi_coro(
-                loop, client, files_to_upload, other_params
-            )
-        finally:
-            # cleanup loop
-            tasks = asyncio.all_tasks(loop=loop)
-            for task in tasks:
-                task.cancel()
-            loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
-            loop.close()
-        return result
+        return upload_files_using_grpc_multi_coro(
+            worker_loop, worker_client, files_to_upload, other_params
+        )
     else:  # regional
-        json_client = storage.Client()
         return upload_files_using_json_multi_threaded(
-            None, json_client, files_to_upload, other_params
+            None, worker_json_client, files_to_upload, other_params
         )


-def upload_files_mp_mc_wrapper(files_names, params):
+def upload_files_mp_mc_wrapper(pool, files_names, params):
     """Wrapper for multi-process, multi-coroutine uploads.

     Distributes files among a pool of processes and calls the worker function.

     Args:
+        pool: The multiprocessing pool.
         files_names (list): The full list of filenames to upload.
-        params: An object containing benchmark parameters (num_processes, num_coros).
+        params: An object containing benchmark parameters (num_coros).

     Returns:
         float: The maximum latency observed across all processes.
     """
-    num_processes = params.num_processes
     num_coros = params.num_coros

     filenames_per_process = [
@@ -383,9 +389,7 @@ def upload_files_mp_mc_wrapper(files_names, params):
         for filenames in filenames_per_process
     ]

-    ctx = multiprocessing.get_context("spawn")
-    with ctx.Pool(processes=num_processes) as pool:
-        results = pool.starmap(_upload_files_worker, args)
+    results = pool.starmap(_upload_files_worker, args)

     return max(results)

@@ -414,18 +418,27 @@ def target_wrapper(*args, **kwargs):
             output_times.append(result)
         return output_times

+    ctx = multiprocessing.get_context("spawn")
+    pool = ctx.Pool(
+        processes=params.num_processes,
+        initializer=_worker_init,
+        initargs=(params.bucket_type,),
+    )
     try:
         with monitor() as m:
             output_times = benchmark.pedantic(
                 target=target_wrapper,
                 iterations=1,
                 rounds=params.rounds,
                 args=(
+                    pool,
                     files_names,
                     params,
                 ),
             )
     finally:
+        pool.close()
+        pool.join()
         publish_benchmark_extra_info(
             benchmark, params, benchmark_group="write", true_times=output_times
         )
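For reference, the refactor follows the standard persistent-pool pattern: pay the process-spawn and client-construction cost once in an initializer, then reuse the workers across benchmark rounds. A self-contained sketch of that pattern (names are illustrative and independent of this test file):

```python
import multiprocessing

_state = None  # per-process global, populated once by the initializer

def _init(tag):
    """Runs once in each worker process; stands in for expensive client setup."""
    global _state
    _state = f"client-{tag}"

def _work(item):
    return f"{_state}:{item}"  # every task reuses the worker's persistent state

if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    pool = ctx.Pool(processes=2, initializer=_init, initargs=("zonal",))
    try:
        print(pool.map(_work, range(4)))  # e.g. ['client-zonal:0', 'client-zonal:1', ...]
    finally:
        pool.close()
        pool.join()
```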