diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
commit | 26a029d407be480d791972afb5975cf62c9360a6 (patch) | |
tree | f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/crossbeam-epoch/src/sync | |
parent | Initial commit. (diff) | |
download | firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz firefox-26a029d407be480d791972afb5975cf62c9360a6.zip |
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/crossbeam-epoch/src/sync')
-rw-r--r-- | third_party/rust/crossbeam-epoch/src/sync/list.rs | 487 | ||||
-rw-r--r-- | third_party/rust/crossbeam-epoch/src/sync/mod.rs | 7 | ||||
-rw-r--r-- | third_party/rust/crossbeam-epoch/src/sync/once_lock.rs | 103 | ||||
-rw-r--r-- | third_party/rust/crossbeam-epoch/src/sync/queue.rs | 469 |
4 files changed, 1066 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-epoch/src/sync/list.rs b/third_party/rust/crossbeam-epoch/src/sync/list.rs new file mode 100644 index 0000000000..52ffd6fca4 --- /dev/null +++ b/third_party/rust/crossbeam-epoch/src/sync/list.rs @@ -0,0 +1,487 @@ +//! Lock-free intrusive linked list. +//! +//! Ideas from Michael. High Performance Dynamic Lock-Free Hash Tables and List-Based Sets. SPAA +//! 2002. <http://dl.acm.org/citation.cfm?id=564870.564881> + +use core::marker::PhantomData; +use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + +use crate::{unprotected, Atomic, Guard, Shared}; + +/// An entry in a linked list. +/// +/// An Entry is accessed from multiple threads, so it would be beneficial to put it in a different +/// cache-line than thread-local data in terms of performance. +#[derive(Debug)] +pub(crate) struct Entry { + /// The next entry in the linked list. + /// If the tag is 1, this entry is marked as deleted. + next: Atomic<Entry>, +} + +/// Implementing this trait asserts that the type `T` can be used as an element in the intrusive +/// linked list defined in this module. `T` has to contain (or otherwise be linked to) an instance +/// of `Entry`. +/// +/// # Example +/// +/// ```ignore +/// struct A { +/// entry: Entry, +/// data: usize, +/// } +/// +/// impl IsElement<A> for A { +/// fn entry_of(a: &A) -> &Entry { +/// let entry_ptr = ((a as usize) + offset_of!(A, entry)) as *const Entry; +/// unsafe { &*entry_ptr } +/// } +/// +/// unsafe fn element_of(entry: &Entry) -> &T { +/// let elem_ptr = ((entry as usize) - offset_of!(A, entry)) as *const T; +/// &*elem_ptr +/// } +/// +/// unsafe fn finalize(entry: &Entry, guard: &Guard) { +/// guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _)); +/// } +/// } +/// ``` +/// +/// This trait is implemented on a type separate from `T` (although it can be just `T`), because +/// one type might be placeable into multiple lists, in which case it would require multiple +/// implementations of `IsElement`. In such cases, each struct implementing `IsElement<T>` +/// represents a distinct `Entry` in `T`. +/// +/// For example, we can insert the following struct into two lists using `entry1` for one +/// and `entry2` for the other: +/// +/// ```ignore +/// struct B { +/// entry1: Entry, +/// entry2: Entry, +/// data: usize, +/// } +/// ``` +/// +pub(crate) trait IsElement<T> { + /// Returns a reference to this element's `Entry`. + fn entry_of(_: &T) -> &Entry; + + /// Given a reference to an element's entry, returns that element. + /// + /// ```ignore + /// let elem = ListElement::new(); + /// assert_eq!(elem.entry_of(), + /// unsafe { ListElement::element_of(elem.entry_of()) } ); + /// ``` + /// + /// # Safety + /// + /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance + /// of the element type (`T`). + unsafe fn element_of(_: &Entry) -> &T; + + /// The function that is called when an entry is unlinked from list. + /// + /// # Safety + /// + /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance + /// of the element type (`T`). + unsafe fn finalize(_: &Entry, _: &Guard); +} + +/// A lock-free, intrusive linked list of type `T`. +#[derive(Debug)] +pub(crate) struct List<T, C: IsElement<T> = T> { + /// The head of the linked list. + head: Atomic<Entry>, + + /// The phantom data for using `T` and `C`. + _marker: PhantomData<(T, C)>, +} + +/// An iterator used for retrieving values from the list. +pub(crate) struct Iter<'g, T, C: IsElement<T>> { + /// The guard that protects the iteration. + guard: &'g Guard, + + /// Pointer from the predecessor to the current entry. + pred: &'g Atomic<Entry>, + + /// The current entry. + curr: Shared<'g, Entry>, + + /// The list head, needed for restarting iteration. + head: &'g Atomic<Entry>, + + /// Logically, we store a borrow of an instance of `T` and + /// use the type information from `C`. + _marker: PhantomData<(&'g T, C)>, +} + +/// An error that occurs during iteration over the list. +#[derive(PartialEq, Debug)] +pub(crate) enum IterError { + /// A concurrent thread modified the state of the list at the same place that this iterator + /// was inspecting. Subsequent iteration will restart from the beginning of the list. + Stalled, +} + +impl Default for Entry { + /// Returns the empty entry. + fn default() -> Self { + Self { + next: Atomic::null(), + } + } +} + +impl Entry { + /// Marks this entry as deleted, deferring the actual deallocation to a later iteration. + /// + /// # Safety + /// + /// The entry should be a member of a linked list, and it should not have been deleted. + /// It should be safe to call `C::finalize` on the entry after the `guard` is dropped, where `C` + /// is the associated helper for the linked list. + pub(crate) unsafe fn delete(&self, guard: &Guard) { + self.next.fetch_or(1, Release, guard); + } +} + +impl<T, C: IsElement<T>> List<T, C> { + /// Returns a new, empty linked list. + pub(crate) fn new() -> Self { + Self { + head: Atomic::null(), + _marker: PhantomData, + } + } + + /// Inserts `entry` into the head of the list. + /// + /// # Safety + /// + /// You should guarantee that: + /// + /// - `container` is not null + /// - `container` is immovable, e.g. inside an `Owned` + /// - the same `Entry` is not inserted more than once + /// - the inserted object will be removed before the list is dropped + pub(crate) unsafe fn insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard) { + // Insert right after head, i.e. at the beginning of the list. + let to = &self.head; + // Get the intrusively stored Entry of the new element to insert. + let entry: &Entry = C::entry_of(container.deref()); + // Make a Shared ptr to that Entry. + let entry_ptr = Shared::from(entry as *const _); + // Read the current successor of where we want to insert. + let mut next = to.load(Relaxed, guard); + + loop { + // Set the Entry of the to-be-inserted element to point to the previous successor of + // `to`. + entry.next.store(next, Relaxed); + match to.compare_exchange_weak(next, entry_ptr, Release, Relaxed, guard) { + Ok(_) => break, + // We lost the race or weak CAS failed spuriously. Update the successor and try + // again. + Err(err) => next = err.current, + } + } + } + + /// Returns an iterator over all objects. + /// + /// # Caveat + /// + /// Every object that is inserted at the moment this function is called and persists at least + /// until the end of iteration will be returned. Since this iterator traverses a lock-free + /// linked list that may be concurrently modified, some additional caveats apply: + /// + /// 1. If a new object is inserted during iteration, it may or may not be returned. + /// 2. If an object is deleted during iteration, it may or may not be returned. + /// 3. The iteration may be aborted when it lost in a race condition. In this case, the winning + /// thread will continue to iterate over the same list. + pub(crate) fn iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C> { + Iter { + guard, + pred: &self.head, + curr: self.head.load(Acquire, guard), + head: &self.head, + _marker: PhantomData, + } + } +} + +impl<T, C: IsElement<T>> Drop for List<T, C> { + fn drop(&mut self) { + unsafe { + let guard = unprotected(); + let mut curr = self.head.load(Relaxed, guard); + while let Some(c) = curr.as_ref() { + let succ = c.next.load(Relaxed, guard); + // Verify that all elements have been removed from the list. + assert_eq!(succ.tag(), 1); + + C::finalize(curr.deref(), guard); + curr = succ; + } + } + } +} + +impl<'g, T: 'g, C: IsElement<T>> Iterator for Iter<'g, T, C> { + type Item = Result<&'g T, IterError>; + + fn next(&mut self) -> Option<Self::Item> { + while let Some(c) = unsafe { self.curr.as_ref() } { + let succ = c.next.load(Acquire, self.guard); + + if succ.tag() == 1 { + // This entry was removed. Try unlinking it from the list. + let succ = succ.with_tag(0); + + // The tag should always be zero, because removing a node after a logically deleted + // node leaves the list in an invalid state. + debug_assert!(self.curr.tag() == 0); + + // Try to unlink `curr` from the list, and get the new value of `self.pred`. + let succ = match self + .pred + .compare_exchange(self.curr, succ, Acquire, Acquire, self.guard) + { + Ok(_) => { + // We succeeded in unlinking `curr`, so we have to schedule + // deallocation. Deferred drop is okay, because `list.delete()` can only be + // called if `T: 'static`. + unsafe { + C::finalize(self.curr.deref(), self.guard); + } + + // `succ` is the new value of `self.pred`. + succ + } + Err(e) => { + // `e.current` is the current value of `self.pred`. + e.current + } + }; + + // If the predecessor node is already marked as deleted, we need to restart from + // `head`. + if succ.tag() != 0 { + self.pred = self.head; + self.curr = self.head.load(Acquire, self.guard); + + return Some(Err(IterError::Stalled)); + } + + // Move over the removed by only advancing `curr`, not `pred`. + self.curr = succ; + continue; + } + + // Move one step forward. + self.pred = &c.next; + self.curr = succ; + + return Some(Ok(unsafe { C::element_of(c) })); + } + + // We reached the end of the list. + None + } +} + +#[cfg(all(test, not(crossbeam_loom)))] +mod tests { + use super::*; + use crate::{Collector, Owned}; + use crossbeam_utils::thread; + use std::sync::Barrier; + + impl IsElement<Entry> for Entry { + fn entry_of(entry: &Entry) -> &Entry { + entry + } + + unsafe fn element_of(entry: &Entry) -> &Entry { + entry + } + + unsafe fn finalize(entry: &Entry, guard: &Guard) { + guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _)); + } + } + + /// Checks whether the list retains inserted elements + /// and returns them in the correct order. + #[test] + fn insert() { + let collector = Collector::new(); + let handle = collector.register(); + let guard = handle.pin(); + + let l: List<Entry> = List::new(); + + let e1 = Owned::new(Entry::default()).into_shared(&guard); + let e2 = Owned::new(Entry::default()).into_shared(&guard); + let e3 = Owned::new(Entry::default()).into_shared(&guard); + + unsafe { + l.insert(e1, &guard); + l.insert(e2, &guard); + l.insert(e3, &guard); + } + + let mut iter = l.iter(&guard); + let maybe_e3 = iter.next(); + assert!(maybe_e3.is_some()); + assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw()); + let maybe_e2 = iter.next(); + assert!(maybe_e2.is_some()); + assert!(maybe_e2.unwrap().unwrap() as *const Entry == e2.as_raw()); + let maybe_e1 = iter.next(); + assert!(maybe_e1.is_some()); + assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw()); + assert!(iter.next().is_none()); + + unsafe { + e1.as_ref().unwrap().delete(&guard); + e2.as_ref().unwrap().delete(&guard); + e3.as_ref().unwrap().delete(&guard); + } + } + + /// Checks whether elements can be removed from the list and whether + /// the correct elements are removed. + #[test] + fn delete() { + let collector = Collector::new(); + let handle = collector.register(); + let guard = handle.pin(); + + let l: List<Entry> = List::new(); + + let e1 = Owned::new(Entry::default()).into_shared(&guard); + let e2 = Owned::new(Entry::default()).into_shared(&guard); + let e3 = Owned::new(Entry::default()).into_shared(&guard); + unsafe { + l.insert(e1, &guard); + l.insert(e2, &guard); + l.insert(e3, &guard); + e2.as_ref().unwrap().delete(&guard); + } + + let mut iter = l.iter(&guard); + let maybe_e3 = iter.next(); + assert!(maybe_e3.is_some()); + assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw()); + let maybe_e1 = iter.next(); + assert!(maybe_e1.is_some()); + assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw()); + assert!(iter.next().is_none()); + + unsafe { + e1.as_ref().unwrap().delete(&guard); + e3.as_ref().unwrap().delete(&guard); + } + + let mut iter = l.iter(&guard); + assert!(iter.next().is_none()); + } + + const THREADS: usize = 8; + const ITERS: usize = 512; + + /// Contends the list on insert and delete operations to make sure they can run concurrently. + #[test] + fn insert_delete_multi() { + let collector = Collector::new(); + + let l: List<Entry> = List::new(); + let b = Barrier::new(THREADS); + + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|_| { + b.wait(); + + let handle = collector.register(); + let guard: Guard = handle.pin(); + let mut v = Vec::with_capacity(ITERS); + + for _ in 0..ITERS { + let e = Owned::new(Entry::default()).into_shared(&guard); + v.push(e); + unsafe { + l.insert(e, &guard); + } + } + + for e in v { + unsafe { + e.as_ref().unwrap().delete(&guard); + } + } + }); + } + }) + .unwrap(); + + let handle = collector.register(); + let guard = handle.pin(); + + let mut iter = l.iter(&guard); + assert!(iter.next().is_none()); + } + + /// Contends the list on iteration to make sure that it can be iterated over concurrently. + #[test] + fn iter_multi() { + let collector = Collector::new(); + + let l: List<Entry> = List::new(); + let b = Barrier::new(THREADS); + + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|_| { + b.wait(); + + let handle = collector.register(); + let guard: Guard = handle.pin(); + let mut v = Vec::with_capacity(ITERS); + + for _ in 0..ITERS { + let e = Owned::new(Entry::default()).into_shared(&guard); + v.push(e); + unsafe { + l.insert(e, &guard); + } + } + + let mut iter = l.iter(&guard); + for _ in 0..ITERS { + assert!(iter.next().is_some()); + } + + for e in v { + unsafe { + e.as_ref().unwrap().delete(&guard); + } + } + }); + } + }) + .unwrap(); + + let handle = collector.register(); + let guard = handle.pin(); + + let mut iter = l.iter(&guard); + assert!(iter.next().is_none()); + } +} diff --git a/third_party/rust/crossbeam-epoch/src/sync/mod.rs b/third_party/rust/crossbeam-epoch/src/sync/mod.rs new file mode 100644 index 0000000000..08981be257 --- /dev/null +++ b/third_party/rust/crossbeam-epoch/src/sync/mod.rs @@ -0,0 +1,7 @@ +//! Synchronization primitives. + +pub(crate) mod list; +#[cfg(feature = "std")] +#[cfg(not(crossbeam_loom))] +pub(crate) mod once_lock; +pub(crate) mod queue; diff --git a/third_party/rust/crossbeam-epoch/src/sync/once_lock.rs b/third_party/rust/crossbeam-epoch/src/sync/once_lock.rs new file mode 100644 index 0000000000..c1fefc96cc --- /dev/null +++ b/third_party/rust/crossbeam-epoch/src/sync/once_lock.rs @@ -0,0 +1,103 @@ +// Based on unstable std::sync::OnceLock. +// +// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs + +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Once; + +pub(crate) struct OnceLock<T> { + once: Once, + // Once::is_completed requires Rust 1.43, so use this to track of whether they have been initialized. + is_initialized: AtomicBool, + value: UnsafeCell<MaybeUninit<T>>, + // Unlike std::sync::OnceLock, we don't need PhantomData here because + // we don't use #[may_dangle]. +} + +unsafe impl<T: Sync + Send> Sync for OnceLock<T> {} +unsafe impl<T: Send> Send for OnceLock<T> {} + +impl<T> OnceLock<T> { + /// Creates a new empty cell. + #[must_use] + pub(crate) const fn new() -> Self { + Self { + once: Once::new(), + is_initialized: AtomicBool::new(false), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Gets the contents of the cell, initializing it with `f` if the cell + /// was empty. + /// + /// Many threads may call `get_or_init` concurrently with different + /// initializing functions, but it is guaranteed that only one function + /// will be executed. + /// + /// # Panics + /// + /// If `f` panics, the panic is propagated to the caller, and the cell + /// remains uninitialized. + /// + /// It is an error to reentrantly initialize the cell from `f`. The + /// exact outcome is unspecified. Current implementation deadlocks, but + /// this may be changed to a panic in the future. + pub(crate) fn get_or_init<F>(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + // Fast path check + if self.is_initialized() { + // SAFETY: The inner value has been initialized + return unsafe { self.get_unchecked() }; + } + self.initialize(f); + + debug_assert!(self.is_initialized()); + + // SAFETY: The inner value has been initialized + unsafe { self.get_unchecked() } + } + + #[inline] + fn is_initialized(&self) -> bool { + self.is_initialized.load(Ordering::Acquire) + } + + #[cold] + fn initialize<F>(&self, f: F) + where + F: FnOnce() -> T, + { + let slot = self.value.get().cast::<T>(); + let is_initialized = &self.is_initialized; + + self.once.call_once(|| { + let value = f(); + unsafe { + slot.write(value); + } + is_initialized.store(true, Ordering::Release); + }); + } + + /// # Safety + /// + /// The value must be initialized + unsafe fn get_unchecked(&self) -> &T { + debug_assert!(self.is_initialized()); + &*self.value.get().cast::<T>() + } +} + +impl<T> Drop for OnceLock<T> { + fn drop(&mut self) { + if self.is_initialized() { + // SAFETY: The inner value has been initialized + unsafe { self.value.get().cast::<T>().drop_in_place() }; + } + } +} diff --git a/third_party/rust/crossbeam-epoch/src/sync/queue.rs b/third_party/rust/crossbeam-epoch/src/sync/queue.rs new file mode 100644 index 0000000000..9500438819 --- /dev/null +++ b/third_party/rust/crossbeam-epoch/src/sync/queue.rs @@ -0,0 +1,469 @@ +//! Michael-Scott lock-free queue. +//! +//! Usable with any number of producers and consumers. +//! +//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue +//! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106> +//! +//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a +//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7> + +use core::mem::MaybeUninit; +use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + +use crossbeam_utils::CachePadded; + +use crate::{unprotected, Atomic, Guard, Owned, Shared}; + +// The representation here is a singly-linked list, with a sentinel node at the front. In general +// the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or +// all `Blocked` (requests for data from blocked threads). +#[derive(Debug)] +pub(crate) struct Queue<T> { + head: CachePadded<Atomic<Node<T>>>, + tail: CachePadded<Atomic<Node<T>>>, +} + +struct Node<T> { + /// The slot in which a value of type `T` can be stored. + /// + /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`. + /// For example, the sentinel node in a queue never contains a value: its slot is always empty. + /// Other nodes start their life with a push operation and contain a value until it gets popped + /// out. After that such empty nodes get added to the collector for destruction. + data: MaybeUninit<T>, + + next: Atomic<Node<T>>, +} + +// Any particular `T` should never be accessed concurrently, so no need for `Sync`. +unsafe impl<T: Send> Sync for Queue<T> {} +unsafe impl<T: Send> Send for Queue<T> {} + +impl<T> Queue<T> { + /// Create a new, empty queue. + pub(crate) fn new() -> Queue<T> { + let q = Queue { + head: CachePadded::new(Atomic::null()), + tail: CachePadded::new(Atomic::null()), + }; + let sentinel = Owned::new(Node { + data: MaybeUninit::uninit(), + next: Atomic::null(), + }); + unsafe { + let guard = unprotected(); + let sentinel = sentinel.into_shared(guard); + q.head.store(sentinel, Relaxed); + q.tail.store(sentinel, Relaxed); + q + } + } + + /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on + /// success. The queue's `tail` pointer may be updated. + #[inline(always)] + fn push_internal( + &self, + onto: Shared<'_, Node<T>>, + new: Shared<'_, Node<T>>, + guard: &Guard, + ) -> bool { + // is `onto` the actual tail? + let o = unsafe { onto.deref() }; + let next = o.next.load(Acquire, guard); + if unsafe { next.as_ref().is_some() } { + // if not, try to "help" by moving the tail pointer forward + let _ = self + .tail + .compare_exchange(onto, next, Release, Relaxed, guard); + false + } else { + // looks like the actual tail; attempt to link in `n` + let result = o + .next + .compare_exchange(Shared::null(), new, Release, Relaxed, guard) + .is_ok(); + if result { + // try to move the tail pointer forward + let _ = self + .tail + .compare_exchange(onto, new, Release, Relaxed, guard); + } + result + } + } + + /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. + pub(crate) fn push(&self, t: T, guard: &Guard) { + let new = Owned::new(Node { + data: MaybeUninit::new(t), + next: Atomic::null(), + }); + let new = Owned::into_shared(new, guard); + + loop { + // We push onto the tail, so we'll start optimistically by looking there first. + let tail = self.tail.load(Acquire, guard); + + // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. + if self.push_internal(tail, new, guard) { + break; + } + } + } + + /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop. + #[inline(always)] + fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { + let head = self.head.load(Acquire, guard); + let h = unsafe { head.deref() }; + let next = h.next.load(Acquire, guard); + match unsafe { next.as_ref() } { + Some(n) => unsafe { + self.head + .compare_exchange(head, next, Release, Relaxed, guard) + .map(|_| { + let tail = self.tail.load(Relaxed, guard); + // Advance the tail so that we don't retire a pointer to a reachable node. + if head == tail { + let _ = self + .tail + .compare_exchange(tail, next, Release, Relaxed, guard); + } + guard.defer_destroy(head); + // TODO: Replace with MaybeUninit::read when api is stable + Some(n.data.as_ptr().read()) + }) + .map_err(|_| ()) + }, + None => Ok(None), + } + } + + /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue + /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop. + #[inline(always)] + fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> + where + T: Sync, + F: Fn(&T) -> bool, + { + let head = self.head.load(Acquire, guard); + let h = unsafe { head.deref() }; + let next = h.next.load(Acquire, guard); + match unsafe { next.as_ref() } { + Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { + self.head + .compare_exchange(head, next, Release, Relaxed, guard) + .map(|_| { + let tail = self.tail.load(Relaxed, guard); + // Advance the tail so that we don't retire a pointer to a reachable node. + if head == tail { + let _ = self + .tail + .compare_exchange(tail, next, Release, Relaxed, guard); + } + guard.defer_destroy(head); + Some(n.data.as_ptr().read()) + }) + .map_err(|_| ()) + }, + None | Some(_) => Ok(None), + } + } + + /// Attempts to dequeue from the front. + /// + /// Returns `None` if the queue is observed to be empty. + pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { + loop { + if let Ok(head) = self.pop_internal(guard) { + return head; + } + } + } + + /// Attempts to dequeue from the front, if the item satisfies the given condition. + /// + /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given + /// condition. + pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> + where + T: Sync, + F: Fn(&T) -> bool, + { + loop { + if let Ok(head) = self.pop_if_internal(&condition, guard) { + return head; + } + } + } +} + +impl<T> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let guard = unprotected(); + + while self.try_pop(guard).is_some() {} + + // Destroy the remaining sentinel node. + let sentinel = self.head.load(Relaxed, guard); + drop(sentinel.into_owned()); + } + } +} + +#[cfg(all(test, not(crossbeam_loom)))] +mod test { + use super::*; + use crate::pin; + use crossbeam_utils::thread; + + struct Queue<T> { + queue: super::Queue<T>, + } + + impl<T> Queue<T> { + pub(crate) fn new() -> Queue<T> { + Queue { + queue: super::Queue::new(), + } + } + + pub(crate) fn push(&self, t: T) { + let guard = &pin(); + self.queue.push(t, guard); + } + + pub(crate) fn is_empty(&self) -> bool { + let guard = &pin(); + let head = self.queue.head.load(Acquire, guard); + let h = unsafe { head.deref() }; + h.next.load(Acquire, guard).is_null() + } + + pub(crate) fn try_pop(&self) -> Option<T> { + let guard = &pin(); + self.queue.try_pop(guard) + } + + pub(crate) fn pop(&self) -> T { + loop { + match self.try_pop() { + None => continue, + Some(t) => return t, + } + } + } + } + + #[cfg(miri)] + const CONC_COUNT: i64 = 1000; + #[cfg(not(miri))] + const CONC_COUNT: i64 = 1000000; + + #[test] + fn push_try_pop_1() { + let q: Queue<i64> = Queue::new(); + assert!(q.is_empty()); + q.push(37); + assert!(!q.is_empty()); + assert_eq!(q.try_pop(), Some(37)); + assert!(q.is_empty()); + } + + #[test] + fn push_try_pop_2() { + let q: Queue<i64> = Queue::new(); + assert!(q.is_empty()); + q.push(37); + q.push(48); + assert_eq!(q.try_pop(), Some(37)); + assert!(!q.is_empty()); + assert_eq!(q.try_pop(), Some(48)); + assert!(q.is_empty()); + } + + #[test] + fn push_try_pop_many_seq() { + let q: Queue<i64> = Queue::new(); + assert!(q.is_empty()); + for i in 0..200 { + q.push(i) + } + assert!(!q.is_empty()); + for i in 0..200 { + assert_eq!(q.try_pop(), Some(i)); + } + assert!(q.is_empty()); + } + + #[test] + fn push_pop_1() { + let q: Queue<i64> = Queue::new(); + assert!(q.is_empty()); + q.push(37); + assert!(!q.is_empty()); + assert_eq!(q.pop(), 37); + assert!(q.is_empty()); + } + + #[test] + fn push_pop_2() { + let q: Queue<i64> = Queue::new(); + q.push(37); + q.push(48); + assert_eq!(q.pop(), 37); + assert_eq!(q.pop(), 48); + } + + #[test] + fn push_pop_many_seq() { + let q: Queue<i64> = Queue::new(); + assert!(q.is_empty()); + for i in 0..200 { + q.push(i) + } + assert!(!q.is_empty()); + for i in 0..200 { + assert_eq!(q.pop(), i); + } + assert!(q.is_empty()); + } + + #[test] + fn push_try_pop_many_spsc() { + let q: Queue<i64> = Queue::new(); + assert!(q.is_empty()); + + thread::scope(|scope| { + scope.spawn(|_| { + let mut next = 0; + + while next < CONC_COUNT { + if let Some(elem) = q.try_pop() { + assert_eq!(elem, next); + next += 1; + } + } + }); + + for i in 0..CONC_COUNT { + q.push(i) + } + }) + .unwrap(); + } + + #[test] + fn push_try_pop_many_spmc() { + fn recv(_t: i32, q: &Queue<i64>) { + let mut cur = -1; + for _i in 0..CONC_COUNT { + if let Some(elem) = q.try_pop() { + assert!(elem > cur); + cur = elem; + + if cur == CONC_COUNT - 1 { + break; + } + } + } + } + + let q: Queue<i64> = Queue::new(); + assert!(q.is_empty()); + thread::scope(|scope| { + for i in 0..3 { + let q = &q; + scope.spawn(move |_| recv(i, q)); + } + + scope.spawn(|_| { + for i in 0..CONC_COUNT { + q.push(i); + } + }); + }) + .unwrap(); + } + + #[test] + fn push_try_pop_many_mpmc() { + enum LR { + Left(i64), + Right(i64), + } + + let q: Queue<LR> = Queue::new(); + assert!(q.is_empty()); + + thread::scope(|scope| { + for _t in 0..2 { + scope.spawn(|_| { + for i in CONC_COUNT - 1..CONC_COUNT { + q.push(LR::Left(i)) + } + }); + scope.spawn(|_| { + for i in CONC_COUNT - 1..CONC_COUNT { + q.push(LR::Right(i)) + } + }); + scope.spawn(|_| { + let mut vl = vec![]; + let mut vr = vec![]; + for _i in 0..CONC_COUNT { + match q.try_pop() { + Some(LR::Left(x)) => vl.push(x), + Some(LR::Right(x)) => vr.push(x), + _ => {} + } + } + + let mut vl2 = vl.clone(); + let mut vr2 = vr.clone(); + vl2.sort_unstable(); + vr2.sort_unstable(); + + assert_eq!(vl, vl2); + assert_eq!(vr, vr2); + }); + } + }) + .unwrap(); + } + + #[test] + fn push_pop_many_spsc() { + let q: Queue<i64> = Queue::new(); + + thread::scope(|scope| { + scope.spawn(|_| { + let mut next = 0; + while next < CONC_COUNT { + assert_eq!(q.pop(), next); + next += 1; + } + }); + + for i in 0..CONC_COUNT { + q.push(i) + } + }) + .unwrap(); + assert!(q.is_empty()); + } + + #[test] + fn is_empty_dont_pop() { + let q: Queue<i64> = Queue::new(); + q.push(20); + q.push(20); + assert!(!q.is_empty()); + assert!(!q.is_empty()); + assert!(q.try_pop().is_some()); + } +} |