From c686fdbb0d21318b85567d3cf99f2b952622651b Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 18 Mar 2026 02:16:03 +0100 Subject: [PATCH] Fix loop capsule name mismatch in channel wait functions The channel wait functions (_channel_wait, _byte_channel_wait) used "erlang.event_loop" as the capsule name, but the event loop creates capsules with "erlang_python.event_loop". This mismatch caused "invalid loop reference" errors when using async channel operations with py_event_loop:create_task. Also adds test for create_task + async_receive_bytes integration. --- c_src/py_callback.c | 4 +-- test/py_byte_channel_SUITE.erl | 65 ++++++++++++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 7825feb..783bf38 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -3062,7 +3062,7 @@ static PyObject *erlang_channel_wait_impl(PyObject *self, PyObject *args) { return NULL; } - erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop"); + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang_python.event_loop"); if (loop == NULL) { PyErr_SetString(PyExc_ValueError, "invalid loop reference"); return NULL; @@ -3220,7 +3220,7 @@ static PyObject *erlang_byte_channel_wait_impl(PyObject *self, PyObject *args) { return NULL; } - erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop"); + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang_python.event_loop"); if (loop == NULL) { PyErr_SetString(PyExc_ValueError, "invalid loop reference"); return NULL; diff --git a/test/py_byte_channel_SUITE.erl b/test/py_byte_channel_SUITE.erl index 92b15dd..e7afd63 100644 --- a/test/py_byte_channel_SUITE.erl +++ b/test/py_byte_channel_SUITE.erl @@ -35,7 +35,9 @@ %% Large payload test large_payload_bytes_test/1, %% Async event loop dispatch test - async_receive_bytes_e2e_test/1 + async_receive_bytes_e2e_test/1, + %% create_task + async receive test + create_task_async_receive_test/1 ]). all() -> [ @@ -60,7 +62,9 @@ all() -> [ %% Large payload test large_payload_bytes_test, %% Async event loop dispatch test - async_receive_bytes_e2e_test + async_receive_bytes_e2e_test, + %% create_task + async receive test + create_task_async_receive_test ]. init_per_suite(Config) -> @@ -367,3 +371,60 @@ async_receive_bytes_e2e_test(_Config) -> ct:pal("Async receive via erlang.run() OK"), ok = py_byte_channel:close(Ch). + +%%% ============================================================================ +%%% create_task + Async ByteChannel Test +%%% ============================================================================ + +%% @doc Test async_receive_bytes works correctly with py_event_loop:create_task +%% This verifies the loop capsule name is correct (erlang_python.event_loop) +create_task_async_receive_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% First send data so it's available when task starts + ok = py_byte_channel:send(Ch, <<"create_task_bytes">>), + + %% Define and run the task - env reuse should make __main__ functions visible + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import erlang +from erlang import ByteChannel + +async def task_receive_bytes(ch_ref, reply_pid): + '''Task that receives bytes and sends result back to Erlang.''' + try: + ch = ByteChannel(ch_ref) + data = await ch.async_receive_bytes() + erlang.send(reply_pid, ('result', data)) + except Exception as e: + erlang.send(reply_pid, ('error', str(e))) +">>), + + %% Create a task that will await on channel receive + %% create_task/3 uses the global event loop internally + TaskRef = py_event_loop:create_task( + "__main__", "task_receive_bytes", [Ch, self()]), + + %% Wait for result from the task + %% Note: Python tuple ('result', data) becomes {<<"result">>, data} in Erlang + receive + {<<"result">>, <<"create_task_bytes">>} -> + ct:pal("create_task + async_receive_bytes OK"); + {<<"error">>, ErrMsg} -> + ct:pal("Task error: ~p", [ErrMsg]), + ct:fail({task_error, ErrMsg}); + Other -> + ct:pal("Unexpected message: ~p", [Other]), + ct:fail({unexpected_message, Other}) + after 5000 -> + ct:fail("Timeout waiting for task result") + end, + + %% Wait for task to complete + case py_event_loop:await(TaskRef, 5000) of + {ok, _} -> ok; + {error, AwaitErr} -> + ct:pal("Await error: ~p", [AwaitErr]) + end, + + ok = py_byte_channel:close(Ch).