Document zero-copy semantics #2
base: low-level-api
Changes from all commits
6ac5ee3
636fe2d
85e2241
c8a1925
cda905f
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| Transport and Messaging Internals | ||
| ################################# | ||
|
|
||
| This page documents the runtime components and data-transport behavior used by `ezmsg`. These internals apply equally to the low-level and high-level APIs. | ||
|
|
||
| GraphServer, Publisher, Subscriber, Channel | ||
| =========================================== | ||
|
|
||
| - **GraphServer**: a lightweight TCP service that tracks the topic DAG, keeps a registry of publishers/subscribers, and notifies subscribers when their upstream publishers change. It also brokers shared-memory segment creation and attachment. | ||
|
A few of the terms here like TCP and DAG might need explanation. My solution: link to a glossary for terms like this (otherwise it would only clutter the text). No need to do this here - for a future PR. |
||
| - **Publisher**: a client that registers a topic with the GraphServer, opens a channel server (TCP listener), and broadcasts messages. It owns shared-memory buffers and enforces backpressure so buffers are not reused until all subscribers have released a message. | ||
| - **Subscriber**: a client that registers a topic with the GraphServer, receives updates that list the upstream publisher IDs it should listen to, and maintains local channels for those publishers. | ||
| - **Channel**: a process-local "middle-man" for a single publisher. It receives message telemetry from the publisher (or direct local messages), caches the most recent messages, and notifies all local subscribers in that process. | ||
|
|
||
| Connection Lifecycle (Publisher -> GraphServer -> Subscriber -> Channel) | ||
| ======================================================================== | ||
|
|
||
| 1. A **Publisher** connects to the **GraphServer**, allocates shared-memory buffers, and registers its topic. It starts a small TCP server so Channels can connect back to it, then reports that server's address to the GraphServer. | ||
| 2. A **Subscriber** connects to the **GraphServer** and registers its topic. The GraphServer computes upstream publishers from the topic DAG and sends the Subscriber an `UPDATE` list of publisher IDs. | ||
| 3. For each publisher ID, the Subscriber asks the local **ChannelManager** to register a **Channel**. If a Channel does not yet exist in this process, it is created in steps 4-6: | ||
| 4. The **Channel** requests a channel allocation from the GraphServer for the given publisher ID. The GraphServer returns a new channel ID plus the publisher's server address. | ||
| 5. The **Channel** connects to the Publisher's channel server, receives the topic and shared-memory name, and attempts to attach to shared memory. It reports whether SHM attach succeeded plus its process ID. | ||
| 6. The **Publisher** completes the handshake by returning the configured number of buffers. The Channel is now ready to receive messages and notify local subscribers. | ||
| 7. When graph topology changes (connect/disconnect or new publishers), the GraphServer sends new `UPDATE` messages and Subscribers add or remove Channels as needed. | ||
|
|
||
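Step 2 relies on the GraphServer walking the topic DAG to decide which publishers feed a subscriber's topic. A minimal sketch of that lookup follows, assuming the DAG is stored as a mapping from each topic to its downstream topics. The names `upstream_publishers`, `edges`, and `publishers`, and the example topic strings, are hypothetical and are not taken from the `ezmsg` source.

```python
from collections import defaultdict


def upstream_publishers(edges, publishers, sub_topic):
    """Return sorted IDs of publishers whose topic can reach sub_topic.

    edges: dict mapping topic -> iterable of downstream topics (assumed layout)
    publishers: dict mapping publisher ID -> topic it publishes on
    """
    # Invert the edges so we can walk from the subscriber's topic to its sources.
    parents = defaultdict(set)
    for src, dsts in edges.items():
        for dst in dsts:
            parents[dst].add(src)

    # Depth-first walk collecting sub_topic and every topic upstream of it.
    reachable, stack = set(), [sub_topic]
    while stack:
        topic = stack.pop()
        if topic in reachable:
            continue
        reachable.add(topic)
        stack.extend(parents[topic])

    return sorted(pid for pid, topic in publishers.items() if topic in reachable)


edges = {"GEN/OUT": ["FILT/IN"], "FILT/IN": ["LOG/IN"], "OTHER/OUT": ["SINK/IN"]}
publishers = {1: "GEN/OUT", 2: "OTHER/OUT"}
update = upstream_publishers(edges, publishers, "LOG/IN")  # only publisher 1 is upstream
```

Recomputing this list whenever an edge or publisher changes is what would drive the new `UPDATE` messages described in step 7.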
| Transport Selection: Local, SHM, TCP | ||
| ==================================== | ||
|
|
||
| `ezmsg` uses the fastest transport available per Publisher/Channel pair: | ||
|
|
||
| - **Local transport (same process)**: the Publisher pushes the object directly into the Channel (`put_local`), and the Channel stores it in the `MessageCache` without serialization. This is the lowest-overhead path. | ||
| - **Shared memory (different process, SHM OK)**: the Publisher serializes the object using `MessageMarshal` (pickle protocol 5 with buffer support), writes it into a ring of shared-memory buffers, and notifies the Channel with a `TX_SHM` message. The Channel reads from shared memory using the message ID and caches the deserialized object. | ||
| - **TCP (fallback or forced)**: if SHM is unavailable (attach failed, remote host) or `force_tcp=True`, the Publisher sends a `TX_TCP` payload (header + serialized buffers) directly over the channel socket. The Channel deserializes it and caches the result. | ||
|
|
||
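The three rules above amount to a small decision function. The sketch below is illustrative only: `Transport` and `select_transport` are hypothetical names, and it assumes `force_tcp` overrides only the SHM path, since the text does not say whether it also bypasses local delivery.

```python
from enum import Enum


class Transport(Enum):
    LOCAL = "local"
    SHM = "shm"
    TCP = "tcp"


def select_transport(same_process: bool, shm_attached: bool, force_tcp: bool = False) -> Transport:
    """Pick a transport for one Publisher/Channel pair (hypothetical helper)."""
    if same_process:
        # Same process: hand the object over directly, no serialization.
        # Assumption: force_tcp does not apply to this path.
        return Transport.LOCAL
    if shm_attached and not force_tcp:
        # Different process on the same host, and the SHM attach succeeded.
        return Transport.SHM
    # Remote host, failed attach, or TCP explicitly forced.
    return Transport.TCP
```

For example, `select_transport(False, True, force_tcp=True)` returns `Transport.TCP` even though shared memory was attached.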
|
|
||
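The SHM bullet mentions pickle protocol 5 with buffer support. The standard-library sketch below shows what that feature does in isolation: large buffers are handed to a callback instead of being copied into the pickle stream, so they can be placed somewhere else (here a plain `bytearray` named `slot` stands in for a shared-memory buffer). It does not use `MessageMarshal`, whose interface is not shown on this page. The message layout is made up for illustration.

```python
import pickle

# A message with small metadata and a large payload wrapped for out-of-band pickling.
payload = bytearray(b"\x01" * 1_000_000)
message = {"fs": 1000.0, "data": pickle.PickleBuffer(payload)}

# buffer_callback receives each large buffer; list.append returns None,
# which tells pickle to keep that buffer out of the stream.
buffers = []
header = pickle.dumps(message, protocol=5, buffer_callback=buffers.append)

# Stand-in for writing the payload into a shared-memory slot (one copy on the sender).
slot = bytearray(buffers[0].raw())

# The receiver rebuilds the message from the small header plus a view of the slot.
restored = pickle.loads(header, buffers=[memoryview(slot)])
```

Here `header` carries only the metadata, while `restored["data"]` is a view onto `slot` rather than a private copy. NumPy arrays take the same out-of-band path under protocol 5 without needing an explicit `PickleBuffer` wrapper.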
| MessageCache and Deserialization Overhead | ||
| ========================================= | ||
|
|
||
| Every Channel maintains a fixed-size `MessageCache` (one slot per shared-memory buffer). This cache is the key to reducing deserialization overhead: | ||
|
|
||
| - For SHM and TCP, the Channel **deserializes each message once per process** and stores the object in the cache. | ||
| - All Subscribers in that process read the same cached object -- they do not deep copy. | ||
| - When all local Subscribers have released a message, the Channel frees the cache entry and acknowledges the Publisher so the buffer can be reused. | ||
|
|
||
| This design keeps serialization/deserialization out of the hot path for local delivery and prevents repeated deserialization when multiple Subscribers in the same process read the same message. | ||
|
|
||
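The "deserialize once per process" behavior can be illustrated with a toy cache. `ChannelCacheSketch` is a hypothetical stand-in, not the `ezmsg` `MessageCache` class, and it uses plain `pickle` purely to count how often decoding happens.

```python
import pickle


class ChannelCacheSketch:
    """Toy per-process cache with one slot per buffer (illustrative only)."""

    def __init__(self, num_buffers: int):
        self.num_buffers = num_buffers
        self.slots = [None] * num_buffers
        self.deserialize_calls = 0

    def put_serialized(self, msg_id: int, payload: bytes) -> None:
        # Decode exactly once, when the message arrives in this process.
        self.deserialize_calls += 1
        self.slots[msg_id % self.num_buffers] = pickle.loads(payload)

    def get(self, msg_id: int):
        # Every local subscriber receives the same object: no copy, no re-decode.
        return self.slots[msg_id % self.num_buffers]


cache = ChannelCacheSketch(num_buffers=4)
cache.put_serialized(0, pickle.dumps({"samples": [1, 2, 3]}))
views = [cache.get(0) for _ in range(3)]  # three local subscribers read message 0
```

All three entries in `views` are the same object and `cache.deserialize_calls` stays at 1, which is also why in-place mutation by one subscriber is visible to the others.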
| Publisher-Channel Buffers and Backpressure | ||
| ========================================== | ||
|
|
||
| Every Publisher (and thus every Channel) is configured with a fixed number of buffers (`num_buffers`, default 32). These buffers define a ring: | ||
|
|
||
| - Each message gets a monotonically increasing `msg_id`. | ||
| - The buffer index is `msg_id % num_buffers`. | ||
| - The Channel maintains a `MessageCache` with the same size, so each buffer index maps to one cache slot. | ||
|
|
||
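As a quick worked example of the ring mapping, using the documented default of 32 buffers (`buffer_index` is a hypothetical helper name):

```python
NUM_BUFFERS = 32  # documented default for num_buffers


def buffer_index(msg_id: int, num_buffers: int = NUM_BUFFERS) -> int:
    # The slot used by a message is its ID modulo the ring size.
    return msg_id % num_buffers


# Messages 0, 32, and 64 all land in slot 0; 1 and 33 share slot 1.
indices = [buffer_index(m) for m in (0, 1, 31, 32, 33, 64)]
```

Because message 32 reuses the slot of message 0, the Publisher cannot write it until message 0 has been released by every subscriber.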
| Backpressure is the mechanism that prevents a buffer slot from being overwritten while any subscriber still needs its message: | ||
|
|
||
| - The **Publisher** checks whether the next buffer index is free. If not, it waits until all leases for that buffer are released. | ||
| - When the **Channel** notifies local subscribers of a new message, it **leases** that buffer index for each subscriber. This records "who still needs this message." | ||
| - When a subscriber finishes reading the message (or drops it in leaky mode), the Channel **releases** that subscriber’s lease. | ||
| - Once all leases for that buffer index are released, the Channel clears the cache entry and acknowledges the Publisher so it can reuse the slot. | ||
|
|
||
| Backpressure works the same way for local, SHM, and TCP delivery. The transport only affects how the Channel receives the message bytes; buffer ownership and release are always enforced by the same lease/ack mechanism. | ||
|
|
||
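The lease/ack bookkeeping described above can be sketched as a small table of outstanding leases per buffer index. `BackpressureSketch` and its method names are hypothetical; the real implementation waits asynchronously, whereas this toy version only reports whether a slot is free.

```python
class BackpressureSketch:
    """Toy lease table: one set of subscriber IDs per buffer index."""

    def __init__(self, num_buffers: int):
        self.num_buffers = num_buffers
        self.leases = [set() for _ in range(num_buffers)]

    def is_free(self, msg_id: int) -> bool:
        # The Publisher may write msg_id only if its slot has no leases.
        return not self.leases[msg_id % self.num_buffers]

    def lease(self, msg_id: int, subscriber_id: str) -> None:
        # Record that this subscriber still needs the message in this slot.
        self.leases[msg_id % self.num_buffers].add(subscriber_id)

    def release(self, msg_id: int, subscriber_id: str) -> bool:
        # Returns True when the last lease is gone, i.e. the point at which
        # the Channel would acknowledge the Publisher.
        slot = self.leases[msg_id % self.num_buffers]
        slot.discard(subscriber_id)
        return not slot


bp = BackpressureSketch(num_buffers=2)
bp.lease(0, "B")
bp.lease(0, "C")
blocked = not bp.is_free(2)       # message 2 maps to slot 0, which is still leased
ack_after_b = bp.release(0, "B")  # C still holds a lease, so no ack yet
ack_after_c = bp.release(0, "C")  # last lease released, slot can be reused
```

In this run the Publisher would be blocked on message 2 until both B and C have released message 0.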
| Zero-copy Semantics and Message Ownership | ||
| ========================================= | ||
|
|
||
| Publishers in `ezmsg` deliver messages (directly, through shared memory, or over TCP) into the process-local `MessageCache` owned by each Channel. That Channel-level cache is shared by all Subscribers attached to that Channel in the same process. Subscribers receive a "zero-copy" view of this message that is: | ||
|
|
||
| - The originally published object itself, if local transport was used. | ||
| - Backed by Publisher-controlled shared memory if SHM transport was used. | ||
| - A shared/cached version of the deserialized object if TCP transport was used. | ||
|
|
||
| This results in very low messaging overhead and high messaging performance, but it comes with **some important safety considerations.** | ||
|
|
||
| .. important:: Treat incoming messages as **immutable**. If you need to modify data or republish it, do **not** modify data in place without a very strong understanding of the implications. Generally, you should **create a new message or copy the payload first**. Do **not** republish the same object instance you received. | ||
|
|
||
| Why this matters (two concrete failure modes): | ||
|
|
||
| 1. **Mutating a locally cached message affects other subscribers.** Example: Publisher A and Subscribers B/C/D are in the same process. If Subscriber B mutates the message it received, Subscribers C and D will see those changes because they are reading the same cached object. | ||
| 2. **Republishing a shared-memory-backed message can corrupt downstream readers.** Example: Publisher A (Process X) publishes an `AxisArray` into shared memory, Subscriber B (Process Y) receives it, then republishes *the same object* via Publisher C to Subscriber D (also in Process Y). Because Publisher C and Subscriber D are local, the object is passed via the local cache. Meanwhile Publisher A sees the shared-memory buffer as free (because Subscriber B has acknowledged receipt and backpressure has been released) and overwrites it, so Subscriber D now observes mutated data. | ||
|
|
||
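Both failure modes can be reproduced with plain Python objects. In this sketch a `dict` stands in for a locally cached message and a `bytearray` stands in for a Publisher-owned shared-memory slot; none of the names come from `ezmsg`.

```python
# Failure mode 1: local subscribers share one cached object.
cached = {"data": [1.0, 2.0, 3.0]}
seen_by_b = cached             # Subscriber B's "zero-copy" message
seen_by_c = cached             # Subscriber C reads the same object
seen_by_b["data"][0] = -1.0    # B mutates in place; C's view changes too

# Failure mode 2: a message backed by a reusable buffer is republished as-is.
shm_slot = bytearray(b"AAAA")      # stand-in for the Publisher's shared-memory slot
received = memoryview(shm_slot)    # what Subscriber B holds after SHM delivery
republished = received             # same object passed on locally to Subscriber D
safe_copy = bytes(received)        # the recommended alternative: copy before republishing

# B releases its lease, so the Publisher reuses the slot for a later message.
writer = memoryview(shm_slot)
writer[:] = b"BBBB"
```

After the overwrite, `republished` reads as `b"BBBB"` while `safe_copy` still holds `b"AAAA"`, which is why copying the payload before republishing avoids the corruption.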
| In the low-level API, subscriber messages are received via `Subscriber.recv()` or `Subscriber.recv_zero_copy()`. `recv()` wraps `recv_zero_copy()` and returns a deep copy of the message; `recv_zero_copy()` yields the shared cached object. | ||
|
|
||
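The relationship between the two receive calls can be pictured with a toy, synchronous class. `SubscriberSketch` is hypothetical and only mirrors the wrapping described above. The real `Subscriber` methods may differ in signature and are not reproduced here.

```python
from contextlib import contextmanager
from copy import deepcopy


class SubscriberSketch:
    """Toy model: recv() is recv_zero_copy() plus a deep copy."""

    def __init__(self, cached_message):
        self._cached = cached_message

    @contextmanager
    def recv_zero_copy(self):
        # Hands out the shared cached object; a real implementation would
        # release the buffer lease when the block exits.
        yield self._cached

    def recv(self):
        # Copy while the message is still held, then return the private copy.
        with self.recv_zero_copy() as msg:
            return deepcopy(msg)


shared = {"data": [1, 2, 3]}
sub = SubscriberSketch(shared)
private = sub.recv()
private["data"].append(4)  # safe: only the private copy changes
with sub.recv_zero_copy() as view:
    same_object = view is shared
```

Here `shared` is unaffected by the append, while `view` is the cached object itself and must be treated as read-only.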
| **The high-level `ezmsg` API uses zero-copy semantics for subscriber delivery in all cases. This has been the effective behavior since 2023 (V3.2.0+) and is now the documented behavior.** This specifically applies to coroutines wrapped with the high-level `@ez.subscriber` decorator (the `message` you receive **is that shared object**). | ||
|
|
||
| .. note:: The `zero_copy` argument (default `False`) on `@ez.subscriber` previously controlled whether a deepcopy was performed before delivering the message. As detailed in `issue #209 <https://github.com/ezmsg-org/ezmsg/issues/209>`_, a backend change in early 2023 (V3.2.0+) resulted in this argument being ignored, so all coroutines decorated with `@ez.subscriber` receive zero-copy input messages. The maintainers have decided this is preferable behavior because it optimizes message throughput at the expense of code safety. In an upcoming version of `ezmsg`, the `zero_copy` keyword argument will be formally deprecated/retired since it no longer has any effect. If you would like to restore the safety previously guaranteed by `zero_copy=False`, import `deepcopy` from the standard `copy` module and add `message = deepcopy(message)` as the first line in your coroutine. | ||
I'm not sure this is the right place for this (important) note. At some point in the near future, I think I will expand on the high-level design and move that there. I expect that I will additionally refactor that document to have a separate high-level API explanation (to mirror the low-level API document).