From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../thrift/contrib/async-test/test-server.cpp | 97 ++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 src/jaegertracing/thrift/contrib/async-test/test-server.cpp (limited to 'src/jaegertracing/thrift/contrib/async-test/test-server.cpp') diff --git a/src/jaegertracing/thrift/contrib/async-test/test-server.cpp b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp new file mode 100644 index 000000000..b304e1bc9 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp @@ -0,0 +1,97 @@ +#include +#include +#include +#include +#include +#include "Aggr.h" + +using std::tr1::bind; +using std::tr1::placeholders::_1; + +using apache::thrift::TException; +using apache::thrift::protocol::TBinaryProtocolFactory; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::async::TEvhttpServer; +using apache::thrift::async::TAsyncProcessor; +using apache::thrift::async::TAsyncBufferProcessor; +using apache::thrift::async::TAsyncProtocolProcessor; +using apache::thrift::async::TAsyncChannel; +using apache::thrift::async::TEvhttpClientChannel; + +class AggrAsyncHandler : public AggrCobSvIf { + protected: + struct RequestContext { + std::tr1::function const& _return)> cob; + std::vector ret; + int pending_calls; + }; + + public: + AggrAsyncHandler() + : eb_(NULL) + , pfact_(new TBinaryProtocolFactory()) + { + leaf_ports_.push_back(8081); + leaf_ports_.push_back(8082); + } + + void addValue(std::tr1::function cob, const int32_t value) { + // Silently drop writes to the aggrgator. + return cob(); + } + + void getValues(std::tr1::function const& _return)> cob, + std::tr1::function exn_cob) { + RequestContext* ctx = new RequestContext(); + ctx->cob = cob; + ctx->pending_calls = leaf_ports_.size(); + for (std::vector::iterator it = leaf_ports_.begin(); + it != leaf_ports_.end(); ++it) { + boost::shared_ptr channel( + new TEvhttpClientChannel( + "localhost", "/", "127.0.0.1", *it, eb_)); + AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); + client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); + } + } + + void setEventBase(struct event_base* eb) { + eb_ = eb; + } + + void clientReturn(RequestContext* ctx, AggrCobClient* client) { + ctx->pending_calls -= 1; + + try { + std::vector subret; + client->recv_getValues(subret); + ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); + } catch (TException& exn) { + // TODO: Log error + } + + delete client; + + if (ctx->pending_calls == 0) { + ctx->cob(ctx->ret); + delete ctx; + } + } + + protected: + struct event_base* eb_; + std::vector leaf_ports_; + boost::shared_ptr pfact_; +}; + + +int main() { + boost::shared_ptr handler(new AggrAsyncHandler()); + boost::shared_ptr proc(new AggrAsyncProcessor(handler)); + boost::shared_ptr pfact(new TBinaryProtocolFactory()); + boost::shared_ptr bufproc(new TAsyncProtocolProcessor(proc, pfact)); + boost::shared_ptr server(new TEvhttpServer(bufproc, 8080)); + handler->setEventBase(server->getEventBase()); + server->serve(); +} -- cgit v1.2.3