summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/cpp/test/TServerIntegrationTest.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/thrift/lib/cpp/test/TServerIntegrationTest.cpp
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/jaegertracing/thrift/lib/cpp/test/TServerIntegrationTest.cpp')
-rw-r--r--src/jaegertracing/thrift/lib/cpp/test/TServerIntegrationTest.cpp536
1 files changed, 536 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/cpp/test/TServerIntegrationTest.cpp b/src/jaegertracing/thrift/lib/cpp/test/TServerIntegrationTest.cpp
new file mode 100644
index 000000000..b88c35bf4
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/cpp/test/TServerIntegrationTest.cpp
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define BOOST_TEST_MODULE TServerIntegrationTest
+#include <atomic>
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
+#include <boost/foreach.hpp>
+#include <boost/format.hpp>
+#include <boost/thread.hpp>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
+#include <thrift/server/TThreadedServer.h>
+#include <memory>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/transport/TServerSocket.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TTransport.h>
+#include "gen-cpp/ParentService.h"
+#include <string>
+#include <vector>
+
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Synchronized;
+using apache::thrift::protocol::TBinaryProtocol;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServer;
+using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TSimpleServer;
+using apache::thrift::server::TThreadPoolServer;
+using apache::thrift::server::TThreadedServer;
+using std::dynamic_pointer_cast;
+using std::make_shared;
+using std::shared_ptr;
+using apache::thrift::test::ParentServiceClient;
+using apache::thrift::test::ParentServiceIf;
+using apache::thrift::test::ParentServiceIfFactory;
+using apache::thrift::test::ParentServiceIfSingletonFactory;
+using apache::thrift::test::ParentServiceProcessor;
+using apache::thrift::test::ParentServiceProcessorFactory;
+using apache::thrift::TProcessor;
+using apache::thrift::TProcessorFactory;
+using boost::posix_time::milliseconds;
+
+/**
+ * preServe runs after listen() is successful, when we can connect
+ */
+class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
+public:
+ TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
+ ~TServerReadyEventHandler() override = default;
+ void preServe() override {
+ Synchronized sync(*this);
+ isListening_ = true;
+ notify();
+ }
+ void* createContext(shared_ptr<TProtocol> input,
+ shared_ptr<TProtocol> output) override {
+ Synchronized sync(*this);
+ ++accepted_;
+ notify();
+
+ (void)input;
+ (void)output;
+ return nullptr;
+ }
+ bool isListening() const { return isListening_; }
+ uint64_t acceptedCount() const { return accepted_; }
+
+private:
+ bool isListening_;
+ uint64_t accepted_;
+};
+
+/**
+ * Reusing another generated test, just something to serve up
+ */
+class ParentHandler : public ParentServiceIf {
+public:
+ ParentHandler() : generation_(0) {}
+
+ int32_t incrementGeneration() override {
+ Guard g(mutex_);
+ return ++generation_;
+ }
+
+ int32_t getGeneration() override {
+ Guard g(mutex_);
+ return generation_;
+ }
+
+ void addString(const std::string& s) override {
+ Guard g(mutex_);
+ strings_.push_back(s);
+ }
+
+ void getStrings(std::vector<std::string>& _return) override {
+ Guard g(mutex_);
+ _return = strings_;
+ }
+
+ void getDataWait(std::string& _return, const int32_t length) override {
+ THRIFT_UNUSED_VARIABLE(_return);
+ THRIFT_UNUSED_VARIABLE(length);
+ }
+
+ void onewayWait() override {}
+
+ void exceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); }
+
+ void unexpectedExceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); }
+
+protected:
+ Mutex mutex_;
+ int32_t generation_;
+ std::vector<std::string> strings_;
+};
+
+void autoSocketCloser(TSocket* pSock) {
+ pSock->close();
+ delete pSock;
+}
+
+template <class TServerType>
+class TServerIntegrationTestFixture {
+public:
+ TServerIntegrationTestFixture(const shared_ptr<TProcessorFactory>& _processorFactory)
+ : pServer(new TServerType(_processorFactory,
+ shared_ptr<TServerTransport>(
+ new TServerSocket("localhost", 0)),
+ shared_ptr<TTransportFactory>(new TTransportFactory),
+ shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
+ pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
+ bStressDone(false),
+ bStressConnectionCount(0),
+ bStressRequestCount(0) {
+ pServer->setServerEventHandler(pEventHandler);
+ }
+
+ TServerIntegrationTestFixture(const shared_ptr<TProcessor>& _processor)
+ : pServer(
+ new TServerType(_processor,
+ shared_ptr<TServerTransport>(new TServerSocket("localhost", 0)),
+ shared_ptr<TTransportFactory>(new TTransportFactory),
+ shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
+ pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
+ bStressDone(false),
+ bStressConnectionCount(0),
+ bStressRequestCount(0) {
+ pServer->setServerEventHandler(pEventHandler);
+ }
+
+ void startServer() {
+ pServerThread.reset(new boost::thread(std::bind(&TServerType::serve, pServer.get())));
+
+ // block until listen() completes so clients will be able to connect
+ Synchronized sync(*(pEventHandler.get()));
+ while (!pEventHandler->isListening()) {
+ pEventHandler->wait();
+ }
+
+ BOOST_TEST_MESSAGE(" server is listening");
+ }
+
+ void blockUntilAccepted(uint64_t numAccepted) {
+ Synchronized sync(*(pEventHandler.get()));
+ while (pEventHandler->acceptedCount() < numAccepted) {
+ pEventHandler->wait();
+ }
+
+ BOOST_TEST_MESSAGE(boost::format(" server has accepted %1%") % numAccepted);
+ }
+
+ void stopServer() {
+ if (pServerThread) {
+ pServer->stop();
+ BOOST_TEST_MESSAGE(" server stop completed");
+
+ pServerThread->join();
+ BOOST_TEST_MESSAGE(" server thread joined");
+ pServerThread.reset();
+ }
+ }
+
+ ~TServerIntegrationTestFixture() { stopServer(); }
+
+ /**
+ * Performs a baseline test where some clients are opened and issue a single operation
+ * and then disconnect at different intervals.
+ * \param[in] numToMake the number of concurrent clients
+ * \param[in] expectedHWM the high water mark we expect of concurrency
+ * \param[in] purpose a description of the test for logging purposes
+ */
+ void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) {
+ BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
+ % typeid(TServerType).name() % purpose % numToMake % expectedHWM);
+
+ startServer();
+
+ std::vector<shared_ptr<TSocket> > holdSockets;
+ std::vector<shared_ptr<boost::thread> > holdThreads;
+
+ for (int64_t i = 0; i < numToMake; ++i) {
+ shared_ptr<TSocket> pClientSock(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ holdSockets.push_back(pClientSock);
+ shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
+ ParentServiceClient client(pClientProtocol);
+ pClientSock->open();
+ client.incrementGeneration();
+ holdThreads.push_back(shared_ptr<boost::thread>(
+ new boost::thread(std::bind(&TServerIntegrationTestFixture::delayClose,
+ this,
+ pClientSock,
+ milliseconds(10 * numToMake)))));
+ }
+
+ BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
+
+ BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
+ holdThreads.clear();
+ holdSockets.clear();
+
+ stopServer();
+ }
+
+ /**
+ * Helper method used to close a connection after a delay.
+ * \param[in] toClose the connection to close
+ * \param[in] after the delay to impose
+ */
+ void delayClose(shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+ boost::this_thread::sleep(after);
+ toClose->close();
+ }
+
+ /**
+ * \returns the server port number
+ */
+ int getServerPort() {
+ auto* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get());
+ if (!pSock) { throw std::logic_error("how come?"); }
+ return pSock->getPort();
+ }
+
+ /**
+ * Performs a stress test by spawning threads that connect, do a number of operations
+ * and disconnect, then a random delay, then do it over again. This is done for a fixed
+ * period of time to test for concurrency correctness.
+ * \param[in] numToMake the number of concurrent clients
+ */
+ void stress(int64_t numToMake, const boost::posix_time::time_duration& duration) {
+ BOOST_TEST_MESSAGE(boost::format("Stress testing %1% with %2% clients for %3% seconds")
+ % typeid(TServerType).name() % numToMake % duration.total_seconds());
+
+ startServer();
+
+ std::vector<shared_ptr<boost::thread> > holdThreads;
+ for (int64_t i = 0; i < numToMake; ++i) {
+ holdThreads.push_back(shared_ptr<boost::thread>(
+ new boost::thread(std::bind(&TServerIntegrationTestFixture::stressor, this))));
+ }
+
+ boost::this_thread::sleep(duration);
+ bStressDone = true;
+
+ BOOST_TEST_MESSAGE(boost::format(" serviced %1% connections (HWM %2%) totaling %3% requests")
+ % bStressConnectionCount % pServer->getConcurrentClientCountHWM() % bStressRequestCount);
+
+ BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
+ holdThreads.clear();
+
+ BOOST_CHECK(bStressRequestCount > 0);
+
+ stopServer();
+ }
+
+ /**
+ * Helper method to stress the system
+ */
+ void stressor() {
+ while (!bStressDone) {
+ shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser);
+ shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
+ ParentServiceClient client(pProtocol);
+ pSocket->open();
+ bStressConnectionCount.fetch_add(1, std::memory_order_relaxed);
+ for (int i = 0; i < rand() % 1000; ++i) {
+ client.incrementGeneration();
+ bStressRequestCount.fetch_add(1, std::memory_order_relaxed);
+ }
+ }
+ }
+
+ shared_ptr<TServerType> pServer;
+ shared_ptr<TServerReadyEventHandler> pEventHandler;
+ shared_ptr<boost::thread> pServerThread;
+ std::atomic<bool> bStressDone;
+ std::atomic<int64_t> bStressConnectionCount;
+ std::atomic<int64_t> bStressRequestCount;
+};
+
+template <class TServerType>
+class TServerIntegrationProcessorFactoryTestFixture
+ : public TServerIntegrationTestFixture<TServerType> {
+public:
+ TServerIntegrationProcessorFactoryTestFixture()
+ : TServerIntegrationTestFixture<TServerType>(make_shared<ParentServiceProcessorFactory>(
+ make_shared<ParentServiceIfSingletonFactory>(
+ make_shared<ParentHandler>()))) {}
+};
+
+template <class TServerType>
+class TServerIntegrationProcessorTestFixture : public TServerIntegrationTestFixture<TServerType> {
+public:
+ TServerIntegrationProcessorTestFixture()
+ : TServerIntegrationTestFixture<TServerType>(
+ make_shared<ParentServiceProcessor>(make_shared<ParentHandler>())) {}
+};
+
+BOOST_AUTO_TEST_SUITE(constructors)
+
+BOOST_FIXTURE_TEST_CASE(test_simple_factory,
+ TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>) {
+ baseline(3, 1, "factory");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture<TSimpleServer>) {
+ baseline(3, 1, "processor");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_factory,
+ TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
+ baseline(10, 10, "factory");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture<TThreadedServer>) {
+ baseline(10, 10, "processor");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_bound,
+ TServerIntegrationProcessorTestFixture<TThreadedServer>) {
+ pServer->setConcurrentClientLimit(4);
+ baseline(10, 4, "limit by server framework");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_stress,
+ TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
+ stress(10, boost::posix_time::seconds(3));
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_factory,
+ TServerIntegrationProcessorFactoryTestFixture<TThreadPoolServer>) {
+ pServer->getThreadManager()->threadFactory(
+ shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::ThreadFactory));
+ pServer->getThreadManager()->start();
+
+ // thread factory has 4 threads as a default
+ // thread factory however is a bad way to limit concurrent clients
+ // as accept() will be called to grab a 5th client socket, in this case
+ // and then the thread factory will block adding the thread to manage
+ // that client.
+ baseline(10, 5, "limit by thread manager");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool,
+ TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
+ pServer->getThreadManager()->threadFactory(
+ shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::ThreadFactory));
+ pServer->getThreadManager()->start();
+
+ // thread factory has 4 threads as a default
+ // thread factory however is a bad way to limit concurrent clients
+ // as accept() will be called to grab a 5th client socket, in this case
+ // and then the thread factory will block adding the thread to manage
+ // that client.
+ baseline(10, 5, "limit by thread manager");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_bound,
+ TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
+ pServer->getThreadManager()->threadFactory(
+ shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::ThreadFactory));
+ pServer->getThreadManager()->start();
+ pServer->setConcurrentClientLimit(4);
+
+ baseline(10, 4, "server framework connection limit");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_stress,
+ TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
+ pServer->getThreadManager()->threadFactory(
+ shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::ThreadFactory));
+ pServer->getThreadManager()->start();
+
+ stress(10, boost::posix_time::seconds(3));
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+BOOST_FIXTURE_TEST_SUITE(TServerIntegrationTest,
+ TServerIntegrationProcessorTestFixture<TThreadedServer>)
+
+BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) {
+ // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
+ BOOST_TEST_MESSAGE("Testing stop with interruptable clients");
+
+ startServer();
+
+ shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ pClientSock1->open();
+
+ shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ pClientSock2->open();
+
+ // Ensure they have been accepted
+ blockUntilAccepted(2);
+
+ // The test fixture destructor will force the sockets to disconnect
+ // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
+ stopServer();
+
+ // extra proof the server end disconnected the clients
+ uint8_t buf[1];
+ BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
+ BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
+}
+
+BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) {
+ // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
+ // disconnect.
+ BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
+
+ dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())
+ ->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+
+ startServer();
+
+ shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ pClientSock1->open();
+
+ shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ pClientSock2->open();
+
+ // Ensure they have been accepted
+ blockUntilAccepted(2);
+
+ boost::thread t1(std::bind(&TServerIntegrationTestFixture::delayClose,
+ this,
+ pClientSock1,
+ milliseconds(250)));
+ boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose,
+ this,
+ pClientSock2,
+ milliseconds(250)));
+
+ // Once the clients disconnect the server will stop
+ stopServer();
+ BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
+ t1.join();
+ t2.join();
+}
+
+BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) {
+ startServer();
+ BOOST_TEST_MESSAGE("Testing the concurrent client limit");
+
+ BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
+ pServer->setConcurrentClientLimit(2);
+ BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
+
+ shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ pClientSock1->open();
+ blockUntilAccepted(1);
+ BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
+
+ shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ pClientSock2->open();
+ blockUntilAccepted(2);
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+
+ // a third client cannot connect until one of the other two closes
+ boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose,
+ this,
+ pClientSock2,
+ milliseconds(250)));
+ shared_ptr<TSocket> pClientSock3(new TSocket("localhost", getServerPort()),
+ autoSocketCloser);
+ pClientSock2->open();
+ blockUntilAccepted(2);
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
+
+ stopServer();
+ BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
+ t2.join();
+}
+
+BOOST_AUTO_TEST_SUITE_END()