#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import contextlib
import multiprocessing
import multiprocessing.managers
import os
import platform
import random
import socket
import subprocess
import sys
import time

from .compat import str_join
from .report import ExecReporter, SummaryReporter
from .test import TestEntry
from .util import domain_socket_path

# Result flags; run_test() ORs RESULT_ERROR into the client's return code.
RESULT_ERROR = 64
RESULT_TIMEOUT = 128
# Pseudo-signal: sigwait() sends nothing and only waits for the process.
SIGNONE = 0
# NOTE(review): 15 is SIGTERM's number on POSIX, not SIGKILL's (9). The value
# is kept as-is because sigwait() compares against it and passes it to
# killpg()/send_signal(); confirm before changing.
SIGKILL = 15

# globals
# Set by TestDispatcher (directly in sync mode, via _pool_init in workers).
ports = None
stop = None


class ExecutionContext(object):
    """Runs one server or client command as a child process and reports on it."""

    def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
        self._log = multiprocessing.get_logger()
        self.cmd = cmd
        self.cwd = cwd
        self.env = env
        self.stop_signal = stop_signal
        self.is_server = is_server
        self.report = report
        # True once a SIGNONE wait timed out (process did not end by itself).
        self.expired = False
        # True once SIGKILL has been sent to the process.
        self.killed = False
        self.proc = None

    def _popen_args(self):
        """Return the keyword arguments passed to subprocess.Popen."""
        args = {
            'cwd': self.cwd,
            'env': self.env,
            'stdout': self.report.out,
            'stderr': subprocess.STDOUT,
        }
        # make sure child processes doesn't remain after killing
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            args.update(preexec_fn=os.setsid)
        return args

    def start(self):
        """Spawn the process and return a context manager that stops it on exit."""
        joined = str_join(' ', self.cmd)
        self._log.debug('COMMAND: %s', joined)
        self._log.debug('WORKDIR: %s', self.cwd)
        self._log.debug('LOGFILE: %s', self.report.logpath)
        self.report.begin()
        self.proc = subprocess.Popen(self.cmd, **self._popen_args())
        self._log.debug(' PID: %d', self.proc.pid)
        # os.getpgid is not available on Windows; only log it where it exists.
        if hasattr(os, 'getpgid'):
            self._log.debug(' PGID: %d', os.getpgid(self.proc.pid))
        return self._scoped()

    @contextlib.contextmanager
    def _scoped(self):
        """Yield this context; always stop and report the process afterwards."""
        try:
            yield self
        finally:
            # Run the shutdown even when the body raised, otherwise the child
            # (started in its own session/process group) would be left running.
            self._finish()

    def _finish(self):
        """Stop the process if it is still running and record the outcome."""
        if self.is_server:
            # the server is supposed to run until we stop it
            if self.returncode is not None:
                self.report.died()
            elif self.stop_signal != SIGNONE:
                if self.sigwait(self.stop_signal):
                    self.report.end(self.returncode)
                else:
                    self.report.killed()
            else:
                self.sigwait(SIGKILL)
        else:
            # the client is supposed to exit normally
            if self.returncode is not None:
                self.report.end(self.returncode)
            else:
                self.sigwait(SIGKILL)
                self.report.killed()
        self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))

    # Send a signal to the process and then wait for it to end
    # If the signal requested is SIGNONE, no signal is sent, and
    # instead we just wait for the process to end; further if it
    # does not end normally with SIGNONE, we mark it as expired.
    # If the process fails to end and the signal is not SIGKILL,
    # it re-runs with SIGKILL so that a real process kill occurs
    # returns True if the process ended, False if it may not have
    def sigwait(self, sig=SIGKILL, timeout=2):
        try:
            if sig != SIGNONE:
                self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
                if sig == SIGKILL:
                    self.killed = True
                try:
                    if platform.system() != 'Windows':
                        os.killpg(os.getpgid(self.proc.pid), sig)
                    else:
                        self.proc.send_signal(sig)
                except Exception:
                    # Best effort: the wait below decides whether it ended.
                    self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
            self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
            self.proc.communicate(timeout=timeout)
            self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
            self.report.end(self.returncode)
            return True
        except subprocess.TimeoutExpired:
            self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
            if sig == SIGNONE:
                self.expired = True
            return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)

    # called on the client process to wait for it to end naturally
    def wait(self, timeout):
        self.sigwait(SIGNONE, timeout)

    @property
    def returncode(self):
        """Return code of the process, or None if not started or not finished."""
        return self.proc.returncode if self.proc else None


def exec_context(port, logdir, test, prog, is_server):
    """Build the command of `prog` for `port` and wrap it in an ExecutionContext."""
    report = ExecReporter(logdir, test, prog)
    prog.build_command(port)
    return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report)


def run_test(testdir, logdir, test_dict, max_retry, async_mode=True):
    """Run one server/client test pair, retrying on failure.

    Returns a tuple (retry_count, result). `result` is 0 on success, a
    combination of the client's return code and the RESULT_* flags on failure,
    or None when the run was skipped because the `stop` event is set.

    When `async_mode` is False, exceptions are re-raised to the caller; when
    True they are logged and reported as RESULT_ERROR.
    """
    logger = multiprocessing.get_logger()

    def ensure_socket_open(sv, port, test):
        """Poll until the server accepts connections.

        Returns True when the socket is available (always, immediately, for
        'abstract' sockets), False when the server process exited or
        `test.delay` seconds have passed.
        """
        slept = 0.1
        time.sleep(slept)
        sleep_step = 0.1
        while True:
            if slept > test.delay:
                logger.warning('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
                return False
            if test.socket == 'domain':
                if os.path.exists(domain_socket_path(port)):
                    break
                not_ready = '[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'
            elif test.socket == 'abstract':
                return True
            else:
                # Create sockets every iteration because refused sockets cannot be
                # reused on some systems.
                with contextlib.closing(socket.socket()) as sock4, \
                        contextlib.closing(socket.socket(family=socket.AF_INET6)) as sock6:
                    if sock4.connect_ex(('127.0.0.1', port)) == 0 \
                            or sock6.connect_ex(('::1', port)) == 0:
                        return True
                not_ready = '[{0}] socket not available yet. slept for {1} seconds so far'
            # Not ready yet: give up early if the server is gone, otherwise
            # sleep and poll again until test.delay is exceeded.
            if sv.proc.poll() is not None:
                logger.warning('[{0}] server process is exited'.format(sv.proc.pid))
                return False
            logger.debug(not_ready.format(sv.proc.pid, slept))
            time.sleep(sleep_step)
            slept += sleep_step
        logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
        return True

    # Bound before the try block so the exception handlers can always use them.
    retry_count = 0
    test = None
    try:
        max_bind_retry = 3
        bind_retry_count = 0
        test = TestEntry(testdir, **test_dict)
        while True:
            if stop.is_set():
                logger.debug('Skipping because shutting down')
                return (retry_count, None)
            logger.debug('Start')
            with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                logger.debug('Start with port %d' % port)
                sv = exec_context(port, logdir, test, test.server, True)
                cl = exec_context(port, logdir, test, test.client, False)

                logger.debug('Starting server')
                with sv.start():
                    port_ok = ensure_socket_open(sv, port, test)
                    if port_ok:
                        connect_retry_count = 0
                        max_connect_retry = 12
                        connect_retry_wait = 0.25
                        while True:
                            if sv.proc.poll() is not None:
                                logger.info('not starting client because server process is absent')
                                break
                            logger.debug('Starting client')
                            cl.start()
                            logger.debug('Waiting client (up to %d secs)' % test.timeout)
                            cl.wait(test.timeout)
                            if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
                                if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
                                    logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
                                # Wait for 50ms to see if server does not die at the end.
                                time.sleep(0.05)
                                break
                            logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
                            time.sleep(connect_retry_wait)
                            connect_retry_count += 1

            if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
                logger.warning('[%s]: Detected socket bind failure, retrying...', test.server.name)
                bind_retry_count += 1
            else:
                if cl.expired:
                    result = RESULT_TIMEOUT
                elif cl.proc is not None and cl.proc.poll() is not None:
                    result = cl.returncode
                else:
                    # client was never started or has not finished
                    result = RESULT_ERROR

                # For servers that handle a controlled shutdown by signal
                # if they are killed, or return an error code, that is a
                # problem. For servers that are not signal-aware, we simply
                # kill them off; if we didn't kill them off, something else
                # happened (crashed?)
                if test.server.stop_signal != 0:
                    if sv.killed or sv.returncode > 0:
                        result |= RESULT_ERROR
                else:
                    if not sv.killed:
                        result |= RESULT_ERROR

                if result == 0 or retry_count >= max_retry:
                    return (retry_count, result)
                else:
                    logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
                    retry_count += 1
    except Exception:
        if not async_mode:
            raise
        # `test` is still None when TestEntry() itself raised.
        logger.warning('Error executing [%s]', getattr(test, 'name', '<unknown>'), exc_info=True)
        return (retry_count, RESULT_ERROR)
    except KeyboardInterrupt:
        # KeyboardInterrupt does not derive from Exception, so it needs its
        # own handler to flag the shutdown for the remaining tests.
        logger.info('Interrupted execution', exc_info=True)
        if not async_mode:
            raise
        stop.set()
        return (retry_count, RESULT_ERROR)


class PortAllocator(object):
    """Hands out port numbers that are not already held by another test."""

    def __init__(self):
        self._log = multiprocessing.get_logger()
        self._lock = multiprocessing.Lock()
        self._ports = set()
        self._dom_ports = set()
        self._last_alloc = 0

    def _get_tcp_port(self):
        """Return a TCP port picked by the OS that is not currently allocated."""
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # Binding to port 0 lets the OS pick a free port.
                sock.bind(('', 0))
                port = sock.getsockname()[1]
                with self._lock:
                    if port not in self._ports:
                        self._ports.add(port)
                        self._last_alloc = time.time()
                        return port
            finally:
                # Closed on every path, including a failing bind().
                sock.close()

    def _get_domain_port(self):
        """Return a random number in 1024..65535 not currently allocated."""
        while True:
            # randint() includes both ends; 65535 is the largest port number.
            port = random.randint(1024, 65535)
            with self._lock:
                if port not in self._dom_ports:
                    self._dom_ports.add(port)
                    return port

    def alloc_port(self, socket_type):
        """Allocate a port for 'domain'/'abstract' sockets or, otherwise, TCP."""
        if socket_type in ('domain', 'abstract'):
            return self._get_domain_port()
        else:
            return self._get_tcp_port()

    # static method for inter-process invocation
    @staticmethod
    @contextlib.contextmanager
    def alloc_port_scoped(allocator, socket_type):
        """Allocate a port from `allocator` and free it when the block exits."""
        port = allocator.alloc_port(socket_type)
        try:
            yield port
        finally:
            # Free the port even if the body raised, so it is not leaked.
            allocator.free_port(socket_type, port)

    def free_port(self, socket_type, port):
        """Release `port`; for 'domain' sockets also remove the socket file."""
        self._log.debug('free_port')
        with self._lock:
            try:
                if socket_type == 'domain':
                    self._dom_ports.remove(port)
                    path = domain_socket_path(port)
                    if os.path.exists(path):
                        os.remove(path)
                elif socket_type == 'abstract':
                    self._dom_ports.remove(port)
                else:
                    self._ports.remove(port)
            except IOError:
                self._log.info('Error while freeing port', exc_info=sys.exc_info())


class NonAsyncResult(object):
    """Already-completed result object returned by the synchronous dispatcher."""

    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        return self._value

    def wait(self, timeout=None):
        pass

    def ready(self):
        return True

    def successful(self):
        # NOTE(review): _dispatch_sync stores run_test()'s return value here,
        # which is a tuple, so this comparison with 0 looks like it can never
        # be true for that caller - confirm the intended meaning.
        return self._value == 0


class TestDispatcher(object):
    """Runs tests in this process or, when concurrency > 1, in a process pool."""

    def __init__(self, testdir, basedir, logdir_rel, concurrency):
        global stop
        global ports
        self._log = multiprocessing.get_logger()
        self.testdir = testdir
        self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1)
        self.logdir = self._report.testdir
        # seems needed for python 2.x to handle keyboard interrupt
        self._stop = multiprocessing.Event()
        self._async = concurrency > 1
        if not self._async:
            # Tests run in this process: set the module globals directly.
            self._pool = None
            stop = self._stop
            ports = PortAllocator()
        else:
            # One PortAllocator lives in a manager process; each pool worker
            # connects to it in _pool_init.
            self._m = multiprocessing.managers.BaseManager()
            self._m.register('ports', PortAllocator)
            self._m.start()
            self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
        self._log.debug(
            'TestDispatcher started with %d concurrent jobs' % concurrency)

    def _pool_init(self, address):
        """Pool initializer: set this worker's `stop` and `ports` globals."""
        global stop
        global m
        global ports
        stop = self._stop
        m = multiprocessing.managers.BaseManager(address)
        m.connect()
        ports = m.ports()

    def _dispatch_sync(self, test, cont, max_retry):
        """Run the test now, call `cont` with its result and wrap the result."""
        r = run_test(self.testdir, self.logdir, test, max_retry, async_mode=False)
        cont(r)
        return NonAsyncResult(r)

    def _dispatch_async(self, test, cont, max_retry):
        """Submit the test to the pool with `cont` as the completion callback."""
        self._log.debug('_dispatch_async')
        return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont)

    def dispatch(self, test, max_retry):
        """Register `test` with the report and run it; returns a result object."""
        index = self._report.add_test(test)

        def cont(result):
            # Results arriving after a stop request are dropped.
            if not self._stop.is_set():
                if result and len(result) == 2:
                    retry_count, returncode = result
                else:
                    retry_count = 0
                    returncode = RESULT_ERROR
                self._log.debug('freeing port')
                self._log.debug('adding result')
                self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count)
                self._log.debug('finish continuation')

        fn = self._dispatch_async if self._async else self._dispatch_sync
        return fn(test, cont, max_retry)

    def wait(self):
        """Wait for all dispatched tests and return the summary report's end()."""
        if self._async:
            self._pool.close()
            self._pool.join()
            self._m.shutdown()
        return self._report.end()

    def terminate(self):
        """Set the stop event and tear down the pool and manager, if any."""
        self._stop.set()
        if self._async:
            self._pool.terminate()
            self._pool.join()
            self._m.shutdown()