Major refactor to move multi-threaded load generator to async event loops #282
nv-alicheng wants to merge 6 commits into main from
Conversation
* …anently removed (#214)
* Add KVStore, ready-check mechanism, and ServiceLauncher:
  - BasicKVStore (mmap on /dev/shm) for cross-process metric reads
  - Ready-check protocol for subprocess startup synchronization
  - ServiceLauncher for managing subprocess lifecycle
  - Refactored metrics_table to remove duplicated code
* Address PR comments
* Move MetricsAggregator to use Enums instead of strings to track metrics
* Minor comments, fixes. Restrict file mode to 600 instead of 666
* fix: address PR #215 review feedback - crash detection bug, _grow() capacity update ordering, document ARM workaround
* Add guard to avoid no-op flush() syscall on x86
* Pass in KVStore to triggers to avoid injection during table.add_trigger
* Fix timeout to be across all processes, not per process
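The commit above describes BasicKVStore as an mmap-backed store on /dev/shm for cross-process metric reads. A minimal sketch of that idea follows — a single shared uint64 counter backed by a memory-mapped file. The class name, layout, and methods here are illustrative assumptions, not the PR's actual BasicKVStore API; only the 0o600 file mode is taken from the commit message.

```python
# Hypothetical sketch of an mmap-backed cross-process counter, illustrating
# the idea behind a BasicKVStore on /dev/shm. Names and layout are
# assumptions, not the PR's implementation.
import mmap
import os
import struct


class SharedCounter:
    """A single uint64 counter backed by a file (e.g. under /dev/shm)."""

    SIZE = 8  # one little-endian uint64

    def __init__(self, path: str) -> None:
        # 0o600 restricts access to the owning user, as the PR does.
        self._fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o600)
        os.ftruncate(self._fd, self.SIZE)
        self._mm = mmap.mmap(self._fd, self.SIZE)

    def add(self, delta: int) -> None:
        # Read-modify-write; a real implementation would need atomicity
        # guarantees for concurrent writers.
        (value,) = struct.unpack_from("<Q", self._mm, 0)
        struct.pack_into("<Q", self._mm, 0, value + delta)

    def read(self) -> int:
        (value,) = struct.unpack_from("<Q", self._mm, 0)
        return value

    def close(self) -> None:
        self._mm.close()
        os.close(self._fd)
```

Because the backing file lives on a tmpfs like /dev/shm, another process can mmap the same path and read the counter without any IPC round-trip.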
* Add report building
* Cleanup of PR comments; typing fix for KVStore to explicitly specify uint64 or float64 for precision purposes
* Fix zip() usage, remove overly strict guard for TPS value
* Move report_builder.py to be a classmethod in Report. Use 'lower' instead of linear interpolation for numpy percentile
* Add design doc for async load-generator
* feat: add async load strategies, sample ordering, and delay functions (Phase 1)
* feat: async BenchmarkSession with multi-phase support, PhaseIssuer, and stale completion filtering (Phase 2)
* feat: rewrite HttpClientSampleIssuer for async session, add HTTPEndpointClient.create() factory (Phase 3)
* refactor: remove legacy load generator (Sample, Scheduler, SampleEventHandler) and update references
* Fix test_end_to_end_oracle.py to use new loadgen
* Integrate pubsub into execute.py
* Fix integration with accuracy tests and Scorer
* PR fixes, add full e2e accuracy integration test
* Fix for sockets in some docker contexts
* Fix race condition bug where workers were closing and reopening sockets after timeout errors, which could cause non-existent socket errors
* Remove hardcoded 1-min drain timeout
* Remove lazy imports, fix bug with accuracy datasets discarding preprocessing tag, which may cause overwrites
* Resolve report at end of session rather than end of phase
* Fix name mismatch in metrics KVStore using str(enum) rather than enum.value
* Add report.txt detailed report back in
* Show error if tmpfs is used on ARM systems. Remove StreamChunk(is_complete=True) codepaths
* Remove unused is_complete flag from StreamChunk
* PR comments - documentation fixes, ZMQ context scoping
* Remove lazy imports from tests
MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
```python
loop: asyncio.AbstractEventLoop | None,
streaming: bool,
) -> None:
```

```python
for key in MetricCounterKey:
```
```python
# ---------------------------------------------------------------------------
```

```python
class FakeDataset(Dataset):
```
```python
recv (async ZMQ recv), shutdown.
"""

def issue(self, query: Query) -> None: ...
async def recv(self) -> QueryResult | StreamChunk | None: ...
def shutdown(self) -> None: ...
```
```python
class EventPublisher(Protocol):
    """Publishes EventRecords to the metrics pipeline."""

    def publish(self, event_record: EventRecord) -> None: ...
```
```python
def issue(self, sample_index: int) -> str | None:
    """Issue a sample. Returns query_id, or None if the session is stopping."""
    ...
```
```python
async def execute(self, phase_issuer: PhaseIssuerProtocol) -> int:
    """Drive sample issuance. Returns count of samples issued."""
    ...
```
```python
Used by ConcurrencyStrategy to release semaphore slots.
"""
...
```
```python
injector = asyncio.create_task(inject_streaming_responses())
result = await asyncio.wait_for(session.run(phases), timeout=5.0)
await injector
```
Code Review
This pull request transitions the load generator to an asynchronous architecture and replaces the SQLite-based metrics pipeline with a high-performance, mmap-backed key-value store. It introduces a new BenchmarkSession for phase orchestration and optimized async strategies for Poisson, burst, and concurrency load patterns. The review feedback identifies a potential race condition caused by premature temporary directory deletion in the scoring artifact logic, suggests using generic placeholders for endpoint URLs to improve configuration portability, and recommends dynamically registering metric keys using enums to enhance code maintainability and prevent silent failures.
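The review summary mentions an optimized async concurrency strategy, and a diff fragment above notes that `ConcurrencyStrategy` releases semaphore slots on completion. A minimal sketch of that pattern — keep N requests in flight by acquiring a slot per issued sample and releasing it from the completion path — might look like the following. The class and method names are assumptions for illustration, not the PR's actual code.

```python
# Hedged sketch of a semaphore-based concurrency load strategy: at most
# max_in_flight samples are outstanding at any moment. Names are hypothetical.
import asyncio


class ConcurrencySketch:
    def __init__(self, max_in_flight: int) -> None:
        self._sem = asyncio.Semaphore(max_in_flight)

    async def run(self, issue, num_samples: int) -> int:
        """Issue samples, blocking whenever max_in_flight are outstanding."""
        issued = 0
        for i in range(num_samples):
            await self._sem.acquire()  # wait for a free in-flight slot
            issue(i)
            issued += 1
        return issued

    def on_complete(self) -> None:
        # Called from the completion path to return the slot.
        self._sem.release()
```

Because acquisition happens on the event loop rather than in per-request threads, back-pressure from slow responses pauses issuance without any context switching.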
```python
_salvage_tmpfs(ctx.report_dir, tmpfs_dir)

# Clean up tmpfs
shutil.rmtree(tmpfs_dir, ignore_errors=True)
```
This `shutil.rmtree` call prematurely deletes the temporary directory. The main `run_benchmark` function has a `finally` block that is responsible for cleaning up this directory after salvaging artifacts on exit or interrupt. Removing the directory here can cause race conditions or errors in the main cleanup logic. All cleanup should be consolidated in one place, preferably the `finally` block in `run_benchmark`.
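The consolidated-cleanup pattern the reviewer recommends can be sketched as follows: the phase logic never deletes `tmpfs_dir`; a single `finally` block salvages artifacts and then removes the directory. The helper bodies here are stand-ins — only the function names `run_benchmark` and `_salvage_tmpfs` come from the snippets above, and their signatures are assumed.

```python
# Sketch of single-owner cleanup: salvage then delete, only in the finally
# block of run_benchmark. Helper bodies are illustrative stand-ins.
import shutil
import tempfile
from pathlib import Path


def _salvage_tmpfs(report_dir: Path, tmpfs_dir: Path) -> None:
    # Copy any surviving artifacts out of the tmpfs before it is removed.
    for f in tmpfs_dir.iterdir():
        shutil.copy2(f, report_dir / f.name)


def _run_phases(report_dir: Path, tmpfs_dir: Path) -> None:
    # Stand-in for the benchmark phases; writes one artifact.
    (tmpfs_dir / "metrics.bin").write_bytes(b"\x00" * 8)


def run_benchmark(report_dir: Path) -> None:
    tmpfs_dir = Path(tempfile.mkdtemp(prefix="bench-tmpfs-"))
    try:
        _run_phases(report_dir, tmpfs_dir)
    finally:
        # Salvage first, then delete -- and only here, so no other code
        # path can race with (or double-perform) the removal.
        _salvage_tmpfs(report_dir, tmpfs_dir)
        shutil.rmtree(tmpfs_dir, ignore_errors=True)
```

With this shape, an interrupt anywhere in the phase logic still flows through the one salvage-then-delete path, and no inner code needs to know about cleanup at all.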
```yaml
endpoint_config:
  endpoints:
    - "http://umbriel-b200-145.nvidia.com:8000"
```
The endpoint URL `http://umbriel-b200-145.nvidia.com:8000` appears to be a hardcoded internal hostname. It's better to use a generic placeholder like `http://localhost:8000` or an environment variable to make the configuration more portable and avoid exposing internal infrastructure details.
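The environment-variable half of this suggestion could look like the sketch below. The variable name `BENCHMARK_ENDPOINT` and the resolver function are assumptions for illustration, not part of the PR.

```python
# Sketch of resolving the endpoint from an environment variable with a
# localhost default. BENCHMARK_ENDPOINT is a hypothetical variable name.
import os


def resolve_endpoint() -> str:
    """Return the benchmark endpoint, preferring the environment override."""
    return os.environ.get("BENCHMARK_ENDPOINT", "http://localhost:8000")
```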
Suggested change:

```yaml
    - "http://localhost:8000"
```

```python
def _setup_kv_reader(
    metrics_dir: Path,
    streaming: bool,
) -> BasicKVStoreReader:
    """Create a KVStoreReader pre-registered with all metric keys."""
    reader = BasicKVStoreReader(metrics_dir)
    # Counter keys (from MetricCounterKey enum)
    for key in [
        "total_samples_issued",
        "total_samples_completed",
        "total_samples_failed",
        "tracked_samples_issued",
        "tracked_samples_completed",
        "tracked_duration_ns",
        "total_duration_ns",
    ]:
        reader.register_key(key, "counter")
    # Series keys (from MetricSeriesKey enum)
    for key in ["isl", "osl", "sample_latency_ns"]:
        reader.register_key(key, "series")
    reader.register_key("tpot_ns", "series", dtype=float)
    if streaming:
        for key in ["ttft_ns", "chunk_delta_ns"]:
            reader.register_key(key, "series")
    return reader
```
The function `_setup_kv_reader` uses hardcoded strings for metric keys (e.g., `"tracked_samples_issued"`). This is brittle; if the key names in the `MetricCounterKey` or `MetricSeriesKey` enums change, this code will silently fail to register the keys, leading to missing metrics. It's better to iterate over the enums to register the keys dynamically. This ensures that the reader is always in sync with the keys defined in the aggregator.
For example:

```python
from inference_endpoint.async_utils.services.metrics_aggregator.aggregator import MetricCounterKey
from inference_endpoint.async_utils.services.metrics_aggregator.metrics_table import MetricSeriesKey


def _setup_kv_reader(
    metrics_dir: Path,
    streaming: bool,
) -> BasicKVStoreReader:
    """Create a KVStoreReader pre-registered with all metric keys."""
    reader = BasicKVStoreReader(metrics_dir)
    for key in MetricCounterKey:
        reader.register_key(key.value, "counter")
    series_keys = {
        MetricSeriesKey.ISL: int,
        MetricSeriesKey.OSL: int,
        MetricSeriesKey.SAMPLE_LATENCY_NS: int,
        MetricSeriesKey.TPOT_NS: float,
    }
    if streaming:
        series_keys[MetricSeriesKey.TTFT_NS] = int
        series_keys[MetricSeriesKey.CHUNK_DELTA_NS] = int
    for key, dtype in series_keys.items():
        reader.register_key(key.value, "series", dtype=dtype)
    return reader
```
What does this PR do?
Major refactor that moves the multi-threaded system to an event-loop-based one, reducing Python context switching for higher throughput and performance.
Type of change
Related issues
This merges from a side branch which consists of multiple PRs:
Testing
Checklist