/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #define BOOST_TEST_MODULE TServerIntegrationTest #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "gen-cpp/ParentService.h" #include #include using apache::thrift::concurrency::Guard; using apache::thrift::concurrency::Monitor; using apache::thrift::concurrency::Mutex; using apache::thrift::concurrency::Synchronized; using apache::thrift::protocol::TBinaryProtocol; using apache::thrift::protocol::TBinaryProtocolFactory; using apache::thrift::protocol::TProtocol; using apache::thrift::protocol::TProtocolFactory; using apache::thrift::transport::TServerSocket; using apache::thrift::transport::TServerTransport; using apache::thrift::transport::TSocket; using apache::thrift::transport::TTransport; using apache::thrift::transport::TTransportException; using apache::thrift::transport::TTransportFactory; using apache::thrift::server::TServer; using apache::thrift::server::TServerEventHandler; using apache::thrift::server::TSimpleServer; using apache::thrift::server::TThreadPoolServer; using apache::thrift::server::TThreadedServer; using std::dynamic_pointer_cast; using std::make_shared; using std::shared_ptr; using apache::thrift::test::ParentServiceClient; using apache::thrift::test::ParentServiceIf; using apache::thrift::test::ParentServiceIfFactory; using apache::thrift::test::ParentServiceIfSingletonFactory; using apache::thrift::test::ParentServiceProcessor; using apache::thrift::test::ParentServiceProcessorFactory; using apache::thrift::TProcessor; using apache::thrift::TProcessorFactory; using boost::posix_time::milliseconds; /** * preServe runs after listen() is successful, when we can connect */ class TServerReadyEventHandler : public TServerEventHandler, public Monitor { public: TServerReadyEventHandler() : isListening_(false), accepted_(0) {} ~TServerReadyEventHandler() override = default; void preServe() override { Synchronized sync(*this); isListening_ = true; notify(); } void* createContext(shared_ptr input, shared_ptr output) override { Synchronized sync(*this); ++accepted_; notify(); (void)input; (void)output; return nullptr; } bool isListening() const { return isListening_; } uint64_t acceptedCount() const { return accepted_; } private: bool isListening_; uint64_t accepted_; }; /** * Reusing another generated test, just something to serve up */ class ParentHandler : public ParentServiceIf { public: ParentHandler() : generation_(0) {} int32_t incrementGeneration() override { Guard g(mutex_); return ++generation_; } int32_t getGeneration() override { Guard g(mutex_); return generation_; } void addString(const std::string& s) override { Guard g(mutex_); strings_.push_back(s); } void getStrings(std::vector& _return) override { Guard g(mutex_); _return = strings_; } void getDataWait(std::string& _return, const int32_t length) override { THRIFT_UNUSED_VARIABLE(_return); THRIFT_UNUSED_VARIABLE(length); } void onewayWait() override {} void exceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); } void unexpectedExceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); } protected: Mutex mutex_; int32_t generation_; std::vector strings_; }; void autoSocketCloser(TSocket* pSock) { pSock->close(); delete pSock; } template class TServerIntegrationTestFixture { public: TServerIntegrationTestFixture(const shared_ptr& _processorFactory) : pServer(new TServerType(_processorFactory, shared_ptr( new TServerSocket("localhost", 0)), shared_ptr(new TTransportFactory), shared_ptr(new TBinaryProtocolFactory))), pEventHandler(shared_ptr(new TServerReadyEventHandler)), bStressDone(false), bStressConnectionCount(0), bStressRequestCount(0) { pServer->setServerEventHandler(pEventHandler); } TServerIntegrationTestFixture(const shared_ptr& _processor) : pServer( new TServerType(_processor, shared_ptr(new TServerSocket("localhost", 0)), shared_ptr(new TTransportFactory), shared_ptr(new TBinaryProtocolFactory))), pEventHandler(shared_ptr(new TServerReadyEventHandler)), bStressDone(false), bStressConnectionCount(0), bStressRequestCount(0) { pServer->setServerEventHandler(pEventHandler); } void startServer() { pServerThread.reset(new boost::thread(std::bind(&TServerType::serve, pServer.get()))); // block until listen() completes so clients will be able to connect Synchronized sync(*(pEventHandler.get())); while (!pEventHandler->isListening()) { pEventHandler->wait(); } BOOST_TEST_MESSAGE(" server is listening"); } void blockUntilAccepted(uint64_t numAccepted) { Synchronized sync(*(pEventHandler.get())); while (pEventHandler->acceptedCount() < numAccepted) { pEventHandler->wait(); } BOOST_TEST_MESSAGE(boost::format(" server has accepted %1%") % numAccepted); } void stopServer() { if (pServerThread) { pServer->stop(); BOOST_TEST_MESSAGE(" server stop completed"); pServerThread->join(); BOOST_TEST_MESSAGE(" server thread joined"); pServerThread.reset(); } } ~TServerIntegrationTestFixture() { stopServer(); } /** * Performs a baseline test where some clients are opened and issue a single operation * and then disconnect at different intervals. * \param[in] numToMake the number of concurrent clients * \param[in] expectedHWM the high water mark we expect of concurrency * \param[in] purpose a description of the test for logging purposes */ void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) { BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM") % typeid(TServerType).name() % purpose % numToMake % expectedHWM); startServer(); std::vector > holdSockets; std::vector > holdThreads; for (int64_t i = 0; i < numToMake; ++i) { shared_ptr pClientSock(new TSocket("localhost", getServerPort()), autoSocketCloser); holdSockets.push_back(pClientSock); shared_ptr pClientProtocol(new TBinaryProtocol(pClientSock)); ParentServiceClient client(pClientProtocol); pClientSock->open(); client.incrementGeneration(); holdThreads.push_back(shared_ptr( new boost::thread(std::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock, milliseconds(10 * numToMake))))); } BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM()); BOOST_FOREACH (shared_ptr pThread, holdThreads) { pThread->join(); } holdThreads.clear(); holdSockets.clear(); stopServer(); } /** * Helper method used to close a connection after a delay. * \param[in] toClose the connection to close * \param[in] after the delay to impose */ void delayClose(shared_ptr toClose, boost::posix_time::time_duration after) { boost::this_thread::sleep(after); toClose->close(); } /** * \returns the server port number */ int getServerPort() { auto* pSock = dynamic_cast(pServer->getServerTransport().get()); if (!pSock) { throw std::logic_error("how come?"); } return pSock->getPort(); } /** * Performs a stress test by spawning threads that connect, do a number of operations * and disconnect, then a random delay, then do it over again. This is done for a fixed * period of time to test for concurrency correctness. * \param[in] numToMake the number of concurrent clients */ void stress(int64_t numToMake, const boost::posix_time::time_duration& duration) { BOOST_TEST_MESSAGE(boost::format("Stress testing %1% with %2% clients for %3% seconds") % typeid(TServerType).name() % numToMake % duration.total_seconds()); startServer(); std::vector > holdThreads; for (int64_t i = 0; i < numToMake; ++i) { holdThreads.push_back(shared_ptr( new boost::thread(std::bind(&TServerIntegrationTestFixture::stressor, this)))); } boost::this_thread::sleep(duration); bStressDone = true; BOOST_TEST_MESSAGE(boost::format(" serviced %1% connections (HWM %2%) totaling %3% requests") % bStressConnectionCount % pServer->getConcurrentClientCountHWM() % bStressRequestCount); BOOST_FOREACH (shared_ptr pThread, holdThreads) { pThread->join(); } holdThreads.clear(); BOOST_CHECK(bStressRequestCount > 0); stopServer(); } /** * Helper method to stress the system */ void stressor() { while (!bStressDone) { shared_ptr pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser); shared_ptr pProtocol(new TBinaryProtocol(pSocket)); ParentServiceClient client(pProtocol); pSocket->open(); bStressConnectionCount.fetch_add(1, std::memory_order_relaxed); for (int i = 0; i < rand() % 1000; ++i) { client.incrementGeneration(); bStressRequestCount.fetch_add(1, std::memory_order_relaxed); } } } shared_ptr pServer; shared_ptr pEventHandler; shared_ptr pServerThread; std::atomic bStressDone; std::atomic bStressConnectionCount; std::atomic bStressRequestCount; }; template class TServerIntegrationProcessorFactoryTestFixture : public TServerIntegrationTestFixture { public: TServerIntegrationProcessorFactoryTestFixture() : TServerIntegrationTestFixture(make_shared( make_shared( make_shared()))) {} }; template class TServerIntegrationProcessorTestFixture : public TServerIntegrationTestFixture { public: TServerIntegrationProcessorTestFixture() : TServerIntegrationTestFixture( make_shared(make_shared())) {} }; BOOST_AUTO_TEST_SUITE(constructors) BOOST_FIXTURE_TEST_CASE(test_simple_factory, TServerIntegrationProcessorFactoryTestFixture) { baseline(3, 1, "factory"); } BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture) { baseline(3, 1, "processor"); } BOOST_FIXTURE_TEST_CASE(test_threaded_factory, TServerIntegrationProcessorFactoryTestFixture) { baseline(10, 10, "factory"); } BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture) { baseline(10, 10, "processor"); } BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationProcessorTestFixture) { pServer->setConcurrentClientLimit(4); baseline(10, 4, "limit by server framework"); } BOOST_FIXTURE_TEST_CASE(test_threaded_stress, TServerIntegrationProcessorFactoryTestFixture) { stress(10, boost::posix_time::seconds(3)); } BOOST_FIXTURE_TEST_CASE(test_threadpool_factory, TServerIntegrationProcessorFactoryTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); // thread factory has 4 threads as a default // thread factory however is a bad way to limit concurrent clients // as accept() will be called to grab a 5th client socket, in this case // and then the thread factory will block adding the thread to manage // that client. baseline(10, 5, "limit by thread manager"); } BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationProcessorTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); // thread factory has 4 threads as a default // thread factory however is a bad way to limit concurrent clients // as accept() will be called to grab a 5th client socket, in this case // and then the thread factory will block adding the thread to manage // that client. baseline(10, 5, "limit by thread manager"); } BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationProcessorTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); pServer->setConcurrentClientLimit(4); baseline(10, 4, "server framework connection limit"); } BOOST_FIXTURE_TEST_CASE(test_threadpool_stress, TServerIntegrationProcessorTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); stress(10, boost::posix_time::seconds(3)); } BOOST_AUTO_TEST_SUITE_END() BOOST_FIXTURE_TEST_SUITE(TServerIntegrationTest, TServerIntegrationProcessorTestFixture) BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) { // This tests THRIFT-2441 new behavior: stopping the server disconnects clients BOOST_TEST_MESSAGE("Testing stop with interruptable clients"); startServer(); shared_ptr pClientSock1(new TSocket("localhost", getServerPort()), autoSocketCloser); pClientSock1->open(); shared_ptr pClientSock2(new TSocket("localhost", getServerPort()), autoSocketCloser); pClientSock2->open(); // Ensure they have been accepted blockUntilAccepted(2); // The test fixture destructor will force the sockets to disconnect // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected stopServer(); // extra proof the server end disconnected the clients uint8_t buf[1]; BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected } BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) { // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients // disconnect. BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients"); dynamic_pointer_cast(pServer->getServerTransport()) ->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior startServer(); shared_ptr pClientSock1(new TSocket("localhost", getServerPort()), autoSocketCloser); pClientSock1->open(); shared_ptr pClientSock2(new TSocket("localhost", getServerPort()), autoSocketCloser); pClientSock2->open(); // Ensure they have been accepted blockUntilAccepted(2); boost::thread t1(std::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250))); boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250))); // Once the clients disconnect the server will stop stopServer(); BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0); t1.join(); t2.join(); } BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) { startServer(); BOOST_TEST_MESSAGE("Testing the concurrent client limit"); BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit()); pServer->setConcurrentClientLimit(2); BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount()); BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit()); shared_ptr pClientSock1(new TSocket("localhost", getServerPort()), autoSocketCloser); pClientSock1->open(); blockUntilAccepted(1); BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount()); shared_ptr pClientSock2(new TSocket("localhost", getServerPort()), autoSocketCloser); pClientSock2->open(); blockUntilAccepted(2); BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount()); // a third client cannot connect until one of the other two closes boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250))); shared_ptr pClientSock3(new TSocket("localhost", getServerPort()), autoSocketCloser); pClientSock2->open(); blockUntilAccepted(2); BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount()); BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM()); stopServer(); BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0); t2.join(); } BOOST_AUTO_TEST_SUITE_END()