summaryrefslogtreecommitdiffstats
path: root/media/libcubeb/src/cubeb_ringbuffer.h
blob: f0203515660745800c2f53b57abc7ac04134c374 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
/*
 * Copyright © 2016 Mozilla Foundation
 *
 * This program is made available under an ISC-style license.  See the
 * accompanying file LICENSE for details.
 */

#ifndef CUBEB_RING_BUFFER_H
#define CUBEB_RING_BUFFER_H

#include "cubeb_utils.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>

/**
 * Single producer single consumer lock-free and wait-free ring buffer.
 *
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization. If used on
 * two threads, this data structure uses atomics for thread safety. It is
 * possible to disable the use of atomics at compile time and only use this data
 * structure on one thread.
 *
 * The role for the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be on
 * another thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking if
 *   the ring buffer is empty or full.
 * - We always keep the read index at least one element ahead of the write
 *   index, so we can distinguish between an empty and a full ring buffer: an
 *   empty ring buffer is when the write index is at the same position as the
 *   read index. A full buffer is when the write index is exactly one position
 *   before the read index.
 * - We synchronize updates to the read index after having read the data, and
 *   the write index after having written the data. This means that the each
 *   thread can only touch a portion of the buffer that is not touched by the
 *   other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory,
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
template <typename T> class ring_buffer_base {
public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation, but is the only allocation that will happen
   * for the life time of a `ring_buffer_base`.
   *
   * @param capacity The maximum number of element this ring buffer will hold.
   */
  ring_buffer_base(int capacity)
      /* One more element to distinguish from empty and full buffer. */
      : capacity_(capacity + 1)
  {
    assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
           "buffer too large for the type of index used.");
    assert(capacity_ > 0);

    data_.reset(new T[storage_capacity()]);
    /* If this queue is using atomics, initializing those members as the last
     * action in the constructor acts as a full barrier, and allow capacity() to
     * be thread-safe. */
    write_index_ = 0;
    read_index_ = 0;
  }
  /**
   * Push `count` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param count The number of elements to enqueue.
   * @return The number of element enqueued.
   */
  int enqueue_default(int count) { return enqueue(nullptr, count); }
  /**
   * @brief Put an element in the queue
   *
   * Only safely called on the producer thread.
   *
   * @param element The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  int enqueue(T & element) { return enqueue(&element, 1); }
  /**
   * Push `count` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param elements a pointer to a buffer containing at least `count` elements.
   * If `elements` is nullptr, zero or default constructed elements are
   * enqueued.
   * @param count The number of elements to read from `elements`
   * @return The number of elements successfully coped from `elements` and
   * inserted into the ring buffer.
   */
  int enqueue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif

    int wr_idx = write_index_.load(std::memory_order_relaxed);
    int rd_idx = read_index_.load(std::memory_order_acquire);

    if (full_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);

    /* First part, from the write index to the end of the array. */
    int first_part = std::min(storage_capacity() - wr_idx, to_write);
    /* Second part, from the beginning of the array */
    int second_part = to_write - first_part;

    if (elements) {
      Copy(data_.get() + wr_idx, elements, first_part);
      Copy(data_.get(), elements + first_part, second_part);
    } else {
      ConstructDefault(data_.get() + wr_idx, first_part);
      ConstructDefault(data_.get(), second_part);
    }

    write_index_.store(increment_index(wr_idx, to_write),
                       std::memory_order_release);

    return to_write;
  }
  /**
   * Retrieve at most `count` elements from the ring buffer, and copy them to
   * `elements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param elements A pointer to a buffer with space for at least `count`
   * elements. If `elements` is `nullptr`, `count` element will be discarded.
   * @param count The maximum number of elements to dequeue.
   * @return The number of elements written to `elements`.
   */
  int dequeue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif

    int rd_idx = read_index_.load(std::memory_order_relaxed);
    int wr_idx = write_index_.load(std::memory_order_acquire);

    if (empty_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);

    int first_part = std::min(storage_capacity() - rd_idx, to_read);
    int second_part = to_read - first_part;

    if (elements) {
      Copy(elements, data_.get() + rd_idx, first_part);
      Copy(elements + first_part, data_.get(), second_part);
    }

    read_index_.store(increment_index(rd_idx, to_read),
                      std::memory_order_release);

    return to_read;
  }
  /**
   * Get the number of available element for consuming.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available elements for reading.
   */
  int available_read() const
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    return available_read_internal(
        read_index_.load(std::memory_order_relaxed),
        write_index_.load(std::memory_order_acquire));
  }
  /**
   * Get the number of available elements for consuming.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int available_write() const
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    return available_write_internal(
        read_index_.load(std::memory_order_acquire),
        write_index_.load(std::memory_order_relaxed));
  }
  /**
   * Get the total capacity, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int capacity() const { return storage_capacity() - 1; }
  /**
   * Reset the consumer and producer thread identifier, in case the thread are
   * being changed. This has to be externally synchronized. This is no-op when
   * asserts are disabled.
   */
  void reset_thread_ids()
  {
#ifndef NDEBUG
    consumer_id = producer_id = std::thread::id();
#endif
  }

private:
  /** Return true if the ring buffer is empty.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool empty_internal(int read_index, int write_index) const
  {
    return write_index == read_index;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool full_internal(int read_index, int write_index) const
  {
    return (write_index + 1) % storage_capacity() == read_index;
  }
  /**
   * Return the size of the storage. It is one more than the number of elements
   * that can be stored in the buffer.
   *
   * @return the number of elements that can be stored in the buffer.
   */
  int storage_capacity() const { return capacity_; }
  /**
   * Returns the number of elements available for reading.
   *
   * @return the number of available elements for reading.
   */
  int available_read_internal(int read_index, int write_index) const
  {
    if (write_index >= read_index) {
      return write_index - read_index;
    } else {
      return write_index + storage_capacity() - read_index;
    }
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * @return the number of elements that can be written into the array.
   */
  int available_write_internal(int read_index, int write_index) const
  {
    /* We substract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = read_index - write_index - 1;
    if (write_index >= read_index) {
      rv += storage_capacity();
    }
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * @param index a reference to the index to increment.
   * @param increment the number by which `index` is incremented.
   * @return the new index.
   */
  int increment_index(int index, int increment) const
  {
    assert(increment >= 0);
    return (index + increment) % storage_capacity();
  }
  /**
   * @brief This allows checking that enqueue (resp. dequeue) are always called
   * by the right thread.
   *
   * @param id the id of the thread that has called the calling method first.
   */
#ifndef NDEBUG
  static void assert_correct_thread(std::thread::id & id)
  {
    if (id == std::thread::id()) {
      id = std::this_thread::get_id();
      return;
    }
    assert(id == std::this_thread::get_id());
  }
#endif
  /** Index at which the oldest element is at, in samples. */
  std::atomic<int> read_index_;
  /** Index at which to write new elements. `write_index` is always at
   * least one element ahead of `read_index_`. */
  std::atomic<int> write_index_;
  /** Maximum number of elements that can be stored in the ring buffer. */
  const int capacity_;
  /** Data storage */
  std::unique_ptr<T[]> data_;
#ifndef NDEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id consumer_id;
  /** The id of the only thread that is allowed to write from the queue. */
  mutable std::thread::id producer_id;
#endif
};

/**
 * Adapter for `ring_buffer_base` that exposes an interface in frames.
 */
template <typename T> class audio_ring_buffer_base {
public:
  /**
   * @brief Constructor.
   *
   * @param channel_count       Number of channels.
   * @param capacity_in_frames  The capacity in frames.
   */
  audio_ring_buffer_base(int channel_count, int capacity_in_frames)
      : channel_count(channel_count),
        ring_buffer(frames_to_samples(capacity_in_frames))
  {
    assert(channel_count > 0);
  }
  /**
   * @brief Enqueue silence.
   *
   * Only safely called on the producer thread.
   *
   * @param frame_count The number of frames of silence to enqueue.
   * @return  The number of frames of silence actually written to the queue.
   */
  int enqueue_default(int frame_count)
  {
    return samples_to_frames(
        ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
  }
  /**
   * @brief Enqueue `frames_count` frames of audio.
   *
   * Only safely called from the producer thread.
   *
   * @param [in] frames If non-null, the frames to enqueue.
   *                    Otherwise, silent frames are enqueued.
   * @param frame_count The number of frames to enqueue.
   *
   * @return The number of frames enqueued
   */

  int enqueue(T * frames, int frame_count)
  {
    return samples_to_frames(
        ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
  }

  /**
   * @brief Removes `frame_count` frames from the buffer, and
   *        write them to `frames` if it is non-null.
   *
   * Only safely called on the consumer thread.
   *
   * @param frames      If non-null, the frames are copied to `frames`.
   *                    Otherwise, they are dropped.
   * @param frame_count The number of frames to remove.
   *
   * @return  The number of frames actually dequeud.
   */
  int dequeue(T * frames, int frame_count)
  {
    return samples_to_frames(
        ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
  }
  /**
   * Get the number of available frames of audio for consuming.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available frames of audio for reading.
   */
  int available_read() const
  {
    return samples_to_frames(ring_buffer.available_read());
  }
  /**
   * Get the number of available frames of audio for consuming.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int available_write() const
  {
    return samples_to_frames(ring_buffer.available_write());
  }
  /**
   * Get the total capacity, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int capacity() const { return samples_to_frames(ring_buffer.capacity()); }

private:
  /**
   * @brief Frames to samples conversion.
   *
   * @param frames The number of frames.
   *
   * @return  A number of samples.
   */
  int frames_to_samples(int frames) const { return frames * channel_count; }
  /**
   * @brief Samples to frames conversion.
   *
   * @param samples The number of samples.
   *
   * @return  A number of frames.
   */
  int samples_to_frames(int samples) const { return samples / channel_count; }
  /** Number of channels of audio that will stream through this ring buffer. */
  int channel_count;
  /** The underlying ring buffer that is used to store the data. */
  ring_buffer_base<T> ring_buffer;
};

/**
 * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
 * from two threads, one producer, one consumer (that never change role),
 * without explicit synchronization.
 */
template <typename T> using lock_free_queue = ring_buffer_base<T>;
/**
 * Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use
 * from two threads, one producer, one consumer (that never change role),
 * without explicit synchronization.
 */
template <typename T>
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;

#endif // CUBEB_RING_BUFFER_H