LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 99.3 % 145 144
Test Date: 2026-02-10 18:54:58 Functions: 93.3 % 519 484

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Michael Vandeberg
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11              : #define BOOST_CAPY_WHEN_ANY_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/concept/executor.hpp>
      15              : #include <boost/capy/concept/io_awaitable.hpp>
      16              : #include <boost/capy/coro.hpp>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <boost/capy/ex/frame_allocator.hpp>
      19              : #include <boost/capy/task.hpp>
      20              : 
      21              : #include <array>
      22              : #include <atomic>
      23              : #include <exception>
      24              : #include <optional>
      25              : #include <ranges>
      26              : #include <stdexcept>
      27              : #include <stop_token>
      28              : #include <tuple>
      29              : #include <type_traits>
      30              : #include <utility>
      31              : #include <variant>
      32              : #include <vector>
      33              : 
      34              : /*
      35              :    when_any - Race multiple tasks, return first completion
      36              :    ========================================================
      37              : 
      38              :    OVERVIEW:
      39              :    ---------
      40              :    when_any launches N tasks concurrently and completes when the FIRST task
      41              :    finishes (success or failure). It then requests stop for all siblings and
      42              :    waits for them to acknowledge before returning.
      43              : 
      44              :    ARCHITECTURE:
      45              :    -------------
      46              :    The design mirrors when_all but with inverted completion semantics:
      47              : 
      48              :      when_all:  complete when remaining_count reaches 0 (all done)
      49              :      when_any:  complete when has_winner becomes true (first done)
      50              :                 BUT still wait for remaining_count to reach 0 for cleanup
      51              : 
      52              :    Key components:
      53              :      - when_any_state:    Shared state tracking winner and completion
      54              :      - when_any_runner:   Wrapper coroutine for each child task
      55              :      - when_any_launcher: Awaitable that starts all runners concurrently
      56              : 
      57              :    CRITICAL INVARIANTS:
      58              :    --------------------
      59              :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      60              :    2. All tasks must complete before parent resumes (cleanup safety)
      61              :    3. Stop is requested immediately when winner is determined
      62              :    4. Only the winner's result/exception is stored
      63              : 
      64              :    TYPE DEDUPLICATION:
      65              :    -------------------
      66              :    std::variant requires unique alternative types. Since when_any can race
      67              :    tasks with identical return types (e.g., three task<int>), we must
      68              :    deduplicate types before constructing the variant.
      69              : 
      70              :    Example: when_any(task<int>, task<string>, task<int>)
      71              :      - Raw types after void->monostate: int, string, int
      72              :      - Deduplicated variant: std::variant<int, string>
      73              :      - Return: pair<size_t, variant<int, string>>
      74              : 
      75              :    The winner_index tells you which task won (0, 1, or 2), while the variant
      76              :    holds the result. Use the index to determine how to interpret the variant.
      77              : 
      78              :    VOID HANDLING:
      79              :    --------------
      80              :    void tasks contribute std::monostate to the variant (then deduplicated).
      81              :    All-void tasks result in: pair<size_t, variant<monostate>>
      82              : 
      83              :    MEMORY MODEL:
      84              :    -------------
      85              :    Synchronization chain from winner's write to parent's read:
      86              : 
      87              :    1. Winner thread writes result_/winner_exception_ (non-atomic)
       88              :    2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
       89              :    3. Last runner to finish (may be winner or non-winner) reaches final_suspend
       90              :       → fetch_sub(acq_rel) on remaining_count_, observing the previous value 1
       91              :    4. Last runner calls dispatch() using its local copies of caller_ex_ and continuation_
      92              :    5. Parent coroutine resumes and reads result_/winner_exception_
      93              : 
      94              :    Synchronization analysis:
      95              :    - All fetch_sub operations on remaining_count_ form a release sequence
      96              :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      97              :      in the modification order of remaining_count_
      98              :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
      99              :      modification order, establishing happens-before from winner's writes
     100              :    - Executor dispatch() is expected to provide queue-based synchronization
     101              :      (release-on-post, acquire-on-execute) completing the chain to parent
     102              :    - Even inline executors work (same thread = sequenced-before)
     103              : 
     104              :    Alternative considered: Adding winner_ready_ atomic (set with release after
     105              :    storing winner data, acquired before reading) would make synchronization
     106              :    self-contained and not rely on executor implementation details. Current
     107              :    approach is correct but requires careful reasoning about release sequences
     108              :    and executor behavior.
     109              : 
     110              :    EXCEPTION SEMANTICS:
     111              :    --------------------
     112              :    Unlike when_all (which captures first exception, discards others), when_any
     113              :    treats exceptions as valid completions. If the winning task threw, that
     114              :    exception is rethrown. Exceptions from non-winners are silently discarded.
     115              : */
     116              : 
     117              : namespace boost {
     118              : namespace capy {
     119              : 
     120              : namespace detail {
     121              : 
     122              : /** Convert void to monostate for variant storage.
     123              : 
     124              :     std::variant<void, ...> is ill-formed, so void tasks contribute
     125              :     std::monostate to the result variant instead. Non-void types
     126              :     pass through unchanged.
     127              : 
     128              :     @tparam T The type to potentially convert (void becomes monostate).
     129              : */
     130              : template<typename T>
     131              : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
     132              : 
     133              : // Type deduplication: std::variant requires unique alternative types.
     134              : // Fold left over the type list, appending each type only if not already present.
     135              : template<typename Variant, typename T>
     136              : struct variant_append_if_unique;
     137              : 
     138              : template<typename... Vs, typename T>
     139              : struct variant_append_if_unique<std::variant<Vs...>, T>
     140              : {
     141              :     using type = std::conditional_t<
     142              :         (std::is_same_v<T, Vs> || ...),
     143              :         std::variant<Vs...>,
     144              :         std::variant<Vs..., T>>;
     145              : };
     146              : 
     147              : template<typename Accumulated, typename... Remaining>
     148              : struct deduplicate_impl;
     149              : 
     150              : template<typename Accumulated>
     151              : struct deduplicate_impl<Accumulated>
     152              : {
     153              :     using type = Accumulated;
     154              : };
     155              : 
     156              : template<typename Accumulated, typename T, typename... Rest>
     157              : struct deduplicate_impl<Accumulated, T, Rest...>
     158              : {
     159              :     using next = typename variant_append_if_unique<Accumulated, T>::type;
     160              :     using type = typename deduplicate_impl<next, Rest...>::type;
     161              : };
     162              : 
     163              : // Deduplicated variant; void types become monostate before deduplication
     164              : template<typename T0, typename... Ts>
     165              : using unique_variant_t = typename deduplicate_impl<
     166              :     std::variant<void_to_monostate_t<T0>>,
     167              :     void_to_monostate_t<Ts>...>::type;
     168              : 
     169              : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
     170              : // when multiple tasks share the same return type.
     171              : template<typename T0, typename... Ts>
     172              : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
     173              : 
     174              : /** Core shared state for when_any operations.
     175              : 
     176              :     Contains all members and methods common to both heterogeneous (variadic)
     177              :     and homogeneous (range) when_any implementations. State classes embed
     178              :     this via composition to avoid CRTP destructor ordering issues.
     179              : 
     180              :     @par Thread Safety
     181              :     Atomic operations protect winner selection and completion count.
     182              : */
     183              : struct when_any_core
     184              : {
     185              :     std::atomic<std::size_t> remaining_count_;
     186              :     std::size_t winner_index_{0};
     187              :     std::exception_ptr winner_exception_;
     188              :     std::stop_source stop_source_;
     189              : 
     190              :     // Bridges parent's stop token to our stop_source
     191              :     struct stop_callback_fn
     192              :     {
     193              :         std::stop_source* source_;
     194            9 :         void operator()() const noexcept { source_->request_stop(); }
     195              :     };
     196              :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     197              :     std::optional<stop_callback_t> parent_stop_callback_;
     198              : 
     199              :     coro continuation_;
     200              :     executor_ref caller_ex_;
     201              : 
     202              :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     203              :     std::atomic<bool> has_winner_{false};
     204              : 
     205           65 :     explicit when_any_core(std::size_t count) noexcept
     206           65 :         : remaining_count_(count)
     207              :     {
     208           65 :     }
     209              : 
     210              :     /** Atomically claim winner status; exactly one task succeeds. */
     211          190 :     bool try_win(std::size_t index) noexcept
     212              :     {
     213          190 :         bool expected = false;
     214          190 :         if(has_winner_.compare_exchange_strong(
     215              :             expected, true, std::memory_order_acq_rel))
     216              :         {
     217           65 :             winner_index_ = index;
     218           65 :             stop_source_.request_stop();
     219           65 :             return true;
     220              :         }
     221          125 :         return false;
     222              :     }
     223              : 
     224              :     /** @pre try_win() returned true. */
     225            8 :     void set_winner_exception(std::exception_ptr ep) noexcept
     226              :     {
     227            8 :         winner_exception_ = ep;
     228            8 :     }
     229              : 
     230              :     // Runners signal completion directly via final_suspend; no member function needed.
     231              : };
     232              : 
     233              : /** Shared state for heterogeneous when_any operation.
     234              : 
     235              :     Coordinates winner selection, result storage, and completion tracking
     236              :     for all child tasks in a when_any operation. Uses composition with
     237              :     when_any_core for shared functionality.
     238              : 
     239              :     @par Lifetime
     240              :     Allocated on the parent coroutine's frame, outlives all runners.
     241              : 
     242              :     @tparam T0 First task's result type.
     243              :     @tparam Ts Remaining tasks' result types.
     244              : */
     245              : template<typename T0, typename... Ts>
     246              : struct when_any_state
     247              : {
     248              :     static constexpr std::size_t task_count = 1 + sizeof...(Ts);
     249              :     using variant_type = unique_variant_t<T0, Ts...>;
     250              : 
     251              :     when_any_core core_;
     252              :     std::optional<variant_type> result_;
     253              :     std::array<coro, task_count> runner_handles_{};
     254              : 
     255           43 :     when_any_state()
     256           43 :         : core_(task_count)
     257              :     {
     258           43 :     }
     259              : 
     260              :     // Runners self-destruct in final_suspend. No destruction needed here.
     261              : 
     262              :     /** @pre core_.try_win() returned true.
     263              :         @note Uses in_place_type (not index) because variant is deduplicated.
     264              :     */
     265              :     template<typename T>
     266           35 :     void set_winner_result(T value)
     267              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     268              :     {
     269           35 :         result_.emplace(std::in_place_type<T>, std::move(value));
     270           35 :     }
     271              : 
     272              :     /** @pre core_.try_win() returned true. */
     273            3 :     void set_winner_void() noexcept
     274              :     {
     275            3 :         result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
     276            3 :     }
     277              : };
     278              : 
     279              : /** Wrapper coroutine that runs a single child task for when_any.
     280              : 
     281              :     Propagates executor/stop_token to the child, attempts to claim winner
     282              :     status on completion, and signals completion for cleanup coordination.
     283              : 
     284              :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     285              : */
     286              : template<typename StateType>
     287              : struct when_any_runner
     288              : {
     289              :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     290              :     {
     291              :         StateType* state_ = nullptr;
     292              :         std::size_t index_ = 0;
     293              :         executor_ref ex_;
     294              :         std::stop_token stop_token_;
     295              : 
     296          190 :         when_any_runner get_return_object() noexcept
     297              :         {
     298          190 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     299              :         }
     300              : 
     301              :         // Starts suspended; launcher sets up state/ex/token then resumes
     302          190 :         std::suspend_always initial_suspend() noexcept
     303              :         {
     304          190 :             return {};
     305              :         }
     306              : 
     307          190 :         auto final_suspend() noexcept
     308              :         {
     309              :             struct awaiter
     310              :             {
     311              :                 promise_type* p_;
     312          190 :                 bool await_ready() const noexcept { return false; }
     313          190 :                 void await_suspend(coro h) noexcept
     314              :                 {
     315              :                     // Extract everything needed for signaling before
     316              :                     // self-destruction. Inline dispatch may destroy
     317              :                     // state, so we can't access members after.
     318          190 :                     auto& core = p_->state_->core_;
     319          190 :                     auto* counter = &core.remaining_count_;
     320          190 :                     auto caller_ex = core.caller_ex_;
     321          190 :                     auto cont = core.continuation_;
     322              : 
     323              :                     // Self-destruct first - state no longer destroys runners
     324          190 :                     h.destroy();
     325              : 
     326              :                     // Signal completion. If last, dispatch parent.
     327              :                     // Uses only local copies - safe even if state
     328              :                     // is destroyed during inline dispatch.
     329          190 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     330          190 :                     if(remaining == 1)
     331           65 :                         caller_ex.dispatch(cont);
     332          190 :                 }
     333            0 :                 void await_resume() const noexcept {}
     334              :             };
     335          190 :             return awaiter{this};
     336              :         }
     337              : 
     338          178 :         void return_void() noexcept {}
     339              : 
     340              :         // Exceptions are valid completions in when_any (unlike when_all)
     341           12 :         void unhandled_exception()
     342              :         {
     343           12 :             if(state_->core_.try_win(index_))
     344            8 :                 state_->core_.set_winner_exception(std::current_exception());
     345           12 :         }
     346              : 
     347              :         /** Injects executor and stop token into child awaitables. */
     348              :         template<class Awaitable>
     349              :         struct transform_awaiter
     350              :         {
     351              :             std::decay_t<Awaitable> a_;
     352              :             promise_type* p_;
     353              : 
     354          190 :             bool await_ready() { return a_.await_ready(); }
     355          190 :             auto await_resume() { return a_.await_resume(); }
     356              : 
     357              :             template<class Promise>
     358          185 :             auto await_suspend(std::coroutine_handle<Promise> h)
     359              :             {
     360          185 :                 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
     361              :             }
     362              :         };
     363              : 
     364              :         template<class Awaitable>
     365          190 :         auto await_transform(Awaitable&& a)
     366              :         {
     367              :             using A = std::decay_t<Awaitable>;
     368              :             if constexpr (IoAwaitable<A>)
     369              :             {
     370              :                 return transform_awaiter<Awaitable>{
     371          380 :                     std::forward<Awaitable>(a), this};
     372              :             }
     373              :             else
     374              :             {
     375              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     376              :             }
     377          190 :         }
     378              :     };
     379              : 
     380              :     std::coroutine_handle<promise_type> h_;
     381              : 
     382          190 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     383          190 :         : h_(h)
     384              :     {
     385          190 :     }
     386              : 
     387              :     // Enable move for all clang versions - some versions need it
     388              :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     389              : 
     390              :     // Non-copyable
     391              :     when_any_runner(when_any_runner const&) = delete;
     392              :     when_any_runner& operator=(when_any_runner const&) = delete;
     393              :     when_any_runner& operator=(when_any_runner&&) = delete;
     394              : 
     395          190 :     auto release() noexcept
     396              :     {
     397          190 :         return std::exchange(h_, nullptr);
     398              :     }
     399              : };
     400              : 
     401              : /** Wraps a child awaitable, attempts to claim winner on completion.
     402              : 
      403              :     Result storage depends on the awaitable's result type:
      404              :     - void: a requires-expression checks for set_winner_void(); heterogeneous
      405              :       states have it (stores monostate), homogeneous void states store nothing
      406              :     - non-void: set_winner_result() is called unconditionally
     407              : */
     408              : template<IoAwaitable Awaitable, typename StateType>
     409              : when_any_runner<StateType>
     410          190 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     411              : {
     412              :     using T = awaitable_result_t<Awaitable>;
     413              :     if constexpr (std::is_void_v<T>)
     414              :     {
     415              :         co_await std::move(inner);
     416              :         if(state->core_.try_win(index))
     417              :         {
     418              :             // Heterogeneous void tasks store monostate in the variant
     419              :             if constexpr (requires { state->set_winner_void(); })
     420              :                 state->set_winner_void();
     421              :             // Homogeneous void tasks have no result to store
     422              :         }
     423              :     }
     424              :     else
     425              :     {
     426              :         auto result = co_await std::move(inner);
     427              :         if(state->core_.try_win(index))
     428              :         {
     429              :             // Defensive: move should not throw (already moved once), but we
     430              :             // catch just in case since an uncaught exception would be devastating.
     431              :             try
     432              :             {
     433              :                 state->set_winner_result(std::move(result));
     434              :             }
     435              :             catch(...)
     436              :             {
     437              :                 state->core_.set_winner_exception(std::current_exception());
     438              :             }
     439              :         }
     440              :     }
     441          380 : }
     442              : 
     443              : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     444              : template<IoAwaitable... Awaitables>
     445              : class when_any_launcher
     446              : {
     447              :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     448              : 
     449              :     std::tuple<Awaitables...>* tasks_;
     450              :     state_type* state_;
     451              : 
     452              : public:
     453           43 :     when_any_launcher(
     454              :         std::tuple<Awaitables...>* tasks,
     455              :         state_type* state)
     456           43 :         : tasks_(tasks)
     457           43 :         , state_(state)
     458              :     {
     459           43 :     }
     460              : 
     461           43 :     bool await_ready() const noexcept
     462              :     {
     463           43 :         return sizeof...(Awaitables) == 0;
     464              :     }
     465              : 
     466              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     467              :         destroys this object before await_suspend returns. Must not reference
     468              :         `this` after the final launch_one call.
     469              :     */
     470              :     template<Executor Ex>
     471           43 :     coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
     472              :     {
     473           43 :         state_->core_.continuation_ = continuation;
     474           43 :         state_->core_.caller_ex_ = caller_ex;
     475              : 
     476           43 :         if(parent_token.stop_possible())
     477              :         {
     478           18 :             state_->core_.parent_stop_callback_.emplace(
     479              :                 parent_token,
     480            9 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     481              : 
     482            9 :             if(parent_token.stop_requested())
     483            3 :                 state_->core_.stop_source_.request_stop();
     484              :         }
     485              : 
     486           43 :         auto token = state_->core_.stop_source_.get_token();
     487           86 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     488           43 :             (..., launch_one<Is>(caller_ex, token));
     489           43 :         }(std::index_sequence_for<Awaitables...>{});
     490              : 
     491           86 :         return std::noop_coroutine();
     492           43 :     }
     493              : 
     494           43 :     void await_resume() const noexcept
     495              :     {
     496           43 :     }
     497              : 
     498              : private:
     499              :     /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
     500              :     template<std::size_t I, Executor Ex>
     501          105 :     void launch_one(Ex const& caller_ex, std::stop_token token)
     502              :     {
     503          105 :         auto runner = make_when_any_runner(
     504          105 :             std::move(std::get<I>(*tasks_)), state_, I);
     505              : 
     506          105 :         auto h = runner.release();
     507          105 :         h.promise().state_ = state_;
     508          105 :         h.promise().index_ = I;
     509          105 :         h.promise().ex_ = caller_ex;
     510          105 :         h.promise().stop_token_ = token;
     511              : 
     512          105 :         coro ch{h};
     513          105 :         state_->runner_handles_[I] = ch;
     514          105 :         caller_ex.dispatch(ch);
     515          105 :     }
     516              : };
     517              : 
     518              : } // namespace detail
     519              : 
     520              : /** Wait for the first awaitable to complete.
     521              : 
     522              :     Races multiple heterogeneous awaitables concurrently and returns when the
     523              :     first one completes. The result includes the winner's index and a
     524              :     deduplicated variant containing the result value.
     525              : 
     526              :     @par Suspends
     527              :     The calling coroutine suspends when co_await is invoked. All awaitables
     528              :     are launched concurrently and execute in parallel. The coroutine resumes
     529              :     only after all awaitables have completed, even though the winner is
     530              :     determined by the first to finish.
     531              : 
     532              :     @par Completion Conditions
     533              :     @li Winner is determined when the first awaitable completes (success or exception)
     534              :     @li Only one task can claim winner status via atomic compare-exchange
     535              :     @li Once a winner exists, stop is requested for all remaining siblings
     536              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     537              :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     538              : 
     539              :     @par Cancellation Semantics
     540              :     Cancellation is supported via stop_token propagated through the
     541              :     IoAwaitable protocol:
     542              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     543              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     544              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     545              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     546              :     @li Stop requests are cooperative; tasks must check and respond to them
     547              : 
     548              :     @par Concurrency/Overlap
     549              :     All awaitables are launched concurrently before any can complete.
     550              :     The launcher iterates through the arguments, starting each task on the
     551              :     caller's executor. Tasks may execute in parallel on multi-threaded
     552              :     executors or interleave on single-threaded executors. There is no
     553              :     guaranteed ordering of task completion.
     554              : 
     555              :     @par Notable Error Conditions
     556              :     @li Winner exception: if the winning task threw, that exception is rethrown
     557              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     558              :     @li Cancellation: tasks may complete via cancellation without throwing
     559              : 
     560              :     @par Example
     561              :     @code
     562              :     task<void> example() {
     563              :         auto [index, result] = co_await when_any(
     564              :             fetch_from_primary(),   // task<Response>
     565              :             fetch_from_backup()     // task<Response>
     566              :         );
     567              :         // index is 0 or 1, result holds the winner's Response
     568              :         auto response = std::get<Response>(result);
     569              :     }
     570              :     @endcode
     571              : 
     572              :     @par Example with Heterogeneous Types
     573              :     @code
     574              :     task<void> mixed_types() {
     575              :         auto [index, result] = co_await when_any(
     576              :             fetch_int(),      // task<int>
     577              :             fetch_string()    // task<std::string>
     578              :         );
     579              :         if (index == 0)
     580              :             std::cout << "Got int: " << std::get<int>(result) << "\n";
     581              :         else
     582              :             std::cout << "Got string: " << std::get<std::string>(result) << "\n";
     583              :     }
     584              :     @endcode
     585              : 
     586              :     @tparam A0 First awaitable type (must satisfy IoAwaitable).
     587              :     @tparam As Remaining awaitable types (must satisfy IoAwaitable).
     588              :     @param a0 The first awaitable to race.
     589              :     @param as Additional awaitables to race concurrently.
     590              :     @return A task yielding a pair of (winner_index, result_variant).
     591              : 
     592              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     593              : 
     594              :     @par Remarks
     595              :     Awaitables are moved into the coroutine frame; original objects become
     596              :     empty after the call. When multiple awaitables share the same return type,
     597              :     the variant is deduplicated to contain only unique types. Use the winner
     598              :     index to determine which awaitable completed first. Void awaitables
     599              :     contribute std::monostate to the variant.
     600              : 
     601              :     @see when_all, IoAwaitable
     602              : */
     603              : template<IoAwaitable A0, IoAwaitable... As>
     604           43 : [[nodiscard]] auto when_any(A0 a0, As... as)
     605              :     -> task<detail::when_any_result_t<
     606              :         detail::awaitable_result_t<A0>,
     607              :         detail::awaitable_result_t<As>...>>
     608              : {
     609              :     using result_type = detail::when_any_result_t<
     610              :         detail::awaitable_result_t<A0>,
     611              :         detail::awaitable_result_t<As>...>;
     612              : 
     613              :     detail::when_any_state<
     614              :         detail::awaitable_result_t<A0>,
     615              :         detail::awaitable_result_t<As>...> state;
     616              :     std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
     617              : 
     618              :     co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
     619              : 
     620              :     if(state.core_.winner_exception_)
     621              :         std::rethrow_exception(state.core_.winner_exception_);
     622              : 
     623              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     624           86 : }
     625              : 
/** Concept for ranges of full I/O awaitables.

    A range satisfies `IoAwaitableRange` if it is a sized input range
    whose value type satisfies @ref IoAwaitable. This enables when_any
    to accept any container or view of awaitables, not just std::vector.

    @tparam R The range type.

    @par Requirements
    @li `R` must satisfy `std::ranges::input_range`
    @li `R` must satisfy `std::ranges::sized_range`
    @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable

    @par Syntactic Requirements
    Given `r` of type `R`:
    @li `std::ranges::begin(r)` is valid
    @li `std::ranges::end(r)` is valid
    @li `std::ranges::size(r)` is valid
    @li The element value type, `std::ranges::range_value_t<R>` (not the
        reference type produced by `*std::ranges::begin(r)`), satisfies
        @ref IoAwaitable

    @par Example
    @code
    template<IoAwaitableRange R>
    task<void> race_all(R&& awaitables) {
        auto winner = co_await when_any(std::forward<R>(awaitables));
        // Process winner...
    }
    @endcode

    @see when_any, IoAwaitable
*/
template<typename R>
concept IoAwaitableRange =
    std::ranges::input_range<R> &&
    std::ranges::sized_range<R> &&
    IoAwaitable<std::ranges::range_value_t<R>>;
     662              : 
     663              : namespace detail {
     664              : 
     665              : /** Shared state for homogeneous when_any (range overload).
     666              : 
     667              :     Uses composition with when_any_core for shared functionality.
     668              :     Simpler than heterogeneous: optional<T> instead of variant, vector
     669              :     instead of array for runner handles.
     670              : */
     671              : template<typename T>
     672              : struct when_any_homogeneous_state
     673              : {
     674              :     when_any_core core_;
     675              :     std::optional<T> result_;
     676              :     std::vector<coro> runner_handles_;
     677              : 
     678           19 :     explicit when_any_homogeneous_state(std::size_t count)
     679           19 :         : core_(count)
     680           38 :         , runner_handles_(count)
     681              :     {
     682           19 :     }
     683              : 
     684              :     // Runners self-destruct in final_suspend. No destruction needed here.
     685              : 
     686              :     /** @pre core_.try_win() returned true. */
     687           17 :     void set_winner_result(T value)
     688              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     689              :     {
     690           17 :         result_.emplace(std::move(value));
     691           17 :     }
     692              : };
     693              : 
     694              : /** Specialization for void tasks (no result storage needed). */
     695              : template<>
     696              : struct when_any_homogeneous_state<void>
     697              : {
     698              :     when_any_core core_;
     699              :     std::vector<coro> runner_handles_;
     700              : 
     701            3 :     explicit when_any_homogeneous_state(std::size_t count)
     702            3 :         : core_(count)
     703            6 :         , runner_handles_(count)
     704              :     {
     705            3 :     }
     706              : 
     707              :     // Runners self-destruct in final_suspend. No destruction needed here.
     708              : 
     709              :     // No set_winner_result - void tasks have no result to store
     710              : };
     711              : 
     712              : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     713              : template<IoAwaitableRange Range>
     714              : class when_any_homogeneous_launcher
     715              : {
     716              :     using Awaitable = std::ranges::range_value_t<Range>;
     717              :     using T = awaitable_result_t<Awaitable>;
     718              : 
     719              :     Range* range_;
     720              :     when_any_homogeneous_state<T>* state_;
     721              : 
     722              : public:
     723           22 :     when_any_homogeneous_launcher(
     724              :         Range* range,
     725              :         when_any_homogeneous_state<T>* state)
     726           22 :         : range_(range)
     727           22 :         , state_(state)
     728              :     {
     729           22 :     }
     730              : 
     731           22 :     bool await_ready() const noexcept
     732              :     {
     733           22 :         return std::ranges::empty(*range_);
     734              :     }
     735              : 
     736              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     737              :         destroys this object before await_suspend returns. Must not reference
     738              :         `this` after dispatching begins.
     739              : 
     740              :         Two-phase approach:
     741              :         1. Create all runners (safe - no dispatch yet)
     742              :         2. Dispatch all runners (any may complete synchronously)
     743              :     */
     744              :     template<Executor Ex>
     745           22 :     coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
     746              :     {
     747           22 :         state_->core_.continuation_ = continuation;
     748           22 :         state_->core_.caller_ex_ = caller_ex;
     749              : 
     750           22 :         if(parent_token.stop_possible())
     751              :         {
     752           14 :             state_->core_.parent_stop_callback_.emplace(
     753              :                 parent_token,
     754            7 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     755              : 
     756            7 :             if(parent_token.stop_requested())
     757            4 :                 state_->core_.stop_source_.request_stop();
     758              :         }
     759              : 
     760           22 :         auto token = state_->core_.stop_source_.get_token();
     761              : 
     762              :         // Phase 1: Create all runners without dispatching.
     763              :         // This iterates over *range_ safely because no runners execute yet.
     764           22 :         std::size_t index = 0;
     765          107 :         for(auto&& a : *range_)
     766              :         {
     767           85 :             auto runner = make_when_any_runner(
     768           85 :                 std::move(a), state_, index);
     769              : 
     770           85 :             auto h = runner.release();
     771           85 :             h.promise().state_ = state_;
     772           85 :             h.promise().index_ = index;
     773           85 :             h.promise().ex_ = caller_ex;
     774           85 :             h.promise().stop_token_ = token;
     775              : 
     776           85 :             state_->runner_handles_[index] = coro{h};
     777           85 :             ++index;
     778              :         }
     779              : 
     780              :         // Phase 2: Dispatch all runners. Any may complete synchronously.
     781              :         // After last dispatch, state_ and this may be destroyed.
     782              :         // Use raw pointer/count captured before dispatching.
     783           22 :         coro* handles = state_->runner_handles_.data();
     784           22 :         std::size_t count = state_->runner_handles_.size();
     785          107 :         for(std::size_t i = 0; i < count; ++i)
     786           85 :             caller_ex.dispatch(handles[i]);
     787              : 
     788           44 :         return std::noop_coroutine();
     789           22 :     }
     790              : 
     791           22 :     void await_resume() const noexcept
     792              :     {
     793           22 :     }
     794              : };
     795              : 
     796              : } // namespace detail
     797              : 
     798              : /** Wait for the first awaitable to complete (range overload).
     799              : 
     800              :     Races a range of awaitables with the same result type. Accepts any
     801              :     sized input range of IoAwaitable types, enabling use with arrays,
     802              :     spans, or custom containers.
     803              : 
     804              :     @par Suspends
     805              :     The calling coroutine suspends when co_await is invoked. All awaitables
     806              :     in the range are launched concurrently and execute in parallel. The
     807              :     coroutine resumes only after all awaitables have completed, even though
     808              :     the winner is determined by the first to finish.
     809              : 
     810              :     @par Completion Conditions
     811              :     @li Winner is determined when the first awaitable completes (success or exception)
     812              :     @li Only one task can claim winner status via atomic compare-exchange
     813              :     @li Once a winner exists, stop is requested for all remaining siblings
     814              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     815              :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     816              : 
     817              :     @par Cancellation Semantics
     818              :     Cancellation is supported via stop_token propagated through the
     819              :     IoAwaitable protocol:
     820              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     821              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     822              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     823              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     824              :     @li Stop requests are cooperative; tasks must check and respond to them
     825              : 
     826              :     @par Concurrency/Overlap
     827              :     All awaitables are launched concurrently before any can complete.
     828              :     The launcher iterates through the range, starting each task on the
     829              :     caller's executor. Tasks may execute in parallel on multi-threaded
     830              :     executors or interleave on single-threaded executors. There is no
     831              :     guaranteed ordering of task completion.
     832              : 
     833              :     @par Notable Error Conditions
     834              :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     835              :     @li Winner exception: if the winning task threw, that exception is rethrown
     836              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     837              :     @li Cancellation: tasks may complete via cancellation without throwing
     838              : 
     839              :     @par Example
     840              :     @code
     841              :     task<void> example() {
     842              :         std::array<task<Response>, 3> requests = {
     843              :             fetch_from_server(0),
     844              :             fetch_from_server(1),
     845              :             fetch_from_server(2)
     846              :         };
     847              : 
     848              :         auto [index, response] = co_await when_any(std::move(requests));
     849              :     }
     850              :     @endcode
     851              : 
     852              :     @par Example with Vector
     853              :     @code
     854              :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     855              :         std::vector<task<Response>> requests;
     856              :         for (auto const& server : servers)
     857              :             requests.push_back(fetch_from(server));
     858              : 
     859              :         auto [index, response] = co_await when_any(std::move(requests));
     860              :         co_return response;
     861              :     }
     862              :     @endcode
     863              : 
     864              :     @tparam R Range type satisfying IoAwaitableRange.
     865              :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     866              :     @return A task yielding a pair of (winner_index, result).
     867              : 
     868              :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     869              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     870              : 
     871              :     @par Remarks
     872              :     Elements are moved from the range; for lvalue ranges, the original
     873              :     container will have moved-from elements after this call. The range
     874              :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     875              :     the variadic overload, no variant wrapper is needed since all tasks
     876              :     share the same return type.
     877              : 
     878              :     @see when_any, IoAwaitableRange
     879              : */
     880              : template<IoAwaitableRange R>
     881              :     requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
     882           21 : [[nodiscard]] auto when_any(R&& awaitables)
     883              :     -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
     884              : {
     885              :     using Awaitable = std::ranges::range_value_t<R>;
     886              :     using T = detail::awaitable_result_t<Awaitable>;
     887              :     using result_type = std::pair<std::size_t, T>;
     888              :     using OwnedRange = std::remove_cvref_t<R>;
     889              : 
     890              :     auto count = std::ranges::size(awaitables);
     891              :     if(count == 0)
     892              :         throw std::invalid_argument("when_any requires at least one awaitable");
     893              : 
     894              :     // Move/copy range onto coroutine frame to ensure lifetime
     895              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     896              : 
     897              :     detail::when_any_homogeneous_state<T> state(count);
     898              : 
     899              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     900              : 
     901              :     if(state.core_.winner_exception_)
     902              :         std::rethrow_exception(state.core_.winner_exception_);
     903              : 
     904              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     905           42 : }
     906              : 
     907              : /** Wait for the first awaitable to complete (void range overload).
     908              : 
     909              :     Races a range of void-returning awaitables. Since void awaitables have
     910              :     no result value, only the winner's index is returned.
     911              : 
     912              :     @par Suspends
     913              :     The calling coroutine suspends when co_await is invoked. All awaitables
     914              :     in the range are launched concurrently and execute in parallel. The
     915              :     coroutine resumes only after all awaitables have completed, even though
     916              :     the winner is determined by the first to finish.
     917              : 
     918              :     @par Completion Conditions
     919              :     @li Winner is determined when the first awaitable completes (success or exception)
     920              :     @li Only one task can claim winner status via atomic compare-exchange
     921              :     @li Once a winner exists, stop is requested for all remaining siblings
     922              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     923              :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     924              : 
     925              :     @par Cancellation Semantics
     926              :     Cancellation is supported via stop_token propagated through the
     927              :     IoAwaitable protocol:
     928              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     929              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     930              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     931              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     932              :     @li Stop requests are cooperative; tasks must check and respond to them
     933              : 
     934              :     @par Concurrency/Overlap
     935              :     All awaitables are launched concurrently before any can complete.
     936              :     The launcher iterates through the range, starting each task on the
     937              :     caller's executor. Tasks may execute in parallel on multi-threaded
     938              :     executors or interleave on single-threaded executors. There is no
     939              :     guaranteed ordering of task completion.
     940              : 
     941              :     @par Notable Error Conditions
     942              :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     943              :     @li Winner exception: if the winning task threw, that exception is rethrown
     944              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     945              :     @li Cancellation: tasks may complete via cancellation without throwing
     946              : 
     947              :     @par Example
     948              :     @code
     949              :     task<void> example() {
     950              :         std::vector<task<void>> tasks;
     951              :         for (int i = 0; i < 5; ++i)
     952              :             tasks.push_back(background_work(i));
     953              : 
     954              :         std::size_t winner = co_await when_any(std::move(tasks));
     955              :         // winner is the index of the first task to complete
     956              :     }
     957              :     @endcode
     958              : 
     959              :     @par Example with Timeout
     960              :     @code
     961              :     task<void> with_timeout() {
     962              :         std::vector<task<void>> tasks;
     963              :         tasks.push_back(long_running_operation());
     964              :         tasks.push_back(delay(std::chrono::seconds(5)));
     965              : 
     966              :         std::size_t winner = co_await when_any(std::move(tasks));
     967              :         if (winner == 1) {
     968              :             // Timeout occurred
     969              :         }
     970              :     }
     971              :     @endcode
     972              : 
     973              :     @tparam R Range type satisfying IoAwaitableRange with void result.
     974              :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     975              :     @return A task yielding the winner's index (zero-based).
     976              : 
     977              :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     978              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     979              : 
     980              :     @par Remarks
     981              :     Elements are moved from the range; for lvalue ranges, the original
     982              :     container will have moved-from elements after this call. The range
     983              :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     984              :     the non-void overload, no result storage is needed since void tasks
     985              :     produce no value.
     986              : 
     987              :     @see when_any, IoAwaitableRange
     988              : */
     989              : template<IoAwaitableRange R>
     990              :     requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
     991            3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
     992              : {
     993              :     using OwnedRange = std::remove_cvref_t<R>;
     994              : 
     995              :     auto count = std::ranges::size(awaitables);
     996              :     if(count == 0)
     997              :         throw std::invalid_argument("when_any requires at least one awaitable");
     998              : 
     999              :     // Move/copy range onto coroutine frame to ensure lifetime
    1000              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
    1001              : 
    1002              :     detail::when_any_homogeneous_state<void> state(count);
    1003              : 
    1004              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
    1005              : 
    1006              :     if(state.core_.winner_exception_)
    1007              :         std::rethrow_exception(state.core_.winner_exception_);
    1008              : 
    1009              :     co_return state.core_.winner_index_;
    1010            6 : }
    1011              : 
    1012              : } // namespace capy
    1013              : } // namespace boost
    1014              : 
    1015              : #endif
        

Generated by: LCOV version 2.3