summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py')
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py
new file mode 100644
index 000000000..15c1543ac
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/TZmqServer.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import logging
+import zmq
+import thrift.server.TServer
+import thrift.transport.TTransport
+
+
+class TZmqServer(thrift.server.TServer.TServer):
+ def __init__(self, processor, ctx, endpoint, sock_type):
+ thrift.server.TServer.TServer.__init__(self, processor, None)
+ self.zmq_type = sock_type
+ self.socket = ctx.socket(sock_type)
+ self.socket.bind(endpoint)
+
+ def serveOne(self):
+ msg = self.socket.recv()
+ itrans = thrift.transport.TTransport.TMemoryBuffer(msg)
+ otrans = thrift.transport.TTransport.TMemoryBuffer()
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ try:
+ self.processor.process(iprot, oprot)
+ except Exception:
+ logging.exception("Exception while processing request")
+ # Fall through and send back a response, even if empty or incomplete.
+
+ if self.zmq_type == zmq.REP:
+ msg = otrans.getvalue()
+ self.socket.send(msg)
+
+ def serve(self):
+ while True:
+ self.serveOne()
+
+
+class TZmqMultiServer(object):
+ def __init__(self):
+ self.servers = []
+
+ def serveOne(self, timeout=-1):
+ self._serveActive(self._setupPoll(), timeout)
+
+ def serveForever(self):
+ poll_info = self._setupPoll()
+ while True:
+ self._serveActive(poll_info, -1)
+
+ def _setupPoll(self):
+ server_map = {}
+ poller = zmq.Poller()
+ for server in self.servers:
+ server_map[server.socket] = server
+ poller.register(server.socket, zmq.POLLIN)
+ return (server_map, poller)
+
+ def _serveActive(self, poll_info, timeout):
+ (server_map, poller) = poll_info
+ ready = dict(poller.poll())
+ for sock, state in ready.items():
+ assert (state & zmq.POLLIN) != 0
+ server_map[sock].serveOne()