libs/capy/include/boost/capy/when_any.hpp

99.2% Lines (131/132) 96.2% Functions (432/449) 100.0% Branches (27/27)
libs/capy/include/boost/capy/when_any.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/task.hpp>
20
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <optional>
25 #include <ranges>
26 #include <stdexcept>
27 #include <stop_token>
28 #include <tuple>
29 #include <type_traits>
30 #include <utility>
31 #include <variant>
32 #include <vector>
33
34 /*
35 when_any - Race multiple tasks, return first completion
36 ========================================================
37
38 OVERVIEW:
39 ---------
40 when_any launches N tasks concurrently and completes when the FIRST task
41 finishes (success or failure). It then requests stop for all siblings and
42 waits for them to acknowledge before returning.
43
44 ARCHITECTURE:
45 -------------
46 The design mirrors when_all but with inverted completion semantics:
47
48 when_all: complete when remaining_count reaches 0 (all done)
49 when_any: complete when has_winner becomes true (first done)
50 BUT still wait for remaining_count to reach 0 for cleanup
51
52 Key components:
53 - when_any_state: Shared state tracking winner and completion
54 - when_any_runner: Wrapper coroutine for each child task
55 - when_any_launcher: Awaitable that starts all runners concurrently
56
57 CRITICAL INVARIANTS:
58 --------------------
59 1. Exactly one task becomes the winner (via atomic compare_exchange)
60 2. All tasks must complete before parent resumes (cleanup safety)
61 3. Stop is requested immediately when winner is determined
62 4. Only the winner's result/exception is stored
63
64 TYPE DEDUPLICATION:
65 -------------------
66 std::variant requires unique alternative types. Since when_any can race
67 tasks with identical return types (e.g., three task<int>), we must
68 deduplicate types before constructing the variant.
69
70 Example: when_any(task<int>, task<string>, task<int>)
71 - Raw types after void->monostate: int, string, int
72 - Deduplicated variant: std::variant<int, string>
73 - Return: pair<size_t, variant<int, string>>
74
75 The winner_index tells you which task won (0, 1, or 2), while the variant
76 holds the result. Use the index to determine how to interpret the variant.
77
78 VOID HANDLING:
79 --------------
80 void tasks contribute std::monostate to the variant (then deduplicated).
81 All-void tasks result in: pair<size_t, variant<monostate>>
82
83 MEMORY MODEL:
84 -------------
85 Synchronization chain from winner's write to parent's read:
86
87 1. Winner thread writes result_/winner_exception_ (non-atomic)
88 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
89 3. Last task thread (may be winner or non-winner) calls signal_completion()
90 → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
91 4. Last task calls caller_ex_.dispatch(continuation_) directly from its final_suspend awaiter (after destroying its own frame); no symmetric transfer is involved
92 5. Parent coroutine resumes and reads result_/winner_exception_
93
94 Synchronization analysis:
95 - All fetch_sub operations on remaining_count_ form a release sequence
96 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
97 in the modification order of remaining_count_
98 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
99 modification order, establishing happens-before from winner's writes
100 - Executor dispatch() is expected to provide queue-based synchronization
101 (release-on-post, acquire-on-execute) completing the chain to parent
102 - Even inline executors work (same thread = sequenced-before)
103
104 Alternative considered: Adding winner_ready_ atomic (set with release after
105 storing winner data, acquired before reading) would make synchronization
106 self-contained and not rely on executor implementation details. Current
107 approach is correct but requires careful reasoning about release sequences
108 and executor behavior.
109
110 EXCEPTION SEMANTICS:
111 --------------------
112 Unlike when_all (which captures first exception, discards others), when_any
113 treats exceptions as valid completions. If the winning task threw, that
114 exception is rethrown. Exceptions from non-winners are silently discarded.
115 */
116
117 namespace boost {
118 namespace capy {
119
120 namespace detail {
121
/** Map `void` onto `std::monostate`; every other type is left as-is.

    `std::variant` cannot hold `void`, so an awaitable without a result
    is represented by `std::monostate` in the result variant.

    @tparam T Candidate alternative type. Any (possibly cv-qualified)
              `void` yields `std::monostate`.
*/
template<class T>
using void_to_monostate_t =
    typename std::conditional<std::is_void<T>::value, std::monostate, T>::type;
132
// std::variant alternatives must be pairwise distinct. This trait is the
// single step of the deduplication fold: it yields the given variant
// unchanged when T is already one of its alternatives, and the variant
// extended by T otherwise.
template<class Variant, class T>
struct variant_append_if_unique;

template<class... Alternatives, class T>
struct variant_append_if_unique<std::variant<Alternatives...>, T>
{
    using type = std::conditional_t<
        std::disjunction_v<std::is_same<T, Alternatives>...>,
        std::variant<Alternatives...>,
        std::variant<Alternatives..., T>>;
};
146
147 template<typename Accumulated, typename... Remaining>
148 struct deduplicate_impl;
149
150 template<typename Accumulated>
151 struct deduplicate_impl<Accumulated>
152 {
153 using type = Accumulated;
154 };
155
156 template<typename Accumulated, typename T, typename... Rest>
157 struct deduplicate_impl<Accumulated, T, Rest...>
158 {
159 using next = typename variant_append_if_unique<Accumulated, T>::type;
160 using type = typename deduplicate_impl<next, Rest...>::type;
161 };
162
163 // Deduplicated variant; void types become monostate before deduplication
164 template<typename T0, typename... Ts>
165 using unique_variant_t = typename deduplicate_impl<
166 std::variant<void_to_monostate_t<T0>>,
167 void_to_monostate_t<Ts>...>::type;
168
169 // Result: (winner_index, deduplicated_variant). Use index to disambiguate
170 // when multiple tasks share the same return type.
171 template<typename T0, typename... Ts>
172 using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
173
174 /** Core shared state for when_any operations.
175
176 Contains all members and methods common to both heterogeneous (variadic)
177 and homogeneous (range) when_any implementations. State classes embed
178 this via composition to avoid CRTP destructor ordering issues.
179
180 @par Thread Safety
181 Atomic operations protect winner selection and completion count.
182 */
183 struct when_any_core
184 {
185 std::atomic<std::size_t> remaining_count_;
186 std::size_t winner_index_{0};
187 std::exception_ptr winner_exception_;
188 std::stop_source stop_source_;
189
190 // Bridges parent's stop token to our stop_source
191 struct stop_callback_fn
192 {
193 std::stop_source* source_;
194 9 void operator()() const noexcept { source_->request_stop(); }
195 };
196 using stop_callback_t = std::stop_callback<stop_callback_fn>;
197 std::optional<stop_callback_t> parent_stop_callback_;
198
199 coro continuation_;
200 executor_ref caller_ex_;
201
202 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
203 std::atomic<bool> has_winner_{false};
204
205 65 explicit when_any_core(std::size_t count) noexcept
206 65 : remaining_count_(count)
207 {
208 65 }
209
210 /** Atomically claim winner status; exactly one task succeeds. */
211 190 bool try_win(std::size_t index) noexcept
212 {
213 190 bool expected = false;
214
2/2
✓ Branch 1 taken 65 times.
✓ Branch 2 taken 125 times.
190 if(has_winner_.compare_exchange_strong(
215 expected, true, std::memory_order_acq_rel))
216 {
217 65 winner_index_ = index;
218 65 stop_source_.request_stop();
219 65 return true;
220 }
221 125 return false;
222 }
223
224 /** @pre try_win() returned true. */
225 8 void set_winner_exception(std::exception_ptr ep) noexcept
226 {
227 8 winner_exception_ = ep;
228 8 }
229
230 // Runners signal completion directly via final_suspend; no member function needed.
231 };
232
233 /** Shared state for heterogeneous when_any operation.
234
235 Coordinates winner selection, result storage, and completion tracking
236 for all child tasks in a when_any operation. Uses composition with
237 when_any_core for shared functionality.
238
239 @par Lifetime
240 Allocated on the parent coroutine's frame, outlives all runners.
241
242 @tparam T0 First task's result type.
243 @tparam Ts Remaining tasks' result types.
244 */
245 template<typename T0, typename... Ts>
246 struct when_any_state
247 {
248 static constexpr std::size_t task_count = 1 + sizeof...(Ts);
249 using variant_type = unique_variant_t<T0, Ts...>;
250
251 when_any_core core_;
252 std::optional<variant_type> result_;
253 std::array<coro, task_count> runner_handles_{};
254
255 43 when_any_state()
256 43 : core_(task_count)
257 {
258 43 }
259
260 // Runners self-destruct in final_suspend. No destruction needed here.
261
262 /** @pre core_.try_win() returned true.
263 @note Uses in_place_type (not index) because variant is deduplicated.
264 */
265 template<typename T>
266 35 void set_winner_result(T value)
267 noexcept(std::is_nothrow_move_constructible_v<T>)
268 {
269 35 result_.emplace(std::in_place_type<T>, std::move(value));
270 35 }
271
272 /** @pre core_.try_win() returned true. */
273 3 void set_winner_void() noexcept
274 {
275 3 result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
276 3 }
277 };
278
279 /** Wrapper coroutine that runs a single child task for when_any.
280
281 Propagates executor/stop_token to the child, attempts to claim winner
282 status on completion, and signals completion for cleanup coordination.
283
284 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
285 */
// Return object of make_when_any_runner. It only carries the coroutine
// handle: the type declares no destructor, so it never destroys the frame.
// The launcher takes the handle via release() and the frame destroys
// itself in final_suspend.
286 template<typename StateType>
287 struct when_any_runner
288 {
// Promise of the runner coroutine. The launcher assigns state_, index_,
// ex_ and stop_token_ through the handle after the frame has been created
// and before the runner is first dispatched.
289 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
290 {
291 StateType* state_ = nullptr;
292 std::size_t index_ = 0;
293 executor_ref ex_;
294 std::stop_token stop_token_;
295
296 190 when_any_runner get_return_object() noexcept
297 {
298 190 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
299 }
300
301 // Starts suspended; launcher sets up state/ex/token then resumes
302 190 std::suspend_always initial_suspend() noexcept
303 {
304 190 return {};
305 }
306
// Final suspend point. The awaiter never reports ready, so await_suspend
// always runs. It copies what it needs out of the shared state, destroys
// this runner's frame, then decrements remaining_count_. The runner that
// sees the previous count equal to 1 is the last one and dispatches the
// parent continuation on the caller's executor.
307 190 auto final_suspend() noexcept
308 {
309 struct awaiter
310 {
311 promise_type* p_;
312 bool await_ready() const noexcept { return false; }
313 void await_suspend(coro h) noexcept
314 {
315 // Extract everything needed for signaling before
316 // self-destruction. Inline dispatch may destroy
317 // state, so we can't access members after.
318 auto& core = p_->state_->core_;
319 auto* counter = &core.remaining_count_;
320 auto caller_ex = core.caller_ex_;
321 auto cont = core.continuation_;
322
323 // Self-destruct first - state no longer destroys runners
// After this call p_ and the awaiter itself must not be touched;
// only the locals copied above are used from here on.
324 h.destroy();
325
326 // Signal completion. If last, dispatch parent.
327 // Uses only local copies - safe even if state
328 // is destroyed during inline dispatch.
329 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
330 if(remaining == 1)
331 caller_ex.dispatch(cont);
332 }
333 void await_resume() const noexcept {}
334 };
335 190 return awaiter{this};
336 }
337
338 178 void return_void() noexcept {}
339
340 // Exceptions are valid completions in when_any (unlike when_all)
// The exception is stored only if this runner wins the race; otherwise
// it is dropped.
341 12 void unhandled_exception()
342 {
343
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 4 times.
12 if(state_->core_.try_win(index_))
344 8 state_->core_.set_winner_exception(std::current_exception());
345 12 }
346
347 /** Injects executor and stop token into child awaitables. */
348 template<class Awaitable>
349 struct transform_awaiter
350 {
351 std::decay_t<Awaitable> a_;
352 promise_type* p_;
353
354 190 bool await_ready() { return a_.await_ready(); }
355 190 auto await_resume() { return a_.await_resume(); }
356
// Calls the wrapped awaitable's three-argument await_suspend, passing
// the executor and stop token stored in this runner's promise.
357 template<class Promise>
358 185 auto await_suspend(std::coroutine_handle<Promise> h)
359 {
360 185 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
361 }
362 };
363
// Wraps every co_await operand in a transform_awaiter. Operands that do
// not satisfy IoAwaitable are rejected at compile time; the static_assert
// condition depends on A so it only fires when that branch is instantiated.
364 template<class Awaitable>
365 190 auto await_transform(Awaitable&& a)
366 {
367 using A = std::decay_t<Awaitable>;
368 if constexpr (IoAwaitable<A>)
369 {
370 return transform_awaiter<Awaitable>{
371 380 std::forward<Awaitable>(a), this};
372 }
373 else
374 {
375 static_assert(sizeof(A) == 0, "requires IoAwaitable");
376 }
377 190 }
378 };
379
380 std::coroutine_handle<promise_type> h_;
381
382 190 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
383 190 : h_(h)
384 {
385 190 }
386
387 // Enable move for all clang versions - some versions need it
388 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
389
390 // Non-copyable
391 when_any_runner(when_any_runner const&) = delete;
392 when_any_runner& operator=(when_any_runner const&) = delete;
393 when_any_runner& operator=(when_any_runner&&) = delete;
394
// Returns the stored handle and leaves this wrapper holding nullptr.
395 190 auto release() noexcept
396 {
397 190 return std::exchange(h_, nullptr);
398 }
399 };
400
401 /** Wraps a child awaitable, attempts to claim winner on completion.
402
403 Uses requires-expressions to detect state capabilities:
404 - set_winner_void(): for heterogeneous void tasks (stores monostate)
405 - set_winner_result(): for non-void tasks
406 - Neither: for homogeneous void tasks (no result storage)
407 */
408 template<IoAwaitable Awaitable, typename StateType>
409 when_any_runner<StateType>
410
1/1
✓ Branch 1 taken 190 times.
190 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
411 {
412 using T = awaitable_result_t<Awaitable>;
413 if constexpr (std::is_void_v<T>)
414 {
415 co_await std::move(inner);
416 if(state->core_.try_win(index))
417 {
418 // Heterogeneous void tasks store monostate in the variant
419 if constexpr (requires { state->set_winner_void(); })
420 state->set_winner_void();
421 // Homogeneous void tasks have no result to store
422 }
423 }
424 else
425 {
426 auto result = co_await std::move(inner);
427 if(state->core_.try_win(index))
428 {
429 // Defensive: move should not throw (already moved once), but we
430 // catch just in case since an uncaught exception would be devastating.
431 try
432 {
433 state->set_winner_result(std::move(result));
434 }
435 catch(...)
436 {
437 state->core_.set_winner_exception(std::current_exception());
438 }
439 }
440 }
441 380 }
442
443 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
444 template<IoAwaitable... Awaitables>
445 class when_any_launcher
446 {
447 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
448
449 std::tuple<Awaitables...>* tasks_;
450 state_type* state_;
451
452 public:
453 43 when_any_launcher(
454 std::tuple<Awaitables...>* tasks,
455 state_type* state)
456 43 : tasks_(tasks)
457 43 , state_(state)
458 {
459 43 }
460
461 43 bool await_ready() const noexcept
462 {
463 43 return sizeof...(Awaitables) == 0;
464 }
465
466 /** CRITICAL: If the last task finishes synchronously, parent resumes and
467 destroys this object before await_suspend returns. Must not reference
468 `this` after the final launch_one call.
469 */
470 template<Executor Ex>
471 43 coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
472 {
473 43 state_->core_.continuation_ = continuation;
474 43 state_->core_.caller_ex_ = caller_ex;
475
476
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 34 times.
43 if(parent_token.stop_possible())
477 {
478 18 state_->core_.parent_stop_callback_.emplace(
479 parent_token,
480 9 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
481
482
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 6 times.
9 if(parent_token.stop_requested())
483 3 state_->core_.stop_source_.request_stop();
484 }
485
486 43 auto token = state_->core_.stop_source_.get_token();
487 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
488 (..., launch_one<Is>(caller_ex, token));
489
1/1
✓ Branch 1 taken 43 times.
43 }(std::index_sequence_for<Awaitables...>{});
490
491 86 return std::noop_coroutine();
492 43 }
493
494 43 void await_resume() const noexcept
495 {
496 43 }
497
498 private:
499 /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
500 template<std::size_t I, Executor Ex>
501 105 void launch_one(Ex const& caller_ex, std::stop_token token)
502 {
503
1/1
✓ Branch 2 taken 105 times.
105 auto runner = make_when_any_runner(
504 105 std::move(std::get<I>(*tasks_)), state_, I);
505
506 105 auto h = runner.release();
507 105 h.promise().state_ = state_;
508 105 h.promise().index_ = I;
509 105 h.promise().ex_ = caller_ex;
510 105 h.promise().stop_token_ = token;
511
512 105 coro ch{h};
513 105 state_->runner_handles_[I] = ch;
514
1/1
✓ Branch 1 taken 105 times.
105 caller_ex.dispatch(ch);
515 105 }
516 };
517
518 } // namespace detail
519
520 /** Wait for the first awaitable to complete.
521
522 Races multiple heterogeneous awaitables concurrently and returns when the
523 first one completes. The result includes the winner's index and a
524 deduplicated variant containing the result value.
525
526 @par Suspends
527 The calling coroutine suspends when co_await is invoked. All awaitables
528 are launched concurrently and execute in parallel. The coroutine resumes
529 only after all awaitables have completed, even though the winner is
530 determined by the first to finish.
531
532 @par Completion Conditions
533 @li Winner is determined when the first awaitable completes (success or exception)
534 @li Only one task can claim winner status via atomic compare-exchange
535 @li Once a winner exists, stop is requested for all remaining siblings
536 @li Parent coroutine resumes only after all siblings acknowledge completion
537 @li The winner's result is returned; if the winner threw, the exception is rethrown
538
539 @par Cancellation Semantics
540 Cancellation is supported via stop_token propagated through the
541 IoAwaitable protocol:
542 @li Each child awaitable receives a stop_token derived from a shared stop_source
543 @li When the parent's stop token is activated, the stop is forwarded to all children
544 @li When a winner is determined, stop_source_.request_stop() is called immediately
545 @li Siblings must handle cancellation gracefully and complete before parent resumes
546 @li Stop requests are cooperative; tasks must check and respond to them
547
548 @par Concurrency/Overlap
549 All awaitables are launched concurrently before any can complete.
550 The launcher iterates through the arguments, starting each task on the
551 caller's executor. Tasks may execute in parallel on multi-threaded
552 executors or interleave on single-threaded executors. There is no
553 guaranteed ordering of task completion.
554
555 @par Notable Error Conditions
556 @li Winner exception: if the winning task threw, that exception is rethrown
557 @li Non-winner exceptions: silently discarded (only winner's result matters)
558 @li Cancellation: tasks may complete via cancellation without throwing
559
560 @par Example
561 @code
562 task<void> example() {
563 auto [index, result] = co_await when_any(
564 fetch_from_primary(), // task<Response>
565 fetch_from_backup() // task<Response>
566 );
567 // index is 0 or 1, result holds the winner's Response
568 auto response = std::get<Response>(result);
569 }
570 @endcode
571
572 @par Example with Heterogeneous Types
573 @code
574 task<void> mixed_types() {
575 auto [index, result] = co_await when_any(
576 fetch_int(), // task<int>
577 fetch_string() // task<std::string>
578 );
579 if (index == 0)
580 std::cout << "Got int: " << std::get<int>(result) << "\n";
581 else
582 std::cout << "Got string: " << std::get<std::string>(result) << "\n";
583 }
584 @endcode
585
586 @tparam A0 First awaitable type (must satisfy IoAwaitable).
587 @tparam As Remaining awaitable types (must satisfy IoAwaitable).
588 @param a0 The first awaitable to race.
589 @param as Additional awaitables to race concurrently.
590 @return A task yielding a pair of (winner_index, result_variant).
591
592 @throws Rethrows the winner's exception if the winning task threw an exception.
593
594 @par Remarks
595 Awaitables are moved into the coroutine frame; original objects become
596 empty after the call. When multiple awaitables share the same return type,
597 the variant is deduplicated to contain only unique types. Use the winner
598 index to determine which awaitable completed first. Void awaitables
599 contribute std::monostate to the variant.
600
601 @see when_all, IoAwaitable
602 */
603 template<IoAwaitable A0, IoAwaitable... As>
604
1/1
✓ Branch 1 taken 43 times.
43 [[nodiscard]] auto when_any(A0 a0, As... as)
605 -> task<detail::when_any_result_t<
606 detail::awaitable_result_t<A0>,
607 detail::awaitable_result_t<As>...>>
608 {
609 using result_type = detail::when_any_result_t<
610 detail::awaitable_result_t<A0>,
611 detail::awaitable_result_t<As>...>;
612
613 detail::when_any_state<
614 detail::awaitable_result_t<A0>,
615 detail::awaitable_result_t<As>...> state;
616 std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
617
618 co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
619
620 if(state.core_.winner_exception_)
621 std::rethrow_exception(state.core_.winner_exception_);
622
623 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
624 86 }
625
626 /** Concept for ranges of full I/O awaitables.
627
628 A range satisfies `IoAwaitableRange` if it is a sized input range
629 whose value type satisfies @ref IoAwaitable. This enables when_any
630 to accept any container or view of awaitables, not just std::vector.
631
632 @tparam R The range type.
633
634 @par Requirements
635 @li `R` must satisfy `std::ranges::input_range`
636 @li `R` must satisfy `std::ranges::sized_range`
637 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
638
639 @par Syntactic Requirements
640 Given `r` of type `R`:
641 @li `std::ranges::begin(r)` is valid
642 @li `std::ranges::end(r)` is valid
643 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
644 @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
645
646 @par Example
647 @code
648 template<IoAwaitableRange R>
649 task<void> race_all(R&& awaitables) {
650 auto winner = co_await when_any(std::forward<R>(awaitables));
651 // Process winner...
652 }
653 @endcode
654
655 @see when_any, IoAwaitable
656 */
657 template<typename R>
658 concept IoAwaitableRange =
659 std::ranges::input_range<R> &&
660 std::ranges::sized_range<R> &&
661 IoAwaitable<std::ranges::range_value_t<R>>;
662
663 namespace detail {
664
665 /** Shared state for homogeneous when_any (range overload).
666
667 Uses composition with when_any_core for shared functionality.
668 Simpler than heterogeneous: optional<T> instead of variant, vector
669 instead of array for runner handles.
670 */
671 template<typename T>
672 struct when_any_homogeneous_state
673 {
674 when_any_core core_;
675 std::optional<T> result_;
676 std::vector<coro> runner_handles_;
677
678 19 explicit when_any_homogeneous_state(std::size_t count)
679 19 : core_(count)
680
1/1
✓ Branch 2 taken 19 times.
38 , runner_handles_(count)
681 {
682 19 }
683
684 // Runners self-destruct in final_suspend. No destruction needed here.
685
686 /** @pre core_.try_win() returned true. */
687 17 void set_winner_result(T value)
688 noexcept(std::is_nothrow_move_constructible_v<T>)
689 {
690 17 result_.emplace(std::move(value));
691 17 }
692 };
693
694 /** Specialization for void tasks (no result storage needed). */
695 template<>
696 struct when_any_homogeneous_state<void>
697 {
698 when_any_core core_;
699 std::vector<coro> runner_handles_;
700
701 3 explicit when_any_homogeneous_state(std::size_t count)
702 3 : core_(count)
703
1/1
✓ Branch 1 taken 3 times.
6 , runner_handles_(count)
704 {
705 3 }
706
707 // Runners self-destruct in final_suspend. No destruction needed here.
708
709 // No set_winner_result - void tasks have no result to store
710 };
711
712 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
713 template<IoAwaitableRange Range>
714 class when_any_homogeneous_launcher
715 {
716 using Awaitable = std::ranges::range_value_t<Range>;
717 using T = awaitable_result_t<Awaitable>;
718
719 Range* range_;
720 when_any_homogeneous_state<T>* state_;
721
722 public:
723 22 when_any_homogeneous_launcher(
724 Range* range,
725 when_any_homogeneous_state<T>* state)
726 22 : range_(range)
727 22 , state_(state)
728 {
729 22 }
730
731 22 bool await_ready() const noexcept
732 {
733 22 return std::ranges::empty(*range_);
734 }
735
736 /** CRITICAL: If the last task finishes synchronously, parent resumes and
737 destroys this object before await_suspend returns. Must not reference
738 `this` after dispatching begins.
739
740 Two-phase approach:
741 1. Create all runners (safe - no dispatch yet)
742 2. Dispatch all runners (any may complete synchronously)
743 */
744 template<Executor Ex>
745 22 coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
746 {
747 22 state_->core_.continuation_ = continuation;
748 22 state_->core_.caller_ex_ = caller_ex;
749
750
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 15 times.
22 if(parent_token.stop_possible())
751 {
752 14 state_->core_.parent_stop_callback_.emplace(
753 parent_token,
754 7 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
755
756
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 3 times.
7 if(parent_token.stop_requested())
757 4 state_->core_.stop_source_.request_stop();
758 }
759
760 22 auto token = state_->core_.stop_source_.get_token();
761
762 // Phase 1: Create all runners without dispatching.
763 // This iterates over *range_ safely because no runners execute yet.
764 22 std::size_t index = 0;
765
2/2
✓ Branch 4 taken 85 times.
✓ Branch 5 taken 22 times.
107 for(auto&& a : *range_)
766 {
767
1/1
✓ Branch 2 taken 85 times.
85 auto runner = make_when_any_runner(
768 85 std::move(a), state_, index);
769
770 85 auto h = runner.release();
771 85 h.promise().state_ = state_;
772 85 h.promise().index_ = index;
773 85 h.promise().ex_ = caller_ex;
774 85 h.promise().stop_token_ = token;
775
776 85 state_->runner_handles_[index] = coro{h};
777 85 ++index;
778 }
779
780 // Phase 2: Dispatch all runners. Any may complete synchronously.
781 // After last dispatch, state_ and this may be destroyed.
782 // Use raw pointer/count captured before dispatching.
783 22 coro* handles = state_->runner_handles_.data();
784 22 std::size_t count = state_->runner_handles_.size();
785
2/2
✓ Branch 0 taken 85 times.
✓ Branch 1 taken 22 times.
107 for(std::size_t i = 0; i < count; ++i)
786
1/1
✓ Branch 1 taken 85 times.
85 caller_ex.dispatch(handles[i]);
787
788 44 return std::noop_coroutine();
789 22 }
790
791 22 void await_resume() const noexcept
792 {
793 22 }
794 };
795
796 } // namespace detail
797
798 /** Wait for the first awaitable to complete (range overload).
799
800 Races a range of awaitables with the same result type. Accepts any
801 sized input range of IoAwaitable types, enabling use with arrays,
802 spans, or custom containers.
803
804 @par Suspends
805 The calling coroutine suspends when co_await is invoked. All awaitables
806 in the range are launched concurrently and execute in parallel. The
807 coroutine resumes only after all awaitables have completed, even though
808 the winner is determined by the first to finish.
809
810 @par Completion Conditions
811 @li Winner is determined when the first awaitable completes (success or exception)
812 @li Only one task can claim winner status via atomic compare-exchange
813 @li Once a winner exists, stop is requested for all remaining siblings
814 @li Parent coroutine resumes only after all siblings acknowledge completion
815 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
816
817 @par Cancellation Semantics
818 Cancellation is supported via stop_token propagated through the
819 IoAwaitable protocol:
820 @li Each child awaitable receives a stop_token derived from a shared stop_source
821 @li When the parent's stop token is activated, the stop is forwarded to all children
822 @li When a winner is determined, stop_source_.request_stop() is called immediately
823 @li Siblings must handle cancellation gracefully and complete before parent resumes
824 @li Stop requests are cooperative; tasks must check and respond to them
825
826 @par Concurrency/Overlap
827 All awaitables are launched concurrently before any can complete.
828 The launcher iterates through the range, starting each task on the
829 caller's executor. Tasks may execute in parallel on multi-threaded
830 executors or interleave on single-threaded executors. There is no
831 guaranteed ordering of task completion.
832
833 @par Notable Error Conditions
834 @li Empty range: throws std::invalid_argument immediately (not via co_return)
835 @li Winner exception: if the winning task threw, that exception is rethrown
836 @li Non-winner exceptions: silently discarded (only winner's result matters)
837 @li Cancellation: tasks may complete via cancellation without throwing
838
839 @par Example
840 @code
841 task<void> example() {
842 std::array<task<Response>, 3> requests = {
843 fetch_from_server(0),
844 fetch_from_server(1),
845 fetch_from_server(2)
846 };
847
848 auto [index, response] = co_await when_any(std::move(requests));
849 }
850 @endcode
851
852 @par Example with Vector
853 @code
854 task<Response> fetch_fastest(std::vector<Server> const& servers) {
855 std::vector<task<Response>> requests;
856 for (auto const& server : servers)
857 requests.push_back(fetch_from(server));
858
859 auto [index, response] = co_await when_any(std::move(requests));
860 co_return response;
861 }
862 @endcode
863
864 @tparam R Range type satisfying IoAwaitableRange.
865 @param awaitables Range of awaitables to race concurrently (must not be empty).
866 @return A task yielding a pair of (winner_index, result).
867
868 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
869 @throws Rethrows the winner's exception if the winning task threw an exception.
870
871 @par Remarks
872 Elements are moved from the range; for lvalue ranges, the original
873 container will have moved-from elements after this call. The range
874 is moved onto the coroutine frame to ensure lifetime safety. Unlike
875 the variadic overload, no variant wrapper is needed since all tasks
876 share the same return type.
877
878 @see when_any, IoAwaitableRange
879 */
880 template<IoAwaitableRange R>
881 requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
882
1/1
✓ Branch 1 taken 21 times.
21 [[nodiscard]] auto when_any(R&& awaitables)
883 -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
884 {
885 using Awaitable = std::ranges::range_value_t<R>;
886 using T = detail::awaitable_result_t<Awaitable>;
887 using result_type = std::pair<std::size_t, T>;
888 using OwnedRange = std::remove_cvref_t<R>;
889
890 auto count = std::ranges::size(awaitables);
891 if(count == 0)
892 throw std::invalid_argument("when_any requires at least one awaitable");
893
894 // Move/copy range onto coroutine frame to ensure lifetime
895 OwnedRange owned_awaitables = std::forward<R>(awaitables);
896
897 detail::when_any_homogeneous_state<T> state(count);
898
899 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
900
901 if(state.core_.winner_exception_)
902 std::rethrow_exception(state.core_.winner_exception_);
903
904 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
905 42 }
906
907 /** Wait for the first awaitable to complete (void range overload).
908
909 Races a range of void-returning awaitables. Since void awaitables have
910 no result value, only the winner's index is returned.
911
912 @par Suspends
913 The calling coroutine suspends when co_await is invoked. All awaitables
914 in the range are launched concurrently and execute in parallel. The
915 coroutine resumes only after all awaitables have completed, even though
916 the winner is determined by the first to finish.
917
918 @par Completion Conditions
919 @li Winner is determined when the first awaitable completes (success or exception)
920 @li Only one task can claim winner status via atomic compare-exchange
921 @li Once a winner exists, stop is requested for all remaining siblings
922 @li Parent coroutine resumes only after all siblings acknowledge completion
923 @li The winner's index is returned; if the winner threw, the exception is rethrown
924
925 @par Cancellation Semantics
926 Cancellation is supported via stop_token propagated through the
927 IoAwaitable protocol:
928 @li Each child awaitable receives a stop_token derived from a shared stop_source
929 @li When the parent's stop token is activated, the stop is forwarded to all children
930 @li When a winner is determined, stop_source_.request_stop() is called immediately
931 @li Siblings must handle cancellation gracefully and complete before parent resumes
932 @li Stop requests are cooperative; tasks must check and respond to them
933
934 @par Concurrency/Overlap
935 All awaitables are launched concurrently before any can complete.
936 The launcher iterates through the range, starting each task on the
937 caller's executor. Tasks may execute in parallel on multi-threaded
938 executors or interleave on single-threaded executors. There is no
939 guaranteed ordering of task completion.
940
941 @par Notable Error Conditions
942 @li Empty range: throws std::invalid_argument immediately (not via co_return)
943 @li Winner exception: if the winning task threw, that exception is rethrown
944 @li Non-winner exceptions: silently discarded (only winner's result matters)
945 @li Cancellation: tasks may complete via cancellation without throwing
946
947 @par Example
948 @code
949 task<void> example() {
950 std::vector<task<void>> tasks;
951 for (int i = 0; i < 5; ++i)
952 tasks.push_back(background_work(i));
953
954 std::size_t winner = co_await when_any(std::move(tasks));
955 // winner is the index of the first task to complete
956 }
957 @endcode
958
959 @par Example with Timeout
960 @code
961 task<void> with_timeout() {
962 std::vector<task<void>> tasks;
963 tasks.push_back(long_running_operation());
964 tasks.push_back(delay(std::chrono::seconds(5)));
965
966 std::size_t winner = co_await when_any(std::move(tasks));
967 if (winner == 1) {
968 // Timeout occurred
969 }
970 }
971 @endcode
972
973 @tparam R Range type satisfying IoAwaitableRange with void result.
974 @param awaitables Range of void awaitables to race concurrently (must not be empty).
975 @return A task yielding the winner's index (zero-based).
976
977 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
978 @throws Rethrows the winner's exception if the winning task threw an exception.
979
980 @par Remarks
981 Awaitables are consumed from a range owned by the coroutine frame,
982 which ensures lifetime safety: an rvalue range is moved into the
983 frame, while an lvalue range is copied, leaving the caller's
984 container untouched. Unlike the non-void overload, no result storage
985 is needed since void tasks produce no value.
986
987 @see when_any, IoAwaitableRange
988 */
989 template<IoAwaitableRange R>
990 requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
991
1/1
✓ Branch 1 taken 3 times.
3 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
992 {
993 using OwnedRange = std::remove_cvref_t<R>;
994
995 auto count = std::ranges::size(awaitables);
996 if(count == 0)
997 throw std::invalid_argument("when_any requires at least one awaitable");
998
999 // Move/copy range onto coroutine frame to ensure lifetime
1000 OwnedRange owned_awaitables = std::forward<R>(awaitables);
1001
1002 detail::when_any_homogeneous_state<void> state(count);
1003
1004 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1005
1006 if(state.core_.winner_exception_)
1007 std::rethrow_exception(state.core_.winner_exception_);
1008
1009 co_return state.core_.winner_index_;
1010 6 }
1011
1012 } // namespace capy
1013 } // namespace boost
1014
1015 #endif
1016