summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-epoch/src/sync/queue.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:44:51 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:44:51 +0000
commit9e3c08db40b8916968b9f30096c7be3f00ce9647 (patch)
treea68f146d7fa01f0134297619fbe7e33db084e0aa /third_party/rust/crossbeam-epoch/src/sync/queue.rs
parentInitial commit. (diff)
downloadthunderbird-upstream.tar.xz
thunderbird-upstream.zip
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/crossbeam-epoch/src/sync/queue.rs')
-rw-r--r--third_party/rust/crossbeam-epoch/src/sync/queue.rs469
1 files changed, 469 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-epoch/src/sync/queue.rs b/third_party/rust/crossbeam-epoch/src/sync/queue.rs
new file mode 100644
index 0000000000..9500438819
--- /dev/null
+++ b/third_party/rust/crossbeam-epoch/src/sync/queue.rs
@@ -0,0 +1,469 @@
+//! Michael-Scott lock-free queue.
+//!
+//! Usable with any number of producers and consumers.
+//!
+//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
+//! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106>
+//!
+//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
+//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
+
+use core::mem::MaybeUninit;
+use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+
+use crossbeam_utils::CachePadded;
+
+use crate::{unprotected, Atomic, Guard, Owned, Shared};
+
+// The representation here is a singly-linked list, with a sentinel node at the front. In general
+// the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or
+// all `Blocked` (requests for data from blocked threads).
+#[derive(Debug)]
+pub(crate) struct Queue<T> {
+ head: CachePadded<Atomic<Node<T>>>,
+ tail: CachePadded<Atomic<Node<T>>>,
+}
+
+struct Node<T> {
+ /// The slot in which a value of type `T` can be stored.
+ ///
+ /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
+ /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
+ /// Other nodes start their life with a push operation and contain a value until it gets popped
+ /// out. After that such empty nodes get added to the collector for destruction.
+ data: MaybeUninit<T>,
+
+ next: Atomic<Node<T>>,
+}
+
+// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
+unsafe impl<T: Send> Sync for Queue<T> {}
+unsafe impl<T: Send> Send for Queue<T> {}
+
+impl<T> Queue<T> {
+ /// Create a new, empty queue.
+ pub(crate) fn new() -> Queue<T> {
+ let q = Queue {
+ head: CachePadded::new(Atomic::null()),
+ tail: CachePadded::new(Atomic::null()),
+ };
+ let sentinel = Owned::new(Node {
+ data: MaybeUninit::uninit(),
+ next: Atomic::null(),
+ });
+ unsafe {
+ let guard = unprotected();
+ let sentinel = sentinel.into_shared(guard);
+ q.head.store(sentinel, Relaxed);
+ q.tail.store(sentinel, Relaxed);
+ q
+ }
+ }
+
+ /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
+ /// success. The queue's `tail` pointer may be updated.
+ #[inline(always)]
+ fn push_internal(
+ &self,
+ onto: Shared<'_, Node<T>>,
+ new: Shared<'_, Node<T>>,
+ guard: &Guard,
+ ) -> bool {
+ // is `onto` the actual tail?
+ let o = unsafe { onto.deref() };
+ let next = o.next.load(Acquire, guard);
+ if unsafe { next.as_ref().is_some() } {
+ // if not, try to "help" by moving the tail pointer forward
+ let _ = self
+ .tail
+ .compare_exchange(onto, next, Release, Relaxed, guard);
+ false
+ } else {
+ // looks like the actual tail; attempt to link in `n`
+ let result = o
+ .next
+ .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
+ .is_ok();
+ if result {
+ // try to move the tail pointer forward
+ let _ = self
+ .tail
+ .compare_exchange(onto, new, Release, Relaxed, guard);
+ }
+ result
+ }
+ }
+
+ /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
+ pub(crate) fn push(&self, t: T, guard: &Guard) {
+ let new = Owned::new(Node {
+ data: MaybeUninit::new(t),
+ next: Atomic::null(),
+ });
+ let new = Owned::into_shared(new, guard);
+
+ loop {
+ // We push onto the tail, so we'll start optimistically by looking there first.
+ let tail = self.tail.load(Acquire, guard);
+
+ // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
+ if self.push_internal(tail, new, guard) {
+ break;
+ }
+ }
+ }
+
+ /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
+ #[inline(always)]
+ fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
+ let head = self.head.load(Acquire, guard);
+ let h = unsafe { head.deref() };
+ let next = h.next.load(Acquire, guard);
+ match unsafe { next.as_ref() } {
+ Some(n) => unsafe {
+ self.head
+ .compare_exchange(head, next, Release, Relaxed, guard)
+ .map(|_| {
+ let tail = self.tail.load(Relaxed, guard);
+ // Advance the tail so that we don't retire a pointer to a reachable node.
+ if head == tail {
+ let _ = self
+ .tail
+ .compare_exchange(tail, next, Release, Relaxed, guard);
+ }
+ guard.defer_destroy(head);
+ // TODO: Replace with MaybeUninit::read when api is stable
+ Some(n.data.as_ptr().read())
+ })
+ .map_err(|_| ())
+ },
+ None => Ok(None),
+ }
+ }
+
+ /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
+ /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
+ #[inline(always)]
+ fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
+ where
+ T: Sync,
+ F: Fn(&T) -> bool,
+ {
+ let head = self.head.load(Acquire, guard);
+ let h = unsafe { head.deref() };
+ let next = h.next.load(Acquire, guard);
+ match unsafe { next.as_ref() } {
+ Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
+ self.head
+ .compare_exchange(head, next, Release, Relaxed, guard)
+ .map(|_| {
+ let tail = self.tail.load(Relaxed, guard);
+ // Advance the tail so that we don't retire a pointer to a reachable node.
+ if head == tail {
+ let _ = self
+ .tail
+ .compare_exchange(tail, next, Release, Relaxed, guard);
+ }
+ guard.defer_destroy(head);
+ Some(n.data.as_ptr().read())
+ })
+ .map_err(|_| ())
+ },
+ None | Some(_) => Ok(None),
+ }
+ }
+
+ /// Attempts to dequeue from the front.
+ ///
+ /// Returns `None` if the queue is observed to be empty.
+ pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
+ loop {
+ if let Ok(head) = self.pop_internal(guard) {
+ return head;
+ }
+ }
+ }
+
+ /// Attempts to dequeue from the front, if the item satisfies the given condition.
+ ///
+ /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
+ /// condition.
+ pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
+ where
+ T: Sync,
+ F: Fn(&T) -> bool,
+ {
+ loop {
+ if let Ok(head) = self.pop_if_internal(&condition, guard) {
+ return head;
+ }
+ }
+ }
+}
+
+impl<T> Drop for Queue<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let guard = unprotected();
+
+ while self.try_pop(guard).is_some() {}
+
+ // Destroy the remaining sentinel node.
+ let sentinel = self.head.load(Relaxed, guard);
+ drop(sentinel.into_owned());
+ }
+ }
+}
+
+#[cfg(all(test, not(crossbeam_loom)))]
+mod test {
+ use super::*;
+ use crate::pin;
+ use crossbeam_utils::thread;
+
+ struct Queue<T> {
+ queue: super::Queue<T>,
+ }
+
+ impl<T> Queue<T> {
+ pub(crate) fn new() -> Queue<T> {
+ Queue {
+ queue: super::Queue::new(),
+ }
+ }
+
+ pub(crate) fn push(&self, t: T) {
+ let guard = &pin();
+ self.queue.push(t, guard);
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ let guard = &pin();
+ let head = self.queue.head.load(Acquire, guard);
+ let h = unsafe { head.deref() };
+ h.next.load(Acquire, guard).is_null()
+ }
+
+ pub(crate) fn try_pop(&self) -> Option<T> {
+ let guard = &pin();
+ self.queue.try_pop(guard)
+ }
+
+ pub(crate) fn pop(&self) -> T {
+ loop {
+ match self.try_pop() {
+ None => continue,
+ Some(t) => return t,
+ }
+ }
+ }
+ }
+
+ #[cfg(miri)]
+ const CONC_COUNT: i64 = 1000;
+ #[cfg(not(miri))]
+ const CONC_COUNT: i64 = 1000000;
+
+ #[test]
+ fn push_try_pop_1() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ q.push(37);
+ assert!(!q.is_empty());
+ assert_eq!(q.try_pop(), Some(37));
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_try_pop_2() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ q.push(37);
+ q.push(48);
+ assert_eq!(q.try_pop(), Some(37));
+ assert!(!q.is_empty());
+ assert_eq!(q.try_pop(), Some(48));
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_try_pop_many_seq() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ for i in 0..200 {
+ q.push(i)
+ }
+ assert!(!q.is_empty());
+ for i in 0..200 {
+ assert_eq!(q.try_pop(), Some(i));
+ }
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_pop_1() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ q.push(37);
+ assert!(!q.is_empty());
+ assert_eq!(q.pop(), 37);
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_pop_2() {
+ let q: Queue<i64> = Queue::new();
+ q.push(37);
+ q.push(48);
+ assert_eq!(q.pop(), 37);
+ assert_eq!(q.pop(), 48);
+ }
+
+ #[test]
+ fn push_pop_many_seq() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ for i in 0..200 {
+ q.push(i)
+ }
+ assert!(!q.is_empty());
+ for i in 0..200 {
+ assert_eq!(q.pop(), i);
+ }
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_try_pop_many_spsc() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+
+ thread::scope(|scope| {
+ scope.spawn(|_| {
+ let mut next = 0;
+
+ while next < CONC_COUNT {
+ if let Some(elem) = q.try_pop() {
+ assert_eq!(elem, next);
+ next += 1;
+ }
+ }
+ });
+
+ for i in 0..CONC_COUNT {
+ q.push(i)
+ }
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn push_try_pop_many_spmc() {
+ fn recv(_t: i32, q: &Queue<i64>) {
+ let mut cur = -1;
+ for _i in 0..CONC_COUNT {
+ if let Some(elem) = q.try_pop() {
+ assert!(elem > cur);
+ cur = elem;
+
+ if cur == CONC_COUNT - 1 {
+ break;
+ }
+ }
+ }
+ }
+
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ thread::scope(|scope| {
+ for i in 0..3 {
+ let q = &q;
+ scope.spawn(move |_| recv(i, q));
+ }
+
+ scope.spawn(|_| {
+ for i in 0..CONC_COUNT {
+ q.push(i);
+ }
+ });
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn push_try_pop_many_mpmc() {
+ enum LR {
+ Left(i64),
+ Right(i64),
+ }
+
+ let q: Queue<LR> = Queue::new();
+ assert!(q.is_empty());
+
+ thread::scope(|scope| {
+ for _t in 0..2 {
+ scope.spawn(|_| {
+ for i in CONC_COUNT - 1..CONC_COUNT {
+ q.push(LR::Left(i))
+ }
+ });
+ scope.spawn(|_| {
+ for i in CONC_COUNT - 1..CONC_COUNT {
+ q.push(LR::Right(i))
+ }
+ });
+ scope.spawn(|_| {
+ let mut vl = vec![];
+ let mut vr = vec![];
+ for _i in 0..CONC_COUNT {
+ match q.try_pop() {
+ Some(LR::Left(x)) => vl.push(x),
+ Some(LR::Right(x)) => vr.push(x),
+ _ => {}
+ }
+ }
+
+ let mut vl2 = vl.clone();
+ let mut vr2 = vr.clone();
+ vl2.sort_unstable();
+ vr2.sort_unstable();
+
+ assert_eq!(vl, vl2);
+ assert_eq!(vr, vr2);
+ });
+ }
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn push_pop_many_spsc() {
+ let q: Queue<i64> = Queue::new();
+
+ thread::scope(|scope| {
+ scope.spawn(|_| {
+ let mut next = 0;
+ while next < CONC_COUNT {
+ assert_eq!(q.pop(), next);
+ next += 1;
+ }
+ });
+
+ for i in 0..CONC_COUNT {
+ q.push(i)
+ }
+ })
+ .unwrap();
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn is_empty_dont_pop() {
+ let q: Queue<i64> = Queue::new();
+ q.push(20);
+ q.push(20);
+ assert!(!q.is_empty());
+ assert!(!q.is_empty());
+ assert!(q.try_pop().is_some());
+ }
+}