214 lines
5.4 KiB
Python
214 lines
5.4 KiB
Python
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""
|
|
|
|
import atexit
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import sys
|
|
import threading
|
|
import traceback
|
|
|
|
# Set DISABLE_ASYNC=1 in the environment to run jobs in-process.
DISABLE_ASYNC = os.getenv('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# Module-wide bookkeeping. _all_pools stays None until the first pool is made.
_silence_exceptions = False
_is_child_process = False
_all_pools = None

# Used to pass parameters to forked processes without pickling.
_fork_kwargs = None
_fork_params = None
|
|
|
|
|
|
class _ImmediateResult(object):
  """Result object for a value that has already been computed.

  Offers the same get/wait/ready/successful methods that are called on pool
  results elsewhere in this file, but never blocks.
  """

  def __init__(self, value):
    self._payload = value

  def ready(self):
    """Always True: the value exists from construction."""
    return True

  def successful(self):
    """Always True: constructing the object implies the call returned."""
    return True

  def wait(self):
    """No-op; there is nothing to wait for."""
    return None

  def get(self):
    """Returns the value given to the constructor."""
    return self._payload
|
|
|
|
|
|
def _LookupBuiltinException(name):
  """Returns the builtin exception class called |name|, or None.

  |__builtins__| is the builtins module when a file runs as __main__ but a
  plain dict inside an imported module, so both forms are handled here.
  """
  namespace = __builtins__
  if not isinstance(namespace, dict):
    namespace = vars(namespace)
  candidate = namespace.get(name)
  if isinstance(candidate, type) and issubclass(candidate, BaseException):
    return candidate
  return None


class _ExceptionWrapper(object):
  """Used to marshal exception messages back to main process.

  Attributes:
    msg: Formatted traceback text describing the failure.
    exception_type: Name of a builtin exception class to re-raise, or None
        when no exception should be re-raised by MaybeThrow().
  """

  def __init__(self, msg, exception_type=None):
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Raises the recorded builtin exception type, if one can be built.

    Returns without raising when no type was recorded, when the name is not a
    builtin exception class, or when that class cannot be constructed from a
    single message string.

    Raises:
      The builtin exception named by |exception_type|, with the original
      traceback text embedded in its message.
    """
    if not self.exception_type:
      return
    exception_class = _LookupBuiltinException(self.exception_type)
    if exception_class is None:
      return
    try:
      error = exception_class('Originally caused by: ' + self.msg)
    except TypeError:
      # Some builtins (e.g. UnicodeDecodeError) need several constructor
      # arguments; leave reporting of those to the caller.
      return
    raise error


class _FuncWrapper(object):
  """Runs on the fork()'ed side to catch exceptions and spread *args.

  Instances are created in the parent and sent to the worker by pickling,
  which does not run __init__ again on the receiving side.
  """

  def __init__(self, func):
    self._func = func

  def __call__(self, index, _=None):
    """Calls the wrapped function with the parameters stored at |index|.

    Args:
      index: Position within _fork_params of the positional arguments to use.
      _: Ignored.

    Returns:
      The function's return value, or an _ExceptionWrapper describing the
      exception it raised.
    """
    # Mark the process here rather than in __init__: __init__ runs in the
    # parent, which must still be allowed to terminate its pools at exit.
    global _is_child_process
    _is_child_process = True
    try:
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      if _LookupBuiltinException(type(e).__name__) is type(e):
        exception_type = type(e).__name__
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())
|
|
|
|
|
|
class _WrappedResult(object):
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters associated pool from _all_pools.
  * Raises exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    # |result| is the underlying result object that get/wait/ready/successful
    # delegate to. |pool| is the pool to drop from _all_pools once the result
    # has been waited on, or None when there is no pool.
    self._result = result
    self._pool = pool

  def get(self):
    """Waits for the job and returns its value.

    A value that is an _ExceptionWrapper is handed to _CheckForException,
    which raises or exits the process instead of returning normally.
    """
    self.wait()
    value = self._result.get()
    _CheckForException(value)
    return value

  def wait(self):
    """Waits for the job, then unregisters the pool (first call only)."""
    self._result.wait()
    if self._pool:
      _all_pools.remove(self._pool)
      self._pool = None

  def ready(self):
    """Returns whether the underlying result is ready."""
    return self._result.ready()

  def successful(self):
    """Returns whether the job completed without raising.

    Exceptions raised by the job come back as an _ExceptionWrapper *value*,
    so the underlying result alone would report such jobs as successful; the
    value is inspected as well.
    """
    if not self._result.successful():
      return False
    return not isinstance(self._result.get(), _ExceptionWrapper)
|
|
|
|
|
|
def _TerminatePools():
  """Terminates every process pool that is still registered.

  The docs suggest this should not be needed, but it appears to be required
  when a child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  if _is_child_process:
    # This handler was registered with atexit before fork()ing, so it also
    # runs in children; they cannot have pools, so there is nothing to do.
    return

  def terminate_quietly(target_pool):
    try:
      target_pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  # terminate() can block forever unless it is called on a separate thread.
  for index, active_pool in enumerate(_all_pools):
    worker = threading.Thread(target=terminate_quietly,
                              args=(active_pool, ),
                              name='Pool-Terminate-{}'.format(index))
    worker.daemon = True
    worker.start()
|
|
|
|
|
|
def _CheckForException(value):
  """Aborts if |value| is an _ExceptionWrapper; otherwise does nothing.

  The first wrapper seen is offered to MaybeThrow() and, if that returns, is
  logged. Every wrapper ends with sys.exit(1) unless MaybeThrow() raised.
  """
  global _silence_exceptions
  if not isinstance(value, _ExceptionWrapper):
    return
  already_reported = _silence_exceptions
  if not already_reported:
    value.MaybeThrow()
    _silence_exceptions = True
    logging.error('Subprocess raised an exception:\n%s', value.msg)
  sys.exit(1)
|
|
|
|
|
|
def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a process pool and registers it in _all_pools.

  |job_params| and |job_kwargs| are stored in the _fork_params/_fork_kwargs
  globals only for the duration of pool construction.

  Args:
    job_params: Sequence with one entry of positional arguments per job. Its
        length caps the pool size, as does the CPU count.
    **job_kwargs: Keyword arguments common to all jobs.

  Returns:
    The new multiprocessing.Pool.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    ret = multiprocessing.Pool(pool_size)
  finally:
    # Reset even if pool creation fails; otherwise the asserts above would
    # reject every later call.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    # The first pool ever created: start tracking pools and make sure they
    # get terminated at interpreter exit.
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret
|
|
|
|
|
|
def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  When DISABLE_ASYNC is set, |func| is instead called right away in the
  current process and its return value is wrapped.

  Args:
    func: Callable to invoke.
    args: Positional arguments for |func|.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    return _WrappedResult(_ImmediateResult(func(*args)), pool=None)

  pool = _MakeProcessPool([args])  # Omit |kwargs|.
  pending = pool.apply_async(_FuncWrapper(func), (0, ))
  pool.close()
  return _WrappedResult(pending, pool=pool)
|
|
|
|
|
|
def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  When DISABLE_ASYNC is set, the calls are made one after another in the
  current process instead.

  Args:
    func: Callable to invoke once per entry of |arg_tuples|.
    arg_tuples: Iterable of positional-argument tuples; it is copied into a
        list up front.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  jobs = list(arg_tuples)
  if not jobs:
    return

  if DISABLE_ASYNC:
    for job_args in jobs:
      yield func(*job_args, **kwargs)
    return

  pool = _MakeProcessPool(jobs, **kwargs)
  runner = _FuncWrapper(func)
  job_indices = range(len(jobs))
  try:
    # Workers are handed indices only; the runner looks the arguments up.
    for value in pool.imap(runner, job_indices):
      _CheckForException(value)
      yield value
  finally:
    # Also reached when the consumer stops iterating early.
    pool.close()
    pool.join()
    _all_pools.remove(pool)
|