diff options
Diffstat (limited to 'xpcom/threads/TaskDispatcher.h')
-rw-r--r-- | xpcom/threads/TaskDispatcher.h | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/xpcom/threads/TaskDispatcher.h b/xpcom/threads/TaskDispatcher.h new file mode 100644 index 0000000000..1f27c32c7d --- /dev/null +++ b/xpcom/threads/TaskDispatcher.h @@ -0,0 +1,304 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#if !defined(TaskDispatcher_h_) +# define TaskDispatcher_h_ + +# include <queue> + +# include "mozilla/AbstractThread.h" +# include "mozilla/Maybe.h" +# include "mozilla/ProfilerRunnable.h" +# include "mozilla/UniquePtr.h" +# include "nsIDirectTaskDispatcher.h" +# include "nsISupportsImpl.h" +# include "nsTArray.h" +# include "nsThreadUtils.h" + +namespace mozilla { + +class SimpleTaskQueue { + public: + SimpleTaskQueue() = default; + virtual ~SimpleTaskQueue() = default; + + void AddTask(already_AddRefed<nsIRunnable> aRunnable) { + if (!mTasks) { + mTasks.emplace(); + } + mTasks->push(std::move(aRunnable)); + } + + void DrainTasks() { + if (!mTasks) { + return; + } + auto& queue = mTasks.ref(); + while (!queue.empty()) { + nsCOMPtr<nsIRunnable> r = std::move(queue.front()); + queue.pop(); + AUTO_PROFILE_FOLLOWING_RUNNABLE(r); + r->Run(); + } + } + + bool HaveTasks() const { return mTasks && !mTasks->empty(); } + + private: + // We use a Maybe<> because (a) when used for DirectTasks it often doesn't get + // anything put into it, and (b) the std::queue implementation in GNU + // libstdc++ does two largish heap allocations when creating a new std::queue. + Maybe<std::queue<nsCOMPtr<nsIRunnable>>> mTasks; +}; + +/* + * A classic approach to cross-thread communication is to dispatch asynchronous + * runnables to perform updates on other threads. This generally works well, but + * there are sometimes reasons why we might want to delay the actual dispatch of + * these tasks until a specified moment. At present, this is primarily useful to + * ensure that mirrored state gets updated atomically - but there may be other + * applications as well. + * + * TaskDispatcher is a general abstract class that accepts tasks and dispatches + * them at some later point. These groups of tasks are per-target-thread, and + * contain separate queues for several kinds of tasks (see comments below). - + * "state change tasks" (which run first, and are intended to be used to update + * the value held by mirrors), and regular tasks, which are other arbitrary + * operations that the are gated to run after all the state changes have + * completed. + */ +class TaskDispatcher { + public: + TaskDispatcher() = default; + virtual ~TaskDispatcher() = default; + + // Direct tasks are run directly (rather than dispatched asynchronously) when + // the tail dispatcher fires. A direct task may cause other tasks to be added + // to the tail dispatcher. + virtual void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) = 0; + + // State change tasks are dispatched asynchronously always run before regular + // tasks. They are intended to be used to update the value held by mirrors + // before any other dispatched tasks are run on the target thread. + virtual void AddStateChangeTask(AbstractThread* aThread, + already_AddRefed<nsIRunnable> aRunnable) = 0; + + // Regular tasks are dispatched asynchronously, and run after state change + // tasks. + virtual nsresult AddTask(AbstractThread* aThread, + already_AddRefed<nsIRunnable> aRunnable) = 0; + + virtual nsresult DispatchTasksFor(AbstractThread* aThread) = 0; + virtual bool HasTasksFor(AbstractThread* aThread) = 0; + virtual void DrainDirectTasks() = 0; +}; + +/* + * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires + * its queued tasks when it is popped off the stack. + */ +class AutoTaskDispatcher : public TaskDispatcher { + public: + explicit AutoTaskDispatcher(nsIDirectTaskDispatcher* aDirectTaskDispatcher, + bool aIsTailDispatcher = false) + : mDirectTaskDispatcher(aDirectTaskDispatcher), + mIsTailDispatcher(aIsTailDispatcher) {} + + ~AutoTaskDispatcher() { + // Given that direct tasks may trigger other code that uses the tail + // dispatcher, it's better to avoid processing them in the tail dispatcher's + // destructor. So we require TailDispatchers to manually invoke + // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth, + // this is only necessary in the case where this AutoTaskDispatcher can be + // accessed by the direct tasks it dispatches (true for TailDispatchers, but + // potentially not true for other hypothetical AutoTaskDispatchers). Feel + // free to loosen this restriction to apply only to mIsTailDispatcher if a + // use-case requires it. + MOZ_ASSERT(!HaveDirectTasks()); + + for (size_t i = 0; i < mTaskGroups.Length(); ++i) { + DispatchTaskGroup(std::move(mTaskGroups[i])); + } + } + + bool HaveDirectTasks() { + return mDirectTaskDispatcher && mDirectTaskDispatcher->HaveDirectTasks(); + } + + void DrainDirectTasks() override { + if (mDirectTaskDispatcher) { + mDirectTaskDispatcher->DrainDirectTasks(); + } + } + + void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) override { + MOZ_ASSERT(mDirectTaskDispatcher); + mDirectTaskDispatcher->DispatchDirectTask(std::move(aRunnable)); + } + + void AddStateChangeTask(AbstractThread* aThread, + already_AddRefed<nsIRunnable> aRunnable) override { + nsCOMPtr<nsIRunnable> r = aRunnable; + MOZ_RELEASE_ASSERT(r); + EnsureTaskGroup(aThread).mStateChangeTasks.AppendElement(r.forget()); + } + + nsresult AddTask(AbstractThread* aThread, + already_AddRefed<nsIRunnable> aRunnable) override { + nsCOMPtr<nsIRunnable> r = aRunnable; + MOZ_RELEASE_ASSERT(r); + // To preserve the event order, we need to append a new group if the last + // group is not targeted for |aThread|. + // See https://bugzilla.mozilla.org/show_bug.cgi?id=1318226&mark=0-3#c0 + // for the details of the issue. + if (mTaskGroups.Length() == 0 || + mTaskGroups.LastElement()->mThread != aThread) { + mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread)); + } + + PerThreadTaskGroup& group = *mTaskGroups.LastElement(); + group.mRegularTasks.AppendElement(r.forget()); + + return NS_OK; + } + + bool HasTasksFor(AbstractThread* aThread) override { + return !!GetTaskGroup(aThread) || + (aThread == AbstractThread::GetCurrent() && HaveDirectTasks()); + } + + nsresult DispatchTasksFor(AbstractThread* aThread) override { + nsresult rv = NS_OK; + + // Dispatch all groups that match |aThread|. + for (size_t i = 0; i < mTaskGroups.Length(); ++i) { + if (mTaskGroups[i]->mThread == aThread) { + nsresult rv2 = DispatchTaskGroup(std::move(mTaskGroups[i])); + + if (NS_WARN_IF(NS_FAILED(rv2)) && NS_SUCCEEDED(rv)) { + // We should try our best to call DispatchTaskGroup() as much as + // possible and return an error if any of DispatchTaskGroup() calls + // failed. + rv = rv2; + } + + mTaskGroups.RemoveElementAt(i--); + } + } + + return rv; + } + + private: + struct PerThreadTaskGroup { + public: + explicit PerThreadTaskGroup(AbstractThread* aThread) : mThread(aThread) { + MOZ_COUNT_CTOR(PerThreadTaskGroup); + } + + MOZ_COUNTED_DTOR(PerThreadTaskGroup) + + RefPtr<AbstractThread> mThread; + nsTArray<nsCOMPtr<nsIRunnable>> mStateChangeTasks; + nsTArray<nsCOMPtr<nsIRunnable>> mRegularTasks; + }; + + class TaskGroupRunnable : public Runnable { + public: + explicit TaskGroupRunnable(UniquePtr<PerThreadTaskGroup>&& aTasks) + : Runnable("AutoTaskDispatcher::TaskGroupRunnable"), + mTasks(std::move(aTasks)) {} + + NS_IMETHOD Run() override { + // State change tasks get run all together before any code is run, so + // that all state changes are made in an atomic unit. + for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) { + mTasks->mStateChangeTasks[i]->Run(); + } + + // Once the state changes have completed, drain any direct tasks + // generated by those state changes (i.e. watcher notification tasks). + // This needs to be outside the loop because we don't want to run code + // that might observe intermediate states. + MaybeDrainDirectTasks(); + + for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) { + AUTO_PROFILE_FOLLOWING_RUNNABLE(mTasks->mRegularTasks[i]); + mTasks->mRegularTasks[i]->Run(); + + // Scope direct tasks tightly to the task that generated them. + MaybeDrainDirectTasks(); + } + + return NS_OK; + } + + private: + void MaybeDrainDirectTasks() { + AbstractThread* currentThread = AbstractThread::GetCurrent(); + if (currentThread && currentThread->MightHaveTailTasks()) { + currentThread->TailDispatcher().DrainDirectTasks(); + } + } + + UniquePtr<PerThreadTaskGroup> mTasks; + }; + + PerThreadTaskGroup& EnsureTaskGroup(AbstractThread* aThread) { + PerThreadTaskGroup* existing = GetTaskGroup(aThread); + if (existing) { + return *existing; + } + + mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread)); + return *mTaskGroups.LastElement(); + } + + PerThreadTaskGroup* GetTaskGroup(AbstractThread* aThread) { + for (size_t i = 0; i < mTaskGroups.Length(); ++i) { + if (mTaskGroups[i]->mThread == aThread) { + return mTaskGroups[i].get(); + } + } + + // Not found. + return nullptr; + } + + nsresult DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup) { + RefPtr<AbstractThread> thread = aGroup->mThread; + + AbstractThread::DispatchReason reason = + mIsTailDispatcher ? AbstractThread::TailDispatch + : AbstractThread::NormalDispatch; + nsCOMPtr<nsIRunnable> r = new TaskGroupRunnable(std::move(aGroup)); + return thread->Dispatch(r.forget(), reason); + } + + // Task groups, organized by thread. + nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups; + + nsCOMPtr<nsIDirectTaskDispatcher> mDirectTaskDispatcher; + // True if this TaskDispatcher represents the tail dispatcher for the thread + // upon which it runs. + const bool mIsTailDispatcher; +}; + +// Little utility class to allow declaring AutoTaskDispatcher as a default +// parameter for methods that take a TaskDispatcher&. +template <typename T> +class PassByRef { + public: + PassByRef() = default; + operator T&() { return mVal; } + + private: + T mVal; +}; + +} // namespace mozilla + +#endif |