diff --git a/c_src/py_channel.c b/c_src/py_channel.c index 5c3707e..3b1aaa6 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -145,8 +145,9 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) bool should_resume = (channel->waiting != NULL); /* Check if there's an async waiter to dispatch. - * We only clear the waiter state after successful dispatch to avoid - * lost wakeups if the event queue is full. */ + * IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition. + * With task_ready notification, the callback can fire before we re-acquire the mutex. + * If dispatch fails (rare), data is still in channel for next receive. */ erlang_event_loop_t *loop_to_wake = NULL; uint64_t callback_id = 0; bool has_async_waiter = channel->has_waiter; @@ -154,7 +155,10 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) if (has_async_waiter) { loop_to_wake = channel->waiter_loop; callback_id = channel->waiter_callback_id; - /* Don't clear yet - will clear after successful dispatch */ + /* Clear waiter state now to avoid race with fast callback */ + channel->has_waiter = false; + channel->waiter_loop = NULL; + channel->waiter_callback_id = 0; } /* Check if there's a sync waiter to notify */ @@ -163,7 +167,8 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) if (has_sync_waiter) { sync_waiter = channel->sync_waiter_pid; - /* Don't clear yet - will clear after successful send */ + /* Clear waiter state now to avoid race */ + channel->has_sync_waiter = false; } pthread_mutex_unlock(&channel->mutex); @@ -175,19 +180,9 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) /* Dispatch async waiter via timer dispatch (same path as timers) */ if (loop_to_wake != NULL) { - bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); - if (dispatched) { - /* Successfully dispatched - now clear the waiter state */ - pthread_mutex_lock(&channel->mutex); - if (channel->has_waiter && channel->waiter_callback_id == callback_id) { - channel->has_waiter = false; - channel->waiter_loop = NULL; - } - pthread_mutex_unlock(&channel->mutex); - /* Release the reference we kept in channel_wait */ - enif_release_resource(loop_to_wake); - } - /* If dispatch failed, waiter remains registered for next send */ + event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); + /* Release the reference we kept in channel_wait */ + enif_release_resource(loop_to_wake); } /* Notify sync waiter via Erlang message */ @@ -197,14 +192,7 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) enif_send(NULL, &sync_waiter, msg_env, enif_make_atom(msg_env, "channel_data_ready")); enif_free_env(msg_env); - /* Successfully notified - clear the waiter state */ - pthread_mutex_lock(&channel->mutex); - if (channel->has_sync_waiter) { - channel->has_sync_waiter = false; - } - pthread_mutex_unlock(&channel->mutex); } - /* If alloc failed, waiter remains registered for next send */ } return 0; @@ -241,8 +229,9 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { bool should_resume = (channel->waiting != NULL); /* Check if there's an async waiter to dispatch. - * We only clear the waiter state after successful dispatch to avoid - * lost wakeups if the event queue is full. */ + * IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition. + * With task_ready notification, the callback can fire before we re-acquire the mutex. + * If dispatch fails (rare), data is still in channel for next receive. */ erlang_event_loop_t *loop_to_wake = NULL; uint64_t callback_id = 0; bool has_async_waiter = channel->has_waiter; @@ -250,7 +239,10 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { if (has_async_waiter) { loop_to_wake = channel->waiter_loop; callback_id = channel->waiter_callback_id; - /* Don't clear yet - will clear after successful dispatch */ + /* Clear waiter state now to avoid race with fast callback */ + channel->has_waiter = false; + channel->waiter_loop = NULL; + channel->waiter_callback_id = 0; } /* Check if there's a sync waiter to notify */ @@ -259,7 +251,8 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { if (has_sync_waiter) { sync_waiter = channel->sync_waiter_pid; - /* Don't clear yet - will clear after successful send */ + /* Clear waiter state now to avoid race */ + channel->has_sync_waiter = false; } pthread_mutex_unlock(&channel->mutex); @@ -270,19 +263,9 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { /* Dispatch async waiter via timer dispatch (same path as timers) */ if (loop_to_wake != NULL) { - bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); - if (dispatched) { - /* Successfully dispatched - now clear the waiter state */ - pthread_mutex_lock(&channel->mutex); - if (channel->has_waiter && channel->waiter_callback_id == callback_id) { - channel->has_waiter = false; - channel->waiter_loop = NULL; - } - pthread_mutex_unlock(&channel->mutex); - /* Release the reference we kept in channel_wait */ - enif_release_resource(loop_to_wake); - } - /* If dispatch failed, waiter remains registered for next send */ + event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); + /* Release the reference we kept in channel_wait */ + enif_release_resource(loop_to_wake); } /* Notify sync waiter via Erlang message */ @@ -292,14 +275,7 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { enif_send(NULL, &sync_waiter, msg_env, enif_make_atom(msg_env, "channel_data_ready")); enif_free_env(msg_env); - /* Successfully notified - clear the waiter state */ - pthread_mutex_lock(&channel->mutex); - if (channel->has_sync_waiter) { - channel->has_sync_waiter = false; - } - pthread_mutex_unlock(&channel->mutex); } - /* If alloc failed, waiter remains registered for next send */ } return 0;