diff --git a/tests/perf/microbenchmarks/_utils.py b/tests/perf/microbenchmarks/_utils.py index ff29b8783..b7b66b542 100644 --- a/tests/perf/microbenchmarks/_utils.py +++ b/tests/perf/microbenchmarks/_utils.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, List +from typing import Any, List, Optional import statistics import io import os @@ -22,7 +22,10 @@ def publish_benchmark_extra_info( params: Any, benchmark_group: str = "read", true_times: List[float] = [], + download_bytes_list: Optional[List[int]] = None, + duration: Optional[int] = None, ) -> None: + """ Helper function to publish benchmark parameters to the extra_info property. """ @@ -41,13 +44,23 @@ def publish_benchmark_extra_info( benchmark.extra_info["processes"] = params.num_processes benchmark.group = benchmark_group - object_size = params.file_size_bytes - num_files = params.num_files - total_uploaded_mib = object_size / (1024 * 1024) * num_files - min_throughput = total_uploaded_mib / benchmark.stats["max"] - max_throughput = total_uploaded_mib / benchmark.stats["min"] - mean_throughput = total_uploaded_mib / benchmark.stats["mean"] - median_throughput = total_uploaded_mib / benchmark.stats["median"] + if download_bytes_list is not None: + assert duration is not None, "Duration must be provided if download_bytes_list is provided." 
+ throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list] + min_throughput = min(throughputs_list) + max_throughput = max(throughputs_list) + mean_throughput = statistics.mean(throughputs_list) + median_throughput = statistics.median(throughputs_list) + + + else: + object_size = params.file_size_bytes + num_files = params.num_files + total_uploaded_mib = object_size / (1024 * 1024) * num_files + min_throughput = total_uploaded_mib / benchmark.stats["max"] + max_throughput = total_uploaded_mib / benchmark.stats["min"] + mean_throughput = total_uploaded_mib / benchmark.stats["mean"] + median_throughput = total_uploaded_mib / benchmark.stats["median"] benchmark.extra_info["throughput_MiB_s_min"] = min_throughput benchmark.extra_info["throughput_MiB_s_max"] = max_throughput diff --git a/tests/perf/microbenchmarks/time_based/conftest.py b/tests/perf/microbenchmarks/time_based/conftest.py new file mode 100644 index 000000000..bcd186d7b --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/conftest.py @@ -0,0 +1,21 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import pytest + + +@pytest.fixture +def workload_params(request): + params = request.param + files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)] + return params, files_names diff --git a/tests/perf/microbenchmarks/time_based/reads/config.py b/tests/perf/microbenchmarks/time_based/reads/config.py new file mode 100644 index 000000000..737bb3b84 --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/config.py @@ -0,0 +1,106 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import itertools +import os +from typing import Dict, List + +import yaml + +try: + from tests.perf.microbenchmarks.time_based.reads.parameters import ( + TimeBasedReadParameters, + ) +except ModuleNotFoundError: + from reads.parameters import TimeBasedReadParameters + + +def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: + """Generates a dictionary of benchmark parameters for time based read operations.""" + params: Dict[str, List[TimeBasedReadParameters]] = {} + config_path = os.path.join(os.path.dirname(__file__), "config.yaml") + with open(config_path, "r") as f: + config = yaml.safe_load(f) + + common_params = config["common"] + bucket_types = common_params["bucket_types"] + file_sizes_mib = common_params["file_sizes_mib"] + chunk_sizes_kib = common_params["chunk_sizes_kib"] + num_ranges = common_params["num_ranges"] + rounds = common_params["rounds"] + duration = common_params["duration"] + warmup_duration = common_params["warmup_duration"] + + bucket_map = { + "zonal": os.environ.get( + "DEFAULT_RAPID_ZONAL_BUCKET", + config["defaults"]["DEFAULT_RAPID_ZONAL_BUCKET"], + ), + "regional": os.environ.get( + "DEFAULT_STANDARD_BUCKET", config["defaults"]["DEFAULT_STANDARD_BUCKET"] + ), + } + + for workload in config["workload"]: + workload_name = workload["name"] + params[workload_name] = [] + pattern = workload["pattern"] + processes = workload["processes"] + coros = workload["coros"] + + # Create a product of all parameter combinations + product = itertools.product( + bucket_types, + file_sizes_mib, + chunk_sizes_kib, + num_ranges, + processes, + coros, + ) + + for ( + bucket_type, + file_size_mib, + chunk_size_kib, + num_ranges_val, + num_processes, + num_coros, + ) in product: + file_size_bytes = file_size_mib * 1024 * 1024 + chunk_size_bytes = chunk_size_kib * 1024 + bucket_name = bucket_map[bucket_type] + + num_files = num_processes * num_coros + + # Create a descriptive name for the parameter set + name = 
f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges" + + params[workload_name].append( + TimeBasedReadParameters( + name=name, + workload_name=workload_name, + pattern=pattern, + bucket_name=bucket_name, + bucket_type=bucket_type, + num_coros=num_coros, + num_processes=num_processes, + num_files=num_files, + rounds=rounds, + chunk_size_bytes=chunk_size_bytes, + file_size_bytes=file_size_bytes, + duration=duration, + warmup_duration=warmup_duration, + num_ranges=num_ranges_val, + ) + ) + return params diff --git a/tests/perf/microbenchmarks/time_based/reads/config.yaml b/tests/perf/microbenchmarks/time_based/reads/config.yaml new file mode 100644 index 000000000..e739bfd2f --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/config.yaml @@ -0,0 +1,28 @@ +common: + bucket_types: + - "regional" + - "zonal" + file_sizes_mib: + - 10240 # 10GiB + chunk_sizes_kib: [64] # 64KiB + num_ranges: [1] + rounds: 1 + duration: 30 # seconds + warmup_duration: 5 # seconds + +workload: + ############# multi process multi coroutine ######### + - name: "read_seq_multi_process" + pattern: "seq" + coros: [1] + processes: [96] + + + - name: "read_rand_multi_process" + pattern: "rand" + coros: [1] + processes: [1] + +defaults: + DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb" + DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb" \ No newline at end of file diff --git a/tests/perf/microbenchmarks/time_based/reads/parameters.py b/tests/perf/microbenchmarks/time_based/reads/parameters.py new file mode 100644 index 000000000..6ed2da210 --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/parameters.py @@ -0,0 +1,23 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from dataclasses import dataclass +from tests.perf.microbenchmarks.parameters import IOBenchmarkParameters + + +@dataclass +class TimeBasedReadParameters(IOBenchmarkParameters): + pattern: str + duration: int + warmup_duration: int + num_ranges: int diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py new file mode 100644 index 000000000..c56112da9 --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -0,0 +1,220 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Microbenchmarks for time-based Google Cloud Storage read operations.""" + +import time +import asyncio +import random +import logging +import os +import multiprocessing + +import pytest + +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) +from tests.perf.microbenchmarks._utils import publish_benchmark_extra_info +from tests.perf.microbenchmarks.conftest import ( + publish_resource_metrics, +) +from io import BytesIO +import tests.perf.microbenchmarks.time_based.reads.config as config + +all_params = config._get_params() + + +async def create_client(): + """Initializes async client and gets the current event loop.""" + return AsyncGrpcClient() + + +# --- Global Variables for Worker Process --- +worker_loop = None +worker_client = None +worker_json_client = None +CORE_OFFSET = 20 # Start pinning cores from 20 + + +def _worker_init(bucket_type): + """Initializes a persistent event loop and client for each worker process.""" + os.sched_setaffinity(0, {i for i in range(20, 180)}) # Pin to cores 20-179 + global worker_loop, worker_client, worker_json_client + if bucket_type == "zonal": + worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(worker_loop) + worker_client = worker_loop.run_until_complete(create_client()) + else: # regional + from google.cloud import storage + + worker_json_client = storage.Client() + + +def _download_time_based_json(client, filename, params): + """Performs time-based downloads using the JSON API.""" + total_bytes_downloaded = 0 + bucket = client.bucket(params.bucket_name) + blob = bucket.blob(filename) + + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= 
warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + bytes_in_iteration = 0 + # For JSON, we can't batch ranges like gRPC, so we download one by one + for _ in range(params.num_ranges): + if params.pattern == "rand": + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + + data = blob.download_as_bytes( + start=offset, end=offset + params.chunk_size_bytes - 1 + ) + bytes_in_iteration += len(data) + + if params.pattern == "seq": + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 + + assert bytes_in_iteration == params.chunk_size_bytes * params.num_ranges + + if not is_warming_up: + total_bytes_downloaded += bytes_in_iteration + + return total_bytes_downloaded + + +async def _download_time_based_async(client, filename, params): + total_bytes_downloaded = 0 + + mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename) + await mrd.open() + + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + ranges = [] + if params.pattern == "rand": + for _ in range(params.num_ranges): + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + else: # seq + for _ in range(params.num_ranges): + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 # Reset offset if end of file is reached + + await mrd.download_ranges(ranges) + + bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges) 
+ assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges + + if not is_warming_up: + total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges + + await mrd.close() + return total_bytes_downloaded + + +def _download_files_worker(process_idx, filename, params, bucket_type): + + if bucket_type == "zonal": + return worker_loop.run_until_complete( + _download_time_based_async(worker_client, filename, params) + ) + else: # regional + return _download_time_based_json(worker_json_client, filename, params) + + +def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type): + args = [(i, files_names[i], params, bucket_type) for i in range(len(files_names))] + + results = pool.starmap(_download_files_worker, args) + return sum(results) + + +@pytest.mark.parametrize( + "workload_params", + all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"], + indirect=True, + ids=lambda p: p.name, +) +def test_downloads_multi_proc_multi_coro( + benchmark, storage_client, monitor, workload_params +): + params, files_names = workload_params + logging.info(f"num files: {len(files_names)}") + + ctx = multiprocessing.get_context("spawn") + pool = ctx.Pool( + processes=params.num_processes, + initializer=_worker_init, + initargs=(params.bucket_type,), + ) + + download_bytes_list = [] + + def target_wrapper(*args, **kwargs): + download_bytes_list.append(download_files_mp_mc_wrapper(pool, *args, **kwargs)) + return + + try: + with monitor() as m: + benchmark.pedantic( + target=target_wrapper, + iterations=1, + rounds=params.rounds, + args=(files_names, params, params.bucket_type), + ) + finally: + pool.close() + pool.join() + total_bytes_downloaded = sum(download_bytes_list) + throughput_mib_s = ( + total_bytes_downloaded / params.duration / params.rounds + ) / (1024 * 1024) + benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}" + print( + f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s" + 
) + publish_benchmark_extra_info( + benchmark, + params, + download_bytes_list=download_bytes_list, + duration=params.duration, + ) + publish_resource_metrics(benchmark, m)