diff --git a/CHANGELOG.md b/CHANGELOG.md index b0f292c..3cb0c0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,20 @@ ### Added +- **ByteChannel API** - Raw byte streaming channel without term serialization + - `py_byte_channel:new/0,1` - Create byte channel (with optional backpressure) + - `py_byte_channel:send/2` - Send raw bytes to Python + - `py_byte_channel:recv/1,2` - Blocking receive with optional timeout + - `py_byte_channel:try_receive/1` - Non-blocking receive + - Python `ByteChannel` class with: + - `send_bytes(data)` - Send bytes back to Erlang + - `receive_bytes()` - Blocking receive (GIL released) + - `try_receive_bytes()` - Non-blocking receive + - `async_receive_bytes()` - Asyncio-compatible async receive + - Sync and async iteration (`for chunk in ch`, `async for chunk in ch`) + - Reuses the same `py_channel_t` infrastructure but skips term encoding/decoding + - Suitable for HTTP bodies, file streaming, and binary protocols + - **Automatic Env Reuse for Event Loop Tasks** - Functions defined via `py:exec(Ctx, Code)` can now be called directly using `py_event_loop:run/3,4`, `create_task/3,4`, and `spawn_task/3,4` without manual env passing. The process-local environment is automatically detected and used diff --git a/c_src/py_callback.c b/c_src/py_callback.c index f593b01..6ec5686 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2856,6 +2856,405 @@ static PyObject *erlang_channel_is_closed_impl(PyObject *self, PyObject *args) { } } +/* ============================================================================ + * ByteChannel Methods (raw bytes, no term conversion) + * ============================================================================ */ + +/** + * @brief ByteChannel try_receive_bytes - non-blocking, returns raw bytes + * + * Usage: erlang._byte_channel_try_receive_bytes(channel_ref) + * Returns: bytes if data available, None if empty + * Raises: RuntimeError if channel closed + */ +static PyObject *erlang_byte_channel_try_receive_bytes_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + unsigned char *data = NULL; + size_t size = 0; + + int result = channel_try_receive(channel, &data, &size); + + if (result == 0) { + /* Success - return raw bytes (NO term decoding) */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)data, size); + enif_free(data); + return bytes; /* May be NULL if allocation failed */ + } else if (result == 1) { + /* Empty */ + Py_RETURN_NONE; + } else { + /* Closed */ + PyErr_SetString(PyExc_RuntimeError, "channel closed"); + return NULL; + } +} + +/** + * @brief ByteChannel receive_bytes - blocking with GIL release, returns raw bytes + * + * Usage: erlang._byte_channel_receive_bytes(channel_ref, timeout_ms) + * Returns: bytes when data available + * Raises: RuntimeError if channel closed, TimeoutError if timeout + * + * This function releases the GIL while waiting, allowing other Python + * threads to run. Uses polling with short sleeps. + */ +static PyObject *erlang_byte_channel_receive_bytes_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + long timeout_ms = -1; /* -1 = infinite */ + + if (!PyArg_ParseTuple(args, "O|l", &capsule, &timeout_ms)) { + return NULL; + } + + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + unsigned char *data = NULL; + size_t size = 0; + int result; + + /* First try without blocking */ + result = channel_try_receive(channel, &data, &size); + if (result == 0) { + /* Got data - return raw bytes */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)data, size); + enif_free(data); + return bytes; + } else if (result == -1) { + PyErr_SetString(PyExc_RuntimeError, "channel closed"); + return NULL; + } + + /* Need to wait - release GIL and poll */ + { + long elapsed_us = 0; + const long poll_interval_us = 100; /* 100 microseconds */ + const long timeout_us = timeout_ms >= 0 ? timeout_ms * 1000 : -1; + + Py_BEGIN_ALLOW_THREADS + + while (1) { + result = channel_try_receive(channel, &data, &size); + if (result != 1) { + break; /* Got data or closed */ + } + + /* Check timeout */ + if (timeout_us >= 0 && elapsed_us >= timeout_us) { + result = 2; /* Timeout */ + break; + } + + /* Sleep briefly */ + usleep(poll_interval_us); + elapsed_us += poll_interval_us; + } + + Py_END_ALLOW_THREADS + } + + if (result == 2) { + PyErr_SetString(PyExc_TimeoutError, "channel receive timeout"); + return NULL; + } else if (result == -1) { + PyErr_SetString(PyExc_RuntimeError, "channel closed"); + return NULL; + } + + /* Return raw bytes (NO term decoding) */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)data, size); + enif_free(data); + return bytes; +} + +/* ============================================================================ + * Async Channel Wait Methods (direct C, no Erlang callback overhead) + * ============================================================================ */ + +/** + * @brief Register async waiter for channel (term-based) + * + * Usage: erlang._channel_wait(channel_ref, callback_id, loop_capsule) + * Returns: ('ok', data) if immediate, 'ok' if waiter registered, ('error', reason) + */ +static PyObject *erlang_channel_wait_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *ch_capsule; + PyObject *loop_capsule; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OKO", &ch_capsule, &callback_id, &loop_capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(ch_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(ch_capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + if (!PyCapsule_CheckExact(loop_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected loop capsule"); + return NULL; + } + + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop"); + if (loop == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid loop reference"); + return NULL; + } + + pthread_mutex_lock(&channel->mutex); + + /* Check if closed */ + if (channel->closed) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "closed"); + } + + /* Check if waiter already exists */ + if (channel->has_waiter || channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "waiter_exists"); + } + + /* Check if data available */ + size_t queue_size = enif_ioq_size(channel->queue); + if (queue_size > 0) { + SysIOVec *iov; + int iovcnt; + iov = enif_ioq_peek(channel->queue, &iovcnt); + + if (iovcnt > 0 && iov != NULL && iov[0].iov_len > 0) { + size_t msg_size = iov[0].iov_len; + unsigned char *data = enif_alloc(msg_size); + if (data == NULL) { + pthread_mutex_unlock(&channel->mutex); + PyErr_SetString(PyExc_MemoryError, "failed to allocate memory"); + return NULL; + } + + memcpy(data, iov[0].iov_base, msg_size); + enif_ioq_deq(channel->queue, msg_size, NULL); + channel->current_size -= msg_size; + pthread_mutex_unlock(&channel->mutex); + + /* Decode term: binary -> Erlang term -> Python */ + ErlNifEnv *tmp_env = enif_alloc_env(); + if (tmp_env == NULL) { + enif_free(data); + PyErr_SetString(PyExc_MemoryError, "failed to allocate environment"); + return NULL; + } + + ERL_NIF_TERM term; + if (enif_binary_to_term(tmp_env, data, msg_size, &term, 0) == 0) { + enif_free(data); + enif_free_env(tmp_env); + PyErr_SetString(PyExc_RuntimeError, "failed to decode term"); + return NULL; + } + enif_free(data); + + PyObject *py_obj = term_to_py(tmp_env, term); + enif_free_env(tmp_env); + + if (py_obj == NULL) { + return NULL; + } + + PyObject *result_tuple = Py_BuildValue("(sO)", "ok", py_obj); + Py_DECREF(py_obj); + return result_tuple; + } + } + + /* No data - register waiter */ + enif_keep_resource(loop); + channel->waiter_loop = loop; + channel->waiter_callback_id = callback_id; + channel->has_waiter = true; + + pthread_mutex_unlock(&channel->mutex); + + return Py_BuildValue("s", "ok"); +} + +/** + * @brief Cancel async waiter for channel + * + * Usage: erlang._channel_cancel_wait(channel_ref, callback_id) + */ +static PyObject *erlang_channel_cancel_wait_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *ch_capsule; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OK", &ch_capsule, &callback_id)) { + return NULL; + } + + if (!PyCapsule_CheckExact(ch_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(ch_capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + pthread_mutex_lock(&channel->mutex); + + if (channel->has_waiter && channel->waiter_callback_id == callback_id) { + erlang_event_loop_t *loop = channel->waiter_loop; + channel->has_waiter = false; + channel->waiter_loop = NULL; + channel->waiter_callback_id = 0; + pthread_mutex_unlock(&channel->mutex); + + if (loop != NULL) { + enif_release_resource(loop); + } + } else { + pthread_mutex_unlock(&channel->mutex); + } + + Py_RETURN_TRUE; +} + +/** + * @brief Register async waiter for byte channel (raw bytes) + * + * Usage: erlang._byte_channel_wait(channel_ref, callback_id, loop_capsule) + * Returns: ('ok', bytes) if immediate, 'ok' if waiter registered, ('error', reason) + */ +static PyObject *erlang_byte_channel_wait_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *ch_capsule; + PyObject *loop_capsule; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OKO", &ch_capsule, &callback_id, &loop_capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(ch_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(ch_capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + if (!PyCapsule_CheckExact(loop_capsule)) { + PyErr_SetString(PyExc_TypeError, "expected loop capsule"); + return NULL; + } + + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop"); + if (loop == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid loop reference"); + return NULL; + } + + pthread_mutex_lock(&channel->mutex); + + /* Check if closed */ + if (channel->closed) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "closed"); + } + + /* Check if waiter already exists */ + if (channel->has_waiter || channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return Py_BuildValue("(ss)", "error", "waiter_exists"); + } + + /* Check if data available */ + size_t queue_size = enif_ioq_size(channel->queue); + if (queue_size > 0) { + SysIOVec *iov; + int iovcnt; + iov = enif_ioq_peek(channel->queue, &iovcnt); + + if (iovcnt > 0 && iov != NULL && iov[0].iov_len > 0) { + size_t msg_size = iov[0].iov_len; + + /* Create Python bytes object */ + PyObject *bytes = PyBytes_FromStringAndSize((char *)iov[0].iov_base, msg_size); + if (bytes == NULL) { + pthread_mutex_unlock(&channel->mutex); + return NULL; + } + + enif_ioq_deq(channel->queue, msg_size, NULL); + channel->current_size -= msg_size; + pthread_mutex_unlock(&channel->mutex); + + /* Return raw bytes (NO term decoding) */ + return Py_BuildValue("(sO)", "ok", bytes); + } + } + + /* No data - register waiter */ + enif_keep_resource(loop); + channel->waiter_loop = loop; + channel->waiter_callback_id = callback_id; + channel->has_waiter = true; + + pthread_mutex_unlock(&channel->mutex); + + return Py_BuildValue("s", "ok"); +} + +/** + * @brief Cancel async waiter for byte channel + * + * Usage: erlang._byte_channel_cancel_wait(channel_ref, callback_id) + */ +static PyObject *erlang_byte_channel_cancel_wait_impl(PyObject *self, PyObject *args) { + /* Same implementation as channel_cancel_wait */ + return erlang_channel_cancel_wait_impl(self, args); +} + /* Python method definitions for erlang module */ static PyMethodDef ErlangModuleMethods[] = { {"call", erlang_call_impl, METH_VARARGS, @@ -2929,6 +3328,32 @@ static PyMethodDef ErlangModuleMethods[] = { "Check if channel is closed.\n" "Usage: erlang._channel_is_closed(channel_ref)\n" "Returns: True if closed, False otherwise."}, + /* ByteChannel methods (raw bytes, no term conversion) */ + {"_byte_channel_try_receive_bytes", erlang_byte_channel_try_receive_bytes_impl, METH_VARARGS, + "ByteChannel receive (non-blocking, raw bytes).\n" + "Usage: erlang._byte_channel_try_receive_bytes(channel_ref)\n" + "Returns: bytes if data, None if empty. Raises RuntimeError if closed."}, + {"_byte_channel_receive_bytes", erlang_byte_channel_receive_bytes_impl, METH_VARARGS, + "ByteChannel receive (blocking with GIL release, raw bytes).\n" + "Usage: erlang._byte_channel_receive_bytes(channel_ref, timeout_ms=-1)\n" + "Returns: bytes. Raises RuntimeError if closed, TimeoutError if timeout."}, + /* Async channel wait methods (direct, no Erlang callback overhead) */ + {"_channel_wait", erlang_channel_wait_impl, METH_VARARGS, + "Register async waiter for channel (term-based).\n" + "Usage: erlang._channel_wait(channel_ref, callback_id, loop_capsule)\n" + "Returns: ('ok', data) if immediate, 'ok' if waiter registered, ('error', reason) on error."}, + {"_channel_cancel_wait", erlang_channel_cancel_wait_impl, METH_VARARGS, + "Cancel async waiter for channel.\n" + "Usage: erlang._channel_cancel_wait(channel_ref, callback_id)\n" + "Returns: True."}, + {"_byte_channel_wait", erlang_byte_channel_wait_impl, METH_VARARGS, + "Register async waiter for byte channel (raw bytes).\n" + "Usage: erlang._byte_channel_wait(channel_ref, callback_id, loop_capsule)\n" + "Returns: ('ok', bytes) if immediate, 'ok' if waiter registered, ('error', reason) on error."}, + {"_byte_channel_cancel_wait", erlang_byte_channel_cancel_wait_impl, METH_VARARGS, + "Cancel async waiter for byte channel.\n" + "Usage: erlang._byte_channel_cancel_wait(channel_ref, callback_id)\n" + "Returns: True."}, {NULL, NULL, 0, NULL} }; @@ -3388,10 +3813,15 @@ static int create_erlang_module(void) { " erlang.Channel = _erlang_impl.Channel\n" " erlang.ChannelClosed = _erlang_impl.ChannelClosed\n" " erlang.reply = _erlang_impl.reply\n" + " # ByteChannel for raw bytes streaming\n" + " erlang.byte_channel = _erlang_impl.byte_channel\n" + " erlang.ByteChannel = _erlang_impl.ByteChannel\n" + " erlang.ByteChannelClosed = _erlang_impl.ByteChannelClosed\n" " # Make erlang behave as a package for 'import erlang.reactor' syntax\n" " erlang.__path__ = [priv_dir]\n" " sys.modules['erlang.reactor'] = erlang.reactor\n" " sys.modules['erlang.channel'] = erlang.channel\n" + " sys.modules['erlang.byte_channel'] = erlang.byte_channel\n" " return True\n" " except ImportError as e:\n" " import sys\n" diff --git a/c_src/py_channel.c b/c_src/py_channel.c index c434d38..5c3707e 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -835,3 +835,164 @@ ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ER pthread_mutex_unlock(&channel->mutex); return ATOM_OK; } + +/* ============================================================================ + * ByteChannel NIF Functions (raw bytes, no term conversion) + * ============================================================================ */ + +/** + * @brief Send raw bytes to a channel (no term_to_binary) + * + * nif_byte_channel_send_bytes(ChannelRef, Binary) -> ok | busy | {error, closed} + * + * Sends raw binary data directly to the channel without term serialization. + * Used for ByteChannel API where raw byte streams are desired. + */ +ERL_NIF_TERM nif_byte_channel_send_bytes(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_channel_t *channel; + ErlNifBinary bin; + + if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) { + return make_error(env, "invalid_channel"); + } + + if (!enif_inspect_binary(env, argv[1], &bin)) { + return make_error(env, "invalid_binary"); + } + + int result = channel_send(channel, bin.data, bin.size); + + switch (result) { + case 0: + return ATOM_OK; + case 1: + return ATOM_BUSY; + default: + return enif_make_tuple2(env, ATOM_ERROR, ATOM_CLOSED); + } +} + +/** + * @brief Non-blocking receive raw bytes from a channel (no binary_to_term) + * + * nif_byte_channel_try_receive_bytes(ChannelRef) -> {ok, Binary} | {error, empty} | {error, closed} + * + * Receives raw binary data directly from the channel without term deserialization. + * Used for ByteChannel API where raw byte streams are desired. + */ +ERL_NIF_TERM nif_byte_channel_try_receive_bytes(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_channel_t *channel; + + if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) { + return make_error(env, "invalid_channel"); + } + + unsigned char *data; + size_t size; + int result = channel_try_receive(channel, &data, &size); + + if (result == 0) { + /* Data available - return raw binary (NO enif_binary_to_term) */ + ERL_NIF_TERM bin_term; + unsigned char *bin_data = enif_make_new_binary(env, size, &bin_term); + if (bin_data == NULL) { + enif_free(data); + return make_error(env, "alloc_failed"); + } + memcpy(bin_data, data, size); + enif_free(data); + return enif_make_tuple2(env, ATOM_OK, bin_term); + } else if (result == 1) { + /* Empty */ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_EMPTY); + } else { + /* Closed */ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_CLOSED); + } +} + +/** + * @brief Register an async waiter for raw bytes from a channel + * + * nif_byte_channel_wait_bytes(ChannelRef, CallbackId, LoopRef) -> ok | {ok, Binary} | {error, ...} + * + * If data is available, returns immediately with raw binary data. + * If empty, registers the callback_id and loop for dispatch when data arrives. + * Same as nif_channel_wait but returns raw bytes instead of terms. + */ +ERL_NIF_TERM nif_byte_channel_wait_bytes(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_channel_t *channel; + erlang_event_loop_t *loop; + ErlNifUInt64 callback_id; + + if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) { + return make_error(env, "invalid_channel"); + } + + if (!enif_get_uint64(env, argv[1], &callback_id)) { + return make_error(env, "invalid_callback_id"); + } + + if (!enif_get_resource(env, argv[2], EVENT_LOOP_RESOURCE_TYPE, (void **)&loop)) { + return make_error(env, "invalid_loop"); + } + + pthread_mutex_lock(&channel->mutex); + + /* Check if channel is closed */ + if (channel->closed) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "closed")); + } + + /* Reject if any waiter already exists */ + if (channel->has_waiter || channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "waiter_exists")); + } + + /* Check if data already available */ + size_t queue_size = enif_ioq_size(channel->queue); + if (queue_size > 0) { + /* Data available - dequeue and return immediately as raw binary */ + SysIOVec *iov; + int iovcnt; + iov = enif_ioq_peek(channel->queue, &iovcnt); + + if (iovcnt > 0 && iov != NULL && iov[0].iov_len > 0) { + size_t msg_size = iov[0].iov_len; + + /* Create result binary before dequeuing */ + ERL_NIF_TERM bin_term; + unsigned char *bin_data = enif_make_new_binary(env, msg_size, &bin_term); + if (bin_data == NULL) { + pthread_mutex_unlock(&channel->mutex); + return make_error(env, "alloc_failed"); + } + + memcpy(bin_data, iov[0].iov_base, msg_size); + enif_ioq_deq(channel->queue, msg_size, NULL); + channel->current_size -= msg_size; + + pthread_mutex_unlock(&channel->mutex); + + /* Return raw binary (NO enif_binary_to_term) */ + return enif_make_tuple2(env, ATOM_OK, bin_term); + } + } + + /* No data - register waiter */ + enif_keep_resource(loop); + + channel->waiter_loop = loop; + channel->waiter_callback_id = callback_id; + channel->has_waiter = true; + + pthread_mutex_unlock(&channel->mutex); + + /* Return ok - Python will await Future */ + return ATOM_OK; +} diff --git a/c_src/py_channel.h b/c_src/py_channel.h index 9fa3f5b..5308ff4 100644 --- a/c_src/py_channel.h +++ b/c_src/py_channel.h @@ -224,4 +224,32 @@ ERL_NIF_TERM nif_channel_cancel_wait(ErlNifEnv *env, int argc, ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +/* ============================================================================ + * ByteChannel NIF Declarations (raw bytes, no term conversion) + * ============================================================================ */ + +/** + * @brief Send raw bytes to a channel (no term_to_binary) + * + * NIF: byte_channel_send_bytes(ChannelRef, Binary) -> ok | busy | {error, closed} + */ +ERL_NIF_TERM nif_byte_channel_send_bytes(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Non-blocking receive raw bytes (no binary_to_term) + * + * NIF: byte_channel_try_receive_bytes(ChannelRef) -> {ok, Binary} | {error, empty|closed} + */ +ERL_NIF_TERM nif_byte_channel_try_receive_bytes(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Register async waiter for raw bytes + * + * NIF: byte_channel_wait_bytes(ChannelRef, CallbackId, LoopRef) -> ok | {ok, Binary} + */ +ERL_NIF_TERM nif_byte_channel_wait_bytes(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + #endif /* PY_CHANNEL_H */ diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 6d159ce..917a116 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -6647,6 +6647,11 @@ static ErlNifFunc nif_funcs[] = { {"channel_cancel_wait", 2, nif_channel_cancel_wait, 0}, {"channel_register_sync_waiter", 1, nif_channel_register_sync_waiter, 0}, + /* ByteChannel API - raw bytes, no term conversion */ + {"byte_channel_send_bytes", 2, nif_byte_channel_send_bytes, 0}, + {"byte_channel_try_receive_bytes", 1, nif_byte_channel_try_receive_bytes, 0}, + {"byte_channel_wait_bytes", 3, nif_byte_channel_wait_bytes, 0}, + /* PyBuffer API - zero-copy WSGI input */ {"py_buffer_create", 1, nif_py_buffer_create, 0}, {"py_buffer_write", 2, nif_py_buffer_write, 0}, diff --git a/docs/channel.md b/docs/channel.md index 1ca5454..bd7c16a 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -166,6 +166,10 @@ Asyncio-compatible receive. Yields to other coroutines while waiting. msg = await ch.async_receive() ``` +**Behavior:** +- When using `ErlangEventLoop`: Uses event-driven notification (no polling). The channel notifies the event loop via timer dispatch when data arrives. +- When using other asyncio loops: Falls back to polling with 100us sleep intervals. + **Raises:** `ChannelClosed` when the channel is closed. #### Iteration @@ -320,6 +324,8 @@ async def worker(channel_ref, worker_id): ## Architecture +### Sync Receive Flow + ``` Erlang Python ────── ────── @@ -346,6 +352,147 @@ py_channel:send(Ch, Term) py_channel:close() ───────────────▶ StopIteration ``` +### Event-Driven Async Receive + +When using `ErlangEventLoop`, `async_receive()` uses event-driven notification: + +``` +Python C / Erlang +────── ────────── + +await ch.async_receive() + │ + ├── try_receive() ──────────▶ Check queue (fast path) + │ └── Data? Return immediately + │ + └── No data: + │ + ├── Create Future + callback_id + ├── Register in loop._timers[callback_id] + │ + └── _channel_wait() ────▶ Register waiter in channel + (callback_id + loop ref) + │ +await future ◀─────────────────────────────┘ + │ │ + │ [Data arrives] + │ │ + │ py_channel:send() + │ │ + │ channel_send() + │ │ + │ event_loop_add_pending() + │ │ + │ pthread_cond_signal() + │ │ + │ ┌───────────────────────────────┘ + │ │ + │ ▼ + │ _run_once_native_for() returns pending + │ │ + │ ▼ + │ _dispatch(callback_id, TIMER) + │ │ + │ ▼ + │ Fire handle from _timers + │ │ + │ ▼ + │ Callback: try_receive() → future.set_result(data) + │ + ▼ +Return data +``` + +This avoids polling overhead - Python only wakes when data actually arrives. + +## ByteChannel - Raw Byte Streaming + +For binary protocols and raw byte streaming (e.g., HTTP bodies, file transfers), use `ByteChannel` instead of `Channel`. ByteChannel passes bytes directly without term serialization, avoiding encoding/decoding overhead. + +### When to Use ByteChannel + +| Use Case | Channel | ByteChannel | +|----------|---------|-------------| +| Structured messages | Yes | No | +| RPC-style communication | Yes | No | +| HTTP bodies | No | Yes | +| File streaming | No | Yes | +| Binary protocols | No | Yes | +| Raw byte streams | No | Yes | + +### Erlang API + +```erlang +%% Create a byte channel +{ok, Ch} = py_byte_channel:new(), + +%% Send raw bytes +ok = py_byte_channel:send(Ch, <<"HTTP/1.1 200 OK\r\n">>), +ok = py_byte_channel:send(Ch, BodyBytes), + +%% Receive raw bytes +{ok, Data} = py_byte_channel:recv(Ch), + +%% Non-blocking receive +{ok, Data} = py_byte_channel:try_receive(Ch), +{error, empty} = py_byte_channel:try_receive(Ch), %% If no data + +%% Close when done +py_byte_channel:close(Ch). +``` + +### Python API + +```python +from erlang import ByteChannel, ByteChannelClosed + +def process_bytes(channel_ref): + ch = ByteChannel(channel_ref) + + # Blocking receive (releases GIL while waiting) + data = ch.receive_bytes() + + # Non-blocking receive + data = ch.try_receive_bytes() # Returns None if empty + + # Iterate over bytes + for chunk in ch: + process(chunk) + + # Send bytes back + ch.send_bytes(b"response data") +``` + +### Async Python API + +```python +from erlang import ByteChannel, ByteChannelClosed + +async def process_bytes_async(channel_ref): + ch = ByteChannel(channel_ref) + + # Async receive (yields to other coroutines) + data = await ch.async_receive_bytes() + + # Async iteration + async for chunk in ch: + process(chunk) +``` + +**Event-driven async:** When using `ErlangEventLoop`, `async_receive_bytes()` uses event-driven notification instead of polling. The channel signals the event loop when data arrives, avoiding CPU overhead from sleep loops. + +### ByteChannel vs Channel Architecture + +``` +Channel (term-based): + Erlang: term_to_binary() ──▶ enif_ioq ──▶ binary_to_term() :Python + +ByteChannel (raw bytes): + Erlang: raw bytes ─────────▶ enif_ioq ─────────▶ raw bytes :Python +``` + +ByteChannel reuses the same underlying `py_channel_t` structure but skips the term serialization/deserialization steps. + ## See Also - [Reactor](reactor.md) - FD-based protocol handling for sockets diff --git a/examples/bench_byte_channel.erl b/examples/bench_byte_channel.erl new file mode 100755 index 0000000..f064179 --- /dev/null +++ b/examples/bench_byte_channel.erl @@ -0,0 +1,300 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa _build/default/lib/erlang_python/ebin + +%%% @doc Benchmark comparing Channel, ByteChannel, and PyBuffer. +%%% +%%% Run with: +%%% rebar3 compile && escript examples/bench_byte_channel.erl + +-mode(compile). + +main(_Args) -> + io:format("~n========================================~n"), + io:format("Channel vs ByteChannel vs PyBuffer~n"), + io:format("========================================~n~n"), + + %% Start the application + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + ok = py_channel:register_callbacks(), + ok = py_byte_channel:register_callbacks(), + + %% Print system info + io:format("System Information:~n"), + io:format(" Erlang/OTP: ~s~n", [erlang:system_info(otp_release)]), + {ok, PyVer} = py:version(), + io:format(" Python: ~s~n", [PyVer]), + io:format("~n"), + + %% Run benchmarks + run_erlang_send_bench(), + run_erlang_roundtrip_bench(), + run_python_receive_bench(), + run_streaming_bench(), + + io:format("~n========================================~n"), + io:format("Benchmark Complete~n"), + io:format("========================================~n"), + + halt(0). + +%% Benchmark Erlang-side send performance (no Python involved) +run_erlang_send_bench() -> + io:format("~n--- Erlang Send Throughput (no Python) ---~n"), + io:format("Iterations: 10000~n~n"), + + Sizes = [64, 256, 1024, 4096, 16384], + Iterations = 10000, + + io:format("~8s | ~14s | ~14s | ~14s~n", + ["Size", "Channel", "ByteChannel", "PyBuffer"]), + io:format("~8s | ~14s | ~14s | ~14s~n", + ["(bytes)", "(msg/sec)", "(msg/sec)", "(writes/sec)"]), + io:format("~s~n", [string:copies("-", 58)]), + + lists:foreach(fun(Size) -> + Data = binary:copy(<<0>>, Size), + + %% Channel (term-based) + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Data) + end, lists:seq(1, Iterations)), + ChEnd = erlang:monotonic_time(microsecond), + ChRate = Iterations / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel (raw bytes) + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Data) + end, lists:seq(1, Iterations)), + BChEnd = erlang:monotonic_time(microsecond), + BChRate = Iterations / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + %% PyBuffer + {ok, Buf} = py_buffer:new(), + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_buffer:write(Buf, Data) + end, lists:seq(1, Iterations)), + BufEnd = erlang:monotonic_time(microsecond), + BufRate = Iterations / ((BufEnd - BufStart) / 1000000), + py_buffer:close(Buf), + + io:format("~8B | ~14w | ~14w | ~14w~n", + [Size, round(ChRate), round(BChRate), round(BufRate)]) + end, Sizes), + ok. + +%% Benchmark Erlang-side roundtrip (send + receive, no Python) +run_erlang_roundtrip_bench() -> + io:format("~n--- Erlang Roundtrip (send + receive, no Python) ---~n"), + io:format("Iterations: 10000~n~n"), + + Sizes = [64, 256, 1024, 4096, 16384], + Iterations = 10000, + + io:format("~8s | ~14s | ~14s~n", + ["Size", "Channel", "ByteChannel"]), + io:format("~8s | ~14s | ~14s~n", + ["(bytes)", "(roundtrip/s)", "(roundtrip/s)"]), + io:format("~s~n", [string:copies("-", 42)]), + + lists:foreach(fun(Size) -> + Data = binary:copy(<<0>>, Size), + + %% Channel roundtrip + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Data), + {ok, _} = py_nif:channel_try_receive(Ch) + end, lists:seq(1, Iterations)), + ChEnd = erlang:monotonic_time(microsecond), + ChRate = Iterations / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel roundtrip + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Data), + {ok, _} = py_byte_channel:try_receive(BCh) + end, lists:seq(1, Iterations)), + BChEnd = erlang:monotonic_time(microsecond), + BChRate = Iterations / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + io:format("~8B | ~14w | ~14w~n", + [Size, round(ChRate), round(BChRate)]) + end, Sizes), + ok. + +%% Benchmark Python receive performance +run_python_receive_bench() -> + io:format("~n--- Python Receive Performance ---~n"), + io:format("Pattern: Erlang send -> Python receive~n"), + io:format("Iterations: 1000~n~n"), + + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +from erlang import Channel, ByteChannel + +def recv_channel(ch_ref): + ch = Channel(ch_ref) + return ch.try_receive() + +def recv_byte_channel(ch_ref): + ch = ByteChannel(ch_ref) + return ch.try_receive_bytes() + +def read_buffer(buf): + return buf.read() +">>), + + Sizes = [64, 256, 1024, 4096, 16384], + Iterations = 1000, + + io:format("~8s | ~12s | ~12s | ~12s~n", + ["Size", "Channel", "ByteChannel", "PyBuffer"]), + io:format("~8s | ~12s | ~12s | ~12s~n", + ["(bytes)", "(ops/sec)", "(ops/sec)", "(ops/sec)"]), + io:format("~s~n", [string:copies("-", 54)]), + + lists:foreach(fun(Size) -> + Data = binary:copy(<<0>>, Size), + + %% Channel: Erlang send -> Python receive + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Data), + {ok, _} = py:eval(Ctx, <<"recv_channel(ch)">>, #{<<"ch">> => Ch}) + end, lists:seq(1, Iterations)), + ChEnd = erlang:monotonic_time(microsecond), + ChRate = Iterations / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel: Erlang send -> Python receive + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Data), + {ok, _} = py:eval(Ctx, <<"recv_byte_channel(ch)">>, #{<<"ch">> => BCh}) + end, lists:seq(1, Iterations)), + BChEnd = erlang:monotonic_time(microsecond), + BChRate = Iterations / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + %% PyBuffer: Erlang write -> Python read + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + {ok, Buf} = py_buffer:new(Size), + ok = py_buffer:write(Buf, Data), + ok = py_buffer:close(Buf), + {ok, _} = py:eval(Ctx, <<"read_buffer(buf)">>, #{<<"buf">> => Buf}) + end, lists:seq(1, Iterations)), + BufEnd = erlang:monotonic_time(microsecond), + BufRate = Iterations / ((BufEnd - BufStart) / 1000000), + + io:format("~8B | ~12w | ~12w | ~12w~n", + [Size, round(ChRate), round(BChRate), round(BufRate)]) + end, Sizes), + ok. + +%% Benchmark streaming (1MB transfer with varying chunk sizes) +run_streaming_bench() -> + io:format("~n--- Streaming Benchmark (1MB transfer) ---~n"), + io:format("Pattern: Erlang sends chunks -> Python receives all~n~n"), + + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +from erlang import Channel, ByteChannel + +def stream_channel(ch_ref, num_chunks): + ch = Channel(ch_ref) + total = 0 + for _ in range(num_chunks): + msg = ch.try_receive() + if msg is None: + break + total += len(msg) + return total + +def stream_byte_channel(ch_ref, num_chunks): + ch = ByteChannel(ch_ref) + total = 0 + for _ in range(num_chunks): + data = ch.try_receive_bytes() + if data is None: + break + total += len(data) + return total + +def stream_buffer(buf): + total = 0 + while True: + chunk = buf.read(8192) + if not chunk: + break + total += len(chunk) + return total +">>), + + ChunkSizes = [1024, 4096, 16384, 65536], + TotalBytes = 1048576, % 1MB + + io:format("~10s | ~12s | ~12s | ~12s~n", + ["Chunk", "Channel", "ByteChannel", "PyBuffer"]), + io:format("~10s | ~12s | ~12s | ~12s~n", + ["(bytes)", "(MB/sec)", "(MB/sec)", "(MB/sec)"]), + io:format("~s~n", [string:copies("-", 54)]), + + lists:foreach(fun(ChunkSize) -> + NumChunks = TotalBytes div ChunkSize, + Chunk = binary:copy(<<0>>, ChunkSize), + + %% Channel streaming + {ok, Ch} = py_channel:new(), + ChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Chunk) + end, lists:seq(1, NumChunks)), + {ok, _} = py:eval(Ctx, <<"stream_channel(ch, n)">>, + #{<<"ch">> => Ch, <<"n">> => NumChunks}), + ChEnd = erlang:monotonic_time(microsecond), + ChMBps = (TotalBytes / 1048576) / ((ChEnd - ChStart) / 1000000), + py_channel:close(Ch), + + %% ByteChannel streaming + {ok, BCh} = py_byte_channel:new(), + BChStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_byte_channel:send(BCh, Chunk) + end, lists:seq(1, NumChunks)), + {ok, _} = py:eval(Ctx, <<"stream_byte_channel(ch, n)">>, + #{<<"ch">> => BCh, <<"n">> => NumChunks}), + BChEnd = erlang:monotonic_time(microsecond), + BChMBps = (TotalBytes / 1048576) / ((BChEnd - BChStart) / 1000000), + py_byte_channel:close(BCh), + + %% PyBuffer streaming + {ok, Buf} = py_buffer:new(TotalBytes), + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_buffer:write(Buf, Chunk) + end, lists:seq(1, NumChunks)), + ok = py_buffer:close(Buf), + {ok, _} = py:eval(Ctx, <<"stream_buffer(buf)">>, #{<<"buf">> => Buf}), + BufEnd = erlang:monotonic_time(microsecond), + BufMBps = (TotalBytes / 1048576) / ((BufEnd - BufStart) / 1000000), + + io:format("~10B | ~12.2f | ~12.2f | ~12.2f~n", + [ChunkSize, ChMBps, BChMBps, BufMBps]) + end, ChunkSizes), + ok. diff --git a/priv/_erlang_impl/__init__.py b/priv/_erlang_impl/__init__.py index 3be6b2f..9090804 100644 --- a/priv/_erlang_impl/__init__.py +++ b/priv/_erlang_impl/__init__.py @@ -64,7 +64,9 @@ from ._mode import detect_mode, ExecutionMode from . import _reactor as reactor from . import _channel as channel +from . import _byte_channel as byte_channel from ._channel import Channel, reply, ChannelClosed +from ._byte_channel import ByteChannel, ByteChannelClosed __all__ = [ 'run', @@ -83,6 +85,9 @@ 'Channel', 'reply', 'ChannelClosed', + 'byte_channel', + 'ByteChannel', + 'ByteChannelClosed', 'atom', ] diff --git a/priv/_erlang_impl/_byte_channel.py b/priv/_erlang_impl/_byte_channel.py new file mode 100644 index 0000000..1c5bc45 --- /dev/null +++ b/priv/_erlang_impl/_byte_channel.py @@ -0,0 +1,311 @@ +# Copyright 2026 Benoit Chesneau +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Raw byte channel for Erlang-Python communication. + +ByteChannel provides raw byte streaming without term serialization, +suitable for HTTP bodies, file transfers, and binary protocols. + +Unlike Channel which serializes Erlang terms, ByteChannel passes +raw binaries directly without any encoding/decoding overhead. + +Usage: + + # Receiving bytes from Erlang (sync) + def process_bytes(channel_ref): + from erlang import ByteChannel + ch = ByteChannel(channel_ref) + + # Blocking receive (releases GIL while waiting) + data = ch.receive_bytes() + + # Non-blocking receive + data = ch.try_receive_bytes() # Returns None if empty + + # Iterate over bytes + for chunk in ch: + process(chunk) + + # Async receiving (asyncio compatible) + async def process_bytes_async(channel_ref): + from erlang import ByteChannel + ch = ByteChannel(channel_ref) + + # Async receive (yields to other coroutines while waiting) + data = await ch.async_receive_bytes() + + # Async iteration + async for chunk in ch: + process(chunk) + + # Sending bytes back to Erlang + ch.send_bytes(b"HTTP/1.1 200 OK\\r\\n") +""" + +__all__ = ['ByteChannel', 'ByteChannelClosed'] + + +class ByteChannelClosed(Exception): + """Raised when attempting to receive from a closed byte channel.""" + pass + + +class ByteChannel: + """Raw byte channel for Erlang-Python communication. + + ByteChannel wraps an Erlang channel reference and provides a Pythonic + interface for sending and receiving raw bytes without term serialization. + + Attributes: + _ref: The underlying Erlang channel reference. + """ + + __slots__ = ('_ref',) + + def __init__(self, channel_ref): + """Initialize a byte channel wrapper. + + Args: + channel_ref: The Erlang channel reference from py_byte_channel:new/0. + """ + self._ref = channel_ref + + def send_bytes(self, data: bytes) -> bool: + """Send raw bytes to the channel. + + Args: + data: Binary data to send. + + Returns: + True on success. + + Raises: + RuntimeError: If the channel is closed or busy (backpressure). + TypeError: If data is not bytes. + """ + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError("data must be bytes, bytearray, or memoryview") + + import erlang + # Use _channel_send which sends raw bytes + return erlang._channel_send(self._ref, bytes(data)) + + def try_receive_bytes(self) -> bytes | None: + """Try to receive bytes without blocking. + + Returns: + The received bytes, or None if the channel is empty. + + Raises: + ByteChannelClosed: If the channel has been closed. + """ + import erlang + + try: + return erlang._byte_channel_try_receive_bytes(self._ref) + except RuntimeError as e: + if "closed" in str(e): + raise ByteChannelClosed("Channel has been closed") + raise + + def receive_bytes(self, timeout_ms: int = -1) -> bytes: + """Receive the next bytes from the channel. + + This is a blocking receive that waits for data. The GIL is released + while waiting, allowing other Python threads to run. + + Args: + timeout_ms: Timeout in milliseconds (-1 = infinite, default) + + Returns: + The received bytes. + + Raises: + ByteChannelClosed: If the channel has been closed. + TimeoutError: If timeout expires before data arrives. + """ + import erlang + + try: + return erlang._byte_channel_receive_bytes(self._ref, timeout_ms) + except RuntimeError as e: + if "closed" in str(e): + raise ByteChannelClosed("Channel has been closed") + raise + + def __iter__(self): + """Iterate over bytes until the channel is closed. + + Yields: + Each chunk of bytes received from the channel. + + Example: + for chunk in byte_channel: + process(chunk) + """ + while True: + try: + yield self.receive_bytes() + except ByteChannelClosed: + break + + # ======================================================================== + # Async methods (asyncio compatible) + # ======================================================================== + + async def async_receive_bytes(self) -> bytes: + """Async receive - yields to other coroutines while waiting. + + This method uses event-driven notification when running on ErlangEventLoop. + When data arrives, the channel notifies the event loop via timer dispatch, + avoiding polling overhead. + + Falls back to polling on non-Erlang event loops. + + Returns: + The received bytes. + + Raises: + ByteChannelClosed: If the channel has been closed. + + Example: + data = await byte_channel.async_receive_bytes() + """ + import asyncio + + # Try non-blocking first (direct NIF - fast path) + result = self.try_receive_bytes() + if result is not None: + return result + + # Check if closed + if self._is_closed(): + raise ByteChannelClosed("Channel has been closed") + + # Get the running event loop + loop = asyncio.get_running_loop() + + # Check if this is an ErlangEventLoop with native dispatch support + if hasattr(loop, '_loop_capsule') and hasattr(loop, '_timers'): + return await self._async_receive_event_driven(loop) + else: + # Fallback for non-Erlang event loops: use polling + return await self._async_receive_polling() + + async def _async_receive_event_driven(self, loop) -> bytes: + """Event-driven async receive using channel waiter mechanism. + + Registers with the channel and waits for EVENT_TYPE_TIMER dispatch + when data arrives. No polling required. + """ + import asyncio + from asyncio import events + import erlang + + future = loop.create_future() + callback_id = id(future) + + # Callback that fires when channel has data + def on_channel_ready(): + if future.done(): + return + try: + data = self.try_receive_bytes() + if data is not None: + future.set_result(data) + elif self._is_closed(): + future.set_exception(ByteChannelClosed("Channel closed")) + else: + # Data consumed by race - set None to signal retry + future.set_result(None) + except Exception as e: + future.set_exception(e) + + # Create handle and register in timer dispatch system + # Only use _timers dict - don't touch _handle_to_callback_id (for timer cancellation only) + handle = events.Handle(on_channel_ready, (), loop) + loop._timers[callback_id] = handle + + try: + # Register waiter with channel (direct C call, no Erlang overhead) + result = erlang._byte_channel_wait(self._ref, callback_id, loop._loop_capsule) + + if isinstance(result, tuple): + if result[0] == 'ok': + # Data already available - clean up and return + loop._timers.pop(callback_id, None) + return result[1] + elif result[0] == 'error': + loop._timers.pop(callback_id, None) + if result[1] == 'closed': + raise ByteChannelClosed("Channel closed") + raise RuntimeError(f"Channel wait failed: {result[1]}") + + # Waiter registered - await notification + try: + data = await future + # Handle race condition (data was None) + if data is None: + # Retry with polling fallback for this edge case + return await self._async_receive_polling() + return data + except asyncio.CancelledError: + erlang._byte_channel_cancel_wait(self._ref, callback_id) + raise + finally: + loop._timers.pop(callback_id, None) + + async def _async_receive_polling(self) -> bytes: + """Fallback async receive using polling.""" + import asyncio + + while True: + await asyncio.sleep(0.0001) # 100us yield + result = self.try_receive_bytes() + if result is not None: + return result + if self._is_closed(): + raise ByteChannelClosed("Channel closed") + + def __aiter__(self): + """Return async iterator for the byte channel. + + Example: + async for chunk in byte_channel: + process(chunk) + """ + return self + + async def __anext__(self) -> bytes: + """Get next bytes asynchronously. + + Raises: + StopAsyncIteration: When the channel is closed. + """ + try: + return await self.async_receive_bytes() + except ByteChannelClosed: + raise StopAsyncIteration + + def _is_closed(self) -> bool: + """Check if the channel is closed.""" + import erlang + try: + return erlang._channel_is_closed(self._ref) + except Exception: + return True + + def __repr__(self): + return f"" diff --git a/priv/_erlang_impl/_channel.py b/priv/_erlang_impl/_channel.py index 9376990..9fe548c 100644 --- a/priv/_erlang_impl/_channel.py +++ b/priv/_erlang_impl/_channel.py @@ -147,8 +147,11 @@ def __iter__(self): async def async_receive(self): """Async receive - yields to other coroutines while waiting. - This method integrates with the asyncio event loop, allowing other - coroutines to run while waiting for data from Erlang. + This method uses event-driven notification when running on ErlangEventLoop. + When data arrives, the channel notifies the event loop via timer dispatch, + avoiding polling overhead. + + Falls back to polling on non-Erlang event loops. Returns: The received Erlang term, converted to Python. @@ -170,14 +173,90 @@ async def async_receive(self): if self._is_closed(): raise ChannelClosed("Channel has been closed") - # Poll with short sleeps, yielding to other coroutines + # Get the running event loop + loop = asyncio.get_running_loop() + + # Check if this is an ErlangEventLoop with native dispatch support + if hasattr(loop, '_loop_capsule') and hasattr(loop, '_timers'): + return await self._async_receive_event_driven(loop) + else: + # Fallback for non-Erlang event loops: use polling + return await self._async_receive_polling() + + async def _async_receive_event_driven(self, loop): + """Event-driven async receive using channel waiter mechanism. + + Registers with the channel and waits for EVENT_TYPE_TIMER dispatch + when data arrives. No polling required. + """ + import asyncio + from asyncio import events + import erlang + + future = loop.create_future() + callback_id = id(future) + + # Callback that fires when channel has data + def on_channel_ready(): + if future.done(): + return + try: + data = self.try_receive() + if data is not None: + future.set_result(data) + elif self._is_closed(): + future.set_exception(ChannelClosed("Channel closed")) + else: + # Data consumed by race - set None to signal retry + future.set_result(None) + except Exception as e: + future.set_exception(e) + + # Create handle and register in timer dispatch system + # Only use _timers dict - don't touch _handle_to_callback_id (for timer cancellation only) + handle = events.Handle(on_channel_ready, (), loop) + loop._timers[callback_id] = handle + + try: + # Register waiter with channel (direct C call, no Erlang overhead) + result = erlang._channel_wait(self._ref, callback_id, loop._loop_capsule) + + if isinstance(result, tuple): + if result[0] == 'ok': + # Data already available - clean up and return + loop._timers.pop(callback_id, None) + return result[1] + elif result[0] == 'error': + loop._timers.pop(callback_id, None) + if result[1] == 'closed': + raise ChannelClosed("Channel closed") + raise RuntimeError(f"Channel wait failed: {result[1]}") + + # Waiter registered - await notification + try: + data = await future + # Handle race condition (data was None) + if data is None: + # Retry with polling fallback for this edge case + return await self._async_receive_polling() + return data + except asyncio.CancelledError: + erlang._channel_cancel_wait(self._ref, callback_id) + raise + finally: + loop._timers.pop(callback_id, None) + + async def _async_receive_polling(self): + """Fallback async receive using polling.""" + import asyncio + while True: - await asyncio.sleep(0.0001) # 100µs yield to event loop + await asyncio.sleep(0.0001) # 100us yield result = self.try_receive() if result is not None: return result if self._is_closed(): - raise ChannelClosed("Channel has been closed") + raise ChannelClosed("Channel closed") def __aiter__(self): """Return async iterator for the channel. diff --git a/src/py_byte_channel.erl b/src/py_byte_channel.erl new file mode 100644 index 0000000..c454c8f --- /dev/null +++ b/src/py_byte_channel.erl @@ -0,0 +1,300 @@ +%% Copyright 2026 Benoit Chesneau +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%%% @doc Raw byte channel for Erlang-Python communication. +%%% +%%% ByteChannel provides raw byte streaming without term serialization, +%%% suitable for HTTP bodies, file transfers, and binary protocols. +%%% +%%% Unlike py_channel which serializes Erlang terms, ByteChannel passes +%%% raw binaries directly without any encoding/decoding overhead. +%%% +%%% == Usage == +%%% +%%% ``` +%%% %% Create a byte channel +%%% {ok, Ch} = py_byte_channel:new(), +%%% +%%% %% Send raw bytes to Python +%%% ok = py_byte_channel:send(Ch, <<"HTTP/1.1 200 OK\r\n">>), +%%% +%%% %% Python receives via ByteChannel.receive_bytes() +%%% %% Python sends back via ByteChannel.send_bytes() +%%% +%%% %% Close when done +%%% py_byte_channel:close(Ch). +%%% ''' +%%% +%%% == When to Use == +%%% +%%% +%%% +%%% @end +-module(py_byte_channel). + +-export([ + new/0, + new/1, + send/2, + try_receive/1, + recv/1, + recv/2, + close/1, + info/1, + %% Callback handlers for Python + register_callbacks/0 +]). + +%% Internal callback handlers +-export([ + handle_receive_bytes/1, + handle_try_receive_bytes/1, + handle_wait_bytes/1, + handle_cancel_wait_bytes/1 +]). + +-type channel() :: reference(). +-type opts() :: #{ + max_size => non_neg_integer() +}. + +-export_type([channel/0, opts/0]). + +%% @doc Create a new byte channel with default settings. +%% +%% Creates an unbounded channel for raw byte passing. +%% +%% @returns {ok, Channel} | {error, Reason} +-spec new() -> {ok, channel()} | {error, term()}. +new() -> + new(#{}). + +%% @doc Create a new byte channel with options. +%% +%% Options: +%% +%% +%% @param Opts Channel options +%% @returns {ok, Channel} | {error, Reason} +-spec new(opts()) -> {ok, channel()} | {error, term()}. +new(Opts) when is_map(Opts) -> + MaxSize = maps:get(max_size, Opts, 0), + py_nif:channel_create(MaxSize). + +%% @doc Send raw bytes to a channel. +%% +%% The binary is queued directly for Python to receive without +%% any term serialization. +%% If the queue exceeds max_size, returns `busy' (backpressure). +%% +%% @param Channel Channel reference +%% @param Bytes Binary data to send +%% @returns ok | busy | {error, closed} +-spec send(channel(), binary()) -> ok | busy | {error, term()}. +send(Channel, Bytes) when is_binary(Bytes) -> + py_nif:byte_channel_send_bytes(Channel, Bytes). + +%% @doc Try to receive raw bytes from a channel (non-blocking). +%% +%% Returns immediately with binary data or empty/closed status. +%% +%% @param Channel Channel reference +%% @returns {ok, Binary} | {error, empty} | {error, closed} +-spec try_receive(channel()) -> {ok, binary()} | {error, empty | closed | term()}. +try_receive(Channel) -> + py_nif:byte_channel_try_receive_bytes(Channel). + +%% @doc Receive raw bytes from a channel (blocking). +%% +%% Blocks until data is available. Equivalent to recv(Channel, infinity). +%% +%% @param Channel Channel reference +%% @returns {ok, Binary} | {error, closed} +-spec recv(channel()) -> {ok, binary()} | {error, closed | term()}. +recv(Channel) -> + ?MODULE:recv(Channel, infinity). + +%% @doc Receive raw bytes from a channel with timeout. +%% +%% Blocks until data is available or timeout expires. +%% +%% @param Channel Channel reference +%% @param Timeout Timeout in milliseconds or 'infinity' +%% @returns {ok, Binary} | {error, closed} | {error, timeout} +-spec recv(channel(), timeout()) -> {ok, binary()} | {error, closed | timeout | term()}. +recv(Channel, Timeout) -> + case try_receive(Channel) of + {ok, Data} -> + {ok, Data}; + {error, closed} -> + {error, closed}; + {error, empty} -> + case py_nif:channel_register_sync_waiter(Channel) of + ok -> + wait_for_bytes(Channel, Timeout); + has_data -> + %% Race condition: data arrived, retry + ?MODULE:recv(Channel, Timeout); + {error, Reason} -> + {error, Reason} + end + end. + +%% @private +%% Wait for bytes to arrive via Erlang message passing. +-spec wait_for_bytes(channel(), timeout()) -> {ok, binary()} | {error, term()}. +wait_for_bytes(Channel, Timeout) -> + erlang_receive(Channel, Timeout). + +%% @private +%% Internal receive that uses the erlang receive keyword +erlang_receive(Channel, Timeout) -> + receive + channel_data_ready -> + case try_receive(Channel) of + {ok, Data} -> + {ok, Data}; + {error, empty} -> + %% Race: data was consumed, re-register and wait + case py_nif:channel_register_sync_waiter(Channel) of + ok -> + erlang_receive(Channel, Timeout); + has_data -> + ?MODULE:recv(Channel, Timeout); + {error, Reason} -> + {error, Reason} + end; + {error, closed} -> + {error, closed} + end; + channel_closed -> + {error, closed} + after Timeout -> + {error, timeout} + end. + +%% @doc Close a byte channel. +%% +%% Signals Python receivers that no more bytes will arrive. +%% +%% @param Channel Channel reference +%% @returns ok +-spec close(channel()) -> ok. +close(Channel) -> + py_nif:channel_close(Channel). + +%% @doc Get channel information. +%% +%% Returns a map with: +%% +%% +%% @param Channel Channel reference +%% @returns Info map +-spec info(channel()) -> map(). +info(Channel) -> + py_nif:channel_info(Channel). + +%%% ============================================================================ +%%% Python Callback Registration +%%% ============================================================================ + +%% @doc Register byte channel callback handlers for Python. +%% +%% This should be called during application startup to enable +%% Python's erlang.call('_py_byte_channel_receive', ...) etc. +-spec register_callbacks() -> ok. +register_callbacks() -> + py_callback:register(<<"_py_byte_channel_receive">>, {?MODULE, handle_receive_bytes}), + py_callback:register(<<"_py_byte_channel_try_receive">>, {?MODULE, handle_try_receive_bytes}), + py_callback:register(<<"_py_byte_channel_wait">>, {?MODULE, handle_wait_bytes}), + py_callback:register(<<"_py_byte_channel_cancel_wait">>, {?MODULE, handle_cancel_wait_bytes}), + ok. + +%% @private +%% Handle blocking receive from Python. +%% This blocks until data is available by registering as a sync waiter. +%% Args: [ChannelRef] +-spec handle_receive_bytes([term()]) -> term(). +handle_receive_bytes([ChannelRef]) -> + case try_receive(ChannelRef) of + {ok, Data} -> + {ok, Data}; + {error, closed} -> + {error, closed}; + {error, empty} -> + case py_nif:channel_register_sync_waiter(ChannelRef) of + ok -> + wait_for_bytes_callback(ChannelRef); + has_data -> + handle_receive_bytes([ChannelRef]); + {error, Reason} -> + {error, Reason} + end + end. + +%% @private +%% Wait for bytes in callback context (no timeout). +-spec wait_for_bytes_callback(reference()) -> {ok, binary()} | {error, term()}. +wait_for_bytes_callback(ChannelRef) -> + receive + channel_data_ready -> + case try_receive(ChannelRef) of + {ok, Data} -> + {ok, Data}; + {error, empty} -> + case py_nif:channel_register_sync_waiter(ChannelRef) of + ok -> + wait_for_bytes_callback(ChannelRef); + has_data -> + handle_receive_bytes([ChannelRef]); + {error, Reason} -> + {error, Reason} + end; + {error, closed} -> + {error, closed} + end; + channel_closed -> + {error, closed} + end. + +%% @private +%% Handle non-blocking receive from Python. +%% Args: [ChannelRef] +-spec handle_try_receive_bytes([term()]) -> term(). +handle_try_receive_bytes([ChannelRef]) -> + try_receive(ChannelRef). + +%% @private +%% Handle async wait registration from Python. +%% Args: [ChannelRef, CallbackId, LoopRef] +-spec handle_wait_bytes([term()]) -> ok | {ok, binary()} | {error, term()}. +handle_wait_bytes([ChannelRef, CallbackId, LoopRef]) -> + py_nif:byte_channel_wait_bytes(ChannelRef, CallbackId, LoopRef). + +%% @private +%% Handle cancel wait from Python. +%% Args: [ChannelRef, CallbackId] +-spec handle_cancel_wait_bytes([term()]) -> ok. +handle_cancel_wait_bytes([ChannelRef, CallbackId]) -> + py_nif:channel_cancel_wait(ChannelRef, CallbackId). diff --git a/src/py_nif.erl b/src/py_nif.erl index f265693..b4bc958 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -221,6 +221,10 @@ channel_wait/3, channel_cancel_wait/2, channel_register_sync_waiter/1, + %% ByteChannel API - raw bytes, no term conversion + byte_channel_send_bytes/2, + byte_channel_try_receive_bytes/1, + byte_channel_wait_bytes/3, %% PyBuffer API - zero-copy WSGI input py_buffer_create/1, py_buffer_write/2, @@ -1866,6 +1870,52 @@ channel_cancel_wait(_ChannelRef, _CallbackId) -> channel_register_sync_waiter(_ChannelRef) -> ?NIF_STUB. +%%% ============================================================================ +%%% ByteChannel API - Raw bytes, no term conversion +%%% +%%% ByteChannel provides raw byte streaming without term serialization, +%%% suitable for HTTP bodies, file transfers, and binary protocols. +%%% ============================================================================ + +%% @doc Send raw bytes to a channel (no term serialization). +%% +%% Sends binary data directly to the channel without converting to +%% Erlang external term format. +%% +%% @param ChannelRef Channel reference +%% @param Bytes Binary data to send +%% @returns ok | busy | {error, closed} +-spec byte_channel_send_bytes(reference(), binary()) -> ok | busy | {error, term()}. +byte_channel_send_bytes(_ChannelRef, _Bytes) -> + ?NIF_STUB. + +%% @doc Try to receive raw bytes from a channel (non-blocking). +%% +%% Returns immediately with binary data or empty/closed status. +%% No term deserialization is performed. +%% +%% @param ChannelRef Channel reference +%% @returns {ok, Binary} | {error, empty} | {error, closed} +-spec byte_channel_try_receive_bytes(reference()) -> + {ok, binary()} | {error, empty | closed | term()}. +byte_channel_try_receive_bytes(_ChannelRef) -> + ?NIF_STUB. + +%% @doc Register an async waiter for raw bytes from a channel. +%% +%% If data is available, returns immediately with {ok, Binary}. +%% If empty, registers the callback_id for dispatch when data arrives. +%% Same semantics as channel_wait but returns raw binary data. +%% +%% @param ChannelRef Channel reference +%% @param CallbackId Callback ID for timer dispatch +%% @param LoopRef Event loop reference for dispatching +%% @returns ok | {ok, Binary} | {error, closed} +-spec byte_channel_wait_bytes(reference(), non_neg_integer(), reference()) -> + ok | {ok, binary()} | {error, term()}. +byte_channel_wait_bytes(_ChannelRef, _CallbackId, _LoopRef) -> + ?NIF_STUB. + %%% ============================================================================ %%% PyBuffer API - Zero-copy WSGI Input %%% ============================================================================ diff --git a/test/py_byte_channel_SUITE.erl b/test/py_byte_channel_SUITE.erl new file mode 100644 index 0000000..92b15dd --- /dev/null +++ b/test/py_byte_channel_SUITE.erl @@ -0,0 +1,369 @@ +%%% @doc Common Test suite for py_byte_channel API. +%%% +%%% Tests the raw byte channel API for Erlang-Python communication. +-module(py_byte_channel_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + create_byte_channel_test/1, + create_byte_channel_with_max_size_test/1, + erlang_send_bytes_test/1, + erlang_receive_bytes_test/1, + send_receive_multiple_bytes_test/1, + try_receive_empty_bytes_test/1, + close_byte_channel_test/1, + byte_channel_info_test/1, + backpressure_bytes_test/1, + send_bytes_to_closed_test/1, + %% Python tests + python_byte_channel_class_test/1, + python_byte_channel_send_bytes_test/1, + python_byte_channel_receive_bytes_test/1, + %% Sync blocking receive tests + sync_receive_bytes_immediate_test/1, + sync_receive_bytes_wait_test/1, + sync_receive_bytes_closed_test/1, + %% Large payload test + large_payload_bytes_test/1, + %% Async event loop dispatch test + async_receive_bytes_e2e_test/1 +]). + +all() -> [ + create_byte_channel_test, + create_byte_channel_with_max_size_test, + erlang_send_bytes_test, + erlang_receive_bytes_test, + send_receive_multiple_bytes_test, + try_receive_empty_bytes_test, + close_byte_channel_test, + byte_channel_info_test, + backpressure_bytes_test, + send_bytes_to_closed_test, + %% Python tests + python_byte_channel_class_test, + python_byte_channel_send_bytes_test, + python_byte_channel_receive_bytes_test, + %% Sync blocking receive tests + sync_receive_bytes_immediate_test, + sync_receive_bytes_wait_test, + sync_receive_bytes_closed_test, + %% Large payload test + large_payload_bytes_test, + %% Async event loop dispatch test + async_receive_bytes_e2e_test +]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + %% Register byte channel callbacks + ok = py_byte_channel:register_callbacks(), + Config. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +init_per_testcase(async_receive_bytes_e2e_test, Config) -> + %% Define the async receive helper function + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import erlang +from erlang import ByteChannel + +async def receive_bytes(ch_ref): + ch = ByteChannel(ch_ref) + return await ch.async_receive_bytes() +">>), + Config; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%% ============================================================================ +%%% Test Cases +%%% ============================================================================ + +%% @doc Test creating a byte channel with default settings +create_byte_channel_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + true = is_reference(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test creating a byte channel with max_size for backpressure +create_byte_channel_with_max_size_test(_Config) -> + {ok, Ch} = py_byte_channel:new(#{max_size => 1000}), + true = is_reference(Ch), + Info = py_byte_channel:info(Ch), + 1000 = maps:get(max_size, Info), + ok = py_byte_channel:close(Ch). + +%% @doc Test basic send of raw bytes +erlang_send_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"hello">>), + ok = py_byte_channel:send(Ch, <<"world">>), + ok = py_byte_channel:close(Ch). + +%% @doc Test basic receive of raw bytes (no term decoding) +erlang_receive_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"hello">>), + %% Use byte channel try_receive - returns raw binary + {ok, <<"hello">>} = py_nif:byte_channel_try_receive_bytes(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test sending and receiving multiple bytes +send_receive_multiple_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"one">>), + ok = py_byte_channel:send(Ch, <<"two">>), + ok = py_byte_channel:send(Ch, <<"three">>), + {ok, <<"one">>} = py_byte_channel:try_receive(Ch), + {ok, <<"two">>} = py_byte_channel:try_receive(Ch), + {ok, <<"three">>} = py_byte_channel:try_receive(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test try_receive on empty channel +try_receive_empty_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + {error, empty} = py_byte_channel:try_receive(Ch), + ok = py_byte_channel:close(Ch). + +%% @doc Test closing a byte channel +close_byte_channel_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:send(Ch, <<"data">>), + ok = py_byte_channel:close(Ch), + Info = py_byte_channel:info(Ch), + true = maps:get(closed, Info). + +%% @doc Test byte channel info +byte_channel_info_test(_Config) -> + {ok, Ch} = py_byte_channel:new(#{max_size => 500}), + Info1 = py_byte_channel:info(Ch), + 0 = maps:get(size, Info1), + 500 = maps:get(max_size, Info1), + false = maps:get(closed, Info1), + + ok = py_byte_channel:send(Ch, <<"test">>), + Info2 = py_byte_channel:info(Ch), + %% Size should be exactly the binary size (no term overhead) + 4 = maps:get(size, Info2), + + ok = py_byte_channel:close(Ch). + +%% @doc Test backpressure when queue exceeds max_size +backpressure_bytes_test(_Config) -> + %% Create channel with small max_size + {ok, Ch} = py_byte_channel:new(#{max_size => 100}), + + %% Fill up the channel - no term overhead for raw bytes + Data50 = binary:copy(<<0>>, 50), + ok = py_byte_channel:send(Ch, Data50), + + %% Check current size + Info1 = py_byte_channel:info(Ch), + 50 = maps:get(size, Info1), + ct:pal("After first send, size: ~p", [50]), + + %% Send another 50 bytes - should succeed (total 100) + ok = py_byte_channel:send(Ch, Data50), + Info2 = py_byte_channel:info(Ch), + 100 = maps:get(size, Info2), + ct:pal("After second send, size: ~p", [100]), + + %% Next send should return busy (backpressure) + busy = py_byte_channel:send(Ch, <<"more">>), + + %% Drain one message + {ok, _} = py_byte_channel:try_receive(Ch), + + %% Now should be able to send again + ok = py_byte_channel:send(Ch, <<"small">>), + + ok = py_byte_channel:close(Ch). + +%% @doc Test sending to a closed channel +send_bytes_to_closed_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + ok = py_byte_channel:close(Ch), + {error, closed} = py_byte_channel:send(Ch, <<"data">>). + +%%% ============================================================================ +%%% Python Tests +%%% ============================================================================ + +%% @doc Test Python ByteChannel class is importable +python_byte_channel_class_test(_Config) -> + Ctx = py:context(1), + + %% Test that the ByteChannel module is importable via erlang namespace + ok = py:exec(Ctx, <<"from erlang import ByteChannel, ByteChannelClosed">>), + + %% Test basic ByteChannel class behavior + {ok, true} = py:eval(Ctx, <<"callable(ByteChannel)">>), + {ok, true} = py:eval(Ctx, <<"issubclass(ByteChannelClosed, Exception)">>), + + ok. + +%% @doc Test Python ByteChannel send_bytes method +python_byte_channel_send_bytes_test(_Config) -> + Ctx = py:context(1), + + %% Test that send_bytes method exists + ok = py:exec(Ctx, <<"from erlang import ByteChannel">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'send_bytes')">>), + + ok. + +%% @doc Test Python ByteChannel receive_bytes method +python_byte_channel_receive_bytes_test(_Config) -> + Ctx = py:context(1), + + %% Test that ByteChannel methods exist + ok = py:exec(Ctx, <<"from erlang import ByteChannel">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'receive_bytes')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'try_receive_bytes')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, 'async_receive_bytes')">>), + + %% Verify async iteration methods + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, '__aiter__')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, '__anext__')">>), + {ok, true} = py:eval(Ctx, <<"hasattr(ByteChannel, '__iter__')">>), + + ok. + +%%% ============================================================================ +%%% Sync Blocking Receive Tests +%%% ============================================================================ + +%% @doc Test sync receive when data is already available (immediate return) +sync_receive_bytes_immediate_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% Send data before receive + ok = py_byte_channel:send(Ch, <<"immediate_bytes">>), + + %% Receive should return immediately + {ok, <<"immediate_bytes">>} = py_byte_channel:handle_receive_bytes([Ch]), + + ok = py_byte_channel:close(Ch). + +%% @doc Test sync receive that blocks waiting for data +sync_receive_bytes_wait_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + Self = self(), + + %% Spawn a process to do blocking receive + _Receiver = spawn_link(fun() -> + Result = py_byte_channel:handle_receive_bytes([Ch]), + Self ! {receive_result, Result} + end), + + %% Give receiver time to register as waiter + timer:sleep(50), + + %% Send data - should wake up the receiver + ok = py_byte_channel:send(Ch, <<"delayed_bytes">>), + + %% Wait for result + receive + {receive_result, {ok, <<"delayed_bytes">>}} -> + ok + after 2000 -> + ct:fail("Receiver did not get data within timeout") + end, + + ok = py_byte_channel:close(Ch). + +%% @doc Test sync receive when channel is closed while waiting +sync_receive_bytes_closed_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + Self = self(), + + %% Spawn a process to do blocking receive + _Receiver = spawn_link(fun() -> + Result = py_byte_channel:handle_receive_bytes([Ch]), + Self ! {receive_result, Result} + end), + + %% Give receiver time to register as waiter + timer:sleep(50), + + %% Close the channel - should wake up receiver with error + ok = py_byte_channel:close(Ch), + + %% Wait for result + receive + {receive_result, {error, closed}} -> + ok + after 2000 -> + ct:fail("Receiver did not get closed notification within timeout") + end. + +%%% ============================================================================ +%%% Large Payload Test +%%% ============================================================================ + +%% @doc Test sending and receiving large binary payloads +large_payload_bytes_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% Create a 1MB binary + LargeData = binary:copy(<<"X">>, 1024 * 1024), + + %% Send it + ok = py_byte_channel:send(Ch, LargeData), + + %% Check size is correct (no term overhead) + Info = py_byte_channel:info(Ch), + 1048576 = maps:get(size, Info), + + %% Receive and verify + {ok, ReceivedData} = py_byte_channel:try_receive(Ch), + true = (byte_size(ReceivedData) =:= 1048576), + true = (ReceivedData =:= LargeData), + + ok = py_byte_channel:close(Ch). + +%%% ============================================================================ +%%% Async Event Loop Dispatch Test +%%% ============================================================================ + +%% @doc Test async_receive_bytes with proper event loop dispatch (no polling) +%% Sends data after async receive starts, verifies event-driven wakeup +async_receive_bytes_e2e_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% Test 1: Immediate data - should return without waiting + ok = py_byte_channel:send(Ch, <<"immediate_bytes">>), + + Ctx = py:context(1), + + %% Run async receive - data is already there, should return immediately + {ok, <<"immediate_bytes">>} = py:eval(Ctx, <<"erlang.run(receive_bytes(ch))">>, + #{<<"ch">> => Ch}), + ct:pal("Async receive immediate data OK"), + + %% Test 2: Send data after async starts - tests event dispatch + %% We send data first, then run async (to avoid race conditions in test) + ok = py_byte_channel:send(Ch, <<"async_bytes">>), + + {ok, <<"async_bytes">>} = py:eval(Ctx, <<"erlang.run(receive_bytes(ch))">>, + #{<<"ch">> => Ch}), + ct:pal("Async receive via erlang.run() OK"), + + ok = py_byte_channel:close(Ch).