diff --git a/third_party/rust/triple_buffer/src/lib.rs b/third_party/rust/triple_buffer/src/lib.rs
new file mode 100644
index 0000000000..4d4bbf8c38
--- /dev/null
+++ b/third_party/rust/triple_buffer/src/lib.rs
@@ -0,0 +1,1092 @@
+//! Triple buffering in Rust
+//!
+//! In this crate, we propose a Rust implementation of triple buffering. This is
+//! a non-blocking thread synchronization mechanism that can be used when a
+//! single producer thread is frequently updating a shared data block, and a
+//! single consumer thread wants to be able to read the latest available version
+//! of the shared data whenever it feels like it.
+//!
+//! # Examples
+//!
+//! For many use cases, you can use the default interface, designed for maximal
+//! ergonomics and synchronization performance, which is based on moving values
+//! into the buffer and subsequently accessing them via shared references:
+//!
+//! ```
+//! // Create a triple buffer
+//! use triple_buffer::TripleBuffer;
+//! let buf = TripleBuffer::new(0);
+//!
+//! // Split it into an input and output interface, to be respectively sent to
+//! // the producer thread and the consumer thread
+//! let (mut buf_input, mut buf_output) = buf.split();
+//!
+//! // The producer can move a value into the buffer at any time
+//! buf_input.write(42);
+//!
+//! // The consumer can access the latest value from the producer at any time
+//! let latest_value_ref = buf_output.read();
+//! assert_eq!(*latest_value_ref, 42);
+//! ```
+//!
+//! In situations where moving the original value away and being unable to
+//! modify it after the fact is too costly, such as if creating a new value
+//! involves dynamic memory allocation, you can opt into the lower-level "raw"
+//! interface, which allows you to access the buffer's data in place and
+//! precisely control when updates are propagated.
+//!
+//! This data access method is more error-prone and comes at a small performance
+//! cost, which is why you will need to enable it explicitly using the "raw"
+//! [cargo feature](http://doc.crates.io/manifest.html#usage-in-end-products).
+//!
+//! ```
+//! # #[cfg(feature = "raw")]
+//! # {
+//! // Create and split a triple buffer
+//! use triple_buffer::TripleBuffer;
+//! let buf = TripleBuffer::new(String::with_capacity(42));
+//! let (mut buf_input, mut buf_output) = buf.split();
+//!
+//! // Mutate the input buffer in place
+//! {
+//! // Acquire a reference to the input buffer
+//! let raw_input = buf_input.raw_input_buffer();
+//!
+//! // In general, you don't know what's inside of the buffer, so you should
+//! // always reset the value before use (this is a type-specific process).
+//! raw_input.clear();
+//!
+//! // Perform an in-place update
+//! raw_input.push_str("Hello, ");
+//! }
+//!
+//! // Publish the input buffer update
+//! buf_input.raw_publish();
+//!
+//! // Manually fetch the buffer update from the consumer interface
+//! buf_output.raw_update();
+//!
+//! // Acquire a mutable reference to the output buffer
+//! let raw_output = buf_output.raw_output_buffer();
+//!
+//! // Post-process the output value before use
+//! raw_output.push_str("world!");
+//! # }
+//! ```
+
+#![deny(missing_debug_implementations, missing_docs)]
+
+use cache_padded::CachePadded;
+
+use std::{
+ cell::UnsafeCell,
+ sync::{
+ atomic::{AtomicU8, Ordering},
+ Arc,
+ },
+};
+
+/// A triple buffer, useful for nonblocking and thread-safe data sharing
+///
+/// A triple buffer is a single-producer single-consumer nonblocking
+/// communication channel which behaves like a shared variable: the producer
+/// submits regular updates, and the consumer accesses the latest available
+/// value whenever it feels like it.
+///
+/// The input and output fields of this struct are what producers and consumers
+/// actually use in practice. They can safely be moved away from the
+/// TripleBuffer struct after construction, and are further documented below.
+///
+#[derive(Debug)]
+pub struct TripleBuffer<T: Send> {
+ /// Input object used by producers to send updates
+ input: Input<T>,
+
+ /// Output object used by consumers to read the current value
+ output: Output<T>,
+}
+//
+impl<T: Clone + Send> TripleBuffer<T> {
+ /// Construct a triple buffer with a certain initial value
+ #[allow(clippy::needless_pass_by_value)]
+ pub fn new(initial: T) -> Self {
+ Self::new_impl(|| initial.clone())
+ }
+}
+//
+impl<T: Default + Send> Default for TripleBuffer<T> {
+ /// Construct a triple buffer with a default-constructed value
+ fn default() -> Self {
+ Self::new_impl(T::default)
+ }
+}
+//
+impl<T: Send> TripleBuffer<T> {
+ /// Construct a triple buffer, using a functor to generate initial values
+ fn new_impl(mut generator: impl FnMut() -> T) -> Self {
+ // Start with the shared state...
+ let shared_state = Arc::new(SharedState::new(|_i| generator(), 0));
+
+ // ...then construct the input and output structs
+ TripleBuffer {
+ input: Input {
+ shared: shared_state.clone(),
+ input_idx: 1,
+ },
+ output: Output {
+ shared: shared_state,
+ output_idx: 2,
+ },
+ }
+ }
+
+ /// Extract input and output of the triple buffer
+ pub fn split(self) -> (Input<T>, Output<T>) {
+ (self.input, self.output)
+ }
+}
+//
+// The Clone and PartialEq traits are used internally for testing.
+//
+#[doc(hidden)]
+impl<T: Clone + Send> Clone for TripleBuffer<T> {
+ fn clone(&self) -> Self {
+ // Clone the shared state. This is safe because at this layer of the
+ // interface, one needs an Input/Output &mut to mutate the shared state.
+ let shared_state = Arc::new(unsafe { (*self.input.shared).clone() });
+
+ // ...then the input and output structs
+ TripleBuffer {
+ input: Input {
+ shared: shared_state.clone(),
+ input_idx: self.input.input_idx,
+ },
+ output: Output {
+ shared: shared_state,
+ output_idx: self.output.output_idx,
+ },
+ }
+ }
+}
+//
+#[doc(hidden)]
+impl<T: PartialEq + Send> PartialEq for TripleBuffer<T> {
+ fn eq(&self, other: &Self) -> bool {
+ // Compare the shared states. This is safe because at this layer of the
+ // interface, one needs an Input/Output &mut to mutate the shared state.
+ let shared_states_equal = unsafe { (*self.input.shared).eq(&*other.input.shared) };
+
+ // Compare the rest of the triple buffer states
+ shared_states_equal
+ && (self.input.input_idx == other.input.input_idx)
+ && (self.output.output_idx == other.output.output_idx)
+ }
+}
+
+/// Producer interface to the triple buffer
+///
+/// The producer of data can use this struct to submit updates to the triple
+/// buffer at any time. These updates are nonblocking: a collision between
+/// the producer and the consumer will result in cache contention, but deadlocks
+/// and scheduling-induced slowdowns cannot happen.
+///
+#[derive(Debug)]
+pub struct Input<T: Send> {
+ /// Reference-counted shared state
+ shared: Arc<SharedState<T>>,
+
+ /// Index of the input buffer (which is private to the producer)
+ input_idx: BufferIndex,
+}
+//
+// Public interface
+impl<T: Send> Input<T> {
+ /// Write a new value into the triple buffer
+ pub fn write(&mut self, value: T) {
+ // Update the input buffer
+ *self.input_buffer() = value;
+
+ // Publish our update to the consumer
+ self.publish();
+ }
+
+ /// Check if the consumer has fetched our last submission yet
+ ///
+ /// This method is only intended for diagnostic purposes. Please do not let
+ /// it inform your decision to send or not to send a value, as that would
+ /// effectively be building a very poor spinlock-based double buffer
+ /// implementation. If what you truly need is a double buffer, build
+ /// yourself a proper blocking one instead of wasting CPU time.
+ ///
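+ /// For example:
+ ///
+ /// ```
+ /// # use triple_buffer::TripleBuffer;
+ /// let (mut buf_input, mut buf_output) = TripleBuffer::new(0).split();
+ /// buf_input.write(42);
+ /// assert!(!buf_input.consumed()); // The update was not fetched yet...
+ /// buf_output.read();
+ /// assert!(buf_input.consumed()); // ...but now it has been
+ /// ```
+ ///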
+ pub fn consumed(&self) -> bool {
+ let back_info = self.shared.back_info.load(Ordering::Relaxed);
+ back_info & BACK_DIRTY_BIT == 0
+ }
+
+ /// Get raw access to the input buffer
+ ///
+ /// This advanced interface allows you to update the input buffer in place,
+ /// which can in some cases improve performance by avoiding the repeated
+ /// creation of values of type T when this is an expensive process.
+ ///
+ /// However, by opting into it, you force yourself to take into account
+ /// subtle implementation details which you could normally ignore.
+ ///
+ /// First, the buffer does not contain the last value that you sent (which
+ /// is now in the hands of the consumer). In fact, the consumer is allowed
+ /// to write complete garbage into it if it feels so inclined. All you can
+ /// safely assume is that it contains a valid value of type T.
+ ///
+ /// Second, we do not send updates automatically. You need to call
+ /// raw_publish() in order to propagate a buffer update to the consumer.
+ /// Alternative designs based on Drop were considered, but ultimately deemed
+ /// too magical for the target audience of this method.
+ ///
+ /// To use this method, you have to enable the crate's `raw` feature
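+ ///
+ /// A minimal sketch of an in-place update (with the `raw` feature enabled):
+ ///
+ /// ```
+ /// # #[cfg(feature = "raw")]
+ /// # {
+ /// # use triple_buffer::TripleBuffer;
+ /// let (mut buf_input, _buf_output) = TripleBuffer::new(String::new()).split();
+ /// let raw_input = buf_input.raw_input_buffer();
+ /// raw_input.clear(); // The buffer may hold a stale value, so reset it first
+ /// raw_input.push_str("some text");
+ /// buf_input.raw_publish();
+ /// # }
+ /// ```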
+ #[cfg(any(feature = "raw", test))]
+ pub fn raw_input_buffer(&mut self) -> &mut T {
+ self.input_buffer()
+ }
+
+ /// Unconditionally publish an update, checking for overwrites
+ ///
+ /// After updating the input buffer using raw_input_buffer(), you can use
+ /// this method to publish your updates to the consumer. It will send back
+ /// an output flag which tells you whether an unread value was overwritten.
+ ///
+ /// To use this method, you have to enable the crate's `raw` feature
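+ ///
+ /// For example (with the `raw` feature enabled):
+ ///
+ /// ```
+ /// # #[cfg(feature = "raw")]
+ /// # {
+ /// # use triple_buffer::TripleBuffer;
+ /// let (mut buf_input, _buf_output) = TripleBuffer::new(0).split();
+ /// *buf_input.raw_input_buffer() = 1;
+ /// assert!(!buf_input.raw_publish()); // The back-buffer held no unread update
+ /// *buf_input.raw_input_buffer() = 2;
+ /// assert!(buf_input.raw_publish()); // The first update was never read
+ /// # }
+ /// ```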
+ #[cfg(any(feature = "raw", test))]
+ pub fn raw_publish(&mut self) -> bool {
+ self.publish()
+ }
+}
+//
+// Internal interface
+impl<T: Send> Input<T> {
+ /// Access the input buffer
+ ///
+ /// This is safe because the synchronization protocol ensures that we have
+ /// exclusive access to this buffer.
+ ///
+ fn input_buffer(&mut self) -> &mut T {
+ let input_ptr = self.shared.buffers[self.input_idx as usize].get();
+ unsafe { &mut *input_ptr }
+ }
+
+ /// Tell which memory ordering should be used for buffer swaps
+ ///
+ /// The right answer depends on whether the consumer is allowed to write
+ /// into the output buffer or not. If it can, then we must synchronize with
+ /// its writes. If not, we only need to propagate our own writes.
+ ///
+ fn swap_ordering() -> Ordering {
+ if cfg!(feature = "raw") {
+ Ordering::AcqRel
+ } else {
+ Ordering::Release
+ }
+ }
+
+ /// Publish an update, checking for overwrites (internal version)
+ fn publish(&mut self) -> bool {
+ // Swap the input buffer and the back buffer, setting the dirty bit
+ let former_back_info = self.shared.back_info.swap(
+ self.input_idx | BACK_DIRTY_BIT,
+ Self::swap_ordering(), // Propagate buffer updates as well
+ );
+
+ // The old back buffer becomes our new input buffer
+ self.input_idx = former_back_info & BACK_INDEX_MASK;
+
+ // Tell whether we have overwritten unread data
+ former_back_info & BACK_DIRTY_BIT != 0
+ }
+}
+
+/// Consumer interface to the triple buffer
+///
+/// The consumer of data can use this struct to access the latest published
+/// update from the producer at any time. Readout is nonblocking: a
+/// collision between the producer and consumer will result in cache contention,
+/// but deadlocks and scheduling-induced slowdowns cannot happen.
+///
+#[derive(Debug)]
+pub struct Output<T: Send> {
+ /// Reference-counted shared state
+ shared: Arc<SharedState<T>>,
+
+ /// Index of the output buffer (which is private to the consumer)
+ output_idx: BufferIndex,
+}
+//
+// Public interface
+impl<T: Send> Output<T> {
+ /// Access the latest value from the triple buffer
+ pub fn read(&mut self) -> &T {
+ // Fetch updates from the producer
+ self.update();
+
+ // Give access to the output buffer
+ self.output_buffer()
+ }
+
+ /// Tell whether a buffer update is incoming from the producer
+ ///
+ /// This method is only intended for diagnostic purposes. Please do not let
+ /// it inform your decision to read a value or not, as that would
+ /// effectively be building a very poor spinlock-based double buffer
+ /// implementation. If what you truly need is a double buffer, build
+ /// yourself a proper blocking one instead of wasting CPU time.
+ ///
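+ /// For example:
+ ///
+ /// ```
+ /// # use triple_buffer::TripleBuffer;
+ /// let (mut buf_input, mut buf_output) = TripleBuffer::new(0).split();
+ /// assert!(!buf_output.updated()); // No update was published yet
+ /// buf_input.write(42);
+ /// assert!(buf_output.updated()); // An update is pending...
+ /// buf_output.read();
+ /// assert!(!buf_output.updated()); // ...and read() has fetched it
+ /// ```
+ ///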
+ pub fn updated(&self) -> bool {
+ let back_info = self.shared.back_info.load(Ordering::Relaxed);
+ back_info & BACK_DIRTY_BIT != 0
+ }
+
+ /// Get raw access to the output buffer
+ ///
+ /// This advanced interface allows you to modify the contents of the output
+ /// buffer, which can in some cases improve performance by avoiding the
+ /// creation of values of type T when this is an expensive process. One
+ /// possible application is to post-process values from the producer.
+ ///
+ /// However, by opting into it, you force yourself to take into account
+ /// subtle implementation details which you could normally ignore.
+ ///
+ /// First, keep in mind that you can lose access to the current output
+ /// buffer any time read() or raw_update() is called, as it will be replaced
+ /// by an updated buffer from the producer automatically.
+ ///
+ /// Second, to reduce the potential for the aforementioned usage error, this
+ /// method does not update the output buffer automatically. You need to call
+ /// raw_update() in order to fetch buffer updates from the producer.
+ ///
+ /// To use this method, you have to enable the crate's `raw` feature
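+ ///
+ /// A minimal sketch of output post-processing (with the `raw` feature
+ /// enabled):
+ ///
+ /// ```
+ /// # #[cfg(feature = "raw")]
+ /// # {
+ /// # use triple_buffer::TripleBuffer;
+ /// let (mut buf_input, mut buf_output) = TripleBuffer::new(String::new()).split();
+ /// buf_input.write("Hello".to_owned());
+ /// buf_output.raw_update();
+ /// buf_output.raw_output_buffer().push_str(", world!");
+ /// assert_eq!(*buf_output.read(), "Hello, world!");
+ /// # }
+ /// ```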
+ #[cfg(any(feature = "raw", test))]
+ pub fn raw_output_buffer(&mut self) -> &mut T {
+ self.output_buffer()
+ }
+
+ /// Update the output buffer
+ ///
+ /// Check for incoming updates from the producer, and if so, update our
+ /// output buffer to the latest data version. This operation will overwrite
+ /// any changes which you may have committed into the output buffer.
+ ///
+ /// Return a flag telling whether an update was carried out
+ ///
+ /// To use this method, you have to enable the crate's `raw` feature
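+ ///
+ /// For example (with the `raw` feature enabled):
+ ///
+ /// ```
+ /// # #[cfg(feature = "raw")]
+ /// # {
+ /// # use triple_buffer::TripleBuffer;
+ /// let (mut buf_input, mut buf_output) = TripleBuffer::new(0).split();
+ /// assert!(!buf_output.raw_update()); // No update was pending
+ /// buf_input.write(42);
+ /// assert!(buf_output.raw_update()); // An update was fetched
+ /// assert_eq!(*buf_output.raw_output_buffer(), 42);
+ /// # }
+ /// ```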
+ #[cfg(any(feature = "raw", test))]
+ pub fn raw_update(&mut self) -> bool {
+ self.update()
+ }
+}
+//
+// Internal interface
+impl<T: Send> Output<T> {
+ /// Access the output buffer (internal version)
+ ///
+ /// This is safe because the synchronization protocol ensures that we have
+ /// exclusive access to this buffer.
+ ///
+ fn output_buffer(&mut self) -> &mut T {
+ let output_ptr = self.shared.buffers[self.output_idx as usize].get();
+ unsafe { &mut *output_ptr }
+ }
+
+ /// Tell which memory ordering should be used for buffer swaps
+ ///
+ /// The right answer depends on whether the consumer is allowed to write into
+ /// the output buffer or not. If it can, then we must propagate these writes
+ /// back to the producer. Otherwise, we only need to fetch producer updates.
+ ///
+ fn swap_ordering() -> Ordering {
+ if cfg!(feature = "raw") {
+ Ordering::AcqRel
+ } else {
+ Ordering::Acquire
+ }
+ }
+
+ /// Check out incoming output buffer updates (internal version)
+ fn update(&mut self) -> bool {
+ // Access the shared state
+ let shared_state = &(*self.shared);
+
+ // Check if an update is present in the back-buffer
+ let updated = self.updated();
+ if updated {
+ // If so, exchange our output buffer with the back-buffer, thus
+ // acquiring exclusive access to the old back buffer while giving
+ // the producer a new back-buffer to write to.
+ let former_back_info = shared_state.back_info.swap(
+ self.output_idx,
+ Self::swap_ordering(), // Synchronize with buffer updates
+ );
+
+ // Make the old back-buffer our new output buffer
+ self.output_idx = former_back_info & BACK_INDEX_MASK;
+ }
+
+ // Tell whether an update was carried out
+ updated
+ }
+}
+
+/// Triple buffer shared state
+///
+/// In a triple buffering communication protocol, the producer and consumer
+/// share the following storage:
+///
+/// - Three memory buffers suitable for storing the data at hand
+/// - Information about the back-buffer: which buffer is the current back-buffer
+/// and whether an update was published since the last readout.
+///
+#[derive(Debug)]
+struct SharedState<T: Send> {
+ /// Data storage buffers
+ buffers: [CachePadded<UnsafeCell<T>>; 3],
+
+ /// Information about the current back-buffer state
+ back_info: CachePadded<AtomicBackBufferInfo>,
+}
+//
+#[doc(hidden)]
+impl<T: Send> SharedState<T> {
+ /// Given (a way to generate) buffer contents and the back info, build the shared state
+ fn new(mut gen_buf_data: impl FnMut(usize) -> T, back_info: BackBufferInfo) -> Self {
+ let mut make_buf = |i| -> CachePadded<UnsafeCell<T>> {
+ CachePadded::new(UnsafeCell::new(gen_buf_data(i)))
+ };
+ Self {
+ buffers: [make_buf(0), make_buf(1), make_buf(2)],
+ back_info: CachePadded::new(AtomicBackBufferInfo::new(back_info)),
+ }
+ }
+}
+//
+#[doc(hidden)]
+impl<T: Clone + Send> SharedState<T> {
+ /// Cloning the shared state is unsafe because you must ensure that no one
+ /// is concurrently accessing it, since &self is enough for writing.
+ unsafe fn clone(&self) -> Self {
+ Self::new(
+ |i| (*self.buffers[i].get()).clone(),
+ self.back_info.load(Ordering::Relaxed),
+ )
+ }
+}
+//
+#[doc(hidden)]
+impl<T: PartialEq + Send> SharedState<T> {
+ /// Equality is unsafe for the same reason as cloning: you must ensure that
+ /// no one is concurrently accessing the triple buffer to avoid data races.
+ unsafe fn eq(&self, other: &Self) -> bool {
+ // Check whether the contents of all buffers are equal...
+ let buffers_equal = self
+ .buffers
+ .iter()
+ .zip(other.buffers.iter())
+ .all(|tuple| -> bool {
+ let (cell1, cell2) = tuple;
+ *cell1.get() == *cell2.get()
+ });
+
+ // ...then check whether the rest of the shared state is equal
+ buffers_equal
+ && (self.back_info.load(Ordering::Relaxed) == other.back_info.load(Ordering::Relaxed))
+ }
+}
+//
+unsafe impl<T: Send> Sync for SharedState<T> {}
+
+/// Index types used for triple buffering
+///
+/// These types are used to index into triple buffers. In addition, the
+/// BackBufferInfo type is actually a bitfield, whose third bit (numerical
+/// value: 4) is set to 1 to indicate that the producer published an update into
+/// the back-buffer, and reset to 0 when the consumer fetches the update.
+///
+type BufferIndex = u8;
+type BackBufferInfo = BufferIndex;
+//
+type AtomicBackBufferInfo = AtomicU8;
+const BACK_INDEX_MASK: u8 = 0b11; // Mask used to extract back-buffer index
+const BACK_DIRTY_BIT: u8 = 0b100; // Bit set by producer to signal updates
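+//
+// For example, a back_info value of 0b110 decodes as follows: buffer 2 is the
+// current back-buffer (0b110 & BACK_INDEX_MASK == 0b10), and it holds an
+// update which the consumer has not fetched yet (0b110 & BACK_DIRTY_BIT != 0).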
+
+/// Unit tests
+#[cfg(test)]
+mod tests {
+ use super::{BufferIndex, SharedState, TripleBuffer, BACK_DIRTY_BIT, BACK_INDEX_MASK};
+
+ use std::{fmt::Debug, ops::Deref, sync::atomic::Ordering, thread, time::Duration};
+
+ use testbench::{
+ self,
+ race_cell::{RaceCell, Racey},
+ };
+
+ /// Check that triple buffers are properly initialized
+ #[test]
+ fn initial_state() {
+ // Let's create a triple buffer
+ let mut buf = TripleBuffer::new(42);
+ check_buf_state(&mut buf, false);
+ assert_eq!(*buf.output.read(), 42);
+ }
+
+ /// Check that the shared state's unsafe equality operator works
+ #[test]
+ fn partial_eq_shared() {
+ // Let's create some dummy shared state
+ let dummy_state = SharedState::<u16>::new(|i| [111, 222, 333][i], 0b10);
+
+ // Check that the dummy state is equal to itself
+ assert!(unsafe { dummy_state.eq(&dummy_state) });
+
+ // Check that it's not equal to a state where buffer contents differ
+ assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [114, 222, 333][i], 0b10)) });
+ assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [111, 225, 333][i], 0b10)) });
+ assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [111, 222, 336][i], 0b10)) });
+
+ // Check that it's not equal to a state where the back info differs
+ assert!(unsafe {
+ !dummy_state.eq(&SharedState::<u16>::new(
+ |i| [111, 222, 333][i],
+ BACK_DIRTY_BIT | 0b10,
+ ))
+ });
+ assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [111, 222, 333][i], 0b01)) });
+ }
+
+ /// Check that TripleBuffer's PartialEq impl works
+ #[test]
+ fn partial_eq() {
+ // Create a triple buffer
+ let buf = TripleBuffer::new("test");
+
+ // Check that it is equal to itself
+ assert_eq!(buf, buf);
+
+ // Make another buffer with different contents. As buffer creation is
+ // deterministic, this should only have an impact on the shared state,
+ // but the buffers should nevertheless be considered different.
+ let buf2 = TripleBuffer::new("taste");
+ assert_eq!(buf.input.input_idx, buf2.input.input_idx);
+ assert_eq!(buf.output.output_idx, buf2.output.output_idx);
+ assert!(buf != buf2);
+
+ // Check that changing either the input or output buffer index will
+ // also lead two TripleBuffers to be considered different (this test
+ // technically creates an invalid TripleBuffer state, but it's the only
+ // way to check that the PartialEq impl is exhaustive)
+ let mut buf3 = TripleBuffer::new("test");
+ assert_eq!(buf, buf3);
+ let old_input_idx = buf3.input.input_idx;
+ buf3.input.input_idx = buf3.output.output_idx;
+ assert!(buf != buf3);
+ buf3.input.input_idx = old_input_idx;
+ buf3.output.output_idx = old_input_idx;
+ assert!(buf != buf3);
+ }
+
+ /// Check that the shared state's unsafe clone operator works
+ #[test]
+ fn clone_shared() {
+ // Let's create some dummy shared state
+ let dummy_state = SharedState::<u8>::new(|i| [123, 231, 132][i], BACK_DIRTY_BIT | 0b01);
+
+ // Now, try to clone it
+ let dummy_state_copy = unsafe { dummy_state.clone() };
+
+ // Check that the contents of the original state did not change
+ assert!(unsafe {
+ dummy_state.eq(&SharedState::<u8>::new(
+ |i| [123, 231, 132][i],
+ BACK_DIRTY_BIT | 0b01,
+ ))
+ });
+
+ // Check that the contents of the original and final state are identical
+ assert!(unsafe { dummy_state.eq(&dummy_state_copy) });
+ }
+
+ /// Check that TripleBuffer's Clone impl works
+ #[test]
+ fn clone() {
+ // Create a triple buffer
+ let mut buf = TripleBuffer::new(4.2);
+
+ // Put it in a nontrivial state
+ unsafe {
+ *buf.input.shared.buffers[0].get() = 1.2;
+ *buf.input.shared.buffers[1].get() = 3.4;
+ *buf.input.shared.buffers[2].get() = 5.6;
+ }
+ buf.input
+ .shared
+ .back_info
+ .store(BACK_DIRTY_BIT | 0b01, Ordering::Relaxed);
+ buf.input.input_idx = 0b10;
+ buf.output.output_idx = 0b00;
+
+ // Now clone it
+ let buf_clone = buf.clone();
+
+ // Check that the clone uses its own, separate shared data storage
+ assert_eq!(
+ as_ptr(&buf_clone.input.shared),
+ as_ptr(&buf_clone.output.shared)
+ );
+ assert!(as_ptr(&buf_clone.input.shared) != as_ptr(&buf.input.shared));
+
+ // Check that it is identical from PartialEq's point of view
+ assert_eq!(buf, buf_clone);
+
+ // Check that the contents of the original buffer did not change
+ unsafe {
+ assert_eq!(*buf.input.shared.buffers[0].get(), 1.2);
+ assert_eq!(*buf.input.shared.buffers[1].get(), 3.4);
+ assert_eq!(*buf.input.shared.buffers[2].get(), 5.6);
+ }
+ assert_eq!(
+ buf.input.shared.back_info.load(Ordering::Relaxed),
+ BACK_DIRTY_BIT | 0b01
+ );
+ assert_eq!(buf.input.input_idx, 0b10);
+ assert_eq!(buf.output.output_idx, 0b00);
+ }
+
+ /// Check that the low-level publish/update primitives work
+ #[test]
+ fn swaps() {
+ // Create a new buffer, and a way to track any changes to it
+ let mut buf = TripleBuffer::new([123, 456]);
+ let old_buf = buf.clone();
+ let old_input_idx = old_buf.input.input_idx;
+ let old_shared = &old_buf.input.shared;
+ let old_back_info = old_shared.back_info.load(Ordering::Relaxed);
+ let old_back_idx = old_back_info & BACK_INDEX_MASK;
+ let old_output_idx = old_buf.output.output_idx;
+
+ // Check that updating from a clean state works
+ assert!(!buf.output.raw_update());
+ assert_eq!(buf, old_buf);
+ check_buf_state(&mut buf, false);
+
+ // Check that publishing from a clean state works
+ assert!(!buf.input.raw_publish());
+ let mut expected_buf = old_buf.clone();
+ expected_buf.input.input_idx = old_back_idx;
+ expected_buf
+ .input
+ .shared
+ .back_info
+ .store(old_input_idx | BACK_DIRTY_BIT, Ordering::Relaxed);
+ assert_eq!(buf, expected_buf);
+ check_buf_state(&mut buf, true);
+
+ // Check that overwriting a dirty state works
+ assert!(buf.input.raw_publish());
+ let mut expected_buf = old_buf.clone();
+ expected_buf.input.input_idx = old_input_idx;
+ expected_buf
+ .input
+ .shared
+ .back_info
+ .store(old_back_idx | BACK_DIRTY_BIT, Ordering::Relaxed);
+ assert_eq!(buf, expected_buf);
+ check_buf_state(&mut buf, true);
+
+ // Check that updating from a dirty state works
+ assert!(buf.output.raw_update());
+ expected_buf.output.output_idx = old_back_idx;
+ expected_buf
+ .output
+ .shared
+ .back_info
+ .store(old_output_idx, Ordering::Relaxed);
+ assert_eq!(buf, expected_buf);
+ check_buf_state(&mut buf, false);
+ }
+
+ /// Check that (sequentially) writing to a triple buffer works
+ #[test]
+ fn sequential_write() {
+ // Let's create a triple buffer
+ let mut buf = TripleBuffer::new(false);
+
+ // Back up the initial buffer state
+ let old_buf = buf.clone();
+
+ // Perform a write
+ buf.input.write(true);
+
+ // Check new implementation state
+ {
+ // Starting from the old buffer state...
+ let mut expected_buf = old_buf.clone();
+
+ // ...write the new value in and swap...
+ *expected_buf.input.raw_input_buffer() = true;
+ expected_buf.input.raw_publish();
+
+ // Nothing else should have changed
+ assert_eq!(buf, expected_buf);
+ check_buf_state(&mut buf, true);
+ }
+ }
+
+ /// Check that (sequentially) reading from a triple buffer works
+ #[test]
+ fn sequential_read() {
+ // Let's create a triple buffer and write into it
+ let mut buf = TripleBuffer::new(1.0);
+ buf.input.write(4.2);
+
+ // Test readout from dirty (freshly written) triple buffer
+ {
+ // Back up the initial buffer state
+ let old_buf = buf.clone();
+
+ // Read from the buffer
+ let result = *buf.output.read();
+
+ // Output value should be correct
+ assert_eq!(result, 4.2);
+
+ // Result should be equivalent to carrying out an update
+ let mut expected_buf = old_buf.clone();
+ assert!(expected_buf.output.raw_update());
+ assert_eq!(buf, expected_buf);
+ check_buf_state(&mut buf, false);
+ }
+
+ // Test readout from clean (unchanged) triple buffer
+ {
+ // Back up the initial buffer state
+ let old_buf = buf.clone();
+
+ // Read from the buffer
+ let result = *buf.output.read();
+
+ // Output value should be correct
+ assert_eq!(result, 4.2);
+
+ // Buffer state should be unchanged
+ assert_eq!(buf, old_buf);
+ check_buf_state(&mut buf, false);
+ }
+ }
+
+ /// Check that contended concurrent reads and writes work
+ #[test]
+ #[ignore]
+ fn contended_concurrent_read_write() {
+ // We will stress the infrastructure by performing this many writes
+ // as a reader continuously reads the latest value
+ const TEST_WRITE_COUNT: usize = 100_000_000;
+
+ // This is the buffer that our reader and writer will share
+ let buf = TripleBuffer::new(RaceCell::new(0));
+ let (mut buf_input, mut buf_output) = buf.split();
+
+ // Concurrently run a writer which increments a shared value in a loop,
+ // and a reader which makes sure that no unexpected value slips in.
+ let mut last_value = 0usize;
+ testbench::concurrent_test_2(
+ move || {
+ for value in 1..=TEST_WRITE_COUNT {
+ buf_input.write(RaceCell::new(value));
+ }
+ },
+ move || {
+ while last_value < TEST_WRITE_COUNT {
+ let new_racey_value = buf_output.read().get();
+ match new_racey_value {
+ Racey::Consistent(new_value) => {
+ assert!((new_value >= last_value) && (new_value <= TEST_WRITE_COUNT));
+ last_value = new_value;
+ }
+ Racey::Inconsistent => {
+ panic!("Inconsistent state exposed by the buffer!");
+ }
+ }
+ }
+ },
+ );
+ }
+
+ /// Check that uncontended concurrent reads and writes work
+ ///
+ /// **WARNING:** This test unfortunately needs to have timing-dependent
+ /// behaviour to do its job. If it fails for you, try the following:
+ ///
+ /// - Close running applications in the background
+ /// - Re-run the tests with only one OS thread (--test-threads=1)
+ /// - Increase the writer sleep period
+ ///
+ #[test]
+ #[ignore]
+ fn uncontended_concurrent_read_write() {
+ // We will stress the infrastructure by performing this many writes
+ // as a reader continuously reads the latest value
+ const TEST_WRITE_COUNT: usize = 625;
+
+ // This is the buffer that our reader and writer will share
+ let buf = TripleBuffer::new(RaceCell::new(0));
+ let (mut buf_input, mut buf_output) = buf.split();
+
+ // Concurrently run a writer which slowly increments a shared value,
+ // and a reader which checks that it can receive every update
+ let mut last_value = 0usize;
+ testbench::concurrent_test_2(
+ move || {
+ for value in 1..=TEST_WRITE_COUNT {
+ buf_input.write(RaceCell::new(value));
+ thread::yield_now();
+ thread::sleep(Duration::from_millis(32));
+ }
+ },
+ move || {
+ while last_value < TEST_WRITE_COUNT {
+ let new_racey_value = buf_output.read().get();
+ match new_racey_value {
+ Racey::Consistent(new_value) => {
+ assert!((new_value >= last_value) && (new_value - last_value <= 1));
+ last_value = new_value;
+ }
+ Racey::Inconsistent => {
+ panic!("Inconsistent state exposed by the buffer!");
+ }
+ }
+ }
+ },
+ );
+ }
+
+ /// When raw mode is enabled, the consumer is allowed to modify its buffer,
+ /// which means that it will unknowingly send back data to the producer.
+ /// This creates new correctness requirements for the synchronization
+ /// protocol, which must be checked as well.
+ #[test]
+ #[ignore]
+ #[cfg(feature = "raw")]
+ fn concurrent_bidirectional_exchange() {
+ // We will stress the infrastructure by performing this many writes
+ // as a reader continuously reads the latest value
+ const TEST_WRITE_COUNT: usize = 100_000_000;
+
+ // This is the buffer that our reader and writer will share
+ let buf = TripleBuffer::new(RaceCell::new(0));
+ let (mut buf_input, mut buf_output) = buf.split();
+
+ // Concurrently run a writer which increments a shared value in a loop,
+ // and a reader which makes sure that no unexpected value slips in.
+ testbench::concurrent_test_2(
+ move || {
+ for new_value in 1..=TEST_WRITE_COUNT {
+ match buf_input.raw_input_buffer().get() {
+ Racey::Consistent(curr_value) => {
+ assert!(curr_value <= TEST_WRITE_COUNT);
+ }
+ Racey::Inconsistent => {
+ panic!("Inconsistent state exposed by the buffer!");
+ }
+ }
+ buf_input.write(RaceCell::new(new_value));
+ }
+ },
+ move || {
+ let mut last_value = 0usize;
+ while last_value < TEST_WRITE_COUNT {
+ match buf_output.raw_output_buffer().get() {
+ Racey::Consistent(new_value) => {
+ assert!((new_value >= last_value) && (new_value <= TEST_WRITE_COUNT));
+ last_value = new_value;
+ }
+ Racey::Inconsistent => {
+ panic!("Inconsistent state exposed by the buffer!");
+ }
+ }
+ if buf_output.updated() {
+ buf_output.raw_output_buffer().set(last_value / 2);
+ buf_output.raw_update();
+ }
+ }
+ },
+ );
+ }
+
+ /// Range check for triple buffer indexes
+ #[allow(unused_comparisons)]
+ fn index_in_range(idx: BufferIndex) -> bool {
+ (idx >= 0) & (idx <= 2)
+ }
+
+ /// Get a pointer to the target of some reference (e.g. an &, an Arc...)
+ fn as_ptr<P: Deref>(ref_like: &P) -> *const P::Target {
+ &(**ref_like) as *const _
+ }
+
+ /// Check the state of a buffer, and the effect of queries on it
+ fn check_buf_state<T>(buf: &mut TripleBuffer<T>, expected_dirty_bit: bool)
+ where
+ T: Clone + Debug + PartialEq + Send,
+ {
+ // Make a backup of the buffer's initial state
+ let initial_buf = buf.clone();
+
+ // Check that the input and output point to the same shared state
+ assert_eq!(as_ptr(&buf.input.shared), as_ptr(&buf.output.shared));
+
+ // Access the shared state and decode back-buffer information
+ let back_info = buf.input.shared.back_info.load(Ordering::Relaxed);
+ let back_idx = back_info & BACK_INDEX_MASK;
+ let back_buffer_dirty = back_info & BACK_DIRTY_BIT != 0;
+
+ // Input-/output-/back-buffer indexes must be in range
+ assert!(index_in_range(buf.input.input_idx));
+ assert!(index_in_range(buf.output.output_idx));
+ assert!(index_in_range(back_idx));
+
+ // Input-/output-/back-buffer indexes must be distinct
+ assert!(buf.input.input_idx != buf.output.output_idx);
+ assert!(buf.input.input_idx != back_idx);
+ assert!(buf.output.output_idx != back_idx);
+
+ // Back-buffer must have the expected dirty bit
+ assert_eq!(back_buffer_dirty, expected_dirty_bit);
+
+ // Check that the "input buffer" query behaves as expected
+ assert_eq!(
+ as_ptr(&buf.input.raw_input_buffer()),
+ buf.input.shared.buffers[buf.input.input_idx as usize].get()
+ );
+ assert_eq!(*buf, initial_buf);
+
+ // Check that the "consumed" query behaves as expected
+ assert_eq!(!buf.input.consumed(), expected_dirty_bit);
+ assert_eq!(*buf, initial_buf);
+
+ // Check that the "output buffer" query behaves as expected
+ assert_eq!(
+ as_ptr(&buf.output.raw_output_buffer()),
+ buf.output.shared.buffers[buf.output.output_idx as usize].get()
+ );
+ assert_eq!(*buf, initial_buf);
+
+ // Check that the "updated" query behaves as expected
+ assert_eq!(buf.output.updated(), expected_dirty_bit);
+ assert_eq!(*buf, initial_buf);
+ }
+}
+
+/// Performance benchmarks
+///
+/// These benchmarks masquerading as tests are a stopgap solution until
+/// benchmarking lands in Stable Rust. They should be compiled in release mode,
+/// and run with only one OS thread. In addition, the default behaviour of
+/// swallowing test output should obviously be suppressed.
+///
+/// TL;DR: cargo test --release -- --ignored --nocapture --test-threads=1
+///
+/// TODO: Switch to standard Rust benchmarks once they are stable
+///
+#[cfg(test)]
+mod benchmarks {
+ use super::TripleBuffer;
+ use testbench;
+
+ /// Benchmark for clean read performance
+ #[test]
+ #[ignore]
+ fn clean_read() {
+ // Create a buffer
+ let mut buf = TripleBuffer::new(0u32);
+
+ // Benchmark clean reads
+ testbench::benchmark(2_500_000_000, || {
+ let read = *buf.output.read();
+ assert!(read < u32::max_value());
+ });
+ }
+
+ /// Benchmark for write performance
+ #[test]
+ #[ignore]
+ fn write() {
+ // Create a buffer
+ let mut buf = TripleBuffer::new(0u32);
+
+ // Benchmark writes
+ let mut iter = 1u32;
+ testbench::benchmark(640_000_000, || {
+ buf.input.write(iter);
+ iter += 1;
+ });
+ }
+
+ /// Benchmark for write + dirty read performance
+ #[test]
+ #[ignore]
+ fn write_and_dirty_read() {
+ // Create a buffer
+ let mut buf = TripleBuffer::new(0u32);
+
+ // Benchmark writes + dirty reads
+ let mut iter = 1u32;
+ testbench::benchmark(290_000_000u32, || {
+ buf.input.write(iter);
+ iter += 1;
+ let read = *buf.output.read();
+ assert!(read < u32::max_value());
+ });
+ }
+
+ /// Benchmark read performance under concurrent write pressure
+ #[test]
+ #[ignore]
+ fn concurrent_read() {
+ // Create a buffer
+ let buf = TripleBuffer::new(0u32);
+ let (mut buf_input, mut buf_output) = buf.split();
+
+ // Benchmark reads under concurrent write pressure
+ let mut counter = 0u32;
+ testbench::concurrent_benchmark(
+ 56_000_000u32,
+ move || {
+ let read = *buf_output.read();
+ assert!(read < u32::max_value());
+ },
+ move || {
+ buf_input.write(counter);
+ counter = (counter + 1) % u32::max_value();
+ },
+ );
+ }
+
+ /// Benchmark write performance under concurrent read pressure
+ #[test]
+ #[ignore]
+ fn concurrent_write() {
+ // Create a buffer
+ let buf = TripleBuffer::new(0u32);
+ let (mut buf_input, mut buf_output) = buf.split();
+
+ // Benchmark writes under concurrent read pressure
+ let mut iter = 1u32;
+ testbench::concurrent_benchmark(
+ 88_000_000u32,
+ move || {
+ buf_input.write(iter);
+ iter += 1;
+ },
+ move || {
+ let read = *buf_output.read();
+ assert!(read < u32::max_value());
+ },
+ );
+ }
+}