summaryrefslogtreecommitdiffstats
path: root/src/boost/libs/fiber/examples/wait_stuff.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/boost/libs/fiber/examples/wait_stuff.cpp
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/boost/libs/fiber/examples/wait_stuff.cpp')
-rw-r--r--src/boost/libs/fiber/examples/wait_stuff.cpp984
1 files changed, 984 insertions, 0 deletions
diff --git a/src/boost/libs/fiber/examples/wait_stuff.cpp b/src/boost/libs/fiber/examples/wait_stuff.cpp
new file mode 100644
index 00000000..00f64788
--- /dev/null
+++ b/src/boost/libs/fiber/examples/wait_stuff.cpp
@@ -0,0 +1,984 @@
+// Copyright Nat Goodspeed 2015.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+//
+#include <algorithm>
+#include <cassert>
+#include <chrono>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <boost/fiber/all.hpp>
+#include <boost/variant/variant.hpp>
+#include <boost/variant/get.hpp>
+
+// These are wait_something() functions rather than when_something()
+// functions. A big part of the point of the Fiber library is to model
+// sequencing using the processor's instruction pointer rather than chains of
+// callbacks. The future-oriented when_all() / when_any() functions are still
+// based on chains of callbacks. With Fiber, we can do better.
+
+/*****************************************************************************
+* Verbose
+*****************************************************************************/
+class Verbose {
+public:
+ Verbose( std::string const& d):
+ desc( d) {
+ std::cout << desc << " start" << std::endl;
+ }
+
+ ~Verbose() {
+ std::cout << desc << " stop" << std::endl;
+ }
+
+ Verbose( Verbose const&) = delete;
+ Verbose & operator=( Verbose const&) = delete;
+
+private:
+ const std::string desc;
+};
+
+/*****************************************************************************
+* Runner and Example
+*****************************************************************************/
+// collect and ultimately run every Example
+class Runner {
+ typedef std::vector< std::pair< std::string, std::function< void() > > > function_list;
+
+public:
+ void add( std::string const& desc, std::function< void() > const& func) {
+ functions_.push_back( function_list::value_type( desc, func) );
+ }
+
+ void run() {
+ for ( function_list::value_type const& pair : functions_) {
+ Verbose v( pair.first);
+ pair.second();
+ }
+ }
+
+private:
+ function_list functions_;
+};
+
+Runner runner;
+
+// Example allows us to embed Runner::add() calls at module scope
+struct Example {
+ Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
+ runner.add( desc, func);
+ }
+};
+
+/*****************************************************************************
+* example task functions
+*****************************************************************************/
+//[wait_sleeper
+template< typename T >
+T sleeper_impl( T item, int ms, bool thrw = false) {
+ std::ostringstream descb, funcb;
+ descb << item;
+ std::string desc( descb.str() );
+ funcb << " sleeper(" << item << ")";
+ Verbose v( funcb.str() );
+
+ boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
+ if ( thrw) {
+ throw std::runtime_error( desc);
+ }
+ return item;
+}
+//]
+
+inline
+std::string sleeper( std::string const& item, int ms, bool thrw = false) {
+ return sleeper_impl( item, ms, thrw);
+}
+
+inline
+double sleeper( double item, int ms, bool thrw = false) {
+ return sleeper_impl( item, ms, thrw);
+}
+
+inline
+int sleeper(int item, int ms, bool thrw = false) {
+ return sleeper_impl( item, ms, thrw);
+}
+
+/*****************************************************************************
+* Done
+*****************************************************************************/
+//[wait_done
+// Wrap canonical pattern for condition_variable + bool flag
+struct Done {
+private:
+ boost::fibers::condition_variable cond;
+ boost::fibers::mutex mutex;
+ bool ready = false;
+
+public:
+ typedef std::shared_ptr< Done > ptr;
+
+ void wait() {
+ std::unique_lock< boost::fibers::mutex > lock( mutex);
+ cond.wait( lock, [this](){ return ready; });
+ }
+
+ void notify() {
+ {
+ std::unique_lock< boost::fibers::mutex > lock( mutex);
+ ready = true;
+ } // release mutex
+ cond.notify_one();
+ }
+};
+//]
+
+/*****************************************************************************
+* when_any, simple completion
+*****************************************************************************/
+//[wait_first_simple_impl
+// Degenerate case: when there are no functions to wait for, return
+// immediately.
+void wait_first_simple_impl( Done::ptr) {
+}
+
+// When there's at least one function to wait for, launch it and recur to
+// process the rest.
+template< typename Fn, typename ... Fns >
+void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
+ boost::fibers::fiber( [done, function](){
+ function();
+ done->notify();
+ }).detach();
+ wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
+}
+//]
+
+// interface function: instantiate Done, launch tasks, wait for Done
+//[wait_first_simple
+template< typename ... Fns >
+void wait_first_simple( Fns && ... functions) {
+ // Use shared_ptr because each function's fiber will bind it separately,
+ // and we're going to return before the last of them completes.
+ auto done( std::make_shared< Done >() );
+ wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
+ done->wait();
+}
+//]
+
+// example usage
+Example wfs( runner, "wait_first_simple()", [](){
+//[wait_first_simple_ex
+ wait_first_simple(
+ [](){ sleeper("wfs_long", 150); },
+ [](){ sleeper("wfs_medium", 100); },
+ [](){ sleeper("wfs_short", 50); });
+//]
+});
+
+/*****************************************************************************
+* when_any, return value
+*****************************************************************************/
+// When there's only one function, call this overload
+//[wait_first_value_impl
+template< typename T, typename Fn >
+void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
+ Fn && function) {
+ boost::fibers::fiber( [chan, function](){
+ // Ignore channel_op_status returned by push():
+ // might be closed; we simply don't care.
+ chan->push( function() );
+ }).detach();
+}
+//]
+
+// When there are two or more functions, call this overload
+template< typename T, typename Fn0, typename Fn1, typename ... Fns >
+void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
+ Fn0 && function0,
+ Fn1 && function1,
+ Fns && ... functions) {
+ // process the first function using the single-function overload
+ wait_first_value_impl< T >( chan,
+ std::forward< Fn0 >( function0) );
+ // then recur to process the rest
+ wait_first_value_impl< T >( chan,
+ std::forward< Fn1 >( function1),
+ std::forward< Fns >( functions) ... );
+}
+
+//[wait_first_value
+// Assume that all passed functions have the same return type. The return type
+// of wait_first_value() is the return type of the first passed function. It is
+// simply invalid to pass NO functions.
+template< typename Fn, typename ... Fns >
+typename std::result_of< Fn() >::type
+wait_first_value( Fn && function, Fns && ... functions) {
+ typedef typename std::result_of< Fn() >::type return_t;
+ typedef boost::fibers::buffered_channel< return_t > channel_t;
+ auto chanp( std::make_shared< channel_t >( 64) );
+ // launch all the relevant fibers
+ wait_first_value_impl< return_t >( chanp,
+ std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... );
+ // retrieve the first value
+ return_t value( chanp->value_pop() );
+ // close the channel: no subsequent push() has to succeed
+ chanp->close();
+ return value;
+}
+//]
+
+// example usage
+Example wfv( runner, "wait_first_value()", [](){
+//[wait_first_value_ex
+ std::string result = wait_first_value(
+ [](){ return sleeper("wfv_third", 150); },
+ [](){ return sleeper("wfv_second", 100); },
+ [](){ return sleeper("wfv_first", 50); });
+ std::cout << "wait_first_value() => " << result << std::endl;
+ assert(result == "wfv_first");
+//]
+});
+
+/*****************************************************************************
+* when_any, produce first outcome, whether result or exception
+*****************************************************************************/
+// When there's only one function, call this overload.
+//[wait_first_outcome_impl
+template< typename T, typename CHANP, typename Fn >
+void wait_first_outcome_impl( CHANP chan, Fn && function) {
+ boost::fibers::fiber(
+ // Use std::bind() here for C++11 compatibility. C++11 lambda capture
+ // can't move a move-only Fn type, but bind() can. Let bind() move the
+ // channel pointer and the function into the bound object, passing
+ // references into the lambda.
+ std::bind(
+ []( CHANP & chan,
+ typename std::decay< Fn >::type & function) {
+ // Instantiate a packaged_task to capture any exception thrown by
+ // function.
+ boost::fibers::packaged_task< T() > task( function);
+ // Immediately run this packaged_task on same fiber. We want
+ // function() to have completed BEFORE we push the future.
+ task();
+ // Pass the corresponding future to consumer. Ignore
+ // channel_op_status returned by push(): might be closed; we
+ // simply don't care.
+ chan->push( task.get_future() );
+ },
+ chan,
+ std::forward< Fn >( function)
+ )).detach();
+}
+//]
+
+// When there are two or more functions, call this overload
+template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
+void wait_first_outcome_impl( CHANP chan,
+ Fn0 && function0,
+ Fn1 && function1,
+ Fns && ... functions) {
+ // process the first function using the single-function overload
+ wait_first_outcome_impl< T >( chan,
+ std::forward< Fn0 >( function0) );
+ // then recur to process the rest
+ wait_first_outcome_impl< T >( chan,
+ std::forward< Fn1 >( function1),
+ std::forward< Fns >( functions) ... );
+}
+
+// Assume that all passed functions have the same return type. The return type
+// of wait_first_outcome() is the return type of the first passed function. It is
+// simply invalid to pass NO functions.
+//[wait_first_outcome
+template< typename Fn, typename ... Fns >
+typename std::result_of< Fn() >::type
+wait_first_outcome( Fn && function, Fns && ... functions) {
+ // In this case, the value we pass through the channel is actually a
+ // future -- which is already ready. future can carry either a value or an
+ // exception.
+ typedef typename std::result_of< Fn() >::type return_t;
+ typedef boost::fibers::future< return_t > future_t;
+ typedef boost::fibers::buffered_channel< future_t > channel_t;
+ auto chanp(std::make_shared< channel_t >( 64) );
+ // launch all the relevant fibers
+ wait_first_outcome_impl< return_t >( chanp,
+ std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... );
+ // retrieve the first future
+ future_t future( chanp->value_pop() );
+ // close the channel: no subsequent push() has to succeed
+ chanp->close();
+ // either return value or throw exception
+ return future.get();
+}
+//]
+
+// example usage
+Example wfo( runner, "wait_first_outcome()", [](){
+//[wait_first_outcome_ex
+ std::string result = wait_first_outcome(
+ [](){ return sleeper("wfos_first", 50); },
+ [](){ return sleeper("wfos_second", 100); },
+ [](){ return sleeper("wfos_third", 150); });
+ std::cout << "wait_first_outcome(success) => " << result << std::endl;
+ assert(result == "wfos_first");
+
+ std::string thrown;
+ try {
+ result = wait_first_outcome(
+ [](){ return sleeper("wfof_first", 50, true); },
+ [](){ return sleeper("wfof_second", 100); },
+ [](){ return sleeper("wfof_third", 150); });
+ } catch ( std::exception const& e) {
+ thrown = e.what();
+ }
+ std::cout << "wait_first_outcome(fail) threw '" << thrown
+ << "'" << std::endl;
+ assert(thrown == "wfof_first");
+//]
+});
+
+/*****************************************************************************
+* when_any, collect exceptions until success; throw exception_list if no
+* success
+*****************************************************************************/
+// define an exception to aggregate exception_ptrs; prefer
+// std::exception_list (N4407 et al.) once that becomes available
+//[exception_list
+class exception_list : public std::runtime_error {
+public:
+ exception_list( std::string const& what) :
+ std::runtime_error( what) {
+ }
+
+ typedef std::vector< std::exception_ptr > bundle_t;
+
+ // N4407 proposed std::exception_list API
+ typedef bundle_t::const_iterator iterator;
+
+ std::size_t size() const noexcept {
+ return bundle_.size();
+ }
+
+ iterator begin() const noexcept {
+ return bundle_.begin();
+ }
+
+ iterator end() const noexcept {
+ return bundle_.end();
+ }
+
+ // extension to populate
+ void add( std::exception_ptr ep) {
+ bundle_.push_back( ep);
+ }
+
+private:
+ bundle_t bundle_;
+};
+//]
+
+// Assume that all passed functions have the same return type. The return type
+// of wait_first_success() is the return type of the first passed function. It is
+// simply invalid to pass NO functions.
+//[wait_first_success
+template< typename Fn, typename ... Fns >
+typename std::result_of< Fn() >::type
+wait_first_success( Fn && function, Fns && ... functions) {
+ std::size_t count( 1 + sizeof ... ( functions) );
+ // In this case, the value we pass through the channel is actually a
+ // future -- which is already ready. future can carry either a value or an
+ // exception.
+ typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
+ typedef boost::fibers::future< return_t > future_t;
+ typedef boost::fibers::buffered_channel< future_t > channel_t;
+ auto chanp( std::make_shared< channel_t >( 64) );
+ // launch all the relevant fibers
+ wait_first_outcome_impl< return_t >( chanp,
+ std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... );
+ // instantiate exception_list, just in case
+ exception_list exceptions("wait_first_success() produced only errors");
+ // retrieve up to 'count' results -- but stop there!
+ for ( std::size_t i = 0; i < count; ++i) {
+ // retrieve the next future
+ future_t future( chanp->value_pop() );
+ // retrieve exception_ptr if any
+ std::exception_ptr error( future.get_exception_ptr() );
+ // if no error, then yay, return value
+ if ( ! error) {
+ // close the channel: no subsequent push() has to succeed
+ chanp->close();
+ // show caller the value we got
+ return future.get();
+ }
+
+ // error is non-null: collect
+ exceptions.add( error);
+ }
+ // We only arrive here when every passed function threw an exception.
+ // Throw our collection to inform caller.
+ throw exceptions;
+}
+//]
+
+// example usage
+Example wfss( runner, "wait_first_success()", [](){
+//[wait_first_success_ex
+ std::string result = wait_first_success(
+ [](){ return sleeper("wfss_first", 50, true); },
+ [](){ return sleeper("wfss_second", 100); },
+ [](){ return sleeper("wfss_third", 150); });
+ std::cout << "wait_first_success(success) => " << result << std::endl;
+ assert(result == "wfss_second");
+//]
+
+ std::string thrown;
+ std::size_t count = 0;
+ try {
+ result = wait_first_success(
+ [](){ return sleeper("wfsf_first", 50, true); },
+ [](){ return sleeper("wfsf_second", 100, true); },
+ [](){ return sleeper("wfsf_third", 150, true); });
+ } catch ( exception_list const& e) {
+ thrown = e.what();
+ count = e.size();
+ } catch ( std::exception const& e) {
+ thrown = e.what();
+ }
+ std::cout << "wait_first_success(fail) threw '" << thrown << "': "
+ << count << " errors" << std::endl;
+ assert(thrown == "wait_first_success() produced only errors");
+ assert(count == 3);
+});
+
+/*****************************************************************************
+* when_any, heterogeneous
+*****************************************************************************/
+//[wait_first_value_het
+// No need to break out the first Fn for interface function: let the compiler
+// complain if empty.
+// Our functions have different return types, and we might have to return any
+// of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
+// parameter pack.
+template< typename ... Fns >
+boost::variant< typename std::result_of< Fns() >::type ... >
+wait_first_value_het( Fns && ... functions) {
+ // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
+ typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
+ typedef boost::fibers::buffered_channel< return_t > channel_t;
+ auto chanp( std::make_shared< channel_t >( 64) );
+ // launch all the relevant fibers
+ wait_first_value_impl< return_t >( chanp,
+ std::forward< Fns >( functions) ... );
+ // retrieve the first value
+ return_t value( chanp->value_pop() );
+ // close the channel: no subsequent push() has to succeed
+ chanp->close();
+ return value;
+}
+//]
+
+// example usage
+Example wfvh( runner, "wait_first_value_het()", [](){
+//[wait_first_value_het_ex
+ boost::variant< std::string, double, int > result =
+ wait_first_value_het(
+ [](){ return sleeper("wfvh_third", 150); },
+ [](){ return sleeper(3.14, 100); },
+ [](){ return sleeper(17, 50); });
+ std::cout << "wait_first_value_het() => " << result << std::endl;
+ assert(boost::get< int >( result) == 17);
+//]
+});
+
+/*****************************************************************************
+* when_all, simple completion
+*****************************************************************************/
+// Degenerate case: when there are no functions to wait for, return
+// immediately.
+void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
+}
+
+// When there's at least one function to wait for, launch it and recur to
+// process the rest.
+//[wait_all_simple_impl
+template< typename Fn, typename ... Fns >
+void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
+ Fn && function, Fns && ... functions) {
+ boost::fibers::fiber(
+ std::bind(
+ []( std::shared_ptr< boost::fibers::barrier > & barrier,
+ typename std::decay< Fn >::type & function) mutable {
+ function();
+ barrier->wait();
+ },
+ barrier,
+ std::forward< Fn >( function)
+ )).detach();
+ wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
+}
+//]
+
+// interface function: instantiate barrier, launch tasks, wait for barrier
+//[wait_all_simple
+template< typename ... Fns >
+void wait_all_simple( Fns && ... functions) {
+ std::size_t count( sizeof ... ( functions) );
+ // Initialize a barrier(count+1) because we'll immediately wait on it. We
+ // don't want to wake up until 'count' more fibers wait on it. Even though
+ // we'll stick around until the last of them completes, use shared_ptr
+ // anyway because it's easier to be confident about lifespan issues.
+ auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
+ wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
+ barrier->wait();
+}
+//]
+
+// example usage
+Example was( runner, "wait_all_simple()", [](){
+//[wait_all_simple_ex
+ wait_all_simple(
+ [](){ sleeper("was_long", 150); },
+ [](){ sleeper("was_medium", 100); },
+ [](){ sleeper("was_short", 50); });
+//]
+});
+
+/*****************************************************************************
+* when_all, return values
+*****************************************************************************/
+//[wait_nchannel
+// Introduce a channel facade that closes the channel once a specific number
+// of items has been pushed. This allows an arbitrary consumer to read until
+// 'closed' without itself having to count items.
+template< typename T >
+class nchannel {
+public:
+ nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
+ std::size_t lm):
+ chan_( chan),
+ limit_( lm) {
+ assert(chan_);
+ if ( 0 == limit_) {
+ chan_->close();
+ }
+ }
+
+ boost::fibers::channel_op_status push( T && va) {
+ boost::fibers::channel_op_status ok =
+ chan_->push( std::forward< T >( va) );
+ if ( ok == boost::fibers::channel_op_status::success &&
+ --limit_ == 0) {
+ // after the 'limit_'th successful push, close the channel
+ chan_->close();
+ }
+ return ok;
+ }
+
+private:
+ std::shared_ptr< boost::fibers::buffered_channel< T > > chan_;
+ std::size_t limit_;
+};
+//]
+
+// When there's only one function, call this overload
+//[wait_all_values_impl
+template< typename T, typename Fn >
+void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
+ Fn && function) {
+ boost::fibers::fiber( [chan, function](){
+ chan->push(function());
+ }).detach();
+}
+//]
+
+// When there are two or more functions, call this overload
+template< typename T, typename Fn0, typename Fn1, typename ... Fns >
+void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
+ Fn0 && function0,
+ Fn1 && function1,
+ Fns && ... functions) {
+ // process the first function using the single-function overload
+ wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
+ // then recur to process the rest
+ wait_all_values_impl< T >( chan,
+ std::forward< Fn1 >( function1),
+ std::forward< Fns >( functions) ... );
+}
+
+//[wait_all_values_source
+// Return a shared_ptr<buffered_channel<T>> from which the caller can
+// retrieve each new result as it arrives, until 'closed'.
+template< typename Fn, typename ... Fns >
+std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
+wait_all_values_source( Fn && function, Fns && ... functions) {
+ std::size_t count( 1 + sizeof ... ( functions) );
+ typedef typename std::result_of< Fn() >::type return_t;
+ typedef boost::fibers::buffered_channel< return_t > channel_t;
+ // make the channel
+ auto chanp( std::make_shared< channel_t >( 64) );
+ // and make an nchannel facade to close it after 'count' items
+ auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
+ // pass that nchannel facade to all the relevant fibers
+ wait_all_values_impl< return_t >( ncp,
+ std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... );
+ // then return the channel for consumer
+ return chanp;
+}
+//]
+
+// When all passed functions have completed, return vector<T> containing
+// collected results. Assume that all passed functions have the same return
+// type. It is simply invalid to pass NO functions.
+//[wait_all_values
+template< typename Fn, typename ... Fns >
+std::vector< typename std::result_of< Fn() >::type >
+wait_all_values( Fn && function, Fns && ... functions) {
+ std::size_t count( 1 + sizeof ... ( functions) );
+ typedef typename std::result_of< Fn() >::type return_t;
+ typedef std::vector< return_t > vector_t;
+ vector_t results;
+ results.reserve( count);
+
+ // get channel
+ std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
+ wait_all_values_source( std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... );
+ // fill results vector
+ return_t value;
+ while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
+ results.push_back( value);
+ }
+ // return vector to caller
+ return results;
+}
+//]
+
+Example wav( runner, "wait_all_values()", [](){
+//[wait_all_values_source_ex
+ std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
+ wait_all_values_source(
+ [](){ return sleeper("wavs_third", 150); },
+ [](){ return sleeper("wavs_second", 100); },
+ [](){ return sleeper("wavs_first", 50); });
+ std::string value;
+ while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
+ std::cout << "wait_all_values_source() => '" << value
+ << "'" << std::endl;
+ }
+//]
+
+//[wait_all_values_ex
+ std::vector< std::string > values =
+ wait_all_values(
+ [](){ return sleeper("wav_late", 150); },
+ [](){ return sleeper("wav_middle", 100); },
+ [](){ return sleeper("wav_early", 50); });
+//]
+ std::cout << "wait_all_values() =>";
+ for ( std::string const& v : values) {
+ std::cout << " '" << v << "'";
+ }
+ std::cout << std::endl;
+});
+
+/*****************************************************************************
+* when_all, throw first exception
+*****************************************************************************/
+//[wait_all_until_error_source
+// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
+// get() each new result as it arrives, until 'closed'.
+template< typename Fn, typename ... Fns >
+std::shared_ptr<
+ boost::fibers::buffered_channel<
+ boost::fibers::future<
+ typename std::result_of< Fn() >::type > > >
+wait_all_until_error_source( Fn && function, Fns && ... functions) {
+ std::size_t count( 1 + sizeof ... ( functions) );
+ typedef typename std::result_of< Fn() >::type return_t;
+ typedef boost::fibers::future< return_t > future_t;
+ typedef boost::fibers::buffered_channel< future_t > channel_t;
+ // make the channel
+ auto chanp( std::make_shared< channel_t >( 64) );
+ // and make an nchannel facade to close it after 'count' items
+ auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
+ // pass that nchannel facade to all the relevant fibers
+ wait_first_outcome_impl< return_t >( ncp,
+ std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... );
+ // then return the channel for consumer
+ return chanp;
+}
+//]
+
+// When all passed functions have completed, return vector<T> containing
+// collected results, or throw the first exception thrown by any of the passed
+// functions. Assume that all passed functions have the same return type. It
+// is simply invalid to pass NO functions.
+//[wait_all_until_error
+template< typename Fn, typename ... Fns >
+std::vector< typename std::result_of< Fn() >::type >
+wait_all_until_error( Fn && function, Fns && ... functions) {
+ std::size_t count( 1 + sizeof ... ( functions) );
+ typedef typename std::result_of< Fn() >::type return_t;
+ typedef typename boost::fibers::future< return_t > future_t;
+ typedef std::vector< return_t > vector_t;
+ vector_t results;
+ results.reserve( count);
+
+ // get channel
+ std::shared_ptr<
+ boost::fibers::buffered_channel< future_t > > chan(
+ wait_all_until_error_source( std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... ) );
+ // fill results vector
+ future_t future;
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
+ results.push_back( future.get() );
+ }
+ // return vector to caller
+ return results;
+}
+//]
+
+Example waue( runner, "wait_all_until_error()", [](){
+//[wait_all_until_error_source_ex
+ typedef boost::fibers::future< std::string > future_t;
+ std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
+ wait_all_until_error_source(
+ [](){ return sleeper("wauess_third", 150); },
+ [](){ return sleeper("wauess_second", 100); },
+ [](){ return sleeper("wauess_first", 50); });
+ future_t future;
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
+ std::string value( future.get() );
+ std::cout << "wait_all_until_error_source(success) => '" << value
+ << "'" << std::endl;
+ }
+//]
+
+ chan = wait_all_until_error_source(
+ [](){ return sleeper("wauesf_third", 150); },
+ [](){ return sleeper("wauesf_second", 100, true); },
+ [](){ return sleeper("wauesf_first", 50); });
+//[wait_all_until_error_ex
+ std::string thrown;
+//<-
+ try {
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
+ std::string value( future.get() );
+ std::cout << "wait_all_until_error_source(fail) => '" << value
+ << "'" << std::endl;
+ }
+ } catch ( std::exception const& e) {
+ thrown = e.what();
+ }
+ std::cout << "wait_all_until_error_source(fail) threw '" << thrown
+ << "'" << std::endl;
+
+ thrown.clear();
+//->
+ try {
+ std::vector< std::string > values = wait_all_until_error(
+ [](){ return sleeper("waue_late", 150); },
+ [](){ return sleeper("waue_middle", 100, true); },
+ [](){ return sleeper("waue_early", 50); });
+//<-
+ std::cout << "wait_all_until_error(fail) =>";
+ for ( std::string const& v : values) {
+ std::cout << " '" << v << "'";
+ }
+ std::cout << std::endl;
+//->
+ } catch ( std::exception const& e) {
+ thrown = e.what();
+ }
+ std::cout << "wait_all_until_error(fail) threw '" << thrown
+ << "'" << std::endl;
+//]
+});
+
+/*****************************************************************************
+* when_all, collect exceptions
+*****************************************************************************/
+// When all passed functions have succeeded, return vector<T> containing
+// collected results, or throw exception_list containing all exceptions thrown
+// by any of the passed functions. Assume that all passed functions have the
+// same return type. It is simply invalid to pass NO functions.
+//[wait_all_collect_errors
+template< typename Fn, typename ... Fns >
+std::vector< typename std::result_of< Fn() >::type >
+wait_all_collect_errors( Fn && function, Fns && ... functions) {
+ std::size_t count( 1 + sizeof ... ( functions) );
+ typedef typename std::result_of< Fn() >::type return_t;
+ typedef typename boost::fibers::future< return_t > future_t;
+ typedef std::vector< return_t > vector_t;
+ vector_t results;
+ results.reserve( count);
+ exception_list exceptions("wait_all_collect_errors() exceptions");
+
+ // get channel
+ std::shared_ptr<
+ boost::fibers::buffered_channel< future_t > > chan(
+ wait_all_until_error_source( std::forward< Fn >( function),
+ std::forward< Fns >( functions) ... ) );
+ // fill results and/or exceptions vectors
+ future_t future;
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
+ std::exception_ptr exp = future.get_exception_ptr();
+ if ( ! exp) {
+ results.push_back( future.get() );
+ } else {
+ exceptions.add( exp);
+ }
+ }
+ // if there were any exceptions, throw
+ if ( exceptions.size() ) {
+ throw exceptions;
+ }
+ // no exceptions: return vector to caller
+ return results;
+}
+//]
+
+Example wace( runner, "wait_all_collect_errors()", [](){
+ std::vector< std::string > values = wait_all_collect_errors(
+ [](){ return sleeper("waces_late", 150); },
+ [](){ return sleeper("waces_middle", 100); },
+ [](){ return sleeper("waces_early", 50); });
+ std::cout << "wait_all_collect_errors(success) =>";
+ for ( std::string const& v : values) {
+ std::cout << " '" << v << "'";
+ }
+ std::cout << std::endl;
+
+ std::string thrown;
+ std::size_t errors = 0;
+ try {
+ values = wait_all_collect_errors(
+ [](){ return sleeper("wacef_late", 150, true); },
+ [](){ return sleeper("wacef_middle", 100, true); },
+ [](){ return sleeper("wacef_early", 50); });
+ std::cout << "wait_all_collect_errors(fail) =>";
+ for ( std::string const& v : values) {
+ std::cout << " '" << v << "'";
+ }
+ std::cout << std::endl;
+ } catch ( exception_list const& e) {
+ thrown = e.what();
+ errors = e.size();
+ } catch ( std::exception const& e) {
+ thrown = e.what();
+ }
+ std::cout << "wait_all_collect_errors(fail) threw '" << thrown
+ << "': " << errors << " errors" << std::endl;
+});
+
+/*****************************************************************************
+* when_all, heterogeneous
+*****************************************************************************/
+//[wait_all_members_get
+template< typename Result, typename ... Futures >
+Result wait_all_members_get( Futures && ... futures) {
+ // Fetch the results from the passed futures into Result's initializer
+ // list. It's true that the get() calls here will block the implicit
+ // iteration over futures -- but that doesn't matter because we won't be
+ // done until the slowest of them finishes anyway. As results are
+ // processed in argument-list order rather than order of completion, the
+ // leftmost get() to throw an exception will cause that exception to
+ // propagate to the caller.
+ return Result{ futures.get() ... };
+}
+//]
+
+//[wait_all_members
+// Explicitly pass Result. This can be any type capable of being initialized
+// from the results of the passed functions, such as a struct.
+template< typename Result, typename ... Fns >
+Result wait_all_members( Fns && ... functions) {
+ // Run each of the passed functions on a separate fiber, passing all their
+ // futures to helper function for processing.
+ return wait_all_members_get< Result >(
+ boost::fibers::async( std::forward< Fns >( functions) ) ... );
+}
+//]
+
+// used by following example
+//[wait_Data
+struct Data {
+ std::string str;
+ double inexact;
+ int exact;
+
+ friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
+ ...*/
+//<-
+ {
+ return out << "Data{str='" << data.str << "', inexact=" << data.inexact
+ << ", exact=" << data.exact << "}";
+ }
+//->
+};
+//]
+
+// example usage
+Example wam( runner, "wait_all_members()", [](){
+//[wait_all_members_data_ex
+ Data data = wait_all_members< Data >(
+ [](){ return sleeper("wams_left", 100); },
+ [](){ return sleeper(3.14, 150); },
+ [](){ return sleeper(17, 50); });
+ std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
+//]
+
+ std::string thrown;
+ try {
+ data = wait_all_members< Data >(
+ [](){ return sleeper("wamf_left", 100, true); },
+ [](){ return sleeper(3.14, 150); },
+ [](){ return sleeper(17, 50, true); });
+ std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
+ } catch ( std::exception const& e) {
+ thrown = e.what();
+ }
+ std::cout << "wait_all_members<Data>(fail) threw '" << thrown
+ << '"' << std::endl;
+
+//[wait_all_members_vector_ex
+ // If we don't care about obtaining results as soon as they arrive, and we
+ // prefer a result vector in passed argument order rather than completion
+ // order, wait_all_members() is another possible implementation of
+ // wait_all_until_error().
+ auto strings = wait_all_members< std::vector< std::string > >(
+ [](){ return sleeper("wamv_left", 150); },
+ [](){ return sleeper("wamv_middle", 100); },
+ [](){ return sleeper("wamv_right", 50); });
+ std::cout << "wait_all_members<vector>() =>";
+ for ( std::string const& str : strings) {
+ std::cout << " '" << str << "'";
+ }
+ std::cout << std::endl;
+//]
+});
+
+
+/*****************************************************************************
+* main()
+*****************************************************************************/
+int main( int argc, char *argv[]) {
+ runner.run();
+ std::cout << "done." << std::endl;
+ return EXIT_SUCCESS;
+}