/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "mozilla/TaskQueue.h"

#include "mozilla/ProfilerRunnable.h"
#include "nsIEventTarget.h"
#include "nsITargetShutdownTask.h"
#include "nsThreadUtils.h"
#include "nsQueryObject.h"

namespace mozilla {

// Handle for a TaskQueue being tracked by a TaskQueueTracker. When created,
// it is registered with the TaskQueueTracker, and when destroyed it is
// unregistered. Holds a threadsafe weak reference to the TaskQueue.
class TaskQueueTrackerEntry final
    : private LinkedListElement<TaskQueueTrackerEntry> {
 public:
  // Inserts this entry at the front of the tracker's entry list while holding
  // the tracker's mutex.
  TaskQueueTrackerEntry(TaskQueueTracker* aTracker,
                        const RefPtr<TaskQueue>& aQueue)
      : mTracker(aTracker), mQueue(aQueue) {
    MutexAutoLock lock(mTracker->mMutex);
    mTracker->mEntries.insertFront(this);
  }

  // Removes this entry from the tracker's entry list while holding the
  // tracker's mutex.
  ~TaskQueueTrackerEntry() {
    MutexAutoLock lock(mTracker->mMutex);
    removeFrom(mTracker->mEntries);
  }

  // The entry is linked into the tracker's list by address, so it is neither
  // copyable nor movable.
  TaskQueueTrackerEntry(const TaskQueueTrackerEntry&) = delete;
  TaskQueueTrackerEntry(TaskQueueTrackerEntry&&) = delete;
  TaskQueueTrackerEntry& operator=(const TaskQueueTrackerEntry&) = delete;
  TaskQueueTrackerEntry& operator=(TaskQueueTrackerEntry&&) = delete;

  // Converts the weak reference into a RefPtr. Callers (see
  // TaskQueueTracker::GetAllTrackedTaskQueues) test the result before use.
  RefPtr<TaskQueue> GetQueue() const { return RefPtr<TaskQueue>(mQueue); }

 private:
  friend class LinkedList<TaskQueueTrackerEntry>;
  friend class LinkedListElement<TaskQueueTrackerEntry>;

  const RefPtr<TaskQueueTracker> mTracker;
  const ThreadSafeWeakPtr<TaskQueue> mQueue;
};

// Creates a TaskQueue which runs its tasks by dispatching to |aTarget|.
RefPtr<TaskQueue> TaskQueue::Create(already_AddRefed<nsIEventTarget> aTarget,
                                    const char* aName,
                                    bool aSupportsTailDispatch) {
  nsCOMPtr<nsIEventTarget> target(std::move(aTarget));
  RefPtr<TaskQueue> queue =
      new TaskQueue(do_AddRef(target), aName, aSupportsTailDispatch);

  // If |target| is a TaskQueueTracker, register this TaskQueue with it. It will
  // be unregistered when the TaskQueue is destroyed or shut down.
  if (RefPtr<TaskQueueTracker> tracker = do_QueryObject(target)) {
    MonitorAutoLock lock(queue->mQueueMonitor);
    queue->mTrackerEntry = MakeUnique<TaskQueueTrackerEntry>(tracker, queue);
  }

  return queue;
}

// Initializes the queue as neither running nor shut down, with no tail
// dispatcher installed.
TaskQueue::TaskQueue(already_AddRefed<nsIEventTarget> aTarget,
                     const char* aName, bool aSupportsTailDispatch)
    : AbstractThread(aSupportsTailDispatch),
      mTarget(aTarget),
      mQueueMonitor("TaskQueue::Queue"),
      mTailDispatcher(nullptr),
      mIsRunning(false),
      mIsShutdown(false),
      mName(aName) {}

TaskQueue::~TaskQueue() {
  // We should never free the TaskQueue if it was destroyed abnormally, meaning
  // that all cleanup tasks should be complete if we do.
  MOZ_ASSERT(mShutdownTasks.IsEmpty());
}

NS_IMPL_ADDREF_INHERITED(TaskQueue, SupportsThreadSafeWeakPtr<TaskQueue>)
NS_IMPL_RELEASE_INHERITED(TaskQueue, SupportsThreadSafeWeakPtr<TaskQueue>)
NS_IMPL_QUERY_INTERFACE(TaskQueue, nsIDirectTaskDispatcher,
                        nsISerialEventTarget, nsIEventTarget)

// Returns the tail dispatcher for the task currently running on this queue.
// Asserts that the caller is on the queue and that a dispatcher is installed.
TaskDispatcher& TaskQueue::TailDispatcher() {
  MOZ_ASSERT(IsCurrentThreadIn());
  MOZ_ASSERT(mTailDispatcher);
  return *mTailDispatcher;
}

// Note aRunnable is passed by ref to support conditional ownership transfer.
// See Dispatch() in TaskQueue.h for more details.
//
// Must be called with mQueueMonitor held. Returns NS_ERROR_UNEXPECTED if the
// queue has been shut down and is no longer running, or the failure code from
// the underlying target if the Runner could not be dispatched.
nsresult TaskQueue::DispatchLocked(nsCOMPtr<nsIRunnable>& aRunnable,
                                   uint32_t aFlags, DispatchReason aReason) {
  mQueueMonitor.AssertCurrentThreadOwns();

  // Continue to allow dispatches after shutdown until the last message has been
  // processed, at which point no more messages will be accepted.
  if (mIsShutdown && !mIsRunning) {
    return NS_ERROR_UNEXPECTED;
  }

  // Unless this call is itself a tail dispatch, hand the runnable to the
  // calling AbstractThread's tail dispatcher when one is required and
  // available.
  AbstractThread* currentThread;
  if (aReason != TailDispatch && (currentThread = GetCurrent()) &&
      RequiresTailDispatch(currentThread) &&
      currentThread->IsTailDispatcherAvailable()) {
    MOZ_ASSERT(aFlags == NS_DISPATCH_NORMAL,
               "Tail dispatch doesn't support flags");
    return currentThread->TailDispatcher().AddTask(this, aRunnable.forget());
  }

  LogRunnable::LogDispatch(aRunnable);
  mTasks.Push({std::move(aRunnable), aFlags});

  // A Runner is already scheduled or executing; it will pick up the new task.
  if (mIsRunning) {
    return NS_OK;
  }

  RefPtr<nsIRunnable> runner(new Runner(this));
  nsresult rv = mTarget->Dispatch(runner.forget(), aFlags);
  if (NS_FAILED(rv)) {
    NS_WARNING("Failed to dispatch runnable to run TaskQueue");
    return rv;
  }
  mIsRunning = true;

  return NS_OK;
}

// Appends |aTask| to the list of tasks dispatched by BeginShutdown(). Fails
// with NS_ERROR_UNEXPECTED once mIsShutdown has been set.
nsresult TaskQueue::RegisterShutdownTask(nsITargetShutdownTask* aTask) {
  NS_ENSURE_ARG(aTask);

  MonitorAutoLock mon(mQueueMonitor);
  if (mIsShutdown) {
    return NS_ERROR_UNEXPECTED;
  }

  MOZ_ASSERT(!mShutdownTasks.Contains(aTask));
  mShutdownTasks.AppendElement(aTask);
  return NS_OK;
}

// Removes |aTask| from the list of shutdown tasks. Fails with
// NS_ERROR_UNEXPECTED once mIsShutdown has been set, or if |aTask| was not
// in the list.
nsresult TaskQueue::UnregisterShutdownTask(nsITargetShutdownTask* aTask) {
  NS_ENSURE_ARG(aTask);

  MonitorAutoLock mon(mQueueMonitor);
  if (mIsShutdown) {
    return NS_ERROR_UNEXPECTED;
  }

  return mShutdownTasks.RemoveElement(aTask) ? NS_OK : NS_ERROR_UNEXPECTED;
}

// Blocks the caller until mIsRunning is false.
void TaskQueue::AwaitIdle() {
  MonitorAutoLock mon(mQueueMonitor);
  AwaitIdleLocked();
}

// As AwaitIdle(), but the caller must already hold mQueueMonitor.
void TaskQueue::AwaitIdleLocked() {
  // Make sure there are no tasks for this queue waiting in the caller's tail
  // dispatcher.
  MOZ_ASSERT_IF(AbstractThread::GetCurrent(),
                !AbstractThread::GetCurrent()->HasTailTasksFor(this));

  mQueueMonitor.AssertCurrentThreadOwns();
  MOZ_ASSERT(mIsRunning || mTasks.IsEmpty());
  while (mIsRunning) {
    mQueueMonitor.Wait();
  }
}

// Blocks the caller until mIsShutdown is set and then until the queue is no
// longer running. Must not be called from a task running on this queue.
void TaskQueue::AwaitShutdownAndIdle() {
  MOZ_ASSERT(!IsCurrentThreadIn());
  // Make sure there are no tasks for this queue waiting in the caller's tail
  // dispatcher.
  MOZ_ASSERT_IF(AbstractThread::GetCurrent(),
                !AbstractThread::GetCurrent()->HasTailTasksFor(this));

  MonitorAutoLock mon(mQueueMonitor);
  while (!mIsShutdown) {
    mQueueMonitor.Wait();
  }
  AwaitIdleLocked();
}

// Dispatches the registered shutdown tasks, marks the queue as shut down and
// returns a promise that is resolved by MaybeResolveShutdown() once the queue
// is shut down and no longer running.
RefPtr<ShutdownPromise> TaskQueue::BeginShutdown() {
  // Dispatch any tasks for this queue waiting in the caller's tail dispatcher,
  // since this is the last opportunity to do so.
  if (AbstractThread* currentThread = AbstractThread::GetCurrent()) {
    currentThread->TailDispatchTasksFor(this);
  }

  MonitorAutoLock mon(mQueueMonitor);
  // Dispatch any cleanup tasks to the queue before we put it into full
  // shutdown.
  for (auto& task : mShutdownTasks) {
    nsCOMPtr<nsIRunnable> runnable{task->AsRunnable()};
    MOZ_ALWAYS_SUCCEEDS(
        DispatchLocked(runnable, NS_DISPATCH_NORMAL, TailDispatch));
  }
  mShutdownTasks.Clear();
  mIsShutdown = true;

  RefPtr<ShutdownPromise> p = mShutdownPromise.Ensure(__func__);
  MaybeResolveShutdown();
  mon.NotifyAll();
  return p;
}

// Resolves the shutdown promise (if one exists) and drops the tracker entry
// and target once the queue is both shut down and not running. Must be called
// with mQueueMonitor held.
void TaskQueue::MaybeResolveShutdown() {
  mQueueMonitor.AssertCurrentThreadOwns();
  if (mIsShutdown && !mIsRunning) {
    mShutdownPromise.ResolveIfExists(true, __func__);
    // Disconnect from our target as we won't try to dispatch any more events.
    mTrackerEntry = nullptr;
    mTarget = nullptr;
  }
}

// Returns true if there are no pending tasks in the queue.
bool TaskQueue::IsEmpty() {
  MonitorAutoLock mon(mQueueMonitor);
  return mTasks.IsEmpty();
}

// Returns true if the calling thread is the one recorded in mRunningThread.
bool TaskQueue::IsCurrentThreadIn() const {
  bool in = mRunningThread == PR_GetCurrentThread();
  return in;
}

// Runs a single task from the queue, then either marks the queue as idle (if
// no tasks remain) or re-dispatches this Runner to the target.
nsresult TaskQueue::Runner::Run() {
  TaskStruct event;
  {
    MonitorAutoLock mon(mQueue->mQueueMonitor);
    MOZ_ASSERT(mQueue->mIsRunning);
    if (mQueue->mTasks.IsEmpty()) {
      mQueue->mIsRunning = false;
      mQueue->MaybeResolveShutdown();
      mon.NotifyAll();
      return NS_OK;
    }
    event = mQueue->mTasks.Pop();
  }
  MOZ_ASSERT(event.event);

  // Note that dropping the queue monitor before running the task, and
  // taking the monitor again after the task has run ensures we have memory
  // fences enforced. This means that if the object we're calling wasn't
  // designed to be threadsafe, it will be, provided we're only calling it
  // in this task queue.
  {
    AutoTaskGuard g(mQueue);
    SerialEventTargetGuard tg(mQueue);
    {
      LogRunnable::Run log(event.event);

      AUTO_PROFILE_FOLLOWING_RUNNABLE(event.event);
      event.event->Run();

      // Drop the reference to event. The event will hold a reference to the
      // object it's calling, and we don't want to keep it alive, it may be
      // making assumptions what holds references to it. This is especially
      // the case if the object is waiting for us to shutdown, so that it
      // can shutdown (like in the MediaDecoderStateMachine's SHUTDOWN case).
      event.event = nullptr;
    }
  }

  {
    MonitorAutoLock mon(mQueue->mQueueMonitor);
    if (mQueue->mTasks.IsEmpty()) {
      // No more events to run. Exit the task runner.
      mQueue->mIsRunning = false;
      mQueue->MaybeResolveShutdown();
      mon.NotifyAll();
      return NS_OK;
    }
  }

  // There's at least one more event that we can run. Dispatch this Runner
  // to the target again to ensure it runs again. Note that we don't just
  // run in a loop here so that we don't hog the target. This means we may
  // run on another thread next time, but we rely on the memory fences from
  // mQueueMonitor for thread safety of non-threadsafe tasks.
  nsresult rv;
  {
    MonitorAutoLock mon(mQueue->mQueueMonitor);
    rv = mQueue->mTarget->Dispatch(
        this, mQueue->mTasks.FirstElement().flags | NS_DISPATCH_AT_END);
  }
  if (NS_FAILED(rv)) {
    // Failed to dispatch, shutdown!
    MonitorAutoLock mon(mQueue->mQueueMonitor);
    mQueue->mIsRunning = false;
    mQueue->mIsShutdown = true;
    mQueue->MaybeResolveShutdown();
    mon.NotifyAll();
  }

  return NS_OK;
}

//-----------------------------------------------------------------------------
// nsIDirectTaskDispatcher
//-----------------------------------------------------------------------------

// Queues |aEvent| as a direct task. Fails with NS_ERROR_FAILURE unless called
// from a task running on this queue.
NS_IMETHODIMP
TaskQueue::DispatchDirectTask(already_AddRefed<nsIRunnable> aEvent) {
  if (!IsCurrentThreadIn()) {
    return NS_ERROR_FAILURE;
  }
  mDirectTasks.AddTask(std::move(aEvent));
  return NS_OK;
}

// Drains the pending direct tasks. Fails with NS_ERROR_FAILURE unless called
// from a task running on this queue.
NS_IMETHODIMP TaskQueue::DrainDirectTasks() {
  if (!IsCurrentThreadIn()) {
    return NS_ERROR_FAILURE;
  }
  mDirectTasks.DrainTasks();
  return NS_OK;
}

// Stores in |*aValue| whether any direct tasks are pending. Fails with
// NS_ERROR_FAILURE unless called from a task running on this queue.
NS_IMETHODIMP TaskQueue::HaveDirectTasks(bool* aValue) {
  if (!IsCurrentThreadIn()) {
    return NS_ERROR_FAILURE;
  }

  *aValue = mDirectTasks.HaveTasks();
  return NS_OK;
}

// Returns strong references to every tracked TaskQueue whose weak reference
// can still be converted to a non-null RefPtr. The list is walked while
// holding mMutex.
nsTArray<RefPtr<TaskQueue>> TaskQueueTracker::GetAllTrackedTaskQueues() {
  MutexAutoLock lock(mMutex);
  nsTArray<RefPtr<TaskQueue>> queues;
  for (auto* entry : mEntries) {
    if (auto queue = entry->GetQueue()) {
      queues.AppendElement(queue);
    }
  }
  return queues;
}

TaskQueueTracker::~TaskQueueTracker() = default;

}  // namespace mozilla