LCOV - code coverage report
Current view: top level - capy/ex - async_mutex.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 98.9 % 94 93
Test Date: 2026-02-10 18:54:58 Functions: 100.0 % 20 20

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
      11              : #define BOOST_CAPY_ASYNC_MUTEX_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/intrusive.hpp>
      15              : #include <boost/capy/concept/executor.hpp>
      16              : #include <boost/capy/coro.hpp>
      17              : #include <boost/capy/error.hpp>
      18              : #include <boost/capy/ex/executor_ref.hpp>
      19              : #include <boost/capy/io_result.hpp>
      20              : 
      21              : #include <stop_token>
      22              : 
      23              : #include <atomic>
      24              : #include <coroutine>
      25              : #include <new>
      26              : #include <utility>
      27              : 
      28              : /*  async_mutex implementation notes
      29              :     ================================
      30              : 
      31              :     Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
      32              :     inherits intrusive_list<lock_awaiter>::node; the list is owned by
      33              :     async_mutex::waiters_.
      34              : 
      35              :     Cancellation via stop_token
      36              :     ---------------------------
      37              :     A std::stop_callback is registered in await_suspend. Two actors can
      38              :     race to resume the suspended coroutine: unlock() and the stop callback.
      39              :     An atomic bool `claimed_` resolves the race -- whoever does
      40              :     claimed_.exchange(true) and reads false wins. The loser does nothing.
      41              : 
      42              :     The stop callback calls ex_.dispatch(h_). If dispatch runs inline
      43              :     (same thread), the stop_callback is destroyed from within its own
      44              :     operator() via await_resume. This is safe: cancel_fn touches no
      45              :     members after dispatch returns (same pattern as delete-this).
      46              : 
      47              :     unlock() pops waiters from the front. If the popped waiter was
      48              :     already claimed by the stop callback, unlock() skips it and tries
      49              :     the next. await_resume removes the (still-linked) canceled waiter
      50              :     via waiters_.remove(this).
      51              : 
      52              :     The stop_callback lives in raw aligned storage (stop_cb_buf_) to
      53              :     suppress automatic construction/destruction. Placement new in
      54              :     await_suspend, explicit destructor call in await_resume and ~lock_awaiter.
      55              : 
      56              :     Member ordering constraint
      57              :     --------------------------
      58              :     The storage holding stop_cb_ (stop_cb_buf_) must be declared AFTER
      59              :     the members the callback accesses (h_, ex_, claimed_, canceled_).
      60              :     If the stop_cb_ destructor blocks waiting for a concurrent callback,
      61              :     those members must still be alive (C++ destroys in reverse
      62              :     declaration order).
      63              : 
      64              :     active_ flag
      65              :     ------------
      66              :     Set in await_suspend after queuing and constructing stop_cb_;
      67              :     cleared in await_resume (unlock() pops without clearing it). Used
      68              :     by the destructor to clean up if the coroutine is destroyed while
      69              :     suspended (e.g. execution_context shutdown).
      70              : 
      71              :     Cancellation scope
      72              :     ------------------
      73              :     Cancellation only takes effect while the coroutine is suspended in
      74              :     the wait queue. If the mutex is unlocked, await_ready acquires it
      75              :     immediately without checking the stop token. This is intentional:
      76              :     the fast path has no token access and no overhead.
      77              : 
      78              :     Threading assumptions
      79              :     ---------------------
      80              :     - All list mutations happen on the executor thread (await_suspend,
      81              :       await_resume, unlock, ~lock_awaiter).
      82              :     - The stop callback may fire from any thread, but only touches
      83              :       claimed_ (atomic) and then calls dispatch. It never touches the
      84              :       list.
      85              :     - ~lock_awaiter must be called from the executor thread. This is
      86              :       guaranteed during normal shutdown but NOT if the coroutine frame
      87              :       is destroyed from another thread while a stop callback could
      88              :       fire (precondition violation, same as cppcoro/folly).
      89              : */
      90              : 
      91              : namespace boost {
      92              : namespace capy {
      93              : 
      94              : /** An asynchronous mutex for coroutines.
      95              : 
      96              :     This mutex provides mutual exclusion for coroutines without blocking.
      97              :     When a coroutine attempts to acquire a locked mutex, it suspends and
      98              :     is added to an intrusive wait queue. When the holder unlocks, the next
      99              :     waiter is resumed with the lock held.
     100              : 
     101              :     @par Cancellation
     102              : 
     103              :     When a coroutine is suspended waiting for the mutex and its stop
     104              :     token is triggered, the waiter completes with `error::canceled`
     105              :     instead of acquiring the lock.
     106              : 
     107              :     Cancellation only applies while the coroutine is suspended in the
     108              :     wait queue. If the mutex is unlocked when `lock()` is called, the
     109              :     lock is acquired immediately even if the stop token is already
     110              :     signaled.
     111              : 
     112              :     @par Zero Allocation
     113              : 
     114              :     No heap allocation occurs for lock operations.
     115              : 
     116              :     @par Thread Safety
     117              : 
     118              :     The mutex operations are designed for single-threaded use on one
     119              :     executor. The stop callback may fire from any thread.
     120              : 
     121              :     @par Example
     122              :     @code
     123              :     async_mutex cm;
     124              : 
     125              :     task<> protected_operation() {
     126              :         auto [ec] = co_await cm.lock();
     127              :         if(ec)
     128              :             co_return;
     129              :         // ... critical section ...
     130              :         cm.unlock();
     131              :     }
     132              : 
     133              :     // Or with RAII:
     134              :     task<> protected_operation() {
     135              :         auto [ec, guard] = co_await cm.scoped_lock();
     136              :         if(ec)
     137              :             co_return;
     138              :         // ... critical section ...
     139              :         // unlocks automatically
     140              :     }
     141              :     @endcode
     142              : */
     143              : class async_mutex
     144              : {
     145              : public:
     146              :     class lock_awaiter;
     147              :     class lock_guard;
     148              :     class lock_guard_awaiter;
     149              : 
     150              : private:
     151              :     bool locked_ = false; // true while some coroutine owns the mutex
     152              :     detail::intrusive_list<lock_awaiter> waiters_; // suspended waiters: push_back in await_suspend, pop_front in unlock
     153              : 
     154              : public:
     155              :     /** Awaiter returned by lock(). Takes the mutex in await_ready when it is free; otherwise queues itself in await_suspend until unlock() or the stop callback dispatches the coroutine.
     156              :     */
     157              :     class lock_awaiter
     158              :         : public detail::intrusive_list<lock_awaiter>::node
     159              :     {
     160              :         friend class async_mutex;
     161              : 
     162              :         async_mutex* m_; // mutex this awaiter targets
     163              :         std::coroutine_handle<> h_; // suspended coroutine; assigned in await_suspend
     164              :         executor_ref ex_; // executor used to dispatch h_; assigned in await_suspend
     165              : 
     166              :         // These members must be declared before stop_cb_buf_
     167              :         // (see the comment on the storage below).
     168              :         std::atomic<bool> claimed_{false}; // set by whichever of unlock() / cancel_fn claims the resume first
     169              :         bool canceled_ = false; // true when the wait ended because stop was requested
     170              :         bool active_ = false; // true from the end of await_suspend (queued path) until await_resume
     171              : 
     172              :         struct cancel_fn // callable stored in the std::stop_callback
     173              :         {
     174              :             lock_awaiter* self_;
     175              : 
     176            5 :             void operator()() const noexcept
     177              :             {
     178            5 :                 if(!self_->claimed_.exchange( // false => we beat unlock() to the claim
     179              :                     true, std::memory_order_acq_rel))
     180              :                 {
     181            5 :                     self_->canceled_ = true; // plain write; read later in await_resume
     182            5 :                     self_->ex_.dispatch(self_->h_); // last member access: an inline resume may destroy this callback via await_resume
     183              :                 }
     184            5 :             }
     185              :         };
     186              : 
     187              :         using stop_cb_t =
     188              :             std::stop_callback<cancel_fn>;
     189              : 
     190              :         // Raw aligned storage for stop_cb_t, constructed by placement new
     191              :         // in await_suspend and destroyed explicitly. Declared last: its destructor
     192              :         // may block while the callback accesses the members above.
     193              : #ifdef _MSC_VER
     194              : # pragma warning(push)
     195              : # pragma warning(disable: 4324) // padded due to alignas
     196              : #endif
     197              :         alignas(stop_cb_t)
     198              :             unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
     199              : #ifdef _MSC_VER
     200              : # pragma warning(pop)
     201              : #endif
     202              : 
     203           15 :         stop_cb_t& stop_cb_() noexcept // accessor for the callback object in stop_cb_buf_; only called while active_ is true
     204              :         {
     205              :             return *reinterpret_cast<stop_cb_t*>(
     206           15 :                 stop_cb_buf_);
     207              :         }
     208              : 
     209              :     public:
     210           62 :         ~lock_awaiter() // cleans up a waiter that is destroyed while still active
     211              :         {
     212           62 :             if(active_)
     213              :             {
     214            3 :                 stop_cb_().~stop_cb_t(); // deregister the callback before unlinking
     215            3 :                 m_->waiters_.remove(this); // NOTE(review): assumes the node is still linked; unlock() pops without clearing active_ -- confirm remove() is safe for a popped node
     216              :             }
     217           62 :         }
     218              : 
     219           31 :         explicit lock_awaiter(async_mutex* m) noexcept // binds the awaiter to m; nothing is queued yet
     220           31 :             : m_(m)
     221              :         {
     222           31 :         }
     223              : 
     224           31 :         lock_awaiter(lock_awaiter&& o) noexcept // NOTE(review): copies fields only; the list node and stop_cb_buf_ contents are not transferred, so presumably only valid while o.active_ is false -- confirm
     225           31 :             : m_(o.m_)
     226           31 :             , h_(o.h_)
     227           31 :             , ex_(o.ex_)
     228           31 :             , claimed_(o.claimed_.load(
     229              :                 std::memory_order_relaxed))
     230           31 :             , canceled_(o.canceled_)
     231           31 :             , active_(std::exchange(o.active_, false))
     232              :         {
     233           31 :         }
     234              : 
     235              :         lock_awaiter(lock_awaiter const&) = delete;
     236              :         lock_awaiter& operator=(lock_awaiter const&) = delete;
     237              :         lock_awaiter& operator=(lock_awaiter&&) = delete;
     238              : 
     239           31 :         bool await_ready() const noexcept // fast path: take the lock if it is free (no stop token involved)
     240              :         {
     241           31 :             if(!m_->locked_)
     242              :             {
     243           14 :                 m_->locked_ = true;
     244           14 :                 return true;
     245              :             }
     246           17 :             return false; // contended: caller proceeds to await_suspend
     247              :         }
     248              : 
     249              :         /** IoAwaitable protocol overload: queues this waiter and registers the stop callback, or returns h at once if stop was already requested. */
     250              :         std::coroutine_handle<>
     251           17 :         await_suspend(
     252              :             std::coroutine_handle<> h,
     253              :             executor_ref ex,
     254              :             std::stop_token token = {}) noexcept
     255              :         {
     256           17 :             if(token.stop_requested()) // already canceled: do not queue
     257              :             {
     258            2 :                 canceled_ = true;
     259            2 :                 return h; // resume the awaiting coroutine immediately
     260              :             }
     261           15 :             h_ = h;
     262           15 :             ex_ = ex;
     263           15 :             m_->waiters_.push_back(this);
     264           45 :             ::new(stop_cb_buf_) stop_cb_t( // NOTE(review): if stop is requested after the check above, the callback runs inside this constructor, before active_ is set -- confirm dispatch cannot resume inline here
     265           15 :                 token, cancel_fn{this});
     266           15 :             active_ = true;
     267           15 :             return std::noop_coroutine(); // stay suspended until unlock() or cancel_fn dispatches h_
     268              :         }
     269              : 
     270           28 :         io_result<> await_resume() noexcept // yields error::canceled, or success with the mutex held
     271              :         {
     272           28 :             if(active_)
     273              :             {
     274           12 :                 stop_cb_().~stop_cb_t(); // the callback cannot start after this returns
     275           12 :                 if(canceled_)
     276              :                 {
     277            5 :                     m_->waiters_.remove(this); // NOTE(review): assumes the waiter is still linked; unlock() may already have popped and skipped it -- confirm remove() tolerates that
     278            5 :                     active_ = false;
     279            5 :                     return {make_error_code(
     280            5 :                         error::canceled)};
     281              :                 }
     282            7 :                 active_ = false; // resumed by unlock(), which already popped this waiter
     283              :             }
     284           23 :             if(canceled_) // stop was requested before suspending (early return in await_suspend)
     285            2 :                 return {make_error_code(
     286            2 :                     error::canceled)};
     287           21 :             return {{}}; // success: locked_ is true on behalf of this coroutine
     288              :         }
     289              :     };
     290              : 
     291              :     /** RAII lock guard for async_mutex.
     292              : 
     293              :         Automatically unlocks the mutex when destroyed, unless the guard is empty (default-constructed or moved-from).
     294              :     */
     295              :     class [[nodiscard]] lock_guard
     296              :     {
     297              :         async_mutex* m_; // owned mutex, or nullptr for an empty guard
     298              : 
     299              :     public:
     300            5 :         ~lock_guard()
     301              :         {
     302            5 :             if(m_) // empty guards release nothing
     303            2 :                 m_->unlock();
     304            5 :         }
     305              : 
     306            2 :         lock_guard() noexcept // empty guard: owns nothing
     307            2 :             : m_(nullptr)
     308              :         {
     309            2 :         }
     310              : 
     311            2 :         explicit lock_guard(async_mutex* m) noexcept // only stores m; it does not lock, the destructor will unlock
     312            2 :             : m_(m)
     313              :         {
     314            2 :         }
     315              : 
     316            1 :         lock_guard(lock_guard&& o) noexcept // transfers ownership; o becomes empty
     317            1 :             : m_(std::exchange(o.m_, nullptr))
     318              :         {
     319            1 :         }
     320              : 
     321              :         lock_guard& operator=(lock_guard&& o) noexcept
     322              :         {
     323              :             if(this != &o)
     324              :             {
     325              :                 if(m_) // release what we currently hold before taking over o's mutex
     326              :                     m_->unlock();
     327              :                 m_ = std::exchange(o.m_, nullptr);
     328              :             }
     329              :             return *this;
     330              :         }
     331              : 
     332              :         lock_guard(lock_guard const&) = delete;
     333              :         lock_guard& operator=(lock_guard const&) = delete;
     334              :     };
     335              : 
     336              :     /** Awaiter returned by scoped_lock() that returns a lock_guard on resume. All waiting is delegated to an inner lock_awaiter.
     337              :     */
     338              :     class lock_guard_awaiter
     339              :     {
     340              :         async_mutex* m_; // mutex handed to the lock_guard on success
     341              :         lock_awaiter inner_; // performs the actual acquire / wait / cancel logic
     342              : 
     343              :     public:
     344            4 :         explicit lock_guard_awaiter(async_mutex* m) noexcept
     345            4 :             : m_(m)
     346            4 :             , inner_(m)
     347              :         {
     348            4 :         }
     349              : 
     350            4 :         bool await_ready() const noexcept
     351              :         {
     352            4 :             return inner_.await_ready();
     353              :         }
     354              : 
     355              :         /** IoAwaitable protocol overload; forwards to the inner lock_awaiter. */
     356              :         std::coroutine_handle<>
     357            2 :         await_suspend(
     358              :             std::coroutine_handle<> h,
     359              :             executor_ref ex,
     360              :             std::stop_token token = {}) noexcept
     361              :         {
     362            2 :             return inner_.await_suspend(h, ex, token);
     363              :         }
     364              : 
     365            4 :         io_result<lock_guard> await_resume() noexcept
     366              :         {
     367            4 :             auto r = inner_.await_resume();
     368            4 :             if(r.ec)
     369            2 :                 return {r.ec, {}}; // failed: pair the error with an empty guard
     370            2 :             return {{}, lock_guard(m_)}; // acquired: the guard unlocks m_ when destroyed
     371            4 :         }
     372              :     };
     373              : 
     374              :     async_mutex() = default; // starts unlocked (locked_ = false)
     375              : 
     376              :     // Non-copyable, non-movable (copy operations deleted; no move operations declared)
     377              :     async_mutex(async_mutex const&) = delete;
     378              :     async_mutex& operator=(async_mutex const&) = delete;
     379              : 
     380              :     /** Returns an awaiter that acquires the mutex.
     381              : 
     382              :         @return An awaitable yielding `(error_code)`; the code is `error::canceled` when the wait was canceled.
     383              :     */
     384           27 :     lock_awaiter lock() noexcept
     385              :     {
     386           27 :         return lock_awaiter{this};
     387              :     }
     388              : 
     389              :     /** Returns an awaiter that acquires the mutex with RAII.
     390              : 
     391              :         @return An awaitable yielding `(error_code,lock_guard)`; the guard is empty when the error code is set.
     392              :     */
     393            4 :     lock_guard_awaiter scoped_lock() noexcept
     394              :     {
     395            4 :         return lock_guard_awaiter(this);
     396              :     }
     397              : 
     398              :     /** Releases the mutex.
     399              : 
     400              :         If waiters are queued, the next eligible waiter is
     401              :         resumed with the lock held. Canceled waiters are
     402              :         skipped. If no eligible waiter remains, the mutex
     403              :         becomes unlocked.
     404              :     */
     405           21 :     void unlock() noexcept
     406              :     {
     407              :         for(;;)
     408              :         {
     409           21 :             auto* waiter = waiters_.pop_front(); // null when the queue is empty
     410           21 :             if(!waiter)
     411              :             {
     412           14 :                 locked_ = false; // nobody left to hand the lock to
     413           14 :                 return;
     414              :             }
     415            7 :             if(!waiter->claimed_.exchange( // false => we beat the stop callback to the claim
     416              :                 true, std::memory_order_acq_rel))
     417              :             {
     418            7 :                 waiter->ex_.dispatch(waiter->h_); // hand-off: locked_ stays true for the resumed waiter
     419            7 :                 return;
     420              :             }
     421            0 :         } // NOTE(review): reached only when the popped waiter was already claimed by cancel_fn; 0 hits, so this skip path is untested
     422              :     }
     423              : 
     424              :     /** Returns true if the mutex is currently locked.
     425              :     */
     426           22 :     bool is_locked() const noexcept
     427              :     {
     428           22 :         return locked_;
     429              :     }
     430              : };
     431              : 
     432              : } // namespace capy
     433              : } // namespace boost
     434              : 
     435              : #endif
        

Generated by: LCOV version 2.3