summaryrefslogtreecommitdiffstats
path: root/vendor/crossbeam-queue/src/seg_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--vendor/crossbeam-queue/src/seg_queue.rs545
1 files changed, 0 insertions, 545 deletions
diff --git a/vendor/crossbeam-queue/src/seg_queue.rs b/vendor/crossbeam-queue/src/seg_queue.rs
deleted file mode 100644
index 1767775d1..000000000
--- a/vendor/crossbeam-queue/src/seg_queue.rs
+++ /dev/null
@@ -1,545 +0,0 @@
-use alloc::boxed::Box;
-use core::cell::UnsafeCell;
-use core::fmt;
-use core::marker::PhantomData;
-use core::mem::MaybeUninit;
-use core::ptr;
-use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
-
-use crossbeam_utils::{Backoff, CachePadded};
-
-// Bits indicating the state of a slot:
-// * If a value has been written into the slot, `WRITE` is set.
-// * If a value has been read from the slot, `READ` is set.
-// * If the block is being destroyed, `DESTROY` is set.
-const WRITE: usize = 1;
-const READ: usize = 2;
-const DESTROY: usize = 4;
-
-// Each block covers one "lap" of indices.
-const LAP: usize = 32;
-// The maximum number of values a block can hold.
-const BLOCK_CAP: usize = LAP - 1;
-// How many lower bits are reserved for metadata.
-const SHIFT: usize = 1;
-// Indicates that the block is not the last one.
-const HAS_NEXT: usize = 1;
-
-/// A slot in a block.
-struct Slot<T> {
- /// The value.
- value: UnsafeCell<MaybeUninit<T>>,
-
- /// The state of the slot.
- state: AtomicUsize,
-}
-
-impl<T> Slot<T> {
- /// Waits until a value is written into the slot.
- fn wait_write(&self) {
- let backoff = Backoff::new();
- while self.state.load(Ordering::Acquire) & WRITE == 0 {
- backoff.snooze();
- }
- }
-}
-
-/// A block in a linked list.
-///
-/// Each block in the list can hold up to `BLOCK_CAP` values.
-struct Block<T> {
- /// The next block in the linked list.
- next: AtomicPtr<Block<T>>,
-
- /// Slots for values.
- slots: [Slot<T>; BLOCK_CAP],
-}
-
-impl<T> Block<T> {
- /// Creates an empty block that starts at `start_index`.
- fn new() -> Block<T> {
- // SAFETY: This is safe because:
- // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
- // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
- // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
- // holds a MaybeUninit.
- // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
- unsafe { MaybeUninit::zeroed().assume_init() }
- }
-
- /// Waits until the next pointer is set.
- fn wait_next(&self) -> *mut Block<T> {
- let backoff = Backoff::new();
- loop {
- let next = self.next.load(Ordering::Acquire);
- if !next.is_null() {
- return next;
- }
- backoff.snooze();
- }
- }
-
- /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
- unsafe fn destroy(this: *mut Block<T>, start: usize) {
- // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
- // begun destruction of the block.
- for i in start..BLOCK_CAP - 1 {
- let slot = (*this).slots.get_unchecked(i);
-
- // Mark the `DESTROY` bit if a thread is still using the slot.
- if slot.state.load(Ordering::Acquire) & READ == 0
- && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
- {
- // If a thread is still using the slot, it will continue destruction of the block.
- return;
- }
- }
-
- // No thread is using the block, now it is safe to destroy it.
- drop(Box::from_raw(this));
- }
-}
-
-/// A position in a queue.
-struct Position<T> {
- /// The index in the queue.
- index: AtomicUsize,
-
- /// The block in the linked list.
- block: AtomicPtr<Block<T>>,
-}
-
-/// An unbounded multi-producer multi-consumer queue.
-///
-/// This queue is implemented as a linked list of segments, where each segment is a small buffer
-/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
-/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
-/// this queue is somewhat slower than [`ArrayQueue`].
-///
-/// [`ArrayQueue`]: super::ArrayQueue
-///
-/// # Examples
-///
-/// ```
-/// use crossbeam_queue::SegQueue;
-///
-/// let q = SegQueue::new();
-///
-/// q.push('a');
-/// q.push('b');
-///
-/// assert_eq!(q.pop(), Some('a'));
-/// assert_eq!(q.pop(), Some('b'));
-/// assert!(q.pop().is_none());
-/// ```
-pub struct SegQueue<T> {
- /// The head of the queue.
- head: CachePadded<Position<T>>,
-
- /// The tail of the queue.
- tail: CachePadded<Position<T>>,
-
- /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
- _marker: PhantomData<T>,
-}
-
-unsafe impl<T: Send> Send for SegQueue<T> {}
-unsafe impl<T: Send> Sync for SegQueue<T> {}
-
-impl<T> SegQueue<T> {
- /// Creates a new unbounded queue.
- ///
- /// # Examples
- ///
- /// ```
- /// use crossbeam_queue::SegQueue;
- ///
- /// let q = SegQueue::<i32>::new();
- /// ```
- pub const fn new() -> SegQueue<T> {
- SegQueue {
- head: CachePadded::new(Position {
- block: AtomicPtr::new(ptr::null_mut()),
- index: AtomicUsize::new(0),
- }),
- tail: CachePadded::new(Position {
- block: AtomicPtr::new(ptr::null_mut()),
- index: AtomicUsize::new(0),
- }),
- _marker: PhantomData,
- }
- }
-
- /// Pushes an element into the queue.
- ///
- /// # Examples
- ///
- /// ```
- /// use crossbeam_queue::SegQueue;
- ///
- /// let q = SegQueue::new();
- ///
- /// q.push(10);
- /// q.push(20);
- /// ```
- pub fn push(&self, value: T) {
- let backoff = Backoff::new();
- let mut tail = self.tail.index.load(Ordering::Acquire);
- let mut block = self.tail.block.load(Ordering::Acquire);
- let mut next_block = None;
-
- loop {
- // Calculate the offset of the index into the block.
- let offset = (tail >> SHIFT) % LAP;
-
- // If we reached the end of the block, wait until the next one is installed.
- if offset == BLOCK_CAP {
- backoff.snooze();
- tail = self.tail.index.load(Ordering::Acquire);
- block = self.tail.block.load(Ordering::Acquire);
- continue;
- }
-
- // If we're going to have to install the next block, allocate it in advance in order to
- // make the wait for other threads as short as possible.
- if offset + 1 == BLOCK_CAP && next_block.is_none() {
- next_block = Some(Box::new(Block::<T>::new()));
- }
-
- // If this is the first push operation, we need to allocate the first block.
- if block.is_null() {
- let new = Box::into_raw(Box::new(Block::<T>::new()));
-
- if self
- .tail
- .block
- .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- self.head.block.store(new, Ordering::Release);
- block = new;
- } else {
- next_block = unsafe { Some(Box::from_raw(new)) };
- tail = self.tail.index.load(Ordering::Acquire);
- block = self.tail.block.load(Ordering::Acquire);
- continue;
- }
- }
-
- let new_tail = tail + (1 << SHIFT);
-
- // Try advancing the tail forward.
- match self.tail.index.compare_exchange_weak(
- tail,
- new_tail,
- Ordering::SeqCst,
- Ordering::Acquire,
- ) {
- Ok(_) => unsafe {
- // If we've reached the end of the block, install the next one.
- if offset + 1 == BLOCK_CAP {
- let next_block = Box::into_raw(next_block.unwrap());
- let next_index = new_tail.wrapping_add(1 << SHIFT);
-
- self.tail.block.store(next_block, Ordering::Release);
- self.tail.index.store(next_index, Ordering::Release);
- (*block).next.store(next_block, Ordering::Release);
- }
-
- // Write the value into the slot.
- let slot = (*block).slots.get_unchecked(offset);
- slot.value.get().write(MaybeUninit::new(value));
- slot.state.fetch_or(WRITE, Ordering::Release);
-
- return;
- },
- Err(t) => {
- tail = t;
- block = self.tail.block.load(Ordering::Acquire);
- backoff.spin();
- }
- }
- }
- }
-
- /// Pops an element from the queue.
- ///
- /// If the queue is empty, `None` is returned.
- ///
- /// # Examples
- ///
- /// ```
- /// use crossbeam_queue::SegQueue;
- ///
- /// let q = SegQueue::new();
- ///
- /// q.push(10);
- /// assert_eq!(q.pop(), Some(10));
- /// assert!(q.pop().is_none());
- /// ```
- pub fn pop(&self) -> Option<T> {
- let backoff = Backoff::new();
- let mut head = self.head.index.load(Ordering::Acquire);
- let mut block = self.head.block.load(Ordering::Acquire);
-
- loop {
- // Calculate the offset of the index into the block.
- let offset = (head >> SHIFT) % LAP;
-
- // If we reached the end of the block, wait until the next one is installed.
- if offset == BLOCK_CAP {
- backoff.snooze();
- head = self.head.index.load(Ordering::Acquire);
- block = self.head.block.load(Ordering::Acquire);
- continue;
- }
-
- let mut new_head = head + (1 << SHIFT);
-
- if new_head & HAS_NEXT == 0 {
- atomic::fence(Ordering::SeqCst);
- let tail = self.tail.index.load(Ordering::Relaxed);
-
- // If the tail equals the head, that means the queue is empty.
- if head >> SHIFT == tail >> SHIFT {
- return None;
- }
-
- // If head and tail are not in the same block, set `HAS_NEXT` in head.
- if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
- new_head |= HAS_NEXT;
- }
- }
-
- // The block can be null here only if the first push operation is in progress. In that
- // case, just wait until it gets initialized.
- if block.is_null() {
- backoff.snooze();
- head = self.head.index.load(Ordering::Acquire);
- block = self.head.block.load(Ordering::Acquire);
- continue;
- }
-
- // Try moving the head index forward.
- match self.head.index.compare_exchange_weak(
- head,
- new_head,
- Ordering::SeqCst,
- Ordering::Acquire,
- ) {
- Ok(_) => unsafe {
- // If we've reached the end of the block, move to the next one.
- if offset + 1 == BLOCK_CAP {
- let next = (*block).wait_next();
- let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
- if !(*next).next.load(Ordering::Relaxed).is_null() {
- next_index |= HAS_NEXT;
- }
-
- self.head.block.store(next, Ordering::Release);
- self.head.index.store(next_index, Ordering::Release);
- }
-
- // Read the value.
- let slot = (*block).slots.get_unchecked(offset);
- slot.wait_write();
- let value = slot.value.get().read().assume_init();
-
- // Destroy the block if we've reached the end, or if another thread wanted to
- // destroy but couldn't because we were busy reading from the slot.
- if offset + 1 == BLOCK_CAP {
- Block::destroy(block, 0);
- } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
- Block::destroy(block, offset + 1);
- }
-
- return Some(value);
- },
- Err(h) => {
- head = h;
- block = self.head.block.load(Ordering::Acquire);
- backoff.spin();
- }
- }
- }
- }
-
- /// Returns `true` if the queue is empty.
- ///
- /// # Examples
- ///
- /// ```
- /// use crossbeam_queue::SegQueue;
- ///
- /// let q = SegQueue::new();
- ///
- /// assert!(q.is_empty());
- /// q.push(1);
- /// assert!(!q.is_empty());
- /// ```
- pub fn is_empty(&self) -> bool {
- let head = self.head.index.load(Ordering::SeqCst);
- let tail = self.tail.index.load(Ordering::SeqCst);
- head >> SHIFT == tail >> SHIFT
- }
-
- /// Returns the number of elements in the queue.
- ///
- /// # Examples
- ///
- /// ```
- /// use crossbeam_queue::SegQueue;
- ///
- /// let q = SegQueue::new();
- /// assert_eq!(q.len(), 0);
- ///
- /// q.push(10);
- /// assert_eq!(q.len(), 1);
- ///
- /// q.push(20);
- /// assert_eq!(q.len(), 2);
- /// ```
- pub fn len(&self) -> usize {
- loop {
- // Load the tail index, then load the head index.
- let mut tail = self.tail.index.load(Ordering::SeqCst);
- let mut head = self.head.index.load(Ordering::SeqCst);
-
- // If the tail index didn't change, we've got consistent indices to work with.
- if self.tail.index.load(Ordering::SeqCst) == tail {
- // Erase the lower bits.
- tail &= !((1 << SHIFT) - 1);
- head &= !((1 << SHIFT) - 1);
-
- // Fix up indices if they fall onto block ends.
- if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
- tail = tail.wrapping_add(1 << SHIFT);
- }
- if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
- head = head.wrapping_add(1 << SHIFT);
- }
-
- // Rotate indices so that head falls into the first block.
- let lap = (head >> SHIFT) / LAP;
- tail = tail.wrapping_sub((lap * LAP) << SHIFT);
- head = head.wrapping_sub((lap * LAP) << SHIFT);
-
- // Remove the lower bits.
- tail >>= SHIFT;
- head >>= SHIFT;
-
- // Return the difference minus the number of blocks between tail and head.
- return tail - head - tail / LAP;
- }
- }
- }
-}
-
-impl<T> Drop for SegQueue<T> {
- fn drop(&mut self) {
- let mut head = self.head.index.load(Ordering::Relaxed);
- let mut tail = self.tail.index.load(Ordering::Relaxed);
- let mut block = self.head.block.load(Ordering::Relaxed);
-
- // Erase the lower bits.
- head &= !((1 << SHIFT) - 1);
- tail &= !((1 << SHIFT) - 1);
-
- unsafe {
- // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
- while head != tail {
- let offset = (head >> SHIFT) % LAP;
-
- if offset < BLOCK_CAP {
- // Drop the value in the slot.
- let slot = (*block).slots.get_unchecked(offset);
- let p = &mut *slot.value.get();
- p.as_mut_ptr().drop_in_place();
- } else {
- // Deallocate the block and move to the next one.
- let next = (*block).next.load(Ordering::Relaxed);
- drop(Box::from_raw(block));
- block = next;
- }
-
- head = head.wrapping_add(1 << SHIFT);
- }
-
- // Deallocate the last remaining block.
- if !block.is_null() {
- drop(Box::from_raw(block));
- }
- }
- }
-}
-
-impl<T> fmt::Debug for SegQueue<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.pad("SegQueue { .. }")
- }
-}
-
-impl<T> Default for SegQueue<T> {
- fn default() -> SegQueue<T> {
- SegQueue::new()
- }
-}
-
-impl<T> IntoIterator for SegQueue<T> {
- type Item = T;
-
- type IntoIter = IntoIter<T>;
-
- fn into_iter(self) -> Self::IntoIter {
- IntoIter { value: self }
- }
-}
-
-#[derive(Debug)]
-pub struct IntoIter<T> {
- value: SegQueue<T>,
-}
-
-impl<T> Iterator for IntoIter<T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- let value = &mut self.value;
- let head = *value.head.index.get_mut();
- let tail = *value.tail.index.get_mut();
- if head >> SHIFT == tail >> SHIFT {
- None
- } else {
- let block = *value.head.block.get_mut();
- let offset = (head >> SHIFT) % LAP;
-
- // SAFETY: We have mutable access to this, so we can read without
- // worrying about concurrency. Furthermore, we know this is
- // initialized because it is the value pointed at by `value.head`
- // and this is a non-empty queue.
- let item = unsafe {
- let slot = (*block).slots.get_unchecked(offset);
- let p = &mut *slot.value.get();
- p.as_mut_ptr().read()
- };
- if offset + 1 == BLOCK_CAP {
- // Deallocate the block and move to the next one.
- // SAFETY: The block is initialized because we've been reading
- // from it this entire time. We can drop it b/c everything has
- // been read out of it, so nothing is pointing to it anymore.
- unsafe {
- let next = *(*block).next.get_mut();
- drop(Box::from_raw(block));
- *value.head.block.get_mut() = next;
- }
- // The last value in a block is empty, so skip it
- *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
- // Double-check that we're pointing to the first item in a block.
- debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
- } else {
- *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
- }
- Some(item)
- }
- }
-}