summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/test/py/TestServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/test/py/TestServer.py')
-rwxr-xr-xsrc/jaegertracing/thrift/test/py/TestServer.py412
1 files changed, 412 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/test/py/TestServer.py b/src/jaegertracing/thrift/test/py/TestServer.py
new file mode 100755
index 000000000..d0a13e5f7
--- /dev/null
+++ b/src/jaegertracing/thrift/test/py/TestServer.py
@@ -0,0 +1,412 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import division
+import logging
+import os
+import signal
+import sys
+import time
+from optparse import OptionParser
+
+from util import local_libpath
+sys.path.insert(0, local_libpath())
+from thrift.protocol import TProtocol, TProtocolDecorator
+
+SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
+
+
+class TestHandler(object):
+ def testVoid(self):
+ if options.verbose > 1:
+ logging.info('testVoid()')
+
+ def testString(self, str):
+ if options.verbose > 1:
+ logging.info('testString(%s)' % str)
+ return str
+
+ def testBool(self, boolean):
+ if options.verbose > 1:
+ logging.info('testBool(%s)' % str(boolean).lower())
+ return boolean
+
+ def testByte(self, byte):
+ if options.verbose > 1:
+ logging.info('testByte(%d)' % byte)
+ return byte
+
+ def testI16(self, i16):
+ if options.verbose > 1:
+ logging.info('testI16(%d)' % i16)
+ return i16
+
+ def testI32(self, i32):
+ if options.verbose > 1:
+ logging.info('testI32(%d)' % i32)
+ return i32
+
+ def testI64(self, i64):
+ if options.verbose > 1:
+ logging.info('testI64(%d)' % i64)
+ return i64
+
+ def testDouble(self, dub):
+ if options.verbose > 1:
+ logging.info('testDouble(%f)' % dub)
+ return dub
+
+ def testBinary(self, thing):
+ if options.verbose > 1:
+ logging.info('testBinary()') # TODO: hex output
+ return thing
+
+ def testStruct(self, thing):
+ if options.verbose > 1:
+ logging.info('testStruct({%s, %s, %s, %s})' % (thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing))
+ return thing
+
+ def testException(self, arg):
+ # if options.verbose > 1:
+ logging.info('testException(%s)' % arg)
+ if arg == 'Xception':
+ raise Xception(errorCode=1001, message=arg)
+ elif arg == 'TException':
+ raise TException(message='This is a TException')
+
+ def testMultiException(self, arg0, arg1):
+ if options.verbose > 1:
+ logging.info('testMultiException(%s, %s)' % (arg0, arg1))
+ if arg0 == 'Xception':
+ raise Xception(errorCode=1001, message='This is an Xception')
+ elif arg0 == 'Xception2':
+ raise Xception2(
+ errorCode=2002,
+ struct_thing=Xtruct(string_thing='This is an Xception2'))
+ return Xtruct(string_thing=arg1)
+
+ def testOneway(self, seconds):
+ if options.verbose > 1:
+ logging.info('testOneway(%d) => sleeping...' % seconds)
+ time.sleep(seconds / 3) # be quick
+ if options.verbose > 1:
+ logging.info('done sleeping')
+
+ def testNest(self, thing):
+ if options.verbose > 1:
+ logging.info('testNest(%s)' % thing)
+ return thing
+
+ def testMap(self, thing):
+ if options.verbose > 1:
+ logging.info('testMap(%s)' % thing)
+ return thing
+
+ def testStringMap(self, thing):
+ if options.verbose > 1:
+ logging.info('testStringMap(%s)' % thing)
+ return thing
+
+ def testSet(self, thing):
+ if options.verbose > 1:
+ logging.info('testSet(%s)' % thing)
+ return thing
+
+ def testList(self, thing):
+ if options.verbose > 1:
+ logging.info('testList(%s)' % thing)
+ return thing
+
+ def testEnum(self, thing):
+ if options.verbose > 1:
+ logging.info('testEnum(%s)' % thing)
+ return thing
+
+ def testTypedef(self, thing):
+ if options.verbose > 1:
+ logging.info('testTypedef(%s)' % thing)
+ return thing
+
+ def testMapMap(self, thing):
+ if options.verbose > 1:
+ logging.info('testMapMap(%s)' % thing)
+ return {
+ -4: {
+ -4: -4,
+ -3: -3,
+ -2: -2,
+ -1: -1,
+ },
+ 4: {
+ 4: 4,
+ 3: 3,
+ 2: 2,
+ 1: 1,
+ },
+ }
+
+ def testInsanity(self, argument):
+ if options.verbose > 1:
+ logging.info('testInsanity(%s)' % argument)
+ return {
+ 1: {
+ 2: argument,
+ 3: argument,
+ },
+ 2: {6: Insanity()},
+ }
+
+ def testMulti(self, arg0, arg1, arg2, arg3, arg4, arg5):
+ if options.verbose > 1:
+ logging.info('testMulti(%s)' % [arg0, arg1, arg2, arg3, arg4, arg5])
+ return Xtruct(string_thing='Hello2',
+ byte_thing=arg0, i32_thing=arg1, i64_thing=arg2)
+
+
+class SecondHandler(object):
+ def secondtestString(self, argument):
+ return "testString(\"" + argument + "\")"
+
+
+# LAST_SEQID is a global because we have one transport and multiple protocols
+# running on it (when multiplexed)
+LAST_SEQID = None
+
+
+class TPedanticSequenceIdProtocolWrapper(TProtocolDecorator.TProtocolDecorator):
+ """
+ Wraps any protocol with sequence ID checking: looks for outbound
+ uniqueness as well as request/response alignment.
+ """
+ def __init__(self, protocol):
+ # TProtocolDecorator.__new__ does all the heavy lifting
+ pass
+
+ def readMessageBegin(self):
+ global LAST_SEQID
+ (name, type, seqid) =\
+ super(TPedanticSequenceIdProtocolWrapper, self).readMessageBegin()
+ if LAST_SEQID is not None and LAST_SEQID == seqid:
+ raise TProtocol.TProtocolException(
+ TProtocol.TProtocolException.INVALID_DATA,
+ "We received the same seqid {0} twice in a row".format(seqid))
+ LAST_SEQID = seqid
+ return (name, type, seqid)
+
+
+def make_pedantic(proto):
+ """ Wrap a protocol in the pedantic sequence ID wrapper. """
+ # NOTE: this is disabled for now as many clients send seqid
+ # of zero and that is okay, need a way to identify
+ # clients that MUST send seqid unique to function right
+ # or just force all implementations to send unique seqids (preferred)
+ return proto # TPedanticSequenceIdProtocolWrapper(proto)
+
+
+class TPedanticSequenceIdProtocolFactory(TProtocol.TProtocolFactory):
+ def __init__(self, encapsulated):
+ super(TPedanticSequenceIdProtocolFactory, self).__init__()
+ self.encapsulated = encapsulated
+
+ def getProtocol(self, trans):
+ return make_pedantic(self.encapsulated.getProtocol(trans))
+
+
+def main(options):
+ # common header allowed client types
+ allowed_client_types = [
+ THeaderTransport.THeaderClientType.HEADERS,
+ THeaderTransport.THeaderClientType.FRAMED_BINARY,
+ THeaderTransport.THeaderClientType.UNFRAMED_BINARY,
+ THeaderTransport.THeaderClientType.FRAMED_COMPACT,
+ THeaderTransport.THeaderClientType.UNFRAMED_COMPACT,
+ ]
+
+ # set up the protocol factory form the --protocol option
+ prot_factories = {
+ 'accel': TBinaryProtocol.TBinaryProtocolAcceleratedFactory(),
+ 'multia': TBinaryProtocol.TBinaryProtocolAcceleratedFactory(),
+ 'accelc': TCompactProtocol.TCompactProtocolAcceleratedFactory(),
+ 'multiac': TCompactProtocol.TCompactProtocolAcceleratedFactory(),
+ 'binary': TPedanticSequenceIdProtocolFactory(TBinaryProtocol.TBinaryProtocolFactory()),
+ 'multi': TPedanticSequenceIdProtocolFactory(TBinaryProtocol.TBinaryProtocolFactory()),
+ 'compact': TCompactProtocol.TCompactProtocolFactory(),
+ 'multic': TCompactProtocol.TCompactProtocolFactory(),
+ 'header': THeaderProtocol.THeaderProtocolFactory(allowed_client_types),
+ 'multih': THeaderProtocol.THeaderProtocolFactory(allowed_client_types),
+ 'json': TJSONProtocol.TJSONProtocolFactory(),
+ 'multij': TJSONProtocol.TJSONProtocolFactory(),
+ }
+ pfactory = prot_factories.get(options.proto, None)
+ if pfactory is None:
+ raise AssertionError('Unknown --protocol option: %s' % options.proto)
+ try:
+ pfactory.string_length_limit = options.string_limit
+ pfactory.container_length_limit = options.container_limit
+ except Exception:
+ # Ignore errors for those protocols that does not support length limit
+ pass
+
+ # get the server type (TSimpleServer, TNonblockingServer, etc...)
+ if len(args) > 1:
+ raise AssertionError('Only one server type may be specified, not multiple types.')
+ server_type = args[0]
+ if options.trans == 'http':
+ server_type = 'THttpServer'
+
+ # Set up the handler and processor objects
+ handler = TestHandler()
+ processor = ThriftTest.Processor(handler)
+
+ if options.proto.startswith('multi'):
+ secondHandler = SecondHandler()
+ secondProcessor = SecondService.Processor(secondHandler)
+
+ multiplexedProcessor = TMultiplexedProcessor()
+ multiplexedProcessor.registerDefault(processor)
+ multiplexedProcessor.registerProcessor('ThriftTest', processor)
+ multiplexedProcessor.registerProcessor('SecondService', secondProcessor)
+ processor = multiplexedProcessor
+
+ global server
+
+ # Handle THttpServer as a special case
+ if server_type == 'THttpServer':
+ if options.ssl:
+ __certfile = os.path.join(os.path.dirname(SCRIPT_DIR), "keys", "server.crt")
+ __keyfile = os.path.join(os.path.dirname(SCRIPT_DIR), "keys", "server.key")
+ server = THttpServer.THttpServer(processor, ('', options.port), pfactory, cert_file=__certfile, key_file=__keyfile)
+ else:
+ server = THttpServer.THttpServer(processor, ('', options.port), pfactory)
+ server.serve()
+ sys.exit(0)
+
+ # set up server transport and transport factory
+
+ abs_key_path = os.path.join(os.path.dirname(SCRIPT_DIR), 'keys', 'server.pem')
+
+ host = None
+ if options.ssl:
+ from thrift.transport import TSSLSocket
+ transport = TSSLSocket.TSSLServerSocket(host, options.port, certfile=abs_key_path)
+ else:
+ transport = TSocket.TServerSocket(host, options.port)
+ tfactory = TTransport.TBufferedTransportFactory()
+ if options.trans == 'buffered':
+ tfactory = TTransport.TBufferedTransportFactory()
+ elif options.trans == 'framed':
+ tfactory = TTransport.TFramedTransportFactory()
+ elif options.trans == '':
+ raise AssertionError('Unknown --transport option: %s' % options.trans)
+ else:
+ tfactory = TTransport.TBufferedTransportFactory()
+ # if --zlib, then wrap server transport, and use a different transport factory
+ if options.zlib:
+ transport = TZlibTransport.TZlibTransport(transport) # wrap with zlib
+ tfactory = TZlibTransport.TZlibTransportFactory()
+
+ # do server-specific setup here:
+ if server_type == "TNonblockingServer":
+ server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory)
+ elif server_type == "TProcessPoolServer":
+ import signal
+ from thrift.server import TProcessPoolServer
+ server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory)
+ server.setNumWorkers(5)
+
+ def set_alarm():
+ def clean_shutdown(signum, frame):
+ for worker in server.workers:
+ if options.verbose > 0:
+ logging.info('Terminating worker: %s' % worker)
+ worker.terminate()
+ if options.verbose > 0:
+ logging.info('Requesting server to stop()')
+ try:
+ server.stop()
+ except Exception:
+ pass
+ signal.signal(signal.SIGALRM, clean_shutdown)
+ signal.alarm(4)
+ set_alarm()
+ else:
+ # look up server class dynamically to instantiate server
+ ServerClass = getattr(TServer, server_type)
+ server = ServerClass(processor, transport, tfactory, pfactory)
+ # enter server main loop
+ server.serve()
+
+
+def exit_gracefully(signum, frame):
+ print("SIGINT received\n")
+ server.shutdown() # doesn't work properly, yet
+ sys.exit(0)
+
+
+if __name__ == '__main__':
+ signal.signal(signal.SIGINT, exit_gracefully)
+
+ parser = OptionParser()
+ parser.add_option('--libpydir', type='string', dest='libpydir',
+ help='include this directory to sys.path for locating library code')
+ parser.add_option('--genpydir', type='string', dest='genpydir',
+ default='gen-py',
+ help='include this directory to sys.path for locating generated code')
+ parser.add_option("--port", type="int", dest="port",
+ help="port number for server to listen on")
+ parser.add_option("--zlib", action="store_true", dest="zlib",
+ help="use zlib wrapper for compressed transport")
+ parser.add_option("--ssl", action="store_true", dest="ssl",
+ help="use SSL for encrypted transport")
+ parser.add_option('-v', '--verbose', action="store_const",
+ dest="verbose", const=2,
+ help="verbose output")
+ parser.add_option('-q', '--quiet', action="store_const",
+ dest="verbose", const=0,
+ help="minimal output")
+ parser.add_option('--protocol', dest="proto", type="string",
+ help="protocol to use, one of: accel, accelc, binary, compact, json, multi, multia, multiac, multic, multih, multij")
+ parser.add_option('--transport', dest="trans", type="string",
+ help="transport to use, one of: buffered, framed, http")
+ parser.add_option('--container-limit', dest='container_limit', type='int', default=None)
+ parser.add_option('--string-limit', dest='string_limit', type='int', default=None)
+ parser.set_defaults(port=9090, verbose=1, proto='binary', transport='buffered')
+ options, args = parser.parse_args()
+
+ # Print TServer log to stdout so that the test-runner can redirect it to log files
+ logging.basicConfig(level=options.verbose)
+
+ sys.path.insert(0, os.path.join(SCRIPT_DIR, options.genpydir))
+
+ from ThriftTest import ThriftTest, SecondService
+ from ThriftTest.ttypes import Xtruct, Xception, Xception2, Insanity
+ from thrift.Thrift import TException
+ from thrift.TMultiplexedProcessor import TMultiplexedProcessor
+ from thrift.transport import THeaderTransport
+ from thrift.transport import TTransport
+ from thrift.transport import TSocket
+ from thrift.transport import TZlibTransport
+ from thrift.protocol import TBinaryProtocol
+ from thrift.protocol import TCompactProtocol
+ from thrift.protocol import THeaderProtocol
+ from thrift.protocol import TJSONProtocol
+ from thrift.server import TServer, TNonblockingServer, THttpServer
+
+ sys.exit(main(options))