summaryrefslogtreecommitdiffstats
path: root/ml/dlib/dlib/bridge/bridge.h
diff options
context:
space:
mode:
Diffstat (limited to 'ml/dlib/dlib/bridge/bridge.h')
-rw-r--r--ml/dlib/dlib/bridge/bridge.h669
1 files changed, 669 insertions, 0 deletions
diff --git a/ml/dlib/dlib/bridge/bridge.h b/ml/dlib/dlib/bridge/bridge.h
new file mode 100644
index 000000000..da4e0bd7e
--- /dev/null
+++ b/ml/dlib/dlib/bridge/bridge.h
@@ -0,0 +1,669 @@
+// Copyright (C) 2011 Davis E. King (davis@dlib.net)
+// License: Boost Software License See LICENSE.txt for the full license.
+#ifndef DLIB_BRIDGe_Hh_
+#define DLIB_BRIDGe_Hh_
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "bridge_abstract.h"
+#include "../pipe.h"
+#include "../threads.h"
+#include "../serialize.h"
+#include "../sockets.h"
+#include "../sockstreambuf.h"
+#include "../logger.h"
+#include "../algs.h"
+
+
+namespace dlib
+{
+
+// ----------------------------------------------------------------------------------------
+
+ struct connect_to_ip_and_port
+ {
+ connect_to_ip_and_port (
+ const std::string& ip_,
+ unsigned short port_
+ ): ip(ip_), port(port_)
+ {
+ // make sure requires clause is not broken
+ DLIB_ASSERT(is_ip_address(ip) && port != 0,
+ "\t connect_to_ip_and_port()"
+ << "\n\t Invalid inputs were given to this function"
+ << "\n\t ip: " << ip
+ << "\n\t port: " << port
+ << "\n\t this: " << this
+ );
+ }
+
+ private:
+ friend class bridge;
+ const std::string ip;
+ const unsigned short port;
+ };
+
+ inline connect_to_ip_and_port connect_to (
+ const network_address& addr
+ )
+ {
+ // make sure requires clause is not broken
+ DLIB_ASSERT(addr.port != 0,
+ "\t connect_to_ip_and_port()"
+ << "\n\t The TCP port to connect to can't be 0."
+ << "\n\t addr.port: " << addr.port
+ );
+
+ if (is_ip_address(addr.host_address))
+ {
+ return connect_to_ip_and_port(addr.host_address, addr.port);
+ }
+ else
+ {
+ std::string ip;
+ if(hostname_to_ip(addr.host_address,ip))
+ throw socket_error(ERESOLVE,"unable to resolve '" + addr.host_address + "' in connect_to()");
+
+ return connect_to_ip_and_port(ip, addr.port);
+ }
+ }
+
+ struct listen_on_port
+ {
+ listen_on_port(
+ unsigned short port_
+ ) : port(port_)
+ {
+ // make sure requires clause is not broken
+ DLIB_ASSERT( port != 0,
+ "\t listen_on_port()"
+ << "\n\t Invalid inputs were given to this function"
+ << "\n\t port: " << port
+ << "\n\t this: " << this
+ );
+ }
+
+ private:
+ friend class bridge;
+ const unsigned short port;
+ };
+
+ template <typename pipe_type>
+ struct bridge_transmit_decoration
+ {
+ bridge_transmit_decoration (
+ pipe_type& p_
+ ) : p(p_) {}
+
+ private:
+ friend class bridge;
+ pipe_type& p;
+ };
+
+ template <typename pipe_type>
+ bridge_transmit_decoration<pipe_type> transmit ( pipe_type& p) { return bridge_transmit_decoration<pipe_type>(p); }
+
+ template <typename pipe_type>
+ struct bridge_receive_decoration
+ {
+ bridge_receive_decoration (
+ pipe_type& p_
+ ) : p(p_) {}
+
+ private:
+ friend class bridge;
+ pipe_type& p;
+ };
+
+ template <typename pipe_type>
+ bridge_receive_decoration<pipe_type> receive ( pipe_type& p) { return bridge_receive_decoration<pipe_type>(p); }
+
+// ----------------------------------------------------------------------------------------
+
+ struct bridge_status
+ {
+ bridge_status() : is_connected(false), foreign_port(0){}
+
+ bool is_connected;
+ unsigned short foreign_port;
+ std::string foreign_ip;
+ };
+
+ inline void serialize ( const bridge_status& , std::ostream& )
+ {
+ throw serialization_error("It is illegal to serialize bridge_status objects.");
+ }
+
+ inline void deserialize ( bridge_status& , std::istream& )
+ {
+ throw serialization_error("It is illegal to serialize bridge_status objects.");
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ namespace impl_brns
+ {
+ class impl_bridge_base
+ {
+ public:
+
+ virtual ~impl_bridge_base() {}
+
+ virtual bridge_status get_bridge_status (
+ ) const = 0;
+ };
+
+ template <
+ typename transmit_pipe_type,
+ typename receive_pipe_type
+ >
+ class impl_bridge : public impl_bridge_base, private noncopyable, private multithreaded_object
+ {
+ /*!
+ CONVENTION
+ - if (list) then
+ - this object is supposed to be listening on the list object for incoming
+ connections when not connected.
+ - else
+ - this object is supposed to be attempting to connect to ip:port when
+ not connected.
+
+ - get_bridge_status() == current_bs
+ !*/
+ public:
+
+ impl_bridge (
+ unsigned short listen_port,
+ transmit_pipe_type* transmit_pipe_,
+ receive_pipe_type* receive_pipe_
+ ) :
+ s(m),
+ receive_thread_active(false),
+ transmit_thread_active(false),
+ port(0),
+ transmit_pipe(transmit_pipe_),
+ receive_pipe(receive_pipe_),
+ dlog("dlib.bridge"),
+ keepalive_code(0),
+ message_code(1)
+ {
+ int status = create_listener(list, listen_port);
+ if (status == PORTINUSE)
+ {
+ std::ostringstream sout;
+ sout << "Error, the port " << listen_port << " is already in use.";
+ throw socket_error(EPORT_IN_USE, sout.str());
+ }
+ else if (status == OTHER_ERROR)
+ {
+ throw socket_error("Unable to create listening socket for an unknown reason.");
+ }
+
+ register_thread(*this, &impl_bridge::transmit_thread);
+ register_thread(*this, &impl_bridge::receive_thread);
+ register_thread(*this, &impl_bridge::connect_thread);
+
+ start();
+ }
+
+ impl_bridge (
+ const std::string ip_,
+ unsigned short port_,
+ transmit_pipe_type* transmit_pipe_,
+ receive_pipe_type* receive_pipe_
+ ) :
+ s(m),
+ receive_thread_active(false),
+ transmit_thread_active(false),
+ port(port_),
+ ip(ip_),
+ transmit_pipe(transmit_pipe_),
+ receive_pipe(receive_pipe_),
+ dlog("dlib.bridge"),
+ keepalive_code(0),
+ message_code(1)
+ {
+ register_thread(*this, &impl_bridge::transmit_thread);
+ register_thread(*this, &impl_bridge::receive_thread);
+ register_thread(*this, &impl_bridge::connect_thread);
+
+ start();
+ }
+
+ ~impl_bridge()
+ {
+ // tell the threads to terminate
+ stop();
+
+ // save current pipe enabled status so we can restore it to however
+ // it was before this destructor ran.
+ bool transmit_enabled = true;
+ bool receive_enabled = true;
+
+ // make any calls blocked on a pipe return immediately.
+ if (transmit_pipe)
+ {
+ transmit_enabled = transmit_pipe->is_dequeue_enabled();
+ transmit_pipe->disable_dequeue();
+ }
+ if (receive_pipe)
+ {
+ receive_enabled = receive_pipe->is_enqueue_enabled();
+ receive_pipe->disable_enqueue();
+ }
+
+ {
+ auto_mutex lock(m);
+ s.broadcast();
+ // Shutdown the connection if we have one. This will cause
+ // all blocked I/O calls to return an error.
+ if (con)
+ con->shutdown();
+ }
+
+ // wait for all the threads to terminate.
+ wait();
+
+ if (transmit_pipe && transmit_enabled)
+ transmit_pipe->enable_dequeue();
+ if (receive_pipe && receive_enabled)
+ receive_pipe->enable_enqueue();
+ }
+
+ bridge_status get_bridge_status (
+ ) const
+ {
+ auto_mutex lock(current_bs_mutex);
+ return current_bs;
+ }
+
+ private:
+
+
+ template <typename pipe_type>
+ typename enable_if<is_convertible<bridge_status, typename pipe_type::type> >::type enqueue_bridge_status (
+ pipe_type* p,
+ const bridge_status& status
+ )
+ {
+ if (p)
+ {
+ typename pipe_type::type temp(status);
+ p->enqueue(temp);
+ }
+ }
+
+ template <typename pipe_type>
+ typename disable_if<is_convertible<bridge_status, typename pipe_type::type> >::type enqueue_bridge_status (
+ pipe_type* ,
+ const bridge_status&
+ )
+ {
+ }
+
+ void connect_thread (
+ )
+ {
+ while (!should_stop())
+ {
+ auto_mutex lock(m);
+ int status = OTHER_ERROR;
+ if (list)
+ {
+ do
+ {
+ status = list->accept(con, 1000);
+ } while (status == TIMEOUT && !should_stop());
+ }
+ else
+ {
+ status = create_connection(con, port, ip);
+ }
+
+ if (should_stop())
+ break;
+
+ if (status != 0)
+ {
+ // The last connection attempt failed. So pause for a little bit before making another attempt.
+ s.wait_or_timeout(2000);
+ continue;
+ }
+
+ dlog << LINFO << "Established new connection to " << con->get_foreign_ip() << ":" << con->get_foreign_port() << ".";
+
+ bridge_status temp_bs;
+ { auto_mutex lock(current_bs_mutex);
+ current_bs.is_connected = true;
+ current_bs.foreign_port = con->get_foreign_port();
+ current_bs.foreign_ip = con->get_foreign_ip();
+ temp_bs = current_bs;
+ }
+ enqueue_bridge_status(receive_pipe, temp_bs);
+
+
+ receive_thread_active = true;
+ transmit_thread_active = true;
+
+ s.broadcast();
+
+ // Wait for the transmit and receive threads to end before we continue.
+ // This way we don't invalidate the con pointer while it is in use.
+ while (receive_thread_active || transmit_thread_active)
+ s.wait();
+
+
+ dlog << LINFO << "Closed connection to " << con->get_foreign_ip() << ":" << con->get_foreign_port() << ".";
+ { auto_mutex lock(current_bs_mutex);
+ current_bs.is_connected = false;
+ current_bs.foreign_port = con->get_foreign_port();
+ current_bs.foreign_ip = con->get_foreign_ip();
+ temp_bs = current_bs;
+ }
+ enqueue_bridge_status(receive_pipe, temp_bs);
+ }
+
+ }
+
+
+ void receive_thread (
+ )
+ {
+ while (true)
+ {
+ // wait until we have a connection
+ { auto_mutex lock(m);
+ while (!receive_thread_active && !should_stop())
+ {
+ s.wait();
+ }
+
+ if (should_stop())
+ break;
+ }
+
+
+
+ try
+ {
+ if (receive_pipe)
+ {
+ sockstreambuf buf(con);
+ std::istream in(&buf);
+ typename receive_pipe_type::type item;
+ // This isn't necessary but doing it avoids a warning about
+ // item being uninitialized sometimes.
+ assign_zero_if_built_in_scalar_type(item);
+
+ while (in.peek() != EOF)
+ {
+ unsigned char code;
+ in.read((char*)&code, sizeof(code));
+ if (code == message_code)
+ {
+ deserialize(item, in);
+ receive_pipe->enqueue(item);
+ }
+ }
+ }
+ else
+ {
+ // Since we don't have a receive pipe to put messages into we will
+ // just read the bytes from the connection and ignore them.
+ char buf[1000];
+ while (con->read(buf, sizeof(buf)) > 0) ;
+ }
+ }
+ catch (std::bad_alloc& )
+ {
+ dlog << LERROR << "std::bad_alloc thrown while deserializing message from "
+ << con->get_foreign_ip() << ":" << con->get_foreign_port();
+ }
+ catch (dlib::serialization_error& e)
+ {
+ dlog << LERROR << "dlib::serialization_error thrown while deserializing message from "
+ << con->get_foreign_ip() << ":" << con->get_foreign_port()
+ << ".\nThe exception error message is: \n" << e.what();
+ }
+ catch (std::exception& e)
+ {
+ dlog << LERROR << "std::exception thrown while deserializing message from "
+ << con->get_foreign_ip() << ":" << con->get_foreign_port()
+ << ".\nThe exception error message is: \n" << e.what();
+ }
+
+
+
+
+ con->shutdown();
+ auto_mutex lock(m);
+ receive_thread_active = false;
+ s.broadcast();
+ }
+
+ auto_mutex lock(m);
+ receive_thread_active = false;
+ s.broadcast();
+ }
+
+ void transmit_thread (
+ )
+ {
+ while (true)
+ {
+ // wait until we have a connection
+ { auto_mutex lock(m);
+ while (!transmit_thread_active && !should_stop())
+ {
+ s.wait();
+ }
+
+ if (should_stop())
+ break;
+ }
+
+
+
+ try
+ {
+ sockstreambuf buf(con);
+ std::ostream out(&buf);
+ typename transmit_pipe_type::type item;
+ // This isn't necessary but doing it avoids a warning about
+ // item being uninitialized sometimes.
+ assign_zero_if_built_in_scalar_type(item);
+
+
+ while (out)
+ {
+ bool dequeue_timed_out = false;
+ if (transmit_pipe )
+ {
+ if (transmit_pipe->dequeue_or_timeout(item,1000))
+ {
+ out.write((char*)&message_code, sizeof(message_code));
+ serialize(item, out);
+ if (transmit_pipe->size() == 0)
+ out.flush();
+
+ continue;
+ }
+
+ dequeue_timed_out = (transmit_pipe->is_enabled() && transmit_pipe->is_dequeue_enabled());
+ }
+
+ // Pause for about a second. Note that we use a wait_or_timeout() call rather
+ // than sleep() here because we want to wake up immediately if this object is
+ // being destructed rather than hang for a second.
+ if (!dequeue_timed_out)
+ {
+ auto_mutex lock(m);
+ if (should_stop())
+ break;
+
+ s.wait_or_timeout(1000);
+ }
+ // Just send the keepalive byte periodically so we can
+ // tell if the connection is alive.
+ out.write((char*)&keepalive_code, sizeof(keepalive_code));
+ out.flush();
+ }
+ }
+ catch (std::bad_alloc& )
+ {
+ dlog << LERROR << "std::bad_alloc thrown while serializing message to "
+ << con->get_foreign_ip() << ":" << con->get_foreign_port();
+ }
+ catch (dlib::serialization_error& e)
+ {
+ dlog << LERROR << "dlib::serialization_error thrown while serializing message to "
+ << con->get_foreign_ip() << ":" << con->get_foreign_port()
+ << ".\nThe exception error message is: \n" << e.what();
+ }
+ catch (std::exception& e)
+ {
+ dlog << LERROR << "std::exception thrown while serializing message to "
+ << con->get_foreign_ip() << ":" << con->get_foreign_port()
+ << ".\nThe exception error message is: \n" << e.what();
+ }
+
+
+
+
+ con->shutdown();
+ auto_mutex lock(m);
+ transmit_thread_active = false;
+ s.broadcast();
+ }
+
+ auto_mutex lock(m);
+ transmit_thread_active = false;
+ s.broadcast();
+ }
+
+ mutex m;
+ signaler s;
+ bool receive_thread_active;
+ bool transmit_thread_active;
+ std::unique_ptr<connection> con;
+ std::unique_ptr<listener> list;
+ const unsigned short port;
+ const std::string ip;
+ transmit_pipe_type* const transmit_pipe;
+ receive_pipe_type* const receive_pipe;
+ logger dlog;
+ const unsigned char keepalive_code;
+ const unsigned char message_code;
+
+ mutex current_bs_mutex;
+ bridge_status current_bs;
+ };
+ }
+
+
+// ----------------------------------------------------------------------------------------
+
+ class bridge : noncopyable
+ {
+ public:
+
+ bridge () {}
+
+ template < typename T, typename U, typename V >
+ bridge (
+ T network_parameters,
+ U pipe1,
+ V pipe2
+ ) { reconfigure(network_parameters,pipe1,pipe2); }
+
+ template < typename T, typename U>
+ bridge (
+ T network_parameters,
+ U pipe
+ ) { reconfigure(network_parameters,pipe); }
+
+
+ void clear (
+ )
+ {
+ pimpl.reset();
+ }
+
+ template < typename T, typename R >
+ void reconfigure (
+ listen_on_port network_parameters,
+ bridge_transmit_decoration<T> transmit_pipe,
+ bridge_receive_decoration<R> receive_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<T,R>(network_parameters.port, &transmit_pipe.p, &receive_pipe.p)); }
+
+ template < typename T, typename R >
+ void reconfigure (
+ listen_on_port network_parameters,
+ bridge_receive_decoration<R> receive_pipe,
+ bridge_transmit_decoration<T> transmit_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<T,R>(network_parameters.port, &transmit_pipe.p, &receive_pipe.p)); }
+
+ template < typename T >
+ void reconfigure (
+ listen_on_port network_parameters,
+ bridge_transmit_decoration<T> transmit_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<T,T>(network_parameters.port, &transmit_pipe.p, 0)); }
+
+ template < typename R >
+ void reconfigure (
+ listen_on_port network_parameters,
+ bridge_receive_decoration<R> receive_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<R,R>(network_parameters.port, 0, &receive_pipe.p)); }
+
+
+
+
+ template < typename T, typename R >
+ void reconfigure (
+ connect_to_ip_and_port network_parameters,
+ bridge_transmit_decoration<T> transmit_pipe,
+ bridge_receive_decoration<R> receive_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<T,R>(network_parameters.ip, network_parameters.port, &transmit_pipe.p, &receive_pipe.p)); }
+
+ template < typename T, typename R >
+ void reconfigure (
+ connect_to_ip_and_port network_parameters,
+ bridge_receive_decoration<R> receive_pipe,
+ bridge_transmit_decoration<T> transmit_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<T,R>(network_parameters.ip, network_parameters.port, &transmit_pipe.p, &receive_pipe.p)); }
+
+ template < typename R >
+ void reconfigure (
+ connect_to_ip_and_port network_parameters,
+ bridge_receive_decoration<R> receive_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<R,R>(network_parameters.ip, network_parameters.port, 0, &receive_pipe.p)); }
+
+ template < typename T >
+ void reconfigure (
+ connect_to_ip_and_port network_parameters,
+ bridge_transmit_decoration<T> transmit_pipe
+ ) { pimpl.reset(); pimpl.reset(new impl_brns::impl_bridge<T,T>(network_parameters.ip, network_parameters.port, &transmit_pipe.p, 0)); }
+
+
+ bridge_status get_bridge_status (
+ ) const
+ {
+ if (pimpl)
+ return pimpl->get_bridge_status();
+ else
+ return bridge_status();
+ }
+
+ private:
+
+ std::unique_ptr<impl_brns::impl_bridge_base> pimpl;
+ };
+
+// ----------------------------------------------------------------------------------------
+
+}
+
+#endif // DLIB_BRIDGe_Hh_
+