//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/coro.hpp>
16  
#include <boost/capy/coro.hpp>
17  
#include <boost/capy/error.hpp>
17  
#include <boost/capy/error.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
20  

20  

21  
#include <stop_token>
21  
#include <stop_token>
22  

22  

23  
#include <atomic>
23  
#include <atomic>
24  
#include <coroutine>
24  
#include <coroutine>
25  
#include <new>
25  
#include <new>
26  
#include <utility>
26  
#include <utility>
27  

27  

/*  async_event implementation notes
    =================================

    Same cancellation pattern as async_mutex (see that file for the
    full discussion on claimed_, stop_cb lifetime, member ordering,
    and threading assumptions).

    Key difference: set() wakes ALL waiters (broadcast), not one.
    It pops every waiter from the list and dispatches the ones it
    claims. Waiters already claimed by a stop callback are skipped.

    Because set() pops all waiters, a canceled waiter may have been
    removed from the list by set() before its await_resume runs.
    This requires a separate in_list_ flag (unlike async_mutex where
    active_ served double duty). await_resume only calls remove()
    when in_list_ is true.
*/
45  

45  

46  
namespace boost {
46  
namespace boost {
47  
namespace capy {
47  
namespace capy {
48  

48  

49  
/** An asynchronous event for coroutines.
49  
/** An asynchronous event for coroutines.
50  

50  

51  
    This event provides a way to notify multiple coroutines that some
51  
    This event provides a way to notify multiple coroutines that some
52  
    condition has occurred. When a coroutine awaits an unset event, it
52  
    condition has occurred. When a coroutine awaits an unset event, it
53  
    suspends and is added to a wait queue. When the event is set, all
53  
    suspends and is added to a wait queue. When the event is set, all
54  
    waiting coroutines are resumed.
54  
    waiting coroutines are resumed.
55  

55  

56  
    @par Cancellation
56  
    @par Cancellation
57  

57  

58  
    When a coroutine is suspended waiting for the event and its stop
58  
    When a coroutine is suspended waiting for the event and its stop
59  
    token is triggered, the waiter completes with `error::canceled`
59  
    token is triggered, the waiter completes with `error::canceled`
60  
    instead of waiting for `set()`.
60  
    instead of waiting for `set()`.
61  

61  

62  
    Cancellation only applies while the coroutine is suspended in the
62  
    Cancellation only applies while the coroutine is suspended in the
63  
    wait queue. If the event is already set when `wait()` is called,
63  
    wait queue. If the event is already set when `wait()` is called,
64  
    the wait completes immediately even if the stop token is already
64  
    the wait completes immediately even if the stop token is already
65  
    signaled.
65  
    signaled.
66  

66  

67  
    @par Zero Allocation
67  
    @par Zero Allocation
68  

68  

69  
    No heap allocation occurs for wait operations.
69  
    No heap allocation occurs for wait operations.
70  

70  

71  
    @par Thread Safety
71  
    @par Thread Safety
72  

72  

73  
    The event operations are designed for single-threaded use on one
73  
    The event operations are designed for single-threaded use on one
74  
    executor. The stop callback may fire from any thread.
74  
    executor. The stop callback may fire from any thread.
75  

75  

76  
    @par Example
76  
    @par Example
77  
    @code
77  
    @code
78  
    async_event event;
78  
    async_event event;
79  

79  

80  
    task<> waiter() {
80  
    task<> waiter() {
81  
        auto [ec] = co_await event.wait();
81  
        auto [ec] = co_await event.wait();
82  
        if(ec)
82  
        if(ec)
83  
            co_return;
83  
            co_return;
84  
        // ... event was set ...
84  
        // ... event was set ...
85  
    }
85  
    }
86  

86  

87  
    task<> notifier() {
87  
    task<> notifier() {
88  
        // ... do some work ...
88  
        // ... do some work ...
89  
        event.set();  // Wake all waiters
89  
        event.set();  // Wake all waiters
90  
    }
90  
    }
91  
    @endcode
91  
    @endcode
92  
*/
92  
*/
93  
class async_event
93  
class async_event
94  
{
94  
{
95  
public:
95  
public:
96  
    class wait_awaiter;
96  
    class wait_awaiter;
97  

97  

98  
private:
98  
private:
99  
    bool set_ = false;
99  
    bool set_ = false;
100  
    detail::intrusive_list<wait_awaiter> waiters_;
100  
    detail::intrusive_list<wait_awaiter> waiters_;
101  

101  

102  
public:
102  
public:
103  
    /** Awaiter returned by wait().
103  
    /** Awaiter returned by wait().
104  
    */
104  
    */
105  
    class wait_awaiter
105  
    class wait_awaiter
106  
        : public detail::intrusive_list<wait_awaiter>::node
106  
        : public detail::intrusive_list<wait_awaiter>::node
107  
    {
107  
    {
108  
        friend class async_event;
108  
        friend class async_event;
109  

109  

110  
        async_event* e_;
110  
        async_event* e_;
111  
        std::coroutine_handle<> h_;
111  
        std::coroutine_handle<> h_;
112  
        executor_ref ex_;
112  
        executor_ref ex_;
113  

113  

114  
        // Declared before stop_cb_buf_: the callback
114  
        // Declared before stop_cb_buf_: the callback
115  
        // accesses these members, so they must still be
115  
        // accesses these members, so they must still be
116  
        // alive if the stop_cb_ destructor blocks.
116  
        // alive if the stop_cb_ destructor blocks.
117  
        std::atomic<bool> claimed_{false};
117  
        std::atomic<bool> claimed_{false};
118  
        bool canceled_ = false;
118  
        bool canceled_ = false;
119  
        bool active_ = false;
119  
        bool active_ = false;
120  
        bool in_list_ = false;
120  
        bool in_list_ = false;
121  

121  

122  
        struct cancel_fn
122  
        struct cancel_fn
123  
        {
123  
        {
124  
            wait_awaiter* self_;
124  
            wait_awaiter* self_;
125  

125  

126  
            void operator()() const noexcept
126  
            void operator()() const noexcept
127  
            {
127  
            {
128  
                if(!self_->claimed_.exchange(
128  
                if(!self_->claimed_.exchange(
129  
                    true, std::memory_order_acq_rel))
129  
                    true, std::memory_order_acq_rel))
130  
                {
130  
                {
131  
                    self_->canceled_ = true;
131  
                    self_->canceled_ = true;
132  
                    self_->ex_.dispatch(self_->h_);
132  
                    self_->ex_.dispatch(self_->h_);
133  
                }
133  
                }
134  
            }
134  
            }
135  
        };
135  
        };
136  

136  

137  
        using stop_cb_t =
137  
        using stop_cb_t =
138  
            std::stop_callback<cancel_fn>;
138  
            std::stop_callback<cancel_fn>;
139  

139  

140  
        // Aligned storage for stop_cb_t. Declared last:
140  
        // Aligned storage for stop_cb_t. Declared last:
141  
        // its destructor may block while the callback
141  
        // its destructor may block while the callback
142  
        // accesses the members above.
142  
        // accesses the members above.
143  
#ifdef _MSC_VER
143  
#ifdef _MSC_VER
144  
# pragma warning(push)
144  
# pragma warning(push)
145  
# pragma warning(disable: 4324) // padded due to alignas
145  
# pragma warning(disable: 4324) // padded due to alignas
146  
#endif
146  
#endif
147  
        alignas(stop_cb_t)
147  
        alignas(stop_cb_t)
148  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
148  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
149  
#ifdef _MSC_VER
149  
#ifdef _MSC_VER
150  
# pragma warning(pop)
150  
# pragma warning(pop)
151  
#endif
151  
#endif
152  

152  

153  
        stop_cb_t& stop_cb_() noexcept
153  
        stop_cb_t& stop_cb_() noexcept
154  
        {
154  
        {
155  
            return *reinterpret_cast<stop_cb_t*>(
155  
            return *reinterpret_cast<stop_cb_t*>(
156  
                stop_cb_buf_);
156  
                stop_cb_buf_);
157  
        }
157  
        }
158  

158  

159  
    public:
159  
    public:
160  
        ~wait_awaiter()
160  
        ~wait_awaiter()
161  
        {
161  
        {
162  
            if(active_)
162  
            if(active_)
163  
                stop_cb_().~stop_cb_t();
163  
                stop_cb_().~stop_cb_t();
164  
            if(in_list_)
164  
            if(in_list_)
165  
                e_->waiters_.remove(this);
165  
                e_->waiters_.remove(this);
166  
        }
166  
        }
167  

167  

168  
        explicit wait_awaiter(async_event* e) noexcept
168  
        explicit wait_awaiter(async_event* e) noexcept
169  
            : e_(e)
169  
            : e_(e)
170  
        {
170  
        {
171  
        }
171  
        }
172  

172  

173  
        wait_awaiter(wait_awaiter&& o) noexcept
173  
        wait_awaiter(wait_awaiter&& o) noexcept
174  
            : e_(o.e_)
174  
            : e_(o.e_)
175  
            , h_(o.h_)
175  
            , h_(o.h_)
176  
            , ex_(o.ex_)
176  
            , ex_(o.ex_)
177  
            , claimed_(o.claimed_.load(
177  
            , claimed_(o.claimed_.load(
178  
                std::memory_order_relaxed))
178  
                std::memory_order_relaxed))
179  
            , canceled_(o.canceled_)
179  
            , canceled_(o.canceled_)
180  
            , active_(std::exchange(o.active_, false))
180  
            , active_(std::exchange(o.active_, false))
181  
            , in_list_(std::exchange(o.in_list_, false))
181  
            , in_list_(std::exchange(o.in_list_, false))
182  
        {
182  
        {
183  
        }
183  
        }
184  

184  

185  
        wait_awaiter(wait_awaiter const&) = delete;
185  
        wait_awaiter(wait_awaiter const&) = delete;
186  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
186  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
187  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
187  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
188  

188  

189  
        bool await_ready() const noexcept
189  
        bool await_ready() const noexcept
190  
        {
190  
        {
191  
            return e_->set_;
191  
            return e_->set_;
192  
        }
192  
        }
193  

193  

194  
        /** IoAwaitable protocol overload. */
194  
        /** IoAwaitable protocol overload. */
195  
        std::coroutine_handle<>
195  
        std::coroutine_handle<>
196  
        await_suspend(
196  
        await_suspend(
197  
            std::coroutine_handle<> h,
197  
            std::coroutine_handle<> h,
198  
            executor_ref ex,
198  
            executor_ref ex,
199  
            std::stop_token token) noexcept
199  
            std::stop_token token) noexcept
200  
        {
200  
        {
201  
            if(token.stop_requested())
201  
            if(token.stop_requested())
202  
            {
202  
            {
203  
                canceled_ = true;
203  
                canceled_ = true;
204  
                return h;
204  
                return h;
205  
            }
205  
            }
206  
            h_ = h;
206  
            h_ = h;
207  
            ex_ = ex;
207  
            ex_ = ex;
208  
            e_->waiters_.push_back(this);
208  
            e_->waiters_.push_back(this);
209  
            in_list_ = true;
209  
            in_list_ = true;
210  
            ::new(stop_cb_buf_) stop_cb_t(
210  
            ::new(stop_cb_buf_) stop_cb_t(
211  
                token, cancel_fn{this});
211  
                token, cancel_fn{this});
212  
            active_ = true;
212  
            active_ = true;
213  
            return std::noop_coroutine();
213  
            return std::noop_coroutine();
214  
        }
214  
        }
215  

215  

216  
        io_result<> await_resume() noexcept
216  
        io_result<> await_resume() noexcept
217  
        {
217  
        {
218  
            if(active_)
218  
            if(active_)
219  
            {
219  
            {
220  
                stop_cb_().~stop_cb_t();
220  
                stop_cb_().~stop_cb_t();
221  
                active_ = false;
221  
                active_ = false;
222  
            }
222  
            }
223  
            if(canceled_)
223  
            if(canceled_)
224  
            {
224  
            {
225  
                if(in_list_)
225  
                if(in_list_)
226  
                {
226  
                {
227  
                    e_->waiters_.remove(this);
227  
                    e_->waiters_.remove(this);
228  
                    in_list_ = false;
228  
                    in_list_ = false;
229  
                }
229  
                }
230  
                return {make_error_code(
230  
                return {make_error_code(
231  
                    error::canceled)};
231  
                    error::canceled)};
232  
            }
232  
            }
233  
            return {{}};
233  
            return {{}};
234  
        }
234  
        }
235  
    };
235  
    };
236  

236  

237  
    async_event() = default;
237  
    async_event() = default;
238  

238  

239  
    // Non-copyable, non-movable
239  
    // Non-copyable, non-movable
240  
    async_event(async_event const&) = delete;
240  
    async_event(async_event const&) = delete;
241  
    async_event& operator=(async_event const&) = delete;
241  
    async_event& operator=(async_event const&) = delete;
242  

242  

243  
    /** Returns an awaiter that waits until the event is set.
243  
    /** Returns an awaiter that waits until the event is set.
244  

244  

245  
        If the event is already set, completes immediately.
245  
        If the event is already set, completes immediately.
246  

246  

247  
        @return An awaitable yielding `(error_code)`.
247  
        @return An awaitable yielding `(error_code)`.
248  
    */
248  
    */
249  
    wait_awaiter wait() noexcept
249  
    wait_awaiter wait() noexcept
250  
    {
250  
    {
251  
        return wait_awaiter{this};
251  
        return wait_awaiter{this};
252  
    }
252  
    }
253  

253  

254  
    /** Sets the event.
254  
    /** Sets the event.
255  

255  

256  
        All waiting coroutines are resumed. Canceled waiters
256  
        All waiting coroutines are resumed. Canceled waiters
257  
        are skipped. Subsequent calls to wait() complete
257  
        are skipped. Subsequent calls to wait() complete
258  
        immediately until clear() is called.
258  
        immediately until clear() is called.
259  
    */
259  
    */
260  
    void set() noexcept
260  
    void set() noexcept
261  
    {
261  
    {
262  
        set_ = true;
262  
        set_ = true;
263  
        for(;;)
263  
        for(;;)
264  
        {
264  
        {
265  
            auto* w = waiters_.pop_front();
265  
            auto* w = waiters_.pop_front();
266  
            if(!w)
266  
            if(!w)
267  
                break;
267  
                break;
268  
            w->in_list_ = false;
268  
            w->in_list_ = false;
269  
            if(!w->claimed_.exchange(
269  
            if(!w->claimed_.exchange(
270  
                true, std::memory_order_acq_rel))
270  
                true, std::memory_order_acq_rel))
271  
            {
271  
            {
272  
                w->ex_.dispatch(w->h_);
272  
                w->ex_.dispatch(w->h_);
273  
            }
273  
            }
274  
        }
274  
        }
275  
    }
275  
    }
276  

276  

277  
    /** Clears the event.
277  
    /** Clears the event.
278  

278  

279  
        Subsequent calls to wait() will suspend until
279  
        Subsequent calls to wait() will suspend until
280  
        set() is called again.
280  
        set() is called again.
281  
    */
281  
    */
282  
    void clear() noexcept
282  
    void clear() noexcept
283  
    {
283  
    {
284  
        set_ = false;
284  
        set_ = false;
285  
    }
285  
    }
286  

286  

287  
    /** Returns true if the event is currently set.
287  
    /** Returns true if the event is currently set.
288  
    */
288  
    */
289  
    bool is_set() const noexcept
289  
    bool is_set() const noexcept
290  
    {
290  
    {
291  
        return set_;
291  
        return set_;
292  
    }
292  
    }
293  
};
293  
};
294  

294  

295  
} // namespace capy
295  
} // namespace capy
296  
} // namespace boost
296  
} // namespace boost
297  

297  

298  
#endif
298  
#endif