diff options
Diffstat (limited to 'src/jaegertracing/thrift/lib/cpp/test/processor')
7 files changed, 1806 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/cpp/test/processor/EventLog.cpp b/src/jaegertracing/thrift/lib/cpp/test/processor/EventLog.cpp new file mode 100644 index 000000000..c75955d27 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/processor/EventLog.cpp @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "EventLog.h" + +#include <stdarg.h> +#include <stdlib.h> + +using namespace apache::thrift::concurrency; + +namespace { + +// Define environment variable DEBUG_EVENTLOG to enable debug logging +// ex: $ DEBUG_EVENTLOG=1 processor_test +static const char * DEBUG_EVENTLOG = getenv("DEBUG_EVENTLOG"); + +void debug(const char* fmt, ...) { + if (DEBUG_EVENTLOG) { + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + + fprintf(stderr, "\n"); + } +} +} + +namespace apache { +namespace thrift { +namespace test { + +uint32_t EventLog::nextId_ = 0; + +#define EVENT_TYPE(value) EventType EventLog::value = #value +EVENT_TYPE(ET_LOG_END); +EVENT_TYPE(ET_CONN_CREATED); +EVENT_TYPE(ET_CONN_DESTROYED); +EVENT_TYPE(ET_CALL_STARTED); +EVENT_TYPE(ET_CALL_FINISHED); +EVENT_TYPE(ET_PROCESS); +EVENT_TYPE(ET_PRE_READ); +EVENT_TYPE(ET_POST_READ); +EVENT_TYPE(ET_PRE_WRITE); +EVENT_TYPE(ET_POST_WRITE); +EVENT_TYPE(ET_ASYNC_COMPLETE); +EVENT_TYPE(ET_HANDLER_ERROR); + +EVENT_TYPE(ET_CALL_INCREMENT_GENERATION); +EVENT_TYPE(ET_CALL_GET_GENERATION); +EVENT_TYPE(ET_CALL_ADD_STRING); +EVENT_TYPE(ET_CALL_GET_STRINGS); +EVENT_TYPE(ET_CALL_GET_DATA_WAIT); +EVENT_TYPE(ET_CALL_ONEWAY_WAIT); +EVENT_TYPE(ET_CALL_EXCEPTION_WAIT); +EVENT_TYPE(ET_CALL_UNEXPECTED_EXCEPTION_WAIT); +EVENT_TYPE(ET_CALL_SET_VALUE); +EVENT_TYPE(ET_CALL_GET_VALUE); +EVENT_TYPE(ET_WAIT_RETURN); + +EventLog::EventLog() { + id_ = nextId_++; + debug("New log: %d", id_); +} + +void EventLog::append(EventType type, + uint32_t connectionId, + uint32_t callId, + const std::string& message) { + Synchronized s(monitor_); + debug("%d <-- %u, %u, %s \"%s\"", id_, connectionId, callId, type, message.c_str()); + + Event e(type, connectionId, callId, message); + events_.push_back(e); + + monitor_.notify(); +} + +Event EventLog::waitForEvent(int64_t timeout) { + Synchronized s(monitor_); + + try { + while (events_.empty()) { + monitor_.wait(timeout); + } + } catch (const TimedOutException &) { + return Event(ET_LOG_END, 0, 0, ""); + } + + Event event = events_.front(); + events_.pop_front(); + return event; +} + +Event EventLog::waitForConnEvent(uint32_t connId, int64_t timeout) { + Synchronized s(monitor_); + + auto it = events_.begin(); + while (true) { + try { + // TODO: it would be nicer to honor timeout for the duration of this + // call, rather than restarting it for each call to wait(). It shouldn't + // be a big problem in practice, though. + while (it == events_.end()) { + monitor_.wait(timeout); + } + } catch (const TimedOutException &) { + return Event(ET_LOG_END, 0, 0, ""); + } + + if (it->connectionId == connId) { + Event event = *it; + events_.erase(it); + return event; + } + } +} +} +} +} // apache::thrift::test diff --git a/src/jaegertracing/thrift/lib/cpp/test/processor/EventLog.h b/src/jaegertracing/thrift/lib/cpp/test/processor/EventLog.h new file mode 100644 index 000000000..4f8275db9 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/processor/EventLog.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_TEST_EVENTLOG_H_ +#define _THRIFT_TEST_EVENTLOG_H_ 1 + +#include <thrift/concurrency/Monitor.h> + +namespace apache { +namespace thrift { +namespace test { + +// Initially I made EventType an enum, but using char* results +// in much more readable error messages when there is a mismatch. +// It also lets users of EventLog easily define their own new types. +// Comparing the literal pointer values should be safe, barring any strange +// linking setup that results in duplicate symbols. +typedef const char* EventType; + +struct Event { + Event(EventType type, uint32_t connectionId, uint32_t callId, const std::string& message) + : type(type), connectionId(connectionId), callId(callId), message(message) {} + + EventType type; + uint32_t connectionId; + uint32_t callId; + std::string message; +}; + +class EventLog { +public: + static EventType ET_LOG_END; + static EventType ET_CONN_CREATED; + static EventType ET_CONN_DESTROYED; + static EventType ET_CALL_STARTED; + static EventType ET_CALL_FINISHED; + static EventType ET_PROCESS; + static EventType ET_PRE_READ; + static EventType ET_POST_READ; + static EventType ET_PRE_WRITE; + static EventType ET_POST_WRITE; + static EventType ET_ASYNC_COMPLETE; + static EventType ET_HANDLER_ERROR; + + static EventType ET_CALL_INCREMENT_GENERATION; + static EventType ET_CALL_GET_GENERATION; + static EventType ET_CALL_ADD_STRING; + static EventType ET_CALL_GET_STRINGS; + static EventType ET_CALL_GET_DATA_WAIT; + static EventType ET_CALL_ONEWAY_WAIT; + static EventType ET_CALL_UNEXPECTED_EXCEPTION_WAIT; + static EventType ET_CALL_EXCEPTION_WAIT; + static EventType ET_WAIT_RETURN; + static EventType ET_CALL_SET_VALUE; + static EventType ET_CALL_GET_VALUE; + + EventLog(); + + void append(EventType type, + uint32_t connectionId, + uint32_t callId, + const std::string& message = ""); + + Event waitForEvent(int64_t timeout = 500); + Event waitForConnEvent(uint32_t connId, int64_t timeout = 500); + +protected: + typedef std::list<Event> EventList; + + concurrency::Monitor monitor_; + EventList events_; + uint32_t id_; + + static uint32_t nextId_; +}; +} +} +} // apache::thrift::test + +#endif // _THRIFT_TEST_EVENTLOG_H_ diff --git a/src/jaegertracing/thrift/lib/cpp/test/processor/Handlers.h b/src/jaegertracing/thrift/lib/cpp/test/processor/Handlers.h new file mode 100644 index 000000000..05d19edd9 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/processor/Handlers.h @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_PROCESSOR_TEST_HANDLERS_H_ +#define _THRIFT_PROCESSOR_TEST_HANDLERS_H_ 1 + +#include "EventLog.h" +#include "gen-cpp/ParentService.h" +#include "gen-cpp/ChildService.h" + +namespace apache { +namespace thrift { +namespace test { + +class ParentHandler : virtual public ParentServiceIf { +public: + ParentHandler(const std::shared_ptr<EventLog>& log) + : triggerMonitor(&mutex_), generation_(0), wait_(false), log_(log) {} + + int32_t incrementGeneration() override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_INCREMENT_GENERATION, 0, 0); + return ++generation_; + } + + int32_t getGeneration() override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_GENERATION, 0, 0); + return generation_; + } + + void addString(const std::string& s) override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_ADD_STRING, 0, 0); + strings_.push_back(s); + } + + void getStrings(std::vector<std::string>& _return) override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_STRINGS, 0, 0); + _return = strings_; + } + + void getDataWait(std::string& _return, const int32_t length) override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_DATA_WAIT, 0, 0); + + blockUntilTriggered(); + + _return.append(length, 'a'); + } + + void onewayWait() override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_ONEWAY_WAIT, 0, 0); + + blockUntilTriggered(); + } + + void exceptionWait(const std::string& message) override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_EXCEPTION_WAIT, 0, 0); + + blockUntilTriggered(); + + MyError e; + e.message = message; + throw e; + } + + void unexpectedExceptionWait(const std::string& message) override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_UNEXPECTED_EXCEPTION_WAIT, 0, 0); + + blockUntilTriggered(); + + MyError e; + e.message = message; + throw e; + } + + /** + * After prepareTriggeredCall() is invoked, calls to any of the *Wait() + * functions won't return until triggerPendingCalls() is invoked + * + * This has to be a separate function invoked by the main test thread + * in order to to avoid race conditions. + */ + void prepareTriggeredCall() { + concurrency::Guard g(mutex_); + wait_ = true; + } + + /** + * Wake up all calls waiting in blockUntilTriggered() + */ + void triggerPendingCalls() { + concurrency::Guard g(mutex_); + wait_ = false; + triggerMonitor.notifyAll(); + } + +protected: + /** + * blockUntilTriggered() won't return until triggerPendingCalls() is invoked + * in another thread. + * + * This should only be called when already holding mutex_. + */ + void blockUntilTriggered() { + while (wait_) { + triggerMonitor.waitForever(); + } + + // Log an event when we return + log_->append(EventLog::ET_WAIT_RETURN, 0, 0); + } + + concurrency::Mutex mutex_; + concurrency::Monitor triggerMonitor; + int32_t generation_; + bool wait_; + std::vector<std::string> strings_; + std::shared_ptr<EventLog> log_; +}; + +#ifdef _WIN32 + #pragma warning( push ) + #pragma warning (disable : 4250 ) //inheriting methods via dominance +#endif + +class ChildHandler : public ParentHandler, virtual public ChildServiceIf { +public: + ChildHandler(const std::shared_ptr<EventLog>& log) : ParentHandler(log), value_(0) {} + + int32_t setValue(const int32_t value) override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_SET_VALUE, 0, 0); + + int32_t oldValue = value_; + value_ = value; + return oldValue; + } + + int32_t getValue() override { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_VALUE, 0, 0); + + return value_; + } + +protected: + int32_t value_; +}; + +#ifdef _WIN32 + #pragma warning( pop ) +#endif + +struct ConnContext { +public: + ConnContext(std::shared_ptr<protocol::TProtocol> in, + std::shared_ptr<protocol::TProtocol> out, + uint32_t id) + : input(in), output(out), id(id) {} + + std::shared_ptr<protocol::TProtocol> input; + std::shared_ptr<protocol::TProtocol> output; + uint32_t id; +}; + +struct CallContext { +public: + CallContext(ConnContext* context, uint32_t id, const std::string& name) + : connContext(context), name(name), id(id) {} + + ConnContext* connContext; + std::string name; + uint32_t id; +}; + +class ServerEventHandler : public server::TServerEventHandler { +public: + ServerEventHandler(const std::shared_ptr<EventLog>& log) : nextId_(1), log_(log) {} + + void preServe() override {} + + void* createContext(std::shared_ptr<protocol::TProtocol> input, + std::shared_ptr<protocol::TProtocol> output) override { + ConnContext* context = new ConnContext(input, output, nextId_); + ++nextId_; + log_->append(EventLog::ET_CONN_CREATED, context->id, 0); + return context; + } + + void deleteContext(void* serverContext, + std::shared_ptr<protocol::TProtocol> input, + std::shared_ptr<protocol::TProtocol> output) override { + auto* context = reinterpret_cast<ConnContext*>(serverContext); + + if (input != context->input) { + abort(); + } + if (output != context->output) { + abort(); + } + + log_->append(EventLog::ET_CONN_DESTROYED, context->id, 0); + + delete context; + } + + void processContext(void* serverContext, + std::shared_ptr<transport::TTransport> transport) override { +// TODO: We currently don't test the behavior of the processContext() +// calls. The various server implementations call processContext() at +// slightly different times, and it is too annoying to try and account for +// their various differences. +// +// TThreadedServer, TThreadPoolServer, and TSimpleServer usually wait until +// they see the first byte of a request before calling processContext(). +// However, they don't wait for the first byte of the very first request, +// and instead immediately call processContext() before any data is +// received. +// +// TNonblockingServer always waits until receiving the full request before +// calling processContext(). +#if 0 + ConnContext* context = reinterpret_cast<ConnContext*>(serverContext); + log_->append(EventLog::ET_PROCESS, context->id, 0); +#else + THRIFT_UNUSED_VARIABLE(serverContext); + THRIFT_UNUSED_VARIABLE(transport); +#endif + } + +protected: + uint32_t nextId_; + std::shared_ptr<EventLog> log_; +}; + +class ProcessorEventHandler : public TProcessorEventHandler { +public: + ProcessorEventHandler(const std::shared_ptr<EventLog>& log) : nextId_(1), log_(log) {} + + void* getContext(const char* fnName, void* serverContext) override { + auto* connContext = reinterpret_cast<ConnContext*>(serverContext); + + CallContext* context = new CallContext(connContext, nextId_, fnName); + ++nextId_; + + log_->append(EventLog::ET_CALL_STARTED, connContext->id, context->id, fnName); + return context; + } + + void freeContext(void* ctx, const char* fnName) override { + auto* context = reinterpret_cast<CallContext*>(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_CALL_FINISHED, context->connContext->id, context->id, fnName); + delete context; + } + + void preRead(void* ctx, const char* fnName) override { + auto* context = reinterpret_cast<CallContext*>(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_PRE_READ, context->connContext->id, context->id, fnName); + } + + void postRead(void* ctx, const char* fnName, uint32_t bytes) override { + THRIFT_UNUSED_VARIABLE(bytes); + auto* context = reinterpret_cast<CallContext*>(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_POST_READ, context->connContext->id, context->id, fnName); + } + + void preWrite(void* ctx, const char* fnName) override { + auto* context = reinterpret_cast<CallContext*>(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_PRE_WRITE, context->connContext->id, context->id, fnName); + } + + void postWrite(void* ctx, const char* fnName, uint32_t bytes) override { + THRIFT_UNUSED_VARIABLE(bytes); + auto* context = reinterpret_cast<CallContext*>(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_POST_WRITE, context->connContext->id, context->id, fnName); + } + + void asyncComplete(void* ctx, const char* fnName) override { + auto* context = reinterpret_cast<CallContext*>(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_ASYNC_COMPLETE, context->connContext->id, context->id, fnName); + } + + void handlerError(void* ctx, const char* fnName) override { + auto* context = reinterpret_cast<CallContext*>(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_HANDLER_ERROR, context->connContext->id, context->id, fnName); + } + +protected: + void checkName(const CallContext* context, const char* fnName) { + // Note: we can't use BOOST_CHECK_EQUAL here, since the handler runs in a + // different thread from the test functions. Just abort if the names are + // different + if (context->name != fnName) { + fprintf(stderr, + "call context name mismatch: \"%s\" != \"%s\"\n", + context->name.c_str(), + fnName); + fflush(stderr); + abort(); + } + } + + uint32_t nextId_; + std::shared_ptr<EventLog> log_; +}; +} +} +} // apache::thrift::test + +#endif // _THRIFT_PROCESSOR_TEST_HANDLERS_H_ diff --git a/src/jaegertracing/thrift/lib/cpp/test/processor/ProcessorTest.cpp b/src/jaegertracing/thrift/lib/cpp/test/processor/ProcessorTest.cpp new file mode 100644 index 000000000..a36ef3eec --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/processor/ProcessorTest.cpp @@ -0,0 +1,929 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * This file contains tests that ensure TProcessorEventHandler and + * TServerEventHandler are invoked properly by the various server + * implementations. + */ + +#include <boost/test/unit_test.hpp> + +#include <thrift/concurrency/ThreadFactory.h> +#include <thrift/concurrency/Monitor.h> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/server/TThreadedServer.h> +#include <thrift/server/TThreadPoolServer.h> +#include <thrift/server/TNonblockingServer.h> +#include <thrift/server/TSimpleServer.h> +#include <thrift/transport/TSocket.h> +#include <thrift/transport/TNonblockingServerSocket.h> + +#include "EventLog.h" +#include "ServerThread.h" +#include "Handlers.h" +#include "gen-cpp/ChildService.h" + +using namespace apache::thrift; +using namespace apache::thrift::concurrency; +using namespace apache::thrift::protocol; +using namespace apache::thrift::server; +using namespace apache::thrift::test; +using namespace apache::thrift::transport; +using std::string; +using std::vector; + +/* + * Traits classes that encapsulate how to create various types of servers. + */ + +class TSimpleServerTraits { +public: + typedef TSimpleServer ServerType; + + std::shared_ptr<TSimpleServer> createServer( + const std::shared_ptr<TProcessor>& processor, + uint16_t port, + const std::shared_ptr<TTransportFactory>& transportFactory, + const std::shared_ptr<TProtocolFactory>& protocolFactory) { + std::shared_ptr<TServerSocket> socket(new TServerSocket(port)); + return std::shared_ptr<TSimpleServer>( + new TSimpleServer(processor, socket, transportFactory, protocolFactory)); + } +}; + +class TThreadedServerTraits { +public: + typedef TThreadedServer ServerType; + + std::shared_ptr<TThreadedServer> createServer( + const std::shared_ptr<TProcessor>& processor, + uint16_t port, + const std::shared_ptr<TTransportFactory>& transportFactory, + const std::shared_ptr<TProtocolFactory>& protocolFactory) { + std::shared_ptr<TServerSocket> socket(new TServerSocket(port)); + return std::shared_ptr<TThreadedServer>( + new TThreadedServer(processor, socket, transportFactory, protocolFactory)); + } +}; + +class TThreadPoolServerTraits { +public: + typedef TThreadPoolServer ServerType; + + std::shared_ptr<TThreadPoolServer> createServer( + const std::shared_ptr<TProcessor>& processor, + uint16_t port, + const std::shared_ptr<TTransportFactory>& transportFactory, + const std::shared_ptr<TProtocolFactory>& protocolFactory) { + std::shared_ptr<TServerSocket> socket(new TServerSocket(port)); + + std::shared_ptr<ThreadFactory> threadFactory(new ThreadFactory); + std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(8); + threadManager->threadFactory(threadFactory); + threadManager->start(); + + return std::shared_ptr<TThreadPoolServer>( + new TThreadPoolServer(processor, socket, transportFactory, protocolFactory, threadManager)); + } +}; + +class TNonblockingServerTraits { +public: + typedef TNonblockingServer ServerType; + + std::shared_ptr<TNonblockingServer> createServer( + const std::shared_ptr<TProcessor>& processor, + uint16_t port, + const std::shared_ptr<TTransportFactory>& transportFactory, + const std::shared_ptr<TProtocolFactory>& protocolFactory) { + // TNonblockingServer automatically uses TFramedTransport. + // Raise an exception if the supplied transport factory is not a + // TFramedTransportFactory + auto* framedFactory + = dynamic_cast<TFramedTransportFactory*>(transportFactory.get()); + if (framedFactory == nullptr) { + throw TException("TNonblockingServer must use TFramedTransport"); + } + + std::shared_ptr<TNonblockingServerSocket> socket(new TNonblockingServerSocket(port)); + std::shared_ptr<ThreadFactory> threadFactory(new ThreadFactory); + std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(8); + threadManager->threadFactory(threadFactory); + threadManager->start(); + + return std::shared_ptr<TNonblockingServer>( + new TNonblockingServer(processor, protocolFactory, socket, threadManager)); + } +}; + +class TNonblockingServerNoThreadsTraits { +public: + typedef TNonblockingServer ServerType; + + std::shared_ptr<TNonblockingServer> createServer( + const std::shared_ptr<TProcessor>& processor, + uint16_t port, + const std::shared_ptr<TTransportFactory>& transportFactory, + const std::shared_ptr<TProtocolFactory>& protocolFactory) { + // TNonblockingServer automatically uses TFramedTransport. + // Raise an exception if the supplied transport factory is not a + // TFramedTransportFactory + auto* framedFactory + = dynamic_cast<TFramedTransportFactory*>(transportFactory.get()); + if (framedFactory == nullptr) { + throw TException("TNonblockingServer must use TFramedTransport"); + } + + std::shared_ptr<TNonblockingServerSocket> socket(new TNonblockingServerSocket(port)); + // Use a NULL ThreadManager + std::shared_ptr<ThreadManager> threadManager; + return std::shared_ptr<TNonblockingServer>( + new TNonblockingServer(processor, protocolFactory, socket, threadManager)); + } +}; + +/* + * Traits classes for controlling if we instantiate templated or generic + * protocol factories, processors, clients, etc. + * + * The goal is to allow the outer test code to select which server type is + * being tested, and whether or not we are testing the templated classes, or + * the generic classes. + * + * Each specific test case can control whether we create a child or parent + * server, and whether we use TFramedTransport or TBufferedTransport. + */ + +class UntemplatedTraits { +public: + typedef TBinaryProtocolFactory ProtocolFactory; + typedef TBinaryProtocol Protocol; + + typedef ParentServiceProcessor ParentProcessor; + typedef ChildServiceProcessor ChildProcessor; + typedef ParentServiceClient ParentClient; + typedef ChildServiceClient ChildClient; +}; + +class TemplatedTraits { +public: + typedef TBinaryProtocolFactoryT<TBufferBase> ProtocolFactory; + typedef TBinaryProtocolT<TBufferBase> Protocol; + + typedef ParentServiceProcessorT<Protocol> ParentProcessor; + typedef ChildServiceProcessorT<Protocol> ChildProcessor; + typedef ParentServiceClientT<Protocol> ParentClient; + typedef ChildServiceClientT<Protocol> ChildClient; +}; + +template <typename TemplateTraits_> +class ParentServiceTraits { +public: + typedef typename TemplateTraits_::ParentProcessor Processor; + typedef typename TemplateTraits_::ParentClient Client; + typedef ParentHandler Handler; + + typedef typename TemplateTraits_::ProtocolFactory ProtocolFactory; + typedef typename TemplateTraits_::Protocol Protocol; +}; + +template <typename TemplateTraits_> +class ChildServiceTraits { +public: + typedef typename TemplateTraits_::ChildProcessor Processor; + typedef typename TemplateTraits_::ChildClient Client; + typedef ChildHandler Handler; + + typedef typename TemplateTraits_::ProtocolFactory ProtocolFactory; + typedef typename TemplateTraits_::Protocol Protocol; +}; + +// TODO: It would be nicer if the TTransportFactory types defined a typedef, +// to allow us to figure out the exact transport type without having to pass it +// in as a separate template parameter here. +// +// It would also be niec if they used covariant return types. Unfortunately, +// since they return shared_ptr instead of raw pointers, covariant return types +// won't work. +template <typename ServerTraits_, + typename ServiceTraits_, + typename TransportFactory_ = TFramedTransportFactory, + typename Transport_ = TFramedTransport> +class ServiceState : public ServerState { +public: + typedef typename ServiceTraits_::Processor Processor; + typedef typename ServiceTraits_::Client Client; + typedef typename ServiceTraits_::Handler Handler; + + ServiceState() + : port_(0), + log_(new EventLog), + handler_(new Handler(log_)), + processor_(new Processor(handler_)), + transportFactory_(new TransportFactory_), + protocolFactory_(new typename ServiceTraits_::ProtocolFactory), + serverEventHandler_(new ServerEventHandler(log_)), + processorEventHandler_(new ProcessorEventHandler(log_)) { + processor_->setEventHandler(processorEventHandler_); + } + + std::shared_ptr<TServer> createServer(uint16_t port) override { + ServerTraits_ serverTraits; + return serverTraits.createServer(processor_, port, transportFactory_, protocolFactory_); + } + + std::shared_ptr<TServerEventHandler> getServerEventHandler() override { return serverEventHandler_; } + + void bindSuccessful(uint16_t port) override { port_ = port; } + + uint16_t getPort() const { return port_; } + + const std::shared_ptr<EventLog>& getLog() const { return log_; } + + const std::shared_ptr<Handler>& getHandler() const { return handler_; } + + std::shared_ptr<Client> createClient() { + typedef typename ServiceTraits_::Protocol Protocol; + + std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port_)); + std::shared_ptr<Transport_> transport(new Transport_(socket)); + std::shared_ptr<Protocol> protocol(new Protocol(transport)); + transport->open(); + + std::shared_ptr<Client> client(new Client(protocol)); + return client; + } + +private: + uint16_t port_; + std::shared_ptr<EventLog> log_; + std::shared_ptr<Handler> handler_; + std::shared_ptr<Processor> processor_; + std::shared_ptr<TTransportFactory> transportFactory_; + std::shared_ptr<TProtocolFactory> protocolFactory_; + std::shared_ptr<TServerEventHandler> serverEventHandler_; + std::shared_ptr<TProcessorEventHandler> processorEventHandler_; +}; + +/** + * Check that there are no more events in the log + */ +void checkNoEvents(const std::shared_ptr<EventLog>& log) { + // Wait for an event with a very short timeout period. We don't expect + // anything to be present, so we will normally wait for the full timeout. + // On the other hand, a non-zero timeout is nice since it does give a short + // window for events to arrive in case there is a problem. + Event event = log->waitForEvent(10); + BOOST_CHECK_EQUAL(EventLog::ET_LOG_END, event.type); +} + +/** + * Check for the events that should be logged when a new connection is created. + * + * Returns the connection ID allocated by the server. + */ +uint32_t checkNewConnEvents(const std::shared_ptr<EventLog>& log) { + // Check for an ET_CONN_CREATED event + Event event = log->waitForEvent(2500); + BOOST_CHECK_EQUAL(EventLog::ET_CONN_CREATED, event.type); + + // Some servers call the processContext() hook immediately. + // Others (TNonblockingServer) only call it once a full request is received. + // We don't check for it yet, to allow either behavior. + + return event.connectionId; +} + +/** + * Check for the events that should be logged when a connection is closed. + */ +void checkCloseEvents(const std::shared_ptr<EventLog>& log, uint32_t connId) { + // Check for an ET_CONN_DESTROYED event + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CONN_DESTROYED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + + // Make sure there are no more events + checkNoEvents(log); +} + +/** + * Check for the events that should be logged when a call is received + * and the handler is invoked. + * + * It does not check for anything after the handler invocation. + * + * Returns the call ID allocated by the server. + */ +uint32_t checkCallHandlerEvents(const std::shared_ptr<EventLog>& log, + uint32_t connId, + EventType callType, + const string& callName) { + // Call started + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_STARTED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callName, event.message); + uint32_t callId = event.callId; + + // Pre-read + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_READ, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Post-read + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_READ, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Handler invocation + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(callType, event.type); + // The handler doesn't have any connection or call context, + // so the connectionId and callId in this event aren't valid + + return callId; +} + +/** + * Check for the events that should be after a handler returns. + */ +void checkCallPostHandlerEvents(const std::shared_ptr<EventLog>& log, + uint32_t connId, + uint32_t callId, + const string& callName) { + // Pre-write + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_WRITE, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Post-write + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_WRITE, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Call finished + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // It is acceptable for servers to call processContext() again immediately + // to start waiting on the next request. However, some servers wait before + // getting either a partial request or the full request before calling + // processContext(). We don't check for the next call to processContext() + // yet. +} + +/** + * Check for the events that should be logged when a call is made. + * + * This just calls checkCallHandlerEvents() followed by + * checkCallPostHandlerEvents(). + * + * Returns the call ID allocated by the server. + */ +uint32_t checkCallEvents(const std::shared_ptr<EventLog>& log, + uint32_t connId, + EventType callType, + const string& callName) { + uint32_t callId = checkCallHandlerEvents(log, connId, callType, callName); + checkCallPostHandlerEvents(log, connId, callId, callName); + + return callId; +} + +/* + * Test functions + */ + +template <typename State_> +void testParentService(const std::shared_ptr<State_>& state) { + std::shared_ptr<typename State_::Client> client = state->createClient(); + + int32_t gen = client->getGeneration(); + int32_t newGen = client->incrementGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + newGen = client->getGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + + client->addString("foo"); + client->addString("bar"); + client->addString("asdf"); + + vector<string> strings; + client->getStrings(strings); + BOOST_REQUIRE_EQUAL(3, strings.size()); + BOOST_REQUIRE_EQUAL("foo", strings[0]); + BOOST_REQUIRE_EQUAL("bar", strings[1]); + BOOST_REQUIRE_EQUAL("asdf", strings[2]); +} + +template <typename State_> +void testChildService(const std::shared_ptr<State_>& state) { + std::shared_ptr<typename State_::Client> client = state->createClient(); + + // Test calling some of the parent methids via the a child client + int32_t gen = client->getGeneration(); + int32_t newGen = client->incrementGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + newGen = client->getGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + + // Test some of the child methods + client->setValue(10); + BOOST_CHECK_EQUAL(10, client->getValue()); + BOOST_CHECK_EQUAL(10, client->setValue(99)); + BOOST_CHECK_EQUAL(99, client->getValue()); +} + +template <typename ServerTraits, typename TemplateTraits> +void testBasicService() { + typedef ServiceState<ServerTraits, ParentServiceTraits<TemplateTraits> > State; + + // Start the server + std::shared_ptr<State> state(new State); + ServerThread serverThread(state, true); + + testParentService(state); +} + +template <typename ServerTraits, typename TemplateTraits> +void testInheritedService() { + typedef ServiceState<ServerTraits, ChildServiceTraits<TemplateTraits> > State; + + // Start the server + std::shared_ptr<State> state(new State); + ServerThread serverThread(state, true); + + testParentService(state); + testChildService(state); +} + +/** + * Test to make sure that the TServerEventHandler and TProcessorEventHandler + * methods are invoked in the correct order with the actual events. + */ +template <typename ServerTraits, typename TemplateTraits> +void testEventSequencing() { + // We use TBufferedTransport for this test, instead of TFramedTransport. + // This way the server will start processing data as soon as it is received, + // instead of waiting for the full request. This is necessary so we can + // separate the preRead() and postRead() events. + typedef ServiceState<ServerTraits, + ChildServiceTraits<TemplateTraits>, + TBufferedTransportFactory, + TBufferedTransport> State; + + // Start the server + std::shared_ptr<State> state(new State); + ServerThread serverThread(state, true); + + const std::shared_ptr<EventLog>& log = state->getLog(); + + // Make sure we're at the end of the log + checkNoEvents(log); + + state->getHandler()->prepareTriggeredCall(); + + // Make sure createContext() is called after a connection has been + // established. We open a plain socket instead of creating a client. + std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", state->getPort())); + socket->open(); + + // Make sure the proper events occurred after a new connection + uint32_t connId = checkNewConnEvents(log); + + // Send a message header. We manually construct the request so that we + // can test the timing for the preRead() call. + string requestName = "getDataWait"; + string eventName = "ParentService.getDataWait"; + auto seqid = int32_t(time(nullptr)); + TBinaryProtocol protocol(socket); + protocol.writeMessageBegin(requestName, T_CALL, seqid); + socket->flush(); + + // Make sure we saw the call started and pre-read events + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_STARTED, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + uint32_t callId = event.callId; + + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_READ, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // Make sure there are no new events + checkNoEvents(log); + + // Send the rest of the request + protocol.writeStructBegin("ParentService_getDataNotified_pargs"); + protocol.writeFieldBegin("length", apache::thrift::protocol::T_I32, 1); + protocol.writeI32(8 * 1024 * 1024); + protocol.writeFieldEnd(); + protocol.writeFieldStop(); + protocol.writeStructEnd(); + protocol.writeMessageEnd(); + socket->writeEnd(); + socket->flush(); + + // We should then see postRead() + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_READ, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // Then the handler should be invoked + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_GET_DATA_WAIT, event.type); + + // The handler won't respond until we notify it. + // Make sure there are no more events. + checkNoEvents(log); + + // Notify the handler that it should return + // We just use a global lock for now, since it is easiest + state->getHandler()->triggerPendingCalls(); + + // The handler will log a separate event before it returns + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // We should then see preWrite() + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_WRITE, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // We requested more data than can be buffered, and we aren't reading it, + // so the server shouldn't be able to finish its write yet. + // Make sure there are no more events. + checkNoEvents(log); + + // Read the response header + string responseName; + int32_t responseSeqid = 0; + apache::thrift::protocol::TMessageType responseType; + protocol.readMessageBegin(responseName, responseType, responseSeqid); + BOOST_CHECK_EQUAL(responseSeqid, seqid); + BOOST_CHECK_EQUAL(requestName, responseName); + BOOST_CHECK_EQUAL(responseType, T_REPLY); + // Read the body. We just ignore it for now. + protocol.skip(T_STRUCT); + + // Now that we have read, the server should have finished sending the data + // and called the postWrite() handler + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_WRITE, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // Call finished should be last + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // There should be no more events + checkNoEvents(log); + + // Close the connection, and make sure we get a connection destroyed event + socket->close(); + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CONN_DESTROYED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + + // There should be no more events + checkNoEvents(log); +} + +template <typename ServerTraits, typename TemplateTraits> +void testSeparateConnections() { + typedef ServiceState<ServerTraits, ChildServiceTraits<TemplateTraits> > State; + + // Start the server + std::shared_ptr<State> state(new State); + ServerThread serverThread(state, true); + + const std::shared_ptr<EventLog>& log = state->getLog(); + + // Create a client + std::shared_ptr<typename State::Client> client1 = state->createClient(); + + // Make sure the expected events were logged + uint32_t client1Id = checkNewConnEvents(log); + + // Create a second client + std::shared_ptr<typename State::Client> client2 = state->createClient(); + + // Make sure the expected events were logged + uint32_t client2Id = checkNewConnEvents(log); + + // The two connections should have different IDs + BOOST_CHECK_NE(client1Id, client2Id); + + // Make a call, and check for the proper events + int32_t value = 5; + client1->setValue(value); + uint32_t call1 + = checkCallEvents(log, client1Id, EventLog::ET_CALL_SET_VALUE, "ChildService.setValue"); + + // Make a call with client2 + int32_t v = client2->getValue(); + BOOST_CHECK_EQUAL(value, v); + checkCallEvents(log, client2Id, EventLog::ET_CALL_GET_VALUE, "ChildService.getValue"); + + // Make another call with client1 + v = client1->getValue(); + BOOST_CHECK_EQUAL(value, v); + uint32_t call2 + = checkCallEvents(log, client1Id, EventLog::ET_CALL_GET_VALUE, "ChildService.getValue"); + BOOST_CHECK_NE(call1, call2); + + // Close the second client, and check for the appropriate events + client2.reset(); + checkCloseEvents(log, client2Id); +} + +template <typename ServerTraits, typename TemplateTraits> +void testOnewayCall() { + typedef ServiceState<ServerTraits, ChildServiceTraits<TemplateTraits> > State; + + // Start the server + std::shared_ptr<State> state(new State); + ServerThread serverThread(state, true); + + const std::shared_ptr<EventLog>& log = state->getLog(); + + // Create a client + std::shared_ptr<typename State::Client> client = state->createClient(); + uint32_t connId = checkNewConnEvents(log); + + // Make a oneway call + // It should return immediately, even though the server's handler + // won't return right away + state->getHandler()->prepareTriggeredCall(); + client->onewayWait(); + string callName = "ParentService.onewayWait"; + uint32_t callId = checkCallHandlerEvents(log, connId, EventLog::ET_CALL_ONEWAY_WAIT, callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Trigger the handler to return + state->getHandler()->triggerPendingCalls(); + + // The handler will log an ET_WAIT_RETURN event when it wakes up + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // Now we should see the async complete event, then call finished + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_ASYNC_COMPLETE, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Destroy the client, and check for connection closed events + client.reset(); + checkCloseEvents(log, connId); + + checkNoEvents(log); +} + +template <typename ServerTraits, typename TemplateTraits> +void testExpectedError() { + typedef ServiceState<ServerTraits, ChildServiceTraits<TemplateTraits> > State; + + // Start the server + std::shared_ptr<State> state(new State); + ServerThread serverThread(state, true); + + const std::shared_ptr<EventLog>& log = state->getLog(); + + // Create a client + std::shared_ptr<typename State::Client> client = state->createClient(); + uint32_t connId = checkNewConnEvents(log); + + // Send the exceptionWait() call + state->getHandler()->prepareTriggeredCall(); + string message = "test 1234 test"; + client->send_exceptionWait(message); + string callName = "ParentService.exceptionWait"; + uint32_t callId = checkCallHandlerEvents(log, connId, EventLog::ET_CALL_EXCEPTION_WAIT, callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Trigger the handler to return + state->getHandler()->triggerPendingCalls(); + + // The handler will log an ET_WAIT_RETURN event when it wakes up + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // Now receive the response + try { + client->recv_exceptionWait(); + BOOST_FAIL("expected MyError to be thrown"); + } catch (const MyError& e) { + BOOST_CHECK_EQUAL(message, e.message); + // Check if std::exception::what() is handled properly + size_t message_pos = string(e.what()).find("TException - service has thrown: MyError"); + BOOST_CHECK_NE(message_pos, string::npos); + } + + // Now we should see the events for a normal call finish + checkCallPostHandlerEvents(log, connId, callId, callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Destroy the client, and check for connection closed events + client.reset(); + checkCloseEvents(log, connId); + + checkNoEvents(log); +} + +template <typename ServerTraits, typename TemplateTraits> +void testUnexpectedError() { + typedef ServiceState<ServerTraits, ChildServiceTraits<TemplateTraits> > State; + + // Start the server + std::shared_ptr<State> state(new State); + ServerThread serverThread(state, true); + + const std::shared_ptr<EventLog>& log = state->getLog(); + + // Create a client + std::shared_ptr<typename State::Client> client = state->createClient(); + uint32_t connId = checkNewConnEvents(log); + + // Send the unexpectedExceptionWait() call + state->getHandler()->prepareTriggeredCall(); + string message = "1234 test 5678"; + client->send_unexpectedExceptionWait(message); + string callName = "ParentService.unexpectedExceptionWait"; + uint32_t callId + = checkCallHandlerEvents(log, connId, EventLog::ET_CALL_UNEXPECTED_EXCEPTION_WAIT, callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Trigger the handler to return + state->getHandler()->triggerPendingCalls(); + + // The handler will log an ET_WAIT_RETURN event when it wakes up + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // Now receive the response + try { + client->recv_unexpectedExceptionWait(); + BOOST_FAIL("expected TApplicationError to be thrown"); + } catch (const TApplicationException&) { + } + + // Now we should see a handler error event + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_HANDLER_ERROR, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // pre-write and post-write events aren't generated after a handler error + // (Even for non-oneway calls where a response is written.) + // + // A call finished event is logged when the call context is destroyed + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // There shouldn't be any more events + checkNoEvents(log); + + // Destroy the client, and check for connection closed events + client.reset(); + checkCloseEvents(log, connId); + + checkNoEvents(log); +} + +// Macro to define simple tests that can be used with all server types +#define DEFINE_SIMPLE_TESTS(Server, Template) \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_basicService) { \ + testBasicService<Server##Traits, Template##Traits>(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_inheritedService) { \ + testInheritedService<Server##Traits, Template##Traits>(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_oneway) { \ + testOnewayCall<Server##Traits, Template##Traits>(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_exception) { \ + testExpectedError<Server##Traits, Template##Traits>(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_unexpectedException) { \ + testUnexpectedError<Server##Traits, Template##Traits>(); \ + } + +// Tests that require the server to process multiple connections concurrently +// (i.e., not TSimpleServer) +#define DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_separateConnections) { \ + testSeparateConnections<Server##Traits, Template##Traits>(); \ + } + +// The testEventSequencing() test manually generates a request for the server, +// and doesn't work with TFramedTransport. Therefore we can't test it with +// TNonblockingServer. +#define DEFINE_NOFRAME_TESTS(Server, Template) \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_eventSequencing) { \ + testEventSequencing<Server##Traits, Template##Traits>(); \ + } + +#define DEFINE_TNONBLOCKINGSERVER_TESTS(Server, Template) \ + DEFINE_SIMPLE_TESTS(Server, Template) \ + DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) + +#define DEFINE_ALL_SERVER_TESTS(Server, Template) \ + DEFINE_SIMPLE_TESTS(Server, Template) \ + DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) \ + DEFINE_NOFRAME_TESTS(Server, Template) + +DEFINE_ALL_SERVER_TESTS(TThreadedServer, Templated) +DEFINE_ALL_SERVER_TESTS(TThreadedServer, Untemplated) +DEFINE_ALL_SERVER_TESTS(TThreadPoolServer, Templated) +DEFINE_ALL_SERVER_TESTS(TThreadPoolServer, Untemplated) + +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServer, Templated) +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServer, Untemplated) +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServerNoThreads, Templated) +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServerNoThreads, Untemplated) + +DEFINE_SIMPLE_TESTS(TSimpleServer, Templated) +DEFINE_SIMPLE_TESTS(TSimpleServer, Untemplated) +DEFINE_NOFRAME_TESTS(TSimpleServer, Templated) +DEFINE_NOFRAME_TESTS(TSimpleServer, Untemplated) + +// TODO: We should test TEventServer in the future. +// For now, it is known not to work correctly with TProcessorEventHandler. +#ifdef BOOST_TEST_DYN_LINK +bool init_unit_test_suite() { + ::boost::unit_test::framework::master_test_suite().p_name.value = "ProcessorTest"; + return true; +} + +int main( int argc, char* argv[] ) { + return ::boost::unit_test::unit_test_main(&init_unit_test_suite,argc,argv); +} +#else +::boost::unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) { + THRIFT_UNUSED_VARIABLE(argc); + THRIFT_UNUSED_VARIABLE(argv); + ::boost::unit_test::framework::master_test_suite().p_name.value = "ProcessorTest"; + return NULL; +} +#endif diff --git a/src/jaegertracing/thrift/lib/cpp/test/processor/ServerThread.cpp b/src/jaegertracing/thrift/lib/cpp/test/processor/ServerThread.cpp new file mode 100644 index 000000000..b0505005b --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/processor/ServerThread.cpp @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_TEST_SERVERTHREAD_TCC_ +#define _THRIFT_TEST_SERVERTHREAD_TCC_ 1 + +#include "ServerThread.h" + +#include <thrift/concurrency/ThreadFactory.h> +#include <thrift/concurrency/ThreadManager.h> +#include <thrift/server/TThreadPoolServer.h> +#include <thrift/transport/TBufferTransports.h> +#include <thrift/transport/TServerSocket.h> + +namespace apache { +namespace thrift { +namespace test { + +void ServerThread::start() { + assert(!running_); + running_ = true; + + helper_.reset(new Helper(this)); + + // Start the other thread + concurrency::ThreadFactory threadFactory; + threadFactory.setDetached(false); + thread_ = threadFactory.newThread(helper_); + + thread_->start(); + + // Wait on the other thread to tell us that it has successfully + // bound to the port and started listening (or until an error occurs). + concurrency::Synchronized s(serverMonitor_); + while (!serving_ && !error_) { + serverMonitor_.waitForever(); + } + + if (error_) { + throw transport::TTransportException(transport::TTransportException::NOT_OPEN, + "failed to bind on server socket"); + } +} + +void ServerThread::stop() { + if (!running_) { + return; + } + + // Tell the server to stop + server_->stop(); + running_ = false; + + // Wait for the server thread to exit + // + // Note: this only works if all client connections have closed. The servers + // generally wait for everything to be closed before exiting; there currently + // isn't a way to tell them to just exit now, and shut down existing + // connections. + thread_->join(); +} + +void ServerThread::run() { + /* + * Try binding to several ports, in case the one we want is already in use. + */ + port_ = 12345; + unsigned int maxRetries = 10; + for (unsigned int n = 0; n < maxRetries; ++n) { + // Create the server + server_ = serverState_->createServer(port_); + // Install our helper as the server event handler, so that our + // preServe() method will be called once we've successfully bound to + // the port and are about to start listening. + server_->setServerEventHandler(helper_); + + try { + // Try to serve requests + server_->serve(); + } catch (const TException&) { + // TNonblockingServer throws a generic TException if it fails to bind. + // If we get a TException, we'll optimistically assume the bind failed. + ++port_; + continue; + } + + // Seriously? serve() is pretty lame. If it fails to start serving it + // just returns rather than throwing an exception. + // + // We have to use our preServe() hook to tell if serve() successfully + // started serving and is returning because stop() is called, or if it just + // failed to start serving in the first place. + concurrency::Synchronized s(serverMonitor_); + if (serving_) { + // Oh good, we started serving and are exiting because + // we're trying to stop. + serving_ = false; + return; + } else { + // We never started serving, probably because we failed to bind to the + // port. Increment the port number and try again. + ++port_; + continue; + } + } + + // We failed to bind on any port. + concurrency::Synchronized s(serverMonitor_); + error_ = true; + serverMonitor_.notify(); +} + +void ServerThread::preServe() { + // We bound to the port successfully, and are about to start serving requests + serverState_->bindSuccessful(port_); + + // Set the real server event handler (replacing ourself) + std::shared_ptr<server::TServerEventHandler> serverEventHandler + = serverState_->getServerEventHandler(); + server_->setServerEventHandler(serverEventHandler); + + // Notify the main thread that we have successfully started serving requests + concurrency::Synchronized s(serverMonitor_); + serving_ = true; + serverMonitor_.notify(); + + // Invoke preServe() on the real event handler, since we ate + // the original preServe() event. + if (serverEventHandler) { + serverEventHandler->preServe(); + } +} +} +} +} // apache::thrift::test + +#endif // _THRIFT_TEST_SERVERTHREAD_TCC_ diff --git a/src/jaegertracing/thrift/lib/cpp/test/processor/ServerThread.h b/src/jaegertracing/thrift/lib/cpp/test/processor/ServerThread.h new file mode 100644 index 000000000..9cca2d600 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/processor/ServerThread.h @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_TEST_SERVERTHREAD_H_ +#define _THRIFT_TEST_SERVERTHREAD_H_ 1 + +#include <thrift/TProcessor.h> +#include <thrift/protocol/TProtocol.h> +#include <thrift/server/TServer.h> +#include <thrift/transport/TTransport.h> + +#include "EventLog.h" + +namespace apache { +namespace thrift { +namespace test { + +/** + * A helper class to tell ServerThread how to create the server + */ +class ServerState { +public: + virtual ~ServerState() = default; + + /** + * Create a server to listen on the specified port. + * + * If the server returned fails to bind to the specified port when serve() is + * called on it, createServer() may be called again on a different port. + */ + virtual std::shared_ptr<server::TServer> createServer(uint16_t port) = 0; + + /** + * Get the TServerEventHandler to set on the server. + * + * This is only called after the server successfully binds and is about to + * start serving traffic. It is invoked from the server thread, rather than + * the main thread. + */ + virtual std::shared_ptr<server::TServerEventHandler> getServerEventHandler() { + return std::shared_ptr<server::TServerEventHandler>(); + } + + /** + * This method is called in the server thread after server binding succeeds. + * + * Subclasses may override this method if they wish to record the final + * port that was used for the server. + */ + virtual void bindSuccessful(uint16_t /*port*/) {} +}; + +/** + * ServerThread starts a thrift server running in a separate thread. + */ +class ServerThread { +public: + ServerThread(const std::shared_ptr<ServerState>& state, bool autoStart) + : port_(0), + running_(false), + serving_(false), + error_(false), + serverState_(state) { + if (autoStart) { + start(); + } + } + + void start(); + void stop(); + + uint16_t getPort() const { return port_; } + + ~ServerThread() { + if (running_) { + try { + stop(); + } catch (...) { + GlobalOutput.printf("error shutting down server"); + } + } + } + +protected: + // Annoying. thrift forces us to use shared_ptr, so we have to use + // a helper class that we can allocate on the heap and give to thrift. + // It would be simpler if we could just make Runnable and TServerEventHandler + // private base classes of ServerThread. + class Helper : public concurrency::Runnable, public server::TServerEventHandler { + public: + Helper(ServerThread* serverThread) : serverThread_(serverThread) {} + + void run() override { serverThread_->run(); } + + void preServe() override { serverThread_->preServe(); } + + private: + ServerThread* serverThread_; + }; + + void run(); + void preServe(); + + std::shared_ptr<Helper> helper_; + + uint16_t port_; + bool running_; + bool serving_; + bool error_; + concurrency::Monitor serverMonitor_; + + std::shared_ptr<ServerState> serverState_; + std::shared_ptr<server::TServer> server_; + std::shared_ptr<concurrency::Thread> thread_; +}; +} +} +} // apache::thrift::test + +#endif // _THRIFT_TEST_SERVERTHREAD_H_ diff --git a/src/jaegertracing/thrift/lib/cpp/test/processor/proc.thrift b/src/jaegertracing/thrift/lib/cpp/test/processor/proc.thrift new file mode 100644 index 000000000..ac3c5f953 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/processor/proc.thrift @@ -0,0 +1,22 @@ +namespace cpp apache.thrift.test + +exception MyError { + 1: string message +} + +service ParentService { + i32 incrementGeneration() + i32 getGeneration() + void addString(1: string s) + list<string> getStrings() + + binary getDataWait(1: i32 length) + oneway void onewayWait() + void exceptionWait(1: string message) throws (2: MyError error) + void unexpectedExceptionWait(1: string message) +} + +service ChildService extends ParentService { + i32 setValue(1: i32 value) + i32 getValue() +} |