diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/boost/libs/fiber/examples/wait_stuff.cpp | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/boost/libs/fiber/examples/wait_stuff.cpp')
-rw-r--r-- | src/boost/libs/fiber/examples/wait_stuff.cpp | 984 |
1 files changed, 984 insertions, 0 deletions
diff --git a/src/boost/libs/fiber/examples/wait_stuff.cpp b/src/boost/libs/fiber/examples/wait_stuff.cpp new file mode 100644 index 00000000..00f64788 --- /dev/null +++ b/src/boost/libs/fiber/examples/wait_stuff.cpp @@ -0,0 +1,984 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#include <algorithm> +#include <cassert> +#include <chrono> +#include <iostream> +#include <memory> +#include <sstream> +#include <string> +#include <type_traits> +#include <utility> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/variant/variant.hpp> +#include <boost/variant/get.hpp> + +// These are wait_something() functions rather than when_something() +// functions. A big part of the point of the Fiber library is to model +// sequencing using the processor's instruction pointer rather than chains of +// callbacks. The future-oriented when_all() / when_any() functions are still +// based on chains of callbacks. With Fiber, we can do better. + +/***************************************************************************** +* Verbose +*****************************************************************************/ +class Verbose { +public: + Verbose( std::string const& d): + desc( d) { + std::cout << desc << " start" << std::endl; + } + + ~Verbose() { + std::cout << desc << " stop" << std::endl; + } + + Verbose( Verbose const&) = delete; + Verbose & operator=( Verbose const&) = delete; + +private: + const std::string desc; +}; + +/***************************************************************************** +* Runner and Example +*****************************************************************************/ +// collect and ultimately run every Example +class Runner { + typedef std::vector< std::pair< std::string, std::function< void() > > > function_list; + +public: + void add( std::string const& desc, std::function< void() > const& func) { + functions_.push_back( function_list::value_type( desc, func) ); + } + + void run() { + for ( function_list::value_type const& pair : functions_) { + Verbose v( pair.first); + pair.second(); + } + } + +private: + function_list functions_; +}; + +Runner runner; + +// Example allows us to embed Runner::add() calls at module scope +struct Example { + Example( Runner & runner, std::string const& desc, std::function< void() > const& func) { + runner.add( desc, func); + } +}; + +/***************************************************************************** +* example task functions +*****************************************************************************/ +//[wait_sleeper +template< typename T > +T sleeper_impl( T item, int ms, bool thrw = false) { + std::ostringstream descb, funcb; + descb << item; + std::string desc( descb.str() ); + funcb << " sleeper(" << item << ")"; + Verbose v( funcb.str() ); + + boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) ); + if ( thrw) { + throw std::runtime_error( desc); + } + return item; +} +//] + +inline +std::string sleeper( std::string const& item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +inline +double sleeper( double item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +inline +int sleeper(int item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +/***************************************************************************** +* Done +*****************************************************************************/ +//[wait_done +// Wrap canonical pattern for condition_variable + bool flag +struct Done { +private: + boost::fibers::condition_variable cond; + boost::fibers::mutex mutex; + bool ready = false; + +public: + typedef std::shared_ptr< Done > ptr; + + void wait() { + std::unique_lock< boost::fibers::mutex > lock( mutex); + cond.wait( lock, [this](){ return ready; }); + } + + void notify() { + { + std::unique_lock< boost::fibers::mutex > lock( mutex); + ready = true; + } // release mutex + cond.notify_one(); + } +}; +//] + +/***************************************************************************** +* when_any, simple completion +*****************************************************************************/ +//[wait_first_simple_impl +// Degenerate case: when there are no functions to wait for, return +// immediately. +void wait_first_simple_impl( Done::ptr) { +} + +// When there's at least one function to wait for, launch it and recur to +// process the rest. +template< typename Fn, typename ... Fns > +void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) { + boost::fibers::fiber( [done, function](){ + function(); + done->notify(); + }).detach(); + wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); +} +//] + +// interface function: instantiate Done, launch tasks, wait for Done +//[wait_first_simple +template< typename ... Fns > +void wait_first_simple( Fns && ... functions) { + // Use shared_ptr because each function's fiber will bind it separately, + // and we're going to return before the last of them completes. + auto done( std::make_shared< Done >() ); + wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); + done->wait(); +} +//] + +// example usage +Example wfs( runner, "wait_first_simple()", [](){ +//[wait_first_simple_ex + wait_first_simple( + [](){ sleeper("wfs_long", 150); }, + [](){ sleeper("wfs_medium", 100); }, + [](){ sleeper("wfs_short", 50); }); +//] +}); + +/***************************************************************************** +* when_any, return value +*****************************************************************************/ +// When there's only one function, call this overload +//[wait_first_value_impl +template< typename T, typename Fn > +void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, + Fn && function) { + boost::fibers::fiber( [chan, function](){ + // Ignore channel_op_status returned by push(): + // might be closed; we simply don't care. + chan->push( function() ); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_first_value_impl< T >( chan, + std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_first_value_impl< T >( chan, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +//[wait_first_value +// Assume that all passed functions have the same return type. The return type +// of wait_first_value() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_value( Fn && function, Fns && ... functions) { + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::buffered_channel< return_t > channel_t; + auto chanp( std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_value_impl< return_t >( chanp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // retrieve the first value + return_t value( chanp->value_pop() ); + // close the channel: no subsequent push() has to succeed + chanp->close(); + return value; +} +//] + +// example usage +Example wfv( runner, "wait_first_value()", [](){ +//[wait_first_value_ex + std::string result = wait_first_value( + [](){ return sleeper("wfv_third", 150); }, + [](){ return sleeper("wfv_second", 100); }, + [](){ return sleeper("wfv_first", 50); }); + std::cout << "wait_first_value() => " << result << std::endl; + assert(result == "wfv_first"); +//] +}); + +/***************************************************************************** +* when_any, produce first outcome, whether result or exception +*****************************************************************************/ +// When there's only one function, call this overload. +//[wait_first_outcome_impl +template< typename T, typename CHANP, typename Fn > +void wait_first_outcome_impl( CHANP chan, Fn && function) { + boost::fibers::fiber( + // Use std::bind() here for C++11 compatibility. C++11 lambda capture + // can't move a move-only Fn type, but bind() can. Let bind() move the + // channel pointer and the function into the bound object, passing + // references into the lambda. + std::bind( + []( CHANP & chan, + typename std::decay< Fn >::type & function) { + // Instantiate a packaged_task to capture any exception thrown by + // function. + boost::fibers::packaged_task< T() > task( function); + // Immediately run this packaged_task on same fiber. We want + // function() to have completed BEFORE we push the future. + task(); + // Pass the corresponding future to consumer. Ignore + // channel_op_status returned by push(): might be closed; we + // simply don't care. + chan->push( task.get_future() ); + }, + chan, + std::forward< Fn >( function) + )).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns > +void wait_first_outcome_impl( CHANP chan, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_first_outcome_impl< T >( chan, + std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_first_outcome_impl< T >( chan, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +// Assume that all passed functions have the same return type. The return type +// of wait_first_outcome() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +//[wait_first_outcome +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_outcome( Fn && function, Fns && ... functions) { + // In this case, the value we pass through the channel is actually a + // future -- which is already ready. future can carry either a value or an + // exception. + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::buffered_channel< future_t > channel_t; + auto chanp(std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_outcome_impl< return_t >( chanp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // retrieve the first future + future_t future( chanp->value_pop() ); + // close the channel: no subsequent push() has to succeed + chanp->close(); + // either return value or throw exception + return future.get(); +} +//] + +// example usage +Example wfo( runner, "wait_first_outcome()", [](){ +//[wait_first_outcome_ex + std::string result = wait_first_outcome( + [](){ return sleeper("wfos_first", 50); }, + [](){ return sleeper("wfos_second", 100); }, + [](){ return sleeper("wfos_third", 150); }); + std::cout << "wait_first_outcome(success) => " << result << std::endl; + assert(result == "wfos_first"); + + std::string thrown; + try { + result = wait_first_outcome( + [](){ return sleeper("wfof_first", 50, true); }, + [](){ return sleeper("wfof_second", 100); }, + [](){ return sleeper("wfof_third", 150); }); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_first_outcome(fail) threw '" << thrown + << "'" << std::endl; + assert(thrown == "wfof_first"); +//] +}); + +/***************************************************************************** +* when_any, collect exceptions until success; throw exception_list if no +* success +*****************************************************************************/ +// define an exception to aggregate exception_ptrs; prefer +// std::exception_list (N4407 et al.) once that becomes available +//[exception_list +class exception_list : public std::runtime_error { +public: + exception_list( std::string const& what) : + std::runtime_error( what) { + } + + typedef std::vector< std::exception_ptr > bundle_t; + + // N4407 proposed std::exception_list API + typedef bundle_t::const_iterator iterator; + + std::size_t size() const noexcept { + return bundle_.size(); + } + + iterator begin() const noexcept { + return bundle_.begin(); + } + + iterator end() const noexcept { + return bundle_.end(); + } + + // extension to populate + void add( std::exception_ptr ep) { + bundle_.push_back( ep); + } + +private: + bundle_t bundle_; +}; +//] + +// Assume that all passed functions have the same return type. The return type +// of wait_first_success() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +//[wait_first_success +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_success( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + // In this case, the value we pass through the channel is actually a + // future -- which is already ready. future can carry either a value or an + // exception. + typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::buffered_channel< future_t > channel_t; + auto chanp( std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_outcome_impl< return_t >( chanp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // instantiate exception_list, just in case + exception_list exceptions("wait_first_success() produced only errors"); + // retrieve up to 'count' results -- but stop there! + for ( std::size_t i = 0; i < count; ++i) { + // retrieve the next future + future_t future( chanp->value_pop() ); + // retrieve exception_ptr if any + std::exception_ptr error( future.get_exception_ptr() ); + // if no error, then yay, return value + if ( ! error) { + // close the channel: no subsequent push() has to succeed + chanp->close(); + // show caller the value we got + return future.get(); + } + + // error is non-null: collect + exceptions.add( error); + } + // We only arrive here when every passed function threw an exception. + // Throw our collection to inform caller. + throw exceptions; +} +//] + +// example usage +Example wfss( runner, "wait_first_success()", [](){ +//[wait_first_success_ex + std::string result = wait_first_success( + [](){ return sleeper("wfss_first", 50, true); }, + [](){ return sleeper("wfss_second", 100); }, + [](){ return sleeper("wfss_third", 150); }); + std::cout << "wait_first_success(success) => " << result << std::endl; + assert(result == "wfss_second"); +//] + + std::string thrown; + std::size_t count = 0; + try { + result = wait_first_success( + [](){ return sleeper("wfsf_first", 50, true); }, + [](){ return sleeper("wfsf_second", 100, true); }, + [](){ return sleeper("wfsf_third", 150, true); }); + } catch ( exception_list const& e) { + thrown = e.what(); + count = e.size(); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_first_success(fail) threw '" << thrown << "': " + << count << " errors" << std::endl; + assert(thrown == "wait_first_success() produced only errors"); + assert(count == 3); +}); + +/***************************************************************************** +* when_any, heterogeneous +*****************************************************************************/ +//[wait_first_value_het +// No need to break out the first Fn for interface function: let the compiler +// complain if empty. +// Our functions have different return types, and we might have to return any +// of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in +// parameter pack. +template< typename ... Fns > +boost::variant< typename std::result_of< Fns() >::type ... > +wait_first_value_het( Fns && ... functions) { + // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above. + typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t; + typedef boost::fibers::buffered_channel< return_t > channel_t; + auto chanp( std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_value_impl< return_t >( chanp, + std::forward< Fns >( functions) ... ); + // retrieve the first value + return_t value( chanp->value_pop() ); + // close the channel: no subsequent push() has to succeed + chanp->close(); + return value; +} +//] + +// example usage +Example wfvh( runner, "wait_first_value_het()", [](){ +//[wait_first_value_het_ex + boost::variant< std::string, double, int > result = + wait_first_value_het( + [](){ return sleeper("wfvh_third", 150); }, + [](){ return sleeper(3.14, 100); }, + [](){ return sleeper(17, 50); }); + std::cout << "wait_first_value_het() => " << result << std::endl; + assert(boost::get< int >( result) == 17); +//] +}); + +/***************************************************************************** +* when_all, simple completion +*****************************************************************************/ +// Degenerate case: when there are no functions to wait for, return +// immediately. +void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) { +} + +// When there's at least one function to wait for, launch it and recur to +// process the rest. +//[wait_all_simple_impl +template< typename Fn, typename ... Fns > +void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier, + Fn && function, Fns && ... functions) { + boost::fibers::fiber( + std::bind( + []( std::shared_ptr< boost::fibers::barrier > & barrier, + typename std::decay< Fn >::type & function) mutable { + function(); + barrier->wait(); + }, + barrier, + std::forward< Fn >( function) + )).detach(); + wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); +} +//] + +// interface function: instantiate barrier, launch tasks, wait for barrier +//[wait_all_simple +template< typename ... Fns > +void wait_all_simple( Fns && ... functions) { + std::size_t count( sizeof ... ( functions) ); + // Initialize a barrier(count+1) because we'll immediately wait on it. We + // don't want to wake up until 'count' more fibers wait on it. Even though + // we'll stick around until the last of them completes, use shared_ptr + // anyway because it's easier to be confident about lifespan issues. + auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) ); + wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); + barrier->wait(); +} +//] + +// example usage +Example was( runner, "wait_all_simple()", [](){ +//[wait_all_simple_ex + wait_all_simple( + [](){ sleeper("was_long", 150); }, + [](){ sleeper("was_medium", 100); }, + [](){ sleeper("was_short", 50); }); +//] +}); + +/***************************************************************************** +* when_all, return values +*****************************************************************************/ +//[wait_nchannel +// Introduce a channel facade that closes the channel once a specific number +// of items has been pushed. This allows an arbitrary consumer to read until +// 'closed' without itself having to count items. +template< typename T > +class nchannel { +public: + nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, + std::size_t lm): + chan_( chan), + limit_( lm) { + assert(chan_); + if ( 0 == limit_) { + chan_->close(); + } + } + + boost::fibers::channel_op_status push( T && va) { + boost::fibers::channel_op_status ok = + chan_->push( std::forward< T >( va) ); + if ( ok == boost::fibers::channel_op_status::success && + --limit_ == 0) { + // after the 'limit_'th successful push, close the channel + chan_->close(); + } + return ok; + } + +private: + std::shared_ptr< boost::fibers::buffered_channel< T > > chan_; + std::size_t limit_; +}; +//] + +// When there's only one function, call this overload +//[wait_all_values_impl +template< typename T, typename Fn > +void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan, + Fn && function) { + boost::fibers::fiber( [chan, function](){ + chan->push(function()); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_all_values_impl< T >( chan, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +//[wait_all_values_source +// Return a shared_ptr<buffered_channel<T>> from which the caller can +// retrieve each new result as it arrives, until 'closed'. +template< typename Fn, typename ... Fns > +std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > > +wait_all_values_source( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::buffered_channel< return_t > channel_t; + // make the channel + auto chanp( std::make_shared< channel_t >( 64) ); + // and make an nchannel facade to close it after 'count' items + auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) ); + // pass that nchannel facade to all the relevant fibers + wait_all_values_impl< return_t >( ncp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // then return the channel for consumer + return chanp; +} +//] + +// When all passed functions have completed, return vector<T> containing +// collected results. Assume that all passed functions have the same return +// type. It is simply invalid to pass NO functions. +//[wait_all_values +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_values( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + + // get channel + std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan = + wait_all_values_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // fill results vector + return_t value; + while ( boost::fibers::channel_op_status::success == chan->pop(value) ) { + results.push_back( value); + } + // return vector to caller + return results; +} +//] + +Example wav( runner, "wait_all_values()", [](){ +//[wait_all_values_source_ex + std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan = + wait_all_values_source( + [](){ return sleeper("wavs_third", 150); }, + [](){ return sleeper("wavs_second", 100); }, + [](){ return sleeper("wavs_first", 50); }); + std::string value; + while ( boost::fibers::channel_op_status::success == chan->pop(value) ) { + std::cout << "wait_all_values_source() => '" << value + << "'" << std::endl; + } +//] + +//[wait_all_values_ex + std::vector< std::string > values = + wait_all_values( + [](){ return sleeper("wav_late", 150); }, + [](){ return sleeper("wav_middle", 100); }, + [](){ return sleeper("wav_early", 50); }); +//] + std::cout << "wait_all_values() =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; +}); + +/***************************************************************************** +* when_all, throw first exception +*****************************************************************************/ +//[wait_all_until_error_source +// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can +// get() each new result as it arrives, until 'closed'. +template< typename Fn, typename ... Fns > +std::shared_ptr< + boost::fibers::buffered_channel< + boost::fibers::future< + typename std::result_of< Fn() >::type > > > +wait_all_until_error_source( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::buffered_channel< future_t > channel_t; + // make the channel + auto chanp( std::make_shared< channel_t >( 64) ); + // and make an nchannel facade to close it after 'count' items + auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) ); + // pass that nchannel facade to all the relevant fibers + wait_first_outcome_impl< return_t >( ncp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // then return the channel for consumer + return chanp; +} +//] + +// When all passed functions have completed, return vector<T> containing +// collected results, or throw the first exception thrown by any of the passed +// functions. Assume that all passed functions have the same return type. It +// is simply invalid to pass NO functions. +//[wait_all_until_error +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_until_error( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef typename boost::fibers::future< return_t > future_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + + // get channel + std::shared_ptr< + boost::fibers::buffered_channel< future_t > > chan( + wait_all_until_error_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ) ); + // fill results vector + future_t future; + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + results.push_back( future.get() ); + } + // return vector to caller + return results; +} +//] + +Example waue( runner, "wait_all_until_error()", [](){ +//[wait_all_until_error_source_ex + typedef boost::fibers::future< std::string > future_t; + std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan = + wait_all_until_error_source( + [](){ return sleeper("wauess_third", 150); }, + [](){ return sleeper("wauess_second", 100); }, + [](){ return sleeper("wauess_first", 50); }); + future_t future; + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + std::string value( future.get() ); + std::cout << "wait_all_until_error_source(success) => '" << value + << "'" << std::endl; + } +//] + + chan = wait_all_until_error_source( + [](){ return sleeper("wauesf_third", 150); }, + [](){ return sleeper("wauesf_second", 100, true); }, + [](){ return sleeper("wauesf_first", 50); }); +//[wait_all_until_error_ex + std::string thrown; +//<- + try { + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + std::string value( future.get() ); + std::cout << "wait_all_until_error_source(fail) => '" << value + << "'" << std::endl; + } + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_until_error_source(fail) threw '" << thrown + << "'" << std::endl; + + thrown.clear(); +//-> + try { + std::vector< std::string > values = wait_all_until_error( + [](){ return sleeper("waue_late", 150); }, + [](){ return sleeper("waue_middle", 100, true); }, + [](){ return sleeper("waue_early", 50); }); +//<- + std::cout << "wait_all_until_error(fail) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; +//-> + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_until_error(fail) threw '" << thrown + << "'" << std::endl; +//] +}); + +/***************************************************************************** +* when_all, collect exceptions +*****************************************************************************/ +// When all passed functions have succeeded, return vector<T> containing +// collected results, or throw exception_list containing all exceptions thrown +// by any of the passed functions. Assume that all passed functions have the +// same return type. It is simply invalid to pass NO functions. +//[wait_all_collect_errors +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_collect_errors( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef typename boost::fibers::future< return_t > future_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + exception_list exceptions("wait_all_collect_errors() exceptions"); + + // get channel + std::shared_ptr< + boost::fibers::buffered_channel< future_t > > chan( + wait_all_until_error_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ) ); + // fill results and/or exceptions vectors + future_t future; + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + std::exception_ptr exp = future.get_exception_ptr(); + if ( ! exp) { + results.push_back( future.get() ); + } else { + exceptions.add( exp); + } + } + // if there were any exceptions, throw + if ( exceptions.size() ) { + throw exceptions; + } + // no exceptions: return vector to caller + return results; +} +//] + +Example wace( runner, "wait_all_collect_errors()", [](){ + std::vector< std::string > values = wait_all_collect_errors( + [](){ return sleeper("waces_late", 150); }, + [](){ return sleeper("waces_middle", 100); }, + [](){ return sleeper("waces_early", 50); }); + std::cout << "wait_all_collect_errors(success) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; + + std::string thrown; + std::size_t errors = 0; + try { + values = wait_all_collect_errors( + [](){ return sleeper("wacef_late", 150, true); }, + [](){ return sleeper("wacef_middle", 100, true); }, + [](){ return sleeper("wacef_early", 50); }); + std::cout << "wait_all_collect_errors(fail) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; + } catch ( exception_list const& e) { + thrown = e.what(); + errors = e.size(); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_collect_errors(fail) threw '" << thrown + << "': " << errors << " errors" << std::endl; +}); + +/***************************************************************************** +* when_all, heterogeneous +*****************************************************************************/ +//[wait_all_members_get +template< typename Result, typename ... Futures > +Result wait_all_members_get( Futures && ... futures) { + // Fetch the results from the passed futures into Result's initializer + // list. It's true that the get() calls here will block the implicit + // iteration over futures -- but that doesn't matter because we won't be + // done until the slowest of them finishes anyway. As results are + // processed in argument-list order rather than order of completion, the + // leftmost get() to throw an exception will cause that exception to + // propagate to the caller. + return Result{ futures.get() ... }; +} +//] + +//[wait_all_members +// Explicitly pass Result. This can be any type capable of being initialized +// from the results of the passed functions, such as a struct. +template< typename Result, typename ... Fns > +Result wait_all_members( Fns && ... functions) { + // Run each of the passed functions on a separate fiber, passing all their + // futures to helper function for processing. + return wait_all_members_get< Result >( + boost::fibers::async( std::forward< Fns >( functions) ) ... ); +} +//] + +// used by following example +//[wait_Data +struct Data { + std::string str; + double inexact; + int exact; + + friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=; + ...*/ +//<- + { + return out << "Data{str='" << data.str << "', inexact=" << data.inexact + << ", exact=" << data.exact << "}"; + } +//-> +}; +//] + +// example usage +Example wam( runner, "wait_all_members()", [](){ +//[wait_all_members_data_ex + Data data = wait_all_members< Data >( + [](){ return sleeper("wams_left", 100); }, + [](){ return sleeper(3.14, 150); }, + [](){ return sleeper(17, 50); }); + std::cout << "wait_all_members<Data>(success) => " << data << std::endl; +//] + + std::string thrown; + try { + data = wait_all_members< Data >( + [](){ return sleeper("wamf_left", 100, true); }, + [](){ return sleeper(3.14, 150); }, + [](){ return sleeper(17, 50, true); }); + std::cout << "wait_all_members<Data>(fail) => " << data << std::endl; + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_members<Data>(fail) threw '" << thrown + << '"' << std::endl; + +//[wait_all_members_vector_ex + // If we don't care about obtaining results as soon as they arrive, and we + // prefer a result vector in passed argument order rather than completion + // order, wait_all_members() is another possible implementation of + // wait_all_until_error(). + auto strings = wait_all_members< std::vector< std::string > >( + [](){ return sleeper("wamv_left", 150); }, + [](){ return sleeper("wamv_middle", 100); }, + [](){ return sleeper("wamv_right", 50); }); + std::cout << "wait_all_members<vector>() =>"; + for ( std::string const& str : strings) { + std::cout << " '" << str << "'"; + } + std::cout << std::endl; +//] +}); + + +/***************************************************************************** +* main() +*****************************************************************************/ +int main( int argc, char *argv[]) { + runner.run(); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; +} |