diff options
Diffstat (limited to 'src/jaegertracing/thrift/contrib/async-test/test-server.cpp')
-rw-r--r-- | src/jaegertracing/thrift/contrib/async-test/test-server.cpp | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/async-test/test-server.cpp b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp new file mode 100644 index 000000000..b304e1bc9 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp @@ -0,0 +1,97 @@ +#include <tr1/functional> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/async/TAsyncProtocolProcessor.h> +#include <thrift/async/TEvhttpServer.h> +#include <thrift/async/TEvhttpClientChannel.h> +#include "Aggr.h" + +using std::tr1::bind; +using std::tr1::placeholders::_1; + +using apache::thrift::TException; +using apache::thrift::protocol::TBinaryProtocolFactory; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::async::TEvhttpServer; +using apache::thrift::async::TAsyncProcessor; +using apache::thrift::async::TAsyncBufferProcessor; +using apache::thrift::async::TAsyncProtocolProcessor; +using apache::thrift::async::TAsyncChannel; +using apache::thrift::async::TEvhttpClientChannel; + +class AggrAsyncHandler : public AggrCobSvIf { + protected: + struct RequestContext { + std::tr1::function<void(std::vector<int32_t> const& _return)> cob; + std::vector<int32_t> ret; + int pending_calls; + }; + + public: + AggrAsyncHandler() + : eb_(NULL) + , pfact_(new TBinaryProtocolFactory()) + { + leaf_ports_.push_back(8081); + leaf_ports_.push_back(8082); + } + + void addValue(std::tr1::function<void()> cob, const int32_t value) { + // Silently drop writes to the aggrgator. + return cob(); + } + + void getValues(std::tr1::function<void( + std::vector<int32_t> const& _return)> cob, + std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) { + RequestContext* ctx = new RequestContext(); + ctx->cob = cob; + ctx->pending_calls = leaf_ports_.size(); + for (std::vector<int>::iterator it = leaf_ports_.begin(); + it != leaf_ports_.end(); ++it) { + boost::shared_ptr<TAsyncChannel> channel( + new TEvhttpClientChannel( + "localhost", "/", "127.0.0.1", *it, eb_)); + AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); + client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); + } + } + + void setEventBase(struct event_base* eb) { + eb_ = eb; + } + + void clientReturn(RequestContext* ctx, AggrCobClient* client) { + ctx->pending_calls -= 1; + + try { + std::vector<int32_t> subret; + client->recv_getValues(subret); + ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); + } catch (TException& exn) { + // TODO: Log error + } + + delete client; + + if (ctx->pending_calls == 0) { + ctx->cob(ctx->ret); + delete ctx; + } + } + + protected: + struct event_base* eb_; + std::vector<int> leaf_ports_; + boost::shared_ptr<TProtocolFactory> pfact_; +}; + + +int main() { + boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler()); + boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler)); + boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory()); + boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact)); + boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080)); + handler->setEventBase(server->getEventBase()); + server->serve(); +} |