From 2aa4a82499d4becd2284cdb482213d541b8804dd Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 16:29:10 +0200 Subject: Adding upstream version 86.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/tokio/src/sync/mpsc/block.rs | 387 ++++++++++++++++ third_party/rust/tokio/src/sync/mpsc/bounded.rs | 479 ++++++++++++++++++++ third_party/rust/tokio/src/sync/mpsc/chan.rs | 524 ++++++++++++++++++++++ third_party/rust/tokio/src/sync/mpsc/error.rs | 146 ++++++ third_party/rust/tokio/src/sync/mpsc/list.rs | 341 ++++++++++++++ third_party/rust/tokio/src/sync/mpsc/mod.rs | 64 +++ third_party/rust/tokio/src/sync/mpsc/unbounded.rs | 176 ++++++++ 7 files changed, 2117 insertions(+) create mode 100644 third_party/rust/tokio/src/sync/mpsc/block.rs create mode 100644 third_party/rust/tokio/src/sync/mpsc/bounded.rs create mode 100644 third_party/rust/tokio/src/sync/mpsc/chan.rs create mode 100644 third_party/rust/tokio/src/sync/mpsc/error.rs create mode 100644 third_party/rust/tokio/src/sync/mpsc/list.rs create mode 100644 third_party/rust/tokio/src/sync/mpsc/mod.rs create mode 100644 third_party/rust/tokio/src/sync/mpsc/unbounded.rs (limited to 'third_party/rust/tokio/src/sync/mpsc') diff --git a/third_party/rust/tokio/src/sync/mpsc/block.rs b/third_party/rust/tokio/src/sync/mpsc/block.rs new file mode 100644 index 0000000000..7bf161967b --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/block.rs @@ -0,0 +1,387 @@ +use crate::loom::{ + cell::UnsafeCell, + sync::atomic::{AtomicPtr, AtomicUsize}, + thread, +}; + +use std::mem::MaybeUninit; +use std::ops; +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release}; + +/// A block in a linked list. +/// +/// Each block in the list can hold up to `BLOCK_CAP` messages. +pub(crate) struct Block { + /// The start index of this block. + /// + /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`. + start_index: usize, + + /// The next block in the linked list. + next: AtomicPtr>, + + /// Bitfield tracking slots that are ready to have their values consumed. + ready_slots: AtomicUsize, + + /// The observed `tail_position` value *after* the block has been passed by + /// `block_tail`. + observed_tail_position: UnsafeCell, + + /// Array containing values pushed into the block. Values are stored in a + /// continuous array in order to improve cache line behavior when reading. + /// The values must be manually dropped. + values: Values, +} + +pub(crate) enum Read { + Value(T), + Closed, +} + +struct Values([UnsafeCell>; BLOCK_CAP]); + +use super::BLOCK_CAP; + +/// Masks an index to get the block identifier +const BLOCK_MASK: usize = !(BLOCK_CAP - 1); + +/// Masks an index to get the value offset in a block. +const SLOT_MASK: usize = BLOCK_CAP - 1; + +/// Flag tracking that a block has gone through the sender's release routine. +/// +/// When this is set, the receiver may consider freeing the block. +const RELEASED: usize = 1 << BLOCK_CAP; + +/// Flag tracking all senders dropped. +/// +/// When this flag is set, the send half of the channel has closed. +const TX_CLOSED: usize = RELEASED << 1; + +/// Mask covering all bits used to track slot readiness. +const READY_MASK: usize = RELEASED - 1; + +/// Returns the index of the first slot in the block referenced by `slot_index`. +#[inline(always)] +pub(crate) fn start_index(slot_index: usize) -> usize { + BLOCK_MASK & slot_index +} + +/// Returns the offset into the block referenced by `slot_index`. 
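// A standalone sketch of the index arithmetic above (`BLOCK_MASK`, `SLOT_MASK`,
// `start_index`, and the `offset` helper that follows), assuming the 64-bit
// `BLOCK_CAP` of 32 that mod.rs in this patch selects. Slot indices grow
// monotonically: the low bits pick a slot within a block, the high bits
// identify the block, and the sender-side flags sit above the ready bits.
const BLOCK_CAP: usize = 32;
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
const SLOT_MASK: usize = BLOCK_CAP - 1;

fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}

fn main() {
    // Slot 70 lives in the block starting at index 64, at offset 6.
    assert_eq!(start_index(70), 64);
    assert_eq!(offset(70), 6);

    // The sender-side flag bits do not overlap the per-slot ready bits.
    let released = 1usize << BLOCK_CAP; // block handed back by the tx half
    let tx_closed = released << 1;      // all senders dropped
    let ready_mask = released - 1;      // low BLOCK_CAP bits track slot readiness
    assert!(released & ready_mask == 0 && tx_closed & ready_mask == 0);
}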
+#[inline(always)] +pub(crate) fn offset(slot_index: usize) -> usize { + SLOT_MASK & slot_index +} + +impl Block { + pub(crate) fn new(start_index: usize) -> Block { + Block { + // The absolute index in the channel of the first slot in the block. + start_index, + + // Pointer to the next block in the linked list. + next: AtomicPtr::new(ptr::null_mut()), + + ready_slots: AtomicUsize::new(0), + + observed_tail_position: UnsafeCell::new(0), + + // Value storage + values: unsafe { Values::uninitialized() }, + } + } + + /// Returns `true` if the block matches the given index + pub(crate) fn is_at_index(&self, index: usize) -> bool { + debug_assert!(offset(index) == 0); + self.start_index == index + } + + /// Returns the number of blocks between `self` and the block at the + /// specified index. + /// + /// `start_index` must represent a block *after* `self`. + pub(crate) fn distance(&self, other_index: usize) -> usize { + debug_assert!(offset(other_index) == 0); + other_index.wrapping_sub(self.start_index) / BLOCK_CAP + } + + /// Reads the value at the given offset. + /// + /// Returns `None` if the slot is empty. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * No concurrent access to the slot. + pub(crate) unsafe fn read(&self, slot_index: usize) -> Option> { + let offset = offset(slot_index); + + let ready_bits = self.ready_slots.load(Acquire); + + if !is_ready(ready_bits, offset) { + if is_tx_closed(ready_bits) { + return Some(Read::Closed); + } + + return None; + } + + // Get the value + let value = self.values[offset].with(|ptr| ptr::read(ptr)); + + Some(Read::Value(value.assume_init())) + } + + /// Writes a value to the block at the given offset. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * The slot is empty. + /// * No concurrent access to the slot. + pub(crate) unsafe fn write(&self, slot_index: usize, value: T) { + // Get the offset into the block + let slot_offset = offset(slot_index); + + self.values[slot_offset].with_mut(|ptr| { + ptr::write(ptr, MaybeUninit::new(value)); + }); + + // Release the value. After this point, the slot ref may no longer + // be used. It is possible for the receiver to free the memory at + // any point. + self.set_ready(slot_offset); + } + + /// Signal to the receiver that the sender half of the list is closed. + pub(crate) unsafe fn tx_close(&self) { + self.ready_slots.fetch_or(TX_CLOSED, Release); + } + + /// Resets the block to a blank state. This enables reusing blocks in the + /// channel. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * All slots are empty. + /// * The caller holds a unique pointer to the block. + pub(crate) unsafe fn reclaim(&mut self) { + self.start_index = 0; + self.next = AtomicPtr::new(ptr::null_mut()); + self.ready_slots = AtomicUsize::new(0); + } + + /// Releases the block to the rx half for freeing. + /// + /// This function is called by the tx half once it can be guaranteed that no + /// more senders will attempt to access the block. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * The block will no longer be accessed by any sender. + pub(crate) unsafe fn tx_release(&self, tail_position: usize) { + // Track the observed tail_position. Any sender targetting a greater + // tail_position is guaranteed to not access this block. 
+ self.observed_tail_position + .with_mut(|ptr| *ptr = tail_position); + + // Set the released bit, signalling to the receiver that it is safe to + // free the block's memory as soon as all slots **prior** to + // `observed_tail_position` have been filled. + self.ready_slots.fetch_or(RELEASED, Release); + } + + /// Mark a slot as ready + fn set_ready(&self, slot: usize) { + let mask = 1 << slot; + self.ready_slots.fetch_or(mask, Release); + } + + /// Returns `true` when all slots have their `ready` bits set. + /// + /// This indicates that the block is in its final state and will no longer + /// be mutated. + /// + /// # Implementation + /// + /// The implementation walks each slot checking the `ready` flag. It might + /// be that it would make more sense to coalesce ready flags as bits in a + /// single atomic cell. However, this could have negative impact on cache + /// behavior as there would be many more mutations to a single slot. + pub(crate) fn is_final(&self) -> bool { + self.ready_slots.load(Acquire) & READY_MASK == READY_MASK + } + + /// Returns the `observed_tail_position` value, if set + pub(crate) fn observed_tail_position(&self) -> Option { + if 0 == RELEASED & self.ready_slots.load(Acquire) { + None + } else { + Some(self.observed_tail_position.with(|ptr| unsafe { *ptr })) + } + } + + /// Loads the next block + pub(crate) fn load_next(&self, ordering: Ordering) -> Option>> { + let ret = NonNull::new(self.next.load(ordering)); + + debug_assert!(unsafe { + ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP)) + .unwrap_or(true) + }); + + ret + } + + /// Pushes `block` as the next block in the link. + /// + /// Returns Ok if successful, otherwise, a pointer to the next block in + /// the list is returned. + /// + /// This requires that the next pointer is null. + /// + /// # Ordering + /// + /// This performs a compare-and-swap on `next` using AcqRel ordering. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * `block` is not freed until it has been removed from the list. + pub(crate) unsafe fn try_push( + &self, + block: &mut NonNull>, + ordering: Ordering, + ) -> Result<(), NonNull>> { + block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP); + + let next_ptr = self + .next + .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering); + + match NonNull::new(next_ptr) { + Some(next_ptr) => Err(next_ptr), + None => Ok(()), + } + } + + /// Grows the `Block` linked list by allocating and appending a new block. + /// + /// The next block in the linked list is returned. This may or may not be + /// the one allocated by the function call. + /// + /// # Implementation + /// + /// It is assumed that `self.next` is null. A new block is allocated with + /// `start_index` set to be the next block. A compare-and-swap is performed + /// with AcqRel memory ordering. If the compare-and-swap is successful, the + /// newly allocated block is released to other threads walking the block + /// linked list. If the compare-and-swap fails, the current thread acquires + /// the next block in the linked list, allowing the current thread to access + /// the slots. + pub(crate) fn grow(&self) -> NonNull> { + // Create the new block. It is assumed that the block will become the + // next one after `&self`. If this turns out to not be the case, + // `start_index` is updated accordingly. 
+ let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP)); + + let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) }; + + // Attempt to store the block. The first compare-and-swap attempt is + // "unrolled" due to minor differences in logic + // + // `AcqRel` is used as the ordering **only** when attempting the + // compare-and-swap on self.next. + // + // If the compare-and-swap fails, then the actual value of the cell is + // returned from this function and accessed by the caller. Given this, + // the memory must be acquired. + // + // `Release` ensures that the newly allocated block is available to + // other threads acquiring the next pointer. + let next = NonNull::new(self.next.compare_and_swap( + ptr::null_mut(), + new_block.as_ptr(), + AcqRel, + )); + + let next = match next { + Some(next) => next, + None => { + // The compare-and-swap succeeded and the newly allocated block + // is successfully pushed. + return new_block; + } + }; + + // There already is a next block in the linked list. The newly allocated + // block could be dropped and the discovered next block returned; + // however, that would be wasteful. Instead, the linked list is walked + // by repeatedly attempting to compare-and-swap the pointer into the + // `next` register until the compare-and-swap succeed. + // + // Care is taken to update new_block's start_index field as appropriate. + + let mut curr = next; + + // TODO: Should this iteration be capped? + loop { + let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) }; + + curr = match actual { + Ok(_) => { + return next; + } + Err(curr) => curr, + }; + + // When running outside of loom, this calls `spin_loop_hint`. + thread::yield_now(); + } + } +} + +/// Returns `true` if the specificed slot has a value ready to be consumed. +fn is_ready(bits: usize, slot: usize) -> bool { + let mask = 1 << slot; + mask == mask & bits +} + +/// Returns `true` if the closed flag has been set. +fn is_tx_closed(bits: usize) -> bool { + TX_CLOSED == bits & TX_CLOSED +} + +impl Values { + unsafe fn uninitialized() -> Values { + let mut vals = MaybeUninit::uninit(); + + // When fuzzing, `UnsafeCell` needs to be initialized. + if_loom! { + let p = vals.as_mut_ptr() as *mut UnsafeCell>; + for i in 0..BLOCK_CAP { + p.add(i) + .write(UnsafeCell::new(MaybeUninit::uninit())); + } + } + + Values(vals.assume_init()) + } +} + +impl ops::Index for Values { + type Output = UnsafeCell>; + + fn index(&self, index: usize) -> &Self::Output { + self.0.index(index) + } +} diff --git a/third_party/rust/tokio/src/sync/mpsc/bounded.rs b/third_party/rust/tokio/src/sync/mpsc/bounded.rs new file mode 100644 index 0000000000..afca8c524d --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/bounded.rs @@ -0,0 +1,479 @@ +use crate::sync::mpsc::chan; +use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; +use crate::sync::semaphore_ll as semaphore; + +cfg_time! { + use crate::sync::mpsc::error::SendTimeoutError; + use crate::time::Duration; +} + +use std::fmt; +use std::task::{Context, Poll}; + +/// Send values to the associated `Receiver`. +/// +/// Instances are created by the [`channel`](channel) function. 
+pub struct Sender { + chan: chan::Tx, +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Sender { + chan: self.chan.clone(), + } + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Sender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `Sender`. +/// +/// Instances are created by the [`channel`](channel) function. +pub struct Receiver { + /// The channel receiver + chan: chan::Rx, +} + +impl fmt::Debug for Receiver { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Receiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Creates a bounded mpsc channel for communicating between asynchronous tasks, +/// returning the sender/receiver halves. +/// +/// All data sent on `Sender` will become available on `Receiver` in the same +/// order as it was sent. +/// +/// The `Sender` can be cloned to `send` to the same channel from multiple code +/// locations. Only one `Receiver` is supported. +/// +/// If the `Receiver` is disconnected while trying to `send`, the `send` method +/// will return a `SendError`. Similarly, if `Sender` is disconnected while +/// trying to `recv`, the `recv` method will return a `RecvError`. +/// +/// # Examples +/// +/// ```rust +/// use tokio::sync::mpsc; +/// +/// #[tokio::main] +/// async fn main() { +/// let (mut tx, mut rx) = mpsc::channel(100); +/// +/// tokio::spawn(async move { +/// for i in 0..10 { +/// if let Err(_) = tx.send(i).await { +/// println!("receiver dropped"); +/// return; +/// } +/// } +/// }); +/// +/// while let Some(i) = rx.recv().await { +/// println!("got = {}", i); +/// } +/// } +/// ``` +pub fn channel(buffer: usize) -> (Sender, Receiver) { + assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); + let semaphore = (semaphore::Semaphore::new(buffer), buffer); + let (tx, rx) = chan::channel(semaphore); + + let tx = Sender::new(tx); + let rx = Receiver::new(rx); + + (tx, rx) +} + +/// Channel semaphore is a tuple of the semaphore implementation and a `usize` +/// representing the channel bound. +type Semaphore = (semaphore::Semaphore, usize); + +impl Receiver { + pub(crate) fn new(chan: chan::Rx) -> Receiver { + Receiver { chan } + } + + /// Receives the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(100); + /// + /// tokio::spawn(async move { + /// tx.send("hello").await.unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(100); + /// + /// tx.send("hello").await.unwrap(); + /// tx.send("world").await.unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option { + use crate::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + #[doc(hidden)] // TODO: document + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with + /// a possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with recv, this function has two failure cases instead of + /// one (one for disconnection, one for an empty buffer). + pub fn try_recv(&mut self) -> Result { + self.chan.try_recv() + } + + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +impl Unpin for Receiver {} + +cfg_stream! { + impl crate::stream::Stream for Receiver { + type Item = T; + + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_recv(cx) + } + } +} + +impl Sender { + pub(crate) fn new(chan: chan::Tx) -> Sender { + Sender { chan } + } + + /// Sends a value, waiting until there is capacity. + /// + /// A successful send occurs when it is determined that the other end of the + /// channel has not hung up already. An unsuccessful send would be one where + /// the corresponding receiver has already been closed. Note that a return + /// value of `Err` means that the data will never be received, but a return + /// value of `Ok` does not mean that the data will be received. It is + /// possible for the corresponding receiver to hang up immediately after + /// this function returns `Ok`. + /// + /// # Errors + /// + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`Receiver`] handle dropping, the function returns + /// an error. The error includes the value passed to `send`. + /// + /// [`close`]: Receiver::close + /// [`Receiver`]: Receiver + /// + /// # Examples + /// + /// In the following example, each call to `send` will block until the + /// previously sent value was received. 
+ /// + /// ```rust + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(_) = tx.send(i).await { + /// println!("receiver dropped"); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// } + /// } + /// ``` + pub async fn send(&mut self, value: T) -> Result<(), SendError> { + use crate::future::poll_fn; + + if poll_fn(|cx| self.poll_ready(cx)).await.is_err() { + return Err(SendError(value)); + } + + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendError(value)), + } + } + + /// Attempts to immediately send a message on this `Sender` + /// + /// This method differs from [`send`] by returning immediately if the channel's + /// buffer is full or no receiver is waiting to acquire some data. Compared + /// with [`send`], this function has two failure cases instead of one (one for + /// disconnection, one for a full buffer). + /// + /// This function may be paired with [`poll_ready`] in order to wait for + /// channel capacity before trying to send a value. + /// + /// # Errors + /// + /// If the channel capacity has been reached, i.e., the channel has `n` + /// buffered values where `n` is the argument passed to [`channel`], then an + /// error is returned. + /// + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`Receiver`] handle dropping, the function returns + /// an error. The error includes the value passed to `send`. + /// + /// [`send`]: Sender::send + /// [`poll_ready`]: Sender::poll_ready + /// [`channel`]: channel + /// [`close`]: Receiver::close + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// // Create a channel with buffer size 1 + /// let (mut tx1, mut rx) = mpsc::channel(1); + /// let mut tx2 = tx1.clone(); + /// + /// tokio::spawn(async move { + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// // task waits until the receiver receives a value. + /// }); + /// + /// tokio::spawn(async move { + /// // This will return an error and send + /// // no message if the buffer is full + /// let _ = tx2.try_send(3); + /// }); + /// + /// let mut msg; + /// msg = rx.recv().await.unwrap(); + /// println!("message {} received", msg); + /// + /// msg = rx.recv().await.unwrap(); + /// println!("message {} received", msg); + /// + /// // Third message may have never been sent + /// match rx.recv().await { + /// Some(msg) => println!("message {} received", msg), + /// None => println!("the third message was never sent"), + /// } + /// } + /// ``` + pub fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + self.chan.try_send(message)?; + Ok(()) + } + + /// Sends a value, waiting until there is capacity, but only for a limited time. + /// + /// Shares the same success and error conditions as [`send`], adding one more + /// condition for an unsuccessful send, which is when the provided timeout has + /// elapsed, and there is no capacity available. + /// + /// [`send`]: Sender::send + /// + /// # Errors + /// + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`Receiver`] having been dropped, + /// the function returns an error. The error includes the value passed to `send`. 
+ /// + /// [`close`]: Receiver::close + /// [`Receiver`]: Receiver + /// + /// # Examples + /// + /// In the following example, each call to `send_timeout` will block until the + /// previously sent value was received, unless the timeout has elapsed. + /// + /// ```rust + /// use tokio::sync::mpsc; + /// use tokio::time::{delay_for, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await { + /// println!("send error: #{:?}", e); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// delay_for(Duration::from_millis(200)).await; + /// } + /// } + /// ``` + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + pub async fn send_timeout( + &mut self, + value: T, + timeout: Duration, + ) -> Result<(), SendTimeoutError> { + use crate::future::poll_fn; + + match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { + Err(_) => { + return Err(SendTimeoutError::Timeout(value)); + } + Ok(Err(_)) => { + return Err(SendTimeoutError::Closed(value)); + } + Ok(_) => {} + } + + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), + } + } + + /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item. + /// + /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a + /// slot becomes available. + /// + /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless + /// the channel has since been closed. To provide this guarantee, the channel reserves one slot + /// in the channel for the coming send. This reserved slot is not available to other `Sender` + /// instances, so you need to be careful to not end up with deadlocks by blocking after calling + /// `poll_ready` but before sending an element. + /// + /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you + /// can use [`disarm`](Sender::disarm) to release the reserved slot. + /// + /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to + /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel + /// is closed. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.poll_ready(cx).map_err(|_| ClosedError::new()) + } + + /// Undo a successful call to `poll_ready`. + /// + /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the + /// channel to make room for the coming send. `disarm` allows you to give up that slot if you + /// decide you do not wish to send an item after all. After calling `disarm`, you must call + /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again. + /// + /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was + /// not previously called, or did not succeed). + /// + /// # Motivation + /// + /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers + /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may + /// take up all the slots of the channel, and prevent active senders from getting any requests + /// through. 
Consider this code that forwards from one channel to another: + /// + /// ```rust,ignore + /// loop { + /// ready!(tx.poll_ready(cx))?; + /// if let Some(item) = ready!(rx.poll_recv(cx)) { + /// tx.try_send(item)?; + /// } else { + /// break; + /// } + /// } + /// ``` + /// + /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then + /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do, + /// they are effectively each reducing the channel's capacity by 1. If enough of these + /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot + /// for them through `poll_ready`, and the system will deadlock. + /// + /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that + /// you have to block. We can then fix the code above by writing: + /// + /// ```rust,ignore + /// loop { + /// ready!(tx.poll_ready(cx))?; + /// let item = rx.poll_recv(cx); + /// if let Poll::Ready(Ok(_)) = item { + /// // we're going to send the item below, so don't disarm + /// } else { + /// // give up our send slot, we won't need it for a while + /// tx.disarm(); + /// } + /// if let Some(item) = ready!(item) { + /// tx.try_send(item)?; + /// } else { + /// break; + /// } + /// } + /// ``` + pub fn disarm(&mut self) -> bool { + if self.chan.is_ready() { + self.chan.disarm(); + true + } else { + false + } + } +} diff --git a/third_party/rust/tokio/src/sync/mpsc/chan.rs b/third_party/rust/tokio/src/sync/mpsc/chan.rs new file mode 100644 index 0000000000..3466395788 --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/chan.rs @@ -0,0 +1,524 @@ +use crate::loom::cell::UnsafeCell; +use crate::loom::future::AtomicWaker; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Arc; +use crate::sync::mpsc::error::{ClosedError, TryRecvError}; +use crate::sync::mpsc::{error, list}; + +use std::fmt; +use std::process; +use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; + +/// Channel sender +pub(crate) struct Tx { + inner: Arc>, + permit: S::Permit, +} + +impl fmt::Debug for Tx +where + S::Permit: fmt::Debug, + S: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Tx") + .field("inner", &self.inner) + .field("permit", &self.permit) + .finish() + } +} + +/// Channel receiver +pub(crate) struct Rx { + inner: Arc>, +} + +impl fmt::Debug for Rx +where + S: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Rx").field("inner", &self.inner).finish() + } +} + +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum TrySendError { + Closed, + Full, +} + +impl From<(T, TrySendError)> for error::SendError { + fn from(src: (T, TrySendError)) -> error::SendError { + match src.1 { + TrySendError::Closed => error::SendError(src.0), + TrySendError::Full => unreachable!(), + } + } +} + +impl From<(T, TrySendError)> for error::TrySendError { + fn from(src: (T, TrySendError)) -> error::TrySendError { + match src.1 { + TrySendError::Closed => error::TrySendError::Closed(src.0), + TrySendError::Full => error::TrySendError::Full(src.0), + } + } +} + +pub(crate) trait Semaphore { + type Permit; + + fn new_permit() -> Self::Permit; + + /// The permit is dropped without a value being sent. In this case, the + /// permit must be returned to the semaphore. 
+ fn drop_permit(&self, permit: &mut Self::Permit); + + fn is_idle(&self) -> bool; + + fn add_permit(&self); + + fn poll_acquire( + &self, + cx: &mut Context<'_>, + permit: &mut Self::Permit, + ) -> Poll>; + + fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; + + /// A value was sent into the channel and the permit held by `tx` is + /// dropped. In this case, the permit should not immeditely be returned to + /// the semaphore. Instead, the permit is returnred to the semaphore once + /// the sent value is read by the rx handle. + fn forget(&self, permit: &mut Self::Permit); + + fn close(&self); +} + +struct Chan { + /// Handle to the push half of the lock-free list. + tx: list::Tx, + + /// Coordinates access to channel's capacity. + semaphore: S, + + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: AtomicWaker, + + /// Tracks the number of outstanding sender handles. + /// + /// When this drops to zero, the send half of the channel is closed. + tx_count: AtomicUsize, + + /// Only accessed by `Rx` handle. + rx_fields: UnsafeCell>, +} + +impl fmt::Debug for Chan +where + S: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Chan") + .field("tx", &self.tx) + .field("semaphore", &self.semaphore) + .field("rx_waker", &self.rx_waker) + .field("tx_count", &self.tx_count) + .field("rx_fields", &"...") + .finish() + } +} + +/// Fields only accessed by `Rx` handle. +struct RxFields { + /// Channel receiver. This field is only accessed by the `Receiver` type. + list: list::Rx, + + /// `true` if `Rx::close` is called. + rx_closed: bool, +} + +impl fmt::Debug for RxFields { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("RxFields") + .field("list", &self.list) + .field("rx_closed", &self.rx_closed) + .finish() + } +} + +unsafe impl Send for Chan {} +unsafe impl Sync for Chan {} + +pub(crate) fn channel(semaphore: S) -> (Tx, Rx) +where + S: Semaphore, +{ + let (tx, rx) = list::channel(); + + let chan = Arc::new(Chan { + tx, + semaphore, + rx_waker: AtomicWaker::new(), + tx_count: AtomicUsize::new(1), + rx_fields: UnsafeCell::new(RxFields { + list: rx, + rx_closed: false, + }), + }); + + (Tx::new(chan.clone()), Rx::new(chan)) +} + +// ===== impl Tx ===== + +impl Tx +where + S: Semaphore, +{ + fn new(chan: Arc>) -> Tx { + Tx { + inner: chan, + permit: S::new_permit(), + } + } + + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.semaphore.poll_acquire(cx, &mut self.permit) + } + + pub(crate) fn disarm(&mut self) { + // TODO: should this error if not acquired? + self.inner.semaphore.drop_permit(&mut self.permit) + } + + /// Send a message and notify the receiver. + pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> { + self.inner.try_send(value, &mut self.permit) + } +} + +impl Tx { + pub(crate) fn is_ready(&self) -> bool { + self.permit.is_acquired() + } +} + +impl Tx { + pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> { + self.inner.try_send(value, &mut ()) + } +} + +impl Clone for Tx +where + S: Semaphore, +{ + fn clone(&self) -> Tx { + // Using a Relaxed ordering here is sufficient as the caller holds a + // strong ref to `self`, preventing a concurrent decrement to zero. 
+ self.inner.tx_count.fetch_add(1, Relaxed); + + Tx { + inner: self.inner.clone(), + permit: S::new_permit(), + } + } +} + +impl Drop for Tx +where + S: Semaphore, +{ + fn drop(&mut self) { + self.inner.semaphore.drop_permit(&mut self.permit); + + if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { + return; + } + + // Close the list, which sends a `Close` message + self.inner.tx.close(); + + // Notify the receiver + self.inner.rx_waker.wake(); + } +} + +// ===== impl Rx ===== + +impl Rx +where + S: Semaphore, +{ + fn new(chan: Arc>) -> Rx { + Rx { inner: chan } + } + + pub(crate) fn close(&mut self) { + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + if rx_fields.rx_closed { + return; + } + + rx_fields.rx_closed = true; + }); + + self.inner.semaphore.close(); + } + + /// Receive the next value + pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { + use super::block::Read::*; + + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + macro_rules! try_recv { + () => { + match rx_fields.list.pop(&self.inner.tx) { + Some(Value(value)) => { + self.inner.semaphore.add_permit(); + return Ready(Some(value)); + } + Some(Closed) => { + // TODO: This check may not be required as it most + // likely can only return `true` at this point. A + // channel is closed when all tx handles are + // dropped. Dropping a tx handle releases memory, + // which ensures that if dropping the tx handle is + // visible, then all messages sent are also visible. + assert!(self.inner.semaphore.is_idle()); + return Ready(None); + } + None => {} // fall through + } + }; + } + + try_recv!(); + + self.inner.rx_waker.register_by_ref(cx.waker()); + + // It is possible that a value was pushed between attempting to read + // and registering the task, so we have to check the channel a + // second time here. + try_recv!(); + + if rx_fields.rx_closed && self.inner.semaphore.is_idle() { + Ready(None) + } else { + Pending + } + }) + } + + /// Receives the next value without blocking + pub(crate) fn try_recv(&mut self) -> Result { + use super::block::Read::*; + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + match rx_fields.list.pop(&self.inner.tx) { + Some(Value(value)) => { + self.inner.semaphore.add_permit(); + Ok(value) + } + Some(Closed) => Err(TryRecvError::Closed), + None => Err(TryRecvError::Empty), + } + }) + } +} + +impl Drop for Rx +where + S: Semaphore, +{ + fn drop(&mut self) { + use super::block::Read::Value; + + self.close(); + + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { + self.inner.semaphore.add_permit(); + } + }) + } +} + +// ===== impl Chan ===== + +impl Chan +where + S: Semaphore, +{ + fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> { + if let Err(e) = self.semaphore.try_acquire(permit) { + return Err((value, e)); + } + + // Push the value + self.tx.push(value); + + // Notify the rx task + self.rx_waker.wake(); + + // Release the permit + self.semaphore.forget(permit); + + Ok(()) + } +} + +impl Drop for Chan { + fn drop(&mut self) { + use super::block::Read::Value; + + // Safety: the only owner of the rx fields is Chan, and eing + // inside its own Drop means we're the last ones to touch it. 
+ self.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {} + unsafe { rx_fields.list.free_blocks() }; + }); + } +} + +use crate::sync::semaphore_ll::TryAcquireError; + +impl From for TrySendError { + fn from(src: TryAcquireError) -> TrySendError { + if src.is_closed() { + TrySendError::Closed + } else if src.is_no_permits() { + TrySendError::Full + } else { + unreachable!(); + } + } +} + +// ===== impl Semaphore for (::Semaphore, capacity) ===== + +use crate::sync::semaphore_ll::Permit; + +impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { + type Permit = Permit; + + fn new_permit() -> Permit { + Permit::new() + } + + fn drop_permit(&self, permit: &mut Permit) { + permit.release(1, &self.0); + } + + fn add_permit(&self) { + self.0.add_permits(1) + } + + fn is_idle(&self) -> bool { + self.0.available_permits() == self.1 + } + + fn poll_acquire( + &self, + cx: &mut Context<'_>, + permit: &mut Permit, + ) -> Poll> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + + permit + .poll_acquire(cx, 1, &self.0) + .map_err(|_| ClosedError::new()) + } + + fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { + permit.try_acquire(1, &self.0)?; + Ok(()) + } + + fn forget(&self, permit: &mut Self::Permit) { + permit.forget(1); + } + + fn close(&self) { + self.0.close(); + } +} + +// ===== impl Semaphore for AtomicUsize ===== + +use std::sync::atomic::Ordering::{Acquire, Release}; +use std::usize; + +impl Semaphore for AtomicUsize { + type Permit = (); + + fn new_permit() {} + + fn drop_permit(&self, _permit: &mut ()) {} + + fn add_permit(&self) { + let prev = self.fetch_sub(2, Release); + + if prev >> 1 == 0 { + // Something went wrong + process::abort(); + } + } + + fn is_idle(&self) -> bool { + self.load(Acquire) >> 1 == 0 + } + + fn poll_acquire( + &self, + _cx: &mut Context<'_>, + permit: &mut (), + ) -> Poll> { + Ready(self.try_acquire(permit).map_err(|_| ClosedError::new())) + } + + fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { + let mut curr = self.load(Acquire); + + loop { + if curr & 1 == 1 { + return Err(TrySendError::Closed); + } + + if curr == usize::MAX ^ 1 { + // Overflowed the ref count. There is no safe way to recover, so + // abort the process. In practice, this should never happen. + process::abort() + } + + match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) { + Ok(_) => return Ok(()), + Err(actual) => { + curr = actual; + } + } + } + } + + fn forget(&self, _permit: &mut ()) {} + + fn close(&self) { + self.fetch_or(1, Release); + } +} diff --git a/third_party/rust/tokio/src/sync/mpsc/error.rs b/third_party/rust/tokio/src/sync/mpsc/error.rs new file mode 100644 index 0000000000..72c42aa53e --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/error.rs @@ -0,0 +1,146 @@ +//! Channel error types + +use std::error::Error; +use std::fmt; + +/// Error returned by the `Sender`. +#[derive(Debug)] +pub struct SendError(pub T); + +impl fmt::Display for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl std::error::Error for SendError {} + +// ===== TrySendError ===== + +/// This enumeration is the list of the possible error outcomes for the +/// [try_send](super::Sender::try_send) method. 
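// A standalone sketch of the counter encoding used by the
// `impl Semaphore for AtomicUsize` in chan.rs above: bit 0 is the "closed"
// flag and the remaining bits count in-flight messages in steps of two.
// The function names are local to this sketch.
use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire, Release}};

fn try_acquire(state: &AtomicUsize) -> Result<(), &'static str> {
    let mut curr = state.load(Acquire);
    loop {
        if curr & 1 == 1 {
            return Err("closed");
        }
        match state.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
            Ok(_) => return Ok(()),
            Err(actual) => curr = actual,
        }
    }
}

fn add_permit(state: &AtomicUsize) {
    // The receiver consumed one message, so its slot in the count is returned.
    state.fetch_sub(2, Release);
}

fn main() {
    let state = AtomicUsize::new(0);

    try_acquire(&state).unwrap(); // one message in flight
    try_acquire(&state).unwrap(); // two messages in flight
    assert_eq!(state.load(Acquire) >> 1, 2);

    add_permit(&state); // one message read by the receiver
    assert_eq!(state.load(Acquire) >> 1, 1);

    state.fetch_or(1, Release); // the receiver closes the channel
    assert!(try_acquire(&state).is_err());
}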
+#[derive(Debug)] +pub enum TrySendError { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require blocking. + Full(T), + + /// The receive half of the channel was explicitly closed or has been + /// dropped. + Closed(T), +} + +impl Error for TrySendError {} + +impl fmt::Display for TrySendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + TrySendError::Full(..) => "no available capacity", + TrySendError::Closed(..) => "channel closed", + } + ) + } +} + +impl From> for TrySendError { + fn from(src: SendError) -> TrySendError { + TrySendError::Closed(src.0) + } +} + +// ===== RecvError ===== + +/// Error returned by `Receiver`. +#[derive(Debug)] +pub struct RecvError(()); + +impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl Error for RecvError {} + +// ===== TryRecvError ===== + +/// This enumeration is the list of the possible reasons that try_recv +/// could not return data when called. +#[derive(Debug, PartialEq)] +pub enum TryRecvError { + /// This channel is currently empty, but the Sender(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// The channel's sending half has been closed, and there will + /// never be any more data received on it. + Closed, +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + TryRecvError::Empty => "channel empty", + TryRecvError::Closed => "channel closed", + } + ) + } +} + +impl Error for TryRecvError {} + +// ===== ClosedError ===== + +/// Error returned by [`Sender::poll_ready`](super::Sender::poll_ready). +#[derive(Debug)] +pub struct ClosedError(()); + +impl ClosedError { + pub(crate) fn new() -> ClosedError { + ClosedError(()) + } +} + +impl fmt::Display for ClosedError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl Error for ClosedError {} + +cfg_time! { + // ===== SendTimeoutError ===== + + #[derive(Debug)] + /// Error returned by [`Sender::send_timeout`](super::Sender::send_timeout)]. + pub enum SendTimeoutError { + /// The data could not be sent on the channel because the channel is + /// full, and the timeout to send has elapsed. + Timeout(T), + + /// The receive half of the channel was explicitly closed or has been + /// dropped. + Closed(T), + } + + impl Error for SendTimeoutError {} + + impl fmt::Display for SendTimeoutError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + SendTimeoutError::Timeout(..) => "timed out waiting on send operation", + SendTimeoutError::Closed(..) => "channel closed", + } + ) + } + } +} diff --git a/third_party/rust/tokio/src/sync/mpsc/list.rs b/third_party/rust/tokio/src/sync/mpsc/list.rs new file mode 100644 index 0000000000..53f82a25ef --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/list.rs @@ -0,0 +1,341 @@ +//! A concurrent, lock-free, FIFO list. + +use crate::loom::{ + sync::atomic::{AtomicPtr, AtomicUsize}, + thread, +}; +use crate::sync::mpsc::block::{self, Block}; + +use std::fmt; +use std::ptr::NonNull; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; + +/// List queue transmit handle +pub(crate) struct Tx { + /// Tail in the `Block` mpmc list. + block_tail: AtomicPtr>, + + /// Position to push the next message. 
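// A sketch of how the error types defined in error.rs above surface through
// the public bounded API, assuming the tokio 0.2-era behavior documented in
// this patch: `try_send` reports `Full`/`Closed`, `try_recv` reports
// `Empty`/`Closed`.
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel::<u32>(1);

    // The single buffer slot is taken, so a second `try_send` reports `Full`.
    tx.try_send(1).unwrap();
    match tx.try_send(2) {
        Err(TrySendError::Full(v)) => println!("channel full, kept {}", v),
        other => println!("unexpected: {:?}", other),
    }

    // Drain the buffered value; the channel is then empty but still open.
    assert_eq!(Ok(1), rx.try_recv());
    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

    // Dropping every sender closes the channel.
    drop(tx);
    assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
}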
This reference a block and offset + /// into the block. + tail_position: AtomicUsize, +} + +/// List queue receive handle +pub(crate) struct Rx { + /// Pointer to the block being processed + head: NonNull>, + + /// Next slot index to process + index: usize, + + /// Pointer to the next block pending release + free_head: NonNull>, +} + +pub(crate) fn channel() -> (Tx, Rx) { + // Create the initial block shared between the tx and rx halves. + let initial_block = Box::new(Block::new(0)); + let initial_block_ptr = Box::into_raw(initial_block); + + let tx = Tx { + block_tail: AtomicPtr::new(initial_block_ptr), + tail_position: AtomicUsize::new(0), + }; + + let head = NonNull::new(initial_block_ptr).unwrap(); + + let rx = Rx { + head, + index: 0, + free_head: head, + }; + + (tx, rx) +} + +impl Tx { + /// Pushes a value into the list. + pub(crate) fn push(&self, value: T) { + // First, claim a slot for the value. `Acquire` is used here to + // synchronize with the `fetch_add` in `reclaim_blocks`. + let slot_index = self.tail_position.fetch_add(1, Acquire); + + // Load the current block and write the value + let block = self.find_block(slot_index); + + unsafe { + // Write the value to the block + block.as_ref().write(slot_index, value); + } + } + + /// Closes the send half of the list + /// + /// Similar process as pushing a value, but instead of writing the value & + /// setting the ready flag, the TX_CLOSED flag is set on the block. + pub(crate) fn close(&self) { + // First, claim a slot for the value. This is the last slot that will be + // claimed. + let slot_index = self.tail_position.fetch_add(1, Acquire); + + let block = self.find_block(slot_index); + + unsafe { block.as_ref().tx_close() } + } + + fn find_block(&self, slot_index: usize) -> NonNull> { + // The start index of the block that contains `index`. + let start_index = block::start_index(slot_index); + + // The index offset into the block + let offset = block::offset(slot_index); + + // Load the current head of the block + let mut block_ptr = self.block_tail.load(Acquire); + + let block = unsafe { &*block_ptr }; + + // Calculate the distance between the tail ptr and the target block + let distance = block.distance(start_index); + + // Decide if this call to `find_block` should attempt to update the + // `block_tail` pointer. + // + // Updating `block_tail` is not always performed in order to reduce + // contention. + // + // When set, as the routine walks the linked list, it attempts to update + // `block_tail`. If the update cannot be performed, `try_updating_tail` + // is unset. + let mut try_updating_tail = distance > offset; + + // Walk the linked list of blocks until the block with `start_index` is + // found. + loop { + let block = unsafe { &(*block_ptr) }; + + if block.is_at_index(start_index) { + return unsafe { NonNull::new_unchecked(block_ptr) }; + } + + let next_block = block + .load_next(Acquire) + // There is no allocated next block, grow the linked list. + .unwrap_or_else(|| block.grow()); + + // If the block is **not** final, then the tail pointer cannot be + // advanced any more. + try_updating_tail &= block.is_final(); + + if try_updating_tail { + // Advancing `block_tail` must happen when walking the linked + // list. `block_tail` may not advance passed any blocks that are + // not "final". At the point a block is finalized, it is unknown + // if there are any prior blocks that are unfinalized, which + // makes it impossible to advance `block_tail`. 
+ // + // While walking the linked list, `block_tail` can be advanced + // as long as finalized blocks are traversed. + // + // Release ordering is used to ensure that any subsequent reads + // are able to see the memory pointed to by `block_tail`. + // + // Acquire is not needed as any "actual" value is not accessed. + // At this point, the linked list is walked to acquire blocks. + let actual = + self.block_tail + .compare_and_swap(block_ptr, next_block.as_ptr(), Release); + + if actual == block_ptr { + // Synchronize with any senders + let tail_position = self.tail_position.fetch_add(0, Release); + + unsafe { + block.tx_release(tail_position); + } + } else { + // A concurrent sender is also working on advancing + // `block_tail` and this thread is falling behind. + // + // Stop trying to advance the tail pointer + try_updating_tail = false; + } + } + + block_ptr = next_block.as_ptr(); + + thread::yield_now(); + } + } + + pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull>) { + // The block has been removed from the linked list and ownership + // is reclaimed. + // + // Before dropping the block, see if it can be reused by + // inserting it back at the end of the linked list. + // + // First, reset the data + block.as_mut().reclaim(); + + let mut reused = false; + + // Attempt to insert the block at the end + // + // Walk at most three times + // + let curr_ptr = self.block_tail.load(Acquire); + + // The pointer can never be null + debug_assert!(!curr_ptr.is_null()); + + let mut curr = NonNull::new_unchecked(curr_ptr); + + // TODO: Unify this logic with Block::grow + for _ in 0..3 { + match curr.as_ref().try_push(&mut block, AcqRel) { + Ok(_) => { + reused = true; + break; + } + Err(next) => { + curr = next; + } + } + } + + if !reused { + let _ = Box::from_raw(block.as_ptr()); + } + } +} + +impl fmt::Debug for Tx { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Tx") + .field("block_tail", &self.block_tail.load(Relaxed)) + .field("tail_position", &self.tail_position.load(Relaxed)) + .finish() + } +} + +impl Rx { + /// Pops the next value off the queue + pub(crate) fn pop(&mut self, tx: &Tx) -> Option> { + // Advance `head`, if needed + if !self.try_advancing_head() { + return None; + } + + self.reclaim_blocks(tx); + + unsafe { + let block = self.head.as_ref(); + + let ret = block.read(self.index); + + if let Some(block::Read::Value(..)) = ret { + self.index = self.index.wrapping_add(1); + } + + ret + } + } + + /// Tries advancing the block pointer to the block referenced by `self.index`. + /// + /// Returns `true` if successful, `false` if there is no next block to load. + fn try_advancing_head(&mut self) -> bool { + let block_index = block::start_index(self.index); + + loop { + let next_block = { + let block = unsafe { self.head.as_ref() }; + + if block.is_at_index(block_index) { + return true; + } + + block.load_next(Acquire) + }; + + let next_block = match next_block { + Some(next_block) => next_block, + None => { + return false; + } + }; + + self.head = next_block; + + thread::yield_now(); + } + } + + fn reclaim_blocks(&mut self, tx: &Tx) { + while self.free_head != self.head { + unsafe { + // Get a handle to the block that will be freed and update + // `free_head` to point to the next block. 
+ let block = self.free_head; + + let observed_tail_position = block.as_ref().observed_tail_position(); + + let required_index = match observed_tail_position { + Some(i) => i, + None => return, + }; + + if required_index > self.index { + return; + } + + // We may read the next pointer with `Relaxed` ordering as it is + // guaranteed that the `reclaim_blocks` routine trails the `recv` + // routine. Any memory accessed by `reclaim_blocks` has already + // been acquired by `recv`. + let next_block = block.as_ref().load_next(Relaxed); + + // Update the free list head + self.free_head = next_block.unwrap(); + + // Push the emptied block onto the back of the queue, making it + // available to senders. + tx.reclaim_block(block); + } + + thread::yield_now(); + } + } + + /// Effectively `Drop` all the blocks. Should only be called once, when + /// the list is dropping. + pub(super) unsafe fn free_blocks(&mut self) { + debug_assert_ne!(self.free_head, NonNull::dangling()); + + let mut cur = Some(self.free_head); + + #[cfg(debug_assertions)] + { + // to trigger the debug assert above so as to catch that we + // don't call `free_blocks` more than once. + self.free_head = NonNull::dangling(); + self.head = NonNull::dangling(); + } + + while let Some(block) = cur { + cur = block.as_ref().load_next(Relaxed); + drop(Box::from_raw(block.as_ptr())); + } + } +} + +impl fmt::Debug for Rx { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Rx") + .field("head", &self.head) + .field("index", &self.index) + .field("free_head", &self.free_head) + .finish() + } +} diff --git a/third_party/rust/tokio/src/sync/mpsc/mod.rs b/third_party/rust/tokio/src/sync/mpsc/mod.rs new file mode 100644 index 0000000000..4cfd6150f3 --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/mod.rs @@ -0,0 +1,64 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + +//! A multi-producer, single-consumer queue for sending values across +//! asynchronous tasks. +//! +//! Similar to `std`, channel creation provides [`Receiver`] and [`Sender`] +//! handles. [`Receiver`] implements `Stream` and allows a task to read values +//! out of the channel. If there is no message to read, the current task will be +//! notified when a new value is sent. [`Sender`] implements the `Sink` trait +//! and allows sending messages into the channel. If the channel is at capacity, +//! the send is rejected and the task will be notified when additional capacity +//! is available. In other words, the channel provides backpressure. +//! +//! Unbounded channels are also available using the `unbounded_channel` +//! constructor. +//! +//! # Disconnection +//! +//! When all [`Sender`] handles have been dropped, it is no longer +//! possible to send values into the channel. This is considered the termination +//! event of the stream. As such, `Receiver::poll` returns `Ok(Ready(None))`. +//! +//! If the [`Receiver`] handle is dropped, then messages can no longer +//! be read out of the channel. In this case, all further attempts to send will +//! result in an error. +//! +//! # Clean Shutdown +//! +//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to +//! remain in the channel. Instead, it is usually desirable to perform a "clean" +//! shutdown. To do this, the receiver first calls `close`, which will prevent +//! any further messages to be sent into the channel. Then, the receiver +//! consumes the channel to completion, at which point the receiver can be +//! dropped. +//! +//! 
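// A sketch of the clean-shutdown sequence described above, using the public
// `channel`, `close`, and `recv` API from this patch. The buffer size and
// message count are arbitrary.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(16);

    tokio::spawn(async move {
        for i in 0..100u32 {
            // Once the receiver calls `close`, further sends fail and the
            // producer stops early.
            if tx.send(i).await.is_err() {
                break;
            }
        }
    });

    // Handle one value, then initiate the clean shutdown.
    println!("first = {:?}", rx.recv().await);
    rx.close();

    // Drain whatever was already buffered; `recv` returns `None` once the
    // channel is empty and no further sends can succeed.
    while let Some(v) = rx.recv().await {
        println!("drained {}", v);
    }
}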
[`Sender`]: crate::sync::mpsc::Sender +//! [`Receiver`]: crate::sync::mpsc::Receiver + +pub(super) mod block; + +mod bounded; +pub use self::bounded::{channel, Receiver, Sender}; + +mod chan; + +pub(super) mod list; + +mod unbounded; +pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +pub mod error; + +/// The number of values a block can contain. +/// +/// This value must be a power of 2. It also must be smaller than the number of +/// bits in `usize`. +#[cfg(all(target_pointer_width = "64", not(loom)))] +const BLOCK_CAP: usize = 32; + +#[cfg(all(not(target_pointer_width = "64"), not(loom)))] +const BLOCK_CAP: usize = 16; + +#[cfg(loom)] +const BLOCK_CAP: usize = 2; diff --git a/third_party/rust/tokio/src/sync/mpsc/unbounded.rs b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs new file mode 100644 index 0000000000..ba543fe4c8 --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs @@ -0,0 +1,176 @@ +use crate::loom::sync::atomic::AtomicUsize; +use crate::sync::mpsc::chan; +use crate::sync::mpsc::error::{SendError, TryRecvError}; + +use std::fmt; +use std::task::{Context, Poll}; + +/// Send values to the associated `UnboundedReceiver`. +/// +/// Instances are created by the +/// [`unbounded_channel`](unbounded_channel) function. +pub struct UnboundedSender { + chan: chan::Tx, +} + +impl Clone for UnboundedSender { + fn clone(&self) -> Self { + UnboundedSender { + chan: self.chan.clone(), + } + } +} + +impl fmt::Debug for UnboundedSender { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedSender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `UnboundedSender`. +/// +/// Instances are created by the +/// [`unbounded_channel`](unbounded_channel) function. +pub struct UnboundedReceiver { + /// The channel receiver + chan: chan::Rx, +} + +impl fmt::Debug for UnboundedReceiver { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedReceiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Creates an unbounded mpsc channel for communicating between asynchronous +/// tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { + let (tx, rx) = chan::channel(AtomicUsize::new(0)); + + let tx = UnboundedSender::new(tx); + let rx = UnboundedReceiver::new(rx); + + (tx, rx) +} + +/// No capacity +type Semaphore = AtomicUsize; + +impl UnboundedReceiver { + pub(crate) fn new(chan: chan::Rx) -> UnboundedReceiver { + UnboundedReceiver { chan } + } + + #[doc(hidden)] // TODO: doc + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + + /// Receives the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tokio::spawn(async move { + /// tx.send("hello").unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tx.send("hello").unwrap(); + /// tx.send("world").unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option { + use crate::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with + /// a possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with recv, this function has two failure cases instead of + /// one (one for disconnection, one for an empty buffer). + pub fn try_recv(&mut self) -> Result { + self.chan.try_recv() + } + + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +#[cfg(feature = "stream")] +impl crate::stream::Stream for UnboundedReceiver { + type Item = T; + + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_recv(cx) + } +} + +impl UnboundedSender { + pub(crate) fn new(chan: chan::Tx) -> UnboundedSender { + UnboundedSender { chan } + } + + /// Attempts to send a message on this `UnboundedSender` without blocking. + /// + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`UnboundedReceiver`] having been dropped, + /// the function returns an error. The error includes the value passed to `send`. + /// + /// [`close`]: UnboundedReceiver::close + /// [`UnboundedReceiver`]: UnboundedReceiver + pub fn send(&self, message: T) -> Result<(), SendError> { + self.chan.send_unbounded(message)?; + Ok(()) + } +} -- cgit v1.2.3