From 9dbcadec14ac207b4edbb404a356621616df9dcf Mon Sep 17 00:00:00 2001
From: Steve Gerbino
Date: Wed, 4 Feb 2026 15:40:08 +0100
Subject: [PATCH 1/5] Refactor epoll reactor to avoid holding the mutex during the event loop

Reduce mutex contention by processing events into a local queue without
holding the mutex. The mutex is acquired only briefly, when splicing
completions into the completed_ops_ queue.

Changes:
- Process events into a local op_queue without holding the mutex
- Acquire the mutex only for the completed_ops_ splice operation
- Add a check_timers flag so timers are processed only when the timerfd fires
- Cache the last timerfd expiry to skip redundant timerfd_settime calls
---
 src/corosio/src/detail/epoll/scheduler.cpp | 54 +++++++++++++++-------
 src/corosio/src/detail/epoll/scheduler.hpp |  3 ++
 2 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp
index 2c421aeb..c1c3bc30 100644
--- a/src/corosio/src/detail/epoll/scheduler.cpp
+++ b/src/corosio/src/detail/epoll/scheduler.cpp
@@ -536,6 +536,12 @@ update_timerfd() const
 {
     auto nearest = timer_svc_->nearest_expiry();
 
+    // Skip syscall if expiry hasn't changed
+    if (nearest == last_timerfd_expiry_)
+        return;
+
+    last_timerfd_expiry_ = nearest;
+
     itimerspec ts{};
     int flags = 0;
 
@@ -576,19 +582,20 @@ run_reactor(std::unique_lock<std::mutex>& lock)
 
     lock.unlock();
 
+    // --- Event loop runs WITHOUT the mutex (like Asio) ---
+
     epoll_event events[128];
     int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
     int saved_errno = errno;
 
-    timer_svc_->process_expired();
-    update_timerfd();
-
     if (nfds < 0 && saved_errno != EINTR)
         detail::throw_system_error(make_err(saved_errno), "epoll_wait");
 
-    lock.lock();
-
+    bool check_timers = false;
+    op_queue local_ops;
     int completions_queued = 0;
+
+    // Process events without holding the mutex
     for (int i = 0; i < nfds; ++i)
     {
         if (events[i].data.ptr == nullptr)
@@ -599,9 +606,11 @@ run_reactor(std::unique_lock<std::mutex>& lock)
             continue;
         }
 
-        // timerfd_settime() in update_timerfd() resets the readable state
         if (events[i].data.ptr == &timer_fd_)
+        {
+            check_timers = true;
             continue;
+        }
 
         auto* desc = static_cast(events[i].data.ptr);
         std::uint32_t ev = events[i].events;
@@ -624,7 +633,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
             if (err)
             {
                 op->complete(err, 0);
-                completed_ops_.push(op);
+                local_ops.push(op);
                 ++completions_queued;
             }
             else
@@ -637,7 +646,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
             }
             else
             {
-                completed_ops_.push(op);
+                local_ops.push(op);
                 ++completions_queued;
             }
         }
@@ -650,14 +659,13 @@ run_reactor(std::unique_lock<std::mutex>& lock)
 
         if (ev & EPOLLOUT)
         {
-            // Connect uses write readiness - try it first
            auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
             if (conn_op)
             {
                 if (err)
                 {
                     conn_op->complete(err, 0);
-                    completed_ops_.push(conn_op);
+                    local_ops.push(conn_op);
                     ++completions_queued;
                 }
                 else
@@ -670,7 +678,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
                 }
                 else
                 {
-                    completed_ops_.push(conn_op);
+                    local_ops.push(conn_op);
                     ++completions_queued;
                 }
             }
@@ -682,7 +690,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
                 if (err)
                 {
                     write_op->complete(err, 0);
-                    completed_ops_.push(write_op);
+                    local_ops.push(write_op);
                     ++completions_queued;
                 }
                 else
@@ -695,7 +703,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
                 }
                 else
                 {
-                    completed_ops_.push(write_op);
+                    local_ops.push(write_op);
                     ++completions_queued;
                 }
             }
@@ -705,14 +713,13 @@ run_reactor(std::unique_lock<std::mutex>& lock)
 
             desc->write_ready.store(true, std::memory_order_release);
         }
 
-        // Handle error for ops not processed above (no EPOLLIN/EPOLLOUT)
         if (err && !(ev & (EPOLLIN | EPOLLOUT)))
         {
             auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
             if (read_op)
             {
                 read_op->complete(err, 0);
-                completed_ops_.push(read_op);
+                local_ops.push(read_op);
                 ++completions_queued;
             }
 
@@ -720,7 +727,7 @@ run_reactor(std::unique_lock<std::mutex>& lock)
             if (write_op)
             {
                 write_op->complete(err, 0);
-                completed_ops_.push(write_op);
+                local_ops.push(write_op);
                 ++completions_queued;
             }
 
@@ -728,12 +735,25 @@ run_reactor(std::unique_lock<std::mutex>& lock)
             if (conn_op)
             {
                 conn_op->complete(err, 0);
-                completed_ops_.push(conn_op);
+                local_ops.push(conn_op);
                 ++completions_queued;
             }
         }
     }
 
+    // Process timers only when the timerfd fires (like Asio's check_timers pattern)
+    if (check_timers)
+    {
+        timer_svc_->process_expired();
+        update_timerfd();
+    }
+
+    // --- Acquire the mutex only for queue operations ---
+    lock.lock();
+
+    if (!local_ops.empty())
+        completed_ops_.splice(local_ops);
+
     if (completions_queued > 0)
     {
         if (completions_queued == 1)
diff --git a/src/corosio/src/detail/epoll/scheduler.hpp b/src/corosio/src/detail/epoll/scheduler.hpp
index 25530c57..686afbf7 100644
--- a/src/corosio/src/detail/epoll/scheduler.hpp
+++ b/src/corosio/src/detail/epoll/scheduler.hpp
@@ -156,6 +156,9 @@ class epoll_scheduler
     // Edge-triggered eventfd state
     mutable std::atomic<bool> eventfd_armed_{false};
 
+    // Track last timerfd expiry to avoid redundant timerfd_settime calls
+    mutable timer_service::time_point last_timerfd_expiry_{timer_service::time_point::max()};
+
     // Sentinel operation for interleaving reactor runs with handler execution.
     // Ensures the reactor runs periodically even when handlers are continuously
     // posted, preventing timer starvation.

From 931a23d5a5d3e6d11c351ef86bd8b8d26f1341a8 Mon Sep 17 00:00:00 2001
From: Steve Gerbino
Date: Wed, 4 Feb 2026 15:52:23 +0100
Subject: [PATCH 2/5] Avoid thundering herd in reactor wake-up

Only wake idle threads, and only as many as we have work available.
This prevents waking all threads when only a few completions arrive.
---
 src/corosio/src/detail/epoll/scheduler.cpp | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp
index c1c3bc30..14b3221f 100644
--- a/src/corosio/src/detail/epoll/scheduler.cpp
+++ b/src/corosio/src/detail/epoll/scheduler.cpp
@@ -754,12 +754,12 @@ run_reactor(std::unique_lock<std::mutex>& lock)
     if (!local_ops.empty())
         completed_ops_.splice(local_ops);
 
-    if (completions_queued > 0)
+    // Only wake threads that are actually idle, and only as many as we have work for
+    if (completions_queued > 0 && idle_thread_count_ > 0)
     {
-        if (completions_queued == 1)
+        int threads_to_wake = (std::min)(completions_queued, idle_thread_count_);
+        for (int i = 0; i < threads_to_wake; ++i)
             wakeup_event_.notify_one();
-        else
-            wakeup_event_.notify_all();
     }
 }

From 3e4587d4110baf9fa700f14526867f8aefb7fd8b Mon Sep 17 00:00:00 2001
From: Steve Gerbino
Date: Wed, 4 Feb 2026 16:44:44 +0100
Subject: [PATCH 3/5] Add thread-local private queue to bypass mutex contention

When posting work from within the scheduler's run loop, use a
thread-local queue instead of acquiring the global mutex. This matches
Asio's thread_info::private_op_queue optimization.
- Extend scheduler_context with private_queue and work counter
- Fast path in post() detects same-thread via context_stack
- Drain points: before blocking, after reactor splice, on exit
- Reduces futex calls from ~450K to 1 in multi-threaded benchmarks
---
 src/corosio/src/detail/epoll/scheduler.cpp | 82 +++++++++++++++++++++-
 src/corosio/src/detail/epoll/scheduler.hpp | 11 +++
 2 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp
index 14b3221f..2d98a099 100644
--- a/src/corosio/src/detail/epoll/scheduler.cpp
+++ b/src/corosio/src/detail/epoll/scheduler.cpp
@@ -92,6 +92,15 @@ struct scheduler_context
 {
     epoll_scheduler const* key;
     scheduler_context* next;
+    op_queue private_queue;
+    long private_outstanding_work;
+
+    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
+        : key(k)
+        , next(n)
+        , private_outstanding_work(0)
+    {
+    }
 };
 
 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
@@ -102,17 +111,28 @@ struct thread_context_guard
 
     explicit thread_context_guard(
         epoll_scheduler const* ctx) noexcept
-        : frame_{ctx, context_stack.get()}
+        : frame_(ctx, context_stack.get())
     {
         context_stack.set(&frame_);
     }
 
     ~thread_context_guard() noexcept
     {
+        if (!frame_.private_queue.empty())
+            frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
         context_stack.set(frame_.next);
     }
 };
 
+scheduler_context*
+find_context(epoll_scheduler const* self) noexcept
+{
+    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
+        if (c->key == self)
+            return c;
+    return nullptr;
+}
+
 } // namespace
 
 epoll_scheduler::
@@ -259,6 +279,17 @@ post(capy::coro h) const
     };
 
     auto ph = std::make_unique(h);
+
+    // Fast path: same thread posts to private queue without locking
+    if (auto* ctx = find_context(this))
+    {
+        outstanding_work_.fetch_add(1, std::memory_order_relaxed);
+        ++ctx->private_outstanding_work;
+        ctx->private_queue.push(ph.release());
+        return;
+    }
+
+    // Slow path: cross-thread post requires mutex
     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
     std::unique_lock lock(mutex_);
@@ -270,6 +301,16 @@ void
 epoll_scheduler::
 post(scheduler_op* h) const
 {
+    // Fast path: same thread posts to private queue without locking
+    if (auto* ctx = find_context(this))
+    {
+        outstanding_work_.fetch_add(1, std::memory_order_relaxed);
+        ++ctx->private_outstanding_work;
+        ctx->private_queue.push(h);
+        return;
+    }
+
+    // Slow path: cross-thread post requires mutex
     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
     std::unique_lock lock(mutex_);
@@ -489,6 +530,17 @@ work_finished() const noexcept
     }
 }
 
+void
+epoll_scheduler::
+drain_thread_queue(op_queue& queue, long count) const
+{
+    std::lock_guard lock(mutex_);
+    // Note: outstanding_work_ was already incremented when posting
+    completed_ops_.splice(queue);
+    if (count > 0)
+        wakeup_event_.notify_all();
+}
+
 void
 epoll_scheduler::
 interrupt_reactor() const
@@ -548,7 +600,6 @@ update_timerfd() const
     if (nearest == timer_service::time_point::max())
     {
         // No timers - disarm by setting to 0 (relative)
-        // ts is already zeroed
     }
     else
     {
@@ -754,6 +805,17 @@ run_reactor(std::unique_lock<std::mutex>& lock)
     if (!local_ops.empty())
         completed_ops_.splice(local_ops);
 
+    // Drain private queue (outstanding_work_ was already incremented when posting)
+    if (auto* ctx = find_context(this))
+    {
+        if (!ctx->private_queue.empty())
+        {
+            completions_queued += ctx->private_outstanding_work;
+            ctx->private_outstanding_work = 0;
+            completed_ops_.splice(ctx->private_queue);
+        }
+    }
+
     // Only wake threads that are actually idle, and only as many as we have work for
     if (completions_queued > 0 && idle_thread_count_ > 0)
     {
@@ -778,7 +840,10 @@ do_one(long timeout_us)
 
     if (op == &task_op_)
     {
-        bool more_handlers = !completed_ops_.empty();
+        // Check both the global queue and the private queue for pending handlers
+        auto* ctx = find_context(this);
+        bool more_handlers = !completed_ops_.empty() ||
+            (ctx && !ctx->private_queue.empty());
 
         if (!more_handlers)
         {
@@ -821,6 +886,17 @@ do_one(long timeout_us)
     if (timeout_us == 0)
         return 0;
 
+    // Drain private queue before blocking (outstanding_work_ was already incremented)
+    if (auto* ctx = find_context(this))
+    {
+        if (!ctx->private_queue.empty())
+        {
+            ctx->private_outstanding_work = 0;
+            completed_ops_.splice(ctx->private_queue);
+            continue;
+        }
+    }
+
     ++idle_thread_count_;
     if (timeout_us < 0)
         wakeup_event_.wait(lock);
diff --git a/src/corosio/src/detail/epoll/scheduler.hpp b/src/corosio/src/detail/epoll/scheduler.hpp
index 686afbf7..069c0c42 100644
--- a/src/corosio/src/detail/epoll/scheduler.hpp
+++ b/src/corosio/src/detail/epoll/scheduler.hpp
@@ -70,6 +70,7 @@ class epoll_scheduler
         capy::execution_context& ctx,
         int concurrency_hint = -1);
 
+    /// Destroy the scheduler.
     ~epoll_scheduler();
 
     epoll_scheduler(epoll_scheduler const&) = delete;
@@ -130,6 +131,16 @@ class epoll_scheduler
     /** For use by I/O operations to track completed work. */
     void work_finished() const noexcept override;
 
+    /** Drain work from a thread context's private queue into the global queue.
+
+        Called by the thread_context_guard destructor when a thread exits run().
+        Transfers pending work to the global queue under mutex protection.
+
+        @param queue The private queue to drain.
+        @param count Item count for wakeup decisions (wakes other threads if positive).
+ */ + void drain_thread_queue(op_queue& queue, long count) const; + private: std::size_t do_one(long timeout_us); void run_reactor(std::unique_lock& lock); From 892a4ef7475aa4875856bbf1a40d66dcf2164be1 Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Wed, 4 Feb 2026 17:03:10 +0100 Subject: [PATCH 4/5] Add 16-thread benchmark configuration --- bench/asio/http_server_bench.cpp | 1 + bench/corosio/http_server_bench.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/bench/asio/http_server_bench.cpp b/bench/asio/http_server_bench.cpp index 34488648..81ff5771 100644 --- a/bench/asio/http_server_bench.cpp +++ b/bench/asio/http_server_bench.cpp @@ -364,6 +364,7 @@ void run_http_server_benchmarks( collector.add( bench_multithread( 2, 32, 31250 ) ); collector.add( bench_multithread( 4, 32, 31250 ) ); collector.add( bench_multithread( 8, 32, 31250 ) ); + collector.add( bench_multithread( 16, 32, 31250 ) ); } } diff --git a/bench/corosio/http_server_bench.cpp b/bench/corosio/http_server_bench.cpp index 4a642786..814dab3f 100644 --- a/bench/corosio/http_server_bench.cpp +++ b/bench/corosio/http_server_bench.cpp @@ -373,6 +373,7 @@ void run_http_server_benchmarks( collector.add( bench_multithread( 2, 32, 31250 ) ); collector.add( bench_multithread( 4, 32, 31250 ) ); collector.add( bench_multithread( 8, 32, 31250 ) ); + collector.add( bench_multithread( 16, 32, 31250 ) ); } } From 5bce71e88eeaa6c8e67bd2e7da28c184aa690b1e Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Wed, 4 Feb 2026 17:31:56 +0100 Subject: [PATCH 5/5] Fix timerfd handling in epoll reactor - Consume timerfd on expiry to prevent epoll busy-spinning (level-triggered fd must be read to clear readable state) - Remove last_timerfd_expiry_ caching optimization to match Asio (eliminates data race between timer callback and reactor thread) --- src/corosio/src/detail/epoll/scheduler.cpp | 8 ++------ src/corosio/src/detail/epoll/scheduler.hpp | 2 -- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp index 2d98a099..537d3b62 100644 --- a/src/corosio/src/detail/epoll/scheduler.cpp +++ b/src/corosio/src/detail/epoll/scheduler.cpp @@ -588,12 +588,6 @@ update_timerfd() const { auto nearest = timer_svc_->nearest_expiry(); - // Skip syscall if expiry hasn't changed - if (nearest == last_timerfd_expiry_) - return; - - last_timerfd_expiry_ = nearest; - itimerspec ts{}; int flags = 0; @@ -659,6 +653,8 @@ run_reactor(std::unique_lock& lock) if (events[i].data.ptr == &timer_fd_) { + std::uint64_t expirations; + [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations)); check_timers = true; continue; } diff --git a/src/corosio/src/detail/epoll/scheduler.hpp b/src/corosio/src/detail/epoll/scheduler.hpp index 069c0c42..4d1f2ce6 100644 --- a/src/corosio/src/detail/epoll/scheduler.hpp +++ b/src/corosio/src/detail/epoll/scheduler.hpp @@ -167,8 +167,6 @@ class epoll_scheduler // Edge-triggered eventfd state mutable std::atomic eventfd_armed_{false}; - // Track last timerfd expiry to avoid redundant timerfd_settime calls - mutable timer_service::time_point last_timerfd_expiry_{timer_service::time_point::max()}; // Sentinel operation for interleaving reactor runs with handler execution. // Ensures the reactor runs periodically even when handlers are continuously