Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11 : #define BOOST_CAPY_ASYNC_EVENT_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/error.hpp>
18 : #include <boost/capy/ex/executor_ref.hpp>
19 : #include <boost/capy/io_result.hpp>
20 :
21 : #include <stop_token>
22 :
23 : #include <atomic>
24 : #include <coroutine>
25 : #include <new>
26 : #include <utility>
27 :
28 : /* async_event implementation notes
29 : =================================
30 :
31 : Same cancellation pattern as async_mutex (see that file for the
32 : full discussion on claimed_, stop_cb lifetime, member ordering,
33 : and threading assumptions).
34 :
35 : Key difference: set() wakes ALL waiters (broadcast), not one.
36 : It pops every waiter from the list and dispatches the ones it
37 : claims. Waiters already claimed by a stop callback are skipped.
38 :
39 : Because set() pops all waiters, a canceled waiter may have been
40 : removed from the list by set() before its await_resume runs.
41 : This requires a separate in_list_ flag (unlike async_mutex where
42 : active_ served double duty). await_resume only calls remove()
43 : when in_list_ is true.
44 : */
45 :
46 : namespace boost {
47 : namespace capy {
48 :
49 : /** An asynchronous event for coroutines.
50 :
51 : This event provides a way to notify multiple coroutines that some
52 : condition has occurred. When a coroutine awaits an unset event, it
53 : suspends and is added to a wait queue. When the event is set, all
54 : waiting coroutines are resumed.
55 :
56 : @par Cancellation
57 :
58 : When a coroutine is suspended waiting for the event and its stop
59 : token is triggered, the waiter completes with `error::canceled`
60 : instead of waiting for `set()`.
61 :
62 : Cancellation only applies while the coroutine is suspended in the
63 : wait queue. If the event is already set when `wait()` is called,
64 : the wait completes immediately even if the stop token is already
65 : signaled.
66 :
67 : @par Zero Allocation
68 :
69 : No heap allocation occurs for wait operations.
70 :
71 : @par Thread Safety
72 :
73 : The event operations are designed for single-threaded use on one
74 : executor. The stop callback may fire from any thread.
75 :
76 : @par Example
77 : @code
78 : async_event event;
79 :
80 : task<> waiter() {
81 : auto [ec] = co_await event.wait();
82 : if(ec)
83 : co_return;
84 : // ... event was set ...
85 : }
86 :
87 : task<> notifier() {
88 : // ... do some work ...
89 : event.set(); // Wake all waiters
90 : }
91 : @endcode
92 : */
93 : class async_event
94 : {
95 : public:
96 : class wait_awaiter;
97 :
98 : private:
99 : bool set_ = false;
100 : detail::intrusive_list<wait_awaiter> waiters_;
101 :
102 : public:
103 : /** Awaiter returned by wait().
104 : */
105 : class wait_awaiter
106 : : public detail::intrusive_list<wait_awaiter>::node
107 : {
108 : friend class async_event;
109 :
110 : async_event* e_;
111 : std::coroutine_handle<> h_;
112 : executor_ref ex_;
113 :
114 : // Declared before stop_cb_buf_: the callback
115 : // accesses these members, so they must still be
116 : // alive if the stop_cb_ destructor blocks.
117 : std::atomic<bool> claimed_{false};
118 : bool canceled_ = false;
119 : bool active_ = false;
120 : bool in_list_ = false;
121 :
122 : struct cancel_fn
123 : {
124 : wait_awaiter* self_;
125 :
126 20 : void operator()() const noexcept
127 : {
128 20 : if(!self_->claimed_.exchange(
129 : true, std::memory_order_acq_rel))
130 : {
131 19 : self_->canceled_ = true;
132 19 : self_->ex_.dispatch(self_->h_);
133 : }
134 20 : }
135 : };
136 :
137 : using stop_cb_t =
138 : std::stop_callback<cancel_fn>;
139 :
140 : // Aligned storage for stop_cb_t. Declared last:
141 : // its destructor may block while the callback
142 : // accesses the members above.
143 : #ifdef _MSC_VER
144 : # pragma warning(push)
145 : # pragma warning(disable: 4324) // padded due to alignas
146 : #endif
147 : alignas(stop_cb_t)
148 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
149 : #ifdef _MSC_VER
150 : # pragma warning(pop)
151 : #endif
152 :
153 31 : stop_cb_t& stop_cb_() noexcept
154 : {
155 : return *reinterpret_cast<stop_cb_t*>(
156 31 : stop_cb_buf_);
157 : }
158 :
159 : public:
160 235 : ~wait_awaiter()
161 : {
162 235 : if(active_)
163 1 : stop_cb_().~stop_cb_t();
164 235 : if(in_list_)
165 1 : e_->waiters_.remove(this);
166 235 : }
167 :
168 51 : explicit wait_awaiter(async_event* e) noexcept
169 51 : : e_(e)
170 : {
171 51 : }
172 :
173 184 : wait_awaiter(wait_awaiter&& o) noexcept
174 184 : : e_(o.e_)
175 184 : , h_(o.h_)
176 184 : , ex_(o.ex_)
177 184 : , claimed_(o.claimed_.load(
178 : std::memory_order_relaxed))
179 184 : , canceled_(o.canceled_)
180 184 : , active_(std::exchange(o.active_, false))
181 184 : , in_list_(std::exchange(o.in_list_, false))
182 : {
183 184 : }
184 :
185 : wait_awaiter(wait_awaiter const&) = delete;
186 : wait_awaiter& operator=(wait_awaiter const&) = delete;
187 : wait_awaiter& operator=(wait_awaiter&&) = delete;
188 :
189 51 : bool await_ready() const noexcept
190 : {
191 51 : return e_->set_;
192 : }
193 :
194 : /** IoAwaitable protocol overload. */
195 : std::coroutine_handle<>
196 41 : await_suspend(
197 : std::coroutine_handle<> h,
198 : executor_ref ex,
199 : std::stop_token token) noexcept
200 : {
201 41 : if(token.stop_requested())
202 : {
203 10 : canceled_ = true;
204 10 : return h;
205 : }
206 31 : h_ = h;
207 31 : ex_ = ex;
208 31 : e_->waiters_.push_back(this);
209 31 : in_list_ = true;
210 93 : ::new(stop_cb_buf_) stop_cb_t(
211 31 : token, cancel_fn{this});
212 31 : active_ = true;
213 31 : return std::noop_coroutine();
214 : }
215 :
216 48 : io_result<> await_resume() noexcept
217 : {
218 48 : if(active_)
219 : {
220 30 : stop_cb_().~stop_cb_t();
221 30 : active_ = false;
222 : }
223 48 : if(canceled_)
224 : {
225 29 : if(in_list_)
226 : {
227 19 : e_->waiters_.remove(this);
228 19 : in_list_ = false;
229 : }
230 29 : return {make_error_code(
231 29 : error::canceled)};
232 : }
233 19 : return {{}};
234 : }
235 : };
236 :
237 20 : async_event() = default;
238 :
239 : // Non-copyable, non-movable
240 : async_event(async_event const&) = delete;
241 : async_event& operator=(async_event const&) = delete;
242 :
243 : /** Returns an awaiter that waits until the event is set.
244 :
245 : If the event is already set, completes immediately.
246 :
247 : @return An awaitable yielding `(error_code)`.
248 : */
249 51 : wait_awaiter wait() noexcept
250 : {
251 51 : return wait_awaiter{this};
252 : }
253 :
254 : /** Sets the event.
255 :
256 : All waiting coroutines are resumed. Canceled waiters
257 : are skipped. Subsequent calls to wait() complete
258 : immediately until clear() is called.
259 : */
260 20 : void set() noexcept
261 : {
262 20 : set_ = true;
263 : for(;;)
264 : {
265 31 : auto* w = waiters_.pop_front();
266 31 : if(!w)
267 20 : break;
268 11 : w->in_list_ = false;
269 11 : if(!w->claimed_.exchange(
270 : true, std::memory_order_acq_rel))
271 : {
272 11 : w->ex_.dispatch(w->h_);
273 : }
274 11 : }
275 20 : }
276 :
277 : /** Clears the event.
278 :
279 : Subsequent calls to wait() will suspend until
280 : set() is called again.
281 : */
282 2 : void clear() noexcept
283 : {
284 2 : set_ = false;
285 2 : }
286 :
287 : /** Returns true if the event is currently set.
288 : */
289 8 : bool is_set() const noexcept
290 : {
291 8 : return set_;
292 : }
293 : };
294 :
295 : } // namespace capy
296 : } // namespace boost
297 :
298 : #endif
|