diff options
Diffstat (limited to 'src/jaegertracing/thrift/lib/cpp/test/OneWayHTTPTest.cpp')
-rw-r--r-- | src/jaegertracing/thrift/lib/cpp/test/OneWayHTTPTest.cpp | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/cpp/test/OneWayHTTPTest.cpp b/src/jaegertracing/thrift/lib/cpp/test/OneWayHTTPTest.cpp new file mode 100644 index 000000000..55d919bba --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/OneWayHTTPTest.cpp @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <boost/test/auto_unit_test.hpp> +#include <boost/thread.hpp> +#include <iostream> +#include <climits> +#include <vector> +#include <thrift/concurrency/Monitor.h> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/protocol/TJSONProtocol.h> +#include <thrift/server/TThreadedServer.h> +#include <thrift/transport/THttpServer.h> +#include <thrift/transport/THttpClient.h> +#include <thrift/transport/TServerSocket.h> +#include <thrift/transport/TSocket.h> +#include <memory> +#include <thrift/transport/TBufferTransports.h> +#include "gen-cpp/OneWayService.h" + +BOOST_AUTO_TEST_SUITE(OneWayHTTPTest) + +using namespace apache::thrift; +using apache::thrift::protocol::TProtocol; +using apache::thrift::protocol::TBinaryProtocol; +using apache::thrift::protocol::TBinaryProtocolFactory; +using apache::thrift::protocol::TJSONProtocol; +using apache::thrift::protocol::TJSONProtocolFactory; +using apache::thrift::server::TThreadedServer; +using apache::thrift::server::TServerEventHandler; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::THttpServer; +using apache::thrift::transport::THttpServerTransportFactory; +using apache::thrift::transport::THttpClient; +using apache::thrift::transport::TBufferedTransport; +using apache::thrift::transport::TBufferedTransportFactory; +using apache::thrift::transport::TMemoryBuffer; +using apache::thrift::transport::TServerSocket; +using apache::thrift::transport::TSocket; +using apache::thrift::transport::TTransportException; +using std::shared_ptr; +using std::cout; +using std::cerr; +using std::endl; +using std::string; +namespace utf = boost::unit_test; + +// Define this env var to enable some logging (in case you need to debug) +#undef ENABLE_STDERR_LOGGING + +class OneWayServiceHandler : public onewaytest::OneWayServiceIf { +public: + OneWayServiceHandler() = default; + + void roundTripRPC() override { +#ifdef ENABLE_STDERR_LOGGING + cerr << "roundTripRPC()" << endl; +#endif + } + void oneWayRPC() override { +#ifdef ENABLE_STDERR_LOGGING + cerr << "oneWayRPC()" << std::endl ; +#endif + } +}; + +class OneWayServiceCloneFactory : virtual public onewaytest::OneWayServiceIfFactory { + public: + ~OneWayServiceCloneFactory() override = default; + onewaytest::OneWayServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override + { + (void)connInfo ; + return new OneWayServiceHandler; + } + void releaseHandler( onewaytest::OneWayServiceIf* handler) override { + delete handler; + } +}; + +class RPC0ThreadClass { +public: + RPC0ThreadClass(TThreadedServer& server) : server_(server) { } // Constructor +~RPC0ThreadClass() = default; // Destructor + +void Run() { + server_.serve() ; +} + TThreadedServer& server_ ; +} ; + +using apache::thrift::concurrency::Monitor; +using apache::thrift::concurrency::Mutex; +using apache::thrift::concurrency::Synchronized; + +// copied from IntegrationTest +class TServerReadyEventHandler : public TServerEventHandler, public Monitor { +public: + TServerReadyEventHandler() : isListening_(false), accepted_(0) {} + ~TServerReadyEventHandler() override = default; + void preServe() override { + Synchronized sync(*this); + isListening_ = true; + notify(); + } + void* createContext(shared_ptr<TProtocol> input, + shared_ptr<TProtocol> output) override { + Synchronized sync(*this); + ++accepted_; + notify(); + + (void)input; + (void)output; + return nullptr; + } + bool isListening() const { return isListening_; } + uint64_t acceptedCount() const { return accepted_; } + +private: + bool isListening_; + uint64_t accepted_; +}; + +class TBlockableBufferedTransport : public TBufferedTransport { + public: + TBlockableBufferedTransport(std::shared_ptr<TTransport> transport) + : TBufferedTransport(transport, 10240), + blocked_(false) { + } + + uint32_t write_buffer_length() { + auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get()); + return have_bytes ; + } + + void block() { + blocked_ = true ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "block flushing\n" ; +#endif + } + void unblock() { + blocked_ = false ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "unblock flushing, buffer is\n<<" << std::string((char *)wBuf_.get(), write_buffer_length()) << ">>\n" ; +#endif + } + + void flush() override { + if (blocked_) { +#ifdef ENABLE_STDERR_LOGGING + cerr << "flush was blocked\n" ; +#endif + return ; + } + TBufferedTransport::flush() ; + } + + bool blocked_ ; +} ; + +BOOST_AUTO_TEST_CASE( JSON_BufferedHTTP ) +{ + std::shared_ptr<TServerSocket> ss = std::make_shared<TServerSocket>(0) ; + TThreadedServer server( + std::make_shared<onewaytest::OneWayServiceProcessorFactory>(std::make_shared<OneWayServiceCloneFactory>()), + ss, //port + std::make_shared<THttpServerTransportFactory>(), + std::make_shared<TJSONProtocolFactory>()); + + std::shared_ptr<TServerReadyEventHandler> pEventHandler(new TServerReadyEventHandler) ; + server.setServerEventHandler(pEventHandler); + +#ifdef ENABLE_STDERR_LOGGING + cerr << "Starting the server...\n"; +#endif + RPC0ThreadClass t(server) ; + boost::thread thread(&RPC0ThreadClass::Run, &t); + + { + Synchronized sync(*(pEventHandler.get())); + while (!pEventHandler->isListening()) { + pEventHandler->wait(); + } + } + + int port = ss->getPort() ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "port " << port << endl ; +#endif + + { + std::shared_ptr<TSocket> socket(new TSocket("localhost", port)); + socket->setRecvTimeout(10000) ; // 1000msec should be enough + std::shared_ptr<TBlockableBufferedTransport> blockable_transport(new TBlockableBufferedTransport(socket)); + std::shared_ptr<TTransport> transport(new THttpClient(blockable_transport, "localhost", "/service")); + std::shared_ptr<TProtocol> protocol(new TJSONProtocol(transport)); + onewaytest::OneWayServiceClient client(protocol); + + + transport->open(); + client.roundTripRPC(); + blockable_transport->block() ; + uint32_t size0 = blockable_transport->write_buffer_length() ; + client.send_oneWayRPC() ; + uint32_t size1 = blockable_transport->write_buffer_length() ; + client.send_oneWayRPC() ; + uint32_t size2 = blockable_transport->write_buffer_length() ; + BOOST_CHECK((size1 - size0) == (size2 - size1)) ; + blockable_transport->unblock() ; + client.send_roundTripRPC() ; + blockable_transport->flush() ; + try { + client.recv_roundTripRPC() ; + } catch (const TTransportException &e) { + BOOST_ERROR( "we should not get a transport exception -- this means we failed: " + std::string(e.what()) ) ; + } + transport->close(); + } + server.stop(); + thread.join() ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "finished.\n"; +#endif +} + +BOOST_AUTO_TEST_SUITE_END() |