Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 24 additions & 48 deletions c_src/py_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,20 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
bool should_resume = (channel->waiting != NULL);

/* Check if there's an async waiter to dispatch.
* We only clear the waiter state after successful dispatch to avoid
* lost wakeups if the event queue is full. */
* IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition.
* With task_ready notification, the callback can fire before we re-acquire the mutex.
* If dispatch fails (rare), data is still in channel for next receive. */
erlang_event_loop_t *loop_to_wake = NULL;
uint64_t callback_id = 0;
bool has_async_waiter = channel->has_waiter;

if (has_async_waiter) {
loop_to_wake = channel->waiter_loop;
callback_id = channel->waiter_callback_id;
/* Don't clear yet - will clear after successful dispatch */
/* Clear waiter state now to avoid race with fast callback */
channel->has_waiter = false;
channel->waiter_loop = NULL;
channel->waiter_callback_id = 0;
}

/* Check if there's a sync waiter to notify */
Expand All @@ -163,7 +167,8 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)

if (has_sync_waiter) {
sync_waiter = channel->sync_waiter_pid;
/* Don't clear yet - will clear after successful send */
/* Clear waiter state now to avoid race */
channel->has_sync_waiter = false;
}

pthread_mutex_unlock(&channel->mutex);
Expand All @@ -175,19 +180,9 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)

/* Dispatch async waiter via timer dispatch (same path as timers) */
if (loop_to_wake != NULL) {
bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
if (dispatched) {
/* Successfully dispatched - now clear the waiter state */
pthread_mutex_lock(&channel->mutex);
if (channel->has_waiter && channel->waiter_callback_id == callback_id) {
channel->has_waiter = false;
channel->waiter_loop = NULL;
}
pthread_mutex_unlock(&channel->mutex);
/* Release the reference we kept in channel_wait */
enif_release_resource(loop_to_wake);
}
/* If dispatch failed, waiter remains registered for next send */
event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
/* Release the reference we kept in channel_wait */
enif_release_resource(loop_to_wake);
}

/* Notify sync waiter via Erlang message */
Expand All @@ -197,14 +192,7 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
enif_send(NULL, &sync_waiter, msg_env,
enif_make_atom(msg_env, "channel_data_ready"));
enif_free_env(msg_env);
/* Successfully notified - clear the waiter state */
pthread_mutex_lock(&channel->mutex);
if (channel->has_sync_waiter) {
channel->has_sync_waiter = false;
}
pthread_mutex_unlock(&channel->mutex);
}
/* If alloc failed, waiter remains registered for next send */
}

return 0;
Expand Down Expand Up @@ -241,16 +229,20 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
bool should_resume = (channel->waiting != NULL);

/* Check if there's an async waiter to dispatch.
* We only clear the waiter state after successful dispatch to avoid
* lost wakeups if the event queue is full. */
* IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition.
* With task_ready notification, the callback can fire before we re-acquire the mutex.
* If dispatch fails (rare), data is still in channel for next receive. */
erlang_event_loop_t *loop_to_wake = NULL;
uint64_t callback_id = 0;
bool has_async_waiter = channel->has_waiter;

if (has_async_waiter) {
loop_to_wake = channel->waiter_loop;
callback_id = channel->waiter_callback_id;
/* Don't clear yet - will clear after successful dispatch */
/* Clear waiter state now to avoid race with fast callback */
channel->has_waiter = false;
channel->waiter_loop = NULL;
channel->waiter_callback_id = 0;
}

/* Check if there's a sync waiter to notify */
Expand All @@ -259,7 +251,8 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {

if (has_sync_waiter) {
sync_waiter = channel->sync_waiter_pid;
/* Don't clear yet - will clear after successful send */
/* Clear waiter state now to avoid race */
channel->has_sync_waiter = false;
}

pthread_mutex_unlock(&channel->mutex);
Expand All @@ -270,19 +263,9 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {

/* Dispatch async waiter via timer dispatch (same path as timers) */
if (loop_to_wake != NULL) {
bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
if (dispatched) {
/* Successfully dispatched - now clear the waiter state */
pthread_mutex_lock(&channel->mutex);
if (channel->has_waiter && channel->waiter_callback_id == callback_id) {
channel->has_waiter = false;
channel->waiter_loop = NULL;
}
pthread_mutex_unlock(&channel->mutex);
/* Release the reference we kept in channel_wait */
enif_release_resource(loop_to_wake);
}
/* If dispatch failed, waiter remains registered for next send */
event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
/* Release the reference we kept in channel_wait */
enif_release_resource(loop_to_wake);
}

/* Notify sync waiter via Erlang message */
Expand All @@ -292,14 +275,7 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
enif_send(NULL, &sync_waiter, msg_env,
enif_make_atom(msg_env, "channel_data_ready"));
enif_free_env(msg_env);
/* Successfully notified - clear the waiter state */
pthread_mutex_lock(&channel->mutex);
if (channel->has_sync_waiter) {
channel->has_sync_waiter = false;
}
pthread_mutex_unlock(&channel->mutex);
}
/* If alloc failed, waiter remains registered for next send */
}

return 0;
Expand Down
Loading