summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/py/src/server
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/jaegertracing/thrift/lib/py/src/server/THttpServer.py131
-rw-r--r--src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py370
-rw-r--r--src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py123
-rw-r--r--src/jaegertracing/thrift/lib/py/src/server/TServer.py323
-rw-r--r--src/jaegertracing/thrift/lib/py/src/server/__init__.py20
5 files changed, 967 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/py/src/server/THttpServer.py b/src/jaegertracing/thrift/lib/py/src/server/THttpServer.py
new file mode 100644
index 000000000..47e817df7
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/py/src/server/THttpServer.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import ssl
+
+from six.moves import BaseHTTPServer
+
+from thrift.Thrift import TMessageType
+from thrift.server import TServer
+from thrift.transport import TTransport
+
+
+class ResponseException(Exception):
+ """Allows handlers to override the HTTP response
+
+ Normally, THttpServer always sends a 200 response. If a handler wants
+ to override this behavior (e.g., to simulate a misconfigured or
+ overloaded web server during testing), it can raise a ResponseException.
+ The function passed to the constructor will be called with the
+ RequestHandler as its only argument. Note that this is irrelevant
+ for ONEWAY requests, as the HTTP response must be sent before the
+ RPC is processed.
+ """
+ def __init__(self, handler):
+ self.handler = handler
+
+
+class THttpServer(TServer.TServer):
+ """A simple HTTP-based Thrift server
+
+ This class is not very performant, but it is useful (for example) for
+ acting as a mock version of an Apache-based PHP Thrift endpoint.
+ Also important to note the HTTP implementation pretty much violates the
+ transport/protocol/processor/server layering, by performing the transport
+ functions here. This means things like oneway handling are oddly exposed.
+ """
+ def __init__(self,
+ processor,
+ server_address,
+ inputProtocolFactory,
+ outputProtocolFactory=None,
+ server_class=BaseHTTPServer.HTTPServer,
+ **kwargs):
+ """Set up protocol factories and HTTP (or HTTPS) server.
+
+ See BaseHTTPServer for server_address.
+ See TServer for protocol factories.
+
+ To make a secure server, provide the named arguments:
+ * cafile - to validate clients [optional]
+ * cert_file - the server cert
+ * key_file - the server's key
+ """
+ if outputProtocolFactory is None:
+ outputProtocolFactory = inputProtocolFactory
+
+ TServer.TServer.__init__(self, processor, None, None, None,
+ inputProtocolFactory, outputProtocolFactory)
+
+ thttpserver = self
+ self._replied = None
+
+ class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_POST(self):
+ # Don't care about the request path.
+ thttpserver._replied = False
+ iftrans = TTransport.TFileObjectTransport(self.rfile)
+ itrans = TTransport.TBufferedTransport(
+ iftrans, int(self.headers['Content-Length']))
+ otrans = TTransport.TMemoryBuffer()
+ iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
+ oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
+ try:
+ thttpserver.processor.on_message_begin(self.on_begin)
+ thttpserver.processor.process(iprot, oprot)
+ except ResponseException as exn:
+ exn.handler(self)
+ else:
+ if not thttpserver._replied:
+ # If the request was ONEWAY we would have replied already
+ data = otrans.getvalue()
+ self.send_response(200)
+ self.send_header("Content-Length", len(data))
+ self.send_header("Content-Type", "application/x-thrift")
+ self.end_headers()
+ self.wfile.write(data)
+
+ def on_begin(self, name, type, seqid):
+ """
+ Inspect the message header.
+
+ This allows us to post an immediate transport response
+ if the request is a ONEWAY message type.
+ """
+ if type == TMessageType.ONEWAY:
+ self.send_response(200)
+ self.send_header("Content-Type", "application/x-thrift")
+ self.end_headers()
+ thttpserver._replied = True
+
+ self.httpd = server_class(server_address, RequestHander)
+
+ if (kwargs.get('cafile') or kwargs.get('cert_file') or kwargs.get('key_file')):
+ context = ssl.create_default_context(cafile=kwargs.get('cafile'))
+ context.check_hostname = False
+ context.load_cert_chain(kwargs.get('cert_file'), kwargs.get('key_file'))
+ context.verify_mode = ssl.CERT_REQUIRED if kwargs.get('cafile') else ssl.CERT_NONE
+ self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)
+
+ def serve(self):
+ self.httpd.serve_forever()
+
+ def shutdown(self):
+ self.httpd.socket.close()
+ # self.httpd.shutdown() # hangs forever, python doesn't handle POLLNVAL properly!
diff --git a/src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py b/src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py
new file mode 100644
index 000000000..f62d486eb
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py
@@ -0,0 +1,370 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""Implementation of non-blocking server.
+
+The main idea of the server is to receive and send requests
+only from the main thread.
+
+The thread poool should be sized for concurrent tasks, not
+maximum connections
+"""
+
+import logging
+import select
+import socket
+import struct
+import threading
+
+from collections import deque
+from six.moves import queue
+
+from thrift.transport import TTransport
+from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
+
+__all__ = ['TNonblockingServer']
+
+logger = logging.getLogger(__name__)
+
+
+class Worker(threading.Thread):
+ """Worker is a small helper to process incoming connection."""
+
+ def __init__(self, queue):
+ threading.Thread.__init__(self)
+ self.queue = queue
+
+ def run(self):
+ """Process queries from task queue, stop if processor is None."""
+ while True:
+ try:
+ processor, iprot, oprot, otrans, callback = self.queue.get()
+ if processor is None:
+ break
+ processor.process(iprot, oprot)
+ callback(True, otrans.getvalue())
+ except Exception:
+ logger.exception("Exception while processing request", exc_info=True)
+ callback(False, b'')
+
+
+WAIT_LEN = 0
+WAIT_MESSAGE = 1
+WAIT_PROCESS = 2
+SEND_ANSWER = 3
+CLOSED = 4
+
+
+def locked(func):
+ """Decorator which locks self.lock."""
+ def nested(self, *args, **kwargs):
+ self.lock.acquire()
+ try:
+ return func(self, *args, **kwargs)
+ finally:
+ self.lock.release()
+ return nested
+
+
+def socket_exception(func):
+ """Decorator close object on socket.error."""
+ def read(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ except socket.error:
+ logger.debug('ignoring socket exception', exc_info=True)
+ self.close()
+ return read
+
+
+class Message(object):
+ def __init__(self, offset, len_, header):
+ self.offset = offset
+ self.len = len_
+ self.buffer = None
+ self.is_header = header
+
+ @property
+ def end(self):
+ return self.offset + self.len
+
+
+class Connection(object):
+ """Basic class is represented connection.
+
+ It can be in state:
+ WAIT_LEN --- connection is reading request len.
+ WAIT_MESSAGE --- connection is reading request.
+ WAIT_PROCESS --- connection has just read whole request and
+ waits for call ready routine.
+ SEND_ANSWER --- connection is sending answer string (including length
+ of answer).
+ CLOSED --- socket was closed and connection should be deleted.
+ """
+ def __init__(self, new_socket, wake_up):
+ self.socket = new_socket
+ self.socket.setblocking(False)
+ self.status = WAIT_LEN
+ self.len = 0
+ self.received = deque()
+ self._reading = Message(0, 4, True)
+ self._rbuf = b''
+ self._wbuf = b''
+ self.lock = threading.Lock()
+ self.wake_up = wake_up
+ self.remaining = False
+
+ @socket_exception
+ def read(self):
+ """Reads data from stream and switch state."""
+ assert self.status in (WAIT_LEN, WAIT_MESSAGE)
+ assert not self.received
+ buf_size = 8192
+ first = True
+ done = False
+ while not done:
+ read = self.socket.recv(buf_size)
+ rlen = len(read)
+ done = rlen < buf_size
+ self._rbuf += read
+ if first and rlen == 0:
+ if self.status != WAIT_LEN or self._rbuf:
+ logger.error('could not read frame from socket')
+ else:
+ logger.debug('read zero length. client might have disconnected')
+ self.close()
+ while len(self._rbuf) >= self._reading.end:
+ if self._reading.is_header:
+ mlen, = struct.unpack('!i', self._rbuf[:4])
+ self._reading = Message(self._reading.end, mlen, False)
+ self.status = WAIT_MESSAGE
+ else:
+ self._reading.buffer = self._rbuf
+ self.received.append(self._reading)
+ self._rbuf = self._rbuf[self._reading.end:]
+ self._reading = Message(0, 4, True)
+ first = False
+ if self.received:
+ self.status = WAIT_PROCESS
+ break
+ self.remaining = not done
+
+ @socket_exception
+ def write(self):
+ """Writes data from socket and switch state."""
+ assert self.status == SEND_ANSWER
+ sent = self.socket.send(self._wbuf)
+ if sent == len(self._wbuf):
+ self.status = WAIT_LEN
+ self._wbuf = b''
+ self.len = 0
+ else:
+ self._wbuf = self._wbuf[sent:]
+
+ @locked
+ def ready(self, all_ok, message):
+ """Callback function for switching state and waking up main thread.
+
+ This function is the only function witch can be called asynchronous.
+
+ The ready can switch Connection to three states:
+ WAIT_LEN if request was oneway.
+ SEND_ANSWER if request was processed in normal way.
+ CLOSED if request throws unexpected exception.
+
+ The one wakes up main thread.
+ """
+ assert self.status == WAIT_PROCESS
+ if not all_ok:
+ self.close()
+ self.wake_up()
+ return
+ self.len = 0
+ if len(message) == 0:
+ # it was a oneway request, do not write answer
+ self._wbuf = b''
+ self.status = WAIT_LEN
+ else:
+ self._wbuf = struct.pack('!i', len(message)) + message
+ self.status = SEND_ANSWER
+ self.wake_up()
+
+ @locked
+ def is_writeable(self):
+ """Return True if connection should be added to write list of select"""
+ return self.status == SEND_ANSWER
+
+ # it's not necessary, but...
+ @locked
+ def is_readable(self):
+ """Return True if connection should be added to read list of select"""
+ return self.status in (WAIT_LEN, WAIT_MESSAGE)
+
+ @locked
+ def is_closed(self):
+ """Returns True if connection is closed."""
+ return self.status == CLOSED
+
+ def fileno(self):
+ """Returns the file descriptor of the associated socket."""
+ return self.socket.fileno()
+
+ def close(self):
+ """Closes connection"""
+ self.status = CLOSED
+ self.socket.close()
+
+
+class TNonblockingServer(object):
+ """Non-blocking server."""
+
+ def __init__(self,
+ processor,
+ lsocket,
+ inputProtocolFactory=None,
+ outputProtocolFactory=None,
+ threads=10):
+ self.processor = processor
+ self.socket = lsocket
+ self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
+ self.out_protocol = outputProtocolFactory or self.in_protocol
+ self.threads = int(threads)
+ self.clients = {}
+ self.tasks = queue.Queue()
+ self._read, self._write = socket.socketpair()
+ self.prepared = False
+ self._stop = False
+
+ def setNumThreads(self, num):
+ """Set the number of worker threads that should be created."""
+ # implement ThreadPool interface
+ assert not self.prepared, "Can't change number of threads after start"
+ self.threads = num
+
+ def prepare(self):
+ """Prepares server for serve requests."""
+ if self.prepared:
+ return
+ self.socket.listen()
+ for _ in range(self.threads):
+ thread = Worker(self.tasks)
+ thread.setDaemon(True)
+ thread.start()
+ self.prepared = True
+
+ def wake_up(self):
+ """Wake up main thread.
+
+ The server usually waits in select call in we should terminate one.
+ The simplest way is using socketpair.
+
+ Select always wait to read from the first socket of socketpair.
+
+ In this case, we can just write anything to the second socket from
+ socketpair.
+ """
+ self._write.send(b'1')
+
+ def stop(self):
+ """Stop the server.
+
+ This method causes the serve() method to return. stop() may be invoked
+ from within your handler, or from another thread.
+
+ After stop() is called, serve() will return but the server will still
+ be listening on the socket. serve() may then be called again to resume
+ processing requests. Alternatively, close() may be called after
+ serve() returns to close the server socket and shutdown all worker
+ threads.
+ """
+ self._stop = True
+ self.wake_up()
+
+ def _select(self):
+ """Does select on open connections."""
+ readable = [self.socket.handle.fileno(), self._read.fileno()]
+ writable = []
+ remaining = []
+ for i, connection in list(self.clients.items()):
+ if connection.is_readable():
+ readable.append(connection.fileno())
+ if connection.remaining or connection.received:
+ remaining.append(connection.fileno())
+ if connection.is_writeable():
+ writable.append(connection.fileno())
+ if connection.is_closed():
+ del self.clients[i]
+ if remaining:
+ return remaining, [], [], False
+ else:
+ return select.select(readable, writable, readable) + (True,)
+
+ def handle(self):
+ """Handle requests.
+
+ WARNING! You must call prepare() BEFORE calling handle()
+ """
+ assert self.prepared, "You have to call prepare before handle"
+ rset, wset, xset, selected = self._select()
+ for readable in rset:
+ if readable == self._read.fileno():
+ # don't care i just need to clean readable flag
+ self._read.recv(1024)
+ elif readable == self.socket.handle.fileno():
+ try:
+ client = self.socket.accept()
+ if client:
+ self.clients[client.handle.fileno()] = Connection(client.handle,
+ self.wake_up)
+ except socket.error:
+ logger.debug('error while accepting', exc_info=True)
+ else:
+ connection = self.clients[readable]
+ if selected:
+ connection.read()
+ if connection.received:
+ connection.status = WAIT_PROCESS
+ msg = connection.received.popleft()
+ itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
+ otransport = TTransport.TMemoryBuffer()
+ iprot = self.in_protocol.getProtocol(itransport)
+ oprot = self.out_protocol.getProtocol(otransport)
+ self.tasks.put([self.processor, iprot, oprot,
+ otransport, connection.ready])
+ for writeable in wset:
+ self.clients[writeable].write()
+ for oob in xset:
+ self.clients[oob].close()
+ del self.clients[oob]
+
+ def close(self):
+ """Closes the server."""
+ for _ in range(self.threads):
+ self.tasks.put([None, None, None, None, None])
+ self.socket.close()
+ self.prepared = False
+
+ def serve(self):
+ """Serve requests.
+
+ Serve requests forever, or until stop() is called.
+ """
+ self._stop = False
+ self.prepare()
+ while not self._stop:
+ self.handle()
diff --git a/src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py b/src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py
new file mode 100644
index 000000000..fe6dc8162
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+import logging
+
+from multiprocessing import Process, Value, Condition
+
+from .TServer import TServer
+from thrift.transport.TTransport import TTransportException
+
+logger = logging.getLogger(__name__)
+
+
+class TProcessPoolServer(TServer):
+ """Server with a fixed size pool of worker subprocesses to service requests
+
+ Note that if you need shared state between the handlers - it's up to you!
+ Written by Dvir Volk, doat.com
+ """
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
+ self.numWorkers = 10
+ self.workers = []
+ self.isRunning = Value('b', False)
+ self.stopCondition = Condition()
+ self.postForkCallback = None
+
+ def setPostForkCallback(self, callback):
+ if not callable(callback):
+ raise TypeError("This is not a callback!")
+ self.postForkCallback = callback
+
+ def setNumWorkers(self, num):
+ """Set the number of worker threads that should be created"""
+ self.numWorkers = num
+
+ def workerProcess(self):
+ """Loop getting clients from the shared queue and process them"""
+ if self.postForkCallback:
+ self.postForkCallback()
+
+ while self.isRunning.value:
+ try:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ self.serveClient(client)
+ except (KeyboardInterrupt, SystemExit):
+ return 0
+ except Exception as x:
+ logger.exception(x)
+
+ def serveClient(self, client):
+ """Process input/output from a client for as long as possible"""
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
+
+ itrans.close()
+ otrans.close()
+
+ def serve(self):
+ """Start workers and put into queue"""
+ # this is a shared state that can tell the workers to exit when False
+ self.isRunning.value = True
+
+ # first bind and listen to the port
+ self.serverTransport.listen()
+
+ # fork the children
+ for i in range(self.numWorkers):
+ try:
+ w = Process(target=self.workerProcess)
+ w.daemon = True
+ w.start()
+ self.workers.append(w)
+ except Exception as x:
+ logger.exception(x)
+
+ # wait until the condition is set by stop()
+ while True:
+ self.stopCondition.acquire()
+ try:
+ self.stopCondition.wait()
+ break
+ except (SystemExit, KeyboardInterrupt):
+ break
+ except Exception as x:
+ logger.exception(x)
+
+ self.isRunning.value = False
+
+ def stop(self):
+ self.isRunning.value = False
+ self.stopCondition.acquire()
+ self.stopCondition.notify()
+ self.stopCondition.release()
diff --git a/src/jaegertracing/thrift/lib/py/src/server/TServer.py b/src/jaegertracing/thrift/lib/py/src/server/TServer.py
new file mode 100644
index 000000000..df2a7bb93
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/py/src/server/TServer.py
@@ -0,0 +1,323 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from six.moves import queue
+import logging
+import os
+import threading
+
+from thrift.protocol import TBinaryProtocol
+from thrift.protocol.THeaderProtocol import THeaderProtocolFactory
+from thrift.transport import TTransport
+
+logger = logging.getLogger(__name__)
+
+
+class TServer(object):
+ """Base interface for a server, which must have a serve() method.
+
+ Three constructors for all servers:
+ 1) (processor, serverTransport)
+ 2) (processor, serverTransport, transportFactory, protocolFactory)
+ 3) (processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory)
+ """
+ def __init__(self, *args):
+ if (len(args) == 2):
+ self.__initArgs__(args[0], args[1],
+ TTransport.TTransportFactoryBase(),
+ TTransport.TTransportFactoryBase(),
+ TBinaryProtocol.TBinaryProtocolFactory(),
+ TBinaryProtocol.TBinaryProtocolFactory())
+ elif (len(args) == 4):
+ self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
+ elif (len(args) == 6):
+ self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
+
+ def __initArgs__(self, processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory):
+ self.processor = processor
+ self.serverTransport = serverTransport
+ self.inputTransportFactory = inputTransportFactory
+ self.outputTransportFactory = outputTransportFactory
+ self.inputProtocolFactory = inputProtocolFactory
+ self.outputProtocolFactory = outputProtocolFactory
+
+ input_is_header = isinstance(self.inputProtocolFactory, THeaderProtocolFactory)
+ output_is_header = isinstance(self.outputProtocolFactory, THeaderProtocolFactory)
+ if any((input_is_header, output_is_header)) and input_is_header != output_is_header:
+ raise ValueError("THeaderProtocol servers require that both the input and "
+ "output protocols are THeaderProtocol.")
+
+ def serve(self):
+ pass
+
+
+class TSimpleServer(TServer):
+ """Simple single-threaded server that just pumps around one transport."""
+
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
+
+ def serve(self):
+ self.serverTransport.listen()
+ while True:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+
+ itrans = self.inputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+
+ # for THeaderProtocol, we must use the same protocol instance for
+ # input and output so that the response is in the same dialect that
+ # the server detected the request was in.
+ if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
+ otrans = None
+ oprot = iprot
+ else:
+ otrans = self.outputTransportFactory.getTransport(client)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
+
+ itrans.close()
+ if otrans:
+ otrans.close()
+
+
+class TThreadedServer(TServer):
+ """Threaded server that spawns a new thread per each connection."""
+
+ def __init__(self, *args, **kwargs):
+ TServer.__init__(self, *args)
+ self.daemon = kwargs.get("daemon", False)
+
+ def serve(self):
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ t = threading.Thread(target=self.handle, args=(client,))
+ t.setDaemon(self.daemon)
+ t.start()
+ except KeyboardInterrupt:
+ raise
+ except Exception as x:
+ logger.exception(x)
+
+ def handle(self, client):
+ itrans = self.inputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+
+ # for THeaderProtocol, we must use the same protocol instance for input
+ # and output so that the response is in the same dialect that the
+ # server detected the request was in.
+ if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
+ otrans = None
+ oprot = iprot
+ else:
+ otrans = self.outputTransportFactory.getTransport(client)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
+
+ itrans.close()
+ if otrans:
+ otrans.close()
+
+
+class TThreadPoolServer(TServer):
+ """Server with a fixed size pool of threads which service requests."""
+
+ def __init__(self, *args, **kwargs):
+ TServer.__init__(self, *args)
+ self.clients = queue.Queue()
+ self.threads = 10
+ self.daemon = kwargs.get("daemon", False)
+
+ def setNumThreads(self, num):
+ """Set the number of worker threads that should be created"""
+ self.threads = num
+
+ def serveThread(self):
+ """Loop around getting clients from the shared queue and process them."""
+ while True:
+ try:
+ client = self.clients.get()
+ self.serveClient(client)
+ except Exception as x:
+ logger.exception(x)
+
+ def serveClient(self, client):
+ """Process input/output from a client for as long as possible"""
+ itrans = self.inputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+
+ # for THeaderProtocol, we must use the same protocol instance for input
+ # and output so that the response is in the same dialect that the
+ # server detected the request was in.
+ if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
+ otrans = None
+ oprot = iprot
+ else:
+ otrans = self.outputTransportFactory.getTransport(client)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
+
+ itrans.close()
+ if otrans:
+ otrans.close()
+
+ def serve(self):
+ """Start a fixed number of worker threads and put client into a queue"""
+ for i in range(self.threads):
+ try:
+ t = threading.Thread(target=self.serveThread)
+ t.setDaemon(self.daemon)
+ t.start()
+ except Exception as x:
+ logger.exception(x)
+
+ # Pump the socket for clients
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ self.clients.put(client)
+ except Exception as x:
+ logger.exception(x)
+
+
+class TForkingServer(TServer):
+ """A Thrift server that forks a new process for each request
+
+ This is more scalable than the threaded server as it does not cause
+ GIL contention.
+
+ Note that this has different semantics from the threading server.
+ Specifically, updates to shared variables will no longer be shared.
+ It will also not work on windows.
+
+ This code is heavily inspired by SocketServer.ForkingMixIn in the
+ Python stdlib.
+ """
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
+ self.children = []
+
+ def serve(self):
+ def try_close(file):
+ try:
+ file.close()
+ except IOError as e:
+ logger.warning(e, exc_info=True)
+
+ self.serverTransport.listen()
+ while True:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ try:
+ pid = os.fork()
+
+ if pid: # parent
+ # add before collect, otherwise you race w/ waitpid
+ self.children.append(pid)
+ self.collect_children()
+
+ # Parent must close socket or the connection may not get
+ # closed promptly
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ try_close(itrans)
+ try_close(otrans)
+ else:
+ itrans = self.inputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+
+ # for THeaderProtocol, we must use the same protocol
+ # instance for input and output so that the response is in
+ # the same dialect that the server detected the request was
+ # in.
+ if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
+ otrans = None
+ oprot = iprot
+ else:
+ otrans = self.outputTransportFactory.getTransport(client)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ ecode = 0
+ try:
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as e:
+ logger.exception(e)
+ ecode = 1
+ finally:
+ try_close(itrans)
+ if otrans:
+ try_close(otrans)
+
+ os._exit(ecode)
+
+ except TTransport.TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
+
+ def collect_children(self):
+ while self.children:
+ try:
+ pid, status = os.waitpid(0, os.WNOHANG)
+ except os.error:
+ pid = None
+
+ if pid:
+ self.children.remove(pid)
+ else:
+ break
diff --git a/src/jaegertracing/thrift/lib/py/src/server/__init__.py b/src/jaegertracing/thrift/lib/py/src/server/__init__.py
new file mode 100644
index 000000000..1bf6e254e
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/py/src/server/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+__all__ = ['TServer', 'TNonblockingServer']