libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

Coverage: 83.3% lines (414/497), 93.8% functions (45/48), 70.6% branches (214/303)

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler uses a thread coordination strategy to provide handler
parallelism while avoiding the thundering herd problem: instead of all
threads blocking in epoll_wait(), one thread becomes the "reactor" while
the others wait on a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics, where N posted items wake N threads

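A minimal usage sketch (assuming a scheduler instance `sched` owned by an
execution context; construction details are elided, names illustrative):

    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&] { sched.run(); });  // one thread becomes the
                                                  // reactor, three wait on cond_
    for (auto& t : pool)
        t.join();
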
Event Loop Structure (do_one)
-----------------------------
1. Lock the mutex, try to pop a handler from the queue
2. If a handler was popped: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try to pop
a handler, giving priority to handler execution over more I/O polling
(sketched below).

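A condensed sketch of do_one(), simplified from the real code further down:

    for (;;)
    {
        scheduler_op* op = completed_ops_.pop();       // mutex held
        if (op == &task_op_)                           // reactor sentinel
        {
            task_running_ = true;
            run_task(lock, ctx);                       // epoll_wait, unlocked
            task_running_ = false;
            completed_ops_.push(&task_op_);
            continue;                                  // prefer handlers
        }
        if (op) { lock.unlock(); (*op)(); return 1; }  // run one handler
        wait_for_signal(lock);                         // block on cond_
    }
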
Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination:
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check whether already signaled before blocking (fast path)

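A worked example of the encoding:

    state_ == 0   idle: no waiters, not signaled
    waiter:       state_ += 2   -> 2, then cond_.wait(lock)
    signaler:     state_ |= 1   -> 3; state_ > 1, so notify_one()
    waiter wakes: state_ -= 2   -> 1 (the signal bit persists until
                                      clear_signal() resets it)
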
Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (the thread will find the work when it checks the queue)

This avoids waking threads unnecessarily. With cascading wakes, each
handler execution wakes at most one additional thread if more work
exists in the queue.

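As pseudocode (the real logic lives in wake_one_thread_and_unlock and
maybe_unlock_and_signal_one below):

    state_ |= 1;
    if (state_ > 1)                   // waiters exist
        { unlock; cond_.notify_one(); }
    else if (task_running_ && !task_interrupted_)
        { task_interrupted_ = true; unlock; interrupt_reactor(); }
    else
        { unlock; }                   // work is found on the next queue check
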
Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments the count on start and decrements it
on completion.

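For example, a cross-thread post() (no scheduler_context found) does:

    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
    std::unique_lock lock(mutex_);
    completed_ops_.push(op);
    wake_one_thread_and_unlock(lock);
    // the matching fetch_sub(1) happens later, in work_cleanup /
    // work_finished(), when the handler completes
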
Timer Integration
-----------------
Timers are handled by timer_service. The reactor programs a timerfd
(registered with epoll) to fire at the nearest timer expiry. When a new
timer is scheduled earlier than the current nearest expiry, timer_service
marks the timerfd stale and calls interrupt_reactor() so it is
reprogrammed before the next epoll_wait.
*/

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;
    int inline_budget;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
    {
    }
};

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue,
                frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace

void
epoll_scheduler::
reset_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
        ctx->inline_budget = max_inline_budget_;
}

bool
epoll_scheduler::
try_consume_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
    {
        if (ctx->inline_budget > 0)
        {
            --ctx->inline_budget;
            return true;
        }
    }
    return false;
}

void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute the first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_acquire))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(std::coroutine_handle<> h) const
{
    struct post_handler final
        : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit
        post_handler(std::coroutine_handle<> h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}

void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    A handler consumes 1 work item and may produce N new items via
    fast-path posts, so the net change is N - 1:
    - If N > 1: add (N - 1) to the global count (more produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains the private queue to the global queue for other threads
    to process.
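
    Example: a handler that fast-path posts two continuations has N == 2,
    so 1 is added to outstanding_work_ and both continuations are spliced
    onto the global queue in the destructor below.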
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(
                    produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};

/** RAII guard for reactor work accounting.

    The reactor only produces work, via timer/signal callbacks posting
    handlers. Unlike handler execution, which consumes 1 item, the
    reactor consumes nothing, so all produced work must be flushed to
    the global counter.
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}

void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without the mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            // eventfd wakeup: drain it and mark it re-armable
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue the descriptor.
        // No per-descriptor mutex locking in the reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
                std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when the timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or the caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or the caller requested a
        // non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif