diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/ringbuf/src/producer.rs | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/ringbuf/src/producer.rs')
-rw-r--r-- | third_party/rust/ringbuf/src/producer.rs | 290 |
1 files changed, 290 insertions, 0 deletions
diff --git a/third_party/rust/ringbuf/src/producer.rs b/third_party/rust/ringbuf/src/producer.rs new file mode 100644 index 0000000000..04ff998070 --- /dev/null +++ b/third_party/rust/ringbuf/src/producer.rs @@ -0,0 +1,290 @@ +use alloc::sync::Arc; +use core::{ + mem::{self, MaybeUninit}, + ptr::copy_nonoverlapping, + slice, + sync::atomic::Ordering, +}; +#[cfg(feature = "std")] +use std::io::{self, Read, Write}; + +use crate::{consumer::Consumer, ring_buffer::*}; + +/// Producer part of ring buffer. +pub struct Producer<T> { + pub(crate) rb: Arc<RingBuffer<T>>, +} + +impl<T: Sized> Producer<T> { + /// Returns capacity of the ring buffer. + /// + /// The capacity of the buffer is constant. + pub fn capacity(&self) -> usize { + self.rb.capacity() + } + + /// Checks if the ring buffer is empty. + /// + /// The result is relevant until you push items to the producer. + pub fn is_empty(&self) -> bool { + self.rb.is_empty() + } + + /// Checks if the ring buffer is full. + /// + /// *The result may become irrelevant at any time because of concurring activity of the consumer.* + pub fn is_full(&self) -> bool { + self.rb.is_full() + } + + /// The length of the data stored in the buffer. + /// + /// Actual length may be equal to or less than the returned value. + pub fn len(&self) -> usize { + self.rb.len() + } + + /// The remaining space in the buffer. + /// + /// Actual remaining space may be equal to or greater than the returning value. + pub fn remaining(&self) -> usize { + self.rb.remaining() + } + + /// Allows to write into ring buffer memory directly. + /// + /// *This function is unsafe because it gives access to possibly uninitialized memory* + /// + /// The method takes a function `f` as argument. + /// `f` takes two slices of ring buffer content (the second one or both of them may be empty). + /// First slice contains older elements. + /// + /// `f` should return number of elements been written. + /// *There is no checks for returned number - it remains on the developer's conscience.* + /// + /// The method **always** calls `f` even if ring buffer is full. + /// + /// The method returns number returned from `f`. + /// + /// # Safety + /// + /// The method gives access to ring buffer underlying memory which may be uninitialized. + /// + pub unsafe fn push_access<F>(&mut self, f: F) -> usize + where + F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize, + { + let head = self.rb.head.load(Ordering::Acquire); + let tail = self.rb.tail.load(Ordering::Acquire); + let len = self.rb.data.len(); + + let ranges = if tail >= head { + if head > 0 { + (tail..len, 0..(head - 1)) + } else if tail < len - 1 { + (tail..(len - 1), 0..0) + } else { + (0..0, 0..0) + } + } else if tail < head - 1 { + (tail..(head - 1), 0..0) + } else { + (0..0, 0..0) + }; + + let ptr = self.rb.data.get_mut().as_mut_ptr(); + + let slices = ( + slice::from_raw_parts_mut(ptr.add(ranges.0.start), ranges.0.len()), + slice::from_raw_parts_mut(ptr.add(ranges.1.start), ranges.1.len()), + ); + + let n = f(slices.0, slices.1); + + if n > 0 { + let new_tail = (tail + n) % len; + self.rb.tail.store(new_tail, Ordering::Release); + } + n + } + + /// Copies data from the slice to the ring buffer in byte-to-byte manner. + /// + /// The `elems` slice should contain **initialized** data before the method call. + /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**. + /// + /// Returns the number of items been copied. + /// + /// # Safety + /// + /// The method copies raw data into the ring buffer. + /// + /// *You should properly fill the slice and manage remaining elements after copy.* + /// + pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize { + self.push_access(|left, right| -> usize { + if elems.len() < left.len() { + copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len()); + elems.len() + } else { + copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len()); + if elems.len() < left.len() + right.len() { + copy_nonoverlapping( + elems.as_ptr().add(left.len()), + right.as_mut_ptr(), + elems.len() - left.len(), + ); + elems.len() + } else { + copy_nonoverlapping( + elems.as_ptr().add(left.len()), + right.as_mut_ptr(), + right.len(), + ); + left.len() + right.len() + } + } + }) + } + + /// Appends an element to the ring buffer. + /// On failure returns an error containing the element that hasn't been appended. + pub fn push(&mut self, elem: T) -> Result<(), T> { + let mut elem_mu = MaybeUninit::new(elem); + let n = unsafe { + self.push_access(|slice, _| { + if !slice.is_empty() { + mem::swap(slice.get_unchecked_mut(0), &mut elem_mu); + 1 + } else { + 0 + } + }) + }; + match n { + 0 => Err(unsafe { elem_mu.assume_init() }), + 1 => Ok(()), + _ => unreachable!(), + } + } + + /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer. + /// + /// The closure is called until it returns `None` or the ring buffer is full. + /// + /// The method returns number of elements been put into the buffer. + pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize { + unsafe { + self.push_access(|left, right| { + for (i, dst) in left.iter_mut().enumerate() { + match f() { + Some(e) => dst.as_mut_ptr().write(e), + None => return i, + }; + } + for (i, dst) in right.iter_mut().enumerate() { + match f() { + Some(e) => dst.as_mut_ptr().write(e), + None => return i + left.len(), + }; + } + left.len() + right.len() + }) + } + } + + /// Appends elements from an iterator to the ring buffer. + /// Elements that haven't been added to the ring buffer remain in the iterator. + /// + /// Returns count of elements been appended to the ring buffer. + pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize { + self.push_each(|| elems.next()) + } + + /// Removes at most `count` elements from the consumer and appends them to the producer. + /// If `count` is `None` then as much as possible elements will be moved. + /// The producer and consumer parts may be of different buffers as well as of the same one. + /// + /// On success returns number of elements been moved. + pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize { + move_items(other, self, count) + } +} + +impl<T: Sized + Copy> Producer<T> { + /// Appends elements from slice to the ring buffer. + /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html). + /// + /// Returns count of elements been appended to the ring buffer. + pub fn push_slice(&mut self, elems: &[T]) -> usize { + unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) } + } +} + +#[cfg(feature = "std")] +impl Producer<u8> { + /// Reads at most `count` bytes + /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance + /// and appends them to the ring buffer. + /// If `count` is `None` then as much as possible bytes will be read. + /// + /// Returns `Ok(n)` if `read` succeeded. `n` is number of bytes been read. + /// `n == 0` means that either `read` returned zero or ring buffer is full. + /// + /// If `read` is failed or returned an invalid number then error is returned. + pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> { + let mut err = None; + let n = unsafe { + self.push_access(|left, _| -> usize { + let left = match count { + Some(c) => { + if c < left.len() { + &mut left[0..c] + } else { + left + } + } + None => left, + }; + match reader + .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8])) + .and_then(|n| { + if n <= left.len() { + Ok(n) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Read operation returned an invalid number", + )) + } + }) { + Ok(n) => n, + Err(e) => { + err = Some(e); + 0 + } + } + }) + }; + match err { + Some(e) => Err(e), + None => Ok(n), + } + } +} + +#[cfg(feature = "std")] +impl Write for Producer<u8> { + fn write(&mut self, buffer: &[u8]) -> io::Result<usize> { + let n = self.push_slice(buffer); + if n == 0 && !buffer.is_empty() { + Err(io::ErrorKind::WouldBlock.into()) + } else { + Ok(n) + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} |