diff options
Diffstat (limited to 'src/boost/libs/fiber/examples/asio')
-rw-r--r-- | src/boost/libs/fiber/examples/asio/autoecho.cpp | 261 | ||||
-rw-r--r-- | src/boost/libs/fiber/examples/asio/detail/yield.hpp | 328 | ||||
-rw-r--r-- | src/boost/libs/fiber/examples/asio/exchange.cpp | 56 | ||||
-rw-r--r-- | src/boost/libs/fiber/examples/asio/ps/publisher.cpp | 52 | ||||
-rw-r--r-- | src/boost/libs/fiber/examples/asio/ps/server.cpp | 381 | ||||
-rw-r--r-- | src/boost/libs/fiber/examples/asio/ps/subscriber.cpp | 53 | ||||
-rw-r--r-- | src/boost/libs/fiber/examples/asio/round_robin.hpp | 194 | ||||
-rw-r--r-- | src/boost/libs/fiber/examples/asio/yield.hpp | 63 |
8 files changed, 1388 insertions, 0 deletions
diff --git a/src/boost/libs/fiber/examples/asio/autoecho.cpp b/src/boost/libs/fiber/examples/asio/autoecho.cpp new file mode 100644 index 00000000..06b4027a --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/autoecho.cpp @@ -0,0 +1,261 @@ +// Copyright 2003-2013 Christopher M. Kohlhoff +// Copyright Oliver Kowalke, Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <chrono> +#include <cstdlib> +#include <iomanip> +#include <iostream> +#include <map> +#include <memory> +#include <mutex> +#include <sstream> +#include <thread> + +#include <boost/asio.hpp> +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> + +#include <boost/fiber/all.hpp> +#include "round_robin.hpp" +#include "yield.hpp" + +using boost::asio::ip::tcp; + +const int max_length = 1024; + +typedef boost::shared_ptr< tcp::socket > socket_ptr; + +const char* const alpha = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + +/***************************************************************************** +* thread names +*****************************************************************************/ +class ThreadNames { +private: + std::map<std::thread::id, std::string> names_{}; + const char* next_{ alpha }; + std::mutex mtx_{}; + +public: + ThreadNames() = default; + + std::string lookup() { + std::unique_lock<std::mutex> lk( mtx_); + auto this_id( std::this_thread::get_id() ); + auto found = names_.find( this_id ); + if ( found != names_.end() ) { + return found->second; + } + BOOST_ASSERT( *next_); + std::string name(1, *next_++ ); + names_[ this_id ] = name; + return name; + } +}; + +ThreadNames thread_names; + +/***************************************************************************** +* fiber names +*****************************************************************************/ +class FiberNames { +private: + std::map<boost::fibers::fiber::id, std::string> names_{}; + unsigned next_{ 0 }; + boost::fibers::mutex mtx_{}; + +public: + FiberNames() = default; + + std::string lookup() { + std::unique_lock<boost::fibers::mutex> lk( mtx_); + auto this_id( boost::this_fiber::get_id() ); + auto found = names_.find( this_id ); + if ( found != names_.end() ) { + return found->second; + } + std::ostringstream out; + // Bake into the fiber's name the thread name on which we first + // lookup() its ID, to be able to spot when a fiber hops between + // threads. + out << thread_names.lookup() << next_++; + std::string name( out.str() ); + names_[ this_id ] = name; + return name; + } +}; + +FiberNames fiber_names; + +std::string tag() { + std::ostringstream out; + out << "Thread " << thread_names.lookup() << ": " + << std::setw(4) << fiber_names.lookup() << std::setw(0); + return out.str(); +} + +/***************************************************************************** +* message printing +*****************************************************************************/ +void print_( std::ostream& out) { + out << '\n'; +} + +template < typename T, typename... Ts > +void print_( std::ostream& out, T const& arg, Ts const&... args) { + out << arg; + print_(out, args...); +} + +template < typename... T > +void print( T const&... args ) { + std::ostringstream buffer; + print_( buffer, args...); + std::cout << buffer.str() << std::flush; +} + +/***************************************************************************** +* fiber function per server connection +*****************************************************************************/ +void session( socket_ptr sock) { + try { + for (;;) { + char data[max_length]; + boost::system::error_code ec; + std::size_t length = sock->async_read_some( + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + print( tag(), ": handled: ", std::string(data, length)); + boost::asio::async_write( + * sock, + boost::asio::buffer( data, length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + } + print( tag(), ": connection closed"); + } catch ( std::exception const& ex) { + print( tag(), ": caught exception : ", ex.what()); + } +} + +/***************************************************************************** +* listening server +*****************************************************************************/ +void server( std::shared_ptr< boost::asio::io_service > const& io_svc, tcp::acceptor & a) { + print( tag(), ": echo-server started"); + try { + for (;;) { + socket_ptr socket( new tcp::socket( * io_svc) ); + boost::system::error_code ec; + a.async_accept( + * socket, + boost::fibers::asio::yield[ec]); + if ( ec) { + throw boost::system::system_error( ec); //some other error + } else { + boost::fibers::fiber( session, socket).detach(); + } + } + } catch ( std::exception const& ex) { + print( tag(), ": caught exception : ", ex.what()); + } + io_svc->stop(); + print( tag(), ": echo-server stopped"); +} + +/***************************************************************************** +* fiber function per client +*****************************************************************************/ +void client( std::shared_ptr< boost::asio::io_service > const& io_svc, tcp::acceptor & a, + boost::fibers::barrier& barrier, unsigned iterations) { + print( tag(), ": echo-client started"); + for (unsigned count = 0; count < iterations; ++count) { + tcp::resolver resolver( * io_svc); + tcp::resolver::query query( tcp::v4(), "127.0.0.1", "9999"); + tcp::resolver::iterator iterator = resolver.resolve( query); + tcp::socket s( * io_svc); + boost::asio::connect( s, iterator); + for (unsigned msg = 0; msg < 1; ++msg) { + std::ostringstream msgbuf; + msgbuf << "from " << fiber_names.lookup() << " " << count << "." << msg; + std::string message(msgbuf.str()); + print( tag(), ": Sending: ", message); + boost::system::error_code ec; + boost::asio::async_write( + s, + boost::asio::buffer( message), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + return; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + char reply[max_length]; + size_t reply_length = s.async_read_some( + boost::asio::buffer( reply, max_length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + return; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + print( tag(), ": Reply : ", std::string( reply, reply_length)); + } + } + // done with all iterations, wait for rest of client fibers + if ( barrier.wait()) { + // exactly one barrier.wait() call returns true + // we're the lucky one + a.close(); + print( tag(), ": acceptor stopped"); + } + print( tag(), ": echo-client stopped"); +} + +/***************************************************************************** +* main +*****************************************************************************/ +int main( int argc, char* argv[]) { + try { +//[asio_rr_setup + std::shared_ptr< boost::asio::io_service > io_svc = std::make_shared< boost::asio::io_service >(); + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); +//] + print( "Thread ", thread_names.lookup(), ": started"); +//[asio_rr_launch_fibers + // server + tcp::acceptor a( * io_svc, tcp::endpoint( tcp::v4(), 9999) ); + boost::fibers::fiber( server, io_svc, std::ref( a) ).detach(); + // client + const unsigned iterations = 2; + const unsigned clients = 3; + boost::fibers::barrier b( clients); + for ( unsigned i = 0; i < clients; ++i) { + boost::fibers::fiber( + client, io_svc, std::ref( a), std::ref( b), iterations).detach(); + } +//] +//[asio_rr_run + io_svc->run(); +//] + print( tag(), ": io_service returned"); + print( "Thread ", thread_names.lookup(), ": stopping"); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + print("Exception: ", e.what(), "\n"); + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/detail/yield.hpp b/src/boost/libs/fiber/examples/asio/detail/yield.hpp new file mode 100644 index 00000000..01b680b5 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/detail/yield.hpp @@ -0,0 +1,328 @@ +// Copyright Oliver Kowalke, Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP +#define BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP + +#include <boost/asio/async_result.hpp> +#include <boost/asio/detail/config.hpp> +#include <boost/asio/handler_type.hpp> +#include <boost/assert.hpp> +#include <boost/atomic.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/system/error_code.hpp> +#include <boost/system/system_error.hpp> +#include <boost/throw_exception.hpp> + +#include <boost/fiber/all.hpp> + +#include <mutex> // std::unique_lock + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { +namespace detail { + +//[fibers_asio_yield_completion +// Bundle a completion bool flag with a spinlock to protect it. +struct yield_completion { + enum state_t { + init, + waiting, + complete + }; + + typedef fibers::detail::spinlock mutex_t; + typedef std::unique_lock< mutex_t > lock_t; + typedef boost::intrusive_ptr< yield_completion > ptr_t; + + std::atomic< std::size_t > use_count_{ 0 }; + mutex_t mtx_{}; + state_t state_{ init }; + + void wait() { + // yield_handler_base::operator()() will set state_ `complete` and + // attempt to wake a suspended fiber. It would be Bad if that call + // happened between our detecting (complete != state_) and suspending. + lock_t lk{ mtx_ }; + // If state_ is already set, we're done here: don't suspend. + if ( complete != state_) { + state_ = waiting; + // suspend(unique_lock<spinlock>) unlocks the lock in the act of + // resuming another fiber + fibers::context::active()->suspend( lk); + } + } + + friend void intrusive_ptr_add_ref( yield_completion * yc) noexcept { + BOOST_ASSERT( nullptr != yc); + yc->use_count_.fetch_add( 1, std::memory_order_relaxed); + } + + friend void intrusive_ptr_release( yield_completion * yc) noexcept { + BOOST_ASSERT( nullptr != yc); + if ( 1 == yc->use_count_.fetch_sub( 1, std::memory_order_release) ) { + std::atomic_thread_fence( std::memory_order_acquire); + delete yc; + } + } +}; +//] + +//[fibers_asio_yield_handler_base +// This class encapsulates common elements between yield_handler<T> (capturing +// a value to return from asio async function) and yield_handler<void> (no +// such value). See yield_handler<T> and its <void> specialization below. Both +// yield_handler<T> and yield_handler<void> are passed by value through +// various layers of asio functions. In other words, they're potentially +// copied multiple times. So key data such as the yield_completion instance +// must be stored in our async_result<yield_handler<>> specialization, which +// should be instantiated only once. +class yield_handler_base { +public: + yield_handler_base( yield_t const& y) : + // capture the context* associated with the running fiber + ctx_{ boost::fibers::context::active() }, + // capture the passed yield_t + yt_( y ) { + } + + // completion callback passing only (error_code) + void operator()( boost::system::error_code const& ec) { + BOOST_ASSERT_MSG( ycomp_, + "Must inject yield_completion* " + "before calling yield_handler_base::operator()()"); + BOOST_ASSERT_MSG( yt_.ec_, + "Must inject boost::system::error_code* " + "before calling yield_handler_base::operator()()"); + // If originating fiber is busy testing state_ flag, wait until it + // has observed (completed != state_). + yield_completion::lock_t lk{ ycomp_->mtx_ }; + yield_completion::state_t state = ycomp_->state_; + // Notify a subsequent yield_completion::wait() call that it need not + // suspend. + ycomp_->state_ = yield_completion::complete; + // set the error_code bound by yield_t + * yt_.ec_ = ec; + // unlock the lock that protects state_ + lk.unlock(); + // If ctx_ is still active, e.g. because the async operation + // immediately called its callback (this method!) before the asio + // async function called async_result_base::get(), we must not set it + // ready. + if ( yield_completion::waiting == state) { + // wake the fiber + fibers::context::active()->schedule( ctx_); + } + } + +//private: + boost::fibers::context * ctx_; + yield_t yt_; + // We depend on this pointer to yield_completion, which will be injected + // by async_result. + yield_completion::ptr_t ycomp_{}; +}; +//] + +//[fibers_asio_yield_handler_T +// asio uses handler_type<completion token type, signature>::type to decide +// what to instantiate as the actual handler. Below, we specialize +// handler_type< yield_t, ... > to indicate yield_handler<>. So when you pass +// an instance of yield_t as an asio completion token, asio selects +// yield_handler<> as the actual handler class. +template< typename T > +class yield_handler: public yield_handler_base { +public: + // asio passes the completion token to the handler constructor + explicit yield_handler( yield_t const& y) : + yield_handler_base{ y } { + } + + // completion callback passing only value (T) + void operator()( T t) { + // just like callback passing success error_code + (*this)( boost::system::error_code(), std::move(t) ); + } + + // completion callback passing (error_code, T) + void operator()( boost::system::error_code const& ec, T t) { + BOOST_ASSERT_MSG( value_, + "Must inject value ptr " + "before caling yield_handler<T>::operator()()"); + // move the value to async_result<> instance BEFORE waking up a + // suspended fiber + * value_ = std::move( t); + // forward the call to base-class completion handler + yield_handler_base::operator()( ec); + } + +//private: + // pointer to destination for eventual value + // this must be injected by async_result before operator()() is called + T * value_{ nullptr }; +}; +//] + +//[fibers_asio_yield_handler_void +// yield_handler<void> is like yield_handler<T> without value_. In fact it's +// just like yield_handler_base. +template<> +class yield_handler< void >: public yield_handler_base { +public: + explicit yield_handler( yield_t const& y) : + yield_handler_base{ y } { + } + + // nullary completion callback + void operator()() { + ( * this)( boost::system::error_code() ); + } + + // inherit operator()(error_code) overload from base class + using yield_handler_base::operator(); +}; +//] + +// Specialize asio_handler_invoke hook to ensure that any exceptions thrown +// from the handler are propagated back to the caller +template< typename Fn, typename T > +void asio_handler_invoke( Fn&& fn, yield_handler< T > *) { + fn(); +} + +//[fibers_asio_async_result_base +// Factor out commonality between async_result<yield_handler<T>> and +// async_result<yield_handler<void>> +class async_result_base { +public: + explicit async_result_base( yield_handler_base & h) : + ycomp_{ new yield_completion{} } { + // Inject ptr to our yield_completion instance into this + // yield_handler<>. + h.ycomp_ = this->ycomp_; + // if yield_t didn't bind an error_code, make yield_handler_base's + // error_code* point to an error_code local to this object so + // yield_handler_base::operator() can unconditionally store through + // its error_code* + if ( ! h.yt_.ec_) { + h.yt_.ec_ = & ec_; + } + } + + void get() { + // Unless yield_handler_base::operator() has already been called, + // suspend the calling fiber until that call. + ycomp_->wait(); + // The only way our own ec_ member could have a non-default value is + // if our yield_handler did not have a bound error_code AND the + // completion callback passed a non-default error_code. + if ( ec_) { + throw_exception( boost::system::system_error{ ec_ } ); + } + } + +private: + // If yield_t does not bind an error_code instance, store into here. + boost::system::error_code ec_{}; + yield_completion::ptr_t ycomp_; +}; +//] + +}}}} + +namespace boost { +namespace asio { + +//[fibers_asio_async_result_T +// asio constructs an async_result<> instance from the yield_handler specified +// by handler_type<>::type. A particular asio async method constructs the +// yield_handler, constructs this async_result specialization from it, then +// returns the result of calling its get() method. +template< typename T > +class async_result< boost::fibers::asio::detail::yield_handler< T > > : + public boost::fibers::asio::detail::async_result_base { +public: + // type returned by get() + typedef T type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< T > & h) : + boost::fibers::asio::detail::async_result_base{ h } { + // Inject ptr to our value_ member into yield_handler<>: result will + // be stored here. + h.value_ = & value_; + } + + // asio async method returns result of calling get() + type get() { + boost::fibers::asio::detail::async_result_base::get(); + return std::move( value_); + } + +private: + type value_{}; +}; +//] + +//[fibers_asio_async_result_void +// Without the need to handle a passed value, our yield_handler<void> +// specialization is just like async_result_base. +template<> +class async_result< boost::fibers::asio::detail::yield_handler< void > > : + public boost::fibers::asio::detail::async_result_base { +public: + typedef void type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h): + boost::fibers::asio::detail::async_result_base{ h } { + } +}; +//] + +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts no parameters, +// use yield_handler<void>. +template< typename ReturnType > +struct handler_type< fibers::asio::yield_t, ReturnType() > +{ typedef fibers::asio::detail::yield_handler< void > type; }; + +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts a data +// parameter, use yield_handler<parameter type> to return that parameter to +// the caller. +template< typename ReturnType, typename Arg1 > +struct handler_type< fibers::asio::yield_t, ReturnType( Arg1) > +{ typedef fibers::asio::detail::yield_handler< Arg1 > type; }; + +//[asio_handler_type +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts only +// error_code, use yield_handler<void>. yield_handler will take care of the +// error_code one way or another. +template< typename ReturnType > +struct handler_type< fibers::asio::yield_t, ReturnType( boost::system::error_code) > +{ typedef fibers::asio::detail::yield_handler< void > type; }; +//] + +// Handler type specialisation for fibers::asio::yield. +// When 'yield' is passed as a completion handler which accepts a data +// parameter and an error_code, use yield_handler<parameter type> to return +// just the parameter to the caller. yield_handler will take care of the +// error_code one way or another. +template< typename ReturnType, typename Arg2 > +struct handler_type< fibers::asio::yield_t, ReturnType( boost::system::error_code, Arg2) > +{ typedef fibers::asio::detail::yield_handler< Arg2 > type; }; + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP diff --git a/src/boost/libs/fiber/examples/asio/exchange.cpp b/src/boost/libs/fiber/examples/asio/exchange.cpp new file mode 100644 index 00000000..8d7482af --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/exchange.cpp @@ -0,0 +1,56 @@ +// Copyright Arnaud Kapp, Oliver Kowalke 2016 +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <iostream> +#include <memory> +#include <thread> + +#include <boost/asio.hpp> + +#include <boost/fiber/all.hpp> +#include "round_robin.hpp" + +std::shared_ptr< boost::fibers::unbuffered_channel< int > > c; + +void foo() { + auto io_ptr = std::make_shared< boost::asio::io_service >(); + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ptr); + boost::fibers::fiber([io_ptr](){ + for ( int i = 0; i < 10; ++i) { + std::cout << "push " << i << std::endl; + c->push( i); + } + c->close(); + io_ptr->stop(); + }).detach(); + io_ptr->run(); +} + +void bar() { + auto io_ptr = std::make_shared< boost::asio::io_service >(); + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ptr); + boost::fibers::fiber([io_ptr](){ + try { + for (;;) { + int i = c->value_pop(); + std::cout << "pop " << i << std::endl; + } + } catch ( std::exception const& e) { + std::cout << "exception: " << e.what() << std::endl; + } + io_ptr->stop(); + }).detach(); + io_ptr->run(); +} + +int main() { + c = std::make_shared< boost::fibers::unbuffered_channel< int > >(); + std::thread t1( foo); + std::thread t2( bar); + t2.join(); + t1.join(); + std::cout << "done." << std::endl; + return 0; +} diff --git a/src/boost/libs/fiber/examples/asio/ps/publisher.cpp b/src/boost/libs/fiber/examples/asio/ps/publisher.cpp new file mode 100644 index 00000000..76a63d77 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/ps/publisher.cpp @@ -0,0 +1,52 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include <cstdlib> +#include <cstring> +#include <iostream> + +#include <boost/asio.hpp> + +using boost::asio::ip::tcp; + +enum { + max_length = 1024 +}; + +int main( int argc, char* argv[]) { + try { + if ( 3 != argc) { + std::cerr << "Usage: publisher <host> <queue>\n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + tcp::resolver resolver( io_service); + tcp::resolver::query query( tcp::v4(), argv[1], "9997"); + tcp::resolver::iterator iterator = resolver.resolve(query); + tcp::socket s( io_service); + boost::asio::connect( s, iterator); + char msg[max_length]; + std::string queue( argv[2]); + std::memset( msg, '\0', max_length); + std::memcpy( msg, queue.c_str(), queue.size() ); + boost::asio::write( s, boost::asio::buffer( msg, max_length) ); + for (;;) { + std::cout << "publish: "; + char request[max_length]; + std::cin.getline( request, max_length); + boost::asio::write( s, boost::asio::buffer( request, max_length) ); + } + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/ps/server.cpp b/src/boost/libs/fiber/examples/asio/ps/server.cpp new file mode 100644 index 00000000..aeb58f23 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/ps/server.cpp @@ -0,0 +1,381 @@ +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include <cstddef> +#include <cstdlib> +#include <map> +#include <memory> +#include <set> +#include <iostream> +#include <string> + +#include <boost/asio.hpp> +#include <boost/utility.hpp> + +#include <boost/fiber/all.hpp> +#include "../round_robin.hpp" +#include "../yield.hpp" + +using boost::asio::ip::tcp; + +const std::size_t max_length = 1024; + +class subscriber_session; +typedef std::shared_ptr< subscriber_session > subscriber_session_ptr; + +// a queue has n subscribers (subscriptions) +// this class holds a list of subcribers for one queue +class subscriptions { +public: + ~subscriptions(); + + // subscribe to this queue + void subscribe( subscriber_session_ptr const& s) { + subscribers_.insert( s); + } + + // unsubscribe from this queue + void unsubscribe( subscriber_session_ptr const& s) { + subscribers_.erase(s); + } + + // publish a message, e.g. push this message to all subscribers + void publish( std::string const& msg); + +private: + // list of subscribers + std::set< subscriber_session_ptr > subscribers_; +}; + +// a class to register queues and to subsribe clients to this queues +class registry : private boost::noncopyable { +private: + typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont; + typedef queues_cont::iterator queues_iter; + + boost::fibers::mutex mtx_; + queues_cont queues_; + + void register_queue_( std::string const& queue) { + if ( queues_.end() != queues_.find( queue) ) { + throw std::runtime_error("queue already exists"); + } + queues_[queue] = std::make_shared< subscriptions >(); + std::cout << "new queue '" << queue << "' registered" << std::endl; + } + + void unregister_queue_( std::string const& queue) { + queues_.erase( queue); + std::cout << "queue '" << queue << "' unregistered" << std::endl; + } + + void subscribe_( std::string const& queue, subscriber_session_ptr s) { + queues_iter iter = queues_.find( queue); + if ( queues_.end() == iter ) { + throw std::runtime_error("queue does not exist"); + } + iter->second->subscribe( s); + std::cout << "new subscription to queue '" << queue << "'" << std::endl; + } + + void unsubscribe_( std::string const& queue, subscriber_session_ptr s) { + queues_iter iter = queues_.find( queue); + if ( queues_.end() != iter ) { + iter->second->unsubscribe( s); + } + } + + void publish_( std::string const& queue, std::string const& msg) { + queues_iter iter = queues_.find( queue); + if ( queues_.end() == iter ) { + throw std::runtime_error("queue does not exist"); + } + iter->second->publish( msg); + std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl; + } + +public: + // add a queue to registry + void register_queue( std::string const& queue) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + register_queue_( queue); + } + + // remove a queue from registry + void unregister_queue( std::string const& queue) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + unregister_queue_( queue); + } + + // subscribe to a queue + void subscribe( std::string const& queue, subscriber_session_ptr s) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + subscribe_( queue, s); + } + + // unsubscribe from a queue + void unsubscribe( std::string const& queue, subscriber_session_ptr s) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + unsubscribe_( queue, s); + } + + // publish a message to all subscribers registerd to the queue + void publish( std::string const& queue, std::string const& msg) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + publish_( queue, msg); + } +}; + +// a subscriber subscribes to a given queue in order to receive messages published on this queue +class subscriber_session : public std::enable_shared_from_this< subscriber_session > { +public: + explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) : + socket_( * io_service), + reg_( reg) { + } + + tcp::socket& socket() { + return socket_; + } + + // this function is executed inside the fiber + void run() { + std::string queue; + try { + boost::system::error_code ec; + // read first message == queue name + // async_ready() returns if the the complete message is read + // until this the fiber is suspended until the complete message + // is read int the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data_), + boost::fibers::asio::yield[ec]); + if ( ec) { + throw std::runtime_error("no queue from subscriber"); + } + // first message ist equal to the queue name the publisher + // publishes to + queue = data_; + // subscribe to new queue + reg_.subscribe( queue, shared_from_this() ); + // read published messages + for (;;) { + // wait for a conditon-variable for new messages + // the fiber will be suspended until the condtion + // becomes true and the fiber is resumed + // published message is stored in buffer 'data_' + std::unique_lock< boost::fibers::mutex > lk( mtx_); + cond_.wait( lk); + std::string data( data_); + lk.unlock(); + // message '<fini>' terminates subscription + if ( "<fini>" == data) { + break; + } + // async. write message to socket connected with + // subscriber + // async_write() returns if the complete message was writen + // the fiber is suspended in the meanwhile + boost::asio::async_write( + socket_, + boost::asio::buffer( data, data.size() ), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + std::cout << "subscriber::run(): '" << data << "' written" << std::endl; + } + } catch ( std::exception const& e) { + std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl; + } + // close socket + socket_.close(); + // unregister queue + reg_.unsubscribe( queue, shared_from_this() ); + } + + // called from publisher_session (running in other fiber) + void publish( std::string const& msg) { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + std::memset( data_, '\0', sizeof( data_)); + std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size())); + cond_.notify_one(); + } + +private: + tcp::socket socket_; + registry & reg_; + boost::fibers::mutex mtx_; + boost::fibers::condition_variable cond_; + // fixed size message + char data_[max_length]; +}; + + +subscriptions::~subscriptions() { + for ( subscriber_session_ptr s : subscribers_) { + s->publish("<fini>"); + } +} + +void +subscriptions::publish( std::string const& msg) { + for ( subscriber_session_ptr s : subscribers_) { + s->publish( msg); + } +} + +// a publisher publishes messages on its queue +// subscriber might register to this queue to get the published messages +class publisher_session : public std::enable_shared_from_this< publisher_session > { +public: + explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) : + socket_( * io_service), + reg_( reg) { + } + + tcp::socket& socket() { + return socket_; + } + + // this function is executed inside the fiber + void run() { + std::string queue; + try { + boost::system::error_code ec; + // fixed size message + char data[max_length]; + // read first message == queue name + // async_ready() returns if the the complete message is read + // until this the fiber is suspended until the complete message + // is read int the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec) { + throw std::runtime_error("no queue from publisher"); + } + // first message ist equal to the queue name the publisher + // publishes to + queue = data; + // register the new queue + reg_.register_queue( queue); + // start publishing messages + for (;;) { + // read message from publisher asyncronous + // async_read() suspends this fiber until the complete emssage is read + // and stored in the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + // publish message to all subscribers + reg_.publish( queue, std::string( data) ); + } + } catch ( std::exception const& e) { + std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl; + } + // close socket + socket_.close(); + // unregister queue + reg_.unregister_queue( queue); + } + +private: + tcp::socket socket_; + registry & reg_; +}; + +typedef std::shared_ptr< publisher_session > publisher_session_ptr; + +// function accepts connections requests from clients acting as a publisher +void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service, + unsigned short port, + registry & reg) { + // create TCP-acceptor + tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) ); + // loop for accepting connection requests + for (;;) { + boost::system::error_code ec; + // create new publisher-session + // this instance will be associated with one publisher + publisher_session_ptr new_publisher_session = + std::make_shared< publisher_session >( io_service, std::ref( reg) ); + // async. accept of new connection request + // this function will suspend this execution context (fiber) until a + // connection was established, after returning from this function a new client (publisher) + // is connected + acceptor.async_accept( + new_publisher_session->socket(), + boost::fibers::asio::yield[ec]); + if ( ! ec) { + // run the new publisher in its own fiber (one fiber for one client) + boost::fibers::fiber( + std::bind( & publisher_session::run, new_publisher_session) ).detach(); + } + } +} + +// function accepts connections requests from clients acting as a subscriber +void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service, + unsigned short port, + registry & reg) { + // create TCP-acceptor + tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) ); + // loop for accepting connection requests + for (;;) { + boost::system::error_code ec; + // create new subscriber-session + // this instance will be associated with one subscriber + subscriber_session_ptr new_subscriber_session = + std::make_shared< subscriber_session >( io_service, std::ref( reg) ); + // async. accept of new connection request + // this function will suspend this execution context (fiber) until a + // connection was established, after returning from this function a new client (subscriber) + // is connected + acceptor.async_accept( + new_subscriber_session->socket(), + boost::fibers::asio::yield[ec]); + if ( ! ec) { + // run the new subscriber in its own fiber (one fiber for one client) + boost::fibers::fiber( + std::bind( & subscriber_session::run, new_subscriber_session) ).detach(); + } + } +} + + +int main( int argc, char* argv[]) { + try { + // create io_service for async. I/O + std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >(); + // register asio scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); + // registry for queues and its subscription + registry reg; + // create an acceptor for publishers, run it as fiber + boost::fibers::fiber( + accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach(); + // create an acceptor for subscribers, run it as fiber + boost::fibers::fiber( + accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach(); + // dispatch + io_service->run(); + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/ps/subscriber.cpp b/src/boost/libs/fiber/examples/asio/ps/subscriber.cpp new file mode 100644 index 00000000..04e583e2 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/ps/subscriber.cpp @@ -0,0 +1,53 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include <cstdlib> +#include <cstring> +#include <iostream> + +#include <boost/asio.hpp> + +using boost::asio::ip::tcp; + +enum { + max_length = 1024 +}; + +int main( int argc, char* argv[]) { + try { + if ( 3 != argc) { + std::cerr << "Usage: subscriber <host> <queue>\n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + tcp::resolver resolver( io_service); + tcp::resolver::query query( tcp::v4(), argv[1], "9998"); + tcp::resolver::iterator iterator = resolver.resolve( query); + tcp::socket s( io_service); + boost::asio::connect( s, iterator); + char msg[max_length]; + std::string queue( argv[2]); + std::memset( msg, '\0', max_length); + std::memcpy( msg, queue.c_str(), queue.size() ); + boost::asio::write( s, boost::asio::buffer( msg, max_length) ); + for (;;) { + char reply[max_length]; + size_t reply_length = s.read_some( boost::asio::buffer( reply, max_length) ); + std::cout << "published: "; + std::cout.write( reply, reply_length); + std::cout << std::endl; + } + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/examples/asio/round_robin.hpp b/src/boost/libs/fiber/examples/asio/round_robin.hpp new file mode 100644 index 00000000..b06bb35c --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/round_robin.hpp @@ -0,0 +1,194 @@ +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H +#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H + +#include <chrono> +#include <cstddef> +#include <memory> +#include <mutex> +#include <queue> + +#include <boost/asio.hpp> +#include <boost/assert.hpp> +#include <boost/asio/steady_timer.hpp> +#include <boost/config.hpp> + +#include <boost/fiber/condition_variable.hpp> +#include <boost/fiber/context.hpp> +#include <boost/fiber/mutex.hpp> +#include <boost/fiber/operations.hpp> +#include <boost/fiber/scheduler.hpp> + +#include "yield.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +class round_robin : public algo::algorithm { +private: +//[asio_rr_suspend_timer + std::shared_ptr< boost::asio::io_service > io_svc_; + boost::asio::steady_timer suspend_timer_; +//] + boost::fibers::scheduler::ready_queue_type rqueue_{}; + boost::fibers::mutex mtx_{}; + boost::fibers::condition_variable cnd_{}; + std::size_t counter_{ 0 }; + +public: +//[asio_rr_service_top + struct service : public boost::asio::io_service::service { + static boost::asio::io_service::id id; + + std::unique_ptr< boost::asio::io_service::work > work_; + + service( boost::asio::io_service & io_svc) : + boost::asio::io_service::service( io_svc), + work_{ new boost::asio::io_service::work( io_svc) } { + } + + virtual ~service() {} + + service( service const&) = delete; + service & operator=( service const&) = delete; + + void shutdown_service() override final { + work_.reset(); + } + }; +//] + +//[asio_rr_ctor + round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) : + io_svc_( io_svc), + suspend_timer_( * io_svc_) { + // We use add_service() very deliberately. This will throw + // service_already_exists if you pass the same io_service instance to + // more than one round_robin instance. + boost::asio::add_service( * io_svc_, new service( * io_svc_) ); + io_svc_->post([this]() mutable { +//] +//[asio_rr_service_lambda + while ( ! io_svc_->stopped() ) { + if ( has_ready_fibers() ) { + // run all pending handlers in round_robin + while ( io_svc_->poll() ); + // block this fiber till all pending (ready) fibers are processed + // == round_robin::suspend_until() has been called + std::unique_lock< boost::fibers::mutex > lk( mtx_); + cnd_.wait( lk); + } else { + // run one handler inside io_service + // if no handler available, block this thread + if ( ! io_svc_->run_one() ) { + break; + } + } + } +//] + }); + } + + void awakened( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/ + if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { + ++counter_; + } + } + + context * pick_next() noexcept { + context * ctx( nullptr); + if ( ! rqueue_.empty() ) { /*< + pop an item from the ready queue + >*/ + ctx = & rqueue_.front(); + rqueue_.pop_front(); + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( context::active() != ctx); + if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { + --counter_; + } + } + return ctx; + } + + bool has_ready_fibers() const noexcept { + return 0 < counter_; + } + +//[asio_rr_suspend_until + void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept { + // Set a timer so at least one handler will eventually fire, causing + // run_one() to eventually return. + if ( (std::chrono::steady_clock::time_point::max)() != abs_time) { + // Each expires_at(time_point) call cancels any previous pending + // call. We could inadvertently spin like this: + // dispatcher calls suspend_until() with earliest wake time + // suspend_until() sets suspend_timer_ + // lambda loop calls run_one() + // some other asio handler runs before timer expires + // run_one() returns to lambda loop + // lambda loop yields to dispatcher + // dispatcher finds no ready fibers + // dispatcher calls suspend_until() with SAME wake time + // suspend_until() sets suspend_timer_ to same time, canceling + // previous async_wait() + // lambda loop calls run_one() + // asio calls suspend_timer_ handler with operation_aborted + // run_one() returns to lambda loop... etc. etc. + // So only actually set the timer when we're passed a DIFFERENT + // abs_time value. + suspend_timer_.expires_at( abs_time); + suspend_timer_.async_wait([](boost::system::error_code const&){ + this_fiber::yield(); + }); + } + cnd_.notify_one(); + } +//] + +//[asio_rr_notify + void notify() noexcept { + // Something has happened that should wake one or more fibers BEFORE + // suspend_timer_ expires. Reset the timer to cause it to fire + // immediately, causing the run_one() call to return. In theory we + // could use cancel() because we don't care whether suspend_timer_'s + // handler is called with operation_aborted or success. However -- + // cancel() doesn't change the expiration time, and we use + // suspend_timer_'s expiration time to decide whether it's already + // set. If suspend_until() set some specific wake time, then notify() + // canceled it, then suspend_until() was called again with the same + // wake time, it would match suspend_timer_'s expiration time and we'd + // refrain from setting the timer. So instead of simply calling + // cancel(), reset the timer, which cancels the pending sleep AND sets + // a new expiration time. This will cause us to spin the loop twice -- + // once for the operation_aborted handler, once for timer expiration + // -- but that shouldn't be a big problem. + suspend_timer_.async_wait([](boost::system::error_code const&){ + this_fiber::yield(); + }); + suspend_timer_.expires_at( std::chrono::steady_clock::now() ); + } +//] +}; + +boost::asio::io_service::id round_robin::service::id; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H diff --git a/src/boost/libs/fiber/examples/asio/yield.hpp b/src/boost/libs/fiber/examples/asio/yield.hpp new file mode 100644 index 00000000..d76467f9 --- /dev/null +++ b/src/boost/libs/fiber/examples/asio/yield.hpp @@ -0,0 +1,63 @@ +// Copyright 2003-2013 Christopher M. Kohlhoff +// Copyright Oliver Kowalke, Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + + +#ifndef BOOST_FIBERS_ASIO_YIELD_HPP +#define BOOST_FIBERS_ASIO_YIELD_HPP + +#include <boost/config.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +//[fibers_asio_yield_t +class yield_t { +public: + yield_t() = default; + + /** + * @code + * static yield_t yield; + * boost::system::error_code myec; + * func(yield[myec]); + * @endcode + * @c yield[myec] returns an instance of @c yield_t whose @c ec_ points + * to @c myec. The expression @c yield[myec] "binds" @c myec to that + * (anonymous) @c yield_t instance, instructing @c func() to store any + * @c error_code it might produce into @c myec rather than throwing @c + * boost::system::system_error. + */ + yield_t operator[]( boost::system::error_code & ec) const { + yield_t tmp; + tmp.ec_ = & ec; + return tmp; + } + +//private: + // ptr to bound error_code instance if any + boost::system::error_code * ec_{ nullptr }; +}; +//] + +//[fibers_asio_yield +// canonical instance +thread_local yield_t yield{}; +//] + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#include "detail/yield.hpp" + +#endif // BOOST_FIBERS_ASIO_YIELD_HPP |