diff options
Diffstat (limited to 'js/src/tests/lib/tasks_unix.py')
-rw-r--r-- | js/src/tests/lib/tasks_unix.py | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/js/src/tests/lib/tasks_unix.py b/js/src/tests/lib/tasks_unix.py new file mode 100644 index 0000000000..b86c0aa5ac --- /dev/null +++ b/js/src/tests/lib/tasks_unix.py @@ -0,0 +1,293 @@ +# A unix-oriented process dispatcher. Uses a single thread with select and +# waitpid to dispatch tasks. This avoids several deadlocks that are possible +# with fork/exec + threads + Python. + +import errno +import os +import select +import signal +import sys + +from datetime import datetime, timedelta + +from .progressbar import ProgressBar +from .results import ( + NullTestOutput, + TestOutput, + escape_cmdline, +) +from .adaptor import xdr_annotate + +PY2 = sys.version_info.major == 2 + + +class Task(object): + def __init__(self, test, prefix, tempdir, pid, stdout, stderr): + self.test = test + self.cmd = test.get_command(prefix, tempdir) + self.pid = pid + self.stdout = stdout + self.stderr = stderr + self.start = datetime.now() + self.out = [] + self.err = [] + + +def spawn_test(test, prefix, tempdir, passthrough, run_skipped, show_cmd): + """Spawn one child, return a task struct.""" + if not test.enable and not run_skipped: + return None + + cmd = test.get_command(prefix, tempdir) + if show_cmd: + print(escape_cmdline(cmd)) + + if not passthrough: + (rout, wout) = os.pipe() + (rerr, werr) = os.pipe() + + rv = os.fork() + + # Parent. + if rv: + os.close(wout) + os.close(werr) + return Task(test, prefix, tempdir, rv, rout, rerr) + + # Child. + os.close(rout) + os.close(rerr) + + os.dup2(wout, 1) + os.dup2(werr, 2) + + os.execvp(cmd[0], cmd) + + +def get_max_wait(tasks, timeout): + """ + Return the maximum time we can wait before any task should time out. + """ + + # If we have a progress-meter, we need to wake up to update it frequently. + wait = ProgressBar.update_granularity() + + # If a timeout is supplied, we need to wake up for the first task to + # timeout if that is sooner. + if timeout: + now = datetime.now() + timeout_delta = timedelta(seconds=timeout) + for task in tasks: + remaining = task.start + timeout_delta - now + if remaining < wait: + wait = remaining + + # Return the wait time in seconds, clamped between zero and max_wait. + return max(wait.total_seconds(), 0) + + +def flush_input(fd, frags): + """ + Read any pages sitting in the file descriptor 'fd' into the list 'frags'. + """ + rv = os.read(fd, 4096) + frags.append(rv) + while len(rv) == 4096: + # If read() returns a full buffer, it may indicate there was 1 buffer + # worth of data, or that there is more data to read. Poll the socket + # before we read again to ensure that we will not block indefinitly. + readable, _, _ = select.select([fd], [], [], 0) + if not readable: + return + + rv = os.read(fd, 4096) + frags.append(rv) + + +def read_input(tasks, timeout): + """ + Select on input or errors from the given task list for a max of timeout + seconds. + """ + rlist = [] + exlist = [] + outmap = {} # Fast access to fragment list given fd. + for t in tasks: + rlist.append(t.stdout) + rlist.append(t.stderr) + outmap[t.stdout] = t.out + outmap[t.stderr] = t.err + # This will trigger with a close event when the child dies, allowing + # us to respond immediately and not leave cores idle. + exlist.append(t.stdout) + + readable = [] + try: + readable, _, _ = select.select(rlist, [], exlist, timeout) + except OverflowError: + print >>sys.stderr, "timeout value", timeout + raise + + for fd in readable: + flush_input(fd, outmap[fd]) + + +def remove_task(tasks, pid): + """ + Return a pair with the removed task and the new, modified tasks list. + """ + index = None + for i, t in enumerate(tasks): + if t.pid == pid: + index = i + break + else: + raise KeyError("No such pid: {}".format(pid)) + + out = tasks[index] + tasks.pop(index) + return out + + +def timed_out(task, timeout): + """ + Return a timedelta with the amount we are overdue, or False if the timeout + has not yet been reached (or timeout is falsy, indicating there is no + timeout.) + """ + if not timeout: + return False + + elapsed = datetime.now() - task.start + over = elapsed - timedelta(seconds=timeout) + return over if over.total_seconds() > 0 else False + + +# Local copy of six.ensure_str for when six is unavailable or too old. +def ensure_str(s, encoding="utf-8", errors="strict"): + if PY2: + if isinstance(s, str): + return s + else: + return s.encode(encoding, errors) + elif isinstance(s, bytes): + return s.decode(encoding, errors) + else: + return s + + +def reap_zombies(tasks, timeout): + """ + Search for children of this process that have finished. If they are tasks, + then this routine will clean up the child. This method returns a new task + list that has had the ended tasks removed, followed by the list of finished + tasks. + """ + finished = [] + while True: + try: + pid, status = os.waitpid(0, os.WNOHANG) + if pid == 0: + break + except OSError as e: + if e.errno == errno.ECHILD: + break + raise e + + ended = remove_task(tasks, pid) + flush_input(ended.stdout, ended.out) + flush_input(ended.stderr, ended.err) + os.close(ended.stdout) + os.close(ended.stderr) + + returncode = os.WEXITSTATUS(status) + if os.WIFSIGNALED(status): + returncode = -os.WTERMSIG(status) + + finished.append( + TestOutput( + ended.test, + ended.cmd, + ensure_str(b"".join(ended.out), errors="replace"), + ensure_str(b"".join(ended.err), errors="replace"), + returncode, + (datetime.now() - ended.start).total_seconds(), + timed_out(ended, timeout), + {"pid": ended.pid}, + ) + ) + return tasks, finished + + +def kill_undead(tasks, timeout): + """ + Signal all children that are over the given timeout. Use SIGABRT first to + generate a stack dump. If it still doesn't die for another 30 seconds, kill + with SIGKILL. + """ + for task in tasks: + over = timed_out(task, timeout) + if over: + if over.total_seconds() < 30: + os.kill(task.pid, signal.SIGABRT) + else: + os.kill(task.pid, signal.SIGKILL) + + +def run_all_tests(tests, prefix, tempdir, pb, options): + # Copy and reverse for fast pop off end. + tests = list(tests) + tests = tests[:] + tests.reverse() + + # The set of currently running tests. + tasks = [] + + # Piggy back on the first test to generate the XDR content needed for all + # other tests to run. To avoid read/write races, we temporarily limit the + # number of workers. + wait_for_encoding = False + worker_count = options.worker_count + if options.use_xdr and len(tests) > 1: + # The next loop pops tests, thus we iterate over the tests in reversed + # order. + tests = list(xdr_annotate(reversed(tests), options)) + tests = tests[:] + tests.reverse() + wait_for_encoding = True + worker_count = 1 + + while len(tests) or len(tasks): + while len(tests) and len(tasks) < worker_count: + test = tests.pop() + task = spawn_test( + test, + prefix, + tempdir, + options.passthrough, + options.run_skipped, + options.show_cmd, + ) + if task: + tasks.append(task) + else: + yield NullTestOutput(test) + + timeout = get_max_wait(tasks, options.timeout) + read_input(tasks, timeout) + + kill_undead(tasks, options.timeout) + tasks, finished = reap_zombies(tasks, options.timeout) + + # With Python3.4+ we could use yield from to remove this loop. + for out in finished: + yield out + if wait_for_encoding and out.test == test: + assert test.selfhosted_xdr_mode == "encode" + wait_for_encoding = False + worker_count = options.worker_count + + # If we did not finish any tasks, poke the progress bar to show that + # the test harness is at least not frozen. + if len(finished) == 0: + pb.poke() |