/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #ifndef mozilla_MultiWriterQueue_h_ #define mozilla_MultiWriterQueue_h_ #include #include #include "RollingNumber.h" #include "mozilla/Atomics.h" #include "mozilla/MemoryReporting.h" #include "mozilla/Mutex.h" #include "prthread.h" namespace mozilla { // Default reader locking strategy, using a mutex to ensure that concurrent // PopAll calls won't overlap. class MultiWriterQueueReaderLocking_Mutex { public: MultiWriterQueueReaderLocking_Mutex() : mMutex("MultiWriterQueueReaderLocking_Mutex") {} void Lock() { mMutex.Lock(); }; void Unlock() { mMutex.Unlock(); }; private: Mutex mMutex; }; // Reader non-locking strategy, trusting that PopAll will never be called // concurrently (e.g., by only calling it from a specific thread). class MultiWriterQueueReaderLocking_None { public: #ifndef DEBUG void Lock(){}; void Unlock(){}; #else // DEBUG-mode checks to catch concurrent misuses. void Lock() { MOZ_ASSERT(mLocked.compareExchange(false, true)); }; void Unlock() { MOZ_ASSERT(mLocked.compareExchange(true, false)); }; private: Atomic mLocked{false}; #endif }; static constexpr uint32_t MultiWriterQueueDefaultBufferSize = 8192; // Multi-writer, single-reader queue of elements of type `T`. // Elements are bunched together in buffers of `BufferSize` elements. // // This queue is heavily optimized for pushing. In most cases pushes will only // cost a couple of atomic reads and a few non-atomic reads. Worst cases: // - Once per buffer, a push will allocate or reuse a buffer for later pushes; // - During the above new-buffer push, other pushes will be blocked. // // By default, popping is protected by mutex; it may be disabled if popping is // guaranteed never to be concurrent. // In any case, popping will never negatively impact pushes. // (However, *not* popping will add runtime costs, as unread buffers will not // be freed, or made available to future pushes; Push functions provide // feedback as to when popping would be most efficient.) template class MultiWriterQueue { static_assert(BufferSize > 0, "0-sized MultiWriterQueue buffer"); public: // Constructor. // Allocates the initial buffer that will receive the first `BufferSize` // elements. Also allocates one reusable buffer, which will definitely be // needed after the first `BufferSize` elements have been pushed. // Ideally (if the reader can process each buffer quickly enough), there // won't be a need for more buffer allocations. MultiWriterQueue() : mBuffersCoverAtLeastUpTo(BufferSize - 1), mMostRecentBuffer(new Buffer{}), mReusableBuffers(new Buffer{}), mOldestBuffer(static_cast(mMostRecentBuffer)), mLiveBuffersStats(1), mReusableBuffersStats(1), mAllocatedBuffersStats(2) {} ~MultiWriterQueue() { auto DestroyList = [](Buffer* aBuffer) { while (aBuffer) { Buffer* older = aBuffer->Older(); delete aBuffer; aBuffer = older; } }; DestroyList(mMostRecentBuffer); DestroyList(mReusableBuffers); } // We need the index to be order-resistant to overflow, i.e., numbers before // an overflow should test smaller-than numbers after the overflow. // This is because we keep pushing elements with increasing Index, and this // Index is used to find the appropriate buffer based on a range; and this // need to work smoothly when crossing the overflow boundary. using Index = RollingNumber; // Pushes indicate whether they have just reached the end of a buffer. using DidReachEndOfBuffer = bool; // Push new element and call aF on it. // Element may be in just-created state, or recycled after a PopAll call. // Atomically thread-safe; in the worst case some pushes may be blocked // while a new buffer is created/reused for them. // Returns whether that push reached the end of a buffer; useful if caller // wants to trigger processing regularly at the most efficient time. template DidReachEndOfBuffer PushF(F&& aF) { // Atomically claim ownership of the next available element. const Index index{mNextElementToWrite++}; // And now go and set that element. for (;;) { Index lastIndex{mBuffersCoverAtLeastUpTo}; if (MOZ_UNLIKELY(index == lastIndex)) { // We have claimed the last element in the current head -> Allocate a // new head in advance of more pushes. Make it point at the current // most-recent buffer. // This whole process is effectively guarded: // - Later pushes will wait until mBuffersCoverAtLeastUpTo changes to // one that can accept their claimed index. // - Readers will stop until the last element is marked as valid. Buffer* ourBuffer = mMostRecentBuffer; Buffer* newBuffer = NewBuffer(ourBuffer, index + 1); // Because we have claimed this very specific index, we should be the // only one touching the most-recent buffer pointer. MOZ_ASSERT(mMostRecentBuffer == ourBuffer); // Just pivot the most-recent buffer pointer to our new buffer. mMostRecentBuffer = newBuffer; // Because we have claimed this very specific index, we should be the // only one touching the buffer coverage watermark. MOZ_ASSERT(mBuffersCoverAtLeastUpTo == lastIndex.Value()); // Update it to include the just-added most-recent buffer. mBuffersCoverAtLeastUpTo = index.Value() + BufferSize; // We know for sure that `ourBuffer` is the correct one for this index. ourBuffer->SetAndValidateElement(aF, index); // And indicate that we have reached the end of a buffer. return true; } if (MOZ_UNLIKELY(index > lastIndex)) { // We have claimed an element in a yet-unavailable buffer, wait for our // target buffer to be created (see above). while (Index(mBuffersCoverAtLeastUpTo) < index) { PR_Sleep(PR_INTERVAL_NO_WAIT); // Yield } // Then loop to examine the new situation. continue; } // Here, we have claimed a number that is covered by current buffers. // These buffers cannot be destroyed, because our buffer is not filled // yet (we haven't written in it yet), therefore the reader thread will // have to stop there (or before) and won't destroy our buffer or more // recent ones. MOZ_ASSERT(index < lastIndex); Buffer* ourBuffer = mMostRecentBuffer; // In rare situations, another thread may have had the time to create a // new more-recent buffer, in which case we need to find our older buffer. while (MOZ_UNLIKELY(index < ourBuffer->Origin())) { // We assume that older buffers with still-invalid elements (e.g., the // one we have just claimed) cannot be destroyed. MOZ_ASSERT(ourBuffer->Older()); ourBuffer = ourBuffer->Older(); } // Now we can set&validate the claimed element, and indicate that we have // not reached the end of a buffer. ourBuffer->SetAndValidateElement(aF, index); return false; } } // Push new element and assign it a value. // Atomically thread-safe; in the worst case some pushes may be blocked // while a new buffer is created/reused for them. // Returns whether that push reached the end of a buffer; useful if caller // wants to trigger processing regularly at the most efficient time. DidReachEndOfBuffer Push(const T& aT) { return PushF([&aT](T& aElement, Index) { aElement = aT; }); } // Push new element and move-assign it a value. // Atomically thread-safe; in the worst case some pushes may be blocked // while a new buffer is created/reused for them. // Returns whether that push reached the end of a buffer; useful if caller // wants to trigger processing regularly at the most efficient time. DidReachEndOfBuffer Push(T&& aT) { return PushF([&aT](T& aElement, Index) { aElement = std::move(aT); }); } // Pop all elements before the first invalid one, running aF on each of them // in FIFO order. // Thread-safety with other PopAll calls is controlled by the `Locking` // template argument. // Concurrent pushes are always allowed, because: // - PopAll won't read elements until valid, // - Pushes do not interfere with pop-related members -- except for // mReusableBuffers, which is accessed atomically. template void PopAll(F&& aF) { mReaderLocking.Lock(); // Destroy every second fully-read buffer. // TODO: Research a better algorithm, probably based on stats. bool destroy = false; for (;;) { Buffer* b = mOldestBuffer; MOZ_ASSERT(!b->Older()); // The next element to pop must be in that oldest buffer. MOZ_ASSERT(mNextElementToPop >= b->Origin()); MOZ_ASSERT(mNextElementToPop < b->Origin() + BufferSize); // Start reading each element. if (!b->ReadAndInvalidateAll(aF, mNextElementToPop)) { // Found an invalid element, stop popping. mReaderLocking.Unlock(); return; } // Reached the end of this oldest buffer MOZ_ASSERT(mNextElementToPop == b->Origin() + BufferSize); // Delete this oldest buffer. // Since the last element was valid, it must mean that there is a newer // buffer. MOZ_ASSERT(b->Newer()); MOZ_ASSERT(mNextElementToPop == b->Newer()->Origin()); StopUsing(b, destroy); destroy = !destroy; // We will loop and start reading the now-oldest buffer. } } // Size of all buffers (used, or recyclable), excluding external data. size_t ShallowSizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const { return mAllocatedBuffersStats.Count() * sizeof(Buffer); } struct CountAndWatermark { int mCount; int mWatermark; }; CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats.Get(); } CountAndWatermark ReusableBuffersStats() const { return mReusableBuffersStats.Get(); } CountAndWatermark AllocatedBuffersStats() const { return mAllocatedBuffersStats.Get(); } private: // Structure containing the element to be stored, and a validity-marker. class BufferedElement { public: // Run aF on an invalid element, and mark it as valid. template void SetAndValidate(F&& aF, Index aIndex) { MOZ_ASSERT(!mValid); aF(mT, aIndex); mValid = true; } // Run aF on a valid element and mark it as invalid, return true. // Return false if element was invalid. template bool ReadAndInvalidate(F&& aF) { if (!mValid) { return false; } aF(mT); mValid = false; return true; } private: T mT; // mValid should be atomically changed to true *after* mT has been written, // so that the reader can only see valid data. // ReleaseAcquire, because when set to `true`, we want the just-written mT // to be visible to the thread reading this `true`; and when set to `false`, // we want the previous reads to have completed. Atomic mValid{false}; }; // Buffer contains a sequence of BufferedElements starting at a specific // index, and it points to the next-older buffer (if any). class Buffer { public: // Constructor of the very first buffer. Buffer() : mOlder(nullptr), mNewer(nullptr), mOrigin(0) {} // Constructor of later buffers. Buffer(Buffer* aOlder, Index aOrigin) : mOlder(aOlder), mNewer(nullptr), mOrigin(aOrigin) { MOZ_ASSERT(aOlder); aOlder->mNewer = this; } Buffer* Older() const { return mOlder; } void SetOlder(Buffer* aOlder) { mOlder = aOlder; } Buffer* Newer() const { return mNewer; } void SetNewer(Buffer* aNewer) { mNewer = aNewer; } Index Origin() const { return mOrigin; } void SetOrigin(Index aOrigin) { mOrigin = aOrigin; } // Run aF on a yet-invalid element. // Not thread-safe by itself, but nothing else should write this element, // and reader won't access it until after it becomes valid. template void SetAndValidateElement(F&& aF, Index aIndex) { MOZ_ASSERT(aIndex >= Origin()); MOZ_ASSERT(aIndex < Origin() + BufferSize); mElements[aIndex - Origin()].SetAndValidate(aF, aIndex); } using DidReadLastElement = bool; // Read all valid elements starting at aIndex, marking them invalid and // updating aIndex. // Returns true if we ended up reading the last element in this buffer. // Accessing the validity bit is thread-safe (as it's atomic), but once // an element is valid, the reading itself is not thread-safe and should be // guarded. template DidReadLastElement ReadAndInvalidateAll(F&& aF, Index& aIndex) { MOZ_ASSERT(aIndex >= Origin()); MOZ_ASSERT(aIndex < Origin() + BufferSize); for (; aIndex < Origin() + BufferSize; ++aIndex) { if (!mElements[aIndex - Origin()].ReadAndInvalidate(aF)) { // Found an invalid element, stop here. (aIndex will not be updated // past it, so we will start from here next time.) return false; } } return true; } private: Buffer* mOlder; Buffer* mNewer; Index mOrigin; BufferedElement mElements[BufferSize]; }; // Reuse a buffer, or create a new one. // All buffered elements will be invalid. Buffer* NewBuffer(Buffer* aOlder, Index aOrigin) { MOZ_ASSERT(aOlder); for (;;) { Buffer* head = mReusableBuffers; if (!head) { ++mAllocatedBuffersStats; ++mLiveBuffersStats; Buffer* buffer = new Buffer(aOlder, aOrigin); return buffer; } Buffer* older = head->Older(); // Try to pivot the reusable-buffer pointer from the current head to the // next buffer in line. if (mReusableBuffers.compareExchange(head, older)) { // Success! The reusable-buffer pointer now points at the older buffer, // so we can recycle this ex-head. --mReusableBuffersStats; ++mLiveBuffersStats; head->SetOlder(aOlder); aOlder->SetNewer(head); // We will be the newest; newer-pointer should already be null. MOZ_ASSERT(!head->Newer()); head->SetOrigin(aOrigin); return head; } // Failure, someone else must have touched the list, loop to try again. } } // Discard a fully-read buffer. // If aDestroy is true, delete it. // If aDestroy is false, move the buffer to a reusable-buffer stack. void StopUsing(Buffer* aBuffer, bool aDestroy) { --mLiveBuffersStats; // We should only stop using the oldest buffer. MOZ_ASSERT(!aBuffer->Older()); // The newest buffer should not be modified here. MOZ_ASSERT(aBuffer->Newer()); MOZ_ASSERT(aBuffer->Newer()->Older() == aBuffer); // Detach from the second-oldest buffer. aBuffer->Newer()->SetOlder(nullptr); // Make the second-oldest buffer the now-oldest buffer. mOldestBuffer = aBuffer->Newer(); if (aDestroy) { --mAllocatedBuffersStats; delete aBuffer; } else { ++mReusableBuffersStats; // The recycling stack only uses mOlder; mNewer is not needed. aBuffer->SetNewer(nullptr); // Make the given buffer the new head of reusable buffers. for (;;) { Buffer* head = mReusableBuffers; aBuffer->SetOlder(head); if (mReusableBuffers.compareExchange(head, aBuffer)) { break; } } } } // Index of the next element to write. Modified when an element index is // claimed for a push. If the last element of a buffer is claimed, that push // will be responsible for adding a new head buffer. // Relaxed, because there is no synchronization based on this variable, each // thread just needs to get a different value, and will then write different // things (which themselves have some atomic validation before they may be // read elsewhere, independent of this `mNextElementToWrite`.) Atomic mNextElementToWrite{0}; // Index that a live recent buffer reaches. If a push claims a lesser-or- // equal number, the corresponding buffer is guaranteed to still be alive: // - It will have been created before this index was updated, // - It will not be destroyed until all its values have been written, // including the one that just claimed a position within it. // Also, the push that claims this exact number is responsible for adding the // next buffer and updating this value accordingly. // ReleaseAcquire, because when set to a certain value, the just-created // buffer covering the new range must be visible to readers. Atomic mBuffersCoverAtLeastUpTo; // Pointer to the most recent buffer. Never null. // This is the most recent of a deque of yet-unread buffers. // Only modified when adding a new head buffer. // ReleaseAcquire, because when modified, the just-created new buffer must be // visible to readers. Atomic mMostRecentBuffer; // Stack of reusable buffers. // ReleaseAcquire, because when modified, the just-added buffer must be // visible to readers. Atomic mReusableBuffers; // Template-provided locking mechanism to protect PopAll()-only member // variables below. ReaderLocking mReaderLocking; // Pointer to the oldest buffer, which contains the new element to be popped. // Never null. Buffer* mOldestBuffer; // Index of the next element to be popped. Index mNextElementToPop{0}; // Stats. class AtomicCountAndWatermark { public: explicit AtomicCountAndWatermark(int aCount) : mCount(aCount), mWatermark(aCount) {} int Count() const { return int(mCount); } CountAndWatermark Get() const { return CountAndWatermark{int(mCount), int(mWatermark)}; } int operator++() { int count = int(++mCount); // Update watermark. for (;;) { int watermark = int(mWatermark); if (watermark >= count) { // printf("++[%p] -=> %d-%d\n", this, count, watermark); break; } if (mWatermark.compareExchange(watermark, count)) { // printf("++[%p] -x> %d-(was %d now %d)\n", this, count, watermark, // count); break; } } return count; } int operator--() { int count = int(--mCount); // printf("--[%p] -> %d\n", this, count); return count; } private: // Relaxed, as these are just gathering stats, so consistency is not // critical. Atomic mCount; Atomic mWatermark; }; // All buffers in the mMostRecentBuffer deque. AtomicCountAndWatermark mLiveBuffersStats; // All buffers in the mReusableBuffers stack. AtomicCountAndWatermark mReusableBuffersStats; // All allocated buffers (sum of above). AtomicCountAndWatermark mAllocatedBuffersStats; }; } // namespace mozilla #endif // mozilla_MultiWriterQueue_h_