diff options
Diffstat (limited to '')
-rw-r--r-- | xpcom/threads/TaskQueue.cpp | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/xpcom/threads/TaskQueue.cpp b/xpcom/threads/TaskQueue.cpp new file mode 100644 index 0000000000..9c2019de95 --- /dev/null +++ b/xpcom/threads/TaskQueue.cpp @@ -0,0 +1,347 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/TaskQueue.h" + +#include "mozilla/DelayedRunnable.h" +#include "mozilla/ProfilerRunnable.h" +#include "nsIEventTarget.h" +#include "nsITargetShutdownTask.h" +#include "nsThreadUtils.h" +#include "nsQueryObject.h" + +namespace mozilla { + +// Handle for a TaskQueue being tracked by a TaskQueueTracker. When created, +// it is registered with the TaskQueueTracker, and when destroyed it is +// unregistered. Holds a threadsafe weak reference to the TaskQueue. +class TaskQueueTrackerEntry final + : private LinkedListElement<TaskQueueTrackerEntry> { + public: + TaskQueueTrackerEntry(TaskQueueTracker* aTracker, + const RefPtr<TaskQueue>& aQueue) + : mTracker(aTracker), mQueue(aQueue) { + MutexAutoLock lock(mTracker->mMutex); + mTracker->mEntries.insertFront(this); + } + ~TaskQueueTrackerEntry() { + MutexAutoLock lock(mTracker->mMutex); + removeFrom(mTracker->mEntries); + } + + TaskQueueTrackerEntry(const TaskQueueTrackerEntry&) = delete; + TaskQueueTrackerEntry(TaskQueueTrackerEntry&&) = delete; + TaskQueueTrackerEntry& operator=(const TaskQueueTrackerEntry&) = delete; + TaskQueueTrackerEntry& operator=(TaskQueueTrackerEntry&&) = delete; + + RefPtr<TaskQueue> GetQueue() const { return RefPtr<TaskQueue>(mQueue); } + + private: + friend class LinkedList<TaskQueueTrackerEntry>; + friend class LinkedListElement<TaskQueueTrackerEntry>; + + const RefPtr<TaskQueueTracker> mTracker; + const ThreadSafeWeakPtr<TaskQueue> mQueue; +}; + +RefPtr<TaskQueue> TaskQueue::Create(already_AddRefed<nsIEventTarget> aTarget, + const char* aName, + bool aSupportsTailDispatch) { + nsCOMPtr<nsIEventTarget> target(std::move(aTarget)); + RefPtr<TaskQueue> queue = + new TaskQueue(do_AddRef(target), aName, aSupportsTailDispatch); + + // If |target| is a TaskQueueTracker, register this TaskQueue with it. It will + // be unregistered when the TaskQueue is destroyed or shut down. + if (RefPtr<TaskQueueTracker> tracker = do_QueryObject(target)) { + MonitorAutoLock lock(queue->mQueueMonitor); + queue->mTrackerEntry = MakeUnique<TaskQueueTrackerEntry>(tracker, queue); + } + + return queue; +} + +TaskQueue::TaskQueue(already_AddRefed<nsIEventTarget> aTarget, + const char* aName, bool aSupportsTailDispatch) + : AbstractThread(aSupportsTailDispatch), + mTarget(aTarget), + mQueueMonitor("TaskQueue::Queue"), + mTailDispatcher(nullptr), + mIsRunning(false), + mIsShutdown(false), + mName(aName) {} + +TaskQueue::~TaskQueue() { + // We should never free the TaskQueue if it was destroyed abnormally, meaning + // that all cleanup tasks should be complete if we do. + MOZ_ASSERT(mShutdownTasks.IsEmpty()); +} + +NS_IMPL_ADDREF_INHERITED(TaskQueue, SupportsThreadSafeWeakPtr<TaskQueue>) +NS_IMPL_RELEASE_INHERITED(TaskQueue, SupportsThreadSafeWeakPtr<TaskQueue>) +NS_IMPL_QUERY_INTERFACE(TaskQueue, nsIDirectTaskDispatcher, + nsISerialEventTarget, nsIEventTarget) + +TaskDispatcher& TaskQueue::TailDispatcher() { + MOZ_ASSERT(IsCurrentThreadIn()); + MOZ_ASSERT(mTailDispatcher); + return *mTailDispatcher; +} + +// Note aRunnable is passed by ref to support conditional ownership transfer. +// See Dispatch() in TaskQueue.h for more details. +nsresult TaskQueue::DispatchLocked(nsCOMPtr<nsIRunnable>& aRunnable, + uint32_t aFlags, DispatchReason aReason) { + mQueueMonitor.AssertCurrentThreadOwns(); + + // Continue to allow dispatches after shutdown until the last message has been + // processed, at which point no more messages will be accepted. + if (mIsShutdown && !mIsRunning) { + return NS_ERROR_UNEXPECTED; + } + + AbstractThread* currentThread; + if (aReason != TailDispatch && (currentThread = GetCurrent()) && + RequiresTailDispatch(currentThread) && + currentThread->IsTailDispatcherAvailable()) { + MOZ_ASSERT(aFlags == NS_DISPATCH_NORMAL, + "Tail dispatch doesn't support flags"); + return currentThread->TailDispatcher().AddTask(this, aRunnable.forget()); + } + + LogRunnable::LogDispatch(aRunnable); + mTasks.Push({std::move(aRunnable), aFlags}); + + if (mIsRunning) { + return NS_OK; + } + RefPtr<nsIRunnable> runner(new Runner(this)); + nsresult rv = mTarget->Dispatch(runner.forget(), aFlags); + if (NS_FAILED(rv)) { + NS_WARNING("Failed to dispatch runnable to run TaskQueue"); + return rv; + } + mIsRunning = true; + + return NS_OK; +} + +nsresult TaskQueue::RegisterShutdownTask(nsITargetShutdownTask* aTask) { + NS_ENSURE_ARG(aTask); + + MonitorAutoLock mon(mQueueMonitor); + if (mIsShutdown) { + return NS_ERROR_UNEXPECTED; + } + + MOZ_ASSERT(!mShutdownTasks.Contains(aTask)); + mShutdownTasks.AppendElement(aTask); + return NS_OK; +} + +nsresult TaskQueue::UnregisterShutdownTask(nsITargetShutdownTask* aTask) { + NS_ENSURE_ARG(aTask); + + MonitorAutoLock mon(mQueueMonitor); + if (mIsShutdown) { + return NS_ERROR_UNEXPECTED; + } + + return mShutdownTasks.RemoveElement(aTask) ? NS_OK : NS_ERROR_UNEXPECTED; +} + +void TaskQueue::AwaitIdle() { + MonitorAutoLock mon(mQueueMonitor); + AwaitIdleLocked(); +} + +void TaskQueue::AwaitIdleLocked() { + // Make sure there are no tasks for this queue waiting in the caller's tail + // dispatcher. + MOZ_ASSERT_IF(AbstractThread::GetCurrent(), + !AbstractThread::GetCurrent()->HasTailTasksFor(this)); + + mQueueMonitor.AssertCurrentThreadOwns(); + MOZ_ASSERT(mIsRunning || mTasks.IsEmpty()); + while (mIsRunning) { + mQueueMonitor.Wait(); + } +} + +void TaskQueue::AwaitShutdownAndIdle() { + MOZ_ASSERT(!IsCurrentThreadIn()); + // Make sure there are no tasks for this queue waiting in the caller's tail + // dispatcher. + MOZ_ASSERT_IF(AbstractThread::GetCurrent(), + !AbstractThread::GetCurrent()->HasTailTasksFor(this)); + + MonitorAutoLock mon(mQueueMonitor); + while (!mIsShutdown) { + mQueueMonitor.Wait(); + } + AwaitIdleLocked(); +} +RefPtr<ShutdownPromise> TaskQueue::BeginShutdown() { + // Dispatch any tasks for this queue waiting in the caller's tail dispatcher, + // since this is the last opportunity to do so. + if (AbstractThread* currentThread = AbstractThread::GetCurrent()) { + currentThread->TailDispatchTasksFor(this); + } + + MonitorAutoLock mon(mQueueMonitor); + // Dispatch any cleanup tasks to the queue before we put it into full + // shutdown. + for (auto& task : mShutdownTasks) { + nsCOMPtr runnable{task->AsRunnable()}; + MOZ_ALWAYS_SUCCEEDS( + DispatchLocked(runnable, NS_DISPATCH_NORMAL, TailDispatch)); + } + mShutdownTasks.Clear(); + mIsShutdown = true; + + RefPtr<ShutdownPromise> p = mShutdownPromise.Ensure(__func__); + MaybeResolveShutdown(); + mon.NotifyAll(); + return p; +} + +void TaskQueue::MaybeResolveShutdown() { + mQueueMonitor.AssertCurrentThreadOwns(); + if (mIsShutdown && !mIsRunning) { + mShutdownPromise.ResolveIfExists(true, __func__); + // Disconnect from our target as we won't try to dispatch any more events. + mTrackerEntry = nullptr; + mTarget = nullptr; + } +} + +bool TaskQueue::IsEmpty() { + MonitorAutoLock mon(mQueueMonitor); + return mTasks.IsEmpty(); +} + +bool TaskQueue::IsCurrentThreadIn() const { + bool in = mRunningThread == PR_GetCurrentThread(); + return in; +} + +nsresult TaskQueue::Runner::Run() { + TaskStruct event; + { + MonitorAutoLock mon(mQueue->mQueueMonitor); + MOZ_ASSERT(mQueue->mIsRunning); + if (mQueue->mTasks.IsEmpty()) { + mQueue->mIsRunning = false; + mQueue->MaybeResolveShutdown(); + mon.NotifyAll(); + return NS_OK; + } + event = std::move(mQueue->mTasks.FirstElement()); + mQueue->mTasks.Pop(); + } + MOZ_ASSERT(event.event); + + // Note that dropping the queue monitor before running the task, and + // taking the monitor again after the task has run ensures we have memory + // fences enforced. This means that if the object we're calling wasn't + // designed to be threadsafe, it will be, provided we're only calling it + // in this task queue. + { + AutoTaskGuard g(mQueue); + SerialEventTargetGuard tg(mQueue); + { + LogRunnable::Run log(event.event); + + AUTO_PROFILE_FOLLOWING_RUNNABLE(event.event); + event.event->Run(); + + // Drop the reference to event. The event will hold a reference to the + // object it's calling, and we don't want to keep it alive, it may be + // making assumptions what holds references to it. This is especially + // the case if the object is waiting for us to shutdown, so that it + // can shutdown (like in the MediaDecoderStateMachine's SHUTDOWN case). + event.event = nullptr; + } + } + + { + MonitorAutoLock mon(mQueue->mQueueMonitor); + if (mQueue->mTasks.IsEmpty()) { + // No more events to run. Exit the task runner. + mQueue->mIsRunning = false; + mQueue->MaybeResolveShutdown(); + mon.NotifyAll(); + return NS_OK; + } + } + + // There's at least one more event that we can run. Dispatch this Runner + // to the target again to ensure it runs again. Note that we don't just + // run in a loop here so that we don't hog the target. This means we may + // run on another thread next time, but we rely on the memory fences from + // mQueueMonitor for thread safety of non-threadsafe tasks. + nsresult rv; + { + MonitorAutoLock mon(mQueue->mQueueMonitor); + rv = mQueue->mTarget->Dispatch( + this, mQueue->mTasks.FirstElement().flags | NS_DISPATCH_AT_END); + } + if (NS_FAILED(rv)) { + // Failed to dispatch, shutdown! + MonitorAutoLock mon(mQueue->mQueueMonitor); + mQueue->mIsRunning = false; + mQueue->mIsShutdown = true; + mQueue->MaybeResolveShutdown(); + mon.NotifyAll(); + } + + return NS_OK; +} + +//----------------------------------------------------------------------------- +// nsIDirectTaskDispatcher +//----------------------------------------------------------------------------- + +NS_IMETHODIMP +TaskQueue::DispatchDirectTask(already_AddRefed<nsIRunnable> aEvent) { + if (!IsCurrentThreadIn()) { + return NS_ERROR_FAILURE; + } + mDirectTasks.AddTask(std::move(aEvent)); + return NS_OK; +} + +NS_IMETHODIMP TaskQueue::DrainDirectTasks() { + if (!IsCurrentThreadIn()) { + return NS_ERROR_FAILURE; + } + mDirectTasks.DrainTasks(); + return NS_OK; +} + +NS_IMETHODIMP TaskQueue::HaveDirectTasks(bool* aValue) { + if (!IsCurrentThreadIn()) { + return NS_ERROR_FAILURE; + } + + *aValue = mDirectTasks.HaveTasks(); + return NS_OK; +} + +nsTArray<RefPtr<TaskQueue>> TaskQueueTracker::GetAllTrackedTaskQueues() { + MutexAutoLock lock(mMutex); + nsTArray<RefPtr<TaskQueue>> queues; + for (auto* entry : mEntries) { + if (auto queue = entry->GetQueue()) { + queues.AppendElement(queue); + } + } + return queues; +} + +TaskQueueTracker::~TaskQueueTracker() = default; + +} // namespace mozilla |