LCOV - code coverage report
Current view: top level - capy/ex - async_event.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 68 68
Test Date: 2026-02-10 18:54:58 Functions: 100.0 % 13 13

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
      11              : #define BOOST_CAPY_ASYNC_EVENT_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/intrusive.hpp>
      15              : #include <boost/capy/concept/executor.hpp>
      16              : #include <boost/capy/coro.hpp>
      17              : #include <boost/capy/error.hpp>
      18              : #include <boost/capy/ex/executor_ref.hpp>
      19              : #include <boost/capy/io_result.hpp>
      20              : 
      21              : #include <stop_token>
      22              : 
      23              : #include <atomic>
      24              : #include <coroutine>
      25              : #include <new>
      26              : #include <utility>
      27              : 
      28              : /*  async_event implementation notes
      29              :     =================================
      30              : 
      31              :     Same cancellation pattern as async_mutex (see that file for the
      32              :     full discussion on claimed_, stop_cb lifetime, member ordering,
      33              :     and threading assumptions).
      34              : 
      35              :     Key difference: set() wakes ALL waiters (broadcast), not one.
      36              :     It pops every waiter from the list and dispatches the ones it
      37              :     claims. Waiters already claimed by a stop callback are skipped.
      38              : 
      39              :     Because set() pops all waiters, a canceled waiter may have been
      40              :     removed from the list by set() before its await_resume runs.
      41              :     This requires a separate in_list_ flag (unlike async_mutex where
      42              :     active_ served double duty). await_resume only calls remove()
      43              :     when in_list_ is true.
      44              : */
      45              : 
      46              : namespace boost {
      47              : namespace capy {
      48              : 
      49              : /** An asynchronous event for coroutines.
      50              : 
      51              :     This event provides a way to notify multiple coroutines that some
      52              :     condition has occurred. When a coroutine awaits an unset event, it
      53              :     suspends and is added to a wait queue. When the event is set, all
      54              :     waiting coroutines are resumed.
      55              : 
      56              :     @par Cancellation
      57              : 
      58              :     When a coroutine is suspended waiting for the event and its stop
      59              :     token is triggered, the waiter completes with `error::canceled`
      60              :     instead of waiting for `set()`.
      61              : 
      62              :     Cancellation only applies while the coroutine is suspended in the
      63              :     wait queue. If the event is already set when `wait()` is called,
      64              :     the wait completes immediately even if the stop token is already
      65              :     signaled.
      66              : 
      67              :     @par Zero Allocation
      68              : 
      69              :     No heap allocation occurs for wait operations.
      70              : 
      71              :     @par Thread Safety
      72              : 
      73              :     The event operations are designed for single-threaded use on one
      74              :     executor. The stop callback may fire from any thread.
      75              : 
      76              :     @par Example
      77              :     @code
      78              :     async_event event;
      79              : 
      80              :     task<> waiter() {
      81              :         auto [ec] = co_await event.wait();
      82              :         if(ec)
      83              :             co_return;
      84              :         // ... event was set ...
      85              :     }
      86              : 
      87              :     task<> notifier() {
      88              :         // ... do some work ...
      89              :         event.set();  // Wake all waiters
      90              :     }
      91              :     @endcode
      92              : */
      93              : class async_event
      94              : {
      95              : public:
      96              :     class wait_awaiter;
      97              : 
      98              : private:
      99              :     bool set_ = false;
     100              :     detail::intrusive_list<wait_awaiter> waiters_;
     101              : 
     102              : public:
     103              :     /** Awaiter returned by wait().
     104              :     */
     105              :     class wait_awaiter
     106              :         : public detail::intrusive_list<wait_awaiter>::node
     107              :     {
     108              :         friend class async_event;
     109              : 
     110              :         async_event* e_;
     111              :         std::coroutine_handle<> h_;
     112              :         executor_ref ex_;
     113              : 
     114              :         // Declared before stop_cb_buf_: the callback
     115              :         // accesses these members, so they must still be
     116              :         // alive if the stop_cb_ destructor blocks.
     117              :         std::atomic<bool> claimed_{false};
     118              :         bool canceled_ = false;
     119              :         bool active_ = false;
     120              :         bool in_list_ = false;
     121              : 
     122              :         struct cancel_fn
     123              :         {
     124              :             wait_awaiter* self_;
     125              : 
     126           20 :             void operator()() const noexcept
     127              :             {
     128           20 :                 if(!self_->claimed_.exchange(
     129              :                     true, std::memory_order_acq_rel))
     130              :                 {
     131           19 :                     self_->canceled_ = true;
     132           19 :                     self_->ex_.dispatch(self_->h_);
     133              :                 }
     134           20 :             }
     135              :         };
     136              : 
     137              :         using stop_cb_t =
     138              :             std::stop_callback<cancel_fn>;
     139              : 
     140              :         // Aligned storage for stop_cb_t. Declared last:
     141              :         // its destructor may block while the callback
     142              :         // accesses the members above.
     143              : #ifdef _MSC_VER
     144              : # pragma warning(push)
     145              : # pragma warning(disable: 4324) // padded due to alignas
     146              : #endif
     147              :         alignas(stop_cb_t)
     148              :             unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
     149              : #ifdef _MSC_VER
     150              : # pragma warning(pop)
     151              : #endif
     152              : 
     153           31 :         stop_cb_t& stop_cb_() noexcept
     154              :         {
     155              :             return *reinterpret_cast<stop_cb_t*>(
     156           31 :                 stop_cb_buf_);
     157              :         }
     158              : 
     159              :     public:
     160          235 :         ~wait_awaiter()
     161              :         {
     162          235 :             if(active_)
     163            1 :                 stop_cb_().~stop_cb_t();
     164          235 :             if(in_list_)
     165            1 :                 e_->waiters_.remove(this);
     166          235 :         }
     167              : 
     168           51 :         explicit wait_awaiter(async_event* e) noexcept
     169           51 :             : e_(e)
     170              :         {
     171           51 :         }
     172              : 
     173          184 :         wait_awaiter(wait_awaiter&& o) noexcept
     174          184 :             : e_(o.e_)
     175          184 :             , h_(o.h_)
     176          184 :             , ex_(o.ex_)
     177          184 :             , claimed_(o.claimed_.load(
     178              :                 std::memory_order_relaxed))
     179          184 :             , canceled_(o.canceled_)
     180          184 :             , active_(std::exchange(o.active_, false))
     181          184 :             , in_list_(std::exchange(o.in_list_, false))
     182              :         {
     183          184 :         }
     184              : 
     185              :         wait_awaiter(wait_awaiter const&) = delete;
     186              :         wait_awaiter& operator=(wait_awaiter const&) = delete;
     187              :         wait_awaiter& operator=(wait_awaiter&&) = delete;
     188              : 
     189           51 :         bool await_ready() const noexcept
     190              :         {
     191           51 :             return e_->set_;
     192              :         }
     193              : 
     194              :         /** IoAwaitable protocol overload. */
     195              :         std::coroutine_handle<>
     196           41 :         await_suspend(
     197              :             std::coroutine_handle<> h,
     198              :             executor_ref ex,
     199              :             std::stop_token token) noexcept
     200              :         {
     201           41 :             if(token.stop_requested())
     202              :             {
     203           10 :                 canceled_ = true;
     204           10 :                 return h;
     205              :             }
     206           31 :             h_ = h;
     207           31 :             ex_ = ex;
     208           31 :             e_->waiters_.push_back(this);
     209           31 :             in_list_ = true;
     210           93 :             ::new(stop_cb_buf_) stop_cb_t(
     211           31 :                 token, cancel_fn{this});
     212           31 :             active_ = true;
     213           31 :             return std::noop_coroutine();
     214              :         }
     215              : 
     216           48 :         io_result<> await_resume() noexcept
     217              :         {
     218           48 :             if(active_)
     219              :             {
     220           30 :                 stop_cb_().~stop_cb_t();
     221           30 :                 active_ = false;
     222              :             }
     223           48 :             if(canceled_)
     224              :             {
     225           29 :                 if(in_list_)
     226              :                 {
     227           19 :                     e_->waiters_.remove(this);
     228           19 :                     in_list_ = false;
     229              :                 }
     230           29 :                 return {make_error_code(
     231           29 :                     error::canceled)};
     232              :             }
     233           19 :             return {{}};
     234              :         }
     235              :     };
     236              : 
     237           20 :     async_event() = default;
     238              : 
     239              :     // Non-copyable, non-movable
     240              :     async_event(async_event const&) = delete;
     241              :     async_event& operator=(async_event const&) = delete;
     242              : 
     243              :     /** Returns an awaiter that waits until the event is set.
     244              : 
     245              :         If the event is already set, completes immediately.
     246              : 
     247              :         @return An awaitable yielding `(error_code)`.
     248              :     */
     249           51 :     wait_awaiter wait() noexcept
     250              :     {
     251           51 :         return wait_awaiter{this};
     252              :     }
     253              : 
     254              :     /** Sets the event.
     255              : 
     256              :         All waiting coroutines are resumed. Canceled waiters
     257              :         are skipped. Subsequent calls to wait() complete
     258              :         immediately until clear() is called.
     259              :     */
     260           20 :     void set() noexcept
     261              :     {
     262           20 :         set_ = true;
     263              :         for(;;)
     264              :         {
     265           31 :             auto* w = waiters_.pop_front();
     266           31 :             if(!w)
     267           20 :                 break;
     268           11 :             w->in_list_ = false;
     269           11 :             if(!w->claimed_.exchange(
     270              :                 true, std::memory_order_acq_rel))
     271              :             {
     272           11 :                 w->ex_.dispatch(w->h_);
     273              :             }
     274           11 :         }
     275           20 :     }
     276              : 
     277              :     /** Clears the event.
     278              : 
     279              :         Subsequent calls to wait() will suspend until
     280              :         set() is called again.
     281              :     */
     282            2 :     void clear() noexcept
     283              :     {
     284            2 :         set_ = false;
     285            2 :     }
     286              : 
     287              :     /** Returns true if the event is currently set.
     288              :     */
     289            8 :     bool is_set() const noexcept
     290              :     {
     291            8 :         return set_;
     292              :     }
     293              : };
     294              : 
     295              : } // namespace capy
     296              : } // namespace boost
     297              : 
     298              : #endif
        

Generated by: LCOV version 2.3