diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 76e904d..f560655 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -3700,6 +3700,25 @@ bool event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type, } pthread_mutex_unlock(&loop->mutex); + + /* + * Also send task_ready to the worker if one exists. + * This is needed for create_task: Python is not waiting on the condition + * variable, so we need to notify the Erlang worker to call process_ready_tasks. + * + * Uses the same coalescing logic as submit_task to avoid message floods. + */ + if (loop->has_worker) { + if (!atomic_exchange(&loop->task_wake_pending, true)) { + ErlNifEnv *msg_env = enif_alloc_env(); + if (msg_env != NULL) { + ERL_NIF_TERM msg = enif_make_atom(msg_env, "task_ready"); + enif_send(NULL, &loop->worker_pid, msg_env, msg); + enif_free_env(msg_env); + } + } + } + return true; } diff --git a/priv/_erlang_impl/_byte_channel.py b/priv/_erlang_impl/_byte_channel.py index 2dea439..c3f1357 100644 --- a/priv/_erlang_impl/_byte_channel.py +++ b/priv/_erlang_impl/_byte_channel.py @@ -54,6 +54,8 @@ async def process_bytes_async(channel_ref): ch.send_bytes(b"HTTP/1.1 200 OK\\r\\n") """ +from typing import Optional + __all__ = ['ByteChannel', 'ByteChannelClosed'] @@ -102,7 +104,7 @@ def send_bytes(self, data: bytes) -> bool: # Use _channel_send which sends raw bytes return erlang._channel_send(self._ref, bytes(data)) - def try_receive_bytes(self) -> bytes | None: + def try_receive_bytes(self) -> Optional[bytes]: """Try to receive bytes without blocking. Returns: diff --git a/test/py_byte_channel_SUITE.erl b/test/py_byte_channel_SUITE.erl index 62a45c8..d7b3aa4 100644 --- a/test/py_byte_channel_SUITE.erl +++ b/test/py_byte_channel_SUITE.erl @@ -41,7 +41,8 @@ %% Close and drain tests close_drain_bytes_erlang_test/1, close_drain_bytes_python_sync_test/1, - close_drain_bytes_python_async_test/1 + close_drain_bytes_python_async_test/1, + close_drain_bytes_create_task_async_test/1 ]). all() -> [ @@ -72,7 +73,8 @@ all() -> [ %% Close and drain tests close_drain_bytes_erlang_test, close_drain_bytes_python_sync_test, - close_drain_bytes_python_async_test + close_drain_bytes_python_async_test, + close_drain_bytes_create_task_async_test ]. init_per_suite(Config) -> @@ -530,3 +532,63 @@ async def async_drain_byte_channel(ch_ref): py:eval(Ctx, <<"erlang.run(async_drain_byte_channel(ch))">>, #{<<"ch">> => Ch}), ct:pal("Python async byte channel close+drain test passed"). + +%% @doc Test async drain with create_task when data arrives after task starts +%% This tests the notification callback path: task registers waiter, then data arrives +close_drain_bytes_create_task_async_test(_Config) -> + {ok, Ch} = py_byte_channel:new(), + + %% Define the async drain task + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import erlang +from erlang import ByteChannel + +async def drain_task(ch_ref, reply_pid): + '''Task that drains byte channel and sends results back.''' + try: + ch = ByteChannel(ch_ref) + chunks = [] + async for chunk in ch: + chunks.append(chunk) + erlang.send(reply_pid, ('result', chunks)) + except Exception as e: + erlang.send(reply_pid, ('error', str(e))) +">>), + + %% Create the task BEFORE sending any data + %% This forces the task to register a waiter and wait for notifications + TaskRef = py_event_loop:create_task( + "__main__", "drain_task", [Ch, self()]), + + %% Give the task time to start and register waiter + timer:sleep(100), + + %% Now send data - should trigger notification callback + ok = py_byte_channel:send(Ch, <<"chunk1">>), + ok = py_byte_channel:send(Ch, <<"chunk2">>), + ok = py_byte_channel:send(Ch, <<"chunk3">>), + + %% Close the channel to signal end of stream + ok = py_byte_channel:close(Ch), + + %% Wait for result from the task + receive + {<<"result">>, [<<"chunk1">>, <<"chunk2">>, <<"chunk3">>]} -> + ct:pal("create_task async drain with delayed data OK"); + {<<"result">>, Other} -> + ct:pal("Unexpected result: ~p", [Other]), + ct:fail({unexpected_result, Other}); + {<<"error">>, ErrMsg} -> + ct:pal("Task error: ~p", [ErrMsg]), + ct:fail({task_error, ErrMsg}) + after 5000 -> + ct:fail("Timeout waiting for drain task result") + end, + + %% Wait for task to complete + case py_event_loop:await(TaskRef, 5000) of + {ok, _} -> ok; + {error, AwaitErr} -> + ct:pal("Await error: ~p", [AwaitErr]) + end. diff --git a/test/py_channel_SUITE.erl b/test/py_channel_SUITE.erl index 41027e4..fe28517 100644 --- a/test/py_channel_SUITE.erl +++ b/test/py_channel_SUITE.erl @@ -47,7 +47,8 @@ %% Close and drain tests close_drain_erlang_test/1, close_drain_python_sync_test/1, - close_drain_python_async_test/1 + close_drain_python_async_test/1, + close_drain_create_task_async_test/1 ]). all() -> [ @@ -84,7 +85,8 @@ all() -> [ %% Close and drain tests close_drain_erlang_test, close_drain_python_sync_test, - close_drain_python_async_test + close_drain_python_async_test, + close_drain_create_task_async_test ]. init_per_suite(Config) -> @@ -707,3 +709,63 @@ async def async_drain_channel(ch_ref): py:eval(Ctx, <<"erlang.run(async_drain_channel(ch))">>, #{<<"ch">> => Ch}), ct:pal("Python async close+drain test passed"). + +%% @doc Test async drain with create_task when data arrives after task starts +%% This tests the notification callback path: task registers waiter, then data arrives +close_drain_create_task_async_test(_Config) -> + {ok, Ch} = py_channel:new(), + + %% Define the async drain task + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import erlang +from erlang import Channel + +async def drain_task(ch_ref, reply_pid): + '''Task that drains channel and sends results back.''' + try: + ch = Channel(ch_ref) + messages = [] + async for msg in ch: + messages.append(msg) + erlang.send(reply_pid, ('result', messages)) + except Exception as e: + erlang.send(reply_pid, ('error', str(e))) +">>), + + %% Create the task BEFORE sending any data + %% This forces the task to register a waiter and wait for notifications + TaskRef = py_event_loop:create_task( + "__main__", "drain_task", [Ch, self()]), + + %% Give the task time to start and register waiter + timer:sleep(100), + + %% Now send data - should trigger notification callback + ok = py_channel:send(Ch, <<"msg1">>), + ok = py_channel:send(Ch, <<"msg2">>), + ok = py_channel:send(Ch, <<"msg3">>), + + %% Close the channel to signal end of stream + ok = py_channel:close(Ch), + + %% Wait for result from the task + receive + {<<"result">>, [<<"msg1">>, <<"msg2">>, <<"msg3">>]} -> + ct:pal("create_task async drain with delayed data OK"); + {<<"result">>, Other} -> + ct:pal("Unexpected result: ~p", [Other]), + ct:fail({unexpected_result, Other}); + {<<"error">>, ErrMsg} -> + ct:pal("Task error: ~p", [ErrMsg]), + ct:fail({task_error, ErrMsg}) + after 5000 -> + ct:fail("Timeout waiting for drain task result") + end, + + %% Wait for task to complete + case py_event_loop:await(TaskRef, 5000) of + {ok, _} -> ok; + {error, AwaitErr} -> + ct:pal("Await error: ~p", [AwaitErr]) + end.