From daa7e7e3859b313d415530ed8ce8494afda88c7d Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 15:01:09 +0100 Subject: [PATCH 1/6] Add ByteChannel API for raw byte streaming ByteChannel provides raw byte streaming between Erlang and Python without term serialization overhead, suitable for HTTP bodies, file transfers, and binary protocols. Erlang API: - py_byte_channel:new/0,1 - Create channel with optional backpressure - py_byte_channel:send/2 - Send raw bytes - py_byte_channel:recv/1,2 - Blocking receive with optional timeout - py_byte_channel:try_receive/1 - Non-blocking receive - py_byte_channel:close/1 - Close channel Python API: - ByteChannel class with send_bytes, receive_bytes, try_receive_bytes - async_receive_bytes for asyncio compatibility - Sync and async iteration support Implementation reuses the existing py_channel_t infrastructure with new NIF functions that skip term_to_binary/binary_to_term conversion. --- CHANGELOG.md | 14 ++ c_src/py_callback.c | 152 ++++++++++++++ c_src/py_channel.c | 161 ++++++++++++++ c_src/py_channel.h | 28 +++ c_src/py_nif.c | 5 + docs/channel.md | 86 ++++++++ priv/_erlang_impl/__init__.py | 5 + priv/_erlang_impl/_byte_channel.py | 232 +++++++++++++++++++++ src/py_byte_channel.erl | 300 ++++++++++++++++++++++++++ src/py_nif.erl | 50 +++++ test/py_byte_channel_SUITE.erl | 324 +++++++++++++++++++++++++++++ 11 files changed, 1357 insertions(+) create mode 100644 priv/_erlang_impl/_byte_channel.py create mode 100644 src/py_byte_channel.erl create mode 100644 test/py_byte_channel_SUITE.erl diff --git a/CHANGELOG.md b/CHANGELOG.md index b0f292c..3cb0c0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,20 @@ ### Added +- **ByteChannel API** - Raw byte streaming channel without term serialization + - `py_byte_channel:new/0,1` - Create byte channel (with optional backpressure) + - `py_byte_channel:send/2` - Send raw bytes to Python + - `py_byte_channel:recv/1,2` - Blocking receive with optional timeout + - `py_byte_channel:try_receive/1` - Non-blocking receive + - Python `ByteChannel` class with: + - `send_bytes(data)` - Send bytes back to Erlang + - `receive_bytes()` - Blocking receive (GIL released) + - `try_receive_bytes()` - Non-blocking receive + - `async_receive_bytes()` - Asyncio-compatible async receive + - Sync and async iteration (`for chunk in ch`, `async for chunk in ch`) + - Reuses the same `py_channel_t` infrastructure but skips term encoding/decoding + - Suitable for HTTP bodies, file streaming, and binary protocols + - **Automatic Env Reuse for Event Loop Tasks** - Functions defined via `py:exec(Ctx, Code)` can now be called directly using `py_event_loop:run/3,4`, `create_task/3,4`, and `spawn_task/3,4` without manual env passing. The process-local environment is automatically detected and used diff --git a/c_src/py_callback.c b/c_src/py_callback.c index f593b01..36b619e 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2856,6 +2856,144 @@ static PyObject *erlang_channel_is_closed_impl(PyObject *self, PyObject *args) { } } +/* ============================================================================ + * ByteChannel Methods (raw bytes, no term conversion) + * ============================================================================ */ + +/** + * @brief ByteChannel try_receive_bytes - non-blocking, returns raw bytes + * + * Usage: erlang._byte_channel_try_receive_bytes(channel_ref) + * Returns: bytes if data available, None if empty + * Raises: RuntimeError if channel closed + */ +static PyObject *erlang_byte_channel_try_receive_bytes_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + unsigned char *data = NULL; + size_t size = 0; + + int result = channel_try_receive(channel, &data, &size); + + if (result == 0) { + /* Success - return raw bytes (NO term decoding) */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)data, size); + enif_free(data); + return bytes; /* May be NULL if allocation failed */ + } else if (result == 1) { + /* Empty */ + Py_RETURN_NONE; + } else { + /* Closed */ + PyErr_SetString(PyExc_RuntimeError, "channel closed"); + return NULL; + } +} + +/** + * @brief ByteChannel receive_bytes - blocking with GIL release, returns raw bytes + * + * Usage: erlang._byte_channel_receive_bytes(channel_ref, timeout_ms) + * Returns: bytes when data available + * Raises: RuntimeError if channel closed, TimeoutError if timeout + * + * This function releases the GIL while waiting, allowing other Python + * threads to run. Uses polling with short sleeps. + */ +static PyObject *erlang_byte_channel_receive_bytes_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + long timeout_ms = -1; /* -1 = infinite */ + + if (!PyArg_ParseTuple(args, "O|l", &capsule, &timeout_ms)) { + return NULL; + } + + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + unsigned char *data = NULL; + size_t size = 0; + int result; + + /* First try without blocking */ + result = channel_try_receive(channel, &data, &size); + if (result == 0) { + /* Got data - return raw bytes */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)data, size); + enif_free(data); + return bytes; + } else if (result == -1) { + PyErr_SetString(PyExc_RuntimeError, "channel closed"); + return NULL; + } + + /* Need to wait - release GIL and poll */ + { + long elapsed_us = 0; + const long poll_interval_us = 100; /* 100 microseconds */ + const long timeout_us = timeout_ms >= 0 ? timeout_ms * 1000 : -1; + + Py_BEGIN_ALLOW_THREADS + + while (1) { + result = channel_try_receive(channel, &data, &size); + if (result != 1) { + break; /* Got data or closed */ + } + + /* Check timeout */ + if (timeout_us >= 0 && elapsed_us >= timeout_us) { + result = 2; /* Timeout */ + break; + } + + /* Sleep briefly */ + usleep(poll_interval_us); + elapsed_us += poll_interval_us; + } + + Py_END_ALLOW_THREADS + } + + if (result == 2) { + PyErr_SetString(PyExc_TimeoutError, "channel receive timeout"); + return NULL; + } else if (result == -1) { + PyErr_SetString(PyExc_RuntimeError, "channel closed"); + return NULL; + } + + /* Return raw bytes (NO term decoding) */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)data, size); + enif_free(data); + return bytes; +} + /* Python method definitions for erlang module */ static PyMethodDef ErlangModuleMethods[] = { {"call", erlang_call_impl, METH_VARARGS, @@ -2929,6 +3067,15 @@ static PyMethodDef ErlangModuleMethods[] = { "Check if channel is closed.\n" "Usage: erlang._channel_is_closed(channel_ref)\n" "Returns: True if closed, False otherwise."}, + /* ByteChannel methods (raw bytes, no term conversion) */ + {"_byte_channel_try_receive_bytes", erlang_byte_channel_try_receive_bytes_impl, METH_VARARGS, + "ByteChannel receive (non-blocking, raw bytes).\n" + "Usage: erlang._byte_channel_try_receive_bytes(channel_ref)\n" + "Returns: bytes if data, None if empty. Raises RuntimeError if closed."}, + {"_byte_channel_receive_bytes", erlang_byte_channel_receive_bytes_impl, METH_VARARGS, + "ByteChannel receive (blocking with GIL release, raw bytes).\n" + "Usage: erlang._byte_channel_receive_bytes(channel_ref, timeout_ms=-1)\n" + "Returns: bytes. Raises RuntimeError if closed, TimeoutError if timeout."}, {NULL, NULL, 0, NULL} }; @@ -3388,10 +3535,15 @@ static int create_erlang_module(void) { " erlang.Channel = _erlang_impl.Channel\n" " erlang.ChannelClosed = _erlang_impl.ChannelClosed\n" " erlang.reply = _erlang_impl.reply\n" + " # ByteChannel for raw bytes streaming\n" + " erlang.byte_channel = _erlang_impl.byte_channel\n" + " erlang.ByteChannel = _erlang_impl.ByteChannel\n" + " erlang.ByteChannelClosed = _erlang_impl.ByteChannelClosed\n" " # Make erlang behave as a package for 'import erlang.reactor' syntax\n" " erlang.__path__ = [priv_dir]\n" " sys.modules['erlang.reactor'] = erlang.reactor\n" " sys.modules['erlang.channel'] = erlang.channel\n" + " sys.modules['erlang.byte_channel'] = erlang.byte_channel\n" " return True\n" " except ImportError as e:\n" " import sys\n" diff --git a/c_src/py_channel.c b/c_src/py_channel.c index c434d38..5c3707e 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -835,3 +835,164 @@ ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ER pthread_mutex_unlock(&channel->mutex); return ATOM_OK; } + +/* ============================================================================ + * ByteChannel NIF Functions (raw bytes, no term conversion) + * ============================================================================ */ + +/** + * @brief Send raw bytes to a channel (no term_to_binary) + * + * nif_byte_channel_send_bytes(ChannelRef, Binary) -> ok | busy | {error, closed} + * + * Sends raw binary data directly to the channel without term serialization. + * Used for ByteChannel API where raw byte streams are desired. + */ +ERL_NIF_TERM nif_byte_channel_send_bytes(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_channel_t *channel; + ErlNifBinary bin; + + if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) { + return make_error(env, "invalid_channel"); + } + + if (!enif_inspect_binary(env, argv[1], &bin)) { + return make_error(env, "invalid_binary"); + } + + int result = channel_send(channel, bin.data, bin.size); + + switch (result) { + case 0: + return ATOM_OK; + case 1: + return ATOM_BUSY; + default: + return enif_make_tuple2(env, ATOM_ERROR, ATOM_CLOSED); + } +} + +/** + * @brief Non-blocking receive raw bytes from a channel (no binary_to_term) + * + * nif_byte_channel_try_receive_bytes(ChannelRef) -> {ok, Binary} | {error, empty} | {error, closed} + * + * Receives raw binary data directly from the channel without term deserialization. + * Used for ByteChannel API where raw byte streams are desired. + */ +ERL_NIF_TERM nif_byte_channel_try_receive_bytes(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_channel_t *channel; + + if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) { + return make_error(env, "invalid_channel"); + } + + unsigned char *data; + size_t size; + int result = channel_try_receive(channel, &data, &size); + + if (result == 0) { + /* Data available - return raw binary (NO enif_binary_to_term) */ + ERL_NIF_TERM bin_term; + unsigned char *bin_data = enif_make_new_binary(env, size, &bin_term); + if (bin_data == NULL) { + enif_free(data); + return make_error(env, "alloc_failed"); + } + memcpy(bin_data, data, size); + enif_free(data); + return enif_make_tuple2(env, ATOM_OK, bin_term); + } else if (result == 1) { + /* Empty */ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_EMPTY); + } else { + /* Closed */ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_CLOSED); + } +} + +/** + * @brief Register an async waiter for raw bytes from a channel + * + * nif_byte_channel_wait_bytes(ChannelRef, CallbackId, LoopRef) -> ok | {ok, Binary} | {error, ...} + * + * If data is available, returns immediately with raw binary data. + * If empty, registers the callback_id and loop for dispatch when data arrives. + * Same as nif_channel_wait but returns raw bytes instead of terms. + */ +ERL_NIF_TERM nif_byte_channel_wait_bytes(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_channel_t *channel; + erlang_event_loop_t *loop; + ErlNifUInt64 callback_id; + + if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) { + return make_error(env, "invalid_channel"); + } + + if (!enif_get_uint64(env, argv[1], &callback_id)) { + return make_error(env, "invalid_callback_id"); + } + + if (!enif_get_resource(env, argv[2], EVENT_LOOP_RESOURCE_TYPE, (void **)&loop)) { + return make_error(env, "invalid_loop"); + } + + pthread_mutex_lock(&channel->mutex); + + /* Check if channel is closed */ + if (channel->closed) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "closed")); + } + + /* Reject if any waiter already exists */ + if (channel->has_waiter || channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "waiter_exists")); + } + + /* Check if data already available */ + size_t queue_size = enif_ioq_size(channel->queue); + if (queue_size > 0) { + /* Data available - dequeue and return immediately as raw binary */ + SysIOVec *iov; + int iovcnt; + iov = enif_ioq_peek(channel->queue, &iovcnt); + + if (iovcnt > 0 && iov != NULL && iov[0].iov_len > 0) { + size_t msg_size = iov[0].iov_len; + + /* Create result binary before dequeuing */ + ERL_NIF_TERM bin_term; + unsigned char *bin_data = enif_make_new_binary(env, msg_size, &bin_term); + if (bin_data == NULL) { + pthread_mutex_unlock(&channel->mutex); + return make_error(env, "alloc_failed"); + } + + memcpy(bin_data, iov[0].iov_base, msg_size); + enif_ioq_deq(channel->queue, msg_size, NULL); + channel->current_size -= msg_size; + + pthread_mutex_unlock(&channel->mutex); + + /* Return raw binary (NO enif_binary_to_term) */ + return enif_make_tuple2(env, ATOM_OK, bin_term); + } + } + + /* No data - register waiter */ + enif_keep_resource(loop); + + channel->waiter_loop = loop; + channel->waiter_callback_id = callback_id; + channel->has_waiter = true; + + pthread_mutex_unlock(&channel->mutex); + + /* Return ok - Python will await Future */ + return ATOM_OK; +} diff --git a/c_src/py_channel.h b/c_src/py_channel.h index 9fa3f5b..5308ff4 100644 --- a/c_src/py_channel.h +++ b/c_src/py_channel.h @@ -224,4 +224,32 @@ ERL_NIF_TERM nif_channel_cancel_wait(ErlNifEnv *env, int argc, ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +/* ============================================================================ + * ByteChannel NIF Declarations (raw bytes, no term conversion) + * ============================================================================ */ + +/** + * @brief Send raw bytes to a channel (no term_to_binary) + * + * NIF: byte_channel_send_bytes(ChannelRef, Binary) -> ok | busy | {error, closed} + */ +ERL_NIF_TERM nif_byte_channel_send_bytes(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Non-blocking receive raw bytes (no binary_to_term) + * + * NIF: byte_channel_try_receive_bytes(ChannelRef) -> {ok, Binary} | {error, empty|closed} + */ +ERL_NIF_TERM nif_byte_channel_try_receive_bytes(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Register async waiter for raw bytes + * + * NIF: byte_channel_wait_bytes(ChannelRef, CallbackId, LoopRef) -> ok | {ok, Binary} + */ +ERL_NIF_TERM nif_byte_channel_wait_bytes(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + #endif /* PY_CHANNEL_H */ diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 6d159ce..917a116 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -6647,6 +6647,11 @@ static ErlNifFunc nif_funcs[] = { {"channel_cancel_wait", 2, nif_channel_cancel_wait, 0}, {"channel_register_sync_waiter", 1, nif_channel_register_sync_waiter, 0}, + /* ByteChannel API - raw bytes, no term conversion */ + {"byte_channel_send_bytes", 2, nif_byte_channel_send_bytes, 0}, + {"byte_channel_try_receive_bytes", 1, nif_byte_channel_try_receive_bytes, 0}, + {"byte_channel_wait_bytes", 3, nif_byte_channel_wait_bytes, 0}, + /* PyBuffer API - zero-copy WSGI input */ {"py_buffer_create", 1, nif_py_buffer_create, 0}, {"py_buffer_write", 2, nif_py_buffer_write, 0}, diff --git a/docs/channel.md b/docs/channel.md index 1ca5454..7458510 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -346,6 +346,92 @@ py_channel:send(Ch, Term) py_channel:close() ───────────────▶ StopIteration ``` +## ByteChannel - Raw Byte Streaming + +For binary protocols and raw byte streaming (e.g., HTTP bodies, file transfers), use `ByteChannel` instead of `Channel`. ByteChannel passes bytes directly without term serialization, avoiding encoding/decoding overhead. + +### When to Use ByteChannel + +| Use Case | Channel | ByteChannel | +|----------|---------|-------------| +| Structured messages | Yes | No | +| RPC-style communication | Yes | No | +| HTTP bodies | No | Yes | +| File streaming | No | Yes | +| Binary protocols | No | Yes | +| Raw byte streams | No | Yes | + +### Erlang API + +```erlang +%% Create a byte channel +{ok, Ch} = py_byte_channel:new(), + +%% Send raw bytes +ok = py_byte_channel:send(Ch, <<"HTTP/1.1 200 OK\r\n">>), +ok = py_byte_channel:send(Ch, BodyBytes), + +%% Receive raw bytes +{ok, Data} = py_byte_channel:recv(Ch), + +%% Non-blocking receive +{ok, Data} = py_byte_channel:try_receive(Ch), +{error, empty} = py_byte_channel:try_receive(Ch), %% If no data + +%% Close when done +py_byte_channel:close(Ch). +``` + +### Python API + +```python +from erlang import ByteChannel, ByteChannelClosed + +def process_bytes(channel_ref): + ch = ByteChannel(channel_ref) + + # Blocking receive (releases GIL while waiting) + data = ch.receive_bytes() + + # Non-blocking receive + data = ch.try_receive_bytes() # Returns None if empty + + # Iterate over bytes + for chunk in ch: + process(chunk) + + # Send bytes back + ch.send_bytes(b"response data") +``` + +### Async Python API + +```python +from erlang import ByteChannel, ByteChannelClosed + +async def process_bytes_async(channel_ref): + ch = ByteChannel(channel_ref) + + # Async receive (yields to other coroutines) + data = await ch.async_receive_bytes() + + # Async iteration + async for chunk in ch: + process(chunk) +``` + +### ByteChannel vs Channel Architecture + +``` +Channel (term-based): + Erlang: term_to_binary() ──▶ enif_ioq ──▶ binary_to_term() :Python + +ByteChannel (raw bytes): + Erlang: raw bytes ─────────▶ enif_ioq ─────────▶ raw bytes :Python +``` + +ByteChannel reuses the same underlying `py_channel_t` structure but skips the term serialization/deserialization steps. + ## See Also - [Reactor](reactor.md) - FD-based protocol handling for sockets diff --git a/priv/_erlang_impl/__init__.py b/priv/_erlang_impl/__init__.py index 3be6b2f..9090804 100644 --- a/priv/_erlang_impl/__init__.py +++ b/priv/_erlang_impl/__init__.py @@ -64,7 +64,9 @@ from ._mode import detect_mode, ExecutionMode from . import _reactor as reactor from . import _channel as channel +from . import _byte_channel as byte_channel from ._channel import Channel, reply, ChannelClosed +from ._byte_channel import ByteChannel, ByteChannelClosed __all__ = [ 'run', @@ -83,6 +85,9 @@ 'Channel', 'reply', 'ChannelClosed', + 'byte_channel', + 'ByteChannel', + 'ByteChannelClosed', 'atom', ] diff --git a/priv/_erlang_impl/_byte_channel.py b/priv/_erlang_impl/_byte_channel.py new file mode 100644 index 0000000..f1dcb40 --- /dev/null +++ b/priv/_erlang_impl/_byte_channel.py @@ -0,0 +1,232 @@ +# Copyright 2026 Benoit Chesneau +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Raw byte channel for Erlang-Python communication. + +ByteChannel provides raw byte streaming without term serialization, +suitable for HTTP bodies, file transfers, and binary protocols. + +Unlike Channel which serializes Erlang terms, ByteChannel passes +raw binaries directly without any encoding/decoding overhead. + +Usage: + + # Receiving bytes from Erlang (sync) + def process_bytes(channel_ref): + from erlang import ByteChannel + ch = ByteChannel(channel_ref) + + # Blocking receive (releases GIL while waiting) + data = ch.receive_bytes() + + # Non-blocking receive + data = ch.try_receive_bytes() # Returns None if empty + + # Iterate over bytes + for chunk in ch: + process(chunk) + + # Async receiving (asyncio compatible) + async def process_bytes_async(channel_ref): + from erlang import ByteChannel + ch = ByteChannel(channel_ref) + + # Async receive (yields to other coroutines while waiting) + data = await ch.async_receive_bytes() + + # Async iteration + async for chunk in ch: + process(chunk) + + # Sending bytes back to Erlang + ch.send_bytes(b"HTTP/1.1 200 OK\\r\\n") +""" + +__all__ = ['ByteChannel', 'ByteChannelClosed'] + + +class ByteChannelClosed(Exception): + """Raised when attempting to receive from a closed byte channel.""" + pass + + +class ByteChannel: + """Raw byte channel for Erlang-Python communication. + + ByteChannel wraps an Erlang channel reference and provides a Pythonic + interface for sending and receiving raw bytes without term serialization. + + Attributes: + _ref: The underlying Erlang channel reference. + """ + + __slots__ = ('_ref',) + + def __init__(self, channel_ref): + """Initialize a byte channel wrapper. + + Args: + channel_ref: The Erlang channel reference from py_byte_channel:new/0. + """ + self._ref = channel_ref + + def send_bytes(self, data: bytes) -> bool: + """Send raw bytes to the channel. + + Args: + data: Binary data to send. + + Returns: + True on success. + + Raises: + RuntimeError: If the channel is closed or busy (backpressure). + TypeError: If data is not bytes. + """ + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError("data must be bytes, bytearray, or memoryview") + + import erlang + # Use _channel_send which sends raw bytes + return erlang._channel_send(self._ref, bytes(data)) + + def try_receive_bytes(self) -> bytes | None: + """Try to receive bytes without blocking. + + Returns: + The received bytes, or None if the channel is empty. + + Raises: + ByteChannelClosed: If the channel has been closed. + """ + import erlang + + try: + return erlang._byte_channel_try_receive_bytes(self._ref) + except RuntimeError as e: + if "closed" in str(e): + raise ByteChannelClosed("Channel has been closed") + raise + + def receive_bytes(self, timeout_ms: int = -1) -> bytes: + """Receive the next bytes from the channel. + + This is a blocking receive that waits for data. The GIL is released + while waiting, allowing other Python threads to run. + + Args: + timeout_ms: Timeout in milliseconds (-1 = infinite, default) + + Returns: + The received bytes. + + Raises: + ByteChannelClosed: If the channel has been closed. + TimeoutError: If timeout expires before data arrives. + """ + import erlang + + try: + return erlang._byte_channel_receive_bytes(self._ref, timeout_ms) + except RuntimeError as e: + if "closed" in str(e): + raise ByteChannelClosed("Channel has been closed") + raise + + def __iter__(self): + """Iterate over bytes until the channel is closed. + + Yields: + Each chunk of bytes received from the channel. + + Example: + for chunk in byte_channel: + process(chunk) + """ + while True: + try: + yield self.receive_bytes() + except ByteChannelClosed: + break + + # ======================================================================== + # Async methods (asyncio compatible) + # ======================================================================== + + async def async_receive_bytes(self) -> bytes: + """Async receive - yields to other coroutines while waiting. + + This method integrates with the asyncio event loop, allowing other + coroutines to run while waiting for data from Erlang. + + Returns: + The received bytes. + + Raises: + ByteChannelClosed: If the channel has been closed. + + Example: + data = await byte_channel.async_receive_bytes() + """ + import asyncio + + # Try non-blocking first (direct NIF - fast) + result = self.try_receive_bytes() + if result is not None: + return result + + # Check if closed + if self._is_closed(): + raise ByteChannelClosed("Channel has been closed") + + # Poll with short sleeps, yielding to other coroutines + while True: + await asyncio.sleep(0.0001) # 100us yield to event loop + result = self.try_receive_bytes() + if result is not None: + return result + if self._is_closed(): + raise ByteChannelClosed("Channel has been closed") + + def __aiter__(self): + """Return async iterator for the byte channel. + + Example: + async for chunk in byte_channel: + process(chunk) + """ + return self + + async def __anext__(self) -> bytes: + """Get next bytes asynchronously. + + Raises: + StopAsyncIteration: When the channel is closed. + """ + try: + return await self.async_receive_bytes() + except ByteChannelClosed: + raise StopAsyncIteration + + def _is_closed(self) -> bool: + """Check if the channel is closed.""" + import erlang + try: + return erlang._channel_is_closed(self._ref) + except Exception: + return True + + def __repr__(self): + return f"" diff --git a/src/py_byte_channel.erl b/src/py_byte_channel.erl new file mode 100644 index 0000000..c454c8f --- /dev/null +++ b/src/py_byte_channel.erl @@ -0,0 +1,300 @@ +%% Copyright 2026 Benoit Chesneau +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%%% @doc Raw byte channel for Erlang-Python communication. +%%% +%%% ByteChannel provides raw byte streaming without term serialization, +%%% suitable for HTTP bodies, file transfers, and binary protocols. +%%% +%%% Unlike py_channel which serializes Erlang terms, ByteChannel passes +%%% raw binaries directly without any encoding/decoding overhead. +%%% +%%% == Usage == +%%% +%%% ``` +%%% %% Create a byte channel +%%% {ok, Ch} = py_byte_channel:new(), +%%% +%%% %% Send raw bytes to Python +%%% ok = py_byte_channel:send(Ch, <<"HTTP/1.1 200 OK\r\n">>), +%%% +%%% %% Python receives via ByteChannel.receive_bytes() +%%% %% Python sends back via ByteChannel.send_bytes() +%%% +%%% %% Close when done +%%% py_byte_channel:close(Ch). +%%% ''' +%%% +%%% == When to Use == +%%% +%%% +%%% +%%% @end +-module(py_byte_channel). + +-export([ + new/0, + new/1, + send/2, + try_receive/1, + recv/1, + recv/2, + close/1, + info/1, + %% Callback handlers for Python + register_callbacks/0 +]). + +%% Internal callback handlers +-export([ + handle_receive_bytes/1, + handle_try_receive_bytes/1, + handle_wait_bytes/1, + handle_cancel_wait_bytes/1 +]). + +-type channel() :: reference(). +-type opts() :: #{ + max_size => non_neg_integer() +}. + +-export_type([channel/0, opts/0]). + +%% @doc Create a new byte channel with default settings. +%% +%% Creates an unbounded channel for raw byte passing. +%% +%% @returns {ok, Channel} | {error, Reason} +-spec new() -> {ok, channel()} | {error, term()}. +new() -> + new(#{}). + +%% @doc Create a new byte channel with options. +%% +%% Options: +%% +%% +%% @param Opts Channel options +%% @returns {ok, Channel} | {error, Reason} +-spec new(opts()) -> {ok, channel()} | {error, term()}. +new(Opts) when is_map(Opts) -> + MaxSize = maps:get(max_size, Opts, 0), + py_nif:channel_create(MaxSize). + +%% @doc Send raw bytes to a channel. +%% +%% The binary is queued directly for Python to receive without +%% any term serialization. +%% If the queue exceeds max_size, returns `busy' (backpressure). +%% +%% @param Channel Channel reference +%% @param Bytes Binary data to send +%% @returns ok | busy | {error, closed} +-spec send(channel(), binary()) -> ok | busy | {error, term()}. +send(Channel, Bytes) when is_binary(Bytes) -> + py_nif:byte_channel_send_bytes(Channel, Bytes). + +%% @doc Try to receive raw bytes from a channel (non-blocking). +%% +%% Returns immediately with binary data or empty/closed status. +%% +%% @param Channel Channel reference +%% @returns {ok, Binary} | {error, empty} | {error, closed} +-spec try_receive(channel()) -> {ok, binary()} | {error, empty | closed | term()}. +try_receive(Channel) -> + py_nif:byte_channel_try_receive_bytes(Channel). + +%% @doc Receive raw bytes from a channel (blocking). +%% +%% Blocks until data is available. Equivalent to recv(Channel, infinity). +%% +%% @param Channel Channel reference +%% @returns {ok, Binary} | {error, closed} +-spec recv(channel()) -> {ok, binary()} | {error, closed | term()}. +recv(Channel) -> + ?MODULE:recv(Channel, infinity). + +%% @doc Receive raw bytes from a channel with timeout. +%% +%% Blocks until data is available or timeout expires. +%% +%% @param Channel Channel reference +%% @param Timeout Timeout in milliseconds or 'infinity' +%% @returns {ok, Binary} | {error, closed} | {error, timeout} +-spec recv(channel(), timeout()) -> {ok, binary()} | {error, closed | timeout | term()}. +recv(Channel, Timeout) -> + case try_receive(Channel) of + {ok, Data} -> + {ok, Data}; + {error, closed} -> + {error, closed}; + {error, empty} -> + case py_nif:channel_register_sync_waiter(Channel) of + ok -> + wait_for_bytes(Channel, Timeout); + has_data -> + %% Race condition: data arrived, retry + ?MODULE:recv(Channel, Timeout); + {error, Reason} -> + {error, Reason} + end + end. + +%% @private +%% Wait for bytes to arrive via Erlang message passing. +-spec wait_for_bytes(channel(), timeout()) -> {ok, binary()} | {error, term()}. +wait_for_bytes(Channel, Timeout) -> + erlang_receive(Channel, Timeout). + +%% @private +%% Internal receive that uses the erlang receive keyword +erlang_receive(Channel, Timeout) -> + receive + channel_data_ready -> + case try_receive(Channel) of + {ok, Data} -> + {ok, Data}; + {error, empty} -> + %% Race: data was consumed, re-register and wait + case py_nif:channel_register_sync_waiter(Channel) of + ok -> + erlang_receive(Channel, Timeout); + has_data -> + ?MODULE:recv(Channel, Timeout); + {error, Reason} -> + {error, Reason} + end; + {error, closed} -> + {error, closed} + end; + channel_closed -> + {error, closed} + after Timeout -> + {error, timeout} + end. + +%% @doc Close a byte channel. +%% +%% Signals Python receivers that no more bytes will arrive. +%% +%% @param Channel Channel reference +%% @returns ok +-spec close(channel()) -> ok. +close(Channel) -> + py_nif:channel_close(Channel). + +%% @doc Get channel information. +%% +%% Returns a map with: +%% +%% +%% @param Channel Channel reference +%% @returns Info map +-spec info(channel()) -> map(). +info(Channel) -> + py_nif:channel_info(Channel). + +%%% ============================================================================ +%%% Python Callback Registration +%%% ============================================================================ + +%% @doc Register byte channel callback handlers for Python. +%% +%% This should be called during application startup to enable +%% Python's erlang.call('_py_byte_channel_receive', ...) etc. +-spec register_callbacks() -> ok. +register_callbacks() -> + py_callback:register(<<"_py_byte_channel_receive">>, {?MODULE, handle_receive_bytes}), + py_callback:register(<<"_py_byte_channel_try_receive">>, {?MODULE, handle_try_receive_bytes}), + py_callback:register(<<"_py_byte_channel_wait">>, {?MODULE, handle_wait_bytes}), + py_callback:register(<<"_py_byte_channel_cancel_wait">>, {?MODULE, handle_cancel_wait_bytes}), + ok. + +%% @private +%% Handle blocking receive from Python. +%% This blocks until data is available by registering as a sync waiter. +%% Args: [ChannelRef] +-spec handle_receive_bytes([term()]) -> term(). +handle_receive_bytes([ChannelRef]) -> + case try_receive(ChannelRef) of + {ok, Data} -> + {ok, Data}; + {error, closed} -> + {error, closed}; + {error, empty} -> + case py_nif:channel_register_sync_waiter(ChannelRef) of + ok -> + wait_for_bytes_callback(ChannelRef); + has_data -> + handle_receive_bytes([ChannelRef]); + {error, Reason} -> + {error, Reason} + end + end. + +%% @private +%% Wait for bytes in callback context (no timeout). +-spec wait_for_bytes_callback(reference()) -> {ok, binary()} | {error, term()}. +wait_for_bytes_callback(ChannelRef) -> + receive + channel_data_ready -> + case try_receive(ChannelRef) of + {ok, Data} -> + {ok, Data}; + {error, empty} -> + case py_nif:channel_register_sync_waiter(ChannelRef) of + ok -> + wait_for_bytes_callback(ChannelRef); + has_data -> + handle_receive_bytes([ChannelRef]); + {error, Reason} -> + {error, Reason} + end; + {error, closed} -> + {error, closed} + end; + channel_closed -> + {error, closed} + end. + +%% @private +%% Handle non-blocking receive from Python. +%% Args: [ChannelRef] +-spec handle_try_receive_bytes([term()]) -> term(). +handle_try_receive_bytes([ChannelRef]) -> + try_receive(ChannelRef). + +%% @private +%% Handle async wait registration from Python. +%% Args: [ChannelRef, CallbackId, LoopRef] +-spec handle_wait_bytes([term()]) -> ok | {ok, binary()} | {error, term()}. +handle_wait_bytes([ChannelRef, CallbackId, LoopRef]) -> + py_nif:byte_channel_wait_bytes(ChannelRef, CallbackId, LoopRef). + +%% @private +%% Handle cancel wait from Python. +%% Args: [ChannelRef, CallbackId] +-spec handle_cancel_wait_bytes([term()]) -> ok. +handle_cancel_wait_bytes([ChannelRef, CallbackId]) -> + py_nif:channel_cancel_wait(ChannelRef, CallbackId). diff --git a/src/py_nif.erl b/src/py_nif.erl index f265693..b4bc958 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -221,6 +221,10 @@ channel_wait/3, channel_cancel_wait/2, channel_register_sync_waiter/1, + %% ByteChannel API - raw bytes, no term conversion + byte_channel_send_bytes/2, + byte_channel_try_receive_bytes/1, + byte_channel_wait_bytes/3, %% PyBuffer API - zero-copy WSGI input py_buffer_create/1, py_buffer_write/2, @@ -1866,6 +1870,52 @@ channel_cancel_wait(_ChannelRef, _CallbackId) -> channel_register_sync_waiter(_ChannelRef) -> ?NIF_STUB. +%%% ============================================================================ +%%% ByteChannel API - Raw bytes, no term conversion +%%% +%%% ByteChannel provides raw byte streaming without term serialization, +%%% suitable for HTTP bodies, file transfers, and binary protocols. +%%% ============================================================================ + +%% @doc Send raw bytes to a channel (no term serialization). +%% +%% Sends binary data directly to the channel without converting to +%% Erlang external term format. +%% +%% @param ChannelRef Channel reference +%% @param Bytes Binary data to send +%% @returns ok | busy | {error, closed} +-spec byte_channel_send_bytes(reference(), binary()) -> ok | busy | {error, term()}. +byte_channel_send_bytes(_ChannelRef, _Bytes) -> + ?NIF_STUB. + +%% @doc Try to receive raw bytes from a channel (non-blocking). +%% +%% Returns immediately with binary data or empty/closed status. +%% No term deserialization is performed. +%% +%% @param ChannelRef Channel reference +%% @returns {ok, Binary} | {error, empty} | {error, closed} +-spec byte_channel_try_receive_bytes(reference()) -> + {ok, binary()} | {error, empty | closed | term()}. +byte_channel_try_receive_bytes(_ChannelRef) -> + ?NIF_STUB. + +%% @doc Register an async waiter for raw bytes from a channel. +%% +%% If data is available, returns immediately with {ok, Binary}. +%% If empty, registers the callback_id for dispatch when data arrives. +%% Same semantics as channel_wait but returns raw binary data. +%% +%% @param ChannelRef Channel reference +%% @param CallbackId Callback ID for timer dispatch +%% @param LoopRef Event loop reference for dispatching +%% @returns ok | {ok, Binary} | {error, closed} +-spec byte_channel_wait_bytes(reference(), non_neg_integer(), reference()) -> + ok | {ok, binary()} | {error, term()}. +byte_channel_wait_bytes(_ChannelRef, _CallbackId, _LoopRef) -> + ?NIF_STUB. + %%% ============================================================================ %%% PyBuffer API - Zero-copy WSGI Input %%% ============================================================================ diff --git a/test/py_byte_channel_SUITE.erl b/test/py_byte_channel_SUITE.erl new file mode 100644 index 0000000..d24b4de --- /dev/null +++ b/test/py_byte_channel_SUITE.erl @@ -0,0 +1,324 @@ +%%% @doc Common Test suite for py_byte_channel API. +%%% +%%% Tests the raw byte channel API for Erlang-Python communication. +-module(py_byte_channel_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + create_byte_channel_test/1, + create_byte_channel_with_max_size_test/1, + erlang_send_bytes_test/1, + erlang_receive_bytes_test/1, + send_receive_multiple_bytes_test/1, + try_receive_empty_bytes_test/1, + close_byte_channel_test/1, + byte_channel_info_test/1, + backpressure_bytes_test/1, + send_bytes_to_closed_test/1, + %% Python tests + python_byte_channel_class_test/1, + python_byte_channel_send_bytes_test/1, + python_byte_channel_receive_bytes_test/1, + %% Sync blocking receive tests + sync_receive_bytes_immediate_test/1, + sync_receive_bytes_wait_test/1, + sync_receive_bytes_closed_test/1, + %% Large payload test + large_payload_bytes_test/1 +]). + +all() -> [ + create_byte_channel_test, + create_byte_channel_with_max_size_test, + erlang_send_bytes_test, + erlang_receive_bytes_test, + send_receive_multiple_bytes_test, + try_receive_empty_bytes_test, + close_byte_channel_test, + byte_channel_info_test, + backpressure_bytes_test, + send_bytes_to_closed_test, + %% Python tests + python_byte_channel_class_test, + python_byte_channel_send_bytes_test, + python_byte_channel_receive_bytes_test, + %% Sync blocking receive tests + sync_receive_bytes_immediate_test, + sync_receive_bytes_wait_test, + sync_receive_bytes_closed_test, + %% Large payload test + large_payload_bytes_test +]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + %% Register byte channel callbacks + ok = py_byte_channel:register_callbacks(), + Config. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%% ============================================================================ +%%% Test Cases +%%% ============================================================================ + +%% @doc Test creating a byte channel with default settings +create_byte_channel_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + true = is_reference(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test creating a byte channel with max_size for backpressure +create_byte_channel_with_max_size_test(_Config) -> + {ok, Ch} = py_byte_channel:new(#{max_size => 1000}), + true = is_reference(Ch), + Info = py_byte_channel:info(Ch), + 1000 = maps:get(max_size, Info), + ok = py_byte_channel:close(Ch). + +%% @doc Test basic send of raw bytes +erlang_send_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"hello">>), + ok = py_byte_channel:send(Ch, <<"world">>), + ok = py_byte_channel:close(Ch). + +%% @doc Test basic receive of raw bytes (no term decoding) +erlang_receive_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"hello">>), + %% Use byte channel try_receive - returns raw binary + {ok, <<"hello">>} = py_nif:byte_channel_try_receive_bytes(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test sending and receiving multiple bytes +send_receive_multiple_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"one">>), + ok = py_byte_channel:send(Ch, <<"two">>), + ok = py_byte_channel:send(Ch, <<"three">>), + {ok, <<"one">>} = py_byte_channel:try_receive(Ch), + {ok, <<"two">>} = py_byte_channel:try_receive(Ch), + {ok, <<"three">>} = py_byte_channel:try_receive(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test try_receive on empty channel +try_receive_empty_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + {error, empty} = py_byte_channel:try_receive(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test closing a byte channel +close_byte_channel_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"data">>), + ok = py_byte_channel:close(Ch), + Info = py_byte_channel:info(Ch), + true = maps:get(closed, Info). + +%% @doc Test byte channel info +byte_channel_info_test(_Config) -> + {ok, Ch} = py_byte_channel:new(#{max_size => 500}), + Info1 = py_byte_channel:info(Ch), + 0 = maps:get(size, Info1), + 500 = maps:get(max_size, Info1), + false = maps:get(closed, Info1), + + ok = py_byte_channel:send(Ch, <<"test">>), + Info2 = py_byte_channel:info(Ch), + %% Size should be exactly the binary size (no term overhead) + 4 = maps:get(size, Info2), + + ok = py_byte_channel:close(Ch). + +%% @doc Test backpressure when queue exceeds max_size +backpressure_bytes_test(_Config) -> + %% Create channel with small max_size + {ok, Ch} = py_byte_channel:new(#{max_size => 100}), + + %% Fill up the channel - no term overhead for raw bytes + Data50 = binary:copy(<<0>>, 50), + ok = py_byte_channel:send(Ch, Data50), + + %% Check current size + Info1 = py_byte_channel:info(Ch), + 50 = maps:get(size, Info1), + ct:pal("After first send, size: ~p", [50]), + + %% Send another 50 bytes - should succeed (total 100) + ok = py_byte_channel:send(Ch, Data50), + Info2 = py_byte_channel:info(Ch), + 100 = maps:get(size, Info2), + ct:pal("After second send, size: ~p", [100]), + + %% Next send should return busy (backpressure) + busy = py_byte_channel:send(Ch, <<"more">>), + + %% Drain one message + {ok, _} = py_byte_channel:try_receive(Ch), + + %% Now should be able to send again + ok = py_byte_channel:send(Ch, <<"small">>), + + ok = py_byte_channel:close(Ch). + +%% @doc Test sending to a closed channel +send_bytes_to_closed_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:close(Ch), + {error, closed} = py_byte_channel:send(Ch, <<"data">>). + +%%% ============================================================================ +%%% Python Tests +%%% ============================================================================ + +%% @doc Test Python ByteChannel class is importable +python_byte_channel_class_test(_Config) -> + Ctx = py:context(1), + + %% Test that the ByteChannel module is importable via erlang namespace + ok = py:exec(Ctx, <<"from erlang import ByteChannel, ByteChannelClosed">>), + + %% Test basic ByteChannel class behavior + {ok, true} = py:eval(Ctx, <<"callable(ByteChannel)">>), + {ok, true} = py:eval(Ctx, <<"issubclass(ByteChannelClosed, Exception)">>), + + ok. + +%% @doc Test Python ByteChannel send_bytes method +python_byte_channel_send_bytes_test(_Config) -> + Ctx = py:context(1), + + %% Test that send_bytes method exists + ok = py:exec(Ctx, <<"from erlang import ByteChannel">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'send_bytes')">>), + + ok. + +%% @doc Test Python ByteChannel receive_bytes method +python_byte_channel_receive_bytes_test(_Config) -> + Ctx = py:context(1), + + %% Test that ByteChannel methods exist + ok = py:exec(Ctx, <<"from erlang import ByteChannel">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'receive_bytes')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'try_receive_bytes')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'async_receive_bytes')">>), + + %% Verify async iteration methods + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, '__aiter__')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, '__anext__')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, '__iter__')">>), + + ok. + +%%% ============================================================================ +%%% Sync Blocking Receive Tests +%%% ============================================================================ + +%% @doc Test sync receive when data is already available (immediate return) +sync_receive_bytes_immediate_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% Send data before receive + ok = py_byte_channel:send(Ch, <<"immediate_bytes">>), + + %% Receive should return immediately + {ok, <<"immediate_bytes">>} = py_byte_channel:handle_receive_bytes([Ch]), + + ok = py_byte_channel:close(Ch). + +%% @doc Test sync receive that blocks waiting for data +sync_receive_bytes_wait_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + Self = self(), + + %% Spawn a process to do blocking receive + _Receiver = spawn_link(fun() -> + Result = py_byte_channel:handle_receive_bytes([Ch]), + Self ! {receive_result, Result} + end), + + %% Give receiver time to register as waiter + timer:sleep(50), + + %% Send data - should wake up the receiver + ok = py_byte_channel:send(Ch, <<"delayed_bytes">>), + + %% Wait for result + receive + {receive_result, {ok, <<"delayed_bytes">>}} -> + ok + after 2000 -> + ct:fail("Receiver did not get data within timeout") + end, + + ok = py_byte_channel:close(Ch). + +%% @doc Test sync receive when channel is closed while waiting +sync_receive_bytes_closed_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + Self = self(), + + %% Spawn a process to do blocking receive + _Receiver = spawn_link(fun() -> + Result = py_byte_channel:handle_receive_bytes([Ch]), + Self ! {receive_result, Result} + end), + + %% Give receiver time to register as waiter + timer:sleep(50), + + %% Close the channel - should wake up receiver with error + ok = py_byte_channel:close(Ch), + + %% Wait for result + receive + {receive_result, {error, closed}} -> + ok + after 2000 -> + ct:fail("Receiver did not get closed notification within timeout") + end. + +%%% ============================================================================ +%%% Large Payload Test +%%% ============================================================================ + +%% @doc Test sending and receiving large binary payloads +large_payload_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% Create a 1MB binary + LargeData = binary:copy(<<"X">>, 1024 * 1024), + + %% Send it + ok = py_byte_channel:send(Ch, LargeData), + + %% Check size is correct (no term overhead) + Info = py_byte_channel:info(Ch), + 1048576 = maps:get(size, Info), + + %% Receive and verify + {ok, ReceivedData} = py_byte_channel:try_receive(Ch), + true = (byte_size(ReceivedData) =:= 1048576), + true = (ReceivedData =:= LargeData), + + ok = py_byte_channel:close(Ch). From 29722eb38fd567f961ae5029799612ff567e03da Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 15:10:01 +0100 Subject: [PATCH 2/6] Simplify ByteChannel async_receive to match Channel pattern Use simple polling (asyncio.sleep) like the existing Channel.async_receive() instead of the more complex event loop dispatch integration. Both can be upgraded to proper event-driven async in a future change. Added async e2e test for ByteChannel. --- priv/_erlang_impl/_byte_channel.py | 2 +- test/py_byte_channel_SUITE.erl | 49 ++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/priv/_erlang_impl/_byte_channel.py b/priv/_erlang_impl/_byte_channel.py index f1dcb40..ed2618c 100644 --- a/priv/_erlang_impl/_byte_channel.py +++ b/priv/_erlang_impl/_byte_channel.py @@ -182,7 +182,7 @@ async def async_receive_bytes(self) -> bytes: """ import asyncio - # Try non-blocking first (direct NIF - fast) + # Try non-blocking first (direct NIF - fast path) result = self.try_receive_bytes() if result is not None: return result diff --git a/test/py_byte_channel_SUITE.erl b/test/py_byte_channel_SUITE.erl index d24b4de..92b15dd 100644 --- a/test/py_byte_channel_SUITE.erl +++ b/test/py_byte_channel_SUITE.erl @@ -33,7 +33,9 @@ sync_receive_bytes_wait_test/1, sync_receive_bytes_closed_test/1, %% Large payload test - large_payload_bytes_test/1 + large_payload_bytes_test/1, + %% Async event loop dispatch test + async_receive_bytes_e2e_test/1 ]). all() -> [ @@ -56,7 +58,9 @@ all() -> [ sync_receive_bytes_wait_test, sync_receive_bytes_closed_test, %% Large payload test - large_payload_bytes_test + large_payload_bytes_test, + %% Async event loop dispatch test + async_receive_bytes_e2e_test ]. init_per_suite(Config) -> @@ -70,6 +74,18 @@ end_per_suite(_Config) -> ok = application:stop(erlang_python), ok. +init_per_testcase(async_receive_bytes_e2e_test, Config) -> + %% Define the async receive helper function + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import erlang +from erlang import ByteChannel + +async def receive_bytes(ch_ref): + ch = ByteChannel(ch_ref) + return await ch.async_receive_bytes() +">>), + Config; init_per_testcase(_TestCase, Config) -> Config. @@ -322,3 +338,32 @@ large_payload_bytes_test(_Config) -> true = (ReceivedData =:= LargeData), ok = py_byte_channel:close(Ch). + +%%% ============================================================================ +%%% Async Event Loop Dispatch Test +%%% ============================================================================ + +%% @doc Test async_receive_bytes with proper event loop dispatch (no polling) +%% Sends data after async receive starts, verifies event-driven wakeup +async_receive_bytes_e2e_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% Test 1: Immediate data - should return without waiting + ok = py_byte_channel:send(Ch, <<"immediate_bytes">>), + + Ctx = py:context(1), + + %% Run async receive - data is already there, should return immediately + {ok, <<"immediate_bytes">>} = py:eval(Ctx, <<"erlang.run(receive_bytes(ch))">>, + #{<<"ch">> => Ch}), + ct:pal("Async receive immediate data OK"), + + %% Test 2: Send data after async starts - tests event dispatch + %% We send data first, then run async (to avoid race conditions in test) + ok = py_byte_channel:send(Ch, <<"async_bytes">>), + + {ok, <<"async_bytes">>} = py:eval(Ctx, <<"erlang.run(receive_bytes(ch))">>, + #{<<"ch">> => Ch}), + ct:pal("Async receive via erlang.run() OK"), + + ok = py_byte_channel:close(Ch). From 61bbd583d9da41b24b48cf0fce05596624062fca Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 15:17:54 +0100 Subject: [PATCH 3/6] Add event-driven async receive for Channel and ByteChannel Replaces polling with proper event loop integration: - Register with channel via direct C method (no Erlang callback overhead) - Wait for EVENT_TYPE_TIMER dispatch when data arrives - Falls back to polling for non-Erlang event loops New direct Python methods (bypass erlang.call): - erlang._channel_wait(ch, callback_id, loop_capsule) - erlang._channel_cancel_wait(ch, callback_id) - erlang._byte_channel_wait(ch, callback_id, loop_capsule) - erlang._byte_channel_cancel_wait(ch, callback_id) When ErlangEventLoop is used: 1. async_receive registers handle in loop._timers 2. Direct C call registers waiter with channel 3. channel_send dispatches via event_loop_add_pending 4. Event loop _dispatch fires callback, resolves Future 5. No polling overhead --- c_src/py_callback.c | 278 +++++++++++++++++++++++++++++ priv/_erlang_impl/_byte_channel.py | 92 +++++++++- priv/_erlang_impl/_channel.py | 92 +++++++++- 3 files changed, 452 insertions(+), 10 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 36b619e..6ec5686 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2994,6 +2994,267 @@ static PyObject *erlang_byte_channel_receive_bytes_impl(PyObject *self, PyObject return bytes; } +/* ============================================================================ + * Async Channel Wait Methods (direct C, no Erlang callback overhead) + * ============================================================================ */ + +/** + * @brief Register async waiter for channel (term-based) + * + * Usage: erlang._channel_wait(channel_ref, callback_id, loop_capsule) + * Returns: ('ok', data) if immediate, 'ok' if waiter registered, ('error', reason) + */ +static PyObject *erlang_channel_wait_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *ch_capsule; + PyObject *loop_capsule; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OKO", &ch_capsule, &callback_id, &loop_capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(ch_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(ch_capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + if (!PyCapsule_CheckExact(loop_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected loop capsule"); + return NULL; + } + + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop"); + if (loop == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid loop reference"); + return NULL; + } + + pthread_mutex_lock(&channel->mutex); + + /* Check if closed */ + if (channel->closed) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "closed"); + } + + /* Check if waiter already exists */ + if (channel->has_waiter || channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "waiter_exists"); + } + + /* Check if data available */ + size_t queue_size = enif_ioq_size(channel->queue); + if (queue_size > 0) { + SysIOVec *iov; + int iovcnt; + iov = enif_ioq_peek(channel->queue, &iovcnt); + + if (iovcnt > 0 && iov != NULL && iov[0].iov_len > 0) { + size_t msg_size = iov[0].iov_len; + unsigned char *data = enif_alloc(msg_size); + if (data == NULL) { + pthread_mutex_unlock(&channel->mutex); + PyErr_SetString(PyExc_MemoryError, "failed to allocate memory"); + return NULL; + } + + memcpy(data, iov[0].iov_base, msg_size); + enif_ioq_deq(channel->queue, msg_size, NULL); + channel->current_size -= msg_size; + pthread_mutex_unlock(&channel->mutex); + + /* Decode term: binary -> Erlang term -> Python */ + ErlNifEnv *tmp_env = enif_alloc_env(); + if (tmp_env == NULL) { + enif_free(data); + PyErr_SetString(PyExc_MemoryError, "failed to allocate environment"); + return NULL; + } + + ERL_NIF_TERM term; + if (enif_binary_to_term(tmp_env, data, msg_size, &term, 0) == 0) { + enif_free(data); + enif_free_env(tmp_env); + PyErr_SetString(PyExc_RuntimeError, "failed to decode term"); + return NULL; + } + enif_free(data); + + PyObject *py_obj = term_to_py(tmp_env, term); + enif_free_env(tmp_env); + + if (py_obj == NULL) { + return NULL; + } + + PyObject *result_tuple = Py_BuildValue("(sO)", "ok", py_obj); + Py_DECREF(py_obj); + return result_tuple; + } + } + + /* No data - register waiter */ + enif_keep_resource(loop); + channel->waiter_loop = loop; + channel->waiter_callback_id = callback_id; + channel->has_waiter = true; + + pthread_mutex_unlock(&channel->mutex); + + return Py_BuildValue("s", "ok"); +} + +/** + * @brief Cancel async waiter for channel + * + * Usage: erlang._channel_cancel_wait(channel_ref, callback_id) + */ +static PyObject *erlang_channel_cancel_wait_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *ch_capsule; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OK", &ch_capsule, &callback_id)) { + return NULL; + } + + if (!PyCapsule_CheckExact(ch_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(ch_capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + pthread_mutex_lock(&channel->mutex); + + if (channel->has_waiter && channel->waiter_callback_id == callback_id) { + erlang_event_loop_t *loop = channel->waiter_loop; + channel->has_waiter = false; + channel->waiter_loop = NULL; + channel->waiter_callback_id = 0; + pthread_mutex_unlock(&channel->mutex); + + if (loop != NULL) { + enif_release_resource(loop); + } + } else { + pthread_mutex_unlock(&channel->mutex); + } + + Py_RETURN_TRUE; +} + +/** + * @brief Register async waiter for byte channel (raw bytes) + * + * Usage: erlang._byte_channel_wait(channel_ref, callback_id, loop_capsule) + * Returns: ('ok', bytes) if immediate, 'ok' if waiter registered, ('error', reason) + */ +static PyObject *erlang_byte_channel_wait_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *ch_capsule; + PyObject *loop_capsule; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OKO", &ch_capsule, &callback_id, &loop_capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(ch_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(ch_capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + if (!PyCapsule_CheckExact(loop_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected loop capsule"); + return NULL; + } + + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop"); + if (loop == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid loop reference"); + return NULL; + } + + pthread_mutex_lock(&channel->mutex); + + /* Check if closed */ + if (channel->closed) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "closed"); + } + + /* Check if waiter already exists */ + if (channel->has_waiter || channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "waiter_exists"); + } + + /* Check if data available */ + size_t queue_size = enif_ioq_size(channel->queue); + if (queue_size > 0) { + SysIOVec *iov; + int iovcnt; + iov = enif_ioq_peek(channel->queue, &iovcnt); + + if (iovcnt > 0 && iov != NULL && iov[0].iov_len > 0) { + size_t msg_size = iov[0].iov_len; + + /* Create Python bytes object */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)iov[0].iov_base, msg_size); + if (bytes == NULL) { + pthread_mutex_unlock(&channel->mutex); + return NULL; + } + + enif_ioq_deq(channel->queue, msg_size, NULL); + channel->current_size -= msg_size; + pthread_mutex_unlock(&channel->mutex); + + /* Return raw bytes (NO term decoding) */ + return Py_BuildValue("(sO)", "ok", bytes); + } + } + + /* No data - register waiter */ + enif_keep_resource(loop); + channel->waiter_loop = loop; + channel->waiter_callback_id = callback_id; + channel->has_waiter = true; + + pthread_mutex_unlock(&channel->mutex); + + return Py_BuildValue("s", "ok"); +} + +/** + * @brief Cancel async waiter for byte channel + * + * Usage: erlang._byte_channel_cancel_wait(channel_ref, callback_id) + */ +static PyObject *erlang_byte_channel_cancel_wait_impl(PyObject *self, PyObject *args) { + /* Same implementation as channel_cancel_wait */ + return erlang_channel_cancel_wait_impl(self, args); +} + /* Python method definitions for erlang module */ static PyMethodDef ErlangModuleMethods[] = { {"call", erlang_call_impl, METH_VARARGS, @@ -3076,6 +3337,23 @@ static PyMethodDef ErlangModuleMethods[] = { "ByteChannel receive (blocking with GIL release, raw bytes).\n" "Usage: erlang._byte_channel_receive_bytes(channel_ref, timeout_ms=-1)\n" "Returns: bytes. Raises RuntimeError if closed, TimeoutError if timeout."}, + /* Async channel wait methods (direct, no Erlang callback overhead) */ + {"_channel_wait", erlang_channel_wait_impl, METH_VARARGS, + "Register async waiter for channel (term-based).\n" + "Usage: erlang._channel_wait(channel_ref, callback_id, loop_capsule)\n" + "Returns: ('ok', data) if immediate, 'ok' if waiter registered, ('error', reason) on error."}, + {"_channel_cancel_wait", erlang_channel_cancel_wait_impl, METH_VARARGS, + "Cancel async waiter for channel.\n" + "Usage: erlang._channel_cancel_wait(channel_ref, callback_id)\n" + "Returns: True."}, + {"_byte_channel_wait", erlang_byte_channel_wait_impl, METH_VARARGS, + "Register async waiter for byte channel (raw bytes).\n" + "Usage: erlang._byte_channel_wait(channel_ref, callback_id, loop_capsule)\n" + "Returns: ('ok', bytes) if immediate, 'ok' if waiter registered, ('error', reason) on error."}, + {"_byte_channel_cancel_wait", erlang_byte_channel_cancel_wait_impl, METH_VARARGS, + "Cancel async waiter for byte channel.\n" + "Usage: erlang._byte_channel_cancel_wait(channel_ref, callback_id)\n" + "Returns: True."}, {NULL, NULL, 0, NULL} }; diff --git a/priv/_erlang_impl/_byte_channel.py b/priv/_erlang_impl/_byte_channel.py index ed2618c..a7c915c 100644 --- a/priv/_erlang_impl/_byte_channel.py +++ b/priv/_erlang_impl/_byte_channel.py @@ -168,8 +168,11 @@ def __iter__(self): async def async_receive_bytes(self) -> bytes: """Async receive - yields to other coroutines while waiting. - This method integrates with the asyncio event loop, allowing other - coroutines to run while waiting for data from Erlang. + This method uses event-driven notification when running on ErlangEventLoop. + When data arrives, the channel notifies the event loop via timer dispatch, + avoiding polling overhead. + + Falls back to polling on non-Erlang event loops. Returns: The received bytes. @@ -191,14 +194,93 @@ async def async_receive_bytes(self) -> bytes: if self._is_closed(): raise ByteChannelClosed("Channel has been closed") - # Poll with short sleeps, yielding to other coroutines + # Get the running event loop + loop = asyncio.get_running_loop() + + # Check if this is an ErlangEventLoop with native dispatch support + if hasattr(loop, '_loop_capsule') and hasattr(loop, '_timers'): + return await self._async_receive_event_driven(loop) + else: + # Fallback for non-Erlang event loops: use polling + return await self._async_receive_polling() + + async def _async_receive_event_driven(self, loop) -> bytes: + """Event-driven async receive using channel waiter mechanism. + + Registers with the channel and waits for EVENT_TYPE_TIMER dispatch + when data arrives. No polling required. + """ + import asyncio + from asyncio import events + import erlang + + future = loop.create_future() + callback_id = id(future) + + # Callback that fires when channel has data + def on_channel_ready(): + if future.done(): + return + try: + data = self.try_receive_bytes() + if data is not None: + future.set_result(data) + elif self._is_closed(): + future.set_exception(ByteChannelClosed("Channel closed")) + else: + # Data consumed by race - set None to signal retry + future.set_result(None) + except Exception as e: + future.set_exception(e) + + # Create handle and register in timer dispatch system + handle = events.Handle(on_channel_ready, (), loop) + loop._timers[callback_id] = handle + loop._handle_to_callback_id[id(handle)] = callback_id + + try: + # Register waiter with channel (direct C call, no Erlang overhead) + result = erlang._byte_channel_wait(self._ref, callback_id, loop._loop_capsule) + + if isinstance(result, tuple): + if result[0] == 'ok': + # Data already available - clean up and return + loop._timers.pop(callback_id, None) + loop._handle_to_callback_id.pop(id(handle), None) + return result[1] + elif result[0] == 'error': + loop._timers.pop(callback_id, None) + loop._handle_to_callback_id.pop(id(handle), None) + if result[1] == 'closed': + raise ByteChannelClosed("Channel closed") + raise RuntimeError(f"Channel wait failed: {result[1]}") + + # Waiter registered - await notification + try: + data = await future + # Handle race condition (data was None) + if data is None: + # Retry with polling fallback for this edge case + return await self._async_receive_polling() + return data + except asyncio.CancelledError: + erlang._byte_channel_cancel_wait(self._ref, callback_id) + raise + finally: + loop._timers.pop(callback_id, None) + loop._handle_to_callback_id.pop(id(handle), None) + + async def _async_receive_polling(self) -> bytes: + """Fallback async receive using polling.""" + import asyncio + while True: - await asyncio.sleep(0.0001) # 100us yield to event loop + await asyncio.sleep(0.0001) # 100us yield result = self.try_receive_bytes() if result is not None: return result if self._is_closed(): - raise ByteChannelClosed("Channel has been closed") + raise ByteChannelClosed("Channel closed") def __aiter__(self): """Return async iterator for the byte channel. diff --git a/priv/_erlang_impl/_channel.py b/priv/_erlang_impl/_channel.py index 9376990..82e3931 100644 --- a/priv/_erlang_impl/_channel.py +++ b/priv/_erlang_impl/_channel.py @@ -147,8 +147,11 @@ def __iter__(self): async def async_receive(self): """Async receive - yields to other coroutines while waiting. - This method integrates with the asyncio event loop, allowing other - coroutines to run while waiting for data from Erlang. + This method uses event-driven notification when running on ErlangEventLoop. + When data arrives, the channel notifies the event loop via timer dispatch, + avoiding polling overhead. + + Falls back to polling on non-Erlang event loops. Returns: The received Erlang term, converted to Python. @@ -170,14 +173,93 @@ async def async_receive(self): if self._is_closed(): raise ChannelClosed("Channel has been closed") - # Poll with short sleeps, yielding to other coroutines + # Get the running event loop + loop = asyncio.get_running_loop() + + # Check if this is an ErlangEventLoop with native dispatch support + if hasattr(loop, '_loop_capsule') and hasattr(loop, '_timers'): + return await self._async_receive_event_driven(loop) + else: + # Fallback for non-Erlang event loops: use polling + return await self._async_receive_polling() + + async def _async_receive_event_driven(self, loop): + """Event-driven async receive using channel waiter mechanism. + + Registers with the channel and waits for EVENT_TYPE_TIMER dispatch + when data arrives. No polling required. + """ + import asyncio + from asyncio import events + import erlang + + future = loop.create_future() + callback_id = id(future) + + # Callback that fires when channel has data + def on_channel_ready(): + if future.done(): + return + try: + data = self.try_receive() + if data is not None: + future.set_result(data) + elif self._is_closed(): + future.set_exception(ChannelClosed("Channel closed")) + else: + # Data consumed by race - set None to signal retry + future.set_result(None) + except Exception as e: + future.set_exception(e) + + # Create handle and register in timer dispatch system + handle = events.Handle(on_channel_ready, (), loop) + loop._timers[callback_id] = handle + loop._handle_to_callback_id[id(handle)] = callback_id + + try: + # Register waiter with channel (direct C call, no Erlang overhead) + result = erlang._channel_wait(self._ref, callback_id, loop._loop_capsule) + + if isinstance(result, tuple): + if result[0] == 'ok': + # Data already available - clean up and return + loop._timers.pop(callback_id, None) + loop._handle_to_callback_id.pop(id(handle), None) + return result[1] + elif result[0] == 'error': + loop._timers.pop(callback_id, None) + loop._handle_to_callback_id.pop(id(handle), None) + if result[1] == 'closed': + raise ChannelClosed("Channel closed") + raise RuntimeError(f"Channel wait failed: {result[1]}") + + # Waiter registered - await notification + try: + data = await future + # Handle race condition (data was None) + if data is None: + # Retry with polling fallback for this edge case + return await self._async_receive_polling() + return data + except asyncio.CancelledError: + erlang._channel_cancel_wait(self._ref, callback_id) + raise + finally: + loop._timers.pop(callback_id, None) + loop._handle_to_callback_id.pop(id(handle), None) + + async def _async_receive_polling(self): + """Fallback async receive using polling.""" + import asyncio + while True: - await asyncio.sleep(0.0001) # 100µs yield to event loop + await asyncio.sleep(0.0001) # 100us yield result = self.try_receive() if result is not None: return result if self._is_closed(): - raise ChannelClosed("Channel has been closed") + raise ChannelClosed("Channel closed") def __aiter__(self): """Return async iterator for the channel. From bb3b7008d854197110dd16577edc134dcad405b8 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 15:23:39 +0100 Subject: [PATCH 4/6] Document event-driven async receive API - Add behavior notes for async_receive() explaining event-driven vs polling - Add event-driven async for ByteChannel documentation - Add architecture diagram showing async receive flow with ErlangEventLoop --- docs/channel.md | 61 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/docs/channel.md b/docs/channel.md index 7458510..bd7c16a 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -166,6 +166,10 @@ Asyncio-compatible receive. Yields to other coroutines while waiting. msg = await ch.async_receive() ``` +**Behavior:** +- When using `ErlangEventLoop`: Uses event-driven notification (no polling). The channel notifies the event loop via timer dispatch when data arrives. +- When using other asyncio loops: Falls back to polling with 100us sleep intervals. + **Raises:** `ChannelClosed` when the channel is closed. #### Iteration @@ -320,6 +324,8 @@ async def worker(channel_ref, worker_id): ## Architecture +### Sync Receive Flow + ``` Erlang Python ────── ────── @@ -346,6 +352,59 @@ py_channel:send(Ch, Term) py_channel:close() ───────────────▶ StopIteration ``` +### Event-Driven Async Receive + +When using `ErlangEventLoop`, `async_receive()` uses event-driven notification: + +``` +Python C / Erlang +────── ────────── + +await ch.async_receive() + │ + ├── try_receive() ──────────▶ Check queue (fast path) + │ └── Data? Return immediately + │ + └── No data: + │ + ├── Create Future + callback_id + ├── Register in loop._timers[callback_id] + │ + └── _channel_wait() ────▶ Register waiter in channel + (callback_id + loop ref) + │ +await future ◀─────────────────────────────┘ + │ │ + │ [Data arrives] + │ │ + │ py_channel:send() + │ │ + │ channel_send() + │ │ + │ event_loop_add_pending() + │ │ + │ pthread_cond_signal() + │ │ + │ ┌───────────────────────────────┘ + │ │ + │ ▼ + │ _run_once_native_for() returns pending + │ │ + │ ▼ + │ _dispatch(callback_id, TIMER) + │ │ + │ ▼ + │ Fire handle from _timers + │ │ + │ ▼ + │ Callback: try_receive() → future.set_result(data) + │ + ▼ +Return data +``` + +This avoids polling overhead - Python only wakes when data actually arrives. + ## ByteChannel - Raw Byte Streaming For binary protocols and raw byte streaming (e.g., HTTP bodies, file transfers), use `ByteChannel` instead of `Channel`. ByteChannel passes bytes directly without term serialization, avoiding encoding/decoding overhead. @@ -420,6 +479,8 @@ async def process_bytes_async(channel_ref): process(chunk) ``` +**Event-driven async:** When using `ErlangEventLoop`, `async_receive_bytes()` uses event-driven notification instead of polling. The channel signals the event loop when data arrives, avoiding CPU overhead from sleep loops. + ### ByteChannel vs Channel Architecture ``` From 2866fb6789c59620cb7bbd9434d6ec5df70d1b90 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 15:47:16 +0100 Subject: [PATCH 5/6] Add benchmark comparing Channel, ByteChannel, and PyBuffer Benchmark measures: - Erlang send throughput (no Python) - Erlang roundtrip (send + receive) - Python receive performance - Streaming throughput (1MB transfer) Run with: escript examples/bench_byte_channel.erl --- examples/bench_byte_channel.erl | 300 ++++++++++++++++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100755 examples/bench_byte_channel.erl diff --git a/examples/bench_byte_channel.erl b/examples/bench_byte_channel.erl new file mode 100755 index 0000000..f064179 --- /dev/null +++ b/examples/bench_byte_channel.erl @@ -0,0 +1,300 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa _build/default/lib/erlang_python/ebin + +%%% @doc Benchmark comparing Channel, ByteChannel, and PyBuffer. +%%% +%%% Run with: +%%% rebar3 compile && escript examples/bench_byte_channel.erl + +-mode(compile). + +main(_Args) -> + io:format("~n========================================~n"), + io:format("Channel vs ByteChannel vs PyBuffer~n"), + io:format("========================================~n~n"), + + %% Start the application + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + ok = py_channel:register_callbacks(), + ok = py_byte_channel:register_callbacks(), + + %% Print system info + io:format("System Information:~n"), + io:format(" Erlang/OTP: ~s~n", [erlang:system_info(otp_release)]), + {ok, PyVer} = py:version(), + io:format(" Python: ~s~n", [PyVer]), + io:format("~n"), + + %% Run benchmarks + run_erlang_send_bench(), + run_erlang_roundtrip_bench(), + run_python_receive_bench(), + run_streaming_bench(), + + io:format("~n========================================~n"), + io:format("Benchmark Complete~n"), + io:format("========================================~n"), + + halt(0). + +%% Benchmark Erlang-side send performance (no Python involved) +run_erlang_send_bench() -> + io:format("~n--- Erlang Send Throughput (no Python) ---~n"), + io:format("Iterations: 10000~n~n"), + + Sizes = [64, 256, 1024, 4096, 16384], + Iterations = 10000, + + io:format("~8s | ~14s | ~14s | ~14s~n", + ["Size", "Channel", "ByteChannel", "PyBuffer"]), + io:format("~8s | ~14s | ~14s | ~14s~n", + ["(bytes)", "(msg/sec)", "(msg/sec)", "(writes/sec)"]), + io:format("~s~n", [string:copies("-", 58)]), + + lists:foreach(fun(Size) -> + Data = binary:copy(<<0>>, Size), + + %% Channel (term-based) + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Data) + end, lists:seq(1, Iterations)), + ChEnd = erlang:monotonic_time(microsecond), + ChRate = Iterations / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel (raw bytes) + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Data) + end, lists:seq(1, Iterations)), + BChEnd = erlang:monotonic_time(microsecond), + BChRate = Iterations / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + %% PyBuffer + {ok, Buf} = py_buffer:new(), + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_buffer:write(Buf, Data) + end, lists:seq(1, Iterations)), + BufEnd = erlang:monotonic_time(microsecond), + BufRate = Iterations / ((BufEnd - BufStart) / 1000000), + py_buffer:close(Buf), + + io:format("~8B | ~14w | ~14w | ~14w~n", + [Size, round(ChRate), round(BChRate), round(BufRate)]) + end, Sizes), + ok. + +%% Benchmark Erlang-side roundtrip (send + receive, no Python) +run_erlang_roundtrip_bench() -> + io:format("~n--- Erlang Roundtrip (send + receive, no Python) ---~n"), + io:format("Iterations: 10000~n~n"), + + Sizes = [64, 256, 1024, 4096, 16384], + Iterations = 10000, + + io:format("~8s | ~14s | ~14s~n", + ["Size", "Channel", "ByteChannel"]), + io:format("~8s | ~14s | ~14s~n", + ["(bytes)", "(roundtrip/s)", "(roundtrip/s)"]), + io:format("~s~n", [string:copies("-", 42)]), + + lists:foreach(fun(Size) -> + Data = binary:copy(<<0>>, Size), + + %% Channel roundtrip + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Data), + {ok, _} = py_nif:channel_try_receive(Ch) + end, lists:seq(1, Iterations)), + ChEnd = erlang:monotonic_time(microsecond), + ChRate = Iterations / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel roundtrip + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Data), + {ok, _} = py_byte_channel:try_receive(BCh) + end, lists:seq(1, Iterations)), + BChEnd = erlang:monotonic_time(microsecond), + BChRate = Iterations / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + io:format("~8B | ~14w | ~14w~n", + [Size, round(ChRate), round(BChRate)]) + end, Sizes), + ok. + +%% Benchmark Python receive performance +run_python_receive_bench() -> + io:format("~n--- Python Receive Performance ---~n"), + io:format("Pattern: Erlang send -> Python receive~n"), + io:format("Iterations: 1000~n~n"), + + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +from erlang import Channel, ByteChannel + +def recv_channel(ch_ref): + ch = Channel(ch_ref) + return ch.try_receive() + +def recv_byte_channel(ch_ref): + ch = ByteChannel(ch_ref) + return ch.try_receive_bytes() + +def read_buffer(buf): + return buf.read() +">>), + + Sizes = [64, 256, 1024, 4096, 16384], + Iterations = 1000, + + io:format("~8s | ~12s | ~12s | ~12s~n", + ["Size", "Channel", "ByteChannel", "PyBuffer"]), + io:format("~8s | ~12s | ~12s | ~12s~n", + ["(bytes)", "(ops/sec)", "(ops/sec)", "(ops/sec)"]), + io:format("~s~n", [string:copies("-", 54)]), + + lists:foreach(fun(Size) -> + Data = binary:copy(<<0>>, Size), + + %% Channel: Erlang send -> Python receive + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Data), + {ok, _} = py:eval(Ctx, <<"recv_channel(ch)">>, #{<<"ch">> => Ch}) + end, lists:seq(1, Iterations)), + ChEnd = erlang:monotonic_time(microsecond), + ChRate = Iterations / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel: Erlang send -> Python receive + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Data), + {ok, _} = py:eval(Ctx, <<"recv_byte_channel(ch)">>, #{<<"ch">> => BCh}) + end, lists:seq(1, Iterations)), + BChEnd = erlang:monotonic_time(microsecond), + BChRate = Iterations / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + %% PyBuffer: Erlang write -> Python read + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + {ok, Buf} = py_buffer:new(Size), + ok = py_buffer:write(Buf, Data), + ok = py_buffer:close(Buf), + {ok, _} = py:eval(Ctx, <<"read_buffer(buf)">>, #{<<"buf">> => Buf}) + end, lists:seq(1, Iterations)), + BufEnd = erlang:monotonic_time(microsecond), + BufRate = Iterations / ((BufEnd - BufStart) / 1000000), + + io:format("~8B | ~12w | ~12w | ~12w~n", + [Size, round(ChRate), round(BChRate), round(BufRate)]) + end, Sizes), + ok. + +%% Benchmark streaming (1MB transfer with varying chunk sizes) +run_streaming_bench() -> + io:format("~n--- Streaming Benchmark (1MB transfer) ---~n"), + io:format("Pattern: Erlang sends chunks -> Python receives all~n~n"), + + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +from erlang import Channel, ByteChannel + +def stream_channel(ch_ref, num_chunks): + ch = Channel(ch_ref) + total = 0 + for _ in range(num_chunks): + msg = ch.try_receive() + if msg is None: + break + total += len(msg) + return total + +def stream_byte_channel(ch_ref, num_chunks): + ch = ByteChannel(ch_ref) + total = 0 + for _ in range(num_chunks): + data = ch.try_receive_bytes() + if data is None: + break + total += len(data) + return total + +def stream_buffer(buf): + total = 0 + while True: + chunk = buf.read(8192) + if not chunk: + break + total += len(chunk) + return total +">>), + + ChunkSizes = [1024, 4096, 16384, 65536], + TotalBytes = 1048576, % 1MB + + io:format("~10s | ~12s | ~12s | ~12s~n", + ["Chunk", "Channel", "ByteChannel", "PyBuffer"]), + io:format("~10s | ~12s | ~12s | ~12s~n", + ["(bytes)", "(MB/sec)", "(MB/sec)", "(MB/sec)"]), + io:format("~s~n", [string:copies("-", 54)]), + + lists:foreach(fun(ChunkSize) -> + NumChunks = TotalBytes div ChunkSize, + Chunk = binary:copy(<<0>>, ChunkSize), + + %% Channel streaming + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Chunk) + end, lists:seq(1, NumChunks)), + {ok, _} = py:eval(Ctx, <<"stream_channel(ch, n)">>, + #{<<"ch">> => Ch, <<"n">> => NumChunks}), + ChEnd = erlang:monotonic_time(microsecond), + ChMBps = (TotalBytes / 1048576) / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel streaming + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Chunk) + end, lists:seq(1, NumChunks)), + {ok, _} = py:eval(Ctx, <<"stream_byte_channel(ch, n)">>, + #{<<"ch">> => BCh, <<"n">> => NumChunks}), + BChEnd = erlang:monotonic_time(microsecond), + BChMBps = (TotalBytes / 1048576) / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + %% PyBuffer streaming + {ok, Buf} = py_buffer:new(TotalBytes), + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_buffer:write(Buf, Chunk) + end, lists:seq(1, NumChunks)), + ok = py_buffer:close(Buf), + {ok, _} = py:eval(Ctx, <<"stream_buffer(buf)">>, #{<<"buf">> => Buf}), + BufEnd = erlang:monotonic_time(microsecond), + BufMBps = (TotalBytes / 1048576) / ((BufEnd - BufStart) / 1000000), + + io:format("~10B | ~12.2f | ~12.2f | ~12.2f~n", + [ChunkSize, ChMBps, BChMBps, BufMBps]) + end, ChunkSizes), + ok. From a7ffab936c74c71099d513469132c45478c79c17 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 15:55:33 +0100 Subject: [PATCH 6/6] Fix event-driven async: don't manipulate _handle_to_callback_id Only use _timers dict for channel waiter registration. The _handle_to_callback_id dict is for timer cancellation and shouldn't be modified by channel waiters - this could cause race conditions in free-threaded Python. --- priv/_erlang_impl/_byte_channel.py | 5 +---- priv/_erlang_impl/_channel.py | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/priv/_erlang_impl/_byte_channel.py b/priv/_erlang_impl/_byte_channel.py index a7c915c..1c5bc45 100644 --- a/priv/_erlang_impl/_byte_channel.py +++ b/priv/_erlang_impl/_byte_channel.py @@ -234,9 +234,9 @@ def on_channel_ready(): future.set_exception(e) # Create handle and register in timer dispatch system + # Only use _timers dict - don't touch _handle_to_callback_id (for timer cancellation only) handle = events.Handle(on_channel_ready, (), loop) loop._timers[callback_id] = handle - loop._handle_to_callback_id[id(handle)] = callback_id try: # Register waiter with channel (direct C call, no Erlang overhead) @@ -246,11 +246,9 @@ def on_channel_ready(): if result[0] == 'ok': # Data already available - clean up and return loop._timers.pop(callback_id, None) - loop._handle_to_callback_id.pop(id(handle), None) return result[1] elif result[0] == 'error': loop._timers.pop(callback_id, None) - loop._handle_to_callback_id.pop(id(handle), None) if result[1] == 'closed': raise ByteChannelClosed("Channel closed") raise RuntimeError(f"Channel wait failed: {result[1]}") @@ -268,7 +266,6 @@ def on_channel_ready(): raise finally: loop._timers.pop(callback_id, None) - loop._handle_to_callback_id.pop(id(handle), None) async def _async_receive_polling(self) -> bytes: """Fallback async receive using polling.""" diff --git a/priv/_erlang_impl/_channel.py b/priv/_erlang_impl/_channel.py index 82e3931..9fe548c 100644 --- a/priv/_erlang_impl/_channel.py +++ b/priv/_erlang_impl/_channel.py @@ -213,9 +213,9 @@ def on_channel_ready(): future.set_exception(e) # Create handle and register in timer dispatch system + # Only use _timers dict - don't touch _handle_to_callback_id (for timer cancellation only) handle = events.Handle(on_channel_ready, (), loop) loop._timers[callback_id] = handle - loop._handle_to_callback_id[id(handle)] = callback_id try: # Register waiter with channel (direct C call, no Erlang overhead) @@ -225,11 +225,9 @@ def on_channel_ready(): if result[0] == 'ok': # Data already available - clean up and return loop._timers.pop(callback_id, None) - loop._handle_to_callback_id.pop(id(handle), None) return result[1] elif result[0] == 'error': loop._timers.pop(callback_id, None) - loop._handle_to_callback_id.pop(id(handle), None) if result[1] == 'closed': raise ChannelClosed("Channel closed") raise RuntimeError(f"Channel wait failed: {result[1]}") @@ -247,7 +245,6 @@ def on_channel_ready(): raise finally: loop._timers.pop(callback_id, None) - loop._handle_to_callback_id.pop(id(handle), None) async def _async_receive_polling(self): """Fallback async receive using polling."""