diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:18:32 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:18:32 +0000 |
commit | 4547b622d8d29df964fa2914213088b148c498fc (patch) | |
tree | 9fc6b25f3c3add6b745be9a2400a6e96140046e9 /library/std/src/sync/mpsc/mpsc_queue.rs | |
parent | Releasing progress-linux version 1.66.0+dfsg1-1~progress7.99u1. (diff) | |
download | rustc-4547b622d8d29df964fa2914213088b148c498fc.tar.xz rustc-4547b622d8d29df964fa2914213088b148c498fc.zip |
Merging upstream version 1.67.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sync/mpsc/mpsc_queue.rs')
-rw-r--r-- | library/std/src/sync/mpsc/mpsc_queue.rs | 117 |
1 files changed, 0 insertions, 117 deletions
diff --git a/library/std/src/sync/mpsc/mpsc_queue.rs b/library/std/src/sync/mpsc/mpsc_queue.rs deleted file mode 100644 index cdd64a5de..000000000 --- a/library/std/src/sync/mpsc/mpsc_queue.rs +++ /dev/null @@ -1,117 +0,0 @@ -//! A mostly lock-free multi-producer, single consumer queue. -//! -//! This module contains an implementation of a concurrent MPSC queue. This -//! queue can be used to share data between threads, and is also used as the -//! building block of channels in rust. -//! -//! Note that the current implementation of this queue has a caveat of the `pop` -//! method, and see the method for more information about it. Due to this -//! caveat, this queue might not be appropriate for all use-cases. - -// https://www.1024cores.net/home/lock-free-algorithms -// /queues/non-intrusive-mpsc-node-based-queue - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests; - -pub use self::PopResult::*; - -use core::cell::UnsafeCell; -use core::ptr; - -use crate::boxed::Box; -use crate::sync::atomic::{AtomicPtr, Ordering}; - -/// A result of the `pop` function. -pub enum PopResult<T> { - /// Some data has been popped - Data(T), - /// The queue is empty - Empty, - /// The queue is in an inconsistent state. Popping data should succeed, but - /// some pushers have yet to make enough progress in order allow a pop to - /// succeed. It is recommended that a pop() occur "in the near future" in - /// order to see if the sender has made progress or not - Inconsistent, -} - -struct Node<T> { - next: AtomicPtr<Node<T>>, - value: Option<T>, -} - -/// The multi-producer single-consumer structure. This is not cloneable, but it -/// may be safely shared so long as it is guaranteed that there is only one -/// popper at a time (many pushers are allowed). -pub struct Queue<T> { - head: AtomicPtr<Node<T>>, - tail: UnsafeCell<*mut Node<T>>, -} - -unsafe impl<T: Send> Send for Queue<T> {} -unsafe impl<T: Send> Sync for Queue<T> {} - -impl<T> Node<T> { - unsafe fn new(v: Option<T>) -> *mut Node<T> { - Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v }) - } -} - -impl<T> Queue<T> { - /// Creates a new queue that is safe to share among multiple producers and - /// one consumer. - pub fn new() -> Queue<T> { - let stub = unsafe { Node::new(None) }; - Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } - } - - /// Pushes a new value onto this queue. - pub fn push(&self, t: T) { - unsafe { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, Ordering::AcqRel); - (*prev).next.store(n, Ordering::Release); - } - } - - /// Pops some data from this queue. - /// - /// Note that the current implementation means that this function cannot - /// return `Option<T>`. It is possible for this queue to be in an - /// inconsistent state where many pushes have succeeded and completely - /// finished, but pops cannot return `Some(t)`. This inconsistent state - /// happens when a pusher is pre-empted at an inopportune moment. - /// - /// This inconsistent state means that this queue does indeed have data, but - /// it does not currently have access to it at this time. - pub fn pop(&self) -> PopResult<T> { - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Ordering::Acquire); - - if !next.is_null() { - *self.tail.get() = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take().unwrap(); - let _: Box<Node<T>> = Box::from_raw(tail); - return Data(ret); - } - - if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent } - } - } -} - -impl<T> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.tail.get(); - while !cur.is_null() { - let next = (*cur).next.load(Ordering::Relaxed); - let _: Box<Node<T>> = Box::from_raw(cur); - cur = next; - } - } - } -} |