summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/contrib/zeromq
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/thrift/contrib/zeromq
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/jaegertracing/thrift/contrib/zeromq')
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/Makefile39
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/README.md30
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/TZmqClient.cpp48
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/TZmqClient.h65
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/TZmqClient.py64
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/TZmqServer.cpp96
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/TZmqServer.h84
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py79
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/csharp/AssemblyInfo.cs46
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/csharp/Main.cs60
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs78
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqServer.cs56
-rwxr-xr-xsrc/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.csproj91
-rwxr-xr-xsrc/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.sln42
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/storage.thrift4
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/test-client.cpp40
-rwxr-xr-xsrc/jaegertracing/thrift/contrib/zeromq/test-client.py36
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/test-receiver.cpp40
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/test-sender.cpp32
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/test-server.cpp43
-rwxr-xr-xsrc/jaegertracing/thrift/contrib/zeromq/test-server.py33
21 files changed, 1106 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/zeromq/Makefile b/src/jaegertracing/thrift/contrib/zeromq/Makefile
new file mode 100644
index 000000000..ee398e22c
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/Makefile
@@ -0,0 +1,39 @@
+THRIFT = thrift
+
+CXXFLAGS += -g3 -O0
+
+GENNAMES = Storage storage_types
+GENHEADERS = $(addsuffix .h, $(GENNAMES))
+GENSRCS = $(addsuffix .cpp, $(GENNAMES))
+GENOBJS = $(addsuffix .o, $(GENNAMES))
+
+PYLIBS = storage/__init__.py
+
+PROGS = test-client test-server test-sender test-receiver
+
+all: $(PYLIBS) $(PROGS)
+
+test-client: test-client.o TZmqClient.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
+test-server: test-server.o TZmqServer.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
+test-sender: test-sender.o TZmqClient.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
+test-receiver: test-receiver.o TZmqServer.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
+
+test-client.o test-server.o test-sender.o test-receiver.o: $(GENSRCS)
+
+storage/__init__.py: storage.thrift
+ $(RM) $(dir $@)
+ $(THRIFT) --gen py $<
+ mv gen-py/$(dir $@) .
+
+$(GENSRCS): storage.thrift
+ $(THRIFT) --gen cpp $<
+ mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) .
+
+clean:
+ $(RM) -r *.o $(PROGS) storage $(GENSRCS) $(GENHEADERS)
+
+.PHONY: clean
diff --git a/src/jaegertracing/thrift/contrib/zeromq/README.md b/src/jaegertracing/thrift/contrib/zeromq/README.md
new file mode 100644
index 000000000..9e0b5bd32
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/README.md
@@ -0,0 +1,30 @@
+This directory contains some glue code to allow Thrift RPCs to be sent over
+ZeroMQ. Included are client and server implementations for Python and C++,
+along with a simple demo interface (with a working client and server for
+each language).
+
+Thrift was designed for stream-based interfaces like TCP, but ZeroMQ is
+message-based, so there is a small impedance mismatch. Most of issues are
+hidden from developers, but one cannot be: oneway methods have to be handled
+differently from normal ones. ZeroMQ requires the messaging pattern to be
+declared at socket creation time, so an application cannot decide on a
+message-by-message basis whether to send a reply. Therefore, this
+implementation makes it the client's responsibility to ensure that ZMQ_REQ
+sockets are used for normal methods and ZMQ_DOWNSTREAM sockets are used for
+oneway methods. In addition, services that expose both types of methods
+have to expose two servers (on two ports), but the TZmqMultiServer makes it
+easy to run the two together in the same thread.
+
+This code was tested with ZeroMQ 2.0.7 and pyzmq afabbb5b9bd3.
+
+To build, simply install Thrift and ZeroMQ, then run "make". If you install
+in a non-standard location, make sure to set THRIFT to the location of the
+Thrift code generator on the make command line and PKG_CONFIG_PATH to a path
+that includes the pkgconfig files for both Thrift and ZeroMQ. The test
+servers take no arguments. Run the test clients with no arguments to
+retrieve the stored value or with an integer argument to increment it by
+that amount.
+
+This code is not quite what I would consider production-ready. It doesn't
+support all of the normal hooks into Thrift, and its performance is
+sub-optimal because it does some unnecessary copying.
diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.cpp b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.cpp
new file mode 100644
index 000000000..56278f325
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.cpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TZmqClient.h"
+#include <cstring>
+
+namespace apache { namespace thrift { namespace transport {
+
+uint32_t TZmqClient::read_virt(uint8_t* buf, uint32_t len) {
+ if (rbuf_.available_read() == 0) {
+ (void)sock_.recv(&msg_);
+ rbuf_.resetBuffer((uint8_t*)msg_.data(), msg_.size());
+ }
+ return rbuf_.read(buf, len);
+}
+
+void TZmqClient::write_virt(const uint8_t* buf, uint32_t len) {
+ return wbuf_.write(buf, len);
+}
+
+uint32_t TZmqClient::writeEnd() {
+ uint8_t* buf;
+ uint32_t size;
+ wbuf_.getBuffer(&buf, &size);
+ zmq::message_t msg(size);
+ std::memcpy(msg.data(), buf, size);
+ (void)sock_.send(msg);
+ wbuf_.resetBuffer(true);
+ return size;
+}
+
+}}} // apache::thrift::transport
diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.h b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.h
new file mode 100644
index 000000000..df16e03af
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_
+#define _THRIFT_TRANSPORT_TZMQCLIENT_H_ 1
+
+#include <zmq.hpp>
+#include <thrift/transport/TBufferTransports.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+class TZmqClient : public TTransport {
+ public:
+ TZmqClient(zmq::context_t& ctx, const std::string& endpoint, int type)
+ : sock_(ctx, type)
+ , endpoint_(endpoint)
+ , wbuf_()
+ , rbuf_()
+ , msg_()
+ , zmq_type_(type)
+ {}
+
+ void open() {
+ if(zmq_type_ == ZMQ_PUB) {
+ sock_.bind(endpoint_.c_str());
+ }
+ else {
+ sock_.connect(endpoint_.c_str());
+ }
+ }
+
+ uint32_t read_virt(uint8_t* buf, uint32_t len);
+
+ void write_virt(const uint8_t* buf, uint32_t len);
+
+ uint32_t writeEnd();
+
+ protected:
+ zmq::socket_t sock_;
+ std::string endpoint_;
+ TMemoryBuffer wbuf_;
+ TMemoryBuffer rbuf_;
+ zmq::message_t msg_;
+ int zmq_type_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_
diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.py b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.py
new file mode 100644
index 000000000..1bd60a1e5
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import zmq
+from cStringIO import StringIO
+from thrift.transport.TTransport import TTransportBase, CReadableTransport
+
+
+class TZmqClient(TTransportBase, CReadableTransport):
+ def __init__(self, ctx, endpoint, sock_type):
+ self._sock = ctx.socket(sock_type)
+ self._endpoint = endpoint
+ self._wbuf = StringIO()
+ self._rbuf = StringIO()
+
+ def open(self):
+ self._sock.connect(self._endpoint)
+
+ def read(self, size):
+ ret = self._rbuf.read(size)
+ if len(ret) != 0:
+ return ret
+ self._read_message()
+ return self._rbuf.read(size)
+
+ def _read_message(self):
+ msg = self._sock.recv()
+ self._rbuf = StringIO(msg)
+
+ def write(self, buf):
+ self._wbuf.write(buf)
+
+ def flush(self):
+ msg = self._wbuf.getvalue()
+ self._wbuf = StringIO()
+ self._sock.send(msg)
+
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self._rbuf
+
+ # NOTE: This will probably not actually work.
+ def cstringio_refill(self, prefix, reqlen):
+ while len(prefix) < reqlen:
+ self.read_message()
+ prefix += self._rbuf.getvalue()
+ self._rbuf = StringIO(prefix)
+ return self._rbuf
diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.cpp b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.cpp
new file mode 100644
index 000000000..88660a330
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TZmqServer.h"
+#include <thrift/transport/TBufferTransports.h>
+#include <boost/scoped_ptr.hpp>
+
+using apache::thrift::std::shared_ptr;
+using apache::thrift::transport::TMemoryBuffer;
+using apache::thrift::protocol::TProtocol;
+
+namespace apache { namespace thrift { namespace server {
+
+bool TZmqServer::serveOne(int recv_flags) {
+ zmq::message_t msg;
+ bool received = sock_.recv(&msg, recv_flags);
+ if (!received) {
+ return false;
+ }
+ shared_ptr<TMemoryBuffer> inputTransport(new TMemoryBuffer((uint8_t*)msg.data(), msg.size()));
+ shared_ptr<TMemoryBuffer> outputTransport(new TMemoryBuffer());
+ shared_ptr<TProtocol> inputProtocol(
+ inputProtocolFactory_->getProtocol(inputTransport));
+ shared_ptr<TProtocol> outputProtocol(
+ outputProtocolFactory_->getProtocol(outputTransport));
+ shared_ptr<TMemoryBuffer> transport(new TMemoryBuffer);
+
+ processor_->process(inputProtocol, outputProtocol, NULL);
+
+ if (zmq_type_ == ZMQ_REP) {
+ uint8_t* buf;
+ uint32_t size;
+ outputTransport->getBuffer(&buf, &size);
+ msg.rebuild(size);
+ std::memcpy(msg.data(), buf, size);
+ (void)sock_.send(msg);
+ }
+
+ return true;
+}
+
+
+void TZmqMultiServer::serveOne(long timeout) {
+ boost::scoped_ptr<zmq::pollitem_t> items(setupPoll());
+ serveActive(items.get(), timeout);
+}
+
+
+void TZmqMultiServer::serveForever() {
+ boost::scoped_ptr<zmq::pollitem_t> items(setupPoll());
+ while (true) {
+ serveActive(items.get(), -1);
+ }
+}
+
+
+zmq::pollitem_t* TZmqMultiServer::setupPoll() {
+ zmq::pollitem_t* items = new zmq::pollitem_t[servers_.size()];
+ for (int i = 0; i < servers_.size(); ++i) {
+ items[i].socket = servers_[i]->getSocket();
+ items[i].events = ZMQ_POLLIN;
+ }
+ return items;
+}
+
+void TZmqMultiServer::serveActive(zmq::pollitem_t* items, long timeout) {
+ int rc = zmq::poll(items, servers_.size(), timeout);
+ if (rc == 0) {
+ return;
+ }
+ for (int i = 0; i < servers_.size(); ++i) {
+ if ((items[i].revents & ZMQ_POLLIN) != 0) {
+ // Should we pass ZMQ_NOBLOCK here to be safe?
+ servers_[i]->serveOne();
+ }
+ }
+}
+
+
+}}} // apache::thrift::server
diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.h b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.h
new file mode 100644
index 000000000..ecd13b424
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TZMQSERVER_H_
+#define _THRIFT_SERVER_TZMQSERVER_H_ 1
+
+#include <memory>
+#include <zmq.hpp>
+#include <thrift/server/TServer.h>
+
+namespace apache { namespace thrift { namespace server {
+
+class TZmqServer : public TServer {
+ public:
+ TZmqServer(
+ std::shared_ptr<TProcessor> processor,
+ zmq::context_t& ctx, const std::string& endpoint, int type)
+ : TServer(processor)
+ , processor_(processor)
+ , zmq_type_(type)
+ , sock_(ctx, type)
+ {
+ if(zmq_type_ == ZMQ_SUB) {
+ sock_.setsockopt(ZMQ_SUBSCRIBE, "", 0) ; // listen to all messages
+ sock_.connect(endpoint.c_str()) ;
+ }
+ else {
+ sock_.bind(endpoint.c_str());
+ }
+ }
+
+ bool serveOne(int recv_flags = 0);
+ void serve() {
+ while (true) {
+ serveOne();
+ }
+ }
+
+ zmq::socket_t& getSocket() {
+ return sock_;
+ }
+
+ private:
+ std::shared_ptr<TProcessor> processor_;
+ int zmq_type_;
+ zmq::socket_t sock_;
+};
+
+
+class TZmqMultiServer {
+ public:
+ void serveOne(long timeout = -1);
+ void serveForever();
+
+ std::vector<TZmqServer*>& servers() {
+ return servers_;
+ }
+
+ private:
+ zmq::pollitem_t* setupPoll();
+ void serveActive(zmq::pollitem_t* items, long timeout);
+ std::vector<TZmqServer*> servers_;
+};
+
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TZMQSERVER_H_
diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py
new file mode 100644
index 000000000..15c1543ac
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import logging
+import zmq
+import thrift.server.TServer
+import thrift.transport.TTransport
+
+
+class TZmqServer(thrift.server.TServer.TServer):
+ def __init__(self, processor, ctx, endpoint, sock_type):
+ thrift.server.TServer.TServer.__init__(self, processor, None)
+ self.zmq_type = sock_type
+ self.socket = ctx.socket(sock_type)
+ self.socket.bind(endpoint)
+
+ def serveOne(self):
+ msg = self.socket.recv()
+ itrans = thrift.transport.TTransport.TMemoryBuffer(msg)
+ otrans = thrift.transport.TTransport.TMemoryBuffer()
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ try:
+ self.processor.process(iprot, oprot)
+ except Exception:
+ logging.exception("Exception while processing request")
+ # Fall through and send back a response, even if empty or incomplete.
+
+ if self.zmq_type == zmq.REP:
+ msg = otrans.getvalue()
+ self.socket.send(msg)
+
+ def serve(self):
+ while True:
+ self.serveOne()
+
+
+class TZmqMultiServer(object):
+ def __init__(self):
+ self.servers = []
+
+ def serveOne(self, timeout=-1):
+ self._serveActive(self._setupPoll(), timeout)
+
+ def serveForever(self):
+ poll_info = self._setupPoll()
+ while True:
+ self._serveActive(poll_info, -1)
+
+ def _setupPoll(self):
+ server_map = {}
+ poller = zmq.Poller()
+ for server in self.servers:
+ server_map[server.socket] = server
+ poller.register(server.socket, zmq.POLLIN)
+ return (server_map, poller)
+
+ def _serveActive(self, poll_info, timeout):
+ (server_map, poller) = poll_info
+ ready = dict(poller.poll())
+ for sock, state in ready.items():
+ assert (state & zmq.POLLIN) != 0
+ server_map[sock].serveOne()
diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/AssemblyInfo.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/AssemblyInfo.cs
new file mode 100644
index 000000000..12cd434f3
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/AssemblyInfo.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+
+// Information about this assembly is defined by the following attributes.
+// Change them to the values specific to your project.
+
+[assembly: AssemblyTitle("ZmqServer")]
+[assembly: AssemblyDescription("Zmq Examples")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("The Apache Software Foundation")]
+[assembly: AssemblyProduct("")]
+[assembly: AssemblyCopyright("The Apache Software Foundation")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
+// The form "{Major}.{Minor}.*" will automatically update the build and revision,
+// and "{Major}.{Minor}.{Build}.*" will update just the revision.
+
+[assembly: AssemblyVersion("1.0.*")]
+
+// The following attributes are used to specify the signing key for the assembly,
+// if desired. See the Mono documentation for more information about signing.
+
+//[assembly: AssemblyDelaySign(false)]
+//[assembly: AssemblyKeyFile("")]
+
diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/Main.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/Main.cs
new file mode 100644
index 000000000..e66cfe080
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/Main.cs
@@ -0,0 +1,60 @@
+using System;
+using System.Threading;
+using Thrift.Protocol;
+using ZMQ;
+using ZmqServer;
+using ZmqClient;
+
+namespace ZmqServer
+{
+ class MainClass
+ {
+ public static void Main (string[] args)
+ {
+ new Thread(Server.serve).Start();
+ Client.work();
+ }
+
+ static class Server{
+ public static void serve(){
+ StorageHandler s=new StorageHandler();
+ Storage.Processor p=new Storage.Processor(s);
+
+ ZMQ.Context c=new ZMQ.Context();
+
+ TZmqServer tzs=new TZmqServer(p,c,"tcp://127.0.0.1:9090",ZMQ.SocketType.PAIR);
+ tzs.Serve();
+ }
+
+ class StorageHandler:Storage.Iface{
+ int val=0;
+
+ public void incr(int amount){
+ val+=amount;
+ Console.WriteLine("incr({0})",amount);
+ }
+
+ public int get(){
+ return val;
+ }
+ }
+ }
+
+ static class Client{
+ public static void work()
+ {
+ Context ctx=new Context();
+ TZmqClient tzc=new TZmqClient(ctx,"tcp://127.0.0.1:9090",SocketType.PAIR);
+ TBinaryProtocol p=new TBinaryProtocol(tzc);
+
+ Storage.Client client=new Storage.Client(p);
+ tzc.Open();
+
+ Console.WriteLine(client.@get());
+ client.incr(1);
+ client.incr(41);
+ Console.WriteLine(client.@get());
+ }
+ }
+ }
+}
diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs
new file mode 100644
index 000000000..e9ab5166a
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs
@@ -0,0 +1,78 @@
+using System;
+using ZMQ;
+using System.IO;
+using Thrift.Transport;
+
+namespace ZmqClient
+{
+ public class TZmqClient : TTransport
+ {
+ Socket _sock;
+ String _endpoint;
+ MemoryStream _wbuf = new MemoryStream ();
+ MemoryStream _rbuf = new MemoryStream ();
+
+ void debug (string msg)
+ {
+ //Uncomment to enable debug
+// Console.WriteLine (msg);
+ }
+
+ public TZmqClient (Context ctx, String endpoint, SocketType sockType)
+ {
+ _sock = ctx.Socket (sockType);
+ _endpoint = endpoint;
+ }
+
+ public override void Open ()
+ {
+ _sock.Connect (_endpoint);
+ }
+
+ public override void Close ()
+ {
+ throw new NotImplementedException ();
+ }
+
+ public override bool IsOpen {
+ get {
+ throw new NotImplementedException ();
+ }
+ }
+
+ public override int Read (byte[] buf, int off, int len)
+ {
+ debug ("Client_Read");
+ if (off != 0 || len != buf.Length)
+ throw new NotImplementedException ();
+
+ if (_rbuf.Length == 0) {
+ //Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift response
+ debug ("Client_Read Filling buffer..");
+ byte[] tmpBuf = _sock.Recv ();
+ debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
+ _rbuf.Write (tmpBuf, 0, tmpBuf.Length);
+ _rbuf.Position = 0; //For reading
+ }
+ int ret = _rbuf.Read (buf, 0, len);
+ if (_rbuf.Length == _rbuf.Position) //Finished reading
+ _rbuf.SetLength (0);
+ debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position));
+ return ret;
+ }
+
+ public override void Write (byte[] buf, int off, int len)
+ {
+ debug ("Client_Write");
+ _wbuf.Write (buf, off, len);
+ }
+
+ public override void Flush ()
+ {
+ debug ("Client_Flush");
+ _sock.Send (_wbuf.GetBuffer ());
+ _wbuf = new MemoryStream ();
+ }
+ }
+}
+
diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqServer.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqServer.cs
new file mode 100644
index 000000000..535c623d0
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqServer.cs
@@ -0,0 +1,56 @@
+using System;
+using Thrift;
+using Thrift.Server;
+using Thrift.Transport;
+using Thrift.Protocol;
+using ZMQ;
+using System.IO;
+
+using System.Collections.Generic;
+
+namespace ZmqServer
+{
+ public class TZmqServer
+ {
+ Socket _socket ;
+ TProcessor _processor;
+
+ void debug (string msg)
+ {
+ //Uncomment to enable debug
+// Console.WriteLine (msg);
+ }
+
+ public TZmqServer (TProcessor processor, Context ctx, String endpoint, SocketType sockType)
+ {
+ new TSimpleServer (processor,null);
+ _socket = ctx.Socket (sockType);
+ _socket.Bind (endpoint);
+ _processor = processor;
+ }
+
+ public void ServeOne ()
+ {
+ debug ("Server_ServeOne");
+ Byte[] msg = _socket.Recv ();
+ MemoryStream istream = new MemoryStream (msg);
+ MemoryStream ostream = new MemoryStream ();
+ TProtocol tProtocol = new TBinaryProtocol (new TStreamTransport (istream, ostream));
+ _processor.Process (tProtocol, tProtocol);
+
+ if (ostream.Length != 0) {
+ byte[] newBuf = new byte[ostream.Length];
+ Array.Copy (ostream.GetBuffer (), newBuf, ostream.Length);
+ debug (string.Format ("Server_ServeOne sending {0}b", ostream.Length));
+ _socket.Send (newBuf);
+ }
+ }
+
+ public void Serve ()
+ {
+ while (true)
+ ServeOne ();
+ }
+ }
+}
+
diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.csproj b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.csproj
new file mode 100755
index 000000000..80ad1dbd6
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.csproj
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">x86</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <RootNamespace>ZmqServer</RootNamespace>
+ <AssemblyName>ThriftZMQ</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileUpgradeFlags>
+ </FileUpgradeFlags>
+ <OldToolsVersion>3.5</OldToolsVersion>
+ <UpgradeBackupLocation />
+ <PublishUrl>publish\</PublishUrl>
+ <Install>true</Install>
+ <InstallFrom>Disk</InstallFrom>
+ <UpdateEnabled>false</UpdateEnabled>
+ <UpdateMode>Foreground</UpdateMode>
+ <UpdateInterval>7</UpdateInterval>
+ <UpdateIntervalUnits>Days</UpdateIntervalUnits>
+ <UpdatePeriodically>false</UpdatePeriodically>
+ <UpdateRequired>false</UpdateRequired>
+ <MapFileExtensions>true</MapFileExtensions>
+ <ApplicationRevision>0</ApplicationRevision>
+ <ApplicationVersion>1.0.0.0</ApplicationVersion>
+ <IsWebBootstrapper>false</IsWebBootstrapper>
+ <UseApplicationTrust>false</UseApplicationTrust>
+ <BootstrapperEnabled>true</BootstrapperEnabled>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug</OutputPath>
+ <DefineConstants>DEBUG</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <PlatformTarget>x86</PlatformTarget>
+ <Externalconsole>true</Externalconsole>
+ <CodeAnalysisRuleSet>AllRules.ruleset</CodeAnalysisRuleSet>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
+ <DebugType>none</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Release</OutputPath>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <PlatformTarget>x86</PlatformTarget>
+ <Externalconsole>true</Externalconsole>
+ <CodeAnalysisRuleSet>AllRules.ruleset</CodeAnalysisRuleSet>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="clrzmq, Version=2.1.0.0, Culture=neutral, processorArchitecture=x86">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>.\clrzmq.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="Thrift, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\lib\csharp\Thrift.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Main.cs" />
+ <Compile Include="AssemblyInfo.cs" />
+ <Compile Include="TZmqServer.cs" />
+ <Compile Include="TZmqClient.cs" />
+ <Compile Include="..\gen-csharp\Storage.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <BootstrapperPackage Include="Microsoft.Net.Client.3.5">
+ <Visible>False</Visible>
+ <ProductName>.NET Framework 3.5 SP1 Client Profile</ProductName>
+ <Install>false</Install>
+ </BootstrapperPackage>
+ <BootstrapperPackage Include="Microsoft.Net.Framework.3.5.SP1">
+ <Visible>False</Visible>
+ <ProductName>.NET Framework 3.5 SP1</ProductName>
+ <Install>true</Install>
+ </BootstrapperPackage>
+ <BootstrapperPackage Include="Microsoft.Windows.Installer.3.1">
+ <Visible>False</Visible>
+ <ProductName>Windows Installer 3.1</ProductName>
+ <Install>true</Install>
+ </BootstrapperPackage>
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+</Project>
diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.sln b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.sln
new file mode 100755
index 000000000..6af57b60c
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.sln
@@ -0,0 +1,42 @@
+
+Microsoft Visual Studio Solution File, Format Version 11.00
+# Visual Studio 2010
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ThriftZMQ", "ThriftZMQ.csproj", "{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thrift", "..\..\..\lib\csharp\src\Thrift.csproj", "{499EB63C-D74C-47E8-AE48-A2FC94538E9D}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Debug|Mixed Platforms = Debug|Mixed Platforms
+ Debug|x86 = Debug|x86
+ Release|Any CPU = Release|Any CPU
+ Release|Mixed Platforms = Release|Mixed Platforms
+ Release|x86 = Release|x86
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Any CPU.ActiveCfg = Debug|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Mixed Platforms.ActiveCfg = Debug|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Mixed Platforms.Build.0 = Debug|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|x86.ActiveCfg = Debug|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|x86.Build.0 = Debug|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Any CPU.ActiveCfg = Release|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Mixed Platforms.ActiveCfg = Release|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Mixed Platforms.Build.0 = Release|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|x86.ActiveCfg = Release|x86
+ {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|x86.Build.0 = Release|x86
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|x86.ActiveCfg = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+EndGlobal
diff --git a/src/jaegertracing/thrift/contrib/zeromq/storage.thrift b/src/jaegertracing/thrift/contrib/zeromq/storage.thrift
new file mode 100644
index 000000000..a1ea96752
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/storage.thrift
@@ -0,0 +1,4 @@
+service Storage {
+ oneway void incr(1: i32 amount);
+ i32 get();
+}
diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-client.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-client.cpp
new file mode 100644
index 000000000..159c25030
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/test-client.cpp
@@ -0,0 +1,40 @@
+#include <iostream>
+#include <cstdlib>
+#include <thrift/protocol/TBinaryProtocol.h>
+
+#include "zmq.hpp"
+#include "TZmqClient.h"
+#include "Storage.h"
+
+using apache::thrift::std::shared_ptr;
+using apache::thrift::transport::TZmqClient;
+using apache::thrift::protocol::TBinaryProtocol;
+
+int main(int argc, char** argv) {
+ const char* endpoint = "tcp://127.0.0.1:9090";
+ int socktype = ZMQ_REQ;
+ int incr = 0;
+ if (argc > 1) {
+ incr = atoi(argv[1]);
+ if (incr) {
+ socktype = ZMQ_PUSH;
+ endpoint = "tcp://127.0.0.1:9091";
+ }
+ }
+
+ zmq::context_t ctx(1);
+ shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype));
+ shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+ StorageClient client(protocol);
+ transport->open();
+
+ if (incr) {
+ client.incr(incr);
+ usleep(50000);
+ } else {
+ int value = client.get();
+ std::cout << value << std::endl;
+ }
+
+ return 0;
+}
diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-client.py b/src/jaegertracing/thrift/contrib/zeromq/test-client.py
new file mode 100755
index 000000000..d51216e45
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/test-client.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import sys
+import time
+import zmq
+import TZmqClient
+import thrift.protocol.TBinaryProtocol
+import storage.ttypes
+import storage.Storage
+
+
+def main(args):
+ endpoint = "tcp://127.0.0.1:9090"
+ socktype = zmq.REQ
+ incr = 0
+ if len(args) > 1:
+ incr = int(args[1])
+ if incr:
+ socktype = zmq.PUSH
+ endpoint = "tcp://127.0.0.1:9091"
+
+ ctx = zmq.Context()
+ transport = TZmqClient.TZmqClient(ctx, endpoint, socktype)
+ protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocolAccelerated(transport)
+ client = storage.Storage.Client(protocol)
+ transport.open()
+
+ if incr:
+ client.incr(incr)
+ time.sleep(0.05)
+ else:
+ value = client.get()
+ print(value)
+
+
+if __name__ == "__main__":
+ main(sys.argv)
diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-receiver.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-receiver.cpp
new file mode 100644
index 000000000..d465bff63
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/test-receiver.cpp
@@ -0,0 +1,40 @@
+#include "zmq.hpp"
+#include "TZmqServer.h"
+#include "Storage.h"
+
+using apache::thrift::std::shared_ptr;
+using apache::thrift::TProcessor;
+using apache::thrift::server::TZmqServer;
+using apache::thrift::server::TZmqMultiServer;
+
+class StorageHandler : virtual public StorageIf {
+ public:
+ StorageHandler()
+ : value_(0)
+ {}
+
+ void incr(const int32_t amount) {
+ value_ += amount;
+ printf("value_: %i\n", value_) ;
+ }
+
+ int32_t get() {
+ return value_;
+ }
+
+ private:
+ int32_t value_;
+
+};
+
+
+int main(int argc, char *argv[]) {
+ shared_ptr<StorageHandler> handler(new StorageHandler());
+ shared_ptr<TProcessor> processor(new StorageProcessor(handler));
+
+ zmq::context_t ctx(1);
+ TZmqServer oneway_server(processor, ctx, "epgm://eth0;239.192.1.1:5555", ZMQ_SUB);
+ oneway_server.serve();
+
+ return 0;
+}
diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-sender.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-sender.cpp
new file mode 100644
index 000000000..5c086a11f
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/test-sender.cpp
@@ -0,0 +1,32 @@
+#include <iostream>
+#include <cstdlib>
+#include <thrift/protocol/TBinaryProtocol.h>
+
+#include "zmq.hpp"
+#include "TZmqClient.h"
+#include "Storage.h"
+
+using apache::thrift::std::shared_ptr;
+using apache::thrift::transport::TZmqClient;
+using apache::thrift::protocol::TBinaryProtocol;
+
+int main(int argc, char** argv) {
+ const char* endpoint = "epgm://eth0;239.192.1.1:5555";
+ int socktype = ZMQ_PUB;
+ int incr = 1;
+ if (argc > 1) {
+ incr = atoi(argv[1]);
+ }
+
+ zmq::context_t ctx(1);
+ shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype));
+ shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+ StorageClient client(protocol);
+
+ transport->open();
+
+ client.incr(incr);
+ usleep(50000);
+
+ return 0;
+}
diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-server.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-server.cpp
new file mode 100644
index 000000000..e6f1b2083
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/test-server.cpp
@@ -0,0 +1,43 @@
+#include "zmq.hpp"
+#include "TZmqServer.h"
+#include "Storage.h"
+
+using apache::thrift::std::shared_ptr;
+using apache::thrift::TProcessor;
+using apache::thrift::server::TZmqServer;
+using apache::thrift::server::TZmqMultiServer;
+
+class StorageHandler : virtual public StorageIf {
+ public:
+ StorageHandler()
+ : value_(0)
+ {}
+
+ void incr(const int32_t amount) {
+ value_ += amount;
+ }
+
+ int32_t get() {
+ return value_;
+ }
+
+ private:
+ int32_t value_;
+
+};
+
+
+int main(int argc, char *argv[]) {
+ shared_ptr<StorageHandler> handler(new StorageHandler());
+ shared_ptr<TProcessor> processor(new StorageProcessor(handler));
+
+ zmq::context_t ctx(1);
+ TZmqServer reqrep_server(processor, ctx, "tcp://0.0.0.0:9090", ZMQ_REP);
+ TZmqServer oneway_server(processor, ctx, "tcp://0.0.0.0:9091", ZMQ_PULL);
+ TZmqMultiServer multiserver;
+ multiserver.servers().push_back(&reqrep_server);
+ multiserver.servers().push_back(&oneway_server);
+ multiserver.serveForever();
+
+ return 0;
+}
diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-server.py b/src/jaegertracing/thrift/contrib/zeromq/test-server.py
new file mode 100755
index 000000000..d89b37ba2
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/test-server.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+import zmq
+import TZmqServer
+import storage.ttypes
+import storage.Storage
+
+
+class StorageHandler(storage.Storage.Iface):
+ def __init__(self):
+ self.value = 0
+
+ def incr(self, amount):
+ self.value += amount
+
+ def get(self):
+ return self.value
+
+
+def main():
+ handler = StorageHandler()
+ processor = storage.Storage.Processor(handler)
+
+ ctx = zmq.Context()
+ reqrep_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9090", zmq.REP)
+ oneway_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9091", zmq.PULL)
+ multiserver = TZmqServer.TZmqMultiServer()
+ multiserver.servers.append(reqrep_server)
+ multiserver.servers.append(oneway_server)
+ multiserver.serveForever()
+
+
+if __name__ == "__main__":
+ main()