From 5faffdb1778c304e85bdb4440fc206bcf49f7620 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 08:42:21 +0100 Subject: [PATCH 1/5] Quick wins for event loop performance - Fix capsule init bug: initialize pending_capacity after memset - Cache loop reference in persistent_term for O(1) lookup - Coalesce task_ready messages to reduce mailbox amplification --- c_src/py_event_loop.c | 9 +++++++++ src/py_event_loop.erl | 15 ++++++++++++++- src/py_event_worker.erl | 28 +++++++++++++++++++++++----- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 1573b7f..76e904d 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -1087,6 +1087,9 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc, /* Initialize fields */ memset(loop, 0, sizeof(erlang_event_loop_t)); + /* Initialize pending_capacity (memset zeros it, but we need the initial value) */ + loop->pending_capacity = INITIAL_PENDING_CAPACITY; + if (pthread_mutex_init(&loop->mutex, NULL) != 0) { enif_release_resource(loop); return make_error(env, "mutex_init_failed"); @@ -6410,6 +6413,9 @@ static PyObject *py_loop_new(PyObject *self, PyObject *args) { /* Initialize fields */ memset(loop, 0, sizeof(erlang_event_loop_t)); + /* Initialize pending_capacity (memset zeros it, but we need the initial value) */ + loop->pending_capacity = INITIAL_PENDING_CAPACITY; + if (pthread_mutex_init(&loop->mutex, NULL) != 0) { enif_release_resource(loop); PyErr_SetString(PyExc_RuntimeError, "Failed to initialize mutex"); @@ -7446,6 +7452,9 @@ int create_default_event_loop(ErlNifEnv *env) { /* Initialize fields */ memset(loop, 0, sizeof(erlang_event_loop_t)); + /* Initialize pending_capacity (memset zeros it, but we need the initial value) */ + loop->pending_capacity = INITIAL_PENDING_CAPACITY; + if (pthread_mutex_init(&loop->mutex, NULL) != 0) { enif_release_resource(loop); return -1; diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index f6dad5d..5c4816b 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -70,7 +70,14 @@ stop() -> -spec get_loop() -> {ok, reference()} | {error, not_started}. get_loop() -> - gen_server:call(?MODULE, get_loop). + %% Fast path: check persistent_term cache first (O(1) lookup) + case persistent_term:get({?MODULE, loop_ref}, undefined) of + undefined -> + %% Slow path: loop not cached yet, get from gen_server + gen_server:call(?MODULE, get_loop); + LoopRef -> + {ok, LoopRef} + end. %% @doc Register event loop callbacks for Python access. -spec register_callbacks() -> ok. @@ -332,6 +339,8 @@ init([]) -> ok = py_nif:set_python_event_loop(LoopRef), %% Set ErlangEventLoop as the default asyncio policy ok = set_default_policy(), + %% Cache loop reference for fast O(1) lookup + persistent_term:put({?MODULE, loop_ref}, LoopRef), {ok, #state{ loop_ref = LoopRef, worker_pid = WorkerPid, @@ -393,6 +402,8 @@ handle_call(get_loop, _From, #state{loop_ref = undefined} = State) -> {ok, RouterPid} = py_event_router:start_link(LoopRef), ok = py_nif:set_shared_router(RouterPid), ok = py_nif:set_python_event_loop(LoopRef), + %% Cache loop reference for fast O(1) lookup + persistent_term:put({?MODULE, loop_ref}, LoopRef), NewState = State#state{ loop_ref = LoopRef, worker_pid = WorkerPid, @@ -417,6 +428,8 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid, router_pid = RouterPid}) -> + %% Clear persistent_term cache + persistent_term:erase({?MODULE, loop_ref}), %% Reset asyncio policy back to default before destroying the loop reset_default_policy(), %% Clean up worker (scalable I/O model) diff --git a/src/py_event_worker.erl b/src/py_event_worker.erl index efa455e..884465c 100644 --- a/src/py_event_worker.erl +++ b/src/py_event_worker.erl @@ -46,13 +46,13 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({select, FdRes, _Ref, ready_input}, State) -> py_nif:handle_fd_event_and_reselect(FdRes, read), %% Trigger event processing after FD event dispatch - self() ! task_ready, + maybe_send_task_ready(), {noreply, State}; handle_info({select, FdRes, _Ref, ready_output}, State) -> py_nif:handle_fd_event_and_reselect(FdRes, write), %% Trigger event processing after FD event dispatch - self() ! task_ready, + maybe_send_task_ready(), {noreply, State}; handle_info({start_timer, _LoopRef, DelayMs, CallbackId, TimerRef}, State) -> @@ -86,7 +86,7 @@ handle_info({timeout, TimerRef}, State) -> NewTimers = maps:remove(TimerRef, Timers), %% Trigger event processing after timer dispatch %% This ensures _run_once is called to handle the timer callback - self() ! task_ready, + maybe_send_task_ready(), {noreply, State#state{timers = NewTimers}} end; @@ -96,6 +96,8 @@ handle_info({select, _FdRes, _Ref, cancelled}, State) -> {noreply, State}; %% This is sent via enif_send when a new async task is submitted. %% Uses a drain-until-empty loop to handle tasks submitted during processing. handle_info(task_ready, #state{loop_ref = LoopRef} = State) -> + %% Clear the pending flag - we're processing now + put(task_ready_pending, false), drain_tasks_loop(LoopRef), {noreply, State}; @@ -121,7 +123,9 @@ drain_tasks_loop(LoopRef) -> ok -> %% Check if more task_ready messages arrived during processing receive - task_ready -> drain_tasks_loop(LoopRef) + task_ready -> + put(task_ready_pending, false), + drain_tasks_loop(LoopRef) after 0 -> ok end; @@ -130,7 +134,7 @@ drain_tasks_loop(LoopRef) -> %% Send task_ready to self and return, allowing the gen_server %% to process other messages (select, timers) before continuing. %% This prevents starvation under sustained task traffic. - self() ! task_ready, + maybe_send_task_ready(), ok; {error, py_loop_not_set} -> ok; @@ -141,3 +145,17 @@ drain_tasks_loop(LoopRef) -> error_logger:warning_msg("py_event_worker: task processing failed: ~p~n", [Reason]), ok end. + +%% @doc Send task_ready message only if one isn't already pending. +%% Uses process dictionary to coalesce multiple wakeup requests. +maybe_send_task_ready() -> + case get(task_ready_pending) of + true -> + %% Already pending, no need to send another + ok; + _ -> + %% No pending message, send one and mark as pending + put(task_ready_pending, true), + self() ! task_ready, + ok + end. From bd8b929f1cbc91cd6731fece9bdc596990890aca Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 12:51:49 +0100 Subject: [PATCH 2/5] Drain sockets to EAGAIN with budget in transport callbacks - TCP/UDP _read_ready: loop up to 16 reads until EAGAIN - TCP _write_ready_cb: loop up to 16 writes until EAGAIN - Add note for future native task queue optimization --- c_src/py_event_loop.h | 5 ++ priv/_erlang_impl/_transport.py | 129 +++++++++++++++++++------------- 2 files changed, 83 insertions(+), 51 deletions(-) diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index ff868fa..2ff1210 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -345,6 +345,11 @@ typedef struct erlang_event_loop { uint32_t interp_id; /* ========== Async Task Queue (uvloop-inspired) ========== */ + /* + * Future optimization: Replace serialized task queue with native MPSC + * ring buffer to avoid enif_term_to_binary/enif_binary_to_term overhead. + * See task_entry_t/task_ring_t design in optimization plan. + */ /** @brief Python ErlangEventLoop instance (direct ref, no thread-local) */ PyObject *py_loop; diff --git a/priv/_erlang_impl/_transport.py b/priv/_erlang_impl/_transport.py index 5b9a76d..4b53c65 100644 --- a/priv/_erlang_impl/_transport.py +++ b/priv/_erlang_impl/_transport.py @@ -69,29 +69,39 @@ async def _start(self): self._loop.call_soon(self._protocol.connection_made, self) self._loop.add_reader(self._fileno, self._read_ready) + # Maximum reads per callback to avoid starving other events + _max_reads_per_call = 16 + def _read_ready(self): - """Called when data is available to read.""" + """Called when data is available to read. + + Drains socket until EAGAIN with a budget to avoid starvation. + """ if self._conn_lost: return - try: - data = self._sock.recv(self.max_size) - except (BlockingIOError, InterruptedError): - return - except Exception as exc: - self._fatal_error(exc, 'Fatal read error') - return - if data: - self._protocol.data_received(data) - else: - # Connection closed (EOF received) - self._loop.remove_reader(self._fileno) - keep_open = self._protocol.eof_received() - # If eof_received returns False/None, close the transport - if not keep_open: - self._closing = True - self._conn_lost += 1 - self._call_connection_lost(None) + for _ in range(self._max_reads_per_call): + try: + data = self._sock.recv(self.max_size) + except (BlockingIOError, InterruptedError): + # EAGAIN - no more data available + return + except Exception as exc: + self._fatal_error(exc, 'Fatal read error') + return + + if data: + self._protocol.data_received(data) + else: + # Connection closed (EOF received) + self._loop.remove_reader(self._fileno) + keep_open = self._protocol.eof_received() + # If eof_received returns False/None, close the transport + if not keep_open: + self._closing = True + self._conn_lost += 1 + self._call_connection_lost(None) + return def write(self, data): """Write data to the transport.""" @@ -122,30 +132,38 @@ def write(self, data): self._buffer.extend(data) + # Maximum writes per callback to avoid starving other events + _max_writes_per_call = 16 + def _write_ready_cb(self): - """Called when socket is ready for writing.""" - remaining = len(self._buffer) - self._buffer_offset - if remaining <= 0: - self._loop.remove_writer(self._fileno) - if self._closing: - self._call_connection_lost(None) - return + """Called when socket is ready for writing. + + Drains buffer until EAGAIN with a budget to avoid starvation. + """ + for _ in range(self._max_writes_per_call): + remaining = len(self._buffer) - self._buffer_offset + if remaining <= 0: + self._loop.remove_writer(self._fileno) + if self._closing: + self._call_connection_lost(None) + return - try: - # Use memoryview with offset for O(1) access to remaining data - data_view = memoryview(self._buffer)[self._buffer_offset:] - n = self._sock.send(data_view) - except (BlockingIOError, InterruptedError): - return - except Exception as exc: - self._loop.remove_writer(self._fileno) - self._fatal_error(exc, 'Fatal write error') - return + try: + # Use memoryview with offset for O(1) access to remaining data + data_view = memoryview(self._buffer)[self._buffer_offset:] + n = self._sock.send(data_view) + except (BlockingIOError, InterruptedError): + # EAGAIN - socket buffer full + return + except Exception as exc: + self._loop.remove_writer(self._fileno) + self._fatal_error(exc, 'Fatal write error') + return - if n: - self._buffer_offset += n # O(1) offset update instead of O(n) deletion + if n: + self._buffer_offset += n # O(1) offset update instead of O(n) deletion - # Check if buffer is fully consumed + # Check if buffer is fully consumed after budget exhausted if self._buffer_offset >= len(self._buffer): # Reset buffer when fully consumed self._buffer = self._buffer_factory() @@ -258,6 +276,9 @@ class ErlangDatagramTransport(transports.DatagramTransport): max_size = 256 * 1024 # 256 KB + # Maximum reads per callback to avoid starving other events + _max_reads_per_call = 16 + def __init__(self, loop, sock, protocol, address=None, extra=None): super().__init__(extra) self._loop = loop @@ -282,21 +303,27 @@ async def _start(self): self._loop.add_reader(self._fileno, self._read_ready) def _read_ready(self): - """Called when data is available to read.""" + """Called when data is available to read. + + Drains socket until EAGAIN with a budget to avoid starvation. + """ if self._conn_lost: return - try: - data, addr = self._sock.recvfrom(self.max_size) - except (BlockingIOError, InterruptedError): - return - except OSError as exc: - self._protocol.error_received(exc) - return - except Exception as exc: - self._fatal_error(exc, 'Fatal read error on datagram transport') - return - self._protocol.datagram_received(data, addr) + for _ in range(self._max_reads_per_call): + try: + data, addr = self._sock.recvfrom(self.max_size) + except (BlockingIOError, InterruptedError): + # EAGAIN - no more data available + return + except OSError as exc: + self._protocol.error_received(exc) + return + except Exception as exc: + self._fatal_error(exc, 'Fatal read error on datagram transport') + return + + self._protocol.datagram_received(data, addr) def sendto(self, data, addr=None): """Send data to the transport.""" From c299c94f10f33be506ab260ed35c03125b0b71bb Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 12:52:08 +0100 Subject: [PATCH 3/5] Remove persistent_term cache to fix race condition The persistent_term cache for loop_ref could return a reference being destroyed during shutdown, causing ASAN errors. Revert to gen_server:call for safety. --- src/py_event_loop.erl | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index 5c4816b..f6dad5d 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -70,14 +70,7 @@ stop() -> -spec get_loop() -> {ok, reference()} | {error, not_started}. get_loop() -> - %% Fast path: check persistent_term cache first (O(1) lookup) - case persistent_term:get({?MODULE, loop_ref}, undefined) of - undefined -> - %% Slow path: loop not cached yet, get from gen_server - gen_server:call(?MODULE, get_loop); - LoopRef -> - {ok, LoopRef} - end. + gen_server:call(?MODULE, get_loop). %% @doc Register event loop callbacks for Python access. -spec register_callbacks() -> ok. @@ -339,8 +332,6 @@ init([]) -> ok = py_nif:set_python_event_loop(LoopRef), %% Set ErlangEventLoop as the default asyncio policy ok = set_default_policy(), - %% Cache loop reference for fast O(1) lookup - persistent_term:put({?MODULE, loop_ref}, LoopRef), {ok, #state{ loop_ref = LoopRef, worker_pid = WorkerPid, @@ -402,8 +393,6 @@ handle_call(get_loop, _From, #state{loop_ref = undefined} = State) -> {ok, RouterPid} = py_event_router:start_link(LoopRef), ok = py_nif:set_shared_router(RouterPid), ok = py_nif:set_python_event_loop(LoopRef), - %% Cache loop reference for fast O(1) lookup - persistent_term:put({?MODULE, loop_ref}, LoopRef), NewState = State#state{ loop_ref = LoopRef, worker_pid = WorkerPid, @@ -428,8 +417,6 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid, router_pid = RouterPid}) -> - %% Clear persistent_term cache - persistent_term:erase({?MODULE, loop_ref}), %% Reset asyncio policy back to default before destroying the loop reset_default_policy(), %% Clean up worker (scalable I/O model) From 5921a91ca20ada129112ebe0765deda2fbcd92e7 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 12:52:18 +0100 Subject: [PATCH 4/5] Fix venv_info to check both state attributes Check both _active_venv and _venv_site_packages exist before returning active venv info, preventing errors from partial venv state. --- src/py.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/py.erl b/src/py.erl index f748487..b78b2b8 100644 --- a/src/py.erl +++ b/src/py.erl @@ -1089,7 +1089,8 @@ deactivate_venv() -> %% Returns a map with venv_path and site_packages, or none if no venv is active. -spec venv_info() -> {ok, map() | none} | {error, term()}. venv_info() -> - Code = <<"({'active': True, 'venv_path': __import__('sys')._active_venv, 'site_packages': __import__('sys')._venv_site_packages, 'sys_path': __import__('sys').path} if hasattr(__import__('sys'), '_active_venv') else {'active': False})">>, + %% Check both attributes exist to handle partial activation/deactivation state + Code = <<"({'active': True, 'venv_path': __import__('sys')._active_venv, 'site_packages': __import__('sys')._venv_site_packages, 'sys_path': __import__('sys').path} if (hasattr(__import__('sys'), '_active_venv') and hasattr(__import__('sys'), '_venv_site_packages')) else {'active': False})">>, eval(Code). %% @private From 2df726e099eb5ba73b46d4146a99e7947bf13695 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 17 Mar 2026 12:52:28 +0100 Subject: [PATCH 5/5] Handle erlang.ProcessError in _run_and_send Catch ProcessError when sending async task results back to caller. If the caller process is gone, silently ignore the send failure. --- priv/_erlang_impl/_loop.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/priv/_erlang_impl/_loop.py b/priv/_erlang_impl/_loop.py index e75013b..8be028b 100644 --- a/priv/_erlang_impl/_loop.py +++ b/priv/_erlang_impl/_loop.py @@ -1339,10 +1339,19 @@ async def _run_and_send(coro, caller_pid, ref): try: result = await coro - erlang.send(caller_pid, (async_result, ref, (ok, result))) + try: + erlang.send(caller_pid, (async_result, ref, (ok, result))) + except erlang.ProcessError: + pass # Caller gone, nothing to do except asyncio.CancelledError: - erlang.send(caller_pid, (async_result, ref, (error, 'cancelled'))) + try: + erlang.send(caller_pid, (async_result, ref, (error, 'cancelled'))) + except erlang.ProcessError: + pass # Caller gone, nothing to do except Exception as e: import traceback tb = traceback.format_exc() - erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}'))) + try: + erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}'))) + except erlang.ProcessError: + pass # Caller gone, nothing to do