1 change: 1 addition & 0 deletions bench/asio/http_server_bench.cpp
@@ -364,6 +364,7 @@ void run_http_server_benchmarks(
collector.add( bench_multithread( 2, 32, 31250 ) );
collector.add( bench_multithread( 4, 32, 31250 ) );
collector.add( bench_multithread( 8, 32, 31250 ) );
collector.add( bench_multithread( 16, 32, 31250 ) );
}
}

1 change: 1 addition & 0 deletions bench/corosio/http_server_bench.cpp
@@ -373,6 +373,7 @@ void run_http_server_benchmarks(
collector.add( bench_multithread<Context>( 2, 32, 31250 ) );
collector.add( bench_multithread<Context>( 4, 32, 31250 ) );
collector.add( bench_multithread<Context>( 8, 32, 31250 ) );
collector.add( bench_multithread<Context>( 16, 32, 31250 ) );
}
}

140 changes: 116 additions & 24 deletions src/corosio/src/detail/epoll/scheduler.cpp
@@ -92,6 +92,15 @@ struct scheduler_context
{
epoll_scheduler const* key;
scheduler_context* next;
op_queue private_queue;
long private_outstanding_work;

scheduler_context(epoll_scheduler const* k, scheduler_context* n)
: key(k)
, next(n)
, private_outstanding_work(0)
{
}
};

corosio::detail::thread_local_ptr<scheduler_context> context_stack;
@@ -102,17 +111,28 @@ struct thread_context_guard

explicit thread_context_guard(
epoll_scheduler const* ctx) noexcept
: frame_{ctx, context_stack.get()}
: frame_(ctx, context_stack.get())
{
context_stack.set(&frame_);
}

~thread_context_guard() noexcept
{
if (!frame_.private_queue.empty())
frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
context_stack.set(frame_.next);
}
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
for (auto* c = context_stack.get(); c != nullptr; c = c->next)
if (c->key == self)
return c;
return nullptr;
}

} // namespace

epoll_scheduler::
@@ -259,6 +279,17 @@ post(capy::coro h) const
};

auto ph = std::make_unique<post_handler>(h);

// Fast path: same thread posts to private queue without locking
if (auto* ctx = find_context(this))
{
outstanding_work_.fetch_add(1, std::memory_order_relaxed);
++ctx->private_outstanding_work;
ctx->private_queue.push(ph.release());
return;
}

// Slow path: cross-thread post requires mutex
outstanding_work_.fetch_add(1, std::memory_order_relaxed);

std::unique_lock lock(mutex_);
@@ -270,6 +301,16 @@ void
epoll_scheduler::
post(scheduler_op* h) const
{
// Fast path: same thread posts to private queue without locking
if (auto* ctx = find_context(this))
{
outstanding_work_.fetch_add(1, std::memory_order_relaxed);
++ctx->private_outstanding_work;
ctx->private_queue.push(h);
return;
}

// Slow path: cross-thread post requires mutex
outstanding_work_.fetch_add(1, std::memory_order_relaxed);

std::unique_lock lock(mutex_);
@@ -489,6 +530,17 @@ work_finished() const noexcept
}
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
std::lock_guard lock(mutex_);
// Note: outstanding_work_ was already incremented when posting
completed_ops_.splice(queue);
if (count > 0)
wakeup_event_.notify_all();
}
Comment on lines +533 to +542

⚠️ Potential issue | 🟠 Major

Ensure drained work wakes or interrupts the reactor when no threads are idle.

notify_all() only wakes condvar waiters. If no threads are idle and the reactor is waiting, the newly queued handlers can sit until an unrelated wakeup. Consider reusing the existing wake/interrupt logic so queued work is promptly observed.

🛠️ Suggested fix
 void
 epoll_scheduler::
 drain_thread_queue(op_queue& queue, long count) const
 {
-    std::lock_guard lock(mutex_);
+    std::unique_lock lock(mutex_);
     // Note: outstanding_work_ was already incremented when posting
     completed_ops_.splice(queue);
     if (count > 0)
-        wakeup_event_.notify_all();
+        wake_one_thread_and_unlock(lock);
 }
🤖 Prompt for AI Agents
In `@src/corosio/src/detail/epoll/scheduler.cpp` around lines 533 - 542, the
drain_thread_queue implementation uses wakeup_event_.notify_all(), which only
wakes condvar waiters and can leave the reactor unaware of new work when no
threads are idle. Change drain_thread_queue (and its call sites if needed) to
reuse the scheduler's existing reactor wake/interrupt path instead of a raw
notify_all: after splicing into completed_ops_ (and preserving
outstanding_work_), invoke the wake/interrupt helper used elsewhere to wake or
interrupt the reactor so that queued handlers are promptly observed. Reference
epoll_scheduler::drain_thread_queue, completed_ops_, outstanding_work_, and
wakeup_event_ when locating the change.
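
The helper named in the suggestion, wake_one_thread_and_unlock(), is not part of this diff, so the block below is only a minimal sketch of how such a helper is commonly structured. It is an assumption built from members that are visible elsewhere in the patch (idle_thread_count_, wakeup_event_, interrupt_reactor(), mutex_), not the project's actual implementation.

// Sketch only (assumed body): wake exactly one idle worker if there is one;
// otherwise poke the reactor, since a thread blocked in epoll_wait() cannot
// be reached by a condition-variable notification.
void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (idle_thread_count_ > 0)
    {
        // An idle worker is parked on wakeup_event_; release the mutex and
        // wake one of them.
        lock.unlock();
        wakeup_event_.notify_one();
    }
    else
    {
        // No idle workers: the work can only be picked up by a thread that
        // is (or will be) inside the reactor, so interrupt epoll_wait().
        lock.unlock();
        interrupt_reactor();
    }
}

Either branch addresses the point the review makes: a bare notify_all() reaches only threads waiting on the condition variable, while the interrupt path also reaches a thread blocked in epoll_wait().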


void
epoll_scheduler::
interrupt_reactor() const
@@ -542,7 +594,6 @@ update_timerfd() const
if (nearest == timer_service::time_point::max())
{
// No timers - disarm by setting to 0 (relative)
// ts is already zeroed
}
else
{
@@ -576,19 +627,20 @@ run_reactor(std::unique_lock<std::mutex>& lock)

lock.unlock();

// --- Event loop runs WITHOUT the mutex (like Asio) ---

epoll_event events[128];
int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
int saved_errno = errno;

timer_svc_->process_expired();
update_timerfd();

if (nfds < 0 && saved_errno != EINTR)
detail::throw_system_error(make_err(saved_errno), "epoll_wait");

lock.lock();

bool check_timers = false;
op_queue local_ops;
int completions_queued = 0;

// Process events without holding the mutex
for (int i = 0; i < nfds; ++i)
{
if (events[i].data.ptr == nullptr)
@@ -599,9 +651,13 @@ run_reactor(std::unique_lock<std::mutex>& lock)
continue;
}

// timerfd_settime() in update_timerfd() resets the readable state
if (events[i].data.ptr == &timer_fd_)
{
std::uint64_t expirations;
[[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
check_timers = true;
continue;
}

auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
std::uint32_t ev = events[i].events;
@@ -624,7 +680,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
if (err)
{
op->complete(err, 0);
completed_ops_.push(op);
local_ops.push(op);
++completions_queued;
}
else
@@ -637,7 +693,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
}
else
{
completed_ops_.push(op);
local_ops.push(op);
++completions_queued;
}
}
@@ -650,14 +706,13 @@ run_reactor(std::unique_lock<std::mutex>& lock)

if (ev & EPOLLOUT)
{
// Connect uses write readiness - try it first
auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
if (conn_op)
{
if (err)
{
conn_op->complete(err, 0);
completed_ops_.push(conn_op);
local_ops.push(conn_op);
++completions_queued;
}
else
@@ -670,7 +725,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
}
else
{
completed_ops_.push(conn_op);
local_ops.push(conn_op);
++completions_queued;
}
}
@@ -682,7 +737,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
if (err)
{
write_op->complete(err, 0);
completed_ops_.push(write_op);
local_ops.push(write_op);
++completions_queued;
}
else
@@ -695,7 +750,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
}
else
{
completed_ops_.push(write_op);
local_ops.push(write_op);
++completions_queued;
}
}
@@ -705,41 +760,64 @@ run_reactor(std::unique_lock<std::mutex>& lock)
desc->write_ready.store(true, std::memory_order_release);
}

// Handle error for ops not processed above (no EPOLLIN/EPOLLOUT)
if (err && !(ev & (EPOLLIN | EPOLLOUT)))
{
auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
if (read_op)
{
read_op->complete(err, 0);
completed_ops_.push(read_op);
local_ops.push(read_op);
++completions_queued;
}

auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
if (write_op)
{
write_op->complete(err, 0);
completed_ops_.push(write_op);
local_ops.push(write_op);
++completions_queued;
}

auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
if (conn_op)
{
conn_op->complete(err, 0);
completed_ops_.push(conn_op);
local_ops.push(conn_op);
++completions_queued;
}
}
}

if (completions_queued > 0)
// Process timers only when timerfd fires (like Asio's check_timers pattern)
if (check_timers)
{
timer_svc_->process_expired();
update_timerfd();
}

// --- Acquire mutex only for queue operations ---
lock.lock();

if (!local_ops.empty())
completed_ops_.splice(local_ops);

// Drain private queue (outstanding_work_ was already incremented when posting)
if (auto* ctx = find_context(this))
{
if (completions_queued == 1)
if (!ctx->private_queue.empty())
{
completions_queued += ctx->private_outstanding_work;
ctx->private_outstanding_work = 0;
completed_ops_.splice(ctx->private_queue);
}
}

// Only wake threads that are actually idle, and only as many as we have work
if (completions_queued > 0 && idle_thread_count_ > 0)
{
int threads_to_wake = (std::min)(completions_queued, idle_thread_count_);
for (int i = 0; i < threads_to_wake; ++i)
wakeup_event_.notify_one();
else
wakeup_event_.notify_all();
}
}

@@ -758,7 +836,10 @@ do_one(long timeout_us)

if (op == &task_op_)
{
bool more_handlers = !completed_ops_.empty();
// Check both global queue and private queue for pending handlers
auto* ctx = find_context(this);
bool more_handlers = !completed_ops_.empty() ||
(ctx && !ctx->private_queue.empty());

if (!more_handlers)
{
@@ -801,6 +882,17 @@ do_one(long timeout_us)
if (timeout_us == 0)
return 0;

// Drain private queue before blocking (outstanding_work_ was already incremented)
if (auto* ctx = find_context(this))
{
if (!ctx->private_queue.empty())
{
ctx->private_outstanding_work = 0;
completed_ops_.splice(ctx->private_queue);
continue;
}
}

++idle_thread_count_;
if (timeout_us < 0)
wakeup_event_.wait(lock);
12 changes: 12 additions & 0 deletions src/corosio/src/detail/epoll/scheduler.hpp
@@ -70,6 +70,7 @@ class epoll_scheduler
capy::execution_context& ctx,
int concurrency_hint = -1);

/// Destroy the scheduler.
~epoll_scheduler();

epoll_scheduler(epoll_scheduler const&) = delete;
@@ -130,6 +131,16 @@ class epoll_scheduler
/** For use by I/O operations to track completed work. */
void work_finished() const noexcept override;

/** Drain work from thread context's private queue to global queue.

Called by thread_context_guard destructor when a thread exits run().
Transfers pending work to the global queue under mutex protection.

@param queue The private queue to drain.
@param count Item count for wakeup decisions (wakes other threads if positive).
*/
void drain_thread_queue(op_queue& queue, long count) const;

private:
std::size_t do_one(long timeout_us);
void run_reactor(std::unique_lock<std::mutex>& lock);
@@ -156,6 +167,7 @@ class epoll_scheduler
// Edge-triggered eventfd state
mutable std::atomic<bool> eventfd_armed_{false};


// Sentinel operation for interleaving reactor runs with handler execution.
// Ensures the reactor runs periodically even when handlers are continuously
// posted, preventing timer starvation.