Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions c_src/py_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@
*/

/** @brief Name for the PyCapsule storing event loop pointer */
static const char *EVENT_LOOP_CAPSULE_NAME = "erlang_python.event_loop";

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Lint

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Free-threaded Python 3.13t

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.13 / ubuntu-24.04

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Documentation

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.12 / ubuntu-24.04

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

/** @brief Module attribute name for storing the event loop */
static const char *EVENT_LOOP_ATTR_NAME = "_loop";

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Lint

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Free-threaded Python 3.13t

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.13 / ubuntu-24.04

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Documentation

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.12 / ubuntu-24.04

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

/* ============================================================================
* Module State Structure
Expand Down Expand Up @@ -1267,7 +1267,7 @@
if (!enif_get_atom(env, argv[1], atom_buf, sizeof(atom_buf), ERL_NIF_LATIN1)) {
return make_error(env, "invalid_id");
}
strncpy(loop->loop_id, atom_buf, sizeof(loop->loop_id) - 1);

Check warning on line 1270 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘__builtin_strncpy’ output may be truncated copying 63 bytes from a string of length 63 [-Wstringop-truncation]

Check warning on line 1270 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘__builtin_strncpy’ output may be truncated copying 63 bytes from a string of length 63 [-Wstringop-truncation]
loop->loop_id[sizeof(loop->loop_id) - 1] = '\0';
} else {
size_t copy_len = id_bin.size < sizeof(loop->loop_id) - 1 ?
Expand Down Expand Up @@ -3700,6 +3700,25 @@
}

pthread_mutex_unlock(&loop->mutex);

/*
* Also send task_ready to the worker if one exists.
* This is needed for create_task: Python is not waiting on the condition
* variable, so we need to notify the Erlang worker to call process_ready_tasks.
*
* Uses the same coalescing logic as submit_task to avoid message floods.
*/
if (loop->has_worker) {
if (!atomic_exchange(&loop->task_wake_pending, true)) {
ErlNifEnv *msg_env = enif_alloc_env();
if (msg_env != NULL) {
ERL_NIF_TERM msg = enif_make_atom(msg_env, "task_ready");
enif_send(NULL, &loop->worker_pid, msg_env, msg);
enif_free_env(msg_env);
}
}
}

return true;
}

Expand Down
4 changes: 3 additions & 1 deletion priv/_erlang_impl/_byte_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ async def process_bytes_async(channel_ref):
ch.send_bytes(b"HTTP/1.1 200 OK\\r\\n")
"""

from typing import Optional

__all__ = ['ByteChannel', 'ByteChannelClosed']


Expand Down Expand Up @@ -102,7 +104,7 @@ def send_bytes(self, data: bytes) -> bool:
# Use _channel_send which sends raw bytes
return erlang._channel_send(self._ref, bytes(data))

def try_receive_bytes(self) -> bytes | None:
def try_receive_bytes(self) -> Optional[bytes]:
"""Try to receive bytes without blocking.

Returns:
Expand Down
66 changes: 64 additions & 2 deletions test/py_byte_channel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
%% Close and drain tests
close_drain_bytes_erlang_test/1,
close_drain_bytes_python_sync_test/1,
close_drain_bytes_python_async_test/1
close_drain_bytes_python_async_test/1,
close_drain_bytes_create_task_async_test/1
]).

all() -> [
Expand Down Expand Up @@ -72,7 +73,8 @@ all() -> [
%% Close and drain tests
close_drain_bytes_erlang_test,
close_drain_bytes_python_sync_test,
close_drain_bytes_python_async_test
close_drain_bytes_python_async_test,
close_drain_bytes_create_task_async_test
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -530,3 +532,63 @@ async def async_drain_byte_channel(ch_ref):
py:eval(Ctx, <<"erlang.run(async_drain_byte_channel(ch))">>, #{<<"ch">> => Ch}),

ct:pal("Python async byte channel close+drain test passed").

%% @doc Test async drain with create_task when data arrives after task starts
%% This tests the notification callback path: task registers waiter, then data arrives
close_drain_bytes_create_task_async_test(_Config) ->
{ok, Ch} = py_byte_channel:new(),

%% Define the async drain task
Ctx = py:context(1),
ok = py:exec(Ctx, <<"
import erlang
from erlang import ByteChannel

async def drain_task(ch_ref, reply_pid):
'''Task that drains byte channel and sends results back.'''
try:
ch = ByteChannel(ch_ref)
chunks = []
async for chunk in ch:
chunks.append(chunk)
erlang.send(reply_pid, ('result', chunks))
except Exception as e:
erlang.send(reply_pid, ('error', str(e)))
">>),

%% Create the task BEFORE sending any data
%% This forces the task to register a waiter and wait for notifications
TaskRef = py_event_loop:create_task(
"__main__", "drain_task", [Ch, self()]),

%% Give the task time to start and register waiter
timer:sleep(100),

%% Now send data - should trigger notification callback
ok = py_byte_channel:send(Ch, <<"chunk1">>),
ok = py_byte_channel:send(Ch, <<"chunk2">>),
ok = py_byte_channel:send(Ch, <<"chunk3">>),

%% Close the channel to signal end of stream
ok = py_byte_channel:close(Ch),

%% Wait for result from the task
receive
{<<"result">>, [<<"chunk1">>, <<"chunk2">>, <<"chunk3">>]} ->
ct:pal("create_task async drain with delayed data OK");
{<<"result">>, Other} ->
ct:pal("Unexpected result: ~p", [Other]),
ct:fail({unexpected_result, Other});
{<<"error">>, ErrMsg} ->
ct:pal("Task error: ~p", [ErrMsg]),
ct:fail({task_error, ErrMsg})
after 5000 ->
ct:fail("Timeout waiting for drain task result")
end,

%% Wait for task to complete
case py_event_loop:await(TaskRef, 5000) of
{ok, _} -> ok;
{error, AwaitErr} ->
ct:pal("Await error: ~p", [AwaitErr])
end.
66 changes: 64 additions & 2 deletions test/py_channel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
%% Close and drain tests
close_drain_erlang_test/1,
close_drain_python_sync_test/1,
close_drain_python_async_test/1
close_drain_python_async_test/1,
close_drain_create_task_async_test/1
]).

all() -> [
Expand Down Expand Up @@ -84,7 +85,8 @@ all() -> [
%% Close and drain tests
close_drain_erlang_test,
close_drain_python_sync_test,
close_drain_python_async_test
close_drain_python_async_test,
close_drain_create_task_async_test
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -707,3 +709,63 @@ async def async_drain_channel(ch_ref):
py:eval(Ctx, <<"erlang.run(async_drain_channel(ch))">>, #{<<"ch">> => Ch}),

ct:pal("Python async close+drain test passed").

%% @doc Test async drain with create_task when data arrives after task starts
%% This tests the notification callback path: task registers waiter, then data arrives
close_drain_create_task_async_test(_Config) ->
{ok, Ch} = py_channel:new(),

%% Define the async drain task
Ctx = py:context(1),
ok = py:exec(Ctx, <<"
import erlang
from erlang import Channel

async def drain_task(ch_ref, reply_pid):
'''Task that drains channel and sends results back.'''
try:
ch = Channel(ch_ref)
messages = []
async for msg in ch:
messages.append(msg)
erlang.send(reply_pid, ('result', messages))
except Exception as e:
erlang.send(reply_pid, ('error', str(e)))
">>),

%% Create the task BEFORE sending any data
%% This forces the task to register a waiter and wait for notifications
TaskRef = py_event_loop:create_task(
"__main__", "drain_task", [Ch, self()]),

%% Give the task time to start and register waiter
timer:sleep(100),

%% Now send data - should trigger notification callback
ok = py_channel:send(Ch, <<"msg1">>),
ok = py_channel:send(Ch, <<"msg2">>),
ok = py_channel:send(Ch, <<"msg3">>),

%% Close the channel to signal end of stream
ok = py_channel:close(Ch),

%% Wait for result from the task
receive
{<<"result">>, [<<"msg1">>, <<"msg2">>, <<"msg3">>]} ->
ct:pal("create_task async drain with delayed data OK");
{<<"result">>, Other} ->
ct:pal("Unexpected result: ~p", [Other]),
ct:fail({unexpected_result, Other});
{<<"error">>, ErrMsg} ->
ct:pal("Task error: ~p", [ErrMsg]),
ct:fail({task_error, ErrMsg})
after 5000 ->
ct:fail("Timeout waiting for drain task result")
end,

%% Wait for task to complete
case py_event_loop:await(TaskRef, 5000) of
{ok, _} -> ok;
{error, AwaitErr} ->
ct:pal("Await error: ~p", [AwaitErr])
end.
Loading