diff --git a/src/corosio/src/detail/epoll/op.hpp b/src/corosio/src/detail/epoll/op.hpp
index ba6123e..3f29bfb 100644
--- a/src/corosio/src/detail/epoll/op.hpp
+++ b/src/corosio/src/detail/epoll/op.hpp
@@ -30,7 +30,6 @@
 #include
 #include
-#include
 #include
 #include
 #include
@@ -96,7 +95,6 @@ struct epoll_op : scheduler_op
     std::size_t* bytes_out = nullptr;
     int fd = -1;
-    std::uint32_t events = 0;
     int errn = 0;
     std::size_t bytes_transferred = 0;
@@ -116,7 +114,6 @@ struct epoll_op : scheduler_op
     void reset() noexcept
     {
         fd = -1;
-        events = 0;
         errn = 0;
         bytes_transferred = 0;
         cancelled.store(false, std::memory_order_relaxed);
@@ -178,12 +175,6 @@ struct epoll_op : scheduler_op
     virtual void perform_io() noexcept {}
 };

-inline epoll_op*
-get_epoll_op(scheduler_op* h) noexcept
-{
-    return static_cast<epoll_op*>(h->data());
-}
-
 //------------------------------------------------------------------------------

 struct epoll_connect_op : epoll_op
diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp
index ec3a619..e60a594 100644
--- a/src/corosio/src/detail/epoll/scheduler.cpp
+++ b/src/corosio/src/detail/epoll/scheduler.cpp
@@ -30,44 +30,55 @@
 #include

 /*
-    epoll Scheduler
-    ===============
+    epoll Scheduler - Single Reactor Model
+    ======================================

-    The scheduler is the heart of the I/O event loop. It multiplexes I/O
-    readiness notifications from epoll with a completion queue for operations
-    that finished synchronously or were cancelled.
+    This scheduler coordinates threads so that handlers run in parallel
+    without a thundering herd. Instead of all threads blocking on
+    epoll_wait(), one thread becomes the "reactor" while the others wait
+    on a condition variable for handler work.
+
+    Thread Model
+    ------------
+    - ONE thread runs epoll_wait() at a time (the reactor thread)
+    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
+    - When work is posted, exactly one waiting thread wakes via notify_one()
+    - This matches Windows IOCP semantics, where N posted items wake N threads

     Event Loop Structure (do_one)
     -----------------------------
-    1. Check completion queue first (mutex-protected)
-    2. If empty, call epoll_wait with calculated timeout
-    3. Process timer expirations
-    4. For each ready fd, claim the operation and perform I/O
-    5. Push completed operations to completion queue
-    6. Pop one and invoke its handler
-
-    The completion queue exists because handlers must run outside the epoll
-    processing loop. This allows handlers to safely start new operations
-    on the same fd without corrupting iteration state.
-
-    Wakeup Mechanism
-    ----------------
-    An eventfd allows other threads (or cancel/post calls) to wake the
-    event loop from epoll_wait. We distinguish wakeup events from I/O by
-    storing nullptr in epoll_event.data.ptr for the eventfd.
+    1. Lock the mutex, try to pop a handler from the queue
+    2. If a handler was popped: execute it (unlocked), return
+    3. If the queue is empty and no reactor is running: become the reactor
+       - Run epoll_wait (unlocked), queue I/O completions, loop back
+    4. If the queue is empty and a reactor is running: wait on the condvar
+
+    The reactor_running_ flag ensures only one thread owns epoll_wait().
+    After the reactor queues I/O completions, it loops back to try to pop
+    a handler, giving priority to handler execution over more I/O polling.
+
+    Wake Coordination (wake_one_thread_and_unlock)
+    ----------------------------------------------
+    When posting work:
+    - If idle threads exist: notify_one() wakes exactly one worker
+    - Else if the reactor is running: interrupt it via an eventfd write
+    - Else: no-op (the thread will find the work when it checks the queue)
+
+    This is critical for matching IOCP behavior. With the old model,
+    posting N handlers would wake all threads (thundering herd). Now each
+    post() wakes at most one thread, and that thread handles exactly one item.

     Work Counting
     -------------
     outstanding_work_ tracks pending operations. When it hits zero, run()
-    returns. This is how io_context knows there's nothing left to do.
-    Each operation increments on start, decrements on completion.
+    returns. Each operation increments on start, decrements on completion.

     Timer Integration
     -----------------
-    Timers are handled by timer_service. The scheduler adjusts epoll_wait
-    timeout to wake in time for the nearest timer expiry. When a new timer
-    is scheduled earlier than current, timer_service calls wakeup() to
-    re-evaluate the timeout.
+    Timers are handled by timer_service. The reactor adjusts the epoll_wait
+    timeout to wake for the nearest timer expiry. When a new timer is
+    scheduled earlier than the current earliest, timer_service calls
+    interrupt_reactor() to re-evaluate the timeout.
*/

namespace boost {
namespace corosio {
namespace detail {
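For orientation, the loop described in the comment above reduces to roughly
this sketch (condensed and simplified from do_one() further down; not the
verbatim code):

    std::size_t do_one_sketch()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;)
        {
            if (auto* op = completed_ops_.pop())
            {
                lock.unlock();
                (*op)();                  // run exactly one handler, unlocked
                return 1;
            }
            if (!reactor_running_)
            {
                reactor_running_ = true;  // this thread now owns epoll_wait()
                run_reactor(lock);        // unlocks around epoll_wait()
                reactor_running_ = false;
                continue;                 // re-check the queue for completions
            }
            wakeup_event_.wait(lock);     // idle worker: sleep until notified
        }
    }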
@@ -112,6 +123,9 @@ epoll_scheduler(
     , outstanding_work_(0)
     , stopped_(false)
     , shutdown_(false)
+    , reactor_running_(false)
+    , reactor_interrupted_(false)
+    , idle_thread_count_(0)
 {
     epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
     if (epoll_fd_ < 0)
@@ -140,7 +154,7 @@ epoll_scheduler(
     timer_svc_->set_on_earliest_changed(
         timer_service::callback(
             this,
-            [](void* p) { static_cast<epoll_scheduler*>(p)->wakeup(); }));
+            [](void* p) { static_cast<epoll_scheduler*>(p)->interrupt_reactor(); }));
 }

 epoll_scheduler::
@@ -166,6 +180,8 @@ shutdown()
         lock.lock();
     }

+    // Wake all waiting threads so they can exit
+    wakeup_event_.notify_all();
     outstanding_work_.store(0, std::memory_order_release);
 }
@@ -199,14 +215,12 @@ post(capy::coro h) const
         }
     };

-    auto* ph = new post_handler(h);
+    auto ph = std::make_unique<post_handler>(h);
     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
-    {
-        std::lock_guard lock(mutex_);
-        completed_ops_.push(ph);
-    }
-    wakeup();
+    std::unique_lock lock(mutex_);
+    completed_ops_.push(ph.release());
+    wake_one_thread_and_unlock(lock);
 }

 void
@@ -215,11 +229,9 @@ post(scheduler_op* h) const
 {
     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
-    {
-        std::lock_guard lock(mutex_);
-        completed_ops_.push(h);
-    }
-    wakeup();
+    std::unique_lock lock(mutex_);
+    completed_ops_.push(h);
+    wake_one_thread_and_unlock(lock);
 }

 void
@@ -255,7 +267,12 @@ stop()
     if (stopped_.compare_exchange_strong(expected, true,
         std::memory_order_release, std::memory_order_relaxed))
     {
-        wakeup();
+        // Wake all threads so they notice stopped_ and exit
+        {
+            std::lock_guard lock(mutex_);
+            wakeup_event_.notify_all();
+        }
+        interrupt_reactor();
     }
 }
@@ -408,17 +425,57 @@
 void
 epoll_scheduler::
 work_finished() const noexcept
 {
-    outstanding_work_.fetch_sub(1, std::memory_order_acq_rel);
+    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
+    {
+        // Last work item completed - wake all threads so they can exit.
+        // notify_all() wakes threads waiting on the condvar;
+        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
+        // Both are needed because they target different blocking mechanisms.
+        std::unique_lock lock(mutex_);
+        wakeup_event_.notify_all();
+        if (reactor_running_ && !reactor_interrupted_)
+        {
+            reactor_interrupted_ = true;
+            lock.unlock();
+            interrupt_reactor();
+        }
+    }
 }

 void
 epoll_scheduler::
-wakeup() const
+interrupt_reactor() const
 {
     std::uint64_t val = 1;
     [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
 }
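The eventfd mechanics behind interrupt_reactor() are standard Linux fare; a
self-contained sketch (hypothetical fd from ::eventfd(0, EFD_CLOEXEC),
analogous to event_fd_):

    #include <sys/eventfd.h>
    #include <unistd.h>
    #include <cstdint>

    // Any thread: make the fd readable so epoll_wait() reports EPOLLIN on it.
    void interrupt(int efd)
    {
        std::uint64_t one = 1;
        ::write(efd, &one, sizeof(one));   // adds 1 to the kernel counter
    }

    // Reactor thread: drain the counter so the next epoll_wait() can block.
    void drain(int efd)
    {
        std::uint64_t val;
        ::read(efd, &val, sizeof(val));    // resets the counter to zero
    }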
+void
+epoll_scheduler::
+wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
+{
+    if (idle_thread_count_ > 0)
+    {
+        // An idle worker exists - wake it via the condvar
+        wakeup_event_.notify_one();
+        lock.unlock();
+    }
+    else if (reactor_running_ && !reactor_interrupted_)
+    {
+        // No idle workers, but the reactor is running - interrupt it so it
+        // can re-check the queue after processing the current epoll events
+        reactor_interrupted_ = true;
+        lock.unlock();
+        interrupt_reactor();
+    }
+    else
+    {
+        // No one to wake - either the reactor will pick up the work when it
+        // re-checks the queue, or the next thread to call run() will get it
+        lock.unlock();
+    }
+}
+
 struct work_guard
 {
     epoll_scheduler const* self;
@@ -451,112 +508,153 @@ calculate_timeout(long requested_timeout_us) const
         static_cast<long>(timer_timeout_us)));
 }

+void
+epoll_scheduler::
+run_reactor(std::unique_lock<std::mutex>& lock)
+{
+    // Calculate the timeout considering timers; use 0 if interrupted
+    long effective_timeout_us = reactor_interrupted_ ? 0 : calculate_timeout(-1);
+
+    int timeout_ms;
+    if (effective_timeout_us < 0)
+        timeout_ms = -1;
+    else if (effective_timeout_us == 0)
+        timeout_ms = 0;
+    else
+        timeout_ms = static_cast<int>((effective_timeout_us + 999) / 1000);
+
+    lock.unlock();
+
+    epoll_event events[64];
+    int nfds = ::epoll_wait(epoll_fd_, events, 64, timeout_ms);
+    int saved_errno = errno; // Save before process_expired() may overwrite it
+
+    // Process timers outside the lock - timer completions may call post(),
+    // which needs to acquire the lock
+    timer_svc_->process_expired();
+
+    if (nfds < 0 && saved_errno != EINTR)
+        detail::throw_system_error(make_err(saved_errno), "epoll_wait");
+
+    // Process I/O completions - these become handlers in the queue.
+    // Must re-acquire the lock before modifying completed_ops_
+    lock.lock();
+
+    int completions_queued = 0;
+    for (int i = 0; i < nfds; ++i)
+    {
+        if (events[i].data.ptr == nullptr)
+        {
+            // eventfd interrupt - just drain it
+            std::uint64_t val;
+            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
+            continue;
+        }
+
+        auto* op = static_cast<epoll_op*>(events[i].data.ptr);
+
+        bool was_registered = op->registered.exchange(false, std::memory_order_acq_rel);
+        if (!was_registered)
+            continue;
+
+        unregister_fd(op->fd);
+
+        if (events[i].events & (EPOLLERR | EPOLLHUP))
+        {
+            int errn = 0;
+            socklen_t len = sizeof(errn);
+            if (::getsockopt(op->fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
+                errn = errno;
+            if (errn == 0)
+                errn = EIO;
+            op->complete(errn, 0);
+        }
+        else
+        {
+            op->perform_io();
+        }

+        completed_ops_.push(op);
+        ++completions_queued;
+    }
+
+    // Wake idle workers if we queued I/O completions
+    if (completions_queued > 0)
+    {
+        if (completions_queued >= idle_thread_count_)
+            wakeup_event_.notify_all();
+        else
+            for (int i = 0; i < completions_queued; ++i)
+                wakeup_event_.notify_one();
+    }
+}
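Note how run_reactor() above and cancel() in sockets.cpp race to deliver the
same op: the registered flag is the arbiter. In isolation (sketch; both sides
in the real code use exactly this exchange):

    // Exactly one caller observes true and may complete/queue the op;
    // the loser must leave the op alone.
    bool claim(epoll_op& op)
    {
        return op.registered.exchange(false, std::memory_order_acq_rel);
    }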
 std::size_t
 epoll_scheduler::
 do_one(long timeout_us)
 {
+    std::unique_lock lock(mutex_);
+
+    using clock = std::chrono::steady_clock;
+    auto deadline = (timeout_us > 0)
+        ? clock::now() + std::chrono::microseconds(timeout_us)
+        : clock::time_point{};
+
     for (;;)
     {
         if (stopped_.load(std::memory_order_acquire))
             return 0;

-        scheduler_op* h = nullptr;
-        {
-            std::lock_guard lock(mutex_);
-            h = completed_ops_.pop();
-        }
+        // Try to get a handler from the queue
+        scheduler_op* op = completed_ops_.pop();

-        if (h)
+        if (op != nullptr)
         {
+            // Got a handler - execute it
+            lock.unlock();
             work_guard g{this};
-            (*h)();
+            (*op)();
             return 1;
         }

+        // Queue is empty - check if we should become the reactor or wait
         if (outstanding_work_.load(std::memory_order_acquire) == 0)
             return 0;

-        long effective_timeout_us = calculate_timeout(timeout_us);
-
-        int timeout_ms;
-        if (effective_timeout_us < 0)
-            timeout_ms = -1;
-        else if (effective_timeout_us == 0)
-            timeout_ms = 0;
-        else
-            timeout_ms = static_cast<int>((effective_timeout_us + 999) / 1000);
-
-        epoll_event events[64];
-        int nfds = ::epoll_wait(epoll_fd_, events, 64, timeout_ms);
+        if (timeout_us == 0)
+            return 0; // Non-blocking poll

-        if (nfds < 0)
+        // Check if the timeout has expired (for positive timeout_us)
+        long remaining_us = timeout_us;
+        if (timeout_us > 0)
         {
-            if (errno == EINTR)
-            {
-                if (timeout_us < 0)
-                    continue;
+            auto now = clock::now();
+            if (now >= deadline)
                 return 0;
-            }
-            detail::throw_system_error(make_err(errno), "epoll_wait");
+            remaining_us = std::chrono::duration_cast<std::chrono::microseconds>(
+                deadline - now).count();
         }

-        timer_svc_->process_expired();
-
-        for (int i = 0; i < nfds; ++i)
+        if (!reactor_running_)
         {
-            if (events[i].data.ptr == nullptr)
-            {
-                std::uint64_t val;
-                [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
-                continue;
-            }
-
-            auto* op = static_cast<epoll_op*>(events[i].data.ptr);
-
-            bool was_registered = op->registered.exchange(false, std::memory_order_acq_rel);
-            if (!was_registered)
-                continue;
-
-            unregister_fd(op->fd);
-
-            if (events[i].events & (EPOLLERR | EPOLLHUP))
-            {
-                int errn = 0;
-                socklen_t len = sizeof(errn);
-                if (::getsockopt(op->fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
-                    errn = errno;
-                if (errn == 0)
-                    errn = EIO;
-                op->complete(errn, 0);
-            }
-            else
-            {
-                op->perform_io();
-            }
-
-            {
-                std::lock_guard lock(mutex_);
-                completed_ops_.push(op);
-            }
-        }
-
-        if (stopped_.load(std::memory_order_acquire))
-            return 0;
+            // No reactor running and the queue is empty - become the reactor
+            reactor_running_ = true;
+            reactor_interrupted_ = false;

-        {
-            std::lock_guard lock(mutex_);
-            h = completed_ops_.pop();
-        }
+            run_reactor(lock);

-        if (h)
-        {
-            work_guard g{this};
-            (*h)();
-            return 1;
+            reactor_running_ = false;
+            // Loop back to check for handlers the reactor may have queued
+            continue;
         }

-        if (timeout_us >= 0)
-            return 0;
+        // The reactor is running in another thread - wait for work on the condvar
+        ++idle_thread_count_;
+        if (timeout_us < 0)
+            wakeup_event_.wait(lock);
+        else
+            wakeup_event_.wait_for(lock, std::chrono::microseconds(remaining_us));
+        --idle_thread_count_;
     }
 }
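One subtlety in do_one(): condition_variable::wait_for() can return
spuriously, so the loop trusts only the saved deadline and recomputes the
remaining budget on every pass. The pattern in isolation (sketch):

    auto deadline = std::chrono::steady_clock::now()
                  + std::chrono::microseconds(timeout_us);
    for (;;)
    {
        auto now = std::chrono::steady_clock::now();
        if (now >= deadline)
            return 0;                           // budget exhausted
        wakeup_event_.wait_for(lock,
            std::chrono::duration_cast<std::chrono::microseconds>(
                deadline - now));               // may wake spuriously
        // ... re-check the queue, then loop ...
    }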
diff --git a/src/corosio/src/detail/epoll/scheduler.hpp b/src/corosio/src/detail/epoll/scheduler.hpp
index 71dae27..ad1c9d6 100644
--- a/src/corosio/src/detail/epoll/scheduler.hpp
+++ b/src/corosio/src/detail/epoll/scheduler.hpp
@@ -23,6 +23,7 @@
 #include
 #include
+#include <condition_variable>
 #include
 #include
 #include
@@ -36,12 +37,18 @@
 struct epoll_op;

 /** Linux scheduler using epoll for I/O multiplexing.

     This scheduler implements the scheduler interface using Linux epoll
-    for efficient I/O event notification. It manages a queue of handlers
-    and provides blocking/non-blocking execution methods.
+    for efficient I/O event notification. It uses a single reactor model:
+    one thread runs epoll_wait while the other threads wait on a
+    condition variable for handler work. This design provides:

-    The scheduler uses an eventfd to wake up epoll_wait when non-I/O
-    handlers are posted, enabling efficient integration of both
-    I/O completions and posted handlers.
+    - Handler parallelism: N posted handlers can execute on N threads
+    - No thundering herd: the condition variable wakes exactly one thread
+    - IOCP parity: behavior matches Windows I/O completion port semantics
+
+    When threads call run(), they first try to execute queued handlers.
+    If the queue is empty and no reactor is running, one thread becomes
+    the reactor and runs epoll_wait. The other threads wait on the
+    condition variable until handlers are available.

     @par Thread Safety
     All public member functions are thread-safe.
@@ -123,17 +130,25 @@ class epoll_scheduler
 private:
     std::size_t do_one(long timeout_us);
-    void wakeup() const;
+    void run_reactor(std::unique_lock<std::mutex>& lock);
+    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
+    void interrupt_reactor() const;
     long calculate_timeout(long requested_timeout_us) const;

     int epoll_fd_;
-    int event_fd_;    // for waking epoll_wait
+    int event_fd_;    // for interrupting the reactor
     mutable std::mutex mutex_;
+    mutable std::condition_variable wakeup_event_;
     mutable op_queue completed_ops_;
     mutable std::atomic<std::size_t> outstanding_work_;
     std::atomic<bool> stopped_;
     bool shutdown_;
     timer_service* timer_svc_ = nullptr;
+
+    // Single-reactor thread coordination
+    mutable bool reactor_running_ = false;
+    mutable bool reactor_interrupted_ = false;
+    mutable int idle_thread_count_ = 0;
 };

 } // namespace detail
diff --git a/src/corosio/src/detail/epoll/sockets.cpp b/src/corosio/src/detail/epoll/sockets.cpp
new file mode 100644
index 0000000..af3f7e7
--- /dev/null
+++ b/src/corosio/src/detail/epoll/sockets.cpp
@@ -0,0 +1,533 @@
+//
+// Copyright (c) 2026 Steve Gerbino
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// Official repository: https://github.com/cppalliance/corosio
+//
+
+#include "src/detail/config_backend.hpp"
+
+#if defined(BOOST_COROSIO_BACKEND_EPOLL)
+
+#include "src/detail/epoll/sockets.hpp"
+#include "src/detail/endpoint_convert.hpp"
+#include "src/detail/make_err.hpp"
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+namespace boost {
+namespace corosio {
+namespace detail {
+
+//------------------------------------------------------------------------------
+// epoll_socket_impl
+//------------------------------------------------------------------------------
+
+epoll_socket_impl::
+epoll_socket_impl(epoll_sockets& svc) noexcept
+    : svc_(svc)
+{
+}
+
+void
+epoll_socket_impl::
+release()
+{
+    close_socket();
+    svc_.destroy_impl(*this);
+}
+
+void
+epoll_socket_impl::
+connect(
+    std::coroutine_handle<> h,
+    capy::executor_ref d,
+    endpoint ep,
+    std::stop_token token,
+    system::error_code* ec)
+{
+    auto& op = conn_;
+    op.reset();
+    op.h = h;
+    op.d = d;
+    op.ec_out = ec;
+    op.fd = fd_;
+    op.start(token);
+
+    sockaddr_in addr = detail::to_sockaddr_in(ep);
+    int result = ::connect(fd_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));
+
+    if (result == 0)
+    {
+        op.complete(0, 0);
+        svc_.post(&op);
+        return;
+    }
+
+    if (errno == EINPROGRESS)
+    {
+        svc_.work_started();
+        op.registered.store(true, std::memory_order_release);
+        svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET);
+        return;
+    }
+
+    op.complete(errno, 0);
+    svc_.post(&op);
+}
+
+void
+epoll_socket_impl::
+read_some(
+    std::coroutine_handle<> h,
+    capy::executor_ref d,
+    io_buffer_param param,
+    std::stop_token token,
+    system::error_code* ec,
+    std::size_t* bytes_out)
+{
+    auto& op = rd_;
+    op.reset();
+    op.h = h;
+    op.d = d;
+    op.ec_out = ec;
+    op.bytes_out = bytes_out;
+    op.fd = fd_;
+    op.start(token);
+
+    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
+    op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
+
+    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
+    {
+        op.empty_buffer_read = true;
+        op.complete(0, 0);
+        svc_.post(&op);
+        return;
+    }
+
+    for (int i = 0; i < op.iovec_count; ++i)
+    {
+        op.iovecs[i].iov_base = bufs[i].data();
+        op.iovecs[i].iov_len = bufs[i].size();
+    }
+
+    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
+
+    if (n > 0)
+    {
+        op.complete(0, static_cast<std::size_t>(n));
+        svc_.post(&op);
+        return;
+    }
+
+    if (n == 0)
+    {
+        op.complete(0, 0);
+        svc_.post(&op);
+        return;
+    }
+
+    if (errno == EAGAIN || errno == EWOULDBLOCK)
+    {
+        svc_.work_started();
+        op.registered.store(true, std::memory_order_release);
+        svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET);
+        return;
+    }
+
+    op.complete(errno, 0);
+    svc_.post(&op);
+}
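connect(), read_some(), and the ops that follow all use the same speculative
pattern: issue the syscall immediately, complete inline when it finishes, and
fall back to edge-triggered registration only on EAGAIN/EINPROGRESS.
Distilled (sketch; the helper names are illustrative, not real API):

    ssize_t n = ::readv(fd, iov, iovcnt);
    if (n >= 0)
        complete_inline(n);          // post the completion; no epoll round-trip
    else if (errno == EAGAIN || errno == EWOULDBLOCK)
        register_readiness(fd, EPOLLIN | EPOLLET);  // reactor finishes it later
    else
        complete_inline_error(errno);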
+void
+epoll_socket_impl::
+write_some(
+    std::coroutine_handle<> h,
+    capy::executor_ref d,
+    io_buffer_param param,
+    std::stop_token token,
+    system::error_code* ec,
+    std::size_t* bytes_out)
+{
+    auto& op = wr_;
+    op.reset();
+    op.h = h;
+    op.d = d;
+    op.ec_out = ec;
+    op.bytes_out = bytes_out;
+    op.fd = fd_;
+    op.start(token);
+
+    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
+    op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
+
+    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
+    {
+        op.complete(0, 0);
+        svc_.post(&op);
+        return;
+    }
+
+    for (int i = 0; i < op.iovec_count; ++i)
+    {
+        op.iovecs[i].iov_base = bufs[i].data();
+        op.iovecs[i].iov_len = bufs[i].size();
+    }
+
+    msghdr msg{};
+    msg.msg_iov = op.iovecs;
+    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
+
+    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
+
+    if (n > 0)
+    {
+        op.complete(0, static_cast<std::size_t>(n));
+        svc_.post(&op);
+        return;
+    }
+
+    if (errno == EAGAIN || errno == EWOULDBLOCK)
+    {
+        svc_.work_started();
+        op.registered.store(true, std::memory_order_release);
+        svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET);
+        return;
+    }
+
+    op.complete(errno ? errno : EIO, 0);
+    svc_.post(&op);
+}
+
+system::error_code
+epoll_socket_impl::
+shutdown(socket::shutdown_type what) noexcept
+{
+    int how;
+    switch (what)
+    {
+    case socket::shutdown_receive: how = SHUT_RD;   break;
+    case socket::shutdown_send:    how = SHUT_WR;   break;
+    case socket::shutdown_both:    how = SHUT_RDWR; break;
+    default:
+        return make_err(EINVAL);
+    }
+    if (::shutdown(fd_, how) != 0)
+        return make_err(errno);
+    return {};
+}
+
+void
+epoll_socket_impl::
+cancel() noexcept
+{
+    std::shared_ptr<epoll_socket_impl> self;
+    try {
+        self = shared_from_this();
+    } catch (const std::bad_weak_ptr&) {
+        return;
+    }
+
+    auto cancel_op = [this, &self](epoll_op& op) {
+        bool was_registered = op.registered.exchange(false, std::memory_order_acq_rel);
+        op.request_cancel();
+        if (was_registered)
+        {
+            svc_.scheduler().unregister_fd(fd_);
+            op.impl_ptr = self;
+            svc_.post(&op);
+            svc_.work_finished();
+        }
+    };
+
+    cancel_op(conn_);
+    cancel_op(rd_);
+    cancel_op(wr_);
+}
+
+void
+epoll_socket_impl::
+close_socket() noexcept
+{
+    cancel();
+
+    if (fd_ >= 0)
+    {
+        ::close(fd_);
+        fd_ = -1;
+    }
+}
+
+//------------------------------------------------------------------------------
+// epoll_acceptor_impl
+//------------------------------------------------------------------------------
+
+epoll_acceptor_impl::
+epoll_acceptor_impl(epoll_sockets& svc) noexcept
+    : svc_(svc)
+{
+}
+
+void
+epoll_acceptor_impl::
+release()
+{
+    close_socket();
+    svc_.destroy_acceptor_impl(*this);
+}
+
+void
+epoll_acceptor_impl::
+accept(
+    std::coroutine_handle<> h,
+    capy::executor_ref d,
+    std::stop_token token,
+    system::error_code* ec,
+    io_object::io_object_impl** impl_out)
+{
+    auto& op = acc_;
+    op.reset();
+    op.h = h;
+    op.d = d;
+    op.ec_out = ec;
+    op.impl_out = impl_out;
+    op.fd = fd_;
+    op.start(token);
+
+    op.service_ptr = &svc_;
+    op.create_peer = [](void* svc_ptr, int new_fd) -> io_object::io_object_impl* {
+        auto& svc = *static_cast<epoll_sockets*>(svc_ptr);
+        auto& peer_impl = svc.create_impl();
+        peer_impl.set_socket(new_fd);
+        return &peer_impl;
+    };
+
+    sockaddr_in addr{};
+    socklen_t addrlen = sizeof(addr);
+    int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
+        &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
+
+    if (accepted >= 0)
+    {
+        auto& peer_impl = svc_.create_impl();
+        peer_impl.set_socket(accepted);
+        op.accepted_fd = accepted;
+        op.peer_impl = &peer_impl;
+        op.complete(0, 0);
+        svc_.post(&op);
+        return;
+    }
+
+    if (errno == EAGAIN || errno == EWOULDBLOCK)
+    {
+        svc_.work_started();
+        op.registered.store(true, std::memory_order_release);
+        svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET);
+        return;
+    }
+
+    op.complete(errno, 0);
+    svc_.post(&op);
+}
+
+void
+epoll_acceptor_impl::
+cancel() noexcept
+{
+    std::shared_ptr<epoll_acceptor_impl> self;
+    try {
+        self = shared_from_this();
+    } catch (const std::bad_weak_ptr&) {
+        return;
+    }
+
+    bool was_registered = acc_.registered.exchange(false, std::memory_order_acq_rel);
+    acc_.request_cancel();
+
+    if (was_registered)
+    {
+        svc_.scheduler().unregister_fd(fd_);
+        acc_.impl_ptr = self;
+        svc_.post(&acc_);
+        svc_.work_finished();
+    }
+}
+
+void
+epoll_acceptor_impl::
+close_socket() noexcept
+{
+    cancel();
+
+    if (fd_ >= 0)
+    {
+        ::close(fd_);
+        fd_ = -1;
+    }
+}
+
+//------------------------------------------------------------------------------
+// epoll_sockets
+//------------------------------------------------------------------------------
+
+epoll_sockets::
+epoll_sockets(capy::execution_context& ctx)
+    : sched_(ctx.use_service<epoll_scheduler>())
+{
+}
+
+epoll_sockets::
+~epoll_sockets()
+{
+}
+
+void
+epoll_sockets::
+shutdown()
+{
+    std::lock_guard lock(mutex_);
+
+    while (auto* impl = socket_list_.pop_front())
+        impl->close_socket();
+
+    while (auto* impl = acceptor_list_.pop_front())
+        impl->close_socket();
+
+    socket_ptrs_.clear();
+    acceptor_ptrs_.clear();
+}
+
+epoll_socket_impl&
+epoll_sockets::
+create_impl()
+{
+    auto impl = std::make_shared<epoll_socket_impl>(*this);
+    auto* raw = impl.get();
+
+    {
+        std::lock_guard lock(mutex_);
+        socket_list_.push_back(raw);
+        socket_ptrs_.emplace(raw, std::move(impl));
+    }
+
+    return *raw;
+}
+
+void
+epoll_sockets::
+destroy_impl(epoll_socket_impl& impl)
+{
+    std::lock_guard lock(mutex_);
+    socket_list_.remove(&impl);
+    socket_ptrs_.erase(&impl);
+}
+
+system::error_code
+epoll_sockets::
+open_socket(epoll_socket_impl& impl)
+{
+    impl.close_socket();
+
+    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+    if (fd < 0)
+        return make_err(errno);
+
+    impl.fd_ = fd;
+    return {};
+}
+
+epoll_acceptor_impl&
+epoll_sockets::
+create_acceptor_impl()
+{
+    auto impl = std::make_shared<epoll_acceptor_impl>(*this);
+    auto* raw = impl.get();
+
+    {
+        std::lock_guard lock(mutex_);
+        acceptor_list_.push_back(raw);
+        acceptor_ptrs_.emplace(raw, std::move(impl));
+    }
+
+    return *raw;
+}
+
+void
+epoll_sockets::
+destroy_acceptor_impl(epoll_acceptor_impl& impl)
+{
+    std::lock_guard lock(mutex_);
+    acceptor_list_.remove(&impl);
+    acceptor_ptrs_.erase(&impl);
+}
+
+system::error_code
+epoll_sockets::
+open_acceptor(
+    epoll_acceptor_impl& impl,
+    endpoint ep,
+    int backlog)
+{
+    impl.close_socket();
+
+    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+    if (fd < 0)
+        return make_err(errno);
+
+    int reuse = 1;
+    ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
+
+    sockaddr_in addr = detail::to_sockaddr_in(ep);
+    if (::bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) < 0)
+    {
+        int errn = errno;
+        ::close(fd);
+        return make_err(errn);
+    }
+
+    if (::listen(fd, backlog) < 0)
+    {
+        int errn = errno;
+        ::close(fd);
+        return make_err(errn);
+    }
+
+    impl.fd_ = fd;
+    return {};
+}
+
+void
+epoll_sockets::
+post(epoll_op* op)
+{
+    sched_.post(op);
+}
+
+void
+epoll_sockets::
+work_started() noexcept
+{
+    sched_.work_started();
+}
+
+void
+epoll_sockets::
+work_finished() noexcept
+{
+    sched_.work_finished();
+}
+
+} // namespace detail
+} // namespace corosio
+} // namespace boost
+
+#endif
diff --git a/src/corosio/src/detail/epoll/sockets.hpp b/src/corosio/src/detail/epoll/sockets.hpp
index fb211b3..05129e4 100644
--- a/src/corosio/src/detail/epoll/sockets.hpp
+++ b/src/corosio/src/detail/epoll/sockets.hpp
@@ -24,21 +24,10 @@
 #include "src/detail/epoll/op.hpp"
 #include "src/detail/epoll/scheduler.hpp"
-#include "src/detail/endpoint_convert.hpp"
-#include "src/detail/make_err.hpp"

-#include
 #include
 #include
-#include
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
+#include <unordered_map>

 /*
     epoll Socket Implementation
     ===========================
@@ -71,8 +60,9 @@
     Impl Lifetime with shared_ptr
     -----------------------------
     Socket and acceptor impls use enable_shared_from_this. The service owns
-    impls via shared_ptr vectors (socket_ptrs_, acceptor_ptrs_). When a user
-    calls close(), we call cancel() which posts pending ops to the scheduler.
+    impls via shared_ptr maps (socket_ptrs_, acceptor_ptrs_) keyed by raw
+    pointer for O(1) lookup and removal. When a user calls close(), we call
+    cancel(), which posts pending ops to the scheduler.

     CRITICAL: The posted ops must keep the impl alive until they complete.
     Otherwise the scheduler would process a freed op (use-after-free). The
@@ -81,8 +71,7 @@
     to be destroyed if no other references exist.

     The intrusive_list (socket_list_, acceptor_list_) provides fast iteration
-    for shutdown cleanup. It stores raw pointers alongside the shared_ptr
-    ownership in the vectors.
+    for shutdown cleanup alongside the shared_ptr ownership in the maps.
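Condensed, the keep-alive step in cancel() looks like this (simplified from
the implementation in sockets.cpp above):

    if (op.registered.exchange(false, std::memory_order_acq_rel))
    {
        svc_.scheduler().unregister_fd(fd_);
        op.impl_ptr = shared_from_this(); // posted op pins the impl
        svc_.post(&op);                   // scheduler drops the ref after running it
        svc_.work_finished();
    }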
     Service Ownership
     -----------------
@@ -137,21 +126,7 @@
 class epoll_socket_impl
-    system::error_code shutdown(socket::shutdown_type what) noexcept override
-    {
-        int how;
-        switch (what)
-        {
-        case socket::shutdown_receive: how = SHUT_RD;   break;
-        case socket::shutdown_send:    how = SHUT_WR;   break;
-        case socket::shutdown_both:    how = SHUT_RDWR; break;
-        default:
-            return make_err(EINVAL);
-        }
-        if (::shutdown(fd_, how) != 0)
-            return make_err(errno);
-        return {};
-    }
+    system::error_code shutdown(socket::shutdown_type what) noexcept override;

     int native_handle() const noexcept { return fd_; }
     bool is_open() const noexcept { return fd_ >= 0; }
@@ -237,505 +212,12 @@
 class epoll_sockets
     epoll_scheduler& sched_;
     std::mutex mutex_;

-    // Dual tracking: intrusive_list for fast shutdown iteration,
-    // vectors for shared_ptr ownership. See "Impl Lifetime" in file header.
     intrusive_list socket_list_;
     intrusive_list acceptor_list_;
-    std::vector<std::shared_ptr<epoll_socket_impl>> socket_ptrs_;
-    std::vector<std::shared_ptr<epoll_acceptor_impl>> acceptor_ptrs_;
+    std::unordered_map<epoll_socket_impl*,
+        std::shared_ptr<epoll_socket_impl>> socket_ptrs_;
+    std::unordered_map<epoll_acceptor_impl*,
+        std::shared_ptr<epoll_acceptor_impl>> acceptor_ptrs_;
 };

-//------------------------------------------------------------------------------
-// epoll_socket_impl implementation
-//------------------------------------------------------------------------------
-
-inline
-epoll_socket_impl::
-epoll_socket_impl(epoll_sockets& svc) noexcept
-    : svc_(svc)
-{
-}
-
-inline void
-epoll_socket_impl::
-release()
-{
-    close_socket();
-    svc_.destroy_impl(*this);
-}
-
-inline void
-epoll_socket_impl::
-connect(
-    std::coroutine_handle<> h,
-    capy::executor_ref d,
-    endpoint ep,
-    std::stop_token token,
-    system::error_code* ec)
-{
-    auto& op = conn_;
-    op.reset();
-    op.h = h;
-    op.d = d;
-    op.ec_out = ec;
-    op.fd = fd_;
-    op.start(token);
-
-    sockaddr_in addr = detail::to_sockaddr_in(ep);
-    int result = ::connect(fd_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));
-
-    if (result == 0)
-    {
-        op.complete(0, 0);
-        svc_.post(&op);
-        return;
-    }
-
-    if (errno == EINPROGRESS)
-    {
-        svc_.work_started();
-        op.registered.store(true, std::memory_order_release);
-        svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET);
-        return;
-    }
-
-    op.complete(errno, 0);
-    svc_.post(&op);
-}
-
-inline void
-epoll_socket_impl::
-read_some(
-    std::coroutine_handle<> h,
-    capy::executor_ref d,
-    io_buffer_param param,
-    std::stop_token token,
-    system::error_code* ec,
-    std::size_t* bytes_out)
-{
-    auto& op = rd_;
-    op.reset();
-    op.h = h;
-    op.d = d;
-    op.ec_out = ec;
-    op.bytes_out = bytes_out;
-    op.fd = fd_;
-    op.start(token);
-
-    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
-    op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
-
-    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
-    {
-        op.empty_buffer_read = true;
-        op.complete(0, 0);
-        svc_.post(&op);
-        return;
-    }
-
-    for (int i = 0; i < op.iovec_count; ++i)
-    {
-        op.iovecs[i].iov_base = bufs[i].data();
-        op.iovecs[i].iov_len = bufs[i].size();
-    }
-
-    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
-
-    if (n > 0)
-    {
-        op.complete(0, static_cast<std::size_t>(n));
-        svc_.post(&op);
-        return;
-    }
-
-    if (n == 0)
-    {
-        op.complete(0, 0);
-        svc_.post(&op);
-        return;
-    }
-
-    if (errno == EAGAIN || errno == EWOULDBLOCK)
-    {
-        svc_.work_started();
-        op.registered.store(true, std::memory_order_release);
-        svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET);
-        return;
-    }
-
-    op.complete(errno, 0);
-    svc_.post(&op);
-}
-
-inline void
-epoll_socket_impl::
-write_some(
-    std::coroutine_handle<> h,
-    capy::executor_ref d,
-    io_buffer_param param,
-    std::stop_token token,
-    system::error_code* ec,
-    std::size_t* bytes_out)
-{
-    auto& op = wr_;
-    op.reset();
-    op.h = h;
-    op.d = d;
-    op.ec_out = ec;
-    op.bytes_out = bytes_out;
-    op.fd = fd_;
-    op.start(token);
-
-    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
-    op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
-
-    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
-    {
-        op.complete(0, 0);
-        svc_.post(&op);
-        return;
-    }
-
-    for (int i = 0; i < op.iovec_count; ++i)
-    {
-        op.iovecs[i].iov_base = bufs[i].data();
-        op.iovecs[i].iov_len = bufs[i].size();
-    }
-
-    msghdr msg{};
-    msg.msg_iov = op.iovecs;
-    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
-
-    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
-
-    if (n > 0)
-    {
-        op.complete(0, static_cast<std::size_t>(n));
-        svc_.post(&op);
-        return;
-    }
-
-    if (errno == EAGAIN || errno == EWOULDBLOCK)
-    {
-        svc_.work_started();
-        op.registered.store(true, std::memory_order_release);
-        svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET);
-        return;
-    }
-
-    op.complete(errno ? errno : EIO, 0);
-    svc_.post(&op);
-}
-
-inline void
-epoll_socket_impl::
-cancel() noexcept
-{
-    std::shared_ptr<epoll_socket_impl> self;
-    try {
-        self = shared_from_this();
-    } catch (const std::bad_weak_ptr&) {
-        return; // Not yet managed by shared_ptr (during construction)
-    }
-
-    auto cancel_op = [this, &self](epoll_op& op) {
-        bool was_registered = op.registered.exchange(false, std::memory_order_acq_rel);
-        op.request_cancel();
-        if (was_registered)
-        {
-            svc_.scheduler().unregister_fd(fd_);
-            op.impl_ptr = self; // prevent use-after-free
-            svc_.post(&op);
-            svc_.work_finished();
-        }
-    };
-
-    cancel_op(conn_);
-    cancel_op(rd_);
-    cancel_op(wr_);
-}
-
-inline void
-epoll_socket_impl::
-close_socket() noexcept
-{
-    cancel();
-
-    if (fd_ >= 0)
-    {
-        svc_.scheduler().unregister_fd(fd_);
-        ::close(fd_);
-        fd_ = -1;
-    }
-}
-
-//------------------------------------------------------------------------------
-// epoll_acceptor_impl implementation
-//------------------------------------------------------------------------------
-
-inline
-epoll_acceptor_impl::
-epoll_acceptor_impl(epoll_sockets& svc) noexcept
-    : svc_(svc)
-{
-}
-
-inline void
-epoll_acceptor_impl::
-release()
-{
-    close_socket();
-    svc_.destroy_acceptor_impl(*this);
-}
-
-inline void
-epoll_acceptor_impl::
-accept(
-    std::coroutine_handle<> h,
-    capy::executor_ref d,
-    std::stop_token token,
-    system::error_code* ec,
-    io_object::io_object_impl** impl_out)
-{
-    auto& op = acc_;
-    op.reset();
-    op.h = h;
-    op.d = d;
-    op.ec_out = ec;
-    op.impl_out = impl_out;
-    op.fd = fd_;
-    op.start(token);
-
-    // Needed for deferred peer creation when accept completes via the epoll path
-    op.service_ptr = &svc_;
-    op.create_peer = [](void* svc_ptr, int new_fd) -> io_object::io_object_impl* {
-        auto& svc = *static_cast<epoll_sockets*>(svc_ptr);
-        auto& peer_impl = svc.create_impl();
-        peer_impl.set_socket(new_fd);
-        return &peer_impl;
-    };
-
-    sockaddr_in addr{};
-    socklen_t addrlen = sizeof(addr);
-    int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
-        &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
-
-    if (accepted >= 0)
-    {
-        auto& peer_impl = svc_.create_impl();
-        peer_impl.set_socket(accepted);
-        op.accepted_fd = accepted;
-        op.peer_impl = &peer_impl;
-        op.complete(0, 0);
-        svc_.post(&op);
-        return;
-    }
-
-    if (errno == EAGAIN || errno == EWOULDBLOCK)
-    {
-        svc_.work_started();
-        op.registered.store(true, std::memory_order_release);
-        svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET);
-        return;
-    }
-
-    op.complete(errno, 0);
-    svc_.post(&op);
-}
-
-inline void
-epoll_acceptor_impl::
-cancel() noexcept
-{
-    bool was_registered = acc_.registered.exchange(false, std::memory_order_acq_rel);
-    acc_.request_cancel();
-
-    if (was_registered)
-    {
-        svc_.scheduler().unregister_fd(fd_);
-        try {
-            acc_.impl_ptr = shared_from_this(); // prevent use-after-free
-        } catch (const std::bad_weak_ptr&) {}
-        svc_.post(&acc_);
-        svc_.work_finished();
-    }
-}
-
-inline void
-epoll_acceptor_impl::
-close_socket() noexcept
-{
-    cancel();
-
-    if (fd_ >= 0)
-    {
-        svc_.scheduler().unregister_fd(fd_);
-        ::close(fd_);
-        fd_ = -1;
-    }
-}
-
-//------------------------------------------------------------------------------
-// epoll_sockets implementation
-//------------------------------------------------------------------------------
-
-inline
-epoll_sockets::
-epoll_sockets(capy::execution_context& ctx)
-    : sched_(ctx.use_service<epoll_scheduler>())
-{
-}
-
-inline
-epoll_sockets::
-~epoll_sockets()
-{
-}
-
-inline void
-epoll_sockets::
-shutdown()
-{
-    std::lock_guard lock(mutex_);
-
-    while (auto* impl = socket_list_.pop_front())
-        impl->close_socket();
-
-    while (auto* impl = acceptor_list_.pop_front())
-        impl->close_socket();
-
-    // Impls may outlive this if in-flight ops hold impl_ptr refs
-    socket_ptrs_.clear();
-    acceptor_ptrs_.clear();
-}
-
-inline epoll_socket_impl&
-epoll_sockets::
-create_impl()
-{
-    auto impl = std::make_shared<epoll_socket_impl>(*this);
-
-    {
-        std::lock_guard lock(mutex_);
-        socket_list_.push_back(impl.get());
-        socket_ptrs_.push_back(impl);
-    }
-
-    return *impl;
-}
-
-inline void
-epoll_sockets::
-destroy_impl(epoll_socket_impl& impl)
-{
-    std::lock_guard lock(mutex_);
-    socket_list_.remove(&impl);
-
-    // Impl may outlive this if pending ops hold impl_ptr refs
-    auto it = std::find_if(socket_ptrs_.begin(), socket_ptrs_.end(),
-        [&impl](const auto& ptr) { return ptr.get() == &impl; });
-    if (it != socket_ptrs_.end())
-        socket_ptrs_.erase(it);
-}
-
-inline system::error_code
-epoll_sockets::
-open_socket(epoll_socket_impl& impl)
-{
-    impl.close_socket();
-
-    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
-    if (fd < 0)
-        return make_err(errno);
-
-    impl.fd_ = fd;
-    return {};
-}
-
-inline epoll_acceptor_impl&
-epoll_sockets::
-create_acceptor_impl()
-{
-    auto impl = std::make_shared<epoll_acceptor_impl>(*this);
-
-    {
-        std::lock_guard lock(mutex_);
-        acceptor_list_.push_back(impl.get());
-        acceptor_ptrs_.push_back(impl);
-    }
-
-    return *impl;
-}
-
-inline void
-epoll_sockets::
-destroy_acceptor_impl(epoll_acceptor_impl& impl)
-{
-    std::lock_guard lock(mutex_);
-    acceptor_list_.remove(&impl);
-
-    auto it = std::find_if(acceptor_ptrs_.begin(), acceptor_ptrs_.end(),
-        [&impl](const auto& ptr) { return ptr.get() == &impl; });
-    if (it != acceptor_ptrs_.end())
-        acceptor_ptrs_.erase(it);
-}
-
-inline system::error_code
-epoll_sockets::
-open_acceptor(
-    epoll_acceptor_impl& impl,
-    endpoint ep,
-    int backlog)
-{
-    impl.close_socket();
-
-    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
-    if (fd < 0)
-        return make_err(errno);
-
-    int reuse = 1;
-    ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
-
-    sockaddr_in addr = detail::to_sockaddr_in(ep);
-    if (::bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) < 0)
-    {
-        int errn = errno;
-        ::close(fd);
-        return make_err(errn);
-    }
-
-    if (::listen(fd, backlog) < 0)
-    {
-        int errn = errno;
-        ::close(fd);
-        return make_err(errn);
-    }
-
-    impl.fd_ = fd;
-    return {};
-}
-
-inline void
-epoll_sockets::
-post(epoll_op* op)
-{
-    sched_.post(op);
-}
-
-inline void
-epoll_sockets::
-work_started() noexcept
-{
-    sched_.work_started();
-}
-
-inline void
-epoll_sockets::
-work_finished() noexcept
-{
-    sched_.work_finished();
-}
-
 } // namespace detail
 } // namespace corosio
 } // namespace boost
diff --git a/test/unit/io_context.cpp b/test/unit/io_context.cpp
index 2f3cf05..79ea190 100644
--- a/test/unit/io_context.cpp
+++ b/test/unit/io_context.cpp
@@ -12,8 +12,11 @@
 #include
-#include
+#include
 #include
+#include
+#include
+#include

 #include "test_suite.hpp"
@@ -56,6 +59,42 @@ inline counter_coro make_coro(int& counter)
     return c;
 }

+// Coroutine that increments an atomic counter when resumed
+struct atomic_counter_coro
+{
+    struct promise_type
+    {
+        std::atomic<int>* counter_ = nullptr;
+
+        atomic_counter_coro get_return_object()
+        {
+            return {std::coroutine_handle<promise_type>::from_promise(*this)};
+        }
+
+        std::suspend_always initial_suspend() noexcept { return {}; }
+        std::suspend_never final_suspend() noexcept { return {}; }
+
+        void return_void()
+        {
+            if (counter_)
+                counter_->fetch_add(1, std::memory_order_relaxed);
+        }
+
+        void unhandled_exception() { std::terminate(); }
+    };
+
+    std::coroutine_handle<promise_type> h;
+
+    operator capy::coro() const { return h; }
+};
+
+inline atomic_counter_coro make_atomic_coro(std::atomic<int>& counter)
+{
+    auto c = []() -> atomic_counter_coro { co_return; }();
+    c.h.promise().counter_ = &counter;
+    return c;
+}
+
 // Coroutine that checks running_in_this_thread when resumed
 struct check_coro
 {
@@ -345,6 +384,79 @@
         BOOST_TEST(during == true);
     }

+    void
+    testMultithreaded()
+    {
+        io_context ioc;
+        auto ex = ioc.get_executor();
+        std::atomic<int> counter{0};
+        constexpr int num_threads = 4;
+        constexpr int handlers_per_thread = 100;
+        constexpr int total_handlers = num_threads * handlers_per_thread;
+
+        // Post handlers from multiple threads concurrently
+        std::vector<std::thread> posters;
+        for (int t = 0; t < num_threads; ++t)
+        {
+            posters.emplace_back([&ex, &counter]() {
+                for (int i = 0; i < handlers_per_thread; ++i)
+                    ex.post(make_atomic_coro(counter));
+            });
+        }
+
+        // Wait for all posters to finish
+        for (auto& t : posters)
+            t.join();
+
+        // Run with multiple threads
+        std::vector<std::thread> runners;
+        for (int t = 0; t < num_threads; ++t)
+            runners.emplace_back([&ioc]() { ioc.run(); });
+
+        // Wait for all runners to complete
+        for (auto& t : runners)
+            t.join();
+
+        BOOST_TEST(counter.load() == total_handlers);
+    }
+
+    void
+    testMultithreadedStress()
+    {
+        // Stress test: multiple iterations of post-then-run with multiple threads
+        constexpr int iterations = 10;
+        constexpr int num_threads = 4;
+        constexpr int handlers_per_iteration = 100;
+
+        for (int iter = 0; iter < iterations; ++iter)
+        {
+            io_context ioc;
+            auto ex = ioc.get_executor();
+            std::atomic<int> counter{0};
+
+            // Post all handlers first
+            for (int i = 0; i < handlers_per_iteration; ++i)
+                ex.post(make_atomic_coro(counter));
+
+            // Run with multiple threads
+            std::vector<std::thread> runners;
+            for (int t = 0; t < num_threads; ++t)
+                runners.emplace_back([&ioc]() { ioc.run(); });
+
+            for (auto& t : runners)
+                t.join();
+
+            auto count = counter.load();
+            if (count != handlers_per_iteration)
+            {
+                std::ostringstream ss;
+                ss << "iteration " << iter << ": counter=" << count
+                   << ", expected=" << handlers_per_iteration;
+                BOOST_ERROR(ss.str().c_str());
+            }
+        }
+    }
+
     void
     run()
     {
@@ -359,6 +471,8 @@
         testRunOneUntil();
         testRunFor();
         testExecutorRunningInThisThread();
+        testMultithreaded();
+        testMultithreadedStress();
     }
 };
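In application form, the pattern these tests exercise is simply a thread pool
draining one io_context (sketch; assumes only the io_context API used in the
tests above):

    #include <thread>
    #include <vector>

    void run_pool(boost::corosio::io_context& ioc, int n)
    {
        std::vector<std::thread> pool;
        for (int i = 0; i < n; ++i)
            pool.emplace_back([&ioc] { ioc.run(); }); // one thread becomes the
                                                      // reactor; the rest idle
        for (auto& t : pool)
            t.join();                                 // run() returns when the
                                                      // outstanding work hits zero
    }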