From 698f8c2f01ea549d77d7dc3338a12e04c11057b9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 14:02:58 +0200 Subject: Adding upstream version 1.64.0+dfsg1. Signed-off-by: Daniel Baumann --- library/std/src/sync/mpsc/spsc_queue.rs | 236 ++++++++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) create mode 100644 library/std/src/sync/mpsc/spsc_queue.rs (limited to 'library/std/src/sync/mpsc/spsc_queue.rs') diff --git a/library/std/src/sync/mpsc/spsc_queue.rs b/library/std/src/sync/mpsc/spsc_queue.rs new file mode 100644 index 000000000..7e745eb31 --- /dev/null +++ b/library/std/src/sync/mpsc/spsc_queue.rs @@ -0,0 +1,236 @@ +//! A single-producer single-consumer concurrent queue +//! +//! This module contains the implementation of an SPSC queue which can be used +//! concurrently between two threads. This data structure is safe to use and +//! enforces the semantics that there is one pusher and one popper. + +// https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +use core::cell::UnsafeCell; +use core::ptr; + +use crate::boxed::Box; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; + +use super::cache_aligned::CacheAligned; + +// Node within the linked list queue of messages to send +struct Node { + // FIXME: this could be an uninitialized T if we're careful enough, and + // that would reduce memory usage (and be a bit faster). + // is it worth it? + value: Option, // nullable for re-use of nodes + cached: bool, // This node goes into the node cache + next: AtomicPtr>, // next node in the queue +} + +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an Arc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue { + // consumer fields + consumer: CacheAligned>, + + // producer fields + producer: CacheAligned>, +} + +struct Consumer { + tail: UnsafeCell<*mut Node>, // where to pop from + tail_prev: AtomicPtr>, // where to pop from + cache_bound: usize, // maximum cache size + cached_nodes: AtomicUsize, // number of nodes marked as cacheable + addition: Addition, +} + +struct Producer { + head: UnsafeCell<*mut Node>, // where to push to + first: UnsafeCell<*mut Node>, // where to get new nodes from + tail_copy: UnsafeCell<*mut Node>, // between first/tail + addition: Addition, +} + +unsafe impl Send for Queue {} + +unsafe impl Sync for Queue {} + +impl Node { + fn new() -> *mut Node { + Box::into_raw(box Node { + value: None, + cached: false, + next: AtomicPtr::new(ptr::null_mut::>()), + }) + } +} + +impl Queue { + /// Creates a new queue. With given additional elements in the producer and + /// consumer portions of the queue. + /// + /// Due to the performance implications of cache-contention, + /// we wish to keep fields used mainly by the producer on a separate cache + /// line than those used by the consumer. + /// Since cache lines are usually 64 bytes, it is unreasonably expensive to + /// allocate one for small fields, so we allow users to insert additional + /// fields into the cache lines already allocated by this for the producer + /// and consumer. + /// + /// This is unsafe as the type system doesn't enforce a single + /// consumer-producer relationship. It also allows the consumer to `pop` + /// items while there is a `peek` active due to all methods having a + /// non-mutable receiver. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub unsafe fn with_additions( + bound: usize, + producer_addition: ProducerAddition, + consumer_addition: ConsumerAddition, + ) -> Self { + let n1 = Node::new(); + let n2 = Node::new(); + (*n1).next.store(n2, Ordering::Relaxed); + Queue { + consumer: CacheAligned::new(Consumer { + tail: UnsafeCell::new(n2), + tail_prev: AtomicPtr::new(n1), + cache_bound: bound, + cached_nodes: AtomicUsize::new(0), + addition: consumer_addition, + }), + producer: CacheAligned::new(Producer { + head: UnsafeCell::new(n2), + first: UnsafeCell::new(n1), + tail_copy: UnsafeCell::new(n1), + addition: producer_addition, + }), + } + } + + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(ptr::null_mut(), Ordering::Relaxed); + (**self.producer.head.get()).next.store(n, Ordering::Release); + *(&self.producer.head).get() = n; + } + } + + unsafe fn alloc(&self) -> *mut Node { + // First try to see if we can consume the 'first' node for our uses. + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); + return ret; + } + // If the above fails, then update our copy of the tail and try + // again. + *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire); + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); + return ret; + } + // If all of that fails, then we have to allocate a new node + // (there's nothing in the node cache). + Node::new() + } + + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&self) -> Option { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = *self.consumer.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + if next.is_null() { + return None; + } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); + + *self.consumer.0.tail.get() = next; + if self.consumer.cache_bound == 0 { + self.consumer.tail_prev.store(tail, Ordering::Release); + } else { + let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed); + if cached_nodes < self.consumer.cache_bound && !(*tail).cached { + self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed); + (*tail).cached = true; + } + + if (*tail).cached { + self.consumer.tail_prev.store(tail, Ordering::Release); + } else { + (*self.consumer.tail_prev.load(Ordering::Relaxed)) + .next + .store(next, Ordering::Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: Box> = Box::from_raw(tail); + } + } + ret + } + } + + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// has no data currently + /// + /// # Warning + /// The reference returned is invalid if it is not used before the consumer + /// pops the value off the queue. If the producer then pushes another value + /// onto the queue, it will overwrite the value pointed to by the reference. + pub fn peek(&self) -> Option<&mut T> { + // This is essentially the same as above with all the popping bits + // stripped out. + unsafe { + let tail = *self.consumer.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + if next.is_null() { None } else { (*next).value.as_mut() } + } + } + + pub fn producer_addition(&self) -> &ProducerAddition { + &self.producer.addition + } + + pub fn consumer_addition(&self) -> &ConsumerAddition { + &self.consumer.addition + } +} + +impl Drop for Queue { + fn drop(&mut self) { + unsafe { + let mut cur = *self.producer.first.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + let _n: Box> = Box::from_raw(cur); + cur = next; + } + } + } +} -- cgit v1.2.3