-
Notifications
You must be signed in to change notification settings - Fork 172
fix: init mp pool & grpc client once, use os.sched_setaffinity #1751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: time_based_benchmarks
Are you sure you want to change the base?
Changes from all commits
c3d6ede
f44ebba
50b8105
6bf0860
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,7 +41,6 @@ | |
| ) | ||
| from tests.perf.microbenchmarks.conftest import publish_resource_metrics | ||
| import tests.perf.microbenchmarks.writes.config as config | ||
| from google.cloud import storage | ||
|
|
||
| # Get write parameters | ||
| all_params = config.get_write_params() | ||
|
|
@@ -318,10 +317,30 @@ def target_wrapper(*args, **kwargs): | |
| ) | ||
|
|
||
|
|
||
| # --- Global Variables for Worker Process --- | ||
| worker_loop = None | ||
| worker_client = None | ||
| worker_json_client = None | ||
|
|
||
|
|
||
| def _worker_init(bucket_type): | ||
| """Initializes a persistent event loop and client for each worker process.""" | ||
| os.sched_setaffinity(0, {i for i in range(20, 190)}) # Pin to cores 20-189 | ||
| global worker_loop, worker_client, worker_json_client | ||
| if bucket_type == "zonal": | ||
| worker_loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(worker_loop) | ||
| worker_client = worker_loop.run_until_complete(create_client()) | ||
| else: # regional | ||
| from google.cloud import storage | ||
|
|
||
| worker_json_client = storage.Client() | ||
|
Comment on lines
+326
to
+337
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The |
||
|
|
||
|
|
||
| def _upload_files_worker(files_to_upload, other_params, bucket_type): | ||
| """A worker function for multi-processing uploads. | ||
|
|
||
| Initializes a client and calls the appropriate multi-coroutine upload function. | ||
| Calls the appropriate multi-coroutine upload function using the global client. | ||
| This function is intended to be called in a separate process. | ||
|
|
||
| Args: | ||
|
|
@@ -333,41 +352,28 @@ def _upload_files_worker(files_to_upload, other_params, bucket_type): | |
| float: The maximum latency from the uploads performed by this worker. | ||
| """ | ||
| if bucket_type == "zonal": | ||
| loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(loop) | ||
| client = loop.run_until_complete(create_client()) | ||
| try: | ||
| result = upload_files_using_grpc_multi_coro( | ||
| loop, client, files_to_upload, other_params | ||
| ) | ||
| finally: | ||
| # cleanup loop | ||
| tasks = asyncio.all_tasks(loop=loop) | ||
| for task in tasks: | ||
| task.cancel() | ||
| loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) | ||
| loop.close() | ||
| return result | ||
| return upload_files_using_grpc_multi_coro( | ||
| worker_loop, worker_client, files_to_upload, other_params | ||
| ) | ||
| else: # regional | ||
| json_client = storage.Client() | ||
| return upload_files_using_json_multi_threaded( | ||
| None, json_client, files_to_upload, other_params | ||
| None, worker_json_client, files_to_upload, other_params | ||
| ) | ||
|
|
||
|
|
||
| def upload_files_mp_mc_wrapper(files_names, params): | ||
| def upload_files_mp_mc_wrapper(pool, files_names, params): | ||
| """Wrapper for multi-process, multi-coroutine uploads. | ||
|
|
||
| Distributes files among a pool of processes and calls the worker function. | ||
|
|
||
| Args: | ||
| pool: The multiprocessing pool. | ||
| files_names (list): The full list of filenames to upload. | ||
| params: An object containing benchmark parameters (num_processes, num_coros). | ||
| params: An object containing benchmark parameters (num_coros). | ||
|
|
||
| Returns: | ||
| float: The maximum latency observed across all processes. | ||
| """ | ||
| num_processes = params.num_processes | ||
| num_coros = params.num_coros | ||
|
|
||
| filenames_per_process = [ | ||
|
|
@@ -383,9 +389,7 @@ def upload_files_mp_mc_wrapper(files_names, params): | |
| for filenames in filenames_per_process | ||
| ] | ||
|
|
||
| ctx = multiprocessing.get_context("spawn") | ||
| with ctx.Pool(processes=num_processes) as pool: | ||
| results = pool.starmap(_upload_files_worker, args) | ||
| results = pool.starmap(_upload_files_worker, args) | ||
|
|
||
| return max(results) | ||
|
|
||
|
|
@@ -414,18 +418,27 @@ def target_wrapper(*args, **kwargs): | |
| output_times.append(result) | ||
| return output_times | ||
|
|
||
| ctx = multiprocessing.get_context("spawn") | ||
| pool = ctx.Pool( | ||
| processes=params.num_processes, | ||
| initializer=_worker_init, | ||
| initargs=(params.bucket_type,), | ||
| ) | ||
| try: | ||
| with monitor() as m: | ||
| output_times = benchmark.pedantic( | ||
| target=target_wrapper, | ||
| iterations=1, | ||
| rounds=params.rounds, | ||
| args=( | ||
| pool, | ||
| files_names, | ||
| params, | ||
| ), | ||
| ) | ||
| finally: | ||
| pool.close() | ||
| pool.join() | ||
| publish_benchmark_extra_info( | ||
| benchmark, params, benchmark_group="write", true_times=output_times | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded core range
`range(20, 190)` for `os.sched_setaffinity` is not portable and will raise a `ValueError` on systems with fewer than 190 cores, causing the benchmark to crash. Please consider making this more robust by checking against the number of available cores, for example by using `os.cpu_count()`.