Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics) #215

Merged
griffinmilsap merged 29 commits into dev from feature/sync-lowlevel-api
Feb 26, 2026

@griffinmilsap
Collaborator

Summary

  • Adds a synchronous low‑level API (SyncContext, SyncPublisher, SyncSubscriber, init, spin, spin_once) so users can publish/subscribe without asyncio.
  • Adds ROS2‑style usage examples for both sync and async APIs.
  • Improves GraphServer auto‑start control via GraphContext(auto_start=...) and GraphService.ensure(auto_start=...).
  • Adds tests for the sync wrapper and a small perf script to quantify overhead.

Motivation

Many users find asyncio intimidating; the goal is to let them use ezmsg with a simple, synchronous API similar to ROS2 (with ez.sync.init(...), spin(), spin_once()), while preserving ezmsg’s backpressure semantics and zero‑copy safety.
This wrapper is explicitly for ergonomics, not peak throughput.

Implementation Details

New Sync API (src/ezmsg/core/sync.py)

  • SyncContext wraps GraphContext and runs an asyncio event loop in a background thread using new_threaded_event_loop.
  • create_publisher / create_subscription call the underlying async API via asyncio.run_coroutine_threadsafe.
  • spin() / spin_once() pull messages directly via recv_zero_copy() and only release backpressure after the user callback returns.
    • This preserves backpressure and avoids the prior “queueing” behavior that could release SHM‑backed messages too early.
  • spin_once() returns a boolean for “did work”.
  • Handles CacheMiss gracefully when a publisher exits under backpressure (stale notifications).
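
The pattern described in the bullets above can be sketched with the standard library alone. This is not the code in src/ezmsg/core/sync.py: `BackgroundLoop`, `OneSlotChannel`, `make_channel`, and this `spin_once` are hypothetical stand-ins, and the one-slot channel only imitates backpressure with a semaphore. It shows an event loop running in a background thread, calls marshalled onto it with `asyncio.run_coroutine_threadsafe`, and a slot that is released only after the user callback returns.

```python
from __future__ import annotations

import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Callable, Coroutine


class BackgroundLoop:
    """Hypothetical helper: owns an asyncio loop running in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def submit(self, coro: Coroutine[Any, Any, Any]) -> Future:
        # Schedule the coroutine on the loop thread without waiting for it.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def call(self, coro: Coroutine[Any, Any, Any]) -> Any:
        # Schedule the coroutine and block the calling thread until it finishes.
        return self.submit(coro).result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


class OneSlotChannel:
    """Toy channel (an assumption, not ezmsg's): one message in flight at a time."""

    def __init__(self) -> None:
        self._slot = asyncio.Semaphore(1)
        self._ready: asyncio.Queue = asyncio.Queue()

    async def publish(self, msg: Any) -> None:
        await self._slot.acquire()  # waits while the previous message is unreleased
        await self._ready.put(msg)

    async def recv(self) -> Any:
        return await self._ready.get()

    async def release(self) -> None:
        self._slot.release()  # lets the next publish() proceed


async def make_channel() -> OneSlotChannel:
    # Built on the loop thread so the asyncio primitives bind to that loop.
    return OneSlotChannel()


def spin_once(
    bg: BackgroundLoop,
    channel: OneSlotChannel,
    callback: Callable[[Any], None],
    timeout: float = 0.1,
) -> bool:
    """Deliver at most one message; return True if the callback ran."""
    try:
        msg = bg.call(asyncio.wait_for(channel.recv(), timeout))
    except asyncio.TimeoutError:
        return False  # nothing arrived within the timeout
    try:
        callback(msg)  # user code runs on the calling thread
    finally:
        bg.call(channel.release())  # release only after the callback returns
    return True
```

In this sketch a second `publish()` stays pending until the first message's callback has finished, which is the ordering the PR describes for the real API. The real implementation works with `recv_zero_copy()` and SHM-backed messages, which this toy does not model.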

GraphServer Auto‑Start Control

  • GraphService.ensure(auto_start: bool | None = None)
    • None preserves the existing behavior: auto‑start only when no address is specified and no environment override is set.
    • True / False overrides.
  • GraphContext(..., auto_start=...) passes through to GraphService.ensure.
  • ez.sync.init(..., auto_start=...) mirrors GraphContext, defaulting to None.
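
The tri-state rule above can be written as a small decision function. This is an illustrative sketch, not the body of GraphService.ensure: `resolve_auto_start`, `address_given`, and `env_override` are hypothetical names chosen for this example, and how the real code detects an address or an environment override is not shown in this PR description.

```python
from __future__ import annotations


def resolve_auto_start(
    auto_start: bool | None,
    address_given: bool,
    env_override: bool,
) -> bool:
    """Hypothetical helper: decide whether a GraphServer should be auto-started."""
    if auto_start is not None:
        # An explicit True / False always wins.
        return auto_start
    # None keeps the prior rule: auto-start only when the caller gave no
    # address and no environment override is in effect.
    return not address_given and not env_override
```

With these assumptions, `resolve_auto_start(None, address_given=True, env_override=False)` yields False, while passing `auto_start=True` forces a start attempt even when an address is supplied, as in the examples that call `ez.sync.init((host, port), auto_start=True)`.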

Examples

  • examples/simple_publisher.py
import time
import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
    with ez.sync.init((host, port), auto_start=True) as ctx:
        pub = ctx.create_publisher(TOPIC, force_tcp=True)

        print("Publisher Task Launched")
        count = 0
        try:
            while True:
                output = f"{count=}"
                pub.publish(output)
                print(output)
                time.sleep(0.1)
                count += 1
        except KeyboardInterrupt:
            pass
        print("Publisher Task Concluded")

    print("Done")
 
if __name__ == '__main__':
    main()
  • examples/simple_subscriber.py
import time
import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
    with ez.sync.init((host, port), auto_start=True) as ctx:
        print("Subscriber Task Launched")

        def on_message(msg: str) -> None:
            # Uncomment if you want to witness backpressure!
            # time.sleep(1.0)
            print(msg)

        ctx.create_subscription(TOPIC, callback=on_message)
        ez.sync.spin(ctx)

    print("Subscriber Task Concluded")

if __name__ == '__main__':
    main()
  • examples/simple_async_publisher.py
  • examples/simple_async_subscriber.py

Tests

  • tests/test_sync_api.py

Perf Script

  • tests/perf_sync_overhead.py

Threads and Concurrency Model

  • The sync wrapper runs one background asyncio loop thread for all async work.
  • The calling thread runs user code (spin, spin_once, callbacks).
  • Every publish() / recv() call crosses threads via run_coroutine_threadsafe.
    • This is safe and preserves backpressure, but introduces measurable overhead.
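
To get a feel for the cost of the thread hop itself, the following stdlib-only sketch times a no-op coroutine awaited inside a loop against the same coroutine submitted from another thread with `run_coroutine_threadsafe`. It is not tests/perf_sync_overhead.py; `noop`, `time_in_loop`, and `time_cross_thread` are hypothetical names, and the numbers it produces depend on the machine and Python version.

```python
import asyncio
import threading
import time


async def noop() -> None:
    """Stand-in for a very cheap async operation."""
    return None


def time_in_loop(n: int) -> float:
    """Average seconds per call when awaiting directly inside the loop."""

    async def run() -> float:
        start = time.perf_counter()
        for _ in range(n):
            await noop()
        return time.perf_counter() - start

    return asyncio.run(run()) / n


def time_cross_thread(n: int) -> float:
    """Average seconds per call when each call hops to a loop in another thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        start = time.perf_counter()
        for _ in range(n):
            # Submit to the loop thread, then block until the result comes back.
            asyncio.run_coroutine_threadsafe(noop(), loop).result()
        return (time.perf_counter() - start) / n
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```

Calling `time_in_loop(10_000)` and `time_cross_thread(10_000)` and comparing the two averages gives a rough per-call hop cost, separate from any ezmsg messaging work.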

Performance

Measured using tests/perf_sync_overhead.py (local macOS example):

async: 0.0520s total, 5.20 us/msg, 192,203 msg/s
sync : 1.1506s total, 115.06 us/msg, 8,691 msg/s
overhead: 2111.5%

Interpretation:

  • Sync wrapper is significantly slower for micro‑messages due to per‑message thread hops.
  • The sync API is intended for ergonomics, not maximum throughput.
  • For high‑rate paths, the async API remains recommended.
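
The reported figures can be cross-checked with a little arithmetic. The sketch below uses only the numbers printed above; the message count is not stated in this description and is inferred from total time divided by per-message time, so treat it as an assumption.

```python
def implied_message_count(total_s: float, per_msg_us: float) -> int:
    """Messages implied by a total time and a per-message time in microseconds."""
    return round(total_s / (per_msg_us * 1e-6))


def overhead_pct(async_rate: float, sync_rate: float) -> float:
    """Extra time per message for sync relative to async, as a percentage."""
    return (async_rate / sync_rate - 1.0) * 100.0


# Figures copied from the perf output above.
ASYNC_TOTAL_S, ASYNC_US, ASYNC_RATE = 0.0520, 5.20, 192_203
SYNC_TOTAL_S, SYNC_US, SYNC_RATE = 1.1506, 115.06, 8_691

n_async = implied_message_count(ASYNC_TOTAL_S, ASYNC_US)
n_sync = implied_message_count(SYNC_TOTAL_S, SYNC_US)
overhead = overhead_pct(ASYNC_RATE, SYNC_RATE)
slowdown = ASYNC_RATE / SYNC_RATE
```

Both runs work out to about 10,000 messages, and the ratio of the two msg/s figures reproduces the reported 2111.5% overhead, which corresponds to the sync path being roughly 22 times slower per message in this measurement.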

Files Changed / Added

  • Added src/ezmsg/core/sync.py
  • Updated src/ezmsg/core/__init__.py
  • Updated src/ezmsg/core/graphcontext.py
  • Updated src/ezmsg/core/graphserver.py
  • Added examples/simple_publisher.py
  • Added examples/simple_subscriber.py
  • Added examples/simple_async_publisher.py
  • Added examples/simple_async_subscriber.py
  • Added tests/test_sync_api.py
  • Added tests/perf_sync_overhead.py
  • Removed examples/lowlevel.py (replaced by split examples)

Contributor

@KonradPilch KonradPilch left a comment

Great addition, Griffin.

I couldn't see anything I'd change. I've run the copilot check just in case.

Contributor

Copilot AI left a comment

Pull request overview

Adds a synchronous “ROS2-style” low-level API layer on top of ezmsg’s async pub/sub, plus examples/tests, and introduces explicit control over GraphServer auto-start behavior via auto_start.

Changes:

  • Introduces ezmsg.core.sync (SyncContext, SyncPublisher, SyncSubscriber, init, spin, spin_once) backed by a background asyncio loop thread.
  • Extends GraphServer bring-up behavior with GraphContext(auto_start=...) and GraphService.ensure(auto_start=...).
  • Adds split sync/async low-level examples, sync API tests, and a perf comparison script.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| src/ezmsg/core/sync.py | New synchronous wrapper API around GraphContext/Publisher/Subscriber, including callback-based spinning. |
| src/ezmsg/core/graphserver.py | Adds auto_start override to GraphService.ensure and logs GraphServer startup. |
| src/ezmsg/core/graphcontext.py | Plumbs auto_start through GraphContext to GraphService.ensure. |
| src/ezmsg/core/__init__.py | Exposes the new sync API/module from ezmsg.core. |
| examples/simple_publisher.py | New sync publisher example using ez.sync.init. |
| examples/simple_subscriber.py | New sync subscriber example using spin(). |
| examples/simple_async_publisher.py | New async publisher example (replacement for prior combined example). |
| examples/simple_async_subscriber.py | New async subscriber example (replacement for prior combined example). |
| examples/lowlevel_api.py | Removes the previous combined low-level example in favor of split examples. |
| tests/test_sync_api.py | Adds tests for sync autostart/spin_once, backpressure behavior, and CacheMiss handling. |
| tests/perf_sync_overhead.py | Adds a standalone script to measure sync-wrapper overhead vs async usage. |

KonradPilch and others added 21 commits February 17, 2026 20:42
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…alculate cur_pubs from self._channels.keys()
…ts-set-of-current-publishers-after-processing-update-preventing-disconnection-of-old-publishers

hot fix 217 - recalculate Subscriber's pub keys on demand
Deprecate `zero-copy` keyword argument in `@ez.subscriber`
…le True: try...except KeyboardInterrupt — the same pattern already used for the stop barrier. This ensures SIGINT can't skip context.revert(), which is what cancels the pub/sub internal tasks.
…t; add timeout for context revert second attempt.
Wrap shutdown_units().result() and context.revert().result() with whi…
@griffinmilsap griffinmilsap merged commit 7f6ca7b into dev Feb 26, 2026
14 checks passed

4 participants