summaryrefslogtreecommitdiffstats
path: root/mfbt/MPSCQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'mfbt/MPSCQueue.h')
-rw-r--r--mfbt/MPSCQueue.h374
1 files changed, 374 insertions, 0 deletions
diff --git a/mfbt/MPSCQueue.h b/mfbt/MPSCQueue.h
new file mode 100644
index 0000000000..a413fd63bc
--- /dev/null
+++ b/mfbt/MPSCQueue.h
@@ -0,0 +1,374 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/*
+ * Multiple Producer Single Consumer lock-free queue.
+ * Allocation-free is guaranteed outside of the constructor.
+ *
+ * This is a direct C++ port from
+ * https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#1-235
+ * with the exception we are using atomic uint64t to have 15 slots in the ring
+ * buffer (Rust implem is 5 slots, we want a bit more).
+ * */
+
+#ifndef mozilla_MPSCQueue_h
+#define mozilla_MPSCQueue_h
+
+#include "mozilla/Assertions.h"
+#include "mozilla/Attributes.h"
+#include "mozilla/PodOperations.h"
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <limits>
+#include <memory>
+#include <thread>
+#include <type_traits>
+#include <optional>
+#include <inttypes.h>
+
+namespace mozilla {
+
+namespace detail {
+template <typename T, bool IsPod = std::is_trivial<T>::value>
+struct MemoryOperations {
+ /**
+ * This allows either moving (if T supports it) or copying a number of
+ * elements from a `aSource` pointer to a `aDestination` pointer.
+ * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
+ * constructors and destructors are called in a loop.
+ */
+ static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
+};
+
+template <typename T>
+struct MemoryOperations<T, true> {
+ static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
+ PodCopy(aDestination, aSource, aCount);
+ }
+};
+
+template <typename T>
+struct MemoryOperations<T, false> {
+ static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
+ std::move(aSource, aSource + aCount, aDestination);
+ }
+};
+} // namespace detail
+
+static const bool MPSC_DEBUG = false;
+
+static const size_t kMaxCapacity = 16;
+
+/**
+ * This data structure allows producing data from several threads, and consuming
+ * it on one thread, safely and without performing memory allocations or
+ * locking.
+ *
+ * The role for the producers and the consumer must be constant, i.e., the
+ * producer should always be on one thread and the consumer should always be on
+ * another thread.
+ *
+ * Some words about the inner workings of this class:
+ * - Capacity is fixed. Only one allocation is performed, in the constructor.
+ * - Maximum capacity is 15 elements, with 0 being used to denote an empty set.
+ * This is a hard limitation from encoding indexes within the atomic uint64_t.
+ * - This is lock-free but not wait-free, it might spin a little until
+ * compare/exchange succeeds.
+ * - There is no guarantee of forward progression for individual threads.
+ * - This should be safe to use from a signal handler context.
+ */
+template <typename T>
+class MPSCRingBufferBase {
+ public:
+ explicit MPSCRingBufferBase(size_t aCapacity)
+ : mFree(0), mOccupied(0), mCapacity(aCapacity + 1) {
+ MOZ_RELEASE_ASSERT(aCapacity < kMaxCapacity);
+
+ if constexpr (MPSC_DEBUG) {
+ fprintf(stderr,
+ "[%s] this=%p { mCapacity=%zu, mBits=%" PRIu64
+ ", mMask=0x%" PRIx64 " }\n",
+ __PRETTY_FUNCTION__, this, mCapacity, mBits, mMask);
+ }
+
+ // Leave one empty space in the queue, used to distinguish an empty queue
+ // from a full one, as in the SPSCQueue.
+ // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#126
+ for (uint64_t i = 1; i < StorageCapacity(); ++i) {
+ MarkSlot(mFree, i);
+ }
+
+ // This should be the only allocation performed, thus it cannot be performed
+ // in a restricted context (e.g., signal handler, real-time thread)
+ mData = std::make_unique<T[]>(Capacity());
+
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+ }
+
+ /**
+ * @brief Put an element in the queue. The caller MUST check the return value
+ * and maybe loop to try again (or drop if acceptable).
+ *
+ * First it attempts to acuire a slot (storage index) that is known to be
+ * non used. If that is not successfull then 0 is returned. If that is
+ * successfull, the slot is ours (it has been exclusively acquired) and data
+ * can be copied into the ring buffer at that index.
+ *
+ * @param aElement The element to put in the queue.
+ *
+ * @return 0 if insertion could not be performed, inserted index otherwise
+ */
+ [[nodiscard]] int Send(T& aElement) {
+ std::optional<uint64_t> empty_idx = UnmarkSlot(mFree);
+ if (empty_idx.has_value()) {
+ detail::MemoryOperations<T>::MoveOrCopy(&mData[*empty_idx - 1], &aElement,
+ 1);
+ MarkSlot(mOccupied, *empty_idx);
+ return *empty_idx;
+ }
+ return 0;
+ }
+
+ /**
+ * Retrieve one element from the ring buffer, and copy it to
+ * `aElement`, if non-null.
+ *
+ * It attempts to acquire a slot from the list of used ones. If that is not
+ * successfull, then 0 is returned. Once a slot has been exclusively acquired,
+ * data is copied from it into the non-null pointer passed in parameter.
+ *
+ * @param aElement A pointer to a `T` where data will be copied.
+ *
+ * @return The index from which data was copied, 0 if there was nothing in the
+ * ring buffer.
+ */
+ [[nodiscard]] int Recv(T* aElement) {
+ std::optional<uint64_t> idx = UnmarkSlot(mOccupied);
+ if (idx.has_value()) {
+ if (aElement) {
+ detail::MemoryOperations<T>::MoveOrCopy(aElement, &mData[*idx - 1], 1);
+ }
+ MarkSlot(mFree, *idx);
+ return *idx;
+ }
+ return 0;
+ }
+
+ size_t Capacity() const { return StorageCapacity() - 1; }
+
+ private:
+ /*
+ * Get/Set manipulates the encoding within `aNumber` by storing the index as a
+ * number and shifting it to the left (set) or right (get).
+ *
+ * Initial `aNumber` value is 0.
+ *
+ * Set() with first index value (1), we store the index on mBits and we shift
+ * it to the left, e.g., as follows:
+ *
+ * aNumber=0b00000000000000000000000000000000000000000000000000000000000000
+ * aIndex=0 aValue=1
+ * aNumber=0b00000000000000000000000000000000000000000000000000000000000001
+ * aIndex=1 aValue=33
+ * aNumber=0b00000000000000000000000000000000000000000000000000000000100001
+ * aIndex=2 aValue=801
+ * aNumber=0b00000000000000000000000000000000000000000000000000001100100001
+ * aIndex=3 aValue=17185
+ * aNumber=0b00000000000000000000000000000000000000000000000100001100100001
+ * aIndex=4 aValue=344865
+ * aNumber=0b00000000000000000000000000000000000000000001010100001100100001
+ * aIndex=5 aValue=6636321
+ * aNumber=0b00000000000000000000000000000000000000011001010100001100100001
+ * aIndex=6 aValue=124076833
+ * aNumber=0b00000000000000000000000000000000000111011001010100001100100001
+ * aIndex=7 aValue=2271560481
+ * aNumber=0b00000000000000000000000000000010000111011001010100001100100001
+ * aIndex=8 aValue=40926266145
+ * aNumber=0b00000000000000000000000000100110000111011001010100001100100001
+ * aIndex=9 aValue=728121033505
+ * aNumber=0b00000000000000000000001010100110000111011001010100001100100001
+ * aIndex=10 aValue=12822748939041
+ * aNumber=0b00000000000000000010111010100110000111011001010100001100100001
+ * aIndex=11 aValue=223928981472033
+ * aNumber=0b00000000000000110010111010100110000111011001010100001100100001
+ * aIndex=12 aValue=3883103678710561
+ * aNumber=0b00000000001101110010111010100110000111011001010100001100100001
+ * aIndex=13 aValue=66933498461897505
+ * aNumber=0b00000011101101110010111010100110000111011001010100001100100001
+ * aIndex=14 aValue=1147797409030816545
+ */
+ [[nodiscard]] uint64_t Get(uint64_t aNumber, uint64_t aIndex) {
+ return (aNumber >> (mBits * aIndex)) & mMask;
+ }
+
+ [[nodiscard]] uint64_t Set(uint64_t aNumber, uint64_t aIndex,
+ uint64_t aValue) {
+ return (aNumber & ~(mMask << (mBits * aIndex))) |
+ (aValue << (mBits * aIndex));
+ }
+
+ /*
+ * Enqueue a value in the ring buffer at aIndex.
+ *
+ * Takes the current uint64_t value from the atomic and try to acquire a non
+ * used slot in the ring buffer. If unsucessfull, 0 is returned, otherwise
+ * compute the new atomic value that holds the new state of usage of the
+ * slots, and use compare/exchange to perform lock-free synchronization:
+ * compare/exchanges succeeds when the current value and the modified one are
+ * equal, reflecting an acquired lock. If another thread was concurrent to
+ * this one, then it would fail to that operation, and go into the next
+ * iteration of the loop to read the new state value from the atomic, and
+ * acquire a different slot.
+ *
+ * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
+ * thread exclusions
+ *
+ * @param aIndex the index where we want to enqueue. It should come from the
+ * empty queue
+ * */
+ void MarkSlot(std::atomic<uint64_t>& aSlotStatus, uint64_t aIndex) {
+ uint64_t current = aSlotStatus.load(std::memory_order_relaxed);
+ do {
+ // Attempts to find a slot that is available to enqueue, without
+ // cross-thread synchronization
+ auto empty = [&]() -> std::optional<uint64_t> {
+ for (uint64_t i = 0; i < Capacity(); ++i) {
+ if (Get(current, i) == 0) {
+ return i;
+ }
+ }
+ return {};
+ }();
+ if (!empty.has_value()) {
+ // Rust does expect() which would panic:
+ // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#62
+ // If there's no empty place, then it would be up to the caller to deal
+ // with that
+ MOZ_CRASH("No empty slot available");
+ }
+ uint64_t modified = Set(current, *empty, aIndex);
+ // This is where the lock-free synchronization happens ; if `current`
+ // matches the content of `aSlotStatus`, then store `modified` in
+ // aSlotStatus and succeeds. Upon success it means no other thread has
+ // tried to change the same value at the same time, so the lock was safely
+ // acquired.
+ //
+ // Upon failure, it means another thread tried at the same time to use the
+ // same slot, so a new iteration of the loop needs to be executed to try
+ // another slot.
+ //
+ // In case of success (`aSlotStatus`'s content is equal to `current`), we
+ // require memory_order_release for the read-modify-write operation
+ // because we want to make sure when acquiring a slot that any concurrent
+ // thread performing a write had a chance to do it.
+ //
+ // In case of failure we require memory_order_relaxed for the load
+ // operation because we dont need synchronization at that point.
+ if (aSlotStatus.compare_exchange_weak(current, modified,
+ std::memory_order_release,
+ std::memory_order_relaxed)) {
+ if constexpr (MPSC_DEBUG) {
+ fprintf(stderr,
+ "[enqueue] modified=0x%" PRIx64 " => index=%" PRIu64 "\n",
+ modified, aIndex);
+ }
+ return;
+ }
+ } while (true);
+ }
+
+ /*
+ * Dequeue a value from the ring buffer.
+ *
+ * Takes the current value from the uint64_t atomic and read the current index
+ * out of it. If that index is 0 then we are facing a lack of slots and we
+ * return, the caller MUST check this and deal with the situation. If the
+ * index is non null we can try to acquire the matching slot in the ring
+ * buffer thanks to the compare/exchange loop. When the compare/exchange call
+ * succeeds, then the slot was acquired.
+ *
+ * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
+ * thread exclusions
+ * */
+ [[nodiscard]] std::optional<uint64_t> UnmarkSlot(
+ std::atomic<uint64_t>& aSlotStatus) {
+ uint64_t current = aSlotStatus.load(std::memory_order_relaxed);
+ do {
+ uint64_t index = current & mMask;
+ if (index == 0) {
+ // Return a None
+ // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#77
+ // If we return None while dequeuing on mFree then we are full and the
+ // caller needs to deal with that.
+ return {};
+ }
+ uint64_t modified = current >> mBits;
+ // See the comment in MarkSlot for details
+ //
+ // In case of success (`aSlotStatus`'s content is equal to `current`), we
+ // require memory_order_acquire for the read-modify-write operation
+ // because we want to make sure when unmarking a slot that any concurrent
+ // thread performing a read will see the value we are writing.
+ //
+ // In case of failure we require memory_order_relaxed for the load
+ // operation because we dont need synchronization at that point.
+ if (aSlotStatus.compare_exchange_weak(current, modified,
+ std::memory_order_acquire,
+ std::memory_order_relaxed)) {
+ if constexpr (MPSC_DEBUG) {
+ fprintf(stderr,
+ "[dequeue] current=0x%" PRIx64 " => index=%" PRIu64 "\n",
+ current, index);
+ }
+ return index;
+ }
+ } while (true);
+ return {};
+ }
+
+ // Return the number of elements we can store within the ring buffer, whereas
+ // Capacity() will return the amount of elements in mData, including the 0
+ // value.
+ [[nodiscard]] size_t StorageCapacity() const { return mCapacity; }
+
+ // For the atomics below they are manipulated by Get()/Set(), and we are using
+ // them to store the IDs of the ring buffer usage (empty/full).
+ //
+ // We use mBits bits to store an ID (so we are limited to 16 and 0 is
+ // reserved) and append each of them to the atomics.
+ //
+ // A 0 value in one of those denotes we are full for the atomic, i.e.,
+ // mFree=0 means we are full and mOccupied=0 means we are empty.
+
+ // Holds the IDs of the free slots in the ring buffer
+ std::atomic<uint64_t> mFree;
+
+ // Holds the IDs of the occupied slots in the ring buffer
+ std::atomic<uint64_t> mOccupied;
+
+ const size_t mCapacity;
+
+ // The actual ring buffer
+ std::unique_ptr<T[]> mData;
+
+ // How we are using the uint64_t atomic above to store the IDs of the ring
+ // buffer.
+ static const uint64_t mBits = 4;
+ static const uint64_t mMask = 0b1111;
+};
+
+/**
+ * Instantiation of the `MPSCRingBufferBase` type. This is safe to use from
+ * several producers threads and one one consumer (that never changes role),
+ * without explicit synchronization nor allocation (outside of the constructor).
+ */
+template <typename T>
+using MPSCQueue = MPSCRingBufferBase<T>;
+
+} // namespace mozilla
+
+#endif // mozilla_MPSCQueue_h