diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:44:51 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:44:51 +0000 |
commit | 9e3c08db40b8916968b9f30096c7be3f00ce9647 (patch) | |
tree | a68f146d7fa01f0134297619fbe7e33db084e0aa /xpcom/threads/ThrottledEventQueue.cpp | |
parent | Initial commit. (diff) | |
download | thunderbird-upstream.tar.xz thunderbird-upstream.zip |
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | xpcom/threads/ThrottledEventQueue.cpp | 459 |
1 files changed, 459 insertions, 0 deletions
diff --git a/xpcom/threads/ThrottledEventQueue.cpp b/xpcom/threads/ThrottledEventQueue.cpp new file mode 100644 index 0000000000..9e4219b305 --- /dev/null +++ b/xpcom/threads/ThrottledEventQueue.cpp @@ -0,0 +1,459 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "ThrottledEventQueue.h" + +#include "mozilla/Atomics.h" +#include "mozilla/ClearOnShutdown.h" +#include "mozilla/CondVar.h" +#include "mozilla/EventQueue.h" +#include "mozilla/Mutex.h" +#include "mozilla/Unused.h" +#include "nsThreadUtils.h" + +namespace mozilla { + +namespace {} // anonymous namespace + +// The ThrottledEventQueue is designed with inner and outer objects: +// +// XPCOM code base event target +// | | +// v v +// +-------+ +--------+ +// | Outer | +-->|executor| +// +-------+ | +--------+ +// | | | +// | +-------+ | +// +-->| Inner |<--+ +// +-------+ +// +// Client code references the outer nsIEventTarget which in turn references +// an inner object, which actually holds the queue of runnables. +// +// Whenever the queue is non-empty (and not paused), it keeps an "executor" +// runnable dispatched to the base event target. Each time the executor is run, +// it draws the next event from Inner's queue and runs it. If that queue has +// more events, the executor is dispatched to the base again. +// +// The executor holds a strong reference to the Inner object. This means that if +// the outer object is dereferenced and destroyed, the Inner object will remain +// live for as long as the executor exists - that is, until the Inner's queue is +// empty. +// +// A Paused ThrottledEventQueue does not enqueue an executor when new events are +// added. Any executor previously queued on the base event target draws no +// events from a Paused ThrottledEventQueue, and returns without re-enqueueing +// itself. Since there is no executor keeping the Inner object alive until its +// queue is empty, dropping a Paused ThrottledEventQueue may drop the Inner +// while it still owns events. This is the correct behavior: if there are no +// references to it, it will never be Resumed, and thus it will never dispatch +// events again. +// +// Resuming a ThrottledEventQueue must dispatch an executor, so calls to Resume +// are fallible for the same reasons as calls to Dispatch. +// +// The xpcom shutdown process drains the main thread's event queue several +// times, so if a ThrottledEventQueue is being driven by the main thread, it +// should get emptied out by the time we reach the "eventq shutdown" phase. +class ThrottledEventQueue::Inner final : public nsISupports { + // The runnable which is dispatched to the underlying base target. Since + // we only execute one event at a time we just re-use a single instance + // of this class while there are events left in the queue. + class Executor final : public Runnable, public nsIRunnablePriority { + // The Inner whose runnables we execute. mInner->mExecutor points + // to this executor, forming a reference loop. + RefPtr<Inner> mInner; + + ~Executor() = default; + + public: + explicit Executor(Inner* aInner) + : Runnable("ThrottledEventQueue::Inner::Executor"), mInner(aInner) {} + + NS_DECL_ISUPPORTS_INHERITED + + NS_IMETHODIMP + Run() override { + mInner->ExecuteRunnable(); + return NS_OK; + } + + NS_IMETHODIMP + GetPriority(uint32_t* aPriority) override { + *aPriority = mInner->mPriority; + return NS_OK; + } + +#ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY + NS_IMETHODIMP + GetName(nsACString& aName) override { return mInner->CurrentName(aName); } +#endif + }; + + mutable Mutex mMutex; + mutable CondVar mIdleCondVar MOZ_GUARDED_BY(mMutex); + + // As-of-yet unexecuted runnables queued on this ThrottledEventQueue. + // + // Used from any thread; protected by mMutex. Signals mIdleCondVar when + // emptied. + EventQueueSized<64> mEventQueue MOZ_GUARDED_BY(mMutex); + + // The event target we dispatch our events (actually, just our Executor) to. + // + // Written only during construction. Readable by any thread without locking. + const nsCOMPtr<nsISerialEventTarget> mBaseTarget; + + // The Executor that we dispatch to mBaseTarget to draw runnables from our + // queue. mExecutor->mInner points to this Inner, forming a reference loop. + // + // Used from any thread; protected by mMutex. + nsCOMPtr<nsIRunnable> mExecutor MOZ_GUARDED_BY(mMutex); + + const char* const mName; + + const uint32_t mPriority; + + // True if this queue is currently paused. + // Used from any thread; protected by mMutex. + bool mIsPaused MOZ_GUARDED_BY(mMutex); + + explicit Inner(nsISerialEventTarget* aBaseTarget, const char* aName, + uint32_t aPriority) + : mMutex("ThrottledEventQueue"), + mIdleCondVar(mMutex, "ThrottledEventQueue:Idle"), + mBaseTarget(aBaseTarget), + mName(aName), + mPriority(aPriority), + mIsPaused(false) { + MOZ_ASSERT(mName, "Must pass a valid name!"); + } + + ~Inner() { +#ifdef DEBUG + MutexAutoLock lock(mMutex); + + // As long as an executor exists, it had better keep us alive, since it's + // going to call ExecuteRunnable on us. + MOZ_ASSERT(!mExecutor); + + // If we have any events in our queue, there should be an executor queued + // for them, and that should have kept us alive. The exception is that, if + // we're paused, we don't enqueue an executor. + MOZ_ASSERT(mEventQueue.IsEmpty(lock) || IsPaused(lock)); + + // Some runnables are only safe to drop on the main thread, so if our queue + // isn't empty, we'd better be on the main thread. + MOZ_ASSERT_IF(!mEventQueue.IsEmpty(lock), NS_IsMainThread()); +#endif + } + + // Make sure an executor has been queued on our base target. If we already + // have one, do nothing; otherwise, create and dispatch it. + nsresult EnsureExecutor(MutexAutoLock& lock) MOZ_REQUIRES(mMutex) { + if (mExecutor) return NS_OK; + + // Note, this creates a ref cycle keeping the inner alive + // until the queue is drained. + mExecutor = new Executor(this); + nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL); + if (NS_WARN_IF(NS_FAILED(rv))) { + mExecutor = nullptr; + return rv; + } + + return NS_OK; + } + + nsresult CurrentName(nsACString& aName) { + nsCOMPtr<nsIRunnable> event; + +#ifdef DEBUG + bool currentThread = false; + mBaseTarget->IsOnCurrentThread(¤tThread); + MOZ_ASSERT(currentThread); +#endif + + { + MutexAutoLock lock(mMutex); + event = mEventQueue.PeekEvent(lock); + // It is possible that mEventQueue wasn't empty when the executor + // was added to the queue, but someone processed events from mEventQueue + // before the executor, this is why mEventQueue is empty here + if (!event) { + aName.AssignLiteral("no runnables left in the ThrottledEventQueue"); + return NS_OK; + } + } + + if (nsCOMPtr<nsINamed> named = do_QueryInterface(event)) { + nsresult rv = named->GetName(aName); + return rv; + } + + aName.AssignASCII(mName); + return NS_OK; + } + + void ExecuteRunnable() { + // Any thread + nsCOMPtr<nsIRunnable> event; + +#ifdef DEBUG + bool currentThread = false; + mBaseTarget->IsOnCurrentThread(¤tThread); + MOZ_ASSERT(currentThread); +#endif + + { + MutexAutoLock lock(mMutex); + + // Normally, a paused queue doesn't dispatch any executor, but we might + // have been paused after the executor was already in flight. There's no + // way to yank the executor out of the base event target, so we just check + // for a paused queue here and return without running anything. We'll + // create a new executor when we're resumed. + if (IsPaused(lock)) { + // Note, this breaks a ref cycle. + mExecutor = nullptr; + return; + } + + // We only dispatch an executor runnable when we know there is something + // in the queue, so this should never fail. + event = mEventQueue.GetEvent(lock); + MOZ_ASSERT(event); + + // If there are more events in the queue, then dispatch the next + // executor. We do this now, before running the event, because + // the event might spin the event loop and we don't want to stall + // the queue. + if (mEventQueue.HasReadyEvent(lock)) { + // Dispatch the next base target runnable to attempt to execute + // the next throttled event. We must do this before executing + // the event in case the event spins the event loop. + MOZ_ALWAYS_SUCCEEDS( + mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL)); + } + + // Otherwise the queue is empty and we can stop dispatching the + // executor. + else { + // Break the Executor::mInner / Inner::mExecutor reference loop. + mExecutor = nullptr; + mIdleCondVar.NotifyAll(); + } + } + + // Execute the event now that we have unlocked. + LogRunnable::Run log(event); + Unused << event->Run(); + + // To cover the event's destructor code in the LogRunnable log + event = nullptr; + } + + public: + static already_AddRefed<Inner> Create(nsISerialEventTarget* aBaseTarget, + const char* aName, uint32_t aPriority) { + MOZ_ASSERT(NS_IsMainThread()); + // FIXME: This assertion only worked when `sCurrentShutdownPhase` was not + // being updated. + // MOZ_ASSERT(ClearOnShutdown_Internal::sCurrentShutdownPhase == + // ShutdownPhase::NotInShutdown); + + RefPtr<Inner> ref = new Inner(aBaseTarget, aName, aPriority); + return ref.forget(); + } + + bool IsEmpty() const { + // Any thread + return Length() == 0; + } + + uint32_t Length() const { + // Any thread + MutexAutoLock lock(mMutex); + return mEventQueue.Count(lock); + } + + already_AddRefed<nsIRunnable> GetEvent() { + MutexAutoLock lock(mMutex); + return mEventQueue.GetEvent(lock); + } + + void AwaitIdle() const { + // Any thread, except the main thread or our base target. Blocking the + // main thread is forbidden. Blocking the base target is guaranteed to + // produce a deadlock. + MOZ_ASSERT(!NS_IsMainThread()); +#ifdef DEBUG + bool onBaseTarget = false; + Unused << mBaseTarget->IsOnCurrentThread(&onBaseTarget); + MOZ_ASSERT(!onBaseTarget); +#endif + + MutexAutoLock lock(mMutex); + while (mExecutor || IsPaused(lock)) { + mIdleCondVar.Wait(); + } + } + + bool IsPaused() const { + MutexAutoLock lock(mMutex); + return IsPaused(lock); + } + + bool IsPaused(const MutexAutoLock& aProofOfLock) const MOZ_REQUIRES(mMutex) { + return mIsPaused; + } + + nsresult SetIsPaused(bool aIsPaused) { + MutexAutoLock lock(mMutex); + + // If we will be unpaused, and we have events in our queue, make sure we + // have an executor queued on the base event target to run them. Do this + // before we actually change mIsPaused, since this is fallible. + if (!aIsPaused && !mEventQueue.IsEmpty(lock)) { + nsresult rv = EnsureExecutor(lock); + if (NS_FAILED(rv)) { + return rv; + } + } + + mIsPaused = aIsPaused; + return NS_OK; + } + + nsresult DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags) { + // Any thread + nsCOMPtr<nsIRunnable> r = aEvent; + return Dispatch(r.forget(), aFlags); + } + + nsresult Dispatch(already_AddRefed<nsIRunnable> aEvent, uint32_t aFlags) { + MOZ_ASSERT(aFlags == NS_DISPATCH_NORMAL || aFlags == NS_DISPATCH_AT_END); + + // Any thread + MutexAutoLock lock(mMutex); + + if (!IsPaused(lock)) { + // Make sure we have an executor in flight to process events. This is + // fallible, so do it first. Our lock will prevent the executor from + // accessing the event queue before we add the event below. + nsresult rv = EnsureExecutor(lock); + if (NS_FAILED(rv)) return rv; + } + + // Only add the event to the underlying queue if are able to + // dispatch to our base target. + nsCOMPtr<nsIRunnable> event(aEvent); + LogRunnable::LogDispatch(event); + mEventQueue.PutEvent(event.forget(), EventQueuePriority::Normal, lock); + return NS_OK; + } + + nsresult DelayedDispatch(already_AddRefed<nsIRunnable> aEvent, + uint32_t aDelay) { + // The base target may implement this, but we don't. Always fail + // to provide consistent behavior. + return NS_ERROR_NOT_IMPLEMENTED; + } + + nsresult RegisterShutdownTask(nsITargetShutdownTask* aTask) { + return mBaseTarget->RegisterShutdownTask(aTask); + } + + nsresult UnregisterShutdownTask(nsITargetShutdownTask* aTask) { + return mBaseTarget->UnregisterShutdownTask(aTask); + } + + bool IsOnCurrentThread() { return mBaseTarget->IsOnCurrentThread(); } + + NS_DECL_THREADSAFE_ISUPPORTS +}; + +NS_IMPL_ISUPPORTS(ThrottledEventQueue::Inner, nsISupports); + +NS_IMPL_ISUPPORTS_INHERITED(ThrottledEventQueue::Inner::Executor, Runnable, + nsIRunnablePriority) + +NS_IMPL_ISUPPORTS(ThrottledEventQueue, ThrottledEventQueue, nsIEventTarget, + nsISerialEventTarget); + +ThrottledEventQueue::ThrottledEventQueue(already_AddRefed<Inner> aInner) + : mInner(aInner) { + MOZ_ASSERT(mInner); +} + +already_AddRefed<ThrottledEventQueue> ThrottledEventQueue::Create( + nsISerialEventTarget* aBaseTarget, const char* aName, uint32_t aPriority) { + MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(aBaseTarget); + + RefPtr<Inner> inner = Inner::Create(aBaseTarget, aName, aPriority); + + RefPtr<ThrottledEventQueue> ref = new ThrottledEventQueue(inner.forget()); + return ref.forget(); +} + +bool ThrottledEventQueue::IsEmpty() const { return mInner->IsEmpty(); } + +uint32_t ThrottledEventQueue::Length() const { return mInner->Length(); } + +// Get the next runnable from the queue +already_AddRefed<nsIRunnable> ThrottledEventQueue::GetEvent() { + return mInner->GetEvent(); +} + +void ThrottledEventQueue::AwaitIdle() const { return mInner->AwaitIdle(); } + +nsresult ThrottledEventQueue::SetIsPaused(bool aIsPaused) { + return mInner->SetIsPaused(aIsPaused); +} + +bool ThrottledEventQueue::IsPaused() const { return mInner->IsPaused(); } + +NS_IMETHODIMP +ThrottledEventQueue::DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags) { + return mInner->DispatchFromScript(aEvent, aFlags); +} + +NS_IMETHODIMP +ThrottledEventQueue::Dispatch(already_AddRefed<nsIRunnable> aEvent, + uint32_t aFlags) { + return mInner->Dispatch(std::move(aEvent), aFlags); +} + +NS_IMETHODIMP +ThrottledEventQueue::DelayedDispatch(already_AddRefed<nsIRunnable> aEvent, + uint32_t aFlags) { + return mInner->DelayedDispatch(std::move(aEvent), aFlags); +} + +NS_IMETHODIMP +ThrottledEventQueue::RegisterShutdownTask(nsITargetShutdownTask* aTask) { + return mInner->RegisterShutdownTask(aTask); +} + +NS_IMETHODIMP +ThrottledEventQueue::UnregisterShutdownTask(nsITargetShutdownTask* aTask) { + return mInner->UnregisterShutdownTask(aTask); +} + +NS_IMETHODIMP +ThrottledEventQueue::IsOnCurrentThread(bool* aResult) { + *aResult = mInner->IsOnCurrentThread(); + return NS_OK; +} + +NS_IMETHODIMP_(bool) +ThrottledEventQueue::IsOnCurrentThreadInfallible() { + return mInner->IsOnCurrentThread(); +} + +} // namespace mozilla |