/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ /* * Multiple Producer Single Consumer lock-free queue. * Allocation-free is guaranteed outside of the constructor. * * This is a direct C++ port from * https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#1-235 * with the exception we are using atomic uint64t to have 15 slots in the ring * buffer (Rust implem is 5 slots, we want a bit more). * */ #ifndef mozilla_MPSCQueue_h #define mozilla_MPSCQueue_h #include "mozilla/Assertions.h" #include "mozilla/Attributes.h" #include "mozilla/PodOperations.h" #include #include #include #include #include #include #include #include #include namespace mozilla { namespace detail { template ::value> struct MemoryOperations { /** * This allows either moving (if T supports it) or copying a number of * elements from a `aSource` pointer to a `aDestination` pointer. * If it is safe to do so and this call copies, this uses PodCopy. Otherwise, * constructors and destructors are called in a loop. */ static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount); }; template struct MemoryOperations { static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) { PodCopy(aDestination, aSource, aCount); } }; template struct MemoryOperations { static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) { std::move(aSource, aSource + aCount, aDestination); } }; } // namespace detail static const bool MPSC_DEBUG = false; static const size_t kMaxCapacity = 16; /** * This data structure allows producing data from several threads, and consuming * it on one thread, safely and without performing memory allocations or * locking. * * The role for the producers and the consumer must be constant, i.e., the * producer should always be on one thread and the consumer should always be on * another thread. * * Some words about the inner workings of this class: * - Capacity is fixed. Only one allocation is performed, in the constructor. * - Maximum capacity is 15 elements, with 0 being used to denote an empty set. * This is a hard limitation from encoding indexes within the atomic uint64_t. * - This is lock-free but not wait-free, it might spin a little until * compare/exchange succeeds. * - There is no guarantee of forward progression for individual threads. * - This should be safe to use from a signal handler context. */ template class MPSCRingBufferBase { public: explicit MPSCRingBufferBase(size_t aCapacity) : mFree(0), mOccupied(0), mCapacity(aCapacity + 1) { MOZ_RELEASE_ASSERT(aCapacity < kMaxCapacity); if constexpr (MPSC_DEBUG) { fprintf(stderr, "[%s] this=%p { mCapacity=%zu, mBits=%" PRIu64 ", mMask=0x%" PRIx64 " }\n", __PRETTY_FUNCTION__, this, mCapacity, mBits, mMask); } // Leave one empty space in the queue, used to distinguish an empty queue // from a full one, as in the SPSCQueue. // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#126 for (uint64_t i = 1; i < StorageCapacity(); ++i) { MarkSlot(mFree, i); } // This should be the only allocation performed, thus it cannot be performed // in a restricted context (e.g., signal handler, real-time thread) mData = std::make_unique(Capacity()); std::atomic_thread_fence(std::memory_order_seq_cst); } /** * @brief Put an element in the queue. The caller MUST check the return value * and maybe loop to try again (or drop if acceptable). * * First it attempts to acuire a slot (storage index) that is known to be * non used. If that is not successfull then 0 is returned. If that is * successfull, the slot is ours (it has been exclusively acquired) and data * can be copied into the ring buffer at that index. * * @param aElement The element to put in the queue. * * @return 0 if insertion could not be performed, inserted index otherwise */ [[nodiscard]] int Send(T& aElement) { std::optional empty_idx = UnmarkSlot(mFree); if (empty_idx.has_value()) { detail::MemoryOperations::MoveOrCopy(&mData[*empty_idx - 1], &aElement, 1); MarkSlot(mOccupied, *empty_idx); return *empty_idx; } return 0; } /** * Retrieve one element from the ring buffer, and copy it to * `aElement`, if non-null. * * It attempts to acquire a slot from the list of used ones. If that is not * successfull, then 0 is returned. Once a slot has been exclusively acquired, * data is copied from it into the non-null pointer passed in parameter. * * @param aElement A pointer to a `T` where data will be copied. * * @return The index from which data was copied, 0 if there was nothing in the * ring buffer. */ [[nodiscard]] int Recv(T* aElement) { std::optional idx = UnmarkSlot(mOccupied); if (idx.has_value()) { if (aElement) { detail::MemoryOperations::MoveOrCopy(aElement, &mData[*idx - 1], 1); } MarkSlot(mFree, *idx); return *idx; } return 0; } size_t Capacity() const { return StorageCapacity() - 1; } private: /* * Get/Set manipulates the encoding within `aNumber` by storing the index as a * number and shifting it to the left (set) or right (get). * * Initial `aNumber` value is 0. * * Set() with first index value (1), we store the index on mBits and we shift * it to the left, e.g., as follows: * * aNumber=0b00000000000000000000000000000000000000000000000000000000000000 * aIndex=0 aValue=1 * aNumber=0b00000000000000000000000000000000000000000000000000000000000001 * aIndex=1 aValue=33 * aNumber=0b00000000000000000000000000000000000000000000000000000000100001 * aIndex=2 aValue=801 * aNumber=0b00000000000000000000000000000000000000000000000000001100100001 * aIndex=3 aValue=17185 * aNumber=0b00000000000000000000000000000000000000000000000100001100100001 * aIndex=4 aValue=344865 * aNumber=0b00000000000000000000000000000000000000000001010100001100100001 * aIndex=5 aValue=6636321 * aNumber=0b00000000000000000000000000000000000000011001010100001100100001 * aIndex=6 aValue=124076833 * aNumber=0b00000000000000000000000000000000000111011001010100001100100001 * aIndex=7 aValue=2271560481 * aNumber=0b00000000000000000000000000000010000111011001010100001100100001 * aIndex=8 aValue=40926266145 * aNumber=0b00000000000000000000000000100110000111011001010100001100100001 * aIndex=9 aValue=728121033505 * aNumber=0b00000000000000000000001010100110000111011001010100001100100001 * aIndex=10 aValue=12822748939041 * aNumber=0b00000000000000000010111010100110000111011001010100001100100001 * aIndex=11 aValue=223928981472033 * aNumber=0b00000000000000110010111010100110000111011001010100001100100001 * aIndex=12 aValue=3883103678710561 * aNumber=0b00000000001101110010111010100110000111011001010100001100100001 * aIndex=13 aValue=66933498461897505 * aNumber=0b00000011101101110010111010100110000111011001010100001100100001 * aIndex=14 aValue=1147797409030816545 */ [[nodiscard]] uint64_t Get(uint64_t aNumber, uint64_t aIndex) { return (aNumber >> (mBits * aIndex)) & mMask; } [[nodiscard]] uint64_t Set(uint64_t aNumber, uint64_t aIndex, uint64_t aValue) { return (aNumber & ~(mMask << (mBits * aIndex))) | (aValue << (mBits * aIndex)); } /* * Enqueue a value in the ring buffer at aIndex. * * Takes the current uint64_t value from the atomic and try to acquire a non * used slot in the ring buffer. If unsucessfull, 0 is returned, otherwise * compute the new atomic value that holds the new state of usage of the * slots, and use compare/exchange to perform lock-free synchronization: * compare/exchanges succeeds when the current value and the modified one are * equal, reflecting an acquired lock. If another thread was concurrent to * this one, then it would fail to that operation, and go into the next * iteration of the loop to read the new state value from the atomic, and * acquire a different slot. * * @param aSlotStatus a uint64_t atomic that is used to perform lock-free * thread exclusions * * @param aIndex the index where we want to enqueue. It should come from the * empty queue * */ void MarkSlot(std::atomic& aSlotStatus, uint64_t aIndex) { uint64_t current = aSlotStatus.load(std::memory_order_relaxed); do { // Attempts to find a slot that is available to enqueue, without // cross-thread synchronization auto empty = [&]() -> std::optional { for (uint64_t i = 0; i < Capacity(); ++i) { if (Get(current, i) == 0) { return i; } } return {}; }(); if (!empty.has_value()) { // Rust does expect() which would panic: // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#62 // If there's no empty place, then it would be up to the caller to deal // with that MOZ_CRASH("No empty slot available"); } uint64_t modified = Set(current, *empty, aIndex); // This is where the lock-free synchronization happens ; if `current` // matches the content of `aSlotStatus`, then store `modified` in // aSlotStatus and succeeds. Upon success it means no other thread has // tried to change the same value at the same time, so the lock was safely // acquired. // // Upon failure, it means another thread tried at the same time to use the // same slot, so a new iteration of the loop needs to be executed to try // another slot. // // In case of success (`aSlotStatus`'s content is equal to `current`), we // require memory_order_release for the read-modify-write operation // because we want to make sure when acquiring a slot that any concurrent // thread performing a write had a chance to do it. // // In case of failure we require memory_order_relaxed for the load // operation because we dont need synchronization at that point. if (aSlotStatus.compare_exchange_weak(current, modified, std::memory_order_release, std::memory_order_relaxed)) { if constexpr (MPSC_DEBUG) { fprintf(stderr, "[enqueue] modified=0x%" PRIx64 " => index=%" PRIu64 "\n", modified, aIndex); } return; } } while (true); } /* * Dequeue a value from the ring buffer. * * Takes the current value from the uint64_t atomic and read the current index * out of it. If that index is 0 then we are facing a lack of slots and we * return, the caller MUST check this and deal with the situation. If the * index is non null we can try to acquire the matching slot in the ring * buffer thanks to the compare/exchange loop. When the compare/exchange call * succeeds, then the slot was acquired. * * @param aSlotStatus a uint64_t atomic that is used to perform lock-free * thread exclusions * */ [[nodiscard]] std::optional UnmarkSlot( std::atomic& aSlotStatus) { uint64_t current = aSlotStatus.load(std::memory_order_relaxed); do { uint64_t index = current & mMask; if (index == 0) { // Return a None // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#77 // If we return None while dequeuing on mFree then we are full and the // caller needs to deal with that. return {}; } uint64_t modified = current >> mBits; // See the comment in MarkSlot for details // // In case of success (`aSlotStatus`'s content is equal to `current`), we // require memory_order_acquire for the read-modify-write operation // because we want to make sure when unmarking a slot that any concurrent // thread performing a read will see the value we are writing. // // In case of failure we require memory_order_relaxed for the load // operation because we dont need synchronization at that point. if (aSlotStatus.compare_exchange_weak(current, modified, std::memory_order_acquire, std::memory_order_relaxed)) { if constexpr (MPSC_DEBUG) { fprintf(stderr, "[dequeue] current=0x%" PRIx64 " => index=%" PRIu64 "\n", current, index); } return index; } } while (true); return {}; } // Return the number of elements we can store within the ring buffer, whereas // Capacity() will return the amount of elements in mData, including the 0 // value. [[nodiscard]] size_t StorageCapacity() const { return mCapacity; } // For the atomics below they are manipulated by Get()/Set(), and we are using // them to store the IDs of the ring buffer usage (empty/full). // // We use mBits bits to store an ID (so we are limited to 16 and 0 is // reserved) and append each of them to the atomics. // // A 0 value in one of those denotes we are full for the atomic, i.e., // mFree=0 means we are full and mOccupied=0 means we are empty. // Holds the IDs of the free slots in the ring buffer std::atomic mFree; // Holds the IDs of the occupied slots in the ring buffer std::atomic mOccupied; const size_t mCapacity; // The actual ring buffer std::unique_ptr mData; // How we are using the uint64_t atomic above to store the IDs of the ring // buffer. static const uint64_t mBits = 4; static const uint64_t mMask = 0b1111; }; /** * Instantiation of the `MPSCRingBufferBase` type. This is safe to use from * several producers threads and one one consumer (that never changes role), * without explicit synchronization nor allocation (outside of the constructor). */ template using MPSCQueue = MPSCRingBufferBase; } // namespace mozilla #endif // mozilla_MPSCQueue_h