Add batched publishing to pubsub#281

Open
nv-alicheng wants to merge 4 commits into feat/alicheng-pubsub-integration from feat/alicheng-batched-publisher

Conversation

@nv-alicheng
Collaborator

What does this PR do?

Adds a `send_threshold` to publish EventRecords as a batch, reducing the number of syscalls invoked by ZMQ.

A large bottleneck was found on ARM when testing via the max throughput test server: msync cannot keep up with the batches. As such, the msync call has been removed for now.

Implications:

  • Live metrics cannot be added before KVStore / MetricsAggregator is redesigned (i.e. AIPerf style or Prometheus metrics)
  • Higher max throughput: 30k -> 50k.
  • Batching becomes a new 'subscription' topic, so subscribers must subscribe to it. Since the Publisher does not know what the subscribers are subscribing to, unnecessary records for other topics may appear in the batch. Currently, both major subscriber services subscribe to all topics, so this is not an issue.
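
The `send_threshold` mechanism can be sketched roughly as follows (a minimal illustration, not the actual pubsub implementation; `BatchingPublisher` and `send_fn` are hypothetical names, and the real send would be a ZMQ multipart publish):

```python
from typing import Callable

class BatchingPublisher:
    """Sketch: buffer encoded records and emit them as one batch frame
    once `send_threshold` records have accumulated."""

    def __init__(self, send_fn: Callable[[list[bytes]], None],
                 send_threshold: int = 1000) -> None:
        self._send_fn = send_fn            # stands in for the ZMQ send
        self._send_threshold = send_threshold
        self._buffer: list[bytes] = []

    def publish(self, payload: bytes) -> None:
        self._buffer.append(payload)
        if len(self._buffer) >= self._send_threshold:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            # swap the buffer out, then send the whole batch at once
            batch, self._buffer = self._buffer, []
            self._send_fn(batch)           # one send call for N records
```

One send call per `send_threshold` records is what cuts the syscall count relative to publishing each EventRecord individually.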

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

@nv-alicheng nv-alicheng requested a review from a team as a code owner April 14, 2026 01:44
@github-actions

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

@github-actions github-actions bot requested review from arekay-nv and nvzhihanj April 14, 2026 01:44

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request transitions the load generator to a fully asynchronous architecture and introduces a batched ZMQ event publishing system to significantly improve throughput. Key architectural changes include the implementation of a multi-phase BenchmarkSession and specialized LoadStrategy protocols. Feedback highlights a potential collision risk with the BATCH_TOPIC prefix and notes that the batching mechanism bypasses native ZMQ topic filtering, which breaks the contract for subscribers with specific filters. Additionally, a concern was raised regarding the removal of the timeout during service cleanup, which could lead to indefinite hangs if a subprocess fails to exit gracefully.

equal to the length of the longest topic string.
"""

BATCH_TOPIC: Final[bytes] = b"batch".ljust(TOPIC_FRAME_SIZE, b"\0")

high

Using b"batch" as the topic prefix for batched messages can collide with any event category or type named "batch". Since ZMQ topics are simple byte prefixes, a single record of type "batch" will be incorrectly identified as a batch message by the subscriber, leading to decode failures and dropped records. Consider using a prefix that cannot be a valid event type string, such as one starting with a null byte.

Suggested change
BATCH_TOPIC: Final[bytes] = b"batch".ljust(TOPIC_FRAME_SIZE, b"\0")
BATCH_TOPIC: Final[bytes] = b"\0batch".ljust(TOPIC_FRAME_SIZE, b"\0")
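
For context on the concern (regardless of whether a "batch" event type exists in practice), ZMQ SUBSCRIBE filters are plain byte-prefix matches on the first frame; the illustrative topics below show how a prefix collision would arise and how a null-prefixed topic avoids it:

```python
# ZMQ SUBSCRIBE filtering is a byte-prefix match on the topic frame.
# `startswith` reproduces that rule; the topic names here are made up.
def zmq_topic_matches(subscription: bytes, topic_frame: bytes) -> bool:
    return topic_frame.startswith(subscription)

assert zmq_topic_matches(b"batch", b"batch\x00\x00\x00")         # the batch frame
assert zmq_topic_matches(b"batch", b"batch_events\x00")          # accidental collision
assert not zmq_topic_matches(b"\x00batch", b"batch_events\x00")  # null prefix avoids it
```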

Collaborator Author


This seems like a hallucination to me. There is no such 'batch' topic; this is intentional.

Comment on lines +241 to +242
# Always subscribe to batch topic so batched messages are received
self._socket.setsockopt(zmq.SUBSCRIBE, BATCH_TOPIC)

high

Batching bypasses ZMQ's native topic filtering. Subscribers always subscribe to BATCH_TOPIC and will receive all records contained in a batch, regardless of their specific topic filters. This breaks the contract of the topics parameter and can lead to significant unnecessary processing overhead for subscribers with narrow filters. If this is an intentional design trade-off for performance, it should be more prominently documented as a limitation of the filtering system.

Collaborator Author


This is intentional and is documented as a limitation; it is a performance tradeoff.

publisher.close()
await asyncio.to_thread(launcher.wait_for_exit, 10.0)
logger.info("Waiting for services to finish processing...")
await asyncio.to_thread(launcher.wait_for_exit, None)

medium

Changing the timeout from 10.0 to None in launcher.wait_for_exit introduces a risk of the benchmark process hanging indefinitely if a service subprocess fails to exit gracefully. It is safer to maintain a reasonable timeout to ensure the main process can always terminate.

Suggested change
await asyncio.to_thread(launcher.wait_for_exit, None)
await asyncio.to_thread(launcher.wait_for_exit, 10.0)

Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was intentional since on ARM, the MetricsAggregator service can take a while to catch up. After removing the msync call, it no longer seems to be the case. Maybe we can add in a 60s timeout?

Comment thread docs/load_generator/DESIGN.md
Comment thread review-council-output.md
@@ -1,104 +1,65 @@
# Review Council — PR "Make Loadgen Async" (Round 4)
# Review Council — Batched ZMQ Publisher
Collaborator


What's this file? It doesn't seem like it should be part of the commit.

Comment on lines +692 to +694
## Benchmark Data Summary

From `.cursor_artifacts/async_lg_benchmarks/` (MaxThroughputServer + real HTTPEndpointClient):
Collaborator


Likely obsolete

Collaborator

@nvzhihanj nvzhihanj left a comment


Review Council — Multi-AI Code Review

Reviewed by: Claude (Codex unavailable) | Depth: thorough

Found 11 issues across 4 files.

10 existing inline comments were already present. Overlapping issues excluded.

if self._batch_buffer:
self._flush_batch()

def _flush_batch(self) -> None:
Collaborator


[Claude] high (data-integrity): _flush_batch() clears buffer before send — data lost on ZMQ error. The buffer is swapped out before _send_frame() runs:

buf = self._batch_buffer
self._batch_buffer = []          # cleared here
# ...
self._send_frame(...)            # if this raises, buf is gone

zmq.Again is caught in _drain_pending, but zmq.ZMQError (e.g., ETERM when context is terminated) can escape the handler. Since buf goes out of scope, the records are unrecoverable. Consider re-queuing buf into _batch_buffer on exception, or only clearing after successful send.
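
The re-queue idea can be sketched like this (the wrapper class and the error handling are illustrative, not the actual pubsub code; the attribute names follow the snippet above):

```python
class SafeFlusher:
    """Sketch of the suggested fix: only drop the buffer after a
    successful send, re-queuing it if the send raises."""

    def __init__(self, send_frame) -> None:
        self._send_frame = send_frame      # stands in for the ZMQ send
        self._batch_buffer: list[bytes] = []

    def _flush_batch(self) -> None:
        if not self._batch_buffer:
            return
        buf, self._batch_buffer = self._batch_buffer, []
        try:
            self._send_frame(buf)
        except Exception:
            # Re-queue ahead of anything buffered meanwhile so the
            # records survive a transient ZMQ error.
            self._batch_buffer = buf + self._batch_buffer
            raise
```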

(event hooks feeding the metrics recorder).

## Responsibilities
class BenchmarkSession:
Collaborator


[Claude] high (bug): BenchmarkSession.__init__ signature is wrong in 3 ways vs actual code (session.py:224):

# Doc (here):
def __init__(self, issuer, event_publisher: EventRecordPublisher, on_sample_complete: Callable[[QueryResult], None] | None = None)

# Actual:
def __init__(self, issuer, event_publisher: EventPublisher, loop: asyncio.AbstractEventLoop,
             on_sample_complete: Callable[[QueryResult | StreamChunk], None] | None = None,
             session_id: str | None = None)
  1. Missing required loop param — callers following this doc get TypeError
  2. EventRecordPublisher → actual is EventPublisher (different protocol)
  3. Callable[[QueryResult], None] → actual is Callable[[QueryResult | StreamChunk], None]

Collaborator


Valid, seems like the doc change is lagging behind

### SessionResult

```python
@dataclass(frozen=True, slots=True)
Collaborator


[Claude] high (bug): PhaseResult doc doesn't match session.py:76:

  1. Doc: @dataclass(frozen=True, slots=True) → Actual: @dataclass(frozen=True) (no slots=True)
  2. Doc includes report: Report | None, but this field does not exist in the actual struct
  3. Doc omits issued_count: int which IS in the actual struct

The phantom report field makes the code example at line 1069 (pr.report.qps()) actively misleading — this would raise AttributeError at runtime.

Collaborator


Seems valid:

I assume all the doc changes need to be reworked.

```python
class SampleIssuer(Protocol):
def issue(self, query: Query) -> None: ...
def poll(self) -> QueryResult | StreamChunk | None: ...
Collaborator


[Claude] medium (bug): SampleIssuer doc includes poll() — not in actual code. The actual SampleIssuer Protocol (session.py:110-119) has:

class SampleIssuer(Protocol):
    def issue(self, query: Query) -> None: ...
    async def recv(self) -> QueryResult | StreamChunk | None: ...
    def shutdown(self) -> None: ...

No poll() method. The doc below says "Uses recv() exclusively — no poll() spin" (line 260) which contradicts having poll() in the protocol.

client = await HTTPEndpointClient.create(ctx.http_config, loop)
issuer = HttpClientSampleIssuer(client)
zmq_ctx = ManagedZMQContext()
publisher = ZmqEventRecordPublisher(pub_socket_name, zmq_ctx, loop=loop)
Collaborator


[Claude] medium (bug): CLI Integration code is stale — contradicts this PR's own changes. The snippet shows:

publisher = ZmqEventRecordPublisher(pub_socket_name, zmq_ctx, loop=loop)

But execute.py:417 (changed in this PR) now uses:

publisher = EventPublisherService(zmq_ctx)

Also, run_benchmark_async is documented as returning tuple[SessionResult, ResponseCollector] but the actual return type is BenchmarkResult (a dataclass wrapping session, collector, report).

Collaborator


Is this a rebase issue?

end_time_ns: int


@dataclass(frozen=True, slots=True)
Collaborator


[Claude] low (bug): Doc: @dataclass(frozen=True, slots=True) → Actual (session.py:87): @dataclass(frozen=True). Same discrepancy as PhaseResult above.


**`_receive_responses()`** — concurrent coroutine, purely async:

```python
Collaborator


[Claude] low (bug): Pseudocode doesn't match session.py:366-381:

  • Doc: while True: with if self._done and self._inflight <= 0: break
  • Actual: while not self._done: — no inflight check; task is cancelled externally by run()
  • Doc: self.issuer → actual: self._issuer

The shutdown contract is fundamentally different: the doc implies self-termination on drain, but the actual task is externally cancelled.

except Exception as e:
logger.warning(f"Client cleanup error: {e}")
logger.info(
"Closing publisher (buffer=%d, pending=%d)...",
Collaborator


[Claude] low (design): Accessing private attributes of the publisher in a log statement:

len(publisher._batch_buffer),   # private attr
len(publisher._pending),        # private attr

These are internal to ZmqEventRecordPublisher and not part of the EventPublisherService public interface. If renamed, this silently raises AttributeError at runtime. Consider adding a publisher.stats or publisher.buffered_count property.
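
A `buffered_count` property along the lines suggested could look like this (the class name and attributes mirror the review comment, but the sketch is hypothetical):

```python
class PublisherStatsSketch:
    """Illustrative: expose buffer depth without leaking private attributes."""

    def __init__(self) -> None:
        self._batch_buffer: list[bytes] = []   # records awaiting batching
        self._pending: list[bytes] = []        # frames awaiting the socket

    @property
    def buffered_count(self) -> int:
        """Total records not yet on the wire."""
        return len(self._batch_buffer) + len(self._pending)
```

The log line could then read `logger.info("Closing publisher (buffered=%d)...", publisher.buffered_count)` without touching private state.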

raise NotImplementedError("Subclasses must implement this method.")

@abstractmethod
def flush(self) -> None:
Collaborator


[Claude] low (api-contract): Inconsistent abstract method pattern. send() (line 279) and close() (line 292) use raise NotImplementedError(...), but flush() uses pass. The docstring says "No-op for unbuffered implementations" — if that's the intent, consider making it a non-abstract method with pass body, so unbuffered subclasses don't need to override it.
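
The suggested pattern, sketched with hypothetical class names: keep `send()`/`close()` abstract, and give `flush()` a concrete no-op body so unbuffered subclasses inherit it without overriding:

```python
from abc import ABC, abstractmethod

class EventPublisherBase(ABC):
    @abstractmethod
    def send(self, payload: bytes) -> None:
        raise NotImplementedError("Subclasses must implement this method.")

    @abstractmethod
    def close(self) -> None:
        raise NotImplementedError("Subclasses must implement this method.")

    def flush(self) -> None:
        """No-op for unbuffered implementations; batched publishers override."""
        # Deliberately non-abstract: unbuffered subclasses inherit this body.


class UnbufferedPublisher(EventPublisherBase):
    """Minimal subclass showing that only send()/close() must be implemented."""

    def __init__(self) -> None:
        self.sent: list[bytes] = []

    def send(self, payload: bytes) -> None:
        self.sent.append(payload)

    def close(self) -> None:
        pass
```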

**Timestamp fidelity:**

### `SampleIssuer` (abstract base class — implemented externally)
- ISSUED: `monotonic_ns()` taken immediately before `issuer.issue()`. The ZMQ push is
Collaborator


[Claude] low (bug): Timestamp fidelity claim is stale under new batching. The doc says "The ZMQ push is sync and non-blocking" — but with batched publishing, publish() now just buffers the payload. The actual ZMQ send is deferred until threshold or explicit flush(). The ISSUED timestamp may not correlate with when the record actually enters the transport.

@nvzhihanj
Collaborator

Review Council — Multi-AI Code Review

Reviewed by: Claude (Codex unavailable) | Depth: thorough

Found 11 issues across 4 files. All line numbers verified against source files at HEAD.

10 existing inline comments from prior reviews were already present. Overlapping issues excluded.

🔴 Must Fix (high) — 3 issues

Issues that will actively mislead readers or risk data loss.

| # | File | Line | Category | Summary |
| - | ---- | ---- | -------- | ------- |
| 1 | pubsub.py | 113 | data-integrity | _flush_batch() clears buffer before _send_frame() — if ZMQ raises (e.g., ETERM), buffered records are permanently lost |
| 2 | DESIGN.md | 127 | doc-accuracy | BenchmarkSession.__init__ missing required loop param, wrong EventRecordPublisher type (actual: EventPublisher), wrong callback type |
| 3 | DESIGN.md | 460 | doc-accuracy | PhaseResult has phantom report: Report \| None field that doesn't exist, missing issued_count, wrong decorator — code example at line 1069 would raise AttributeError |

🟡 Should Fix (medium) — 3 issues

Real inaccuracies or design issues.

| # | File | Line | Category | Summary |
| - | ---- | ---- | -------- | ------- |
| 4 | DESIGN.md | 440 | doc-accuracy | SampleIssuer protocol includes poll() method that doesn't exist in actual code |
| 5 | DESIGN.md | 925 | doc-accuracy | CLI Integration snippet uses old ZmqEventRecordPublisher — this PR changed it to EventPublisherService; return type also wrong |
| 6 | DESIGN.md | 982 | doc-accuracy | ResponseCollector.on_complete → actual: on_complete_hook; missing count attr; wrong guard condition |

🔵 Consider (low) — 5 issues

Valid improvements, could be follow-ups.

| # | File | Line | Category | Summary |
| - | ---- | ---- | -------- | ------- |
| 7 | DESIGN.md | 471 | doc-accuracy | SessionResult has slots=True in doc but not in code |
| 8 | DESIGN.md | 250 | doc-accuracy | _receive_responses pseudocode differs from actual — wrong loop condition and shutdown model |
| 9 | execute.py | 537 | design | Direct access to private publisher._batch_buffer and ._pending attrs |
| 10 | protocol.py | 282 | api-contract | flush() uses pass body while send()/close() use raise NotImplementedError |
| 11 | DESIGN.md | 280 | doc-accuracy | ISSUED timestamp claim stale under batching — publish() now buffers, send is deferred |

Testing gaps (not posted inline — lines outside diff)

  • No test for threshold-triggered auto-flush (threshold=1000 path in pubsub.py:101 is untested)
  • No test for ZmqEventRecordSubscriber._buffer deque drain — subsequent receive() calls after batch decode are untested

🤖 Generated with Claude Code — Review Council

Collaborator

@arekay-nv arekay-nv left a comment


Thanks. Minor cleanup/clarifications. Please update with the perf table.

+-- STARTED
+-- [saturation] strategy.execute() → NO drain (keep in-flight saturated)
+-- [perf phase 1] START_PERFORMANCE_TRACKING → strategy.execute() → drain → STOP_PERFORMANCE_TRACKING → snapshot report
+-- [saturation] strategy.execute() → drain
Collaborator


No drain for the second perf phase?

"""Phase types control tracking and reporting behavior."""
PERFORMANCE = "performance" # Tracked, produces a report
ACCURACY = "accuracy" # Untracked, for eval scoring
SATURATION = "saturation" # Untracked, ramp up concurrency before perf phase
Collaborator


Should we just rename this to WARMUP to avoid any confusion? Seems like there will be a lot of questions around difference between warmup and saturation so why not use the established term?


## Component Map
1. Publish `SessionEventType.STARTED`
2. Start receiver coroutine (`_receive_responses`)
Collaborator


Does this constrain us to have a single response handler for all phases? One option can be to create a wrapper for each phase - that way, the response handler, dataset, runtime setting, sample order and load strategy can be grouped together.


```python
@dataclass(frozen=True, slots=True)
class PhaseResult:
Collaborator


Should this include the dataset as well - might be a good idea if we want to add more details such as sha of the dataset for checking.

extra_eager: If True, publish() blocks until the message is sent.
Useful for testing or when EventRecords are used as a
synchronization mechanism (e.g., ENDED as a stop signal).
isolated_event_loop: If True, runs on a separate event loop thread.
Collaborator


missing doc for send_threshold

Comment thread review-council-output.md
Collaborator


stray file?

E->>M: ZMQ PUB (EventRecord)
B->>I: issue(query)
I->>W: ZMQ PUSH (Query)
W->>W: HTTP request → endpoint
Collaborator


Missing the non-first chunk response event. Might be tricky to add, but can add a note that it is missing from the diagram.

Comment on lines +725 to +738
## Removed Constructs

| Removed | Reason |
| ----------------------------------------------- | --------------------------------------------- |
| `Sample` class | Replaced by `Query` (frozen `msgspec.Struct`) |
| `Sample.__setattr__` hack | UUID generated before `Query` construction |
| `SampleEventHandler` singleton | Events via `EventPublisher` ZMQ PUB/SUB |
| `IssuedSample` dataclass | `uuid_to_index` dict on session is sufficient |
| `Scheduler` class hierarchy | Replaced by `LoadStrategy` + factory function |
| `LoadGenerator` / `SchedulerBasedLoadGenerator` | Replaced by `LoadStrategy` |
| `threading.Thread` in `BenchmarkSession` | Fully async |
| `threading.Condition` in `ConcurrencyScheduler` | `asyncio.Semaphore` |
| `HttpClientSampleIssuer._handle_responses` | Session owns the receive loop |

Collaborator


Do we need this?

Comment on lines +741 to +752
## File Structure

```
src/inference_endpoint/load_generator/
├── __init__.py # Public exports
├── session.py # BenchmarkSession, SessionResult
├── strategy.py # LoadStrategy protocol, TimedIssueStrategy,
│ # BurstStrategy, ConcurrencyStrategy,
│ # create_load_strategy()
├── sample_order.py # SampleOrder, WithoutReplacement, WithReplacement
└── delay.py # poisson_delay_fn, uniform_delay_fn
```
Collaborator


Move to top?
