diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/thrift/contrib/zeromq | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/jaegertracing/thrift/contrib/zeromq')
21 files changed, 1106 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/zeromq/Makefile b/src/jaegertracing/thrift/contrib/zeromq/Makefile new file mode 100644 index 000000000..ee398e22c --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/Makefile @@ -0,0 +1,39 @@ +THRIFT = thrift + +CXXFLAGS += -g3 -O0 + +GENNAMES = Storage storage_types +GENHEADERS = $(addsuffix .h, $(GENNAMES)) +GENSRCS = $(addsuffix .cpp, $(GENNAMES)) +GENOBJS = $(addsuffix .o, $(GENNAMES)) + +PYLIBS = storage/__init__.py + +PROGS = test-client test-server test-sender test-receiver + +all: $(PYLIBS) $(PROGS) + +test-client: test-client.o TZmqClient.o $(GENOBJS) + $(CXX) $^ -o $@ -lzmq -lthrift +test-server: test-server.o TZmqServer.o $(GENOBJS) + $(CXX) $^ -o $@ -lzmq -lthrift +test-sender: test-sender.o TZmqClient.o $(GENOBJS) + $(CXX) $^ -o $@ -lzmq -lthrift +test-receiver: test-receiver.o TZmqServer.o $(GENOBJS) + $(CXX) $^ -o $@ -lzmq -lthrift + +test-client.o test-server.o test-sender.o test-receiver.o: $(GENSRCS) + +storage/__init__.py: storage.thrift + $(RM) $(dir $@) + $(THRIFT) --gen py $< + mv gen-py/$(dir $@) . + +$(GENSRCS): storage.thrift + $(THRIFT) --gen cpp $< + mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) . + +clean: + $(RM) -r *.o $(PROGS) storage $(GENSRCS) $(GENHEADERS) + +.PHONY: clean diff --git a/src/jaegertracing/thrift/contrib/zeromq/README.md b/src/jaegertracing/thrift/contrib/zeromq/README.md new file mode 100644 index 000000000..9e0b5bd32 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/README.md @@ -0,0 +1,30 @@ +This directory contains some glue code to allow Thrift RPCs to be sent over +ZeroMQ. Included are client and server implementations for Python and C++, +along with a simple demo interface (with a working client and server for +each language). + +Thrift was designed for stream-based interfaces like TCP, but ZeroMQ is +message-based, so there is a small impedance mismatch. Most of issues are +hidden from developers, but one cannot be: oneway methods have to be handled +differently from normal ones. ZeroMQ requires the messaging pattern to be +declared at socket creation time, so an application cannot decide on a +message-by-message basis whether to send a reply. Therefore, this +implementation makes it the client's responsibility to ensure that ZMQ_REQ +sockets are used for normal methods and ZMQ_DOWNSTREAM sockets are used for +oneway methods. In addition, services that expose both types of methods +have to expose two servers (on two ports), but the TZmqMultiServer makes it +easy to run the two together in the same thread. + +This code was tested with ZeroMQ 2.0.7 and pyzmq afabbb5b9bd3. + +To build, simply install Thrift and ZeroMQ, then run "make". If you install +in a non-standard location, make sure to set THRIFT to the location of the +Thrift code generator on the make command line and PKG_CONFIG_PATH to a path +that includes the pkgconfig files for both Thrift and ZeroMQ. The test +servers take no arguments. Run the test clients with no arguments to +retrieve the stored value or with an integer argument to increment it by +that amount. + +This code is not quite what I would consider production-ready. It doesn't +support all of the normal hooks into Thrift, and its performance is +sub-optimal because it does some unnecessary copying. diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.cpp b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.cpp new file mode 100644 index 000000000..56278f325 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.cpp @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TZmqClient.h" +#include <cstring> + +namespace apache { namespace thrift { namespace transport { + +uint32_t TZmqClient::read_virt(uint8_t* buf, uint32_t len) { + if (rbuf_.available_read() == 0) { + (void)sock_.recv(&msg_); + rbuf_.resetBuffer((uint8_t*)msg_.data(), msg_.size()); + } + return rbuf_.read(buf, len); +} + +void TZmqClient::write_virt(const uint8_t* buf, uint32_t len) { + return wbuf_.write(buf, len); +} + +uint32_t TZmqClient::writeEnd() { + uint8_t* buf; + uint32_t size; + wbuf_.getBuffer(&buf, &size); + zmq::message_t msg(size); + std::memcpy(msg.data(), buf, size); + (void)sock_.send(msg); + wbuf_.resetBuffer(true); + return size; +} + +}}} // apache::thrift::transport diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.h b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.h new file mode 100644 index 000000000..df16e03af --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_ +#define _THRIFT_TRANSPORT_TZMQCLIENT_H_ 1 + +#include <zmq.hpp> +#include <thrift/transport/TBufferTransports.h> + +namespace apache { namespace thrift { namespace transport { + +class TZmqClient : public TTransport { + public: + TZmqClient(zmq::context_t& ctx, const std::string& endpoint, int type) + : sock_(ctx, type) + , endpoint_(endpoint) + , wbuf_() + , rbuf_() + , msg_() + , zmq_type_(type) + {} + + void open() { + if(zmq_type_ == ZMQ_PUB) { + sock_.bind(endpoint_.c_str()); + } + else { + sock_.connect(endpoint_.c_str()); + } + } + + uint32_t read_virt(uint8_t* buf, uint32_t len); + + void write_virt(const uint8_t* buf, uint32_t len); + + uint32_t writeEnd(); + + protected: + zmq::socket_t sock_; + std::string endpoint_; + TMemoryBuffer wbuf_; + TMemoryBuffer rbuf_; + zmq::message_t msg_; + int zmq_type_; +}; + +}}} // apache::thrift::transport + +#endif // #ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_ diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.py b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.py new file mode 100644 index 000000000..1bd60a1e5 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqClient.py @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import zmq +from cStringIO import StringIO +from thrift.transport.TTransport import TTransportBase, CReadableTransport + + +class TZmqClient(TTransportBase, CReadableTransport): + def __init__(self, ctx, endpoint, sock_type): + self._sock = ctx.socket(sock_type) + self._endpoint = endpoint + self._wbuf = StringIO() + self._rbuf = StringIO() + + def open(self): + self._sock.connect(self._endpoint) + + def read(self, size): + ret = self._rbuf.read(size) + if len(ret) != 0: + return ret + self._read_message() + return self._rbuf.read(size) + + def _read_message(self): + msg = self._sock.recv() + self._rbuf = StringIO(msg) + + def write(self, buf): + self._wbuf.write(buf) + + def flush(self): + msg = self._wbuf.getvalue() + self._wbuf = StringIO() + self._sock.send(msg) + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self._rbuf + + # NOTE: This will probably not actually work. + def cstringio_refill(self, prefix, reqlen): + while len(prefix) < reqlen: + self.read_message() + prefix += self._rbuf.getvalue() + self._rbuf = StringIO(prefix) + return self._rbuf diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.cpp b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.cpp new file mode 100644 index 000000000..88660a330 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.cpp @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TZmqServer.h" +#include <thrift/transport/TBufferTransports.h> +#include <boost/scoped_ptr.hpp> + +using apache::thrift::std::shared_ptr; +using apache::thrift::transport::TMemoryBuffer; +using apache::thrift::protocol::TProtocol; + +namespace apache { namespace thrift { namespace server { + +bool TZmqServer::serveOne(int recv_flags) { + zmq::message_t msg; + bool received = sock_.recv(&msg, recv_flags); + if (!received) { + return false; + } + shared_ptr<TMemoryBuffer> inputTransport(new TMemoryBuffer((uint8_t*)msg.data(), msg.size())); + shared_ptr<TMemoryBuffer> outputTransport(new TMemoryBuffer()); + shared_ptr<TProtocol> inputProtocol( + inputProtocolFactory_->getProtocol(inputTransport)); + shared_ptr<TProtocol> outputProtocol( + outputProtocolFactory_->getProtocol(outputTransport)); + shared_ptr<TMemoryBuffer> transport(new TMemoryBuffer); + + processor_->process(inputProtocol, outputProtocol, NULL); + + if (zmq_type_ == ZMQ_REP) { + uint8_t* buf; + uint32_t size; + outputTransport->getBuffer(&buf, &size); + msg.rebuild(size); + std::memcpy(msg.data(), buf, size); + (void)sock_.send(msg); + } + + return true; +} + + +void TZmqMultiServer::serveOne(long timeout) { + boost::scoped_ptr<zmq::pollitem_t> items(setupPoll()); + serveActive(items.get(), timeout); +} + + +void TZmqMultiServer::serveForever() { + boost::scoped_ptr<zmq::pollitem_t> items(setupPoll()); + while (true) { + serveActive(items.get(), -1); + } +} + + +zmq::pollitem_t* TZmqMultiServer::setupPoll() { + zmq::pollitem_t* items = new zmq::pollitem_t[servers_.size()]; + for (int i = 0; i < servers_.size(); ++i) { + items[i].socket = servers_[i]->getSocket(); + items[i].events = ZMQ_POLLIN; + } + return items; +} + +void TZmqMultiServer::serveActive(zmq::pollitem_t* items, long timeout) { + int rc = zmq::poll(items, servers_.size(), timeout); + if (rc == 0) { + return; + } + for (int i = 0; i < servers_.size(); ++i) { + if ((items[i].revents & ZMQ_POLLIN) != 0) { + // Should we pass ZMQ_NOBLOCK here to be safe? + servers_[i]->serveOne(); + } + } +} + + +}}} // apache::thrift::server diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.h b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.h new file mode 100644 index 000000000..ecd13b424 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.h @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_SERVER_TZMQSERVER_H_ +#define _THRIFT_SERVER_TZMQSERVER_H_ 1 + +#include <memory> +#include <zmq.hpp> +#include <thrift/server/TServer.h> + +namespace apache { namespace thrift { namespace server { + +class TZmqServer : public TServer { + public: + TZmqServer( + std::shared_ptr<TProcessor> processor, + zmq::context_t& ctx, const std::string& endpoint, int type) + : TServer(processor) + , processor_(processor) + , zmq_type_(type) + , sock_(ctx, type) + { + if(zmq_type_ == ZMQ_SUB) { + sock_.setsockopt(ZMQ_SUBSCRIBE, "", 0) ; // listen to all messages + sock_.connect(endpoint.c_str()) ; + } + else { + sock_.bind(endpoint.c_str()); + } + } + + bool serveOne(int recv_flags = 0); + void serve() { + while (true) { + serveOne(); + } + } + + zmq::socket_t& getSocket() { + return sock_; + } + + private: + std::shared_ptr<TProcessor> processor_; + int zmq_type_; + zmq::socket_t sock_; +}; + + +class TZmqMultiServer { + public: + void serveOne(long timeout = -1); + void serveForever(); + + std::vector<TZmqServer*>& servers() { + return servers_; + } + + private: + zmq::pollitem_t* setupPoll(); + void serveActive(zmq::pollitem_t* items, long timeout); + std::vector<TZmqServer*> servers_; +}; + + +}}} // apache::thrift::server + +#endif // #ifndef _THRIFT_SERVER_TZMQSERVER_H_ diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py new file mode 100644 index 000000000..15c1543ac --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import logging +import zmq +import thrift.server.TServer +import thrift.transport.TTransport + + +class TZmqServer(thrift.server.TServer.TServer): + def __init__(self, processor, ctx, endpoint, sock_type): + thrift.server.TServer.TServer.__init__(self, processor, None) + self.zmq_type = sock_type + self.socket = ctx.socket(sock_type) + self.socket.bind(endpoint) + + def serveOne(self): + msg = self.socket.recv() + itrans = thrift.transport.TTransport.TMemoryBuffer(msg) + otrans = thrift.transport.TTransport.TMemoryBuffer() + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + self.processor.process(iprot, oprot) + except Exception: + logging.exception("Exception while processing request") + # Fall through and send back a response, even if empty or incomplete. + + if self.zmq_type == zmq.REP: + msg = otrans.getvalue() + self.socket.send(msg) + + def serve(self): + while True: + self.serveOne() + + +class TZmqMultiServer(object): + def __init__(self): + self.servers = [] + + def serveOne(self, timeout=-1): + self._serveActive(self._setupPoll(), timeout) + + def serveForever(self): + poll_info = self._setupPoll() + while True: + self._serveActive(poll_info, -1) + + def _setupPoll(self): + server_map = {} + poller = zmq.Poller() + for server in self.servers: + server_map[server.socket] = server + poller.register(server.socket, zmq.POLLIN) + return (server_map, poller) + + def _serveActive(self, poll_info, timeout): + (server_map, poller) = poll_info + ready = dict(poller.poll()) + for sock, state in ready.items(): + assert (state & zmq.POLLIN) != 0 + server_map[sock].serveOne() diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/AssemblyInfo.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/AssemblyInfo.cs new file mode 100644 index 000000000..12cd434f3 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/AssemblyInfo.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.CompilerServices; + +// Information about this assembly is defined by the following attributes. +// Change them to the values specific to your project. + +[assembly: AssemblyTitle("ZmqServer")] +[assembly: AssemblyDescription("Zmq Examples")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("The Apache Software Foundation")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("The Apache Software Foundation")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}". +// The form "{Major}.{Minor}.*" will automatically update the build and revision, +// and "{Major}.{Minor}.{Build}.*" will update just the revision. + +[assembly: AssemblyVersion("1.0.*")] + +// The following attributes are used to specify the signing key for the assembly, +// if desired. See the Mono documentation for more information about signing. + +//[assembly: AssemblyDelaySign(false)] +//[assembly: AssemblyKeyFile("")] + diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/Main.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/Main.cs new file mode 100644 index 000000000..e66cfe080 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/Main.cs @@ -0,0 +1,60 @@ +using System; +using System.Threading; +using Thrift.Protocol; +using ZMQ; +using ZmqServer; +using ZmqClient; + +namespace ZmqServer +{ + class MainClass + { + public static void Main (string[] args) + { + new Thread(Server.serve).Start(); + Client.work(); + } + + static class Server{ + public static void serve(){ + StorageHandler s=new StorageHandler(); + Storage.Processor p=new Storage.Processor(s); + + ZMQ.Context c=new ZMQ.Context(); + + TZmqServer tzs=new TZmqServer(p,c,"tcp://127.0.0.1:9090",ZMQ.SocketType.PAIR); + tzs.Serve(); + } + + class StorageHandler:Storage.Iface{ + int val=0; + + public void incr(int amount){ + val+=amount; + Console.WriteLine("incr({0})",amount); + } + + public int get(){ + return val; + } + } + } + + static class Client{ + public static void work() + { + Context ctx=new Context(); + TZmqClient tzc=new TZmqClient(ctx,"tcp://127.0.0.1:9090",SocketType.PAIR); + TBinaryProtocol p=new TBinaryProtocol(tzc); + + Storage.Client client=new Storage.Client(p); + tzc.Open(); + + Console.WriteLine(client.@get()); + client.incr(1); + client.incr(41); + Console.WriteLine(client.@get()); + } + } + } +} diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs new file mode 100644 index 000000000..e9ab5166a --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs @@ -0,0 +1,78 @@ +using System; +using ZMQ; +using System.IO; +using Thrift.Transport; + +namespace ZmqClient +{ + public class TZmqClient : TTransport + { + Socket _sock; + String _endpoint; + MemoryStream _wbuf = new MemoryStream (); + MemoryStream _rbuf = new MemoryStream (); + + void debug (string msg) + { + //Uncomment to enable debug +// Console.WriteLine (msg); + } + + public TZmqClient (Context ctx, String endpoint, SocketType sockType) + { + _sock = ctx.Socket (sockType); + _endpoint = endpoint; + } + + public override void Open () + { + _sock.Connect (_endpoint); + } + + public override void Close () + { + throw new NotImplementedException (); + } + + public override bool IsOpen { + get { + throw new NotImplementedException (); + } + } + + public override int Read (byte[] buf, int off, int len) + { + debug ("Client_Read"); + if (off != 0 || len != buf.Length) + throw new NotImplementedException (); + + if (_rbuf.Length == 0) { + //Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift response + debug ("Client_Read Filling buffer.."); + byte[] tmpBuf = _sock.Recv (); + debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length)); + _rbuf.Write (tmpBuf, 0, tmpBuf.Length); + _rbuf.Position = 0; //For reading + } + int ret = _rbuf.Read (buf, 0, len); + if (_rbuf.Length == _rbuf.Position) //Finished reading + _rbuf.SetLength (0); + debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position)); + return ret; + } + + public override void Write (byte[] buf, int off, int len) + { + debug ("Client_Write"); + _wbuf.Write (buf, off, len); + } + + public override void Flush () + { + debug ("Client_Flush"); + _sock.Send (_wbuf.GetBuffer ()); + _wbuf = new MemoryStream (); + } + } +} + diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqServer.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqServer.cs new file mode 100644 index 000000000..535c623d0 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqServer.cs @@ -0,0 +1,56 @@ +using System; +using Thrift; +using Thrift.Server; +using Thrift.Transport; +using Thrift.Protocol; +using ZMQ; +using System.IO; + +using System.Collections.Generic; + +namespace ZmqServer +{ + public class TZmqServer + { + Socket _socket ; + TProcessor _processor; + + void debug (string msg) + { + //Uncomment to enable debug +// Console.WriteLine (msg); + } + + public TZmqServer (TProcessor processor, Context ctx, String endpoint, SocketType sockType) + { + new TSimpleServer (processor,null); + _socket = ctx.Socket (sockType); + _socket.Bind (endpoint); + _processor = processor; + } + + public void ServeOne () + { + debug ("Server_ServeOne"); + Byte[] msg = _socket.Recv (); + MemoryStream istream = new MemoryStream (msg); + MemoryStream ostream = new MemoryStream (); + TProtocol tProtocol = new TBinaryProtocol (new TStreamTransport (istream, ostream)); + _processor.Process (tProtocol, tProtocol); + + if (ostream.Length != 0) { + byte[] newBuf = new byte[ostream.Length]; + Array.Copy (ostream.GetBuffer (), newBuf, ostream.Length); + debug (string.Format ("Server_ServeOne sending {0}b", ostream.Length)); + _socket.Send (newBuf); + } + } + + public void Serve () + { + while (true) + ServeOne (); + } + } +} + diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.csproj b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.csproj new file mode 100755 index 000000000..80ad1dbd6 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.csproj @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">x86</Platform> + <ProductVersion>9.0.21022</ProductVersion> + <SchemaVersion>2.0</SchemaVersion> + <ProjectGuid>{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}</ProjectGuid> + <OutputType>Exe</OutputType> + <RootNamespace>ZmqServer</RootNamespace> + <AssemblyName>ThriftZMQ</AssemblyName> + <TargetFrameworkVersion>v3.5</TargetFrameworkVersion> + <FileUpgradeFlags> + </FileUpgradeFlags> + <OldToolsVersion>3.5</OldToolsVersion> + <UpgradeBackupLocation /> + <PublishUrl>publish\</PublishUrl> + <Install>true</Install> + <InstallFrom>Disk</InstallFrom> + <UpdateEnabled>false</UpdateEnabled> + <UpdateMode>Foreground</UpdateMode> + <UpdateInterval>7</UpdateInterval> + <UpdateIntervalUnits>Days</UpdateIntervalUnits> + <UpdatePeriodically>false</UpdatePeriodically> + <UpdateRequired>false</UpdateRequired> + <MapFileExtensions>true</MapFileExtensions> + <ApplicationRevision>0</ApplicationRevision> + <ApplicationVersion>1.0.0.0</ApplicationVersion> + <IsWebBootstrapper>false</IsWebBootstrapper> + <UseApplicationTrust>false</UseApplicationTrust> + <BootstrapperEnabled>true</BootstrapperEnabled> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' "> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\Debug</OutputPath> + <DefineConstants>DEBUG</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <PlatformTarget>x86</PlatformTarget> + <Externalconsole>true</Externalconsole> + <CodeAnalysisRuleSet>AllRules.ruleset</CodeAnalysisRuleSet> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' "> + <DebugType>none</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\Release</OutputPath> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <PlatformTarget>x86</PlatformTarget> + <Externalconsole>true</Externalconsole> + <CodeAnalysisRuleSet>AllRules.ruleset</CodeAnalysisRuleSet> + </PropertyGroup> + <ItemGroup> + <Reference Include="clrzmq, Version=2.1.0.0, Culture=neutral, processorArchitecture=x86"> + <SpecificVersion>False</SpecificVersion> + <HintPath>.\clrzmq.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="Thrift, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\..\..\lib\csharp\Thrift.dll</HintPath> + </Reference> + </ItemGroup> + <ItemGroup> + <Compile Include="Main.cs" /> + <Compile Include="AssemblyInfo.cs" /> + <Compile Include="TZmqServer.cs" /> + <Compile Include="TZmqClient.cs" /> + <Compile Include="..\gen-csharp\Storage.cs" /> + </ItemGroup> + <ItemGroup> + <BootstrapperPackage Include="Microsoft.Net.Client.3.5"> + <Visible>False</Visible> + <ProductName>.NET Framework 3.5 SP1 Client Profile</ProductName> + <Install>false</Install> + </BootstrapperPackage> + <BootstrapperPackage Include="Microsoft.Net.Framework.3.5.SP1"> + <Visible>False</Visible> + <ProductName>.NET Framework 3.5 SP1</ProductName> + <Install>true</Install> + </BootstrapperPackage> + <BootstrapperPackage Include="Microsoft.Windows.Installer.3.1"> + <Visible>False</Visible> + <ProductName>Windows Installer 3.1</ProductName> + <Install>true</Install> + </BootstrapperPackage> + </ItemGroup> + <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> +</Project> diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.sln b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.sln new file mode 100755 index 000000000..6af57b60c --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/ThriftZMQ.sln @@ -0,0 +1,42 @@ + +Microsoft Visual Studio Solution File, Format Version 11.00 +# Visual Studio 2010 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ThriftZMQ", "ThriftZMQ.csproj", "{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thrift", "..\..\..\lib\csharp\src\Thrift.csproj", "{499EB63C-D74C-47E8-AE48-A2FC94538E9D}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|Mixed Platforms = Debug|Mixed Platforms + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|Mixed Platforms = Release|Mixed Platforms + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Any CPU.ActiveCfg = Debug|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Mixed Platforms.ActiveCfg = Debug|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Mixed Platforms.Build.0 = Debug|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|x86.ActiveCfg = Debug|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|x86.Build.0 = Debug|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Any CPU.ActiveCfg = Release|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Mixed Platforms.ActiveCfg = Release|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Mixed Platforms.Build.0 = Release|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|x86.ActiveCfg = Release|x86 + {17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|x86.Build.0 = Release|x86 + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|x86.ActiveCfg = Debug|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.Build.0 = Release|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|x86.ActiveCfg = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/src/jaegertracing/thrift/contrib/zeromq/storage.thrift b/src/jaegertracing/thrift/contrib/zeromq/storage.thrift new file mode 100644 index 000000000..a1ea96752 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/storage.thrift @@ -0,0 +1,4 @@ +service Storage { + oneway void incr(1: i32 amount); + i32 get(); +} diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-client.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-client.cpp new file mode 100644 index 000000000..159c25030 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/test-client.cpp @@ -0,0 +1,40 @@ +#include <iostream> +#include <cstdlib> +#include <thrift/protocol/TBinaryProtocol.h> + +#include "zmq.hpp" +#include "TZmqClient.h" +#include "Storage.h" + +using apache::thrift::std::shared_ptr; +using apache::thrift::transport::TZmqClient; +using apache::thrift::protocol::TBinaryProtocol; + +int main(int argc, char** argv) { + const char* endpoint = "tcp://127.0.0.1:9090"; + int socktype = ZMQ_REQ; + int incr = 0; + if (argc > 1) { + incr = atoi(argv[1]); + if (incr) { + socktype = ZMQ_PUSH; + endpoint = "tcp://127.0.0.1:9091"; + } + } + + zmq::context_t ctx(1); + shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype)); + shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport)); + StorageClient client(protocol); + transport->open(); + + if (incr) { + client.incr(incr); + usleep(50000); + } else { + int value = client.get(); + std::cout << value << std::endl; + } + + return 0; +} diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-client.py b/src/jaegertracing/thrift/contrib/zeromq/test-client.py new file mode 100755 index 000000000..d51216e45 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/test-client.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +import sys +import time +import zmq +import TZmqClient +import thrift.protocol.TBinaryProtocol +import storage.ttypes +import storage.Storage + + +def main(args): + endpoint = "tcp://127.0.0.1:9090" + socktype = zmq.REQ + incr = 0 + if len(args) > 1: + incr = int(args[1]) + if incr: + socktype = zmq.PUSH + endpoint = "tcp://127.0.0.1:9091" + + ctx = zmq.Context() + transport = TZmqClient.TZmqClient(ctx, endpoint, socktype) + protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocolAccelerated(transport) + client = storage.Storage.Client(protocol) + transport.open() + + if incr: + client.incr(incr) + time.sleep(0.05) + else: + value = client.get() + print(value) + + +if __name__ == "__main__": + main(sys.argv) diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-receiver.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-receiver.cpp new file mode 100644 index 000000000..d465bff63 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/test-receiver.cpp @@ -0,0 +1,40 @@ +#include "zmq.hpp" +#include "TZmqServer.h" +#include "Storage.h" + +using apache::thrift::std::shared_ptr; +using apache::thrift::TProcessor; +using apache::thrift::server::TZmqServer; +using apache::thrift::server::TZmqMultiServer; + +class StorageHandler : virtual public StorageIf { + public: + StorageHandler() + : value_(0) + {} + + void incr(const int32_t amount) { + value_ += amount; + printf("value_: %i\n", value_) ; + } + + int32_t get() { + return value_; + } + + private: + int32_t value_; + +}; + + +int main(int argc, char *argv[]) { + shared_ptr<StorageHandler> handler(new StorageHandler()); + shared_ptr<TProcessor> processor(new StorageProcessor(handler)); + + zmq::context_t ctx(1); + TZmqServer oneway_server(processor, ctx, "epgm://eth0;239.192.1.1:5555", ZMQ_SUB); + oneway_server.serve(); + + return 0; +} diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-sender.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-sender.cpp new file mode 100644 index 000000000..5c086a11f --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/test-sender.cpp @@ -0,0 +1,32 @@ +#include <iostream> +#include <cstdlib> +#include <thrift/protocol/TBinaryProtocol.h> + +#include "zmq.hpp" +#include "TZmqClient.h" +#include "Storage.h" + +using apache::thrift::std::shared_ptr; +using apache::thrift::transport::TZmqClient; +using apache::thrift::protocol::TBinaryProtocol; + +int main(int argc, char** argv) { + const char* endpoint = "epgm://eth0;239.192.1.1:5555"; + int socktype = ZMQ_PUB; + int incr = 1; + if (argc > 1) { + incr = atoi(argv[1]); + } + + zmq::context_t ctx(1); + shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype)); + shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport)); + StorageClient client(protocol); + + transport->open(); + + client.incr(incr); + usleep(50000); + + return 0; +} diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-server.cpp b/src/jaegertracing/thrift/contrib/zeromq/test-server.cpp new file mode 100644 index 000000000..e6f1b2083 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/test-server.cpp @@ -0,0 +1,43 @@ +#include "zmq.hpp" +#include "TZmqServer.h" +#include "Storage.h" + +using apache::thrift::std::shared_ptr; +using apache::thrift::TProcessor; +using apache::thrift::server::TZmqServer; +using apache::thrift::server::TZmqMultiServer; + +class StorageHandler : virtual public StorageIf { + public: + StorageHandler() + : value_(0) + {} + + void incr(const int32_t amount) { + value_ += amount; + } + + int32_t get() { + return value_; + } + + private: + int32_t value_; + +}; + + +int main(int argc, char *argv[]) { + shared_ptr<StorageHandler> handler(new StorageHandler()); + shared_ptr<TProcessor> processor(new StorageProcessor(handler)); + + zmq::context_t ctx(1); + TZmqServer reqrep_server(processor, ctx, "tcp://0.0.0.0:9090", ZMQ_REP); + TZmqServer oneway_server(processor, ctx, "tcp://0.0.0.0:9091", ZMQ_PULL); + TZmqMultiServer multiserver; + multiserver.servers().push_back(&reqrep_server); + multiserver.servers().push_back(&oneway_server); + multiserver.serveForever(); + + return 0; +} diff --git a/src/jaegertracing/thrift/contrib/zeromq/test-server.py b/src/jaegertracing/thrift/contrib/zeromq/test-server.py new file mode 100755 index 000000000..d89b37ba2 --- /dev/null +++ b/src/jaegertracing/thrift/contrib/zeromq/test-server.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +import zmq +import TZmqServer +import storage.ttypes +import storage.Storage + + +class StorageHandler(storage.Storage.Iface): + def __init__(self): + self.value = 0 + + def incr(self, amount): + self.value += amount + + def get(self): + return self.value + + +def main(): + handler = StorageHandler() + processor = storage.Storage.Processor(handler) + + ctx = zmq.Context() + reqrep_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9090", zmq.REP) + oneway_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9091", zmq.PULL) + multiserver = TZmqServer.TZmqMultiServer() + multiserver.servers.append(reqrep_server) + multiserver.servers.append(oneway_server) + multiserver.serveForever() + + +if __name__ == "__main__": + main() |