diff options
Diffstat (limited to 'xpcom/threads/StateMirroring.h')
-rw-r--r-- | xpcom/threads/StateMirroring.h | 453 |
1 files changed, 453 insertions, 0 deletions
diff --git a/xpcom/threads/StateMirroring.h b/xpcom/threads/StateMirroring.h new file mode 100644 index 0000000000..9f8ded70f4 --- /dev/null +++ b/xpcom/threads/StateMirroring.h @@ -0,0 +1,453 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#if !defined(StateMirroring_h_) +# define StateMirroring_h_ + +# include <cstddef> +# include "mozilla/AbstractThread.h" +# include "mozilla/AlreadyAddRefed.h" +# include "mozilla/Assertions.h" +# include "mozilla/Logging.h" +# include "mozilla/Maybe.h" +# include "mozilla/RefPtr.h" +# include "mozilla/StateWatching.h" +# include "nsCOMPtr.h" +# include "nsIRunnable.h" +# include "nsISupports.h" +# include "nsTArray.h" +# include "nsThreadUtils.h" + +/* + * The state-mirroring machinery allows pieces of interesting state to be + * observed on multiple thread without locking. The basic strategy is to track + * changes in a canonical value and post updates to other threads that hold + * mirrors for that value. + * + * One problem with the naive implementation of such a system is that some + * pieces of state need to be updated atomically, and certain other operations + * need to wait for these atomic updates to complete before executing. The + * state-mirroring machinery solves this problem by requiring that its owner + * thread uses tail dispatch, and posting state update events (which should + * always be run first by TaskDispatcher implementations) to that tail + * dispatcher. This ensures that state changes are always atomic from the + * perspective of observing threads. + * + * Given that state-mirroring is an automatic background process, we try to + * avoid burdening the caller with worrying too much about teardown. To that + * end, we don't assert dispatch success for any of the notifications, and + * assume that any canonical or mirror owned by a thread for whom dispatch fails + * will soon be disconnected by its holder anyway. + * + * Given that semantics may change and comments tend to go out of date, we + * deliberately don't provide usage examples here. Grep around to find them. + */ + +namespace mozilla { + +// Mirror<T> and Canonical<T> inherit WatchTarget, so we piggy-back on the +// logging that WatchTarget already does. Given that, it makes sense to share +// the same log module. +# define MIRROR_LOG(x, ...) \ + MOZ_ASSERT(gStateWatchingLog); \ + MOZ_LOG(gStateWatchingLog, LogLevel::Debug, (x, ##__VA_ARGS__)) + +template <typename T> +class AbstractMirror; + +/* + * AbstractCanonical is a superclass from which all Canonical values must + * inherit. It serves as the interface of operations which may be performed (via + * asynchronous dispatch) by other threads, in particular by the corresponding + * Mirror value. + */ +template <typename T> +class AbstractCanonical { + public: + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractCanonical) + AbstractCanonical(AbstractThread* aThread) : mOwnerThread(aThread) {} + virtual void AddMirror(AbstractMirror<T>* aMirror) = 0; + virtual void RemoveMirror(AbstractMirror<T>* aMirror) = 0; + + AbstractThread* OwnerThread() const { return mOwnerThread; } + + protected: + virtual ~AbstractCanonical() {} + RefPtr<AbstractThread> mOwnerThread; +}; + +/* + * AbstractMirror is a superclass from which all Mirror values must + * inherit. It serves as the interface of operations which may be performed (via + * asynchronous dispatch) by other threads, in particular by the corresponding + * Canonical value. + */ +template <typename T> +class AbstractMirror { + public: + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractMirror) + AbstractMirror(AbstractThread* aThread) : mOwnerThread(aThread) {} + virtual void ConnectedOnCanonicalThread(AbstractCanonical<T>* aCanonical) = 0; + virtual void UpdateValue(const T& aNewValue) = 0; + virtual void NotifyDisconnected() = 0; + + AbstractThread* OwnerThread() const { return mOwnerThread; } + + protected: + virtual ~AbstractMirror() {} + RefPtr<AbstractThread> mOwnerThread; +}; + +/* + * Canonical<T> is a wrapper class that allows a given value to be mirrored by + * other threads. It maintains a list of active mirrors, and queues updates for + * them when the internal value changes. When changing the value, the caller + * needs to pass a TaskDispatcher object, which fires the updates at the + * appropriate time. Canonical<T> is also a WatchTarget, and may be set up to + * trigger other routines (on the same thread) when the canonical value changes. + * + * Canonical<T> is intended to be used as a member variable, so it doesn't + * actually inherit AbstractCanonical<T> (a refcounted type). Rather, it + * contains an inner class called |Impl| that implements most of the interesting + * logic. + */ +template <typename T> +class Canonical { + public: + Canonical(AbstractThread* aThread, const T& aInitialValue, + const char* aName) { + mImpl = new Impl(aThread, aInitialValue, aName); + } + + ~Canonical() {} + + private: + class Impl : public AbstractCanonical<T>, public WatchTarget { + public: + using AbstractCanonical<T>::OwnerThread; + + Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName) + : AbstractCanonical<T>(aThread), + WatchTarget(aName), + mValue(aInitialValue) { + MIRROR_LOG("%s [%p] initialized", mName, this); + MOZ_ASSERT(aThread->SupportsTailDispatch(), + "Can't get coherency without tail dispatch"); + } + + void ConnectMirror(AbstractMirror<T>* aMirror) { + MIRROR_LOG("%s [%p] canonical-init connecting mirror %p", mName, this, + aMirror); + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aMirror->OwnerThread()), + "Can't get coherency without tail dispatch"); + aMirror->ConnectedOnCanonicalThread(this); + AddMirror(aMirror); + } + + void AddMirror(AbstractMirror<T>* aMirror) override { + MIRROR_LOG("%s [%p] adding mirror %p", mName, this, aMirror); + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + MOZ_ASSERT(!mMirrors.Contains(aMirror)); + mMirrors.AppendElement(aMirror); + aMirror->OwnerThread()->DispatchStateChange(MakeNotifier(aMirror)); + } + + void RemoveMirror(AbstractMirror<T>* aMirror) override { + MIRROR_LOG("%s [%p] removing mirror %p", mName, this, aMirror); + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + MOZ_ASSERT(mMirrors.Contains(aMirror)); + mMirrors.RemoveElement(aMirror); + } + + void DisconnectAll() { + MIRROR_LOG("%s [%p] Disconnecting all mirrors", mName, this); + for (size_t i = 0; i < mMirrors.Length(); ++i) { + mMirrors[i]->OwnerThread()->Dispatch( + NewRunnableMethod("AbstractMirror::NotifyDisconnected", mMirrors[i], + &AbstractMirror<T>::NotifyDisconnected)); + } + mMirrors.Clear(); + } + + operator const T&() { + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + return mValue; + } + + void Set(const T& aNewValue) { + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + + if (aNewValue == mValue) { + return; + } + + // Notify same-thread watchers. The state watching machinery will make + // sure that notifications run at the right time. + NotifyWatchers(); + + // Check if we've already got a pending update. If so we won't schedule + // another one. + bool alreadyNotifying = mInitialValue.isSome(); + + // Stash the initial value if needed, then update to the new value. + if (mInitialValue.isNothing()) { + mInitialValue.emplace(mValue); + } + mValue = aNewValue; + + // We wait until things have stablized before sending state updates so + // that we can avoid sending multiple updates, and possibly avoid sending + // any updates at all if the value ends up where it started. + if (!alreadyNotifying) { + AbstractThread::DispatchDirectTask(NewRunnableMethod( + "Canonical::Impl::DoNotify", this, &Impl::DoNotify)); + } + } + + Impl& operator=(const T& aNewValue) { + Set(aNewValue); + return *this; + } + Impl& operator=(const Impl& aOther) { + Set(aOther); + return *this; + } + Impl(const Impl& aOther) = delete; + + protected: + ~Impl() { MOZ_DIAGNOSTIC_ASSERT(mMirrors.IsEmpty()); } + + private: + void DoNotify() { + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + MOZ_ASSERT(mInitialValue.isSome()); + bool same = mInitialValue.ref() == mValue; + mInitialValue.reset(); + + if (same) { + MIRROR_LOG("%s [%p] unchanged - not sending update", mName, this); + return; + } + + for (size_t i = 0; i < mMirrors.Length(); ++i) { + mMirrors[i]->OwnerThread()->DispatchStateChange( + MakeNotifier(mMirrors[i])); + } + } + + already_AddRefed<nsIRunnable> MakeNotifier(AbstractMirror<T>* aMirror) { + return NewRunnableMethod<T>("AbstractMirror::UpdateValue", aMirror, + &AbstractMirror<T>::UpdateValue, mValue); + } + + T mValue; + Maybe<T> mInitialValue; + nsTArray<RefPtr<AbstractMirror<T>>> mMirrors; + }; + + public: + /* + * Connect this Canonical to aMirror. Note that the canonical value starts + * being mirrored to aMirror immediately, and requires one thread hop to + * aMirror's owning thread before the connection is established. + * + * Note that this could race with Mirror::DisconnectIfConnected(). It is up to + * the caller to provide the guarantee that disconnection happens after the + * connection has been established. There is no race between this and + * DisconnectAll(). + */ + void ConnectMirror(AbstractMirror<T>* aMirror) { + return mImpl->ConnectMirror(aMirror); + } + void DisconnectAll() { return mImpl->DisconnectAll(); } + + // Access to the Impl. + operator Impl&() { return *mImpl; } + Impl* operator&() { return mImpl; } + + // Access to the T. + const T& Ref() const { return *mImpl; } + operator const T&() const { return Ref(); } + void Set(const T& aNewValue) { mImpl->Set(aNewValue); } + Canonical& operator=(const T& aNewValue) { + Set(aNewValue); + return *this; + } + Canonical& operator=(const Canonical& aOther) { + Set(aOther); + return *this; + } + Canonical(const Canonical& aOther) = delete; + + private: + RefPtr<Impl> mImpl; +}; + +/* + * Mirror<T> is a wrapper class that allows a given value to mirror that of a + * Canonical<T> owned by another thread. It registers itself with a + * Canonical<T>, and is periodically updated with new values. Mirror<T> is also + * a WatchTarget, and may be set up to trigger other routines (on the same + * thread) when the mirrored value changes. + * + * Mirror<T> is intended to be used as a member variable, so it doesn't actually + * inherit AbstractMirror<T> (a refcounted type). Rather, it contains an inner + * class called |Impl| that implements most of the interesting logic. + */ +template <typename T> +class Mirror { + public: + Mirror(AbstractThread* aThread, const T& aInitialValue, const char* aName) { + mImpl = new Impl(aThread, aInitialValue, aName); + } + + ~Mirror() { + // As a member of complex objects, a Mirror<T> may be destroyed on a + // different thread than its owner, or late in shutdown during CC. Given + // that, we require manual disconnection so that callers can put things in + // the right place. + MOZ_DIAGNOSTIC_ASSERT(!mImpl->IsConnected()); + mImpl->AssertNoIncomingConnects(); + } + + private: + class Impl : public AbstractMirror<T>, public WatchTarget { + public: + using AbstractMirror<T>::OwnerThread; + + Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName) + : AbstractMirror<T>(aThread), + WatchTarget(aName), + mValue(aInitialValue) { + MIRROR_LOG("%s [%p] initialized", mName, this); + MOZ_ASSERT(aThread->SupportsTailDispatch(), + "Can't get coherency without tail dispatch"); + } + + operator const T&() { + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + return mValue; + } + + void ConnectedOnCanonicalThread(AbstractCanonical<T>* aCanonical) override { + MOZ_ASSERT(aCanonical->OwnerThread()->IsCurrentThreadIn()); +# ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED + ++mIncomingConnects; +# endif + OwnerThread()->DispatchStateChange( + NewRunnableMethod<StoreRefPtrPassByPtr<AbstractCanonical<T>>>( + "Mirror::Impl::SetCanonical", this, &Impl::SetCanonical, + aCanonical)); + } + + void SetCanonical(AbstractCanonical<T>* aCanonical) { + MIRROR_LOG("%s [%p] Canonical-init setting canonical %p", mName, this, + aCanonical); + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + MOZ_ASSERT(!IsConnected()); +# ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED + --mIncomingConnects; +# endif + mCanonical = aCanonical; + } + + void UpdateValue(const T& aNewValue) override { + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + if (mValue != aNewValue) { + mValue = aNewValue; + WatchTarget::NotifyWatchers(); + } + } + + void NotifyDisconnected() override { + MIRROR_LOG("%s [%p] Notifed of disconnection from %p", mName, this, + mCanonical.get()); + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + mCanonical = nullptr; + } + + bool IsConnected() const { return !!mCanonical; } + + void Connect(AbstractCanonical<T>* aCanonical) { + MIRROR_LOG("%s [%p] Connecting to %p", mName, this, aCanonical); + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + MOZ_ASSERT(!IsConnected()); + MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aCanonical->OwnerThread()), + "Can't get coherency without tail dispatch"); + + nsCOMPtr<nsIRunnable> r = + NewRunnableMethod<StoreRefPtrPassByPtr<AbstractMirror<T>>>( + "AbstractCanonical::AddMirror", aCanonical, + &AbstractCanonical<T>::AddMirror, this); + aCanonical->OwnerThread()->Dispatch(r.forget()); + mCanonical = aCanonical; + } + + void DisconnectIfConnected() { + MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); + if (!IsConnected()) { + return; + } + + MIRROR_LOG("%s [%p] Disconnecting from %p", mName, this, + mCanonical.get()); + nsCOMPtr<nsIRunnable> r = + NewRunnableMethod<StoreRefPtrPassByPtr<AbstractMirror<T>>>( + "AbstractCanonical::RemoveMirror", mCanonical, + &AbstractCanonical<T>::RemoveMirror, this); + mCanonical->OwnerThread()->Dispatch(r.forget()); + mCanonical = nullptr; + } + + void AssertNoIncomingConnects() { + MOZ_DIAGNOSTIC_ASSERT(mIncomingConnects == 0); + } + + protected: + ~Impl() { MOZ_DIAGNOSTIC_ASSERT(!IsConnected()); } + + private: + T mValue; + RefPtr<AbstractCanonical<T>> mCanonical; +# ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED + std::atomic<size_t> mIncomingConnects = 0; +# endif + }; + + public: + // Forward control operations to the Impl<T>. + /* + * Connect aCanonical to this Mirror. Note that this requires one thread hop + * back to aCanonical's owning thread before the canonical value starts + * being mirrored, and another to our owning thread before the connection is + * established. + * + * Note that this mirror-initialized connection could race with + * Canonical::DisconnectAll(). It is up to the caller to provide the guarantee + * that disconnection happens after the connection has been established. There + * is no race between this and DisconnectIfConnected(). + */ + void Connect(AbstractCanonical<T>* aCanonical) { mImpl->Connect(aCanonical); } + void DisconnectIfConnected() { mImpl->DisconnectIfConnected(); } + + // Access to the Impl<T>. + operator Impl&() { return *mImpl; } + Impl* operator&() { return mImpl; } + + // Access to the T. + const T& Ref() const { return *mImpl; } + operator const T&() const { return Ref(); } + + private: + RefPtr<Impl> mImpl; +}; + +# undef MIRROR_LOG + +} // namespace mozilla + +#endif |