summaryrefslogtreecommitdiffstats
path: root/xpcom/threads/StateMirroring.h
blob: 9f8ded70f42337df88c55abde1cefd22dea7b108 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#if !defined(StateMirroring_h_)
#  define StateMirroring_h_

#  include <atomic>
#  include <cstddef>
#  include "mozilla/AbstractThread.h"
#  include "mozilla/AlreadyAddRefed.h"
#  include "mozilla/Assertions.h"
#  include "mozilla/Logging.h"
#  include "mozilla/Maybe.h"
#  include "mozilla/RefPtr.h"
#  include "mozilla/StateWatching.h"
#  include "nsCOMPtr.h"
#  include "nsIRunnable.h"
#  include "nsISupports.h"
#  include "nsTArray.h"
#  include "nsThreadUtils.h"

/*
 * The state-mirroring machinery allows pieces of interesting state to be
 * observed on multiple threads without locking. The basic strategy is to track
 * changes in a canonical value and post updates to other threads that hold
 * mirrors for that value.
 *
 * One problem with the naive implementation of such a system is that some
 * pieces of state need to be updated atomically, and certain other operations
 * need to wait for these atomic updates to complete before executing. The
 * state-mirroring machinery solves this problem by requiring that its owner
 * thread uses tail dispatch, and posting state update events (which should
 * always be run first by TaskDispatcher implementations) to that tail
 * dispatcher. This ensures that state changes are always atomic from the
 * perspective of observing threads.
 *
 * Given that state-mirroring is an automatic background process, we try to
 * avoid burdening the caller with worrying too much about teardown. To that
 * end, we don't assert dispatch success for any of the notifications, and
 * assume that any canonical or mirror owned by a thread for whom dispatch fails
 * will soon be disconnected by its holder anyway.
 *
 * Given that semantics may change and comments tend to go out of date, we
 * deliberately don't provide usage examples here. Grep around to find them.
 */

namespace mozilla {

// Mirror<T> and Canonical<T> inherit WatchTarget, so we piggy-back on the
// logging that WatchTarget already does. Given that, it makes sense to share
// the same log module.
//
// MIRROR_LOG(fmt, ...) asserts that the shared log module exists and then
// emits a Debug-level message with printf-style arguments. The body is wrapped
// in do { } while (0) so that the two statements behave as a single statement:
// without the wrapper, `if (cond) MIRROR_LOG(...);` would guard only the
// assertion and always execute the log call, and a trailing `else` would not
// compile.
#  define MIRROR_LOG(x, ...)                                          \
    do {                                                               \
      MOZ_ASSERT(gStateWatchingLog);                                   \
      MOZ_LOG(gStateWatchingLog, LogLevel::Debug, (x, ##__VA_ARGS__)); \
    } while (0)

template <typename T>
class AbstractMirror;

/*
 * AbstractCanonical is a superclass from which all Canonical values must
 * inherit. It serves as the interface of operations which may be performed (via
 * asynchronous dispatch) by other threads, in particular by the corresponding
 * Mirror value.
 */
template <typename T>
class AbstractCanonical {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractCanonical)

  // aThread is the thread that owns the canonical value. It is kept alive via
  // a strong reference for as long as this object exists. The constructor is
  // explicit so an AbstractThread* can never be implicitly converted.
  explicit AbstractCanonical(AbstractThread* aThread) : mOwnerThread(aThread) {}

  // Registers aMirror so that it starts receiving value updates.
  // Canonical<T>::Impl asserts that this runs on the owner thread.
  virtual void AddMirror(AbstractMirror<T>* aMirror) = 0;

  // Unregisters a mirror previously passed to AddMirror().
  // Canonical<T>::Impl asserts that this runs on the owner thread.
  virtual void RemoveMirror(AbstractMirror<T>* aMirror) = 0;

  // The thread that owns the canonical value (never reassigned here).
  AbstractThread* OwnerThread() const { return mOwnerThread; }

 protected:
  // Protected so that instances are destroyed through the refcounting
  // machinery rather than by a direct delete.
  virtual ~AbstractCanonical() = default;
  RefPtr<AbstractThread> mOwnerThread;
};

/*
 * AbstractMirror is a superclass from which all Mirror values must
 * inherit. It serves as the interface of operations which may be performed (via
 * asynchronous dispatch) by other threads, in particular by the corresponding
 * Canonical value.
 */
template <typename T>
class AbstractMirror {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractMirror)

  // aThread is the thread that owns the mirrored value. It is kept alive via a
  // strong reference for as long as this object exists. The constructor is
  // explicit so an AbstractThread* can never be implicitly converted.
  explicit AbstractMirror(AbstractThread* aThread) : mOwnerThread(aThread) {}

  // Invoked by a canonical, on the canonical's thread, when the canonical
  // initiates the connection to this mirror.
  virtual void ConnectedOnCanonicalThread(AbstractCanonical<T>* aCanonical) = 0;

  // Delivers a new value from the canonical. Mirror<T>::Impl asserts that this
  // runs on the mirror's owner thread.
  virtual void UpdateValue(const T& aNewValue) = 0;

  // Tells the mirror that its canonical has dropped the connection.
  // Mirror<T>::Impl asserts that this runs on the mirror's owner thread.
  virtual void NotifyDisconnected() = 0;

  // The thread that owns the mirrored value (never reassigned here).
  AbstractThread* OwnerThread() const { return mOwnerThread; }

 protected:
  // Protected so that instances are destroyed through the refcounting
  // machinery rather than by a direct delete.
  virtual ~AbstractMirror() = default;
  RefPtr<AbstractThread> mOwnerThread;
};

/*
 * Canonical<T> is a wrapper class that allows a given value to be mirrored by
 * other threads. It maintains a list of active mirrors, and queues updates for
 * them when the internal value changes. When changing the value, the caller
 * needs to pass a TaskDispatcher object, which fires the updates at the
 * appropriate time. Canonical<T> is also a WatchTarget, and may be set up to
 * trigger other routines (on the same thread) when the canonical value changes.
 *
 * Canonical<T> is intended to be used as a member variable, so it doesn't
 * actually inherit AbstractCanonical<T> (a refcounted type). Rather, it
 * contains an inner class called |Impl| that implements most of the interesting
 * logic.
 */
template <typename T>
class Canonical {
 public:
  // Allocates the refcounted Impl that holds the value and the list of
  // mirrors. aThread becomes the owner thread (the Impl constructor asserts
  // that it supports tail dispatch), aInitialValue seeds the value, and aName
  // is handed to WatchTarget.
  Canonical(AbstractThread* aThread, const T& aInitialValue,
            const char* aName) {
    mImpl = new Impl(aThread, aInitialValue, aName);
  }

  // Nothing to do explicitly: the RefPtr member drops our reference to the
  // Impl. Note that ~Impl asserts that no mirrors are still registered.
  ~Canonical() {}

 private:
  // The refcounted object that does the real work. It is what mirrors and
  // dispatched runnables hold references to, which lets Canonical<T> itself be
  // a plain member variable.
  class Impl : public AbstractCanonical<T>, public WatchTarget {
   public:
    using AbstractCanonical<T>::OwnerThread;

    // Stores the initial value and checks (in assertion-enabled builds) that
    // the owner thread supports tail dispatch.
    Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName)
        : AbstractCanonical<T>(aThread),
          WatchTarget(aName),
          mValue(aInitialValue) {
      MIRROR_LOG("%s [%p] initialized", mName, this);
      MOZ_ASSERT(aThread->SupportsTailDispatch(),
                 "Can't get coherency without tail dispatch");
    }

    // Canonical-initiated connection. Must run on the owner thread. The mirror
    // is first told about the connection (it finishes the hookup on its own
    // thread), and is then registered so that it receives the current value.
    void ConnectMirror(AbstractMirror<T>* aMirror) {
      MIRROR_LOG("%s [%p] canonical-init connecting mirror %p", mName, this,
                 aMirror);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aMirror->OwnerThread()),
                 "Can't get coherency without tail dispatch");
      aMirror->ConnectedOnCanonicalThread(this);
      AddMirror(aMirror);
    }

    // Registers aMirror (which must not already be registered) and immediately
    // posts the current value to the mirror's owner thread as a state change.
    void AddMirror(AbstractMirror<T>* aMirror) override {
      MIRROR_LOG("%s [%p] adding mirror %p", mName, this, aMirror);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(!mMirrors.Contains(aMirror));
      mMirrors.AppendElement(aMirror);
      aMirror->OwnerThread()->DispatchStateChange(MakeNotifier(aMirror));
    }

    // Unregisters aMirror, which must currently be registered. No notification
    // is sent to the mirror.
    void RemoveMirror(AbstractMirror<T>* aMirror) override {
      MIRROR_LOG("%s [%p] removing mirror %p", mName, this, aMirror);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(mMirrors.Contains(aMirror));
      mMirrors.RemoveElement(aMirror);
    }

    // Posts NotifyDisconnected to every registered mirror on that mirror's
    // owner thread, then forgets all of them. The dispatch result is not
    // checked.
    void DisconnectAll() {
      MIRROR_LOG("%s [%p] Disconnecting all mirrors", mName, this);
      for (size_t i = 0; i < mMirrors.Length(); ++i) {
        mMirrors[i]->OwnerThread()->Dispatch(
            NewRunnableMethod("AbstractMirror::NotifyDisconnected", mMirrors[i],
                              &AbstractMirror<T>::NotifyDisconnected));
      }
      mMirrors.Clear();
    }

    // Read access to the current value; owner thread only.
    operator const T&() {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      return mValue;
    }

    // Changes the canonical value. Owner thread only. A no-op when aNewValue
    // compares equal to the current value. Otherwise same-thread watchers are
    // notified, and a single DoNotify direct task is scheduled per batch of
    // changes to push the final value out to the mirrors.
    void Set(const T& aNewValue) {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());

      if (aNewValue == mValue) {
        return;
      }

      // Notify same-thread watchers. The state watching machinery will make
      // sure that notifications run at the right time.
      NotifyWatchers();

      // Check if we've already got a pending update. If so we won't schedule
      // another one.
      bool alreadyNotifying = mInitialValue.isSome();

      // Stash the initial value if needed, then update to the new value.
      if (mInitialValue.isNothing()) {
        mInitialValue.emplace(mValue);
      }
      mValue = aNewValue;

      // We wait until things have stabilized before sending state updates so
      // that we can avoid sending multiple updates, and possibly avoid sending
      // any updates at all if the value ends up where it started.
      if (!alreadyNotifying) {
        AbstractThread::DispatchDirectTask(NewRunnableMethod(
            "Canonical::Impl::DoNotify", this, &Impl::DoNotify));
      }
    }

    // Assignment is sugar for Set(); assigning from another Impl copies that
    // Impl's current value (through its conversion to const T&), not its
    // mirrors.
    Impl& operator=(const T& aNewValue) {
      Set(aNewValue);
      return *this;
    }
    Impl& operator=(const Impl& aOther) {
      Set(aOther);
      return *this;
    }
    Impl(const Impl& aOther) = delete;

   protected:
    // All mirrors must have been removed or disconnected before the last
    // reference to the Impl goes away.
    ~Impl() { MOZ_DIAGNOSTIC_ASSERT(mMirrors.IsEmpty()); }

   private:
    // Direct task scheduled by Set(). Compares the value stashed at the start
    // of the batch with the current one; if they differ, posts the current
    // value to every mirror as a state change on the mirror's owner thread.
    void DoNotify() {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(mInitialValue.isSome());
      bool same = mInitialValue.ref() == mValue;
      // Clear the stash first so that the next Set() starts a new batch.
      mInitialValue.reset();

      if (same) {
        MIRROR_LOG("%s [%p] unchanged - not sending update", mName, this);
        return;
      }

      for (size_t i = 0; i < mMirrors.Length(); ++i) {
        mMirrors[i]->OwnerThread()->DispatchStateChange(
            MakeNotifier(mMirrors[i]));
      }
    }

    // Builds a runnable that calls aMirror->UpdateValue() with the value that
    // mValue holds at the time this function is called.
    already_AddRefed<nsIRunnable> MakeNotifier(AbstractMirror<T>* aMirror) {
      return NewRunnableMethod<T>("AbstractMirror::UpdateValue", aMirror,
                                  &AbstractMirror<T>::UpdateValue, mValue);
    }

    // The current canonical value.
    T mValue;
    // Value at the start of the current batch of changes; Some() exactly while
    // a DoNotify task is pending.
    Maybe<T> mInitialValue;
    // Mirrors currently registered for updates, held by strong reference.
    nsTArray<RefPtr<AbstractMirror<T>>> mMirrors;
  };

 public:
  /*
   * Connect this Canonical to aMirror. Note that the canonical value starts
   * being mirrored to aMirror immediately, and requires one thread hop to
   * aMirror's owning thread before the connection is established.
   *
   * Note that this could race with Mirror::DisconnectIfConnected(). It is up to
   * the caller to provide the guarantee that disconnection happens after the
   * connection has been established. There is no race between this and
   * DisconnectAll().
   */
  void ConnectMirror(AbstractMirror<T>* aMirror) {
    return mImpl->ConnectMirror(aMirror);
  }
  // Notifies every connected mirror of the disconnection and drops them all.
  void DisconnectAll() { return mImpl->DisconnectAll(); }

  // Access to the Impl. Note that unary & is overloaded: &canonical yields the
  // Impl pointer, not the address of the Canonical<T> wrapper.
  operator Impl&() { return *mImpl; }
  Impl* operator&() { return mImpl; }

  // Access to the T. These forward to the Impl, which asserts that the caller
  // is on the owner thread.
  const T& Ref() const { return *mImpl; }
  operator const T&() const { return Ref(); }
  void Set(const T& aNewValue) { mImpl->Set(aNewValue); }
  Canonical& operator=(const T& aNewValue) {
    Set(aNewValue);
    return *this;
  }
  // Copies aOther's current value only; mirrors are not transferred.
  Canonical& operator=(const Canonical& aOther) {
    Set(aOther);
    return *this;
  }
  Canonical(const Canonical& aOther) = delete;

 private:
  // Strong reference to the object that holds the value and mirror list.
  RefPtr<Impl> mImpl;
};

/*
 * Mirror<T> is a wrapper class that allows a given value to mirror that of a
 * Canonical<T> owned by another thread. It registers itself with a
 * Canonical<T>, and is periodically updated with new values. Mirror<T> is also
 * a WatchTarget, and may be set up to trigger other routines (on the same
 * thread) when the mirrored value changes.
 *
 * Mirror<T> is intended to be used as a member variable, so it doesn't actually
 * inherit AbstractMirror<T> (a refcounted type). Rather, it contains an inner
 * class called |Impl| that implements most of the interesting logic.
 */
template <typename T>
class Mirror {
 public:
  // Allocates the refcounted Impl that holds the mirrored value. aThread
  // becomes the owner thread (the Impl constructor asserts that it supports
  // tail dispatch), aInitialValue is the value reported until the first update
  // arrives, and aName is handed to WatchTarget.
  Mirror(AbstractThread* aThread, const T& aInitialValue, const char* aName) {
    mImpl = new Impl(aThread, aInitialValue, aName);
  }

  ~Mirror() {
    // As a member of complex objects, a Mirror<T> may be destroyed on a
    // different thread than its owner, or late in shutdown during CC. Given
    // that, we require manual disconnection so that callers can put things in
    // the right place.
    MOZ_DIAGNOSTIC_ASSERT(!mImpl->IsConnected());
    mImpl->AssertNoIncomingConnects();
  }

 private:
  // The refcounted object that does the real work. It is what the canonical
  // and dispatched runnables hold references to, which lets Mirror<T> itself
  // be a plain member variable.
  class Impl : public AbstractMirror<T>, public WatchTarget {
   public:
    using AbstractMirror<T>::OwnerThread;

    // Stores the initial value and checks (in assertion-enabled builds) that
    // the owner thread supports tail dispatch.
    Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName)
        : AbstractMirror<T>(aThread),
          WatchTarget(aName),
          mValue(aInitialValue) {
      MIRROR_LOG("%s [%p] initialized", mName, this);
      MOZ_ASSERT(aThread->SupportsTailDispatch(),
                 "Can't get coherency without tail dispatch");
    }

    // Read access to the mirrored value; owner thread only.
    operator const T&() {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      return mValue;
    }

    // First half of a canonical-initiated connection. Runs on the canonical's
    // thread: records that a connect is in flight and posts SetCanonical to
    // our own thread as a state change.
    void ConnectedOnCanonicalThread(AbstractCanonical<T>* aCanonical) override {
      MOZ_ASSERT(aCanonical->OwnerThread()->IsCurrentThreadIn());
#  ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
      ++mIncomingConnects;
#  endif
      OwnerThread()->DispatchStateChange(
          NewRunnableMethod<StoreRefPtrPassByPtr<AbstractCanonical<T>>>(
              "Mirror::Impl::SetCanonical", this, &Impl::SetCanonical,
              aCanonical));
    }

    // Second half of a canonical-initiated connection. Runs on our owner
    // thread; the mirror must not already be connected.
    void SetCanonical(AbstractCanonical<T>* aCanonical) {
      MIRROR_LOG("%s [%p] Canonical-init setting canonical %p", mName, this,
                 aCanonical);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(!IsConnected());
#  ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
      --mIncomingConnects;
#  endif
      mCanonical = aCanonical;
    }

    // Receives a value from the canonical. Watchers are only notified when the
    // value actually differs from the one we already hold.
    void UpdateValue(const T& aNewValue) override {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      if (mValue != aNewValue) {
        mValue = aNewValue;
        WatchTarget::NotifyWatchers();
      }
    }

    // The canonical dropped us (see Canonical::DisconnectAll); forget it. The
    // last mirrored value is kept.
    void NotifyDisconnected() override {
      MIRROR_LOG("%s [%p] Notifed of disconnection from %p", mName, this,
                 mCanonical.get());
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      mCanonical = nullptr;
    }

    // True while we hold a reference to a canonical.
    bool IsConnected() const { return !!mCanonical; }

    // Mirror-initiated connection. Owner thread only, and only while not
    // connected. Posts AddMirror to the canonical's thread and records the
    // canonical right away, so IsConnected() is true before the canonical has
    // processed the request. The dispatch result is not checked.
    void Connect(AbstractCanonical<T>* aCanonical) {
      MIRROR_LOG("%s [%p] Connecting to %p", mName, this, aCanonical);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(!IsConnected());
      MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aCanonical->OwnerThread()),
                 "Can't get coherency without tail dispatch");

      nsCOMPtr<nsIRunnable> r =
          NewRunnableMethod<StoreRefPtrPassByPtr<AbstractMirror<T>>>(
              "AbstractCanonical::AddMirror", aCanonical,
              &AbstractCanonical<T>::AddMirror, this);
      aCanonical->OwnerThread()->Dispatch(r.forget());
      mCanonical = aCanonical;
    }

    // Owner thread only. If connected, posts RemoveMirror to the canonical's
    // thread and forgets the canonical immediately; otherwise does nothing.
    void DisconnectIfConnected() {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      if (!IsConnected()) {
        return;
      }

      MIRROR_LOG("%s [%p] Disconnecting from %p", mName, this,
                 mCanonical.get());
      nsCOMPtr<nsIRunnable> r =
          NewRunnableMethod<StoreRefPtrPassByPtr<AbstractMirror<T>>>(
              "AbstractCanonical::RemoveMirror", mCanonical,
              &AbstractCanonical<T>::RemoveMirror, this);
      mCanonical->OwnerThread()->Dispatch(r.forget());
      mCanonical = nullptr;
    }

    // Diagnostic check that no canonical-initiated connect is still in flight
    // (i.e. every ConnectedOnCanonicalThread has had its SetCanonical run).
    void AssertNoIncomingConnects() {
      MOZ_DIAGNOSTIC_ASSERT(mIncomingConnects == 0);
    }

   protected:
    // The mirror must have been disconnected before the last reference to the
    // Impl goes away.
    ~Impl() { MOZ_DIAGNOSTIC_ASSERT(!IsConnected()); }

   private:
    // The most recently received value (or the initial value).
    T mValue;
    // The canonical we are connected to, or null when disconnected.
    RefPtr<AbstractCanonical<T>> mCanonical;
#  ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
    // Number of canonical-initiated connects that have been started but not
    // yet completed. Atomic because it is incremented on the canonical's
    // thread and decremented on ours. Brace-initialized: copy-initialization
    // of a std::atomic is ill-formed before C++17.
    std::atomic<size_t> mIncomingConnects{0};
#  endif
  };

 public:
  // Forward control operations to the Impl<T>.
  /*
   * Connect aCanonical to this Mirror. Note that this requires one thread hop
   * back to aCanonical's owning thread before the canonical value starts
   * being mirrored, and another to our owning thread before the connection is
   * established.
   *
   * Note that this mirror-initialized connection could race with
   * Canonical::DisconnectAll(). It is up to the caller to provide the guarantee
   * that disconnection happens after the connection has been established. There
   * is no race between this and DisconnectIfConnected().
   */
  void Connect(AbstractCanonical<T>* aCanonical) { mImpl->Connect(aCanonical); }
  // Drops the connection to the canonical, if there is one.
  void DisconnectIfConnected() { mImpl->DisconnectIfConnected(); }

  // Access to the Impl<T>. Note that unary & is overloaded: &mirror yields the
  // Impl pointer, not the address of the Mirror<T> wrapper.
  operator Impl&() { return *mImpl; }
  Impl* operator&() { return mImpl; }

  // Access to the T. These forward to the Impl, which asserts that the caller
  // is on the owner thread.
  const T& Ref() const { return *mImpl; }
  operator const T&() const { return Ref(); }

 private:
  // Strong reference to the object that holds the mirrored value.
  RefPtr<Impl> mImpl;
};

#  undef MIRROR_LOG

}  // namespace mozilla

#endif