summaryrefslogtreecommitdiffstats
path: root/mfbt/SPSCQueue.h
blob: bd4223d70ae14da05e0c09b119c26ae9ecf30dee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/* Single producer single consumer lock-free and wait-free queue. */

#ifndef mozilla_LockFreeQueue_h
#define mozilla_LockFreeQueue_h

#include "mozilla/Assertions.h"
#include "mozilla/Attributes.h"
#include "mozilla/PodOperations.h"
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <limits>
#include <memory>
#include <thread>
#include <tuple>
#include <type_traits>

namespace mozilla {

namespace detail {
/**
 * Bulk element operations used by the ring buffer, selected at compile time
 * on whether `T` is a trivial type (`IsPod`).
 *
 * Both values of `IsPod` are covered by partial specializations, so the
 * declarations here only describe the common interface.
 */
template <typename T, bool IsPod = std::is_trivial<T>::value>
struct MemoryOperations {
  /**
   * Transfer `aCount` elements from `aSource` to `aDestination`.
   *
   * Trivial types are copied in bulk with PodCopy. Other types are
   * move-assigned one element at a time (falling back to copy-assignment when
   * `T` has no move assignment), which leaves the source elements in a
   * moved-from state.
   *
   * @param aDestination first element to overwrite.
   * @param aSource first element to read from.
   * @param aCount number of elements to transfer.
   */
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
  /**
   * Overwrite `aCount` elements starting at `aDestination` with "empty"
   * values: trivial types are zeroed with PodZero, other types are assigned a
   * value-initialized `T()`.
   *
   * @param aDestination first element to overwrite.
   * @param aCount number of elements to overwrite.
   */
  static void ConstructDefault(T* aDestination, size_t aCount);
};

/**
 * Specialization for trivial types: elements are handled in bulk through the
 * Pod helpers rather than one at a time.
 */
template <typename T>
struct MemoryOperations<T, true> {
  /** Copy `aCount` elements from `aSource` to `aDestination` with PodCopy. */
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    PodCopy(aDestination, aSource, aCount);
  }
  /** Zero `aCount` elements starting at `aDestination` with PodZero. */
  static void ConstructDefault(T* aDestination, size_t aCount) {
    PodZero(aDestination, aCount);
  }
};

/**
 * Specialization for non-trivial types: every element goes through `T`'s own
 * assignment operators.
 */
template <typename T>
struct MemoryOperations<T, false> {
  /**
   * Assign a value-initialized `T()` to each of the `aCount` elements starting
   * at `aDestination`. The destination elements must already be live objects,
   * since they are assigned to rather than constructed.
   */
  static void ConstructDefault(T* aDestination, size_t aCount) {
    T* const destinationEnd = aDestination + aCount;
    for (T* slot = aDestination; slot != destinationEnd; ++slot) {
      *slot = T();
    }
  }
  /**
   * Move-assign `aCount` elements from `aSource` into `aDestination`, in
   * order. The source elements are left in a moved-from state.
   */
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    T* const sourceEnd = aSource + aCount;
    std::move(aSource, sourceEnd, aDestination);
  }
};
}  // namespace detail

/**
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization.
 *
 * The role for the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be on
 * another thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking if
 *   the ring buffer is empty or full.
 * - We always keep at least one slot of the internal storage unused, so we
 *   can distinguish between an empty and a full ring buffer: an empty ring
 *   buffer is when the write index is at the same position as the read index.
 *   A full buffer is when the write index is exactly one position before the
 *   read index.
 * - We synchronize updates to the read index after having read the data, and
 *   the write index after having written the data. This means that each
 *   thread can only touch a portion of the buffer that is not touched by the
 *   other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
template <typename T>
class SPSCRingBufferBase {
 public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation on the heap, but is the only allocation that
   * will happen for the life time of a `SPSCRingBufferBase`.
   *
   * `aCapacity` must be non-negative and strictly smaller than
   * `std::numeric_limits<int>::max()`; both conditions are release-asserted
   * before the storage size is computed.
   *
   * @param aCapacity The maximum number of elements this ring buffer will
   * hold.
   */
  explicit SPSCRingBufferBase(int aCapacity)
      : mReadIndex(0),
        mWriteIndex(0),
        /* One more element to distinguish from empty and full buffer. The
         * helper validates `aCapacity` *before* adding one, so that an
         * out-of-range value asserts instead of overflowing. */
        mCapacity(CheckedStorageCapacity(aCapacity)) {
    mData = std::make_unique<T[]>(StorageCapacity());

    // NOTE(review): presumably meant to order the initialization above before
    // the queue is handed over to the producer/consumer threads -- confirm.
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }
  /**
   * Push `aCount` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param aCount The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  [[nodiscard]] int EnqueueDefault(int aCount) {
    return Enqueue(nullptr, aCount);
  }
  /**
   * @brief Put an element in the queue.
   *
   * Only safely called on the producer thread.
   *
   * @param aElement The element to put in the queue. For non-trivial `T` it
   * is moved from when it is inserted.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  [[nodiscard]] int Enqueue(T& aElement) { return Enqueue(&aElement, 1); }
  /**
   * Push `aCount` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param aElements a pointer to a buffer containing at least `aCount`
   * elements. If `aElements` is nullptr, zero or default constructed elements
   * are enqueued. For non-trivial `T`, the enqueued source elements are moved
   * from.
   * @param aCount The number of elements to read from `aElements`. A count
   * that is zero or negative enqueues nothing.
   * @return The number of elements successfully copied from `aElements` and
   * inserted into the ring buffer. This is smaller than `aCount` when there is
   * not enough free space.
   */
  [[nodiscard]] int Enqueue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif

    // A negative count would otherwise end up converted to a huge size_t
    // element count in the copy helpers below.
    if (aCount <= 0) {
      return 0;
    }

    // Acquire pairs with the release store in Dequeue: the slots the consumer
    // has released are fully read before we overwrite them. The write index is
    // only ever modified by this (producer) thread, so relaxed is enough.
    int rdIdx = mReadIndex.load(std::memory_order_acquire);
    int wrIdx = mWriteIndex.load(std::memory_order_relaxed);

    if (IsFull(rdIdx, wrIdx)) {
      return 0;
    }

    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);

    /* First part, from the write index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
    /* Second part, from the beginning of the array */
    int secondPart = toWrite - firstPart;

    if (aElements) {
      detail::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(
          mData.get(), aElements + firstPart, secondPart);
    } else {
      detail::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx,
                                                    firstPart);
      detail::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
    }

    // Publish the new elements only once they are fully written.
    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
                      std::memory_order_release);

    return toWrite;
  }
  /**
   * Retrieve at most `aCount` elements from the ring buffer, and copy them to
   * `aElements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param aElements A pointer to a buffer with space for at least `aCount`
   * elements. If `aElements` is `nullptr`, up to `aCount` elements are
   * discarded: the read index is advanced without touching the stored
   * elements.
   * @param aCount The maximum number of elements to Dequeue. A count that is
   * zero or negative dequeues nothing.
   * @return The number of elements removed from the ring buffer (and written
   * to `aElements` when it is non-null).
   */
  [[nodiscard]] int Dequeue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif

    // A negative count would otherwise end up converted to a huge size_t
    // element count in the copy helpers below.
    if (aCount <= 0) {
      return 0;
    }

    // Acquire pairs with the release store in Enqueue: the elements the
    // producer has published are fully written before we read them. The read
    // index is only ever modified by this (consumer) thread.
    int wrIdx = mWriteIndex.load(std::memory_order_acquire);
    int rdIdx = mReadIndex.load(std::memory_order_relaxed);

    if (IsEmpty(rdIdx, wrIdx)) {
      return 0;
    }

    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), aCount);

    /* First part, from the read index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
    /* Second part, from the beginning of the array */
    int secondPart = toRead - firstPart;

    if (aElements) {
      detail::MemoryOperations<T>::MoveOrCopy(aElements, mData.get() + rdIdx,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(aElements + firstPart,
                                              mData.get(), secondPart);
    }

    // Hand the slots back to the producer only once they have been read.
    mReadIndex.store(IncrementIndex(rdIdx, toRead), std::memory_order_release);

    return toRead;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * This can be less than the actual number of elements in the queue, since the
   * mWriteIndex is updated at the very end of the Enqueue method on the
   * producer thread, but consequently always returns a number of elements such
   * that a call to Dequeue returns this number of elements.
   *
   * @return The number of available elements for reading.
   */
  int AvailableRead() const {
    return AvailableReadInternal(mReadIndex.load(std::memory_order_relaxed),
                                 mWriteIndex.load(std::memory_order_relaxed));
  }
  /**
   * Get the number of available elements for writing.
   *
   * This can be less than the actual number of slots that are available,
   * because mReadIndex is updated at the very end of the Dequeue method. It
   * always returns a number such that a call to Enqueue with this number will
   * succeed in enqueuing this number of elements.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int AvailableWrite() const {
    return AvailableWriteInternal(mReadIndex.load(std::memory_order_relaxed),
                                  mWriteIndex.load(std::memory_order_relaxed));
  }
  /**
   * Get the total Capacity, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum Capacity of this ring buffer, i.e. the value passed to
   * the constructor.
   */
  int Capacity() const { return StorageCapacity() - 1; }

  /**
   * Reset the consumer thread id to the current thread. The caller must
   * guarantee that the last call to Dequeue() on the previous consumer thread
   * has completed, and subsequent calls to Dequeue() will only happen on the
   * current thread.
   *
   * The thread id itself is only tracked in DEBUG builds.
   */
  void ResetConsumerThreadId() {
#ifdef DEBUG
    mConsumerId = std::this_thread::get_id();
#endif

    // When changing consumer from thread A to B, the last Dequeue on A (synced
    // by mReadIndex.store with memory_order_release) must be picked up by B
    // through an acquire operation.
    std::ignore = mReadIndex.load(std::memory_order_acquire);
  }

  /**
   * Reset the producer thread id to the current thread. The caller must
   * guarantee that the last call to Enqueue() on the previous producer thread
   * has completed, and subsequent calls to Enqueue() will only happen on the
   * current thread.
   *
   * The thread id itself is only tracked in DEBUG builds.
   */
  void ResetProducerThreadId() {
#ifdef DEBUG
    mProducerId = std::this_thread::get_id();
#endif

    // When changing producer from thread A to B, the last Enqueue on A (synced
    // by mWriteIndex.store with memory_order_release) must be picked up by B
    // through an acquire operation.
    std::ignore = mWriteIndex.load(std::memory_order_acquire);
  }

 private:
  /**
   * Validate a requested capacity and return the matching storage size, which
   * is one element larger.
   *
   * The checks run before the addition: adding one to
   * `std::numeric_limits<int>::max()` would be a signed overflow.
   *
   * @param aCapacity the capacity requested by the user of the class.
   * @return `aCapacity + 1`, always strictly positive.
   */
  static int CheckedStorageCapacity(int aCapacity) {
    MOZ_RELEASE_ASSERT(aCapacity != std::numeric_limits<int>::max());
    MOZ_RELEASE_ASSERT(aCapacity >= 0);
    return aCapacity + 1;
  }
  /** Return true if the ring buffer is empty.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool IsEmpty(int aReadIndex, int aWriteIndex) const {
    return aWriteIndex == aReadIndex;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool IsFull(int aReadIndex, int aWriteIndex) const {
    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
  }
  /**
   * Return the size of the storage. It is one more than the number of elements
   * that can be stored in the buffer.
   *
   * This can be called from any thread.
   *
   * @return the number of slots in the internal storage.
   */
  int StorageCapacity() const { return mCapacity; }
  /**
   * Returns the number of elements available for reading.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableRead`.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return the number of available elements for reading.
   */
  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const {
    if (aWriteIndex >= aReadIndex) {
      return aWriteIndex - aReadIndex;
    }
    // The readable region wraps around the end of the storage. Subtracting
    // the gap from the storage size (rather than adding the storage size to
    // the write index first) cannot overflow.
    return StorageCapacity() - (aReadIndex - aWriteIndex);
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableWrite`.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return the number of elements that can be written into the array.
   */
  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = aReadIndex - aWriteIndex - 1;
    if (aWriteIndex >= aReadIndex) {
      rv += StorageCapacity();
    }
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * Incrementing `mWriteIndex` can be done on the producer thread.
   * Incrementing `mReadIndex` can be done on the consumer thread.
   *
   * @param aIndex the index to increment, smaller than the storage size.
   * @param aIncrement the number by which `aIndex` is incremented; must be
   * non-negative and smaller than the storage size.
   * @return the new index.
   */
  int IncrementIndex(int aIndex, int aIncrement) const {
    MOZ_ASSERT(aIncrement >= 0 && aIncrement < StorageCapacity() &&
               aIndex < StorageCapacity());
    // Same result as `(aIndex + aIncrement) % StorageCapacity()` under the
    // preconditions asserted above, but the intermediate sum can never exceed
    // the storage size, so it cannot overflow for very large capacities.
    const int untilWrap = StorageCapacity() - aIndex;
    return aIncrement < untilWrap ? aIndex + aIncrement
                                  : aIncrement - untilWrap;
  }
  /**
   * @brief This allows checking that Enqueue (resp. Dequeue) are always
   * called by the right thread.
   *
   * The role of the thread are assigned the first time they call Enqueue or
   * Dequeue, and cannot change, except by a ResetThreadId method.
   *
   * @param aId the stored id of the thread that has called the calling method
   * first; it is set to the current thread if it is still unassigned.
   */
#ifdef DEBUG
  static void AssertCorrectThread(std::thread::id& aId) {
    if (aId == std::thread::id()) {
      aId = std::this_thread::get_id();
      return;
    }
    MOZ_ASSERT(aId == std::this_thread::get_id());
  }
#endif
  /** Index at which the oldest element is. */
  std::atomic<int> mReadIndex;
  /** Index at which to write new elements. It is equal to `mReadIndex` when
   * the buffer is empty, and one position before it when the buffer is full. */
  std::atomic<int> mWriteIndex;
  /** Number of slots in the storage: one more than the maximum number of
   * elements that can be stored in the ring buffer. */
  const int mCapacity;
  /** Data storage, of size `mCapacity` */
  std::unique_ptr<T[]> mData;
#ifdef DEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id mConsumerId;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id mProducerId;
#endif
};

/**
 * Convenience alias for `SPSCRingBufferBase<T>`. This is safe to use from two
 * threads, one producer and one consumer, without explicit synchronization.
 * The thread filling each role must stay the same unless it is explicitly
 * handed over with `ResetProducerThreadId()` / `ResetConsumerThreadId()`.
 */
template <typename T>
using SPSCQueue = SPSCRingBufferBase<T>;

}  // namespace mozilla

#endif  // mozilla_LockFreeQueue_h