summaryrefslogtreecommitdiffstats
path: root/js/src/gdb/taskpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'js/src/gdb/taskpool.py')
-rw-r--r--js/src/gdb/taskpool.py240
1 files changed, 240 insertions, 0 deletions
diff --git a/js/src/gdb/taskpool.py b/js/src/gdb/taskpool.py
new file mode 100644
index 0000000000..38254af6b3
--- /dev/null
+++ b/js/src/gdb/taskpool.py
@@ -0,0 +1,240 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+# flake8: noqa: F821
+
+import fcntl
+import os
+import select
+import time
+from subprocess import PIPE, Popen
+
+
class TaskPool(object):
    # Run a series of subprocesses. Try to keep up to a certain number going in
    # parallel at any given time. Enforce time limits.
    #
    # This is implemented using non-blocking I/O, and so is Unix-specific.
    #
    # We assume that, if a task closes its standard error, then it's safe to
    # wait for it to terminate. So an ill-behaved task that closes its standard
    # output and then hangs will hang us, as well. However, as it takes special
    # effort to close one's standard output, this seems unlikely to be a
    # problem in practice.

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            # Popen object for the running subprocess; None until 'start'.
            self.pipe = None
            # NOTE(review): never read anywhere in this file; kept only so
            # existing subclasses that reference it keep working.
            self.start_time = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline| (an absolute time.time() value).
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    def __init__(self, tasks, cwd=".", job_limit=4, timeout=150):
        # |tasks| may be any iterable of Task instances; we walk it lazily,
        # exactly once, so a generator is fine.
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        # Per-task time limit, in seconds.
        self.timeout = timeout
        self.next_pending = next(self.pending, None)

    # Run every task to completion (or timeout), invoking its on* handlers
    # as events occur. Returns None.
    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, "r") as devnull:
            while True:
                # Start new tasks until we hit the job limit or run dry.
                while len(running) < self.job_limit and self.next_pending:
                    task = self.next_pending
                    p = Popen(
                        task.cmd(),
                        bufsize=16384,
                        stdin=devnull,
                        stdout=PIPE,
                        stderr=PIPE,
                        cwd=self.cwd,
                    )

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    task.start(p, time.time() + self.timeout)
                    running.add(task)
                    self.next_pending = next(self.pending, None)

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = [t.pipe.stdout for t in running] + [
                    t.pipe.stderr for t in running
                ]
                (readable, w, x) = select.select(
                    stdouts_and_stderrs, [], [], secs_to_next_deadline
                )
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if len(output):
                            try:
                                # NOTE(review): a fixed-size read could split a
                                # multi-byte UTF-8 sequence across two calls;
                                # acceptable for the ASCII-ish tool output this
                                # pool is used with.
                                t.onStdout(output.decode("utf-8"))
                            except TaskPool.TerminateTask:
                                # Fix: the bare name 'TerminateTask' is not in
                                # scope here (it's a class attribute); the old
                                # form raised NameError when a handler actually
                                # threw it.
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if len(output):
                            try:
                                t.onStderr(output.decode("utf-8"))
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                # A task can be in both 'terminate' and 'finished' (stdout
                # handler threw, stderr hit EOF in the same iteration); skip
                # the ones already reaped above.
                for t in terminate:
                    if t in running:
                        t.pipe.terminate()
                        t.pipe.wait()
                        running.remove(t)

                # Terminate any tasks which have missed their deadline.
                # Re-read the clock: 'select' may have slept right up to the
                # earliest deadline, so the 'now' sampled before the wait
                # would let a timed-out task linger for an extra iteration.
                now = time.time()
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None
+
+
def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # First choice: the multiprocessing module (present since Python 2.6).
    try:
        import multiprocessing

        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # Second choice: sysconf, on POSIX systems that expose it.
    try:
        n_online = int(os.sysconf("SC_NPROCESSORS_ONLN"))
        if n_online > 0:
            return n_online
    except (AttributeError, ValueError):
        pass

    # Third choice: the environment variable that Windows sets.
    try:
        n_env = int(os.environ["NUMBER_OF_PROCESSORS"])
        if n_env > 0:
            return n_env
    except (KeyError, ValueError):
        pass

    # Give up and assume a single processor.
    return 1
+
+
if __name__ == "__main__":
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        # Completion order of the tasks. Each task sleeps for its own value,
        # so they finish (and are appended) in ascending order. Renamed from
        # 'sorted', which shadowed the builtin of the same name.
        finished_order = []

        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n

            # (The previous no-op 'start' override, which only called super,
            # has been dropped.)

            def cmd(self):
                return ["sh", "-c", "echo out; sleep %d; echo err>&2" % (self.n,)]

            def onStdout(self, text):
                print("%d stdout: %r" % (self.n, text))

            def onStderr(self, text):
                print("%d stderr: %r" % (self.n, text))

            def onFinished(self, returncode):
                print("%d (rc=%d)" % (self.n, returncode))
                finished_order.append(self.n)

            def onTimeout(self):
                # Timed-out tasks are never appended to the result.
                print("%d timed out" % (self.n,))

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return finished_order

    print(repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15)))