
Major refactor to move multi-threaded load generator to async event loops#282

Open
nv-alicheng wants to merge 6 commits into main from feat/alicheng-pubsub-integration

Conversation

@nv-alicheng
Collaborator

What does this PR do?

Major refactor that moves the multi-threaded system to an event-loop-based one, reducing Python context switching for higher throughput and performance.
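The core idea of the refactor can be sketched in a few lines. This is an illustrative toy (`issue_one` and `run_load` are hypothetical names, not from this PR): instead of one OS thread per in-flight request, a single `asyncio` event loop multiplexes all requests as tasks on one thread, avoiding GIL-bound thread context switches.

```python
import asyncio

async def issue_one(i: int) -> int:
    # Stand-in for an awaitable network call (e.g. an async HTTP request).
    await asyncio.sleep(0)
    return i

async def run_load(n: int) -> list[int]:
    # All n in-flight requests share one event loop on one OS thread;
    # gather() preserves submission order in its results.
    return await asyncio.gather(*(issue_one(i) for i in range(n)))

results = asyncio.run(run_load(100))
```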

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

This PR merges a side branch that consists of multiple PRs:

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

nv-alicheng and others added 6 commits April 13, 2026 11:07
* Add KVStore, ready-check mechanism, and ServiceLauncher

- BasicKVStore (mmap on /dev/shm) for cross-process metric reads
- Ready-check protocol for subprocess startup synchronization
- ServiceLauncher for managing subprocess lifecycle
- Refactored metrics_table to remove duplicated code
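The cross-process metric-read idea behind BasicKVStore can be illustrated with a minimal mmap sketch. This is not the PR's actual layout or API; it only shows the mechanism: writer and reader map the same fixed-size file (the PR places it on /dev/shm; a temp dir is used here for portability) and exchange a uint64 counter without IPC round-trips.

```python
import mmap
import os
import struct
import tempfile

# One 8-byte slot for a single uint64 counter (illustrative layout only;
# the PR's BasicKVStore defines its own file format on /dev/shm).
path = os.path.join(tempfile.mkdtemp(), "counter.bin")
with open(path, "wb") as f:
    f.write(b"\x00" * 8)

# Writer side: update the counter in place through the mapping.
with open(path, "r+b") as f:
    mm = mmap.mmap(f.fileno(), 8)
    mm[0:8] = struct.pack("<Q", 42)
    mm.flush()
    mm.close()

# Reader side: could just as well be a different process mapping the same file.
with open(path, "rb") as f:
    mm = mmap.mmap(f.fileno(), 8, access=mmap.ACCESS_READ)
    value = struct.unpack("<Q", mm[0:8])[0]
    mm.close()
```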

* Address PR comments

* Move MetricsAggregator to use Enums instead of strings to track metrics

* Minor comments, fixes. Restrict filemode to 600 instead of 666

* fix: address PR #215 review feedback - crash detection bug, _grow() capacity update ordering, document ARM workaround

* Add guard to avoid no-op flush() syscall on x86

* Pass in KVStore to triggers to avoid injection during table.add_trigger

* Fix timeout to be across all processes, not per process
* Add report building
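The "timeout across all processes" fix above amounts to sharing one absolute deadline instead of restarting a full timeout in each subprocess. A hypothetical illustration (`remaining_budget` is not a name from this PR):

```python
import time

def remaining_budget(deadline: float) -> float:
    """Seconds left until the shared deadline (never negative)."""
    return max(0.0, deadline - time.monotonic())

# One 30 s budget for ALL workers: each worker is handed the remaining
# time, so the total wall-clock spend is bounded globally, not per process.
deadline = time.monotonic() + 30.0
first = remaining_budget(deadline)
time.sleep(0.01)  # simulate work done by one worker
second = remaining_budget(deadline)
```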

* Cleanup of PR comments, typing fix for KVStore to explicitly specify uint64 or float64 for precision purposes

* Fix zip() usage, remove overly strict guard for TPS value

* Move report_builder.py to be a classmethod in Report. Use 'lower' instead of linear interpolation for numpy percentile
* Add design doc for async load-generator
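The percentile change a couple of commits above is worth a one-line illustration: NumPy's default `linear` method interpolates between neighboring samples, while `method="lower"` (NumPy ≥ 1.22; older versions spell it `interpolation="lower"`) always returns an actually observed value, which is often preferable for latency percentiles.

```python
import numpy as np

data = np.array([10, 20, 30, 40])
linear = np.percentile(data, 50)                 # interpolates between samples
lower = np.percentile(data, 50, method="lower")  # returns an observed sample
```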

* feat: add async load strategies, sample ordering, and delay functions (Phase 1)

* feat: async BenchmarkSession with multi-phase support, PhaseIssuer, and stale completion filtering (Phase 2)

* feat: rewrite HttpClientSampleIssuer for async session, add HTTPEndpointClient.create() factory (Phase 3)

* refactor: remove legacy load generator (Sample, Scheduler, SampleEventHandler) and update references

* Fix test_end_to_end_oracle.py to use new loadgen

* Integrate pubsub into execute.py

* Fix integration with accuracy tests and Scorer

* PR fixes, add full e2e accuracy integration test

* Fix for sockets in some docker contexts

* Fix race condition bug where workers were closing+reopening sockets after timeout errors which could cause non-existent socket errors

* Remove hardcoded 1-min drain timeout

* Remove lazy imports, fix bug with accuracy datasets discarding preprocessing tag, which may cause overwrites

* Resolve report at end of session rather than end of phase

* Fix name mismatch in metrics KVStore using str(enum) rather than enum.value
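The `str(enum)` vs `enum.value` mismatch fixed above is a common pitfall; a minimal repro (the enum below is illustrative, not the PR's actual class):

```python
from enum import Enum

class MetricKey(Enum):
    SAMPLES = "total_samples_issued"

# str() of a plain Enum member yields "ClassName.MEMBER", not the payload,
# so using it as a KV key silently diverges from the key written elsewhere.
as_str = str(MetricKey.SAMPLES)
as_value = MetricKey.SAMPLES.value
```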

* Add report.txt detailed report back in

* Show error if tmpfs is used on ARM systems. Remove StreamChunk(is_complete=True) codepaths

* Remove unused is_complete flag from StreamChunk

* PR Comments - documentation fixes, ZMQ context scoping

* Remove lazy imports from tests
@nv-alicheng nv-alicheng requested a review from a team April 14, 2026 02:07
@github-actions

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

@github-actions github-actions bot requested review from arekay-nv and nvzhihanj April 14, 2026 02:07
Diff excerpts quoted in the thread (duplicated diff-context lines collapsed, indentation restored):

"""... issue, recv (async ZMQ recv), shutdown."""

def issue(self, query: Query) -> None: ...
async def recv(self) -> QueryResult | StreamChunk | None: ...
def shutdown(self) -> None: ...

class EventPublisher(Protocol):
    """Publishes EventRecords to the metrics pipeline."""

    def publish(self, event_record: EventRecord) -> None: ...

def issue(self, sample_index: int) -> str | None:
    """Issue a sample. Returns query_id, or None if the session is stopping."""
    ...

async def execute(self, phase_issuer: PhaseIssuerProtocol) -> int:
    """Drive sample issuance. Returns count of samples issued."""
    ...

# Used by ConcurrencyStrategy to release semaphore slots.

injector = asyncio.create_task(inject_streaming_responses())
result = await asyncio.wait_for(session.run(phases), timeout=5.0)
await injector

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request transitions the load generator to an asynchronous architecture and replaces the SQLite-based metrics pipeline with a high-performance, mmap-backed key-value store. It introduces a new BenchmarkSession for phase orchestration and optimized async strategies for Poisson, burst, and concurrency load patterns. The review feedback identifies a potential race condition caused by premature temporary directory deletion in the scoring artifact logic, suggests using generic placeholders for endpoint URLs to improve configuration portability, and recommends dynamically registering metric keys using enums to enhance code maintainability and prevent silent failures.

_salvage_tmpfs(ctx.report_dir, tmpfs_dir)

# Clean up tmpfs
shutil.rmtree(tmpfs_dir, ignore_errors=True)

high

This shutil.rmtree call prematurely deletes the temporary directory. The main run_benchmark function has a finally block that is responsible for cleaning up this directory after salvaging artifacts on exit or interrupt. Removing the directory here can cause race conditions or errors in the main cleanup logic. All cleanup should be consolidated in one place, preferably the finally block in run_benchmark.
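The suggested structure can be sketched as follows. Names here (`run_benchmark_sketch`, `work`) are illustrative, not the PR's actual code: salvage and deletion both live in one finally block, so no inner code path can delete tmpfs_dir out from under the salvage step.

```python
import shutil
import tempfile
from pathlib import Path

def run_benchmark_sketch(work) -> Path:
    tmpfs_dir = Path(tempfile.mkdtemp())
    try:
        # Inner phases only write into tmpfs_dir; they never delete it.
        return work(tmpfs_dir)
    finally:
        # _salvage_tmpfs(report_dir, tmpfs_dir) would run here first, then
        # the single, consolidated cleanup:
        shutil.rmtree(tmpfs_dir, ignore_errors=True)

out = run_benchmark_sketch(lambda d: d / "report.txt")
cleaned = not out.parent.exists()  # finally ran even on the return path
```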

Comment thread configs/gptoss_test.yaml

endpoint_config:
  endpoints:
    - "http://umbriel-b200-145.nvidia.com:8000"

medium

The endpoint URL http://umbriel-b200-145.nvidia.com:8000 appears to be a hardcoded internal hostname. It's better to use a generic placeholder like http://localhost:8000 or an environment variable to make the configuration more portable and avoid exposing internal infrastructure details.

    - "http://localhost:8000"

Comment on lines +371 to +395
def _setup_kv_reader(
    metrics_dir: Path,
    streaming: bool,
) -> BasicKVStoreReader:
    """Create a KVStoreReader pre-registered with all metric keys."""
    reader = BasicKVStoreReader(metrics_dir)
    # Counter keys (from MetricCounterKey enum)
    for key in [
        "total_samples_issued",
        "total_samples_completed",
        "total_samples_failed",
        "tracked_samples_issued",
        "tracked_samples_completed",
        "tracked_duration_ns",
        "total_duration_ns",
    ]:
        reader.register_key(key, "counter")
    # Series keys (from MetricSeriesKey enum)
    for key in ["isl", "osl", "sample_latency_ns"]:
        reader.register_key(key, "series")
    reader.register_key("tpot_ns", "series", dtype=float)
    if streaming:
        for key in ["ttft_ns", "chunk_delta_ns"]:
            reader.register_key(key, "series")
    return reader

medium

The function _setup_kv_reader uses hardcoded strings for metric keys (e.g., "tracked_samples_issued"). This is brittle; if the key names in the MetricCounterKey or MetricSeriesKey enums change, this code will silently fail to register the keys, leading to missing metrics. It's better to iterate over the enums to register the keys dynamically. This ensures that the reader is always in sync with the keys defined in the aggregator.

For example:

from inference_endpoint.async_utils.services.metrics_aggregator.aggregator import MetricCounterKey
from inference_endpoint.async_utils.services.metrics_aggregator.metrics_table import MetricSeriesKey

def _setup_kv_reader(
    metrics_dir: Path,
    streaming: bool,
) -> BasicKVStoreReader:
    """Create a KVStoreReader pre-registered with all metric keys."""
    reader = BasicKVStoreReader(metrics_dir)
    for key in MetricCounterKey:
        reader.register_key(key.value, "counter")

    series_keys = {
        MetricSeriesKey.ISL: int,
        MetricSeriesKey.OSL: int,
        MetricSeriesKey.SAMPLE_LATENCY_NS: int,
        MetricSeriesKey.TPOT_NS: float,
    }
    if streaming:
        series_keys[MetricSeriesKey.TTFT_NS] = int
        series_keys[MetricSeriesKey.CHUNK_DELTA_NS] = int

    for key, dtype in series_keys.items():
        reader.register_key(key.value, "series", dtype=dtype)

    return reader

