diff options
Diffstat (limited to 'third_party/rust/tokio/src/sync/mpsc/list.rs')
-rw-r--r-- | third_party/rust/tokio/src/sync/mpsc/list.rs | 371 |
1 files changed, 371 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/mpsc/list.rs b/third_party/rust/tokio/src/sync/mpsc/list.rs new file mode 100644 index 0000000000..e4eeb45411 --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/list.rs @@ -0,0 +1,371 @@ +//! A concurrent, lock-free, FIFO list. + +use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize}; +use crate::loom::thread; +use crate::sync::mpsc::block::{self, Block}; + +use std::fmt; +use std::ptr::NonNull; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; + +/// List queue transmit handle. +pub(crate) struct Tx<T> { + /// Tail in the `Block` mpmc list. + block_tail: AtomicPtr<Block<T>>, + + /// Position to push the next message. This references a block and offset + /// into the block. + tail_position: AtomicUsize, +} + +/// List queue receive handle +pub(crate) struct Rx<T> { + /// Pointer to the block being processed. + head: NonNull<Block<T>>, + + /// Next slot index to process. + index: usize, + + /// Pointer to the next block pending release. + free_head: NonNull<Block<T>>, +} + +/// Return value of `Rx::try_pop`. +pub(crate) enum TryPopResult<T> { + /// Successfully popped a value. + Ok(T), + /// The channel is empty. + Empty, + /// The channel is empty and closed. + Closed, + /// The channel is not empty, but the first value is being written. + Busy, +} + +pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) { + // Create the initial block shared between the tx and rx halves. + let initial_block = Box::new(Block::new(0)); + let initial_block_ptr = Box::into_raw(initial_block); + + let tx = Tx { + block_tail: AtomicPtr::new(initial_block_ptr), + tail_position: AtomicUsize::new(0), + }; + + let head = NonNull::new(initial_block_ptr).unwrap(); + + let rx = Rx { + head, + index: 0, + free_head: head, + }; + + (tx, rx) +} + +impl<T> Tx<T> { + /// Pushes a value into the list. + pub(crate) fn push(&self, value: T) { + // First, claim a slot for the value. `Acquire` is used here to + // synchronize with the `fetch_add` in `reclaim_blocks`. + let slot_index = self.tail_position.fetch_add(1, Acquire); + + // Load the current block and write the value + let block = self.find_block(slot_index); + + unsafe { + // Write the value to the block + block.as_ref().write(slot_index, value); + } + } + + /// Closes the send half of the list. + /// + /// Similar process as pushing a value, but instead of writing the value & + /// setting the ready flag, the TX_CLOSED flag is set on the block. + pub(crate) fn close(&self) { + // First, claim a slot for the value. This is the last slot that will be + // claimed. + let slot_index = self.tail_position.fetch_add(1, Acquire); + + let block = self.find_block(slot_index); + + unsafe { block.as_ref().tx_close() } + } + + fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> { + // The start index of the block that contains `index`. + let start_index = block::start_index(slot_index); + + // The index offset into the block + let offset = block::offset(slot_index); + + // Load the current head of the block + let mut block_ptr = self.block_tail.load(Acquire); + + let block = unsafe { &*block_ptr }; + + // Calculate the distance between the tail ptr and the target block + let distance = block.distance(start_index); + + // Decide if this call to `find_block` should attempt to update the + // `block_tail` pointer. + // + // Updating `block_tail` is not always performed in order to reduce + // contention. + // + // When set, as the routine walks the linked list, it attempts to update + // `block_tail`. If the update cannot be performed, `try_updating_tail` + // is unset. + let mut try_updating_tail = distance > offset; + + // Walk the linked list of blocks until the block with `start_index` is + // found. + loop { + let block = unsafe { &(*block_ptr) }; + + if block.is_at_index(start_index) { + return unsafe { NonNull::new_unchecked(block_ptr) }; + } + + let next_block = block + .load_next(Acquire) + // There is no allocated next block, grow the linked list. + .unwrap_or_else(|| block.grow()); + + // If the block is **not** final, then the tail pointer cannot be + // advanced any more. + try_updating_tail &= block.is_final(); + + if try_updating_tail { + // Advancing `block_tail` must happen when walking the linked + // list. `block_tail` may not advance passed any blocks that are + // not "final". At the point a block is finalized, it is unknown + // if there are any prior blocks that are unfinalized, which + // makes it impossible to advance `block_tail`. + // + // While walking the linked list, `block_tail` can be advanced + // as long as finalized blocks are traversed. + // + // Release ordering is used to ensure that any subsequent reads + // are able to see the memory pointed to by `block_tail`. + // + // Acquire is not needed as any "actual" value is not accessed. + // At this point, the linked list is walked to acquire blocks. + if self + .block_tail + .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed) + .is_ok() + { + // Synchronize with any senders + let tail_position = self.tail_position.fetch_add(0, Release); + + unsafe { + block.tx_release(tail_position); + } + } else { + // A concurrent sender is also working on advancing + // `block_tail` and this thread is falling behind. + // + // Stop trying to advance the tail pointer + try_updating_tail = false; + } + } + + block_ptr = next_block.as_ptr(); + + thread::yield_now(); + } + } + + pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) { + // The block has been removed from the linked list and ownership + // is reclaimed. + // + // Before dropping the block, see if it can be reused by + // inserting it back at the end of the linked list. + // + // First, reset the data + block.as_mut().reclaim(); + + let mut reused = false; + + // Attempt to insert the block at the end + // + // Walk at most three times + // + let curr_ptr = self.block_tail.load(Acquire); + + // The pointer can never be null + debug_assert!(!curr_ptr.is_null()); + + let mut curr = NonNull::new_unchecked(curr_ptr); + + // TODO: Unify this logic with Block::grow + for _ in 0..3 { + match curr.as_ref().try_push(&mut block, AcqRel, Acquire) { + Ok(_) => { + reused = true; + break; + } + Err(next) => { + curr = next; + } + } + } + + if !reused { + let _ = Box::from_raw(block.as_ptr()); + } + } +} + +impl<T> fmt::Debug for Tx<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Tx") + .field("block_tail", &self.block_tail.load(Relaxed)) + .field("tail_position", &self.tail_position.load(Relaxed)) + .finish() + } +} + +impl<T> Rx<T> { + /// Pops the next value off the queue. + pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> { + // Advance `head`, if needed + if !self.try_advancing_head() { + return None; + } + + self.reclaim_blocks(tx); + + unsafe { + let block = self.head.as_ref(); + + let ret = block.read(self.index); + + if let Some(block::Read::Value(..)) = ret { + self.index = self.index.wrapping_add(1); + } + + ret + } + } + + /// Pops the next value off the queue, detecting whether the block + /// is busy or empty on failure. + /// + /// This function exists because `Rx::pop` can return `None` even if the + /// channel's queue contains a message that has been completely written. + /// This can happen if the fully delivered message is behind another message + /// that is in the middle of being written to the block, since the channel + /// can't return the messages out of order. + pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> { + let tail_position = tx.tail_position.load(Acquire); + let result = self.pop(tx); + + match result { + Some(block::Read::Value(t)) => TryPopResult::Ok(t), + Some(block::Read::Closed) => TryPopResult::Closed, + None if tail_position == self.index => TryPopResult::Empty, + None => TryPopResult::Busy, + } + } + + /// Tries advancing the block pointer to the block referenced by `self.index`. + /// + /// Returns `true` if successful, `false` if there is no next block to load. + fn try_advancing_head(&mut self) -> bool { + let block_index = block::start_index(self.index); + + loop { + let next_block = { + let block = unsafe { self.head.as_ref() }; + + if block.is_at_index(block_index) { + return true; + } + + block.load_next(Acquire) + }; + + let next_block = match next_block { + Some(next_block) => next_block, + None => { + return false; + } + }; + + self.head = next_block; + + thread::yield_now(); + } + } + + fn reclaim_blocks(&mut self, tx: &Tx<T>) { + while self.free_head != self.head { + unsafe { + // Get a handle to the block that will be freed and update + // `free_head` to point to the next block. + let block = self.free_head; + + let observed_tail_position = block.as_ref().observed_tail_position(); + + let required_index = match observed_tail_position { + Some(i) => i, + None => return, + }; + + if required_index > self.index { + return; + } + + // We may read the next pointer with `Relaxed` ordering as it is + // guaranteed that the `reclaim_blocks` routine trails the `recv` + // routine. Any memory accessed by `reclaim_blocks` has already + // been acquired by `recv`. + let next_block = block.as_ref().load_next(Relaxed); + + // Update the free list head + self.free_head = next_block.unwrap(); + + // Push the emptied block onto the back of the queue, making it + // available to senders. + tx.reclaim_block(block); + } + + thread::yield_now(); + } + } + + /// Effectively `Drop` all the blocks. Should only be called once, when + /// the list is dropping. + pub(super) unsafe fn free_blocks(&mut self) { + debug_assert_ne!(self.free_head, NonNull::dangling()); + + let mut cur = Some(self.free_head); + + #[cfg(debug_assertions)] + { + // to trigger the debug assert above so as to catch that we + // don't call `free_blocks` more than once. + self.free_head = NonNull::dangling(); + self.head = NonNull::dangling(); + } + + while let Some(block) = cur { + cur = block.as_ref().load_next(Relaxed); + drop(Box::from_raw(block.as_ptr())); + } + } +} + +impl<T> fmt::Debug for Rx<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Rx") + .field("head", &self.head) + .field("index", &self.index) + .field("free_head", &self.free_head) + .finish() + } +} |