diff options
Diffstat (limited to 'library/std/src/sync/mpsc/mpsc_queue.rs')
-rw-r--r-- | library/std/src/sync/mpsc/mpsc_queue.rs | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/library/std/src/sync/mpsc/mpsc_queue.rs b/library/std/src/sync/mpsc/mpsc_queue.rs new file mode 100644 index 000000000..cdd64a5de --- /dev/null +++ b/library/std/src/sync/mpsc/mpsc_queue.rs @@ -0,0 +1,117 @@ +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between threads, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue might not be appropriate for all use-cases. + +// https://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +pub use self::PopResult::*; + +use core::cell::UnsafeCell; +use core::ptr; + +use crate::boxed::Box; +use crate::sync::atomic::{AtomicPtr, Ordering}; + +/// A result of the `pop` function. +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +struct Node<T> { + next: AtomicPtr<Node<T>>, + value: Option<T>, +} + +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue<T> { + head: AtomicPtr<Node<T>>, + tail: UnsafeCell<*mut Node<T>>, +} + +unsafe impl<T: Send> Send for Queue<T> {} +unsafe impl<T: Send> Sync for Queue<T> {} + +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v }) + } +} + +impl<T> Queue<T> { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue<T> { + let stub = unsafe { Node::new(None) }; + Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } + } + + /// Pushes a new value onto this queue. + pub fn push(&self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, Ordering::AcqRel); + (*prev).next.store(n, Ordering::Release); + } + } + + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + pub fn pop(&self) -> PopResult<T> { + unsafe { + let tail = *self.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + + if !next.is_null() { + *self.tail.get() = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take().unwrap(); + let _: Box<Node<T>> = Box::from_raw(tail); + return Data(ret); + } + + if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent } + } + } +} + +impl<T> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.tail.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + let _: Box<Node<T>> = Box::from_raw(cur); + cur = next; + } + } + } +} |