summaryrefslogtreecommitdiffstats
path: root/third_party/rust/ringbuf/src/consumer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/ringbuf/src/consumer.rs')
-rw-r--r--third_party/rust/ringbuf/src/consumer.rs356
1 files changed, 356 insertions, 0 deletions
diff --git a/third_party/rust/ringbuf/src/consumer.rs b/third_party/rust/ringbuf/src/consumer.rs
new file mode 100644
index 0000000000..f367fdd851
--- /dev/null
+++ b/third_party/rust/ringbuf/src/consumer.rs
@@ -0,0 +1,356 @@
+use std::{
+ cmp::min,
+ io::{self, Read, Write},
+ mem::{self, MaybeUninit},
+ ops::Range,
+ ptr::copy_nonoverlapping,
+ sync::{atomic::Ordering, Arc},
+};
+
+use crate::{producer::Producer, ring_buffer::*};
+
+/// Consumer part of ring buffer.
+pub struct Consumer<T> {
+ pub(crate) rb: Arc<RingBuffer<T>>,
+}
+
+impl<T: Sized> Consumer<T> {
+ /// Returns capacity of the ring buffer.
+ ///
+ /// The capacity of the buffer is constant.
+ pub fn capacity(&self) -> usize {
+ self.rb.capacity()
+ }
+
+ /// Checks if the ring buffer is empty.
+ ///
+ /// *The result may become irrelevant at any time because of concurring activity of the producer.*
+ pub fn is_empty(&self) -> bool {
+ self.rb.is_empty()
+ }
+
+ /// Checks if the ring buffer is full.
+ ///
+ /// The result is relevant until you remove items from the consumer.
+ pub fn is_full(&self) -> bool {
+ self.rb.is_full()
+ }
+
+ /// The length of the data stored in the buffer
+ ///
+ /// Actual length may be equal to or greater than the returned value.
+ pub fn len(&self) -> usize {
+ self.rb.len()
+ }
+
+ /// The remaining space in the buffer.
+ ///
+ /// Actual remaining space may be equal to or less than the returning value.
+ pub fn remaining(&self) -> usize {
+ self.rb.remaining()
+ }
+
+ fn get_ranges(&self) -> (Range<usize>, Range<usize>) {
+ let head = self.rb.head.load(Ordering::Acquire);
+ let tail = self.rb.tail.load(Ordering::Acquire);
+ let len = unsafe { self.rb.data.get_ref().len() };
+
+ if head < tail {
+ (head..tail, 0..0)
+ } else if head > tail {
+ (head..len, 0..tail)
+ } else {
+ (0..0, 0..0)
+ }
+ }
+
+ /// Gives immutable access to the elements contained by the ring buffer without removing them.
+ ///
+ /// The method takes a function `f` as argument.
+ /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
+ /// First slice contains older elements.
+ ///
+ /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn access<F: FnOnce(&[T], &[T])>(&self, f: F) {
+ let ranges = self.get_ranges();
+
+ unsafe {
+ let left = &self.rb.data.get_ref()[ranges.0];
+ let right = &self.rb.data.get_ref()[ranges.1];
+
+ f(
+ &*(left as *const [MaybeUninit<T>] as *const [T]),
+ &*(right as *const [MaybeUninit<T>] as *const [T]),
+ );
+ }
+ }
+
+ /// Gives mutable access to the elements contained by the ring buffer without removing them.
+ ///
+ /// The method takes a function `f` as argument.
+ /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
+ /// First slice contains older elements.
+ ///
+ /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn access_mut<F: FnOnce(&mut [T], &mut [T])>(&mut self, f: F) {
+ let ranges = self.get_ranges();
+
+ unsafe {
+ let left = &mut self.rb.data.get_mut()[ranges.0];
+ let right = &mut self.rb.data.get_mut()[ranges.1];
+
+ f(
+ &mut *(left as *mut [MaybeUninit<T>] as *mut [T]),
+ &mut *(right as *mut [MaybeUninit<T>] as *mut [T]),
+ );
+ }
+ }
+
+ /// Allows to read from ring buffer memory directry.
+ ///
+ /// *This function is unsafe because it gives access to possibly uninitialized memory*
+ ///
+ /// The method takes a function `f` as argument.
+ /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
+ /// First slice contains older elements.
+ ///
+ /// `f` should return number of elements been read.
+ /// *There is no checks for returned number - it remains on the developer's conscience.*
+ ///
+ /// The method **always** calls `f` even if ring buffer is empty.
+ ///
+ /// The method returns number returned from `f`.
+ pub unsafe fn pop_access<F>(&mut self, f: F) -> usize
+ where
+ F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
+ {
+ let head = self.rb.head.load(Ordering::Acquire);
+ let tail = self.rb.tail.load(Ordering::Acquire);
+ let len = self.rb.data.get_ref().len();
+
+ let ranges = if head < tail {
+ (head..tail, 0..0)
+ } else if head > tail {
+ (head..len, 0..tail)
+ } else {
+ (0..0, 0..0)
+ };
+
+ let slices = (
+ &mut self.rb.data.get_mut()[ranges.0],
+ &mut self.rb.data.get_mut()[ranges.1],
+ );
+
+ let n = f(slices.0, slices.1);
+
+ if n > 0 {
+ let new_head = (head + n) % len;
+ self.rb.head.store(new_head, Ordering::Release);
+ }
+ n
+ }
+
+ /// Copies data from the ring buffer to the slice in byte-to-byte manner.
+ ///
+ /// The `elems` slice should contain **un-initialized** data before the method call.
+ /// After the call the copied part of data in `elems` should be interpreted as **initialized**.
+ /// The remaining part is still **un-iniitilized**.
+ ///
+ /// Returns the number of items been copied.
+ pub unsafe fn pop_copy(&mut self, elems: &mut [MaybeUninit<T>]) -> usize {
+ self.pop_access(|left, right| {
+ if elems.len() < left.len() {
+ copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), elems.len());
+ elems.len()
+ } else {
+ copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), left.len());
+ if elems.len() < left.len() + right.len() {
+ copy_nonoverlapping(
+ right.as_ptr(),
+ elems.as_mut_ptr().add(left.len()),
+ elems.len() - left.len(),
+ );
+ elems.len()
+ } else {
+ copy_nonoverlapping(
+ right.as_ptr(),
+ elems.as_mut_ptr().add(left.len()),
+ right.len(),
+ );
+ left.len() + right.len()
+ }
+ }
+ })
+ }
+
+ /// Removes latest element from the ring buffer and returns it.
+ /// Returns `None` if the ring buffer is empty.
+ pub fn pop(&mut self) -> Option<T> {
+ let mut elem_mu = MaybeUninit::uninit();
+ let n = unsafe {
+ self.pop_access(|slice, _| {
+ if !slice.is_empty() {
+ mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
+ 1
+ } else {
+ 0
+ }
+ })
+ };
+ match n {
+ 0 => None,
+ 1 => Some(unsafe { elem_mu.assume_init() }),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Repeatedly calls the closure `f` passing elements removed from the ring buffer to it.
+ ///
+ /// The closure is called until it returns `false` or the ring buffer is empty.
+ ///
+ /// The method returns number of elements been removed from the buffer.
+ pub fn pop_each<F: FnMut(T) -> bool>(&mut self, mut f: F, count: Option<usize>) -> usize {
+ unsafe {
+ self.pop_access(|left, right| {
+ let lb = match count {
+ Some(n) => min(n, left.len()),
+ None => left.len(),
+ };
+ for (i, dst) in left[0..lb].iter_mut().enumerate() {
+ if !f(mem::replace(dst, MaybeUninit::uninit()).assume_init()) {
+ return i + 1;
+ }
+ }
+ if lb < left.len() {
+ return lb;
+ }
+
+ let rb = match count {
+ Some(n) => min(n - lb, right.len()),
+ None => right.len(),
+ };
+ for (i, dst) in right[0..rb].iter_mut().enumerate() {
+ if !f(mem::replace(dst, MaybeUninit::uninit()).assume_init()) {
+ return i + lb + 1;
+ }
+ }
+ left.len() + right.len()
+ })
+ }
+ }
+
+ /// Iterate immutably over the elements contained by the ring buffer without removing them.
+ ///
+ /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn for_each<F: FnMut(&T)>(&self, mut f: F) {
+ self.access(|left, right| {
+ for c in left.iter() {
+ f(c);
+ }
+ for c in right.iter() {
+ f(c);
+ }
+ });
+ }
+
+ /// Iterate mutably over the elements contained by the ring buffer without removing them.
+ ///
+ /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn for_each_mut<F: FnMut(&mut T)>(&mut self, mut f: F) {
+ self.access_mut(|left, right| {
+ for c in left.iter_mut() {
+ f(c);
+ }
+ for c in right.iter_mut() {
+ f(c);
+ }
+ });
+ }
+
+ /// Removes at most `count` elements from the consumer and appends them to the producer.
+ /// If `count` is `None` then as much as possible elements will be moved.
+ /// The producer and consumer parts may be of different buffers as well as of the same one.
+ ///
+ /// On success returns count of elements been moved.
+ pub fn move_to(&mut self, other: &mut Producer<T>, count: Option<usize>) -> usize {
+ move_items(self, other, count)
+ }
+}
+
+impl<T: Sized + Copy> Consumer<T> {
+ /// Removes first elements from the ring buffer and writes them into a slice.
+ /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
+ ///
+ /// On success returns count of elements been removed from the ring buffer.
+ pub fn pop_slice(&mut self, elems: &mut [T]) -> usize {
+ unsafe { self.pop_copy(&mut *(elems as *mut [T] as *mut [MaybeUninit<T>])) }
+ }
+}
+
+impl Consumer<u8> {
+ /// Removes at most first `count` bytes from the ring buffer and writes them into
+ /// a [`Write`](https://doc.rust-lang.org/std/io/trait.Write.html) instance.
+ /// If `count` is `None` then as much as possible bytes will be written.
+ ///
+ /// Returns `Ok(n)` if `write` is succeded. `n` is number of bytes been written.
+ /// `n == 0` means that either `write` returned zero or ring buffer is empty.
+ ///
+ /// If `write` is failed then error is returned.
+ pub fn write_into(
+ &mut self,
+ writer: &mut dyn Write,
+ count: Option<usize>,
+ ) -> io::Result<usize> {
+ let mut err = None;
+ let n = unsafe {
+ self.pop_access(|left, _| -> usize {
+ let left = match count {
+ Some(c) => {
+ if c < left.len() {
+ &mut left[0..c]
+ } else {
+ left
+ }
+ }
+ None => left,
+ };
+ match writer
+ .write(&*(left as *const [MaybeUninit<u8>] as *const [u8]))
+ .and_then(|n| {
+ if n <= left.len() {
+ Ok(n)
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Write operation returned invalid number",
+ ))
+ }
+ }) {
+ Ok(n) => n,
+ Err(e) => {
+ err = Some(e);
+ 0
+ }
+ }
+ })
+ };
+ match err {
+ Some(e) => Err(e),
+ None => Ok(n),
+ }
+ }
+}
+
+impl Read for Consumer<u8> {
+ fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
+ let n = self.pop_slice(buffer);
+ if n == 0 && !buffer.is_empty() {
+ Err(io::Error::new(
+ io::ErrorKind::WouldBlock,
+ "Ring buffer is empty",
+ ))
+ } else {
+ Ok(n)
+ }
+ }
+}