Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion vertexai/_genai/_evals_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,7 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
dataset_schema: Optional[Literal["GEMINI", "FLATTEN", "OPENAI"]] = None,
dest: Optional[str] = None,
location: Optional[str] = None,
evaluation_service_qps: Optional[float] = None,
**kwargs,
) -> types.EvaluationResult:
"""Evaluates a dataset using the provided metrics.
Expand All @@ -1544,6 +1545,9 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
dest: The destination to save the evaluation results.
location: The location to use for the evaluation. If not specified, the
location configured in the client will be used.
evaluation_service_qps: The rate limit (queries per second) for calls
to the evaluation service. Defaults to 10. Increase this value if
your project has a higher EvaluateInstances API quota.
**kwargs: Extra arguments to pass to evaluation, such as `agent_info`.

Returns:
Expand Down Expand Up @@ -1619,7 +1623,8 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
logger.info("Running Metric Computation...")
t1 = time.perf_counter()
evaluation_result = _evals_metric_handlers.compute_metrics_and_aggregate(
evaluation_run_config
evaluation_run_config,
evaluation_service_qps=evaluation_service_qps,
)
t2 = time.perf_counter()
logger.info("Evaluation took: %f seconds", t2 - t1)
Expand Down
32 changes: 30 additions & 2 deletions vertexai/_genai/_evals_metric_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from . import _evals_common
from . import _evals_constant
from . import _evals_utils
from . import evals
from . import types

Expand Down Expand Up @@ -1498,10 +1499,29 @@ class EvaluationRunConfig(_common.BaseModel):
"""The number of response candidates for the evaluation run."""


def _rate_limited_get_metric_result(
    rate_limiter: _evals_utils.RateLimiter,
    handler: MetricHandler[Any],
    eval_case: types.EvalCase,
    response_index: int,
) -> types.EvalCaseMetricResult:
    """Throttles via ``rate_limiter`` before delegating to ``handler``.

    Blocks the calling thread until the limiter admits the request, then
    computes the metric result for the given case/response pair.
    """
    # Pace the outgoing evaluation-service call before doing any work.
    rate_limiter.sleep_and_advance()
    result = handler.get_metric_result(eval_case, response_index)
    return result


def compute_metrics_and_aggregate(
evaluation_run_config: EvaluationRunConfig,
evaluation_service_qps: Optional[float] = None,
) -> types.EvaluationResult:
"""Computes metrics and aggregates them for a given evaluation run config."""
"""Computes metrics and aggregates them for a given evaluation run config.

Args:
evaluation_run_config: The configuration for the evaluation run.
evaluation_service_qps: Optional QPS limit for the evaluation service.
Defaults to _DEFAULT_EVAL_SERVICE_QPS (10). Users with higher
quotas can increase this value.
"""
metric_handlers = []
all_futures = []
results_by_case_response_metric: collections.defaultdict[
Expand All @@ -1511,6 +1531,12 @@ def compute_metrics_and_aggregate(
execution_errors = []
case_indices_with_errors = set()

if evaluation_service_qps is not None and evaluation_service_qps <= 0:
raise ValueError("evaluation_service_qps must be a positive number.")
qps = evaluation_service_qps or _evals_utils._DEFAULT_EVAL_SERVICE_QPS
rate_limiter = _evals_utils.RateLimiter(rate=qps)
logger.info("Rate limiting evaluation service requests to %.1f QPS.", qps)

for eval_metric in evaluation_run_config.metrics:
metric_handlers.append(
get_handler_for_metric(evaluation_run_config.evals_module, eval_metric)
Expand Down Expand Up @@ -1553,7 +1579,9 @@ def compute_metrics_and_aggregate(
for response_index in range(actual_num_candidates_for_case):
try:
future = executor.submit(
metric_handler_instance.get_metric_result,
_rate_limited_get_metric_result,
rate_limiter,
metric_handler_instance,
eval_case,
response_index,
)
Expand Down
53 changes: 51 additions & 2 deletions vertexai/_genai/_evals_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
"""Utility functions for evals."""

import abc
import json
import logging
import os
import json
import threading
import time
from typing import Any, Optional, Union

from google.genai._api_client import BaseApiClient
Expand All @@ -36,12 +38,59 @@

GCS_PREFIX = "gs://"
BQ_PREFIX = "bq://"
_DEFAULT_EVAL_SERVICE_QPS = 10


class RateLimiter:
    """Helper class for rate-limiting requests to Vertex AI to improve QoS.

    Implements simple fixed-interval pacing: each admitted event reserves a
    time slot ``1 / rate`` seconds after the previous one, so calls are
    spaced evenly. Designed for cases where the batch size is always 1 for
    traffic shaping and rate limiting.

    Attributes:
        seconds_per_event: The time interval (in seconds) between events to
            maintain the desired rate.
        _next_allowed: Monotonic timestamp at which the next event may be
            admitted.
        _lock: A lock to ensure thread safety.
    """

    def __init__(self, rate: float) -> None:
        """Initializes the rate limiter.

        Args:
            rate: The number of queries allowed per second.

        Raises:
            ValueError: If the rate is not positive.
        """
        if not rate or rate <= 0:
            raise ValueError("Rate must be a positive number")
        self.seconds_per_event = 1.0 / rate
        # Start "now" so the very first call is admitted without delay.
        self._next_allowed = time.monotonic()
        self._lock = threading.Lock()

    def sleep_and_advance(self) -> None:
        """Blocks the current thread until the next event can be admitted.

        The lock is held only long enough to reserve a time slot. The
        actual sleep happens outside the lock so that multiple threads
        can be sleeping concurrently with staggered wake-up times.
        """
        with self._lock:
            now = time.monotonic()
            # Reserve the earliest slot that is not in the past.
            wait_until = max(now, self._next_allowed)
            delay = wait_until - now
            self._next_allowed = wait_until + self.seconds_per_event

        if delay > 0:
            time.sleep(delay)


class EvalDatasetLoader:
"""A loader for datasets from various sources, using a shared client."""

def __init__(self, api_client: BaseApiClient):
def __init__(self, api_client: BaseApiClient) -> None:
    """Creates the loader and the storage helpers it reads through."""
    self.api_client = api_client
    # Reuse the caller's API client for both backends so credentials and
    # endpoint configuration stay consistent across GCS and BigQuery reads.
    self.gcs_utils = _gcs_utils.GcsUtils(api_client)
    self.bigquery_utils = _bigquery_utils.BigQueryUtils(api_client)
Expand Down
11 changes: 11 additions & 0 deletions vertexai/_genai/types/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15396,6 +15396,12 @@ class EvaluateMethodConfig(_common.BaseModel):
dest: Optional[str] = Field(
default=None, description="""The destination path for the evaluation results."""
)
evaluation_service_qps: Optional[float] = Field(
default=None,
description="""The rate limit (queries per second) for calls to the
evaluation service. Defaults to 10. Increase this value if your
project has a higher EvaluateInstances API quota.""",
)


class EvaluateMethodConfigDict(TypedDict, total=False):
Expand All @@ -15412,6 +15418,11 @@ class EvaluateMethodConfigDict(TypedDict, total=False):
dest: Optional[str]
"""The destination path for the evaluation results."""

evaluation_service_qps: Optional[float]
"""The rate limit (queries per second) for calls to the
evaluation service. Defaults to 10. Increase this value if your
project has a higher EvaluateInstances API quota."""


EvaluateMethodConfigOrDict = Union[EvaluateMethodConfig, EvaluateMethodConfigDict]

Expand Down
Loading