summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-channel/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /third_party/rust/futures-channel/src
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--third_party/rust/futures-channel/src/lib.rs42
-rw-r--r--third_party/rust/futures-channel/src/lock.rs102
-rw-r--r--third_party/rust/futures-channel/src/mpsc/mod.rs1359
-rw-r--r--third_party/rust/futures-channel/src/mpsc/queue.rs174
-rw-r--r--third_party/rust/futures-channel/src/mpsc/sink_impl.rs73
-rw-r--r--third_party/rust/futures-channel/src/oneshot.rs488
6 files changed, 2238 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/src/lib.rs b/third_party/rust/futures-channel/src/lib.rs
new file mode 100644
index 0000000000..4cd936d552
--- /dev/null
+++ b/third_party/rust/futures-channel/src/lib.rs
@@ -0,0 +1,42 @@
+//! Asynchronous channels.
+//!
+//! Like threads, concurrent tasks sometimes need to communicate with each
+//! other. This module contains two basic abstractions for doing so:
+//!
+//! - [oneshot], a way of sending a single value from one task to another.
+//! - [mpsc], a multi-producer, single-consumer channel for sending values
+//! between tasks, analogous to the similarly-named structure in the standard
+//! library.
+//!
+//! All items are only available when the `std` or `alloc` feature of this
+//! library is activated, and it is activated by default.
+
+#![cfg_attr(not(feature = "std"), no_std)]
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+extern crate alloc;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod lock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "std")]
+pub mod mpsc;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub mod oneshot;
diff --git a/third_party/rust/futures-channel/src/lock.rs b/third_party/rust/futures-channel/src/lock.rs
new file mode 100644
index 0000000000..b328d0f7dd
--- /dev/null
+++ b/third_party/rust/futures-channel/src/lock.rs
@@ -0,0 +1,102 @@
+//! A "mutex" which only supports `try_lock`
+//!
+//! As a futures library the eventual call to an event loop should be the only
+//! thing that ever blocks, so this is assisted with a fast user-space
+//! implementation of a lock that can only have a `try_lock` operation.
+
+use core::cell::UnsafeCell;
+use core::ops::{Deref, DerefMut};
+use core::sync::atomic::AtomicBool;
+use core::sync::atomic::Ordering::SeqCst;
+
+/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
+///
+/// This lock only supports the `try_lock` operation, however, and does not
+/// implement poisoning.
+#[derive(Debug)]
+pub(crate) struct Lock<T> {
+ locked: AtomicBool,
+ data: UnsafeCell<T>,
+}
+
+/// Sentinel representing an acquired lock through which the data can be
+/// accessed.
+pub(crate) struct TryLock<'a, T> {
+ __ptr: &'a Lock<T>,
+}
+
+// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are
+// intended to mirror the standard library's corresponding impls for `Mutex<T>`.
+//
+// If a `T` is sendable across threads, so is the lock, and `T` must be sendable
+// across threads to be `Sync` because it allows mutable access from multiple
+// threads.
+unsafe impl<T: Send> Send for Lock<T> {}
+unsafe impl<T: Send> Sync for Lock<T> {}
+
+impl<T> Lock<T> {
+ /// Creates a new lock around the given value.
+ pub(crate) fn new(t: T) -> Self {
+ Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) }
+ }
+
+ /// Attempts to acquire this lock, returning whether the lock was acquired or
+ /// not.
+ ///
+ /// If `Some` is returned then the data this lock protects can be accessed
+ /// through the sentinel. This sentinel allows both mutable and immutable
+ /// access.
+ ///
+ /// If `None` is returned then the lock is already locked, either elsewhere
+ /// on this thread or on another thread.
+ pub(crate) fn try_lock(&self) -> Option<TryLock<'_, T>> {
+ if !self.locked.swap(true, SeqCst) {
+ Some(TryLock { __ptr: self })
+ } else {
+ None
+ }
+ }
+}
+
+impl<T> Deref for TryLock<'_, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ unsafe { &*self.__ptr.data.get() }
+ }
+}
+
+impl<T> DerefMut for TryLock<'_, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ //
+ // Additionally, we're the *only* `TryLock` in existence so mutable
+ // access should be ok.
+ unsafe { &mut *self.__ptr.data.get() }
+ }
+}
+
+impl<T> Drop for TryLock<'_, T> {
+ fn drop(&mut self) {
+ self.__ptr.locked.store(false, SeqCst);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::Lock;
+
+ #[test]
+ fn smoke() {
+ let a = Lock::new(1);
+ let mut a1 = a.try_lock().unwrap();
+ assert!(a.try_lock().is_none());
+ assert_eq!(*a1, 1);
+ *a1 = 2;
+ drop(a1);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ }
+}
diff --git a/third_party/rust/futures-channel/src/mpsc/mod.rs b/third_party/rust/futures-channel/src/mpsc/mod.rs
new file mode 100644
index 0000000000..cf45fe77fe
--- /dev/null
+++ b/third_party/rust/futures-channel/src/mpsc/mod.rs
@@ -0,0 +1,1359 @@
+//! A multi-producer, single-consumer queue for sending values across
+//! asynchronous tasks.
+//!
+//! Similarly to the `std`, channel creation provides [`Receiver`] and
+//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
+//! read values out of the channel. If there is no message to read from the
+//! channel, the current task will be notified when a new value is sent.
+//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
+//! the channel. If the channel is at capacity, the send will be rejected and
+//! the task will be notified when additional capacity is available. In other
+//! words, the channel provides backpressure.
+//!
+//! Unbounded channels are also available using the `unbounded` constructor.
+//!
+//! # Disconnection
+//!
+//! When all [`Sender`] handles have been dropped, it is no longer
+//! possible to send values into the channel. This is considered the termination
+//! event of the stream. As such, [`Receiver::poll_next`]
+//! will return `Ok(Ready(None))`.
+//!
+//! If the [`Receiver`] handle is dropped, then messages can no longer
+//! be read out of the channel. In this case, all further attempts to send will
+//! result in an error.
+//!
+//! # Clean Shutdown
+//!
+//! If the [`Receiver`] is simply dropped, then it is possible for
+//! there to be messages still in the channel that will not be processed. As
+//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
+//! receiver will first call `close`, which will prevent any further messages to
+//! be sent into the channel. Then, the receiver consumes the channel to
+//! completion, at which point the receiver can be dropped.
+//!
+//! [`Sender`]: struct.Sender.html
+//! [`Receiver`]: struct.Receiver.html
+//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
+//! [`Receiver::poll_next`]:
+//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
+
+// At the core, the channel uses an atomic FIFO queue for message passing. This
+// queue is used as the primary coordination primitive. In order to enforce
+// capacity limits and handle back pressure, a secondary FIFO queue is used to
+// send parked task handles.
+//
+// The general idea is that the channel is created with a `buffer` size of `n`.
+// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
+// slot to hold a message. This allows `Sender` to know for a fact that a send
+// will succeed *before* starting to do the actual work of sending the value.
+// Since most of this work is lock-free, once the work starts, it is impossible
+// to safely revert.
+//
+// If the sender is unable to process a send operation, then the current
+// task is parked and the handle is sent on the parked task queue.
+//
+// Note that the implementation guarantees that the channel capacity will never
+// exceed the configured limit, however there is no *strict* guarantee that the
+// receiver will wake up a parked task *immediately* when a slot becomes
+// available. However, it will almost always unpark a task when a slot becomes
+// available and it is *guaranteed* that a sender will be unparked when the
+// message that caused the sender to become parked is read out of the channel.
+//
+// The steps for sending a message are roughly:
+//
+// 1) Increment the channel message count
+// 2) If the channel is at capacity, push the task handle onto the wait queue
+// 3) Push the message onto the message queue.
+//
+// The steps for receiving a message are roughly:
+//
+// 1) Pop a message from the message queue
+// 2) Pop a task handle from the wait queue
+// 3) Decrement the channel message count.
+//
+// It's important for the order of operations on lock-free structures to happen
+// in reverse order between the sender and receiver. This makes the message
+// queue the primary coordination structure and establishes the necessary
+// happens-before semantics required for the acquire / release semantics used
+// by the queue structure.
+
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::__internal::AtomicWaker;
+use futures_core::task::{Context, Poll, Waker};
+use std::fmt;
+use std::pin::Pin;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Mutex};
+use std::thread;
+
+use crate::mpsc::queue::Queue;
+
+mod queue;
+#[cfg(feature = "sink")]
+mod sink_impl;
+
+struct UnboundedSenderInner<T> {
+ // Channel state shared between the sender and receiver.
+ inner: Arc<UnboundedInner<T>>,
+}
+
+struct BoundedSenderInner<T> {
+ // Channel state shared between the sender and receiver.
+ inner: Arc<BoundedInner<T>>,
+
+ // Handle to the task that is blocked on this sender. This handle is sent
+ // to the receiver half in order to be notified when the sender becomes
+ // unblocked.
+ sender_task: Arc<Mutex<SenderTask>>,
+
+ // `true` if the sender might be blocked. This is an optimization to avoid
+ // having to lock the mutex most of the time.
+ maybe_parked: bool,
+}
+
+// We never project Pin<&mut SenderInner> to `Pin<&mut T>`
+impl<T> Unpin for UnboundedSenderInner<T> {}
+impl<T> Unpin for BoundedSenderInner<T> {}
+
+/// The transmission end of a bounded mpsc channel.
+///
+/// This value is created by the [`channel`](channel) function.
+pub struct Sender<T>(Option<BoundedSenderInner<T>>);
+
+/// The transmission end of an unbounded mpsc channel.
+///
+/// This value is created by the [`unbounded`](unbounded) function.
+pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
+
+trait AssertKinds: Send + Sync + Clone {}
+impl AssertKinds for UnboundedSender<u32> {}
+
+/// The receiving end of a bounded mpsc channel.
+///
+/// This value is created by the [`channel`](channel) function.
+pub struct Receiver<T> {
+ inner: Option<Arc<BoundedInner<T>>>,
+}
+
+/// The receiving end of an unbounded mpsc channel.
+///
+/// This value is created by the [`unbounded`](unbounded) function.
+pub struct UnboundedReceiver<T> {
+ inner: Option<Arc<UnboundedInner<T>>>,
+}
+
+// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
+impl<T> Unpin for UnboundedReceiver<T> {}
+
+/// The error type for [`Sender`s](Sender) used as `Sink`s.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct SendError {
+ kind: SendErrorKind,
+}
+
+/// The error type returned from [`try_send`](Sender::try_send).
+#[derive(Clone, PartialEq, Eq)]
+pub struct TrySendError<T> {
+ err: SendError,
+ val: T,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+enum SendErrorKind {
+ Full,
+ Disconnected,
+}
+
+/// The error type returned from [`try_next`](Receiver::try_next).
+pub struct TryRecvError {
+ _priv: (),
+}
+
+impl fmt::Display for SendError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ if self.is_full() {
+ write!(f, "send failed because channel is full")
+ } else {
+ write!(f, "send failed because receiver is gone")
+ }
+ }
+}
+
+impl std::error::Error for SendError {}
+
+impl SendError {
+ /// Returns `true` if this error is a result of the channel being full.
+ pub fn is_full(&self) -> bool {
+ match self.kind {
+ SendErrorKind::Full => true,
+ _ => false,
+ }
+ }
+
+ /// Returns `true` if this error is a result of the receiver being dropped.
+ pub fn is_disconnected(&self) -> bool {
+ match self.kind {
+ SendErrorKind::Disconnected => true,
+ _ => false,
+ }
+ }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
+ }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ if self.is_full() {
+ write!(f, "send failed because channel is full")
+ } else {
+ write!(f, "send failed because receiver is gone")
+ }
+ }
+}
+
+impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
+
+impl<T> TrySendError<T> {
+ /// Returns `true` if this error is a result of the channel being full.
+ pub fn is_full(&self) -> bool {
+ self.err.is_full()
+ }
+
+ /// Returns `true` if this error is a result of the receiver being dropped.
+ pub fn is_disconnected(&self) -> bool {
+ self.err.is_disconnected()
+ }
+
+ /// Returns the message that was attempted to be sent but failed.
+ pub fn into_inner(self) -> T {
+ self.val
+ }
+
+ /// Drops the message and converts into a `SendError`.
+ pub fn into_send_error(self) -> SendError {
+ self.err
+ }
+}
+
+impl fmt::Debug for TryRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("TryRecvError").finish()
+ }
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "receiver channel is empty")
+ }
+}
+
+impl std::error::Error for TryRecvError {}
+
+struct UnboundedInner<T> {
+ // Internal channel state. Consists of the number of messages stored in the
+ // channel as well as a flag signalling that the channel is closed.
+ state: AtomicUsize,
+
+ // Atomic, FIFO queue used to send messages to the receiver
+ message_queue: Queue<T>,
+
+ // Number of senders in existence
+ num_senders: AtomicUsize,
+
+ // Handle to the receiver's task.
+ recv_task: AtomicWaker,
+}
+
+struct BoundedInner<T> {
+ // Max buffer size of the channel. If `None` then the channel is unbounded.
+ buffer: usize,
+
+ // Internal channel state. Consists of the number of messages stored in the
+ // channel as well as a flag signalling that the channel is closed.
+ state: AtomicUsize,
+
+ // Atomic, FIFO queue used to send messages to the receiver
+ message_queue: Queue<T>,
+
+ // Atomic, FIFO queue used to send parked task handles to the receiver.
+ parked_queue: Queue<Arc<Mutex<SenderTask>>>,
+
+ // Number of senders in existence
+ num_senders: AtomicUsize,
+
+ // Handle to the receiver's task.
+ recv_task: AtomicWaker,
+}
+
+// Struct representation of `Inner::state`.
+#[derive(Clone, Copy)]
+struct State {
+ // `true` when the channel is open
+ is_open: bool,
+
+ // Number of messages in the channel
+ num_messages: usize,
+}
+
+// The `is_open` flag is stored in the left-most bit of `Inner::state`
+const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
+
+// When a new channel is created, it is created in the open state with no
+// pending messages.
+const INIT_STATE: usize = OPEN_MASK;
+
+// The maximum number of messages that a channel can track is `usize::max_value() >> 1`
+const MAX_CAPACITY: usize = !(OPEN_MASK);
+
+// The maximum requested buffer size must be less than the maximum capacity of
+// a channel. This is because each sender gets a guaranteed slot.
+const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
+
+// Sent to the consumer to wake up blocked producers
+struct SenderTask {
+ task: Option<Waker>,
+ is_parked: bool,
+}
+
+impl SenderTask {
+ fn new() -> Self {
+ Self { task: None, is_parked: false }
+ }
+
+ fn notify(&mut self) {
+ self.is_parked = false;
+
+ if let Some(task) = self.task.take() {
+ task.wake();
+ }
+ }
+}
+
+/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
+///
+/// Being bounded, this channel provides backpressure to ensure that the sender
+/// outpaces the receiver by only a limited amount. The channel's capacity is
+/// equal to `buffer + num-senders`. In other words, each sender gets a
+/// guaranteed slot in the channel capacity, and on top of that there are
+/// `buffer` "first come, first serve" slots available to all senders.
+///
+/// The [`Receiver`](Receiver) returned implements the
+/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
+/// `Sink`.
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ // Check that the requested buffer size does not exceed the maximum buffer
+ // size permitted by the system.
+ assert!(buffer < MAX_BUFFER, "requested buffer size too large");
+
+ let inner = Arc::new(BoundedInner {
+ buffer,
+ state: AtomicUsize::new(INIT_STATE),
+ message_queue: Queue::new(),
+ parked_queue: Queue::new(),
+ num_senders: AtomicUsize::new(1),
+ recv_task: AtomicWaker::new(),
+ });
+
+ let tx = BoundedSenderInner {
+ inner: inner.clone(),
+ sender_task: Arc::new(Mutex::new(SenderTask::new())),
+ maybe_parked: false,
+ };
+
+ let rx = Receiver { inner: Some(inner) };
+
+ (Sender(Some(tx)), rx)
+}
+
+/// Creates an unbounded mpsc channel for communicating between asynchronous
+/// tasks.
+///
+/// A `send` on this channel will always succeed as long as the receive half has
+/// not been closed. If the receiver falls behind, messages will be arbitrarily
+/// buffered.
+///
+/// **Note** that the amount of available system memory is an implicit bound to
+/// the channel. Using an `unbounded` channel has the ability of causing the
+/// process to run out of memory. In this case, the process will be aborted.
+pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+ let inner = Arc::new(UnboundedInner {
+ state: AtomicUsize::new(INIT_STATE),
+ message_queue: Queue::new(),
+ num_senders: AtomicUsize::new(1),
+ recv_task: AtomicWaker::new(),
+ });
+
+ let tx = UnboundedSenderInner { inner: inner.clone() };
+
+ let rx = UnboundedReceiver { inner: Some(inner) };
+
+ (UnboundedSender(Some(tx)), rx)
+}
+
+/*
+ *
+ * ===== impl Sender =====
+ *
+ */
+
+impl<T> UnboundedSenderInner<T> {
+ fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
+ let state = decode_state(self.inner.state.load(SeqCst));
+ if state.is_open {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
+ }
+ }
+
+ // Push message to the queue and signal to the receiver
+ fn queue_push_and_signal(&self, msg: T) {
+ // Push the message onto the message queue
+ self.inner.message_queue.push(msg);
+
+ // Signal to the receiver that a message has been enqueued. If the
+ // receiver is parked, this will unpark the task.
+ self.inner.recv_task.wake();
+ }
+
+ // Increment the number of queued messages. Returns the resulting number.
+ fn inc_num_messages(&self) -> Option<usize> {
+ let mut curr = self.inner.state.load(SeqCst);
+
+ loop {
+ let mut state = decode_state(curr);
+
+ // The receiver end closed the channel.
+ if !state.is_open {
+ return None;
+ }
+
+ // This probably is never hit? Odds are the process will run out of
+ // memory first. It may be worth to return something else in this
+ // case?
+ assert!(
+ state.num_messages < MAX_CAPACITY,
+ "buffer space \
+ exhausted; sending this messages would overflow the state"
+ );
+
+ state.num_messages += 1;
+
+ let next = encode_state(&state);
+ match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => return Some(state.num_messages),
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+
+ /// Returns whether the senders send to the same receiver.
+ fn same_receiver(&self, other: &Self) -> bool {
+ Arc::ptr_eq(&self.inner, &other.inner)
+ }
+
+ /// Returns whether the sender send to this receiver.
+ fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
+ Arc::ptr_eq(&self.inner, inner)
+ }
+
+ /// Returns pointer to the Arc containing sender
+ ///
+ /// The returned pointer is not referenced and should be only used for hashing!
+ fn ptr(&self) -> *const UnboundedInner<T> {
+ &*self.inner
+ }
+
+ /// Returns whether this channel is closed without needing a context.
+ fn is_closed(&self) -> bool {
+ !decode_state(self.inner.state.load(SeqCst)).is_open
+ }
+
+ /// Closes this channel from the sender side, preventing any new messages.
+ fn close_channel(&self) {
+ // There's no need to park this sender, its dropping,
+ // and we don't want to check for capacity, so skip
+ // that stuff from `do_send`.
+
+ self.inner.set_closed();
+ self.inner.recv_task.wake();
+ }
+}
+
+impl<T> BoundedSenderInner<T> {
+ /// Attempts to send a message on this `Sender`, returning the message
+ /// if there was an error.
+ fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+ // If the sender is currently blocked, reject the message
+ if !self.poll_unparked(None).is_ready() {
+ return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
+ }
+
+ // The channel has capacity to accept the message, so send it
+ self.do_send_b(msg)
+ }
+
+ // Do the send without failing.
+ // Can be called only by bounded sender.
+ fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+ // Anyone calling do_send *should* make sure there is room first,
+ // but assert here for tests as a sanity check.
+ debug_assert!(self.poll_unparked(None).is_ready());
+
+ // First, increment the number of messages contained by the channel.
+ // This operation will also atomically determine if the sender task
+ // should be parked.
+ //
+ // `None` is returned in the case that the channel has been closed by the
+ // receiver. This happens when `Receiver::close` is called or the
+ // receiver is dropped.
+ let park_self = match self.inc_num_messages() {
+ Some(num_messages) => {
+ // Block if the current number of pending messages has exceeded
+ // the configured buffer size
+ num_messages > self.inner.buffer
+ }
+ None => {
+ return Err(TrySendError {
+ err: SendError { kind: SendErrorKind::Disconnected },
+ val: msg,
+ })
+ }
+ };
+
+ // If the channel has reached capacity, then the sender task needs to
+ // be parked. This will send the task handle on the parked task queue.
+ //
+ // However, when `do_send` is called while dropping the `Sender`,
+ // `task::current()` can't be called safely. In this case, in order to
+ // maintain internal consistency, a blank message is pushed onto the
+ // parked task queue.
+ if park_self {
+ self.park();
+ }
+
+ self.queue_push_and_signal(msg);
+
+ Ok(())
+ }
+
+ // Push message to the queue and signal to the receiver
+ fn queue_push_and_signal(&self, msg: T) {
+ // Push the message onto the message queue
+ self.inner.message_queue.push(msg);
+
+ // Signal to the receiver that a message has been enqueued. If the
+ // receiver is parked, this will unpark the task.
+ self.inner.recv_task.wake();
+ }
+
+ // Increment the number of queued messages. Returns the resulting number.
+ fn inc_num_messages(&self) -> Option<usize> {
+ let mut curr = self.inner.state.load(SeqCst);
+
+ loop {
+ let mut state = decode_state(curr);
+
+ // The receiver end closed the channel.
+ if !state.is_open {
+ return None;
+ }
+
+ // This probably is never hit? Odds are the process will run out of
+ // memory first. It may be worth to return something else in this
+ // case?
+ assert!(
+ state.num_messages < MAX_CAPACITY,
+ "buffer space \
+ exhausted; sending this messages would overflow the state"
+ );
+
+ state.num_messages += 1;
+
+ let next = encode_state(&state);
+ match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => return Some(state.num_messages),
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+
+ fn park(&mut self) {
+ {
+ let mut sender = self.sender_task.lock().unwrap();
+ sender.task = None;
+ sender.is_parked = true;
+ }
+
+ // Send handle over queue
+ let t = self.sender_task.clone();
+ self.inner.parked_queue.push(t);
+
+ // Check to make sure we weren't closed after we sent our task on the
+ // queue
+ let state = decode_state(self.inner.state.load(SeqCst));
+ self.maybe_parked = state.is_open;
+ }
+
+ /// Polls the channel to determine if there is guaranteed capacity to send
+ /// at least one item without waiting.
+ ///
+ /// # Return value
+ ///
+ /// This method returns:
+ ///
+ /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
+ /// - `Poll::Pending` if the channel may not have
+ /// capacity, in which case the current task is queued to be notified once
+ /// capacity is available;
+ /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ let state = decode_state(self.inner.state.load(SeqCst));
+ if !state.is_open {
+ return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
+ }
+
+ self.poll_unparked(Some(cx)).map(Ok)
+ }
+
+ /// Returns whether the senders send to the same receiver.
+ fn same_receiver(&self, other: &Self) -> bool {
+ Arc::ptr_eq(&self.inner, &other.inner)
+ }
+
+ /// Returns whether the sender send to this receiver.
+ fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
+ Arc::ptr_eq(&self.inner, receiver)
+ }
+
+ /// Returns pointer to the Arc containing sender
+ ///
+ /// The returned pointer is not referenced and should be only used for hashing!
+ fn ptr(&self) -> *const BoundedInner<T> {
+ &*self.inner
+ }
+
+ /// Returns whether this channel is closed without needing a context.
+ fn is_closed(&self) -> bool {
+ !decode_state(self.inner.state.load(SeqCst)).is_open
+ }
+
+ /// Closes this channel from the sender side, preventing any new messages.
+ fn close_channel(&self) {
+ // There's no need to park this sender, its dropping,
+ // and we don't want to check for capacity, so skip
+ // that stuff from `do_send`.
+
+ self.inner.set_closed();
+ self.inner.recv_task.wake();
+ }
+
+ fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
+ // First check the `maybe_parked` variable. This avoids acquiring the
+ // lock in most cases
+ if self.maybe_parked {
+ // Get a lock on the task handle
+ let mut task = self.sender_task.lock().unwrap();
+
+ if !task.is_parked {
+ self.maybe_parked = false;
+ return Poll::Ready(());
+ }
+
+ // At this point, an unpark request is pending, so there will be an
+ // unpark sometime in the future. We just need to make sure that
+ // the correct task will be notified.
+ //
+ // Update the task in case the `Sender` has been moved to another
+ // task
+ task.task = cx.map(|cx| cx.waker().clone());
+
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+ }
+}
+
+impl<T> Sender<T> {
+ /// Attempts to send a message on this `Sender`, returning the message
+ /// if there was an error.
+ pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+ if let Some(inner) = &mut self.0 {
+ inner.try_send(msg)
+ } else {
+ Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
+ }
+ }
+
+ /// Send a message on the channel.
+ ///
+ /// This function should only be called after
+ /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
+ /// ready to receive a message.
+ pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
+ self.try_send(msg).map_err(|e| e.err)
+ }
+
+ /// Polls the channel to determine if there is guaranteed capacity to send
+ /// at least one item without waiting.
+ ///
+ /// # Return value
+ ///
+ /// This method returns:
+ ///
+ /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
+ /// - `Poll::Pending` if the channel may not have
+ /// capacity, in which case the current task is queued to be notified once
+ /// capacity is available;
+ /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
+ inner.poll_ready(cx)
+ }
+
+ /// Returns whether this channel is closed without needing a context.
+ pub fn is_closed(&self) -> bool {
+ self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
+ }
+
+ /// Closes this channel from the sender side, preventing any new messages.
+ pub fn close_channel(&mut self) {
+ if let Some(inner) = &mut self.0 {
+ inner.close_channel();
+ }
+ }
+
+ /// Disconnects this sender from the channel, closing it if there are no more senders left.
+ pub fn disconnect(&mut self) {
+ self.0 = None;
+ }
+
+ /// Returns whether the senders send to the same receiver.
+ pub fn same_receiver(&self, other: &Self) -> bool {
+ match (&self.0, &other.0) {
+ (Some(inner), Some(other)) => inner.same_receiver(other),
+ _ => false,
+ }
+ }
+
+ /// Returns whether the sender send to this receiver.
+ pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
+ match (&self.0, &receiver.inner) {
+ (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
+ _ => false,
+ }
+ }
+
+ /// Hashes the receiver into the provided hasher
+ pub fn hash_receiver<H>(&self, hasher: &mut H)
+ where
+ H: std::hash::Hasher,
+ {
+ use std::hash::Hash;
+
+ let ptr = self.0.as_ref().map(|inner| inner.ptr());
+ ptr.hash(hasher);
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ /// Check if the channel is ready to receive a message.
+ pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
+ inner.poll_ready_nb()
+ }
+
+ /// Returns whether this channel is closed without needing a context.
+ pub fn is_closed(&self) -> bool {
+ self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
+ }
+
+ /// Closes this channel from the sender side, preventing any new messages.
+ pub fn close_channel(&self) {
+ if let Some(inner) = &self.0 {
+ inner.close_channel();
+ }
+ }
+
+ /// Disconnects this sender from the channel, closing it if there are no more senders left.
+ pub fn disconnect(&mut self) {
+ self.0 = None;
+ }
+
+ // Do the send without parking current task.
+ fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
+ if let Some(inner) = &self.0 {
+ if inner.inc_num_messages().is_some() {
+ inner.queue_push_and_signal(msg);
+ return Ok(());
+ }
+ }
+
+ Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
+ }
+
+ /// Send a message on the channel.
+ ///
+ /// This method should only be called after `poll_ready` has been used to
+ /// verify that the channel is ready to receive a message.
+ pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
+ self.do_send_nb(msg).map_err(|e| e.err)
+ }
+
+ /// Sends a message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ self.do_send_nb(msg)
+ }
+
+ /// Returns whether the senders send to the same receiver.
+ pub fn same_receiver(&self, other: &Self) -> bool {
+ match (&self.0, &other.0) {
+ (Some(inner), Some(other)) => inner.same_receiver(other),
+ _ => false,
+ }
+ }
+
+ /// Returns whether the sender send to this receiver.
+ pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
+ match (&self.0, &receiver.inner) {
+ (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
+ _ => false,
+ }
+ }
+
+ /// Hashes the receiver into the provided hasher
+ pub fn hash_receiver<H>(&self, hasher: &mut H)
+ where
+ H: std::hash::Hasher,
+ {
+ use std::hash::Hash;
+
+ let ptr = self.0.as_ref().map(|inner| inner.ptr());
+ ptr.hash(hasher);
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
+impl<T> Clone for UnboundedSender<T> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
+impl<T> Clone for UnboundedSenderInner<T> {
+ fn clone(&self) -> Self {
+ // Since this atomic op isn't actually guarding any memory and we don't
+ // care about any orderings besides the ordering on the single atomic
+ // variable, a relaxed ordering is acceptable.
+ let mut curr = self.inner.num_senders.load(SeqCst);
+
+ loop {
+ // If the maximum number of senders has been reached, then fail
+ if curr == MAX_BUFFER {
+ panic!("cannot clone `Sender` -- too many outstanding senders");
+ }
+
+ debug_assert!(curr < MAX_BUFFER);
+
+ let next = curr + 1;
+ match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => {
+ // The ABA problem doesn't matter here. We only care that the
+ // number of senders never exceeds the maximum.
+ return Self { inner: self.inner.clone() };
+ }
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+}
+
+impl<T> Clone for BoundedSenderInner<T> {
+ fn clone(&self) -> Self {
+ // Since this atomic op isn't actually guarding any memory and we don't
+ // care about any orderings besides the ordering on the single atomic
+ // variable, a relaxed ordering is acceptable.
+ let mut curr = self.inner.num_senders.load(SeqCst);
+
+ loop {
+ // If the maximum number of senders has been reached, then fail
+ if curr == self.inner.max_senders() {
+ panic!("cannot clone `Sender` -- too many outstanding senders");
+ }
+
+ debug_assert!(curr < self.inner.max_senders());
+
+ let next = curr + 1;
+ match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => {
+ // The ABA problem doesn't matter here. We only care that the
+ // number of senders never exceeds the maximum.
+ return Self {
+ inner: self.inner.clone(),
+ sender_task: Arc::new(Mutex::new(SenderTask::new())),
+ maybe_parked: false,
+ };
+ }
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+}
+
+impl<T> Drop for UnboundedSenderInner<T> {
+ fn drop(&mut self) {
+ // Ordering between variables don't matter here
+ let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
+
+ if prev == 1 {
+ self.close_channel();
+ }
+ }
+}
+
+impl<T> Drop for BoundedSenderInner<T> {
+ fn drop(&mut self) {
+ // Ordering between variables don't matter here
+ let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
+
+ if prev == 1 {
+ self.close_channel();
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
+ }
+}
+
+impl<T> fmt::Debug for UnboundedSender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
+ }
+}
+
+/*
+ *
+ * ===== impl Receiver =====
+ *
+ */
+
+impl<T> Receiver<T> {
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ if let Some(inner) = &mut self.inner {
+ inner.set_closed();
+
+ // Wake up any threads waiting as they'll see that we've closed the
+ // channel and will continue on their merry way.
+ while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
+ task.lock().unwrap().notify();
+ }
+ }
+ }
+
+ /// Tries to receive the next message without notifying a context if empty.
+ ///
+ /// It is not recommended to call this function from inside of a future,
+ /// only when you've otherwise arranged to be notified when the channel is
+ /// no longer empty.
+ ///
+ /// This function returns:
+ /// * `Ok(Some(t))` when message is fetched
+ /// * `Ok(None)` when channel is closed and no messages left in the queue
+ /// * `Err(e)` when there are no messages available, but channel is not yet closed
+ pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
+ match self.next_message() {
+ Poll::Ready(msg) => Ok(msg),
+ Poll::Pending => Err(TryRecvError { _priv: () }),
+ }
+ }
+
+ fn next_message(&mut self) -> Poll<Option<T>> {
+ let inner = match self.inner.as_mut() {
+ None => return Poll::Ready(None),
+ Some(inner) => inner,
+ };
+ // Pop off a message
+ match unsafe { inner.message_queue.pop_spin() } {
+ Some(msg) => {
+ // If there are any parked task handles in the parked queue,
+ // pop one and unpark it.
+ self.unpark_one();
+
+ // Decrement number of messages
+ self.dec_num_messages();
+
+ Poll::Ready(Some(msg))
+ }
+ None => {
+ let state = decode_state(inner.state.load(SeqCst));
+ if state.is_closed() {
+ // If closed flag is set AND there are no pending messages
+ // it means end of stream
+ self.inner = None;
+ Poll::Ready(None)
+ } else {
+ // If queue is open, we need to return Pending
+ // to be woken up when new messages arrive.
+ // If queue is closed but num_messages is non-zero,
+ // it means that senders updated the state,
+ // but didn't put message to queue yet,
+ // so we need to park until sender unparks the task
+ // after queueing the message.
+ Poll::Pending
+ }
+ }
+ }
+ }
+
+ // Unpark a single task handle if there is one pending in the parked queue
+ fn unpark_one(&mut self) {
+ if let Some(inner) = &mut self.inner {
+ if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
+ task.lock().unwrap().notify();
+ }
+ }
+ }
+
+ fn dec_num_messages(&self) {
+ if let Some(inner) = &self.inner {
+ // OPEN_MASK is highest bit, so it's unaffected by subtraction
+ // unless there's underflow, and we know there's no underflow
+ // because number of messages at this point is always > 0.
+ inner.state.fetch_sub(1, SeqCst);
+ }
+ }
+}
+
+// The receiver does not ever take a Pin to the inner T
+impl<T> Unpin for Receiver<T> {}
+
+impl<T> FusedStream for Receiver<T> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
+
+impl<T> Stream for Receiver<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ // Try to read a message off of the message queue.
+ match self.next_message() {
+ Poll::Ready(msg) => {
+ if msg.is_none() {
+ self.inner = None;
+ }
+ Poll::Ready(msg)
+ }
+ Poll::Pending => {
+ // There are no messages to read, in this case, park.
+ self.inner.as_ref().unwrap().recv_task.register(cx.waker());
+ // Check queue again after parking to prevent race condition:
+ // a message could be added to the queue after previous `next_message`
+ // before `register` call.
+ self.next_message()
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if let Some(inner) = &self.inner {
+ decode_state(inner.state.load(SeqCst)).size_hint()
+ } else {
+ (0, Some(0))
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ // Drain the channel of all pending messages
+ self.close();
+ if self.inner.is_some() {
+ loop {
+ match self.next_message() {
+ Poll::Ready(Some(_)) => {}
+ Poll::Ready(None) => break,
+ Poll::Pending => {
+ let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
+
+ // If the channel is closed, then there is no need to park.
+ if state.is_closed() {
+ break;
+ }
+
+ // TODO: Spinning isn't ideal, it might be worth
+ // investigating using a condvar or some other strategy
+ // here. That said, if this case is hit, then another thread
+ // is about to push the value into the queue and this isn't
+ // the only spinlock in the impl right now.
+ thread::yield_now();
+ }
+ }
+ }
+ }
+ }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let closed = if let Some(ref inner) = self.inner {
+ decode_state(inner.state.load(SeqCst)).is_closed()
+ } else {
+ false
+ };
+
+ f.debug_struct("Receiver").field("closed", &closed).finish()
+ }
+}
+
+impl<T> UnboundedReceiver<T> {
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ if let Some(inner) = &mut self.inner {
+ inner.set_closed();
+ }
+ }
+
+ /// Tries to receive the next message without notifying a context if empty.
+ ///
+ /// It is not recommended to call this function from inside of a future,
+ /// only when you've otherwise arranged to be notified when the channel is
+ /// no longer empty.
+ ///
+ /// This function returns:
+ /// * `Ok(Some(t))` when message is fetched
+ /// * `Ok(None)` when channel is closed and no messages left in the queue
+ /// * `Err(e)` when there are no messages available, but channel is not yet closed
+ pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
+ match self.next_message() {
+ Poll::Ready(msg) => Ok(msg),
+ Poll::Pending => Err(TryRecvError { _priv: () }),
+ }
+ }
+
+ fn next_message(&mut self) -> Poll<Option<T>> {
+ let inner = match self.inner.as_mut() {
+ None => return Poll::Ready(None),
+ Some(inner) => inner,
+ };
+ // Pop off a message
+ match unsafe { inner.message_queue.pop_spin() } {
+ Some(msg) => {
+ // Decrement number of messages
+ self.dec_num_messages();
+
+ Poll::Ready(Some(msg))
+ }
+ None => {
+ let state = decode_state(inner.state.load(SeqCst));
+ if state.is_closed() {
+ // If closed flag is set AND there are no pending messages
+ // it means end of stream
+ self.inner = None;
+ Poll::Ready(None)
+ } else {
+ // If queue is open, we need to return Pending
+ // to be woken up when new messages arrive.
+ // If queue is closed but num_messages is non-zero,
+ // it means that senders updated the state,
+ // but didn't put message to queue yet,
+ // so we need to park until sender unparks the task
+ // after queueing the message.
+ Poll::Pending
+ }
+ }
+ }
+ }
+
+ fn dec_num_messages(&self) {
+ if let Some(inner) = &self.inner {
+ // OPEN_MASK is highest bit, so it's unaffected by subtraction
+ // unless there's underflow, and we know there's no underflow
+ // because number of messages at this point is always > 0.
+ inner.state.fetch_sub(1, SeqCst);
+ }
+ }
+}
+
+impl<T> FusedStream for UnboundedReceiver<T> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
+
+impl<T> Stream for UnboundedReceiver<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ // Try to read a message off of the message queue.
+ match self.next_message() {
+ Poll::Ready(msg) => {
+ if msg.is_none() {
+ self.inner = None;
+ }
+ Poll::Ready(msg)
+ }
+ Poll::Pending => {
+ // There are no messages to read, in this case, park.
+ self.inner.as_ref().unwrap().recv_task.register(cx.waker());
+ // Check queue again after parking to prevent race condition:
+ // a message could be added to the queue after previous `next_message`
+ // before `register` call.
+ self.next_message()
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if let Some(inner) = &self.inner {
+ decode_state(inner.state.load(SeqCst)).size_hint()
+ } else {
+ (0, Some(0))
+ }
+ }
+}
+
+impl<T> Drop for UnboundedReceiver<T> {
+ fn drop(&mut self) {
+ // Drain the channel of all pending messages
+ self.close();
+ if self.inner.is_some() {
+ loop {
+ match self.next_message() {
+ Poll::Ready(Some(_)) => {}
+ Poll::Ready(None) => break,
+ Poll::Pending => {
+ let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
+
+ // If the channel is closed, then there is no need to park.
+ if state.is_closed() {
+ break;
+ }
+
+ // TODO: Spinning isn't ideal, it might be worth
+ // investigating using a condvar or some other strategy
+ // here. That said, if this case is hit, then another thread
+ // is about to push the value into the queue and this isn't
+ // the only spinlock in the impl right now.
+ thread::yield_now();
+ }
+ }
+ }
+ }
+ }
+}
+
+impl<T> fmt::Debug for UnboundedReceiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let closed = if let Some(ref inner) = self.inner {
+ decode_state(inner.state.load(SeqCst)).is_closed()
+ } else {
+ false
+ };
+
+ f.debug_struct("Receiver").field("closed", &closed).finish()
+ }
+}
+
+/*
+ *
+ * ===== impl Inner =====
+ *
+ */
+
+impl<T> UnboundedInner<T> {
+ // Clear `open` flag in the state, keep `num_messages` intact.
+ fn set_closed(&self) {
+ let curr = self.state.load(SeqCst);
+ if !decode_state(curr).is_open {
+ return;
+ }
+
+ self.state.fetch_and(!OPEN_MASK, SeqCst);
+ }
+}
+
+impl<T> BoundedInner<T> {
+ // The return value is such that the total number of messages that can be
+ // enqueued into the channel will never exceed MAX_CAPACITY
+ fn max_senders(&self) -> usize {
+ MAX_CAPACITY - self.buffer
+ }
+
+ // Clear `open` flag in the state, keep `num_messages` intact.
+ fn set_closed(&self) {
+ let curr = self.state.load(SeqCst);
+ if !decode_state(curr).is_open {
+ return;
+ }
+
+ self.state.fetch_and(!OPEN_MASK, SeqCst);
+ }
+}
+
+unsafe impl<T: Send> Send for UnboundedInner<T> {}
+unsafe impl<T: Send> Sync for UnboundedInner<T> {}
+
+unsafe impl<T: Send> Send for BoundedInner<T> {}
+unsafe impl<T: Send> Sync for BoundedInner<T> {}
+
+impl State {
+ fn is_closed(&self) -> bool {
+ !self.is_open && self.num_messages == 0
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.is_open {
+ (self.num_messages, None)
+ } else {
+ (self.num_messages, Some(self.num_messages))
+ }
+ }
+}
+
+/*
+ *
+ * ===== Helpers =====
+ *
+ */
+
+fn decode_state(num: usize) -> State {
+ State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
+}
+
+fn encode_state(state: &State) -> usize {
+ let mut num = state.num_messages;
+
+ if state.is_open {
+ num |= OPEN_MASK;
+ }
+
+ num
+}
diff --git a/third_party/rust/futures-channel/src/mpsc/queue.rs b/third_party/rust/futures-channel/src/mpsc/queue.rs
new file mode 100644
index 0000000000..02ec633fe0
--- /dev/null
+++ b/third_party/rust/futures-channel/src/mpsc/queue.rs
@@ -0,0 +1,174 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+//! A mostly lock-free multi-producer, single consumer queue for sending
+//! messages between asynchronous tasks.
+//!
+//! The queue implementation is essentially the same one used for mpsc channels
+//! in the standard library.
+//!
+//! Note that the current implementation of this queue has a caveat of the `pop`
+//! method, and see the method for more information about it. Due to this
+//! caveat, this queue may not be appropriate for all use-cases.
+
+// http://www.1024cores.net/home/lock-free-algorithms
+// /queues/non-intrusive-mpsc-node-based-queue
+
+// NOTE: this implementation is lifted from the standard library and only
+// slightly modified
+
+pub(super) use self::PopResult::*;
+
+use std::cell::UnsafeCell;
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, Ordering};
+use std::thread;
+
+/// A result of the `pop` function.
+pub(super) enum PopResult<T> {
+ /// Some data has been popped
+ Data(T),
+ /// The queue is empty
+ Empty,
+ /// The queue is in an inconsistent state. Popping data should succeed, but
+ /// some pushers have yet to make enough progress in order allow a pop to
+ /// succeed. It is recommended that a pop() occur "in the near future" in
+ /// order to see if the sender has made progress or not
+ Inconsistent,
+}
+
+struct Node<T> {
+ next: AtomicPtr<Self>,
+ value: Option<T>,
+}
+
+/// The multi-producer single-consumer structure. This is not cloneable, but it
+/// may be safely shared so long as it is guaranteed that there is only one
+/// popper at a time (many pushers are allowed).
+pub(super) struct Queue<T> {
+ head: AtomicPtr<Node<T>>,
+ tail: UnsafeCell<*mut Node<T>>,
+}
+
+unsafe impl<T: Send> Send for Queue<T> {}
+unsafe impl<T: Send> Sync for Queue<T> {}
+
+impl<T> Node<T> {
+ unsafe fn new(v: Option<T>) -> *mut Self {
+ Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v }))
+ }
+}
+
+impl<T> Queue<T> {
+ /// Creates a new queue that is safe to share among multiple producers and
+ /// one consumer.
+ pub(super) fn new() -> Self {
+ let stub = unsafe { Node::new(None) };
+ Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
+ }
+
+ /// Pushes a new value onto this queue.
+ pub(super) fn push(&self, t: T) {
+ unsafe {
+ let n = Node::new(Some(t));
+ let prev = self.head.swap(n, Ordering::AcqRel);
+ (*prev).next.store(n, Ordering::Release);
+ }
+ }
+
+ /// Pops some data from this queue.
+ ///
+ /// Note that the current implementation means that this function cannot
+ /// return `Option<T>`. It is possible for this queue to be in an
+ /// inconsistent state where many pushes have succeeded and completely
+ /// finished, but pops cannot return `Some(t)`. This inconsistent state
+ /// happens when a pusher is preempted at an inopportune moment.
+ ///
+ /// This inconsistent state means that this queue does indeed have data, but
+ /// it does not currently have access to it at this time.
+ ///
+ /// This function is unsafe because only one thread can call it at a time.
+ pub(super) unsafe fn pop(&self) -> PopResult<T> {
+ let tail = *self.tail.get();
+ let next = (*tail).next.load(Ordering::Acquire);
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+ assert!((*tail).value.is_none());
+ assert!((*next).value.is_some());
+ let ret = (*next).value.take().unwrap();
+ drop(Box::from_raw(tail));
+ return Data(ret);
+ }
+
+ if self.head.load(Ordering::Acquire) == tail {
+ Empty
+ } else {
+ Inconsistent
+ }
+ }
+
+ /// Pop an element similarly to `pop` function, but spin-wait on inconsistent
+ /// queue state instead of returning `Inconsistent`.
+ ///
+ /// This function is unsafe because only one thread can call it at a time.
+ pub(super) unsafe fn pop_spin(&self) -> Option<T> {
+ loop {
+ match self.pop() {
+ Empty => return None,
+ Data(t) => return Some(t),
+ // Inconsistent means that there will be a message to pop
+ // in a short time. This branch can only be reached if
+ // values are being produced from another thread, so there
+ // are a few ways that we can deal with this:
+ //
+ // 1) Spin
+ // 2) thread::yield_now()
+ // 3) task::current().unwrap() & return Pending
+ //
+ // For now, thread::yield_now() is used, but it would
+ // probably be better to spin a few times then yield.
+ Inconsistent => {
+ thread::yield_now();
+ }
+ }
+ }
+ }
+}
+
+impl<T> Drop for Queue<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let mut cur = *self.tail.get();
+ while !cur.is_null() {
+ let next = (*cur).next.load(Ordering::Relaxed);
+ drop(Box::from_raw(cur));
+ cur = next;
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-channel/src/mpsc/sink_impl.rs b/third_party/rust/futures-channel/src/mpsc/sink_impl.rs
new file mode 100644
index 0000000000..1be20162c2
--- /dev/null
+++ b/third_party/rust/futures-channel/src/mpsc/sink_impl.rs
@@ -0,0 +1,73 @@
+use super::{SendError, Sender, TrySendError, UnboundedSender};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use std::pin::Pin;
+
+impl<T> Sink<T> for Sender<T> {
+ type Error = SendError;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ (*self).poll_ready(cx)
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ (*self).start_send(msg)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match (*self).poll_ready(cx) {
+ Poll::Ready(Err(ref e)) if e.is_disconnected() => {
+ // If the receiver disconnected, we consider the sink to be flushed.
+ Poll::Ready(Ok(()))
+ }
+ x => x,
+ }
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.disconnect();
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<T> Sink<T> for UnboundedSender<T> {
+ type Error = SendError;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Self::poll_ready(&*self, cx)
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ Self::start_send(&mut *self, msg)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.disconnect();
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<T> Sink<T> for &UnboundedSender<T> {
+ type Error = SendError;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ UnboundedSender::poll_ready(*self, cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.unbounded_send(msg).map_err(TrySendError::into_send_error)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.close_channel();
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-channel/src/oneshot.rs b/third_party/rust/futures-channel/src/oneshot.rs
new file mode 100644
index 0000000000..70449f43d6
--- /dev/null
+++ b/third_party/rust/futures-channel/src/oneshot.rs
@@ -0,0 +1,488 @@
+//! A channel for sending a single message between asynchronous tasks.
+//!
+//! This is a single-producer, single-consumer channel.
+
+use alloc::sync::Arc;
+use core::fmt;
+use core::pin::Pin;
+use core::sync::atomic::AtomicBool;
+use core::sync::atomic::Ordering::SeqCst;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll, Waker};
+
+use crate::lock::Lock;
+
+/// A future for a value that will be provided by another asynchronous task.
+///
+/// This is created by the [`channel`](channel) function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Receiver<T> {
+ inner: Arc<Inner<T>>,
+}
+
+/// A means of transmitting a single value to another task.
+///
+/// This is created by the [`channel`](channel) function.
+pub struct Sender<T> {
+ inner: Arc<Inner<T>>,
+}
+
+// The channels do not ever project Pin to the inner T
+impl<T> Unpin for Receiver<T> {}
+impl<T> Unpin for Sender<T> {}
+
+/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
+/// the internal synchronization between the two for send/recv operations.
+struct Inner<T> {
+ /// Indicates whether this oneshot is complete yet. This is filled in both
+ /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
+ /// appropriately.
+ ///
+ /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
+ /// unlocked and ready to be inspected.
+ ///
+ /// For `Sender` if this is `true` then the oneshot has gone away and it
+ /// can return ready from `poll_canceled`.
+ complete: AtomicBool,
+
+ /// The actual data being transferred as part of this `Receiver`. This is
+ /// filled in by `Sender::complete` and read by `Receiver::poll`.
+ ///
+ /// Note that this is protected by `Lock`, but it is in theory safe to
+ /// replace with an `UnsafeCell` as it's actually protected by `complete`
+ /// above. I wouldn't recommend doing this, however, unless someone is
+ /// supremely confident in the various atomic orderings here and there.
+ data: Lock<Option<T>>,
+
+ /// Field to store the task which is blocked in `Receiver::poll`.
+ ///
+ /// This is filled in when a oneshot is polled but not ready yet. Note that
+ /// the `Lock` here, unlike in `data` above, is important to resolve races.
+ /// Both the `Receiver` and the `Sender` halves understand that if they
+ /// can't acquire the lock then some important interference is happening.
+ rx_task: Lock<Option<Waker>>,
+
+ /// Like `rx_task` above, except for the task blocked in
+ /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`.
+ tx_task: Lock<Option<Waker>>,
+}
+
+/// Creates a new one-shot channel for sending a single value across asynchronous tasks.
+///
+/// The channel works for a spsc (single-producer, single-consumer) scheme.
+///
+/// This function is similar to Rust's channel constructor found in the standard
+/// library. Two halves are returned, the first of which is a `Sender` handle,
+/// used to signal the end of a computation and provide its value. The second
+/// half is a `Receiver` which implements the `Future` trait, resolving to the
+/// value that was given to the `Sender` handle.
+///
+/// Each half can be separately owned and sent across tasks.
+///
+/// # Examples
+///
+/// ```
+/// use futures::channel::oneshot;
+/// use std::{thread, time::Duration};
+///
+/// let (sender, receiver) = oneshot::channel::<i32>();
+///
+/// thread::spawn(|| {
+/// println!("THREAD: sleeping zzz...");
+/// thread::sleep(Duration::from_millis(1000));
+/// println!("THREAD: i'm awake! sending.");
+/// sender.send(3).unwrap();
+/// });
+///
+/// println!("MAIN: doing some useful stuff");
+///
+/// futures::executor::block_on(async {
+/// println!("MAIN: waiting for msg...");
+/// println!("MAIN: got: {:?}", receiver.await)
+/// });
+/// ```
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Inner::new());
+ let receiver = Receiver { inner: inner.clone() };
+ let sender = Sender { inner };
+ (sender, receiver)
+}
+
+impl<T> Inner<T> {
+ fn new() -> Self {
+ Self {
+ complete: AtomicBool::new(false),
+ data: Lock::new(None),
+ rx_task: Lock::new(None),
+ tx_task: Lock::new(None),
+ }
+ }
+
+ fn send(&self, t: T) -> Result<(), T> {
+ if self.complete.load(SeqCst) {
+ return Err(t);
+ }
+
+ // Note that this lock acquisition may fail if the receiver
+ // is closed and sets the `complete` flag to `true`, whereupon
+ // the receiver may call `poll()`.
+ if let Some(mut slot) = self.data.try_lock() {
+ assert!(slot.is_none());
+ *slot = Some(t);
+ drop(slot);
+
+ // If the receiver called `close()` between the check at the
+ // start of the function, and the lock being released, then
+ // the receiver may not be around to receive it, so try to
+ // pull it back out.
+ if self.complete.load(SeqCst) {
+ // If lock acquisition fails, then receiver is actually
+ // receiving it, so we're good.
+ if let Some(mut slot) = self.data.try_lock() {
+ if let Some(t) = slot.take() {
+ return Err(t);
+ }
+ }
+ }
+ Ok(())
+ } else {
+ // Must have been closed
+ Err(t)
+ }
+ }
+
+ fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
+ // Fast path up first, just read the flag and see if our other half is
+ // gone. This flag is set both in our destructor and the oneshot
+ // destructor, but our destructor hasn't run yet so if it's set then the
+ // oneshot is gone.
+ if self.complete.load(SeqCst) {
+ return Poll::Ready(());
+ }
+
+ // If our other half is not gone then we need to park our current task
+ // and move it into the `tx_task` slot to get notified when it's
+ // actually gone.
+ //
+ // If `try_lock` fails, then the `Receiver` is in the process of using
+ // it, so we can deduce that it's now in the process of going away and
+ // hence we're canceled. If it succeeds then we just store our handle.
+ //
+ // Crucially we then check `complete` *again* before we return.
+ // While we were storing our handle inside `tx_task` the
+ // `Receiver` may have been dropped. The first thing it does is set the
+ // flag, and if it fails to acquire the lock it assumes that we'll see
+ // the flag later on. So... we then try to see the flag later on!
+ let handle = cx.waker().clone();
+ match self.tx_task.try_lock() {
+ Some(mut p) => *p = Some(handle),
+ None => return Poll::Ready(()),
+ }
+ if self.complete.load(SeqCst) {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn is_canceled(&self) -> bool {
+ self.complete.load(SeqCst)
+ }
+
+ fn drop_tx(&self) {
+ // Flag that we're a completed `Sender` and try to wake up a receiver.
+ // Whether or not we actually stored any data will get picked up and
+ // translated to either an item or cancellation.
+ //
+ // Note that if we fail to acquire the `rx_task` lock then that means
+ // we're in one of two situations:
+ //
+ // 1. The receiver is trying to block in `poll`
+ // 2. The receiver is being dropped
+ //
+ // In the first case it'll check the `complete` flag after it's done
+ // blocking to see if it succeeded. In the latter case we don't need to
+ // wake up anyone anyway. So in both cases it's ok to ignore the `None`
+ // case of `try_lock` and bail out.
+ //
+ // The first case crucially depends on `Lock` using `SeqCst` ordering
+ // under the hood. If it instead used `Release` / `Acquire` ordering,
+ // then it would not necessarily synchronize with `inner.complete`
+ // and deadlock might be possible, as was observed in
+ // https://github.com/rust-lang/futures-rs/pull/219.
+ self.complete.store(true, SeqCst);
+
+ if let Some(mut slot) = self.rx_task.try_lock() {
+ if let Some(task) = slot.take() {
+ drop(slot);
+ task.wake();
+ }
+ }
+
+ // If we registered a task for cancel notification drop it to reduce
+ // spurious wakeups
+ if let Some(mut slot) = self.tx_task.try_lock() {
+ drop(slot.take());
+ }
+ }
+
+ fn close_rx(&self) {
+ // Flag our completion and then attempt to wake up the sender if it's
+ // blocked. See comments in `drop` below for more info
+ self.complete.store(true, SeqCst);
+ if let Some(mut handle) = self.tx_task.try_lock() {
+ if let Some(task) = handle.take() {
+ drop(handle);
+ task.wake()
+ }
+ }
+ }
+
+ fn try_recv(&self) -> Result<Option<T>, Canceled> {
+ // If we're complete, either `::close_rx` or `::drop_tx` was called.
+ // We can assume a successful send if data is present.
+ if self.complete.load(SeqCst) {
+ if let Some(mut slot) = self.data.try_lock() {
+ if let Some(data) = slot.take() {
+ return Ok(Some(data));
+ }
+ }
+ Err(Canceled)
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
+ // Check to see if some data has arrived. If it hasn't then we need to
+ // block our task.
+ //
+ // Note that the acquisition of the `rx_task` lock might fail below, but
+ // the only situation where this can happen is during `Sender::drop`
+ // when we are indeed completed already. If that's happening then we
+ // know we're completed so keep going.
+ let done = if self.complete.load(SeqCst) {
+ true
+ } else {
+ let task = cx.waker().clone();
+ match self.rx_task.try_lock() {
+ Some(mut slot) => {
+ *slot = Some(task);
+ false
+ }
+ None => true,
+ }
+ };
+
+ // If we're `done` via one of the paths above, then look at the data and
+ // figure out what the answer is. If, however, we stored `rx_task`
+ // successfully above we need to check again if we're completed in case
+ // a message was sent while `rx_task` was locked and couldn't notify us
+ // otherwise.
+ //
+ // If we're not done, and we're not complete, though, then we've
+ // successfully blocked our task and we return `Pending`.
+ if done || self.complete.load(SeqCst) {
+ // If taking the lock fails, the sender will realise that the we're
+ // `done` when it checks the `complete` flag on the way out, and
+ // will treat the send as a failure.
+ if let Some(mut slot) = self.data.try_lock() {
+ if let Some(data) = slot.take() {
+ return Poll::Ready(Ok(data));
+ }
+ }
+ Poll::Ready(Err(Canceled))
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn drop_rx(&self) {
+ // Indicate to the `Sender` that we're done, so any future calls to
+ // `poll_canceled` are weeded out.
+ self.complete.store(true, SeqCst);
+
+ // If we've blocked a task then there's no need for it to stick around,
+ // so we need to drop it. If this lock acquisition fails, though, then
+ // it's just because our `Sender` is trying to take the task, so we
+ // let them take care of that.
+ if let Some(mut slot) = self.rx_task.try_lock() {
+ let task = slot.take();
+ drop(slot);
+ drop(task);
+ }
+
+ // Finally, if our `Sender` wants to get notified of us going away, it
+ // would have stored something in `tx_task`. Here we try to peel that
+ // out and unpark it.
+ //
+ // Note that the `try_lock` here may fail, but only if the `Sender` is
+ // in the process of filling in the task. If that happens then we
+ // already flagged `complete` and they'll pick that up above.
+ if let Some(mut handle) = self.tx_task.try_lock() {
+ if let Some(task) = handle.take() {
+ drop(handle);
+ task.wake()
+ }
+ }
+ }
+}
+
+impl<T> Sender<T> {
+ /// Completes this oneshot with a successful result.
+ ///
+ /// This function will consume `self` and indicate to the other end, the
+ /// [`Receiver`](Receiver), that the value provided is the result of the
+ /// computation this represents.
+ ///
+ /// If the value is successfully enqueued for the remote end to receive,
+ /// then `Ok(())` is returned. If the receiving end was dropped before
+ /// this function was called, however, then `Err(t)` is returned.
+ pub fn send(self, t: T) -> Result<(), T> {
+ self.inner.send(t)
+ }
+
+ /// Polls this `Sender` half to detect whether its associated
+ /// [`Receiver`](Receiver) has been dropped.
+ ///
+ /// # Return values
+ ///
+ /// If `Ready(())` is returned then the associated `Receiver` has been
+ /// dropped, which means any work required for sending should be canceled.
+ ///
+ /// If `Pending` is returned then the associated `Receiver` is still
+ /// alive and may be able to receive a message if sent. The current task,
+ /// however, is scheduled to receive a notification if the corresponding
+ /// `Receiver` goes away.
+ pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ self.inner.poll_canceled(cx)
+ }
+
+ /// Creates a future that resolves when this `Sender`'s corresponding
+ /// [`Receiver`](Receiver) half has hung up.
+ ///
+ /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
+ /// to expose a [`Future`](core::future::Future).
+ pub fn cancellation(&mut self) -> Cancellation<'_, T> {
+ Cancellation { inner: self }
+ }
+
+ /// Tests to see whether this `Sender`'s corresponding `Receiver`
+ /// has been dropped.
+ ///
+ /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
+ /// enqueue a task for wakeup upon cancellation, but merely reports the
+ /// current state, which may be subject to concurrent modification.
+ pub fn is_canceled(&self) -> bool {
+ self.inner.is_canceled()
+ }
+
+ /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether
+ /// they were created by the same call to `channel`.
+ pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
+ Arc::ptr_eq(&self.inner, &receiver.inner)
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ self.inner.drop_tx()
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Sender").field("complete", &self.inner.complete).finish()
+ }
+}
+
+/// A future that resolves when the receiving end of a channel has hung up.
+///
+/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct Cancellation<'a, T> {
+ inner: &'a mut Sender<T>,
+}
+
+impl<T> Future for Cancellation<'_, T> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ self.inner.poll_canceled(cx)
+ }
+}
+
+/// Error returned from a [`Receiver`](Receiver) when the corresponding
+/// [`Sender`](Sender) is dropped.
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+pub struct Canceled;
+
+impl fmt::Display for Canceled {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "oneshot canceled")
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for Canceled {}
+
+impl<T> Receiver<T> {
+ /// Gracefully close this receiver, preventing any subsequent attempts to
+ /// send to it.
+ ///
+ /// Any `send` operation which happens after this method returns is
+ /// guaranteed to fail. After calling this method, you can use
+ /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
+ /// message had previously been sent.
+ pub fn close(&mut self) {
+ self.inner.close_rx()
+ }
+
+ /// Attempts to receive a message outside of the context of a task.
+ ///
+ /// Does not schedule a task wakeup or have any other side effects.
+ ///
+ /// A return value of `None` must be considered immediately stale (out of
+ /// date) unless [`close`](Receiver::close) has been called first.
+ ///
+ /// Returns an error if the sender was dropped.
+ pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
+ self.inner.try_recv()
+ }
+}
+
+impl<T> Future for Receiver<T> {
+ type Output = Result<T, Canceled>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
+ self.inner.recv(cx)
+ }
+}
+
+impl<T> FusedFuture for Receiver<T> {
+ fn is_terminated(&self) -> bool {
+ if self.inner.complete.load(SeqCst) {
+ if let Some(slot) = self.inner.data.try_lock() {
+ if slot.is_some() {
+ return false;
+ }
+ }
+ true
+ } else {
+ false
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ self.inner.drop_rx()
+ }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Receiver").field("complete", &self.inner.complete).finish()
+ }
+}