diff options
Diffstat (limited to 'src/boost/libs/fiber/examples')
31 files changed, 4695 insertions, 0 deletions
diff --git a/src/boost/libs/fiber/examples/Jamfile.v2 b/src/boost/libs/fiber/examples/Jamfile.v2 new file mode 100644 index 00000000..0ebe2daf --- /dev/null +++ b/src/boost/libs/fiber/examples/Jamfile.v2 @@ -0,0 +1,53 @@ +# Boost.Fiber Library Examples Jamfile + +# Copyright Oliver Kowalke 2013. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +# For more information, see http://www.boost.org/ + +import common ; +import feature ; +import indirect ; +import modules ; +import os ; +import toolset ; + +project boost/fiber/example + : requirements + <library>../build//boost_fiber + <library>/boost/context//boost_context + <library>/boost/filesystem//boost_filesystem + <library>/boost/thread//boost_thread + <target-os>solaris:<linkflags>"-llgrp" + <target-os>solaris:<linkflags>"-lsocket" + <target-os>windows:<define>_WIN32_WINNT=0x0601 + <toolset>gcc,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>gcc,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <toolset>clang,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>clang,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <link>shared + <threading>multi + ; + +exe adapt_callbacks : adapt_callbacks.cpp ; +exe adapt_method_calls : adapt_method_calls.cpp ; +exe adapt_nonblocking : adapt_nonblocking.cpp ; +exe barrier : barrier.cpp ; +exe future : future.cpp ; +exe join : join.cpp ; +exe ping_pong : ping_pong.cpp ; +exe range_for : range_for.cpp ; +exe priority : priority.cpp ; +exe segmented_stack : segmented_stack.cpp ; +exe simple : simple.cpp ; +exe wait_stuff : wait_stuff.cpp ; +exe work_sharing : work_sharing.cpp ; +exe work_stealing : work_stealing.cpp ; + +exe asio/autoecho : asio/autoecho.cpp ; +exe asio/exchange : asio/exchange.cpp ; +exe asio/ps/publisher : asio/ps/publisher.cpp ; +exe asio/ps/server : asio/ps/server.cpp ; +exe asio/ps/subscriber : asio/ps/subscriber.cpp ; diff --git a/src/boost/libs/fiber/examples/adapt_callbacks.cpp b/src/boost/libs/fiber/examples/adapt_callbacks.cpp new file mode 100644 index 00000000..1c6bb502 --- /dev/null +++ b/src/boost/libs/fiber/examples/adapt_callbacks.cpp @@ -0,0 +1,316 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cassert> +#include <chrono> +#include <exception> +#include <iostream> +#include <sstream> +#include <thread> +#include <tuple> // std::tie() + +#include <boost/context/detail/apply.hpp> +#include <boost/fiber/all.hpp> + +#if defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) +/***************************************************************************** +* helper code to help callback +*****************************************************************************/ +template< typename Fn, typename ... Args > +class helper { +private: + typename std::decay< Fn >::type fn_; + std::tuple< typename std::decay< Args >::type ... > args_; + +public: + helper( Fn && fn, Args && ... args) : + fn_( std::forward< Fn >( fn) ), + args_( std::make_tuple( std::forward< Args >( args) ... ) ) { + } + + helper( helper && other) = default; + helper & operator=( helper && other) = default; + + helper( helper const&) = default; + helper & operator=( helper const&) = default; + + void operator()() { + boost::context::detail::apply( fn_, args_); + } +}; + +template< typename Fn, typename ... Args > +helper< Fn, Args ... > help( Fn && fn, Args && ... args) { + return helper< Fn, Args ... >( std::forward< Fn >( fn), std::forward< Args >( args) ... ); +} +#endif + +/***************************************************************************** +* example async API +*****************************************************************************/ +//[AsyncAPI +class AsyncAPI { +public: + // constructor acquires some resource that can be read and written + AsyncAPI(); + + // callbacks accept an int error code; 0 == success + typedef int errorcode; + + // write callback only needs to indicate success or failure + template< typename Fn > + void init_write( std::string const& data, Fn && callback); + + // read callback needs to accept both errorcode and data + template< typename Fn > + void init_read( Fn && callback); + + // ... other operations ... +//<- + void inject_error( errorcode ec); + +private: + std::string data_; + errorcode injected_; +//-> +}; +//] + +/***************************************************************************** +* fake AsyncAPI implementation... pay no attention to the little man behind +* the curtain... +*****************************************************************************/ +AsyncAPI::AsyncAPI() : + injected_( 0) { +} + +void AsyncAPI::inject_error( errorcode ec) { + injected_ = ec; +} + +template< typename Fn > +void AsyncAPI::init_write( std::string const& data, Fn && callback) { + // make a local copy of injected_ + errorcode injected( injected_); + // reset it synchronously with caller + injected_ = 0; + // update data_ (this might be an echo service) + if ( ! injected) { + data_ = data; + } + // Simulate an asynchronous I/O operation by launching a detached thread + // that sleeps a bit before calling completion callback. Echo back to + // caller any previously-injected errorcode. +#if ! defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + std::thread( [injected,callback=std::forward< Fn >( callback)]() mutable { + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + callback( injected); + }).detach(); +#else + std::thread( + std::move( + help( std::forward< Fn >( callback), injected) ) ).detach(); +#endif +} + +template< typename Fn > +void AsyncAPI::init_read( Fn && callback) { + // make a local copy of injected_ + errorcode injected( injected_); + // reset it synchronously with caller + injected_ = 0; + // local copy of data_ so we can capture in lambda + std::string data( data_); + // Simulate an asynchronous I/O operation by launching a detached thread + // that sleeps a bit before calling completion callback. Echo back to + // caller any previously-injected errorcode. +#if ! defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + std::thread( [injected,callback=std::forward< Fn >( callback),data]() mutable { + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + callback( injected, data); + }).detach(); +#else + std::thread( + std::move( + help( std::forward< Fn >( callback), injected, data) ) ).detach(); +#endif +} + +/***************************************************************************** +* adapters +*****************************************************************************/ +// helper function used in a couple of the adapters +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode); + +//[callbacks_write_ec +AsyncAPI::errorcode write_ec( AsyncAPI & api, std::string const& data) { + boost::fibers::promise< AsyncAPI::errorcode > promise; + boost::fibers::future< AsyncAPI::errorcode > future( promise.get_future() ); + // In general, even though we block waiting for future::get() and therefore + // won't destroy 'promise' until promise::set_value() has been called, we + // are advised that with threads it's possible for ~promise() to be + // entered before promise::set_value() has returned. While that shouldn't + // happen with fibers::promise, a robust way to deal with the lifespan + // issue is to bind 'promise' into our lambda. Since promise is move-only, + // use initialization capture. +#if ! defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + api.init_write( + data, + [promise=std::move( promise)]( AsyncAPI::errorcode ec) mutable { + promise.set_value( ec); + }); + +#else // defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + api.init_write( + data, + std::bind([](boost::fibers::promise< AsyncAPI::errorcode > & promise, + AsyncAPI::errorcode ec) { + promise.set_value( ec); + }, + std::move( promise), + std::placeholders::_1) ); +#endif // BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES + + return future.get(); +} +//] + +//[callbacks_write +void write( AsyncAPI & api, std::string const& data) { + AsyncAPI::errorcode ec = write_ec( api, data); + if ( ec) { + throw make_exception("write", ec); + } +} +//] + +//[callbacks_read_ec +std::pair< AsyncAPI::errorcode, std::string > read_ec( AsyncAPI & api) { + typedef std::pair< AsyncAPI::errorcode, std::string > result_pair; + boost::fibers::promise< result_pair > promise; + boost::fibers::future< result_pair > future( promise.get_future() ); + // We promise that both 'promise' and 'future' will survive until our + // lambda has been called. +#if ! defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + api.init_read([promise=std::move( promise)]( AsyncAPI::errorcode ec, std::string const& data) mutable { + promise.set_value( result_pair( ec, data) ); + }); +#else // defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + api.init_read( + std::bind([]( boost::fibers::promise< result_pair > & promise, + AsyncAPI::errorcode ec, std::string const& data) mutable { + promise.set_value( result_pair( ec, data) ); + }, + std::move( promise), + std::placeholders::_1, + std::placeholders::_2) ); +#endif // BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES + return future.get(); +} +//] + +//[callbacks_read +std::string read( AsyncAPI & api) { + boost::fibers::promise< std::string > promise; + boost::fibers::future< std::string > future( promise.get_future() ); + // Both 'promise' and 'future' will survive until our lambda has been + // called. +#if ! defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + api.init_read([&promise]( AsyncAPI::errorcode ec, std::string const& data) mutable { + if ( ! ec) { + promise.set_value( data); + } else { + promise.set_exception( + std::make_exception_ptr( + make_exception("read", ec) ) ); + } + }); +#else // defined(BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES) + api.init_read( + std::bind([]( boost::fibers::promise< std::string > & promise, + AsyncAPI::errorcode ec, std::string const& data) mutable { + if ( ! ec) { + promise.set_value( data); + } else { + promise.set_exception( + std::make_exception_ptr( + make_exception("read", ec) ) ); + } + }, + std::move( promise), + std::placeholders::_1, + std::placeholders::_2) ); +#endif // BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES + return future.get(); +} +//] + +/***************************************************************************** +* helpers +*****************************************************************************/ +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode ec) { + std::ostringstream buffer; + buffer << "Error in AsyncAPI::" << desc << "(): " << ec; + return std::runtime_error( buffer.str() ); +} + +/***************************************************************************** +* driving logic +*****************************************************************************/ +int main( int argc, char *argv[]) { + AsyncAPI api; + + // successful write(): prime AsyncAPI with some data + write( api, "abcd"); + // successful read(): retrieve it + std::string data( read( api) ); + assert( data == "abcd"); + + // successful write_ec() + AsyncAPI::errorcode ec( write_ec( api, "efgh") ); + assert( ec == 0); + + // write_ec() with error + api.inject_error(1); + ec = write_ec( api, "ijkl"); + assert( ec == 1); + + // write() with error + std::string thrown; + api.inject_error(2); + try { + write(api, "mnop"); + } catch ( std::exception const& e) { + thrown = e.what(); + } + assert( thrown == make_exception("write", 2).what() ); + + // successful read_ec() +//[callbacks_read_ec_call + std::tie( ec, data) = read_ec( api); +//] + assert( ! ec); + assert( data == "efgh"); // last successful write_ec() + + // read_ec() with error + api.inject_error(3); + std::tie( ec, data) = read_ec( api); + assert( ec == 3); + // 'data' in unspecified state, don't test + + // read() with error + thrown.clear(); + api.inject_error(4); + try { + data = read(api); + } catch ( std::exception const& e) { + thrown = e.what(); + } + assert( thrown == make_exception("read", 4).what() ); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/src/boost/libs/fiber/examples/adapt_method_calls.cpp b/src/boost/libs/fiber/examples/adapt_method_calls.cpp new file mode 100644 index 00000000..7cfd78df --- /dev/null +++ b/src/boost/libs/fiber/examples/adapt_method_calls.cpp @@ -0,0 +1,167 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <boost/fiber/all.hpp> +#include <memory> // std::shared_ptr +#include <thread> +#include <chrono> +#include <iostream> +#include <sstream> +#include <exception> +#include <cassert> + +/***************************************************************************** +* example async API +*****************************************************************************/ +// introduce class-scope typedef +struct AsyncAPIBase { + // error callback accepts an int error code; 0 == success + typedef int errorcode; +}; + +//[Response +// every async operation receives a subclass instance of this abstract base +// class through which to communicate its result +struct Response { + typedef std::shared_ptr< Response > ptr; + + // called if the operation succeeds + virtual void success( std::string const& data) = 0; + + // called if the operation fails + virtual void error( AsyncAPIBase::errorcode ec) = 0; +}; +//] + +// the actual async API +class AsyncAPI: public AsyncAPIBase { +public: + // constructor acquires some resource that can be read + AsyncAPI( std::string const& data); + +//[method_init_read + // derive Response subclass, instantiate, pass Response::ptr + void init_read( Response::ptr); +//] + + // ... other operations ... + void inject_error( errorcode ec); + +private: + std::string data_; + errorcode injected_; +}; + +/***************************************************************************** +* fake AsyncAPI implementation... pay no attention to the little man behind +* the curtain... +*****************************************************************************/ +AsyncAPI::AsyncAPI( std::string const& data) : + data_( data), + injected_( 0) { +} + +void AsyncAPI::inject_error( errorcode ec) { + injected_ = ec; +} + +void AsyncAPI::init_read( Response::ptr response) { + // make a local copy of injected_ + errorcode injected( injected_); + // reset it synchronously with caller + injected_ = 0; + // local copy of data_ so we can capture in lambda + std::string data( data_); + // Simulate an asynchronous I/O operation by launching a detached thread + // that sleeps a bit before calling either completion method. + std::thread( [injected, response, data](){ + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + if ( ! injected) { + // no error, call success() + response->success( data); + } else { + // injected error, call error() + response->error( injected); + } + }).detach(); +} + +/***************************************************************************** +* adapters +*****************************************************************************/ +// helper function +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode); + +//[PromiseResponse +class PromiseResponse: public Response { +public: + // called if the operation succeeds + virtual void success( std::string const& data) { + promise_.set_value( data); + } + + // called if the operation fails + virtual void error( AsyncAPIBase::errorcode ec) { + promise_.set_exception( + std::make_exception_ptr( + make_exception("read", ec) ) ); + } + + boost::fibers::future< std::string > get_future() { + return promise_.get_future(); + } + +private: + boost::fibers::promise< std::string > promise_; +}; +//] + +//[method_read +std::string read( AsyncAPI & api) { + // Because init_read() requires a shared_ptr, we must allocate our + // ResponsePromise on the heap, even though we know its lifespan. + auto promisep( std::make_shared< PromiseResponse >() ); + boost::fibers::future< std::string > future( promisep->get_future() ); + // Both 'promisep' and 'future' will survive until our lambda has been + // called. + api.init_read( promisep); + return future.get(); +} +//] + +/***************************************************************************** +* helpers +*****************************************************************************/ +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode ec) { + std::ostringstream buffer; + buffer << "Error in AsyncAPI::" << desc << "(): " << ec; + return std::runtime_error( buffer.str() ); +} + +/***************************************************************************** +* driving logic +*****************************************************************************/ +int main(int argc, char *argv[]) { + // prime AsyncAPI with some data + AsyncAPI api("abcd"); + + // successful read(): retrieve it + std::string data( read( api) ); + assert(data == "abcd"); + + // read() with error + std::string thrown; + api.inject_error(1); + try { + data = read( api); + } catch ( std::exception const& e) { + thrown = e.what(); + } + assert(thrown == make_exception("read", 1).what() ); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/src/boost/libs/fiber/examples/adapt_nonblocking.cpp b/src/boost/libs/fiber/examples/adapt_nonblocking.cpp new file mode 100644 index 00000000..879e1789 --- /dev/null +++ b/src/boost/libs/fiber/examples/adapt_nonblocking.cpp @@ -0,0 +1,208 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <boost/fiber/all.hpp> +#include <iostream> +#include <sstream> +#include <exception> +#include <string> +#include <algorithm> // std::min() +#include <errno.h> // EWOULDBLOCK +#include <cassert> +#include <cstdio> + +/***************************************************************************** +* example nonblocking API +*****************************************************************************/ +//[NonblockingAPI +class NonblockingAPI { +public: + NonblockingAPI(); + + // nonblocking operation: may return EWOULDBLOCK + int read( std::string & data, std::size_t desired); + +/*= ...*/ +//<- + // for simulating a real nonblocking API + void set_data( std::string const& data, std::size_t chunksize); + void inject_error( int ec); + +private: + std::string data_; + int injected_; + unsigned tries_; + std::size_t chunksize_; +//-> +}; +//] + +/***************************************************************************** +* fake NonblockingAPI implementation... pay no attention to the little man +* behind the curtain... +*****************************************************************************/ +NonblockingAPI::NonblockingAPI() : + injected_( 0), + tries_( 0), + chunksize_( 9999) { +} + +void NonblockingAPI::set_data( std::string const& data, std::size_t chunksize) { + data_ = data; + chunksize_ = chunksize; + // This delimits the start of a new test. Reset state. + injected_ = 0; + tries_ = 0; +} + +void NonblockingAPI::inject_error( int ec) { + injected_ = ec; +} + +int NonblockingAPI::read( std::string & data, std::size_t desired) { + // in case of error + data.clear(); + + if ( injected_) { + // copy injected_ because we're about to reset it + auto injected( injected_); + injected_ = 0; + // after an error situation, restart success count + tries_ = 0; + return injected; + } + + if ( ++tries_ < 5) { + // no injected error, but the resource isn't yet ready + return EWOULDBLOCK; + } + + // tell caller there's nothing left + if ( data_.empty() ) { + return EOF; + } + + // okay, finally have some data + // but return minimum of desired and chunksize_ + std::size_t size( ( std::min)( desired, chunksize_) ); + data = data_.substr( 0, size); + // strip off what we just returned + data_ = data_.substr( size); + // reset I/O retries count for next time + tries_ = 0; + // success + return 0; +} + +/***************************************************************************** +* adapters +*****************************************************************************/ +//[nonblocking_read_chunk +// guaranteed not to return EWOULDBLOCK +int read_chunk( NonblockingAPI & api, std::string & data, std::size_t desired) { + int error; + while ( EWOULDBLOCK == ( error = api.read( data, desired) ) ) { + // not ready yet -- try again on the next iteration of the + // application's main loop + boost::this_fiber::yield(); + } + return error; +} +//] + +//[nonblocking_read_desired +// keep reading until desired length, EOF or error +// may return both partial data and nonzero error +int read_desired( NonblockingAPI & api, std::string & data, std::size_t desired) { + // we're going to accumulate results into 'data' + data.clear(); + std::string chunk; + int error = 0; + while ( data.length() < desired && + ( error = read_chunk( api, chunk, desired - data.length() ) ) == 0) { + data.append( chunk); + } + return error; +} +//] + +//[nonblocking_IncompleteRead +// exception class augmented with both partially-read data and errorcode +class IncompleteRead : public std::runtime_error { +public: + IncompleteRead( std::string const& what, std::string const& partial, int ec) : + std::runtime_error( what), + partial_( partial), + ec_( ec) { + } + + std::string get_partial() const { + return partial_; + } + + int get_errorcode() const { + return ec_; + } + +private: + std::string partial_; + int ec_; +}; +//] + +//[nonblocking_read +// read all desired data or throw IncompleteRead +std::string read( NonblockingAPI & api, std::size_t desired) { + std::string data; + int ec( read_desired( api, data, desired) ); + + // for present purposes, EOF isn't a failure + if ( 0 == ec || EOF == ec) { + return data; + } + + // oh oh, partial read + std::ostringstream msg; + msg << "NonblockingAPI::read() error " << ec << " after " + << data.length() << " of " << desired << " characters"; + throw IncompleteRead( msg.str(), data, ec); +} +//] + +int main( int argc, char *argv[]) { + NonblockingAPI api; + const std::string sample_data("abcdefghijklmnopqrstuvwxyz"); + + // Try just reading directly from NonblockingAPI + api.set_data( sample_data, 5); + std::string data; + int ec = api.read( data, 13); + // whoops, underlying resource not ready + assert(ec == EWOULDBLOCK); + assert(data.empty()); + + // successful read() + api.set_data( sample_data, 5); + data = read( api, 13); + assert(data == "abcdefghijklm"); + + // read() with error + api.set_data( sample_data, 5); + // don't accidentally pick either EOF or EWOULDBLOCK + assert(EOF != 1); + assert(EWOULDBLOCK != 1); + api.inject_error(1); + int thrown = 0; + try { + data = read( api, 13); + } catch ( IncompleteRead const& e) { + thrown = e.get_errorcode(); + } + assert(thrown == 1); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/src/boost/libs/fiber/examples/asio/autoecho.cpp b/src/boost/libs/fiber/examples/asio/autoecho.cpp new file mode 100644 index 00000000..06b4027a --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/autoecho.cpp @@ -0,0 +1,261 @@ +// Copyright 2003-2013 Christopher M. Kohlhoff +// Copyright Oliver Kowalke, Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <cstdlib> +#include <iomanip> +#include <iostream> +#include <map> +#include <memory> +#include <mutex> +#include <sstream> +#include <thread> + +#include <boost/asio.hpp> +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> + +#include <boost/fiber/all.hpp> +#include "round_robin.hpp" +#include "yield.hpp" + +using boost::asio::ip::tcp; + +const int max_length = 1024; + +typedef boost::shared_ptr< tcp::socket > socket_ptr; + +const char* const alpha = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + +/***************************************************************************** +* thread names +*****************************************************************************/ +class ThreadNames { +private: + std::map<std::thread::id, std::string> names_{}; + const char* next_{ alpha }; + std::mutex mtx_{}; + +public: + ThreadNames() = default; + + std::string lookup() { + std::unique_lock<std::mutex> lk( mtx_); + auto this_id( std::this_thread::get_id() ); + auto found = names_.find( this_id ); + if ( found != names_.end() ) { + return found->second; + } + BOOST_ASSERT( *next_); + std::string name(1, *next_++ ); + names_[ this_id ] = name; + return name; + } +}; + +ThreadNames thread_names; + +/***************************************************************************** +* fiber names +*****************************************************************************/ +class FiberNames { +private: + std::map<boost::fibers::fiber::id, std::string> names_{}; + unsigned next_{ 0 }; + boost::fibers::mutex mtx_{}; + +public: + FiberNames() = default; + + std::string lookup() { + std::unique_lock<boost::fibers::mutex> lk( mtx_); + auto this_id( boost::this_fiber::get_id() ); + auto found = names_.find( this_id ); + if ( found != names_.end() ) { + return found->second; + } + std::ostringstream out; + // Bake into the fiber's name the thread name on which we first + // lookup() its ID, to be able to spot when a fiber hops between + // threads. + out << thread_names.lookup() << next_++; + std::string name( out.str() ); + names_[ this_id ] = name; + return name; + } +}; + +FiberNames fiber_names; + +std::string tag() { + std::ostringstream out; + out << "Thread " << thread_names.lookup() << ": " + << std::setw(4) << fiber_names.lookup() << std::setw(0); + return out.str(); +} + +/***************************************************************************** +* message printing +*****************************************************************************/ +void print_( std::ostream& out) { + out << '\n'; +} + +template < typename T, typename... Ts > +void print_( std::ostream& out, T const& arg, Ts const&... args) { + out << arg; + print_(out, args...); +} + +template < typename... T > +void print( T const&... args ) { + std::ostringstream buffer; + print_( buffer, args...); + std::cout << buffer.str() << std::flush; +} + +/***************************************************************************** +* fiber function per server connection +*****************************************************************************/ +void session( socket_ptr sock) { + try { + for (;;) { + char data[max_length]; + boost::system::error_code ec; + std::size_t length = sock->async_read_some( + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + print( tag(), ": handled: ", std::string(data, length)); + boost::asio::async_write( + * sock, + boost::asio::buffer( data, length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + } + print( tag(), ": connection closed"); + } catch ( std::exception const& ex) { + print( tag(), ": caught exception : ", ex.what()); + } +} + +/***************************************************************************** +* listening server +*****************************************************************************/ +void server( std::shared_ptr< boost::asio::io_service > const& io_svc, tcp::acceptor & a) { + print( tag(), ": echo-server started"); + try { + for (;;) { + socket_ptr socket( new tcp::socket( * io_svc) ); + boost::system::error_code ec; + a.async_accept( + * socket, + boost::fibers::asio::yield[ec]); + if ( ec) { + throw boost::system::system_error( ec); //some other error + } else { + boost::fibers::fiber( session, socket).detach(); + } + } + } catch ( std::exception const& ex) { + print( tag(), ": caught exception : ", ex.what()); + } + io_svc->stop(); + print( tag(), ": echo-server stopped"); +} + +/***************************************************************************** +* fiber function per client +*****************************************************************************/ +void client( std::shared_ptr< boost::asio::io_service > const& io_svc, tcp::acceptor & a, + boost::fibers::barrier& barrier, unsigned iterations) { + print( tag(), ": echo-client started"); + for (unsigned count = 0; count < iterations; ++count) { + tcp::resolver resolver( * io_svc); + tcp::resolver::query query( tcp::v4(), "127.0.0.1", "9999"); + tcp::resolver::iterator iterator = resolver.resolve( query); + tcp::socket s( * io_svc); + boost::asio::connect( s, iterator); + for (unsigned msg = 0; msg < 1; ++msg) { + std::ostringstream msgbuf; + msgbuf << "from " << fiber_names.lookup() << " " << count << "." << msg; + std::string message(msgbuf.str()); + print( tag(), ": Sending: ", message); + boost::system::error_code ec; + boost::asio::async_write( + s, + boost::asio::buffer( message), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + return; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + char reply[max_length]; + size_t reply_length = s.async_read_some( + boost::asio::buffer( reply, max_length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + return; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + print( tag(), ": Reply : ", std::string( reply, reply_length)); + } + } + // done with all iterations, wait for rest of client fibers + if ( barrier.wait()) { + // exactly one barrier.wait() call returns true + // we're the lucky one + a.close(); + print( tag(), ": acceptor stopped"); + } + print( tag(), ": echo-client stopped"); +} + +/***************************************************************************** +* main +*****************************************************************************/ +int main( int argc, char* argv[]) { + try { +//[asio_rr_setup + std::shared_ptr< boost::asio::io_service > io_svc = std::make_shared< boost::asio::io_service >(); + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); +//] + print( "Thread ", thread_names.lookup(), ": started"); +//[asio_rr_launch_fibers + // server + tcp::acceptor a( * io_svc, tcp::endpoint( tcp::v4(), 9999) ); + boost::fibers::fiber( server, io_svc, std::ref( a) ).detach(); + // client + const unsigned iterations = 2; + const unsigned clients = 3; + boost::fibers::barrier b( clients); + for ( unsigned i = 0; i < clients; ++i) { + boost::fibers::fiber( + client, io_svc, std::ref( a), std::ref( b), iterations).detach(); + } +//] +//[asio_rr_run + io_svc->run(); +//] + print( tag(), ": io_service returned"); + print( "Thread ", thread_names.lookup(), ": stopping"); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + print("Exception: ", e.what(), "\n"); + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/detail/yield.hpp b/src/boost/libs/fiber/examples/asio/detail/yield.hpp new file mode 100644 index 00000000..01b680b5 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/detail/yield.hpp @@ -0,0 +1,328 @@ +// Copyright Oliver Kowalke, Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP +#define BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP + +#include <boost/asio/async_result.hpp> +#include <boost/asio/detail/config.hpp> +#include <boost/asio/handler_type.hpp> +#include <boost/assert.hpp> +#include <boost/atomic.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/system/error_code.hpp> +#include <boost/system/system_error.hpp> +#include <boost/throw_exception.hpp> + +#include <boost/fiber/all.hpp> + +#include <mutex> // std::unique_lock + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { +namespace detail { + +//[fibers_asio_yield_completion +// Bundle a completion bool flag with a spinlock to protect it. +struct yield_completion { + enum state_t { + init, + waiting, + complete + }; + + typedef fibers::detail::spinlock mutex_t; + typedef std::unique_lock< mutex_t > lock_t; + typedef boost::intrusive_ptr< yield_completion > ptr_t; + + std::atomic< std::size_t > use_count_{ 0 }; + mutex_t mtx_{}; + state_t state_{ init }; + + void wait() { + // yield_handler_base::operator()() will set state_ `complete` and + // attempt to wake a suspended fiber. It would be Bad if that call + // happened between our detecting (complete != state_) and suspending. + lock_t lk{ mtx_ }; + // If state_ is already set, we're done here: don't suspend. + if ( complete != state_) { + state_ = waiting; + // suspend(unique_lock<spinlock>) unlocks the lock in the act of + // resuming another fiber + fibers::context::active()->suspend( lk); + } + } + + friend void intrusive_ptr_add_ref( yield_completion * yc) noexcept { + BOOST_ASSERT( nullptr != yc); + yc->use_count_.fetch_add( 1, std::memory_order_relaxed); + } + + friend void intrusive_ptr_release( yield_completion * yc) noexcept { + BOOST_ASSERT( nullptr != yc); + if ( 1 == yc->use_count_.fetch_sub( 1, std::memory_order_release) ) { + std::atomic_thread_fence( std::memory_order_acquire); + delete yc; + } + } +}; +//] + +//[fibers_asio_yield_handler_base +// This class encapsulates common elements between yield_handler<T> (capturing +// a value to return from asio async function) and yield_handler<void> (no +// such value). See yield_handler<T> and its <void> specialization below. Both +// yield_handler<T> and yield_handler<void> are passed by value through +// various layers of asio functions. In other words, they're potentially +// copied multiple times. So key data such as the yield_completion instance +// must be stored in our async_result<yield_handler<>> specialization, which +// should be instantiated only once. +class yield_handler_base { +public: + yield_handler_base( yield_t const& y) : + // capture the context* associated with the running fiber + ctx_{ boost::fibers::context::active() }, + // capture the passed yield_t + yt_( y ) { + } + + // completion callback passing only (error_code) + void operator()( boost::system::error_code const& ec) { + BOOST_ASSERT_MSG( ycomp_, + "Must inject yield_completion* " + "before calling yield_handler_base::operator()()"); + BOOST_ASSERT_MSG( yt_.ec_, + "Must inject boost::system::error_code* " + "before calling yield_handler_base::operator()()"); + // If originating fiber is busy testing state_ flag, wait until it + // has observed (completed != state_). + yield_completion::lock_t lk{ ycomp_->mtx_ }; + yield_completion::state_t state = ycomp_->state_; + // Notify a subsequent yield_completion::wait() call that it need not + // suspend. + ycomp_->state_ = yield_completion::complete; + // set the error_code bound by yield_t + * yt_.ec_ = ec; + // unlock the lock that protects state_ + lk.unlock(); + // If ctx_ is still active, e.g. because the async operation + // immediately called its callback (this method!) before the asio + // async function called async_result_base::get(), we must not set it + // ready. + if ( yield_completion::waiting == state) { + // wake the fiber + fibers::context::active()->schedule( ctx_); + } + } + +//private: + boost::fibers::context * ctx_; + yield_t yt_; + // We depend on this pointer to yield_completion, which will be injected + // by async_result. + yield_completion::ptr_t ycomp_{}; +}; +//] + +//[fibers_asio_yield_handler_T +// asio uses handler_type<completion token type, signature>::type to decide +// what to instantiate as the actual handler. Below, we specialize +// handler_type< yield_t, ... > to indicate yield_handler<>. So when you pass +// an instance of yield_t as an asio completion token, asio selects +// yield_handler<> as the actual handler class. +template< typename T > +class yield_handler: public yield_handler_base { +public: + // asio passes the completion token to the handler constructor + explicit yield_handler( yield_t const& y) : + yield_handler_base{ y } { + } + + // completion callback passing only value (T) + void operator()( T t) { + // just like callback passing success error_code + (*this)( boost::system::error_code(), std::move(t) ); + } + + // completion callback passing (error_code, T) + void operator()( boost::system::error_code const& ec, T t) { + BOOST_ASSERT_MSG( value_, + "Must inject value ptr " + "before caling yield_handler<T>::operator()()"); + // move the value to async_result<> instance BEFORE waking up a + // suspended fiber + * value_ = std::move( t); + // forward the call to base-class completion handler + yield_handler_base::operator()( ec); + } + +//private: + // pointer to destination for eventual value + // this must be injected by async_result before operator()() is called + T * value_{ nullptr }; +}; +//] + +//[fibers_asio_yield_handler_void +// yield_handler<void> is like yield_handler<T> without value_. In fact it's +// just like yield_handler_base. +template<> +class yield_handler< void >: public yield_handler_base { +public: + explicit yield_handler( yield_t const& y) : + yield_handler_base{ y } { + } + + // nullary completion callback + void operator()() { + ( * this)( boost::system::error_code() ); + } + + // inherit operator()(error_code) overload from base class + using yield_handler_base::operator(); +}; +//] + +// Specialize asio_handler_invoke hook to ensure that any exceptions thrown +// from the handler are propagated back to the caller +template< typename Fn, typename T > +void asio_handler_invoke( Fn&& fn, yield_handler< T > *) { + fn(); +} + +//[fibers_asio_async_result_base +// Factor out commonality between async_result<yield_handler<T>> and +// async_result<yield_handler<void>> +class async_result_base { +public: + explicit async_result_base( yield_handler_base & h) : + ycomp_{ new yield_completion{} } { + // Inject ptr to our yield_completion instance into this + // yield_handler<>. + h.ycomp_ = this->ycomp_; + // if yield_t didn't bind an error_code, make yield_handler_base's + // error_code* point to an error_code local to this object so + // yield_handler_base::operator() can unconditionally store through + // its error_code* + if ( ! h.yt_.ec_) { + h.yt_.ec_ = & ec_; + } + } + + void get() { + // Unless yield_handler_base::operator() has already been called, + // suspend the calling fiber until that call. + ycomp_->wait(); + // The only way our own ec_ member could have a non-default value is + // if our yield_handler did not have a bound error_code AND the + // completion callback passed a non-default error_code. + if ( ec_) { + throw_exception( boost::system::system_error{ ec_ } ); + } + } + +private: + // If yield_t does not bind an error_code instance, store into here. + boost::system::error_code ec_{}; + yield_completion::ptr_t ycomp_; +}; +//] + +}}}} + +namespace boost { +namespace asio { + +//[fibers_asio_async_result_T +// asio constructs an async_result<> instance from the yield_handler specified +// by handler_type<>::type. A particular asio async method constructs the +// yield_handler, constructs this async_result specialization from it, then +// returns the result of calling its get() method. +template< typename T > +class async_result< boost::fibers::asio::detail::yield_handler< T > > : + public boost::fibers::asio::detail::async_result_base { +public: + // type returned by get() + typedef T type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< T > & h) : + boost::fibers::asio::detail::async_result_base{ h } { + // Inject ptr to our value_ member into yield_handler<>: result will + // be stored here. + h.value_ = & value_; + } + + // asio async method returns result of calling get() + type get() { + boost::fibers::asio::detail::async_result_base::get(); + return std::move( value_); + } + +private: + type value_{}; +}; +//] + +//[fibers_asio_async_result_void +// Without the need to handle a passed value, our yield_handler<void> +// specialization is just like async_result_base. +template<> +class async_result< boost::fibers::asio::detail::yield_handler< void > > : + public boost::fibers::asio::detail::async_result_base { +public: + typedef void type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h): + boost::fibers::asio::detail::async_result_base{ h } { + } +}; +//] + +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts no parameters, +// use yield_handler<void>. +template< typename ReturnType > +struct handler_type< fibers::asio::yield_t, ReturnType() > +{ typedef fibers::asio::detail::yield_handler< void > type; }; + +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts a data +// parameter, use yield_handler<parameter type> to return that parameter to +// the caller. +template< typename ReturnType, typename Arg1 > +struct handler_type< fibers::asio::yield_t, ReturnType( Arg1) > +{ typedef fibers::asio::detail::yield_handler< Arg1 > type; }; + +//[asio_handler_type +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts only +// error_code, use yield_handler<void>. yield_handler will take care of the +// error_code one way or another. +template< typename ReturnType > +struct handler_type< fibers::asio::yield_t, ReturnType( boost::system::error_code) > +{ typedef fibers::asio::detail::yield_handler< void > type; }; +//] + +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts a data +// parameter and an error_code, use yield_handler<parameter type> to return +// just the parameter to the caller. yield_handler will take care of the +// error_code one way or another. +template< typename ReturnType, typename Arg2 > +struct handler_type< fibers::asio::yield_t, ReturnType( boost::system::error_code, Arg2) > +{ typedef fibers::asio::detail::yield_handler< Arg2 > type; }; + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP diff --git a/src/boost/libs/fiber/examples/asio/exchange.cpp b/src/boost/libs/fiber/examples/asio/exchange.cpp new file mode 100644 index 00000000..8d7482af --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/exchange.cpp @@ -0,0 +1,56 @@ +// Copyright Arnaud Kapp, Oliver Kowalke 2016 +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <iostream> +#include <memory> +#include <thread> + +#include <boost/asio.hpp> + +#include <boost/fiber/all.hpp> +#include "round_robin.hpp" + +std::shared_ptr< boost::fibers::unbuffered_channel< int > > c; + +void foo() { + auto io_ptr = std::make_shared< boost::asio::io_service >(); + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ptr); + boost::fibers::fiber([io_ptr](){ + for ( int i = 0; i < 10; ++i) { + std::cout << "push " << i << std::endl; + c->push( i); + } + c->close(); + io_ptr->stop(); + }).detach(); + io_ptr->run(); +} + +void bar() { + auto io_ptr = std::make_shared< boost::asio::io_service >(); + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ptr); + boost::fibers::fiber([io_ptr](){ + try { + for (;;) { + int i = c->value_pop(); + std::cout << "pop " << i << std::endl; + } + } catch ( std::exception const& e) { + std::cout << "exception: " << e.what() << std::endl; + } + io_ptr->stop(); + }).detach(); + io_ptr->run(); +} + +int main() { + c = std::make_shared< boost::fibers::unbuffered_channel< int > >(); + std::thread t1( foo); + std::thread t2( bar); + t2.join(); + t1.join(); + std::cout << "done." << std::endl; + return 0; +} diff --git a/src/boost/libs/fiber/examples/asio/ps/publisher.cpp b/src/boost/libs/fiber/examples/asio/ps/publisher.cpp new file mode 100644 index 00000000..76a63d77 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/ps/publisher.cpp @@ -0,0 +1,52 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include <cstdlib> +#include <cstring> +#include <iostream> + +#include <boost/asio.hpp> + +using boost::asio::ip::tcp; + +enum { + max_length = 1024 +}; + +int main( int argc, char* argv[]) { + try { + if ( 3 != argc) { + std::cerr << "Usage: publisher <host> <queue>\n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + tcp::resolver resolver( io_service); + tcp::resolver::query query( tcp::v4(), argv[1], "9997"); + tcp::resolver::iterator iterator = resolver.resolve(query); + tcp::socket s( io_service); + boost::asio::connect( s, iterator); + char msg[max_length]; + std::string queue( argv[2]); + std::memset( msg, '\0', max_length); + std::memcpy( msg, queue.c_str(), queue.size() ); + boost::asio::write( s, boost::asio::buffer( msg, max_length) ); + for (;;) { + std::cout << "publish: "; + char request[max_length]; + std::cin.getline( request, max_length); + boost::asio::write( s, boost::asio::buffer( request, max_length) ); + } + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/ps/server.cpp b/src/boost/libs/fiber/examples/asio/ps/server.cpp new file mode 100644 index 00000000..aeb58f23 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/ps/server.cpp @@ -0,0 +1,381 @@ +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstddef> +#include <cstdlib> +#include <map> +#include <memory> +#include <set> +#include <iostream> +#include <string> + +#include <boost/asio.hpp> +#include <boost/utility.hpp> + +#include <boost/fiber/all.hpp> +#include "../round_robin.hpp" +#include "../yield.hpp" + +using boost::asio::ip::tcp; + +const std::size_t max_length = 1024; + +class subscriber_session; +typedef std::shared_ptr< subscriber_session > subscriber_session_ptr; + +// a queue has n subscribers (subscriptions) +// this class holds a list of subcribers for one queue +class subscriptions { +public: + ~subscriptions(); + + // subscribe to this queue + void subscribe( subscriber_session_ptr const& s) { + subscribers_.insert( s); + } + + // unsubscribe from this queue + void unsubscribe( subscriber_session_ptr const& s) { + subscribers_.erase(s); + } + + // publish a message, e.g. push this message to all subscribers + void publish( std::string const& msg); + +private: + // list of subscribers + std::set< subscriber_session_ptr > subscribers_; +}; + +// a class to register queues and to subsribe clients to this queues +class registry : private boost::noncopyable { +private: + typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont; + typedef queues_cont::iterator queues_iter; + + boost::fibers::mutex mtx_; + queues_cont queues_; + + void register_queue_( std::string const& queue) { + if ( queues_.end() != queues_.find( queue) ) { + throw std::runtime_error("queue already exists"); + } + queues_[queue] = std::make_shared< subscriptions >(); + std::cout << "new queue '" << queue << "' registered" << std::endl; + } + + void unregister_queue_( std::string const& queue) { + queues_.erase( queue); + std::cout << "queue '" << queue << "' unregistered" << std::endl; + } + + void subscribe_( std::string const& queue, subscriber_session_ptr s) { + queues_iter iter = queues_.find( queue); + if ( queues_.end() == iter ) { + throw std::runtime_error("queue does not exist"); + } + iter->second->subscribe( s); + std::cout << "new subscription to queue '" << queue << "'" << std::endl; + } + + void unsubscribe_( std::string const& queue, subscriber_session_ptr s) { + queues_iter iter = queues_.find( queue); + if ( queues_.end() != iter ) { + iter->second->unsubscribe( s); + } + } + + void publish_( std::string const& queue, std::string const& msg) { + queues_iter iter = queues_.find( queue); + if ( queues_.end() == iter ) { + throw std::runtime_error("queue does not exist"); + } + iter->second->publish( msg); + std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl; + } + +public: + // add a queue to registry + void register_queue( std::string const& queue) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + register_queue_( queue); + } + + // remove a queue from registry + void unregister_queue( std::string const& queue) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + unregister_queue_( queue); + } + + // subscribe to a queue + void subscribe( std::string const& queue, subscriber_session_ptr s) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + subscribe_( queue, s); + } + + // unsubscribe from a queue + void unsubscribe( std::string const& queue, subscriber_session_ptr s) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + unsubscribe_( queue, s); + } + + // publish a message to all subscribers registerd to the queue + void publish( std::string const& queue, std::string const& msg) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + publish_( queue, msg); + } +}; + +// a subscriber subscribes to a given queue in order to receive messages published on this queue +class subscriber_session : public std::enable_shared_from_this< subscriber_session > { +public: + explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) : + socket_( * io_service), + reg_( reg) { + } + + tcp::socket& socket() { + return socket_; + } + + // this function is executed inside the fiber + void run() { + std::string queue; + try { + boost::system::error_code ec; + // read first message == queue name + // async_ready() returns if the the complete message is read + // until this the fiber is suspended until the complete message + // is read int the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data_), + boost::fibers::asio::yield[ec]); + if ( ec) { + throw std::runtime_error("no queue from subscriber"); + } + // first message ist equal to the queue name the publisher + // publishes to + queue = data_; + // subscribe to new queue + reg_.subscribe( queue, shared_from_this() ); + // read published messages + for (;;) { + // wait for a conditon-variable for new messages + // the fiber will be suspended until the condtion + // becomes true and the fiber is resumed + // published message is stored in buffer 'data_' + std::unique_lock< boost::fibers::mutex > lk( mtx_); + cond_.wait( lk); + std::string data( data_); + lk.unlock(); + // message '<fini>' terminates subscription + if ( "<fini>" == data) { + break; + } + // async. write message to socket connected with + // subscriber + // async_write() returns if the complete message was writen + // the fiber is suspended in the meanwhile + boost::asio::async_write( + socket_, + boost::asio::buffer( data, data.size() ), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + std::cout << "subscriber::run(): '" << data << "' written" << std::endl; + } + } catch ( std::exception const& e) { + std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl; + } + // close socket + socket_.close(); + // unregister queue + reg_.unsubscribe( queue, shared_from_this() ); + } + + // called from publisher_session (running in other fiber) + void publish( std::string const& msg) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + std::memset( data_, '\0', sizeof( data_)); + std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size())); + cond_.notify_one(); + } + +private: + tcp::socket socket_; + registry & reg_; + boost::fibers::mutex mtx_; + boost::fibers::condition_variable cond_; + // fixed size message + char data_[max_length]; +}; + + +subscriptions::~subscriptions() { + for ( subscriber_session_ptr s : subscribers_) { + s->publish("<fini>"); + } +} + +void +subscriptions::publish( std::string const& msg) { + for ( subscriber_session_ptr s : subscribers_) { + s->publish( msg); + } +} + +// a publisher publishes messages on its queue +// subscriber might register to this queue to get the published messages +class publisher_session : public std::enable_shared_from_this< publisher_session > { +public: + explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) : + socket_( * io_service), + reg_( reg) { + } + + tcp::socket& socket() { + return socket_; + } + + // this function is executed inside the fiber + void run() { + std::string queue; + try { + boost::system::error_code ec; + // fixed size message + char data[max_length]; + // read first message == queue name + // async_ready() returns if the the complete message is read + // until this the fiber is suspended until the complete message + // is read int the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec) { + throw std::runtime_error("no queue from publisher"); + } + // first message ist equal to the queue name the publisher + // publishes to + queue = data; + // register the new queue + reg_.register_queue( queue); + // start publishing messages + for (;;) { + // read message from publisher asyncronous + // async_read() suspends this fiber until the complete emssage is read + // and stored in the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + // publish message to all subscribers + reg_.publish( queue, std::string( data) ); + } + } catch ( std::exception const& e) { + std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl; + } + // close socket + socket_.close(); + // unregister queue + reg_.unregister_queue( queue); + } + +private: + tcp::socket socket_; + registry & reg_; +}; + +typedef std::shared_ptr< publisher_session > publisher_session_ptr; + +// function accepts connections requests from clients acting as a publisher +void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service, + unsigned short port, + registry & reg) { + // create TCP-acceptor + tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) ); + // loop for accepting connection requests + for (;;) { + boost::system::error_code ec; + // create new publisher-session + // this instance will be associated with one publisher + publisher_session_ptr new_publisher_session = + std::make_shared< publisher_session >( io_service, std::ref( reg) ); + // async. accept of new connection request + // this function will suspend this execution context (fiber) until a + // connection was established, after returning from this function a new client (publisher) + // is connected + acceptor.async_accept( + new_publisher_session->socket(), + boost::fibers::asio::yield[ec]); + if ( ! ec) { + // run the new publisher in its own fiber (one fiber for one client) + boost::fibers::fiber( + std::bind( & publisher_session::run, new_publisher_session) ).detach(); + } + } +} + +// function accepts connections requests from clients acting as a subscriber +void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service, + unsigned short port, + registry & reg) { + // create TCP-acceptor + tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) ); + // loop for accepting connection requests + for (;;) { + boost::system::error_code ec; + // create new subscriber-session + // this instance will be associated with one subscriber + subscriber_session_ptr new_subscriber_session = + std::make_shared< subscriber_session >( io_service, std::ref( reg) ); + // async. accept of new connection request + // this function will suspend this execution context (fiber) until a + // connection was established, after returning from this function a new client (subscriber) + // is connected + acceptor.async_accept( + new_subscriber_session->socket(), + boost::fibers::asio::yield[ec]); + if ( ! ec) { + // run the new subscriber in its own fiber (one fiber for one client) + boost::fibers::fiber( + std::bind( & subscriber_session::run, new_subscriber_session) ).detach(); + } + } +} + + +int main( int argc, char* argv[]) { + try { + // create io_service for async. I/O + std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >(); + // register asio scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); + // registry for queues and its subscription + registry reg; + // create an acceptor for publishers, run it as fiber + boost::fibers::fiber( + accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach(); + // create an acceptor for subscribers, run it as fiber + boost::fibers::fiber( + accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach(); + // dispatch + io_service->run(); + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/ps/subscriber.cpp b/src/boost/libs/fiber/examples/asio/ps/subscriber.cpp new file mode 100644 index 00000000..04e583e2 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/ps/subscriber.cpp @@ -0,0 +1,53 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include <cstdlib> +#include <cstring> +#include <iostream> + +#include <boost/asio.hpp> + +using boost::asio::ip::tcp; + +enum { + max_length = 1024 +}; + +int main( int argc, char* argv[]) { + try { + if ( 3 != argc) { + std::cerr << "Usage: subscriber <host> <queue>\n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + tcp::resolver resolver( io_service); + tcp::resolver::query query( tcp::v4(), argv[1], "9998"); + tcp::resolver::iterator iterator = resolver.resolve( query); + tcp::socket s( io_service); + boost::asio::connect( s, iterator); + char msg[max_length]; + std::string queue( argv[2]); + std::memset( msg, '\0', max_length); + std::memcpy( msg, queue.c_str(), queue.size() ); + boost::asio::write( s, boost::asio::buffer( msg, max_length) ); + for (;;) { + char reply[max_length]; + size_t reply_length = s.read_some( boost::asio::buffer( reply, max_length) ); + std::cout << "published: "; + std::cout.write( reply, reply_length); + std::cout << std::endl; + } + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/round_robin.hpp b/src/boost/libs/fiber/examples/asio/round_robin.hpp new file mode 100644 index 00000000..b06bb35c --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/round_robin.hpp @@ -0,0 +1,194 @@ +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H +#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H + +#include <chrono> +#include <cstddef> +#include <memory> +#include <mutex> +#include <queue> + +#include <boost/asio.hpp> +#include <boost/assert.hpp> +#include <boost/asio/steady_timer.hpp> +#include <boost/config.hpp> + +#include <boost/fiber/condition_variable.hpp> +#include <boost/fiber/context.hpp> +#include <boost/fiber/mutex.hpp> +#include <boost/fiber/operations.hpp> +#include <boost/fiber/scheduler.hpp> + +#include "yield.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +class round_robin : public algo::algorithm { +private: +//[asio_rr_suspend_timer + std::shared_ptr< boost::asio::io_service > io_svc_; + boost::asio::steady_timer suspend_timer_; +//] + boost::fibers::scheduler::ready_queue_type rqueue_{}; + boost::fibers::mutex mtx_{}; + boost::fibers::condition_variable cnd_{}; + std::size_t counter_{ 0 }; + +public: +//[asio_rr_service_top + struct service : public boost::asio::io_service::service { + static boost::asio::io_service::id id; + + std::unique_ptr< boost::asio::io_service::work > work_; + + service( boost::asio::io_service & io_svc) : + boost::asio::io_service::service( io_svc), + work_{ new boost::asio::io_service::work( io_svc) } { + } + + virtual ~service() {} + + service( service const&) = delete; + service & operator=( service const&) = delete; + + void shutdown_service() override final { + work_.reset(); + } + }; +//] + +//[asio_rr_ctor + round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) : + io_svc_( io_svc), + suspend_timer_( * io_svc_) { + // We use add_service() very deliberately. This will throw + // service_already_exists if you pass the same io_service instance to + // more than one round_robin instance. + boost::asio::add_service( * io_svc_, new service( * io_svc_) ); + io_svc_->post([this]() mutable { +//] +//[asio_rr_service_lambda + while ( ! io_svc_->stopped() ) { + if ( has_ready_fibers() ) { + // run all pending handlers in round_robin + while ( io_svc_->poll() ); + // block this fiber till all pending (ready) fibers are processed + // == round_robin::suspend_until() has been called + std::unique_lock< boost::fibers::mutex > lk( mtx_); + cnd_.wait( lk); + } else { + // run one handler inside io_service + // if no handler available, block this thread + if ( ! io_svc_->run_one() ) { + break; + } + } + } +//] + }); + } + + void awakened( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/ + if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { + ++counter_; + } + } + + context * pick_next() noexcept { + context * ctx( nullptr); + if ( ! rqueue_.empty() ) { /*< + pop an item from the ready queue + >*/ + ctx = & rqueue_.front(); + rqueue_.pop_front(); + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( context::active() != ctx); + if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { + --counter_; + } + } + return ctx; + } + + bool has_ready_fibers() const noexcept { + return 0 < counter_; + } + +//[asio_rr_suspend_until + void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept { + // Set a timer so at least one handler will eventually fire, causing + // run_one() to eventually return. + if ( (std::chrono::steady_clock::time_point::max)() != abs_time) { + // Each expires_at(time_point) call cancels any previous pending + // call. We could inadvertently spin like this: + // dispatcher calls suspend_until() with earliest wake time + // suspend_until() sets suspend_timer_ + // lambda loop calls run_one() + // some other asio handler runs before timer expires + // run_one() returns to lambda loop + // lambda loop yields to dispatcher + // dispatcher finds no ready fibers + // dispatcher calls suspend_until() with SAME wake time + // suspend_until() sets suspend_timer_ to same time, canceling + // previous async_wait() + // lambda loop calls run_one() + // asio calls suspend_timer_ handler with operation_aborted + // run_one() returns to lambda loop... etc. etc. + // So only actually set the timer when we're passed a DIFFERENT + // abs_time value. + suspend_timer_.expires_at( abs_time); + suspend_timer_.async_wait([](boost::system::error_code const&){ + this_fiber::yield(); + }); + } + cnd_.notify_one(); + } +//] + +//[asio_rr_notify + void notify() noexcept { + // Something has happened that should wake one or more fibers BEFORE + // suspend_timer_ expires. Reset the timer to cause it to fire + // immediately, causing the run_one() call to return. In theory we + // could use cancel() because we don't care whether suspend_timer_'s + // handler is called with operation_aborted or success. However -- + // cancel() doesn't change the expiration time, and we use + // suspend_timer_'s expiration time to decide whether it's already + // set. If suspend_until() set some specific wake time, then notify() + // canceled it, then suspend_until() was called again with the same + // wake time, it would match suspend_timer_'s expiration time and we'd + // refrain from setting the timer. So instead of simply calling + // cancel(), reset the timer, which cancels the pending sleep AND sets + // a new expiration time. This will cause us to spin the loop twice -- + // once for the operation_aborted handler, once for timer expiration + // -- but that shouldn't be a big problem. + suspend_timer_.async_wait([](boost::system::error_code const&){ + this_fiber::yield(); + }); + suspend_timer_.expires_at( std::chrono::steady_clock::now() ); + } +//] +}; + +boost::asio::io_service::id round_robin::service::id; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H diff --git a/src/boost/libs/fiber/examples/asio/yield.hpp b/src/boost/libs/fiber/examples/asio/yield.hpp new file mode 100644 index 00000000..d76467f9 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/yield.hpp @@ -0,0 +1,63 @@ +// Copyright 2003-2013 Christopher M. Kohlhoff +// Copyright Oliver Kowalke, Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + + +#ifndef BOOST_FIBERS_ASIO_YIELD_HPP +#define BOOST_FIBERS_ASIO_YIELD_HPP + +#include <boost/config.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +//[fibers_asio_yield_t +class yield_t { +public: + yield_t() = default; + + /** + * @code + * static yield_t yield; + * boost::system::error_code myec; + * func(yield[myec]); + * @endcode + * @c yield[myec] returns an instance of @c yield_t whose @c ec_ points + * to @c myec. The expression @c yield[myec] "binds" @c myec to that + * (anonymous) @c yield_t instance, instructing @c func() to store any + * @c error_code it might produce into @c myec rather than throwing @c + * boost::system::system_error. + */ + yield_t operator[]( boost::system::error_code & ec) const { + yield_t tmp; + tmp.ec_ = & ec; + return tmp; + } + +//private: + // ptr to bound error_code instance if any + boost::system::error_code * ec_{ nullptr }; +}; +//] + +//[fibers_asio_yield +// canonical instance +thread_local yield_t yield{}; +//] + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#include "detail/yield.hpp" + +#endif // BOOST_FIBERS_ASIO_YIELD_HPP diff --git a/src/boost/libs/fiber/examples/barrier.cpp b/src/boost/libs/fiber/examples/barrier.cpp new file mode 100644 index 00000000..dedec6c6 --- /dev/null +++ b/src/boost/libs/fiber/examples/barrier.cpp @@ -0,0 +1,98 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstdlib> +#include <functional> +#include <iostream> +#include <stdexcept> + +#include <boost/assert.hpp> + +#include <boost/fiber/all.hpp> + +int value1 = 0; +int value2 = 0; + +inline +void fn1( boost::fibers::barrier & b) +{ + boost::fibers::fiber::id id = boost::this_fiber::get_id(); + std::cout << "fiber " << id << ": fn1 entered" << std::endl; + + ++value1; + std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl; + boost::this_fiber::yield(); + + std::cout << "fiber " << id << ": waits for barrier" << std::endl; + b.wait(); + std::cout << "fiber " << id << ": passed barrier" << std::endl; + + ++value1; + std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl; + boost::this_fiber::yield(); + + ++value1; + std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl; + boost::this_fiber::yield(); + + ++value1; + std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl; + boost::this_fiber::yield(); + + std::cout << "fiber " << id << ": fn1 returns" << std::endl; +} + +inline +void fn2( boost::fibers::barrier & b) +{ + boost::fibers::fiber::id id = boost::this_fiber::get_id(); + std::cout << "fiber " << id << ": fn2 entered" << std::endl; + + ++value2; + std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl; + boost::this_fiber::yield(); + + ++value2; + std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl; + boost::this_fiber::yield(); + + ++value2; + std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl; + boost::this_fiber::yield(); + + std::cout << "fiber " << id << ": waits for barrier" << std::endl; + b.wait(); + std::cout << "fiber " << id << ": passed barrier" << std::endl; + + ++value2; + std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl; + boost::this_fiber::yield(); + + std::cout << "fiber " << id << ": fn2 returns" << std::endl; +} + +int main() +{ + try + { + boost::fibers::barrier fb( 2); + + boost::fibers::fiber f1( & fn1, std::ref( fb) ); + boost::fibers::fiber f2( & fn2, std::ref( fb) ); + + f1.join(); + f2.join(); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; + } + catch ( std::exception const& e) + { std::cerr << "exception: " << e.what() << std::endl; } + catch (...) + { std::cerr << "unhandled exception" << std::endl; } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/cuda/Makefile b/src/boost/libs/fiber/examples/cuda/Makefile new file mode 100644 index 00000000..60a3098e --- /dev/null +++ b/src/boost/libs/fiber/examples/cuda/Makefile @@ -0,0 +1,29 @@ +CUDA_PATH := /opt/cuda + +NVCC := $(CUDA_PATH)/bin/nvcc + +CPPFLAGS := -O2 -std=c++11 +LDFLAGS := -g -L/usr/local/lib +INCLUDES := -I/usr/local/include -I$(CUDA_PATH)/include +LIBRARIES := -lboost_fiber -lboost_context -lboost_system -lboost_filesystem + +all: build + +build: single_stream multiple_streams + +single_stream.o:single_stream.cu + $(NVCC) $(INCLUDES) $(CPPFLAGS) -o $@ -c $< + +single_stream: single_stream.o + $(NVCC) $(LDFLAGS) -o $@ $+ $(LIBRARIES) + +multiple_streams.o:multiple_streams.cu + $(NVCC) $(INCLUDES) $(CPPFLAGS) -o $@ -c $< + +multiple_streams: multiple_streams.o + $(NVCC) $(LDFLAGS) -o $@ $+ $(LIBRARIES) + +clean: + rm -f single_stream single_stream.o multiple_streams multiple_streams.o + +clobber: clean diff --git a/src/boost/libs/fiber/examples/cuda/multiple_streams.cu b/src/boost/libs/fiber/examples/cuda/multiple_streams.cu new file mode 100644 index 00000000..0c665e29 --- /dev/null +++ b/src/boost/libs/fiber/examples/cuda/multiple_streams.cu @@ -0,0 +1,112 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + + +#include <chrono> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <random> +#include <tuple> + +#include <cuda.h> + +#include <boost/assert.hpp> +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/all.hpp> +#include <boost/fiber/cuda/waitfor.hpp> + +__global__ +void vector_add( int * a, int * b, int * c, int size) { + int idx = threadIdx.x + blockIdx.x * blockDim.x; + if ( idx < size) { + c[idx] = a[idx] + b[idx]; + } +} + +int main() { + try { + bool done = false; + boost::fibers::fiber f1( [&done]{ + std::cout << "f1: entered" << std::endl; + try { + cudaStream_t stream0, stream1; + cudaStreamCreate( & stream0); + cudaStreamCreate( & stream1); + int size = 1024 * 1024; + int full_size = 20 * size; + int * host_a, * host_b, * host_c; + cudaHostAlloc( & host_a, full_size * sizeof( int), cudaHostAllocDefault); + cudaHostAlloc( & host_b, full_size * sizeof( int), cudaHostAllocDefault); + cudaHostAlloc( & host_c, full_size * sizeof( int), cudaHostAllocDefault); + int * dev_a0, * dev_b0, * dev_c0; + int * dev_a1, * dev_b1, * dev_c1; + cudaMalloc( & dev_a0, size * sizeof( int) ); + cudaMalloc( & dev_b0, size * sizeof( int) ); + cudaMalloc( & dev_c0, size * sizeof( int) ); + cudaMalloc( & dev_a1, size * sizeof( int) ); + cudaMalloc( & dev_b1, size * sizeof( int) ); + cudaMalloc( & dev_c1, size * sizeof( int) ); + std::minstd_rand generator; + std::uniform_int_distribution<> distribution(1, 6); + for ( int i = 0; i < full_size; ++i) { + host_a[i] = distribution( generator); + host_b[i] = distribution( generator); + } + for ( int i = 0; i < full_size; i += 2 * size) { + cudaMemcpyAsync( dev_a0, host_a + i, size * sizeof( int), cudaMemcpyHostToDevice, stream0); + cudaMemcpyAsync( dev_a1, host_a + i + size, size * sizeof( int), cudaMemcpyHostToDevice, stream1); + cudaMemcpyAsync( dev_b0, host_b + i, size * sizeof( int), cudaMemcpyHostToDevice, stream0); + cudaMemcpyAsync( dev_b1, host_b + i + size, size * sizeof( int), cudaMemcpyHostToDevice, stream1); + vector_add<<< size / 256, 256, 0, stream0 >>>( dev_a0, dev_b0, dev_c0, size); + vector_add<<< size / 256, 256, 0, stream1 >>>( dev_a1, dev_b1, dev_c1, size); + cudaMemcpyAsync( host_c + i, dev_c0, size * sizeof( int), cudaMemcpyDeviceToHost, stream0); + cudaMemcpyAsync( host_c + i + size, dev_c1, size * sizeof( int), cudaMemcpyDeviceToHost, stream1); + } + auto results = boost::fibers::cuda::waitfor_all( stream0, stream1); + for ( auto & result : results) { + BOOST_ASSERT( stream0 == std::get< 0 >( result) || stream1 == std::get< 0 >( result) ); + BOOST_ASSERT( cudaSuccess == std::get< 1 >( result) ); + } + std::cout << "f1: GPU computation finished" << std::endl; + cudaFreeHost( host_a); + cudaFreeHost( host_b); + cudaFreeHost( host_c); + cudaFree( dev_a0); + cudaFree( dev_b0); + cudaFree( dev_c0); + cudaFree( dev_a1); + cudaFree( dev_b1); + cudaFree( dev_c1); + cudaStreamDestroy( stream0); + cudaStreamDestroy( stream1); + done = true; + } catch ( std::exception const& ex) { + std::cerr << "exception: " << ex.what() << std::endl; + } + std::cout << "f1: leaving" << std::endl; + }); + boost::fibers::fiber f2([&done]{ + std::cout << "f2: entered" << std::endl; + while ( ! done) { + std::cout << "f2: sleeping" << std::endl; + boost::this_fiber::sleep_for( std::chrono::milliseconds( 1 ) ); + } + std::cout << "f2: leaving" << std::endl; + }); + f1.join(); + f2.join(); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/cuda/single_stream.cu b/src/boost/libs/fiber/examples/cuda/single_stream.cu new file mode 100644 index 00000000..79f398a1 --- /dev/null +++ b/src/boost/libs/fiber/examples/cuda/single_stream.cu @@ -0,0 +1,96 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <random> +#include <tuple> + +#include <cuda.h> + +#include <boost/assert.hpp> +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/all.hpp> +#include <boost/fiber/cuda/waitfor.hpp> + +__global__ +void vector_add( int * a, int * b, int * c, int size) { + int idx = threadIdx.x + blockIdx.x * blockDim.x; + if ( idx < size) { + c[idx] = a[idx] + b[idx]; + } +} + +int main() { + try { + bool done = false; + boost::fibers::fiber f1([&done]{ + std::cout << "f1: entered" << std::endl; + try { + cudaStream_t stream; + cudaStreamCreate( & stream); + int size = 1024 * 1024; + int full_size = 20 * size; + int * host_a, * host_b, * host_c; + cudaHostAlloc( & host_a, full_size * sizeof( int), cudaHostAllocDefault); + cudaHostAlloc( & host_b, full_size * sizeof( int), cudaHostAllocDefault); + cudaHostAlloc( & host_c, full_size * sizeof( int), cudaHostAllocDefault); + int * dev_a, * dev_b, * dev_c; + cudaMalloc( & dev_a, size * sizeof( int) ); + cudaMalloc( & dev_b, size * sizeof( int) ); + cudaMalloc( & dev_c, size * sizeof( int) ); + std::minstd_rand generator; + std::uniform_int_distribution<> distribution(1, 6); + for ( int i = 0; i < full_size; ++i) { + host_a[i] = distribution( generator); + host_b[i] = distribution( generator); + } + for ( int i = 0; i < full_size; i += size) { + cudaMemcpyAsync( dev_a, host_a + i, size * sizeof( int), cudaMemcpyHostToDevice, stream); + cudaMemcpyAsync( dev_b, host_b + i, size * sizeof( int), cudaMemcpyHostToDevice, stream); + vector_add<<< size / 256, 256, 0, stream >>>( dev_a, dev_b, dev_c, size); + cudaMemcpyAsync( host_c + i, dev_c, size * sizeof( int), cudaMemcpyDeviceToHost, stream); + } + auto result = boost::fibers::cuda::waitfor_all( stream); + BOOST_ASSERT( stream == std::get< 0 >( result) ); + BOOST_ASSERT( cudaSuccess == std::get< 1 >( result) ); + std::cout << "f1: GPU computation finished" << std::endl; + cudaFreeHost( host_a); + cudaFreeHost( host_b); + cudaFreeHost( host_c); + cudaFree( dev_a); + cudaFree( dev_b); + cudaFree( dev_c); + cudaStreamDestroy( stream); + done = true; + } catch ( std::exception const& ex) { + std::cerr << "exception: " << ex.what() << std::endl; + } + std::cout << "f1: leaving" << std::endl; + }); + boost::fibers::fiber f2([&done]{ + std::cout << "f2: entered" << std::endl; + while ( ! done) { + std::cout << "f2: sleeping" << std::endl; + boost::this_fiber::sleep_for( std::chrono::milliseconds( 1 ) ); + } + std::cout << "f2: leaving" << std::endl; + }); + f1.join(); + f2.join(); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/future.cpp b/src/boost/libs/fiber/examples/future.cpp new file mode 100644 index 00000000..cc7c6b0f --- /dev/null +++ b/src/boost/libs/fiber/examples/future.cpp @@ -0,0 +1,50 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstdlib> +#include <exception> +#include <functional> +#include <iostream> +#include <string> + +#include <boost/fiber/all.hpp> + +inline +int fn( std::string const& str, int n) +{ + for ( int i = 0; i < n; ++i) + { + std::cout << i << ": " << str << std::endl; + boost::this_fiber::yield(); + } + + return n; +} + +void start() +{ + boost::fibers::future< int > fi( + boost::fibers::async( + std::bind( fn, "abc", 5) ) ); + fi.wait(); + std::cout << "fn() returned " << fi.get() << std::endl; +} + +int main() +{ + try + { + boost::fibers::fiber( start).join(); + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; + } + catch ( std::exception const& e) + { std::cerr << "exception: " << e.what() << std::endl; } + catch (...) + { std::cerr << "unhandled exception" << std::endl; } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/hip/Makefile b/src/boost/libs/fiber/examples/hip/Makefile new file mode 100644 index 00000000..d40e8c1a --- /dev/null +++ b/src/boost/libs/fiber/examples/hip/Makefile @@ -0,0 +1,29 @@ +HIP_PATH := /opt/rocm/hip + +HIPCC := $(HIP_PATH)/bin/hipcc + +CPPFLAGS := -O2 -std=c++11 +LDFLAGS := -L/usr/local/lib +INCLUDES := -I/usr/local/include -I$(HIP_PATH)/include +LIBRARIES := -lboost_fiber -lboost_context -lboost_system -lboost_filesystem + +all: build + +build: single_stream multiple_streams + +single_stream.o:single_stream.cpp + $(HIPCC) $(INCLUDES) $(CPPFLAGS) -o $@ -c $< + +single_stream: single_stream.o + $(HIPCC) $(LDFLAGS) -o $@ $+ $(LIBRARIES) + +multiple_streams.o:multiple_streams.cpp + $(HIPCC) $(INCLUDES) $(CPPFLAGS) -o $@ -c $< + +multiple_streams: multiple_streams.o + $(HIPCC) $(LDFLAGS) -o $@ $+ $(LIBRARIES) + +clean: + rm -f single_stream single_stream.o multiple_streams multiple_streams.o + +clobber: clean diff --git a/src/boost/libs/fiber/examples/hip/multiple_streams.cpp b/src/boost/libs/fiber/examples/hip/multiple_streams.cpp new file mode 100644 index 00000000..75a7449c --- /dev/null +++ b/src/boost/libs/fiber/examples/hip/multiple_streams.cpp @@ -0,0 +1,111 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <random> +#include <tuple> + +#include <hip/hip_runtime.h> + +#include <boost/assert.hpp> +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/all.hpp> +#include <boost/fiber/hip/waitfor.hpp> + +__global__ +void vector_add(hipLaunchParm lp, int * a, int * b, int * c, int size) { + int idx = threadIdx.x + blockIdx.x * blockDim.x; + if ( idx < size) { + c[idx] = a[idx] + b[idx]; + } +} + +int main() { + try { + bool done = false; + boost::fibers::fiber f1( [&done]{ + std::cout << "f1: entered" << std::endl; + try { + hipStream_t stream0, stream1; + hipStreamCreate( & stream0); + hipStreamCreate( & stream1); + int size = 1024 * 1024; + int full_size = 20 * size; + int * host_a, * host_b, * host_c; + hipHostMalloc( & host_a, full_size * sizeof( int), hipHostMallocDefault); + hipHostMalloc( & host_b, full_size * sizeof( int), hipHostMallocDefault); + hipHostMalloc( & host_c, full_size * sizeof( int), hipHostMallocDefault); + int * dev_a0, * dev_b0, * dev_c0; + int * dev_a1, * dev_b1, * dev_c1; + hipMalloc( & dev_a0, size * sizeof( int) ); + hipMalloc( & dev_b0, size * sizeof( int) ); + hipMalloc( & dev_c0, size * sizeof( int) ); + hipMalloc( & dev_a1, size * sizeof( int) ); + hipMalloc( & dev_b1, size * sizeof( int) ); + hipMalloc( & dev_c1, size * sizeof( int) ); + std::minstd_rand generator; + std::uniform_int_distribution<> distribution(1, 6); + for ( int i = 0; i < full_size; ++i) { + host_a[i] = distribution( generator); + host_b[i] = distribution( generator); + } + for ( int i = 0; i < full_size; i += 2 * size) { + hipMemcpyAsync( dev_a0, host_a + i, size * sizeof( int), hipMemcpyHostToDevice, stream0); + hipMemcpyAsync( dev_a1, host_a + i + size, size * sizeof( int), hipMemcpyHostToDevice, stream1); + hipMemcpyAsync( dev_b0, host_b + i, size * sizeof( int), hipMemcpyHostToDevice, stream0); + hipMemcpyAsync( dev_b1, host_b + i + size, size * sizeof( int), hipMemcpyHostToDevice, stream1); + hipLaunchKernel( vector_add, dim3(size / 256), dim3(256), 0, stream0, dev_a0, dev_b0, dev_c0, size); + hipLaunchKernel( vector_add, dim3(size / 256), dim3(256), 0, stream1, dev_a1, dev_b1, dev_c1, size); + hipMemcpyAsync( host_c + i, dev_c0, size * sizeof( int), hipMemcpyDeviceToHost, stream0); + hipMemcpyAsync( host_c + i + size, dev_c1, size * sizeof( int), hipMemcpyDeviceToHost, stream1); + } + auto results = boost::fibers::hip::waitfor_all( stream0, stream1); + for ( auto & result : results) { + BOOST_ASSERT( stream0 == std::get< 0 >( result) || stream1 == std::get< 0 >( result) ); + BOOST_ASSERT( hipSuccess == std::get< 1 >( result) ); + } + std::cout << "f1: GPU computation finished" << std::endl; + hipHostFree( host_a); + hipHostFree( host_b); + hipHostFree( host_c); + hipFree( dev_a0); + hipFree( dev_b0); + hipFree( dev_c0); + hipFree( dev_a1); + hipFree( dev_b1); + hipFree( dev_c1); + hipStreamDestroy( stream0); + hipStreamDestroy( stream1); + done = true; + } catch ( std::exception const& ex) { + std::cerr << "exception: " << ex.what() << std::endl; + } + std::cout << "f1: leaving" << std::endl; + }); + boost::fibers::fiber f2([&done]{ + std::cout << "f2: entered" << std::endl; + while ( ! done) { + std::cout << "f2: sleeping" << std::endl; + boost::this_fiber::sleep_for( std::chrono::milliseconds( 1 ) ); + } + std::cout << "f2: leaving" << std::endl; + }); + f1.join(); + f2.join(); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/hip/single_stream.cpp b/src/boost/libs/fiber/examples/hip/single_stream.cpp new file mode 100644 index 00000000..1959528e --- /dev/null +++ b/src/boost/libs/fiber/examples/hip/single_stream.cpp @@ -0,0 +1,96 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <random> +#include <tuple> + +#include <hip/hip_runtime.h> + +#include <boost/assert.hpp> +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/all.hpp> +#include <boost/fiber/hip/waitfor.hpp> + +__global__ +void vector_add(hipLaunchParm lp, int * a, int * b, int * c, int size) { + int idx = threadIdx.x + blockIdx.x * blockDim.x; + if ( idx < size) { + c[idx] = a[idx] + b[idx]; + } +} + +int main() { + try { + bool done = false; + boost::fibers::fiber f1([&done]{ + std::cout << "f1: entered" << std::endl; + try { + hipStream_t stream; + hipStreamCreate( & stream); + int size = 1024 * 1024; + int full_size = 20 * size; + int * host_a, * host_b, * host_c; + hipHostMalloc( & host_a, full_size * sizeof( int), hipHostMallocDefault); + hipHostMalloc( & host_b, full_size * sizeof( int), hipHostMallocDefault); + hipHostMalloc( & host_c, full_size * sizeof( int), hipHostMallocDefault); + int * dev_a, * dev_b, * dev_c; + hipMalloc( & dev_a, size * sizeof( int) ); + hipMalloc( & dev_b, size * sizeof( int) ); + hipMalloc( & dev_c, size * sizeof( int) ); + std::minstd_rand generator; + std::uniform_int_distribution<> distribution(1, 6); + for ( int i = 0; i < full_size; ++i) { + host_a[i] = distribution( generator); + host_b[i] = distribution( generator); + } + for ( int i = 0; i < full_size; i += size) { + hipMemcpyAsync( dev_a, host_a + i, size * sizeof( int), hipMemcpyHostToDevice, stream); + hipMemcpyAsync( dev_b, host_b + i, size * sizeof( int), hipMemcpyHostToDevice, stream); + hipLaunchKernel( vector_add, dim3(size / 256), dim3(256), 0, stream, dev_a, dev_b, dev_c, size); + hipMemcpyAsync( host_c + i, dev_c, size * sizeof( int), hipMemcpyDeviceToHost, stream); + } + auto result = boost::fibers::hip::waitfor_all( stream); + BOOST_ASSERT( stream == std::get< 0 >( result) ); + BOOST_ASSERT( hipSuccess == std::get< 1 >( result) ); + std::cout << "f1: GPU computation finished" << std::endl; + hipHostFree( host_a); + hipHostFree( host_b); + hipHostFree( host_c); + hipFree( dev_a); + hipFree( dev_b); + hipFree( dev_c); + hipStreamDestroy( stream); + done = true; + } catch ( std::exception const& ex) { + std::cerr << "exception: " << ex.what() << std::endl; + } + std::cout << "f1: leaving" << std::endl; + }); + boost::fibers::fiber f2([&done]{ + std::cout << "f2: entered" << std::endl; + while ( ! done) { + std::cout << "f2: sleeping" << std::endl; + boost::this_fiber::sleep_for( std::chrono::milliseconds( 1 ) ); + } + std::cout << "f2: leaving" << std::endl; + }); + f1.join(); + f2.join(); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/join.cpp b/src/boost/libs/fiber/examples/join.cpp new file mode 100644 index 00000000..7ece0500 --- /dev/null +++ b/src/boost/libs/fiber/examples/join.cpp @@ -0,0 +1,67 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstdlib> +#include <functional> +#include <stdexcept> +#include <iostream> +#include <string> + +#include <boost/fiber/all.hpp> + +int value1 = 0; +int value2 = 0; + +void fn1() +{ + boost::fibers::fiber::id this_id = boost::this_fiber::get_id(); + for ( int i = 0; i < 5; ++i) + { + ++value1; + std::cout << "fiber " << this_id << " fn1: increment value1: " << value1 << std::endl; + boost::this_fiber::yield(); + } + std::cout << "fiber " << this_id << " fn1: returns" << std::endl; +} + +void fn2( boost::fibers::fiber & f) +{ + boost::fibers::fiber::id this_id = boost::this_fiber::get_id(); + for ( int i = 0; i < 5; ++i) + { + ++value2; + std::cout << "fiber " << this_id << " fn2: increment value2: " << value2 << std::endl; + if ( i == 1) + { + boost::fibers::fiber::id id = f.get_id(); + std::cout << "fiber " << this_id << " fn2: joins fiber " << id << std::endl; + f.join(); + std::cout << "fiber " << this_id << " fn2: joined fiber " << id << std::endl; + } + boost::this_fiber::yield(); + } + std::cout << "fiber " << this_id << " fn2: returns" << std::endl; +} + +int main() +{ + try + { + boost::fibers::fiber f1( fn1); + boost::fibers::fiber f2( fn2, std::ref( f1) ); + + f2.join(); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; + } + catch ( std::exception const& e) + { std::cerr << "exception: " << e.what() << std::endl; } + catch (...) + { std::cerr << "unhandled exception" << std::endl; } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/numa/Jamfile.v2 b/src/boost/libs/fiber/examples/numa/Jamfile.v2 new file mode 100644 index 00000000..15420cb5 --- /dev/null +++ b/src/boost/libs/fiber/examples/numa/Jamfile.v2 @@ -0,0 +1,31 @@ +# Boost.Fiber Library Examples Jamfile + +# Copyright Oliver Kowalke 2017. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +# For more information, see http://www.boost.org/ + +import common ; +import feature ; +import indirect ; +import modules ; +import os ; +import toolset ; + +project boost/fiber/example/numa + : requirements + <library>../../build//boost_fiber + <library>../../build//boost_fiber_numa + <target-os>solaris:<linkflags>"-llgrp" + <target-os>windows:<define>_WIN32_WINNT=0x0601 + <toolset>gcc,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>gcc,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <toolset>clang,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>clang,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <link>static + <threading>multi + ; + +exe topology : topology.cpp ; diff --git a/src/boost/libs/fiber/examples/numa/topology.cpp b/src/boost/libs/fiber/examples/numa/topology.cpp new file mode 100644 index 00000000..9b85e3f4 --- /dev/null +++ b/src/boost/libs/fiber/examples/numa/topology.cpp @@ -0,0 +1,37 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstdlib> +#include <cstring> +#include <exception> +#include <iostream> +#include <vector> + +#include <boost/assert.hpp> +#include <boost/fiber/numa/topology.hpp> + +int main( int argc, char * argv[]) { + try { + std::vector< boost::fibers::numa::node > topo = boost::fibers::numa::topology(); + for ( auto n : topo) { + std::cout << "node: " << n.id << " | "; + std::cout << "cpus: "; + for ( auto cpu_id : n.logical_cpus) { + std::cout << cpu_id << " "; + } + std::cout << "| distance: "; + for ( auto d : n.distance) { + std::cout << d << " "; + } + std::cout << std::endl; + } + std::cout << "done" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& ex) { + std::cerr << "exception: " << ex.what() << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/ping_pong.cpp b/src/boost/libs/fiber/examples/ping_pong.cpp new file mode 100644 index 00000000..aa56a411 --- /dev/null +++ b/src/boost/libs/fiber/examples/ping_pong.cpp @@ -0,0 +1,48 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstdlib> +#include <iostream> +#include <stdexcept> +#include <string> + +#include <boost/fiber/all.hpp> + +int main() { + using channel_t = boost::fibers::buffered_channel< std::string >; + try { + channel_t chan1{ 2 }, chan2{ 2 }; + + boost::fibers::fiber fping([&chan1,&chan2]{ + chan1.push( "ping"); + std::cout << chan2.value_pop() << "\n"; + chan1.push( "ping"); + std::cout << chan2.value_pop() << "\n"; + chan1.push( "ping"); + std::cout << chan2.value_pop() << "\n"; + }); + boost::fibers::fiber fpong([&chan1,&chan2]{ + std::cout << chan1.value_pop() << "\n"; + chan2.push( "pong"); + std::cout << chan1.value_pop() << "\n"; + chan2.push( "pong"); + std::cout << chan1.value_pop() << "\n"; + chan2.push( "pong"); + }); + + fping.join(); + fpong.join(); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; + } + catch ( std::exception const& e) + { std::cerr << "exception: " << e.what() << std::endl; } + catch (...) + { std::cerr << "unhandled exception" << std::endl; } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/priority.cpp b/src/boost/libs/fiber/examples/priority.cpp new file mode 100644 index 00000000..5f9c51e8 --- /dev/null +++ b/src/boost/libs/fiber/examples/priority.cpp @@ -0,0 +1,353 @@ +// Copyright Nat Goodspeed 2014. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <condition_variable> +#include <iostream> +#include <mutex> +#include <algorithm> // std::find_if() + +#include <boost/fiber/all.hpp> +#include <boost/fiber/scheduler.hpp> + +class Verbose { +public: + Verbose( std::string const& d, std::string const& s="stop") : + desc( d), + stop( s) { + std::cout << desc << " start" << std::endl; + } + + ~Verbose() { + std::cout << desc << ' ' << stop << std::endl; + } + + Verbose( Verbose const&) = delete; + Verbose & operator=( Verbose const&) = delete; + +private: + std::string desc; + std::string stop; +}; + +//[priority_props +class priority_props : public boost::fibers::fiber_properties { +public: + priority_props( boost::fibers::context * ctx): + fiber_properties( ctx), /*< Your subclass constructor must accept a + [^[class_link context]*] and pass it to + the `fiber_properties` constructor. >*/ + priority_( 0) { + } + + int get_priority() const { + return priority_; /*< Provide read access methods at your own discretion. >*/ + } + + // Call this method to alter priority, because we must notify + // priority_scheduler of any change. + void set_priority( int p) { /*< + It's important to call `notify()` on any + change in a property that can affect the + scheduler's behavior. Therefore, such + modifications should only be performed + through an access method. >*/ + // Of course, it's only worth reshuffling the queue and all if we're + // actually changing the priority. + if ( p != priority_) { + priority_ = p; + notify(); + } + } + + // The fiber name of course is solely for purposes of this example + // program; it has nothing to do with implementing scheduler priority. + // This is a public data member -- not requiring set/get access methods -- + // because we need not inform the scheduler of any change. + std::string name; /*< A property that does not affect the scheduler does + not need access methods. >*/ +private: + int priority_; +}; +//] + +//[priority_scheduler +class priority_scheduler : + public boost::fibers::algo::algorithm_with_properties< priority_props > { +private: + typedef boost::fibers::scheduler::ready_queue_type/*< See [link ready_queue_t]. >*/ rqueue_t; + + rqueue_t rqueue_; + std::mutex mtx_{}; + std::condition_variable cnd_{}; + bool flag_{ false }; + +public: + priority_scheduler() : + rqueue_() { + } + + // For a subclass of algorithm_with_properties<>, it's important to + // override the correct awakened() overload. + /*<< You must override the [member_link algorithm_with_properties..awakened] + method. This is how your scheduler receives notification of a + fiber that has become ready to run. >>*/ + virtual void awakened( boost::fibers::context * ctx, priority_props & props) noexcept { + int ctx_priority = props.get_priority(); /*< `props` is the instance of + priority_props associated + with the passed fiber `ctx`. >*/ + // With this scheduler, fibers with higher priority values are + // preferred over fibers with lower priority values. But fibers with + // equal priority values are processed in round-robin fashion. So when + // we're handed a new context*, put it at the end of the fibers + // with that same priority. In other words: search for the first fiber + // in the queue with LOWER priority, and insert before that one. + rqueue_t::iterator i( std::find_if( rqueue_.begin(), rqueue_.end(), + [ctx_priority,this]( boost::fibers::context & c) + { return properties( &c ).get_priority() < ctx_priority; })); + // Now, whether or not we found a fiber with lower priority, + // insert this new fiber here. + rqueue_.insert( i, * ctx); +//<- + + std::cout << "awakened(" << props.name << "): "; + describe_ready_queue(); +//-> + } + + /*<< You must override the [member_link algorithm_with_properties..pick_next] + method. This is how your scheduler actually advises the fiber manager + of the next fiber to run. >>*/ + virtual boost::fibers::context * pick_next() noexcept { + // if ready queue is empty, just tell caller + if ( rqueue_.empty() ) { + return nullptr; + } + boost::fibers::context * ctx( & rqueue_.front() ); + rqueue_.pop_front(); +//<- + std::cout << "pick_next() resuming " << properties( ctx).name << ": "; + describe_ready_queue(); +//-> + return ctx; + } + + /*<< You must override [member_link algorithm_with_properties..has_ready_fibers] + to inform the fiber manager of the state of your ready queue. >>*/ + virtual bool has_ready_fibers() const noexcept { + return ! rqueue_.empty(); + } + + /*<< Overriding [member_link algorithm_with_properties..property_change] + is optional. This override handles the case in which the running + fiber changes the priority of another ready fiber: a fiber already in + our queue. In that case, move the updated fiber within the queue. >>*/ + virtual void property_change( boost::fibers::context * ctx, priority_props & props) noexcept { + // Although our priority_props class defines multiple properties, only + // one of them (priority) actually calls notify() when changed. The + // point of a property_change() override is to reshuffle the ready + // queue according to the updated priority value. +//<- + std::cout << "property_change(" << props.name << '(' << props.get_priority() + << ")): "; +//-> + + // 'ctx' might not be in our queue at all, if caller is changing the + // priority of (say) the running fiber. If it's not there, no need to + // move it: we'll handle it next time it hits awakened(). + if ( ! ctx->ready_is_linked()) { /*< + Your `property_change()` override must be able to + handle the case in which the passed `ctx` is not in + your ready queue. It might be running, or it might be + blocked. >*/ +//<- + // hopefully user will distinguish this case by noticing that + // the fiber with which we were called does not appear in the + // ready queue at all + describe_ready_queue(); +//-> + return; + } + + // Found ctx: unlink it + ctx->ready_unlink(); + + // Here we know that ctx was in our ready queue, but we've unlinked + // it. We happen to have a method that will (re-)add a context* to the + // right place in the ready queue. + awakened( ctx, props); + } +//<- + + void describe_ready_queue() { + if ( rqueue_.empty() ) { + std::cout << "[empty]"; + } else { + const char * delim = ""; + for ( boost::fibers::context & ctx : rqueue_) { + priority_props & props( properties( & ctx) ); + std::cout << delim << props.name << '(' << props.get_priority() << ')'; + delim = ", "; + } + } + std::cout << std::endl; + } +//-> + + void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept { + if ( (std::chrono::steady_clock::time_point::max)() == time_point) { + std::unique_lock< std::mutex > lk( mtx_); + cnd_.wait( lk, [this](){ return flag_; }); + flag_ = false; + } else { + std::unique_lock< std::mutex > lk( mtx_); + cnd_.wait_until( lk, time_point, [this](){ return flag_; }); + flag_ = false; + } + } + + void notify() noexcept { + std::unique_lock< std::mutex > lk( mtx_); + flag_ = true; + lk.unlock(); + cnd_.notify_all(); + } +}; +//] + +//[launch +template< typename Fn > +boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) { + boost::fibers::fiber fiber( func); + priority_props & props( fiber.properties< priority_props >() ); + props.name = name; + props.set_priority( priority); + return fiber; +} +//] + +void yield_fn() { + std::string name( boost::this_fiber::properties< priority_props >().name); + Verbose v( std::string("fiber ") + name); + for ( int i = 0; i < 3; ++i) { + std::cout << "fiber " << name << " yielding" << std::endl; + boost::this_fiber::yield(); + } +} + +void barrier_fn( boost::fibers::barrier & barrier) { + std::string name( boost::this_fiber::properties< priority_props >().name); + Verbose v( std::string("fiber ") + name); + std::cout << "fiber " << name << " waiting on barrier" << std::endl; + barrier.wait(); + std::cout << "fiber " << name << " yielding" << std::endl; + boost::this_fiber::yield(); +} + +//[change_fn +void change_fn( boost::fibers::fiber & other, + int other_priority, + boost::fibers::barrier& barrier) { + std::string name( boost::this_fiber::properties< priority_props >().name); + Verbose v( std::string("fiber ") + name); + +//<- + std::cout << "fiber " << name << " waiting on barrier" << std::endl; +//-> + barrier.wait(); + // We assume a couple things about 'other': + // - that it was also waiting on the same barrier + // - that it has lower priority than this fiber. + // If both are true, 'other' is now ready to run but is sitting in + // priority_scheduler's ready queue. Change its priority. + priority_props & other_props( + other.properties< priority_props >() ); +//<- + std::cout << "fiber " << name << " changing priority of " << other_props.name + << " to " << other_priority << std::endl; +//-> + other_props.set_priority( other_priority); +} +//] + +//[main +int main( int argc, char *argv[]) { + // make sure we use our priority_scheduler rather than default round_robin + boost::fibers::use_scheduling_algorithm< priority_scheduler >(); +/*= ...*/ +/*=}*/ +//] + Verbose v("main()"); + + // for clarity + std::cout << "main() setting name" << std::endl; +//[main_name + boost::this_fiber::properties< priority_props >().name = "main"; +//] + std::cout << "main() running tests" << std::endl; + + { + Verbose v("high-priority first", "stop\n"); + // verify that high-priority fiber always gets scheduled first + boost::fibers::fiber low( launch( yield_fn, "low", 1) ); + boost::fibers::fiber med( launch( yield_fn, "medium", 2) ); + boost::fibers::fiber hi( launch( yield_fn, "high", 3) ); + std::cout << "main: high.join()" << std::endl; + hi.join(); + std::cout << "main: medium.join()" << std::endl; + med.join(); + std::cout << "main: low.join()" << std::endl; + low.join(); + } + + { + Verbose v("same priority round-robin", "stop\n"); + // fibers of same priority are scheduled in round-robin order + boost::fibers::fiber a( launch( yield_fn, "a", 0) ); + boost::fibers::fiber b( launch( yield_fn, "b", 0) ); + boost::fibers::fiber c( launch( yield_fn, "c", 0) ); + std::cout << "main: a.join()" << std::endl; + a.join(); + std::cout << "main: b.join()" << std::endl; + b.join(); + std::cout << "main: c.join()" << std::endl; + c.join(); + } + + { + Verbose v("barrier wakes up all", "stop\n"); + // using a barrier wakes up all waiting fibers at the same time + boost::fibers::barrier barrier( 3); + boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low", 1) ); + boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) ); + boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); }, "high", 3) ); + std::cout << "main: low.join()" << std::endl; + low.join(); + std::cout << "main: medium.join()" << std::endl; + med.join(); + std::cout << "main: high.join()" << std::endl; + hi.join(); + } + + { + Verbose v("change priority", "stop\n"); + // change priority of a fiber in priority_scheduler's ready queue + boost::fibers::barrier barrier( 3); + boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) ); + boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) ); + boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) ); + std::cout << "main: a.join()" << std::endl; + std::cout << "main: a.join()" << std::endl; + a.join(); + std::cout << "main: b.join()" << std::endl; + b.join(); + std::cout << "main: c.join()" << std::endl; + c.join(); + } + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/src/boost/libs/fiber/examples/range_for.cpp b/src/boost/libs/fiber/examples/range_for.cpp new file mode 100644 index 00000000..8a1dbba1 --- /dev/null +++ b/src/boost/libs/fiber/examples/range_for.cpp @@ -0,0 +1,53 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstdlib> +#include <iostream> +#include <stdexcept> +#include <string> + +#include <boost/fiber/all.hpp> + +typedef boost::fibers::unbuffered_channel< unsigned int > channel_t; + +void foo( channel_t & chan) { + chan.push( 1); + chan.push( 1); + chan.push( 2); + chan.push( 3); + chan.push( 5); + chan.push( 8); + chan.push( 12); + chan.close(); +} + +void bar( channel_t & chan) { + for ( unsigned int value : chan) { + std::cout << value << " "; + } + std::cout << std::endl; +} + +int main() { + try { + channel_t chan; + + boost::fibers::fiber f1( & foo, std::ref( chan) ); + boost::fibers::fiber f2( & bar, std::ref( chan) ); + + f1.join(); + f2.join(); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/segmented_stack.cpp b/src/boost/libs/fiber/examples/segmented_stack.cpp new file mode 100644 index 00000000..ce52ee10 --- /dev/null +++ b/src/boost/libs/fiber/examples/segmented_stack.cpp @@ -0,0 +1,74 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <iostream> +#include <thread> + +#include <boost/assert.hpp> +#include <boost/fiber/all.hpp> + +int count = 384; + +#ifdef BOOST_MSVC //MS VisualStudio +__declspec(noinline) void access( char *buf); +#else // GCC +void access( char *buf) __attribute__ ((noinline)); +#endif +void access( char *buf) +{ + buf[0] = '\0'; +} + +void bar( int i) +{ + char buf[4 * 1024]; + + if ( i > 0) + { + access( buf); + std::cout << i << ". iteration" << std::endl; + bar( i - 1); + } +} + +void foo() +{ + bar( count); + boost::this_fiber::yield(); +} + +void thread_fn() +{ + { + boost::fibers::fiber f( +#if defined(BOOST_USE_SEGMENTED_STACKS) + std::allocator_arg, + boost::fibers::segmented_stack( + boost::fibers::segmented_stack::traits_type::default_size() ), +#endif + foo); + f.join(); + } +} + +int main( int argc, char * argv[]) +{ +#if defined(BOOST_USE_SEGMENTED_STACKS) + std::cout << "using segmented_stack stacks: allocates " << count << " * 4kB == " << 4 * count << "kB on stack, "; + std::cout << "initial stack size = " << boost::fibers::segmented_stack::traits_type::default_size() / 1024 << "kB" << std::endl; + std::cout << "application should not fail" << std::endl; +#else + std::cout << "using standard stacks: allocates " << count << " * 4kB == " << 4 * count << "kB on stack, "; + std::cout << "initial stack size = " << boost::fibers::fixedsize_stack::traits_type::default_size() / 1024 << "kB" << std::endl; + std::cout << "application might fail" << std::endl; +#endif + + std::thread( thread_fn).join(); + + std::cout << "done." << std::endl; + + return 0; +} diff --git a/src/boost/libs/fiber/examples/simple.cpp b/src/boost/libs/fiber/examples/simple.cpp new file mode 100644 index 00000000..07b7b6fd --- /dev/null +++ b/src/boost/libs/fiber/examples/simple.cpp @@ -0,0 +1,39 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstdlib> +#include <iostream> +#include <memory> +#include <string> +#include <thread> + +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/all.hpp> + +inline +void fn( std::string const& str, int n) { + for ( int i = 0; i < n; ++i) { + std::cout << i << ": " << str << std::endl; + boost::this_fiber::yield(); + } +} + +int main() { + try { + boost::fibers::fiber f1( fn, "abc", 5); + std::cerr << "f1 : " << f1.get_id() << std::endl; + f1.join(); + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/wait_stuff.cpp b/src/boost/libs/fiber/examples/wait_stuff.cpp new file mode 100644 index 00000000..00f64788 --- /dev/null +++ b/src/boost/libs/fiber/examples/wait_stuff.cpp @@ -0,0 +1,984 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#include <algorithm> +#include <cassert> +#include <chrono> +#include <iostream> +#include <memory> +#include <sstream> +#include <string> +#include <type_traits> +#include <utility> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/variant/variant.hpp> +#include <boost/variant/get.hpp> + +// These are wait_something() functions rather than when_something() +// functions. A big part of the point of the Fiber library is to model +// sequencing using the processor's instruction pointer rather than chains of +// callbacks. The future-oriented when_all() / when_any() functions are still +// based on chains of callbacks. With Fiber, we can do better. + +/***************************************************************************** +* Verbose +*****************************************************************************/ +class Verbose { +public: + Verbose( std::string const& d): + desc( d) { + std::cout << desc << " start" << std::endl; + } + + ~Verbose() { + std::cout << desc << " stop" << std::endl; + } + + Verbose( Verbose const&) = delete; + Verbose & operator=( Verbose const&) = delete; + +private: + const std::string desc; +}; + +/***************************************************************************** +* Runner and Example +*****************************************************************************/ +// collect and ultimately run every Example +class Runner { + typedef std::vector< std::pair< std::string, std::function< void() > > > function_list; + +public: + void add( std::string const& desc, std::function< void() > const& func) { + functions_.push_back( function_list::value_type( desc, func) ); + } + + void run() { + for ( function_list::value_type const& pair : functions_) { + Verbose v( pair.first); + pair.second(); + } + } + +private: + function_list functions_; +}; + +Runner runner; + +// Example allows us to embed Runner::add() calls at module scope +struct Example { + Example( Runner & runner, std::string const& desc, std::function< void() > const& func) { + runner.add( desc, func); + } +}; + +/***************************************************************************** +* example task functions +*****************************************************************************/ +//[wait_sleeper +template< typename T > +T sleeper_impl( T item, int ms, bool thrw = false) { + std::ostringstream descb, funcb; + descb << item; + std::string desc( descb.str() ); + funcb << " sleeper(" << item << ")"; + Verbose v( funcb.str() ); + + boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) ); + if ( thrw) { + throw std::runtime_error( desc); + } + return item; +} +//] + +inline +std::string sleeper( std::string const& item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +inline +double sleeper( double item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +inline +int sleeper(int item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +/***************************************************************************** +* Done +*****************************************************************************/ +//[wait_done +// Wrap canonical pattern for condition_variable + bool flag +struct Done { +private: + boost::fibers::condition_variable cond; + boost::fibers::mutex mutex; + bool ready = false; + +public: + typedef std::shared_ptr< Done > ptr; + + void wait() { + std::unique_lock< boost::fibers::mutex > lock( mutex); + cond.wait( lock, [this](){ return ready; }); + } + + void notify() { + { + std::unique_lock< boost::fibers::mutex > lock( mutex); + ready = true; + } // release mutex + cond.notify_one(); + } +}; +//] + +/***************************************************************************** +* when_any, simple completion +*****************************************************************************/ +//[wait_first_simple_impl +// Degenerate case: when there are no functions to wait for, return +// immediately. +void wait_first_simple_impl( Done::ptr) { +} + +// When there's at least one function to wait for, launch it and recur to +// process the rest. +template< typename Fn, typename ... Fns > +void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) { + boost::fibers::fiber( [done, function](){ + function(); + done->notify(); + }).detach(); + wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); +} +//] + +// interface function: instantiate Done, launch tasks, wait for Done +//[wait_first_simple +template< typename ... Fns > +void wait_first_simple( Fns && ... functions) { + // Use shared_ptr because each function's fiber will bind it separately, + // and we're going to return before the last of them completes. + auto done( std::make_shared< Done >() ); + wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); + done->wait(); +} +//] + +// example usage +Example wfs( runner, "wait_first_simple()", [](){ +//[wait_first_simple_ex + wait_first_simple( + [](){ sleeper("wfs_long", 150); }, + [](){ sleeper("wfs_medium", 100); }, + [](){ sleeper("wfs_short", 50); }); +//] +}); + +/***************************************************************************** +* when_any, return value +*****************************************************************************/ +// When there's only one function, call this overload +//[wait_first_value_impl +template< typename T, typename Fn > +void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, + Fn && function) { + boost::fibers::fiber( [chan, function](){ + // Ignore channel_op_status returned by push(): + // might be closed; we simply don't care. + chan->push( function() ); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_first_value_impl< T >( chan, + std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_first_value_impl< T >( chan, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +//[wait_first_value +// Assume that all passed functions have the same return type. The return type +// of wait_first_value() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_value( Fn && function, Fns && ... functions) { + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::buffered_channel< return_t > channel_t; + auto chanp( std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_value_impl< return_t >( chanp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // retrieve the first value + return_t value( chanp->value_pop() ); + // close the channel: no subsequent push() has to succeed + chanp->close(); + return value; +} +//] + +// example usage +Example wfv( runner, "wait_first_value()", [](){ +//[wait_first_value_ex + std::string result = wait_first_value( + [](){ return sleeper("wfv_third", 150); }, + [](){ return sleeper("wfv_second", 100); }, + [](){ return sleeper("wfv_first", 50); }); + std::cout << "wait_first_value() => " << result << std::endl; + assert(result == "wfv_first"); +//] +}); + +/***************************************************************************** +* when_any, produce first outcome, whether result or exception +*****************************************************************************/ +// When there's only one function, call this overload. +//[wait_first_outcome_impl +template< typename T, typename CHANP, typename Fn > +void wait_first_outcome_impl( CHANP chan, Fn && function) { + boost::fibers::fiber( + // Use std::bind() here for C++11 compatibility. C++11 lambda capture + // can't move a move-only Fn type, but bind() can. Let bind() move the + // channel pointer and the function into the bound object, passing + // references into the lambda. + std::bind( + []( CHANP & chan, + typename std::decay< Fn >::type & function) { + // Instantiate a packaged_task to capture any exception thrown by + // function. + boost::fibers::packaged_task< T() > task( function); + // Immediately run this packaged_task on same fiber. We want + // function() to have completed BEFORE we push the future. + task(); + // Pass the corresponding future to consumer. Ignore + // channel_op_status returned by push(): might be closed; we + // simply don't care. + chan->push( task.get_future() ); + }, + chan, + std::forward< Fn >( function) + )).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns > +void wait_first_outcome_impl( CHANP chan, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_first_outcome_impl< T >( chan, + std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_first_outcome_impl< T >( chan, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +// Assume that all passed functions have the same return type. The return type +// of wait_first_outcome() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +//[wait_first_outcome +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_outcome( Fn && function, Fns && ... functions) { + // In this case, the value we pass through the channel is actually a + // future -- which is already ready. future can carry either a value or an + // exception. + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::buffered_channel< future_t > channel_t; + auto chanp(std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_outcome_impl< return_t >( chanp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // retrieve the first future + future_t future( chanp->value_pop() ); + // close the channel: no subsequent push() has to succeed + chanp->close(); + // either return value or throw exception + return future.get(); +} +//] + +// example usage +Example wfo( runner, "wait_first_outcome()", [](){ +//[wait_first_outcome_ex + std::string result = wait_first_outcome( + [](){ return sleeper("wfos_first", 50); }, + [](){ return sleeper("wfos_second", 100); }, + [](){ return sleeper("wfos_third", 150); }); + std::cout << "wait_first_outcome(success) => " << result << std::endl; + assert(result == "wfos_first"); + + std::string thrown; + try { + result = wait_first_outcome( + [](){ return sleeper("wfof_first", 50, true); }, + [](){ return sleeper("wfof_second", 100); }, + [](){ return sleeper("wfof_third", 150); }); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_first_outcome(fail) threw '" << thrown + << "'" << std::endl; + assert(thrown == "wfof_first"); +//] +}); + +/***************************************************************************** +* when_any, collect exceptions until success; throw exception_list if no +* success +*****************************************************************************/ +// define an exception to aggregate exception_ptrs; prefer +// std::exception_list (N4407 et al.) once that becomes available +//[exception_list +class exception_list : public std::runtime_error { +public: + exception_list( std::string const& what) : + std::runtime_error( what) { + } + + typedef std::vector< std::exception_ptr > bundle_t; + + // N4407 proposed std::exception_list API + typedef bundle_t::const_iterator iterator; + + std::size_t size() const noexcept { + return bundle_.size(); + } + + iterator begin() const noexcept { + return bundle_.begin(); + } + + iterator end() const noexcept { + return bundle_.end(); + } + + // extension to populate + void add( std::exception_ptr ep) { + bundle_.push_back( ep); + } + +private: + bundle_t bundle_; +}; +//] + +// Assume that all passed functions have the same return type. The return type +// of wait_first_success() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +//[wait_first_success +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_success( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + // In this case, the value we pass through the channel is actually a + // future -- which is already ready. future can carry either a value or an + // exception. + typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::buffered_channel< future_t > channel_t; + auto chanp( std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_outcome_impl< return_t >( chanp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // instantiate exception_list, just in case + exception_list exceptions("wait_first_success() produced only errors"); + // retrieve up to 'count' results -- but stop there! + for ( std::size_t i = 0; i < count; ++i) { + // retrieve the next future + future_t future( chanp->value_pop() ); + // retrieve exception_ptr if any + std::exception_ptr error( future.get_exception_ptr() ); + // if no error, then yay, return value + if ( ! error) { + // close the channel: no subsequent push() has to succeed + chanp->close(); + // show caller the value we got + return future.get(); + } + + // error is non-null: collect + exceptions.add( error); + } + // We only arrive here when every passed function threw an exception. + // Throw our collection to inform caller. + throw exceptions; +} +//] + +// example usage +Example wfss( runner, "wait_first_success()", [](){ +//[wait_first_success_ex + std::string result = wait_first_success( + [](){ return sleeper("wfss_first", 50, true); }, + [](){ return sleeper("wfss_second", 100); }, + [](){ return sleeper("wfss_third", 150); }); + std::cout << "wait_first_success(success) => " << result << std::endl; + assert(result == "wfss_second"); +//] + + std::string thrown; + std::size_t count = 0; + try { + result = wait_first_success( + [](){ return sleeper("wfsf_first", 50, true); }, + [](){ return sleeper("wfsf_second", 100, true); }, + [](){ return sleeper("wfsf_third", 150, true); }); + } catch ( exception_list const& e) { + thrown = e.what(); + count = e.size(); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_first_success(fail) threw '" << thrown << "': " + << count << " errors" << std::endl; + assert(thrown == "wait_first_success() produced only errors"); + assert(count == 3); +}); + +/***************************************************************************** +* when_any, heterogeneous +*****************************************************************************/ +//[wait_first_value_het +// No need to break out the first Fn for interface function: let the compiler +// complain if empty. +// Our functions have different return types, and we might have to return any +// of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in +// parameter pack. +template< typename ... Fns > +boost::variant< typename std::result_of< Fns() >::type ... > +wait_first_value_het( Fns && ... functions) { + // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above. + typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t; + typedef boost::fibers::buffered_channel< return_t > channel_t; + auto chanp( std::make_shared< channel_t >( 64) ); + // launch all the relevant fibers + wait_first_value_impl< return_t >( chanp, + std::forward< Fns >( functions) ... ); + // retrieve the first value + return_t value( chanp->value_pop() ); + // close the channel: no subsequent push() has to succeed + chanp->close(); + return value; +} +//] + +// example usage +Example wfvh( runner, "wait_first_value_het()", [](){ +//[wait_first_value_het_ex + boost::variant< std::string, double, int > result = + wait_first_value_het( + [](){ return sleeper("wfvh_third", 150); }, + [](){ return sleeper(3.14, 100); }, + [](){ return sleeper(17, 50); }); + std::cout << "wait_first_value_het() => " << result << std::endl; + assert(boost::get< int >( result) == 17); +//] +}); + +/***************************************************************************** +* when_all, simple completion +*****************************************************************************/ +// Degenerate case: when there are no functions to wait for, return +// immediately. +void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) { +} + +// When there's at least one function to wait for, launch it and recur to +// process the rest. +//[wait_all_simple_impl +template< typename Fn, typename ... Fns > +void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier, + Fn && function, Fns && ... functions) { + boost::fibers::fiber( + std::bind( + []( std::shared_ptr< boost::fibers::barrier > & barrier, + typename std::decay< Fn >::type & function) mutable { + function(); + barrier->wait(); + }, + barrier, + std::forward< Fn >( function) + )).detach(); + wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); +} +//] + +// interface function: instantiate barrier, launch tasks, wait for barrier +//[wait_all_simple +template< typename ... Fns > +void wait_all_simple( Fns && ... functions) { + std::size_t count( sizeof ... ( functions) ); + // Initialize a barrier(count+1) because we'll immediately wait on it. We + // don't want to wake up until 'count' more fibers wait on it. Even though + // we'll stick around until the last of them completes, use shared_ptr + // anyway because it's easier to be confident about lifespan issues. + auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) ); + wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); + barrier->wait(); +} +//] + +// example usage +Example was( runner, "wait_all_simple()", [](){ +//[wait_all_simple_ex + wait_all_simple( + [](){ sleeper("was_long", 150); }, + [](){ sleeper("was_medium", 100); }, + [](){ sleeper("was_short", 50); }); +//] +}); + +/***************************************************************************** +* when_all, return values +*****************************************************************************/ +//[wait_nchannel +// Introduce a channel facade that closes the channel once a specific number +// of items has been pushed. This allows an arbitrary consumer to read until +// 'closed' without itself having to count items. +template< typename T > +class nchannel { +public: + nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, + std::size_t lm): + chan_( chan), + limit_( lm) { + assert(chan_); + if ( 0 == limit_) { + chan_->close(); + } + } + + boost::fibers::channel_op_status push( T && va) { + boost::fibers::channel_op_status ok = + chan_->push( std::forward< T >( va) ); + if ( ok == boost::fibers::channel_op_status::success && + --limit_ == 0) { + // after the 'limit_'th successful push, close the channel + chan_->close(); + } + return ok; + } + +private: + std::shared_ptr< boost::fibers::buffered_channel< T > > chan_; + std::size_t limit_; +}; +//] + +// When there's only one function, call this overload +//[wait_all_values_impl +template< typename T, typename Fn > +void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan, + Fn && function) { + boost::fibers::fiber( [chan, function](){ + chan->push(function()); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_all_values_impl< T >( chan, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +//[wait_all_values_source +// Return a shared_ptr<buffered_channel<T>> from which the caller can +// retrieve each new result as it arrives, until 'closed'. +template< typename Fn, typename ... Fns > +std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > > +wait_all_values_source( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::buffered_channel< return_t > channel_t; + // make the channel + auto chanp( std::make_shared< channel_t >( 64) ); + // and make an nchannel facade to close it after 'count' items + auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) ); + // pass that nchannel facade to all the relevant fibers + wait_all_values_impl< return_t >( ncp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // then return the channel for consumer + return chanp; +} +//] + +// When all passed functions have completed, return vector<T> containing +// collected results. Assume that all passed functions have the same return +// type. It is simply invalid to pass NO functions. +//[wait_all_values +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_values( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + + // get channel + std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan = + wait_all_values_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // fill results vector + return_t value; + while ( boost::fibers::channel_op_status::success == chan->pop(value) ) { + results.push_back( value); + } + // return vector to caller + return results; +} +//] + +Example wav( runner, "wait_all_values()", [](){ +//[wait_all_values_source_ex + std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan = + wait_all_values_source( + [](){ return sleeper("wavs_third", 150); }, + [](){ return sleeper("wavs_second", 100); }, + [](){ return sleeper("wavs_first", 50); }); + std::string value; + while ( boost::fibers::channel_op_status::success == chan->pop(value) ) { + std::cout << "wait_all_values_source() => '" << value + << "'" << std::endl; + } +//] + +//[wait_all_values_ex + std::vector< std::string > values = + wait_all_values( + [](){ return sleeper("wav_late", 150); }, + [](){ return sleeper("wav_middle", 100); }, + [](){ return sleeper("wav_early", 50); }); +//] + std::cout << "wait_all_values() =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; +}); + +/***************************************************************************** +* when_all, throw first exception +*****************************************************************************/ +//[wait_all_until_error_source +// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can +// get() each new result as it arrives, until 'closed'. +template< typename Fn, typename ... Fns > +std::shared_ptr< + boost::fibers::buffered_channel< + boost::fibers::future< + typename std::result_of< Fn() >::type > > > +wait_all_until_error_source( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::buffered_channel< future_t > channel_t; + // make the channel + auto chanp( std::make_shared< channel_t >( 64) ); + // and make an nchannel facade to close it after 'count' items + auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) ); + // pass that nchannel facade to all the relevant fibers + wait_first_outcome_impl< return_t >( ncp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // then return the channel for consumer + return chanp; +} +//] + +// When all passed functions have completed, return vector<T> containing +// collected results, or throw the first exception thrown by any of the passed +// functions. Assume that all passed functions have the same return type. It +// is simply invalid to pass NO functions. +//[wait_all_until_error +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_until_error( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef typename boost::fibers::future< return_t > future_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + + // get channel + std::shared_ptr< + boost::fibers::buffered_channel< future_t > > chan( + wait_all_until_error_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ) ); + // fill results vector + future_t future; + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + results.push_back( future.get() ); + } + // return vector to caller + return results; +} +//] + +Example waue( runner, "wait_all_until_error()", [](){ +//[wait_all_until_error_source_ex + typedef boost::fibers::future< std::string > future_t; + std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan = + wait_all_until_error_source( + [](){ return sleeper("wauess_third", 150); }, + [](){ return sleeper("wauess_second", 100); }, + [](){ return sleeper("wauess_first", 50); }); + future_t future; + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + std::string value( future.get() ); + std::cout << "wait_all_until_error_source(success) => '" << value + << "'" << std::endl; + } +//] + + chan = wait_all_until_error_source( + [](){ return sleeper("wauesf_third", 150); }, + [](){ return sleeper("wauesf_second", 100, true); }, + [](){ return sleeper("wauesf_first", 50); }); +//[wait_all_until_error_ex + std::string thrown; +//<- + try { + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + std::string value( future.get() ); + std::cout << "wait_all_until_error_source(fail) => '" << value + << "'" << std::endl; + } + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_until_error_source(fail) threw '" << thrown + << "'" << std::endl; + + thrown.clear(); +//-> + try { + std::vector< std::string > values = wait_all_until_error( + [](){ return sleeper("waue_late", 150); }, + [](){ return sleeper("waue_middle", 100, true); }, + [](){ return sleeper("waue_early", 50); }); +//<- + std::cout << "wait_all_until_error(fail) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; +//-> + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_until_error(fail) threw '" << thrown + << "'" << std::endl; +//] +}); + +/***************************************************************************** +* when_all, collect exceptions +*****************************************************************************/ +// When all passed functions have succeeded, return vector<T> containing +// collected results, or throw exception_list containing all exceptions thrown +// by any of the passed functions. Assume that all passed functions have the +// same return type. It is simply invalid to pass NO functions. +//[wait_all_collect_errors +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_collect_errors( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef typename boost::fibers::future< return_t > future_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + exception_list exceptions("wait_all_collect_errors() exceptions"); + + // get channel + std::shared_ptr< + boost::fibers::buffered_channel< future_t > > chan( + wait_all_until_error_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ) ); + // fill results and/or exceptions vectors + future_t future; + while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { + std::exception_ptr exp = future.get_exception_ptr(); + if ( ! exp) { + results.push_back( future.get() ); + } else { + exceptions.add( exp); + } + } + // if there were any exceptions, throw + if ( exceptions.size() ) { + throw exceptions; + } + // no exceptions: return vector to caller + return results; +} +//] + +Example wace( runner, "wait_all_collect_errors()", [](){ + std::vector< std::string > values = wait_all_collect_errors( + [](){ return sleeper("waces_late", 150); }, + [](){ return sleeper("waces_middle", 100); }, + [](){ return sleeper("waces_early", 50); }); + std::cout << "wait_all_collect_errors(success) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; + + std::string thrown; + std::size_t errors = 0; + try { + values = wait_all_collect_errors( + [](){ return sleeper("wacef_late", 150, true); }, + [](){ return sleeper("wacef_middle", 100, true); }, + [](){ return sleeper("wacef_early", 50); }); + std::cout << "wait_all_collect_errors(fail) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; + } catch ( exception_list const& e) { + thrown = e.what(); + errors = e.size(); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_collect_errors(fail) threw '" << thrown + << "': " << errors << " errors" << std::endl; +}); + +/***************************************************************************** +* when_all, heterogeneous +*****************************************************************************/ +//[wait_all_members_get +template< typename Result, typename ... Futures > +Result wait_all_members_get( Futures && ... futures) { + // Fetch the results from the passed futures into Result's initializer + // list. It's true that the get() calls here will block the implicit + // iteration over futures -- but that doesn't matter because we won't be + // done until the slowest of them finishes anyway. As results are + // processed in argument-list order rather than order of completion, the + // leftmost get() to throw an exception will cause that exception to + // propagate to the caller. + return Result{ futures.get() ... }; +} +//] + +//[wait_all_members +// Explicitly pass Result. This can be any type capable of being initialized +// from the results of the passed functions, such as a struct. +template< typename Result, typename ... Fns > +Result wait_all_members( Fns && ... functions) { + // Run each of the passed functions on a separate fiber, passing all their + // futures to helper function for processing. + return wait_all_members_get< Result >( + boost::fibers::async( std::forward< Fns >( functions) ) ... ); +} +//] + +// used by following example +//[wait_Data +struct Data { + std::string str; + double inexact; + int exact; + + friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=; + ...*/ +//<- + { + return out << "Data{str='" << data.str << "', inexact=" << data.inexact + << ", exact=" << data.exact << "}"; + } +//-> +}; +//] + +// example usage +Example wam( runner, "wait_all_members()", [](){ +//[wait_all_members_data_ex + Data data = wait_all_members< Data >( + [](){ return sleeper("wams_left", 100); }, + [](){ return sleeper(3.14, 150); }, + [](){ return sleeper(17, 50); }); + std::cout << "wait_all_members<Data>(success) => " << data << std::endl; +//] + + std::string thrown; + try { + data = wait_all_members< Data >( + [](){ return sleeper("wamf_left", 100, true); }, + [](){ return sleeper(3.14, 150); }, + [](){ return sleeper(17, 50, true); }); + std::cout << "wait_all_members<Data>(fail) => " << data << std::endl; + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_members<Data>(fail) threw '" << thrown + << '"' << std::endl; + +//[wait_all_members_vector_ex + // If we don't care about obtaining results as soon as they arrive, and we + // prefer a result vector in passed argument order rather than completion + // order, wait_all_members() is another possible implementation of + // wait_all_until_error(). + auto strings = wait_all_members< std::vector< std::string > >( + [](){ return sleeper("wamv_left", 150); }, + [](){ return sleeper("wamv_middle", 100); }, + [](){ return sleeper("wamv_right", 50); }); + std::cout << "wait_all_members<vector>() =>"; + for ( std::string const& str : strings) { + std::cout << " '" << str << "'"; + } + std::cout << std::endl; +//] +}); + + +/***************************************************************************** +* main() +*****************************************************************************/ +int main( int argc, char *argv[]) { + runner.run(); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; +} diff --git a/src/boost/libs/fiber/examples/work_sharing.cpp b/src/boost/libs/fiber/examples/work_sharing.cpp new file mode 100644 index 00000000..f5e31e51 --- /dev/null +++ b/src/boost/libs/fiber/examples/work_sharing.cpp @@ -0,0 +1,130 @@ +// Copyright Nat Goodspeed + Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <condition_variable> +#include <cstddef> +#include <deque> +#include <iomanip> +#include <iostream> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> + +#include <boost/assert.hpp> + +#include <boost/fiber/all.hpp> + +#include <boost/fiber/detail/thread_barrier.hpp> + +static std::size_t fiber_count{ 0 }; +static std::mutex mtx_count{}; +static boost::fibers::condition_variable_any cnd_count{}; +typedef std::unique_lock< std::mutex > lock_type; + +/***************************************************************************** +* example fiber function +*****************************************************************************/ +//[fiber_fn_ws +void whatevah( char me) { + try { + std::thread::id my_thread = std::this_thread::get_id(); /*< get ID of initial thread >*/ + { + std::ostringstream buffer; + buffer << "fiber " << me << " started on thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + for ( unsigned i = 0; i < 10; ++i) { /*< loop ten times >*/ + boost::this_fiber::yield(); /*< yield to other fibers >*/ + std::thread::id new_thread = std::this_thread::get_id(); /*< get ID of current thread >*/ + if ( new_thread != my_thread) { /*< test if fiber was migrated to another thread >*/ + my_thread = new_thread; + std::ostringstream buffer; + buffer << "fiber " << me << " switched to thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + } + } catch ( ... ) { + } + lock_type lk( mtx_count); + if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/ + lk.unlock(); + cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/ + } +} +//] + +/***************************************************************************** +* example thread function +*****************************************************************************/ +//[thread_fn_ws +void thread( boost::fibers::detail::thread_barrier * b) { + std::ostringstream buffer; + buffer << "thread started " << std::this_thread::get_id() << std::endl; + std::cout << buffer.str() << std::flush; + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*< + Install the scheduling algorithm `boost::fibers::algo::shared_work` in order to + join the work sharing. + >*/ + b->wait(); /*< sync with other threads: allow them to start processing >*/ + lock_type lk( mtx_count); + cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*< + Suspend main fiber and resume worker fibers in the meanwhile. + Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`) + if all worker fibers are complete. + >*/ + BOOST_ASSERT( 0 == fiber_count); +} +//] + +/***************************************************************************** +* main() +*****************************************************************************/ +int main( int argc, char *argv[]) { + std::cout << "main thread started " << std::this_thread::get_id() << std::endl; +//[main_ws + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*< + Install the scheduling algorithm `boost::fibers::algo::shared_work` in the main thread + too, so each new fiber gets launched into the shared pool. + >*/ + + for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*< + Launch a number of worker fibers; each worker fiber picks up a character + that is passed as parameter to fiber-function `whatevah`. + Each worker fiber gets detached. + >*/ + boost::fibers::fiber([c](){ whatevah( c); }).detach(); + ++fiber_count; /*< Increment fiber counter for each new fiber. >*/ + } + boost::fibers::detail::thread_barrier b( 4); + std::thread threads[] = { /*< + Launch a couple of threads that join the work sharing. + >*/ + std::thread( thread, & b), + std::thread( thread, & b), + std::thread( thread, & b) + }; + b.wait(); /*< sync with other threads: allow them to start processing >*/ + { + lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count); + cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*< + Suspend main fiber and resume worker fibers in the meanwhile. + Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`) + if all worker fibers are complete. + >*/ + } /*< + Releasing lock of mtx_count is required before joining the threads, otherwise + the other threads would be blocked inside condition_variable::wait() and + would never return (deadlock). + >*/ + BOOST_ASSERT( 0 == fiber_count); + for ( std::thread & t : threads) { /*< wait for threads to terminate >*/ + t.join(); + } +//] + std::cout << "done." << std::endl; + return EXIT_SUCCESS; +} diff --git a/src/boost/libs/fiber/examples/work_stealing.cpp b/src/boost/libs/fiber/examples/work_stealing.cpp new file mode 100644 index 00000000..42c99491 --- /dev/null +++ b/src/boost/libs/fiber/examples/work_stealing.cpp @@ -0,0 +1,126 @@ +// Copyright Nat Goodspeed + Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <condition_variable> +#include <cstddef> +#include <deque> +#include <iomanip> +#include <iostream> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> + +#include <boost/assert.hpp> + +#include <boost/fiber/all.hpp> + +#include <boost/fiber/detail/thread_barrier.hpp> + +static std::size_t fiber_count{ 0 }; +static std::mutex mtx_count{}; +static boost::fibers::condition_variable_any cnd_count{}; +typedef std::unique_lock< std::mutex > lock_type; + +/***************************************************************************** +* example fiber function +*****************************************************************************/ +//[fiber_fn_ws +void whatevah( char me) { + try { + std::thread::id my_thread = std::this_thread::get_id(); /*< get ID of initial thread >*/ + { + std::ostringstream buffer; + buffer << "fiber " << me << " started on thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + for ( unsigned i = 0; i < 10; ++i) { /*< loop ten times >*/ + boost::this_fiber::yield(); /*< yield to other fibers >*/ + std::thread::id new_thread = std::this_thread::get_id(); /*< get ID of current thread >*/ + if ( new_thread != my_thread) { /*< test if fiber was migrated to another thread >*/ + my_thread = new_thread; + std::ostringstream buffer; + buffer << "fiber " << me << " switched to thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + } + } catch ( ... ) { + } + lock_type lk( mtx_count); + if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/ + lk.unlock(); + cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/ + } +} +//] + +/***************************************************************************** +* example thread function +*****************************************************************************/ +//[thread_fn_ws +void thread() { + std::ostringstream buffer; + buffer << "thread started " << std::this_thread::get_id() << std::endl; + std::cout << buffer.str() << std::flush; + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( 4); /*< + Install the scheduling algorithm `boost::fibers::algo::work_stealing` in order to + join the work sharing. + >*/ + lock_type lk( mtx_count); + cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*< + Suspend main fiber and resume worker fibers in the meanwhile. + Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`) + if all worker fibers are complete. + >*/ + BOOST_ASSERT( 0 == fiber_count); +} +//] + +/***************************************************************************** +* main() +*****************************************************************************/ +int main( int argc, char *argv[]) { + std::cout << "main thread started " << std::this_thread::get_id() << std::endl; +//[main_ws + for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*< + Launch a number of worker fibers; each worker fiber picks up a character + that is passed as parameter to fiber-function `whatevah`. + Each worker fiber gets detached. + >*/ + boost::fibers::fiber([c](){ whatevah( c); }).detach(); + ++fiber_count; /*< Increment fiber counter for each new fiber. >*/ + } + std::thread threads[] = { /*< + Launch a couple of threads that join the work sharing. + >*/ + std::thread( thread), + std::thread( thread), + std::thread( thread) + }; + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( 4); /*< + Install the scheduling algorithm `boost::fibers::algo::work_stealing` in the main thread + too, so each new fiber gets launched into the shared pool. + >*/ + { + lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count); + cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*< + Suspend main fiber and resume worker fibers in the meanwhile. + Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`) + if all worker fibers are complete. + >*/ + } /*< + Releasing lock of mtx_count is required before joining the threads, otherwise + the other threads would be blocked inside condition_variable::wait() and + would never return (deadlock). + >*/ + BOOST_ASSERT( 0 == fiber_count); + for ( std::thread & t : threads) { /*< wait for threads to terminate >*/ + t.join(); + } +//] + std::cout << "done." << std::endl; + return EXIT_SUCCESS; +} |