Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/concept/io_awaitable.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/task.hpp>
20 :
21 : #include <array>
22 : #include <atomic>
23 : #include <exception>
24 : #include <optional>
25 : #include <ranges>
26 : #include <stdexcept>
27 : #include <stop_token>
28 : #include <tuple>
29 : #include <type_traits>
30 : #include <utility>
31 : #include <variant>
32 : #include <vector>
33 :
34 : /*
35 : when_any - Race multiple tasks, return first completion
36 : ========================================================
37 :
38 : OVERVIEW:
39 : ---------
40 : when_any launches N tasks concurrently and completes when the FIRST task
41 : finishes (success or failure). It then requests stop for all siblings and
42 : waits for them to acknowledge before returning.
43 :
44 : ARCHITECTURE:
45 : -------------
46 : The design mirrors when_all but with inverted completion semantics:
47 :
48 : when_all: complete when remaining_count reaches 0 (all done)
49 : when_any: complete when has_winner becomes true (first done)
50 : BUT still wait for remaining_count to reach 0 for cleanup
51 :
52 : Key components:
53 : - when_any_state: Shared state tracking winner and completion
54 : - when_any_runner: Wrapper coroutine for each child task
55 : - when_any_launcher: Awaitable that starts all runners concurrently
56 :
57 : CRITICAL INVARIANTS:
58 : --------------------
59 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
60 : 2. All tasks must complete before parent resumes (cleanup safety)
61 : 3. Stop is requested immediately when winner is determined
62 : 4. Only the winner's result/exception is stored
63 :
64 : TYPE DEDUPLICATION:
65 : -------------------
66 : std::variant requires unique alternative types. Since when_any can race
67 : tasks with identical return types (e.g., three task<int>), we must
68 : deduplicate types before constructing the variant.
69 :
70 : Example: when_any(task<int>, task<string>, task<int>)
71 : - Raw types after void->monostate: int, string, int
72 : - Deduplicated variant: std::variant<int, string>
73 : - Return: pair<size_t, variant<int, string>>
74 :
75 : The winner_index tells you which task won (0, 1, or 2), while the variant
76 : holds the result. Use the index to determine how to interpret the variant.
77 :
78 : VOID HANDLING:
79 : --------------
80 : void tasks contribute std::monostate to the variant (then deduplicated).
81 : All-void tasks result in: pair<size_t, variant<monostate>>
82 :
83 : MEMORY MODEL:
84 : -------------
85 : Synchronization chain from winner's write to parent's read:
86 :
87 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
88 : 2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
89 : 3. Last runner (may be winner or non-winner) reaches final_suspend
90 : → fetch_sub(acq_rel) on remaining_count_, observing the previous value 1
91 : 4. Last runner calls dispatch(continuation) on its local copy of caller_ex_
92 : 5. Parent coroutine resumes and reads result_/winner_exception_
93 :
94 : Synchronization analysis:
95 : - All fetch_sub operations on remaining_count_ form a release sequence
96 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
97 : in the modification order of remaining_count_
98 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
99 : modification order, establishing happens-before from winner's writes
100 : - Executor dispatch() is expected to provide queue-based synchronization
101 : (release-on-post, acquire-on-execute) completing the chain to parent
102 : - Even inline executors work (same thread = sequenced-before)
103 :
104 : Alternative considered: Adding winner_ready_ atomic (set with release after
105 : storing winner data, acquired before reading) would make synchronization
106 : self-contained and not rely on executor implementation details. Current
107 : approach is correct but requires careful reasoning about release sequences
108 : and executor behavior.
109 :
110 : EXCEPTION SEMANTICS:
111 : --------------------
112 : Unlike when_all (which captures first exception, discards others), when_any
113 : treats exceptions as valid completions. If the winning task threw, that
114 : exception is rethrown. Exceptions from non-winners are silently discarded.
115 : */
116 :
117 : namespace boost {
118 : namespace capy {
119 :
120 : namespace detail {
121 :
/** Map `void` onto `std::monostate`, leaving every other type untouched.

    A variant cannot hold `void` as an alternative, so tasks that produce
    no value are represented by `std::monostate` in the result variant.

    @tparam T Candidate alternative type; (cv-qualified) `void` is replaced.
*/
template<typename T>
using void_to_monostate_t =
    typename std::conditional<std::is_void<T>::value, std::monostate, T>::type;
132 :
// One step of the deduplicating fold: given an accumulated std::variant and a
// candidate type T, yield the same variant when T is already an alternative,
// otherwise the variant extended with T at the end.
template<typename Variant, typename T>
struct variant_append_if_unique;

template<typename... Vs, typename T>
struct variant_append_if_unique<std::variant<Vs...>, T>
{
private:
    // True when T matches one of the alternatives collected so far.
    static constexpr bool already_present =
        std::disjunction_v<std::is_same<T, Vs>...>;

public:
    using type = std::conditional_t<
        already_present,
        std::variant<Vs...>,
        std::variant<Vs..., T>>;
};
146 :
147 : template<typename Accumulated, typename... Remaining>
148 : struct deduplicate_impl;
149 :
150 : template<typename Accumulated>
151 : struct deduplicate_impl<Accumulated>
152 : {
153 : using type = Accumulated;
154 : };
155 :
156 : template<typename Accumulated, typename T, typename... Rest>
157 : struct deduplicate_impl<Accumulated, T, Rest...>
158 : {
159 : using next = typename variant_append_if_unique<Accumulated, T>::type;
160 : using type = typename deduplicate_impl<next, Rest...>::type;
161 : };
162 :
163 : // Deduplicated variant; void types become monostate before deduplication
164 : template<typename T0, typename... Ts>
165 : using unique_variant_t = typename deduplicate_impl<
166 : std::variant<void_to_monostate_t<T0>>,
167 : void_to_monostate_t<Ts>...>::type;
168 :
169 : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
170 : // when multiple tasks share the same return type.
171 : template<typename T0, typename... Ts>
172 : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
173 :
174 : /** Core shared state for when_any operations.
175 :
176 : Contains all members and methods common to both heterogeneous (variadic)
177 : and homogeneous (range) when_any implementations. State classes embed
178 : this via composition to avoid CRTP destructor ordering issues.
179 :
180 : @par Thread Safety
181 : Atomic operations protect winner selection and completion count.
182 : */
183 : struct when_any_core
184 : {
185 : std::atomic<std::size_t> remaining_count_;
186 : std::size_t winner_index_{0};
187 : std::exception_ptr winner_exception_;
188 : std::stop_source stop_source_;
189 :
190 : // Bridges parent's stop token to our stop_source
191 : struct stop_callback_fn
192 : {
193 : std::stop_source* source_;
194 9 : void operator()() const noexcept { source_->request_stop(); }
195 : };
196 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
197 : std::optional<stop_callback_t> parent_stop_callback_;
198 :
199 : coro continuation_;
200 : executor_ref caller_ex_;
201 :
202 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
203 : std::atomic<bool> has_winner_{false};
204 :
205 65 : explicit when_any_core(std::size_t count) noexcept
206 65 : : remaining_count_(count)
207 : {
208 65 : }
209 :
210 : /** Atomically claim winner status; exactly one task succeeds. */
211 190 : bool try_win(std::size_t index) noexcept
212 : {
213 190 : bool expected = false;
214 190 : if(has_winner_.compare_exchange_strong(
215 : expected, true, std::memory_order_acq_rel))
216 : {
217 65 : winner_index_ = index;
218 65 : stop_source_.request_stop();
219 65 : return true;
220 : }
221 125 : return false;
222 : }
223 :
224 : /** @pre try_win() returned true. */
225 8 : void set_winner_exception(std::exception_ptr ep) noexcept
226 : {
227 8 : winner_exception_ = ep;
228 8 : }
229 :
230 : // Runners signal completion directly via final_suspend; no member function needed.
231 : };
232 :
233 : /** Shared state for heterogeneous when_any operation.
234 :
235 : Coordinates winner selection, result storage, and completion tracking
236 : for all child tasks in a when_any operation. Uses composition with
237 : when_any_core for shared functionality.
238 :
239 : @par Lifetime
240 : Allocated on the parent coroutine's frame, outlives all runners.
241 :
242 : @tparam T0 First task's result type.
243 : @tparam Ts Remaining tasks' result types.
244 : */
245 : template<typename T0, typename... Ts>
246 : struct when_any_state
247 : {
248 : static constexpr std::size_t task_count = 1 + sizeof...(Ts);
249 : using variant_type = unique_variant_t<T0, Ts...>;
250 :
251 : when_any_core core_;
252 : std::optional<variant_type> result_;
253 : std::array<coro, task_count> runner_handles_{};
254 :
255 43 : when_any_state()
256 43 : : core_(task_count)
257 : {
258 43 : }
259 :
260 : // Runners self-destruct in final_suspend. No destruction needed here.
261 :
262 : /** @pre core_.try_win() returned true.
263 : @note Uses in_place_type (not index) because variant is deduplicated.
264 : */
265 : template<typename T>
266 35 : void set_winner_result(T value)
267 : noexcept(std::is_nothrow_move_constructible_v<T>)
268 : {
269 35 : result_.emplace(std::in_place_type<T>, std::move(value));
270 35 : }
271 :
272 : /** @pre core_.try_win() returned true. */
273 3 : void set_winner_void() noexcept
274 : {
275 3 : result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
276 3 : }
277 : };
278 :
279 : /** Wrapper coroutine that runs a single child task for when_any.
280 :
281 : Propagates executor/stop_token to the child, attempts to claim winner
282 : status on completion, and signals completion for cleanup coordination.
283 :
284 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
285 : */
286 : template<typename StateType>
287 : struct when_any_runner
288 : {
289 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
290 : {
291 : StateType* state_ = nullptr;
292 : std::size_t index_ = 0;
293 : executor_ref ex_;
294 : std::stop_token stop_token_;
295 :
296 190 : when_any_runner get_return_object() noexcept
297 : {
298 190 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
299 : }
300 :
301 : // Starts suspended; launcher sets up state/ex/token then resumes
302 190 : std::suspend_always initial_suspend() noexcept
303 : {
304 190 : return {};
305 : }
306 :
307 190 : auto final_suspend() noexcept
308 : {
309 : struct awaiter
310 : {
311 : promise_type* p_;
312 190 : bool await_ready() const noexcept { return false; }
313 190 : void await_suspend(coro h) noexcept
314 : {
315 : // Extract everything needed for signaling before
316 : // self-destruction. Inline dispatch may destroy
317 : // state, so we can't access members after.
318 190 : auto& core = p_->state_->core_;
319 190 : auto* counter = &core.remaining_count_;
320 190 : auto caller_ex = core.caller_ex_;
321 190 : auto cont = core.continuation_;
322 :
323 : // Self-destruct first - state no longer destroys runners
324 190 : h.destroy();
325 :
326 : // Signal completion. If last, dispatch parent.
327 : // Uses only local copies - safe even if state
328 : // is destroyed during inline dispatch.
329 190 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
330 190 : if(remaining == 1)
331 65 : caller_ex.dispatch(cont);
332 190 : }
333 0 : void await_resume() const noexcept {}
334 : };
335 190 : return awaiter{this};
336 : }
337 :
338 178 : void return_void() noexcept {}
339 :
340 : // Exceptions are valid completions in when_any (unlike when_all)
341 12 : void unhandled_exception()
342 : {
343 12 : if(state_->core_.try_win(index_))
344 8 : state_->core_.set_winner_exception(std::current_exception());
345 12 : }
346 :
347 : /** Injects executor and stop token into child awaitables. */
348 : template<class Awaitable>
349 : struct transform_awaiter
350 : {
351 : std::decay_t<Awaitable> a_;
352 : promise_type* p_;
353 :
354 190 : bool await_ready() { return a_.await_ready(); }
355 190 : auto await_resume() { return a_.await_resume(); }
356 :
357 : template<class Promise>
358 185 : auto await_suspend(std::coroutine_handle<Promise> h)
359 : {
360 185 : return a_.await_suspend(h, p_->ex_, p_->stop_token_);
361 : }
362 : };
363 :
364 : template<class Awaitable>
365 190 : auto await_transform(Awaitable&& a)
366 : {
367 : using A = std::decay_t<Awaitable>;
368 : if constexpr (IoAwaitable<A>)
369 : {
370 : return transform_awaiter<Awaitable>{
371 380 : std::forward<Awaitable>(a), this};
372 : }
373 : else
374 : {
375 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
376 : }
377 190 : }
378 : };
379 :
380 : std::coroutine_handle<promise_type> h_;
381 :
382 190 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
383 190 : : h_(h)
384 : {
385 190 : }
386 :
387 : // Enable move for all clang versions - some versions need it
388 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
389 :
390 : // Non-copyable
391 : when_any_runner(when_any_runner const&) = delete;
392 : when_any_runner& operator=(when_any_runner const&) = delete;
393 : when_any_runner& operator=(when_any_runner&&) = delete;
394 :
395 190 : auto release() noexcept
396 : {
397 190 : return std::exchange(h_, nullptr);
398 : }
399 : };
400 :
401 : /** Wraps a child awaitable, attempts to claim winner on completion.
402 :
403 : Uses requires-expressions to detect state capabilities:
404 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
405 : - set_winner_result(): for non-void tasks
406 : - Neither: for homogeneous void tasks (no result storage)
407 : */
408 : template<IoAwaitable Awaitable, typename StateType>
409 : when_any_runner<StateType>
410 190 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
411 : {
412 : using T = awaitable_result_t<Awaitable>;
413 : if constexpr (std::is_void_v<T>)
414 : {
415 : co_await std::move(inner);
416 : if(state->core_.try_win(index))
417 : {
418 : // Heterogeneous void tasks store monostate in the variant
419 : if constexpr (requires { state->set_winner_void(); })
420 : state->set_winner_void();
421 : // Homogeneous void tasks have no result to store
422 : }
423 : }
424 : else
425 : {
426 : auto result = co_await std::move(inner);
427 : if(state->core_.try_win(index))
428 : {
429 : // Defensive: move should not throw (already moved once), but we
430 : // catch just in case since an uncaught exception would be devastating.
431 : try
432 : {
433 : state->set_winner_result(std::move(result));
434 : }
435 : catch(...)
436 : {
437 : state->core_.set_winner_exception(std::current_exception());
438 : }
439 : }
440 : }
441 380 : }
442 :
443 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
444 : template<IoAwaitable... Awaitables>
445 : class when_any_launcher
446 : {
447 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
448 :
449 : std::tuple<Awaitables...>* tasks_;
450 : state_type* state_;
451 :
452 : public:
453 43 : when_any_launcher(
454 : std::tuple<Awaitables...>* tasks,
455 : state_type* state)
456 43 : : tasks_(tasks)
457 43 : , state_(state)
458 : {
459 43 : }
460 :
461 43 : bool await_ready() const noexcept
462 : {
463 43 : return sizeof...(Awaitables) == 0;
464 : }
465 :
466 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
467 : destroys this object before await_suspend returns. Must not reference
468 : `this` after the final launch_one call.
469 : */
470 : template<Executor Ex>
471 43 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
472 : {
473 43 : state_->core_.continuation_ = continuation;
474 43 : state_->core_.caller_ex_ = caller_ex;
475 :
476 43 : if(parent_token.stop_possible())
477 : {
478 18 : state_->core_.parent_stop_callback_.emplace(
479 : parent_token,
480 9 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
481 :
482 9 : if(parent_token.stop_requested())
483 3 : state_->core_.stop_source_.request_stop();
484 : }
485 :
486 43 : auto token = state_->core_.stop_source_.get_token();
487 86 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
488 43 : (..., launch_one<Is>(caller_ex, token));
489 43 : }(std::index_sequence_for<Awaitables...>{});
490 :
491 86 : return std::noop_coroutine();
492 43 : }
493 :
494 43 : void await_resume() const noexcept
495 : {
496 43 : }
497 :
498 : private:
499 : /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
500 : template<std::size_t I, Executor Ex>
501 105 : void launch_one(Ex const& caller_ex, std::stop_token token)
502 : {
503 105 : auto runner = make_when_any_runner(
504 105 : std::move(std::get<I>(*tasks_)), state_, I);
505 :
506 105 : auto h = runner.release();
507 105 : h.promise().state_ = state_;
508 105 : h.promise().index_ = I;
509 105 : h.promise().ex_ = caller_ex;
510 105 : h.promise().stop_token_ = token;
511 :
512 105 : coro ch{h};
513 105 : state_->runner_handles_[I] = ch;
514 105 : caller_ex.dispatch(ch);
515 105 : }
516 : };
517 :
518 : } // namespace detail
519 :
520 : /** Wait for the first awaitable to complete.
521 :
522 : Races multiple heterogeneous awaitables concurrently and returns when the
523 : first one completes. The result includes the winner's index and a
524 : deduplicated variant containing the result value.
525 :
526 : @par Suspends
527 : The calling coroutine suspends when co_await is invoked. All awaitables
528 : are launched concurrently and execute in parallel. The coroutine resumes
529 : only after all awaitables have completed, even though the winner is
530 : determined by the first to finish.
531 :
532 : @par Completion Conditions
533 : @li Winner is determined when the first awaitable completes (success or exception)
534 : @li Only one task can claim winner status via atomic compare-exchange
535 : @li Once a winner exists, stop is requested for all remaining siblings
536 : @li Parent coroutine resumes only after all siblings acknowledge completion
537 : @li The winner's result is returned; if the winner threw, the exception is rethrown
538 :
539 : @par Cancellation Semantics
540 : Cancellation is supported via stop_token propagated through the
541 : IoAwaitable protocol:
542 : @li Each child awaitable receives a stop_token derived from a shared stop_source
543 : @li When the parent's stop token is activated, the stop is forwarded to all children
544 : @li When a winner is determined, stop_source_.request_stop() is called immediately
545 : @li Siblings must handle cancellation gracefully and complete before parent resumes
546 : @li Stop requests are cooperative; tasks must check and respond to them
547 :
548 : @par Concurrency/Overlap
549 : All awaitables are launched concurrently before any can complete.
550 : The launcher iterates through the arguments, starting each task on the
551 : caller's executor. Tasks may execute in parallel on multi-threaded
552 : executors or interleave on single-threaded executors. There is no
553 : guaranteed ordering of task completion.
554 :
555 : @par Notable Error Conditions
556 : @li Winner exception: if the winning task threw, that exception is rethrown
557 : @li Non-winner exceptions: silently discarded (only winner's result matters)
558 : @li Cancellation: tasks may complete via cancellation without throwing
559 :
560 : @par Example
561 : @code
562 : task<void> example() {
563 : auto [index, result] = co_await when_any(
564 : fetch_from_primary(), // task<Response>
565 : fetch_from_backup() // task<Response>
566 : );
567 : // index is 0 or 1, result holds the winner's Response
568 : auto response = std::get<Response>(result);
569 : }
570 : @endcode
571 :
572 : @par Example with Heterogeneous Types
573 : @code
574 : task<void> mixed_types() {
575 : auto [index, result] = co_await when_any(
576 : fetch_int(), // task<int>
577 : fetch_string() // task<std::string>
578 : );
579 : if (index == 0)
580 : std::cout << "Got int: " << std::get<int>(result) << "\n";
581 : else
582 : std::cout << "Got string: " << std::get<std::string>(result) << "\n";
583 : }
584 : @endcode
585 :
586 : @tparam A0 First awaitable type (must satisfy IoAwaitable).
587 : @tparam As Remaining awaitable types (must satisfy IoAwaitable).
588 : @param a0 The first awaitable to race.
589 : @param as Additional awaitables to race concurrently.
590 : @return A task yielding a pair of (winner_index, result_variant).
591 :
592 : @throws Rethrows the winner's exception if the winning task threw an exception.
593 :
594 : @par Remarks
595 : Awaitables are moved into the coroutine frame; original objects become
596 : empty after the call. When multiple awaitables share the same return type,
597 : the variant is deduplicated to contain only unique types. Use the winner
598 : index to determine which awaitable completed first. Void awaitables
599 : contribute std::monostate to the variant.
600 :
601 : @see when_all, IoAwaitable
602 : */
603 : template<IoAwaitable A0, IoAwaitable... As>
604 43 : [[nodiscard]] auto when_any(A0 a0, As... as)
605 : -> task<detail::when_any_result_t<
606 : detail::awaitable_result_t<A0>,
607 : detail::awaitable_result_t<As>...>>
608 : {
609 : using result_type = detail::when_any_result_t<
610 : detail::awaitable_result_t<A0>,
611 : detail::awaitable_result_t<As>...>;
612 :
613 : detail::when_any_state<
614 : detail::awaitable_result_t<A0>,
615 : detail::awaitable_result_t<As>...> state;
616 : std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
617 :
618 : co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
619 :
620 : if(state.core_.winner_exception_)
621 : std::rethrow_exception(state.core_.winner_exception_);
622 :
623 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
624 86 : }
625 :
626 : /** Concept for ranges of full I/O awaitables.
627 :
628 : A range satisfies `IoAwaitableRange` if it is a sized input range
629 : whose value type satisfies @ref IoAwaitable. This enables when_any
630 : to accept any container or view of awaitables, not just std::vector.
631 :
632 : @tparam R The range type.
633 :
634 : @par Requirements
635 : @li `R` must satisfy `std::ranges::input_range`
636 : @li `R` must satisfy `std::ranges::sized_range`
637 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
638 :
639 : @par Syntactic Requirements
640 : Given `r` of type `R`:
641 : @li `std::ranges::begin(r)` is valid
642 : @li `std::ranges::end(r)` is valid
643 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
644 : @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
645 :
646 : @par Example
647 : @code
648 : template<IoAwaitableRange R>
649 : task<void> race_all(R&& awaitables) {
650 : auto winner = co_await when_any(std::forward<R>(awaitables));
651 : // Process winner...
652 : }
653 : @endcode
654 :
655 : @see when_any, IoAwaitable
656 : */
657 : template<typename R>
658 : concept IoAwaitableRange =
659 : std::ranges::input_range<R> &&
660 : std::ranges::sized_range<R> &&
661 : IoAwaitable<std::ranges::range_value_t<R>>;
662 :
663 : namespace detail {
664 :
665 : /** Shared state for homogeneous when_any (range overload).
666 :
667 : Uses composition with when_any_core for shared functionality.
668 : Simpler than heterogeneous: optional<T> instead of variant, vector
669 : instead of array for runner handles.
670 : */
671 : template<typename T>
672 : struct when_any_homogeneous_state
673 : {
674 : when_any_core core_;
675 : std::optional<T> result_;
676 : std::vector<coro> runner_handles_;
677 :
678 19 : explicit when_any_homogeneous_state(std::size_t count)
679 19 : : core_(count)
680 38 : , runner_handles_(count)
681 : {
682 19 : }
683 :
684 : // Runners self-destruct in final_suspend. No destruction needed here.
685 :
686 : /** @pre core_.try_win() returned true. */
687 17 : void set_winner_result(T value)
688 : noexcept(std::is_nothrow_move_constructible_v<T>)
689 : {
690 17 : result_.emplace(std::move(value));
691 17 : }
692 : };
693 :
694 : /** Specialization for void tasks (no result storage needed). */
695 : template<>
696 : struct when_any_homogeneous_state<void>
697 : {
698 : when_any_core core_;
699 : std::vector<coro> runner_handles_;
700 :
701 3 : explicit when_any_homogeneous_state(std::size_t count)
702 3 : : core_(count)
703 6 : , runner_handles_(count)
704 : {
705 3 : }
706 :
707 : // Runners self-destruct in final_suspend. No destruction needed here.
708 :
709 : // No set_winner_result - void tasks have no result to store
710 : };
711 :
712 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
713 : template<IoAwaitableRange Range>
714 : class when_any_homogeneous_launcher
715 : {
716 : using Awaitable = std::ranges::range_value_t<Range>;
717 : using T = awaitable_result_t<Awaitable>;
718 :
719 : Range* range_;
720 : when_any_homogeneous_state<T>* state_;
721 :
722 : public:
723 22 : when_any_homogeneous_launcher(
724 : Range* range,
725 : when_any_homogeneous_state<T>* state)
726 22 : : range_(range)
727 22 : , state_(state)
728 : {
729 22 : }
730 :
731 22 : bool await_ready() const noexcept
732 : {
733 22 : return std::ranges::empty(*range_);
734 : }
735 :
736 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
737 : destroys this object before await_suspend returns. Must not reference
738 : `this` after dispatching begins.
739 :
740 : Two-phase approach:
741 : 1. Create all runners (safe - no dispatch yet)
742 : 2. Dispatch all runners (any may complete synchronously)
743 : */
744 : template<Executor Ex>
745 22 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
746 : {
747 22 : state_->core_.continuation_ = continuation;
748 22 : state_->core_.caller_ex_ = caller_ex;
749 :
750 22 : if(parent_token.stop_possible())
751 : {
752 14 : state_->core_.parent_stop_callback_.emplace(
753 : parent_token,
754 7 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
755 :
756 7 : if(parent_token.stop_requested())
757 4 : state_->core_.stop_source_.request_stop();
758 : }
759 :
760 22 : auto token = state_->core_.stop_source_.get_token();
761 :
762 : // Phase 1: Create all runners without dispatching.
763 : // This iterates over *range_ safely because no runners execute yet.
764 22 : std::size_t index = 0;
765 107 : for(auto&& a : *range_)
766 : {
767 85 : auto runner = make_when_any_runner(
768 85 : std::move(a), state_, index);
769 :
770 85 : auto h = runner.release();
771 85 : h.promise().state_ = state_;
772 85 : h.promise().index_ = index;
773 85 : h.promise().ex_ = caller_ex;
774 85 : h.promise().stop_token_ = token;
775 :
776 85 : state_->runner_handles_[index] = coro{h};
777 85 : ++index;
778 : }
779 :
780 : // Phase 2: Dispatch all runners. Any may complete synchronously.
781 : // After last dispatch, state_ and this may be destroyed.
782 : // Use raw pointer/count captured before dispatching.
783 22 : coro* handles = state_->runner_handles_.data();
784 22 : std::size_t count = state_->runner_handles_.size();
785 107 : for(std::size_t i = 0; i < count; ++i)
786 85 : caller_ex.dispatch(handles[i]);
787 :
788 44 : return std::noop_coroutine();
789 22 : }
790 :
791 22 : void await_resume() const noexcept
792 : {
793 22 : }
794 : };
795 :
796 : } // namespace detail
797 :
798 : /** Wait for the first awaitable to complete (range overload).
799 :
800 : Races a range of awaitables with the same result type. Accepts any
801 : sized input range of IoAwaitable types, enabling use with arrays,
802 : spans, or custom containers.
803 :
804 : @par Suspends
805 : The calling coroutine suspends when co_await is invoked. All awaitables
806 : in the range are launched concurrently and execute in parallel. The
807 : coroutine resumes only after all awaitables have completed, even though
808 : the winner is determined by the first to finish.
809 :
810 : @par Completion Conditions
811 : @li Winner is determined when the first awaitable completes (success or exception)
812 : @li Only one task can claim winner status via atomic compare-exchange
813 : @li Once a winner exists, stop is requested for all remaining siblings
814 : @li Parent coroutine resumes only after all siblings acknowledge completion
815 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
816 :
817 : @par Cancellation Semantics
818 : Cancellation is supported via stop_token propagated through the
819 : IoAwaitable protocol:
820 : @li Each child awaitable receives a stop_token derived from a shared stop_source
821 : @li When the parent's stop token is activated, the stop is forwarded to all children
822 : @li When a winner is determined, stop_source_.request_stop() is called immediately
823 : @li Siblings must handle cancellation gracefully and complete before parent resumes
824 : @li Stop requests are cooperative; tasks must check and respond to them
825 :
826 : @par Concurrency/Overlap
827 : All awaitables are launched concurrently before any can complete.
828 : The launcher iterates through the range, starting each task on the
829 : caller's executor. Tasks may execute in parallel on multi-threaded
830 : executors or interleave on single-threaded executors. There is no
831 : guaranteed ordering of task completion.
832 :
833 : @par Notable Error Conditions
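834 : @li Empty range: std::invalid_argument is thrown inside the coroutine body before the first co_await, so it is handed to the returned task's promise rather than propagating out of the when_any() call itself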
835 : @li Winner exception: if the winning task threw, that exception is rethrown
836 : @li Non-winner exceptions: silently discarded (only winner's result matters)
837 : @li Cancellation: tasks may complete via cancellation without throwing
838 :
839 : @par Example
840 : @code
841 : task<void> example() {
842 : std::array<task<Response>, 3> requests = {
843 : fetch_from_server(0),
844 : fetch_from_server(1),
845 : fetch_from_server(2)
846 : };
847 :
848 : auto [index, response] = co_await when_any(std::move(requests));
849 : }
850 : @endcode
851 :
852 : @par Example with Vector
853 : @code
854 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
855 : std::vector<task<Response>> requests;
856 : for (auto const& server : servers)
857 : requests.push_back(fetch_from(server));
858 :
859 : auto [index, response] = co_await when_any(std::move(requests));
860 : co_return response;
861 : }
862 : @endcode
863 :
864 : @tparam R Range type satisfying IoAwaitableRange.
865 : @param awaitables Range of awaitables to race concurrently (must not be empty).
866 : @return A task yielding a pair of (winner_index, result).
867 :
868 : @throws std::invalid_argument if range is empty (raised in the coroutine body before the first co_await; delivered via the returned task).
869 : @throws Rethrows the winner's exception if the winning task threw an exception.
870 :
871 : @par Remarks
872 : The range is first moved (rvalue argument) or copied (lvalue argument)
873 : onto the coroutine frame, and elements are then moved out of that owned
874 : copy, so an lvalue argument's own elements are not moved from. Unlike
875 : the variadic overload, no variant wrapper is needed since all tasks
876 : share the same return type.
877 :
878 : @see when_any, IoAwaitableRange
879 : */
880 : template<IoAwaitableRange R>
881 : requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
882 21 : [[nodiscard]] auto when_any(R&& awaitables)
883 : -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
884 : {
885 : using Awaitable = std::ranges::range_value_t<R>;
886 : using T = detail::awaitable_result_t<Awaitable>;
887 : using result_type = std::pair<std::size_t, T>;
888 : using OwnedRange = std::remove_cvref_t<R>;
889 :
890 : auto count = std::ranges::size(awaitables);
891 : if(count == 0)
892 : throw std::invalid_argument("when_any requires at least one awaitable");
893 :
894 : // Move/copy range onto coroutine frame to ensure lifetime
895 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
896 :
897 : detail::when_any_homogeneous_state<T> state(count);
898 :
899 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
900 :
901 : if(state.core_.winner_exception_)
902 : std::rethrow_exception(state.core_.winner_exception_);
903 :
904 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
905 42 : }
906 :
907 : /** Wait for the first awaitable to complete (void range overload).
908 :
909 : Races a range of void-returning awaitables. Since void awaitables have
910 : no result value, only the winner's index is returned.
911 :
912 : @par Suspends
913 : The calling coroutine suspends when co_await is invoked. All awaitables
914 : in the range are launched concurrently; whether they run in parallel or
915 : interleave depends on the executor. The coroutine resumes only after all
916 : awaitables have completed, even though the first to finish is the winner.
917 :
918 : @par Completion Conditions
919 : @li Winner is determined when the first awaitable completes (success or exception)
920 : @li Only one task can claim winner status via atomic compare-exchange
921 : @li Once a winner exists, stop is requested for all remaining siblings
922 : @li Parent coroutine resumes only after all siblings acknowledge completion
923 : @li The winner's index is returned; if the winner threw, the exception is rethrown
924 :
925 : @par Cancellation Semantics
926 : Cancellation is supported via stop_token propagated through the
927 : IoAwaitable protocol:
928 : @li Each child awaitable receives a stop_token derived from a shared stop_source
929 : @li When the parent's stop token is activated, the stop is forwarded to all children
930 : @li When a winner is determined, stop_source_.request_stop() is called immediately
931 : @li Siblings must handle cancellation gracefully and complete before parent resumes
932 : @li Stop requests are cooperative; tasks must check and respond to them
933 :
934 : @par Concurrency/Overlap
935 : All awaitables are launched concurrently before any can complete.
936 : The launcher iterates through the range, starting each task on the
937 : caller's executor. Tasks may execute in parallel on multi-threaded
938 : executors or interleave on single-threaded executors. There is no
939 : guaranteed ordering of task completion.
940 :
941 : @par Notable Error Conditions
942 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
943 : @li Winner exception: if the winning task threw, that exception is rethrown
944 : @li Non-winner exceptions: silently discarded (only winner's result matters)
945 : @li Cancellation: tasks may complete via cancellation without throwing
946 :
947 : @par Example
948 : @code
949 : task<void> example() {
950 : std::vector<task<void>> tasks;
951 : for (int i = 0; i < 5; ++i)
952 : tasks.push_back(background_work(i));
953 :
954 : std::size_t winner = co_await when_any(std::move(tasks));
955 : // winner is the index of the first task to complete
956 : }
957 : @endcode
958 :
959 : @par Example with Timeout
960 : @code
961 : task<void> with_timeout() {
962 : std::vector<task<void>> tasks;
963 : tasks.push_back(long_running_operation());
964 : tasks.push_back(delay(std::chrono::seconds(5)));
965 :
966 : std::size_t winner = co_await when_any(std::move(tasks));
967 : if (winner == 1) {
968 : // Timeout occurred
969 : }
970 : }
971 : @endcode
972 :
973 : @tparam R Range type satisfying IoAwaitableRange with void result.
974 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
975 : @return A task yielding the winner's index (zero-based).
976 :
977 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
978 : @throws Rethrows the winner's exception if the winning task threw an exception.
979 :
980 : @par Remarks
981 : The range is forwarded into the coroutine frame for lifetime safety:
982 : an rvalue range is moved, an lvalue range is copied (so it must be
983 : copyable) and the caller's container is left untouched. Elements are
984 : moved out of the frame-owned range. Unlike the non-void overload, no
985 : result storage is needed since void tasks produce no value.
986 :
987 : @see when_any, IoAwaitableRange
988 : */
989 : template<IoAwaitableRange R>
990 : requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
991 3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
992 : {
993 : using OwnedRange = std::remove_cvref_t<R>;
994 :
995 : auto count = std::ranges::size(awaitables);
996 : if(count == 0)
997 : throw std::invalid_argument("when_any requires at least one awaitable");
998 :
999 : // Move/copy range onto coroutine frame to ensure lifetime
1000 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
1001 :
1002 : detail::when_any_homogeneous_state<void> state(count);
1003 :
1004 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1005 :
1006 : if(state.core_.winner_exception_)
1007 : std::rethrow_exception(state.core_.winner_exception_);
1008 :
1009 : co_return state.core_.winner_index_;
1010 6 : }
1011 :
1012 : } // namespace capy
1013 : } // namespace boost
1014 :
1015 : #endif
|