Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 : #define BOOST_CAPY_ASYNC_MUTEX_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/error.hpp>
18 : #include <boost/capy/ex/executor_ref.hpp>
19 : #include <boost/capy/io_result.hpp>
20 :
21 : #include <stop_token>
22 :
23 : #include <atomic>
24 : #include <coroutine>
25 : #include <new>
26 : #include <utility>
27 :
28 : /* async_mutex implementation notes
29 : ================================
30 :
31 : Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
32 : inherits intrusive_list<lock_awaiter>::node; the list is owned by
33 : async_mutex::waiters_.
34 :
35 : Cancellation via stop_token
36 : ---------------------------
37 : A std::stop_callback is registered in await_suspend. Two actors can
38 : race to resume the suspended coroutine: unlock() and the stop callback.
39 : An atomic bool `claimed_` resolves the race -- whoever does
40 : claimed_.exchange(true) and reads false wins. The loser does nothing.
41 :
42 : The stop callback calls ex_.dispatch(h_). If dispatch runs inline
43 : (same thread), the stop_callback is destroyed from within its own
44 : operator() via await_resume. This is safe: cancel_fn touches no
45 : members after dispatch returns (same pattern as delete-this).
46 :
47 : unlock() pops waiters from the front. If the popped waiter was
48 : already claimed by the stop callback, unlock() skips it and tries
49 : the next. await_resume removes the (still-linked) canceled waiter
50 : via waiters_.remove(this).
51 :
52 : The stop_callback lives in a union to suppress automatic
53 : construction/destruction. Placement new in await_suspend, explicit
54 : destructor call in await_resume and ~lock_awaiter.
55 :
56 : Member ordering constraint
57 : --------------------------
58 : The union containing stop_cb_ must be declared AFTER the members
59 : the callback accesses (h_, ex_, claimed_, canceled_). If the
60 : stop_cb_ destructor blocks waiting for a concurrent callback, those
61 : members must still be alive (C++ destroys in reverse declaration
62 : order).
63 :
64 : active_ flag
65 : ------------
66 : Tracks both list membership and stop_cb_ lifetime (they are always
67 : set and cleared together). Used by the destructor to clean up if the
68 : coroutine is destroyed while suspended (e.g. execution_context
69 : shutdown).
70 :
71 : Cancellation scope
72 : ------------------
73 : Cancellation only takes effect while the coroutine is suspended in
74 : the wait queue. If the mutex is unlocked, await_ready acquires it
75 : immediately without checking the stop token. This is intentional:
76 : the fast path has no token access and no overhead.
77 :
78 : Threading assumptions
79 : ---------------------
80 : - All list mutations happen on the executor thread (await_suspend,
81 : await_resume, unlock, ~lock_awaiter).
82 : - The stop callback may fire from any thread, but only touches
83 : claimed_ (atomic) and then calls dispatch. It never touches the
84 : list.
85 : - ~lock_awaiter must be called from the executor thread. This is
86 : guaranteed during normal shutdown but NOT if the coroutine frame
87 : is destroyed from another thread while a stop callback could
88 : fire (precondition violation, same as cppcoro/folly).
89 : */
90 :
91 : namespace boost {
92 : namespace capy {
93 :
94 : /** An asynchronous mutex for coroutines.
95 :
96 : This mutex provides mutual exclusion for coroutines without blocking.
97 : When a coroutine attempts to acquire a locked mutex, it suspends and
98 : is added to an intrusive wait queue. When the holder unlocks, the next
99 : waiter is resumed with the lock held.
100 :
101 : @par Cancellation
102 :
103 : When a coroutine is suspended waiting for the mutex and its stop
104 : token is triggered, the waiter completes with `error::canceled`
105 : instead of acquiring the lock.
106 :
107 : Cancellation only applies while the coroutine is suspended in the
108 : wait queue. If the mutex is unlocked when `lock()` is called, the
109 : lock is acquired immediately even if the stop token is already
110 : signaled.
111 :
112 : @par Zero Allocation
113 :
114 : No heap allocation occurs for lock operations.
115 :
116 : @par Thread Safety
117 :
118 : The mutex operations are designed for single-threaded use on one
119 : executor. The stop callback may fire from any thread.
120 :
121 : @par Example
122 : @code
123 : async_mutex cm;
124 :
125 : task<> protected_operation() {
126 : auto [ec] = co_await cm.lock();
127 : if(ec)
128 : co_return;
129 : // ... critical section ...
130 : cm.unlock();
131 : }
132 :
133 : // Or with RAII:
134 : task<> protected_operation() {
135 : auto [ec, guard] = co_await cm.scoped_lock();
136 : if(ec)
137 : co_return;
138 : // ... critical section ...
139 : // unlocks automatically
140 : }
141 : @endcode
142 : */
143 : class async_mutex
144 : {
145 : public:
146 : class lock_awaiter;
147 : class lock_guard;
148 : class lock_guard_awaiter;
149 :
150 : private:
151 : bool locked_ = false;
152 : detail::intrusive_list<lock_awaiter> waiters_;
153 :
154 : public:
155 : /** Awaiter returned by lock().
156 : */
157 : class lock_awaiter
158 : : public detail::intrusive_list<lock_awaiter>::node
159 : {
160 : friend class async_mutex;
161 :
162 : async_mutex* m_;
163 : std::coroutine_handle<> h_;
164 : executor_ref ex_;
165 :
166 : // These members must be declared before stop_cb_
167 : // (see comment on the union below).
168 : std::atomic<bool> claimed_{false};
169 : bool canceled_ = false;
170 : bool active_ = false;
171 :
172 : struct cancel_fn
173 : {
174 : lock_awaiter* self_;
175 :
176 5 : void operator()() const noexcept
177 : {
178 5 : if(!self_->claimed_.exchange(
179 : true, std::memory_order_acq_rel))
180 : {
181 5 : self_->canceled_ = true;
182 5 : self_->ex_.dispatch(self_->h_);
183 : }
184 5 : }
185 : };
186 :
187 : using stop_cb_t =
188 : std::stop_callback<cancel_fn>;
189 :
190 : // Aligned storage for stop_cb_t. Declared last:
191 : // its destructor may block while the callback
192 : // accesses the members above.
193 : #ifdef _MSC_VER
194 : # pragma warning(push)
195 : # pragma warning(disable: 4324) // padded due to alignas
196 : #endif
197 : alignas(stop_cb_t)
198 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
199 : #ifdef _MSC_VER
200 : # pragma warning(pop)
201 : #endif
202 :
203 15 : stop_cb_t& stop_cb_() noexcept
204 : {
205 : return *reinterpret_cast<stop_cb_t*>(
206 15 : stop_cb_buf_);
207 : }
208 :
209 : public:
210 62 : ~lock_awaiter()
211 : {
212 62 : if(active_)
213 : {
214 3 : stop_cb_().~stop_cb_t();
215 3 : m_->waiters_.remove(this);
216 : }
217 62 : }
218 :
219 31 : explicit lock_awaiter(async_mutex* m) noexcept
220 31 : : m_(m)
221 : {
222 31 : }
223 :
224 31 : lock_awaiter(lock_awaiter&& o) noexcept
225 31 : : m_(o.m_)
226 31 : , h_(o.h_)
227 31 : , ex_(o.ex_)
228 31 : , claimed_(o.claimed_.load(
229 : std::memory_order_relaxed))
230 31 : , canceled_(o.canceled_)
231 31 : , active_(std::exchange(o.active_, false))
232 : {
233 31 : }
234 :
235 : lock_awaiter(lock_awaiter const&) = delete;
236 : lock_awaiter& operator=(lock_awaiter const&) = delete;
237 : lock_awaiter& operator=(lock_awaiter&&) = delete;
238 :
239 31 : bool await_ready() const noexcept
240 : {
241 31 : if(!m_->locked_)
242 : {
243 14 : m_->locked_ = true;
244 14 : return true;
245 : }
246 17 : return false;
247 : }
248 :
249 : /** IoAwaitable protocol overload. */
250 : std::coroutine_handle<>
251 17 : await_suspend(
252 : std::coroutine_handle<> h,
253 : executor_ref ex,
254 : std::stop_token token = {}) noexcept
255 : {
256 17 : if(token.stop_requested())
257 : {
258 2 : canceled_ = true;
259 2 : return h;
260 : }
261 15 : h_ = h;
262 15 : ex_ = ex;
263 15 : m_->waiters_.push_back(this);
264 45 : ::new(stop_cb_buf_) stop_cb_t(
265 15 : token, cancel_fn{this});
266 15 : active_ = true;
267 15 : return std::noop_coroutine();
268 : }
269 :
270 28 : io_result<> await_resume() noexcept
271 : {
272 28 : if(active_)
273 : {
274 12 : stop_cb_().~stop_cb_t();
275 12 : if(canceled_)
276 : {
277 5 : m_->waiters_.remove(this);
278 5 : active_ = false;
279 5 : return {make_error_code(
280 5 : error::canceled)};
281 : }
282 7 : active_ = false;
283 : }
284 23 : if(canceled_)
285 2 : return {make_error_code(
286 2 : error::canceled)};
287 21 : return {{}};
288 : }
289 : };
290 :
291 : /** RAII lock guard for async_mutex.
292 :
293 : Automatically unlocks the mutex when destroyed.
294 : */
295 : class [[nodiscard]] lock_guard
296 : {
297 : async_mutex* m_;
298 :
299 : public:
300 5 : ~lock_guard()
301 : {
302 5 : if(m_)
303 2 : m_->unlock();
304 5 : }
305 :
306 2 : lock_guard() noexcept
307 2 : : m_(nullptr)
308 : {
309 2 : }
310 :
311 2 : explicit lock_guard(async_mutex* m) noexcept
312 2 : : m_(m)
313 : {
314 2 : }
315 :
316 1 : lock_guard(lock_guard&& o) noexcept
317 1 : : m_(std::exchange(o.m_, nullptr))
318 : {
319 1 : }
320 :
321 : lock_guard& operator=(lock_guard&& o) noexcept
322 : {
323 : if(this != &o)
324 : {
325 : if(m_)
326 : m_->unlock();
327 : m_ = std::exchange(o.m_, nullptr);
328 : }
329 : return *this;
330 : }
331 :
332 : lock_guard(lock_guard const&) = delete;
333 : lock_guard& operator=(lock_guard const&) = delete;
334 : };
335 :
336 : /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
337 : */
338 : class lock_guard_awaiter
339 : {
340 : async_mutex* m_;
341 : lock_awaiter inner_;
342 :
343 : public:
344 4 : explicit lock_guard_awaiter(async_mutex* m) noexcept
345 4 : : m_(m)
346 4 : , inner_(m)
347 : {
348 4 : }
349 :
350 4 : bool await_ready() const noexcept
351 : {
352 4 : return inner_.await_ready();
353 : }
354 :
355 : /** IoAwaitable protocol overload. */
356 : std::coroutine_handle<>
357 2 : await_suspend(
358 : std::coroutine_handle<> h,
359 : executor_ref ex,
360 : std::stop_token token = {}) noexcept
361 : {
362 2 : return inner_.await_suspend(h, ex, token);
363 : }
364 :
365 4 : io_result<lock_guard> await_resume() noexcept
366 : {
367 4 : auto r = inner_.await_resume();
368 4 : if(r.ec)
369 2 : return {r.ec, {}};
370 2 : return {{}, lock_guard(m_)};
371 4 : }
372 : };
373 :
374 : async_mutex() = default;
375 :
376 : // Non-copyable, non-movable
377 : async_mutex(async_mutex const&) = delete;
378 : async_mutex& operator=(async_mutex const&) = delete;
379 :
380 : /** Returns an awaiter that acquires the mutex.
381 :
382 : @return An awaitable yielding `(error_code)`.
383 : */
384 27 : lock_awaiter lock() noexcept
385 : {
386 27 : return lock_awaiter{this};
387 : }
388 :
389 : /** Returns an awaiter that acquires the mutex with RAII.
390 :
391 : @return An awaitable yielding `(error_code,lock_guard)`.
392 : */
393 4 : lock_guard_awaiter scoped_lock() noexcept
394 : {
395 4 : return lock_guard_awaiter(this);
396 : }
397 :
398 : /** Releases the mutex.
399 :
400 : If waiters are queued, the next eligible waiter is
401 : resumed with the lock held. Canceled waiters are
402 : skipped. If no eligible waiter remains, the mutex
403 : becomes unlocked.
404 : */
405 21 : void unlock() noexcept
406 : {
407 : for(;;)
408 : {
409 21 : auto* waiter = waiters_.pop_front();
410 21 : if(!waiter)
411 : {
412 14 : locked_ = false;
413 14 : return;
414 : }
415 7 : if(!waiter->claimed_.exchange(
416 : true, std::memory_order_acq_rel))
417 : {
418 7 : waiter->ex_.dispatch(waiter->h_);
419 7 : return;
420 : }
421 0 : }
422 : }
423 :
424 : /** Returns true if the mutex is currently locked.
425 : */
426 22 : bool is_locked() const noexcept
427 : {
428 22 : return locked_;
429 : }
430 : };
431 :
432 : } // namespace capy
433 : } // namespace boost
434 :
435 : #endif
|