libs/corosio/src/corosio/src/detail/epoll/op.hpp

81.4% Lines (79/97) 80.0% Functions (16/20) 50.0% Branches (12/24)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/dispatch_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 Persistent Registration
54 -----------------------
55 File descriptors are registered with epoll once (via descriptor_state) and
56 stay registered until closed. The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
// Forward declarations for types that this header names before (or without)
// their full definition being visible.
struct epoll_op;
class epoll_scheduler;
class epoll_socket_impl;
class epoll_acceptor_impl;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
111 struct descriptor_state : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120 // Caches edge events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Set during registration only (no mutex needed)
125 std::uint32_t registered_events = 0;
126 int fd = -1;
127
128 // For deferred I/O - set by reactor, read by scheduler
129 std::atomic<std::uint32_t> ready_events_{0};
130 std::atomic<bool> is_enqueued_{false};
131 epoll_scheduler const* scheduler_ = nullptr;
132
133 // Prevents impl destruction while this descriptor_state is queued.
134 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
135 std::shared_ptr<void> impl_ref_;
136
137 /// Add ready events atomically.
138 54897 void add_ready_events(std::uint32_t ev) noexcept
139 {
140 54897 ready_events_.fetch_or(ev, std::memory_order_relaxed);
141 54897 }
142
143 /// Perform deferred I/O and queue completions.
144 void operator()() override;
145
146 /// Destroy without invoking.
147 /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
148 /// the self-referential cycle set by close_socket().
149 void destroy() override { impl_ref_.reset(); }
150 };
151
152 struct epoll_op : scheduler_op
153 {
154 struct canceller
155 {
156 epoll_op* op;
157 void operator()() const noexcept;
158 };
159
160 std::coroutine_handle<> h;
161 capy::executor_ref ex;
162 std::error_code* ec_out = nullptr;
163 std::size_t* bytes_out = nullptr;
164
165 int fd = -1;
166 int errn = 0;
167 std::size_t bytes_transferred = 0;
168
169 std::atomic<bool> cancelled{false};
170 std::optional<std::stop_callback<canceller>> stop_cb;
171
172 // Prevents use-after-free when socket is closed with pending ops.
173 // See "Impl Lifetime Management" in file header.
174 std::shared_ptr<void> impl_ptr;
175
176 // For stop_token cancellation - pointer to owning socket/acceptor impl.
177 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
178 epoll_socket_impl* socket_impl_ = nullptr;
179 epoll_acceptor_impl* acceptor_impl_ = nullptr;
180
181 16732 epoll_op() = default;
182
183 227582 void reset() noexcept
184 {
185 227582 fd = -1;
186 227582 errn = 0;
187 227582 bytes_transferred = 0;
188 227582 cancelled.store(false, std::memory_order_relaxed);
189 227582 impl_ptr.reset();
190 227582 socket_impl_ = nullptr;
191 227582 acceptor_impl_ = nullptr;
192 227582 }
193
194 // Defined in sockets.cpp where epoll_socket_impl is complete
195 void operator()() override;
196
197 36940 virtual bool is_read_operation() const noexcept { return false; }
198 virtual void cancel() noexcept = 0;
199
200 void destroy() override
201 {
202 stop_cb.reset();
203 impl_ptr.reset();
204 }
205
206 25538 void request_cancel() noexcept
207 {
208 25538 cancelled.store(true, std::memory_order_release);
209 25538 }
210
211 76848 void start(std::stop_token token, epoll_socket_impl* impl)
212 {
213 76848 cancelled.store(false, std::memory_order_release);
214 76848 stop_cb.reset();
215 76848 socket_impl_ = impl;
216 76848 acceptor_impl_ = nullptr;
217
218
2/2
✓ Branch 1 taken 99 times.
✓ Branch 2 taken 76749 times.
76848 if (token.stop_possible())
219 99 stop_cb.emplace(token, canceller{this});
220 76848 }
221
222 2781 void start(std::stop_token token, epoll_acceptor_impl* impl)
223 {
224 2781 cancelled.store(false, std::memory_order_release);
225 2781 stop_cb.reset();
226 2781 socket_impl_ = nullptr;
227 2781 acceptor_impl_ = impl;
228
229
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2772 times.
2781 if (token.stop_possible())
230 9 stop_cb.emplace(token, canceller{this});
231 2781 }
232
233 79563 void complete(int err, std::size_t bytes) noexcept
234 {
235 79563 errn = err;
236 79563 bytes_transferred = bytes;
237 79563 }
238
239 virtual void perform_io() noexcept {}
240 };
241
242
243 struct epoll_connect_op : epoll_op
244 {
245 endpoint target_endpoint;
246
247 2773 void reset() noexcept
248 {
249 2773 epoll_op::reset();
250 2773 target_endpoint = endpoint{};
251 2773 }
252
253 2773 void perform_io() noexcept override
254 {
255 // connect() completion status is retrieved via SO_ERROR, not return value
256 2773 int err = 0;
257 2773 socklen_t len = sizeof(err);
258
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2773 times.
2773 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
259 err = errno;
260 2773 complete(err, 0);
261 2773 }
262
263 // Defined in sockets.cpp where epoll_socket_impl is complete
264 void operator()() override;
265 void cancel() noexcept override;
266 };
267
268
269 struct epoll_read_op : epoll_op
270 {
271 static constexpr std::size_t max_buffers = 16;
272 iovec iovecs[max_buffers];
273 int iovec_count = 0;
274 bool empty_buffer_read = false;
275
276 36978 bool is_read_operation() const noexcept override
277 {
278 36978 return !empty_buffer_read;
279 }
280
281 111090 void reset() noexcept
282 {
283 111090 epoll_op::reset();
284 111090 iovec_count = 0;
285 111090 empty_buffer_read = false;
286 111090 }
287
288 143 void perform_io() noexcept override
289 {
290 ssize_t n;
291 do {
292 143 n = ::readv(fd, iovecs, iovec_count);
293
3/4
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 51 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 92 times.
143 } while (n < 0 && errno == EINTR);
294
295
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 92 times.
143 if (n >= 0)
296 51 complete(0, static_cast<std::size_t>(n));
297 else
298 92 complete(errno, 0);
299 143 }
300
301 void cancel() noexcept override;
302 };
303
304
305 struct epoll_write_op : epoll_op
306 {
307 static constexpr std::size_t max_buffers = 16;
308 iovec iovecs[max_buffers];
309 int iovec_count = 0;
310
311 110938 void reset() noexcept
312 {
313 110938 epoll_op::reset();
314 110938 iovec_count = 0;
315 110938 }
316
317 void perform_io() noexcept override
318 {
319 msghdr msg{};
320 msg.msg_iov = iovecs;
321 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
322
323 ssize_t n;
324 do {
325 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
326 } while (n < 0 && errno == EINTR);
327
328 if (n >= 0)
329 complete(0, static_cast<std::size_t>(n));
330 else
331 complete(errno, 0);
332 }
333
334 void cancel() noexcept override;
335 };
336
337
338 struct epoll_accept_op : epoll_op
339 {
340 int accepted_fd = -1;
341 io_object::io_object_impl** impl_out = nullptr;
342 sockaddr_in peer_addr{};
343
344 2781 void reset() noexcept
345 {
346 2781 epoll_op::reset();
347 2781 accepted_fd = -1;
348 2781 impl_out = nullptr;
349 2781 peer_addr = {};
350 2781 }
351
352 2770 void perform_io() noexcept override
353 {
354 2770 socklen_t addrlen = sizeof(peer_addr);
355 int new_fd;
356 do {
357 2770 new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&peer_addr),
358 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
359
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2770 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2770 } while (new_fd < 0 && errno == EINTR);
360
361
1/2
✓ Branch 0 taken 2770 times.
✗ Branch 1 not taken.
2770 if (new_fd >= 0)
362 {
363 2770 accepted_fd = new_fd;
364 2770 complete(0, 0);
365 }
366 else
367 {
368 complete(errno, 0);
369 }
370 2770 }
371
372 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
373 void operator()() override;
374 void cancel() noexcept override;
375 };
376
377 } // namespace boost::corosio::detail
378
379 #endif // BOOST_COROSIO_HAS_EPOLL
380
381 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
382