summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/build/android/gyp/util/parallel.py
blob: c26875a71ca0d06fc54b54840f360548ca341beb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""

import atexit
import logging
import multiprocessing
import os
import sys
import threading
import traceback

DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

_all_pools = None
_is_child_process = False
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None


class _ImmediateResult(object):
  def __init__(self, value):
    self._value = value

  def get(self):
    return self._value

  def wait(self):
    pass

  def ready(self):
    return True

  def successful(self):
    return True


class _ExceptionWrapper(object):
  """Used to marshal exception messages back to main process."""

  def __init__(self, msg, exception_type=None):
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    if self.exception_type:
      raise getattr(__builtins__,
                    self.exception_type)('Originally caused by: ' + self.msg)


class _FuncWrapper(object):
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    global _is_child_process
    _is_child_process = True
    self._func = func

  def __call__(self, index, _=None):
    try:
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      if hasattr(__builtins__, type(e).__name__):
        exception_type = type(e).__name__
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult(object):
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters associated pool _all_pools.
  * Raises exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    self.wait()
    value = self._result.get()
    _CheckForException(value)
    return value

  def wait(self):
    self._result.wait()
    if self._pool:
      _all_pools.remove(self._pool)
      self._pool = None

  def ready(self):
    return self._result.ready()

  def successful(self):
    return self._result.successful()


def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Not supposed to be necessary according to the docs, but seems to be required
  when child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if _is_child_process:
    return

  def close_pool(pool):
    try:
      pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  for i, pool in enumerate(_all_pools):
    # Without calling terminate() on a separate thread, the call can block
    # forever.
    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
                              target=close_pool,
                              args=(pool, ))
    thread.daemon = True
    thread.start()


def _CheckForException(value):
  if isinstance(value, _ExceptionWrapper):
    global _silence_exceptions
    if not _silence_exceptions:
      value.MaybeThrow()
      _silence_exceptions = True
      logging.error('Subprocess raised an exception:\n%s', value.msg)
    sys.exit(1)


def _MakeProcessPool(job_params, **job_kwargs):
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  ret = multiprocessing.Pool(pool_size)
  _fork_params = None
  _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret


def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    pool = None
    result = _ImmediateResult(func(*args))
  else:
    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    result = pool.apply_async(_FuncWrapper(func), (0, ))
    pool.close()
  return _WrappedResult(result, pool=pool)


def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  Args:
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  arg_tuples = list(arg_tuples)
  if not arg_tuples:
    return

  if DISABLE_ASYNC:
    for args in arg_tuples:
      yield func(*args, **kwargs)
    return

  pool = _MakeProcessPool(arg_tuples, **kwargs)
  wrapped_func = _FuncWrapper(func)
  try:
    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
      _CheckForException(result)
      yield result
  finally:
    pool.close()
    pool.join()
    _all_pools.remove(pool)