summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/build/android/gyp/util/parallel.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/build/android/gyp/util/parallel.py')
-rw-r--r--third_party/libwebrtc/build/android/gyp/util/parallel.py214
1 files changed, 214 insertions, 0 deletions
diff --git a/third_party/libwebrtc/build/android/gyp/util/parallel.py b/third_party/libwebrtc/build/android/gyp/util/parallel.py
new file mode 100644
index 0000000000..c26875a71c
--- /dev/null
+++ b/third_party/libwebrtc/build/android/gyp/util/parallel.py
@@ -0,0 +1,214 @@
+# Copyright 2020 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Helpers related to multiprocessing.
+
+Based on: //tools/binary_size/libsupersize/parallel.py
+"""
+
+import atexit
+import logging
+import multiprocessing
+import os
+import sys
+import threading
+import traceback
+
+DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
+if DISABLE_ASYNC:
+ logging.warning('Running in synchronous mode.')
+
+_all_pools = None
+_is_child_process = False
+_silence_exceptions = False
+
+# Used to pass parameters to forked processes without pickling.
+_fork_params = None
+_fork_kwargs = None
+
+
+class _ImmediateResult(object):
+ def __init__(self, value):
+ self._value = value
+
+ def get(self):
+ return self._value
+
+ def wait(self):
+ pass
+
+ def ready(self):
+ return True
+
+ def successful(self):
+ return True
+
+
+class _ExceptionWrapper(object):
+ """Used to marshal exception messages back to main process."""
+
+ def __init__(self, msg, exception_type=None):
+ self.msg = msg
+ self.exception_type = exception_type
+
+ def MaybeThrow(self):
+ if self.exception_type:
+ raise getattr(__builtins__,
+ self.exception_type)('Originally caused by: ' + self.msg)
+
+
+class _FuncWrapper(object):
+ """Runs on the fork()'ed side to catch exceptions and spread *args."""
+
+ def __init__(self, func):
+ global _is_child_process
+ _is_child_process = True
+ self._func = func
+
+ def __call__(self, index, _=None):
+ try:
+ return self._func(*_fork_params[index], **_fork_kwargs)
+ except Exception as e:
+ # Only keep the exception type for builtin exception types or else risk
+ # further marshalling exceptions.
+ exception_type = None
+ if hasattr(__builtins__, type(e).__name__):
+ exception_type = type(e).__name__
+ # multiprocessing is supposed to catch and return exceptions automatically
+ # but it doesn't seem to work properly :(.
+ return _ExceptionWrapper(traceback.format_exc(), exception_type)
+ except: # pylint: disable=bare-except
+ return _ExceptionWrapper(traceback.format_exc())
+
+
+class _WrappedResult(object):
+ """Allows for host-side logic to be run after child process has terminated.
+
+ * Unregisters associated pool _all_pools.
+ * Raises exception caught by _FuncWrapper.
+ """
+
+ def __init__(self, result, pool=None):
+ self._result = result
+ self._pool = pool
+
+ def get(self):
+ self.wait()
+ value = self._result.get()
+ _CheckForException(value)
+ return value
+
+ def wait(self):
+ self._result.wait()
+ if self._pool:
+ _all_pools.remove(self._pool)
+ self._pool = None
+
+ def ready(self):
+ return self._result.ready()
+
+ def successful(self):
+ return self._result.successful()
+
+
+def _TerminatePools():
+ """Calls .terminate() on all active process pools.
+
+ Not supposed to be necessary according to the docs, but seems to be required
+ when child process throws an exception or Ctrl-C is hit.
+ """
+ global _silence_exceptions
+ _silence_exceptions = True
+ # Child processes cannot have pools, but atexit runs this function because
+ # it was registered before fork()ing.
+ if _is_child_process:
+ return
+
+ def close_pool(pool):
+ try:
+ pool.terminate()
+ except: # pylint: disable=bare-except
+ pass
+
+ for i, pool in enumerate(_all_pools):
+ # Without calling terminate() on a separate thread, the call can block
+ # forever.
+ thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
+ target=close_pool,
+ args=(pool, ))
+ thread.daemon = True
+ thread.start()
+
+
+def _CheckForException(value):
+ if isinstance(value, _ExceptionWrapper):
+ global _silence_exceptions
+ if not _silence_exceptions:
+ value.MaybeThrow()
+ _silence_exceptions = True
+ logging.error('Subprocess raised an exception:\n%s', value.msg)
+ sys.exit(1)
+
+
+def _MakeProcessPool(job_params, **job_kwargs):
+ global _all_pools
+ global _fork_params
+ global _fork_kwargs
+ assert _fork_params is None
+ assert _fork_kwargs is None
+ pool_size = min(len(job_params), multiprocessing.cpu_count())
+ _fork_params = job_params
+ _fork_kwargs = job_kwargs
+ ret = multiprocessing.Pool(pool_size)
+ _fork_params = None
+ _fork_kwargs = None
+ if _all_pools is None:
+ _all_pools = []
+ atexit.register(_TerminatePools)
+ _all_pools.append(ret)
+ return ret
+
+
+def ForkAndCall(func, args):
+ """Runs |func| in a fork'ed process.
+
+ Returns:
+ A Result object (call .get() to get the return value)
+ """
+ if DISABLE_ASYNC:
+ pool = None
+ result = _ImmediateResult(func(*args))
+ else:
+ pool = _MakeProcessPool([args]) # Omit |kwargs|.
+ result = pool.apply_async(_FuncWrapper(func), (0, ))
+ pool.close()
+ return _WrappedResult(result, pool=pool)
+
+
+def BulkForkAndCall(func, arg_tuples, **kwargs):
+ """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
+
+ Args:
+ kwargs: Common keyword arguments to be passed to |func|.
+
+ Yields the return values in order.
+ """
+ arg_tuples = list(arg_tuples)
+ if not arg_tuples:
+ return
+
+ if DISABLE_ASYNC:
+ for args in arg_tuples:
+ yield func(*args, **kwargs)
+ return
+
+ pool = _MakeProcessPool(arg_tuples, **kwargs)
+ wrapped_func = _FuncWrapper(func)
+ try:
+ for result in pool.imap(wrapped_func, range(len(arg_tuples))):
+ _CheckForException(result)
+ yield result
+ finally:
+ pool.close()
+ pool.join()
+ _all_pools.remove(pool)