diff options
Diffstat (limited to '')
4 files changed, 162 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/async-test/Makefile b/src/jaegertracing/thrift/contrib/async-test/Makefile new file mode 100644 index 000000000..33e7f8ad1 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/async-test/Makefile @@ -0,0 +1,33 @@ +THRIFT = thrift +CXXFLAGS = `pkg-config --cflags thrift thrift-nb` -levent +LDLIBS = `pkg-config --libs thrift thrift-nb` -levent +CXXFLAGS += -g -O0 + +GENNAMES = Aggr aggr_types +GENHEADERS = $(addsuffix .h, $(GENNAMES)) +GENSRCS = $(addsuffix .cpp, $(GENNAMES)) +GENOBJS = $(addsuffix .o, $(GENNAMES)) + +PYLIBS = aggr/__init__.py + +PROGS = test-server + +all: $(PYLIBS) $(PROGS) + +test-server: test-server.o $(GENOBJS) + +test-server.o: $(GENSRCS) + +aggr/__init__.py: aggr.thrift + $(RM) $(dir $@) + $(THRIFT) --gen py:newstyle $< + mv gen-py/$(dir $@) . + +$(GENSRCS): aggr.thrift + $(THRIFT) --gen cpp:cob_style $< + mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) . + +clean: + $(RM) -r *.o $(PROGS) $(GENSRCS) $(GENHEADERS) gen-* aggr + +.PHONY: clean diff --git a/src/jaegertracing/thrift/contrib/async-test/aggr.thrift b/src/jaegertracing/thrift/contrib/async-test/aggr.thrift new file mode 100644 index 000000000..c016a65d8 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/async-test/aggr.thrift @@ -0,0 +1,8 @@ +exception Error { + 1: string desc; +} + +service Aggr { + void addValue(1: i32 value); + list<i32> getValues() throws (1: Error err); +} diff --git a/src/jaegertracing/thrift/contrib/async-test/test-leaf.py b/src/jaegertracing/thrift/contrib/async-test/test-leaf.py new file mode 100755 index 000000000..4ea4a9b8c --- /dev/null +++ b/src/jaegertracing/thrift/contrib/async-test/test-leaf.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +import sys +import time +from thrift.transport import TTransport +from thrift.transport import TSocket +from thrift.protocol import TBinaryProtocol +from thrift.server import THttpServer +from aggr import Aggr + + +class AggrHandler(Aggr.Iface): + def __init__(self): + self.values = [] + + def addValue(self, value): + self.values.append(value) + + def getValues(self, ): + time.sleep(1) + return self.values + +processor = Aggr.Processor(AggrHandler()) +pfactory = TBinaryProtocol.TBinaryProtocolFactory() +THttpServer.THttpServer(processor, ('', int(sys.argv[1])), pfactory).serve() diff --git a/src/jaegertracing/thrift/contrib/async-test/test-server.cpp b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp new file mode 100644 index 000000000..b304e1bc9 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp @@ -0,0 +1,97 @@ +#include <tr1/functional> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/async/TAsyncProtocolProcessor.h> +#include <thrift/async/TEvhttpServer.h> +#include <thrift/async/TEvhttpClientChannel.h> +#include "Aggr.h" + +using std::tr1::bind; +using std::tr1::placeholders::_1; + +using apache::thrift::TException; +using apache::thrift::protocol::TBinaryProtocolFactory; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::async::TEvhttpServer; +using apache::thrift::async::TAsyncProcessor; +using apache::thrift::async::TAsyncBufferProcessor; +using apache::thrift::async::TAsyncProtocolProcessor; +using apache::thrift::async::TAsyncChannel; +using apache::thrift::async::TEvhttpClientChannel; + +class AggrAsyncHandler : public AggrCobSvIf { + protected: + struct RequestContext { + std::tr1::function<void(std::vector<int32_t> const& _return)> cob; + std::vector<int32_t> ret; + int pending_calls; + }; + + public: + AggrAsyncHandler() + : eb_(NULL) + , pfact_(new TBinaryProtocolFactory()) + { + leaf_ports_.push_back(8081); + leaf_ports_.push_back(8082); + } + + void addValue(std::tr1::function<void()> cob, const int32_t value) { + // Silently drop writes to the aggrgator. + return cob(); + } + + void getValues(std::tr1::function<void( + std::vector<int32_t> const& _return)> cob, + std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) { + RequestContext* ctx = new RequestContext(); + ctx->cob = cob; + ctx->pending_calls = leaf_ports_.size(); + for (std::vector<int>::iterator it = leaf_ports_.begin(); + it != leaf_ports_.end(); ++it) { + boost::shared_ptr<TAsyncChannel> channel( + new TEvhttpClientChannel( + "localhost", "/", "127.0.0.1", *it, eb_)); + AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); + client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); + } + } + + void setEventBase(struct event_base* eb) { + eb_ = eb; + } + + void clientReturn(RequestContext* ctx, AggrCobClient* client) { + ctx->pending_calls -= 1; + + try { + std::vector<int32_t> subret; + client->recv_getValues(subret); + ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); + } catch (TException& exn) { + // TODO: Log error + } + + delete client; + + if (ctx->pending_calls == 0) { + ctx->cob(ctx->ret); + delete ctx; + } + } + + protected: + struct event_base* eb_; + std::vector<int> leaf_ports_; + boost::shared_ptr<TProtocolFactory> pfact_; +}; + + +int main() { + boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler()); + boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler)); + boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory()); + boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact)); + boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080)); + handler->setEventBase(server->getEventBase()); + server->serve(); +} |