summaryrefslogtreecommitdiffstats
path: root/third_party/rust/ringbuf/src
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/ringbuf/src')
-rw-r--r--third_party/rust/ringbuf/src/benchmark.rs51
-rw-r--r--third_party/rust/ringbuf/src/consumer.rs356
-rw-r--r--third_party/rust/ringbuf/src/lib.rs134
-rw-r--r--third_party/rust/ringbuf/src/producer.rs274
-rw-r--r--third_party/rust/ringbuf/src/ring_buffer.rs187
-rw-r--r--third_party/rust/ringbuf/src/tests/access.rs234
-rw-r--r--third_party/rust/ringbuf/src/tests/drop.rs240
-rw-r--r--third_party/rust/ringbuf/src/tests/message.rs167
-rw-r--r--third_party/rust/ringbuf/src/tests/mod.rs6
-rw-r--r--third_party/rust/ringbuf/src/tests/multiple.rs140
-rw-r--r--third_party/rust/ringbuf/src/tests/read_write.rs159
-rw-r--r--third_party/rust/ringbuf/src/tests/single.rs201
12 files changed, 2149 insertions, 0 deletions
diff --git a/third_party/rust/ringbuf/src/benchmark.rs b/third_party/rust/ringbuf/src/benchmark.rs
new file mode 100644
index 0000000000..e145f580c1
--- /dev/null
+++ b/third_party/rust/ringbuf/src/benchmark.rs
@@ -0,0 +1,51 @@
+use super::*;
+
+use test::Bencher;
+
+const RB_SIZE: usize = 0x400;
+
+#[bench]
+fn single_item(b: &mut Bencher) {
+ let buf = RingBuffer::<u64>::new(RB_SIZE);
+ let (mut prod, mut cons) = buf.split();
+ prod.push_slice(&[1; RB_SIZE / 2]);
+ b.iter(|| {
+ prod.push(1).unwrap();
+ cons.pop().unwrap();
+ });
+}
+
+#[bench]
+fn slice_x10(b: &mut Bencher) {
+ let buf = RingBuffer::<u64>::new(RB_SIZE);
+ let (mut prod, mut cons) = buf.split();
+ prod.push_slice(&[1; RB_SIZE / 2]);
+ let mut data = [1; 10];
+ b.iter(|| {
+ prod.push_slice(&data);
+ cons.pop_slice(&mut data);
+ });
+}
+
+#[bench]
+fn slice_x100(b: &mut Bencher) {
+ let buf = RingBuffer::<u64>::new(RB_SIZE);
+ let (mut prod, mut cons) = buf.split();
+ prod.push_slice(&[1; RB_SIZE / 2]);
+ let mut data = [1; 100];
+ b.iter(|| {
+ prod.push_slice(&data);
+ cons.pop_slice(&mut data);
+ });
+}
+#[bench]
+fn slice_x1000(b: &mut Bencher) {
+ let buf = RingBuffer::<u64>::new(RB_SIZE);
+ let (mut prod, mut cons) = buf.split();
+ prod.push_slice(&[1; 16]);
+ let mut data = [1; 1000];
+ b.iter(|| {
+ prod.push_slice(&data);
+ cons.pop_slice(&mut data);
+ });
+}
diff --git a/third_party/rust/ringbuf/src/consumer.rs b/third_party/rust/ringbuf/src/consumer.rs
new file mode 100644
index 0000000000..f367fdd851
--- /dev/null
+++ b/third_party/rust/ringbuf/src/consumer.rs
@@ -0,0 +1,356 @@
+use std::{
+ cmp::min,
+ io::{self, Read, Write},
+ mem::{self, MaybeUninit},
+ ops::Range,
+ ptr::copy_nonoverlapping,
+ sync::{atomic::Ordering, Arc},
+};
+
+use crate::{producer::Producer, ring_buffer::*};
+
+/// Consumer part of ring buffer.
+pub struct Consumer<T> {
+ pub(crate) rb: Arc<RingBuffer<T>>,
+}
+
+impl<T: Sized> Consumer<T> {
+ /// Returns capacity of the ring buffer.
+ ///
+ /// The capacity of the buffer is constant.
+ pub fn capacity(&self) -> usize {
+ self.rb.capacity()
+ }
+
+ /// Checks if the ring buffer is empty.
+ ///
+ /// *The result may become irrelevant at any time because of concurring activity of the producer.*
+ pub fn is_empty(&self) -> bool {
+ self.rb.is_empty()
+ }
+
+ /// Checks if the ring buffer is full.
+ ///
+ /// The result is relevant until you remove items from the consumer.
+ pub fn is_full(&self) -> bool {
+ self.rb.is_full()
+ }
+
+ /// The length of the data stored in the buffer
+ ///
+ /// Actual length may be equal to or greater than the returned value.
+ pub fn len(&self) -> usize {
+ self.rb.len()
+ }
+
+ /// The remaining space in the buffer.
+ ///
+ /// Actual remaining space may be equal to or less than the returning value.
+ pub fn remaining(&self) -> usize {
+ self.rb.remaining()
+ }
+
+ fn get_ranges(&self) -> (Range<usize>, Range<usize>) {
+ let head = self.rb.head.load(Ordering::Acquire);
+ let tail = self.rb.tail.load(Ordering::Acquire);
+ let len = unsafe { self.rb.data.get_ref().len() };
+
+ if head < tail {
+ (head..tail, 0..0)
+ } else if head > tail {
+ (head..len, 0..tail)
+ } else {
+ (0..0, 0..0)
+ }
+ }
+
+ /// Gives immutable access to the elements contained by the ring buffer without removing them.
+ ///
+ /// The method takes a function `f` as argument.
+ /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
+ /// First slice contains older elements.
+ ///
+ /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn access<F: FnOnce(&[T], &[T])>(&self, f: F) {
+ let ranges = self.get_ranges();
+
+ unsafe {
+ let left = &self.rb.data.get_ref()[ranges.0];
+ let right = &self.rb.data.get_ref()[ranges.1];
+
+ f(
+ &*(left as *const [MaybeUninit<T>] as *const [T]),
+ &*(right as *const [MaybeUninit<T>] as *const [T]),
+ );
+ }
+ }
+
+ /// Gives mutable access to the elements contained by the ring buffer without removing them.
+ ///
+ /// The method takes a function `f` as argument.
+ /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
+ /// First slice contains older elements.
+ ///
+ /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn access_mut<F: FnOnce(&mut [T], &mut [T])>(&mut self, f: F) {
+ let ranges = self.get_ranges();
+
+ unsafe {
+ let left = &mut self.rb.data.get_mut()[ranges.0];
+ let right = &mut self.rb.data.get_mut()[ranges.1];
+
+ f(
+ &mut *(left as *mut [MaybeUninit<T>] as *mut [T]),
+ &mut *(right as *mut [MaybeUninit<T>] as *mut [T]),
+ );
+ }
+ }
+
+ /// Allows to read from ring buffer memory directry.
+ ///
+ /// *This function is unsafe because it gives access to possibly uninitialized memory*
+ ///
+ /// The method takes a function `f` as argument.
+ /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
+ /// First slice contains older elements.
+ ///
+ /// `f` should return number of elements been read.
+ /// *There is no checks for returned number - it remains on the developer's conscience.*
+ ///
+ /// The method **always** calls `f` even if ring buffer is empty.
+ ///
+ /// The method returns number returned from `f`.
+ pub unsafe fn pop_access<F>(&mut self, f: F) -> usize
+ where
+ F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
+ {
+ let head = self.rb.head.load(Ordering::Acquire);
+ let tail = self.rb.tail.load(Ordering::Acquire);
+ let len = self.rb.data.get_ref().len();
+
+ let ranges = if head < tail {
+ (head..tail, 0..0)
+ } else if head > tail {
+ (head..len, 0..tail)
+ } else {
+ (0..0, 0..0)
+ };
+
+ let slices = (
+ &mut self.rb.data.get_mut()[ranges.0],
+ &mut self.rb.data.get_mut()[ranges.1],
+ );
+
+ let n = f(slices.0, slices.1);
+
+ if n > 0 {
+ let new_head = (head + n) % len;
+ self.rb.head.store(new_head, Ordering::Release);
+ }
+ n
+ }
+
+ /// Copies data from the ring buffer to the slice in byte-to-byte manner.
+ ///
+ /// The `elems` slice should contain **un-initialized** data before the method call.
+ /// After the call the copied part of data in `elems` should be interpreted as **initialized**.
+ /// The remaining part is still **un-iniitilized**.
+ ///
+ /// Returns the number of items been copied.
+ pub unsafe fn pop_copy(&mut self, elems: &mut [MaybeUninit<T>]) -> usize {
+ self.pop_access(|left, right| {
+ if elems.len() < left.len() {
+ copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), elems.len());
+ elems.len()
+ } else {
+ copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), left.len());
+ if elems.len() < left.len() + right.len() {
+ copy_nonoverlapping(
+ right.as_ptr(),
+ elems.as_mut_ptr().add(left.len()),
+ elems.len() - left.len(),
+ );
+ elems.len()
+ } else {
+ copy_nonoverlapping(
+ right.as_ptr(),
+ elems.as_mut_ptr().add(left.len()),
+ right.len(),
+ );
+ left.len() + right.len()
+ }
+ }
+ })
+ }
+
+ /// Removes latest element from the ring buffer and returns it.
+ /// Returns `None` if the ring buffer is empty.
+ pub fn pop(&mut self) -> Option<T> {
+ let mut elem_mu = MaybeUninit::uninit();
+ let n = unsafe {
+ self.pop_access(|slice, _| {
+ if !slice.is_empty() {
+ mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
+ 1
+ } else {
+ 0
+ }
+ })
+ };
+ match n {
+ 0 => None,
+ 1 => Some(unsafe { elem_mu.assume_init() }),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Repeatedly calls the closure `f` passing elements removed from the ring buffer to it.
+ ///
+ /// The closure is called until it returns `false` or the ring buffer is empty.
+ ///
+ /// The method returns number of elements been removed from the buffer.
+ pub fn pop_each<F: FnMut(T) -> bool>(&mut self, mut f: F, count: Option<usize>) -> usize {
+ unsafe {
+ self.pop_access(|left, right| {
+ let lb = match count {
+ Some(n) => min(n, left.len()),
+ None => left.len(),
+ };
+ for (i, dst) in left[0..lb].iter_mut().enumerate() {
+ if !f(mem::replace(dst, MaybeUninit::uninit()).assume_init()) {
+ return i + 1;
+ }
+ }
+ if lb < left.len() {
+ return lb;
+ }
+
+ let rb = match count {
+ Some(n) => min(n - lb, right.len()),
+ None => right.len(),
+ };
+ for (i, dst) in right[0..rb].iter_mut().enumerate() {
+ if !f(mem::replace(dst, MaybeUninit::uninit()).assume_init()) {
+ return i + lb + 1;
+ }
+ }
+ left.len() + right.len()
+ })
+ }
+ }
+
+ /// Iterate immutably over the elements contained by the ring buffer without removing them.
+ ///
+ /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn for_each<F: FnMut(&T)>(&self, mut f: F) {
+ self.access(|left, right| {
+ for c in left.iter() {
+ f(c);
+ }
+ for c in right.iter() {
+ f(c);
+ }
+ });
+ }
+
+ /// Iterate mutably over the elements contained by the ring buffer without removing them.
+ ///
+ /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
+ pub fn for_each_mut<F: FnMut(&mut T)>(&mut self, mut f: F) {
+ self.access_mut(|left, right| {
+ for c in left.iter_mut() {
+ f(c);
+ }
+ for c in right.iter_mut() {
+ f(c);
+ }
+ });
+ }
+
+ /// Removes at most `count` elements from the consumer and appends them to the producer.
+ /// If `count` is `None` then as much as possible elements will be moved.
+ /// The producer and consumer parts may be of different buffers as well as of the same one.
+ ///
+ /// On success returns count of elements been moved.
+ pub fn move_to(&mut self, other: &mut Producer<T>, count: Option<usize>) -> usize {
+ move_items(self, other, count)
+ }
+}
+
+impl<T: Sized + Copy> Consumer<T> {
+ /// Removes first elements from the ring buffer and writes them into a slice.
+ /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
+ ///
+ /// On success returns count of elements been removed from the ring buffer.
+ pub fn pop_slice(&mut self, elems: &mut [T]) -> usize {
+ unsafe { self.pop_copy(&mut *(elems as *mut [T] as *mut [MaybeUninit<T>])) }
+ }
+}
+
+impl Consumer<u8> {
+ /// Removes at most first `count` bytes from the ring buffer and writes them into
+ /// a [`Write`](https://doc.rust-lang.org/std/io/trait.Write.html) instance.
+ /// If `count` is `None` then as much as possible bytes will be written.
+ ///
+ /// Returns `Ok(n)` if `write` is succeded. `n` is number of bytes been written.
+ /// `n == 0` means that either `write` returned zero or ring buffer is empty.
+ ///
+ /// If `write` is failed then error is returned.
+ pub fn write_into(
+ &mut self,
+ writer: &mut dyn Write,
+ count: Option<usize>,
+ ) -> io::Result<usize> {
+ let mut err = None;
+ let n = unsafe {
+ self.pop_access(|left, _| -> usize {
+ let left = match count {
+ Some(c) => {
+ if c < left.len() {
+ &mut left[0..c]
+ } else {
+ left
+ }
+ }
+ None => left,
+ };
+ match writer
+ .write(&*(left as *const [MaybeUninit<u8>] as *const [u8]))
+ .and_then(|n| {
+ if n <= left.len() {
+ Ok(n)
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Write operation returned invalid number",
+ ))
+ }
+ }) {
+ Ok(n) => n,
+ Err(e) => {
+ err = Some(e);
+ 0
+ }
+ }
+ })
+ };
+ match err {
+ Some(e) => Err(e),
+ None => Ok(n),
+ }
+ }
+}
+
+impl Read for Consumer<u8> {
+ fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
+ let n = self.pop_slice(buffer);
+ if n == 0 && !buffer.is_empty() {
+ Err(io::Error::new(
+ io::ErrorKind::WouldBlock,
+ "Ring buffer is empty",
+ ))
+ } else {
+ Ok(n)
+ }
+ }
+}
diff --git a/third_party/rust/ringbuf/src/lib.rs b/third_party/rust/ringbuf/src/lib.rs
new file mode 100644
index 0000000000..5b45f9017a
--- /dev/null
+++ b/third_party/rust/ringbuf/src/lib.rs
@@ -0,0 +1,134 @@
+//! Lock-free single-producer single-consumer (SPSC) FIFO ring buffer with direct access to inner data.
+//!
+//! # Overview
+//!
+//! `RingBuffer` is the initial structure representing ring buffer itself.
+//! Ring buffer can be splitted into pair of `Producer` and `Consumer`.
+//!
+//! `Producer` and `Consumer` are used to append/remove elements to/from the ring buffer accordingly. They can be safely transfered between threads.
+//! Operations with `Producer` and `Consumer` are lock-free - they're succeded or failed immediately without blocking or waiting.
+//!
+//! Elements can be effectively appended/removed one by one or many at once.
+//! Also data could be loaded/stored directly into/from [`Read`]/[`Write`] instances.
+//! And finally, there are `unsafe` methods allowing thread-safe direct access in place to the inner memory being appended/removed.
+//!
+//! [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
+//! [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
+//!
+//! When building with nightly toolchain it is possible to run benchmarks via `cargo bench --features benchmark`.
+//!
+//! # Examples
+//!
+//! ## Simple example
+//!
+//! ```rust
+//! # extern crate ringbuf;
+//! use ringbuf::RingBuffer;
+//! # fn main() {
+//! let rb = RingBuffer::<i32>::new(2);
+//! let (mut prod, mut cons) = rb.split();
+//!
+//! prod.push(0).unwrap();
+//! prod.push(1).unwrap();
+//! assert_eq!(prod.push(2), Err(2));
+//!
+//! assert_eq!(cons.pop().unwrap(), 0);
+//!
+//! prod.push(2).unwrap();
+//!
+//! assert_eq!(cons.pop().unwrap(), 1);
+//! assert_eq!(cons.pop().unwrap(), 2);
+//! assert_eq!(cons.pop(), None);
+//! # }
+//! ```
+//!
+//! ## Message transfer
+//!
+//! This is more complicated example of transfering text message between threads.
+//!
+//! ```rust
+//! # extern crate ringbuf;
+//! use std::io::Read;
+//! use std::thread;
+//! use std::time::Duration;
+//!
+//! use ringbuf::RingBuffer;
+//!
+//! # fn main() {
+//! let buf = RingBuffer::<u8>::new(10);
+//! let (mut prod, mut cons) = buf.split();
+//!
+//! let smsg = "The quick brown fox jumps over the lazy dog";
+//!
+//! let pjh = thread::spawn(move || {
+//! println!("-> sending message: '{}'", smsg);
+//!
+//! let zero = [0 as u8];
+//! let mut bytes = smsg.as_bytes().chain(&zero[..]);
+//! loop {
+//! if prod.is_full() {
+//! println!("-> buffer is full, waiting");
+//! thread::sleep(Duration::from_millis(1));
+//! } else {
+//! let n = prod.read_from(&mut bytes, None).unwrap();
+//! if n == 0 {
+//! break;
+//! }
+//! println!("-> {} bytes sent", n);
+//! }
+//! }
+//!
+//! println!("-> message sent");
+//! });
+//!
+//! let cjh = thread::spawn(move || {
+//! println!("<- receiving message");
+//!
+//! let mut bytes = Vec::<u8>::new();
+//! loop {
+//! if cons.is_empty() {
+//! if bytes.ends_with(&[0]) {
+//! break;
+//! } else {
+//! println!("<- buffer is empty, waiting");
+//! thread::sleep(Duration::from_millis(1));
+//! }
+//! } else {
+//! let n = cons.write_into(&mut bytes, None).unwrap();
+//! println!("<- {} bytes received", n);
+//! }
+//! }
+//!
+//! assert_eq!(bytes.pop().unwrap(), 0);
+//! let msg = String::from_utf8(bytes).unwrap();
+//! println!("<- message received: '{}'", msg);
+//!
+//! msg
+//! });
+//!
+//! pjh.join().unwrap();
+//! let rmsg = cjh.join().unwrap();
+//!
+//! assert_eq!(smsg, rmsg);
+//! # }
+//! ```
+//!
+
+#![cfg_attr(feature = "benchmark", feature(test))]
+
+#[cfg(feature = "benchmark")]
+extern crate test;
+
+#[cfg(feature = "benchmark")]
+mod benchmark;
+
+#[cfg(test)]
+mod tests;
+
+mod consumer;
+mod producer;
+mod ring_buffer;
+
+pub use consumer::*;
+pub use producer::*;
+pub use ring_buffer::*;
diff --git a/third_party/rust/ringbuf/src/producer.rs b/third_party/rust/ringbuf/src/producer.rs
new file mode 100644
index 0000000000..e77e7528f5
--- /dev/null
+++ b/third_party/rust/ringbuf/src/producer.rs
@@ -0,0 +1,274 @@
+use std::{
+ io::{self, Read, Write},
+ mem::{self, MaybeUninit},
+ ptr::copy_nonoverlapping,
+ sync::{atomic::Ordering, Arc},
+};
+
+use crate::{consumer::Consumer, ring_buffer::*};
+
+/// Producer part of ring buffer.
+pub struct Producer<T> {
+ pub(crate) rb: Arc<RingBuffer<T>>,
+}
+
+impl<T: Sized> Producer<T> {
+ /// Returns capacity of the ring buffer.
+ ///
+ /// The capacity of the buffer is constant.
+ pub fn capacity(&self) -> usize {
+ self.rb.capacity()
+ }
+
+ /// Checks if the ring buffer is empty.
+ ///
+ /// The result is relevant until you push items to the producer.
+ pub fn is_empty(&self) -> bool {
+ self.rb.is_empty()
+ }
+
+ /// Checks if the ring buffer is full.
+ ///
+ /// *The result may become irrelevant at any time because of concurring activity of the consumer.*
+ pub fn is_full(&self) -> bool {
+ self.rb.is_full()
+ }
+
+ /// The length of the data stored in the buffer.
+ ///
+ /// Actual length may be equal to or less than the returned value.
+ pub fn len(&self) -> usize {
+ self.rb.len()
+ }
+
+ /// The remaining space in the buffer.
+ ///
+ /// Actual remaining space may be equal to or greater than the returning value.
+ pub fn remaining(&self) -> usize {
+ self.rb.remaining()
+ }
+
+ /// Allows to write into ring buffer memory directry.
+ ///
+ /// *This function is unsafe because it gives access to possibly uninitialized memory*
+ ///
+ /// The method takes a function `f` as argument.
+ /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
+ /// First slice contains older elements.
+ ///
+ /// `f` should return number of elements been written.
+ /// *There is no checks for returned number - it remains on the developer's conscience.*
+ ///
+ /// The method **always** calls `f` even if ring buffer is full.
+ ///
+ /// The method returns number returned from `f`.
+ pub unsafe fn push_access<F>(&mut self, f: F) -> usize
+ where
+ F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
+ {
+ let head = self.rb.head.load(Ordering::Acquire);
+ let tail = self.rb.tail.load(Ordering::Acquire);
+ let len = self.rb.data.get_ref().len();
+
+ let ranges = if tail >= head {
+ if head > 0 {
+ (tail..len, 0..(head - 1))
+ } else if tail < len - 1 {
+ (tail..(len - 1), 0..0)
+ } else {
+ (0..0, 0..0)
+ }
+ } else if tail < head - 1 {
+ (tail..(head - 1), 0..0)
+ } else {
+ (0..0, 0..0)
+ };
+
+ let slices = (
+ &mut self.rb.data.get_mut()[ranges.0],
+ &mut self.rb.data.get_mut()[ranges.1],
+ );
+
+ let n = f(slices.0, slices.1);
+
+ if n > 0 {
+ let new_tail = (tail + n) % len;
+ self.rb.tail.store(new_tail, Ordering::Release);
+ }
+ n
+ }
+
+ /// Copies data from the slice to the ring buffer in byte-to-byte manner.
+ ///
+ /// The `elems` slice should contain **initialized** data before the method call.
+ /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**.
+ ///
+ /// Returns the number of items been copied.
+ pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize {
+ self.push_access(|left, right| -> usize {
+ if elems.len() < left.len() {
+ copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len());
+ elems.len()
+ } else {
+ copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len());
+ if elems.len() < left.len() + right.len() {
+ copy_nonoverlapping(
+ elems.as_ptr().add(left.len()),
+ right.as_mut_ptr(),
+ elems.len() - left.len(),
+ );
+ elems.len()
+ } else {
+ copy_nonoverlapping(
+ elems.as_ptr().add(left.len()),
+ right.as_mut_ptr(),
+ right.len(),
+ );
+ left.len() + right.len()
+ }
+ }
+ })
+ }
+
+ /// Appends an element to the ring buffer.
+ /// On failure returns an error containing the element that hasn't beed appended.
+ pub fn push(&mut self, elem: T) -> Result<(), T> {
+ let mut elem_mu = MaybeUninit::new(elem);
+ let n = unsafe {
+ self.push_access(|slice, _| {
+ if !slice.is_empty() {
+ mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
+ 1
+ } else {
+ 0
+ }
+ })
+ };
+ match n {
+ 0 => Err(unsafe { elem_mu.assume_init() }),
+ 1 => Ok(()),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer.
+ ///
+ /// The closure is called until it returns `None` or the ring buffer is full.
+ ///
+ /// The method returns number of elements been put into the buffer.
+ pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize {
+ unsafe {
+ self.push_access(|left, right| {
+ for (i, dst) in left.iter_mut().enumerate() {
+ match f() {
+ Some(e) => mem::replace(dst, MaybeUninit::new(e)),
+ None => return i,
+ };
+ }
+ for (i, dst) in right.iter_mut().enumerate() {
+ match f() {
+ Some(e) => mem::replace(dst, MaybeUninit::new(e)),
+ None => return i + left.len(),
+ };
+ }
+ left.len() + right.len()
+ })
+ }
+ }
+
+ /// Appends elements from an iterator to the ring buffer.
+ /// Elements that haven't been added to the ring buffer remain in the iterator.
+ ///
+ /// Returns count of elements been appended to the ring buffer.
+ pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize {
+ self.push_each(|| elems.next())
+ }
+
+ /// Removes at most `count` elements from the consumer and appends them to the producer.
+ /// If `count` is `None` then as much as possible elements will be moved.
+ /// The producer and consumer parts may be of different buffers as well as of the same one.
+ ///
+ /// On success returns number of elements been moved.
+ pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize {
+ move_items(other, self, count)
+ }
+}
+
+impl<T: Sized + Copy> Producer<T> {
+ /// Appends elements from slice to the ring buffer.
+ /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
+ ///
+ /// Returns count of elements been appended to the ring buffer.
+ pub fn push_slice(&mut self, elems: &[T]) -> usize {
+ unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) }
+ }
+}
+
+impl Producer<u8> {
+ /// Reads at most `count` bytes
+ /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance
+ /// and appends them to the ring buffer.
+ /// If `count` is `None` then as much as possible bytes will be read.
+ ///
+ /// Returns `Ok(n)` if `read` is succeded. `n` is number of bytes been read.
+ /// `n == 0` means that either `read` returned zero or ring buffer is full.
+ ///
+ /// If `read` is failed then error is returned.
+ pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> {
+ let mut err = None;
+ let n = unsafe {
+ self.push_access(|left, _| -> usize {
+ let left = match count {
+ Some(c) => {
+ if c < left.len() {
+ &mut left[0..c]
+ } else {
+ left
+ }
+ }
+ None => left,
+ };
+ match reader
+ .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8]))
+ .and_then(|n| {
+ if n <= left.len() {
+ Ok(n)
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Read operation returned invalid number",
+ ))
+ }
+ }) {
+ Ok(n) => n,
+ Err(e) => {
+ err = Some(e);
+ 0
+ }
+ }
+ })
+ };
+ match err {
+ Some(e) => Err(e),
+ None => Ok(n),
+ }
+ }
+}
+
+impl Write for Producer<u8> {
+ fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
+ let n = self.push_slice(buffer);
+ if n == 0 && !buffer.is_empty() {
+ Err(io::Error::new(
+ io::ErrorKind::WouldBlock,
+ "Ring buffer is full",
+ ))
+ } else {
+ Ok(n)
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
diff --git a/third_party/rust/ringbuf/src/ring_buffer.rs b/third_party/rust/ringbuf/src/ring_buffer.rs
new file mode 100644
index 0000000000..8ae68afa51
--- /dev/null
+++ b/third_party/rust/ringbuf/src/ring_buffer.rs
@@ -0,0 +1,187 @@
+use std::{
+ cell::UnsafeCell,
+ cmp::min,
+ mem::{self, MaybeUninit},
+ ptr::{self, copy},
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ },
+};
+
+use crate::{consumer::Consumer, producer::Producer};
+
+pub(crate) struct SharedVec<T: Sized> {
+ cell: UnsafeCell<Vec<T>>,
+}
+
+unsafe impl<T: Sized> Sync for SharedVec<T> {}
+
+impl<T: Sized> SharedVec<T> {
+ pub fn new(data: Vec<T>) -> Self {
+ Self {
+ cell: UnsafeCell::new(data),
+ }
+ }
+ pub unsafe fn get_ref(&self) -> &Vec<T> {
+ &*self.cell.get()
+ }
+ #[allow(clippy::mut_from_ref)]
+ pub unsafe fn get_mut(&self) -> &mut Vec<T> {
+ &mut *self.cell.get()
+ }
+}
+
+/// Ring buffer itself.
+pub struct RingBuffer<T: Sized> {
+ pub(crate) data: SharedVec<MaybeUninit<T>>,
+ pub(crate) head: AtomicUsize,
+ pub(crate) tail: AtomicUsize,
+}
+
+impl<T: Sized> RingBuffer<T> {
+ /// Creates a new instance of a ring buffer.
+ pub fn new(capacity: usize) -> Self {
+ let mut data = Vec::new();
+ data.resize_with(capacity + 1, MaybeUninit::uninit);
+ Self {
+ data: SharedVec::new(data),
+ head: AtomicUsize::new(0),
+ tail: AtomicUsize::new(0),
+ }
+ }
+
+ /// Splits ring buffer into producer and consumer.
+ pub fn split(self) -> (Producer<T>, Consumer<T>) {
+ let arc = Arc::new(self);
+ (Producer { rb: arc.clone() }, Consumer { rb: arc })
+ }
+
+ /// Returns capacity of the ring buffer.
+ pub fn capacity(&self) -> usize {
+ unsafe { self.data.get_ref() }.len() - 1
+ }
+
+ /// Checks if the ring buffer is empty.
+ pub fn is_empty(&self) -> bool {
+ let head = self.head.load(Ordering::Acquire);
+ let tail = self.tail.load(Ordering::Acquire);
+ head == tail
+ }
+
+ /// Checks if the ring buffer is full.
+ pub fn is_full(&self) -> bool {
+ let head = self.head.load(Ordering::Acquire);
+ let tail = self.tail.load(Ordering::Acquire);
+ (tail + 1) % (self.capacity() + 1) == head
+ }
+
+ /// The length of the data in the buffer.
+ pub fn len(&self) -> usize {
+ let head = self.head.load(Ordering::Acquire);
+ let tail = self.tail.load(Ordering::Acquire);
+ (tail + self.capacity() + 1 - head) % (self.capacity() + 1)
+ }
+
+ /// The remaining space in the buffer.
+ pub fn remaining(&self) -> usize {
+ self.capacity() - self.len()
+ }
+}
+
+impl<T: Sized> Drop for RingBuffer<T> {
+ fn drop(&mut self) {
+ let data = unsafe { self.data.get_mut() };
+
+ let head = self.head.load(Ordering::Acquire);
+ let tail = self.tail.load(Ordering::Acquire);
+ let len = data.len();
+
+ let slices = if head <= tail {
+ (head..tail, 0..0)
+ } else {
+ (head..len, 0..tail)
+ };
+
+ let drop = |elem_ref: &mut MaybeUninit<T>| unsafe {
+ mem::replace(elem_ref, MaybeUninit::uninit()).assume_init();
+ };
+ for elem in data[slices.0].iter_mut() {
+ drop(elem);
+ }
+ for elem in data[slices.1].iter_mut() {
+ drop(elem);
+ }
+ }
+}
+
+struct SlicePtr<T: Sized> {
+ pub ptr: *mut T,
+ pub len: usize,
+}
+
+impl<T> SlicePtr<T> {
+ fn null() -> Self {
+ Self {
+ ptr: ptr::null_mut(),
+ len: 0,
+ }
+ }
+ fn new(slice: &mut [T]) -> Self {
+ Self {
+ ptr: slice.as_mut_ptr(),
+ len: slice.len(),
+ }
+ }
+ unsafe fn shift(&mut self, count: usize) {
+ self.ptr = self.ptr.add(count);
+ self.len -= count;
+ }
+}
+
+/// Moves at most `count` items from the `src` consumer to the `dst` producer.
+/// Consumer and producer may be of different buffers as well as of the same one.
+///
+/// `count` is the number of items being moved, if `None` - as much as possible items will be moved.
+///
+/// Returns number of items been moved.
+pub fn move_items<T>(src: &mut Consumer<T>, dst: &mut Producer<T>, count: Option<usize>) -> usize {
+ unsafe {
+ src.pop_access(|src_left, src_right| -> usize {
+ dst.push_access(|dst_left, dst_right| -> usize {
+ let n = count.unwrap_or_else(|| {
+ min(
+ src_left.len() + src_right.len(),
+ dst_left.len() + dst_right.len(),
+ )
+ });
+ let mut m = 0;
+ let mut src = (SlicePtr::new(src_left), SlicePtr::new(src_right));
+ let mut dst = (SlicePtr::new(dst_left), SlicePtr::new(dst_right));
+
+ loop {
+ let k = min(n - m, min(src.0.len, dst.0.len));
+ if k == 0 {
+ break;
+ }
+ copy(src.0.ptr, dst.0.ptr, k);
+ if src.0.len == k {
+ src.0 = src.1;
+ src.1 = SlicePtr::null();
+ } else {
+ src.0.shift(k);
+ }
+ if dst.0.len == k {
+ dst.0 = dst.1;
+ dst.1 = SlicePtr::null();
+ } else {
+ dst.0.shift(k);
+ }
+ m += k
+ }
+
+ m
+ })
+ })
+ }
+}
diff --git a/third_party/rust/ringbuf/src/tests/access.rs b/third_party/rust/ringbuf/src/tests/access.rs
new file mode 100644
index 0000000000..22cba9efee
--- /dev/null
+++ b/third_party/rust/ringbuf/src/tests/access.rs
@@ -0,0 +1,234 @@
+use std::mem::MaybeUninit;
+
+use crate::RingBuffer;
+
+#[test]
+fn push() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ let vs_20 = (123, 456);
+ let push_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ left[0] = MaybeUninit::new(vs_20.0);
+ left[1] = MaybeUninit::new(vs_20.1);
+ 2
+ };
+
+ assert_eq!(unsafe { prod.push_access(push_fn_20) }, 2);
+
+ assert_eq!(cons.pop().unwrap(), vs_20.0);
+ assert_eq!(cons.pop().unwrap(), vs_20.1);
+ assert_eq!(cons.pop(), None);
+
+ let vs_11 = (123, 456);
+ let push_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 1);
+ assert_eq!(right.len(), 1);
+ left[0] = MaybeUninit::new(vs_11.0);
+ right[0] = MaybeUninit::new(vs_11.1);
+ 2
+ };
+
+ assert_eq!(unsafe { prod.push_access(push_fn_11) }, 2);
+
+ assert_eq!(cons.pop().unwrap(), vs_11.0);
+ assert_eq!(cons.pop().unwrap(), vs_11.1);
+ assert_eq!(cons.pop(), None);
+}
+
+#[test]
+fn pop_full() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (_, mut cons) = buf.split();
+
+ let dummy_fn = |_l: &mut [MaybeUninit<i32>], _r: &mut [MaybeUninit<i32>]| -> usize { 0 };
+ assert_eq!(unsafe { cons.pop_access(dummy_fn) }, 0);
+}
+
+#[test]
+fn pop_empty() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (_, mut cons) = buf.split();
+
+ let dummy_fn = |_l: &mut [MaybeUninit<i32>], _r: &mut [MaybeUninit<i32>]| -> usize { 0 };
+ assert_eq!(unsafe { cons.pop_access(dummy_fn) }, 0);
+}
+
+#[test]
+fn pop() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ let vs_20 = (123, 456);
+
+ assert_eq!(prod.push(vs_20.0), Ok(()));
+ assert_eq!(prod.push(vs_20.1), Ok(()));
+ assert_eq!(prod.push(0), Err(0));
+
+ let pop_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ unsafe {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ assert_eq!(left[0].assume_init(), vs_20.0);
+ assert_eq!(left[1].assume_init(), vs_20.1);
+ 2
+ }
+ };
+
+ assert_eq!(unsafe { cons.pop_access(pop_fn_20) }, 2);
+
+ let vs_11 = (123, 456);
+
+ assert_eq!(prod.push(vs_11.0), Ok(()));
+ assert_eq!(prod.push(vs_11.1), Ok(()));
+ assert_eq!(prod.push(0), Err(0));
+
+ let pop_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ unsafe {
+ assert_eq!(left.len(), 1);
+ assert_eq!(right.len(), 1);
+ assert_eq!(left[0].assume_init(), vs_11.0);
+ assert_eq!(right[0].assume_init(), vs_11.1);
+ 2
+ }
+ };
+
+ assert_eq!(unsafe { cons.pop_access(pop_fn_11) }, 2);
+}
+
+#[test]
+fn push_return() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ let push_fn_0 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ 0
+ };
+
+ assert_eq!(unsafe { prod.push_access(push_fn_0) }, 0);
+
+ let push_fn_1 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ left[0] = MaybeUninit::new(12);
+ 1
+ };
+
+ assert_eq!(unsafe { prod.push_access(push_fn_1) }, 1);
+
+ let push_fn_2 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 1);
+ assert_eq!(right.len(), 0);
+ left[0] = MaybeUninit::new(34);
+ 1
+ };
+
+ assert_eq!(unsafe { prod.push_access(push_fn_2) }, 1);
+
+ assert_eq!(cons.pop().unwrap(), 12);
+ assert_eq!(cons.pop().unwrap(), 34);
+ assert_eq!(cons.pop(), None);
+}
+
+#[test]
+fn pop_return() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ assert_eq!(prod.push(12), Ok(()));
+ assert_eq!(prod.push(34), Ok(()));
+ assert_eq!(prod.push(0), Err(0));
+
+ let pop_fn_0 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ 0
+ };
+
+ assert_eq!(unsafe { cons.pop_access(pop_fn_0) }, 0);
+
+ let pop_fn_1 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ unsafe {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ assert_eq!(left[0].assume_init(), 12);
+ 1
+ }
+ };
+
+ assert_eq!(unsafe { cons.pop_access(pop_fn_1) }, 1);
+
+ let pop_fn_2 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ unsafe {
+ assert_eq!(left.len(), 1);
+ assert_eq!(right.len(), 0);
+ assert_eq!(left[0].assume_init(), 34);
+ 1
+ }
+ };
+
+ assert_eq!(unsafe { cons.pop_access(pop_fn_2) }, 1);
+}
+
+#[test]
+fn push_pop() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ let vs_20 = (123, 456);
+ let push_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ left[0] = MaybeUninit::new(vs_20.0);
+ left[1] = MaybeUninit::new(vs_20.1);
+ 2
+ };
+
+ assert_eq!(unsafe { prod.push_access(push_fn_20) }, 2);
+
+ let pop_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ unsafe {
+ assert_eq!(left.len(), 2);
+ assert_eq!(right.len(), 0);
+ assert_eq!(left[0].assume_init(), vs_20.0);
+ assert_eq!(left[1].assume_init(), vs_20.1);
+ 2
+ }
+ };
+
+ assert_eq!(unsafe { cons.pop_access(pop_fn_20) }, 2);
+
+ let vs_11 = (123, 456);
+ let push_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ assert_eq!(left.len(), 1);
+ assert_eq!(right.len(), 1);
+ left[0] = MaybeUninit::new(vs_11.0);
+ right[0] = MaybeUninit::new(vs_11.1);
+ 2
+ };
+
+ assert_eq!(unsafe { prod.push_access(push_fn_11) }, 2);
+
+ let pop_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize {
+ unsafe {
+ assert_eq!(left.len(), 1);
+ assert_eq!(right.len(), 1);
+ assert_eq!(left[0].assume_init(), vs_11.0);
+ assert_eq!(right[0].assume_init(), vs_11.1);
+ 2
+ }
+ };
+
+ assert_eq!(unsafe { cons.pop_access(pop_fn_11) }, 2);
+}
diff --git a/third_party/rust/ringbuf/src/tests/drop.rs b/third_party/rust/ringbuf/src/tests/drop.rs
new file mode 100644
index 0000000000..d07b9b1ae5
--- /dev/null
+++ b/third_party/rust/ringbuf/src/tests/drop.rs
@@ -0,0 +1,240 @@
+use std::{cell::RefCell, collections::HashSet};
+
+use crate::RingBuffer;
+
+#[derive(Debug)]
+struct Dropper<'a> {
+ id: i32,
+ set: &'a RefCell<HashSet<i32>>,
+}
+
+impl<'a> Dropper<'a> {
+ fn new(set: &'a RefCell<HashSet<i32>>, id: i32) -> Self {
+ if !set.borrow_mut().insert(id) {
+ panic!("value {} already exists", id);
+ }
+ Self { set, id }
+ }
+}
+
+impl<'a> Drop for Dropper<'a> {
+ fn drop(&mut self) {
+ if !self.set.borrow_mut().remove(&self.id) {
+ panic!("value {} already removed", self.id);
+ }
+ }
+}
+
+#[test]
+fn single() {
+ let set = RefCell::new(HashSet::new());
+
+ let cap = 3;
+ let buf = RingBuffer::new(cap);
+
+ assert_eq!(set.borrow().len(), 0);
+
+ {
+ let (mut prod, mut cons) = buf.split();
+
+ prod.push(Dropper::new(&set, 1)).unwrap();
+ assert_eq!(set.borrow().len(), 1);
+ prod.push(Dropper::new(&set, 2)).unwrap();
+ assert_eq!(set.borrow().len(), 2);
+ prod.push(Dropper::new(&set, 3)).unwrap();
+ assert_eq!(set.borrow().len(), 3);
+
+ cons.pop().unwrap();
+ assert_eq!(set.borrow().len(), 2);
+ cons.pop().unwrap();
+ assert_eq!(set.borrow().len(), 1);
+
+ prod.push(Dropper::new(&set, 4)).unwrap();
+ assert_eq!(set.borrow().len(), 2);
+ }
+
+ assert_eq!(set.borrow().len(), 0);
+}
+
+#[test]
+fn multiple_each() {
+ let set = RefCell::new(HashSet::new());
+
+ let cap = 5;
+ let buf = RingBuffer::new(cap);
+
+ assert_eq!(set.borrow().len(), 0);
+
+ {
+ let (mut prod, mut cons) = buf.split();
+ let mut id = 0;
+ let mut cnt = 0;
+
+ assert_eq!(
+ prod.push_each(|| {
+ if cnt < 4 {
+ id += 1;
+ cnt += 1;
+ Some(Dropper::new(&set, id))
+ } else {
+ None
+ }
+ }),
+ 4
+ );
+ assert_eq!(cnt, 4);
+ assert_eq!(cnt, set.borrow().len());
+
+ assert_eq!(
+ cons.pop_each(
+ |_| {
+ cnt -= 1;
+ true
+ },
+ Some(2)
+ ),
+ 2
+ );
+ assert_eq!(cnt, 2);
+ assert_eq!(cnt, set.borrow().len());
+
+ assert_eq!(
+ prod.push_each(|| {
+ id += 1;
+ cnt += 1;
+ Some(Dropper::new(&set, id))
+ }),
+ 3
+ );
+ assert_eq!(cnt, 5);
+ assert_eq!(cnt, set.borrow().len());
+
+ assert_eq!(
+ cons.pop_each(
+ |_| {
+ cnt -= 1;
+ true
+ },
+ None
+ ),
+ 5
+ );
+ assert_eq!(cnt, 0);
+ assert_eq!(cnt, set.borrow().len());
+
+ assert_eq!(
+ prod.push_each(|| {
+ id += 1;
+ cnt += 1;
+ Some(Dropper::new(&set, id))
+ }),
+ 5
+ );
+ assert_eq!(cnt, 5);
+ assert_eq!(cnt, set.borrow().len());
+ }
+
+ assert_eq!(set.borrow().len(), 0);
+}
+
+/// Test the `pop_each` with internal function that returns false
+#[test]
+fn pop_each_test1() {
+ let cap = 10usize;
+ let (mut producer, mut consumer) = RingBuffer::new(cap).split();
+
+ for i in 0..cap {
+ producer.push((i, vec![0u8; 1000])).unwrap();
+ }
+
+ for _ in 0..cap {
+ let removed = consumer.pop_each(|_val| -> bool { false }, None);
+ assert_eq!(removed, 1);
+ }
+
+ assert_eq!(consumer.len(), 0);
+}
+
+/// Test the `pop_each` with capped pop
+#[test]
+fn pop_each_test2() {
+ let cap = 10usize;
+ let (mut producer, mut consumer) = RingBuffer::new(cap).split();
+
+ for i in 0..cap {
+ producer.push((i, vec![0u8; 1000])).unwrap();
+ }
+
+ for _ in 0..cap {
+ let removed = consumer.pop_each(|_val| -> bool { true }, Some(1));
+ assert_eq!(removed, 1);
+ }
+
+ assert_eq!(consumer.len(), 0);
+}
+
+/// Test the `push_each` with internal function that adds only 1 element.
+#[test]
+fn push_each_test1() {
+ let cap = 10usize;
+ let (mut producer, mut consumer) = RingBuffer::new(cap).split();
+
+ for i in 0..cap {
+ let mut count = 0;
+ // Add 1 element at a time
+ let added = producer.push_each(|| -> Option<(usize, Vec<u8>)> {
+ if count == 0 {
+ count += 1;
+ Some((i, vec![0u8; 1000]))
+ } else {
+ None
+ }
+ });
+ assert_eq!(added, 1);
+ }
+
+ for _ in 0..cap {
+ consumer.pop().unwrap();
+ }
+
+ assert_eq!(consumer.len(), 0);
+}
+
+/// Test the `push_each` with split internal buffer
+#[test]
+fn push_each_test2() {
+ let cap = 10usize;
+ let cap_half = 5usize;
+ let (mut producer, mut consumer) = RingBuffer::new(cap).split();
+
+ // Fill the entire buffer
+ for i in 0..cap {
+ producer.push((i, vec![0u8; 1000])).unwrap();
+ }
+
+ // Remove half elements
+ for _ in 0..cap_half {
+ consumer.pop().unwrap();
+ }
+
+ // Re add half elements one by one and check the return count.
+ for i in 0..cap_half {
+ let mut count = 0;
+ // Add 1 element at a time
+ let added = producer.push_each(|| -> Option<(usize, Vec<u8>)> {
+ if count == 0 {
+ count += 1;
+ Some((i, vec![0u8; 1000]))
+ } else {
+ None
+ }
+ });
+ assert_eq!(added, 1);
+ }
+
+ for _ in 0..cap {
+ consumer.pop().unwrap();
+ }
+
+ assert_eq!(consumer.len(), 0);
+}
diff --git a/third_party/rust/ringbuf/src/tests/message.rs b/third_party/rust/ringbuf/src/tests/message.rs
new file mode 100644
index 0000000000..0a20a6137b
--- /dev/null
+++ b/third_party/rust/ringbuf/src/tests/message.rs
@@ -0,0 +1,167 @@
+use std::{
+ io::{self, Read, Write},
+ thread,
+ time::Duration,
+};
+
+use crate::RingBuffer;
+
+const THE_BOOK_FOREWORD: &'static str = "
+It wasn’t always so clear, but the Rust programming language is fundamentally about empowerment: no matter what kind of code you are writing now, Rust empowers you to reach farther, to program with confidence in a wider variety of domains than you did before.
+Take, for example, “systems-level” work that deals with low-level details of memory management, data representation, and concurrency. Traditionally, this realm of programming is seen as arcane, accessible only to a select few who have devoted the necessary years learning to avoid its infamous pitfalls. And even those who practice it do so with caution, lest their code be open to exploits, crashes, or corruption.
+Rust breaks down these barriers by eliminating the old pitfalls and providing a friendly, polished set of tools to help you along the way. Programmers who need to “dip down” into lower-level control can do so with Rust, without taking on the customary risk of crashes or security holes, and without having to learn the fine points of a fickle toolchain. Better yet, the language is designed to guide you naturally towards reliable code that is efficient in terms of speed and memory usage.
+Programmers who are already working with low-level code can use Rust to raise their ambitions. For example, introducing parallelism in Rust is a relatively low-risk operation: the compiler will catch the classical mistakes for you. And you can tackle more aggressive optimizations in your code with the confidence that you won’t accidentally introduce crashes or vulnerabilities.
+But Rust isn’t limited to low-level systems programming. It’s expressive and ergonomic enough to make CLI apps, web servers, and many other kinds of code quite pleasant to write — you’ll find simple examples of both later in the book. Working with Rust allows you to build skills that transfer from one domain to another; you can learn Rust by writing a web app, then apply those same skills to target your Raspberry Pi.
+This book fully embraces the potential of Rust to empower its users. It’s a friendly and approachable text intended to help you level up not just your knowledge of Rust, but also your reach and confidence as a programmer in general. So dive in, get ready to learn—and welcome to the Rust community!
+
+— Nicholas Matsakis and Aaron Turon
+";
+
+#[test]
+fn push_pop_slice() {
+ let buf = RingBuffer::<u8>::new(7);
+ let (mut prod, mut cons) = buf.split();
+
+ let smsg = THE_BOOK_FOREWORD;
+
+ let pjh = thread::spawn(move || {
+ let mut bytes = smsg.as_bytes();
+ while bytes.len() > 0 {
+ let n = prod.push_slice(bytes);
+ if n > 0 {
+ bytes = &bytes[n..bytes.len()]
+ } else {
+ thread::sleep(Duration::from_millis(1))
+ }
+ }
+ loop {
+ match prod.push(0) {
+ Ok(()) => break,
+ Err(_) => thread::sleep(Duration::from_millis(1)),
+ }
+ }
+ });
+
+ let cjh = thread::spawn(move || {
+ let mut bytes = Vec::<u8>::new();
+ let mut buffer = [0; 5];
+ loop {
+ let n = cons.pop_slice(&mut buffer);
+ if n > 0 {
+ bytes.extend_from_slice(&buffer[0..n])
+ } else {
+ if bytes.ends_with(&[0]) {
+ break;
+ } else {
+ thread::sleep(Duration::from_millis(1));
+ }
+ }
+ }
+
+ assert_eq!(bytes.pop().unwrap(), 0);
+ String::from_utf8(bytes).unwrap()
+ });
+
+ pjh.join().unwrap();
+ let rmsg = cjh.join().unwrap();
+
+ assert_eq!(smsg, rmsg);
+}
+
+#[test]
+fn read_from_write_into() {
+ let buf = RingBuffer::<u8>::new(7);
+ let (mut prod, mut cons) = buf.split();
+
+ let smsg = THE_BOOK_FOREWORD;
+
+ let pjh = thread::spawn(move || {
+ let zero = [0 as u8];
+ let mut bytes = smsg.as_bytes().chain(&zero[..]);
+ loop {
+ if prod.is_full() {
+ thread::sleep(Duration::from_millis(1));
+ } else {
+ if prod.read_from(&mut bytes, None).unwrap() == 0 {
+ break;
+ }
+ }
+ }
+ });
+
+ let cjh = thread::spawn(move || {
+ let mut bytes = Vec::<u8>::new();
+ loop {
+ if cons.is_empty() {
+ if bytes.ends_with(&[0]) {
+ break;
+ } else {
+ thread::sleep(Duration::from_millis(1));
+ }
+ } else {
+ cons.write_into(&mut bytes, None).unwrap();
+ }
+ }
+
+ assert_eq!(bytes.pop().unwrap(), 0);
+ String::from_utf8(bytes).unwrap()
+ });
+
+ pjh.join().unwrap();
+ let rmsg = cjh.join().unwrap();
+
+ assert_eq!(smsg, rmsg);
+}
+
+#[test]
+fn read_write() {
+ let buf = RingBuffer::<u8>::new(7);
+ let (mut prod, mut cons) = buf.split();
+
+ let smsg = THE_BOOK_FOREWORD;
+
+ let pjh = thread::spawn(move || {
+ let mut bytes = smsg.as_bytes();
+ while bytes.len() > 0 {
+ match prod.write(bytes) {
+ Ok(n) => bytes = &bytes[n..bytes.len()],
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ thread::sleep(Duration::from_millis(1));
+ }
+ }
+ }
+ loop {
+ match prod.push(0) {
+ Ok(()) => break,
+ Err(_) => thread::sleep(Duration::from_millis(1)),
+ }
+ }
+ });
+
+ let cjh = thread::spawn(move || {
+ let mut bytes = Vec::<u8>::new();
+ let mut buffer = [0; 5];
+ loop {
+ match cons.read(&mut buffer) {
+ Ok(n) => bytes.extend_from_slice(&buffer[0..n]),
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ if bytes.ends_with(&[0]) {
+ break;
+ } else {
+ thread::sleep(Duration::from_millis(1));
+ }
+ }
+ }
+ }
+
+ assert_eq!(bytes.pop().unwrap(), 0);
+ String::from_utf8(bytes).unwrap()
+ });
+
+ pjh.join().unwrap();
+ let rmsg = cjh.join().unwrap();
+
+ assert_eq!(smsg, rmsg);
+}
diff --git a/third_party/rust/ringbuf/src/tests/mod.rs b/third_party/rust/ringbuf/src/tests/mod.rs
new file mode 100644
index 0000000000..c07e9d8316
--- /dev/null
+++ b/third_party/rust/ringbuf/src/tests/mod.rs
@@ -0,0 +1,6 @@
+mod access;
+mod drop;
+mod message;
+mod multiple;
+mod read_write;
+mod single;
diff --git a/third_party/rust/ringbuf/src/tests/multiple.rs b/third_party/rust/ringbuf/src/tests/multiple.rs
new file mode 100644
index 0000000000..002076f0fa
--- /dev/null
+++ b/third_party/rust/ringbuf/src/tests/multiple.rs
@@ -0,0 +1,140 @@
+use crate::RingBuffer;
+
+#[test]
+fn for_each() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ prod.push(10).unwrap();
+ prod.push(20).unwrap();
+
+ let mut sum_1 = 0;
+ cons.for_each(|v| {
+ sum_1 += *v;
+ });
+
+ let first = cons.pop().expect("First element not available");
+ let second = cons.pop().expect("Second element not available");
+
+ assert_eq!(sum_1, first + second);
+}
+
+#[test]
+fn for_each_mut() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ prod.push(10).unwrap();
+ prod.push(20).unwrap();
+
+ cons.for_each_mut(|v| {
+ *v *= 2;
+ });
+
+ let mut sum_1 = 0;
+ cons.for_each_mut(|v| {
+ sum_1 += *v;
+ });
+
+ let first = cons.pop().expect("First element not available");
+ let second = cons.pop().expect("Second element not available");
+
+ assert_eq!(sum_1, first + second);
+}
+
+#[test]
+fn push_pop_slice() {
+ let buf = RingBuffer::<i32>::new(4);
+ let (mut prod, mut cons) = buf.split();
+
+ let mut tmp = [0; 5];
+
+ assert_eq!(prod.push_slice(&[]), 0);
+ assert_eq!(prod.push_slice(&[0, 1, 2]), 3);
+
+ assert_eq!(cons.pop_slice(&mut tmp[0..2]), 2);
+ assert_eq!(tmp[0..2], [0, 1]);
+
+ assert_eq!(prod.push_slice(&[3, 4]), 2);
+ assert_eq!(prod.push_slice(&[5, 6]), 1);
+
+ assert_eq!(cons.pop_slice(&mut tmp[0..3]), 3);
+ assert_eq!(tmp[0..3], [2, 3, 4]);
+
+ assert_eq!(prod.push_slice(&[6, 7, 8, 9]), 3);
+
+ assert_eq!(cons.pop_slice(&mut tmp), 4);
+ assert_eq!(tmp[0..4], [5, 6, 7, 8]);
+}
+
+#[test]
+fn move_slice() {
+ let buf0 = RingBuffer::<i32>::new(4);
+ let buf1 = RingBuffer::<i32>::new(4);
+ let (mut prod0, mut cons0) = buf0.split();
+ let (mut prod1, mut cons1) = buf1.split();
+
+ let mut tmp = [0; 5];
+
+ assert_eq!(prod0.push_slice(&[0, 1, 2]), 3);
+
+ assert_eq!(prod1.move_from(&mut cons0, None), 3);
+ assert_eq!(prod1.move_from(&mut cons0, None), 0);
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 3);
+ assert_eq!(tmp[0..3], [0, 1, 2]);
+
+ assert_eq!(prod0.push_slice(&[3, 4, 5]), 3);
+
+ assert_eq!(prod1.move_from(&mut cons0, None), 3);
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 3);
+ assert_eq!(tmp[0..3], [3, 4, 5]);
+
+ assert_eq!(prod1.push_slice(&[6, 7, 8]), 3);
+ assert_eq!(prod0.push_slice(&[9, 10]), 2);
+
+ assert_eq!(prod1.move_from(&mut cons0, None), 1);
+ assert_eq!(prod1.move_from(&mut cons0, None), 0);
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 4);
+ assert_eq!(tmp[0..4], [6, 7, 8, 9]);
+}
+
+#[test]
+fn move_slice_count() {
+ let buf0 = RingBuffer::<i32>::new(4);
+ let buf1 = RingBuffer::<i32>::new(4);
+ let (mut prod0, mut cons0) = buf0.split();
+ let (mut prod1, mut cons1) = buf1.split();
+
+ let mut tmp = [0; 5];
+
+ assert_eq!(prod0.push_slice(&[0, 1, 2]), 3);
+
+ assert_eq!(prod1.move_from(&mut cons0, Some(2)), 2);
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 2);
+ assert_eq!(tmp[0..2], [0, 1]);
+
+ assert_eq!(prod1.move_from(&mut cons0, Some(2)), 1);
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 1);
+ assert_eq!(tmp[0..1], [2]);
+
+ assert_eq!(prod0.push_slice(&[3, 4, 5, 6]), 4);
+
+ assert_eq!(prod1.move_from(&mut cons0, Some(3)), 3);
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 3);
+ assert_eq!(tmp[0..3], [3, 4, 5]);
+
+ assert_eq!(prod0.push_slice(&[7, 8, 9]), 3);
+
+ assert_eq!(prod1.move_from(&mut cons0, Some(5)), 4);
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 4);
+ assert_eq!(tmp[0..4], [6, 7, 8, 9]);
+}
diff --git a/third_party/rust/ringbuf/src/tests/read_write.rs b/third_party/rust/ringbuf/src/tests/read_write.rs
new file mode 100644
index 0000000000..08270416af
--- /dev/null
+++ b/third_party/rust/ringbuf/src/tests/read_write.rs
@@ -0,0 +1,159 @@
+use std::io::{self};
+
+use crate::RingBuffer;
+
+#[test]
+fn from() {
+ let buf0 = RingBuffer::<u8>::new(4);
+ let buf1 = RingBuffer::<u8>::new(4);
+ let (mut prod0, mut cons0) = buf0.split();
+ let (mut prod1, mut cons1) = buf1.split();
+
+ let mut tmp = [0; 5];
+
+ assert_eq!(prod0.push_slice(&[0, 1, 2]), 3);
+
+ match prod1.read_from(&mut cons0, None) {
+ Ok(n) => assert_eq!(n, 3),
+ other => panic!("{:?}", other),
+ }
+ match prod1.read_from(&mut cons0, None) {
+ Err(e) => {
+ assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
+ }
+ other => panic!("{:?}", other),
+ }
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 3);
+ assert_eq!(tmp[0..3], [0, 1, 2]);
+
+ assert_eq!(prod0.push_slice(&[3, 4, 5]), 3);
+
+ match prod1.read_from(&mut cons0, None) {
+ Ok(n) => assert_eq!(n, 2),
+ other => panic!("{:?}", other),
+ }
+ assert_eq!(cons1.pop_slice(&mut tmp), 2);
+ assert_eq!(tmp[0..2], [3, 4]);
+
+ match prod1.read_from(&mut cons0, None) {
+ Ok(n) => assert_eq!(n, 1),
+ other => panic!("{:?}", other),
+ }
+ assert_eq!(cons1.pop_slice(&mut tmp), 1);
+ assert_eq!(tmp[0..1], [5]);
+
+ assert_eq!(prod1.push_slice(&[6, 7, 8]), 3);
+ assert_eq!(prod0.push_slice(&[9, 10]), 2);
+
+ match prod1.read_from(&mut cons0, None) {
+ Ok(n) => assert_eq!(n, 1),
+ other => panic!("{:?}", other),
+ }
+ match prod1.read_from(&mut cons0, None) {
+ Ok(n) => assert_eq!(n, 0),
+ other => panic!("{:?}", other),
+ }
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 4);
+ assert_eq!(tmp[0..4], [6, 7, 8, 9]);
+}
+
+#[test]
+fn into() {
+ let buf0 = RingBuffer::<u8>::new(4);
+ let buf1 = RingBuffer::<u8>::new(4);
+ let (mut prod0, mut cons0) = buf0.split();
+ let (mut prod1, mut cons1) = buf1.split();
+
+ let mut tmp = [0; 5];
+
+ assert_eq!(prod0.push_slice(&[0, 1, 2]), 3);
+
+ match cons0.write_into(&mut prod1, None) {
+ Ok(n) => assert_eq!(n, 3),
+ other => panic!("{:?}", other),
+ }
+ match cons0.write_into(&mut prod1, None) {
+ Ok(n) => assert_eq!(n, 0),
+ other => panic!("{:?}", other),
+ }
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 3);
+ assert_eq!(tmp[0..3], [0, 1, 2]);
+
+ assert_eq!(prod0.push_slice(&[3, 4, 5]), 3);
+
+ match cons0.write_into(&mut prod1, None) {
+ Ok(n) => assert_eq!(n, 2),
+ other => panic!("{:?}", other),
+ }
+ assert_eq!(cons1.pop_slice(&mut tmp), 2);
+ assert_eq!(tmp[0..2], [3, 4]);
+
+ match cons0.write_into(&mut prod1, None) {
+ Ok(n) => assert_eq!(n, 1),
+ other => panic!("{:?}", other),
+ }
+ assert_eq!(cons1.pop_slice(&mut tmp), 1);
+ assert_eq!(tmp[0..1], [5]);
+
+ assert_eq!(prod1.push_slice(&[6, 7, 8]), 3);
+ assert_eq!(prod0.push_slice(&[9, 10]), 2);
+
+ match cons0.write_into(&mut prod1, None) {
+ Ok(n) => assert_eq!(n, 1),
+ other => panic!("{:?}", other),
+ }
+ match cons0.write_into(&mut prod1, None) {
+ Err(e) => {
+ assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
+ }
+ other => panic!("{:?}", other),
+ }
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 4);
+ assert_eq!(tmp[0..4], [6, 7, 8, 9]);
+}
+
+#[test]
+fn count() {
+ let buf0 = RingBuffer::<u8>::new(4);
+ let buf1 = RingBuffer::<u8>::new(4);
+ let (mut prod0, mut cons0) = buf0.split();
+ let (mut prod1, mut cons1) = buf1.split();
+
+ let mut tmp = [0; 5];
+
+ assert_eq!(prod0.push_slice(&[0, 1, 2, 3]), 4);
+
+ match prod1.read_from(&mut cons0, Some(3)) {
+ Ok(n) => assert_eq!(n, 3),
+ other => panic!("{:?}", other),
+ }
+ match prod1.read_from(&mut cons0, Some(2)) {
+ Ok(n) => assert_eq!(n, 1),
+ other => panic!("{:?}", other),
+ }
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 4);
+ assert_eq!(tmp[0..4], [0, 1, 2, 3]);
+
+ assert_eq!(prod0.push_slice(&[4, 5, 6, 7]), 4);
+
+ match cons0.write_into(&mut prod1, Some(3)) {
+ Ok(n) => assert_eq!(n, 1),
+ other => panic!("{:?}", other),
+ }
+ match cons0.write_into(&mut prod1, Some(2)) {
+ Ok(n) => assert_eq!(n, 2),
+ other => panic!("{:?}", other),
+ }
+ match cons0.write_into(&mut prod1, Some(2)) {
+ Ok(n) => assert_eq!(n, 1),
+ other => panic!("{:?}", other),
+ }
+
+ assert_eq!(cons1.pop_slice(&mut tmp), 4);
+ assert_eq!(tmp[0..4], [4, 5, 6, 7]);
+}
diff --git a/third_party/rust/ringbuf/src/tests/single.rs b/third_party/rust/ringbuf/src/tests/single.rs
new file mode 100644
index 0000000000..29958861fe
--- /dev/null
+++ b/third_party/rust/ringbuf/src/tests/single.rs
@@ -0,0 +1,201 @@
+use std::{sync::atomic::Ordering, thread};
+
+use crate::RingBuffer;
+
+fn head_tail<T>(rb: &RingBuffer<T>) -> (usize, usize) {
+ (
+ rb.head.load(Ordering::Acquire),
+ rb.tail.load(Ordering::Acquire),
+ )
+}
+
+#[test]
+fn capacity() {
+ let cap = 13;
+ let buf = RingBuffer::<i32>::new(cap);
+ assert_eq!(buf.capacity(), cap);
+}
+#[test]
+fn split_capacity() {
+ let cap = 13;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (prod, cons) = buf.split();
+
+ assert_eq!(prod.capacity(), cap);
+ assert_eq!(cons.capacity(), cap);
+}
+
+#[test]
+fn split_threads() {
+ let buf = RingBuffer::<i32>::new(10);
+ let (prod, cons) = buf.split();
+
+ let pjh = thread::spawn(move || {
+ let _ = prod;
+ });
+
+ let cjh = thread::spawn(move || {
+ let _ = cons;
+ });
+
+ pjh.join().unwrap();
+ cjh.join().unwrap();
+}
+
+#[test]
+fn push() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, _) = buf.split();
+
+ assert_eq!(head_tail(&prod.rb), (0, 0));
+
+ assert_eq!(prod.push(123), Ok(()));
+ assert_eq!(head_tail(&prod.rb), (0, 1));
+
+ assert_eq!(prod.push(234), Ok(()));
+ assert_eq!(head_tail(&prod.rb), (0, 2));
+
+ assert_eq!(prod.push(345), Err(345));
+ assert_eq!(head_tail(&prod.rb), (0, 2));
+}
+
+#[test]
+fn pop_empty() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (_, mut cons) = buf.split();
+
+ assert_eq!(head_tail(&cons.rb), (0, 0));
+
+ assert_eq!(cons.pop(), None);
+ assert_eq!(head_tail(&cons.rb), (0, 0));
+}
+
+#[test]
+fn push_pop_one() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ let vcap = cap + 1;
+ let values = [12, 34, 56, 78, 90];
+ assert_eq!(head_tail(&cons.rb), (0, 0));
+
+ for (i, v) in values.iter().enumerate() {
+ assert_eq!(prod.push(*v), Ok(()));
+ assert_eq!(head_tail(&cons.rb), (i % vcap, (i + 1) % vcap));
+
+ assert_eq!(cons.pop().unwrap(), *v);
+ assert_eq!(head_tail(&cons.rb), ((i + 1) % vcap, (i + 1) % vcap));
+
+ assert_eq!(cons.pop(), None);
+ assert_eq!(head_tail(&cons.rb), ((i + 1) % vcap, (i + 1) % vcap));
+ }
+}
+
+#[test]
+fn push_pop_all() {
+ let cap = 2;
+ let buf = RingBuffer::<i32>::new(cap);
+ let (mut prod, mut cons) = buf.split();
+
+ let vcap = cap + 1;
+ let values = [(12, 34, 13), (56, 78, 57), (90, 10, 91)];
+ assert_eq!(head_tail(&cons.rb), (0, 0));
+
+ for (i, v) in values.iter().enumerate() {
+ assert_eq!(prod.push(v.0), Ok(()));
+ assert_eq!(head_tail(&cons.rb), (cap * i % vcap, (cap * i + 1) % vcap));
+
+ assert_eq!(prod.push(v.1), Ok(()));
+ assert_eq!(head_tail(&cons.rb), (cap * i % vcap, (cap * i + 2) % vcap));
+
+ assert_eq!(prod.push(v.2).unwrap_err(), v.2);
+ assert_eq!(head_tail(&cons.rb), (cap * i % vcap, (cap * i + 2) % vcap));
+
+ assert_eq!(cons.pop().unwrap(), v.0);
+ assert_eq!(
+ head_tail(&cons.rb),
+ ((cap * i + 1) % vcap, (cap * i + 2) % vcap)
+ );
+
+ assert_eq!(cons.pop().unwrap(), v.1);
+ assert_eq!(
+ head_tail(&cons.rb),
+ ((cap * i + 2) % vcap, (cap * i + 2) % vcap)
+ );
+
+ assert_eq!(cons.pop(), None);
+ assert_eq!(
+ head_tail(&cons.rb),
+ ((cap * i + 2) % vcap, (cap * i + 2) % vcap)
+ );
+ }
+}
+
+#[test]
+fn empty_full() {
+ let buf = RingBuffer::<i32>::new(1);
+ let (mut prod, cons) = buf.split();
+
+ assert!(prod.is_empty());
+ assert!(cons.is_empty());
+ assert!(!prod.is_full());
+ assert!(!cons.is_full());
+
+ assert_eq!(prod.push(123), Ok(()));
+
+ assert!(!prod.is_empty());
+ assert!(!cons.is_empty());
+ assert!(prod.is_full());
+ assert!(cons.is_full());
+}
+
+#[test]
+fn len_remaining() {
+ let buf = RingBuffer::<i32>::new(2);
+ let (mut prod, mut cons) = buf.split();
+
+ assert_eq!(prod.len(), 0);
+ assert_eq!(cons.len(), 0);
+ assert_eq!(prod.remaining(), 2);
+ assert_eq!(cons.remaining(), 2);
+
+ assert_eq!(prod.push(123), Ok(()));
+
+ assert_eq!(prod.len(), 1);
+ assert_eq!(cons.len(), 1);
+ assert_eq!(prod.remaining(), 1);
+ assert_eq!(cons.remaining(), 1);
+
+ assert_eq!(prod.push(456), Ok(()));
+
+ assert_eq!(prod.len(), 2);
+ assert_eq!(cons.len(), 2);
+ assert_eq!(prod.remaining(), 0);
+ assert_eq!(cons.remaining(), 0);
+
+ assert_eq!(cons.pop(), Some(123));
+
+ assert_eq!(prod.len(), 1);
+ assert_eq!(cons.len(), 1);
+ assert_eq!(prod.remaining(), 1);
+ assert_eq!(cons.remaining(), 1);
+
+ assert_eq!(cons.pop(), Some(456));
+
+ assert_eq!(prod.len(), 0);
+ assert_eq!(cons.len(), 0);
+ assert_eq!(prod.remaining(), 2);
+ assert_eq!(cons.remaining(), 2);
+
+ // now head is at 2, so tail will be at 0. This caught an overflow error
+ // when tail+1 < head because of the substraction of usize.
+ assert_eq!(prod.push(789), Ok(()));
+
+ assert_eq!(prod.len(), 1);
+ assert_eq!(cons.len(), 1);
+ assert_eq!(prod.remaining(), 1);
+ assert_eq!(cons.remaining(), 1);
+}