diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:15:43 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:15:43 +0000 |
commit | f5f56e1a1c4d9e9496fcb9d81131066a964ccd23 (patch) | |
tree | 49e44c6f87febed37efb953ab5485aa49f6481a7 /src/lib/http/tests/client_mt_unittests.cc | |
parent | Initial commit. (diff) | |
download | isc-kea-f5f56e1a1c4d9e9496fcb9d81131066a964ccd23.tar.xz isc-kea-f5f56e1a1c4d9e9496fcb9d81131066a964ccd23.zip |
Adding upstream version 2.4.1.upstream/2.4.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/lib/http/tests/client_mt_unittests.cc')
-rw-r--r-- | src/lib/http/tests/client_mt_unittests.cc | 1042 |
1 files changed, 1042 insertions, 0 deletions
diff --git a/src/lib/http/tests/client_mt_unittests.cc b/src/lib/http/tests/client_mt_unittests.cc new file mode 100644 index 0000000..ef04c28 --- /dev/null +++ b/src/lib/http/tests/client_mt_unittests.cc @@ -0,0 +1,1042 @@ +// Copyright (C) 2021-2023 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include <config.h> + +#include <asiolink/asio_wrapper.h> +#include <asiolink/interval_timer.h> +#include <cc/data.h> +#include <http/client.h> +#include <http/listener.h> +#include <http/post_request_json.h> +#include <http/response_creator.h> +#include <http/response_creator_factory.h> +#include <http/response_json.h> +#include <http/url.h> +#include <util/multi_threading_mgr.h> +#include <testutils/gtest_utils.h> + +#include <boost/asio/ip/tcp.hpp> +#include <boost/pointer_cast.hpp> +#include <gtest/gtest.h> + +#include <functional> +#include <sstream> +#include <string> + +using namespace isc; +using namespace isc::asiolink; +using namespace isc::data; +using namespace isc::http; +using namespace isc::util; +namespace ph = std::placeholders; + +namespace { + +/// @brief IP address to which HTTP service is bound. +const std::string SERVER_ADDRESS = "127.0.0.1"; + +/// @brief Port number to which HTTP service is bound. +const unsigned short SERVER_PORT = 18123; + +/// @brief Test timeout (ms). +const long TEST_TIMEOUT = 10000; + +/// @brief Container request/response pair handled by a given thread. +struct ClientRR { + /// @brief Thread id of the client thread handling the request as a string. + std::string thread_id_; + + /// @brief HTTP request submitted by the client thread. + PostHttpRequestJsonPtr request_; + + /// @brief HTTP response received by the client thread. + HttpResponseJsonPtr response_; +}; + +/// @brief Pointer to a ClientRR instance. +typedef boost::shared_ptr<ClientRR> ClientRRPtr; + +/// @brief Implementation of the @ref HttpResponseCreator. +/// +/// Creates a response to a request containing body content +/// as follows: +/// +/// ``` +/// { "sequence" : nnnn } +/// ``` +/// +/// The response will include the sequence number of the request +/// as well as the server port passed into the creator's constructor: +/// +/// ``` +/// { "sequence": nnnn, "server-port": xxxx } +/// ``` +class TestHttpResponseCreator : public HttpResponseCreator { +public: + /// @brief Constructor + /// + /// @param server_port integer value the server listens upon, it is + /// echoed back in responses as "server-port". + TestHttpResponseCreator(uint16_t server_port) + : server_port_(server_port) { } + + /// @brief Create a new request. + /// + /// @return Pointer to the new instance of the @ref HttpRequest. + virtual HttpRequestPtr + createNewHttpRequest() const { + return (HttpRequestPtr(new PostHttpRequestJson())); + } + +private: + /// @brief Creates HTTP response. + /// + /// @param request Pointer to the HTTP request. + /// @param status_code status code to include in the response. + /// + /// @return Pointer to the generated HTTP response. + virtual HttpResponsePtr + createStockHttpResponse(const HttpRequestPtr& request, + const HttpStatusCode& status_code) const { + // The request hasn't been finalized so the request object + // doesn't contain any information about the HTTP version number + // used. But, the context should have this data (assuming the + // HTTP version is parsed OK). + HttpVersion http_version(request->context()->http_version_major_, + request->context()->http_version_minor_); + // This will generate the response holding JSON content. + HttpResponseJsonPtr response(new HttpResponseJson(http_version, status_code)); + response->finalize(); + return (response); + } + + /// @brief Creates HTTP response. + /// + /// Generates a response which echoes the requests sequence + /// number as well as the creator's server port value. Responses + /// should appear as follows: + /// + /// ``` + /// { "sequence" : nnnn } + /// ``` + /// + /// @param request Pointer to the HTTP request. + /// @return Pointer to the generated HTTP OK response with no content. + virtual HttpResponsePtr + createDynamicHttpResponse(HttpRequestPtr request) { + // Request must always be JSON. + PostHttpRequestJsonPtr request_json = + boost::dynamic_pointer_cast<PostHttpRequestJson>(request); + if (!request_json) { + return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST)); + } + + // Extract the sequence from the request. + ConstElementPtr sequence = request_json->getJsonElement("sequence"); + if (!sequence) { + return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST)); + } + + // Create the response. + HttpResponseJsonPtr response(new HttpResponseJson(request->getHttpVersion(), + HttpStatusCode::OK)); + // Construct the body. + ElementPtr body = Element::createMap(); + body->set("server-port", Element::create(server_port_)); + body->set("sequence", sequence); + + // Echo request body back in the response. + response->setBodyAsJson(body); + + response->finalize(); + return (response); + } + + /// @brief Port upon which this creator's server is listening. + /// + /// The intent is to use the value to determine which server generated + /// a given response. + uint16_t server_port_; +}; + +/// @brief Implementation of the test @ref HttpResponseCreatorFactory. +/// +/// This factory class creates @ref TestHttpResponseCreator instances. +class TestHttpResponseCreatorFactory : public HttpResponseCreatorFactory { +public: + + /// @brief Constructor + /// + /// @param server_port port upon with the server is listening. This + /// value will be included in responses such that each response + /// can be attributed to a specific server. + TestHttpResponseCreatorFactory(uint16_t server_port) + : server_port_(server_port) {}; + + /// @brief Creates @ref TestHttpResponseCreator instance. + virtual HttpResponseCreatorPtr create() const { + HttpResponseCreatorPtr response_creator(new TestHttpResponseCreator(server_port_)); + return (response_creator); + } + + /// @brief Port upon which this factory's server is listening. + /// + /// The intent is to use the value to determine which server generated + /// a given response. + uint16_t server_port_; +}; + +/// @brief Test fixture class for testing threading modes of HTTP client. +class MultiThreadingHttpClientTest : public ::testing::Test { +public: + + /// @brief Constructor. + MultiThreadingHttpClientTest() + : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(), + test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0), + expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false), + pause_cnt_(0) { + test_timer_.setup(std::bind(&MultiThreadingHttpClientTest::timeoutHandler, this, true), + TEST_TIMEOUT, IntervalTimer::ONE_SHOT); + MultiThreadingMgr::instance().setMode(true); + } + + /// @brief Destructor. + ~MultiThreadingHttpClientTest() { + // Stop the client. + if (client_) { + client_->stop(); + } + + // Stop all listeners. + for (const auto& listener : listeners_) { + listener->stop(); + } + + MultiThreadingMgr::instance().setMode(false); + } + + /// @brief Callback function to invoke upon test timeout. + /// + /// It stops the IO service and reports test timeout. + /// + /// @param fail_on_timeout Specifies if test failure should be reported. + void timeoutHandler(const bool fail_on_timeout) { + if (fail_on_timeout) { + ADD_FAILURE() << "Timeout occurred while running the test!"; + } + io_service_.stop(); + } + + /// @brief Runs the test's IOService until the desired number of requests + /// have been carried out or the test fails. + void runIOService(size_t request_limit) { + while (getRRCount() < request_limit) { + // Always call reset() before we call run(); + io_service_.restart(); + + // Run until a client stops the service. + io_service_.run(); + } + } + + /// @brief Creates an HTTP request with JSON body. + /// + /// It includes a JSON parameter with a specified value. + /// + /// @param parameter_name JSON parameter to be included. + /// @param value JSON parameter value. + /// @param version HTTP version to be used. Default is HTTP/1.1. + template<typename ValueType> + PostHttpRequestJsonPtr createRequest(const std::string& parameter_name, + const ValueType& value, + const HttpVersion& version = HttpVersion(1, 1)) { + // Create POST request with JSON body. + PostHttpRequestJsonPtr request(new PostHttpRequestJson(HttpRequest::Method::HTTP_POST, + "/boo", version)); + // Body is a map with a specified parameter included. + ElementPtr body = Element::createMap(); + body->set(parameter_name, Element::create(value)); + request->setBodyAsJson(body); + try { + request->finalize(); + } catch (const std::exception& ex) { + ADD_FAILURE() << "failed to create request: " << ex.what(); + } + + return (request); + } + + /// @brief Test that worker threads are not permitted to change thread pool + /// state. + void testIllegalThreadPoolActions() { + ASSERT_THROW(client_->start(), MultiThreadingInvalidOperation); + ASSERT_THROW(client_->pause(), MultiThreadingInvalidOperation); + ASSERT_THROW(client_->resume(), MultiThreadingInvalidOperation); + } + + /// @brief Initiates a single HTTP request. + /// + /// Constructs an HTTP post whose body is a JSON map containing a + /// single integer element, "sequence". + /// + /// The request completion handler will block each requesting thread + /// until the number of in-progress threads reaches the number of + /// threads in the pool. At that point, the handler will unblock + /// until all threads have finished preparing their response and are + /// ready to return. The handler will then notify all pending threads + /// and invoke stop() on the test's main IO service thread. + /// + /// @param sequence value for the integer element, "sequence", + /// to send in the request. + void startRequest(int sequence, int port_offset = 0) { + // Create the URL on which the server can be reached. + std::stringstream ss; + ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset); + Url url(ss.str()); + + // Initiate request to the server. + PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence); + HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>(); + ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(), + request_json, response_json, + [this, request_json, response_json](const boost::system::error_code& ec, + const HttpResponsePtr&, + const std::string&) { + // Bail on an error. + ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec; + + // Wait here until we have as many in progress as we have threads. + { + std::unique_lock<std::mutex> lck(test_mutex_); + ++num_in_progress_; + if (num_threads_ == 0 || num_in_progress_ == num_threads_) { + // Everybody has one, let's go. + num_finished_ = 0; + test_cv_.notify_all(); + } else { + // I'm ready but others aren't wait here. + bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10), + [&]() { return (num_in_progress_ == num_threads_); }); + if (!ret) { + ADD_FAILURE() << "clients failed to start work"; + } + } + } + + // If running on multiple threads, threads should be prohibited from + // changing the thread pool state. + if (num_threads_) { + testIllegalThreadPoolActions(); + } + + // Get stringified thread-id. + std::stringstream ss; + ss << std::this_thread::get_id(); + + // Create the ClientRR. + ClientRRPtr clientRR(new ClientRR()); + clientRR->thread_id_ = ss.str(); + clientRR->request_ = request_json; + clientRR->response_ = response_json; + + // Wait here until we have as many ready to finish as we have threads. + { + std::unique_lock<std::mutex> lck(test_mutex_); + ++num_finished_; + clientRRs_.push_back(clientRR); + if (num_threads_ == 0 || num_finished_ == num_threads_) { + // We're all done, notify the others and finish. + num_in_progress_ = 0; + test_cv_.notify_all(); + // Stop the test's IOService. + io_service_.stop(); + } else { + // I'm done but others aren't wait here. + bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10), + [&]() { return (num_finished_ == num_threads_); }); + if (!ret) { + ADD_FAILURE() << "clients failed to finish work"; + } + } + } + })); + } + + /// @brief Initiates a single HTTP request. + /// + /// Constructs an HTTP post whose body is a JSON map containing a + /// single integer element, "sequence". + /// + /// The request completion handler simply constructs the response, + /// and adds it the list of completed request/responses. If the + /// number of completed requests has reached the expected number + /// it stops the test IOService. + /// + /// @param sequence value for the integer element, "sequence", + /// to send in the request. + void startRequestSimple(int sequence, int port_offset = 0) { + // Create the URL on which the server can be reached. + std::stringstream ss; + ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset); + Url url(ss.str()); + + // Initiate request to the server. + PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence); + HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>(); + ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(), + request_json, response_json, + [this, request_json, response_json](const boost::system::error_code& ec, + const HttpResponsePtr&, + const std::string&) { + // Bail on an error. + ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec; + + // Get stringified thread-id. + std::stringstream ss; + ss << std::this_thread::get_id(); + + // Create the ClientRR. + ClientRRPtr clientRR(new ClientRR()); + clientRR->thread_id_ = ss.str(); + clientRR->request_ = request_json; + clientRR->response_ = response_json; + + { + std::unique_lock<std::mutex> lck(test_mutex_); + clientRRs_.push_back(clientRR); + ++num_finished_; + if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) { + io_service_.stop(); + } + } + + })); + } + + /// @brief Carries out HTTP requests via HttpClient to HTTP listener(s). + /// + /// This function creates one HttpClient with the given number + /// of threads and then the given number of HttpListeners. It then + /// initiates the given number of request batches where each batch + /// contains one request per thread per listener. + /// + /// Then it iteratively runs the test's IOService until all + /// the requests have been responded to, an error occurs, or the + /// test times out. + /// + /// Each request carries a single integer element, "sequence", which + /// uniquely identifies the request. Each response is expected to + /// contain this value echoed back along with the listener's server + /// port number. Thus each response can be matched to it's request + /// and to the listener that handled the request. + /// + /// After all requests have been conducted, the function verifies + /// that: + /// + /// 1. The number of requests conducted is correct + /// 2. The sequence numbers in request-response pairs match + /// 3. Each client thread handled the same number of requests + /// 4. Each listener handled the same number of requests + /// + /// @param num_threads number of threads the HttpClient should use. + /// A value of 0 puts the HttpClient in single-threaded mode. + /// @param num_batches number of batches of requests that should be + /// conducted. + /// @param num_listeners number of HttpListeners to create. Defaults + /// to 1. + void threadRequestAndReceive(size_t num_threads, size_t num_batches, + size_t num_listeners = 1) { + ASSERT_TRUE(num_batches); + ASSERT_TRUE(num_listeners); + num_threads_ = num_threads; + num_batches_ = num_batches; + num_listeners_ = num_listeners; + + // Client in ST is, in effect, 1 thread. + size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_); + + // Calculate the expected number of requests. + expected_requests_ = (num_batches_ * num_listeners_ * effective_threads); + + for (auto i = 0; i < num_listeners_; ++i) { + // Make a factory + HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i)); + factories_.push_back(factory); + + // Need to create a Listener on + HttpListenerPtr listener(new HttpListener(io_service_, + IOAddress(SERVER_ADDRESS), (SERVER_PORT + i), + TlsContextPtr(), factory, + HttpListener::RequestTimeout(10000), + HttpListener::IdleTimeout(10000))); + listeners_.push_back(listener); + + // Start the server. + ASSERT_NO_THROW(listener->start()); + } + + // Create an MT client with num_threads + ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, + num_threads ? true : false, + num_threads, true))); + ASSERT_TRUE(client_); + + if (num_threads_ == 0) { + // If we single-threaded client should not have it's own IOService. + ASSERT_FALSE(client_->getThreadIOService()); + } else { + // If we multi-threaded client should have it's own IOService. + ASSERT_TRUE(client_->getThreadIOService()); + } + + // Start the requisite number of requests: + // batch * listeners * threads. + int sequence = 0; + for (auto b = 0; b < num_batches; ++b) { + for (auto l = 0; l < num_listeners_; ++l) { + for (auto t = 0; t < effective_threads; ++t) { + startRequest(++sequence, l); + } + } + } + + client_->start(); + + // Verify the pool size and number of threads are as expected. + ASSERT_EQ(client_->getThreadPoolSize(), num_threads); + ASSERT_EQ(client_->getThreadCount(), num_threads); + + // Loop until the clients are done, an error occurs, or the time runs out. + runIOService(expected_requests_); + + // Client should stop without issue. + ASSERT_NO_THROW(client_->stop()); + + // Listeners should stop without issue. + for (const auto& listener : listeners_) { + ASSERT_NO_THROW(listener->stop()); + } + + // We should have a response for each request. + ASSERT_EQ(getRRCount(), expected_requests_); + + // Create a map to track number of responses for each client thread. + std::map<std::string, int> responses_per_thread; + + // Create a map to track number of responses for each listener port. + std::map<uint16_t, int> responses_per_listener; + + // Get the stringified thread-id of the test's main thread. + std::stringstream ss; + ss << std::this_thread::get_id(); + std::string main_thread_id = ss.str(); + + // Iterate over the client request/response pairs. + for (auto const& clientRR : clientRRs_) { + // Make sure it's whole. + ASSERT_FALSE(clientRR->thread_id_.empty()); + ASSERT_TRUE(clientRR->request_); + ASSERT_TRUE(clientRR->response_); + + // Request should contain an integer sequence number. + int request_sequence; + ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence"); + ASSERT_TRUE(sequence); + ASSERT_NO_THROW(request_sequence = sequence->intValue()); + + // Response should contain an integer sequence number. + int response_sequence; + sequence = clientRR->response_->getJsonElement("sequence"); + ASSERT_TRUE(sequence); + ASSERT_NO_THROW(response_sequence = sequence->intValue()); + + // Request and Response sequence numbers should match. + ASSERT_EQ(request_sequence, response_sequence); + + ConstElementPtr server_port_elem = clientRR->response_->getJsonElement("server-port"); + ASSERT_TRUE(server_port_elem); + uint16_t server_port = server_port_elem->intValue(); + + if (num_threads_ == 0) { + // For ST mode thread id should always be the main thread. + ASSERT_EQ(clientRR->thread_id_, main_thread_id); + } else { + // For MT mode the thread id should never be the main thread. + ASSERT_NE(clientRR->thread_id_, main_thread_id); + } + + // Bump the response count for the given client thread-id. + auto rit = responses_per_thread.find(clientRR->thread_id_); + if (rit != responses_per_thread.end()) { + responses_per_thread[clientRR->thread_id_] = rit->second + 1; + } else { + responses_per_thread[clientRR->thread_id_] = 1; + } + + // Bump the response count for the given server port. + auto lit = responses_per_listener.find(server_port); + if (lit != responses_per_listener.end()) { + responses_per_listener[server_port] = lit->second + 1; + } else { + responses_per_listener[server_port] = 1; + } + } + + // Make sure that all client threads received responses. + ASSERT_EQ(responses_per_thread.size(), effective_threads); + + // Make sure that each client thread received the same number of responses. + for (auto const& it : responses_per_thread) { + EXPECT_EQ(it.second, (num_batches_ * num_listeners_)) + << "thread-id: " << it.first + << ", responses: " << it.second << std::endl; + } + + // Make sure that all listeners generated responses. + ASSERT_EQ(responses_per_listener.size(), num_listeners_); + + // Make sure Each listener generated the same number of responses. + for (auto const& it : responses_per_listener) { + EXPECT_EQ(it.second, (num_batches_ * effective_threads)) + << "server-port: " << it.first + << ", responses: " << it.second << std::endl; + } + } + + /// @brief Verifies the client can be paused and resumed repeatedly + /// while doing multi-threaded work. + /// + /// @param num_threads number of threads the HttpClient should use. + /// Must be greater than zero, this test does not make sense for a + /// single threaded client. + /// @param num_batches number of batches of requests that should be + /// conducted. + /// @param num_listeners number of HttpListeners to create. + /// @param num_pauses number of pauses to conduct. + void workPauseResumeShutdown(size_t num_threads, size_t num_batches, + size_t num_listeners, size_t num_pauses) { + ASSERT_TRUE(num_threads); + ASSERT_TRUE(num_batches); + ASSERT_TRUE(num_listeners); + num_threads_ = num_threads; + num_batches_ = num_batches; + num_listeners_ = num_listeners; + + // Calculate the total expected number of requests. + size_t total_requests = (num_batches_ * num_listeners_ * num_threads_); + + // Create the listeners. + for (auto i = 0; i < num_listeners_; ++i) { + // Make a factory + HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i)); + factories_.push_back(factory); + + // Need to create a Listener on + HttpListenerPtr listener(new HttpListener(io_service_, + IOAddress(SERVER_ADDRESS), (SERVER_PORT + i), + TlsContextPtr(), factory, + HttpListener::RequestTimeout(10000), + HttpListener::IdleTimeout(10000))); + listeners_.push_back(listener); + + // Start the server. + ASSERT_NO_THROW(listener->start()); + } + + // Create an instant start, MT client with num_threads + ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, true, num_threads, true))); + ASSERT_TRUE(client_); + + // Start the requisite number of requests: + // batch * listeners * threads. + int sequence = 0; + for (auto b = 0; b < num_batches; ++b) { + for (auto l = 0; l < num_listeners_; ++l) { + for (auto t = 0; t < num_threads_; ++t) { + startRequestSimple(++sequence, l); + } + } + } + + client_->start(); + + // Client should be running. Check convenience functions. + ASSERT_TRUE(client_->isRunning()); + ASSERT_FALSE(client_->isPaused()); + ASSERT_FALSE(client_->isStopped()); + + // Verify the pool size and number of threads are as expected. + ASSERT_EQ(client_->getThreadPoolSize(), num_threads); + ASSERT_EQ(client_->getThreadCount(), num_threads); + + size_t rr_count = 0; + while (rr_count < total_requests) { + size_t request_limit = (pause_cnt_ < num_pauses ? + (rr_count + ((total_requests - rr_count) / num_pauses)) + : total_requests); + + // Run test IOService until we hit the limit. + runIOService(request_limit); + + // If we've done all our pauses we should be through. + if (pause_cnt_ == num_pauses) { + break; + } + + // Pause the client. + ASSERT_NO_THROW(client_->pause()); + ASSERT_TRUE(client_->isPaused()); + ++pause_cnt_; + + // Check our progress. + rr_count = getRRCount(); + ASSERT_GE(rr_count, request_limit); + + // Resume the client. + ASSERT_NO_THROW(client_->resume()); + ASSERT_TRUE(client_->isRunning()); + } + + // Client should stop without issue. + ASSERT_NO_THROW(client_->stop()); + ASSERT_TRUE(client_->isStopped()); + + // We should have finished all our requests. + ASSERT_EQ(getRRCount(), total_requests); + + // Stopping again should be harmless. + ASSERT_NO_THROW(client_->stop()); + + // Listeners should stop without issue. + for (const auto& listener : listeners_) { + ASSERT_NO_THROW(listener->stop()); + } + + // Get the stringified thread-id of the test's main thread. + std::stringstream ss; + ss << std::this_thread::get_id(); + std::string main_thread_id = ss.str(); + + // Tracks the number for requests fulfilled by main thread. + size_t worked_by_main = 0; + + // Iterate over the client request/response pairs. + for (auto const& clientRR : clientRRs_) { + // Make sure it's whole. + ASSERT_FALSE(clientRR->thread_id_.empty()); + ASSERT_TRUE(clientRR->request_); + ASSERT_TRUE(clientRR->response_); + + // Request should contain an integer sequence number. + int request_sequence; + ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence"); + ASSERT_TRUE(sequence); + ASSERT_NO_THROW(request_sequence = sequence->intValue()); + + // Response should contain an integer sequence number. + int response_sequence; + sequence = clientRR->response_->getJsonElement("sequence"); + ASSERT_TRUE(sequence); + ASSERT_NO_THROW(response_sequence = sequence->intValue()); + + // Request and Response sequence numbers should match. + ASSERT_EQ(request_sequence, response_sequence); + + ConstElementPtr server_port_elem = clientRR->response_->getJsonElement("server-port"); + ASSERT_TRUE(server_port_elem); + + // Track how many requests were completed by the main thread. + // These can occur when pausing calls IOService::poll. + if (clientRR->thread_id_ == main_thread_id) { + ++worked_by_main; + } + } + + // Make sure the majority of the requests were worked by + // worker threads. In theory, the number of calls to poll + // times the number of threads is the limit for responses + // built by the main thread. + ASSERT_LE(worked_by_main, num_pauses * num_threads); + } + + /// @brief Fetch the number of completed requests. + /// + /// @return number of completed requests. + size_t getRRCount() { + std::lock_guard<std::mutex> lck(test_mutex_); + return (clientRRs_.size()); + } + + /// @brief IO service used in the tests. + IOService io_service_; + + /// @brief Instance of the client used in the tests. + HttpClientPtr client_; + + /// @brief Instance of the listener used in the tests. + HttpListenerPtr listener_; + + /// @brief Pointer to the response creator factory. + HttpResponseCreatorFactoryPtr factory_; + + /// @brief List of listeners. + std::vector<HttpListenerPtr> listeners_; + + /// @brief List of response factories. + std::vector<HttpResponseCreatorFactoryPtr> factories_; + + /// @brief Asynchronous timer service to detect timeouts. + IntervalTimer test_timer_; + + /// @brief Number of threads HttpClient should use. + size_t num_threads_; + + /// @brief Number of request batches to conduct. + size_t num_batches_; + + /// @brief Number of listeners to start. + size_t num_listeners_; + + /// @brief Number of expected requests to carry out. + size_t expected_requests_; + + /// @brief Number of requests that are in progress. + size_t num_in_progress_; + + /// @brief Number of requests that have been completed. + size_t num_finished_; + + /// @brief a List of client request-response pairs. + std::vector<ClientRRPtr> clientRRs_; + + /// @brief Mutex for locking. + std::mutex test_mutex_; + + /// @brief Condition variable used to make client threads wait + /// until number of in-progress requests reaches the number + /// of client requests. + std::condition_variable test_cv_; + + /// @brief Indicates if client threads are currently "paused". + bool paused_; + + /// @brief Number of times client has been paused during the test. + size_t pause_cnt_; +}; + +// Verifies we can construct and destruct, in both single +// and multi-threaded modes. +TEST_F(MultiThreadingHttpClientTest, basics) { + HttpClientPtr client; + + // Value of 0 for thread_pool_size means single-threaded. + ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, false))); + ASSERT_TRUE(client); + + ASSERT_FALSE(client->getThreadIOService()); + ASSERT_EQ(client->getThreadPoolSize(), 0); + ASSERT_EQ(client->getThreadCount(), 0); + + // Make sure destruction doesn't throw. + ASSERT_NO_THROW_LOG(client.reset()); + + // Non-zero thread-pool-size means multi-threaded mode, should throw. + ASSERT_THROW_MSG(client.reset(new HttpClient(io_service_, false, 1)), InvalidOperation, + "HttpClient thread_pool_size must be zero " + "when Kea core multi-threading is disabled"); + ASSERT_FALSE(client); + + // Multi-threaded construction should work now. + ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, true, 3))); + ASSERT_TRUE(client); + + // Verify that it has an internal IOService and that thread pool size + // and thread count match. + ASSERT_TRUE(client->getThreadIOService()); + EXPECT_FALSE(client->getThreadIOService()->stopped()); + ASSERT_EQ(client->getThreadPoolSize(), 3); + ASSERT_EQ(client->getThreadCount(), 3); + + // Check convenience functions. + ASSERT_TRUE(client->isRunning()); + ASSERT_FALSE(client->isPaused()); + ASSERT_FALSE(client->isStopped()); + + // Verify stop doesn't throw. + ASSERT_NO_THROW_LOG(client->stop()); + + // Verify we're stopped. + ASSERT_TRUE(client->getThreadIOService()); + EXPECT_TRUE(client->getThreadIOService()->stopped()); + ASSERT_EQ(client->getThreadPoolSize(), 3); + ASSERT_EQ(client->getThreadCount(), 0); + + // Check convenience functions. + ASSERT_FALSE(client->isRunning()); + ASSERT_FALSE(client->isPaused()); + ASSERT_TRUE(client->isStopped()); + + // Verify a second call to stop() doesn't throw. + ASSERT_NO_THROW_LOG(client->stop()); + + // Make sure destruction doesn't throw. + ASSERT_NO_THROW_LOG(client.reset()); + + // Create another multi-threaded instance. + ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, true, 3))); + + // Make sure destruction doesn't throw. + ASSERT_NO_THROW_LOG(client.reset()); +} + +// Verifies we can construct with deferred start. +TEST_F(MultiThreadingHttpClientTest, deferredStart) { + HttpClientPtr client; + size_t thread_pool_size = 3; + + // Create MT client with deferred start. + ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, true, thread_pool_size, true))); + ASSERT_TRUE(client); + + // Client should be STOPPED, with no threads. + ASSERT_TRUE(client->getThreadIOService()); + ASSERT_EQ(client->getThreadPoolSize(), thread_pool_size); + ASSERT_EQ(client->getThreadCount(), 0); + + // Check convenience functions. + ASSERT_FALSE(client->isRunning()); + ASSERT_FALSE(client->isPaused()); + ASSERT_TRUE(client->isStopped()); + + // We should be able to start it. + ASSERT_NO_THROW(client->start()); + + // Verify we have threads and run state is RUNNING. + ASSERT_EQ(client->getThreadCount(), 3); + ASSERT_TRUE(client->getThreadIOService()); + ASSERT_FALSE(client->getThreadIOService()->stopped()); + + // Check convenience functions. + ASSERT_TRUE(client->isRunning()); + ASSERT_FALSE(client->isPaused()); + ASSERT_FALSE(client->isStopped()); + + // Second call to start should be harmless. + ASSERT_NO_THROW_LOG(client->start()); + + // Verify we didn't break it. + ASSERT_EQ(client->getThreadCount(), 3); + ASSERT_TRUE(client->isRunning()); + + // Make sure destruction doesn't throw. + ASSERT_NO_THROW_LOG(client.reset()); +} + +// Verifies we can restart after stop. +TEST_F(MultiThreadingHttpClientTest, restartAfterStop) { + HttpClientPtr client; + size_t thread_pool_size = 3; + + // Create MT client with instant start. + ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, true, thread_pool_size))); + ASSERT_TRUE(client); + + // Verify we're started. + ASSERT_EQ(client->getThreadCount(), 3); + ASSERT_TRUE(client->getThreadIOService()); + ASSERT_FALSE(client->getThreadIOService()->stopped()); + ASSERT_TRUE(client->isRunning()); + + // Stop should succeed. + ASSERT_NO_THROW_LOG(client->stop()); + + // Verify we're stopped. + ASSERT_EQ(client->getThreadCount(), 0); + ASSERT_TRUE(client->getThreadIOService()); + ASSERT_TRUE(client->getThreadIOService()->stopped()); + ASSERT_TRUE(client->isStopped()); + + // Starting again should succeed. + ASSERT_NO_THROW_LOG(client->start()); + + // Verify we didn't break it. + ASSERT_EQ(client->getThreadCount(), 3); + ASSERT_TRUE(client->getThreadIOService()); + ASSERT_FALSE(client->getThreadIOService()->stopped()); + ASSERT_TRUE(client->isRunning()); + + // Make sure destruction doesn't throw. + ASSERT_NO_THROW_LOG(client.reset()); +} + +// Now we'll run some permutations of the number of client threads, +// requests, and listeners. + +// Single-threaded, three batches, one listener. +TEST_F(MultiThreadingHttpClientTest, zeroByThreeByOne) { + size_t num_threads = 0; // Zero threads = ST mode. + size_t num_batches = 3; + threadRequestAndReceive(num_threads, num_batches); +} + +// Single-threaded, three batches, three listeners. +TEST_F(MultiThreadingHttpClientTest, zeroByThreeByThree) { + size_t num_threads = 0; // Zero threads = ST mode. + size_t num_batches = 3; + size_t num_listeners = 3; + threadRequestAndReceive(num_threads, num_batches, num_listeners); +} + +// Multi-threaded with one thread, three batches, one listener +TEST_F(MultiThreadingHttpClientTest, oneByThreeByOne) { + size_t num_threads = 1; + size_t num_batches = 3; + threadRequestAndReceive(num_threads, num_batches); +} + +// Multi-threaded with three threads, three batches, one listener +TEST_F(MultiThreadingHttpClientTest, threeByThreeByOne) { + size_t num_threads = 3; + size_t num_batches = 3; + threadRequestAndReceive(num_threads, num_batches); +} + +// Multi-threaded with three threads, nine batches, one listener +TEST_F(MultiThreadingHttpClientTest, threeByNineByOne) { + size_t num_threads = 3; + size_t num_batches = 9; + threadRequestAndReceive(num_threads, num_batches); +} + +// Multi-threaded with two threads, four batches, two listeners +TEST_F(MultiThreadingHttpClientTest, twoByFourByTwo) { + size_t num_threads = 2; + size_t num_batches = 4; + size_t num_listeners = 2; + threadRequestAndReceive(num_threads, num_batches, num_listeners); +} + +// Multi-threaded with four threads, four batches, two listeners +TEST_F(MultiThreadingHttpClientTest, fourByFourByTwo) { + size_t num_threads = 4; + size_t num_batches = 4; + size_t num_listeners = 2; + threadRequestAndReceive(num_threads, num_batches, num_listeners); +} + +// Verifies that we can cleanly pause, resume, and shutdown while doing +// multi-threaded work. +TEST_F(MultiThreadingHttpClientTest, workPauseResumeShutdown) { + size_t num_threads = 4; + size_t num_batches = 4; + size_t num_listeners = 4; + size_t num_pauses = 3; + workPauseResumeShutdown(num_threads, num_batches, num_listeners, num_pauses); +} + +} // end of anonymous namespace |