summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/mpsc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/sync/mpsc')
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/block.rs385
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/bounded.rs1197
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/chan.rs405
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/error.rs125
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/list.rs371
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/mod.rs115
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/unbounded.rs373
7 files changed, 2971 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/mpsc/block.rs b/third_party/rust/tokio/src/sync/mpsc/block.rs
new file mode 100644
index 0000000000..58f4a9f6cc
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/block.rs
@@ -0,0 +1,385 @@
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
+
+use std::mem::MaybeUninit;
+use std::ops;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+pub(crate) struct Block<T> {
+ /// The start index of this block.
+ ///
+ /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
+ start_index: usize,
+
+ /// The next block in the linked list.
+ next: AtomicPtr<Block<T>>,
+
+ /// Bitfield tracking slots that are ready to have their values consumed.
+ ready_slots: AtomicUsize,
+
+ /// The observed `tail_position` value *after* the block has been passed by
+ /// `block_tail`.
+ observed_tail_position: UnsafeCell<usize>,
+
+ /// Array containing values pushed into the block. Values are stored in a
+ /// continuous array in order to improve cache line behavior when reading.
+ /// The values must be manually dropped.
+ values: Values<T>,
+}
+
+pub(crate) enum Read<T> {
+ Value(T),
+ Closed,
+}
+
+struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
+
+use super::BLOCK_CAP;
+
+/// Masks an index to get the block identifier.
+const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
+
+/// Masks an index to get the value offset in a block.
+const SLOT_MASK: usize = BLOCK_CAP - 1;
+
+/// Flag tracking that a block has gone through the sender's release routine.
+///
+/// When this is set, the receiver may consider freeing the block.
+const RELEASED: usize = 1 << BLOCK_CAP;
+
+/// Flag tracking all senders dropped.
+///
+/// When this flag is set, the send half of the channel has closed.
+const TX_CLOSED: usize = RELEASED << 1;
+
+/// Mask covering all bits used to track slot readiness.
+const READY_MASK: usize = RELEASED - 1;
+
+/// Returns the index of the first slot in the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn start_index(slot_index: usize) -> usize {
+ BLOCK_MASK & slot_index
+}
+
+/// Returns the offset into the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn offset(slot_index: usize) -> usize {
+ SLOT_MASK & slot_index
+}
+
+impl<T> Block<T> {
+ pub(crate) fn new(start_index: usize) -> Block<T> {
+ Block {
+ // The absolute index in the channel of the first slot in the block.
+ start_index,
+
+ // Pointer to the next block in the linked list.
+ next: AtomicPtr::new(ptr::null_mut()),
+
+ ready_slots: AtomicUsize::new(0),
+
+ observed_tail_position: UnsafeCell::new(0),
+
+ // Value storage
+ values: unsafe { Values::uninitialized() },
+ }
+ }
+
+ /// Returns `true` if the block matches the given index.
+ pub(crate) fn is_at_index(&self, index: usize) -> bool {
+ debug_assert!(offset(index) == 0);
+ self.start_index == index
+ }
+
+ /// Returns the number of blocks between `self` and the block at the
+ /// specified index.
+ ///
+ /// `start_index` must represent a block *after* `self`.
+ pub(crate) fn distance(&self, other_index: usize) -> usize {
+ debug_assert!(offset(other_index) == 0);
+ other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+ }
+
+ /// Reads the value at the given offset.
+ ///
+ /// Returns `None` if the slot is empty.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
+ let offset = offset(slot_index);
+
+ let ready_bits = self.ready_slots.load(Acquire);
+
+ if !is_ready(ready_bits, offset) {
+ if is_tx_closed(ready_bits) {
+ return Some(Read::Closed);
+ }
+
+ return None;
+ }
+
+ // Get the value
+ let value = self.values[offset].with(|ptr| ptr::read(ptr));
+
+ Some(Read::Value(value.assume_init()))
+ }
+
+ /// Writes a value to the block at the given offset.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The slot is empty.
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
+ // Get the offset into the block
+ let slot_offset = offset(slot_index);
+
+ self.values[slot_offset].with_mut(|ptr| {
+ ptr::write(ptr, MaybeUninit::new(value));
+ });
+
+ // Release the value. After this point, the slot ref may no longer
+ // be used. It is possible for the receiver to free the memory at
+ // any point.
+ self.set_ready(slot_offset);
+ }
+
+ /// Signal to the receiver that the sender half of the list is closed.
+ pub(crate) unsafe fn tx_close(&self) {
+ self.ready_slots.fetch_or(TX_CLOSED, Release);
+ }
+
+ /// Resets the block to a blank state. This enables reusing blocks in the
+ /// channel.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * All slots are empty.
+ /// * The caller holds a unique pointer to the block.
+ pub(crate) unsafe fn reclaim(&mut self) {
+ self.start_index = 0;
+ self.next = AtomicPtr::new(ptr::null_mut());
+ self.ready_slots = AtomicUsize::new(0);
+ }
+
+ /// Releases the block to the rx half for freeing.
+ ///
+ /// This function is called by the tx half once it can be guaranteed that no
+ /// more senders will attempt to access the block.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The block will no longer be accessed by any sender.
+ pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
+ // Track the observed tail_position. Any sender targeting a greater
+ // tail_position is guaranteed to not access this block.
+ self.observed_tail_position
+ .with_mut(|ptr| *ptr = tail_position);
+
+ // Set the released bit, signalling to the receiver that it is safe to
+ // free the block's memory as soon as all slots **prior** to
+ // `observed_tail_position` have been filled.
+ self.ready_slots.fetch_or(RELEASED, Release);
+ }
+
+ /// Mark a slot as ready
+ fn set_ready(&self, slot: usize) {
+ let mask = 1 << slot;
+ self.ready_slots.fetch_or(mask, Release);
+ }
+
+ /// Returns `true` when all slots have their `ready` bits set.
+ ///
+ /// This indicates that the block is in its final state and will no longer
+ /// be mutated.
+ ///
+ /// # Implementation
+ ///
+ /// The implementation walks each slot checking the `ready` flag. It might
+ /// be that it would make more sense to coalesce ready flags as bits in a
+ /// single atomic cell. However, this could have negative impact on cache
+ /// behavior as there would be many more mutations to a single slot.
+ pub(crate) fn is_final(&self) -> bool {
+ self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+ }
+
+ /// Returns the `observed_tail_position` value, if set
+ pub(crate) fn observed_tail_position(&self) -> Option<usize> {
+ if 0 == RELEASED & self.ready_slots.load(Acquire) {
+ None
+ } else {
+ Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+ }
+ }
+
+ /// Loads the next block
+ pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
+ let ret = NonNull::new(self.next.load(ordering));
+
+ debug_assert!(unsafe {
+ ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
+ .unwrap_or(true)
+ });
+
+ ret
+ }
+
+ /// Pushes `block` as the next block in the link.
+ ///
+ /// Returns Ok if successful, otherwise, a pointer to the next block in
+ /// the list is returned.
+ ///
+ /// This requires that the next pointer is null.
+ ///
+ /// # Ordering
+ ///
+ /// This performs a compare-and-swap on `next` using AcqRel ordering.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * `block` is not freed until it has been removed from the list.
+ pub(crate) unsafe fn try_push(
+ &self,
+ block: &mut NonNull<Block<T>>,
+ success: Ordering,
+ failure: Ordering,
+ ) -> Result<(), NonNull<Block<T>>> {
+ block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+
+ let next_ptr = self
+ .next
+ .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
+ .unwrap_or_else(|x| x);
+
+ match NonNull::new(next_ptr) {
+ Some(next_ptr) => Err(next_ptr),
+ None => Ok(()),
+ }
+ }
+
+ /// Grows the `Block` linked list by allocating and appending a new block.
+ ///
+ /// The next block in the linked list is returned. This may or may not be
+ /// the one allocated by the function call.
+ ///
+ /// # Implementation
+ ///
+ /// It is assumed that `self.next` is null. A new block is allocated with
+ /// `start_index` set to be the next block. A compare-and-swap is performed
+ /// with AcqRel memory ordering. If the compare-and-swap is successful, the
+ /// newly allocated block is released to other threads walking the block
+ /// linked list. If the compare-and-swap fails, the current thread acquires
+ /// the next block in the linked list, allowing the current thread to access
+ /// the slots.
+ pub(crate) fn grow(&self) -> NonNull<Block<T>> {
+ // Create the new block. It is assumed that the block will become the
+ // next one after `&self`. If this turns out to not be the case,
+ // `start_index` is updated accordingly.
+ let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+
+ let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
+
+ // Attempt to store the block. The first compare-and-swap attempt is
+ // "unrolled" due to minor differences in logic
+ //
+ // `AcqRel` is used as the ordering **only** when attempting the
+ // compare-and-swap on self.next.
+ //
+ // If the compare-and-swap fails, then the actual value of the cell is
+ // returned from this function and accessed by the caller. Given this,
+ // the memory must be acquired.
+ //
+ // `Release` ensures that the newly allocated block is available to
+ // other threads acquiring the next pointer.
+ let next = NonNull::new(
+ self.next
+ .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
+ .unwrap_or_else(|x| x),
+ );
+
+ let next = match next {
+ Some(next) => next,
+ None => {
+ // The compare-and-swap succeeded and the newly allocated block
+ // is successfully pushed.
+ return new_block;
+ }
+ };
+
+ // There already is a next block in the linked list. The newly allocated
+ // block could be dropped and the discovered next block returned;
+ // however, that would be wasteful. Instead, the linked list is walked
+ // by repeatedly attempting to compare-and-swap the pointer into the
+ // `next` register until the compare-and-swap succeed.
+ //
+ // Care is taken to update new_block's start_index field as appropriate.
+
+ let mut curr = next;
+
+ // TODO: Should this iteration be capped?
+ loop {
+ let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };
+
+ curr = match actual {
+ Ok(_) => {
+ return next;
+ }
+ Err(curr) => curr,
+ };
+
+ crate::loom::thread::yield_now();
+ }
+ }
+}
+
+/// Returns `true` if the specified slot has a value ready to be consumed.
+fn is_ready(bits: usize, slot: usize) -> bool {
+ let mask = 1 << slot;
+ mask == mask & bits
+}
+
+/// Returns `true` if the closed flag has been set.
+fn is_tx_closed(bits: usize) -> bool {
+ TX_CLOSED == bits & TX_CLOSED
+}
+
+impl<T> Values<T> {
+ unsafe fn uninitialized() -> Values<T> {
+ let mut vals = MaybeUninit::uninit();
+
+ // When fuzzing, `UnsafeCell` needs to be initialized.
+ if_loom! {
+ let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
+ for i in 0..BLOCK_CAP {
+ p.add(i)
+ .write(UnsafeCell::new(MaybeUninit::uninit()));
+ }
+ }
+
+ Values(vals.assume_init())
+ }
+}
+
+impl<T> ops::Index<usize> for Values<T> {
+ type Output = UnsafeCell<MaybeUninit<T>>;
+
+ fn index(&self, index: usize) -> &Self::Output {
+ self.0.index(index)
+ }
+}
diff --git a/third_party/rust/tokio/src/sync/mpsc/bounded.rs b/third_party/rust/tokio/src/sync/mpsc/bounded.rs
new file mode 100644
index 0000000000..ddded8ebb3
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/bounded.rs
@@ -0,0 +1,1197 @@
+use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
+use crate::sync::mpsc::chan;
+use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
+
+cfg_time! {
+ use crate::sync::mpsc::error::SendTimeoutError;
+ use crate::time::Duration;
+}
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+/// Sends values to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`](channel) function.
+///
+/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
+/// use the [`PollSender`] utility.
+///
+/// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html
+pub struct Sender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+/// Permits to send one value into the channel.
+///
+/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
+/// and are used to guarantee channel capacity before generating a message to send.
+///
+/// [`Sender::reserve()`]: Sender::reserve
+/// [`Sender::try_reserve()`]: Sender::try_reserve
+pub struct Permit<'a, T> {
+ chan: &'a chan::Tx<T, Semaphore>,
+}
+
+/// Owned permit to send one value into the channel.
+///
+/// This is identical to the [`Permit`] type, except that it moves the sender
+/// rather than borrowing it.
+///
+/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
+/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
+/// before generating a message to send.
+///
+/// [`Permit`]: Permit
+/// [`Sender::reserve_owned()`]: Sender::reserve_owned
+/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
+pub struct OwnedPermit<T> {
+ chan: Option<chan::Tx<T, Semaphore>>,
+}
+
+/// Receives values from the associated `Sender`.
+///
+/// Instances are created by the [`channel`](channel) function.
+///
+/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
+///
+/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
+pub struct Receiver<T> {
+ /// The channel receiver.
+ chan: chan::Rx<T, Semaphore>,
+}
+
+/// Creates a bounded mpsc channel for communicating between asynchronous tasks
+/// with backpressure.
+///
+/// The channel will buffer up to the provided number of messages. Once the
+/// buffer is full, attempts to send new messages will wait until a message is
+/// received from the channel. The provided buffer capacity must be at least 1.
+///
+/// All data sent on `Sender` will become available on `Receiver` in the same
+/// order as it was sent.
+///
+/// The `Sender` can be cloned to `send` to the same channel from multiple code
+/// locations. Only one `Receiver` is supported.
+///
+/// If the `Receiver` is disconnected while trying to `send`, the `send` method
+/// will return a `SendError`. Similarly, if `Sender` is disconnected while
+/// trying to `recv`, the `recv` method will return `None`.
+///
+/// # Panics
+///
+/// Panics if the buffer capacity is 0.
+///
+/// # Examples
+///
+/// ```rust
+/// use tokio::sync::mpsc;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, mut rx) = mpsc::channel(100);
+///
+/// tokio::spawn(async move {
+/// for i in 0..10 {
+/// if let Err(_) = tx.send(i).await {
+/// println!("receiver dropped");
+/// return;
+/// }
+/// }
+/// });
+///
+/// while let Some(i) = rx.recv().await {
+/// println!("got = {}", i);
+/// }
+/// }
+/// ```
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
+ let semaphore = (semaphore::Semaphore::new(buffer), buffer);
+ let (tx, rx) = chan::channel(semaphore);
+
+ let tx = Sender::new(tx);
+ let rx = Receiver::new(rx);
+
+ (tx, rx)
+}
+
+/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
+/// representing the channel bound.
+type Semaphore = (semaphore::Semaphore, usize);
+
+impl<T> Receiver<T> {
+ pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
+ Receiver { chan }
+ }
+
+ /// Receives the next value for this receiver.
+ ///
+ /// This method returns `None` if the channel has been closed and there are
+ /// no remaining messages in the channel's buffer. This indicates that no
+ /// further values can ever be received from this `Receiver`. The channel is
+ /// closed when all senders have been dropped, or when [`close`] is called.
+ ///
+ /// If there are no messages in the channel's buffer, but the channel has
+ /// not yet been closed, this method will sleep until a message is sent or
+ /// the channel is closed. Note that if [`close`] is called, but there are
+ /// still outstanding [`Permits`] from before it was closed, the channel is
+ /// not considered closed by `recv` until the permits are released.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If `recv` is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, it is guaranteed that no messages were received on this
+ /// channel.
+ ///
+ /// [`close`]: Self::close
+ /// [`Permits`]: struct@crate::sync::mpsc::Permit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tokio::spawn(async move {
+ /// tx.send("hello").await.unwrap();
+ /// });
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(None, rx.recv().await);
+ /// }
+ /// ```
+ ///
+ /// Values are buffered:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tx.send("hello").await.unwrap();
+ /// tx.send("world").await.unwrap();
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(Some("world"), rx.recv().await);
+ /// }
+ /// ```
+ pub async fn recv(&mut self) -> Option<T> {
+ use crate::future::poll_fn;
+ poll_fn(|cx| self.chan.recv(cx)).await
+ }
+
+ /// Tries to receive the next value for this receiver.
+ ///
+ /// This method returns the [`Empty`] error if the channel is currently
+ /// empty, but there are still outstanding [senders] or [permits].
+ ///
+ /// This method returns the [`Disconnected`] error if the channel is
+ /// currently empty, and there are no outstanding [senders] or [permits].
+ ///
+ /// Unlike the [`poll_recv`] method, this method will never return an
+ /// [`Empty`] error spuriously.
+ ///
+ /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
+ /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
+ /// [`poll_recv`]: Self::poll_recv
+ /// [senders]: crate::sync::mpsc::Sender
+ /// [permits]: crate::sync::mpsc::Permit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ /// use tokio::sync::mpsc::error::TryRecvError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tx.send("hello").await.unwrap();
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ ///
+ /// tx.send("hello").await.unwrap();
+ /// // Drop the last sender, closing the channel.
+ /// drop(tx);
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+ /// }
+ /// ```
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
+ /// Blocking receive to call outside of asynchronous contexts.
+ ///
+ /// This method returns `None` if the channel has been closed and there are
+ /// no remaining messages in the channel's buffer. This indicates that no
+ /// further values can ever be received from this `Receiver`. The channel is
+ /// closed when all senders have been dropped, or when [`close`] is called.
+ ///
+ /// If there are no messages in the channel's buffer, but the channel has
+ /// not yet been closed, this method will block until a message is sent or
+ /// the channel is closed.
+ ///
+ /// This method is intended for use cases where you are sending from
+ /// asynchronous code to synchronous code, and will work even if the sender
+ /// is not using [`blocking_send`] to send the message.
+ ///
+ /// Note that if [`close`] is called, but there are still outstanding
+ /// [`Permits`] from before it was closed, the channel is not considered
+ /// closed by `blocking_recv` until the permits are released.
+ ///
+ /// [`close`]: Self::close
+ /// [`Permits`]: struct@crate::sync::mpsc::Permit
+ /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution
+ /// context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use tokio::runtime::Runtime;
+ /// use tokio::sync::mpsc;
+ ///
+ /// fn main() {
+ /// let (tx, mut rx) = mpsc::channel::<u8>(10);
+ ///
+ /// let sync_code = thread::spawn(move || {
+ /// assert_eq!(Some(10), rx.blocking_recv());
+ /// });
+ ///
+ /// Runtime::new()
+ /// .unwrap()
+ /// .block_on(async move {
+ /// let _ = tx.send(10).await;
+ /// });
+ /// sync_code.join().unwrap()
+ /// }
+ /// ```
+ #[cfg(feature = "sync")]
+ pub fn blocking_recv(&mut self) -> Option<T> {
+ crate::future::block_on(self.recv())
+ }
+
+ /// Closes the receiving half of a channel without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered. Any
+ /// outstanding [`Permit`] values will still be able to send messages.
+ ///
+ /// To guarantee that no messages are dropped, after calling `close()`,
+ /// `recv()` must be called until `None` is returned. If there are
+ /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
+ /// not return `None` until those are released.
+ ///
+ /// [`Permit`]: Permit
+ /// [`OwnedPermit`]: OwnedPermit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(20);
+ ///
+ /// tokio::spawn(async move {
+ /// let mut i = 0;
+ /// while let Ok(permit) = tx.reserve().await {
+ /// permit.send(i);
+ /// i += 1;
+ /// }
+ /// });
+ ///
+ /// rx.close();
+ ///
+ /// while let Some(msg) = rx.recv().await {
+ /// println!("got {}", msg);
+ /// }
+ ///
+ /// // Channel closed and no messages are lost.
+ /// }
+ /// ```
+ pub fn close(&mut self) {
+ self.chan.close();
+ }
+
+ /// Polls to receive the next message on this channel.
+ ///
+ /// This method returns:
+ ///
+ /// * `Poll::Pending` if no messages are available but the channel is not
+ /// closed, or if a spurious failure happens.
+ /// * `Poll::Ready(Some(message))` if a message is available.
+ /// * `Poll::Ready(None)` if the channel has been closed and all messages
+ /// sent before it was closed have been received.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided
+ /// `Context` is scheduled to receive a wakeup when a message is sent on any
+ /// receiver, or when the channel is closed. Note that on multiple calls to
+ /// `poll_recv`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup.
+ ///
+ /// If this method returns `Poll::Pending` due to a spurious failure, then
+ /// the `Waker` will be notified when the situation causing the spurious
+ /// failure has been resolved. Note that receiving such a wakeup does not
+ /// guarantee that the next call will succeed — it could fail with another
+ /// spurious failure.
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Receiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+impl<T> Unpin for Receiver<T> {}
+
+impl<T> Sender<T> {
+ pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
+ Sender { chan }
+ }
+
+ /// Sends a value, waiting until there is capacity.
+ ///
+ /// A successful send occurs when it is determined that the other end of the
+ /// channel has not hung up already. An unsuccessful send would be one where
+ /// the corresponding receiver has already been closed. Note that a return
+ /// value of `Err` means that the data will never be received, but a return
+ /// value of `Ok` does not mean that the data will be received. It is
+ /// possible for the corresponding receiver to hang up immediately after
+ /// this function returns `Ok`.
+ ///
+ /// # Errors
+ ///
+ /// If the receive half of the channel is closed, either due to [`close`]
+ /// being called or the [`Receiver`] handle dropping, the function returns
+ /// an error. The error includes the value passed to `send`.
+ ///
+ /// [`close`]: Receiver::close
+ /// [`Receiver`]: Receiver
+ ///
+ /// # Cancel safety
+ ///
+ /// If `send` is used as the event in a [`tokio::select!`](crate::select)
+ /// statement and some other branch completes first, then it is guaranteed
+ /// that the message was not sent.
+ ///
+ /// This channel uses a queue to ensure that calls to `send` and `reserve`
+ /// complete in the order they were requested. Cancelling a call to
+ /// `send` makes you lose your place in the queue.
+ ///
+ /// # Examples
+ ///
+ /// In the following example, each call to `send` will block until the
+ /// previously sent value was received.
+ ///
+ /// ```rust
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(_) = tx.send(i).await {
+ /// println!("receiver dropped");
+ /// return;
+ /// }
+ /// }
+ /// });
+ ///
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// }
+ /// }
+ /// ```
+ pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
+ match self.reserve().await {
+ Ok(permit) => {
+ permit.send(value);
+ Ok(())
+ }
+ Err(_) => Err(SendError(value)),
+ }
+ }
+
+ /// Completes when the receiver has dropped.
+ ///
+ /// This allows the producers to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once the channel is closed, it stays closed
+ /// forever and all future calls to `closed` will return immediately.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx1, rx) = mpsc::channel::<()>(1);
+ /// let tx2 = tx1.clone();
+ /// let tx3 = tx1.clone();
+ /// let tx4 = tx1.clone();
+ /// let tx5 = tx1.clone();
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// futures::join!(
+ /// tx1.closed(),
+ /// tx2.closed(),
+ /// tx3.closed(),
+ /// tx4.closed(),
+ /// tx5.closed()
+ /// );
+ /// println!("Receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&self) {
+ self.chan.closed().await
+ }
+
+ /// Attempts to immediately send a message on this `Sender`
+ ///
+ /// This method differs from [`send`] by returning immediately if the channel's
+ /// buffer is full or no receiver is waiting to acquire some data. Compared
+ /// with [`send`], this function has two failure cases instead of one (one for
+ /// disconnection, one for a full buffer).
+ ///
+ /// # Errors
+ ///
+ /// If the channel capacity has been reached, i.e., the channel has `n`
+ /// buffered values where `n` is the argument passed to [`channel`], then an
+ /// error is returned.
+ ///
+ /// If the receive half of the channel is closed, either due to [`close`]
+ /// being called or the [`Receiver`] handle dropping, the function returns
+ /// an error. The error includes the value passed to `send`.
+ ///
+ /// [`send`]: Sender::send
+ /// [`channel`]: channel
+ /// [`close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// // Create a channel with buffer size 1
+ /// let (tx1, mut rx) = mpsc::channel(1);
+ /// let tx2 = tx1.clone();
+ ///
+ /// tokio::spawn(async move {
+ /// tx1.send(1).await.unwrap();
+ /// tx1.send(2).await.unwrap();
+ /// // task waits until the receiver receives a value.
+ /// });
+ ///
+ /// tokio::spawn(async move {
+ /// // This will return an error and send
+ /// // no message if the buffer is full
+ /// let _ = tx2.try_send(3);
+ /// });
+ ///
+ /// let mut msg;
+ /// msg = rx.recv().await.unwrap();
+ /// println!("message {} received", msg);
+ ///
+ /// msg = rx.recv().await.unwrap();
+ /// println!("message {} received", msg);
+ ///
+ /// // Third message may have never been sent
+ /// match rx.recv().await {
+ /// Some(msg) => println!("message {} received", msg),
+ /// None => println!("the third message was never sent"),
+ /// }
+ /// }
+ /// ```
+ pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
+ Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
+ }
+
+ // Send the message
+ self.chan.send(message);
+ Ok(())
+ }
+
+ /// Sends a value, waiting until there is capacity, but only for a limited time.
+ ///
+ /// Shares the same success and error conditions as [`send`], adding one more
+ /// condition for an unsuccessful send, which is when the provided timeout has
+ /// elapsed, and there is no capacity available.
+ ///
+ /// [`send`]: Sender::send
+ ///
+ /// # Errors
+ ///
+ /// If the receive half of the channel is closed, either due to [`close`]
+ /// being called or the [`Receiver`] having been dropped,
+ /// the function returns an error. The error includes the value passed to `send`.
+ ///
+ /// [`close`]: Receiver::close
+ /// [`Receiver`]: Receiver
+ ///
+ /// # Panics
+ ///
+ /// This function panics if it is called outside the context of a Tokio
+ /// runtime [with time enabled](crate::runtime::Builder::enable_time).
+ ///
+ /// # Examples
+ ///
+ /// In the following example, each call to `send_timeout` will block until the
+ /// previously sent value was received, unless the timeout has elapsed.
+ ///
+ /// ```rust
+ /// use tokio::sync::mpsc;
+ /// use tokio::time::{sleep, Duration};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
+ /// println!("send error: #{:?}", e);
+ /// return;
+ /// }
+ /// }
+ /// });
+ ///
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// sleep(Duration::from_millis(200)).await;
+ /// }
+ /// }
+ /// ```
+ #[cfg(feature = "time")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ pub async fn send_timeout(
+ &self,
+ value: T,
+ timeout: Duration,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let permit = match crate::time::timeout(timeout, self.reserve()).await {
+ Err(_) => {
+ return Err(SendTimeoutError::Timeout(value));
+ }
+ Ok(Err(_)) => {
+ return Err(SendTimeoutError::Closed(value));
+ }
+ Ok(Ok(permit)) => permit,
+ };
+
+ permit.send(value);
+ Ok(())
+ }
+
+ /// Blocking send to call outside of asynchronous contexts.
+ ///
+ /// This method is intended for use cases where you are sending from
+ /// synchronous code to asynchronous code, and will work even if the
+ /// receiver is not using [`blocking_recv`] to receive the message.
+ ///
+ /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution
+ /// context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use tokio::runtime::Runtime;
+ /// use tokio::sync::mpsc;
+ ///
+ /// fn main() {
+ /// let (tx, mut rx) = mpsc::channel::<u8>(1);
+ ///
+ /// let sync_code = thread::spawn(move || {
+ /// tx.blocking_send(10).unwrap();
+ /// });
+ ///
+ /// Runtime::new().unwrap().block_on(async move {
+ /// assert_eq!(Some(10), rx.recv().await);
+ /// });
+ /// sync_code.join().unwrap()
+ /// }
+ /// ```
+ #[cfg(feature = "sync")]
+ pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
+ crate::future::block_on(self.send(value))
+ }
+
+ /// Checks if the channel has been closed. This happens when the
+ /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
+ /// called.
+ ///
+ /// [`Receiver`]: crate::sync::mpsc::Receiver
+ /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
+ /// assert!(!tx.is_closed());
+ ///
+ /// let tx2 = tx.clone();
+ /// assert!(!tx2.is_closed());
+ ///
+ /// drop(rx);
+ /// assert!(tx.is_closed());
+ /// assert!(tx2.is_closed());
+ /// ```
+ pub fn is_closed(&self) -> bool {
+ self.chan.is_closed()
+ }
+
+ /// Waits for channel capacity. Once capacity to send one message is
+ /// available, it is reserved for the caller.
+ ///
+ /// If the channel is full, the function waits for the number of unreceived
+ /// messages to become less than the channel capacity. Capacity to send one
+ /// message is reserved for the caller. A [`Permit`] is returned to track
+ /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
+ /// reserved capacity.
+ ///
+ /// Dropping [`Permit`] without sending a message releases the capacity back
+ /// to the channel.
+ ///
+ /// [`Permit`]: Permit
+ /// [`send`]: Permit::send
+ ///
+ /// # Cancel safety
+ ///
+ /// This channel uses a queue to ensure that calls to `send` and `reserve`
+ /// complete in the order they were requested. Cancelling a call to
+ /// `reserve` makes you lose your place in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.reserve().await.unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ /// }
+ /// ```
+ pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
+ self.reserve_inner().await?;
+ Ok(Permit { chan: &self.chan })
+ }
+
+ /// Waits for channel capacity, moving the `Sender` and returning an owned
+ /// permit. Once capacity to send one message is available, it is reserved
+ /// for the caller.
+ ///
+ /// This moves the sender _by value_, and returns an owned permit that can
+ /// be used to send a message into the channel. Unlike [`Sender::reserve`],
+ /// this method may be used in cases where the permit must be valid for the
+ /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
+ /// essentially a reference count increment, comparable to [`Arc::clone`]),
+ /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
+ /// moved, it can be cloned prior to calling `reserve_owned`.
+ ///
+ /// If the channel is full, the function waits for the number of unreceived
+ /// messages to become less than the channel capacity. Capacity to send one
+ /// message is reserved for the caller. An [`OwnedPermit`] is returned to
+ /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
+ /// consumes the reserved capacity.
+ ///
+ /// Dropping the [`OwnedPermit`] without sending a message releases the
+ /// capacity back to the channel.
+ ///
+ /// # Cancel safety
+ ///
+ /// This channel uses a queue to ensure that calls to `send` and `reserve`
+ /// complete in the order they were requested. Cancelling a call to
+ /// `reserve_owned` makes you lose your place in the queue.
+ ///
+ /// # Examples
+ /// Sending a message using an [`OwnedPermit`]:
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity, moving the sender.
+ /// let permit = tx.reserve_owned().await.unwrap();
+ ///
+ /// // Send a message, consuming the permit and returning
+ /// // the moved sender.
+ /// let tx = permit.send(123);
+ ///
+ /// // The value sent on the permit is received.
+ /// assert_eq!(rx.recv().await.unwrap(), 123);
+ ///
+ /// // The sender can now be used again.
+ /// tx.send(456).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
+ /// by value, it can be inexpensively cloned before calling `reserve_owned`:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Clone the sender and reserve capacity.
+ /// let permit = tx.clone().reserve_owned().await.unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Sending on the permit succeeds.
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ /// }
+ /// ```
+ ///
+ /// [`Sender::reserve`]: Sender::reserve
+ /// [`OwnedPermit`]: OwnedPermit
+ /// [`send`]: OwnedPermit::send
+ /// [`Arc::clone`]: std::sync::Arc::clone
+ pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
+ self.reserve_inner().await?;
+ Ok(OwnedPermit {
+ chan: Some(self.chan),
+ })
+ }
+
+ async fn reserve_inner(&self) -> Result<(), SendError<()>> {
+ match self.chan.semaphore().0.acquire(1).await {
+ Ok(_) => Ok(()),
+ Err(_) => Err(SendError(())),
+ }
+ }
+
+ /// Tries to acquire a slot in the channel without waiting for the slot to become
+ /// available.
+ ///
+ /// If the channel is full this function will return [`TrySendError`], otherwise
+ /// if there is a slot available it will return a [`Permit`] that will then allow you
+ /// to [`send`] on the channel with a guaranteed slot. This function is similar to
+ /// [`reserve`] except it does not await for the slot to become available.
+ ///
+ /// Dropping [`Permit`] without sending a message releases the capacity back
+ /// to the channel.
+ ///
+ /// [`Permit`]: Permit
+ /// [`send`]: Permit::send
+ /// [`reserve`]: Sender::reserve
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.try_reserve().unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Trying to reserve an additional slot on the `tx` will
+ /// // fail because there is no capacity.
+ /// assert!(tx.try_reserve().is_err());
+ ///
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// }
+ /// ```
+ pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
+ Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
+ }
+
+ Ok(Permit { chan: &self.chan })
+ }
+
+ /// Tries to acquire a slot in the channel without waiting for the slot to become
+ /// available, returning an owned permit.
+ ///
+ /// This moves the sender _by value_, and returns an owned permit that can
+ /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
+ /// this method may be used in cases where the permit must be valid for the
+ /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
+ /// essentially a reference count increment, comparable to [`Arc::clone`]),
+ /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
+ /// moved, it can be cloned prior to calling `try_reserve_owned`.
+ ///
+ /// If the channel is full this function will return a [`TrySendError`].
+ /// Since the sender is taken by value, the `TrySendError` returned in this
+ /// case contains the sender, so that it may be used again. Otherwise, if
+ /// there is a slot available, this method will return an [`OwnedPermit`]
+ /// that can then be used to [`send`] on the channel with a guaranteed slot.
+ /// This function is similar to [`reserve_owned`] except it does not await
+ /// for the slot to become available.
+ ///
+ /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
+ /// to the channel.
+ ///
+ /// [`OwnedPermit`]: OwnedPermit
+ /// [`send`]: OwnedPermit::send
+ /// [`reserve_owned`]: Sender::reserve_owned
+ /// [`Arc::clone`]: std::sync::Arc::clone
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.clone().try_reserve_owned().unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Trying to reserve an additional slot on the `tx` will
+ /// // fail because there is no capacity.
+ /// assert!(tx.try_reserve().is_err());
+ ///
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// }
+ /// ```
+ pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
+ Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
+ }
+
+ Ok(OwnedPermit {
+ chan: Some(self.chan),
+ })
+ }
+
+ /// Returns `true` if senders belong to the same channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
+ /// let tx2 = tx.clone();
+ /// assert!(tx.same_channel(&tx2));
+ ///
+ /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
+ /// assert!(!tx3.same_channel(&tx2));
+ /// ```
+ pub fn same_channel(&self, other: &Self) -> bool {
+ self.chan.same_channel(&other.chan)
+ }
+
+ /// Returns the current capacity of the channel.
+ ///
+ /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
+ /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel::<()>(5);
+ ///
+ /// assert_eq!(tx.capacity(), 5);
+ ///
+ /// // Making a reservation drops the capacity by one.
+ /// let permit = tx.reserve().await.unwrap();
+ /// assert_eq!(tx.capacity(), 4);
+ ///
+ /// // Sending and receiving a value increases the capacity by one.
+ /// permit.send(());
+ /// rx.recv().await.unwrap();
+ /// assert_eq!(tx.capacity(), 5);
+ /// }
+ /// ```
+ ///
+ /// [`send`]: Sender::send
+ /// [`reserve`]: Sender::reserve
+ pub fn capacity(&self) -> usize {
+ self.chan.semaphore().0.available_permits()
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Sender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+// ===== impl Permit =====
+
+impl<T> Permit<'_, T> {
+ /// Sends a value using the reserved capacity.
+ ///
+ /// Capacity for the message has already been reserved. The message is sent
+ /// to the receiver and the permit is consumed. The operation will succeed
+ /// even if the receiver half has been closed. See [`Receiver::close`] for
+ /// more details on performing a clean shutdown.
+ ///
+ /// [`Receiver::close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.reserve().await.unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Send a message on the permit
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ /// }
+ /// ```
+ pub fn send(self, value: T) {
+ use std::mem;
+
+ self.chan.send(value);
+
+ // Avoid the drop logic
+ mem::forget(self);
+ }
+}
+
+impl<T> Drop for Permit<'_, T> {
+ fn drop(&mut self) {
+ use chan::Semaphore;
+
+ let semaphore = self.chan.semaphore();
+
+ // Add the permit back to the semaphore
+ semaphore.add_permit();
+
+ // If this is the last sender for this channel, wake the receiver so
+ // that it can be notified that the channel is closed.
+ if semaphore.is_closed() && semaphore.is_idle() {
+ self.chan.wake_rx();
+ }
+ }
+}
+
+impl<T> fmt::Debug for Permit<'_, T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Permit")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+// ===== impl Permit =====
+
+impl<T> OwnedPermit<T> {
+ /// Sends a value using the reserved capacity.
+ ///
+ /// Capacity for the message has already been reserved. The message is sent
+ /// to the receiver and the permit is consumed. The operation will succeed
+ /// even if the receiver half has been closed. See [`Receiver::close`] for
+ /// more details on performing a clean shutdown.
+ ///
+ /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
+ /// the `OwnedPermit` was reserved.
+ ///
+ /// [`Receiver::close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.reserve_owned().await.unwrap();
+ ///
+ /// // Send a message on the permit, returning the sender.
+ /// let tx = permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// // We may now reuse `tx` to send another message.
+ /// tx.send(789).await.unwrap();
+ /// }
+ /// ```
+ pub fn send(mut self, value: T) -> Sender<T> {
+ let chan = self.chan.take().unwrap_or_else(|| {
+ unreachable!("OwnedPermit channel is only taken when the permit is moved")
+ });
+ chan.send(value);
+
+ Sender { chan }
+ }
+
+ /// Releases the reserved capacity *without* sending a message, returning the
+ /// [`Sender`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = mpsc::channel(1);
+ ///
+ /// // Clone the sender and reserve capacity
+ /// let permit = tx.clone().reserve_owned().await.unwrap();
+ ///
+ /// // Trying to send on the original `tx` will fail, since the `permit`
+ /// // has reserved all the available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Release the permit without sending a message, returning the clone
+ /// // of the sender.
+ /// let tx2 = permit.release();
+ ///
+ /// // We may now reuse `tx` to send another message.
+ /// tx.send(789).await.unwrap();
+ /// # drop(rx); drop(tx2);
+ /// }
+ /// ```
+ ///
+ /// [`Sender`]: Sender
+ pub fn release(mut self) -> Sender<T> {
+ use chan::Semaphore;
+
+ let chan = self.chan.take().unwrap_or_else(|| {
+ unreachable!("OwnedPermit channel is only taken when the permit is moved")
+ });
+
+ // Add the permit back to the semaphore
+ chan.semaphore().add_permit();
+ Sender { chan }
+ }
+}
+
+impl<T> Drop for OwnedPermit<T> {
+ fn drop(&mut self) {
+ use chan::Semaphore;
+
+ // Are we still holding onto the sender?
+ if let Some(chan) = self.chan.take() {
+ let semaphore = chan.semaphore();
+
+ // Add the permit back to the semaphore
+ semaphore.add_permit();
+
+ // If this `OwnedPermit` is holding the last sender for this
+ // channel, wake the receiver so that it can be notified that the
+ // channel is closed.
+ if semaphore.is_closed() && semaphore.is_idle() {
+ chan.wake_rx();
+ }
+ }
+
+ // Otherwise, do nothing.
+ }
+}
+
+impl<T> fmt::Debug for OwnedPermit<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("OwnedPermit")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/sync/mpsc/chan.rs b/third_party/rust/tokio/src/sync/mpsc/chan.rs
new file mode 100644
index 0000000000..c3007de89c
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/chan.rs
@@ -0,0 +1,405 @@
+use crate::loom::cell::UnsafeCell;
+use crate::loom::future::AtomicWaker;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Arc;
+use crate::park::thread::CachedParkThread;
+use crate::park::Park;
+use crate::sync::mpsc::error::TryRecvError;
+use crate::sync::mpsc::list;
+use crate::sync::notify::Notify;
+
+use std::fmt;
+use std::process;
+use std::sync::atomic::Ordering::{AcqRel, Relaxed};
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
+
+/// Channel sender.
+pub(crate) struct Tx<T, S> {
+ inner: Arc<Chan<T, S>>,
+}
+
+impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Tx").field("inner", &self.inner).finish()
+ }
+}
+
+/// Channel receiver.
+pub(crate) struct Rx<T, S: Semaphore> {
+ inner: Arc<Chan<T, S>>,
+}
+
+impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Rx").field("inner", &self.inner).finish()
+ }
+}
+
+pub(crate) trait Semaphore {
+ fn is_idle(&self) -> bool;
+
+ fn add_permit(&self);
+
+ fn close(&self);
+
+ fn is_closed(&self) -> bool;
+}
+
+struct Chan<T, S> {
+ /// Notifies all tasks listening for the receiver being dropped.
+ notify_rx_closed: Notify,
+
+ /// Handle to the push half of the lock-free list.
+ tx: list::Tx<T>,
+
+ /// Coordinates access to channel's capacity.
+ semaphore: S,
+
+ /// Receiver waker. Notified when a value is pushed into the channel.
+ rx_waker: AtomicWaker,
+
+ /// Tracks the number of outstanding sender handles.
+ ///
+ /// When this drops to zero, the send half of the channel is closed.
+ tx_count: AtomicUsize,
+
+ /// Only accessed by `Rx` handle.
+ rx_fields: UnsafeCell<RxFields<T>>,
+}
+
+impl<T, S> fmt::Debug for Chan<T, S>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Chan")
+ .field("tx", &self.tx)
+ .field("semaphore", &self.semaphore)
+ .field("rx_waker", &self.rx_waker)
+ .field("tx_count", &self.tx_count)
+ .field("rx_fields", &"...")
+ .finish()
+ }
+}
+
+/// Fields only accessed by `Rx` handle.
+struct RxFields<T> {
+ /// Channel receiver. This field is only accessed by the `Receiver` type.
+ list: list::Rx<T>,
+
+ /// `true` if `Rx::close` is called.
+ rx_closed: bool,
+}
+
+impl<T> fmt::Debug for RxFields<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("RxFields")
+ .field("list", &self.list)
+ .field("rx_closed", &self.rx_closed)
+ .finish()
+ }
+}
+
+unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
+unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
+
+pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
+ let (tx, rx) = list::channel();
+
+ let chan = Arc::new(Chan {
+ notify_rx_closed: Notify::new(),
+ tx,
+ semaphore,
+ rx_waker: AtomicWaker::new(),
+ tx_count: AtomicUsize::new(1),
+ rx_fields: UnsafeCell::new(RxFields {
+ list: rx,
+ rx_closed: false,
+ }),
+ });
+
+ (Tx::new(chan.clone()), Rx::new(chan))
+}
+
+// ===== impl Tx =====
+
+impl<T, S> Tx<T, S> {
+ fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
+ Tx { inner: chan }
+ }
+
+ pub(super) fn semaphore(&self) -> &S {
+ &self.inner.semaphore
+ }
+
+ /// Send a message and notify the receiver.
+ pub(crate) fn send(&self, value: T) {
+ self.inner.send(value);
+ }
+
+ /// Wake the receive half
+ pub(crate) fn wake_rx(&self) {
+ self.inner.rx_waker.wake();
+ }
+
+ /// Returns `true` if senders belong to the same channel.
+ pub(crate) fn same_channel(&self, other: &Self) -> bool {
+ Arc::ptr_eq(&self.inner, &other.inner)
+ }
+}
+
+impl<T, S: Semaphore> Tx<T, S> {
+ pub(crate) fn is_closed(&self) -> bool {
+ self.inner.semaphore.is_closed()
+ }
+
+ pub(crate) async fn closed(&self) {
+ // In order to avoid a race condition, we first request a notification,
+ // **then** check whether the semaphore is closed. If the semaphore is
+ // closed the notification request is dropped.
+ let notified = self.inner.notify_rx_closed.notified();
+
+ if self.inner.semaphore.is_closed() {
+ return;
+ }
+ notified.await;
+ }
+}
+
+impl<T, S> Clone for Tx<T, S> {
+ fn clone(&self) -> Tx<T, S> {
+ // Using a Relaxed ordering here is sufficient as the caller holds a
+ // strong ref to `self`, preventing a concurrent decrement to zero.
+ self.inner.tx_count.fetch_add(1, Relaxed);
+
+ Tx {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+impl<T, S> Drop for Tx<T, S> {
+ fn drop(&mut self) {
+ if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
+ return;
+ }
+
+ // Close the list, which sends a `Close` message
+ self.inner.tx.close();
+
+ // Notify the receiver
+ self.wake_rx();
+ }
+}
+
+// ===== impl Rx =====
+
+impl<T, S: Semaphore> Rx<T, S> {
+ fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
+ Rx { inner: chan }
+ }
+
+ pub(crate) fn close(&mut self) {
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ if rx_fields.rx_closed {
+ return;
+ }
+
+ rx_fields.rx_closed = true;
+ });
+
+ self.inner.semaphore.close();
+ self.inner.notify_rx_closed.notify_waiters();
+ }
+
+ /// Receive the next value
+ pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ use super::block::Read::*;
+
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ macro_rules! try_recv {
+ () => {
+ match rx_fields.list.pop(&self.inner.tx) {
+ Some(Value(value)) => {
+ self.inner.semaphore.add_permit();
+ coop.made_progress();
+ return Ready(Some(value));
+ }
+ Some(Closed) => {
+ // TODO: This check may not be required as it most
+ // likely can only return `true` at this point. A
+ // channel is closed when all tx handles are
+ // dropped. Dropping a tx handle releases memory,
+ // which ensures that if dropping the tx handle is
+ // visible, then all messages sent are also visible.
+ assert!(self.inner.semaphore.is_idle());
+ coop.made_progress();
+ return Ready(None);
+ }
+ None => {} // fall through
+ }
+ };
+ }
+
+ try_recv!();
+
+ self.inner.rx_waker.register_by_ref(cx.waker());
+
+ // It is possible that a value was pushed between attempting to read
+ // and registering the task, so we have to check the channel a
+ // second time here.
+ try_recv!();
+
+ if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
+ coop.made_progress();
+ Ready(None)
+ } else {
+ Pending
+ }
+ })
+ }
+
+ /// Try to receive the next value.
+ pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ use super::list::TryPopResult;
+
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ macro_rules! try_recv {
+ () => {
+ match rx_fields.list.try_pop(&self.inner.tx) {
+ TryPopResult::Ok(value) => {
+ self.inner.semaphore.add_permit();
+ return Ok(value);
+ }
+ TryPopResult::Closed => return Err(TryRecvError::Disconnected),
+ TryPopResult::Empty => return Err(TryRecvError::Empty),
+ TryPopResult::Busy => {} // fall through
+ }
+ };
+ }
+
+ try_recv!();
+
+ // If a previous `poll_recv` call has set a waker, we wake it here.
+ // This allows us to put our own CachedParkThread waker in the
+ // AtomicWaker slot instead.
+ //
+ // This is not a spurious wakeup to `poll_recv` since we just got a
+ // Busy from `try_pop`, which only happens if there are messages in
+ // the queue.
+ self.inner.rx_waker.wake();
+
+ // Park the thread until the problematic send has completed.
+ let mut park = CachedParkThread::new();
+ let waker = park.unpark().into_waker();
+ loop {
+ self.inner.rx_waker.register_by_ref(&waker);
+ // It is possible that the problematic send has now completed,
+ // so we have to check for messages again.
+ try_recv!();
+ park.park().expect("park failed");
+ }
+ })
+ }
+}
+
+impl<T, S: Semaphore> Drop for Rx<T, S> {
+ fn drop(&mut self) {
+ use super::block::Read::Value;
+
+ self.close();
+
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
+ self.inner.semaphore.add_permit();
+ }
+ })
+ }
+}
+
+// ===== impl Chan =====
+
+impl<T, S> Chan<T, S> {
+ fn send(&self, value: T) {
+ // Push the value
+ self.tx.push(value);
+
+ // Notify the rx task
+ self.rx_waker.wake();
+ }
+}
+
+impl<T, S> Drop for Chan<T, S> {
+ fn drop(&mut self) {
+ use super::block::Read::Value;
+
+ // Safety: the only owner of the rx fields is Chan, and eing
+ // inside its own Drop means we're the last ones to touch it.
+ self.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
+ unsafe { rx_fields.list.free_blocks() };
+ });
+ }
+}
+
+// ===== impl Semaphore for (::Semaphore, capacity) =====
+
+impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
+ fn add_permit(&self) {
+ self.0.release(1)
+ }
+
+ fn is_idle(&self) -> bool {
+ self.0.available_permits() == self.1
+ }
+
+ fn close(&self) {
+ self.0.close();
+ }
+
+ fn is_closed(&self) -> bool {
+ self.0.is_closed()
+ }
+}
+
+// ===== impl Semaphore for AtomicUsize =====
+
+use std::sync::atomic::Ordering::{Acquire, Release};
+use std::usize;
+
+impl Semaphore for AtomicUsize {
+ fn add_permit(&self) {
+ let prev = self.fetch_sub(2, Release);
+
+ if prev >> 1 == 0 {
+ // Something went wrong
+ process::abort();
+ }
+ }
+
+ fn is_idle(&self) -> bool {
+ self.load(Acquire) >> 1 == 0
+ }
+
+ fn close(&self) {
+ self.fetch_or(1, Release);
+ }
+
+ fn is_closed(&self) -> bool {
+ self.load(Acquire) & 1 == 1
+ }
+}
diff --git a/third_party/rust/tokio/src/sync/mpsc/error.rs b/third_party/rust/tokio/src/sync/mpsc/error.rs
new file mode 100644
index 0000000000..3fe6bac5e1
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/error.rs
@@ -0,0 +1,125 @@
+//! Channel error types.
+
+use std::error::Error;
+use std::fmt;
+
+/// Error returned by the `Sender`.
+#[derive(Debug)]
+pub struct SendError<T>(pub T);
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl<T: fmt::Debug> std::error::Error for SendError<T> {}
+
+// ===== TrySendError =====
+
+/// This enumeration is the list of the possible error outcomes for the
+/// [try_send](super::Sender::try_send) method.
+#[derive(Debug, Eq, PartialEq)]
+pub enum TrySendError<T> {
+ /// The data could not be sent on the channel because the channel is
+ /// currently full and sending would require blocking.
+ Full(T),
+
+ /// The receive half of the channel was explicitly closed or has been
+ /// dropped.
+ Closed(T),
+}
+
+impl<T: fmt::Debug> Error for TrySendError<T> {}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ TrySendError::Full(..) => "no available capacity",
+ TrySendError::Closed(..) => "channel closed",
+ }
+ )
+ }
+}
+
+impl<T> From<SendError<T>> for TrySendError<T> {
+ fn from(src: SendError<T>) -> TrySendError<T> {
+ TrySendError::Closed(src.0)
+ }
+}
+
+// ===== TryRecvError =====
+
+/// Error returned by `try_recv`.
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+pub enum TryRecvError {
+ /// This **channel** is currently empty, but the **Sender**(s) have not yet
+ /// disconnected, so data may yet become available.
+ Empty,
+ /// The **channel**'s sending half has become disconnected, and there will
+ /// never be any more data received on it.
+ Disconnected,
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ TryRecvError::Empty => "receiving on an empty channel".fmt(fmt),
+ TryRecvError::Disconnected => "receiving on a closed channel".fmt(fmt),
+ }
+ }
+}
+
+impl Error for TryRecvError {}
+
+// ===== RecvError =====
+
+/// Error returned by `Receiver`.
+#[derive(Debug)]
+#[doc(hidden)]
+#[deprecated(note = "This type is unused because recv returns an Option.")]
+pub struct RecvError(());
+
+#[allow(deprecated)]
+impl fmt::Display for RecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+#[allow(deprecated)]
+impl Error for RecvError {}
+
+cfg_time! {
+ // ===== SendTimeoutError =====
+
+ #[derive(Debug, Eq, PartialEq)]
+ /// Error returned by [`Sender::send_timeout`](super::Sender::send_timeout)].
+ pub enum SendTimeoutError<T> {
+ /// The data could not be sent on the channel because the channel is
+ /// full, and the timeout to send has elapsed.
+ Timeout(T),
+
+ /// The receive half of the channel was explicitly closed or has been
+ /// dropped.
+ Closed(T),
+ }
+
+ impl<T: fmt::Debug> Error for SendTimeoutError<T> {}
+
+ impl<T> fmt::Display for SendTimeoutError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ SendTimeoutError::Timeout(..) => "timed out waiting on send operation",
+ SendTimeoutError::Closed(..) => "channel closed",
+ }
+ )
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/sync/mpsc/list.rs b/third_party/rust/tokio/src/sync/mpsc/list.rs
new file mode 100644
index 0000000000..e4eeb45411
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/list.rs
@@ -0,0 +1,371 @@
+//! A concurrent, lock-free, FIFO list.
+
+use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
+use crate::loom::thread;
+use crate::sync::mpsc::block::{self, Block};
+
+use std::fmt;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+
+/// List queue transmit handle.
+pub(crate) struct Tx<T> {
+ /// Tail in the `Block` mpmc list.
+ block_tail: AtomicPtr<Block<T>>,
+
+ /// Position to push the next message. This references a block and offset
+ /// into the block.
+ tail_position: AtomicUsize,
+}
+
+/// List queue receive handle
+pub(crate) struct Rx<T> {
+ /// Pointer to the block being processed.
+ head: NonNull<Block<T>>,
+
+ /// Next slot index to process.
+ index: usize,
+
+ /// Pointer to the next block pending release.
+ free_head: NonNull<Block<T>>,
+}
+
+/// Return value of `Rx::try_pop`.
+pub(crate) enum TryPopResult<T> {
+ /// Successfully popped a value.
+ Ok(T),
+ /// The channel is empty.
+ Empty,
+ /// The channel is empty and closed.
+ Closed,
+ /// The channel is not empty, but the first value is being written.
+ Busy,
+}
+
+pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
+ // Create the initial block shared between the tx and rx halves.
+ let initial_block = Box::new(Block::new(0));
+ let initial_block_ptr = Box::into_raw(initial_block);
+
+ let tx = Tx {
+ block_tail: AtomicPtr::new(initial_block_ptr),
+ tail_position: AtomicUsize::new(0),
+ };
+
+ let head = NonNull::new(initial_block_ptr).unwrap();
+
+ let rx = Rx {
+ head,
+ index: 0,
+ free_head: head,
+ };
+
+ (tx, rx)
+}
+
+impl<T> Tx<T> {
+ /// Pushes a value into the list.
+ pub(crate) fn push(&self, value: T) {
+ // First, claim a slot for the value. `Acquire` is used here to
+ // synchronize with the `fetch_add` in `reclaim_blocks`.
+ let slot_index = self.tail_position.fetch_add(1, Acquire);
+
+ // Load the current block and write the value
+ let block = self.find_block(slot_index);
+
+ unsafe {
+ // Write the value to the block
+ block.as_ref().write(slot_index, value);
+ }
+ }
+
+ /// Closes the send half of the list.
+ ///
+ /// Similar process as pushing a value, but instead of writing the value &
+ /// setting the ready flag, the TX_CLOSED flag is set on the block.
+ pub(crate) fn close(&self) {
+ // First, claim a slot for the value. This is the last slot that will be
+ // claimed.
+ let slot_index = self.tail_position.fetch_add(1, Acquire);
+
+ let block = self.find_block(slot_index);
+
+ unsafe { block.as_ref().tx_close() }
+ }
+
+ fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
+ // The start index of the block that contains `index`.
+ let start_index = block::start_index(slot_index);
+
+ // The index offset into the block
+ let offset = block::offset(slot_index);
+
+ // Load the current head of the block
+ let mut block_ptr = self.block_tail.load(Acquire);
+
+ let block = unsafe { &*block_ptr };
+
+ // Calculate the distance between the tail ptr and the target block
+ let distance = block.distance(start_index);
+
+ // Decide if this call to `find_block` should attempt to update the
+ // `block_tail` pointer.
+ //
+ // Updating `block_tail` is not always performed in order to reduce
+ // contention.
+ //
+ // When set, as the routine walks the linked list, it attempts to update
+ // `block_tail`. If the update cannot be performed, `try_updating_tail`
+ // is unset.
+ let mut try_updating_tail = distance > offset;
+
+ // Walk the linked list of blocks until the block with `start_index` is
+ // found.
+ loop {
+ let block = unsafe { &(*block_ptr) };
+
+ if block.is_at_index(start_index) {
+ return unsafe { NonNull::new_unchecked(block_ptr) };
+ }
+
+ let next_block = block
+ .load_next(Acquire)
+ // There is no allocated next block, grow the linked list.
+ .unwrap_or_else(|| block.grow());
+
+ // If the block is **not** final, then the tail pointer cannot be
+ // advanced any more.
+ try_updating_tail &= block.is_final();
+
+ if try_updating_tail {
+ // Advancing `block_tail` must happen when walking the linked
+ // list. `block_tail` may not advance passed any blocks that are
+ // not "final". At the point a block is finalized, it is unknown
+ // if there are any prior blocks that are unfinalized, which
+ // makes it impossible to advance `block_tail`.
+ //
+ // While walking the linked list, `block_tail` can be advanced
+ // as long as finalized blocks are traversed.
+ //
+ // Release ordering is used to ensure that any subsequent reads
+ // are able to see the memory pointed to by `block_tail`.
+ //
+ // Acquire is not needed as any "actual" value is not accessed.
+ // At this point, the linked list is walked to acquire blocks.
+ if self
+ .block_tail
+ .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
+ .is_ok()
+ {
+ // Synchronize with any senders
+ let tail_position = self.tail_position.fetch_add(0, Release);
+
+ unsafe {
+ block.tx_release(tail_position);
+ }
+ } else {
+ // A concurrent sender is also working on advancing
+ // `block_tail` and this thread is falling behind.
+ //
+ // Stop trying to advance the tail pointer
+ try_updating_tail = false;
+ }
+ }
+
+ block_ptr = next_block.as_ptr();
+
+ thread::yield_now();
+ }
+ }
+
+ pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
+ // The block has been removed from the linked list and ownership
+ // is reclaimed.
+ //
+ // Before dropping the block, see if it can be reused by
+ // inserting it back at the end of the linked list.
+ //
+ // First, reset the data
+ block.as_mut().reclaim();
+
+ let mut reused = false;
+
+ // Attempt to insert the block at the end
+ //
+ // Walk at most three times
+ //
+ let curr_ptr = self.block_tail.load(Acquire);
+
+ // The pointer can never be null
+ debug_assert!(!curr_ptr.is_null());
+
+ let mut curr = NonNull::new_unchecked(curr_ptr);
+
+ // TODO: Unify this logic with Block::grow
+ for _ in 0..3 {
+ match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
+ Ok(_) => {
+ reused = true;
+ break;
+ }
+ Err(next) => {
+ curr = next;
+ }
+ }
+ }
+
+ if !reused {
+ let _ = Box::from_raw(block.as_ptr());
+ }
+ }
+}
+
+impl<T> fmt::Debug for Tx<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Tx")
+ .field("block_tail", &self.block_tail.load(Relaxed))
+ .field("tail_position", &self.tail_position.load(Relaxed))
+ .finish()
+ }
+}
+
+impl<T> Rx<T> {
+ /// Pops the next value off the queue.
+ pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
+ // Advance `head`, if needed
+ if !self.try_advancing_head() {
+ return None;
+ }
+
+ self.reclaim_blocks(tx);
+
+ unsafe {
+ let block = self.head.as_ref();
+
+ let ret = block.read(self.index);
+
+ if let Some(block::Read::Value(..)) = ret {
+ self.index = self.index.wrapping_add(1);
+ }
+
+ ret
+ }
+ }
+
+ /// Pops the next value off the queue, detecting whether the block
+ /// is busy or empty on failure.
+ ///
+ /// This function exists because `Rx::pop` can return `None` even if the
+ /// channel's queue contains a message that has been completely written.
+ /// This can happen if the fully delivered message is behind another message
+ /// that is in the middle of being written to the block, since the channel
+ /// can't return the messages out of order.
+ pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
+ let tail_position = tx.tail_position.load(Acquire);
+ let result = self.pop(tx);
+
+ match result {
+ Some(block::Read::Value(t)) => TryPopResult::Ok(t),
+ Some(block::Read::Closed) => TryPopResult::Closed,
+ None if tail_position == self.index => TryPopResult::Empty,
+ None => TryPopResult::Busy,
+ }
+ }
+
+ /// Tries advancing the block pointer to the block referenced by `self.index`.
+ ///
+ /// Returns `true` if successful, `false` if there is no next block to load.
+ fn try_advancing_head(&mut self) -> bool {
+ let block_index = block::start_index(self.index);
+
+ loop {
+ let next_block = {
+ let block = unsafe { self.head.as_ref() };
+
+ if block.is_at_index(block_index) {
+ return true;
+ }
+
+ block.load_next(Acquire)
+ };
+
+ let next_block = match next_block {
+ Some(next_block) => next_block,
+ None => {
+ return false;
+ }
+ };
+
+ self.head = next_block;
+
+ thread::yield_now();
+ }
+ }
+
+ fn reclaim_blocks(&mut self, tx: &Tx<T>) {
+ while self.free_head != self.head {
+ unsafe {
+ // Get a handle to the block that will be freed and update
+ // `free_head` to point to the next block.
+ let block = self.free_head;
+
+ let observed_tail_position = block.as_ref().observed_tail_position();
+
+ let required_index = match observed_tail_position {
+ Some(i) => i,
+ None => return,
+ };
+
+ if required_index > self.index {
+ return;
+ }
+
+ // We may read the next pointer with `Relaxed` ordering as it is
+ // guaranteed that the `reclaim_blocks` routine trails the `recv`
+ // routine. Any memory accessed by `reclaim_blocks` has already
+ // been acquired by `recv`.
+ let next_block = block.as_ref().load_next(Relaxed);
+
+ // Update the free list head
+ self.free_head = next_block.unwrap();
+
+ // Push the emptied block onto the back of the queue, making it
+ // available to senders.
+ tx.reclaim_block(block);
+ }
+
+ thread::yield_now();
+ }
+ }
+
+ /// Effectively `Drop` all the blocks. Should only be called once, when
+ /// the list is dropping.
+ pub(super) unsafe fn free_blocks(&mut self) {
+ debug_assert_ne!(self.free_head, NonNull::dangling());
+
+ let mut cur = Some(self.free_head);
+
+ #[cfg(debug_assertions)]
+ {
+ // to trigger the debug assert above so as to catch that we
+ // don't call `free_blocks` more than once.
+ self.free_head = NonNull::dangling();
+ self.head = NonNull::dangling();
+ }
+
+ while let Some(block) = cur {
+ cur = block.as_ref().load_next(Relaxed);
+ drop(Box::from_raw(block.as_ptr()));
+ }
+ }
+}
+
+impl<T> fmt::Debug for Rx<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Rx")
+ .field("head", &self.head)
+ .field("index", &self.index)
+ .field("free_head", &self.free_head)
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/sync/mpsc/mod.rs b/third_party/rust/tokio/src/sync/mpsc/mod.rs
new file mode 100644
index 0000000000..b1513a9da5
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/mod.rs
@@ -0,0 +1,115 @@
+#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+
+//! A multi-producer, single-consumer queue for sending values between
+//! asynchronous tasks.
+//!
+//! This module provides two variants of the channel: bounded and unbounded. The
+//! bounded variant has a limit on the number of messages that the channel can
+//! store, and if this limit is reached, trying to send another message will
+//! wait until a message is received from the channel. An unbounded channel has
+//! an infinite capacity, so the `send` method will always complete immediately.
+//! This makes the [`UnboundedSender`] usable from both synchronous and
+//! asynchronous code.
+//!
+//! Similar to the `mpsc` channels provided by `std`, the channel constructor
+//! functions provide separate send and receive handles, [`Sender`] and
+//! [`Receiver`] for the bounded channel, [`UnboundedSender`] and
+//! [`UnboundedReceiver`] for the unbounded channel. If there is no message to read,
+//! the current task will be notified when a new value is sent. [`Sender`] and
+//! [`UnboundedSender`] allow sending values into the channel. If the bounded
+//! channel is at capacity, the send is rejected and the task will be notified
+//! when additional capacity is available. In other words, the channel provides
+//! backpressure.
+//!
+//!
+//! # Disconnection
+//!
+//! When all [`Sender`] handles have been dropped, it is no longer
+//! possible to send values into the channel. This is considered the termination
+//! event of the stream. As such, `Receiver::poll` returns `Ok(Ready(None))`.
+//!
+//! If the [`Receiver`] handle is dropped, then messages can no longer
+//! be read out of the channel. In this case, all further attempts to send will
+//! result in an error.
+//!
+//! # Clean Shutdown
+//!
+//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to
+//! remain in the channel. Instead, it is usually desirable to perform a "clean"
+//! shutdown. To do this, the receiver first calls `close`, which will prevent
+//! any further messages to be sent into the channel. Then, the receiver
+//! consumes the channel to completion, at which point the receiver can be
+//! dropped.
+//!
+//! # Communicating between sync and async code
+//!
+//! When you want to communicate between synchronous and asynchronous code, there
+//! are two situations to consider:
+//!
+//! **Bounded channel**: If you need a bounded channel, you should use a bounded
+//! Tokio `mpsc` channel for both directions of communication. Instead of calling
+//! the async [`send`][bounded-send] or [`recv`][bounded-recv] methods, in
+//! synchronous code you will need to use the [`blocking_send`][blocking-send] or
+//! [`blocking_recv`][blocking-recv] methods.
+//!
+//! **Unbounded channel**: You should use the kind of channel that matches where
+//! the receiver is. So for sending a message _from async to sync_, you should
+//! use [the standard library unbounded channel][std-unbounded] or
+//! [crossbeam][crossbeam-unbounded]. Similarly, for sending a message _from sync
+//! to async_, you should use an unbounded Tokio `mpsc` channel.
+//!
+//! Please be aware that the above remarks were written with the `mpsc` channel
+//! in mind, but they can also be generalized to other kinds of channels. In
+//! general, any channel method that isn't marked async can be called anywhere,
+//! including outside of the runtime. For example, sending a message on a
+//! oneshot channel from outside the runtime is perfectly fine.
+//!
+//! # Multiple runtimes
+//!
+//! The mpsc channel does not care about which runtime you use it in, and can be
+//! used to send messages from one runtime to another. It can also be used in
+//! non-Tokio runtimes.
+//!
+//! There is one exception to the above: the [`send_timeout`] must be used from
+//! within a Tokio runtime, however it is still not tied to one specific Tokio
+//! runtime, and the sender may be moved from one Tokio runtime to another.
+//!
+//! [`Sender`]: crate::sync::mpsc::Sender
+//! [`Receiver`]: crate::sync::mpsc::Receiver
+//! [bounded-send]: crate::sync::mpsc::Sender::send()
+//! [bounded-recv]: crate::sync::mpsc::Receiver::recv()
+//! [blocking-send]: crate::sync::mpsc::Sender::blocking_send()
+//! [blocking-recv]: crate::sync::mpsc::Receiver::blocking_recv()
+//! [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
+//! [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
+//! [`Handle::block_on`]: crate::runtime::Handle::block_on()
+//! [std-unbounded]: std::sync::mpsc::channel
+//! [crossbeam-unbounded]: https://docs.rs/crossbeam/*/crossbeam/channel/fn.unbounded.html
+//! [`send_timeout`]: crate::sync::mpsc::Sender::send_timeout
+
+pub(super) mod block;
+
+mod bounded;
+pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender};
+
+mod chan;
+
+pub(super) mod list;
+
+mod unbounded;
+pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+
+pub mod error;
+
+/// The number of values a block can contain.
+///
+/// This value must be a power of 2. It also must be smaller than the number of
+/// bits in `usize`.
+#[cfg(all(target_pointer_width = "64", not(loom)))]
+const BLOCK_CAP: usize = 32;
+
+#[cfg(all(not(target_pointer_width = "64"), not(loom)))]
+const BLOCK_CAP: usize = 16;
+
+#[cfg(loom)]
+const BLOCK_CAP: usize = 2;
diff --git a/third_party/rust/tokio/src/sync/mpsc/unbounded.rs b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs
new file mode 100644
index 0000000000..b133f9f35e
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs
@@ -0,0 +1,373 @@
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::sync::mpsc::chan;
+use crate::sync::mpsc::error::{SendError, TryRecvError};
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+/// Send values to the associated `UnboundedReceiver`.
+///
+/// Instances are created by the
+/// [`unbounded_channel`](unbounded_channel) function.
+pub struct UnboundedSender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for UnboundedSender<T> {
+ fn clone(&self) -> Self {
+ UnboundedSender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for UnboundedSender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("UnboundedSender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Receive values from the associated `UnboundedSender`.
+///
+/// Instances are created by the
+/// [`unbounded_channel`](unbounded_channel) function.
+///
+/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
+///
+/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
+pub struct UnboundedReceiver<T> {
+ /// The channel receiver
+ chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for UnboundedReceiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("UnboundedReceiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Creates an unbounded mpsc channel for communicating between asynchronous
+/// tasks without backpressure.
+///
+/// A `send` on this channel will always succeed as long as the receive half has
+/// not been closed. If the receiver falls behind, messages will be arbitrarily
+/// buffered.
+///
+/// **Note** that the amount of available system memory is an implicit bound to
+/// the channel. Using an `unbounded` channel has the ability of causing the
+/// process to run out of memory. In this case, the process will be aborted.
+pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+ let (tx, rx) = chan::channel(AtomicUsize::new(0));
+
+ let tx = UnboundedSender::new(tx);
+ let rx = UnboundedReceiver::new(rx);
+
+ (tx, rx)
+}
+
+/// No capacity
+type Semaphore = AtomicUsize;
+
+impl<T> UnboundedReceiver<T> {
+ pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
+ UnboundedReceiver { chan }
+ }
+
+ /// Receives the next value for this receiver.
+ ///
+ /// `None` is returned when all `Sender` halves have dropped, indicating
+ /// that no further values can be sent on the channel.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If `recv` is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, it is guaranteed that no messages were received on this
+ /// channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tokio::spawn(async move {
+ /// tx.send("hello").unwrap();
+ /// });
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(None, rx.recv().await);
+ /// }
+ /// ```
+ ///
+ /// Values are buffered:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tx.send("hello").unwrap();
+ /// tx.send("world").unwrap();
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(Some("world"), rx.recv().await);
+ /// }
+ /// ```
+ pub async fn recv(&mut self) -> Option<T> {
+ use crate::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
+ /// Tries to receive the next value for this receiver.
+ ///
+ /// This method returns the [`Empty`] error if the channel is currently
+ /// empty, but there are still outstanding [senders] or [permits].
+ ///
+ /// This method returns the [`Disconnected`] error if the channel is
+ /// currently empty, and there are no outstanding [senders] or [permits].
+ ///
+ /// Unlike the [`poll_recv`] method, this method will never return an
+ /// [`Empty`] error spuriously.
+ ///
+ /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
+ /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
+ /// [`poll_recv`]: Self::poll_recv
+ /// [senders]: crate::sync::mpsc::Sender
+ /// [permits]: crate::sync::mpsc::Permit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ /// use tokio::sync::mpsc::error::TryRecvError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tx.send("hello").unwrap();
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ ///
+ /// tx.send("hello").unwrap();
+ /// // Drop the last sender, closing the channel.
+ /// drop(tx);
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+ /// }
+ /// ```
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
+ /// Blocking receive to call outside of asynchronous contexts.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution
+ /// context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
+ ///
+ /// let sync_code = thread::spawn(move || {
+ /// assert_eq!(Some(10), rx.blocking_recv());
+ /// });
+ ///
+ /// let _ = tx.send(10);
+ /// sync_code.join().unwrap();
+ /// }
+ /// ```
+ #[cfg(feature = "sync")]
+ pub fn blocking_recv(&mut self) -> Option<T> {
+ crate::future::block_on(self.recv())
+ }
+
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.chan.close();
+ }
+
+ /// Polls to receive the next message on this channel.
+ ///
+ /// This method returns:
+ ///
+ /// * `Poll::Pending` if no messages are available but the channel is not
+ /// closed, or if a spurious failure happens.
+ /// * `Poll::Ready(Some(message))` if a message is available.
+ /// * `Poll::Ready(None)` if the channel has been closed and all messages
+ /// sent before it was closed have been received.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided
+ /// `Context` is scheduled to receive a wakeup when a message is sent on any
+ /// receiver, or when the channel is closed. Note that on multiple calls to
+ /// `poll_recv`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup.
+ ///
+ /// If this method returns `Poll::Pending` due to a spurious failure, then
+ /// the `Waker` will be notified when the situation causing the spurious
+ /// failure has been resolved. Note that receiving such a wakeup does not
+ /// guarantee that the next call will succeed — it could fail with another
+ /// spurious failure.
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
+ UnboundedSender { chan }
+ }
+
+ /// Attempts to send a message on this `UnboundedSender` without blocking.
+ ///
+ /// This method is not marked async because sending a message to an unbounded channel
+ /// never requires any form of waiting. Because of this, the `send` method can be
+ /// used in both synchronous and asynchronous code without problems.
+ ///
+ /// If the receive half of the channel is closed, either due to [`close`]
+ /// being called or the [`UnboundedReceiver`] having been dropped, this
+ /// function returns an error. The error includes the value passed to `send`.
+ ///
+ /// [`close`]: UnboundedReceiver::close
+ /// [`UnboundedReceiver`]: UnboundedReceiver
+ pub fn send(&self, message: T) -> Result<(), SendError<T>> {
+ if !self.inc_num_messages() {
+ return Err(SendError(message));
+ }
+
+ self.chan.send(message);
+ Ok(())
+ }
+
+ fn inc_num_messages(&self) -> bool {
+ use std::process;
+ use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+ let mut curr = self.chan.semaphore().load(Acquire);
+
+ loop {
+ if curr & 1 == 1 {
+ return false;
+ }
+
+ if curr == usize::MAX ^ 1 {
+ // Overflowed the ref count. There is no safe way to recover, so
+ // abort the process. In practice, this should never happen.
+ process::abort()
+ }
+
+ match self
+ .chan
+ .semaphore()
+ .compare_exchange(curr, curr + 2, AcqRel, Acquire)
+ {
+ Ok(_) => return true,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
+
+ /// Completes when the receiver has dropped.
+ ///
+ /// This allows the producers to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once the channel is closed, it stays closed
+ /// forever and all future calls to `closed` will return immediately.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
+ /// let tx2 = tx1.clone();
+ /// let tx3 = tx1.clone();
+ /// let tx4 = tx1.clone();
+ /// let tx5 = tx1.clone();
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// futures::join!(
+ /// tx1.closed(),
+ /// tx2.closed(),
+ /// tx3.closed(),
+ /// tx4.closed(),
+ /// tx5.closed()
+ /// );
+ //// println!("Receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&self) {
+ self.chan.closed().await
+ }
+
+ /// Checks if the channel has been closed. This happens when the
+ /// [`UnboundedReceiver`] is dropped, or when the
+ /// [`UnboundedReceiver::close`] method is called.
+ ///
+ /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
+ /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
+ /// assert!(!tx.is_closed());
+ ///
+ /// let tx2 = tx.clone();
+ /// assert!(!tx2.is_closed());
+ ///
+ /// drop(rx);
+ /// assert!(tx.is_closed());
+ /// assert!(tx2.is_closed());
+ /// ```
+ pub fn is_closed(&self) -> bool {
+ self.chan.is_closed()
+ }
+
+ /// Returns `true` if senders belong to the same channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
+ /// let tx2 = tx.clone();
+ /// assert!(tx.same_channel(&tx2));
+ ///
+ /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>();
+ /// assert!(!tx3.same_channel(&tx2));
+ /// ```
+ pub fn same_channel(&self, other: &Self) -> bool {
+ self.chan.same_channel(&other.chan)
+ }
+}