summaryrefslogtreecommitdiffstats
path: root/src/boost/libs/fiber/examples/asio/ps/server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/boost/libs/fiber/examples/asio/ps/server.cpp')
-rw-r--r--src/boost/libs/fiber/examples/asio/ps/server.cpp381
1 files changed, 381 insertions, 0 deletions
diff --git a/src/boost/libs/fiber/examples/asio/ps/server.cpp b/src/boost/libs/fiber/examples/asio/ps/server.cpp
new file mode 100644
index 000000000..aeb58f236
--- /dev/null
+++ b/src/boost/libs/fiber/examples/asio/ps/server.cpp
@@ -0,0 +1,381 @@
+// Copyright Oliver Kowalke 2015.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include <cstddef>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <set>
+#include <iostream>
+#include <string>
+
+#include <boost/asio.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/fiber/all.hpp>
+#include "../round_robin.hpp"
+#include "../yield.hpp"
+
+using boost::asio::ip::tcp;
+
+const std::size_t max_length = 1024;
+
+class subscriber_session;
+typedef std::shared_ptr< subscriber_session > subscriber_session_ptr;
+
+// a queue has n subscribers (subscriptions)
+// this class holds a list of subcribers for one queue
+class subscriptions {
+public:
+ ~subscriptions();
+
+ // subscribe to this queue
+ void subscribe( subscriber_session_ptr const& s) {
+ subscribers_.insert( s);
+ }
+
+ // unsubscribe from this queue
+ void unsubscribe( subscriber_session_ptr const& s) {
+ subscribers_.erase(s);
+ }
+
+ // publish a message, e.g. push this message to all subscribers
+ void publish( std::string const& msg);
+
+private:
+ // list of subscribers
+ std::set< subscriber_session_ptr > subscribers_;
+};
+
+// a class to register queues and to subsribe clients to this queues
+class registry : private boost::noncopyable {
+private:
+ typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
+ typedef queues_cont::iterator queues_iter;
+
+ boost::fibers::mutex mtx_;
+ queues_cont queues_;
+
+ void register_queue_( std::string const& queue) {
+ if ( queues_.end() != queues_.find( queue) ) {
+ throw std::runtime_error("queue already exists");
+ }
+ queues_[queue] = std::make_shared< subscriptions >();
+ std::cout << "new queue '" << queue << "' registered" << std::endl;
+ }
+
+ void unregister_queue_( std::string const& queue) {
+ queues_.erase( queue);
+ std::cout << "queue '" << queue << "' unregistered" << std::endl;
+ }
+
+ void subscribe_( std::string const& queue, subscriber_session_ptr s) {
+ queues_iter iter = queues_.find( queue);
+ if ( queues_.end() == iter ) {
+ throw std::runtime_error("queue does not exist");
+ }
+ iter->second->subscribe( s);
+ std::cout << "new subscription to queue '" << queue << "'" << std::endl;
+ }
+
+ void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
+ queues_iter iter = queues_.find( queue);
+ if ( queues_.end() != iter ) {
+ iter->second->unsubscribe( s);
+ }
+ }
+
+ void publish_( std::string const& queue, std::string const& msg) {
+ queues_iter iter = queues_.find( queue);
+ if ( queues_.end() == iter ) {
+ throw std::runtime_error("queue does not exist");
+ }
+ iter->second->publish( msg);
+ std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
+ }
+
+public:
+ // add a queue to registry
+ void register_queue( std::string const& queue) {
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ register_queue_( queue);
+ }
+
+ // remove a queue from registry
+ void unregister_queue( std::string const& queue) {
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ unregister_queue_( queue);
+ }
+
+ // subscribe to a queue
+ void subscribe( std::string const& queue, subscriber_session_ptr s) {
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ subscribe_( queue, s);
+ }
+
+ // unsubscribe from a queue
+ void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ unsubscribe_( queue, s);
+ }
+
+ // publish a message to all subscribers registerd to the queue
+ void publish( std::string const& queue, std::string const& msg) {
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ publish_( queue, msg);
+ }
+};
+
+// a subscriber subscribes to a given queue in order to receive messages published on this queue
+class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
+public:
+ explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
+ socket_( * io_service),
+ reg_( reg) {
+ }
+
+ tcp::socket& socket() {
+ return socket_;
+ }
+
+ // this function is executed inside the fiber
+ void run() {
+ std::string queue;
+ try {
+ boost::system::error_code ec;
+ // read first message == queue name
+ // async_ready() returns if the the complete message is read
+ // until this the fiber is suspended until the complete message
+ // is read int the given buffer 'data'
+ boost::asio::async_read(
+ socket_,
+ boost::asio::buffer( data_),
+ boost::fibers::asio::yield[ec]);
+ if ( ec) {
+ throw std::runtime_error("no queue from subscriber");
+ }
+ // first message ist equal to the queue name the publisher
+ // publishes to
+ queue = data_;
+ // subscribe to new queue
+ reg_.subscribe( queue, shared_from_this() );
+ // read published messages
+ for (;;) {
+ // wait for a conditon-variable for new messages
+ // the fiber will be suspended until the condtion
+ // becomes true and the fiber is resumed
+ // published message is stored in buffer 'data_'
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ cond_.wait( lk);
+ std::string data( data_);
+ lk.unlock();
+ // message '<fini>' terminates subscription
+ if ( "<fini>" == data) {
+ break;
+ }
+ // async. write message to socket connected with
+ // subscriber
+ // async_write() returns if the complete message was writen
+ // the fiber is suspended in the meanwhile
+ boost::asio::async_write(
+ socket_,
+ boost::asio::buffer( data, data.size() ),
+ boost::fibers::asio::yield[ec]);
+ if ( ec == boost::asio::error::eof) {
+ break; //connection closed cleanly by peer
+ } else if ( ec) {
+ throw boost::system::system_error( ec); //some other error
+ }
+ std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
+ }
+ } catch ( std::exception const& e) {
+ std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
+ }
+ // close socket
+ socket_.close();
+ // unregister queue
+ reg_.unsubscribe( queue, shared_from_this() );
+ }
+
+ // called from publisher_session (running in other fiber)
+ void publish( std::string const& msg) {
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ std::memset( data_, '\0', sizeof( data_));
+ std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size()));
+ cond_.notify_one();
+ }
+
+private:
+ tcp::socket socket_;
+ registry & reg_;
+ boost::fibers::mutex mtx_;
+ boost::fibers::condition_variable cond_;
+ // fixed size message
+ char data_[max_length];
+};
+
+
+subscriptions::~subscriptions() {
+ for ( subscriber_session_ptr s : subscribers_) {
+ s->publish("<fini>");
+ }
+}
+
+void
+subscriptions::publish( std::string const& msg) {
+ for ( subscriber_session_ptr s : subscribers_) {
+ s->publish( msg);
+ }
+}
+
+// a publisher publishes messages on its queue
+// subscriber might register to this queue to get the published messages
+class publisher_session : public std::enable_shared_from_this< publisher_session > {
+public:
+ explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
+ socket_( * io_service),
+ reg_( reg) {
+ }
+
+ tcp::socket& socket() {
+ return socket_;
+ }
+
+ // this function is executed inside the fiber
+ void run() {
+ std::string queue;
+ try {
+ boost::system::error_code ec;
+ // fixed size message
+ char data[max_length];
+ // read first message == queue name
+ // async_ready() returns if the the complete message is read
+ // until this the fiber is suspended until the complete message
+ // is read int the given buffer 'data'
+ boost::asio::async_read(
+ socket_,
+ boost::asio::buffer( data),
+ boost::fibers::asio::yield[ec]);
+ if ( ec) {
+ throw std::runtime_error("no queue from publisher");
+ }
+ // first message ist equal to the queue name the publisher
+ // publishes to
+ queue = data;
+ // register the new queue
+ reg_.register_queue( queue);
+ // start publishing messages
+ for (;;) {
+ // read message from publisher asyncronous
+ // async_read() suspends this fiber until the complete emssage is read
+ // and stored in the given buffer 'data'
+ boost::asio::async_read(
+ socket_,
+ boost::asio::buffer( data),
+ boost::fibers::asio::yield[ec]);
+ if ( ec == boost::asio::error::eof) {
+ break; //connection closed cleanly by peer
+ } else if ( ec) {
+ throw boost::system::system_error( ec); //some other error
+ }
+ // publish message to all subscribers
+ reg_.publish( queue, std::string( data) );
+ }
+ } catch ( std::exception const& e) {
+ std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
+ }
+ // close socket
+ socket_.close();
+ // unregister queue
+ reg_.unregister_queue( queue);
+ }
+
+private:
+ tcp::socket socket_;
+ registry & reg_;
+};
+
+typedef std::shared_ptr< publisher_session > publisher_session_ptr;
+
+// function accepts connections requests from clients acting as a publisher
+void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service,
+ unsigned short port,
+ registry & reg) {
+ // create TCP-acceptor
+ tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
+ // loop for accepting connection requests
+ for (;;) {
+ boost::system::error_code ec;
+ // create new publisher-session
+ // this instance will be associated with one publisher
+ publisher_session_ptr new_publisher_session =
+ std::make_shared< publisher_session >( io_service, std::ref( reg) );
+ // async. accept of new connection request
+ // this function will suspend this execution context (fiber) until a
+ // connection was established, after returning from this function a new client (publisher)
+ // is connected
+ acceptor.async_accept(
+ new_publisher_session->socket(),
+ boost::fibers::asio::yield[ec]);
+ if ( ! ec) {
+ // run the new publisher in its own fiber (one fiber for one client)
+ boost::fibers::fiber(
+ std::bind( & publisher_session::run, new_publisher_session) ).detach();
+ }
+ }
+}
+
+// function accepts connections requests from clients acting as a subscriber
+void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service,
+ unsigned short port,
+ registry & reg) {
+ // create TCP-acceptor
+ tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
+ // loop for accepting connection requests
+ for (;;) {
+ boost::system::error_code ec;
+ // create new subscriber-session
+ // this instance will be associated with one subscriber
+ subscriber_session_ptr new_subscriber_session =
+ std::make_shared< subscriber_session >( io_service, std::ref( reg) );
+ // async. accept of new connection request
+ // this function will suspend this execution context (fiber) until a
+ // connection was established, after returning from this function a new client (subscriber)
+ // is connected
+ acceptor.async_accept(
+ new_subscriber_session->socket(),
+ boost::fibers::asio::yield[ec]);
+ if ( ! ec) {
+ // run the new subscriber in its own fiber (one fiber for one client)
+ boost::fibers::fiber(
+ std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
+ }
+ }
+}
+
+
+int main( int argc, char* argv[]) {
+ try {
+ // create io_service for async. I/O
+ std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >();
+ // register asio scheduler
+ boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
+ // registry for queues and its subscription
+ registry reg;
+ // create an acceptor for publishers, run it as fiber
+ boost::fibers::fiber(
+ accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach();
+ // create an acceptor for subscribers, run it as fiber
+ boost::fibers::fiber(
+ accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach();
+ // dispatch
+ io_service->run();
+ return EXIT_SUCCESS;
+ } catch ( std::exception const& e) {
+ std::cerr << "Exception: " << e.what() << "\n";
+ }
+
+ return EXIT_FAILURE;
+}