/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #ifndef XPCOM_THREADS_STATEMIRRORING_H_ #define XPCOM_THREADS_STATEMIRRORING_H_ #include #include "mozilla/AbstractThread.h" #include "mozilla/AlreadyAddRefed.h" #include "mozilla/Assertions.h" #include "mozilla/Logging.h" #include "mozilla/Maybe.h" #include "mozilla/RefPtr.h" #include "mozilla/StateWatching.h" #include "nsCOMPtr.h" #include "nsIRunnable.h" #include "nsISupports.h" #include "nsTArray.h" #include "nsThreadUtils.h" /* * The state-mirroring machinery allows pieces of interesting state to be * observed on multiple thread without locking. The basic strategy is to track * changes in a canonical value and post updates to other threads that hold * mirrors for that value. * * One problem with the naive implementation of such a system is that some * pieces of state need to be updated atomically, and certain other operations * need to wait for these atomic updates to complete before executing. The * state-mirroring machinery solves this problem by requiring that its owner * thread uses tail dispatch, and posting state update events (which should * always be run first by TaskDispatcher implementations) to that tail * dispatcher. This ensures that state changes are always atomic from the * perspective of observing threads. * * Given that state-mirroring is an automatic background process, we try to * avoid burdening the caller with worrying too much about teardown. To that * end, we don't assert dispatch success for any of the notifications, and * assume that any canonical or mirror owned by a thread for whom dispatch fails * will soon be disconnected by its holder anyway. * * Given that semantics may change and comments tend to go out of date, we * deliberately don't provide usage examples here. Grep around to find them. */ namespace mozilla { // Mirror and Canonical inherit WatchTarget, so we piggy-back on the // logging that WatchTarget already does. Given that, it makes sense to share // the same log module. #define MIRROR_LOG(x, ...) \ MOZ_ASSERT(gStateWatchingLog); \ MOZ_LOG(gStateWatchingLog, LogLevel::Debug, (x, ##__VA_ARGS__)) template class AbstractMirror; /* * AbstractCanonical is a superclass from which all Canonical values must * inherit. It serves as the interface of operations which may be performed (via * asynchronous dispatch) by other threads, in particular by the corresponding * Mirror value. */ template class AbstractCanonical { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractCanonical) AbstractCanonical(AbstractThread* aThread) : mOwnerThread(aThread) {} virtual void AddMirror(AbstractMirror* aMirror) = 0; virtual void RemoveMirror(AbstractMirror* aMirror) = 0; AbstractThread* OwnerThread() const { return mOwnerThread; } protected: virtual ~AbstractCanonical() {} RefPtr mOwnerThread; }; /* * AbstractMirror is a superclass from which all Mirror values must * inherit. It serves as the interface of operations which may be performed (via * asynchronous dispatch) by other threads, in particular by the corresponding * Canonical value. */ template class AbstractMirror { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractMirror) AbstractMirror(AbstractThread* aThread) : mOwnerThread(aThread) {} virtual void ConnectedOnCanonicalThread(AbstractCanonical* aCanonical) = 0; virtual void UpdateValue(const T& aNewValue) = 0; virtual void NotifyDisconnected() = 0; AbstractThread* OwnerThread() const { return mOwnerThread; } protected: virtual ~AbstractMirror() {} RefPtr mOwnerThread; }; /* * Canonical is a wrapper class that allows a given value to be mirrored by * other threads. It maintains a list of active mirrors, and queues updates for * them when the internal value changes. When changing the value, the caller * needs to pass a TaskDispatcher object, which fires the updates at the * appropriate time. Canonical is also a WatchTarget, and may be set up to * trigger other routines (on the same thread) when the canonical value changes. * * Canonical is intended to be used as a member variable, so it doesn't * actually inherit AbstractCanonical (a refcounted type). Rather, it * contains an inner class called |Impl| that implements most of the interesting * logic. */ template class Canonical { public: Canonical(AbstractThread* aThread, const T& aInitialValue, const char* aName) { mImpl = new Impl(aThread, aInitialValue, aName); } ~Canonical() {} private: class Impl : public AbstractCanonical, public WatchTarget { public: using AbstractCanonical::OwnerThread; Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName) : AbstractCanonical(aThread), WatchTarget(aName), mValue(aInitialValue) { MIRROR_LOG("%s [%p] initialized", mName, this); MOZ_ASSERT(aThread->SupportsTailDispatch(), "Can't get coherency without tail dispatch"); } void ConnectMirror(AbstractMirror* aMirror) { MIRROR_LOG("%s [%p] canonical-init connecting mirror %p", mName, this, aMirror); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aMirror->OwnerThread()), "Can't get coherency without tail dispatch"); aMirror->ConnectedOnCanonicalThread(this); AddMirror(aMirror); } void AddMirror(AbstractMirror* aMirror) override { MIRROR_LOG("%s [%p] adding mirror %p", mName, this, aMirror); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(!mMirrors.Contains(aMirror)); mMirrors.AppendElement(aMirror); aMirror->OwnerThread()->DispatchStateChange(MakeNotifier(aMirror)); } void RemoveMirror(AbstractMirror* aMirror) override { MIRROR_LOG("%s [%p] removing mirror %p", mName, this, aMirror); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(mMirrors.Contains(aMirror)); mMirrors.RemoveElement(aMirror); } void DisconnectAll() { MIRROR_LOG("%s [%p] Disconnecting all mirrors", mName, this); for (size_t i = 0; i < mMirrors.Length(); ++i) { mMirrors[i]->OwnerThread()->Dispatch( NewRunnableMethod("AbstractMirror::NotifyDisconnected", mMirrors[i], &AbstractMirror::NotifyDisconnected)); } mMirrors.Clear(); } operator const T&() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); return mValue; } void Set(const T& aNewValue) { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); if (aNewValue == mValue) { return; } // Notify same-thread watchers. The state watching machinery will make // sure that notifications run at the right time. NotifyWatchers(); // Check if we've already got a pending update. If so we won't schedule // another one. bool alreadyNotifying = mInitialValue.isSome(); // Stash the initial value if needed, then update to the new value. if (mInitialValue.isNothing()) { mInitialValue.emplace(mValue); } mValue = aNewValue; // We wait until things have stablized before sending state updates so // that we can avoid sending multiple updates, and possibly avoid sending // any updates at all if the value ends up where it started. if (!alreadyNotifying) { AbstractThread::DispatchDirectTask(NewRunnableMethod( "Canonical::Impl::DoNotify", this, &Impl::DoNotify)); } } Impl& operator=(const T& aNewValue) { Set(aNewValue); return *this; } Impl& operator=(const Impl& aOther) { Set(aOther); return *this; } Impl(const Impl& aOther) = delete; protected: ~Impl() { MOZ_DIAGNOSTIC_ASSERT(mMirrors.IsEmpty()); } private: void DoNotify() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(mInitialValue.isSome()); bool same = mInitialValue.ref() == mValue; mInitialValue.reset(); if (same) { MIRROR_LOG("%s [%p] unchanged - not sending update", mName, this); return; } for (size_t i = 0; i < mMirrors.Length(); ++i) { mMirrors[i]->OwnerThread()->DispatchStateChange( MakeNotifier(mMirrors[i])); } } already_AddRefed MakeNotifier(AbstractMirror* aMirror) { return NewRunnableMethod("AbstractMirror::UpdateValue", aMirror, &AbstractMirror::UpdateValue, mValue); } T mValue; Maybe mInitialValue; nsTArray>> mMirrors; }; public: /* * Connect this Canonical to aMirror. Note that the canonical value starts * being mirrored to aMirror immediately, and requires one thread hop to * aMirror's owning thread before the connection is established. * * Note that this could race with Mirror::DisconnectIfConnected(). It is up to * the caller to provide the guarantee that disconnection happens after the * connection has been established. There is no race between this and * DisconnectAll(). */ void ConnectMirror(AbstractMirror* aMirror) { return mImpl->ConnectMirror(aMirror); } void DisconnectAll() { return mImpl->DisconnectAll(); } // Access to the Impl. operator Impl&() { return *mImpl; } Impl* operator&() { return mImpl; } // Access to the T. const T& Ref() const { return *mImpl; } operator const T&() const { return Ref(); } void Set(const T& aNewValue) { mImpl->Set(aNewValue); } Canonical& operator=(const T& aNewValue) { Set(aNewValue); return *this; } Canonical& operator=(const Canonical& aOther) { Set(aOther); return *this; } Canonical(const Canonical& aOther) = delete; private: RefPtr mImpl; }; /* * Mirror is a wrapper class that allows a given value to mirror that of a * Canonical owned by another thread. It registers itself with a * Canonical, and is periodically updated with new values. Mirror is also * a WatchTarget, and may be set up to trigger other routines (on the same * thread) when the mirrored value changes. * * Mirror is intended to be used as a member variable, so it doesn't actually * inherit AbstractMirror (a refcounted type). Rather, it contains an inner * class called |Impl| that implements most of the interesting logic. */ template class Mirror { public: Mirror(AbstractThread* aThread, const T& aInitialValue, const char* aName) { mImpl = new Impl(aThread, aInitialValue, aName); } ~Mirror() { // As a member of complex objects, a Mirror may be destroyed on a // different thread than its owner, or late in shutdown during CC. Given // that, we require manual disconnection so that callers can put things in // the right place. MOZ_DIAGNOSTIC_ASSERT(!mImpl->IsConnected()); mImpl->AssertNoIncomingConnects(); } private: class Impl : public AbstractMirror, public WatchTarget { public: using AbstractMirror::OwnerThread; Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName) : AbstractMirror(aThread), WatchTarget(aName), mValue(aInitialValue) { MIRROR_LOG("%s [%p] initialized", mName, this); MOZ_ASSERT(aThread->SupportsTailDispatch(), "Can't get coherency without tail dispatch"); } operator const T&() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); return mValue; } void ConnectedOnCanonicalThread(AbstractCanonical* aCanonical) override { MOZ_ASSERT(aCanonical->OwnerThread()->IsCurrentThreadIn()); #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED ++mIncomingConnects; #endif OwnerThread()->DispatchStateChange( NewRunnableMethod>>( "Mirror::Impl::SetCanonical", this, &Impl::SetCanonical, aCanonical)); } void SetCanonical(AbstractCanonical* aCanonical) { MIRROR_LOG("%s [%p] Canonical-init setting canonical %p", mName, this, aCanonical); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(!IsConnected()); #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED --mIncomingConnects; #endif mCanonical = aCanonical; } void UpdateValue(const T& aNewValue) override { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); if (mValue != aNewValue) { mValue = aNewValue; WatchTarget::NotifyWatchers(); } } void NotifyDisconnected() override { MIRROR_LOG("%s [%p] Notifed of disconnection from %p", mName, this, mCanonical.get()); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); mCanonical = nullptr; } bool IsConnected() const { return !!mCanonical; } void Connect(AbstractCanonical* aCanonical) { MIRROR_LOG("%s [%p] Connecting to %p", mName, this, aCanonical); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(!IsConnected()); MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aCanonical->OwnerThread()), "Can't get coherency without tail dispatch"); nsCOMPtr r = NewRunnableMethod>>( "AbstractCanonical::AddMirror", aCanonical, &AbstractCanonical::AddMirror, this); aCanonical->OwnerThread()->Dispatch(r.forget()); mCanonical = aCanonical; } void DisconnectIfConnected() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); if (!IsConnected()) { return; } MIRROR_LOG("%s [%p] Disconnecting from %p", mName, this, mCanonical.get()); nsCOMPtr r = NewRunnableMethod>>( "AbstractCanonical::RemoveMirror", mCanonical, &AbstractCanonical::RemoveMirror, this); mCanonical->OwnerThread()->Dispatch(r.forget()); mCanonical = nullptr; } void AssertNoIncomingConnects() { MOZ_DIAGNOSTIC_ASSERT(mIncomingConnects == 0); } protected: ~Impl() { MOZ_DIAGNOSTIC_ASSERT(!IsConnected()); } private: T mValue; RefPtr> mCanonical; #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED std::atomic mIncomingConnects = 0; #endif }; public: // Forward control operations to the Impl. /* * Connect aCanonical to this Mirror. Note that this requires one thread hop * back to aCanonical's owning thread before the canonical value starts * being mirrored, and another to our owning thread before the connection is * established. * * Note that this mirror-initialized connection could race with * Canonical::DisconnectAll(). It is up to the caller to provide the guarantee * that disconnection happens after the connection has been established. There * is no race between this and DisconnectIfConnected(). */ void Connect(AbstractCanonical* aCanonical) { mImpl->Connect(aCanonical); } void DisconnectIfConnected() { mImpl->DisconnectIfConnected(); } // Access to the Impl. operator Impl&() { return *mImpl; } Impl* operator&() { return mImpl; } // Access to the T. const T& Ref() const { return *mImpl; } operator const T&() const { return Ref(); } private: RefPtr mImpl; }; #undef MIRROR_LOG } // namespace mozilla #endif