summaryrefslogtreecommitdiffstats
path: root/src/lib/http/client.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/http/client.cc')
-rw-r--r--src/lib/http/client.cc2062
1 files changed, 2062 insertions, 0 deletions
diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc
new file mode 100644
index 0000000..1f139a3
--- /dev/null
+++ b/src/lib/http/client.cc
@@ -0,0 +1,2062 @@
+// Copyright (C) 2018-2023 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
+#include <asiolink/io_service_thread_pool.h>
+#include <asiolink/tls_socket.h>
+#include <http/client.h>
+#include <http/http_log.h>
+#include <http/http_messages.h>
+#include <http/response_json.h>
+#include <http/response_parser.h>
+#include <util/boost_time_utils.h>
+#include <util/multi_threading_mgr.h>
+#include <util/unlock_guard.h>
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/weak_ptr.hpp>
+
+#include <atomic>
+#include <array>
+#include <functional>
+#include <iostream>
+#include <map>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+
+using namespace isc;
+using namespace isc::asiolink;
+using namespace isc::http;
+using namespace isc::util;
+using namespace boost::posix_time;
+
+namespace ph = std::placeholders;
+
+namespace {
+
+/// @brief Maximum size of the HTTP message that can be logged.
+///
+/// The part of the HTTP message beyond this value is truncated.
+constexpr size_t MAX_LOGGED_MESSAGE_SIZE = 1024;
+
+/// @brief TCP / TLS socket callback function type.
+typedef std::function<void(boost::system::error_code ec, size_t length)>
+SocketCallbackFunction;
+
+/// @brief Socket callback class required by the TCPSocket and TLSSocket APIs.
+///
+/// Its function call operator ignores callbacks invoked with "operation aborted"
+/// error codes. Such status codes are generated when the posted IO operations
+/// are canceled.
+class SocketCallback {
+public:
+
+ /// @brief Constructor.
+ ///
+ /// Stores pointer to a callback function provided by a caller.
+ ///
+ /// @param socket_callback Pointer to a callback function.
+ SocketCallback(SocketCallbackFunction socket_callback)
+ : callback_(socket_callback) {
+ }
+
+ /// @brief Function call operator.
+ ///
+ /// Invokes the callback for all error codes except "operation aborted".
+ ///
+ /// @param ec Error code.
+ /// @param length Length of the data transmitted over the socket.
+ void operator()(boost::system::error_code ec, size_t length = 0) {
+ if (ec.value() == boost::asio::error::operation_aborted) {
+ return;
+ }
+ callback_(ec, length);
+ }
+
+private:
+
+ /// @brief Holds pointer to a supplied callback.
+ SocketCallbackFunction callback_;
+
+};
+
+class ConnectionPool;
+
+/// @brief Shared pointer to a connection pool.
+typedef boost::shared_ptr<ConnectionPool> ConnectionPoolPtr;
+
+/// @brief Client side HTTP connection to the server.
+///
+/// Each connection is established with a unique destination identified by the
+/// specified URL and TLS context. Multiple requests to the same destination
+/// can be sent over the same connection, if the connection is persistent.
+/// If the server closes the TCP connection (e.g. after sending a response),
+/// the connection is closed.
+///
+/// If new request is created while the previous request is still in progress,
+/// the new request is stored in the FIFO queue. The queued requests to the
+/// particular URL are sent to the server when the current transaction ends.
+///
+/// The communication over the transport socket is asynchronous. The caller is
+/// notified about the completion of the transaction via a callback that the
+/// caller supplies when initiating the transaction.
+class Connection : public boost::enable_shared_from_this<Connection> {
+public:
+
+ /// @brief Constructor.
+ ///
+ /// @param io_service IO service to be used for the connection.
+ /// @param tls_context TLS context to be used for the connection.
+ /// @param conn_pool Back pointer to the connection pool to which this
+ /// connection belongs.
+ /// @param url URL associated with this connection.
+ explicit Connection(IOService& io_service,
+ const TlsContextPtr& tls_context,
+ const ConnectionPoolPtr& conn_pool,
+ const Url& url);
+
+ /// @brief Destructor.
+ ~Connection();
+
+ /// @brief Starts new asynchronous transaction (HTTP request and response).
+ ///
+ /// This method expects that all pointers provided as argument are non-null.
+ ///
+ /// @param request Pointer to the request to be sent to the server.
+ /// @param response Pointer to the object into which the response is stored.
+ /// The caller should create a response object of the type which matches the
+ /// content type expected by the caller, e.g. HttpResponseJson when JSON
+ /// content type is expected to be received.
+ /// @param request_timeout Request timeout in milliseconds.
+ /// @param callback Pointer to the callback function to be invoked when the
+ /// transaction completes.
+ /// @param connect_callback Pointer to the callback function to be invoked
+ /// when the client connects to the server.
+ /// @param handshake_callback Optional callback invoked when the client
+ /// performs the TLS handshake with the server.
+ /// @param close_callback Pointer to the callback function to be invoked
+ /// when the client closes the socket to the server.
+ void doTransaction(const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback);
+
+ /// @brief Closes the socket and cancels the request timer.
+ void close();
+
+ /// @brief Checks if a transaction has been initiated over this connection.
+ ///
+ /// @return true if transaction has been initiated, false otherwise.
+ bool isTransactionOngoing() const {
+ return (started_);
+ }
+
+ /// @brief Checks if the socket has been closed.
+ ///
+ /// @return true if the socket has been closed.
+ bool isClosed() const {
+ return (closed_);
+ }
+
+ /// @brief Checks if the peer has closed the idle socket at its side.
+ ///
+ /// If the socket is open but is not usable the peer has closed
+ /// the socket at its side so we close it.
+ void isClosedByPeer();
+
+ /// @brief Checks if a socket descriptor belongs to this connection.
+ ///
+ /// @param socket_fd socket descriptor to check
+ ///
+ /// @return True if the socket fd belongs to this connection.
+ bool isMySocket(int socket_fd) const;
+
+ /// @brief Checks and logs if premature transaction timeout is suspected.
+ ///
+ /// There are cases when the premature timeout occurs, e.g. as a result of
+ /// moving system clock, during the transaction. In such case, the
+ /// @c terminate function is called which resets the transaction state but
+ /// the transaction handlers may be already waiting for the execution.
+ /// Each such handler should call this function to check if the transaction
+ /// it is participating in is still alive. If it is not, it should simply
+ /// return. This method also logs such situation.
+ ///
+ /// @param transid identifier of the transaction for which the handler
+ /// is being invoked. It is compared against the current transaction
+ /// id for this connection.
+ ///
+ /// @return true if the premature timeout is suspected, false otherwise.
+ bool checkPrematureTimeout(const uint64_t transid);
+
+private:
+
+ /// @brief Starts new asynchronous transaction (HTTP request and response).
+ ///
+ /// Should be called in a thread safe context.
+ ///
+ /// This method expects that all pointers provided as argument are non-null.
+ ///
+ /// @param request Pointer to the request to be sent to the server.
+ /// @param response Pointer to the object into which the response is stored.
+ /// The caller should create a response object of the type which matches the
+ /// content type expected by the caller, e.g. HttpResponseJson when JSON
+ /// content type is expected to be received.
+ /// @param request_timeout Request timeout in milliseconds.
+ /// @param callback Pointer to the callback function to be invoked when the
+ /// transaction completes.
+ /// @param connect_callback Pointer to the callback function to be invoked
+ /// when the client connects to the server.
+ /// @param handshake_callback Optional callback invoked when the client
+ /// performs the TLS handshake with the server.
+ /// @param close_callback Pointer to the callback function to be invoked
+ /// when the client closes the socket to the server.
+ void doTransactionInternal(const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback);
+
+ /// @brief Closes the socket and cancels the request timer.
+ ///
+ /// Should be called in a thread safe context.
+ void closeInternal();
+
+ /// @brief Checks if the peer has closed the socket at its side.
+ ///
+ /// Should be called in a thread safe context.
+ ///
+ /// If the socket is open but is not usable the peer has closed
+ /// the socket at its side so we close it.
+ void isClosedByPeerInternal();
+
+ /// @brief Checks and logs if premature transaction timeout is suspected.
+ ///
+ /// Should be called in a thread safe context.
+ ///
+ /// There are cases when the premature timeout occurs, e.g. as a result of
+ /// moving system clock, during the transaction. In such case, the
+ /// @c terminate function is called which resets the transaction state but
+ /// the transaction handlers may be already waiting for the execution.
+ /// Each such handler should call this function to check if the transaction
+ /// it is participating in is still alive. If it is not, it should simply
+ /// return. This method also logs such situation.
+ ///
+ /// @param transid identifier of the transaction for which the handler
+ /// is being invoked. It is compared against the current transaction
+ /// id for this connection.
+ ///
+ /// @return true if the premature timeout is suspected, false otherwise.
+ bool checkPrematureTimeoutInternal(const uint64_t transid);
+
+ /// @brief Resets the state of the object.
+ ///
+ /// Should be called in a thread safe context.
+ ///
+ /// In particular, it removes instances of objects provided for the previous
+ /// transaction by a caller. It doesn't close the socket, though.
+ void resetState();
+
+ /// @brief Performs tasks required after receiving a response or after an
+ /// error.
+ ///
+ /// This method triggers user's callback, resets the state of the connection
+ /// and initiates next transaction if there is any transaction queued for the
+ /// URL associated with this connection.
+ ///
+ /// @param ec Error code received as a result of the IO operation.
+ /// @param parsing_error Message parsing error.
+ void terminate(const boost::system::error_code& ec,
+ const std::string& parsing_error = "");
+
+ /// @brief Performs tasks required after receiving a response or after an
+ /// error.
+ ///
+ /// Should be called in a thread safe context.
+ ///
+ /// This method triggers user's callback, resets the state of the connection
+ /// and initiates next transaction if there is any transaction queued for the
+ /// URL associated with this connection.
+ ///
+ /// @param ec Error code received as a result of the IO operation.
+ /// @param parsing_error Message parsing error.
+ void terminateInternal(const boost::system::error_code& ec,
+ const std::string& parsing_error = "");
+
+ /// @brief Run parser and check if more data is needed.
+ ///
+ /// @param ec Error code received as a result of the IO operation.
+ /// @param length Number of bytes received.
+ ///
+ /// @return true if more data is needed, false otherwise.
+ bool runParser(const boost::system::error_code& ec, size_t length);
+
+ /// @brief Run parser and check if more data is needed.
+ ///
+ /// Should be called in a thread safe context.
+ ///
+ /// @param ec Error code received as a result of the IO operation.
+ /// @param length Number of bytes received.
+ ///
+ /// @return true if more data is needed, false otherwise.
+ bool runParserInternal(const boost::system::error_code& ec, size_t length);
+
+ /// @brief This method schedules timer or reschedules existing timer.
+ ///
+ /// @param request_timeout New timer interval in milliseconds.
+ void scheduleTimer(const long request_timeout);
+
+ /// @brief Asynchronously performs the TLS handshake.
+ ///
+ /// The TLS handshake is performed once on TLS sockets.
+ ///
+ /// @param transid Current transaction id.
+ void doHandshake(const uint64_t transid);
+
+ /// @brief Asynchronously sends data over the socket.
+ ///
+ /// The data sent over the socket are stored in the @c buf_.
+ ///
+ /// @param transid Current transaction id.
+ void doSend(const uint64_t transid);
+
+ /// @brief Asynchronously receives data over the socket.
+ ///
+ /// The data received over the socket are store into the @c input_buf_.
+ ///
+ /// @param transid Current transaction id.
+ void doReceive(const uint64_t transid);
+
+ /// @brief Local callback invoked when the connection is established.
+ ///
+ /// If the connection is successfully established, this callback will start
+ /// to asynchronously send the request over the socket or perform the
+ /// TLS handshake with the server.
+ ///
+ /// @param Pointer to the callback to be invoked when client connects to
+ /// the server.
+ /// @param transid Current transaction id.
+ /// @param ec Error code being a result of the connection attempt.
+ void connectCallback(HttpClient::ConnectHandler connect_callback,
+ const uint64_t transid,
+ const boost::system::error_code& ec);
+
+ /// @brief Local callback invoked when the handshake is performed.
+ ///
+ /// If the handshake is successfully performed, this callback will start
+ /// to asynchronously send the request over the socket.
+ ///
+ /// @param Pointer to the callback to be invoked when client performs
+ /// the TLS handshake with the server.
+ /// @param transid Current transaction id.
+ /// @param ec Error code being a result of the connection attempt.
+ void handshakeCallback(HttpClient::HandshakeHandler handshake_callback,
+ const uint64_t transid,
+ const boost::system::error_code& ec);
+
+ /// @brief Local callback invoked when an attempt to send a portion of data
+ /// over the socket has ended.
+ ///
+ /// The portion of data that has been sent is removed from the buffer. If all
+ /// data from the buffer were sent, the callback will start to asynchronously
+ /// receive a response from the server.
+ ///
+ /// @param transid Current transaction id.
+ /// @param ec Error code being a result of sending the data.
+ /// @param length Number of bytes sent.
+ void sendCallback(const uint64_t transid, const boost::system::error_code& ec,
+ size_t length);
+
+ /// @brief Local callback invoked when an attempt to receive a portion of data
+ /// over the socket has ended.
+ ///
+ /// @param transid Current transaction id.
+ /// @param ec Error code being a result of receiving the data.
+ /// @param length Number of bytes received.
+ void receiveCallback(const uint64_t transid, const boost::system::error_code& ec,
+ size_t length);
+
+ /// @brief Local callback invoked when request timeout occurs.
+ void timerCallback();
+
+ /// @brief Local callback invoked when the connection is closed.
+ ///
+ /// Invokes the close callback (if one), passing in the socket's
+ /// descriptor, when the connection's socket about to be closed.
+ /// The callback invocation is wrapped in a try-catch to ensure
+ /// exception safety.
+ ///
+ /// @param clear dictates whether or not the callback is discarded
+ /// after invocation. Defaults to false.
+ void closeCallback(const bool clear = false);
+
+ /// @brief Pointer to the connection pool owning this connection.
+ ///
+ /// This is a weak pointer to avoid circular dependency between the
+ /// Connection and ConnectionPool.
+ boost::weak_ptr<ConnectionPool> conn_pool_;
+
+ /// @brief URL for this connection.
+ Url url_;
+
+ /// @brief TLS context for this connection.
+ TlsContextPtr tls_context_;
+
+ /// @brief TCP socket to be used for this connection.
+ std::unique_ptr<TCPSocket<SocketCallback> > tcp_socket_;
+
+ /// @brief TLS socket to be used for this connection.
+ std::unique_ptr<TLSSocket<SocketCallback> > tls_socket_;
+
+ /// @brief Interval timer used for detecting request timeouts.
+ IntervalTimer timer_;
+
+ /// @brief Holds currently sent request.
+ HttpRequestPtr current_request_;
+
+ /// @brief Holds pointer to an object where response is to be stored.
+ HttpResponsePtr current_response_;
+
+ /// @brief Pointer to the HTTP response parser.
+ HttpResponseParserPtr parser_;
+
+ /// @brief User supplied callback.
+ HttpClient::RequestHandler current_callback_;
+
+ /// @brief Output buffer.
+ std::string buf_;
+
+ /// @brief Input buffer.
+ std::array<char, 32768> input_buf_;
+
+ /// @brief Identifier of the current transaction.
+ uint64_t current_transid_;
+
+ /// @brief User supplied handshake callback.
+ HttpClient::HandshakeHandler handshake_callback_;
+
+ /// @brief User supplied close callback.
+ HttpClient::CloseHandler close_callback_;
+
+ /// @brief Flag to indicate that a transaction is running.
+ std::atomic<bool> started_;
+
+ /// @brief Flag to indicate that the TLS handshake has to be performed.
+ std::atomic<bool> need_handshake_;
+
+ /// @brief Flag to indicate that the socket was closed.
+ std::atomic<bool> closed_;
+
+ /// @brief Mutex to protect the internal state.
+ std::mutex mutex_;
+};
+
+/// @brief Shared pointer to the connection.
+typedef boost::shared_ptr<Connection> ConnectionPtr;
+
+/// @brief Connection pool for managing multiple connections.
+///
+/// Connection pool creates and destroys URL destinations. It manages
+/// connections to and requests for URLs. Each time a request is
+/// submitted for a URL, it assigns it to an available idle connection,
+/// or if no idle connections are available, pushes the request on the queue
+/// for that URL.
+class ConnectionPool : public boost::enable_shared_from_this<ConnectionPool> {
+public:
+
+ /// @brief Constructor.
+ ///
+ /// @param io_service Reference to the IO service to be used by the
+ /// connections.
+ /// @param max_url_connections maximum number of concurrent
+ /// connections allowed per URL.
+ explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
+ : io_service_(io_service), destinations_(), pool_mutex_(),
+ max_url_connections_(max_url_connections) {
+ }
+
+ /// @brief Destructor.
+ ///
+ /// Closes all connections.
+ ~ConnectionPool() {
+ closeAll();
+ }
+
+ /// @brief Process next queued request for the given URL and TLS context.
+ ///
+ /// @param url URL for which next queued request should be processed.
+ /// @param tls_context TLS context for which next queued request
+ /// should be processed.
+ void processNextRequest(const Url& url, const TlsContextPtr& tls_context) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(pool_mutex_);
+ return (processNextRequestInternal(url, tls_context));
+ } else {
+ return (processNextRequestInternal(url, tls_context));
+ }
+ }
+
+ /// @brief Schedule processing of next queued request.
+ ///
+ /// @param url URL for which next queued request should be processed.
+ /// @param tls_context TLS context for which next queued request
+ /// should be processed.
+ void postProcessNextRequest(const Url& url,
+ const TlsContextPtr& tls_context) {
+ io_service_.post(std::bind(&ConnectionPool::processNextRequest,
+ shared_from_this(), url, tls_context));
+ }
+
+ /// @brief Queue next request for sending to the server.
+ ///
+ /// A new transaction is started immediately, if there is no other request
+ /// in progress for the given URL. Otherwise, the request is queued.
+ ///
+ /// @param url Destination where the request should be sent.
+ /// @param tls_context TLS context to be used for the connection.
+ /// @param request Pointer to the request to be sent to the server.
+ /// @param response Pointer to the object into which the response should be
+ /// stored.
+ /// @param request_timeout Requested timeout for the transaction in
+ /// milliseconds.
+ /// @param request_callback Pointer to the user callback to be invoked when the
+ /// transaction ends.
+ /// @param connect_callback Pointer to the user callback to be invoked when the
+ /// client connects to the server.
+ /// @param handshake_callback Optional callback invoked when the client
+ /// performs the TLS handshake with the server.
+ /// @param close_callback Pointer to the user callback to be invoked when the
+ /// client closes the connection to the server.
+ void queueRequest(const Url& url,
+ const TlsContextPtr& tls_context,
+ const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& request_callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(pool_mutex_);
+ return (queueRequestInternal(url, tls_context, request, response,
+ request_timeout, request_callback,
+ connect_callback, handshake_callback,
+ close_callback));
+ } else {
+ return (queueRequestInternal(url, tls_context, request, response,
+ request_timeout, request_callback,
+ connect_callback, handshake_callback,
+ close_callback));
+ }
+ }
+
+ /// @brief Closes all URLs and removes associated information from
+ /// the connection pool.
+ void closeAll() {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(pool_mutex_);
+ closeAllInternal();
+ } else {
+ closeAllInternal();
+ }
+ }
+
+ /// @brief Closes a connection if it has an out-of-band socket event
+ ///
+ /// If the pool contains a connection using the given socket and that
+ /// connection is currently in a transaction the method returns as this
+ /// indicates a normal ready event. If the connection is not in an
+ /// ongoing transaction, then the connection is closed.
+ ///
+ /// This is method is intended to be used to detect and clean up then
+ /// sockets that are marked ready outside of transactions. The most common
+ /// case is the other end of the socket being closed.
+ ///
+ /// @param socket_fd socket descriptor to check
+ void closeIfOutOfBand(int socket_fd) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(pool_mutex_);
+ closeIfOutOfBandInternal(socket_fd);
+ } else {
+ closeIfOutOfBandInternal(socket_fd);
+ }
+ }
+
+private:
+
+ /// @brief Process next queued request for the given URL and TLS context.
+ ///
+ /// This method should be called in a thread safe context.
+ ///
+ /// @param url URL for which next queued request should be retrieved.
+ /// @param tls_context TLS context for which next queued request
+ /// should be processed.
+ void processNextRequestInternal(const Url& url,
+ const TlsContextPtr& tls_context) {
+ // Check if there is a queue for this URL. If there is no queue, there
+ // is no request queued either.
+ DestinationPtr destination = findDestination(url, tls_context);
+ if (destination) {
+ // Remove closed connections.
+ destination->garbageCollectConnections();
+ if (!destination->queueEmpty()) {
+ // We have at least one queued request. Do we have an
+ // idle connection?
+ ConnectionPtr connection = destination->getIdleConnection();
+ if (!connection) {
+ // No idle connections.
+ if (destination->connectionsFull()) {
+ return;
+ }
+ // Room to make another connection with this destination,
+ // so make one.
+ connection.reset(new Connection(io_service_, tls_context,
+ shared_from_this(), url));
+ destination->addConnection(connection);
+ }
+
+ // Dequeue the oldest request and start a transaction for it using
+ // the idle connection.
+ RequestDescriptor desc = destination->popNextRequest();
+ connection->doTransaction(desc.request_, desc.response_,
+ desc.request_timeout_, desc.callback_,
+ desc.connect_callback_,
+ desc.handshake_callback_,
+ desc.close_callback_);
+ }
+ }
+ }
+
+ /// @brief Queue next request for sending to the server.
+ ///
+ /// A new transaction is started immediately, if there is no other request
+ /// in progress for the given URL. Otherwise, the request is queued.
+ ///
+ /// This method should be called in a thread safe context.
+ ///
+ /// @param url Destination where the request should be sent.
+ /// @param tls_context TLS context to be used for the connection.
+ /// @param request Pointer to the request to be sent to the server.
+ /// @param response Pointer to the object into which the response should be
+ /// stored.
+ /// @param request_timeout Requested timeout for the transaction in
+ /// milliseconds.
+ /// @param request_callback Pointer to the user callback to be invoked when the
+ /// transaction ends.
+ /// @param connect_callback Pointer to the user callback to be invoked when the
+ /// client connects to the server.
+ /// @param handshake_callback Optional callback invoked when the client
+ /// performs the TLS handshake with the server.
+ /// @param close_callback Pointer to the user callback to be invoked when the
+ /// client closes the connection to the server.
+ void queueRequestInternal(const Url& url,
+ const TlsContextPtr& tls_context,
+ const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& request_callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback) {
+ ConnectionPtr connection;
+ // Find the destination for the requested URL.
+ DestinationPtr destination = findDestination(url, tls_context);
+ if (destination) {
+ // Remove closed connections.
+ destination->garbageCollectConnections();
+ // Found it, look for an idle connection.
+ connection = destination->getIdleConnection();
+ } else {
+ // Doesn't exist yet so it's a new destination.
+ destination = addDestination(url, tls_context);
+ }
+
+ if (!connection) {
+ if (destination->connectionsFull()) {
+ // All connections busy, queue it.
+ destination->pushRequest(RequestDescriptor(request, response,
+ request_timeout,
+ request_callback,
+ connect_callback,
+ handshake_callback,
+ close_callback));
+ return;
+ }
+
+ // Room to make another connection with this destination, so make one.
+ connection.reset(new Connection(io_service_, tls_context,
+ shared_from_this(), url));
+ destination->addConnection(connection);
+ }
+
+ // Use the connection to start the transaction.
+ connection->doTransaction(request, response, request_timeout, request_callback,
+ connect_callback, handshake_callback, close_callback);
+ }
+
+ /// @brief Closes all connections for all URLs and removes associated
+ /// information from the connection pool.
+ ///
+ /// This method should be called in a thread safe context.
+ void closeAllInternal() {
+ for (auto const& destination : destinations_) {
+ destination.second->closeAllConnections();
+ }
+
+ destinations_.clear();
+ }
+
+ /// @brief Closes a connection if it has an out-of-band socket event
+ ///
+ /// If the pool contains a connection using the given socket and that
+ /// connection is currently in a transaction the method returns as this
+ /// indicates a normal ready event. If the connection is not in an
+ /// ongoing transaction, then the connection is closed.
+ ///
+ /// This is method is intended to be used to detect and clean up then
+ /// sockets that are marked ready outside of transactions. The most common
+ /// case is the other end of the socket being closed.
+ ///
+ /// This method should be called in a thread safe context.
+ ///
+ /// @param socket_fd socket descriptor to check
+ void closeIfOutOfBandInternal(int socket_fd) {
+ for (auto const& destination : destinations_) {
+ // First we look for a connection with the socket.
+ ConnectionPtr connection = destination.second->findBySocketFd(socket_fd);
+ if (connection) {
+ if (!connection->isTransactionOngoing()) {
+ // Socket has no transaction, so any ready event is
+ // out-of-band (other end probably closed), so
+ // let's close it. Note we do not remove any queued
+ // requests, as this might somehow be occurring in
+ // between them.
+ destination.second->closeConnection(connection);
+ }
+
+ return;
+ }
+ }
+ }
+
+ /// @brief Request descriptor holds parameters associated with the
+ /// particular request.
+ struct RequestDescriptor {
+ /// @brief Constructor.
+ ///
+ /// @param request Pointer to the request to be sent.
+ /// @param response Pointer to the object into which the response will
+ /// be stored.
+ /// @param request_timeout Requested timeout for the transaction.
+ /// @param callback Pointer to the user callback.
+ /// @param connect_callback pointer to the user callback to be invoked
+ /// when the client connects to the server.
+ /// @param handshake_callback Optional callback invoked when the client
+ /// performs the TLS handshake with the server.
+ /// @param close_callback pointer to the user callback to be invoked
+ /// when the client closes the connection to the server.
+ RequestDescriptor(const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long& request_timeout,
+ const HttpClient::RequestHandler& callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback)
+ : request_(request), response_(response),
+ request_timeout_(request_timeout), callback_(callback),
+ connect_callback_(connect_callback),
+ handshake_callback_(handshake_callback),
+ close_callback_(close_callback) {
+ }
+
+ /// @brief Holds pointer to the request.
+ HttpRequestPtr request_;
+
+ /// @brief Holds pointer to the response.
+ HttpResponsePtr response_;
+
+ /// @brief Holds requested timeout value.
+ long request_timeout_;
+
+ /// @brief Holds pointer to the user callback.
+ HttpClient::RequestHandler callback_;
+
+ /// @brief Holds pointer to the user callback for connect.
+ HttpClient::ConnectHandler connect_callback_;
+
+ /// @brief Holds pointer to the user callback for handshake.
+ HttpClient::HandshakeHandler handshake_callback_;
+
+ /// @brief Holds pointer to the user callback for close.
+ HttpClient::CloseHandler close_callback_;
+ };
+
+ /// @brief Type of URL and TLS context pairs.
+ typedef std::pair<Url, TlsContextPtr> DestinationDescriptor;
+
+ /// @brief Encapsulates connections and requests for a given URL
+ class Destination {
+ public:
+ /// @brief Number of queued requests allowed without warnings being emitted.
+ const size_t QUEUE_SIZE_THRESHOLD = 2048;
+ /// @brief Interval between queue size warnings.
+ const int QUEUE_WARN_SECS = 5;
+
+ /// @brief Constructor
+ ///
+ /// @param url server URL of this destination
+ /// @param tls_context server TLS context of this destination
+ /// @param max_connections maximum number of concurrent connections
+ /// allowed for in the list URL
+ Destination(Url const& url, TlsContextPtr tls_context, size_t max_connections)
+ : url_(url), tls_context_(tls_context),
+ max_connections_(max_connections), connections_(), queue_(),
+ last_queue_warn_time_(min_date_time), last_queue_size_(0) {
+ }
+
+ /// @brief Destructor
+ ~Destination() {
+ closeAllConnections();
+ }
+
+ /// @brief Adds a new connection
+ ///
+ /// @param connection the connection to add
+ ///
+ /// @throw BadValue if the maximum number of connections already
+ /// exist.
+ /// @note This should be called in a thread safe context.
+ void addConnection(ConnectionPtr connection) {
+ if (connectionsFull()) {
+ isc_throw(BadValue, "URL: " << url_.toText()
+ << ", already at maximum connections: "
+ << max_connections_);
+ }
+
+ connections_.push_back(connection);
+ }
+
+ /// @brief Closes a connection and removes it from the list.
+ ///
+ /// @param connection the connection to remove
+ /// @note This should be called in a thread safe context.
+ void closeConnection(ConnectionPtr connection) {
+ for (auto it = connections_.begin(); it != connections_.end(); ++it) {
+ if (*it == connection) {
+ (*it)->close();
+ connections_.erase(it);
+ break;
+ }
+ }
+ }
+
+ /// @brief Closes all connections and clears the list.
+ /// @note This should be called in a thread safe context.
+ void closeAllConnections() {
+ // Flush the queue.
+ while (!queue_.empty()) {
+ queue_.pop();
+ }
+
+ for (auto const& connection : connections_) {
+ connection->close();
+ }
+
+ connections_.clear();
+ }
+
+ /// @brief Removes closed connections.
+ ///
+ /// This method should be called before @ref getIdleConnection.
+ ///
+ /// In a first step it closes not usable idle connections
+ /// (idle means no current transaction and not closed,
+ /// usable means the peer side did not close it at that time).
+ /// In a second step it removes (collects) closed connections.
+ ///
+ /// @note a connection is closed when the transaction is finished
+ /// and the connection is persistent, or when the connection was
+ /// idle and the first step of the garbage collector detects that
+ /// it was closed by peer, so is not usable.
+ ///
+ /// @note there are two races here:
+ /// - the peer side closes the connection after the first step
+ /// - a not persistent connection finishes its transaction and
+ /// closes
+ /// The second race is avoided by setting the closed flag before
+ /// the started flag and by unconditionally posting a process next
+ /// request action.
+ ///
+ /// @note This should be called in a thread safe context.
+ void garbageCollectConnections() {
+ for (auto it = connections_.begin(); it != connections_.end();) {
+ (*it)->isClosedByPeer();
+ if (!(*it)->isClosed()) {
+ ++it;
+ } else {
+ it = connections_.erase(it);
+ }
+ }
+ }
+
+ /// @brief Finds the first idle connection.
+ ///
+ /// Iterates over the existing connections and returns the
+ /// first connection which is not currently in a transaction and
+ /// is not closed.
+ ///
+ /// @note @ref garbageCollectConnections should be called before.
+ /// This removes connections which were closed at that time.
+ ///
+ /// @return The first idle connection or an empty pointer if
+ /// all connections are busy or closed.
+ ConnectionPtr getIdleConnection() {
+ for (auto const& connection : connections_) {
+ if (!connection->isTransactionOngoing() &&
+ !connection->isClosed()) {
+ return (connection);
+ }
+ }
+
+ return (ConnectionPtr());
+ }
+
+ /// @brief Find a connection by its socket descriptor.
+ ///
+ /// @param socket_fd socket descriptor to find
+ ///
+ /// @return The connection or an empty pointer if no matching
+ /// connection exists.
+ ConnectionPtr findBySocketFd(int socket_fd) {
+ for (auto const& connection : connections_) {
+ if (connection->isMySocket(socket_fd)) {
+ return (connection);
+ }
+ }
+
+ return (ConnectionPtr());
+ }
+
+ /// @brief Indicates if there are no connections in the list.
+ ///
+ /// @return true if the list is empty.
+ bool connectionsEmpty() {
+ return (connections_.empty());
+ }
+
+ /// @brief Indicates if list contains the maximum number.
+ ///
+ /// @return true if the list is full.
+ bool connectionsFull() {
+ return (connections_.size() >= max_connections_);
+ }
+
+ /// @brief Fetches the number of connections in the list.
+ ///
+ /// @return the number of connections in the list.
+ size_t connectionCount() {
+ return (connections_.size());
+ }
+
+ /// @brief Fetches the maximum number of connections.
+ ///
+ /// @return the maxim number of connections.
+ size_t getMaxConnections() const {
+ return (max_connections_);
+ }
+
+ /// @brief Indicates if request queue is empty.
+ ///
+ /// @return true if there are no requests queued.
+ bool queueEmpty() const {
+ return (queue_.empty());
+ }
+
+ /// @brief Adds a request to the end of the request queue.
+ ///
+ /// If the size of the queue exceeds a threshold and appears
+ /// to be growing it will emit a warning log.
+ ///
+ /// @param desc RequestDescriptor to queue.
+ void pushRequest(RequestDescriptor const& desc) {
+ queue_.push(desc);
+ size_t size = queue_.size();
+ // If the queue size is larger than the threshold and growing, issue a
+ // periodic warning.
+ if ((size > QUEUE_SIZE_THRESHOLD) && (size > last_queue_size_)) {
+ ptime now = microsec_clock::universal_time();
+ if ((now - last_queue_warn_time_) > seconds(QUEUE_WARN_SECS)) {
+ LOG_WARN(http_logger, HTTP_CLIENT_QUEUE_SIZE_GROWING)
+ .arg(url_.toText())
+ .arg(size);
+ // Remember the last time we warned.
+ last_queue_warn_time_ = now;
+ }
+ }
+
+ // Remember the previous size.
+ last_queue_size_ = size;
+ }
+
+ /// @brief Removes a request from the front of the request queue.
+ ///
+ /// @return desc RequestDescriptor of the removed request.
+ RequestDescriptor popNextRequest() {
+ if (queue_.empty()) {
+ isc_throw(InvalidOperation, "cannot pop, queue is empty");
+ }
+
+ RequestDescriptor desc = queue_.front();
+ queue_.pop();
+ return (desc);
+ }
+
+ private:
+ /// @brief URL supported by this destination.
+ Url url_;
+
+ /// @brief TLS context to use with this destination.
+ TlsContextPtr tls_context_;
+
+ /// @brief Maximum number of concurrent connections for this destination.
+ size_t max_connections_;
+
+ /// @brief List of concurrent connections.
+ std::list<ConnectionPtr> connections_;
+
+ /// @brief Holds the queue of request for this destination.
+ std::queue<RequestDescriptor> queue_;
+
+ /// @brief Time the last queue size warning was issued.
+ ptime last_queue_warn_time_;
+
+ /// @brief Size of the queue after last push.
+ size_t last_queue_size_;
+ };
+
+ /// @brief Pointer to a Destination.
+ typedef boost::shared_ptr<Destination> DestinationPtr;
+
+ /// @brief Creates a new destination for the given URL and TLS context.
+ ///
+ /// @param url URL of the new destination.
+ /// @param tls_context TLS context for the new destination.
+ ///
+ /// @return Pointer to the newly created destination.
+ /// @note Must be called from within a thread-safe context.
+ DestinationPtr addDestination(const Url& url,
+ const TlsContextPtr& tls_context) {
+ const DestinationDescriptor& desc = std::make_pair(url, tls_context);
+ DestinationPtr destination(new Destination(url, tls_context,
+ max_url_connections_));
+ destinations_[desc] = destination;
+ return (destination);
+ }
+
+ /// @brief Fetches a destination by URL and TLS context.
+ ///
+ /// @param url URL of the destination desired.
+ /// @param tls_context TLS context for the destination desired.
+ ///
+ /// @return pointer the desired destination, empty pointer
+ /// if the destination does not exist.
+ /// @note Must be called from within a thread-safe context.
+ DestinationPtr findDestination(const Url& url,
+ const TlsContextPtr& tls_context) const {
+ const DestinationDescriptor& desc = std::make_pair(url, tls_context);
+ auto it = destinations_.find(desc);
+ if (it != destinations_.end()) {
+ return (it->second);
+ }
+
+ return (DestinationPtr());
+ }
+
+ /// @brief Removes a destination by URL and TLS context.
+ ///
+ /// Closes all of the destination's connections and
+ /// discards all of its queued requests while removing
+ /// the destination from the list of known destinations.
+ ///
+ /// @note not used yet.
+ ///
+ /// @param url URL of the destination to be removed.
+ /// @param tls_context TLS context for the destination to be removed.
+ /// @note Must be called from within a thread-safe context.
+ void removeDestination(const Url& url,
+ const TlsContextPtr& tls_context) {
+ const DestinationDescriptor& desc = std::make_pair(url, tls_context);
+ auto it = destinations_.find(desc);
+ if (it != destinations_.end()) {
+ it->second->closeAllConnections();
+ destinations_.erase(it);
+ }
+ }
+
+ /// @brief A reference to the IOService that drives socket IO.
+ IOService& io_service_;
+
+ /// @brief Map of Destinations by URL and TLS context.
+ std::map<DestinationDescriptor, DestinationPtr> destinations_;
+
+ /// @brief Mutex to protect the internal state.
+ std::mutex pool_mutex_;
+
+ /// @brief Maximum number of connections per URL and TLS context.
+ size_t max_url_connections_;
+};
+
+Connection::Connection(IOService& io_service,
+ const TlsContextPtr& tls_context,
+ const ConnectionPoolPtr& conn_pool,
+ const Url& url)
+ : conn_pool_(conn_pool), url_(url), tls_context_(tls_context),
+ tcp_socket_(), tls_socket_(), timer_(io_service),
+ current_request_(), current_response_(), parser_(),
+ current_callback_(), buf_(), input_buf_(), current_transid_(0),
+ close_callback_(), started_(false), need_handshake_(false),
+ closed_(false) {
+ if (!tls_context) {
+ tcp_socket_.reset(new asiolink::TCPSocket<SocketCallback>(io_service));
+ } else {
+ tls_socket_.reset(new asiolink::TLSSocket<SocketCallback>(io_service,
+ tls_context));
+ need_handshake_ = true;
+ }
+}
+
+Connection::~Connection() {
+ close();
+}
+
+void
+Connection::resetState() {
+ started_ = false;
+ current_request_.reset();
+ current_response_.reset();
+ parser_.reset();
+ current_callback_ = HttpClient::RequestHandler();
+}
+
+void
+Connection::closeCallback(const bool clear) {
+ if (close_callback_) {
+ try {
+ if (tcp_socket_) {
+ close_callback_(tcp_socket_->getNative());
+ } else if (tls_socket_) {
+ close_callback_(tls_socket_->getNative());
+ } else {
+ isc_throw(Unexpected,
+ "internal error: can't find a socket to close");
+ }
+ } catch (...) {
+ LOG_ERROR(http_logger, HTTP_CONNECTION_CLOSE_CALLBACK_FAILED);
+ }
+ }
+
+ if (clear) {
+ close_callback_ = HttpClient::CloseHandler();
+ }
+}
+
+void
+Connection::isClosedByPeer() {
+ // This method applies only to idle connections.
+ if (started_ || closed_) {
+ return;
+ }
+ // This code was guarded by a lock so keep this.
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ isClosedByPeerInternal();
+ } else {
+ isClosedByPeerInternal();
+ }
+}
+
+void
+Connection::isClosedByPeerInternal() {
+ // If the socket is open we check if it is possible to transmit
+ // the data over this socket by reading from it with message
+ // peeking. If the socket is not usable, we close it and then
+ // re-open it. There is a narrow window of time between checking
+ // the socket usability and actually transmitting the data over
+ // this socket, when the peer may close the connection. In this
+ // case we'll need to re-transmit but we don't handle it here.
+ if (tcp_socket_) {
+ if (tcp_socket_->getASIOSocket().is_open() &&
+ !tcp_socket_->isUsable()) {
+ closeCallback();
+ closed_ = true;
+ tcp_socket_->close();
+ }
+ } else if (tls_socket_) {
+ if (tls_socket_->getASIOSocket().is_open() &&
+ !tls_socket_->isUsable()) {
+ closeCallback();
+ closed_ = true;
+ tls_socket_->close();
+ }
+ } else {
+ isc_throw(Unexpected, "internal error: can't find the sending socket");
+ }
+}
+
+void
+Connection::doTransaction(const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ doTransactionInternal(request, response, request_timeout,
+ callback, connect_callback, handshake_callback,
+ close_callback);
+ } else {
+ doTransactionInternal(request, response, request_timeout,
+ callback, connect_callback, handshake_callback,
+ close_callback);
+ }
+}
+
+void
+Connection::doTransactionInternal(const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback) {
+ try {
+ started_ = true;
+ current_request_ = request;
+ current_response_ = response;
+ parser_.reset(new HttpResponseParser(*current_response_));
+ parser_->initModel();
+ current_callback_ = callback;
+ handshake_callback_ = handshake_callback;
+ close_callback_ = close_callback;
+
+ // Starting new transaction. Generate new transaction id.
+ ++current_transid_;
+
+ buf_ = request->toString();
+
+ LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_DETAIL,
+ HTTP_CLIENT_REQUEST_SEND)
+ .arg(request->toBriefString())
+ .arg(url_.toText());
+
+ LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_DETAIL_DATA,
+ HTTP_CLIENT_REQUEST_SEND_DETAILS)
+ .arg(url_.toText())
+ .arg(HttpMessageParserBase::logFormatHttpMessage(request->toString(),
+ MAX_LOGGED_MESSAGE_SIZE));
+
+ // Setup request timer.
+ scheduleTimer(request_timeout);
+
+ /// @todo We're getting a hostname but in fact it is expected to be an IP address.
+ /// We should extend the TCPEndpoint to also accept names. Currently, it will fall
+ /// over for names.
+ TCPEndpoint endpoint(url_.getStrippedHostname(),
+ static_cast<unsigned short>(url_.getPort()));
+ SocketCallback socket_cb(std::bind(&Connection::connectCallback, shared_from_this(),
+ connect_callback, current_transid_,
+ ph::_1));
+
+ // Establish new connection or use existing connection.
+ if (tcp_socket_) {
+ tcp_socket_->open(&endpoint, socket_cb);
+ return;
+ }
+ if (tls_socket_) {
+ tls_socket_->open(&endpoint, socket_cb);
+ return;
+ }
+
+ // Should never reach this point.
+ isc_throw(Unexpected, "internal error: can't find a socket to open");
+
+ } catch (const std::exception& ex) {
+ // Re-throw with the expected exception type.
+ isc_throw(HttpClientError, ex.what());
+ }
+}
+
+void
+Connection::close() {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ return (closeInternal());
+ } else {
+ return (closeInternal());
+ }
+}
+
+void
+Connection::closeInternal() {
+ // Pass in true to discard the callback.
+ closeCallback(true);
+
+ closed_ = true;
+ timer_.cancel();
+ if (tcp_socket_) {
+ tcp_socket_->close();
+ }
+ if (tls_socket_) {
+ tls_socket_->close();
+ }
+
+ resetState();
+}
+
+bool
+Connection::isMySocket(int socket_fd) const {
+ if (tcp_socket_) {
+ return (tcp_socket_->getNative() == socket_fd);
+ } else if (tls_socket_) {
+ return (tls_socket_->getNative() == socket_fd);
+ }
+ // Should never reach this point.
+ std::cerr << "internal error: can't find my socket\n";
+ return (false);
+}
+
+bool
+Connection::checkPrematureTimeout(const uint64_t transid) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ return (checkPrematureTimeoutInternal(transid));
+ } else {
+ return (checkPrematureTimeoutInternal(transid));
+ }
+}
+
+bool
+Connection::checkPrematureTimeoutInternal(const uint64_t transid) {
+ // If there is no transaction but the handlers are invoked it means
+ // that the last transaction in the queue timed out prematurely.
+ // Also, if there is a transaction in progress but the ID of that
+ // transaction doesn't match the one associated with the handler it,
+ // also means that the transaction timed out prematurely.
+ if (!isTransactionOngoing() || (transid != current_transid_)) {
+ LOG_WARN(http_logger, HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED)
+ .arg(isTransactionOngoing())
+ .arg(transid)
+ .arg(current_transid_);
+ return (true);
+ }
+
+ return (false);
+}
+
+void
+Connection::terminate(const boost::system::error_code& ec,
+ const std::string& parsing_error) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ terminateInternal(ec, parsing_error);
+ } else {
+ terminateInternal(ec, parsing_error);
+ }
+}
+
+void
+Connection::terminateInternal(const boost::system::error_code& ec,
+ const std::string& parsing_error) {
+ HttpResponsePtr response;
+ if (isTransactionOngoing()) {
+
+ timer_.cancel();
+ if (tcp_socket_) {
+ tcp_socket_->cancel();
+ }
+ if (tls_socket_) {
+ tls_socket_->cancel();
+ }
+
+ if (!ec && current_response_->isFinalized()) {
+ response = current_response_;
+
+ LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC,
+ HTTP_SERVER_RESPONSE_RECEIVED)
+ .arg(url_.toText());
+
+ LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
+ HTTP_SERVER_RESPONSE_RECEIVED_DETAILS)
+ .arg(url_.toText())
+ .arg(parser_ ?
+ parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
+ "[HttpResponseParser is null]");
+
+ } else {
+ std::string err = parsing_error.empty() ? ec.message() :
+ parsing_error;
+
+ LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC,
+ HTTP_BAD_SERVER_RESPONSE_RECEIVED)
+ .arg(url_.toText())
+ .arg(err);
+
+ // Only log the details if we have received anything and tried
+ // to parse it.
+ if (!parsing_error.empty()) {
+ LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
+ HTTP_BAD_SERVER_RESPONSE_RECEIVED_DETAILS)
+ .arg(url_.toText())
+ .arg(parser_ ?
+ parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
+ "[HttpResponseParser is null]");
+ }
+ }
+
+ try {
+ // The callback should take care of its own exceptions but one
+ // never knows.
+ if (MultiThreadingMgr::instance().getMode()) {
+ UnlockGuard<std::mutex> lock(mutex_);
+ current_callback_(ec, response, parsing_error);
+ } else {
+ current_callback_(ec, response, parsing_error);
+ }
+ } catch (...) {
+ }
+
+ // If we're not requesting connection persistence or the
+ // connection has timed out, we should close the socket.
+ if (!closed_ &&
+ (!current_request_->isPersistent() ||
+ (ec == boost::asio::error::timed_out))) {
+ closeInternal();
+ }
+
+ resetState();
+ }
+
+ // Check if there are any requests queued for this destination and start
+ // another transaction if there is at least one.
+ ConnectionPoolPtr conn_pool = conn_pool_.lock();
+ if (conn_pool) {
+ conn_pool->postProcessNextRequest(url_, tls_context_);
+ }
+}
+
+void
+Connection::scheduleTimer(const long request_timeout) {
+ if (request_timeout > 0) {
+ timer_.setup(std::bind(&Connection::timerCallback, this), request_timeout,
+ IntervalTimer::ONE_SHOT);
+ }
+}
+
+void
+Connection::doHandshake(const uint64_t transid) {
+ // Skip the handshake if it is not needed.
+ if (!need_handshake_) {
+ doSend(transid);
+ return;
+ }
+
+ SocketCallback socket_cb(std::bind(&Connection::handshakeCallback,
+ shared_from_this(),
+ handshake_callback_,
+ transid,
+ ph::_1));
+ try {
+ tls_socket_->handshake(socket_cb);
+
+ } catch (...) {
+ terminate(boost::asio::error::not_connected);
+ }
+}
+
+void
+Connection::doSend(const uint64_t transid) {
+ SocketCallback socket_cb(std::bind(&Connection::sendCallback,
+ shared_from_this(),
+ transid,
+ ph::_1,
+ ph::_2));
+ try {
+ if (tcp_socket_) {
+ tcp_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
+ return;
+ }
+
+ if (tls_socket_) {
+ tls_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
+ return;
+ }
+
+ // Should never reach this point.
+ std::cerr << "internal error: can't find a socket to send to\n";
+ isc_throw(Unexpected,
+ "internal error: can't find a socket to send to");
+ } catch (...) {
+ terminate(boost::asio::error::not_connected);
+ }
+}
+
+void
+Connection::doReceive(const uint64_t transid) {
+ TCPEndpoint endpoint;
+ SocketCallback socket_cb(std::bind(&Connection::receiveCallback,
+ shared_from_this(),
+ transid,
+ ph::_1,
+ ph::_2));
+ try {
+ if (tcp_socket_) {
+ tcp_socket_->asyncReceive(static_cast<void*>(input_buf_.data()),
+ input_buf_.size(), 0,
+ &endpoint, socket_cb);
+ return;
+ }
+ if (tls_socket_) {
+ tls_socket_->asyncReceive(static_cast<void*>(input_buf_.data()),
+ input_buf_.size(), 0,
+ &endpoint, socket_cb);
+ return;
+ }
+ // Should never reach this point.
+ std::cerr << "internal error: can't find a socket to receive from\n";
+ isc_throw(Unexpected,
+ "internal error: can't find a socket to receive from");
+
+ } catch (...) {
+ terminate(boost::asio::error::not_connected);
+ }
+}
+
+void
+Connection::connectCallback(HttpClient::ConnectHandler connect_callback,
+ const uint64_t transid,
+ const boost::system::error_code& ec) {
+ if (checkPrematureTimeout(transid)) {
+ return;
+ }
+
+ // Run user defined connect callback if specified.
+ if (connect_callback) {
+ // If the user defined callback indicates that the connection
+ // should not be continued.
+ if (tcp_socket_) {
+ if (!connect_callback(ec, tcp_socket_->getNative())) {
+ return;
+ }
+ } else if (tls_socket_) {
+ if (!connect_callback(ec, tls_socket_->getNative())) {
+ return;
+ }
+ } else {
+ // Should never reach this point.
+ std::cerr << "internal error: can't find a socket to connect\n";
+ }
+ }
+
+ if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
+ return;
+
+ // In some cases the "in progress" status code may be returned. It doesn't
+ // indicate an error. Sending the request over the socket is expected to
+ // be successful. Getting such status appears to be highly dependent on
+ // the operating system.
+ } else if (ec &&
+ (ec.value() != boost::asio::error::in_progress) &&
+ (ec.value() != boost::asio::error::already_connected)) {
+ terminate(ec);
+
+ } else {
+ // Start the TLS handshake asynchronously.
+ doHandshake(transid);
+ }
+}
+
+void
+Connection::handshakeCallback(HttpClient::ConnectHandler handshake_callback,
+ const uint64_t transid,
+ const boost::system::error_code& ec) {
+ need_handshake_ = false;
+ if (checkPrematureTimeout(transid)) {
+ return;
+ }
+
+ // Run user defined handshake callback if specified.
+ if (handshake_callback) {
+ // If the user defined callback indicates that the connection
+ // should not be continued.
+ if (tls_socket_) {
+ if (!handshake_callback(ec, tls_socket_->getNative())) {
+ return;
+ }
+ } else {
+ // Should never reach this point.
+ std::cerr << "internal error: can't find TLS socket\n";
+ }
+ }
+
+ if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
+ return;
+ } else if (ec) {
+ terminate(ec);
+
+ } else {
+ // Start sending the request asynchronously.
+ doSend(transid);
+ }
+}
+
+void
+Connection::sendCallback(const uint64_t transid,
+ const boost::system::error_code& ec,
+ size_t length) {
+ if (checkPrematureTimeout(transid)) {
+ return;
+ }
+
+ if (ec) {
+ if (ec.value() == boost::asio::error::operation_aborted) {
+ return;
+
+ // EAGAIN and EWOULDBLOCK don't really indicate an error. The length
+ // should be 0 in this case but let's be sure.
+ } else if ((ec.value() == boost::asio::error::would_block) ||
+ (ec.value() == boost::asio::error::try_again)) {
+ length = 0;
+
+ } else {
+ // Any other error should cause the transaction to terminate.
+ terminate(ec);
+ return;
+ }
+ }
+
+ // Sending is in progress, so push back the timeout.
+ scheduleTimer(timer_.getInterval());
+
+ // If any data have been sent, remove it from the buffer and only leave the
+ // portion that still has to be sent.
+ if (length > 0) {
+ buf_.erase(0, length);
+ }
+
+ // If there is no more data to be sent, start receiving a response. Otherwise,
+ // continue sending.
+ if (buf_.empty()) {
+ doReceive(transid);
+
+ } else {
+ doSend(transid);
+ }
+}
+
+void
+Connection::receiveCallback(const uint64_t transid,
+ const boost::system::error_code& ec,
+ size_t length) {
+ if (checkPrematureTimeout(transid)) {
+ return;
+ }
+
+ if (ec) {
+ if (ec.value() == boost::asio::error::operation_aborted) {
+ return;
+
+ // EAGAIN and EWOULDBLOCK don't indicate an error in this case. All
+ // other errors should terminate the transaction.
+ }
+ if ((ec.value() != boost::asio::error::try_again) &&
+ (ec.value() != boost::asio::error::would_block)) {
+ terminate(ec);
+ return;
+
+ } else {
+ // For EAGAIN and EWOULDBLOCK the length should be 0 anyway, but let's
+ // make sure.
+ length = 0;
+ }
+ }
+
+ // Receiving is in progress, so push back the timeout.
+ scheduleTimer(timer_.getInterval());
+
+ if (runParser(ec, length)) {
+ doReceive(transid);
+ }
+}
+
+bool
+Connection::runParser(const boost::system::error_code& ec, size_t length) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ return (runParserInternal(ec, length));
+ } else {
+ return (runParserInternal(ec, length));
+ }
+}
+
+bool
+Connection::runParserInternal(const boost::system::error_code& ec,
+ size_t length) {
+ // If we have received any data, let's feed the parser with it.
+ if (length != 0) {
+ parser_->postBuffer(static_cast<void*>(input_buf_.data()), length);
+ parser_->poll();
+ }
+
+ // If the parser still needs data, let's schedule another receive.
+ if (parser_->needData()) {
+ return (true);
+
+ } else if (parser_->httpParseOk()) {
+ // No more data needed and parsing has been successful so far. Let's
+ // try to finalize the response parsing.
+ try {
+ current_response_->finalize();
+ terminateInternal(ec);
+
+ } catch (const std::exception& ex) {
+ // If there is an error here, we need to return the error message.
+ terminateInternal(ec, ex.what());
+ }
+
+ } else {
+ // Parsing was unsuccessful. Let's pass the error message held in the
+ // parser.
+ terminateInternal(ec, parser_->getErrorMessage());
+ }
+
+ return (false);
+}
+
+void
+Connection::timerCallback() {
+ // Request timeout occurred.
+ terminate(boost::asio::error::timed_out);
+}
+
+}
+
+namespace isc {
+namespace http {
+
+/// @brief HttpClient implementation.
+class HttpClientImpl {
+public:
+ /// @brief Constructor.
+ ///
+ /// If single-threading:
+ /// - Creates the connection pool passing in the caller's IOService
+ /// and a maximum number of connections per URL value of 1.
+ /// If multi-threading:
+ /// - Creates a private IOService
+ /// - Creates a thread pool with the thread_pool_size threads
+ /// - Creates the connection pool passing the private IOService
+ /// and the thread_pool_size as the maximum number of connections
+ /// per URL.
+ ///
+ /// @param io_service IOService that will drive connection IO in single
+ /// threaded mode. (Currently ignored in multi-threaded mode)
+ /// @param thread_pool_size maximum number of concurrent threads
+ /// Internally this also sets the maximum number of concurrent connections
+ /// per URL.
+ /// @param defer_thread_start When true, starting of the pool threads is
+ /// deferred until a subsequent call to @ref start(). In this case the
+ /// pool's operational state after construction is STOPPED. Otherwise,
+ /// the thread pool threads will be created and started, with the
+ /// operational state being RUNNING. Applicable only when thread-pool size
+ /// is greater than zero.
+ HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0,
+ bool defer_thread_start = false)
+ : thread_pool_size_(thread_pool_size), thread_pool_() {
+ if (thread_pool_size_ > 0) {
+ // Create our own private IOService.
+ thread_io_service_.reset(new IOService());
+
+ // Create the connection pool. Note that we use the thread_pool_size
+ // as the maximum connections per URL value.
+ conn_pool_.reset(new ConnectionPool(*thread_io_service_, thread_pool_size_));
+
+ // Create the thread pool.
+ thread_pool_.reset(new IoServiceThreadPool(thread_io_service_, thread_pool_size_,
+ defer_thread_start));
+
+ LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC, HTTP_CLIENT_MT_STARTED)
+ .arg(thread_pool_size_);
+ } else {
+ // Single-threaded mode: use the caller's IOService,
+ // one connection per URL.
+ conn_pool_.reset(new ConnectionPool(io_service, 1));
+ }
+ }
+
+ /// @brief Destructor
+ ///
+ /// Calls stop().
+ ~HttpClientImpl() {
+ stop();
+ }
+
+ /// @brief Check if the current thread can perform thread pool state
+ /// transition.
+ ///
+ /// @throw MultiThreadingInvalidOperation if the state transition is done on
+ /// any of the worker threads.
+ void checkPermissions() {
+ if (thread_pool_) {
+ thread_pool_->checkPausePermissions();
+ }
+ }
+
+ /// @brief Starts running the client's thread pool, if multi-threaded.
+ void start() {
+ if (thread_pool_) {
+ thread_pool_->run();
+ }
+ }
+
+ /// @brief Close all connections, and if multi-threaded, stops the client's
+ /// thread pool.
+ void stop() {
+ // Close all the connections.
+ conn_pool_->closeAll();
+
+ // Stop the thread pool.
+ if (thread_pool_) {
+ thread_pool_->stop();
+ }
+ }
+
+ /// @brief Pauses the client's thread pool.
+ ///
+ /// Suspends thread pool event processing.
+ /// @throw InvalidOperation if the thread pool does not exist.
+ void pause() {
+ if (!thread_pool_) {
+ isc_throw(InvalidOperation, "HttpClient::pause - no thread pool");
+ }
+
+ // Pause the thread pool.
+ thread_pool_->pause();
+ }
+
+ /// @brief Resumes running the client's thread pool.
+ ///
+ /// Resumes thread pool event processing.
+ /// @throw InvalidOperation if the thread pool does not exist.
+ void resume() {
+ if (!thread_pool_) {
+ isc_throw(InvalidOperation, "HttpClient::resume - no thread pool");
+ }
+
+ // Resume running the thread pool.
+ thread_pool_->run();
+ }
+
+ /// @brief Indicates if the thread pool is running.
+ ///
+ /// @return True if the thread pool exists and it is in the RUNNING state,
+ /// false otherwise.
+ bool isRunning() {
+ if (thread_pool_) {
+ return (thread_pool_->isRunning());
+ }
+
+ return (false);
+ }
+
+ /// @brief Indicates if the thread pool is stopped.
+ ///
+ /// @return True if the thread pool exists and it is in the STOPPED state,
+ /// false otherwise.
+ bool isStopped() {
+ if (thread_pool_) {
+ return (thread_pool_->isStopped());
+ }
+
+ return (false);
+ }
+
+ /// @brief Indicates if the thread pool is paused.
+ ///
+ /// @return True if the thread pool exists and it is in the PAUSED state,
+ /// false otherwise.
+ bool isPaused() {
+ if (thread_pool_) {
+ return (thread_pool_->isPaused());
+ }
+
+ return (false);
+ }
+
+ /// @brief Fetches the internal IOService used in multi-threaded mode.
+ ///
+ /// @return A pointer to the IOService, or an empty pointer when
+ /// in single-threaded mode.
+ asiolink::IOServicePtr getThreadIOService() {
+ return (thread_io_service_);
+ };
+
+ /// @brief Fetches the maximum size of the thread pool.
+ ///
+ /// @return the maximum size of the thread pool.
+ uint16_t getThreadPoolSize() {
+ return (thread_pool_size_);
+ }
+
+ /// @brief Fetches the number of threads in the pool.
+ ///
+ /// @return the number of running threads.
+ uint16_t getThreadCount() {
+ if (!thread_pool_) {
+ return (0);
+ }
+ return (thread_pool_->getThreadCount());
+ }
+
+ /// @brief Holds a pointer to the connection pool.
+ ConnectionPoolPtr conn_pool_;
+
+private:
+
+ /// @brief Maxim number of threads in the thread pool.
+ size_t thread_pool_size_;
+
+ /// @brief Pointer to private IOService used in multi-threaded mode.
+ asiolink::IOServicePtr thread_io_service_;
+
+ /// @brief Pool of threads used to service connections in multi-threaded
+ /// mode.
+ IoServiceThreadPoolPtr thread_pool_;
+};
+
+HttpClient::HttpClient(IOService& io_service, bool multi_threading_enabled,
+ size_t thread_pool_size, bool defer_thread_start) {
+ if (!multi_threading_enabled && thread_pool_size) {
+ isc_throw(InvalidOperation,
+ "HttpClient thread_pool_size must be zero "
+ "when Kea core multi-threading is disabled");
+ }
+
+ impl_.reset(new HttpClientImpl(io_service, thread_pool_size,
+ defer_thread_start));
+}
+
+HttpClient::~HttpClient() {
+}
+
+void
+HttpClient::asyncSendRequest(const Url& url,
+ const TlsContextPtr& tls_context,
+ const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const HttpClient::RequestHandler& request_callback,
+ const HttpClient::RequestTimeout& request_timeout,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::HandshakeHandler& handshake_callback,
+ const HttpClient::CloseHandler& close_callback) {
+ if (!url.isValid()) {
+ isc_throw(HttpClientError, "invalid URL specified for the HTTP client");
+ }
+
+ if ((url.getScheme() == Url::Scheme::HTTPS) && !tls_context) {
+ isc_throw(HttpClientError, "HTTPS URL scheme but no TLS context");
+ }
+
+ if (!request) {
+ isc_throw(HttpClientError, "HTTP request must not be null");
+ }
+
+ if (!response) {
+ isc_throw(HttpClientError, "HTTP response must not be null");
+ }
+
+ if (!request_callback) {
+ isc_throw(HttpClientError, "callback for HTTP transaction must not be null");
+ }
+
+ impl_->conn_pool_->queueRequest(url, tls_context, request, response,
+ request_timeout.value_,
+ request_callback, connect_callback,
+ handshake_callback, close_callback);
+}
+
+void
+HttpClient::closeIfOutOfBand(int socket_fd) {
+ return (impl_->conn_pool_->closeIfOutOfBand(socket_fd));
+}
+
+void
+HttpClient::start() {
+ impl_->start();
+}
+
+void
+HttpClient::checkPermissions() {
+ impl_->checkPermissions();
+}
+
+void
+HttpClient::pause() {
+ impl_->pause();
+}
+
+void
+HttpClient::resume() {
+ impl_->resume();
+}
+
+void
+HttpClient::stop() {
+ impl_->stop();
+}
+
+const IOServicePtr
+HttpClient::getThreadIOService() const {
+ return (impl_->getThreadIOService());
+}
+
+uint16_t
+HttpClient::getThreadPoolSize() const {
+ return (impl_->getThreadPoolSize());
+}
+
+uint16_t
+HttpClient::getThreadCount() const {
+ return (impl_->getThreadCount());
+}
+
+bool
+HttpClient::isRunning() {
+ return (impl_->isRunning());
+}
+
+bool
+HttpClient::isStopped() {
+ return (impl_->isStopped());
+}
+
+bool
+HttpClient::isPaused() {
+ return (impl_->isPaused());
+}
+
+} // end of namespace isc::http
+} // end of namespace isc