libs/capy/include/boost/capy/ex/async_event.hpp

100.0% Lines (68/68) 100.0% Functions (13/13) 94.4% Branches (17/18)
libs/capy/include/boost/capy/ex/async_event.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11 #define BOOST_CAPY_ASYNC_EVENT_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/executor_ref.hpp>
19 #include <boost/capy/io_result.hpp>
20
21 #include <stop_token>
22
23 #include <atomic>
24 #include <coroutine>
25 #include <new>
26 #include <utility>
27
28 /* async_event implementation notes
29 =================================
30
31 Same cancellation pattern as async_mutex (see that file for the
32 full discussion on claimed_, stop_cb lifetime, member ordering,
33 and threading assumptions).
34
35 Key difference: set() wakes ALL waiters (broadcast), not one.
36 It pops every waiter from the list and dispatches the ones it
37 claims. Waiters already claimed by a stop callback are skipped.
38
39 Because set() pops all waiters, a canceled waiter may have been
40 removed from the list by set() before its await_resume runs.
41 This requires a separate in_list_ flag (unlike async_mutex where
42 active_ served double duty). await_resume only calls remove()
43 when in_list_ is true.
44 */
45
46 namespace boost {
47 namespace capy {
48
49 /** An asynchronous event for coroutines.
50
51 This event provides a way to notify multiple coroutines that some
52 condition has occurred. When a coroutine awaits an unset event, it
53 suspends and is added to a wait queue. When the event is set, all
54 waiting coroutines are resumed.
55
56 @par Cancellation
57
58 When a coroutine is suspended waiting for the event and its stop
59 token is triggered, the waiter completes with `error::canceled`
60 instead of waiting for `set()`.
61
62 Cancellation only applies while the coroutine is suspended in the
63 wait queue. If the event is already set when `wait()` is called,
64 the wait completes immediately even if the stop token is already
65 signaled.
66
67 @par Zero Allocation
68
69 No heap allocation occurs for wait operations.
70
71 @par Thread Safety
72
73 The event operations are designed for single-threaded use on one
74 executor. The stop callback may fire from any thread.
75
76 @par Example
77 @code
78 async_event event;
79
80 task<> waiter() {
81 auto [ec] = co_await event.wait();
82 if(ec)
83 co_return;
84 // ... event was set ...
85 }
86
87 task<> notifier() {
88 // ... do some work ...
89 event.set(); // Wake all waiters
90 }
91 @endcode
92 */
93 class async_event
94 {
95 public:
96 class wait_awaiter;
97
98 private:
99 bool set_ = false;
100 detail::intrusive_list<wait_awaiter> waiters_;
101
102 public:
103 /** Awaiter returned by wait().
104 */
105 class wait_awaiter
106 : public detail::intrusive_list<wait_awaiter>::node
107 {
108 friend class async_event;
109
110 async_event* e_;
111 std::coroutine_handle<> h_;
112 executor_ref ex_;
113
114 // Declared before stop_cb_buf_: the callback
115 // accesses these members, so they must still be
116 // alive if the stop_cb_ destructor blocks.
117 std::atomic<bool> claimed_{false};
118 bool canceled_ = false;
119 bool active_ = false;
120 bool in_list_ = false;
121
122 struct cancel_fn
123 {
124 wait_awaiter* self_;
125
126 20 void operator()() const noexcept
127 {
128
2/2
✓ Branch 1 taken 19 times.
✓ Branch 2 taken 1 time.
20 if(!self_->claimed_.exchange(
129 true, std::memory_order_acq_rel))
130 {
131 19 self_->canceled_ = true;
132 19 self_->ex_.dispatch(self_->h_);
133 }
134 20 }
135 };
136
137 using stop_cb_t =
138 std::stop_callback<cancel_fn>;
139
140 // Aligned storage for stop_cb_t. Declared last:
141 // its destructor may block while the callback
142 // accesses the members above.
143 #ifdef _MSC_VER
144 # pragma warning(push)
145 # pragma warning(disable: 4324) // padded due to alignas
146 #endif
147 alignas(stop_cb_t)
148 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
149 #ifdef _MSC_VER
150 # pragma warning(pop)
151 #endif
152
153 31 stop_cb_t& stop_cb_() noexcept
154 {
155 return *reinterpret_cast<stop_cb_t*>(
156 31 stop_cb_buf_);
157 }
158
159 public:
160 235 ~wait_awaiter()
161 {
162
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 234 times.
235 if(active_)
163 1 stop_cb_().~stop_cb_t();
164
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 234 times.
235 if(in_list_)
165 1 e_->waiters_.remove(this);
166 235 }
167
168 51 explicit wait_awaiter(async_event* e) noexcept
169 51 : e_(e)
170 {
171 51 }
172
173 184 wait_awaiter(wait_awaiter&& o) noexcept
174 184 : e_(o.e_)
175 184 , h_(o.h_)
176 184 , ex_(o.ex_)
177 184 , claimed_(o.claimed_.load(
178 std::memory_order_relaxed))
179 184 , canceled_(o.canceled_)
180 184 , active_(std::exchange(o.active_, false))
181 184 , in_list_(std::exchange(o.in_list_, false))
182 {
183 184 }
184
185 wait_awaiter(wait_awaiter const&) = delete;
186 wait_awaiter& operator=(wait_awaiter const&) = delete;
187 wait_awaiter& operator=(wait_awaiter&&) = delete;
188
189 51 bool await_ready() const noexcept
190 {
191 51 return e_->set_;
192 }
193
194 /** IoAwaitable protocol overload. */
195 std::coroutine_handle<>
196 41 await_suspend(
197 std::coroutine_handle<> h,
198 executor_ref ex,
199 std::stop_token token) noexcept
200 {
201
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 31 times.
41 if(token.stop_requested())
202 {
203 10 canceled_ = true;
204 10 return h;
205 }
206 31 h_ = h;
207 31 ex_ = ex;
208 31 e_->waiters_.push_back(this);
209 31 in_list_ = true;
210 93 ::new(stop_cb_buf_) stop_cb_t(
211 31 token, cancel_fn{this});
212 31 active_ = true;
213 31 return std::noop_coroutine();
214 }
215
216 48 io_result<> await_resume() noexcept
217 {
218
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 18 times.
48 if(active_)
219 {
220 30 stop_cb_().~stop_cb_t();
221 30 active_ = false;
222 }
223
2/2
✓ Branch 0 taken 29 times.
✓ Branch 1 taken 19 times.
48 if(canceled_)
224 {
225
2/2
✓ Branch 0 taken 19 times.
✓ Branch 1 taken 10 times.
29 if(in_list_)
226 {
227 19 e_->waiters_.remove(this);
228 19 in_list_ = false;
229 }
230 29 return {make_error_code(
231 29 error::canceled)};
232 }
233 19 return {{}};
234 }
235 };
236
237 20 async_event() = default;
238
239 // Non-copyable, non-movable
240 async_event(async_event const&) = delete;
241 async_event& operator=(async_event const&) = delete;
242
243 /** Returns an awaiter that waits until the event is set.
244
245 If the event is already set, completes immediately.
246
247 @return An awaitable yielding `(error_code)`.
248 */
249 51 wait_awaiter wait() noexcept
250 {
251 51 return wait_awaiter{this};
252 }
253
254 /** Sets the event.
255
256 All waiting coroutines are resumed. Canceled waiters
257 are skipped. Subsequent calls to wait() complete
258 immediately until clear() is called.
259 */
260 20 void set() noexcept
261 {
262 20 set_ = true;
263 for(;;)
264 {
265 31 auto* w = waiters_.pop_front();
266
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 11 times.
31 if(!w)
267 20 break;
268 11 w->in_list_ = false;
269
1/2
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
11 if(!w->claimed_.exchange(
270 true, std::memory_order_acq_rel))
271 {
272 11 w->ex_.dispatch(w->h_);
273 }
274 11 }
275 20 }
276
277 /** Clears the event.
278
279 Subsequent calls to wait() will suspend until
280 set() is called again.
281 */
282 2 void clear() noexcept
283 {
284 2 set_ = false;
285 2 }
286
287 /** Returns true if the event is currently set.
288 */
289 8 bool is_set() const noexcept
290 {
291 8 return set_;
292 }
293 };
294
295 } // namespace capy
296 } // namespace boost
297
298 #endif
299