summaryrefslogtreecommitdiffstats
path: root/xpcom/threads/StateMirroring.h
diff options
context:
space:
mode:
Diffstat (limited to 'xpcom/threads/StateMirroring.h')
-rw-r--r--xpcom/threads/StateMirroring.h453
1 files changed, 453 insertions, 0 deletions
diff --git a/xpcom/threads/StateMirroring.h b/xpcom/threads/StateMirroring.h
new file mode 100644
index 0000000000..9f8ded70f4
--- /dev/null
+++ b/xpcom/threads/StateMirroring.h
@@ -0,0 +1,453 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#if !defined(StateMirroring_h_)
+# define StateMirroring_h_
+
+# include <cstddef>
+# include "mozilla/AbstractThread.h"
+# include "mozilla/AlreadyAddRefed.h"
+# include "mozilla/Assertions.h"
+# include "mozilla/Logging.h"
+# include "mozilla/Maybe.h"
+# include "mozilla/RefPtr.h"
+# include "mozilla/StateWatching.h"
+# include "nsCOMPtr.h"
+# include "nsIRunnable.h"
+# include "nsISupports.h"
+# include "nsTArray.h"
+# include "nsThreadUtils.h"
+
+/*
+ * The state-mirroring machinery allows pieces of interesting state to be
+ * observed on multiple thread without locking. The basic strategy is to track
+ * changes in a canonical value and post updates to other threads that hold
+ * mirrors for that value.
+ *
+ * One problem with the naive implementation of such a system is that some
+ * pieces of state need to be updated atomically, and certain other operations
+ * need to wait for these atomic updates to complete before executing. The
+ * state-mirroring machinery solves this problem by requiring that its owner
+ * thread uses tail dispatch, and posting state update events (which should
+ * always be run first by TaskDispatcher implementations) to that tail
+ * dispatcher. This ensures that state changes are always atomic from the
+ * perspective of observing threads.
+ *
+ * Given that state-mirroring is an automatic background process, we try to
+ * avoid burdening the caller with worrying too much about teardown. To that
+ * end, we don't assert dispatch success for any of the notifications, and
+ * assume that any canonical or mirror owned by a thread for whom dispatch fails
+ * will soon be disconnected by its holder anyway.
+ *
+ * Given that semantics may change and comments tend to go out of date, we
+ * deliberately don't provide usage examples here. Grep around to find them.
+ */
+
+namespace mozilla {
+
+// Mirror<T> and Canonical<T> inherit WatchTarget, so we piggy-back on the
+// logging that WatchTarget already does. Given that, it makes sense to share
+// the same log module.
+# define MIRROR_LOG(x, ...) \
+ MOZ_ASSERT(gStateWatchingLog); \
+ MOZ_LOG(gStateWatchingLog, LogLevel::Debug, (x, ##__VA_ARGS__))
+
+template <typename T>
+class AbstractMirror;
+
+/*
+ * AbstractCanonical is a superclass from which all Canonical values must
+ * inherit. It serves as the interface of operations which may be performed (via
+ * asynchronous dispatch) by other threads, in particular by the corresponding
+ * Mirror value.
+ */
+template <typename T>
+class AbstractCanonical {
+ public:
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractCanonical)
+ AbstractCanonical(AbstractThread* aThread) : mOwnerThread(aThread) {}
+ virtual void AddMirror(AbstractMirror<T>* aMirror) = 0;
+ virtual void RemoveMirror(AbstractMirror<T>* aMirror) = 0;
+
+ AbstractThread* OwnerThread() const { return mOwnerThread; }
+
+ protected:
+ virtual ~AbstractCanonical() {}
+ RefPtr<AbstractThread> mOwnerThread;
+};
+
+/*
+ * AbstractMirror is a superclass from which all Mirror values must
+ * inherit. It serves as the interface of operations which may be performed (via
+ * asynchronous dispatch) by other threads, in particular by the corresponding
+ * Canonical value.
+ */
+template <typename T>
+class AbstractMirror {
+ public:
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractMirror)
+ AbstractMirror(AbstractThread* aThread) : mOwnerThread(aThread) {}
+ virtual void ConnectedOnCanonicalThread(AbstractCanonical<T>* aCanonical) = 0;
+ virtual void UpdateValue(const T& aNewValue) = 0;
+ virtual void NotifyDisconnected() = 0;
+
+ AbstractThread* OwnerThread() const { return mOwnerThread; }
+
+ protected:
+ virtual ~AbstractMirror() {}
+ RefPtr<AbstractThread> mOwnerThread;
+};
+
+/*
+ * Canonical<T> is a wrapper class that allows a given value to be mirrored by
+ * other threads. It maintains a list of active mirrors, and queues updates for
+ * them when the internal value changes. When changing the value, the caller
+ * needs to pass a TaskDispatcher object, which fires the updates at the
+ * appropriate time. Canonical<T> is also a WatchTarget, and may be set up to
+ * trigger other routines (on the same thread) when the canonical value changes.
+ *
+ * Canonical<T> is intended to be used as a member variable, so it doesn't
+ * actually inherit AbstractCanonical<T> (a refcounted type). Rather, it
+ * contains an inner class called |Impl| that implements most of the interesting
+ * logic.
+ */
+template <typename T>
+class Canonical {
+ public:
+ Canonical(AbstractThread* aThread, const T& aInitialValue,
+ const char* aName) {
+ mImpl = new Impl(aThread, aInitialValue, aName);
+ }
+
+ ~Canonical() {}
+
+ private:
+ class Impl : public AbstractCanonical<T>, public WatchTarget {
+ public:
+ using AbstractCanonical<T>::OwnerThread;
+
+ Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName)
+ : AbstractCanonical<T>(aThread),
+ WatchTarget(aName),
+ mValue(aInitialValue) {
+ MIRROR_LOG("%s [%p] initialized", mName, this);
+ MOZ_ASSERT(aThread->SupportsTailDispatch(),
+ "Can't get coherency without tail dispatch");
+ }
+
+ void ConnectMirror(AbstractMirror<T>* aMirror) {
+ MIRROR_LOG("%s [%p] canonical-init connecting mirror %p", mName, this,
+ aMirror);
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aMirror->OwnerThread()),
+ "Can't get coherency without tail dispatch");
+ aMirror->ConnectedOnCanonicalThread(this);
+ AddMirror(aMirror);
+ }
+
+ void AddMirror(AbstractMirror<T>* aMirror) override {
+ MIRROR_LOG("%s [%p] adding mirror %p", mName, this, aMirror);
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ MOZ_ASSERT(!mMirrors.Contains(aMirror));
+ mMirrors.AppendElement(aMirror);
+ aMirror->OwnerThread()->DispatchStateChange(MakeNotifier(aMirror));
+ }
+
+ void RemoveMirror(AbstractMirror<T>* aMirror) override {
+ MIRROR_LOG("%s [%p] removing mirror %p", mName, this, aMirror);
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ MOZ_ASSERT(mMirrors.Contains(aMirror));
+ mMirrors.RemoveElement(aMirror);
+ }
+
+ void DisconnectAll() {
+ MIRROR_LOG("%s [%p] Disconnecting all mirrors", mName, this);
+ for (size_t i = 0; i < mMirrors.Length(); ++i) {
+ mMirrors[i]->OwnerThread()->Dispatch(
+ NewRunnableMethod("AbstractMirror::NotifyDisconnected", mMirrors[i],
+ &AbstractMirror<T>::NotifyDisconnected));
+ }
+ mMirrors.Clear();
+ }
+
+ operator const T&() {
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ return mValue;
+ }
+
+ void Set(const T& aNewValue) {
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+
+ if (aNewValue == mValue) {
+ return;
+ }
+
+ // Notify same-thread watchers. The state watching machinery will make
+ // sure that notifications run at the right time.
+ NotifyWatchers();
+
+ // Check if we've already got a pending update. If so we won't schedule
+ // another one.
+ bool alreadyNotifying = mInitialValue.isSome();
+
+ // Stash the initial value if needed, then update to the new value.
+ if (mInitialValue.isNothing()) {
+ mInitialValue.emplace(mValue);
+ }
+ mValue = aNewValue;
+
+ // We wait until things have stablized before sending state updates so
+ // that we can avoid sending multiple updates, and possibly avoid sending
+ // any updates at all if the value ends up where it started.
+ if (!alreadyNotifying) {
+ AbstractThread::DispatchDirectTask(NewRunnableMethod(
+ "Canonical::Impl::DoNotify", this, &Impl::DoNotify));
+ }
+ }
+
+ Impl& operator=(const T& aNewValue) {
+ Set(aNewValue);
+ return *this;
+ }
+ Impl& operator=(const Impl& aOther) {
+ Set(aOther);
+ return *this;
+ }
+ Impl(const Impl& aOther) = delete;
+
+ protected:
+ ~Impl() { MOZ_DIAGNOSTIC_ASSERT(mMirrors.IsEmpty()); }
+
+ private:
+ void DoNotify() {
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ MOZ_ASSERT(mInitialValue.isSome());
+ bool same = mInitialValue.ref() == mValue;
+ mInitialValue.reset();
+
+ if (same) {
+ MIRROR_LOG("%s [%p] unchanged - not sending update", mName, this);
+ return;
+ }
+
+ for (size_t i = 0; i < mMirrors.Length(); ++i) {
+ mMirrors[i]->OwnerThread()->DispatchStateChange(
+ MakeNotifier(mMirrors[i]));
+ }
+ }
+
+ already_AddRefed<nsIRunnable> MakeNotifier(AbstractMirror<T>* aMirror) {
+ return NewRunnableMethod<T>("AbstractMirror::UpdateValue", aMirror,
+ &AbstractMirror<T>::UpdateValue, mValue);
+ }
+
+ T mValue;
+ Maybe<T> mInitialValue;
+ nsTArray<RefPtr<AbstractMirror<T>>> mMirrors;
+ };
+
+ public:
+ /*
+ * Connect this Canonical to aMirror. Note that the canonical value starts
+ * being mirrored to aMirror immediately, and requires one thread hop to
+ * aMirror's owning thread before the connection is established.
+ *
+ * Note that this could race with Mirror::DisconnectIfConnected(). It is up to
+ * the caller to provide the guarantee that disconnection happens after the
+ * connection has been established. There is no race between this and
+ * DisconnectAll().
+ */
+ void ConnectMirror(AbstractMirror<T>* aMirror) {
+ return mImpl->ConnectMirror(aMirror);
+ }
+ void DisconnectAll() { return mImpl->DisconnectAll(); }
+
+ // Access to the Impl.
+ operator Impl&() { return *mImpl; }
+ Impl* operator&() { return mImpl; }
+
+ // Access to the T.
+ const T& Ref() const { return *mImpl; }
+ operator const T&() const { return Ref(); }
+ void Set(const T& aNewValue) { mImpl->Set(aNewValue); }
+ Canonical& operator=(const T& aNewValue) {
+ Set(aNewValue);
+ return *this;
+ }
+ Canonical& operator=(const Canonical& aOther) {
+ Set(aOther);
+ return *this;
+ }
+ Canonical(const Canonical& aOther) = delete;
+
+ private:
+ RefPtr<Impl> mImpl;
+};
+
+/*
+ * Mirror<T> is a wrapper class that allows a given value to mirror that of a
+ * Canonical<T> owned by another thread. It registers itself with a
+ * Canonical<T>, and is periodically updated with new values. Mirror<T> is also
+ * a WatchTarget, and may be set up to trigger other routines (on the same
+ * thread) when the mirrored value changes.
+ *
+ * Mirror<T> is intended to be used as a member variable, so it doesn't actually
+ * inherit AbstractMirror<T> (a refcounted type). Rather, it contains an inner
+ * class called |Impl| that implements most of the interesting logic.
+ */
+template <typename T>
+class Mirror {
+ public:
+ Mirror(AbstractThread* aThread, const T& aInitialValue, const char* aName) {
+ mImpl = new Impl(aThread, aInitialValue, aName);
+ }
+
+ ~Mirror() {
+ // As a member of complex objects, a Mirror<T> may be destroyed on a
+ // different thread than its owner, or late in shutdown during CC. Given
+ // that, we require manual disconnection so that callers can put things in
+ // the right place.
+ MOZ_DIAGNOSTIC_ASSERT(!mImpl->IsConnected());
+ mImpl->AssertNoIncomingConnects();
+ }
+
+ private:
+ class Impl : public AbstractMirror<T>, public WatchTarget {
+ public:
+ using AbstractMirror<T>::OwnerThread;
+
+ Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName)
+ : AbstractMirror<T>(aThread),
+ WatchTarget(aName),
+ mValue(aInitialValue) {
+ MIRROR_LOG("%s [%p] initialized", mName, this);
+ MOZ_ASSERT(aThread->SupportsTailDispatch(),
+ "Can't get coherency without tail dispatch");
+ }
+
+ operator const T&() {
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ return mValue;
+ }
+
+ void ConnectedOnCanonicalThread(AbstractCanonical<T>* aCanonical) override {
+ MOZ_ASSERT(aCanonical->OwnerThread()->IsCurrentThreadIn());
+# ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
+ ++mIncomingConnects;
+# endif
+ OwnerThread()->DispatchStateChange(
+ NewRunnableMethod<StoreRefPtrPassByPtr<AbstractCanonical<T>>>(
+ "Mirror::Impl::SetCanonical", this, &Impl::SetCanonical,
+ aCanonical));
+ }
+
+ void SetCanonical(AbstractCanonical<T>* aCanonical) {
+ MIRROR_LOG("%s [%p] Canonical-init setting canonical %p", mName, this,
+ aCanonical);
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ MOZ_ASSERT(!IsConnected());
+# ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
+ --mIncomingConnects;
+# endif
+ mCanonical = aCanonical;
+ }
+
+ void UpdateValue(const T& aNewValue) override {
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ if (mValue != aNewValue) {
+ mValue = aNewValue;
+ WatchTarget::NotifyWatchers();
+ }
+ }
+
+ void NotifyDisconnected() override {
+ MIRROR_LOG("%s [%p] Notifed of disconnection from %p", mName, this,
+ mCanonical.get());
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ mCanonical = nullptr;
+ }
+
+ bool IsConnected() const { return !!mCanonical; }
+
+ void Connect(AbstractCanonical<T>* aCanonical) {
+ MIRROR_LOG("%s [%p] Connecting to %p", mName, this, aCanonical);
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ MOZ_ASSERT(!IsConnected());
+ MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aCanonical->OwnerThread()),
+ "Can't get coherency without tail dispatch");
+
+ nsCOMPtr<nsIRunnable> r =
+ NewRunnableMethod<StoreRefPtrPassByPtr<AbstractMirror<T>>>(
+ "AbstractCanonical::AddMirror", aCanonical,
+ &AbstractCanonical<T>::AddMirror, this);
+ aCanonical->OwnerThread()->Dispatch(r.forget());
+ mCanonical = aCanonical;
+ }
+
+ void DisconnectIfConnected() {
+ MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
+ if (!IsConnected()) {
+ return;
+ }
+
+ MIRROR_LOG("%s [%p] Disconnecting from %p", mName, this,
+ mCanonical.get());
+ nsCOMPtr<nsIRunnable> r =
+ NewRunnableMethod<StoreRefPtrPassByPtr<AbstractMirror<T>>>(
+ "AbstractCanonical::RemoveMirror", mCanonical,
+ &AbstractCanonical<T>::RemoveMirror, this);
+ mCanonical->OwnerThread()->Dispatch(r.forget());
+ mCanonical = nullptr;
+ }
+
+ void AssertNoIncomingConnects() {
+ MOZ_DIAGNOSTIC_ASSERT(mIncomingConnects == 0);
+ }
+
+ protected:
+ ~Impl() { MOZ_DIAGNOSTIC_ASSERT(!IsConnected()); }
+
+ private:
+ T mValue;
+ RefPtr<AbstractCanonical<T>> mCanonical;
+# ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
+ std::atomic<size_t> mIncomingConnects = 0;
+# endif
+ };
+
+ public:
+ // Forward control operations to the Impl<T>.
+ /*
+ * Connect aCanonical to this Mirror. Note that this requires one thread hop
+ * back to aCanonical's owning thread before the canonical value starts
+ * being mirrored, and another to our owning thread before the connection is
+ * established.
+ *
+ * Note that this mirror-initialized connection could race with
+ * Canonical::DisconnectAll(). It is up to the caller to provide the guarantee
+ * that disconnection happens after the connection has been established. There
+ * is no race between this and DisconnectIfConnected().
+ */
+ void Connect(AbstractCanonical<T>* aCanonical) { mImpl->Connect(aCanonical); }
+ void DisconnectIfConnected() { mImpl->DisconnectIfConnected(); }
+
+ // Access to the Impl<T>.
+ operator Impl&() { return *mImpl; }
+ Impl* operator&() { return mImpl; }
+
+ // Access to the T.
+ const T& Ref() const { return *mImpl; }
+ operator const T&() const { return Ref(); }
+
+ private:
+ RefPtr<Impl> mImpl;
+};
+
+# undef MIRROR_LOG
+
+} // namespace mozilla
+
+#endif