summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/contrib/async-test/test-server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/contrib/async-test/test-server.cpp')
-rw-r--r--src/jaegertracing/thrift/contrib/async-test/test-server.cpp97
1 files changed, 97 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/async-test/test-server.cpp b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp
new file mode 100644
index 000000000..b304e1bc9
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/async-test/test-server.cpp
@@ -0,0 +1,97 @@
+#include <tr1/functional>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/async/TAsyncProtocolProcessor.h>
+#include <thrift/async/TEvhttpServer.h>
+#include <thrift/async/TEvhttpClientChannel.h>
+#include "Aggr.h"
+
+using std::tr1::bind;
+using std::tr1::placeholders::_1;
+
+using apache::thrift::TException;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::async::TEvhttpServer;
+using apache::thrift::async::TAsyncProcessor;
+using apache::thrift::async::TAsyncBufferProcessor;
+using apache::thrift::async::TAsyncProtocolProcessor;
+using apache::thrift::async::TAsyncChannel;
+using apache::thrift::async::TEvhttpClientChannel;
+
+class AggrAsyncHandler : public AggrCobSvIf {
+ protected:
+ struct RequestContext {
+ std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
+ std::vector<int32_t> ret;
+ int pending_calls;
+ };
+
+ public:
+ AggrAsyncHandler()
+ : eb_(NULL)
+ , pfact_(new TBinaryProtocolFactory())
+ {
+ leaf_ports_.push_back(8081);
+ leaf_ports_.push_back(8082);
+ }
+
+ void addValue(std::tr1::function<void()> cob, const int32_t value) {
+ // Silently drop writes to the aggrgator.
+ return cob();
+ }
+
+ void getValues(std::tr1::function<void(
+ std::vector<int32_t> const& _return)> cob,
+ std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
+ RequestContext* ctx = new RequestContext();
+ ctx->cob = cob;
+ ctx->pending_calls = leaf_ports_.size();
+ for (std::vector<int>::iterator it = leaf_ports_.begin();
+ it != leaf_ports_.end(); ++it) {
+ boost::shared_ptr<TAsyncChannel> channel(
+ new TEvhttpClientChannel(
+ "localhost", "/", "127.0.0.1", *it, eb_));
+ AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
+ client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
+ }
+ }
+
+ void setEventBase(struct event_base* eb) {
+ eb_ = eb;
+ }
+
+ void clientReturn(RequestContext* ctx, AggrCobClient* client) {
+ ctx->pending_calls -= 1;
+
+ try {
+ std::vector<int32_t> subret;
+ client->recv_getValues(subret);
+ ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
+ } catch (TException& exn) {
+ // TODO: Log error
+ }
+
+ delete client;
+
+ if (ctx->pending_calls == 0) {
+ ctx->cob(ctx->ret);
+ delete ctx;
+ }
+ }
+
+ protected:
+ struct event_base* eb_;
+ std::vector<int> leaf_ports_;
+ boost::shared_ptr<TProtocolFactory> pfact_;
+};
+
+
+int main() {
+ boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
+ boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
+ boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
+ boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
+ boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
+ handler->setEventBase(server->getEventBase());
+ server->serve();
+}