diff options
Diffstat (limited to 'src/jaegertracing/thrift/test/cpp')
-rwxr-xr-x | src/jaegertracing/thrift/test/cpp/CMakeLists.txt | 97 | ||||
-rwxr-xr-x | src/jaegertracing/thrift/test/cpp/Makefile.am | 125 | ||||
-rw-r--r-- | src/jaegertracing/thrift/test/cpp/src/StressTest.cpp | 605 | ||||
-rw-r--r-- | src/jaegertracing/thrift/test/cpp/src/StressTestNonBlocking.cpp | 542 | ||||
-rw-r--r-- | src/jaegertracing/thrift/test/cpp/src/TestClient.cpp | 1231 | ||||
-rw-r--r-- | src/jaegertracing/thrift/test/cpp/src/TestServer.cpp | 845 | ||||
-rw-r--r-- | src/jaegertracing/thrift/test/cpp/src/ThriftTest_extras.cpp | 33 |
7 files changed, 3478 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/test/cpp/CMakeLists.txt b/src/jaegertracing/thrift/test/cpp/CMakeLists.txt new file mode 100755 index 000000000..90af7826c --- /dev/null +++ b/src/jaegertracing/thrift/test/cpp/CMakeLists.txt @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# The test executables still depend on Boost +include(BoostMacros) +REQUIRE_BOOST_HEADERS() +set(BOOST_COMPONENTS filesystem program_options random) +REQUIRE_BOOST_LIBRARIES(BOOST_COMPONENTS) + +# Contains the thrift specific LINK_AGAINST_THRIFT_LIBRARY +include(ThriftMacros) + +find_package(OpenSSL REQUIRED) +include_directories(SYSTEM "${OPENSSL_INCLUDE_DIR}") + +find_package(Libevent REQUIRED) # Libevent comes with CMake support from upstream +include_directories(SYSTEM ${LIBEVENT_INCLUDE_DIRS}) + +find_package(ZLIB REQUIRED) +include_directories(SYSTEM ${ZLIB_INCLUDE_DIRS}) + +#Make sure gen-cpp files can be included +include_directories("${CMAKE_CURRENT_BINARY_DIR}") +include_directories("${CMAKE_CURRENT_BINARY_DIR}/gen-cpp") +include_directories("${PROJECT_SOURCE_DIR}/lib/cpp/src") + +set(crosstestgencpp_SOURCES + gen-cpp/SecondService.cpp + gen-cpp/ThriftTest.cpp + gen-cpp/ThriftTest_types.cpp + gen-cpp/ThriftTest_constants.cpp + src/ThriftTest_extras.cpp +) +add_library(crosstestgencpp STATIC ${crosstestgencpp_SOURCES}) +LINK_AGAINST_THRIFT_LIBRARY(crosstestgencpp thrift) + +set(crossstressgencpp_SOURCES + gen-cpp/Service.cpp + gen-cpp/StressTest_types.cpp + gen-cpp/StressTest_constants.cpp +) +add_library(crossstressgencpp STATIC ${crossstressgencpp_SOURCES}) +LINK_AGAINST_THRIFT_LIBRARY(crossstressgencpp thrift) + +add_executable(TestServer src/TestServer.cpp) +target_link_libraries(TestServer crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +LINK_AGAINST_THRIFT_LIBRARY(TestServer thrift) +LINK_AGAINST_THRIFT_LIBRARY(TestServer thriftnb) +LINK_AGAINST_THRIFT_LIBRARY(TestServer thriftz) + +add_executable(TestClient src/TestClient.cpp) +target_link_libraries(TestClient crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +LINK_AGAINST_THRIFT_LIBRARY(TestClient thrift) +LINK_AGAINST_THRIFT_LIBRARY(TestClient thriftnb) +LINK_AGAINST_THRIFT_LIBRARY(TestClient thriftz) + +add_executable(StressTest src/StressTest.cpp) +target_link_libraries(StressTest crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +LINK_AGAINST_THRIFT_LIBRARY(StressTest thrift) +LINK_AGAINST_THRIFT_LIBRARY(StressTest thriftnb) +add_test(NAME StressTest COMMAND StressTest) +add_test(NAME StressTestConcurrent COMMAND StressTest --client-type=concurrent) + +add_executable(StressTestNonBlocking src/StressTestNonBlocking.cpp) +target_link_libraries(StressTestNonBlocking crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thrift) +LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thriftnb) +LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thriftz) +add_test(NAME StressTestNonBlocking COMMAND StressTestNonBlocking) + +# +# Common thrift code generation rules +# + +add_custom_command(OUTPUT gen-cpp/SecondService.cpp gen-cpp/SecondService.h gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest.h gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_constants.cpp + COMMAND ${THRIFT_COMPILER} --gen cpp:templates,cob_style -r ${PROJECT_SOURCE_DIR}/test/ThriftTest.thrift +) + +add_custom_command(OUTPUT gen-cpp/StressTest_types.cpp gen-cpp/StressTest_constants.cpp gen-cpp/Service.cpp + COMMAND ${THRIFT_COMPILER} --gen cpp ${PROJECT_SOURCE_DIR}/test/StressTest.thrift +) diff --git a/src/jaegertracing/thrift/test/cpp/Makefile.am b/src/jaegertracing/thrift/test/cpp/Makefile.am new file mode 100755 index 000000000..76ae82bfb --- /dev/null +++ b/src/jaegertracing/thrift/test/cpp/Makefile.am @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +AUTOMAKE_OPTIONS = subdir-objects serial-tests nostdinc + +BUILT_SOURCES = gen-cpp/ThriftTest.cpp \ + gen-cpp/ThriftTest_types.cpp \ + gen-cpp/ThriftTest_constants.cpp \ + gen-cpp/SecondService.cpp \ + gen-cpp/StressTest_types.cpp \ + gen-cpp/StressTest_constants.cpp \ + gen-cpp/Service.cpp + +noinst_LTLIBRARIES = libtestgencpp.la libstresstestgencpp.la +nodist_libtestgencpp_la_SOURCES = \ + gen-cpp/SecondService.cpp \ + gen-cpp/SecondService.h \ + gen-cpp/SecondService.tcc \ + gen-cpp/ThriftTest_constants.cpp \ + gen-cpp/ThriftTest_constants.h \ + gen-cpp/ThriftTest_types.cpp \ + gen-cpp/ThriftTest_types.h \ + gen-cpp/ThriftTest_types.tcc \ + gen-cpp/ThriftTest.cpp \ + gen-cpp/ThriftTest.h \ + gen-cpp/ThriftTest.tcc \ + src/ThriftTest_extras.cpp + +libtestgencpp_la_LIBADD = $(top_builddir)/lib/cpp/libthrift.la + +nodist_libstresstestgencpp_la_SOURCES = \ + gen-cpp/StressTest_constants.cpp \ + gen-cpp/StressTest_types.cpp \ + gen-cpp/StressTest_constants.h \ + gen-cpp/StressTest_types.h \ + gen-cpp/Service.cpp \ + gen-cpp/Service.h + +libstresstestgencpp_la_LIBADD = $(top_builddir)/lib/cpp/libthrift.la + +precross: TestServer TestClient + +check_PROGRAMS = \ + TestServer \ + TestClient \ + StressTest \ + StressTestNonBlocking + +# we currently do not run the testsuite, stop c++ server issue +# TESTS = \ +# $(check_PROGRAMS) + +TestServer_SOURCES = \ + src/TestServer.cpp + +TestServer_LDADD = \ + libtestgencpp.la \ + $(top_builddir)/lib/cpp/libthrift.la \ + $(top_builddir)/lib/cpp/libthriftz.la \ + $(top_builddir)/lib/cpp/libthriftnb.la \ + -levent -lboost_program_options -lboost_system -lboost_filesystem $(ZLIB_LIBS) + +TestClient_SOURCES = \ + src/TestClient.cpp + +TestClient_LDADD = \ + libtestgencpp.la \ + $(top_builddir)/lib/cpp/libthrift.la \ + $(top_builddir)/lib/cpp/libthriftz.la \ + $(top_builddir)/lib/cpp/libthriftnb.la \ + -levent -lboost_program_options -lboost_system -lboost_filesystem $(ZLIB_LIBS) + +StressTest_SOURCES = \ + src/StressTest.cpp + +StressTest_LDADD = \ + libstresstestgencpp.la \ + $(top_builddir)/lib/cpp/libthrift.la + +StressTestNonBlocking_SOURCES = \ + src/StressTestNonBlocking.cpp + +StressTestNonBlocking_LDADD = \ + libstresstestgencpp.la \ + $(top_builddir)/lib/cpp/libthriftnb.la \ + -levent +# +# Common thrift code generation rules +# +gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_constants.cpp gen-cpp/SecondService.cpp gen-cpp/SecondService.h gen-cpp/SecondService.tcc: $(top_srcdir)/test/ThriftTest.thrift $(THRIFT) + $(THRIFT) --gen cpp:templates,cob_style -r $< + +gen-cpp/StressTest_types.cpp gen-cpp/StressTest_constants.cpp gen-cpp/Service.cpp: $(top_srcdir)/test/StressTest.thrift $(THRIFT) + $(THRIFT) --gen cpp $< + +AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(LIBEVENT_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -Igen-cpp -I. +AM_CXXFLAGS = -Wall -Wextra -pedantic -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS +AM_LDFLAGS = $(BOOST_LDFLAGS) $(LIBEVENT_LDFLAGS) $(ZLIB_LIBS) + +clean-local: + $(RM) -r gen-cpp/ + +style-local: + $(CPPSTYLE_CMD) + +EXTRA_DIST = \ + src/TestClient.cpp \ + src/TestServer.cpp \ + src/StressTest.cpp \ + src/StressTestNonBlocking.cpp diff --git a/src/jaegertracing/thrift/test/cpp/src/StressTest.cpp b/src/jaegertracing/thrift/test/cpp/src/StressTest.cpp new file mode 100644 index 000000000..79a708e8f --- /dev/null +++ b/src/jaegertracing/thrift/test/cpp/src/StressTest.cpp @@ -0,0 +1,605 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/concurrency/ThreadManager.h> +#include <thrift/concurrency/ThreadFactory.h> +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Mutex.h> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/server/TSimpleServer.h> +#include <thrift/server/TThreadPoolServer.h> +#include <thrift/server/TThreadedServer.h> +#include <thrift/transport/TServerSocket.h> +#include <thrift/transport/TSocket.h> +#include <thrift/transport/TTransportUtils.h> +#include <thrift/transport/TFileTransport.h> +#include <thrift/TLogging.h> + +#include "Service.h" +#include <iostream> +#include <set> +#include <stdexcept> +#include <sstream> +#include <map> +#if _WIN32 +#include <thrift/windows/TWinsockSingleton.h> +#endif + +using namespace std; + +using namespace apache::thrift; +using namespace apache::thrift::async; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace apache::thrift::server; +using namespace apache::thrift::concurrency; + +using namespace test::stress; + +struct eqstr { + bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) == 0; } +}; + +struct ltstr { + bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) < 0; } +}; + +// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map; +typedef map<const char*, int, ltstr> count_map; + +class Server : public ServiceIf { +public: + Server() = default; + + void count(const char* method) { + Guard m(lock_); + int ct = counts_[method]; + counts_[method] = ++ct; + } + + void echoVoid() override { + count("echoVoid"); + return; + } + + count_map getCount() { + Guard m(lock_); + return counts_; + } + + int8_t echoByte(const int8_t arg) override { return arg; } + int32_t echoI32(const int32_t arg) override { return arg; } + int64_t echoI64(const int64_t arg) override { return arg; } + void echoString(string& out, const string& arg) override { + if (arg != "hello") { + T_ERROR_ABORT("WRONG STRING (%s)!!!!", arg.c_str()); + } + out = arg; + } + void echoList(vector<int8_t>& out, const vector<int8_t>& arg) override { out = arg; } + void echoSet(set<int8_t>& out, const set<int8_t>& arg) override { out = arg; } + void echoMap(map<int8_t, int8_t>& out, const map<int8_t, int8_t>& arg) override { out = arg; } + +private: + count_map counts_; + Mutex lock_; +}; + +enum TransportOpenCloseBehavior { + OpenAndCloseTransportInThread, + DontOpenAndCloseTransportInThread +}; +class ClientThread : public Runnable { +public: + ClientThread(std::shared_ptr<TTransport> transport, + std::shared_ptr<ServiceIf> client, + Monitor& monitor, + size_t& workerCount, + size_t loopCount, + TType loopType, + TransportOpenCloseBehavior behavior) + : _transport(transport), + _client(client), + _monitor(monitor), + _workerCount(workerCount), + _loopCount(loopCount), + _loopType(loopType), + _behavior(behavior) {} + + void run() override { + + // Wait for all worker threads to start + + { + Synchronized s(_monitor); + while (_workerCount == 0) { + _monitor.wait(); + } + } + + _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + if(_behavior == OpenAndCloseTransportInThread) { + _transport->open(); + } + + switch (_loopType) { + case T_VOID: + loopEchoVoid(); + break; + case T_BYTE: + loopEchoByte(); + break; + case T_I32: + loopEchoI32(); + break; + case T_I64: + loopEchoI64(); + break; + case T_STRING: + loopEchoString(); + break; + default: + cerr << "Unexpected loop type" << _loopType << endl; + break; + } + + _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + if(_behavior == OpenAndCloseTransportInThread) { + _transport->close(); + } + + _done = true; + + { + Synchronized s(_monitor); + + _workerCount--; + + if (_workerCount == 0) { + + _monitor.notify(); + } + } + } + + void loopEchoVoid() { + for (size_t ix = 0; ix < _loopCount; ix++) { + _client->echoVoid(); + } + } + + void loopEchoByte() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int8_t arg = 1; + int8_t result; + result = _client->echoByte(arg); + (void)result; + assert(result == arg); + } + } + + void loopEchoI32() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int32_t arg = 1; + int32_t result; + result = _client->echoI32(arg); + (void)result; + assert(result == arg); + } + } + + void loopEchoI64() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int64_t arg = 1; + int64_t result; + result = _client->echoI64(arg); + (void)result; + assert(result == arg); + } + } + + void loopEchoString() { + for (size_t ix = 0; ix < _loopCount; ix++) { + string arg = "hello"; + string result; + _client->echoString(result, arg); + assert(result == arg); + } + } + + std::shared_ptr<TTransport> _transport; + std::shared_ptr<ServiceIf> _client; + Monitor& _monitor; + size_t& _workerCount; + size_t _loopCount; + TType _loopType; + int64_t _startTime; + int64_t _endTime; + bool _done; + Monitor _sleep; + TransportOpenCloseBehavior _behavior; +}; + +class TStartObserver : public apache::thrift::server::TServerEventHandler { +public: + TStartObserver() : awake_(false) {} + void preServe() override { + apache::thrift::concurrency::Synchronized s(m_); + awake_ = true; + m_.notifyAll(); + } + void waitForService() { + apache::thrift::concurrency::Synchronized s(m_); + while (!awake_) + m_.waitForever(); + } + +private: + apache::thrift::concurrency::Monitor m_; + bool awake_; +}; + +int main(int argc, char** argv) { +#if _WIN32 + transport::TWinsockSingleton::create(); +#endif + + int port = 9091; + string clientType = "regular"; + string serverType = "thread-pool"; + string protocolType = "binary"; + size_t workerCount = 8; + size_t clientCount = 4; + size_t loopCount = 50000; + TType loopType = T_VOID; + string callName = "echoVoid"; + bool runServer = true; + bool logRequests = false; + string requestLogPath = "./requestlog.tlog"; + bool replayRequests = false; + + ostringstream usage; + + usage << argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] " + "[--protocol-type=<protocol-type>] [--workers=<worker-count>] " + "[--clients=<client-count>] [--loop=<loop-count>] " + "[--client-type=<client-type>]" << endl + << "\tclients Number of client threads to create - 0 implies no clients, i.e. " + "server only. Default is " << clientCount << endl + << "\thelp Prints this help text." << endl + << "\tcall Service method to call. Default is " << callName << endl + << "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl + << "\tport The port the server and clients should bind to " + "for thrift network connections. Default is " << port << endl + << "\tserver Run the Thrift server in this process. Default is " << runServer << endl + << "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl + << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl + << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl + << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl + << "\tworkers Number of thread pools workers. Only valid " + "for thread-pool server type. Default is " << workerCount << endl + << "\tclient-type Type of client, \"regular\" or \"concurrent\". Default is " << clientType << endl + << endl; + + map<string, string> args; + + for (int ix = 1; ix < argc; ix++) { + + string arg(argv[ix]); + + if (arg.compare(0, 2, "--") == 0) { + + size_t end = arg.find_first_of("=", 2); + + string key = string(arg, 2, end - 2); + + if (end != string::npos) { + args[key] = string(arg, end + 1); + } else { + args[key] = "true"; + } + } else { + throw invalid_argument("Unexcepted command line token: " + arg); + } + } + + try { + + if (!args["clients"].empty()) { + clientCount = atoi(args["clients"].c_str()); + } + + if (!args["help"].empty()) { + cerr << usage.str(); + return 0; + } + + if (!args["loop"].empty()) { + loopCount = atoi(args["loop"].c_str()); + } + + if (!args["call"].empty()) { + callName = args["call"]; + } + + if (!args["port"].empty()) { + port = atoi(args["port"].c_str()); + } + + if (!args["server"].empty()) { + runServer = args["server"] == "true"; + } + + if (!args["log-request"].empty()) { + logRequests = args["log-request"] == "true"; + } + + if (!args["replay-request"].empty()) { + replayRequests = args["replay-request"] == "true"; + } + + if (!args["server-type"].empty()) { + serverType = args["server-type"]; + + if (serverType == "simple") { + + } else if (serverType == "thread-pool") { + + } else if (serverType == "threaded") { + + } else { + + throw invalid_argument("Unknown server type " + serverType); + } + } + if (!args["client-type"].empty()) { + clientType = args["client-type"]; + + if (clientType == "regular") { + + } else if (clientType == "concurrent") { + + } else { + + throw invalid_argument("Unknown client type " + clientType); + } + } + if (!args["workers"].empty()) { + workerCount = atoi(args["workers"].c_str()); + } + + } catch (std::exception& e) { + cerr << e.what() << endl; + cerr << usage.str(); + } + + std::shared_ptr<ThreadFactory> threadFactory + = std::shared_ptr<ThreadFactory>(new ThreadFactory()); + + // Dispatcher + std::shared_ptr<Server> serviceHandler(new Server()); + + if (replayRequests) { + std::shared_ptr<Server> serviceHandler(new Server()); + std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transports + std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + fileTransport->seekToEnd(); + + // Protocol Factory + std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + TFileProcessor fileProcessor(serviceProcessor, protocolFactory, fileTransport); + + fileProcessor.process(0, true); + exit(0); + } + + if (runServer) { + + std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transport + std::shared_ptr<TServerSocket> serverSocket(new TServerSocket(port)); + + // Transport Factory + std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); + + // Protocol Factory + std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + if (logRequests) { + // initialize the log file + std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + + transportFactory + = std::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport)); + } + + std::shared_ptr<TServer> server; + + if (serverType == "simple") { + + server.reset( + new TSimpleServer(serviceProcessor, serverSocket, transportFactory, protocolFactory)); + + } else if (serverType == "threaded") { + + server.reset( + new TThreadedServer(serviceProcessor, serverSocket, transportFactory, protocolFactory)); + + } else if (serverType == "thread-pool") { + + std::shared_ptr<ThreadManager> threadManager + = ThreadManager::newSimpleThreadManager(workerCount); + + threadManager->threadFactory(threadFactory); + threadManager->start(); + server.reset(new TThreadPoolServer(serviceProcessor, + serverSocket, + transportFactory, + protocolFactory, + threadManager)); + } + + std::shared_ptr<TStartObserver> observer(new TStartObserver); + server->setServerEventHandler(observer); + std::shared_ptr<Thread> serverThread = threadFactory->newThread(server); + + cerr << "Starting the server on port " << port << endl; + + serverThread->start(); + observer->waitForService(); + + // If we aren't running clients, just wait forever for external clients + if (clientCount == 0) { + serverThread->join(); + } + } + + if (clientCount > 0) { //FIXME: start here for client type? + + Monitor monitor; + + size_t threadCount = 0; + + set<std::shared_ptr<Thread> > clientThreads; + + if (callName == "echoVoid") { + loopType = T_VOID; + } else if (callName == "echoByte") { + loopType = T_BYTE; + } else if (callName == "echoI32") { + loopType = T_I32; + } else if (callName == "echoI64") { + loopType = T_I64; + } else if (callName == "echoString") { + loopType = T_STRING; + } else { + throw invalid_argument("Unknown service call " + callName); + } + + if(clientType == "regular") { + for (size_t ix = 0; ix < clientCount; ix++) { + + std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port)); + std::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048)); + std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket)); + std::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol)); + + clientThreads.insert(threadFactory->newThread(std::shared_ptr<ClientThread>( + new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, OpenAndCloseTransportInThread)))); + } + } else if(clientType == "concurrent") { + std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port)); + std::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048)); + std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket)); + auto sync = std::make_shared<TConcurrentClientSyncInfo>(); + std::shared_ptr<ServiceConcurrentClient> serviceClient(new ServiceConcurrentClient(protocol, sync)); + socket->open(); + for (size_t ix = 0; ix < clientCount; ix++) { + clientThreads.insert(threadFactory->newThread(std::shared_ptr<ClientThread>( + new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, DontOpenAndCloseTransportInThread)))); + } + } + + for (auto thread = clientThreads.begin(); + thread != clientThreads.end(); + thread++) { + (*thread)->start(); + } + + int64_t time00; + int64_t time01; + + { + Synchronized s(monitor); + threadCount = clientCount; + + cerr << "Launch " << clientCount << " " << clientType << " client threads" << endl; + + time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + monitor.notifyAll(); + + while (threadCount > 0) { + monitor.wait(); + } + + time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + } + + int64_t firstTime = 9223372036854775807LL; + int64_t lastTime = 0; + + double averageTime = 0; + int64_t minTime = 9223372036854775807LL; + int64_t maxTime = 0; + + for (auto ix = clientThreads.begin(); + ix != clientThreads.end(); + ix++) { + + std::shared_ptr<ClientThread> client + = std::dynamic_pointer_cast<ClientThread>((*ix)->runnable()); + + int64_t delta = client->_endTime - client->_startTime; + + assert(delta > 0); + + if (client->_startTime < firstTime) { + firstTime = client->_startTime; + } + + if (client->_endTime > lastTime) { + lastTime = client->_endTime; + } + + if (delta < minTime) { + minTime = delta; + } + + if (delta > maxTime) { + maxTime = delta; + } + + averageTime += delta; + } + + averageTime /= clientCount; + + cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount + << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl; + + count_map count = serviceHandler->getCount(); + count_map::iterator iter; + for (iter = count.begin(); iter != count.end(); ++iter) { + printf("%s => %d\n", iter->first, iter->second); + } + cerr << "done." << endl; + } + + return 0; +} diff --git a/src/jaegertracing/thrift/test/cpp/src/StressTestNonBlocking.cpp b/src/jaegertracing/thrift/test/cpp/src/StressTestNonBlocking.cpp new file mode 100644 index 000000000..e94ecb2db --- /dev/null +++ b/src/jaegertracing/thrift/test/cpp/src/StressTestNonBlocking.cpp @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/concurrency/ThreadManager.h> +#include <thrift/concurrency/ThreadFactory.h> +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Mutex.h> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/server/TSimpleServer.h> +#include <thrift/server/TThreadPoolServer.h> +#include <thrift/server/TThreadedServer.h> +#include <thrift/server/TNonblockingServer.h> +#include <thrift/transport/TServerSocket.h> +#include <thrift/transport/TSocket.h> +#include <thrift/transport/TNonblockingServerSocket.h> +#include <thrift/transport/TTransportUtils.h> +#include <thrift/transport/TFileTransport.h> +#include <thrift/TLogging.h> + +#include "Service.h" + +#include <iostream> +#include <set> +#include <stdexcept> +#include <sstream> +#include <map> +#if _WIN32 +#include <thrift/windows/TWinsockSingleton.h> +#endif + +using namespace std; + +using namespace apache::thrift; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace apache::thrift::server; +using namespace apache::thrift::concurrency; + +using namespace test::stress; + +struct eqstr { + bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) == 0; } +}; + +struct ltstr { + bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) < 0; } +}; + +// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map; +typedef map<const char*, int, ltstr> count_map; + +class Server : public ServiceIf { +public: + Server() = default; + + void count(const char* method) { + Guard m(lock_); + int ct = counts_[method]; + counts_[method] = ++ct; + } + + void echoVoid() override { + count("echoVoid"); + // Sleep to simulate work + THRIFT_SLEEP_USEC(1); + return; + } + + count_map getCount() { + Guard m(lock_); + return counts_; + } + + int8_t echoByte(const int8_t arg) override { return arg; } + int32_t echoI32(const int32_t arg) override { return arg; } + int64_t echoI64(const int64_t arg) override { return arg; } + void echoString(string& out, const string& arg) override { + if (arg != "hello") { + T_ERROR_ABORT("WRONG STRING (%s)!!!!", arg.c_str()); + } + out = arg; + } + void echoList(vector<int8_t>& out, const vector<int8_t>& arg) override { out = arg; } + void echoSet(set<int8_t>& out, const set<int8_t>& arg) override { out = arg; } + void echoMap(map<int8_t, int8_t>& out, const map<int8_t, int8_t>& arg) override { out = arg; } + +private: + count_map counts_; + Mutex lock_; +}; + +class ClientThread : public Runnable { +public: + ClientThread(std::shared_ptr<TTransport> transport, + std::shared_ptr<ServiceClient> client, + Monitor& monitor, + size_t& workerCount, + size_t loopCount, + TType loopType) + : _transport(transport), + _client(client), + _monitor(monitor), + _workerCount(workerCount), + _loopCount(loopCount), + _loopType(loopType) {} + + void run() override { + + // Wait for all worker threads to start + + { + Synchronized s(_monitor); + while (_workerCount == 0) { + _monitor.wait(); + } + } + + _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + _transport->open(); + + switch (_loopType) { + case T_VOID: + loopEchoVoid(); + break; + case T_BYTE: + loopEchoByte(); + break; + case T_I32: + loopEchoI32(); + break; + case T_I64: + loopEchoI64(); + break; + case T_STRING: + loopEchoString(); + break; + default: + cerr << "Unexpected loop type" << _loopType << endl; + break; + } + + _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + _transport->close(); + + _done = true; + + { + Synchronized s(_monitor); + + _workerCount--; + + if (_workerCount == 0) { + + _monitor.notify(); + } + } + } + + void loopEchoVoid() { + for (size_t ix = 0; ix < _loopCount; ix++) { + _client->echoVoid(); + } + } + + void loopEchoByte() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int8_t arg = 1; + int8_t result; + result = _client->echoByte(arg); + (void)result; + assert(result == arg); + } + } + + void loopEchoI32() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int32_t arg = 1; + int32_t result; + result = _client->echoI32(arg); + (void)result; + assert(result == arg); + } + } + + void loopEchoI64() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int64_t arg = 1; + int64_t result; + result = _client->echoI64(arg); + (void)result; + assert(result == arg); + } + } + + void loopEchoString() { + for (size_t ix = 0; ix < _loopCount; ix++) { + string arg = "hello"; + string result; + _client->echoString(result, arg); + assert(result == arg); + } + } + + std::shared_ptr<TTransport> _transport; + std::shared_ptr<ServiceClient> _client; + Monitor& _monitor; + size_t& _workerCount; + size_t _loopCount; + TType _loopType; + int64_t _startTime; + int64_t _endTime; + bool _done; + Monitor _sleep; +}; + +int main(int argc, char** argv) { +#if _WIN32 + transport::TWinsockSingleton::create(); +#endif + + int port = 9091; + string serverType = "simple"; + string protocolType = "binary"; + uint32_t workerCount = 4; + uint32_t clientCount = 20; + uint32_t loopCount = 1000; + TType loopType = T_VOID; + string callName = "echoVoid"; + bool runServer = true; + bool logRequests = false; + string requestLogPath = "./requestlog.tlog"; + bool replayRequests = false; + + ostringstream usage; + + usage << argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] " + "[--protocol-type=<protocol-type>] [--workers=<worker-count>] " + "[--clients=<client-count>] [--loop=<loop-count>]" << endl + << "\tclients Number of client threads to create - 0 implies no clients, i.e. " + "server only. Default is " << clientCount << endl + << "\thelp Prints this help text." << endl + << "\tcall Service method to call. Default is " << callName << endl + << "\tloop The number of remote thrift calls each client makes. Default is " + << loopCount << endl << "\tport The port the server and clients should bind to " + "for thrift network connections. Default is " << port << endl + << "\tserver Run the Thrift server in this process. Default is " << runServer + << endl << "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " + << serverType << endl + << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " + << protocolType << endl + << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests + << endl << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " + << replayRequests << endl << "\tworkers Number of thread pools workers. Only valid " + "for thread-pool server type. Default is " << workerCount + << endl; + + map<string, string> args; + + for (int ix = 1; ix < argc; ix++) { + + string arg(argv[ix]); + + if (arg.compare(0, 2, "--") == 0) { + + size_t end = arg.find_first_of("=", 2); + + string key = string(arg, 2, end - 2); + + if (end != string::npos) { + args[key] = string(arg, end + 1); + } else { + args[key] = "true"; + } + } else { + throw invalid_argument("Unexcepted command line token: " + arg); + } + } + + try { + + if (!args["clients"].empty()) { + clientCount = atoi(args["clients"].c_str()); + } + + if (!args["help"].empty()) { + cerr << usage.str(); + return 0; + } + + if (!args["loop"].empty()) { + loopCount = atoi(args["loop"].c_str()); + } + + if (!args["call"].empty()) { + callName = args["call"]; + } + + if (!args["port"].empty()) { + port = atoi(args["port"].c_str()); + } + + if (!args["server"].empty()) { + runServer = args["server"] == "true"; + } + + if (!args["log-request"].empty()) { + logRequests = args["log-request"] == "true"; + } + + if (!args["replay-request"].empty()) { + replayRequests = args["replay-request"] == "true"; + } + + if (!args["server-type"].empty()) { + serverType = args["server-type"]; + } + + if (!args["workers"].empty()) { + workerCount = atoi(args["workers"].c_str()); + } + + } catch (std::exception& e) { + cerr << e.what() << endl; + cerr << usage.str(); + } + + std::shared_ptr<ThreadFactory> threadFactory + = std::shared_ptr<ThreadFactory>(new ThreadFactory()); + + // Dispatcher + std::shared_ptr<Server> serviceHandler(new Server()); + + if (replayRequests) { + std::shared_ptr<Server> serviceHandler(new Server()); + std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transports + std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + fileTransport->seekToEnd(); + + // Protocol Factory + std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + TFileProcessor fileProcessor(serviceProcessor, protocolFactory, fileTransport); + + fileProcessor.process(0, true); + exit(0); + } + + if (runServer) { + + std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Protocol Factory + std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + // Transport Factory + std::shared_ptr<TTransportFactory> transportFactory; + + if (logRequests) { + // initialize the log file + std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + + transportFactory + = std::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport)); + } + + std::shared_ptr<Thread> serverThread; + std::shared_ptr<Thread> serverThread2; + std::shared_ptr<transport::TNonblockingServerSocket> nbSocket1; + std::shared_ptr<transport::TNonblockingServerSocket> nbSocket2; + + if (serverType == "simple") { + + nbSocket1.reset(new transport::TNonblockingServerSocket(port)); + serverThread = threadFactory->newThread(std::shared_ptr<TServer>( + new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1))); + nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1)); + serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>( + new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2))); + + } else if (serverType == "thread-pool") { + + std::shared_ptr<ThreadManager> threadManager + = ThreadManager::newSimpleThreadManager(workerCount); + + threadManager->threadFactory(threadFactory); + threadManager->start(); + nbSocket1.reset(new transport::TNonblockingServerSocket(port)); + serverThread = threadFactory->newThread(std::shared_ptr<TServer>( + new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1, threadManager))); + nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1)); + serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>( + new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2, threadManager))); + } + + cerr << "Starting the server on port " << port << " and " << (port + 1) << endl; + serverThread->start(); + serverThread2->start(); + + // If we aren't running clients, just wait forever for external clients + + if (clientCount == 0) { + serverThread->join(); + serverThread2->join(); + } + } + THRIFT_SLEEP_SEC(1); + + if (clientCount > 0) { + + Monitor monitor; + + size_t threadCount = 0; + + set<std::shared_ptr<Thread> > clientThreads; + + if (callName == "echoVoid") { + loopType = T_VOID; + } else if (callName == "echoByte") { + loopType = T_BYTE; + } else if (callName == "echoI32") { + loopType = T_I32; + } else if (callName == "echoI64") { + loopType = T_I64; + } else if (callName == "echoString") { + loopType = T_STRING; + } else { + throw invalid_argument("Unknown service call " + callName); + } + + for (uint32_t ix = 0; ix < clientCount; ix++) { + + std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2))); + std::shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket)); + std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket)); + std::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol)); + + clientThreads.insert(threadFactory->newThread(std::shared_ptr<ClientThread>( + new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType)))); + } + + for (auto thread = clientThreads.begin(); + thread != clientThreads.end(); + thread++) { + (*thread)->start(); + } + + int64_t time00; + int64_t time01; + + { + Synchronized s(monitor); + threadCount = clientCount; + + cerr << "Launch " << clientCount << " client threads" << endl; + + time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + monitor.notifyAll(); + + while (threadCount > 0) { + monitor.wait(); + } + + time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + } + + int64_t firstTime = 9223372036854775807LL; + int64_t lastTime = 0; + + double averageTime = 0; + int64_t minTime = 9223372036854775807LL; + int64_t maxTime = 0; + + for (auto ix = clientThreads.begin(); + ix != clientThreads.end(); + ix++) { + + std::shared_ptr<ClientThread> client + = std::dynamic_pointer_cast<ClientThread>((*ix)->runnable()); + + int64_t delta = client->_endTime - client->_startTime; + + assert(delta > 0); + + if (client->_startTime < firstTime) { + firstTime = client->_startTime; + } + + if (client->_endTime > lastTime) { + lastTime = client->_endTime; + } + + if (delta < minTime) { + minTime = delta; + } + + if (delta > maxTime) { + maxTime = delta; + } + + averageTime += delta; + } + + averageTime /= clientCount; + + cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount + << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl; + + count_map count = serviceHandler->getCount(); + count_map::iterator iter; + for (iter = count.begin(); iter != count.end(); ++iter) { + printf("%s => %d\n", iter->first, iter->second); + } + cerr << "done." << endl; + } + + return 0; +} diff --git a/src/jaegertracing/thrift/test/cpp/src/TestClient.cpp b/src/jaegertracing/thrift/test/cpp/src/TestClient.cpp new file mode 100644 index 000000000..c4146cc5c --- /dev/null +++ b/src/jaegertracing/thrift/test/cpp/src/TestClient.cpp @@ -0,0 +1,1231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <limits> +#include <locale> +#include <ios> +#include <iostream> +#include <sstream> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/protocol/TCompactProtocol.h> +#include <thrift/protocol/THeaderProtocol.h> +#include <thrift/protocol/TJSONProtocol.h> +#include <thrift/protocol/TMultiplexedProtocol.h> +#include <thrift/transport/THttpClient.h> +#include <thrift/transport/TTransportUtils.h> +#include <thrift/transport/TSocket.h> +#include <thrift/transport/TSSLSocket.h> +#include <thrift/transport/TZlibTransport.h> +#include <thrift/async/TEvhttpClientChannel.h> +#include <thrift/server/TNonblockingServer.h> // <event.h> + +#ifdef HAVE_STDINT_H +#include <stdint.h> +#endif +#ifdef HAVE_INTTYPES_H +#include <inttypes.h> +#endif + +#include <boost/algorithm/string.hpp> +#include <boost/filesystem.hpp> +#include <boost/program_options.hpp> +#include <boost/random/random_device.hpp> +#if _WIN32 +#include <thrift/windows/TWinsockSingleton.h> +#endif + +#include "SecondService.h" +#include "ThriftTest.h" + +using namespace std; +using namespace apache::thrift; +using namespace apache::thrift::async; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace thrift::test; + +// +// A pedantic protocol that checks to make sure the response sequence ID +// is the same as the sent sequence ID. lib/cpp always sends zero for +// synchronous clients, so this bumps the number to make sure it gets +// returned properly from the remote server. Any server that does not +// respond with the same sequence number is violating the sequence ID +// agreement between client and server. +// + +template<typename Proto> +class TPedanticProtocol : public Proto +{ + public: + TPedanticProtocol(std::shared_ptr<TTransport>& transport) + : Proto(transport), m_last_seqid((std::numeric_limits<int32_t>::max)() - 10) { } + + virtual uint32_t writeMessageBegin_virt(const std::string& name, + const TMessageType messageType, + const int32_t in_seqid) override + { + int32_t seqid = in_seqid; + if (!seqid) { // this is typical for normal cpp generated code + seqid = ++m_last_seqid; + } + + return Proto::writeMessageBegin_virt(name, messageType, seqid); + } + + virtual uint32_t readMessageBegin_virt(std::string& name, + TMessageType& messageType, + int32_t& seqid) override + { + uint32_t result = Proto::readMessageBegin_virt(name, messageType, seqid); + if (seqid != m_last_seqid) { + std::stringstream ss; + ss << "ERROR: send request with seqid " << m_last_seqid << " and got reply with seqid " << seqid; + throw std::logic_error(ss.str()); + } /* else { + std::cout << "verified seqid " << m_last_seqid << " round trip OK" << std::endl; + } */ + return result; + } + + private: + int32_t m_last_seqid; +}; + +// Current time, microseconds since the epoch +uint64_t now() { + int64_t ret; + struct timeval tv; + + THRIFT_GETTIMEOFDAY(&tv, nullptr); + ret = tv.tv_sec; + ret = ret * 1000 * 1000 + tv.tv_usec; + return ret; +} + +static void testString_clientReturn(event_base* base, + int testNr, + ThriftTestCobClient* client) { + try { + string s; + client->recv_testString(s); + std::ostringstream os; + os << "test" << testNr; + const bool ok = (s == os.str()); + cout << "testString: " << s << " " << ((ok) ? "ok" : "failed") << endl; + } catch (TException& exn) { + cout << "Error: " << exn.what() << endl; + } + + if (testNr == 9) + event_base_loopbreak(base); // end test +} + +static void testVoid_clientReturn(event_base* base, ThriftTestCobClient* client) { + try { + client->recv_testVoid(); + cout << "testVoid" << endl; + + for (int testNr = 0; testNr < 10; ++testNr) { + std::ostringstream os; + os << "test" << testNr; + client->testString(std::bind(testString_clientReturn, + base, + testNr, + std::placeholders::_1), + os.str()); + } + } catch (TException& exn) { + cout << "Error: " << exn.what() << endl; + } +} + +// Workaround for absense of C++11 "auto" keyword. +template <typename T> +bool print_eq(T expected, T actual) { + cout << "(" << actual << ")" << endl; + if (expected != actual) { + cout << "*** FAILED ***" << endl << "Expected: " << expected << " but got: " << actual << endl; + return false; + } + return true; +} + +#define BASETYPE_IDENTITY_TEST(func, value) \ + cout << #func "(" << value << ") = "; \ + try { \ + if (!print_eq(value, testClient.func(value))) \ + return_code |= ERR_BASETYPES; \ + } catch (TTransportException&) { \ + throw; \ + } catch (exception & ex) { \ + cout << "*** FAILED ***" << endl << ex.what() << endl; \ + return_code |= ERR_BASETYPES; \ + } + +int binary_test(ThriftTestClient& testClient, string::size_type siz); + +BOOST_CONSTEXPR_OR_CONST int ERR_BASETYPES = 1; +BOOST_CONSTEXPR_OR_CONST int ERR_STRUCTS = 2; +BOOST_CONSTEXPR_OR_CONST int ERR_CONTAINERS = 4; +BOOST_CONSTEXPR_OR_CONST int ERR_EXCEPTIONS = 8; +BOOST_CONSTEXPR_OR_CONST int ERR_UNKNOWN = 64; + +int main(int argc, char** argv) { + cout.precision(19); + + string testDir = boost::filesystem::system_complete(argv[0]).parent_path().parent_path().parent_path().string(); + string caPath = testDir + "/keys/CA.pem"; + string certPath = testDir + "/keys/client.crt"; + string keyPath = testDir + "/keys/client.key"; + +#if _WIN32 + transport::TWinsockSingleton::create(); +#endif + string host = "localhost"; + int port = 9090; + int numTests = 1; + bool ssl = false; + bool zlib = false; + string transport_type = "buffered"; + string protocol_type = "binary"; + string domain_socket = ""; + bool abstract_namespace = false; + bool noinsane = false; + + int return_code = 0; + + boost::program_options::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("host", + boost::program_options::value<string>(&host)->default_value(host), + "Host to connect") + ("port", + boost::program_options::value<int>(&port)->default_value(port), + "Port number to connect") + ("domain-socket", + boost::program_options::value<string>(&domain_socket)->default_value(domain_socket), + "Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port") + ("abstract-namespace", + "Look for the domain socket in the Abstract Namespace" + " (no connection with filesystem pathnames)") + ("transport", + boost::program_options::value<string>(&transport_type)->default_value(transport_type), + "Transport: buffered, framed, http, evhttp, zlib") + ("protocol", + boost::program_options::value<string>(&protocol_type)->default_value(protocol_type), + "Protocol: binary, compact, header, json, multi, multic, multih, multij") + ("ssl", + "Encrypted Transport using SSL") + ("zlib", + "Wrap Transport with Zlib") + ("testloops,n", + boost::program_options::value<int>(&numTests)->default_value(numTests), + "Number of Tests") + ("noinsane", + "Do not run insanity test"); + + boost::program_options::variables_map vm; + boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); + boost::program_options::notify(vm); + + if (vm.count("help")) { + cout << desc << endl; + return ERR_UNKNOWN; + } + + try { + if (!protocol_type.empty()) { + if (protocol_type == "binary") { + } else if (protocol_type == "compact") { + } else if (protocol_type == "header") { + } else if (protocol_type == "json") { + } else if (protocol_type == "multi") { + } else if (protocol_type == "multic") { + } else if (protocol_type == "multih") { + } else if (protocol_type == "multij") { + } else { + throw invalid_argument("Unknown protocol type " + protocol_type); + } + } + + if (!transport_type.empty()) { + if (transport_type == "buffered") { + } else if (transport_type == "framed") { + } else if (transport_type == "http") { + } else if (transport_type == "evhttp") { + } else if (transport_type == "zlib") { + // crosstest will pass zlib as a transport and as a flag right now.. + } else { + throw invalid_argument("Unknown transport type " + transport_type); + } + } + + } catch (exception& e) { + cerr << e.what() << endl; + cout << desc << endl; + return ERR_UNKNOWN; + } + + if (vm.count("ssl")) { + ssl = true; + } + + if (vm.count("zlib")) { + zlib = true; + } + + if (vm.count("abstract-namespace")) { + abstract_namespace = true; + } + + if (vm.count("noinsane")) { + noinsane = true; + } + + // THRIFT-4164: The factory MUST outlive any sockets it creates for correct behavior! + std::shared_ptr<TSSLSocketFactory> factory; + std::shared_ptr<TSocket> socket; + std::shared_ptr<TTransport> transport; + std::shared_ptr<TProtocol> protocol; + std::shared_ptr<TProtocol> protocol2; // SecondService for multiplexed + + if (ssl) { + cout << "Client Certificate File: " << certPath << endl; + cout << "Client Key File: " << keyPath << endl; + cout << "CA File: " << caPath << endl; + + factory = std::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory()); + factory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"); + factory->loadTrustedCertificates(caPath.c_str()); + factory->loadCertificate(certPath.c_str()); + factory->loadPrivateKey(keyPath.c_str()); + factory->authenticate(true); + socket = factory->createSocket(host, port); + } else { + if (domain_socket != "") { + if (abstract_namespace) { + std::string abstract_socket("\0", 1); + abstract_socket += domain_socket; + socket = std::shared_ptr<TSocket>(new TSocket(abstract_socket)); + } else { + socket = std::shared_ptr<TSocket>(new TSocket(domain_socket)); + } + port = 0; + } else { + socket = std::shared_ptr<TSocket>(new TSocket(host, port)); + } + } + + if (transport_type.compare("http") == 0) { + transport = std::make_shared<THttpClient>(socket, host, "/service"); + } else if (transport_type.compare("framed") == 0) { + transport = std::make_shared<TFramedTransport>(socket); + } else { + transport = std::make_shared<TBufferedTransport>(socket); + } + + if (zlib) { + transport = std::make_shared<TZlibTransport>(transport); + } + + if (protocol_type == "json" || protocol_type == "multij") { + typedef TPedanticProtocol<TJSONProtocol> TPedanticJSONProtocol; + protocol = std::make_shared<TPedanticJSONProtocol>(transport); + } else if (protocol_type == "compact" || protocol_type == "multic") { + typedef TPedanticProtocol<TCompactProtocol> TPedanticCompactProtocol; + protocol = std::make_shared<TPedanticCompactProtocol>(transport); + } else if (protocol_type == "header" || protocol_type == "multih") { + typedef TPedanticProtocol<THeaderProtocol> TPedanticHeaderProtocol; + protocol = std::make_shared<TPedanticHeaderProtocol>(transport); + } else { + typedef TPedanticProtocol<TBinaryProtocol> TPedanticBinaryProtocol; + protocol = std::make_shared<TPedanticBinaryProtocol>(transport); + } + + if (boost::starts_with(protocol_type, "multi")) { + protocol2 = std::make_shared<TMultiplexedProtocol>(protocol, "SecondService"); + // we don't need access to the original protocol any more, so... + protocol = std::make_shared<TMultiplexedProtocol>(protocol, "ThriftTest"); + } + + // Connection info + cout << "Connecting (" << transport_type << "/" << protocol_type << ") to: "; + if (abstract_namespace) { + cout << '@'; + } + cout << domain_socket; + if (port != 0) { + cout << host << ":" << port; + } + cout << endl; + + if (transport_type.compare("evhttp") == 0) { + event_base* base = event_base_new(); + cout << "Libevent Version: " << event_get_version() << endl; + cout << "Libevent Method: " << event_base_get_method(base) << endl; +#if LIBEVENT_VERSION_NUMBER >= 0x02000000 + cout << "Libevent Features: 0x" << hex << event_base_get_features(base) << endl; +#endif + + std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + std::shared_ptr<TAsyncChannel> channel( + new TEvhttpClientChannel(host.c_str(), "/", host.c_str(), port, base)); + ThriftTestCobClient* client = new ThriftTestCobClient(channel, protocolFactory.get()); + client->testVoid(std::bind(testVoid_clientReturn, + base, + std::placeholders::_1)); + + event_base_loop(base, 0); + return 0; + } + + ThriftTestClient testClient(protocol); + + uint64_t time_min = 0; + uint64_t time_max = 0; + uint64_t time_tot = 0; + + int test = 0; + for (test = 0; test < numTests; ++test) { + + try { + transport->open(); + } catch (TTransportException& ex) { + cout << "Connect failed: " << ex.what() << endl; + return ERR_UNKNOWN; + } + + /** + * CONNECT TEST + */ + printf("Test #%d, connect %s:%d\n", test + 1, host.c_str(), port); + + uint64_t start = now(); + + /** + * VOID TEST + */ + try { + cout << "testVoid()" << flush; + testClient.testVoid(); + cout << " = void" << endl; + } catch (TTransportException&) { + // Stop here if transport got broken + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_BASETYPES; + } + + /** + * STRING TEST + */ + cout << "testString(\"Test\")" << flush; + string s; + testClient.testString(s, "Test"); + cout << " = " << s << endl; + if (s != "Test") { + cout << "*** FAILED ***" << endl; + return_code |= ERR_BASETYPES; + } + + // + // Multiplexed protocol - call another service method + // in the middle of the ThriftTest + // + if (boost::starts_with(protocol_type, "multi")) { + SecondServiceClient ssc(protocol2); + // transport is already open... + + try { + cout << "secondService.secondTestString(\"foo\") => " << flush; + std::string result; + ssc.secondtestString(result, "foo"); + cout << "{" << result << "}" << endl; + } catch (std::exception& e) { + cout << " *** FAILED *** " << e.what() << endl; + return_code |= ERR_EXCEPTIONS; + } + } + + try { +#ifdef _MSC_VER +#pragma warning( push ) +#pragma warning( disable : 4566 ) +#endif + string str( + "}{Afrikaans, Alemannisch, Aragonés, العربية, مصرى, " + "Asturianu, Aymar aru, Azərbaycan, Башҡорт, Boarisch, Žemaitėška, " + "Беларуская, Беларуская (тарашкевіца), Български, Bamanankan, " + "বাংলা, Brezhoneg, Bosanski, Català, Mìng-dĕ̤ng-ngṳ̄, Нохчийн, " + "Cebuano, ᏣᎳᎩ, Česky, Словѣ́ньскъ / ⰔⰎⰑⰂⰡⰐⰠⰔⰍⰟ, Чӑвашла, Cymraeg, " + "Dansk, Zazaki, ދިވެހިބަސް, Ελληνικά, Emiliàn e rumagnòl, English, " + "Esperanto, Español, Eesti, Euskara, فارسی, Suomi, Võro, Føroyskt, " + "Français, Arpetan, Furlan, Frysk, Gaeilge, 贛語, Gàidhlig, Galego, " + "Avañe'ẽ, ગુજરાતી, Gaelg, עברית, हिन्दी, Fiji Hindi, Hrvatski, " + "Kreyòl ayisyen, Magyar, Հայերեն, Interlingua, Bahasa Indonesia, " + "Ilokano, Ido, Íslenska, Italiano, 日本語, Lojban, Basa Jawa, " + "ქართული, Kongo, Kalaallisut, ಕನ್ನಡ, 한국어, Къарачай-Малкъар, " + "Ripoarisch, Kurdî, Коми, Kernewek, Кыргызча, Latina, Ladino, " + "Lëtzebuergesch, Limburgs, Lingála, ລາວ, Lietuvių, Latviešu, Basa " + "Banyumasan, Malagasy, Македонски, മലയാളം, मराठी, مازِرونی, Bahasa " + "Melayu, Nnapulitano, Nedersaksisch, नेपाल भाषा, Nederlands, " + "Norsk (nynorsk), Norsk (bokmål), Nouormand, Diné bizaad, " + "Occitan, Иронау, Papiamentu, Deitsch, Polski, پنجابی, پښتو, " + "Norfuk / Pitkern, Português, Runa Simi, Rumantsch, Romani, Română, " + "Русский, Саха тыла, Sardu, Sicilianu, Scots, Sámegiella, Simple " + "English, Slovenčina, Slovenščina, Српски / Srpski, Seeltersk, " + "Svenska, Kiswahili, தமிழ், తెలుగు, Тоҷикӣ, ไทย, Türkmençe, Tagalog, " + "Türkçe, Татарча/Tatarça, Українська, اردو, Tiếng Việt, Volapük, " + "Walon, Winaray, 吴语, isiXhosa, ייִדיש, Yorùbá, Zeêuws, 中文, " + "Bân-lâm-gú, 粵語"); +#ifdef _MSC_VER +#pragma warning( pop ) +#endif + cout << "testString(" << str << ") = " << flush; + testClient.testString(s, str); + cout << s << endl; + if (s != str) { + cout.imbue(locale("en_US.UTF8")); + cout << "*** FAILED ***" << endl << "Expected string: " << str << " but got: " << s << endl << "CLEAR"; + return_code |= ERR_BASETYPES; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_BASETYPES; + return return_code; + } + try { + string str( + "quote: \" backslash:" + " forwardslash-escaped: \\/ " + " backspace: \b formfeed: \f newline: \n return: \r tab: " + " now-all-of-them-together: \"\\\\/\b\n\r\t" + " now-a-bunch-of-junk: !@#$%&()(&%$#{}{}<><><" + " char-to-test-json-parsing: ]] \"]] \\\" }}}{ [[[ "); + cout << "testString(" << str << ") = " << flush; + testClient.testString(s, str); + cout << s << endl; + if (s != str) { + cout.imbue(locale("en_US.UTF8")); + cout << "*** FAILED ***" << endl + << "Expected string: " << str << " but got: " << s << endl + << "CLEAR"; + ; + return_code |= ERR_BASETYPES; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_BASETYPES; + return return_code; + } + + /** + * BOOL TEST + */ + cout << boolalpha; + BASETYPE_IDENTITY_TEST(testBool, true); + BASETYPE_IDENTITY_TEST(testBool, false); + + /** + * BYTE TEST + */ + BASETYPE_IDENTITY_TEST(testByte, (int8_t)0); + BASETYPE_IDENTITY_TEST(testByte, (int8_t)-1); + BASETYPE_IDENTITY_TEST(testByte, (int8_t)42); + BASETYPE_IDENTITY_TEST(testByte, (int8_t)-42); + BASETYPE_IDENTITY_TEST(testByte, (int8_t)127); + BASETYPE_IDENTITY_TEST(testByte, (int8_t)-128); + + /** + * I32 TEST + */ + BASETYPE_IDENTITY_TEST(testI32, 0); + BASETYPE_IDENTITY_TEST(testI32, -1); + BASETYPE_IDENTITY_TEST(testI32, 190000013); + BASETYPE_IDENTITY_TEST(testI32, -190000013); + BASETYPE_IDENTITY_TEST(testI32, (numeric_limits<int32_t>::max)()); + BASETYPE_IDENTITY_TEST(testI32, (numeric_limits<int32_t>::min)()); + + /** + * I64 TEST + */ + BASETYPE_IDENTITY_TEST(testI64, (int64_t)0); + BASETYPE_IDENTITY_TEST(testI64, (int64_t)-1); + BASETYPE_IDENTITY_TEST(testI64, (int64_t)7000000000000000123LL); + BASETYPE_IDENTITY_TEST(testI64, (int64_t)-7000000000000000123LL); + BASETYPE_IDENTITY_TEST(testI64, (int64_t)pow(static_cast<double>(2LL), 32)); + BASETYPE_IDENTITY_TEST(testI64, (int64_t)-pow(static_cast<double>(2LL), 32)); + BASETYPE_IDENTITY_TEST(testI64, (int64_t)pow(static_cast<double>(2LL), 32) + 1); + BASETYPE_IDENTITY_TEST(testI64, (int64_t)-pow(static_cast<double>(2LL), 32) - 1); + BASETYPE_IDENTITY_TEST(testI64, (numeric_limits<int64_t>::max)()); + BASETYPE_IDENTITY_TEST(testI64, (numeric_limits<int64_t>::min)()); + + /** + * DOUBLE TEST + */ + // Comparing double values with plain equality because Thrift handles full precision of double + BASETYPE_IDENTITY_TEST(testDouble, 0.0); + BASETYPE_IDENTITY_TEST(testDouble, -1.0); + BASETYPE_IDENTITY_TEST(testDouble, -5.2098523); + BASETYPE_IDENTITY_TEST(testDouble, -0.000341012439638598279); + BASETYPE_IDENTITY_TEST(testDouble, pow(static_cast<double>(2), 32)); + BASETYPE_IDENTITY_TEST(testDouble, pow(static_cast<double>(2), 32) + 1); + BASETYPE_IDENTITY_TEST(testDouble, pow(static_cast<double>(2), 53) - 1); + BASETYPE_IDENTITY_TEST(testDouble, -pow(static_cast<double>(2), 32)); + BASETYPE_IDENTITY_TEST(testDouble, -pow(static_cast<double>(2), 32) - 1); + BASETYPE_IDENTITY_TEST(testDouble, -pow(static_cast<double>(2), 53) + 1); + + try { + double expected = pow(static_cast<double>(10), 307); + cout << "testDouble(" << expected << ") = " << flush; + double actual = testClient.testDouble(expected); + cout << "(" << actual << ")" << endl; + if (expected - actual > pow(static_cast<double>(10), 292)) { + cout << "*** FAILED ***" << endl + << "Expected: " << expected << " but got: " << actual << endl; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_BASETYPES; + } + + try { + double expected = pow(static_cast<double>(10), -292); + cout << "testDouble(" << expected << ") = " << flush; + double actual = testClient.testDouble(expected); + cout << "(" << actual << ")" << endl; + if (expected - actual > pow(static_cast<double>(10), -307)) { + cout << "*** FAILED ***" << endl + << "Expected: " << expected << " but got: " << actual << endl; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_BASETYPES; + } + + /** + * BINARY TEST + */ + for (string::size_type i = 0; i < 131073 && !return_code; ) { + return_code |= binary_test(testClient, i); + if (i > 0) { i *= 2; } else { ++i; } + } + + + /** + * STRUCT TEST + */ + cout << "testStruct({\"Zero\", 1, -3, -5})" << flush; + Xtruct out; + out.string_thing = "Zero"; + out.byte_thing = 1; + out.i32_thing = -3; + out.i64_thing = -5; + Xtruct in; + testClient.testStruct(in, out); + printf(" = {\"%s\", %d, %d, %" PRId64 "}\n", + in.string_thing.c_str(), + (int)in.byte_thing, + in.i32_thing, + in.i64_thing); + if (in != out) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + /** + * NESTED STRUCT TEST + */ + cout << "testNest({1, {\"Zero\", 1, -3, -5}), 5}" << flush; + Xtruct2 out2; + out2.byte_thing = 1; + out2.struct_thing = out; + out2.i32_thing = 5; + Xtruct2 in2; + testClient.testNest(in2, out2); + in = in2.struct_thing; + printf(" = {%d, {\"%s\", %d, %d, %" PRId64 "}, %d}\n", + in2.byte_thing, + in.string_thing.c_str(), + (int)in.byte_thing, + in.i32_thing, + in.i64_thing, + in2.i32_thing); + if (in2 != out2) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + /** + * MAP TEST + */ + map<int32_t, int32_t> mapout; + for (int32_t i = 0; i < 5; ++i) { + mapout.insert(make_pair(i, i - 10)); + } + cout << "testMap({" << flush; + map<int32_t, int32_t>::const_iterator m_iter; + bool first = true; + for (m_iter = mapout.begin(); m_iter != mapout.end(); ++m_iter) { + if (first) { + first = false; + } else { + cout << ","; + } + cout << m_iter->first << " => " << m_iter->second; + } + cout << "})"; + map<int32_t, int32_t> mapin; + testClient.testMap(mapin, mapout); + cout << " = {"; + first = true; + for (m_iter = mapin.begin(); m_iter != mapin.end(); ++m_iter) { + if (first) { + first = false; + } else { + cout << ","; + } + cout << m_iter->first << " => " << m_iter->second; + } + cout << "}" << endl; + if (mapin != mapout) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_CONTAINERS; + } + + /** + * STRING MAP TEST + */ + cout << "testStringMap({a => 2, b => blah, some => thing}) = {" << flush; + map<string, string> smapin; + map<string, string> smapout; + smapin["a"] = "2"; + smapin["b"] = "blah"; + smapin["some"] = "thing"; + try { + testClient.testStringMap(smapout, smapin); + first = true; + for (map<string, string>::const_iterator it = smapout.begin(); it != smapout.end(); ++it) { + if (first) + cout << ","; + else + first = false; + cout << it->first << " => " << it->second; + } + cout << "}" << endl; + if (smapin != smapout) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_CONTAINERS; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_CONTAINERS; + } + + /** + * SET TEST + */ + set<int32_t> setout; + for (int32_t i = -2; i < 3; ++i) { + setout.insert(i); + } + cout << "testSet({" << flush; + set<int32_t>::const_iterator s_iter; + first = true; + for (s_iter = setout.begin(); s_iter != setout.end(); ++s_iter) { + if (first) { + first = false; + } else { + cout << ","; + } + cout << *s_iter; + } + cout << "})"; + set<int32_t> setin; + testClient.testSet(setin, setout); + cout << " = {"; + first = true; + for (s_iter = setin.begin(); s_iter != setin.end(); ++s_iter) { + if (first) { + first = false; + } else { + cout << ","; + } + cout << *s_iter; + } + cout << "}" << endl; + if (setin != setout) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_CONTAINERS; + } + + /** + * LIST TEST + */ + cout << "testList(empty)" << flush; + try { + vector<int32_t> listout; + testClient.testList(listout, vector<int32_t>()); + if (!listout.empty()) { + cout << "*** FAILED ***" << endl; + cout << "invalid length: " << listout.size() << endl; + return_code |= ERR_CONTAINERS; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_CONTAINERS; + } + try { + vector<int32_t> listout; + for (int32_t i = -2; i < 3; ++i) { + listout.push_back(i); + } + cout << "testList({" << flush; + vector<int32_t>::const_iterator l_iter; + first = true; + for (l_iter = listout.begin(); l_iter != listout.end(); ++l_iter) { + if (first) { + first = false; + } else { + cout << ","; + } + cout << *l_iter; + } + cout << "})"; + vector<int32_t> listin; + testClient.testList(listin, listout); + cout << " = {"; + first = true; + for (l_iter = listin.begin(); l_iter != listin.end(); ++l_iter) { + if (first) { + first = false; + } else { + cout << ","; + } + cout << *l_iter; + } + cout << "}" << endl; + if (listin != listout) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_CONTAINERS; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_CONTAINERS; + } + + /** + * ENUM TEST + */ + cout << "testEnum(ONE)" << flush; + Numberz::type ret = testClient.testEnum(Numberz::ONE); + cout << " = " << ret << endl; + if (ret != Numberz::ONE) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + cout << "testEnum(TWO)" << flush; + ret = testClient.testEnum(Numberz::TWO); + cout << " = " << ret << endl; + if (ret != Numberz::TWO) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + cout << "testEnum(THREE)" << flush; + ret = testClient.testEnum(Numberz::THREE); + cout << " = " << ret << endl; + if (ret != Numberz::THREE) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + cout << "testEnum(FIVE)" << flush; + ret = testClient.testEnum(Numberz::FIVE); + cout << " = " << ret << endl; + if (ret != Numberz::FIVE) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + cout << "testEnum(EIGHT)" << flush; + ret = testClient.testEnum(Numberz::EIGHT); + cout << " = " << ret << endl; + if (ret != Numberz::EIGHT) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + /** + * TYPEDEF TEST + */ + cout << "testTypedef(309858235082523)" << flush; + UserId uid = testClient.testTypedef(309858235082523LL); + cout << " = " << uid << endl; + if (uid != 309858235082523LL) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + + /** + * NESTED MAP TEST + */ + cout << "testMapMap(1)" << flush; + map<int32_t, map<int32_t, int32_t> > mm; + testClient.testMapMap(mm, 1); + cout << " = {"; + map<int32_t, map<int32_t, int32_t> >::const_iterator mi; + for (mi = mm.begin(); mi != mm.end(); ++mi) { + printf("%d => {", mi->first); + map<int32_t, int32_t>::const_iterator mi2; + for (mi2 = mi->second.begin(); mi2 != mi->second.end(); ++mi2) { + cout << mi2->first << " => " << mi2->second; + } + cout << "}, "; + } + cout << "}" << endl; + if (mm.size() != 2 || + mm[-4][-4] != -4 || + mm[-4][-3] != -3 || + mm[-4][-2] != -2 || + mm[-4][-1] != -1 || + mm[4][4] != 4 || + mm[4][3] != 3 || + mm[4][2] != 2 || + mm[4][1] != 1) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_CONTAINERS; + } + + /** + * INSANITY TEST + */ + if (!noinsane) { + Insanity insane; + insane.userMap.insert(make_pair(Numberz::FIVE, 5)); + insane.userMap.insert(make_pair(Numberz::EIGHT, 8)); + Xtruct truck; + truck.string_thing = "Goodbye4"; + truck.byte_thing = 4; + truck.i32_thing = 4; + truck.i64_thing = 4; + Xtruct truck2; + truck2.string_thing = "Hello2"; + truck2.byte_thing = 2; + truck2.i32_thing = 2; + truck2.i64_thing = 2; + insane.xtructs.push_back(truck); + insane.xtructs.push_back(truck2); + cout << "testInsanity()" << flush; + map<UserId, map<Numberz::type, Insanity> > whoa; + testClient.testInsanity(whoa, insane); + cout << " = {"; + map<UserId, map<Numberz::type, Insanity> >::const_iterator i_iter; + for (i_iter = whoa.begin(); i_iter != whoa.end(); ++i_iter) { + printf("%" PRId64 " => {", i_iter->first); + map<Numberz::type, Insanity>::const_iterator i2_iter; + for (i2_iter = i_iter->second.begin(); i2_iter != i_iter->second.end(); ++i2_iter) { + printf("%d => {", i2_iter->first); + map<Numberz::type, UserId> userMap = i2_iter->second.userMap; + map<Numberz::type, UserId>::const_iterator um; + cout << "{"; + for (um = userMap.begin(); um != userMap.end(); ++um) { + cout << um->first << " => " << um->second; + } + cout << "}, "; + + vector<Xtruct> xtructs = i2_iter->second.xtructs; + vector<Xtruct>::const_iterator x; + cout << "{"; + for (x = xtructs.begin(); x != xtructs.end(); ++x) { + printf("{\"%s\", %d, %d, %" PRId64 "}, ", + x->string_thing.c_str(), + (int)x->byte_thing, + x->i32_thing, + x->i64_thing); + } + cout << "}"; + + cout << "}, "; + } + cout << "}, "; + } + cout << "}" << endl; + bool failed = false; + map<UserId, map<Numberz::type, Insanity> >::const_iterator it1 = whoa.find(UserId(1)); + if (whoa.size() != 2) { + failed = true; + } + if (it1 == whoa.end()) { + failed = true; + } else { + auto it12 = it1->second.find(Numberz::TWO); + if (it12 == it1->second.end() || it12->second != insane) { + failed = true; + } + auto it13 = it1->second.find(Numberz::THREE); + if (it13 == it1->second.end() || it13->second != insane) { + failed = true; + } + } + map<UserId, map<Numberz::type, Insanity> >::const_iterator it2 = whoa.find(UserId(2)); + if (it2 == whoa.end()) { + failed = true; + } else { + auto it26 = it2->second.find(Numberz::SIX); + if (it26 == it2->second.end() || it26->second != Insanity()) { + failed = true; + } + } + if (failed) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + } + + /** + * MULTI TEST + */ + cout << "testMulti()" << endl; + try { + map<int16_t, string> mul_map; + Xtruct mul_result; + mul_map[1] = "blah"; + mul_map[2] = "thing"; + testClient.testMulti(mul_result, 42, 4242, 424242, mul_map, Numberz::EIGHT, UserId(24)); + Xtruct xxs; + xxs.string_thing = "Hello2"; + xxs.byte_thing = 42; + xxs.i32_thing = 4242; + xxs.i64_thing = 424242; + if (mul_result != xxs) { + cout << "*** FAILED ***" << endl; + return_code |= ERR_STRUCTS; + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_STRUCTS; + } + + /* test exception */ + + try { + cout << "testClient.testException(\"Xception\") =>" << flush; + testClient.testException("Xception"); + cout << " void\n*** FAILED ***" << endl; + return_code |= ERR_EXCEPTIONS; + + } catch (Xception& e) { + printf(" {%u, \"%s\"}\n", e.errorCode, e.message.c_str()); + } + + try { + cout << "testClient.testException(\"TException\") =>" << flush; + testClient.testException("TException"); + cout << " void\n*** FAILED ***" << endl; + return_code |= ERR_EXCEPTIONS; + + } catch (const TException&) { + cout << " Caught TException" << endl; + } + + try { + cout << "testClient.testException(\"success\") =>" << flush; + testClient.testException("success"); + cout << " void" << endl; + } catch (exception & ex) { \ + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_EXCEPTIONS; + } + + /* test multi exception */ + + try { + cout << "testClient.testMultiException(\"Xception\", \"test 1\") =>" << flush; + Xtruct result; + testClient.testMultiException(result, "Xception", "test 1"); + cout << " result\n*** FAILED ***" << endl; + return_code |= ERR_EXCEPTIONS; + } catch (Xception& e) { + printf(" {%u, \"%s\"}\n", e.errorCode, e.message.c_str()); + } + + try { + cout << "testClient.testMultiException(\"Xception2\", \"test 2\") =>" << flush; + Xtruct result; + testClient.testMultiException(result, "Xception2", "test 2"); + cout << " result\n*** FAILED ***" << endl; + return_code |= ERR_EXCEPTIONS; + + } catch (Xception2& e) { + printf(" {%u, {\"%s\"}}\n", e.errorCode, e.struct_thing.string_thing.c_str()); + } + + try { + cout << "testClient.testMultiException(\"success\", \"test 3\") =>" << flush; + Xtruct result; + testClient.testMultiException(result, "success", "test 3"); + printf(" {{\"%s\"}}\n", result.string_thing.c_str()); + } catch (exception & ex) { \ + cout << "*** FAILED ***" << endl << ex.what() << endl; + return_code |= ERR_EXCEPTIONS; + } + + /* test oneway void */ + { + cout << "testClient.testOneway(1) =>" << flush; + uint64_t startOneway = now(); + testClient.testOneway(1); + uint64_t elapsed = now() - startOneway; + if (elapsed > 200 * 1000) { // 0.2 seconds + printf("*** FAILED *** - took %.2f ms\n", (double)elapsed / 1000.0); + return_code |= ERR_BASETYPES; + } else { + printf(" success - took %.2f ms\n", (double)elapsed / 1000.0); + } + } + + /** + * redo a simple test after the oneway to make sure we aren't "off by one" -- + * if the server treated oneway void like normal void, this next test will + * fail since it will get the void confirmation rather than the correct + * result. In this circumstance, the client will throw the exception: + * + * TApplicationException: Wrong method namea + */ + /** + * I32 TEST + */ + cout << "re-test testI32(-1)" << flush; + int i32 = testClient.testI32(-1); + cout << " = " << i32 << endl; + if (i32 != -1) + return_code |= ERR_BASETYPES; + + cout << endl << "All tests done." << endl << flush; + + uint64_t stop = now(); + uint64_t tot = stop - start; + + cout << "Total time: " << stop - start << " us" << endl; + + time_tot += tot; + if (time_min == 0 || tot < time_min) { + time_min = tot; + } + if (tot > time_max) { + time_max = tot; + } + + cout << flush; + transport->close(); + } + + + uint64_t time_avg = time_tot / numTests; + + cout << "Min time: " << time_min << " us" << endl; + cout << "Max time: " << time_max << " us" << endl; + cout << "Avg time: " << time_avg << " us" << endl; + + return return_code; +} + +void binary_fill(std::string& str, string::size_type siz) +{ + static const signed char bin_data[256] + = {}; + + str.resize(siz); + char *ptr = &str[0]; + string::size_type pos = 0; + for (string::size_type i = 0; i < siz; ++i) + { + if (pos == 255) { pos = 0; } else { ++pos; } + *ptr++ = bin_data[pos]; + } +} + +int binary_test(ThriftTestClient& testClient, string::size_type siz) +{ + string bin_request; + string bin_result; + + cout << "testBinary(siz = " << siz << ")" << endl; + binary_fill(bin_request, siz); + try { + testClient.testBinary(bin_result, bin_request); + + if (bin_request.size() != bin_result.size()) { + cout << "*** FAILED: request size " << bin_request.size() << "; result size " << bin_result.size() << endl; + return ERR_BASETYPES; + } + + for (string::size_type i = 0; i < siz; ++i) { + if (bin_request.at(i) != bin_result.at(i)) { + cout << "*** FAILED: at position " << i << " request[i] is h" << hex << bin_request.at(i) << " result[i] is h" << hex << bin_result.at(i) << endl; + return ERR_BASETYPES; + } + } + } catch (TTransportException&) { + throw; + } catch (exception& ex) { + cout << "*** FAILED ***" << endl << ex.what() << endl; + return ERR_BASETYPES; + } + + return 0; +} diff --git a/src/jaegertracing/thrift/test/cpp/src/TestServer.cpp b/src/jaegertracing/thrift/test/cpp/src/TestServer.cpp new file mode 100644 index 000000000..8d5b4d93e --- /dev/null +++ b/src/jaegertracing/thrift/test/cpp/src/TestServer.cpp @@ -0,0 +1,845 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/async/TAsyncBufferProcessor.h> +#include <thrift/async/TAsyncProtocolProcessor.h> +#include <thrift/async/TEvhttpServer.h> +#include <thrift/concurrency/ThreadFactory.h> +#include <thrift/concurrency/ThreadManager.h> +#include <thrift/processor/TMultiplexedProcessor.h> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/protocol/TCompactProtocol.h> +#include <thrift/protocol/THeaderProtocol.h> +#include <thrift/protocol/TJSONProtocol.h> +#include <thrift/server/TNonblockingServer.h> +#include <thrift/server/TSimpleServer.h> +#include <thrift/server/TThreadPoolServer.h> +#include <thrift/server/TThreadedServer.h> +#include <thrift/transport/THttpServer.h> +#include <thrift/transport/THttpTransport.h> +#include <thrift/transport/TNonblockingSSLServerSocket.h> +#include <thrift/transport/TNonblockingServerSocket.h> +#include <thrift/transport/TSSLServerSocket.h> +#include <thrift/transport/TSSLSocket.h> +#include <thrift/transport/TServerSocket.h> +#include <thrift/transport/TTransportUtils.h> +#include <thrift/transport/TZlibTransport.h> + +#include "SecondService.h" +#include "ThriftTest.h" + +#ifdef HAVE_STDINT_H +#include <stdint.h> +#endif +#ifdef HAVE_INTTYPES_H +#include <inttypes.h> +#endif +#ifdef HAVE_SIGNAL_H +#include <signal.h> +#endif + +#include <iostream> +#include <stdexcept> +#include <sstream> + +#include <boost/algorithm/string.hpp> +#include <boost/program_options.hpp> +#include <boost/filesystem.hpp> + +#if _WIN32 +#include <thrift/windows/TWinsockSingleton.h> +#endif + +using namespace std; + +using namespace apache::thrift; +using namespace apache::thrift::async; +using namespace apache::thrift::concurrency; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace apache::thrift::server; + +using namespace thrift::test; + +// to handle a controlled shutdown, signal handling is mandatory +#ifdef HAVE_SIGNAL_H +apache::thrift::concurrency::Monitor gMonitor; +void signal_handler(int signum) +{ + if (signum == SIGINT) { + gMonitor.notifyAll(); + } +} +#endif + +class TestHandler : public ThriftTestIf { +public: + TestHandler() = default; + + void testVoid() override { printf("testVoid()\n"); } + + void testString(string& out, const string& thing) override { + printf("testString(\"%s\")\n", thing.c_str()); + out = thing; + } + + bool testBool(const bool thing) override { + printf("testBool(%s)\n", thing ? "true" : "false"); + return thing; + } + + int8_t testByte(const int8_t thing) override { + printf("testByte(%d)\n", (int)thing); + return thing; + } + + int32_t testI32(const int32_t thing) override { + printf("testI32(%d)\n", thing); + return thing; + } + + int64_t testI64(const int64_t thing) override { + printf("testI64(%" PRId64 ")\n", thing); + return thing; + } + + double testDouble(const double thing) override { + printf("testDouble(%f)\n", thing); + return thing; + } + + void testBinary(std::string& _return, const std::string& thing) override { + std::ostringstream hexstr; + hexstr << std::hex << thing; + printf("testBinary(%lu: %s)\n", safe_numeric_cast<unsigned long>(thing.size()), hexstr.str().c_str()); + _return = thing; + } + + void testStruct(Xtruct& out, const Xtruct& thing) override { + printf("testStruct({\"%s\", %d, %d, %" PRId64 "})\n", + thing.string_thing.c_str(), + (int)thing.byte_thing, + thing.i32_thing, + thing.i64_thing); + out = thing; + } + + void testNest(Xtruct2& out, const Xtruct2& nest) override { + const Xtruct& thing = nest.struct_thing; + printf("testNest({%d, {\"%s\", %d, %d, %" PRId64 "}, %d})\n", + (int)nest.byte_thing, + thing.string_thing.c_str(), + (int)thing.byte_thing, + thing.i32_thing, + thing.i64_thing, + nest.i32_thing); + out = nest; + } + + void testMap(map<int32_t, int32_t>& out, const map<int32_t, int32_t>& thing) override { + printf("testMap({"); + map<int32_t, int32_t>::const_iterator m_iter; + bool first = true; + for (m_iter = thing.begin(); m_iter != thing.end(); ++m_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d => %d", m_iter->first, m_iter->second); + } + printf("})\n"); + out = thing; + } + + void testStringMap(map<std::string, std::string>& out, + const map<std::string, std::string>& thing) override { + printf("testMap({"); + map<std::string, std::string>::const_iterator m_iter; + bool first = true; + for (m_iter = thing.begin(); m_iter != thing.end(); ++m_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%s => %s", (m_iter->first).c_str(), (m_iter->second).c_str()); + } + printf("})\n"); + out = thing; + } + + void testSet(set<int32_t>& out, const set<int32_t>& thing) override { + printf("testSet({"); + set<int32_t>::const_iterator s_iter; + bool first = true; + for (s_iter = thing.begin(); s_iter != thing.end(); ++s_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *s_iter); + } + printf("})\n"); + out = thing; + } + + void testList(vector<int32_t>& out, const vector<int32_t>& thing) override { + printf("testList({"); + vector<int32_t>::const_iterator l_iter; + bool first = true; + for (l_iter = thing.begin(); l_iter != thing.end(); ++l_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *l_iter); + } + printf("})\n"); + out = thing; + } + + Numberz::type testEnum(const Numberz::type thing) override { + printf("testEnum(%d)\n", thing); + return thing; + } + + UserId testTypedef(const UserId thing) override { + printf("testTypedef(%" PRId64 ")\n", thing); + return thing; + } + + void testMapMap(map<int32_t, map<int32_t, int32_t> >& mapmap, const int32_t hello) override { + printf("testMapMap(%d)\n", hello); + + map<int32_t, int32_t> pos; + map<int32_t, int32_t> neg; + for (int i = 1; i < 5; i++) { + pos.insert(make_pair(i, i)); + neg.insert(make_pair(-i, -i)); + } + + mapmap.insert(make_pair(4, pos)); + mapmap.insert(make_pair(-4, neg)); + } + + void testInsanity(map<UserId, map<Numberz::type, Insanity> >& insane, const Insanity& argument) override { + printf("testInsanity()\n"); + + Insanity looney; + map<Numberz::type, Insanity> first_map; + map<Numberz::type, Insanity> second_map; + + first_map.insert(make_pair(Numberz::TWO, argument)); + first_map.insert(make_pair(Numberz::THREE, argument)); + + second_map.insert(make_pair(Numberz::SIX, looney)); + + insane.insert(make_pair(1, first_map)); + insane.insert(make_pair(2, second_map)); + + printf("return"); + printf(" = {"); + map<UserId, map<Numberz::type, Insanity> >::const_iterator i_iter; + for (i_iter = insane.begin(); i_iter != insane.end(); ++i_iter) { + printf("%" PRId64 " => {", i_iter->first); + map<Numberz::type, Insanity>::const_iterator i2_iter; + for (i2_iter = i_iter->second.begin(); i2_iter != i_iter->second.end(); ++i2_iter) { + printf("%d => {", i2_iter->first); + map<Numberz::type, UserId> userMap = i2_iter->second.userMap; + map<Numberz::type, UserId>::const_iterator um; + printf("{"); + for (um = userMap.begin(); um != userMap.end(); ++um) { + printf("%d => %" PRId64 ", ", um->first, um->second); + } + printf("}, "); + + vector<Xtruct> xtructs = i2_iter->second.xtructs; + vector<Xtruct>::const_iterator x; + printf("{"); + for (x = xtructs.begin(); x != xtructs.end(); ++x) { + printf("{\"%s\", %d, %d, %" PRId64 "}, ", + x->string_thing.c_str(), + (int)x->byte_thing, + x->i32_thing, + x->i64_thing); + } + printf("}"); + + printf("}, "); + } + printf("}, "); + } + printf("}\n"); + } + + void testMulti(Xtruct& hello, + const int8_t arg0, + const int32_t arg1, + const int64_t arg2, + const std::map<int16_t, std::string>& arg3, + const Numberz::type arg4, + const UserId arg5) override { + (void)arg3; + (void)arg4; + (void)arg5; + + printf("testMulti()\n"); + + hello.string_thing = "Hello2"; + hello.byte_thing = arg0; + hello.i32_thing = arg1; + hello.i64_thing = (int64_t)arg2; + } + + void testException(const std::string& arg) override { + printf("testException(%s)\n", arg.c_str()); + if (arg.compare("Xception") == 0) { + Xception e; + e.errorCode = 1001; + e.message = arg; + throw e; + } else if (arg.compare("TException") == 0) { + apache::thrift::TException e; + throw e; + } else { + Xtruct result; + result.string_thing = arg; + return; + } + } + + void testMultiException(Xtruct& result, + const std::string& arg0, + const std::string& arg1) override { + + printf("testMultiException(%s, %s)\n", arg0.c_str(), arg1.c_str()); + + if (arg0.compare("Xception") == 0) { + Xception e; + e.errorCode = 1001; + e.message = "This is an Xception"; + throw e; + } else if (arg0.compare("Xception2") == 0) { + Xception2 e; + e.errorCode = 2002; + e.struct_thing.string_thing = "This is an Xception2"; + throw e; + } else { + result.string_thing = arg1; + return; + } + } + + void testOneway(const int32_t aNum) override { + printf("testOneway(%d): call received\n", aNum); + } +}; + +class SecondHandler : public SecondServiceIf +{ + public: + void secondtestString(std::string& result, const std::string& thing) override + { result = "testString(\"" + thing + "\")"; } +}; + +class TestProcessorEventHandler : public TProcessorEventHandler { + void* getContext(const char* fn_name, void* serverContext) override { + (void)serverContext; + return new std::string(fn_name); + } + void freeContext(void* ctx, const char* fn_name) override { + (void)fn_name; + delete static_cast<std::string*>(ctx); + } + void preRead(void* ctx, const char* fn_name) override { communicate("preRead", ctx, fn_name); } + void postRead(void* ctx, const char* fn_name, uint32_t bytes) override { + (void)bytes; + communicate("postRead", ctx, fn_name); + } + void preWrite(void* ctx, const char* fn_name) override { communicate("preWrite", ctx, fn_name); } + void postWrite(void* ctx, const char* fn_name, uint32_t bytes) override { + (void)bytes; + communicate("postWrite", ctx, fn_name); + } + void asyncComplete(void* ctx, const char* fn_name) override { + communicate("asyncComplete", ctx, fn_name); + } + void handlerError(void* ctx, const char* fn_name) override { + communicate("handlerError", ctx, fn_name); + } + + void communicate(const char* event, void* ctx, const char* fn_name) { + std::cout << event << ": " << *static_cast<std::string*>(ctx) << " = " << fn_name << std::endl; + } +}; + +class TestHandlerAsync : public ThriftTestCobSvIf { +public: + TestHandlerAsync(std::shared_ptr<TestHandler>& handler) : _delegate(handler) {} + ~TestHandlerAsync() override = default; + + void testVoid(std::function<void()> cob) override { + _delegate->testVoid(); + cob(); + } + + void testString(std::function<void(std::string const& _return)> cob, + const std::string& thing) override { + std::string res; + _delegate->testString(res, thing); + cob(res); + } + + void testBool(std::function<void(bool const& _return)> cob, const bool thing) override { + bool res = _delegate->testBool(thing); + cob(res); + } + + void testByte(std::function<void(int8_t const& _return)> cob, const int8_t thing) override { + int8_t res = _delegate->testByte(thing); + cob(res); + } + + void testI32(std::function<void(int32_t const& _return)> cob, const int32_t thing) override { + int32_t res = _delegate->testI32(thing); + cob(res); + } + + void testI64(std::function<void(int64_t const& _return)> cob, const int64_t thing) override { + int64_t res = _delegate->testI64(thing); + cob(res); + } + + void testDouble(std::function<void(double const& _return)> cob, const double thing) override { + double res = _delegate->testDouble(thing); + cob(res); + } + + void testBinary(std::function<void(std::string const& _return)> cob, + const std::string& thing) override { + std::string res; + _delegate->testBinary(res, thing); + cob(res); + } + + void testStruct(std::function<void(Xtruct const& _return)> cob, const Xtruct& thing) override { + Xtruct res; + _delegate->testStruct(res, thing); + cob(res); + } + + void testNest(std::function<void(Xtruct2 const& _return)> cob, const Xtruct2& thing) override { + Xtruct2 res; + _delegate->testNest(res, thing); + cob(res); + } + + void testMap(std::function<void(std::map<int32_t, int32_t> const& _return)> cob, + const std::map<int32_t, int32_t>& thing) override { + std::map<int32_t, int32_t> res; + _delegate->testMap(res, thing); + cob(res); + } + + void testStringMap( + std::function<void(std::map<std::string, std::string> const& _return)> cob, + const std::map<std::string, std::string>& thing) override { + std::map<std::string, std::string> res; + _delegate->testStringMap(res, thing); + cob(res); + } + + void testSet(std::function<void(std::set<int32_t> const& _return)> cob, + const std::set<int32_t>& thing) override { + std::set<int32_t> res; + _delegate->testSet(res, thing); + cob(res); + } + + void testList(std::function<void(std::vector<int32_t> const& _return)> cob, + const std::vector<int32_t>& thing) override { + std::vector<int32_t> res; + _delegate->testList(res, thing); + cob(res); + } + + void testEnum(std::function<void(Numberz::type const& _return)> cob, + const Numberz::type thing) override { + Numberz::type res = _delegate->testEnum(thing); + cob(res); + } + + void testTypedef(std::function<void(UserId const& _return)> cob, const UserId thing) override { + UserId res = _delegate->testTypedef(thing); + cob(res); + } + + void testMapMap( + std::function<void(std::map<int32_t, std::map<int32_t, int32_t> > const& _return)> cob, + const int32_t hello) override { + std::map<int32_t, std::map<int32_t, int32_t> > res; + _delegate->testMapMap(res, hello); + cob(res); + } + + void testInsanity( + std::function<void(std::map<UserId, std::map<Numberz::type, Insanity> > const& _return)> cob, + const Insanity& argument) override { + std::map<UserId, std::map<Numberz::type, Insanity> > res; + _delegate->testInsanity(res, argument); + cob(res); + } + + void testMulti(std::function<void(Xtruct const& _return)> cob, + const int8_t arg0, + const int32_t arg1, + const int64_t arg2, + const std::map<int16_t, std::string>& arg3, + const Numberz::type arg4, + const UserId arg5) override { + Xtruct res; + _delegate->testMulti(res, arg0, arg1, arg2, arg3, arg4, arg5); + cob(res); + } + + void testException( + std::function<void()> cob, + std::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, + const std::string& arg) override { + try { + _delegate->testException(arg); + } catch (const apache::thrift::TException& e) { + exn_cob(apache::thrift::TDelayedException::delayException(e)); + return; + } + cob(); + } + + void testMultiException( + std::function<void(Xtruct const& _return)> cob, + std::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, + const std::string& arg0, + const std::string& arg1) override { + Xtruct res; + try { + _delegate->testMultiException(res, arg0, arg1); + } catch (const apache::thrift::TException& e) { + exn_cob(apache::thrift::TDelayedException::delayException(e)); + return; + } + cob(res); + } + + void testOneway(std::function<void()> cob, const int32_t secondsToSleep) override { + _delegate->testOneway(secondsToSleep); + cob(); + } + +protected: + std::shared_ptr<TestHandler> _delegate; +}; + +namespace po = boost::program_options; + +int main(int argc, char** argv) { + + string testDir = boost::filesystem::system_complete(argv[0]).parent_path().parent_path().parent_path().string(); + string certPath = testDir + "/keys/server.crt"; + string keyPath = testDir + "/keys/server.key"; + +#if _WIN32 + transport::TWinsockSingleton::create(); +#endif + int port = 9090; + bool ssl = false; + bool zlib = false; + string transport_type = "buffered"; + string protocol_type = "binary"; + string server_type = "simple"; + string domain_socket = ""; + bool abstract_namespace = false; + size_t workers = 4; + int string_limit = 0; + int container_limit = 0; + + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("port", po::value<int>(&port)->default_value(port), "Port number to listen") + ("domain-socket", po::value<string>(&domain_socket) ->default_value(domain_socket), "Unix Domain Socket (e.g. /tmp/ThriftTest.thrift)") + ("abstract-namespace", "Create the domain socket in the Abstract Namespace (no connection with filesystem pathnames)") + ("server-type", po::value<string>(&server_type)->default_value(server_type), "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"") + ("transport", po::value<string>(&transport_type)->default_value(transport_type), "transport: buffered, framed, http, zlib") + ("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json, multi, multic, multih, multij") + ("ssl", "Encrypted Transport using SSL") + ("zlib", "Wrapped Transport using Zlib") + ("processor-events", "processor-events") + ("workers,n", po::value<size_t>(&workers)->default_value(workers), "Number of thread pools workers. Only valid for thread-pool server type") + ("string-limit", po::value<int>(&string_limit)) + ("container-limit", po::value<int>(&container_limit)); + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + cout << desc << "\n"; + return 1; + } + + try { + if (!server_type.empty()) { + if (server_type == "simple") { + } else if (server_type == "thread-pool") { + } else if (server_type == "threaded") { + } else if (server_type == "nonblocking") { + } else { + throw invalid_argument("Unknown server type " + server_type); + } + } + + if (!protocol_type.empty()) { + if (protocol_type == "binary") { + } else if (protocol_type == "compact") { + } else if (protocol_type == "json") { + } else if (protocol_type == "header") { + } else if (protocol_type == "multi") { // multiplexed binary + } else if (protocol_type == "multic") { // multiplexed compact + } else if (protocol_type == "multih") { // multiplexed header + } else if (protocol_type == "multij") { // multiplexed json + } else { + throw invalid_argument("Unknown protocol type " + protocol_type); + } + } + + if (!transport_type.empty()) { + if (transport_type == "buffered") { + } else if (transport_type == "framed") { + } else if (transport_type == "http") { + } else if (transport_type == "zlib") { + // crosstester will pass zlib as a flag and a transport right now... + } else { + throw invalid_argument("Unknown transport type " + transport_type); + } + } + + } catch (std::exception& e) { + cerr << e.what() << endl; + cout << desc << "\n"; + return 1; + } + + if (vm.count("ssl")) { + ssl = true; + } + + if (vm.count("zlib")) { + zlib = true; + } + +#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE) + if (ssl) { + signal(SIGPIPE, SIG_IGN); // for OpenSSL, otherwise we end abruptly + } +#endif + + if (vm.count("abstract-namespace")) { + abstract_namespace = true; + } + + // Dispatcher + std::shared_ptr<TProtocolFactory> protocolFactory; + if (protocol_type == "json" || protocol_type == "multij") { + std::shared_ptr<TProtocolFactory> jsonProtocolFactory(new TJSONProtocolFactory()); + protocolFactory = jsonProtocolFactory; + } else if (protocol_type == "compact" || protocol_type == "multic") { + auto *compactProtocolFactory = new TCompactProtocolFactoryT<TBufferBase>(); + compactProtocolFactory->setContainerSizeLimit(container_limit); + compactProtocolFactory->setStringSizeLimit(string_limit); + protocolFactory.reset(compactProtocolFactory); + } else if (protocol_type == "header" || protocol_type == "multih") { + std::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory()); + protocolFactory = headerProtocolFactory; + } else { + auto* binaryProtocolFactory = new TBinaryProtocolFactoryT<TBufferBase>(); + binaryProtocolFactory->setContainerSizeLimit(container_limit); + binaryProtocolFactory->setStringSizeLimit(string_limit); + protocolFactory.reset(binaryProtocolFactory); + } + + // Processors + std::shared_ptr<TestHandler> testHandler(new TestHandler()); + std::shared_ptr<TProcessor> testProcessor(new ThriftTestProcessor(testHandler)); + + if (vm.count("processor-events")) { + testProcessor->setEventHandler( + std::shared_ptr<TProcessorEventHandler>(new TestProcessorEventHandler())); + } + + // Transport + std::shared_ptr<TSSLSocketFactory> sslSocketFactory; + std::shared_ptr<TServerSocket> serverSocket; + + if (ssl) { + sslSocketFactory = std::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory()); + sslSocketFactory->loadCertificate(certPath.c_str()); + sslSocketFactory->loadPrivateKey(keyPath.c_str()); + sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"); + if (server_type != "nonblocking") { + serverSocket = std::shared_ptr<TServerSocket>(new TSSLServerSocket(port, sslSocketFactory)); + } + } else { + if (domain_socket != "") { + if (abstract_namespace) { + std::string abstract_socket("\0", 1); + abstract_socket += domain_socket; + serverSocket = std::shared_ptr<TServerSocket>(new TServerSocket(abstract_socket)); + } else { + unlink(domain_socket.c_str()); + serverSocket = std::shared_ptr<TServerSocket>(new TServerSocket(domain_socket)); + } + port = 0; + } else { + serverSocket = std::shared_ptr<TServerSocket>(new TServerSocket(port)); + } + } + + // Factory + std::shared_ptr<TTransportFactory> transportFactory; + + if (transport_type == "http" && server_type != "nonblocking") { + transportFactory = std::make_shared<THttpServerTransportFactory>(); + } else if (transport_type == "framed") { + transportFactory = std::make_shared<TFramedTransportFactory>(); + } else { + transportFactory = std::make_shared<TBufferedTransportFactory>(); + } + + if (zlib) { + // hmm.. doesn't seem to be a way to make it wrap the others... + transportFactory = std::make_shared<TZlibTransportFactory>(); + } + + // Server Info + cout << "Starting \"" << server_type << "\" server (" << transport_type << "/" << protocol_type + << ") listen on: "; + if (abstract_namespace) { + cout << '@'; + } + cout << domain_socket; + if (port != 0) { + cout << port; + } + cout << endl; + + // Multiplexed Processor if needed + if (boost::starts_with(protocol_type, "multi")) { + std::shared_ptr<SecondHandler> secondHandler(new SecondHandler()); + std::shared_ptr<SecondServiceProcessor> secondProcessor(new SecondServiceProcessor(secondHandler)); + + std::shared_ptr<TMultiplexedProcessor> multiplexedProcessor(new TMultiplexedProcessor()); + multiplexedProcessor->registerDefault(testProcessor); // non-multi clients go to the default processor (multi:binary, multic:compact, ...) + multiplexedProcessor->registerProcessor("ThriftTest", testProcessor); + multiplexedProcessor->registerProcessor("SecondService", secondProcessor); + testProcessor = std::dynamic_pointer_cast<TProcessor>(multiplexedProcessor); + } + + // Server + std::shared_ptr<apache::thrift::server::TServer> server; + + if (server_type == "simple") { + server.reset(new TSimpleServer(testProcessor, serverSocket, transportFactory, protocolFactory)); + } else if (server_type == "thread-pool") { + + std::shared_ptr<ThreadFactory> threadFactory + = std::shared_ptr<ThreadFactory>(new ThreadFactory()); + + std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workers); + threadManager->threadFactory(threadFactory); + threadManager->start(); + + server.reset(new TThreadPoolServer(testProcessor, + serverSocket, + transportFactory, + protocolFactory, + threadManager)); + } else if (server_type == "threaded") { + server.reset( + new TThreadedServer(testProcessor, serverSocket, transportFactory, protocolFactory)); + } else if (server_type == "nonblocking") { + if (transport_type == "http") { + std::shared_ptr<TestHandlerAsync> testHandlerAsync(new TestHandlerAsync(testHandler)); + std::shared_ptr<TAsyncProcessor> testProcessorAsync( + new ThriftTestAsyncProcessor(testHandlerAsync)); + std::shared_ptr<TAsyncBufferProcessor> testBufferProcessor( + new TAsyncProtocolProcessor(testProcessorAsync, protocolFactory)); + + // not loading nonblockingServer into "server" because + // TEvhttpServer doesn't inherit from TServer, and doesn't + // provide a stop method. + TEvhttpServer nonblockingServer(testBufferProcessor, port); + nonblockingServer.serve(); + } else if (transport_type == "framed") { + std::shared_ptr<transport::TNonblockingServerTransport> nbSocket; + nbSocket.reset( + ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory) + : new transport::TNonblockingServerSocket(port)); + server.reset(new TNonblockingServer(testProcessor, protocolFactory, nbSocket)); + } else { + cerr << "server-type nonblocking requires transport of http or framed" << endl; + exit(1); + } + } + + if (server.get() != nullptr) { + if (protocol_type == "header") { + // Tell the server to use the same protocol for input / output + // if using header + server->setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>()); + } + + apache::thrift::concurrency::ThreadFactory factory; + factory.setDetached(false); + std::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server); + std::shared_ptr<apache::thrift::concurrency::Thread> thread + = factory.newThread(serverThreadRunner); + +#ifdef HAVE_SIGNAL_H + signal(SIGINT, signal_handler); +#endif + + thread->start(); + gMonitor.waitForever(); // wait for a shutdown signal + +#ifdef HAVE_SIGNAL_H + signal(SIGINT, SIG_DFL); +#endif + + server->stop(); + thread->join(); + server.reset(); + } + + cout << "done." << endl; + return 0; +} + diff --git a/src/jaegertracing/thrift/test/cpp/src/ThriftTest_extras.cpp b/src/jaegertracing/thrift/test/cpp/src/ThriftTest_extras.cpp new file mode 100644 index 000000000..af5606efb --- /dev/null +++ b/src/jaegertracing/thrift/test/cpp/src/ThriftTest_extras.cpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Extra functions required for ThriftTest_types to work + +#include <thrift/protocol/TDebugProtocol.h> +#include "gen-cpp/ThriftTest_types.h" + +namespace thrift { +namespace test { + +bool Insanity::operator<(thrift::test::Insanity const& other) const { + using apache::thrift::ThriftDebugString; + return ThriftDebugString(*this) < ThriftDebugString(other); +} +} +} |