diff options
Diffstat (limited to '')
5 files changed, 967 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/py/src/server/THttpServer.py b/src/jaegertracing/thrift/lib/py/src/server/THttpServer.py new file mode 100644 index 000000000..47e817df7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/py/src/server/THttpServer.py @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import ssl + +from six.moves import BaseHTTPServer + +from thrift.Thrift import TMessageType +from thrift.server import TServer +from thrift.transport import TTransport + + +class ResponseException(Exception): + """Allows handlers to override the HTTP response + + Normally, THttpServer always sends a 200 response. If a handler wants + to override this behavior (e.g., to simulate a misconfigured or + overloaded web server during testing), it can raise a ResponseException. + The function passed to the constructor will be called with the + RequestHandler as its only argument. Note that this is irrelevant + for ONEWAY requests, as the HTTP response must be sent before the + RPC is processed. + """ + def __init__(self, handler): + self.handler = handler + + +class THttpServer(TServer.TServer): + """A simple HTTP-based Thrift server + + This class is not very performant, but it is useful (for example) for + acting as a mock version of an Apache-based PHP Thrift endpoint. + Also important to note the HTTP implementation pretty much violates the + transport/protocol/processor/server layering, by performing the transport + functions here. This means things like oneway handling are oddly exposed. + """ + def __init__(self, + processor, + server_address, + inputProtocolFactory, + outputProtocolFactory=None, + server_class=BaseHTTPServer.HTTPServer, + **kwargs): + """Set up protocol factories and HTTP (or HTTPS) server. + + See BaseHTTPServer for server_address. + See TServer for protocol factories. + + To make a secure server, provide the named arguments: + * cafile - to validate clients [optional] + * cert_file - the server cert + * key_file - the server's key + """ + if outputProtocolFactory is None: + outputProtocolFactory = inputProtocolFactory + + TServer.TServer.__init__(self, processor, None, None, None, + inputProtocolFactory, outputProtocolFactory) + + thttpserver = self + self._replied = None + + class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler): + def do_POST(self): + # Don't care about the request path. + thttpserver._replied = False + iftrans = TTransport.TFileObjectTransport(self.rfile) + itrans = TTransport.TBufferedTransport( + iftrans, int(self.headers['Content-Length'])) + otrans = TTransport.TMemoryBuffer() + iprot = thttpserver.inputProtocolFactory.getProtocol(itrans) + oprot = thttpserver.outputProtocolFactory.getProtocol(otrans) + try: + thttpserver.processor.on_message_begin(self.on_begin) + thttpserver.processor.process(iprot, oprot) + except ResponseException as exn: + exn.handler(self) + else: + if not thttpserver._replied: + # If the request was ONEWAY we would have replied already + data = otrans.getvalue() + self.send_response(200) + self.send_header("Content-Length", len(data)) + self.send_header("Content-Type", "application/x-thrift") + self.end_headers() + self.wfile.write(data) + + def on_begin(self, name, type, seqid): + """ + Inspect the message header. + + This allows us to post an immediate transport response + if the request is a ONEWAY message type. + """ + if type == TMessageType.ONEWAY: + self.send_response(200) + self.send_header("Content-Type", "application/x-thrift") + self.end_headers() + thttpserver._replied = True + + self.httpd = server_class(server_address, RequestHander) + + if (kwargs.get('cafile') or kwargs.get('cert_file') or kwargs.get('key_file')): + context = ssl.create_default_context(cafile=kwargs.get('cafile')) + context.check_hostname = False + context.load_cert_chain(kwargs.get('cert_file'), kwargs.get('key_file')) + context.verify_mode = ssl.CERT_REQUIRED if kwargs.get('cafile') else ssl.CERT_NONE + self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True) + + def serve(self): + self.httpd.serve_forever() + + def shutdown(self): + self.httpd.socket.close() + # self.httpd.shutdown() # hangs forever, python doesn't handle POLLNVAL properly! diff --git a/src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py b/src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py new file mode 100644 index 000000000..f62d486eb --- /dev/null +++ b/src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py @@ -0,0 +1,370 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +"""Implementation of non-blocking server. + +The main idea of the server is to receive and send requests +only from the main thread. + +The thread poool should be sized for concurrent tasks, not +maximum connections +""" + +import logging +import select +import socket +import struct +import threading + +from collections import deque +from six.moves import queue + +from thrift.transport import TTransport +from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory + +__all__ = ['TNonblockingServer'] + +logger = logging.getLogger(__name__) + + +class Worker(threading.Thread): + """Worker is a small helper to process incoming connection.""" + + def __init__(self, queue): + threading.Thread.__init__(self) + self.queue = queue + + def run(self): + """Process queries from task queue, stop if processor is None.""" + while True: + try: + processor, iprot, oprot, otrans, callback = self.queue.get() + if processor is None: + break + processor.process(iprot, oprot) + callback(True, otrans.getvalue()) + except Exception: + logger.exception("Exception while processing request", exc_info=True) + callback(False, b'') + + +WAIT_LEN = 0 +WAIT_MESSAGE = 1 +WAIT_PROCESS = 2 +SEND_ANSWER = 3 +CLOSED = 4 + + +def locked(func): + """Decorator which locks self.lock.""" + def nested(self, *args, **kwargs): + self.lock.acquire() + try: + return func(self, *args, **kwargs) + finally: + self.lock.release() + return nested + + +def socket_exception(func): + """Decorator close object on socket.error.""" + def read(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except socket.error: + logger.debug('ignoring socket exception', exc_info=True) + self.close() + return read + + +class Message(object): + def __init__(self, offset, len_, header): + self.offset = offset + self.len = len_ + self.buffer = None + self.is_header = header + + @property + def end(self): + return self.offset + self.len + + +class Connection(object): + """Basic class is represented connection. + + It can be in state: + WAIT_LEN --- connection is reading request len. + WAIT_MESSAGE --- connection is reading request. + WAIT_PROCESS --- connection has just read whole request and + waits for call ready routine. + SEND_ANSWER --- connection is sending answer string (including length + of answer). + CLOSED --- socket was closed and connection should be deleted. + """ + def __init__(self, new_socket, wake_up): + self.socket = new_socket + self.socket.setblocking(False) + self.status = WAIT_LEN + self.len = 0 + self.received = deque() + self._reading = Message(0, 4, True) + self._rbuf = b'' + self._wbuf = b'' + self.lock = threading.Lock() + self.wake_up = wake_up + self.remaining = False + + @socket_exception + def read(self): + """Reads data from stream and switch state.""" + assert self.status in (WAIT_LEN, WAIT_MESSAGE) + assert not self.received + buf_size = 8192 + first = True + done = False + while not done: + read = self.socket.recv(buf_size) + rlen = len(read) + done = rlen < buf_size + self._rbuf += read + if first and rlen == 0: + if self.status != WAIT_LEN or self._rbuf: + logger.error('could not read frame from socket') + else: + logger.debug('read zero length. client might have disconnected') + self.close() + while len(self._rbuf) >= self._reading.end: + if self._reading.is_header: + mlen, = struct.unpack('!i', self._rbuf[:4]) + self._reading = Message(self._reading.end, mlen, False) + self.status = WAIT_MESSAGE + else: + self._reading.buffer = self._rbuf + self.received.append(self._reading) + self._rbuf = self._rbuf[self._reading.end:] + self._reading = Message(0, 4, True) + first = False + if self.received: + self.status = WAIT_PROCESS + break + self.remaining = not done + + @socket_exception + def write(self): + """Writes data from socket and switch state.""" + assert self.status == SEND_ANSWER + sent = self.socket.send(self._wbuf) + if sent == len(self._wbuf): + self.status = WAIT_LEN + self._wbuf = b'' + self.len = 0 + else: + self._wbuf = self._wbuf[sent:] + + @locked + def ready(self, all_ok, message): + """Callback function for switching state and waking up main thread. + + This function is the only function witch can be called asynchronous. + + The ready can switch Connection to three states: + WAIT_LEN if request was oneway. + SEND_ANSWER if request was processed in normal way. + CLOSED if request throws unexpected exception. + + The one wakes up main thread. + """ + assert self.status == WAIT_PROCESS + if not all_ok: + self.close() + self.wake_up() + return + self.len = 0 + if len(message) == 0: + # it was a oneway request, do not write answer + self._wbuf = b'' + self.status = WAIT_LEN + else: + self._wbuf = struct.pack('!i', len(message)) + message + self.status = SEND_ANSWER + self.wake_up() + + @locked + def is_writeable(self): + """Return True if connection should be added to write list of select""" + return self.status == SEND_ANSWER + + # it's not necessary, but... + @locked + def is_readable(self): + """Return True if connection should be added to read list of select""" + return self.status in (WAIT_LEN, WAIT_MESSAGE) + + @locked + def is_closed(self): + """Returns True if connection is closed.""" + return self.status == CLOSED + + def fileno(self): + """Returns the file descriptor of the associated socket.""" + return self.socket.fileno() + + def close(self): + """Closes connection""" + self.status = CLOSED + self.socket.close() + + +class TNonblockingServer(object): + """Non-blocking server.""" + + def __init__(self, + processor, + lsocket, + inputProtocolFactory=None, + outputProtocolFactory=None, + threads=10): + self.processor = processor + self.socket = lsocket + self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() + self.out_protocol = outputProtocolFactory or self.in_protocol + self.threads = int(threads) + self.clients = {} + self.tasks = queue.Queue() + self._read, self._write = socket.socketpair() + self.prepared = False + self._stop = False + + def setNumThreads(self, num): + """Set the number of worker threads that should be created.""" + # implement ThreadPool interface + assert not self.prepared, "Can't change number of threads after start" + self.threads = num + + def prepare(self): + """Prepares server for serve requests.""" + if self.prepared: + return + self.socket.listen() + for _ in range(self.threads): + thread = Worker(self.tasks) + thread.setDaemon(True) + thread.start() + self.prepared = True + + def wake_up(self): + """Wake up main thread. + + The server usually waits in select call in we should terminate one. + The simplest way is using socketpair. + + Select always wait to read from the first socket of socketpair. + + In this case, we can just write anything to the second socket from + socketpair. + """ + self._write.send(b'1') + + def stop(self): + """Stop the server. + + This method causes the serve() method to return. stop() may be invoked + from within your handler, or from another thread. + + After stop() is called, serve() will return but the server will still + be listening on the socket. serve() may then be called again to resume + processing requests. Alternatively, close() may be called after + serve() returns to close the server socket and shutdown all worker + threads. + """ + self._stop = True + self.wake_up() + + def _select(self): + """Does select on open connections.""" + readable = [self.socket.handle.fileno(), self._read.fileno()] + writable = [] + remaining = [] + for i, connection in list(self.clients.items()): + if connection.is_readable(): + readable.append(connection.fileno()) + if connection.remaining or connection.received: + remaining.append(connection.fileno()) + if connection.is_writeable(): + writable.append(connection.fileno()) + if connection.is_closed(): + del self.clients[i] + if remaining: + return remaining, [], [], False + else: + return select.select(readable, writable, readable) + (True,) + + def handle(self): + """Handle requests. + + WARNING! You must call prepare() BEFORE calling handle() + """ + assert self.prepared, "You have to call prepare before handle" + rset, wset, xset, selected = self._select() + for readable in rset: + if readable == self._read.fileno(): + # don't care i just need to clean readable flag + self._read.recv(1024) + elif readable == self.socket.handle.fileno(): + try: + client = self.socket.accept() + if client: + self.clients[client.handle.fileno()] = Connection(client.handle, + self.wake_up) + except socket.error: + logger.debug('error while accepting', exc_info=True) + else: + connection = self.clients[readable] + if selected: + connection.read() + if connection.received: + connection.status = WAIT_PROCESS + msg = connection.received.popleft() + itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset) + otransport = TTransport.TMemoryBuffer() + iprot = self.in_protocol.getProtocol(itransport) + oprot = self.out_protocol.getProtocol(otransport) + self.tasks.put([self.processor, iprot, oprot, + otransport, connection.ready]) + for writeable in wset: + self.clients[writeable].write() + for oob in xset: + self.clients[oob].close() + del self.clients[oob] + + def close(self): + """Closes the server.""" + for _ in range(self.threads): + self.tasks.put([None, None, None, None, None]) + self.socket.close() + self.prepared = False + + def serve(self): + """Serve requests. + + Serve requests forever, or until stop() is called. + """ + self._stop = False + self.prepare() + while not self._stop: + self.handle() diff --git a/src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py b/src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py new file mode 100644 index 000000000..fe6dc8162 --- /dev/null +++ b/src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py @@ -0,0 +1,123 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +import logging + +from multiprocessing import Process, Value, Condition + +from .TServer import TServer +from thrift.transport.TTransport import TTransportException + +logger = logging.getLogger(__name__) + + +class TProcessPoolServer(TServer): + """Server with a fixed size pool of worker subprocesses to service requests + + Note that if you need shared state between the handlers - it's up to you! + Written by Dvir Volk, doat.com + """ + def __init__(self, *args): + TServer.__init__(self, *args) + self.numWorkers = 10 + self.workers = [] + self.isRunning = Value('b', False) + self.stopCondition = Condition() + self.postForkCallback = None + + def setPostForkCallback(self, callback): + if not callable(callback): + raise TypeError("This is not a callback!") + self.postForkCallback = callback + + def setNumWorkers(self, num): + """Set the number of worker threads that should be created""" + self.numWorkers = num + + def workerProcess(self): + """Loop getting clients from the shared queue and process them""" + if self.postForkCallback: + self.postForkCallback() + + while self.isRunning.value: + try: + client = self.serverTransport.accept() + if not client: + continue + self.serveClient(client) + except (KeyboardInterrupt, SystemExit): + return 0 + except Exception as x: + logger.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + while True: + self.processor.process(iprot, oprot) + except TTransportException: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + otrans.close() + + def serve(self): + """Start workers and put into queue""" + # this is a shared state that can tell the workers to exit when False + self.isRunning.value = True + + # first bind and listen to the port + self.serverTransport.listen() + + # fork the children + for i in range(self.numWorkers): + try: + w = Process(target=self.workerProcess) + w.daemon = True + w.start() + self.workers.append(w) + except Exception as x: + logger.exception(x) + + # wait until the condition is set by stop() + while True: + self.stopCondition.acquire() + try: + self.stopCondition.wait() + break + except (SystemExit, KeyboardInterrupt): + break + except Exception as x: + logger.exception(x) + + self.isRunning.value = False + + def stop(self): + self.isRunning.value = False + self.stopCondition.acquire() + self.stopCondition.notify() + self.stopCondition.release() diff --git a/src/jaegertracing/thrift/lib/py/src/server/TServer.py b/src/jaegertracing/thrift/lib/py/src/server/TServer.py new file mode 100644 index 000000000..df2a7bb93 --- /dev/null +++ b/src/jaegertracing/thrift/lib/py/src/server/TServer.py @@ -0,0 +1,323 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from six.moves import queue +import logging +import os +import threading + +from thrift.protocol import TBinaryProtocol +from thrift.protocol.THeaderProtocol import THeaderProtocolFactory +from thrift.transport import TTransport + +logger = logging.getLogger(__name__) + + +class TServer(object): + """Base interface for a server, which must have a serve() method. + + Three constructors for all servers: + 1) (processor, serverTransport) + 2) (processor, serverTransport, transportFactory, protocolFactory) + 3) (processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory) + """ + def __init__(self, *args): + if (len(args) == 2): + self.__initArgs__(args[0], args[1], + TTransport.TTransportFactoryBase(), + TTransport.TTransportFactoryBase(), + TBinaryProtocol.TBinaryProtocolFactory(), + TBinaryProtocol.TBinaryProtocolFactory()) + elif (len(args) == 4): + self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3]) + elif (len(args) == 6): + self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5]) + + def __initArgs__(self, processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory): + self.processor = processor + self.serverTransport = serverTransport + self.inputTransportFactory = inputTransportFactory + self.outputTransportFactory = outputTransportFactory + self.inputProtocolFactory = inputProtocolFactory + self.outputProtocolFactory = outputProtocolFactory + + input_is_header = isinstance(self.inputProtocolFactory, THeaderProtocolFactory) + output_is_header = isinstance(self.outputProtocolFactory, THeaderProtocolFactory) + if any((input_is_header, output_is_header)) and input_is_header != output_is_header: + raise ValueError("THeaderProtocol servers require that both the input and " + "output protocols are THeaderProtocol.") + + def serve(self): + pass + + +class TSimpleServer(TServer): + """Simple single-threaded server that just pumps around one transport.""" + + def __init__(self, *args): + TServer.__init__(self, *args) + + def serve(self): + self.serverTransport.listen() + while True: + client = self.serverTransport.accept() + if not client: + continue + + itrans = self.inputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + + # for THeaderProtocol, we must use the same protocol instance for + # input and output so that the response is in the same dialect that + # the server detected the request was in. + if isinstance(self.inputProtocolFactory, THeaderProtocolFactory): + otrans = None + oprot = iprot + else: + otrans = self.outputTransportFactory.getTransport(client) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + if otrans: + otrans.close() + + +class TThreadedServer(TServer): + """Threaded server that spawns a new thread per each connection.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.daemon = kwargs.get("daemon", False) + + def serve(self): + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + if not client: + continue + t = threading.Thread(target=self.handle, args=(client,)) + t.setDaemon(self.daemon) + t.start() + except KeyboardInterrupt: + raise + except Exception as x: + logger.exception(x) + + def handle(self, client): + itrans = self.inputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + + # for THeaderProtocol, we must use the same protocol instance for input + # and output so that the response is in the same dialect that the + # server detected the request was in. + if isinstance(self.inputProtocolFactory, THeaderProtocolFactory): + otrans = None + oprot = iprot + else: + otrans = self.outputTransportFactory.getTransport(client) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + if otrans: + otrans.close() + + +class TThreadPoolServer(TServer): + """Server with a fixed size pool of threads which service requests.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.clients = queue.Queue() + self.threads = 10 + self.daemon = kwargs.get("daemon", False) + + def setNumThreads(self, num): + """Set the number of worker threads that should be created""" + self.threads = num + + def serveThread(self): + """Loop around getting clients from the shared queue and process them.""" + while True: + try: + client = self.clients.get() + self.serveClient(client) + except Exception as x: + logger.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.inputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + + # for THeaderProtocol, we must use the same protocol instance for input + # and output so that the response is in the same dialect that the + # server detected the request was in. + if isinstance(self.inputProtocolFactory, THeaderProtocolFactory): + otrans = None + oprot = iprot + else: + otrans = self.outputTransportFactory.getTransport(client) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + if otrans: + otrans.close() + + def serve(self): + """Start a fixed number of worker threads and put client into a queue""" + for i in range(self.threads): + try: + t = threading.Thread(target=self.serveThread) + t.setDaemon(self.daemon) + t.start() + except Exception as x: + logger.exception(x) + + # Pump the socket for clients + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + if not client: + continue + self.clients.put(client) + except Exception as x: + logger.exception(x) + + +class TForkingServer(TServer): + """A Thrift server that forks a new process for each request + + This is more scalable than the threaded server as it does not cause + GIL contention. + + Note that this has different semantics from the threading server. + Specifically, updates to shared variables will no longer be shared. + It will also not work on windows. + + This code is heavily inspired by SocketServer.ForkingMixIn in the + Python stdlib. + """ + def __init__(self, *args): + TServer.__init__(self, *args) + self.children = [] + + def serve(self): + def try_close(file): + try: + file.close() + except IOError as e: + logger.warning(e, exc_info=True) + + self.serverTransport.listen() + while True: + client = self.serverTransport.accept() + if not client: + continue + try: + pid = os.fork() + + if pid: # parent + # add before collect, otherwise you race w/ waitpid + self.children.append(pid) + self.collect_children() + + # Parent must close socket or the connection may not get + # closed promptly + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + try_close(itrans) + try_close(otrans) + else: + itrans = self.inputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + + # for THeaderProtocol, we must use the same protocol + # instance for input and output so that the response is in + # the same dialect that the server detected the request was + # in. + if isinstance(self.inputProtocolFactory, THeaderProtocolFactory): + otrans = None + oprot = iprot + else: + otrans = self.outputTransportFactory.getTransport(client) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + ecode = 0 + try: + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException: + pass + except Exception as e: + logger.exception(e) + ecode = 1 + finally: + try_close(itrans) + if otrans: + try_close(otrans) + + os._exit(ecode) + + except TTransport.TTransportException: + pass + except Exception as x: + logger.exception(x) + + def collect_children(self): + while self.children: + try: + pid, status = os.waitpid(0, os.WNOHANG) + except os.error: + pid = None + + if pid: + self.children.remove(pid) + else: + break diff --git a/src/jaegertracing/thrift/lib/py/src/server/__init__.py b/src/jaegertracing/thrift/lib/py/src/server/__init__.py new file mode 100644 index 000000000..1bf6e254e --- /dev/null +++ b/src/jaegertracing/thrift/lib/py/src/server/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +__all__ = ['TServer', 'TNonblockingServer'] |