Add batched publishing to pubsub#281
nv-alicheng wants to merge 4 commits into feat/alicheng-pubsub-integration from
Conversation
… in MetricsAggregator service
MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
Code Review
This pull request transitions the load generator to a fully asynchronous architecture and introduces a batched ZMQ event publishing system to significantly improve throughput. Key architectural changes include the implementation of a multi-phase BenchmarkSession and specialized LoadStrategy protocols. Feedback highlights a potential collision risk with the BATCH_TOPIC prefix and notes that the batching mechanism bypasses native ZMQ topic filtering, which breaks the contract for subscribers with specific filters. Additionally, a concern was raised regarding the removal of the timeout during service cleanup, which could lead to indefinite hangs if a subprocess fails to exit gracefully.
```python
    equal to the length of the longest topic string.
    """

BATCH_TOPIC: Final[bytes] = b"batch".ljust(TOPIC_FRAME_SIZE, b"\0")
```
Using b"batch" as the topic prefix for batched messages can collide with any event category or type named "batch". Since ZMQ topics are simple byte prefixes, a single record of type "batch" will be incorrectly identified as a batch message by the subscriber, leading to decode failures and dropped records. Consider using a prefix that cannot be a valid event type string, such as one starting with a null byte.
```diff
-BATCH_TOPIC: Final[bytes] = b"batch".ljust(TOPIC_FRAME_SIZE, b"\0")
+BATCH_TOPIC: Final[bytes] = b"\0batch".ljust(TOPIC_FRAME_SIZE, b"\0")
```
This seems like a hallucination to me. There is no such 'batch' topic, this is intentional.
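For context on the mechanism being discussed: ZMQ's SUBSCRIBE filtering is a plain byte-prefix check, which is easy to emulate in a few lines. This is a sketch only; `TOPIC_FRAME_SIZE = 16` is an assumed value, not the project's constant.

```python
# Sketch of ZMQ SUBSCRIBE semantics: a subscriber receives a message
# when its subscription bytes are a prefix of the topic frame.
TOPIC_FRAME_SIZE = 16  # assumed width for illustration only

def matches(subscription: bytes, topic_frame: bytes) -> bool:
    # This is all ZMQ PUB/SUB filtering does: a byte-prefix test.
    return topic_frame.startswith(subscription)

BATCH_TOPIC = b"batch".ljust(TOPIC_FRAME_SIZE, b"\0")
# A hypothetical event type literally named "batch" would pad to the
# same topic frame, making the two indistinguishable:
event_topic = b"batch".ljust(TOPIC_FRAME_SIZE, b"\0")
print(matches(BATCH_TOPIC, event_topic))  # True

# A NUL-prefixed topic cannot collide with any printable type string:
SAFE_BATCH_TOPIC = b"\0batch".ljust(TOPIC_FRAME_SIZE, b"\0")
print(matches(SAFE_BATCH_TOPIC, event_topic))  # False
```

Whether the collision is reachable depends on whether a "batch" event type can ever exist, which is the point of contention in this thread.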
```python
# Always subscribe to batch topic so batched messages are received
self._socket.setsockopt(zmq.SUBSCRIBE, BATCH_TOPIC)
```
Batching bypasses ZMQ's native topic filtering. Subscribers always subscribe to BATCH_TOPIC and will receive all records contained in a batch, regardless of their specific topic filters. This breaks the contract of the topics parameter and can lead to significant unnecessary processing overhead for subscribers with narrow filters. If this is an intentional design trade-off for performance, it should be more prominently documented as a limitation of the filtering system.
This is intentional and is documented as a limitation as a performance tradeoff.
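Given that trade-off, a subscriber with a narrow filter has to re-apply its topic filter in user space after decoding a batch. The sketch below uses hypothetical names (`refilter`, the `(topic, payload)` record shape) to illustrate the pattern, not the project's actual API.

```python
# Sketch (hypothetical names): once a batch frame is decoded into
# (topic, payload) records, the subscriber must re-apply its own
# filter, because ZMQ only matched the outer BATCH_TOPIC frame.
def refilter(records: list[tuple[bytes, bytes]],
             subscriptions: list[bytes]) -> list[tuple[bytes, bytes]]:
    # Mirror ZMQ semantics: keep a record if any subscription
    # is a byte-prefix of its topic.
    return [
        (topic, payload)
        for topic, payload in records
        if any(topic.startswith(sub) for sub in subscriptions)
    ]

batch = [(b"metrics.latency", b"{}"), (b"logs.debug", b"{}")]
print(refilter(batch, [b"metrics."]))  # keeps only the metrics record
```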
```diff
 publisher.close()
-await asyncio.to_thread(launcher.wait_for_exit, 10.0)
 logger.info("Waiting for services to finish processing...")
+await asyncio.to_thread(launcher.wait_for_exit, None)
```
Changing the timeout from 10.0 to None in launcher.wait_for_exit introduces a risk of the benchmark process hanging indefinitely if a service subprocess fails to exit gracefully. It is safer to maintain a reasonable timeout to ensure the main process can always terminate.
```diff
-await asyncio.to_thread(launcher.wait_for_exit, None)
+await asyncio.to_thread(launcher.wait_for_exit, 10.0)
```
This was intentional since on ARM, the MetricsAggregator service can take a while to catch up. After removing the msync call, it no longer seems to be the case. Maybe we can add in a 60s timeout?
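If the proposed 60s bound is adopted, one hedged shape is a thin wrapper that always passes a finite timeout instead of `None`. The names here (`wait_for_services`, the assumed `launcher.wait_for_exit(timeout)` signature from the snippet above) are illustrative, not the project's API.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def wait_for_services(launcher, timeout_s: float = 60.0) -> None:
    """Illustrative wrapper: never block the benchmark forever.

    Assumes launcher.wait_for_exit(timeout) returns once all service
    subprocesses have exited or the timeout elapses, as in the code
    under review.
    """
    # Run the blocking wait off the event loop, with a finite bound.
    await asyncio.to_thread(launcher.wait_for_exit, timeout_s)
    logger.info("Service wait finished (bound was %.1fs)", timeout_s)
```

This keeps the slow-ARM case covered (a generous bound) while guaranteeing the main process can always terminate.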
```diff
@@ -1,104 +1,65 @@
-# Review Council — PR "Make Loadgen Async" (Round 4)
+# Review Council — Batched ZMQ Publisher
```
What's this file? Doesn't seem it should be part of commit
```markdown
## Benchmark Data Summary

From `.cursor_artifacts/async_lg_benchmarks/` (MaxThroughputServer + real HTTPEndpointClient):
```
nvzhihanj left a comment
Review Council — Multi-AI Code Review
Reviewed by: Claude (Codex unavailable) | Depth: thorough
Found 11 issues across 4 files.
10 existing inline comments were already present. Overlapping issues excluded.
```python
if self._batch_buffer:
    self._flush_batch()

def _flush_batch(self) -> None:
```
[Claude] high (data-integrity): `_flush_batch()` clears the buffer before send — data lost on ZMQ error. The buffer is swapped out before `_send_frame()` runs:

```python
buf = self._batch_buffer
self._batch_buffer = []  # cleared here
# ...
self._send_frame(...)  # if this raises, buf is gone
```

`zmq.Again` is caught in `_drain_pending`, but `zmq.ZMQError` (e.g., ETERM when the context is terminated) can escape the handler. Since `buf` goes out of scope, the records are unrecoverable. Consider re-queuing `buf` into `_batch_buffer` on exception, or only clearing after a successful send.
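One hedged shape for the suggested re-queue fix, as a minimal stand-in class (method and attribute names mirror the snippet above, but the surrounding class structure is an assumption):

```python
class BatchedPublisherSketch:
    """Minimal stand-in showing the re-queue-on-failure pattern."""

    def __init__(self, send_frame):
        self._send_frame = send_frame  # injected transport send
        self._batch_buffer: list[bytes] = []

    def publish(self, payload: bytes) -> None:
        self._batch_buffer.append(payload)

    def _flush_batch(self) -> None:
        if not self._batch_buffer:
            return
        buf, self._batch_buffer = self._batch_buffer, []
        try:
            self._send_frame(b"".join(buf))
        except Exception:
            # Re-queue ahead of anything published meanwhile, so the
            # records survive a failed send instead of going out of scope.
            self._batch_buffer = buf + self._batch_buffer
            raise
```

The swap-then-restore order keeps the happy path allocation-free while making a failed send recoverable on retry.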
```
(event hooks feeding the metrics recorder).

## Responsibilities
class BenchmarkSession:
```
[Claude] high (bug): `BenchmarkSession.__init__` signature is wrong in 3 ways vs actual code (session.py:224):

```python
# Doc (here):
def __init__(self, issuer, event_publisher: EventRecordPublisher,
             on_sample_complete: Callable[[QueryResult], None] | None = None)

# Actual:
def __init__(self, issuer, event_publisher: EventPublisher, loop: asyncio.AbstractEventLoop,
             on_sample_complete: Callable[[QueryResult | StreamChunk], None] | None = None,
             session_id: str | None = None)
```

- Missing required `loop` param — callers following this doc get `TypeError`
- `EventRecordPublisher` → actual is `EventPublisher` (different protocol)
- `Callable[[QueryResult], None]` → actual is `Callable[[QueryResult | StreamChunk], None]`
Valid, seems like the doc change is lagging behind
````
### SessionResult

```python
@dataclass(frozen=True, slots=True)
````
[Claude] high (bug): `PhaseResult` doc doesn't match session.py:76:

- Doc: `@dataclass(frozen=True, slots=True)` → Actual: `@dataclass(frozen=True)` (no `slots=True`)
- Doc includes `report: Report | None` — this field does not exist in the actual struct
- Doc omits `issued_count: int`, which IS in the actual struct

The phantom `report` field makes the code example at line 1069 (`pr.report.qps()`) actively misleading — this would raise `AttributeError` at runtime.
Seems valid. I assume all the doc changes need to be reworked.
````
```python
class SampleIssuer(Protocol):
    def issue(self, query: Query) -> None: ...
    def poll(self) -> QueryResult | StreamChunk | None: ...
````
[Claude] medium (bug): `SampleIssuer` doc includes `poll()` — not in actual code. The actual `SampleIssuer` Protocol (session.py:110-119) has:

```python
class SampleIssuer(Protocol):
    def issue(self, query: Query) -> None: ...
    async def recv(self) -> QueryResult | StreamChunk | None: ...
    def shutdown(self) -> None: ...
```

No `poll()` method. The doc below says "Uses `recv()` exclusively — no `poll()` spin" (line 260), which contradicts having `poll()` in the protocol.
```python
client = await HTTPEndpointClient.create(ctx.http_config, loop)
issuer = HttpClientSampleIssuer(client)
zmq_ctx = ManagedZMQContext()
publisher = ZmqEventRecordPublisher(pub_socket_name, zmq_ctx, loop=loop)
```
[Claude] medium (bug): CLI Integration code is stale — contradicts this PR's own changes. The snippet shows:

```python
publisher = ZmqEventRecordPublisher(pub_socket_name, zmq_ctx, loop=loop)
```

But execute.py:417 (changed in this PR) now uses:

```python
publisher = EventPublisherService(zmq_ctx)
```

Also, `run_benchmark_async` is documented as returning `tuple[SessionResult, ResponseCollector]`, but the actual return type is `BenchmarkResult` (a dataclass wrapping session, collector, report).
Is this a rebase issue?
```python
end_time_ns: int


@dataclass(frozen=True, slots=True)
```
[Claude] low (bug): Doc: @dataclass(frozen=True, slots=True) → Actual (session.py:87): @dataclass(frozen=True). Same discrepancy as PhaseResult above.
````
**`_receive_responses()`** — concurrent coroutine, purely async:

```python
````
[Claude] low (bug): Pseudocode doesn't match session.py:366-381:

- Doc: `while True:` with `if self._done and self._inflight <= 0: break`
- Actual: `while not self._done:` — no inflight check; the task is cancelled externally by `run()`
- Doc: `self.issuer` → actual: `self._issuer`

The shutdown contract is fundamentally different: the doc implies self-termination on drain, but the actual task is externally cancelled.
```python
    except Exception as e:
        logger.warning(f"Client cleanup error: {e}")
    logger.info(
        "Closing publisher (buffer=%d, pending=%d)...",
```
[Claude] low (design): Accessing private attributes of the publisher in a log statement:

```python
len(publisher._batch_buffer),  # private attr
len(publisher._pending),       # private attr
```

These are internal to `ZmqEventRecordPublisher` and not part of the `EventPublisherService` public interface. If renamed, this silently raises `AttributeError` at runtime. Consider adding a `publisher.stats` or `publisher.buffered_count` property.
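The suggested property approach could look like this sketch (class and attribute names are assumptions mirroring the snippet, not the actual publisher):

```python
class PublisherStatsSketch:
    """Illustrative: expose buffer depths via read-only properties
    instead of letting callers reach into private attributes."""

    def __init__(self):
        self._batch_buffer: list[bytes] = []
        self._pending: list[bytes] = []

    @property
    def buffered_count(self) -> int:
        # Callers log this instead of len(publisher._batch_buffer);
        # a rename of the private attribute breaks here, in one place.
        return len(self._batch_buffer)

    @property
    def pending_count(self) -> int:
        return len(self._pending)
```

The log call then becomes `publisher.buffered_count` / `publisher.pending_count`, keeping the internals encapsulated.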
```python
        raise NotImplementedError("Subclasses must implement this method.")

    @abstractmethod
    def flush(self) -> None:
```
[Claude] low (api-contract): Inconsistent abstract method pattern. send() (line 279) and close() (line 292) use raise NotImplementedError(...), but flush() uses pass. The docstring says "No-op for unbuffered implementations" — if that's the intent, consider making it a non-abstract method with pass body, so unbuffered subclasses don't need to override it.
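The non-abstract-default option reads roughly like this sketch (an illustrative base class, not the project's actual one):

```python
from abc import ABC, abstractmethod

class EventPublisherBase(ABC):
    """Illustrative base: send/close stay abstract, flush gets a
    concrete no-op default that buffered subclasses override."""

    @abstractmethod
    def send(self, record: bytes) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

    def flush(self) -> None:
        """No-op for unbuffered implementations."""

class UnbufferedPublisher(EventPublisherBase):
    """Unbuffered subclass: no flush override needed."""

    def __init__(self):
        self.sent: list[bytes] = []

    def send(self, record: bytes) -> None:
        self.sent.append(record)

    def close(self) -> None:
        pass
```

With this shape, `flush()` drops the `@abstractmethod` decorator, so unbuffered subclasses instantiate without boilerplate overrides.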
```markdown
**Timestamp fidelity:**

### `SampleIssuer` (abstract base class — implemented externally)
- ISSUED: `monotonic_ns()` taken immediately before `issuer.issue()`. The ZMQ push is
```
[Claude] low (bug): Timestamp fidelity claim is stale under new batching. The doc says "The ZMQ push is sync and non-blocking" — but with batched publishing, publish() now just buffers the payload. The actual ZMQ send is deferred until threshold or explicit flush(). The ISSUED timestamp may not correlate with when the record actually enters the transport.
Review Council — Multi-AI Code Review

Reviewed by: Claude (Codex unavailable) | Depth: thorough

Found 11 issues across 4 files. All line numbers verified against source files at HEAD. 10 existing inline comments from prior reviews were already present. Overlapping issues excluded.

🔴 Must Fix (high) — 3 issues. Issues that will actively mislead readers or risk data loss.

🟡 Should Fix (medium) — 3 issues. Real inaccuracies or design issues.

🔵 Consider (low) — 5 issues. Valid improvements, could be follow-ups.

Testing gaps (not posted inline — lines outside diff)

🤖 Generated with Claude Code — Review Council
arekay-nv left a comment
Thanks. Minor cleanup/clarifications. Please update with the perf table.
```
+-- STARTED
+-- [saturation] strategy.execute() → NO drain (keep in-flight saturated)
+-- [perf phase 1] START_PERFORMANCE_TRACKING → strategy.execute() → drain → STOP_PERFORMANCE_TRACKING → snapshot report
+-- [saturation] strategy.execute() → drain
```
No drain for the second perf phase?
```python
"""Phase types control tracking and reporting behavior."""
PERFORMANCE = "performance"  # Tracked, produces a report
ACCURACY = "accuracy"        # Untracked, for eval scoring
SATURATION = "saturation"    # Untracked, ramp up concurrency before perf phase
```
Should we just rename this to WARMUP to avoid any confusion? Seems like there will be a lot of questions around difference between warmup and saturation so why not use the established term?
```markdown
## Component Map
1. Publish `SessionEventType.STARTED`
2. Start receiver coroutine (`_receive_responses`)
```
Does this constrain us to have a single response handler for all phases? One option can be to create a wrapper for each phase - that way, the response handler, dataset, runtime setting, sample order and load strategy can be grouped together.
````
```python
@dataclass(frozen=True, slots=True)
class PhaseResult:
````
Should this include the dataset as well - might be a good idea if we want to add more details such as sha of the dataset for checking.
```
extra_eager: If True, publish() blocks until the message is sent.
    Useful for testing or when EventRecords are used as a
    synchronization mechanism (e.g., ENDED as a stop signal).
isolated_event_loop: If True, runs on a separate event loop thread.
```
Missing doc for `send_threshold`.
```
E->>M: ZMQ PUB (EventRecord)
B->>I: issue(query)
I->>W: ZMQ PUSH (Query)
W->>W: HTTP request → endpoint
```
Missing the non-first chunk response event. Might be tricky to add, but can add a note that it is missing from the diagram.
````
## Removed Constructs

| Removed | Reason |
| ----------------------------------------------- | --------------------------------------------- |
| `Sample` class | Replaced by `Query` (frozen `msgspec.Struct`) |
| `Sample.__setattr__` hack | UUID generated before `Query` construction |
| `SampleEventHandler` singleton | Events via `EventPublisher` ZMQ PUB/SUB |
| `IssuedSample` dataclass | `uuid_to_index` dict on session is sufficient |
| `Scheduler` class hierarchy | Replaced by `LoadStrategy` + factory function |
| `LoadGenerator` / `SchedulerBasedLoadGenerator` | Replaced by `LoadStrategy` |
| `threading.Thread` in `BenchmarkSession` | Fully async |
| `threading.Condition` in `ConcurrencyScheduler` | `asyncio.Semaphore` |
| `HttpClientSampleIssuer._handle_responses` | Session owns the receive loop |

## File Structure

```
src/inference_endpoint/load_generator/
├── __init__.py       # Public exports
├── session.py        # BenchmarkSession, SessionResult
├── strategy.py       # LoadStrategy protocol, TimedIssueStrategy,
│                     #   BurstStrategy, ConcurrencyStrategy,
│                     #   create_load_strategy()
├── sample_order.py   # SampleOrder, WithoutReplacement, WithReplacement
└── delay.py          # poisson_delay_fn, uniform_delay_fn
```
````
What does this PR do?
Adds a 'send_threshold' to publish EventRecords as a batch to reduce the number of syscalls being invoked by ZMQ.
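The threshold mechanism can be sketched as a toy model (not the actual implementation; `send` stands in for the single ZMQ multipart send):

```python
class ThresholdBatcher:
    """Toy model of send_threshold batching: N publishes collapse
    into roughly N / send_threshold transport sends (syscalls)."""

    def __init__(self, send, send_threshold: int = 64):
        self._send = send
        self._threshold = send_threshold
        self._buf: list[bytes] = []

    def publish(self, record: bytes) -> None:
        self._buf.append(record)
        if len(self._buf) >= self._threshold:
            self.flush()

    def flush(self) -> None:
        if self._buf:
            self._send(list(self._buf))  # one send for the whole batch
            self._buf.clear()

sends = []
b = ThresholdBatcher(sends.append, send_threshold=4)
for i in range(10):
    b.publish(b"rec%d" % i)
b.flush()  # drain the tail
print(len(sends))  # 3 sends for 10 records: batches of 4 + 4 + 2
```

Larger thresholds mean fewer syscalls but more records at risk in the buffer and later delivery for the last partial batch, which is why an explicit `flush()` on shutdown matters.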
A large bottleneck was found on ARM when testing via the max throughput test server: `msync` cannot keep up with the batches. As such, the `msync` call has been removed for now. Implications:
Type of change
Related issues
Testing
Checklist