1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
|
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "WorkerThread.h"
#include <utility>
#include "WorkerPrivate.h"
#include "WorkerRunnable.h"
#include "mozilla/Assertions.h"
#include "mozilla/Atomics.h"
#include "mozilla/CycleCollectedJSContext.h"
#include "mozilla/EventQueue.h"
#include "mozilla/MacroForEach.h"
#include "mozilla/NotNull.h"
#include "mozilla/PerformanceCounter.h"
#include "mozilla/ThreadEventQueue.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/ipc/BackgroundChild.h"
#include "nsCOMPtr.h"
#include "nsDebug.h"
#include "nsICancelableRunnable.h"
#include "nsIEventTarget.h"
#include "nsIRunnable.h"
#include "nsIThreadInternal.h"
#include "nsString.h"
#include "prthread.h"
namespace mozilla {
using namespace ipc;
namespace dom {
namespace {
// The C stack size. We use the same stack size on all platforms for
// consistency.
//
// Note: Our typical equation of 256 machine words works out to 2MB on 64-bit
// platforms. Since that works out to the size of a VM huge page, that can
// sometimes lead to an OS allocating an entire huge page for the stack at once.
// To avoid this, we subtract the size of 2 pages, to be safe.
const uint32_t kWorkerStackSize = 256 * sizeof(size_t) * 1024 - 8192;
} // namespace
WorkerThreadFriendKey::WorkerThreadFriendKey() {
MOZ_COUNT_CTOR(WorkerThreadFriendKey);
}
WorkerThreadFriendKey::~WorkerThreadFriendKey() {
MOZ_COUNT_DTOR(WorkerThreadFriendKey);
}
class WorkerThread::Observer final : public nsIThreadObserver {
WorkerPrivate* mWorkerPrivate;
public:
explicit Observer(WorkerPrivate* aWorkerPrivate)
: mWorkerPrivate(aWorkerPrivate) {
MOZ_ASSERT(aWorkerPrivate);
aWorkerPrivate->AssertIsOnWorkerThread();
}
NS_DECL_THREADSAFE_ISUPPORTS
private:
~Observer() { mWorkerPrivate->AssertIsOnWorkerThread(); }
NS_DECL_NSITHREADOBSERVER
};
WorkerThread::WorkerThread(ConstructorKey)
: nsThread(
MakeNotNull<ThreadEventQueue*>(MakeUnique<mozilla::EventQueue>()),
nsThread::NOT_MAIN_THREAD, {.stackSize = kWorkerStackSize}),
mLock("WorkerThread::mLock"),
mWorkerPrivateCondVar(mLock, "WorkerThread::mWorkerPrivateCondVar"),
mWorkerPrivate(nullptr),
mOtherThreadsDispatchingViaEventTarget(0)
#ifdef DEBUG
,
mAcceptingNonWorkerRunnables(true)
#endif
{
}
WorkerThread::~WorkerThread() {
MOZ_ASSERT(!mWorkerPrivate);
MOZ_ASSERT(!mOtherThreadsDispatchingViaEventTarget);
MOZ_ASSERT(mAcceptingNonWorkerRunnables);
}
// static
SafeRefPtr<WorkerThread> WorkerThread::Create(
const WorkerThreadFriendKey& /* aKey */) {
SafeRefPtr<WorkerThread> thread =
MakeSafeRefPtr<WorkerThread>(ConstructorKey());
if (NS_FAILED(thread->Init("DOM Worker"_ns))) {
NS_WARNING("Failed to create new thread!");
return nullptr;
}
return thread;
}
void WorkerThread::SetWorker(const WorkerThreadFriendKey& /* aKey */,
WorkerPrivate* aWorkerPrivate) {
MOZ_ASSERT(PR_GetCurrentThread() == mThread);
if (aWorkerPrivate) {
{
MutexAutoLock lock(mLock);
MOZ_ASSERT(!mWorkerPrivate);
MOZ_ASSERT(mAcceptingNonWorkerRunnables);
mWorkerPrivate = aWorkerPrivate;
#ifdef DEBUG
mAcceptingNonWorkerRunnables = false;
#endif
}
mObserver = new Observer(aWorkerPrivate);
MOZ_ALWAYS_SUCCEEDS(AddObserver(mObserver));
} else {
MOZ_ALWAYS_SUCCEEDS(RemoveObserver(mObserver));
mObserver = nullptr;
{
MutexAutoLock lock(mLock);
MOZ_ASSERT(mWorkerPrivate);
MOZ_ASSERT(!mAcceptingNonWorkerRunnables);
// mOtherThreadsDispatchingViaEventTarget can still be non-zero here
// because WorkerThread::Dispatch isn't atomic so a thread initiating
// dispatch can have dispatched a runnable at this thread allowing us to
// begin shutdown before that thread gets a chance to decrement
// mOtherThreadsDispatchingViaEventTarget back to 0. So we need to wait
// for that.
while (mOtherThreadsDispatchingViaEventTarget) {
mWorkerPrivateCondVar.Wait();
}
#ifdef DEBUG
mAcceptingNonWorkerRunnables = true;
#endif
mWorkerPrivate = nullptr;
}
}
}
void WorkerThread::IncrementDispatchCounter() {
MutexAutoLock lock(mLock);
if (mWorkerPrivate) {
mWorkerPrivate->MutablePerformanceCounterRef().IncrementDispatchCounter(
DispatchCategory::Worker);
}
}
nsresult WorkerThread::DispatchPrimaryRunnable(
const WorkerThreadFriendKey& /* aKey */,
already_AddRefed<nsIRunnable> aRunnable) {
nsCOMPtr<nsIRunnable> runnable(aRunnable);
#ifdef DEBUG
MOZ_ASSERT(PR_GetCurrentThread() != mThread);
MOZ_ASSERT(runnable);
{
MutexAutoLock lock(mLock);
MOZ_ASSERT(!mWorkerPrivate);
MOZ_ASSERT(mAcceptingNonWorkerRunnables);
}
#endif
nsresult rv = nsThread::Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
return NS_OK;
}
nsresult WorkerThread::DispatchAnyThread(
const WorkerThreadFriendKey& /* aKey */,
already_AddRefed<WorkerRunnable> aWorkerRunnable) {
// May be called on any thread!
#ifdef DEBUG
{
const bool onWorkerThread = PR_GetCurrentThread() == mThread;
{
MutexAutoLock lock(mLock);
MOZ_ASSERT(mWorkerPrivate);
MOZ_ASSERT(!mAcceptingNonWorkerRunnables);
if (onWorkerThread) {
mWorkerPrivate->AssertIsOnWorkerThread();
}
}
}
#endif
// Increment the PerformanceCounter dispatch count
// to keep track of how many runnables are executed.
IncrementDispatchCounter();
nsCOMPtr<nsIRunnable> runnable(aWorkerRunnable);
nsresult rv = nsThread::Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
// We don't need to notify the worker's condition variable here because we're
// being called from worker-controlled code and it will make sure to wake up
// the worker thread if needed.
return NS_OK;
}
NS_IMETHODIMP
WorkerThread::DispatchFromScript(nsIRunnable* aRunnable, uint32_t aFlags) {
nsCOMPtr<nsIRunnable> runnable(aRunnable);
return Dispatch(runnable.forget(), aFlags);
}
NS_IMETHODIMP
WorkerThread::Dispatch(already_AddRefed<nsIRunnable> aRunnable,
uint32_t aFlags) {
// May be called on any thread!
nsCOMPtr<nsIRunnable> runnable(aRunnable); // in case we exit early
// Workers only support asynchronous dispatch.
if (NS_WARN_IF(aFlags != NS_DISPATCH_NORMAL)) {
return NS_ERROR_UNEXPECTED;
}
const bool onWorkerThread = PR_GetCurrentThread() == mThread;
#ifdef DEBUG
if (runnable && !onWorkerThread) {
nsCOMPtr<nsIDiscardableRunnable> discardable = do_QueryInterface(runnable);
{
MutexAutoLock lock(mLock);
// Only enforce discardable runnables after we've started the worker loop.
if (!mAcceptingNonWorkerRunnables) {
MOZ_ASSERT(
discardable,
"Only nsIDiscardableRunnable may be dispatched to a worker!");
}
}
}
#endif
WorkerPrivate* workerPrivate = nullptr;
if (onWorkerThread) {
// No need to lock here because it is only modified on this thread.
MOZ_ASSERT(mWorkerPrivate);
mWorkerPrivate->AssertIsOnWorkerThread();
workerPrivate = mWorkerPrivate;
} else {
MutexAutoLock lock(mLock);
MOZ_ASSERT(mOtherThreadsDispatchingViaEventTarget < UINT32_MAX);
if (mWorkerPrivate) {
workerPrivate = mWorkerPrivate;
// Incrementing this counter will make the worker thread sleep if it
// somehow tries to unset mWorkerPrivate while we're using it.
mOtherThreadsDispatchingViaEventTarget++;
}
}
// Increment the PerformanceCounter dispatch count
// to keep track of how many runnables are executed.
IncrementDispatchCounter();
nsresult rv;
if (runnable && onWorkerThread) {
RefPtr<WorkerRunnable> workerRunnable =
workerPrivate->MaybeWrapAsWorkerRunnable(runnable.forget());
rv = nsThread::Dispatch(workerRunnable.forget(), NS_DISPATCH_NORMAL);
} else {
rv = nsThread::Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
}
if (!onWorkerThread && workerPrivate) {
// We need to wake the worker thread if we're not already on the right
// thread and the dispatch succeeded.
if (NS_SUCCEEDED(rv)) {
MutexAutoLock workerLock(workerPrivate->mMutex);
workerPrivate->mCondVar.Notify();
}
// Now unset our waiting flag.
{
MutexAutoLock lock(mLock);
MOZ_ASSERT(mOtherThreadsDispatchingViaEventTarget);
if (!--mOtherThreadsDispatchingViaEventTarget) {
mWorkerPrivateCondVar.Notify();
}
}
}
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
return NS_OK;
}
NS_IMETHODIMP
WorkerThread::DelayedDispatch(already_AddRefed<nsIRunnable>, uint32_t) {
return NS_ERROR_NOT_IMPLEMENTED;
}
uint32_t WorkerThread::RecursionDepth(
const WorkerThreadFriendKey& /* aKey */) const {
MOZ_ASSERT(PR_GetCurrentThread() == mThread);
return mNestedEventLoopDepth;
}
PerformanceCounter* WorkerThread::GetPerformanceCounter(nsIRunnable*) const {
return mWorkerPrivate ? &mWorkerPrivate->MutablePerformanceCounterRef()
: nullptr;
}
NS_IMPL_ISUPPORTS(WorkerThread::Observer, nsIThreadObserver)
NS_IMETHODIMP
WorkerThread::Observer::OnDispatchedEvent() {
MOZ_CRASH("OnDispatchedEvent() should never be called!");
}
NS_IMETHODIMP
WorkerThread::Observer::OnProcessNextEvent(nsIThreadInternal* /* aThread */,
bool aMayWait) {
mWorkerPrivate->AssertIsOnWorkerThread();
// If the PBackground child is not created yet, then we must permit
// blocking event processing to support
// BackgroundChild::GetOrCreateCreateForCurrentThread(). If this occurs
// then we are spinning on the event queue at the start of
// PrimaryWorkerRunnable::Run() and don't want to process the event in
// mWorkerPrivate yet.
if (aMayWait) {
MOZ_ASSERT(CycleCollectedJSContext::Get()->RecursionDepth() == 2);
MOZ_ASSERT(!BackgroundChild::GetForCurrentThread());
return NS_OK;
}
mWorkerPrivate->OnProcessNextEvent();
return NS_OK;
}
NS_IMETHODIMP
WorkerThread::Observer::AfterProcessNextEvent(nsIThreadInternal* /* aThread */,
bool /* aEventWasProcessed */) {
mWorkerPrivate->AssertIsOnWorkerThread();
mWorkerPrivate->AfterProcessNextEvent();
return NS_OK;
}
} // namespace dom
} // namespace mozilla
|