use alloc::boxed::Box; use core::cell::UnsafeCell; use core::fmt; use core::marker::PhantomData; use core::mem::MaybeUninit; use core::ptr; use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use crossbeam_utils::{Backoff, CachePadded}; // Bits indicating the state of a slot: // * If a value has been written into the slot, `WRITE` is set. // * If a value has been read from the slot, `READ` is set. // * If the block is being destroyed, `DESTROY` is set. const WRITE: usize = 1; const READ: usize = 2; const DESTROY: usize = 4; // Each block covers one "lap" of indices. const LAP: usize = 32; // The maximum number of values a block can hold. const BLOCK_CAP: usize = LAP - 1; // How many lower bits are reserved for metadata. const SHIFT: usize = 1; // Indicates that the block is not the last one. const HAS_NEXT: usize = 1; /// A slot in a block. struct Slot { /// The value. value: UnsafeCell>, /// The state of the slot. state: AtomicUsize, } impl Slot { const UNINIT: Self = Self { value: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(0), }; /// Waits until a value is written into the slot. fn wait_write(&self) { let backoff = Backoff::new(); while self.state.load(Ordering::Acquire) & WRITE == 0 { backoff.snooze(); } } } /// A block in a linked list. /// /// Each block in the list can hold up to `BLOCK_CAP` values. struct Block { /// The next block in the linked list. next: AtomicPtr>, /// Slots for values. slots: [Slot; BLOCK_CAP], } impl Block { /// Creates an empty block that starts at `start_index`. fn new() -> Block { Self { next: AtomicPtr::new(ptr::null_mut()), slots: [Slot::UNINIT; BLOCK_CAP], } } /// Waits until the next pointer is set. fn wait_next(&self) -> *mut Block { let backoff = Backoff::new(); loop { let next = self.next.load(Ordering::Acquire); if !next.is_null() { return next; } backoff.snooze(); } } /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. unsafe fn destroy(this: *mut Block, start: usize) { // It is not necessary to set the `DESTROY` bit in the last slot because that slot has // begun destruction of the block. for i in start..BLOCK_CAP - 1 { let slot = (*this).slots.get_unchecked(i); // Mark the `DESTROY` bit if a thread is still using the slot. if slot.state.load(Ordering::Acquire) & READ == 0 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 { // If a thread is still using the slot, it will continue destruction of the block. return; } } // No thread is using the block, now it is safe to destroy it. drop(Box::from_raw(this)); } } /// A position in a queue. struct Position { /// The index in the queue. index: AtomicUsize, /// The block in the linked list. block: AtomicPtr>, } /// An unbounded multi-producer multi-consumer queue. /// /// This queue is implemented as a linked list of segments, where each segment is a small buffer /// that can hold a handful of elements. There is no limit to how many elements can be in the queue /// at a time. However, since segments need to be dynamically allocated as elements get pushed, /// this queue is somewhat slower than [`ArrayQueue`]. /// /// [`ArrayQueue`]: super::ArrayQueue /// /// # Examples /// /// ``` /// use crossbeam_queue::SegQueue; /// /// let q = SegQueue::new(); /// /// q.push('a'); /// q.push('b'); /// /// assert_eq!(q.pop(), Some('a')); /// assert_eq!(q.pop(), Some('b')); /// assert!(q.pop().is_none()); /// ``` pub struct SegQueue { /// The head of the queue. head: CachePadded>, /// The tail of the queue. tail: CachePadded>, /// Indicates that dropping a `SegQueue` may drop values of type `T`. _marker: PhantomData, } unsafe impl Send for SegQueue {} unsafe impl Sync for SegQueue {} impl SegQueue { /// Creates a new unbounded queue. /// /// # Examples /// /// ``` /// use crossbeam_queue::SegQueue; /// /// let q = SegQueue::::new(); /// ``` pub const fn new() -> SegQueue { SegQueue { head: CachePadded::new(Position { block: AtomicPtr::new(ptr::null_mut()), index: AtomicUsize::new(0), }), tail: CachePadded::new(Position { block: AtomicPtr::new(ptr::null_mut()), index: AtomicUsize::new(0), }), _marker: PhantomData, } } /// Pushes an element into the queue. /// /// # Examples /// /// ``` /// use crossbeam_queue::SegQueue; /// /// let q = SegQueue::new(); /// /// q.push(10); /// q.push(20); /// ``` pub fn push(&self, value: T) { let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); let mut block = self.tail.block.load(Ordering::Acquire); let mut next_block = None; loop { // Calculate the offset of the index into the block. let offset = (tail >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. if offset == BLOCK_CAP { backoff.snooze(); tail = self.tail.index.load(Ordering::Acquire); block = self.tail.block.load(Ordering::Acquire); continue; } // If we're going to have to install the next block, allocate it in advance in order to // make the wait for other threads as short as possible. if offset + 1 == BLOCK_CAP && next_block.is_none() { next_block = Some(Box::new(Block::::new())); } // If this is the first push operation, we need to allocate the first block. if block.is_null() { let new = Box::into_raw(Box::new(Block::::new())); if self .tail .block .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) .is_ok() { self.head.block.store(new, Ordering::Release); block = new; } else { next_block = unsafe { Some(Box::from_raw(new)) }; tail = self.tail.index.load(Ordering::Acquire); block = self.tail.block.load(Ordering::Acquire); continue; } } let new_tail = tail + (1 << SHIFT); // Try advancing the tail forward. match self.tail.index.compare_exchange_weak( tail, new_tail, Ordering::SeqCst, Ordering::Acquire, ) { Ok(_) => unsafe { // If we've reached the end of the block, install the next one. if offset + 1 == BLOCK_CAP { let next_block = Box::into_raw(next_block.unwrap()); let next_index = new_tail.wrapping_add(1 << SHIFT); self.tail.block.store(next_block, Ordering::Release); self.tail.index.store(next_index, Ordering::Release); (*block).next.store(next_block, Ordering::Release); } // Write the value into the slot. let slot = (*block).slots.get_unchecked(offset); slot.value.get().write(MaybeUninit::new(value)); slot.state.fetch_or(WRITE, Ordering::Release); return; }, Err(t) => { tail = t; block = self.tail.block.load(Ordering::Acquire); backoff.spin(); } } } } /// Pops an element from the queue. /// /// If the queue is empty, `None` is returned. /// /// # Examples /// /// ``` /// use crossbeam_queue::SegQueue; /// /// let q = SegQueue::new(); /// /// q.push(10); /// assert_eq!(q.pop(), Some(10)); /// assert!(q.pop().is_none()); /// ``` pub fn pop(&self) -> Option { let backoff = Backoff::new(); let mut head = self.head.index.load(Ordering::Acquire); let mut block = self.head.block.load(Ordering::Acquire); loop { // Calculate the offset of the index into the block. let offset = (head >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. if offset == BLOCK_CAP { backoff.snooze(); head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); continue; } let mut new_head = head + (1 << SHIFT); if new_head & HAS_NEXT == 0 { atomic::fence(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::Relaxed); // If the tail equals the head, that means the queue is empty. if head >> SHIFT == tail >> SHIFT { return None; } // If head and tail are not in the same block, set `HAS_NEXT` in head. if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; } } // The block can be null here only if the first push operation is in progress. In that // case, just wait until it gets initialized. if block.is_null() { backoff.snooze(); head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); continue; } // Try moving the head index forward. match self.head.index.compare_exchange_weak( head, new_head, Ordering::SeqCst, Ordering::Acquire, ) { Ok(_) => unsafe { // If we've reached the end of the block, move to the next one. if offset + 1 == BLOCK_CAP { let next = (*block).wait_next(); let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { next_index |= HAS_NEXT; } self.head.block.store(next, Ordering::Release); self.head.index.store(next_index, Ordering::Release); } // Read the value. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); let value = slot.value.get().read().assume_init(); // Destroy the block if we've reached the end, or if another thread wanted to // destroy but couldn't because we were busy reading from the slot. if offset + 1 == BLOCK_CAP { Block::destroy(block, 0); } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { Block::destroy(block, offset + 1); } return Some(value); }, Err(h) => { head = h; block = self.head.block.load(Ordering::Acquire); backoff.spin(); } } } } /// Returns `true` if the queue is empty. /// /// # Examples /// /// ``` /// use crossbeam_queue::SegQueue; /// /// let q = SegQueue::new(); /// /// assert!(q.is_empty()); /// q.push(1); /// assert!(!q.is_empty()); /// ``` pub fn is_empty(&self) -> bool { let head = self.head.index.load(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::SeqCst); head >> SHIFT == tail >> SHIFT } /// Returns the number of elements in the queue. /// /// # Examples /// /// ``` /// use crossbeam_queue::SegQueue; /// /// let q = SegQueue::new(); /// assert_eq!(q.len(), 0); /// /// q.push(10); /// assert_eq!(q.len(), 1); /// /// q.push(20); /// assert_eq!(q.len(), 2); /// ``` pub fn len(&self) -> usize { loop { // Load the tail index, then load the head index. let mut tail = self.tail.index.load(Ordering::SeqCst); let mut head = self.head.index.load(Ordering::SeqCst); // If the tail index didn't change, we've got consistent indices to work with. if self.tail.index.load(Ordering::SeqCst) == tail { // Erase the lower bits. tail &= !((1 << SHIFT) - 1); head &= !((1 << SHIFT) - 1); // Fix up indices if they fall onto block ends. if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { tail = tail.wrapping_add(1 << SHIFT); } if (head >> SHIFT) & (LAP - 1) == LAP - 1 { head = head.wrapping_add(1 << SHIFT); } // Rotate indices so that head falls into the first block. let lap = (head >> SHIFT) / LAP; tail = tail.wrapping_sub((lap * LAP) << SHIFT); head = head.wrapping_sub((lap * LAP) << SHIFT); // Remove the lower bits. tail >>= SHIFT; head >>= SHIFT; // Return the difference minus the number of blocks between tail and head. return tail - head - tail / LAP; } } } } impl Drop for SegQueue { fn drop(&mut self) { let mut head = *self.head.index.get_mut(); let mut tail = *self.tail.index.get_mut(); let mut block = *self.head.block.get_mut(); // Erase the lower bits. head &= !((1 << SHIFT) - 1); tail &= !((1 << SHIFT) - 1); unsafe { // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. while head != tail { let offset = (head >> SHIFT) % LAP; if offset < BLOCK_CAP { // Drop the value in the slot. let slot = (*block).slots.get_unchecked(offset); let p = &mut *slot.value.get(); p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. let next = *(*block).next.get_mut(); drop(Box::from_raw(block)); block = next; } head = head.wrapping_add(1 << SHIFT); } // Deallocate the last remaining block. if !block.is_null() { drop(Box::from_raw(block)); } } } } impl fmt::Debug for SegQueue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("SegQueue { .. }") } } impl Default for SegQueue { fn default() -> SegQueue { SegQueue::new() } } impl IntoIterator for SegQueue { type Item = T; type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { IntoIter { value: self } } } #[derive(Debug)] pub struct IntoIter { value: SegQueue, } impl Iterator for IntoIter { type Item = T; fn next(&mut self) -> Option { let value = &mut self.value; let head = *value.head.index.get_mut(); let tail = *value.tail.index.get_mut(); if head >> SHIFT == tail >> SHIFT { None } else { let block = *value.head.block.get_mut(); let offset = (head >> SHIFT) % LAP; // SAFETY: We have mutable access to this, so we can read without // worrying about concurrency. Furthermore, we know this is // initialized because it is the value pointed at by `value.head` // and this is a non-empty queue. let item = unsafe { let slot = (*block).slots.get_unchecked(offset); let p = &mut *slot.value.get(); p.as_mut_ptr().read() }; if offset + 1 == BLOCK_CAP { // Deallocate the block and move to the next one. // SAFETY: The block is initialized because we've been reading // from it this entire time. We can drop it b/c everything has // been read out of it, so nothing is pointing to it anymore. unsafe { let next = *(*block).next.get_mut(); drop(Box::from_raw(block)); *value.head.block.get_mut() = next; } // The last value in a block is empty, so skip it *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT); // Double-check that we're pointing to the first item in a block. debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0); } else { *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT); } Some(item) } } }