diff --git a/vertexai/_genai/_evals_common.py b/vertexai/_genai/_evals_common.py index b51017a889..3a585d287f 100644 --- a/vertexai/_genai/_evals_common.py +++ b/vertexai/_genai/_evals_common.py @@ -1532,6 +1532,7 @@ def _execute_evaluation( # type: ignore[no-untyped-def] dataset_schema: Optional[Literal["GEMINI", "FLATTEN", "OPENAI"]] = None, dest: Optional[str] = None, location: Optional[str] = None, + evaluation_service_qps: Optional[float] = None, **kwargs, ) -> types.EvaluationResult: """Evaluates a dataset using the provided metrics. @@ -1544,6 +1545,9 @@ def _execute_evaluation( # type: ignore[no-untyped-def] dest: The destination to save the evaluation results. location: The location to use for the evaluation. If not specified, the location configured in the client will be used. + evaluation_service_qps: The rate limit (queries per second) for calls + to the evaluation service. Defaults to 10. Increase this value if + your project has a higher EvaluateInstances API quota. **kwargs: Extra arguments to pass to evaluation, such as `agent_info`. Returns: @@ -1619,7 +1623,8 @@ def _execute_evaluation( # type: ignore[no-untyped-def] logger.info("Running Metric Computation...") t1 = time.perf_counter() evaluation_result = _evals_metric_handlers.compute_metrics_and_aggregate( - evaluation_run_config + evaluation_run_config, + evaluation_service_qps=evaluation_service_qps, ) t2 = time.perf_counter() logger.info("Evaluation took: %f seconds", t2 - t1) diff --git a/vertexai/_genai/_evals_metric_handlers.py b/vertexai/_genai/_evals_metric_handlers.py index 9d72bafc86..37a31adeea 100644 --- a/vertexai/_genai/_evals_metric_handlers.py +++ b/vertexai/_genai/_evals_metric_handlers.py @@ -31,6 +31,7 @@ from . import _evals_common from . import _evals_constant +from . import _evals_utils from . import evals from . import types @@ -1498,10 +1499,29 @@ class EvaluationRunConfig(_common.BaseModel): """The number of response candidates for the evaluation run.""" +def _rate_limited_get_metric_result( + rate_limiter: _evals_utils.RateLimiter, + handler: MetricHandler[Any], + eval_case: types.EvalCase, + response_index: int, +) -> types.EvalCaseMetricResult: + """Wraps a handler's get_metric_result with rate limiting.""" + rate_limiter.sleep_and_advance() + return handler.get_metric_result(eval_case, response_index) + + def compute_metrics_and_aggregate( evaluation_run_config: EvaluationRunConfig, + evaluation_service_qps: Optional[float] = None, ) -> types.EvaluationResult: - """Computes metrics and aggregates them for a given evaluation run config.""" + """Computes metrics and aggregates them for a given evaluation run config. + + Args: + evaluation_run_config: The configuration for the evaluation run. + evaluation_service_qps: Optional QPS limit for the evaluation service. + Defaults to _DEFAULT_EVAL_SERVICE_QPS (10). Users with higher + quotas can increase this value. + """ metric_handlers = [] all_futures = [] results_by_case_response_metric: collections.defaultdict[ @@ -1511,6 +1531,12 @@ def compute_metrics_and_aggregate( execution_errors = [] case_indices_with_errors = set() + if evaluation_service_qps is not None and evaluation_service_qps <= 0: + raise ValueError("evaluation_service_qps must be a positive number.") + qps = evaluation_service_qps or _evals_utils._DEFAULT_EVAL_SERVICE_QPS + rate_limiter = _evals_utils.RateLimiter(rate=qps) + logger.info("Rate limiting evaluation service requests to %.1f QPS.", qps) + for eval_metric in evaluation_run_config.metrics: metric_handlers.append( get_handler_for_metric(evaluation_run_config.evals_module, eval_metric) @@ -1553,7 +1579,9 @@ def compute_metrics_and_aggregate( for response_index in range(actual_num_candidates_for_case): try: future = executor.submit( - metric_handler_instance.get_metric_result, + _rate_limited_get_metric_result, + rate_limiter, + metric_handler_instance, eval_case, response_index, ) diff --git a/vertexai/_genai/_evals_utils.py b/vertexai/_genai/_evals_utils.py index 9d4dd4fc71..8754e280e0 100644 --- a/vertexai/_genai/_evals_utils.py +++ b/vertexai/_genai/_evals_utils.py @@ -15,9 +15,11 @@ """Utility functions for evals.""" import abc +import json import logging import os -import json +import threading +import time from typing import Any, Optional, Union from google.genai._api_client import BaseApiClient @@ -36,12 +38,59 @@ GCS_PREFIX = "gs://" BQ_PREFIX = "bq://" +_DEFAULT_EVAL_SERVICE_QPS = 10 + + +class RateLimiter: + """Helper class for rate-limiting requests to Vertex AI to improve QoS. + + Implements a token bucket algorithm to limit the rate at which API calls + can occur. Designed for cases where the batch size is always 1 for traffic + shaping and rate limiting. + + Attributes: + seconds_per_event: The time interval (in seconds) between events to + maintain the desired rate. + last: The timestamp of the last event. + _lock: A lock to ensure thread safety. + """ + + def __init__(self, rate: float) -> None: + """Initializes the rate limiter. + + Args: + rate: The number of queries allowed per second. + + Raises: + ValueError: If the rate is not positive. + """ + if not rate or rate <= 0: + raise ValueError("Rate must be a positive number") + self.seconds_per_event = 1.0 / rate + self._next_allowed = time.monotonic() + self._lock = threading.Lock() + + def sleep_and_advance(self) -> None: + """Blocks the current thread until the next event can be admitted. + + The lock is held only long enough to reserve a time slot. The + actual sleep happens outside the lock so that multiple threads + can be sleeping concurrently with staggered wake-up times. + """ + with self._lock: + now = time.monotonic() + wait_until = max(now, self._next_allowed) + delay = wait_until - now + self._next_allowed = wait_until + self.seconds_per_event + + if delay > 0: + time.sleep(delay) class EvalDatasetLoader: """A loader for datasets from various sources, using a shared client.""" - def __init__(self, api_client: BaseApiClient): + def __init__(self, api_client: BaseApiClient) -> None: self.api_client = api_client self.gcs_utils = _gcs_utils.GcsUtils(self.api_client) self.bigquery_utils = _bigquery_utils.BigQueryUtils(self.api_client) diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index 535fc9cb26..a7e8fe16fd 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -15396,6 +15396,12 @@ class EvaluateMethodConfig(_common.BaseModel): dest: Optional[str] = Field( default=None, description="""The destination path for the evaluation results.""" ) + evaluation_service_qps: Optional[float] = Field( + default=None, + description="""The rate limit (queries per second) for calls to the + evaluation service. Defaults to 10. Increase this value if your + project has a higher EvaluateInstances API quota.""", + ) class EvaluateMethodConfigDict(TypedDict, total=False): @@ -15412,6 +15418,11 @@ class EvaluateMethodConfigDict(TypedDict, total=False): dest: Optional[str] """The destination path for the evaluation results.""" + evaluation_service_qps: Optional[float] + """The rate limit (queries per second) for calls to the + evaluation service. Defaults to 10. Increase this value if your + project has a higher EvaluateInstances API quota.""" + EvaluateMethodConfigOrDict = Union[EvaluateMethodConfig, EvaluateMethodConfigDict]