summaryrefslogtreecommitdiffstats
path: root/dom/media/doctor/MultiWriterQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'dom/media/doctor/MultiWriterQueue.h')
-rw-r--r--dom/media/doctor/MultiWriterQueue.h523
1 files changed, 523 insertions, 0 deletions
diff --git a/dom/media/doctor/MultiWriterQueue.h b/dom/media/doctor/MultiWriterQueue.h
new file mode 100644
index 0000000000..b19c0039ba
--- /dev/null
+++ b/dom/media/doctor/MultiWriterQueue.h
@@ -0,0 +1,523 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_MultiWriterQueue_h_
+#define mozilla_MultiWriterQueue_h_
+
+#include <cstdint>
+#include <utility>
+
+#include "RollingNumber.h"
+#include "mozilla/Atomics.h"
+#include "mozilla/MemoryReporting.h"
+#include "mozilla/Mutex.h"
+#include "prthread.h"
+
+namespace mozilla {
+
+// Default reader locking strategy, using a mutex to ensure that concurrent
+// PopAll calls won't overlap.
+class MOZ_CAPABILITY("mutex") MultiWriterQueueReaderLocking_Mutex {
+ public:
+ MultiWriterQueueReaderLocking_Mutex()
+ : mMutex("MultiWriterQueueReaderLocking_Mutex") {}
+ void Lock() MOZ_CAPABILITY_ACQUIRE(mMutex) { mMutex.Lock(); };
+ void Unlock() MOZ_CAPABILITY_RELEASE(mMutex) { mMutex.Unlock(); };
+
+ private:
+ Mutex mMutex;
+};
+
+// Reader non-locking strategy, trusting that PopAll will never be called
+// concurrently (e.g., by only calling it from a specific thread).
+class MOZ_CAPABILITY("dummy lock") MultiWriterQueueReaderLocking_None {
+ public:
+#ifndef DEBUG
+ void Lock() MOZ_CAPABILITY_ACQUIRE(){};
+ void Unlock() MOZ_CAPABILITY_RELEASE(){};
+#else
+ // DEBUG-mode checks to catch concurrent misuses.
+ void Lock() MOZ_CAPABILITY_ACQUIRE() {
+ MOZ_ASSERT(mLocked.compareExchange(false, true));
+ };
+ void Unlock() MOZ_CAPABILITY_RELEASE() {
+ MOZ_ASSERT(mLocked.compareExchange(true, false));
+ };
+
+ private:
+ Atomic<bool> mLocked{false};
+#endif
+};
+
+static constexpr uint32_t MultiWriterQueueDefaultBufferSize = 8192;
+
+// Multi-writer, single-reader queue of elements of type `T`.
+// Elements are bunched together in buffers of `BufferSize` elements.
+//
+// This queue is heavily optimized for pushing. In most cases pushes will only
+// cost a couple of atomic reads and a few non-atomic reads. Worst cases:
+// - Once per buffer, a push will allocate or reuse a buffer for later pushes;
+// - During the above new-buffer push, other pushes will be blocked.
+//
+// By default, popping is protected by mutex; it may be disabled if popping is
+// guaranteed never to be concurrent.
+// In any case, popping will never negatively impact pushes.
+// (However, *not* popping will add runtime costs, as unread buffers will not
+// be freed, or made available to future pushes; Push functions provide
+// feedback as to when popping would be most efficient.)
+template <typename T, uint32_t BufferSize = MultiWriterQueueDefaultBufferSize,
+ typename ReaderLocking = MultiWriterQueueReaderLocking_Mutex>
+class MultiWriterQueue {
+ static_assert(BufferSize > 0, "0-sized MultiWriterQueue buffer");
+
+ public:
+ // Constructor.
+ // Allocates the initial buffer that will receive the first `BufferSize`
+ // elements. Also allocates one reusable buffer, which will definitely be
+ // needed after the first `BufferSize` elements have been pushed.
+ // Ideally (if the reader can process each buffer quickly enough), there
+ // won't be a need for more buffer allocations.
+ MultiWriterQueue()
+ : mBuffersCoverAtLeastUpTo(BufferSize - 1),
+ mMostRecentBuffer(new Buffer{}),
+ mReusableBuffers(new Buffer{}),
+ mOldestBuffer(static_cast<Buffer*>(mMostRecentBuffer)),
+ mLiveBuffersStats(1),
+ mReusableBuffersStats(1),
+ mAllocatedBuffersStats(2) {}
+
+ ~MultiWriterQueue() {
+ auto DestroyList = [](Buffer* aBuffer) {
+ while (aBuffer) {
+ Buffer* older = aBuffer->Older();
+ delete aBuffer;
+ aBuffer = older;
+ }
+ };
+ DestroyList(mMostRecentBuffer);
+ DestroyList(mReusableBuffers);
+ }
+
+ // We need the index to be order-resistant to overflow, i.e., numbers before
+ // an overflow should test smaller-than numbers after the overflow.
+ // This is because we keep pushing elements with increasing Index, and this
+ // Index is used to find the appropriate buffer based on a range; and this
+ // need to work smoothly when crossing the overflow boundary.
+ using Index = RollingNumber<uint32_t>;
+
+ // Pushes indicate whether they have just reached the end of a buffer.
+ using DidReachEndOfBuffer = bool;
+
+ // Push new element and call aF on it.
+ // Element may be in just-created state, or recycled after a PopAll call.
+ // Atomically thread-safe; in the worst case some pushes may be blocked
+ // while a new buffer is created/reused for them.
+ // Returns whether that push reached the end of a buffer; useful if caller
+ // wants to trigger processing regularly at the most efficient time.
+ template <typename F>
+ DidReachEndOfBuffer PushF(F&& aF) {
+ // Atomically claim ownership of the next available element.
+ const Index index{mNextElementToWrite++};
+ // And now go and set that element.
+ for (;;) {
+ Index lastIndex{mBuffersCoverAtLeastUpTo};
+
+ if (MOZ_UNLIKELY(index == lastIndex)) {
+ // We have claimed the last element in the current head -> Allocate a
+ // new head in advance of more pushes. Make it point at the current
+ // most-recent buffer.
+ // This whole process is effectively guarded:
+ // - Later pushes will wait until mBuffersCoverAtLeastUpTo changes to
+ // one that can accept their claimed index.
+ // - Readers will stop until the last element is marked as valid.
+ Buffer* ourBuffer = mMostRecentBuffer;
+ Buffer* newBuffer = NewBuffer(ourBuffer, index + 1);
+ // Because we have claimed this very specific index, we should be the
+ // only one touching the most-recent buffer pointer.
+ MOZ_ASSERT(mMostRecentBuffer == ourBuffer);
+ // Just pivot the most-recent buffer pointer to our new buffer.
+ mMostRecentBuffer = newBuffer;
+ // Because we have claimed this very specific index, we should be the
+ // only one touching the buffer coverage watermark.
+ MOZ_ASSERT(mBuffersCoverAtLeastUpTo == lastIndex.Value());
+ // Update it to include the just-added most-recent buffer.
+ mBuffersCoverAtLeastUpTo = index.Value() + BufferSize;
+ // We know for sure that `ourBuffer` is the correct one for this index.
+ ourBuffer->SetAndValidateElement(aF, index);
+ // And indicate that we have reached the end of a buffer.
+ return true;
+ }
+
+ if (MOZ_UNLIKELY(index > lastIndex)) {
+ // We have claimed an element in a yet-unavailable buffer, wait for our
+ // target buffer to be created (see above).
+ while (Index(mBuffersCoverAtLeastUpTo) < index) {
+ PR_Sleep(PR_INTERVAL_NO_WAIT); // Yield
+ }
+ // Then loop to examine the new situation.
+ continue;
+ }
+
+ // Here, we have claimed a number that is covered by current buffers.
+ // These buffers cannot be destroyed, because our buffer is not filled
+ // yet (we haven't written in it yet), therefore the reader thread will
+ // have to stop there (or before) and won't destroy our buffer or more
+ // recent ones.
+ MOZ_ASSERT(index < lastIndex);
+ Buffer* ourBuffer = mMostRecentBuffer;
+
+ // In rare situations, another thread may have had the time to create a
+ // new more-recent buffer, in which case we need to find our older buffer.
+ while (MOZ_UNLIKELY(index < ourBuffer->Origin())) {
+ // We assume that older buffers with still-invalid elements (e.g., the
+ // one we have just claimed) cannot be destroyed.
+ MOZ_ASSERT(ourBuffer->Older());
+ ourBuffer = ourBuffer->Older();
+ }
+
+ // Now we can set&validate the claimed element, and indicate that we have
+ // not reached the end of a buffer.
+ ourBuffer->SetAndValidateElement(aF, index);
+ return false;
+ }
+ }
+
+ // Push new element and assign it a value.
+ // Atomically thread-safe; in the worst case some pushes may be blocked
+ // while a new buffer is created/reused for them.
+ // Returns whether that push reached the end of a buffer; useful if caller
+ // wants to trigger processing regularly at the most efficient time.
+ DidReachEndOfBuffer Push(const T& aT) {
+ return PushF([&aT](T& aElement, Index) { aElement = aT; });
+ }
+
+ // Push new element and move-assign it a value.
+ // Atomically thread-safe; in the worst case some pushes may be blocked
+ // while a new buffer is created/reused for them.
+ // Returns whether that push reached the end of a buffer; useful if caller
+ // wants to trigger processing regularly at the most efficient time.
+ DidReachEndOfBuffer Push(T&& aT) {
+ return PushF([&aT](T& aElement, Index) { aElement = std::move(aT); });
+ }
+
+ // Pop all elements before the first invalid one, running aF on each of them
+ // in FIFO order.
+ // Thread-safety with other PopAll calls is controlled by the `Locking`
+ // template argument.
+ // Concurrent pushes are always allowed, because:
+ // - PopAll won't read elements until valid,
+ // - Pushes do not interfere with pop-related members -- except for
+ // mReusableBuffers, which is accessed atomically.
+ template <typename F>
+ void PopAll(F&& aF) {
+ mReaderLocking.Lock();
+ // Destroy every second fully-read buffer.
+ // TODO: Research a better algorithm, probably based on stats.
+ bool destroy = false;
+ for (;;) {
+ Buffer* b = mOldestBuffer;
+ MOZ_ASSERT(!b->Older());
+ // The next element to pop must be in that oldest buffer.
+ MOZ_ASSERT(mNextElementToPop >= b->Origin());
+ MOZ_ASSERT(mNextElementToPop < b->Origin() + BufferSize);
+
+ // Start reading each element.
+ if (!b->ReadAndInvalidateAll(aF, mNextElementToPop)) {
+ // Found an invalid element, stop popping.
+ mReaderLocking.Unlock();
+ return;
+ }
+
+ // Reached the end of this oldest buffer
+ MOZ_ASSERT(mNextElementToPop == b->Origin() + BufferSize);
+ // Delete this oldest buffer.
+ // Since the last element was valid, it must mean that there is a newer
+ // buffer.
+ MOZ_ASSERT(b->Newer());
+ MOZ_ASSERT(mNextElementToPop == b->Newer()->Origin());
+ StopUsing(b, destroy);
+ destroy = !destroy;
+
+ // We will loop and start reading the now-oldest buffer.
+ }
+ }
+
+ // Size of all buffers (used, or recyclable), excluding external data.
+ size_t ShallowSizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const {
+ return mAllocatedBuffersStats.Count() * sizeof(Buffer);
+ }
+
+ struct CountAndWatermark {
+ int mCount;
+ int mWatermark;
+ };
+
+ CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats.Get(); }
+ CountAndWatermark ReusableBuffersStats() const {
+ return mReusableBuffersStats.Get();
+ }
+ CountAndWatermark AllocatedBuffersStats() const {
+ return mAllocatedBuffersStats.Get();
+ }
+
+ private:
+ // Structure containing the element to be stored, and a validity-marker.
+ class BufferedElement {
+ public:
+ // Run aF on an invalid element, and mark it as valid.
+ template <typename F>
+ void SetAndValidate(F&& aF, Index aIndex) {
+ MOZ_ASSERT(!mValid);
+ aF(mT, aIndex);
+ mValid = true;
+ }
+
+ // Run aF on a valid element and mark it as invalid, return true.
+ // Return false if element was invalid.
+ template <typename F>
+ bool ReadAndInvalidate(F&& aF) {
+ if (!mValid) {
+ return false;
+ }
+ aF(mT);
+ mValid = false;
+ return true;
+ }
+
+ private:
+ T mT;
+ // mValid should be atomically changed to true *after* mT has been written,
+ // so that the reader can only see valid data.
+ // ReleaseAcquire, because when set to `true`, we want the just-written mT
+ // to be visible to the thread reading this `true`; and when set to `false`,
+ // we want the previous reads to have completed.
+ Atomic<bool, ReleaseAcquire> mValid{false};
+ };
+
+ // Buffer contains a sequence of BufferedElements starting at a specific
+ // index, and it points to the next-older buffer (if any).
+ class Buffer {
+ public:
+ // Constructor of the very first buffer.
+ Buffer() : mOlder(nullptr), mNewer(nullptr), mOrigin(0) {}
+
+ // Constructor of later buffers.
+ Buffer(Buffer* aOlder, Index aOrigin)
+ : mOlder(aOlder), mNewer(nullptr), mOrigin(aOrigin) {
+ MOZ_ASSERT(aOlder);
+ aOlder->mNewer = this;
+ }
+
+ Buffer* Older() const { return mOlder; }
+ void SetOlder(Buffer* aOlder) { mOlder = aOlder; }
+
+ Buffer* Newer() const { return mNewer; }
+ void SetNewer(Buffer* aNewer) { mNewer = aNewer; }
+
+ Index Origin() const { return mOrigin; }
+ void SetOrigin(Index aOrigin) { mOrigin = aOrigin; }
+
+ // Run aF on a yet-invalid element.
+ // Not thread-safe by itself, but nothing else should write this element,
+ // and reader won't access it until after it becomes valid.
+ template <typename F>
+ void SetAndValidateElement(F&& aF, Index aIndex) {
+ MOZ_ASSERT(aIndex >= Origin());
+ MOZ_ASSERT(aIndex < Origin() + BufferSize);
+ mElements[aIndex - Origin()].SetAndValidate(aF, aIndex);
+ }
+
+ using DidReadLastElement = bool;
+
+ // Read all valid elements starting at aIndex, marking them invalid and
+ // updating aIndex.
+ // Returns true if we ended up reading the last element in this buffer.
+ // Accessing the validity bit is thread-safe (as it's atomic), but once
+ // an element is valid, the reading itself is not thread-safe and should be
+ // guarded.
+ template <typename F>
+ DidReadLastElement ReadAndInvalidateAll(F&& aF, Index& aIndex) {
+ MOZ_ASSERT(aIndex >= Origin());
+ MOZ_ASSERT(aIndex < Origin() + BufferSize);
+ for (; aIndex < Origin() + BufferSize; ++aIndex) {
+ if (!mElements[aIndex - Origin()].ReadAndInvalidate(aF)) {
+ // Found an invalid element, stop here. (aIndex will not be updated
+ // past it, so we will start from here next time.)
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ Buffer* mOlder;
+ Buffer* mNewer;
+ Index mOrigin;
+ BufferedElement mElements[BufferSize];
+ };
+
+ // Reuse a buffer, or create a new one.
+ // All buffered elements will be invalid.
+ Buffer* NewBuffer(Buffer* aOlder, Index aOrigin) {
+ MOZ_ASSERT(aOlder);
+ for (;;) {
+ Buffer* head = mReusableBuffers;
+ if (!head) {
+ ++mAllocatedBuffersStats;
+ ++mLiveBuffersStats;
+ Buffer* buffer = new Buffer(aOlder, aOrigin);
+ return buffer;
+ }
+ Buffer* older = head->Older();
+ // Try to pivot the reusable-buffer pointer from the current head to the
+ // next buffer in line.
+ if (mReusableBuffers.compareExchange(head, older)) {
+ // Success! The reusable-buffer pointer now points at the older buffer,
+ // so we can recycle this ex-head.
+ --mReusableBuffersStats;
+ ++mLiveBuffersStats;
+ head->SetOlder(aOlder);
+ aOlder->SetNewer(head);
+ // We will be the newest; newer-pointer should already be null.
+ MOZ_ASSERT(!head->Newer());
+ head->SetOrigin(aOrigin);
+ return head;
+ }
+ // Failure, someone else must have touched the list, loop to try again.
+ }
+ }
+
+ // Discard a fully-read buffer.
+ // If aDestroy is true, delete it.
+ // If aDestroy is false, move the buffer to a reusable-buffer stack.
+ void StopUsing(Buffer* aBuffer, bool aDestroy) {
+ --mLiveBuffersStats;
+
+ // We should only stop using the oldest buffer.
+ MOZ_ASSERT(!aBuffer->Older());
+ // The newest buffer should not be modified here.
+ MOZ_ASSERT(aBuffer->Newer());
+ MOZ_ASSERT(aBuffer->Newer()->Older() == aBuffer);
+ // Detach from the second-oldest buffer.
+ aBuffer->Newer()->SetOlder(nullptr);
+ // Make the second-oldest buffer the now-oldest buffer.
+ mOldestBuffer = aBuffer->Newer();
+
+ if (aDestroy) {
+ --mAllocatedBuffersStats;
+ delete aBuffer;
+ } else {
+ ++mReusableBuffersStats;
+ // The recycling stack only uses mOlder; mNewer is not needed.
+ aBuffer->SetNewer(nullptr);
+
+ // Make the given buffer the new head of reusable buffers.
+ for (;;) {
+ Buffer* head = mReusableBuffers;
+ aBuffer->SetOlder(head);
+ if (mReusableBuffers.compareExchange(head, aBuffer)) {
+ break;
+ }
+ }
+ }
+ }
+
+ // Index of the next element to write. Modified when an element index is
+ // claimed for a push. If the last element of a buffer is claimed, that push
+ // will be responsible for adding a new head buffer.
+ // Relaxed, because there is no synchronization based on this variable, each
+ // thread just needs to get a different value, and will then write different
+ // things (which themselves have some atomic validation before they may be
+ // read elsewhere, independent of this `mNextElementToWrite`.)
+ Atomic<Index::ValueType, Relaxed> mNextElementToWrite{0};
+
+ // Index that a live recent buffer reaches. If a push claims a lesser-or-
+ // equal number, the corresponding buffer is guaranteed to still be alive:
+ // - It will have been created before this index was updated,
+ // - It will not be destroyed until all its values have been written,
+ // including the one that just claimed a position within it.
+ // Also, the push that claims this exact number is responsible for adding the
+ // next buffer and updating this value accordingly.
+ // ReleaseAcquire, because when set to a certain value, the just-created
+ // buffer covering the new range must be visible to readers.
+ Atomic<Index::ValueType, ReleaseAcquire> mBuffersCoverAtLeastUpTo;
+
+ // Pointer to the most recent buffer. Never null.
+ // This is the most recent of a deque of yet-unread buffers.
+ // Only modified when adding a new head buffer.
+ // ReleaseAcquire, because when modified, the just-created new buffer must be
+ // visible to readers.
+ Atomic<Buffer*, ReleaseAcquire> mMostRecentBuffer;
+
+ // Stack of reusable buffers.
+ // ReleaseAcquire, because when modified, the just-added buffer must be
+ // visible to readers.
+ Atomic<Buffer*, ReleaseAcquire> mReusableBuffers;
+
+ // Template-provided locking mechanism to protect PopAll()-only member
+ // variables below.
+ ReaderLocking mReaderLocking;
+
+ // Pointer to the oldest buffer, which contains the new element to be popped.
+ // Never null.
+ Buffer* mOldestBuffer;
+
+ // Index of the next element to be popped.
+ Index mNextElementToPop{0};
+
+ // Stats.
+ class AtomicCountAndWatermark {
+ public:
+ explicit AtomicCountAndWatermark(int aCount)
+ : mCount(aCount), mWatermark(aCount) {}
+
+ int Count() const { return int(mCount); }
+
+ CountAndWatermark Get() const {
+ return CountAndWatermark{int(mCount), int(mWatermark)};
+ }
+
+ int operator++() {
+ int count = int(++mCount);
+ // Update watermark.
+ for (;;) {
+ int watermark = int(mWatermark);
+ if (watermark >= count) {
+ // printf("++[%p] -=> %d-%d\n", this, count, watermark);
+ break;
+ }
+ if (mWatermark.compareExchange(watermark, count)) {
+ // printf("++[%p] -x> %d-(was %d now %d)\n", this, count, watermark,
+ // count);
+ break;
+ }
+ }
+ return count;
+ }
+
+ int operator--() {
+ int count = int(--mCount);
+ // printf("--[%p] -> %d\n", this, count);
+ return count;
+ }
+
+ private:
+ // Relaxed, as these are just gathering stats, so consistency is not
+ // critical.
+ Atomic<int, Relaxed> mCount;
+ Atomic<int, Relaxed> mWatermark;
+ };
+ // All buffers in the mMostRecentBuffer deque.
+ AtomicCountAndWatermark mLiveBuffersStats;
+ // All buffers in the mReusableBuffers stack.
+ AtomicCountAndWatermark mReusableBuffersStats;
+ // All allocated buffers (sum of above).
+ AtomicCountAndWatermark mAllocatedBuffersStats;
+};
+
+} // namespace mozilla
+
+#endif // mozilla_MultiWriterQueue_h_