diff --git a/bench/asio/http_server_bench.cpp b/bench/asio/http_server_bench.cpp
index 34488648..81ff5771 100644
--- a/bench/asio/http_server_bench.cpp
+++ b/bench/asio/http_server_bench.cpp
@@ -364,6 +364,7 @@ void run_http_server_benchmarks(
         collector.add( bench_multithread( 2, 32, 31250 ) );
         collector.add( bench_multithread( 4, 32, 31250 ) );
         collector.add( bench_multithread( 8, 32, 31250 ) );
+        collector.add( bench_multithread( 16, 32, 31250 ) );
     }
 }
 
diff --git a/bench/corosio/http_server_bench.cpp b/bench/corosio/http_server_bench.cpp
index 4a642786..814dab3f 100644
--- a/bench/corosio/http_server_bench.cpp
+++ b/bench/corosio/http_server_bench.cpp
@@ -373,6 +373,7 @@ void run_http_server_benchmarks(
         collector.add( bench_multithread( 2, 32, 31250 ) );
         collector.add( bench_multithread( 4, 32, 31250 ) );
         collector.add( bench_multithread( 8, 32, 31250 ) );
+        collector.add( bench_multithread( 16, 32, 31250 ) );
     }
 }
 
diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp
index 2c421aeb..537d3b62 100644
--- a/src/corosio/src/detail/epoll/scheduler.cpp
+++ b/src/corosio/src/detail/epoll/scheduler.cpp
@@ -92,6 +92,15 @@ struct scheduler_context
 {
     epoll_scheduler const* key;
     scheduler_context* next;
+    op_queue private_queue;
+    long private_outstanding_work;
+
+    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
+        : key(k)
+        , next(n)
+        , private_outstanding_work(0)
+    {
+    }
 };
 
 corosio::detail::thread_local_ptr context_stack;
@@ -102,17 +111,28 @@ struct thread_context_guard
 {
     explicit
     thread_context_guard(
         epoll_scheduler const* ctx) noexcept
-        : frame_{ctx, context_stack.get()}
+        : frame_(ctx, context_stack.get())
     {
         context_stack.set(&frame_);
     }
 
     ~thread_context_guard() noexcept
     {
+        if (!frame_.private_queue.empty())
+            frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
         context_stack.set(frame_.next);
     }
 };
 
+scheduler_context*
+find_context(epoll_scheduler const* self) noexcept
+{
+    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
+        if (c->key == self)
+            return c;
+    return nullptr;
+}
+
 } // namespace
@@ -259,6 +279,17 @@ post(capy::coro h) const
     };
 
     auto ph = std::make_unique(h);
+
+    // Fast path: same thread posts to private queue without locking
+    if (auto* ctx = find_context(this))
+    {
+        outstanding_work_.fetch_add(1, std::memory_order_relaxed);
+        ++ctx->private_outstanding_work;
+        ctx->private_queue.push(ph.release());
+        return;
+    }
+
+    // Slow path: cross-thread post requires mutex
     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
     std::unique_lock lock(mutex_);
@@ -270,6 +301,16 @@
 void
 epoll_scheduler::
 post(scheduler_op* h) const
 {
+    // Fast path: same thread posts to private queue without locking
+    if (auto* ctx = find_context(this))
+    {
+        outstanding_work_.fetch_add(1, std::memory_order_relaxed);
+        ++ctx->private_outstanding_work;
+        ctx->private_queue.push(h);
+        return;
+    }
+
+    // Slow path: cross-thread post requires mutex
     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
     std::unique_lock lock(mutex_);
@@ -489,6 +530,17 @@ work_finished() const noexcept
     }
 }
 
+void
+epoll_scheduler::
+drain_thread_queue(op_queue& queue, long count) const
+{
+    std::lock_guard lock(mutex_);
+    // Note: outstanding_work_ was already incremented when posting
+    completed_ops_.splice(queue);
+    if (count > 0)
+        wakeup_event_.notify_all();
+}
+
 void
 epoll_scheduler::
 interrupt_reactor() const
@@ -542,7 +594,6 @@ update_timerfd() const
     if (nearest == timer_service::time_point::max())
     {
         // No timers - disarm by setting to 0 (relative)
-        // ts is already zeroed
     }
     else
     {
@@ -576,19 +627,20 @@ run_reactor(std::unique_lock& lock)
 
     lock.unlock();
 
+    // --- Event loop runs WITHOUT the mutex (like Asio) ---
+
     epoll_event events[128];
     int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
     int saved_errno = errno;
 
-    timer_svc_->process_expired();
-    update_timerfd();
-
     if (nfds < 0 && saved_errno != EINTR)
         detail::throw_system_error(make_err(saved_errno), "epoll_wait");
 
-    lock.lock();
-
+    bool check_timers = false;
+    op_queue local_ops;
     int completions_queued = 0;
+
+    // Process events without holding the mutex
     for (int i = 0; i < nfds; ++i)
     {
         if (events[i].data.ptr == nullptr)
@@ -599,9 +651,13 @@
             continue;
         }
 
-        // timerfd_settime() in update_timerfd() resets the readable state
         if (events[i].data.ptr == &timer_fd_)
+        {
+            std::uint64_t expirations;
+            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
+            check_timers = true;
             continue;
+        }
 
         auto* desc = static_cast(events[i].data.ptr);
         std::uint32_t ev = events[i].events;
@@ -624,7 +680,7 @@
                 if (err)
                 {
                     op->complete(err, 0);
-                    completed_ops_.push(op);
+                    local_ops.push(op);
                     ++completions_queued;
                 }
                 else
@@ -637,7 +693,7 @@
                     }
                     else
                     {
-                        completed_ops_.push(op);
+                        local_ops.push(op);
                         ++completions_queued;
                     }
                 }
@@ -650,14 +706,13 @@
 
         if (ev & EPOLLOUT)
         {
-            // Connect uses write readiness - try it first
            auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
            if (conn_op)
            {
                if (err)
                {
                    conn_op->complete(err, 0);
-                    completed_ops_.push(conn_op);
+                    local_ops.push(conn_op);
                    ++completions_queued;
                }
                else
@@ -670,7 +725,7 @@
                    }
                    else
                    {
-                        completed_ops_.push(conn_op);
+                        local_ops.push(conn_op);
                        ++completions_queued;
                    }
                }
@@ -682,7 +737,7 @@
                if (err)
                {
                    write_op->complete(err, 0);
-                    completed_ops_.push(write_op);
+                    local_ops.push(write_op);
                    ++completions_queued;
                }
                else
@@ -695,7 +750,7 @@
                    }
                    else
                    {
-                        completed_ops_.push(write_op);
+                        local_ops.push(write_op);
                        ++completions_queued;
                    }
                }
@@ -705,14 +760,13 @@
            desc->write_ready.store(true, std::memory_order_release);
        }
 
-        // Handle error for ops not processed above (no EPOLLIN/EPOLLOUT)
        if (err && !(ev & (EPOLLIN | EPOLLOUT)))
        {
            auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
            if (read_op)
            {
                read_op->complete(err, 0);
-                completed_ops_.push(read_op);
+                local_ops.push(read_op);
                ++completions_queued;
            }
 
@@ -720,7 +774,7 @@
            if (write_op)
            {
                write_op->complete(err, 0);
-                completed_ops_.push(write_op);
+                local_ops.push(write_op);
                ++completions_queued;
            }
 
@@ -728,18 +782,42 @@
            if (conn_op)
            {
                conn_op->complete(err, 0);
-                completed_ops_.push(conn_op);
+                local_ops.push(conn_op);
                ++completions_queued;
            }
        }
    }
 
-    if (completions_queued > 0)
+    // Process timers only when timerfd fires (like Asio's check_timers pattern)
+    if (check_timers)
+    {
+        timer_svc_->process_expired();
+        update_timerfd();
+    }
+
+    // --- Acquire mutex only for queue operations ---
+    lock.lock();
+
+    if (!local_ops.empty())
+        completed_ops_.splice(local_ops);
+
+    // Drain private queue (outstanding_work_ was already incremented when posting)
+    if (auto* ctx = find_context(this))
    {
-        if (completions_queued == 1)
+        if (!ctx->private_queue.empty())
+        {
+            completions_queued += ctx->private_outstanding_work;
+            ctx->private_outstanding_work = 0;
+            completed_ops_.splice(ctx->private_queue);
+        }
+    }
+
+    // Only wake threads that are actually idle, and only as many as we have work
+    if (completions_queued > 0 && idle_thread_count_ > 0)
+    {
+        int threads_to_wake = (std::min)(completions_queued, idle_thread_count_);
+        for (int i = 0; i < threads_to_wake; ++i)
            wakeup_event_.notify_one();
-        else
-            wakeup_event_.notify_all();
    }
 }
@@ -758,7 +836,10 @@ do_one(long timeout_us)
 
        if (op == &task_op_)
        {
-            bool more_handlers = !completed_ops_.empty();
+            // Check both global queue and private queue for pending handlers
+            auto* ctx = find_context(this);
+            bool more_handlers = !completed_ops_.empty() ||
+                (ctx && !ctx->private_queue.empty());
 
            if (!more_handlers)
            {
@@ -801,6 +882,17 @@
        if (timeout_us == 0)
            return 0;
 
+        // Drain private queue before blocking (outstanding_work_ was already incremented)
+        if (auto* ctx = find_context(this))
+        {
+            if (!ctx->private_queue.empty())
+            {
+                ctx->private_outstanding_work = 0;
+                completed_ops_.splice(ctx->private_queue);
+                continue;
+            }
+        }
+
        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
diff --git a/src/corosio/src/detail/epoll/scheduler.hpp b/src/corosio/src/detail/epoll/scheduler.hpp
index 25530c57..4d1f2ce6 100644
--- a/src/corosio/src/detail/epoll/scheduler.hpp
+++ b/src/corosio/src/detail/epoll/scheduler.hpp
@@ -70,6 +70,7 @@ class epoll_scheduler
        capy::execution_context& ctx,
        int concurrency_hint = -1);
 
+    /// Destroy the scheduler.
    ~epoll_scheduler();
 
    epoll_scheduler(epoll_scheduler const&) = delete;
@@ -130,6 +131,16 @@ class epoll_scheduler
    /** For use by I/O operations to track completed work. */
    void work_finished() const noexcept override;
 
+    /** Drain work from thread context's private queue to global queue.
+
+        Called by thread_context_guard destructor when a thread exits run().
+        Transfers pending work to the global queue under mutex protection.
+
+        @param queue The private queue to drain.
+        @param count Item count for wakeup decisions (wakes other threads if positive).
+    */
+    void drain_thread_queue(op_queue& queue, long count) const;
+
 private:
    std::size_t do_one(long timeout_us);
    void run_reactor(std::unique_lock& lock);
@@ -156,6 +167,7 @@ class epoll_scheduler
 
    // Edge-triggered eventfd state
    mutable std::atomic eventfd_armed_{false};
 
+    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing timer starvation.