libs/capy/include/boost/capy/ex/async_mutex.hpp

98.9% Lines (93/94) 100.0% Functions (20/20) 90.9% Branches (20/22)
libs/capy/include/boost/capy/ex/async_mutex.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 #define BOOST_CAPY_ASYNC_MUTEX_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/executor_ref.hpp>
19 #include <boost/capy/io_result.hpp>
20
21 #include <stop_token>
22
23 #include <atomic>
24 #include <coroutine>
25 #include <new>
26 #include <utility>
27
28 /* async_mutex implementation notes
29 ================================
30
31 Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
32 inherits intrusive_list<lock_awaiter>::node; the list is owned by
33 async_mutex::waiters_.
34
35 Cancellation via stop_token
36 ---------------------------
37 A std::stop_callback is registered in await_suspend. Two actors can
38 race to resume the suspended coroutine: unlock() and the stop callback.
39 An atomic bool `claimed_` resolves the race -- whoever does
40 claimed_.exchange(true) and reads false wins. The loser does nothing.
41
42 The stop callback calls ex_.dispatch(h_). If dispatch runs inline
43 (same thread), the stop_callback is destroyed from within its own
44 operator() via await_resume. This is safe: cancel_fn touches no
45 members after dispatch returns (same pattern as delete-this).
46
47 unlock() pops waiters from the front. If the popped waiter was
48 already claimed by the stop callback, unlock() skips it and tries
49 the next. await_resume removes the (still-linked) canceled waiter
50 via waiters_.remove(this).
51
52 The stop_callback lives in raw aligned storage (stop_cb_buf_) to
53 suppress automatic construction/destruction. Placement new in
54 await_suspend, explicit destructor call in await_resume and ~lock_awaiter.
55
56 Member ordering constraint
57 --------------------------
58 The storage buffer stop_cb_buf_ must be declared AFTER the members
59 the callback accesses (h_, ex_, claimed_, canceled_). If the
60 stop_callback destructor blocks waiting for a concurrent callback, those
61 members must still be alive (C++ destroys in reverse declaration
62 order).
63
64 active_ flag
65 ------------
66 Tracks both list membership and stop_cb_ lifetime (they are always
67 set and cleared together). Used by the destructor to clean up if the
68 coroutine is destroyed while suspended (e.g. execution_context
69 shutdown).
70
71 Cancellation scope
72 ------------------
73 Cancellation only takes effect while the coroutine is suspended in
74 the wait queue. If the mutex is unlocked, await_ready acquires it
75 immediately without checking the stop token. This is intentional:
76 the fast path has no token access and no overhead.
77
78 Threading assumptions
79 ---------------------
80 - All list mutations happen on the executor thread (await_suspend,
81 await_resume, unlock, ~lock_awaiter).
82 - The stop callback may fire from any thread, but only touches
83 claimed_ (atomic) and then calls dispatch. It never touches the
84 list.
85 - ~lock_awaiter must be called from the executor thread. This is
86 guaranteed during normal shutdown but NOT if the coroutine frame
87 is destroyed from another thread while a stop callback could
88 fire (precondition violation, same as cppcoro/folly).
89 */
90
91 namespace boost {
92 namespace capy {
93
94 /** An asynchronous mutex for coroutines.
95
96 This mutex provides mutual exclusion for coroutines without blocking.
97 When a coroutine attempts to acquire a locked mutex, it suspends and
98 is added to an intrusive wait queue. When the holder unlocks, the next
99 waiter is resumed with the lock held.
100
101 @par Cancellation
102
103 When a coroutine is suspended waiting for the mutex and its stop
104 token is triggered, the waiter completes with `error::canceled`
105 instead of acquiring the lock.
106
107 Cancellation only applies while the coroutine is suspended in the
108 wait queue. If the mutex is unlocked when `lock()` is called, the
109 lock is acquired immediately even if the stop token is already
110 signaled.
111
112 @par Zero Allocation
113
114 No heap allocation occurs for lock operations.
115
116 @par Thread Safety
117
118 The mutex operations are designed for single-threaded use on one
119 executor. The stop callback may fire from any thread.
120
121 @par Example
122 @code
123 async_mutex cm;
124
125 task<> protected_operation() {
126 auto [ec] = co_await cm.lock();
127 if(ec)
128 co_return;
129 // ... critical section ...
130 cm.unlock();
131 }
132
133 // Or with RAII:
134 task<> protected_operation() {
135 auto [ec, guard] = co_await cm.scoped_lock();
136 if(ec)
137 co_return;
138 // ... critical section ...
139 // unlocks automatically
140 }
141 @endcode
142 */
143 class async_mutex
144 {
145 public:
146 class lock_awaiter;
147 class lock_guard;
148 class lock_guard_awaiter;
149
150 private:
151 bool locked_ = false;
152 detail::intrusive_list<lock_awaiter> waiters_;
153
154 public:
155 /** Awaiter returned by lock().
156 */
157 class lock_awaiter
158 : public detail::intrusive_list<lock_awaiter>::node
159 {
160 friend class async_mutex;
161
162 async_mutex* m_;
163 std::coroutine_handle<> h_;
164 executor_ref ex_;
165
166 // These members must be declared before stop_cb_
167 // (see comment on the union below).
168 std::atomic<bool> claimed_{false};
169 bool canceled_ = false;
170 bool active_ = false;
171
172 struct cancel_fn
173 {
174 lock_awaiter* self_;
175
176 5 void operator()() const noexcept
177 {
178
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 if(!self_->claimed_.exchange(
179 true, std::memory_order_acq_rel))
180 {
181 5 self_->canceled_ = true;
182 5 self_->ex_.dispatch(self_->h_);
183 }
184 5 }
185 };
186
187 using stop_cb_t =
188 std::stop_callback<cancel_fn>;
189
190 // Aligned storage for stop_cb_t. Declared last:
191 // its destructor may block while the callback
192 // accesses the members above.
193 #ifdef _MSC_VER
194 # pragma warning(push)
195 # pragma warning(disable: 4324) // padded due to alignas
196 #endif
197 alignas(stop_cb_t)
198 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
199 #ifdef _MSC_VER
200 # pragma warning(pop)
201 #endif
202
203 15 stop_cb_t& stop_cb_() noexcept
204 {
205 return *reinterpret_cast<stop_cb_t*>(
206 15 stop_cb_buf_);
207 }
208
209 public:
210 62 ~lock_awaiter()
211 {
212
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 59 times.
62 if(active_)
213 {
214 3 stop_cb_().~stop_cb_t();
215 3 m_->waiters_.remove(this);
216 }
217 62 }
218
219 31 explicit lock_awaiter(async_mutex* m) noexcept
220 31 : m_(m)
221 {
222 31 }
223
224 31 lock_awaiter(lock_awaiter&& o) noexcept
225 31 : m_(o.m_)
226 31 , h_(o.h_)
227 31 , ex_(o.ex_)
228 31 , claimed_(o.claimed_.load(
229 std::memory_order_relaxed))
230 31 , canceled_(o.canceled_)
231 31 , active_(std::exchange(o.active_, false))
232 {
233 31 }
234
235 lock_awaiter(lock_awaiter const&) = delete;
236 lock_awaiter& operator=(lock_awaiter const&) = delete;
237 lock_awaiter& operator=(lock_awaiter&&) = delete;
238
239 31 bool await_ready() const noexcept
240 {
241
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 17 times.
31 if(!m_->locked_)
242 {
243 14 m_->locked_ = true;
244 14 return true;
245 }
246 17 return false;
247 }
248
249 /** IoAwaitable protocol overload. */
250 std::coroutine_handle<>
251 17 await_suspend(
252 std::coroutine_handle<> h,
253 executor_ref ex,
254 std::stop_token token = {}) noexcept
255 {
256
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 15 times.
17 if(token.stop_requested())
257 {
258 2 canceled_ = true;
259 2 return h;
260 }
261 15 h_ = h;
262 15 ex_ = ex;
263 15 m_->waiters_.push_back(this);
264 45 ::new(stop_cb_buf_) stop_cb_t(
265 15 token, cancel_fn{this});
266 15 active_ = true;
267 15 return std::noop_coroutine();
268 }
269
270 28 io_result<> await_resume() noexcept
271 {
272
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 16 times.
28 if(active_)
273 {
274 12 stop_cb_().~stop_cb_t();
275
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 7 times.
12 if(canceled_)
276 {
277 5 m_->waiters_.remove(this);
278 5 active_ = false;
279 5 return {make_error_code(
280 5 error::canceled)};
281 }
282 7 active_ = false;
283 }
284
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 21 times.
23 if(canceled_)
285 2 return {make_error_code(
286 2 error::canceled)};
287 21 return {{}};
288 }
289 };
290
291 /** RAII lock guard for async_mutex.
292
293 Automatically unlocks the mutex when destroyed.
294 */
295 class [[nodiscard]] lock_guard
296 {
297 async_mutex* m_;
298
299 public:
300 5 ~lock_guard()
301 {
302
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5 if(m_)
303 2 m_->unlock();
304 5 }
305
306 2 lock_guard() noexcept
307 2 : m_(nullptr)
308 {
309 2 }
310
311 2 explicit lock_guard(async_mutex* m) noexcept
312 2 : m_(m)
313 {
314 2 }
315
316 1 lock_guard(lock_guard&& o) noexcept
317 1 : m_(std::exchange(o.m_, nullptr))
318 {
319 1 }
320
321 lock_guard& operator=(lock_guard&& o) noexcept
322 {
323 if(this != &o)
324 {
325 if(m_)
326 m_->unlock();
327 m_ = std::exchange(o.m_, nullptr);
328 }
329 return *this;
330 }
331
332 lock_guard(lock_guard const&) = delete;
333 lock_guard& operator=(lock_guard const&) = delete;
334 };
335
336 /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
337 */
338 class lock_guard_awaiter
339 {
340 async_mutex* m_;
341 lock_awaiter inner_;
342
343 public:
344 4 explicit lock_guard_awaiter(async_mutex* m) noexcept
345 4 : m_(m)
346 4 , inner_(m)
347 {
348 4 }
349
350 4 bool await_ready() const noexcept
351 {
352 4 return inner_.await_ready();
353 }
354
355 /** IoAwaitable protocol overload. */
356 std::coroutine_handle<>
357 2 await_suspend(
358 std::coroutine_handle<> h,
359 executor_ref ex,
360 std::stop_token token = {}) noexcept
361 {
362 2 return inner_.await_suspend(h, ex, token);
363 }
364
365 4 io_result<lock_guard> await_resume() noexcept
366 {
367 4 auto r = inner_.await_resume();
368
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4 if(r.ec)
369 2 return {r.ec, {}};
370 2 return {{}, lock_guard(m_)};
371 4 }
372 };
373
374 async_mutex() = default;
375
376 // Non-copyable, non-movable
377 async_mutex(async_mutex const&) = delete;
378 async_mutex& operator=(async_mutex const&) = delete;
379
380 /** Returns an awaiter that acquires the mutex.
381
382 @return An awaitable yielding `(error_code)`.
383 */
384 27 lock_awaiter lock() noexcept
385 {
386 27 return lock_awaiter{this};
387 }
388
389 /** Returns an awaiter that acquires the mutex with RAII.
390
391 @return An awaitable yielding `(error_code,lock_guard)`.
392 */
393 4 lock_guard_awaiter scoped_lock() noexcept
394 {
395 4 return lock_guard_awaiter(this);
396 }
397
398 /** Releases the mutex.
399
400 If waiters are queued, the next eligible waiter is
401 resumed with the lock held. Canceled waiters are
402 skipped. If no eligible waiter remains, the mutex
403 becomes unlocked.
404 */
405 21 void unlock() noexcept
406 {
407 for(;;)
408 {
409 21 auto* waiter = waiters_.pop_front();
410
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 7 times.
21 if(!waiter)
411 {
412 14 locked_ = false;
413 14 return;
414 }
415
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 if(!waiter->claimed_.exchange(
416 true, std::memory_order_acq_rel))
417 {
418 7 waiter->ex_.dispatch(waiter->h_);
419 7 return;
420 }
421 }
422 }
423
424 /** Returns true if the mutex is currently locked.
425 */
426 22 bool is_locked() const noexcept
427 {
428 22 return locked_;
429 }
430 };
431
432 } // namespace capy
433 } // namespace boost
434
435 #endif
436