diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/futures-channel/src/mpsc/mod.rs | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-channel/src/mpsc/mod.rs')
-rw-r--r-- | third_party/rust/futures-channel/src/mpsc/mod.rs | 1289 |
1 files changed, 1289 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/src/mpsc/mod.rs b/third_party/rust/futures-channel/src/mpsc/mod.rs new file mode 100644 index 0000000000..f69f440439 --- /dev/null +++ b/third_party/rust/futures-channel/src/mpsc/mod.rs @@ -0,0 +1,1289 @@ +//! A multi-producer, single-consumer queue for sending values across +//! asynchronous tasks. +//! +//! Similarly to the `std`, channel creation provides [`Receiver`] and +//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to +//! read values out of the channel. If there is no message to read from the +//! channel, the current task will be notified when a new value is sent. +//! [`Sender`] implements the `Sink` trait and allows a task to send messages into +//! the channel. If the channel is at capacity, the send will be rejected and +//! the task will be notified when additional capacity is available. In other +//! words, the channel provides backpressure. +//! +//! Unbounded channels are also available using the `unbounded` constructor. +//! +//! # Disconnection +//! +//! When all [`Sender`] handles have been dropped, it is no longer +//! possible to send values into the channel. This is considered the termination +//! event of the stream. As such, [`Receiver::poll_next`] +//! will return `Ok(Ready(None))`. +//! +//! If the [`Receiver`] handle is dropped, then messages can no longer +//! be read out of the channel. In this case, all further attempts to send will +//! result in an error. +//! +//! # Clean Shutdown +//! +//! If the [`Receiver`] is simply dropped, then it is possible for +//! there to be messages still in the channel that will not be processed. As +//! such, it is usually desirable to perform a "clean" shutdown. To do this, the +//! receiver will first call `close`, which will prevent any further messages to +//! be sent into the channel. Then, the receiver consumes the channel to +//! completion, at which point the receiver can be dropped. +//! +//! [`Sender`]: struct.Sender.html +//! [`Receiver`]: struct.Receiver.html +//! [`Stream`]: ../../futures_core/stream/trait.Stream.html +//! [`Receiver::poll_next`]: +//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next + +// At the core, the channel uses an atomic FIFO queue for message passing. This +// queue is used as the primary coordination primitive. In order to enforce +// capacity limits and handle back pressure, a secondary FIFO queue is used to +// send parked task handles. +// +// The general idea is that the channel is created with a `buffer` size of `n`. +// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" +// slot to hold a message. This allows `Sender` to know for a fact that a send +// will succeed *before* starting to do the actual work of sending the value. +// Since most of this work is lock-free, once the work starts, it is impossible +// to safely revert. +// +// If the sender is unable to process a send operation, then the current +// task is parked and the handle is sent on the parked task queue. +// +// Note that the implementation guarantees that the channel capacity will never +// exceed the configured limit, however there is no *strict* guarantee that the +// receiver will wake up a parked task *immediately* when a slot becomes +// available. However, it will almost always unpark a task when a slot becomes +// available and it is *guaranteed* that a sender will be unparked when the +// message that caused the sender to become parked is read out of the channel. +// +// The steps for sending a message are roughly: +// +// 1) Increment the channel message count +// 2) If the channel is at capacity, push the task handle onto the wait queue +// 3) Push the message onto the message queue. +// +// The steps for receiving a message are roughly: +// +// 1) Pop a message from the message queue +// 2) Pop a task handle from the wait queue +// 3) Decrement the channel message count. +// +// It's important for the order of operations on lock-free structures to happen +// in reverse order between the sender and receiver. This makes the message +// queue the primary coordination structure and establishes the necessary +// happens-before semantics required for the acquire / release semantics used +// by the queue structure. + +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll, Waker}; +use futures_core::task::__internal::AtomicWaker; +use std::fmt; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +use crate::mpsc::queue::Queue; + +mod queue; +#[cfg(feature = "sink")] +mod sink_impl; + +#[derive(Debug)] +struct UnboundedSenderInner<T> { + // Channel state shared between the sender and receiver. + inner: Arc<UnboundedInner<T>>, +} + +#[derive(Debug)] +struct BoundedSenderInner<T> { + // Channel state shared between the sender and receiver. + inner: Arc<BoundedInner<T>>, + + // Handle to the task that is blocked on this sender. This handle is sent + // to the receiver half in order to be notified when the sender becomes + // unblocked. + sender_task: Arc<Mutex<SenderTask>>, + + // True if the sender might be blocked. This is an optimization to avoid + // having to lock the mutex most of the time. + maybe_parked: bool, +} + +// We never project Pin<&mut SenderInner> to `Pin<&mut T>` +impl<T> Unpin for UnboundedSenderInner<T> {} +impl<T> Unpin for BoundedSenderInner<T> {} + +/// The transmission end of a bounded mpsc channel. +/// +/// This value is created by the [`channel`](channel) function. +#[derive(Debug)] +pub struct Sender<T>(Option<BoundedSenderInner<T>>); + +/// The transmission end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug)] +pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>); + +trait AssertKinds: Send + Sync + Clone {} +impl AssertKinds for UnboundedSender<u32> {} + +/// The receiving end of a bounded mpsc channel. +/// +/// This value is created by the [`channel`](channel) function. +#[derive(Debug)] +pub struct Receiver<T> { + inner: Option<Arc<BoundedInner<T>>>, +} + +/// The receiving end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug)] +pub struct UnboundedReceiver<T> { + inner: Option<Arc<UnboundedInner<T>>>, +} + +// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>` +impl<T> Unpin for UnboundedReceiver<T> {} + +/// The error type for [`Sender`s](Sender) used as `Sink`s. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SendError { + kind: SendErrorKind, +} + +/// The error type returned from [`try_send`](Sender::try_send). +#[derive(Clone, PartialEq, Eq)] +pub struct TrySendError<T> { + err: SendError, + val: T, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +enum SendErrorKind { + Full, + Disconnected, +} + +/// The error type returned from [`try_next`](Receiver::try_next). +pub struct TryRecvError { + _priv: (), +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.is_full() { + write!(f, "send failed because channel is full") + } else { + write!(f, "send failed because receiver is gone") + } + } +} + +impl std::error::Error for SendError {} + +impl SendError { + /// Returns true if this error is a result of the channel being full. + pub fn is_full(&self) -> bool { + match self.kind { + SendErrorKind::Full => true, + _ => false, + } + } + + /// Returns true if this error is a result of the receiver being dropped. + pub fn is_disconnected(&self) -> bool { + match self.kind { + SendErrorKind::Disconnected => true, + _ => false, + } + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrySendError") + .field("kind", &self.err.kind) + .finish() + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.is_full() { + write!(f, "send failed because channel is full") + } else { + write!(f, "send failed because receiver is gone") + } + } +} + +impl<T: core::any::Any> std::error::Error for TrySendError<T> {} + +impl<T> TrySendError<T> { + /// Returns true if this error is a result of the channel being full. + pub fn is_full(&self) -> bool { + self.err.is_full() + } + + /// Returns true if this error is a result of the receiver being dropped. + pub fn is_disconnected(&self) -> bool { + self.err.is_disconnected() + } + + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.val + } + + /// Drops the message and converts into a `SendError`. + pub fn into_send_error(self) -> SendError { + self.err + } +} + +impl fmt::Debug for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("TryRecvError") + .finish() + } +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "receiver channel is empty") + } +} + +impl std::error::Error for TryRecvError {} + +#[derive(Debug)] +struct UnboundedInner<T> { + // Internal channel state. Consists of the number of messages stored in the + // channel as well as a flag signalling that the channel is closed. + state: AtomicUsize, + + // Atomic, FIFO queue used to send messages to the receiver + message_queue: Queue<T>, + + // Number of senders in existence + num_senders: AtomicUsize, + + // Handle to the receiver's task. + recv_task: AtomicWaker, +} + +#[derive(Debug)] +struct BoundedInner<T> { + // Max buffer size of the channel. If `None` then the channel is unbounded. + buffer: usize, + + // Internal channel state. Consists of the number of messages stored in the + // channel as well as a flag signalling that the channel is closed. + state: AtomicUsize, + + // Atomic, FIFO queue used to send messages to the receiver + message_queue: Queue<T>, + + // Atomic, FIFO queue used to send parked task handles to the receiver. + parked_queue: Queue<Arc<Mutex<SenderTask>>>, + + // Number of senders in existence + num_senders: AtomicUsize, + + // Handle to the receiver's task. + recv_task: AtomicWaker, +} + +// Struct representation of `Inner::state`. +#[derive(Debug, Clone, Copy)] +struct State { + // `true` when the channel is open + is_open: bool, + + // Number of messages in the channel + num_messages: usize, +} + +// The `is_open` flag is stored in the left-most bit of `Inner::state` +const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1); + +// When a new channel is created, it is created in the open state with no +// pending messages. +const INIT_STATE: usize = OPEN_MASK; + +// The maximum number of messages that a channel can track is `usize::max_value() >> 1` +const MAX_CAPACITY: usize = !(OPEN_MASK); + +// The maximum requested buffer size must be less than the maximum capacity of +// a channel. This is because each sender gets a guaranteed slot. +const MAX_BUFFER: usize = MAX_CAPACITY >> 1; + +// Sent to the consumer to wake up blocked producers +#[derive(Debug)] +struct SenderTask { + task: Option<Waker>, + is_parked: bool, +} + +impl SenderTask { + fn new() -> Self { + SenderTask { + task: None, + is_parked: false, + } + } + + fn notify(&mut self) { + self.is_parked = false; + + if let Some(task) = self.task.take() { + task.wake(); + } + } +} + +/// Creates a bounded mpsc channel for communicating between asynchronous tasks. +/// +/// Being bounded, this channel provides backpressure to ensure that the sender +/// outpaces the receiver by only a limited amount. The channel's capacity is +/// equal to `buffer + num-senders`. In other words, each sender gets a +/// guaranteed slot in the channel capacity, and on top of that there are +/// `buffer` "first come, first serve" slots available to all senders. +/// +/// The [`Receiver`](Receiver) returned implements the +/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements +/// `Sink`. +pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { + // Check that the requested buffer size does not exceed the maximum buffer + // size permitted by the system. + assert!(buffer < MAX_BUFFER, "requested buffer size too large"); + + let inner = Arc::new(BoundedInner { + buffer, + state: AtomicUsize::new(INIT_STATE), + message_queue: Queue::new(), + parked_queue: Queue::new(), + num_senders: AtomicUsize::new(1), + recv_task: AtomicWaker::new(), + }); + + let tx = BoundedSenderInner { + inner: inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + + let rx = Receiver { + inner: Some(inner), + }; + + (Sender(Some(tx)), rx) +} + +/// Creates an unbounded mpsc channel for communicating between asynchronous +/// tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + + let inner = Arc::new(UnboundedInner { + state: AtomicUsize::new(INIT_STATE), + message_queue: Queue::new(), + num_senders: AtomicUsize::new(1), + recv_task: AtomicWaker::new(), + }); + + let tx = UnboundedSenderInner { + inner: inner.clone(), + }; + + let rx = UnboundedReceiver { + inner: Some(inner), + }; + + (UnboundedSender(Some(tx)), rx) +} + +/* + * + * ===== impl Sender ===== + * + */ + +impl<T> UnboundedSenderInner<T> { + fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> { + let state = decode_state(self.inner.state.load(SeqCst)); + if state.is_open { + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(SendError { + kind: SendErrorKind::Disconnected, + })) + } + } + + + // Push message to the queue and signal to the receiver + fn queue_push_and_signal(&self, msg: T) { + // Push the message onto the message queue + self.inner.message_queue.push(msg); + + // Signal to the receiver that a message has been enqueued. If the + // receiver is parked, this will unpark the task. + self.inner.recv_task.wake(); + } + + // Increment the number of queued messages. Returns the resulting number. + fn inc_num_messages(&self) -> Option<usize> { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + // The receiver end closed the channel. + if !state.is_open { + return None; + } + + // This probably is never hit? Odds are the process will run out of + // memory first. It may be worth to return something else in this + // case? + assert!(state.num_messages < MAX_CAPACITY, "buffer space \ + exhausted; sending this messages would overflow the state"); + + state.num_messages += 1; + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + return Some(state.num_messages) + } + Err(actual) => curr = actual, + } + } + } + + /// Returns whether the senders send to the same receiver. + fn same_receiver(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } + + /// Returns pointer to the Arc containing sender + /// + /// The returned pointer is not referenced and should be only used for hashing! + fn ptr(&self) -> *const UnboundedInner<T> { + &*self.inner + } + + /// Returns whether this channel is closed without needing a context. + fn is_closed(&self) -> bool { + !decode_state(self.inner.state.load(SeqCst)).is_open + } + + /// Closes this channel from the sender side, preventing any new messages. + fn close_channel(&self) { + // There's no need to park this sender, its dropping, + // and we don't want to check for capacity, so skip + // that stuff from `do_send`. + + self.inner.set_closed(); + self.inner.recv_task.wake(); + } +} + +impl<T> BoundedSenderInner<T> { + /// Attempts to send a message on this `Sender`, returning the message + /// if there was an error. + fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { + // If the sender is currently blocked, reject the message + if !self.poll_unparked(None).is_ready() { + return Err(TrySendError { + err: SendError { + kind: SendErrorKind::Full, + }, + val: msg, + }); + } + + // The channel has capacity to accept the message, so send it + self.do_send_b(msg) + } + + // Do the send without failing. + // Can be called only by bounded sender. + #[allow(clippy::debug_assert_with_mut_call)] + fn do_send_b(&mut self, msg: T) + -> Result<(), TrySendError<T>> + { + // Anyone callig do_send *should* make sure there is room first, + // but assert here for tests as a sanity check. + debug_assert!(self.poll_unparked(None).is_ready()); + + // First, increment the number of messages contained by the channel. + // This operation will also atomically determine if the sender task + // should be parked. + // + // None is returned in the case that the channel has been closed by the + // receiver. This happens when `Receiver::close` is called or the + // receiver is dropped. + let park_self = match self.inc_num_messages() { + Some(num_messages) => { + // Block if the current number of pending messages has exceeded + // the configured buffer size + num_messages > self.inner.buffer + } + None => return Err(TrySendError { + err: SendError { + kind: SendErrorKind::Disconnected, + }, + val: msg, + }), + }; + + // If the channel has reached capacity, then the sender task needs to + // be parked. This will send the task handle on the parked task queue. + // + // However, when `do_send` is called while dropping the `Sender`, + // `task::current()` can't be called safely. In this case, in order to + // maintain internal consistency, a blank message is pushed onto the + // parked task queue. + if park_self { + self.park(); + } + + self.queue_push_and_signal(msg); + + Ok(()) + } + + // Push message to the queue and signal to the receiver + fn queue_push_and_signal(&self, msg: T) { + // Push the message onto the message queue + self.inner.message_queue.push(msg); + + // Signal to the receiver that a message has been enqueued. If the + // receiver is parked, this will unpark the task. + self.inner.recv_task.wake(); + } + + // Increment the number of queued messages. Returns the resulting number. + fn inc_num_messages(&self) -> Option<usize> { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + // The receiver end closed the channel. + if !state.is_open { + return None; + } + + // This probably is never hit? Odds are the process will run out of + // memory first. It may be worth to return something else in this + // case? + assert!(state.num_messages < MAX_CAPACITY, "buffer space \ + exhausted; sending this messages would overflow the state"); + + state.num_messages += 1; + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + return Some(state.num_messages) + } + Err(actual) => curr = actual, + } + } + } + + fn park(&mut self) { + { + let mut sender = self.sender_task.lock().unwrap(); + sender.task = None; + sender.is_parked = true; + } + + // Send handle over queue + let t = self.sender_task.clone(); + self.inner.parked_queue.push(t); + + // Check to make sure we weren't closed after we sent our task on the + // queue + let state = decode_state(self.inner.state.load(SeqCst)); + self.maybe_parked = state.is_open; + } + + /// Polls the channel to determine if there is guaranteed capacity to send + /// at least one item without waiting. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; + /// - `Poll::Pending` if the channel may not have + /// capacity, in which case the current task is queued to be notified once + /// capacity is available; + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Result<(), SendError>> { + let state = decode_state(self.inner.state.load(SeqCst)); + if !state.is_open { + return Poll::Ready(Err(SendError { + kind: SendErrorKind::Disconnected, + })); + } + + self.poll_unparked(Some(cx)).map(Ok) + } + + /// Returns whether the senders send to the same receiver. + fn same_receiver(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } + + /// Returns pointer to the Arc containing sender + /// + /// The returned pointer is not referenced and should be only used for hashing! + fn ptr(&self) -> *const BoundedInner<T> { + &*self.inner + } + + /// Returns whether this channel is closed without needing a context. + fn is_closed(&self) -> bool { + !decode_state(self.inner.state.load(SeqCst)).is_open + } + + /// Closes this channel from the sender side, preventing any new messages. + fn close_channel(&self) { + // There's no need to park this sender, its dropping, + // and we don't want to check for capacity, so skip + // that stuff from `do_send`. + + self.inner.set_closed(); + self.inner.recv_task.wake(); + } + + fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> { + // First check the `maybe_parked` variable. This avoids acquiring the + // lock in most cases + if self.maybe_parked { + // Get a lock on the task handle + let mut task = self.sender_task.lock().unwrap(); + + if !task.is_parked { + self.maybe_parked = false; + return Poll::Ready(()) + } + + // At this point, an unpark request is pending, so there will be an + // unpark sometime in the future. We just need to make sure that + // the correct task will be notified. + // + // Update the task in case the `Sender` has been moved to another + // task + task.task = cx.map(|cx| cx.waker().clone()); + + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +impl<T> Sender<T> { + /// Attempts to send a message on this `Sender`, returning the message + /// if there was an error. + pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { + if let Some(inner) = &mut self.0 { + inner.try_send(msg) + } else { + Err(TrySendError { + err: SendError { + kind: SendErrorKind::Disconnected, + }, + val: msg, + }) + } + } + + /// Send a message on the channel. + /// + /// This function should only be called after + /// [`poll_ready`](Sender::poll_ready) has reported that the channel is + /// ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { + self.try_send(msg) + .map_err(|e| e.err) + } + + /// Polls the channel to determine if there is guaranteed capacity to send + /// at least one item without waiting. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; + /// - `Poll::Pending` if the channel may not have + /// capacity, in which case the current task is queued to be notified once + /// capacity is available; + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + pub fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Result<(), SendError>> { + let inner = self.0.as_mut().ok_or(SendError { + kind: SendErrorKind::Disconnected, + })?; + inner.poll_ready(cx) + } + + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true) + } + + /// Closes this channel from the sender side, preventing any new messages. + pub fn close_channel(&mut self) { + if let Some(inner) = &mut self.0 { + inner.close_channel(); + } + } + + /// Disconnects this sender from the channel, closing it if there are no more senders left. + pub fn disconnect(&mut self) { + self.0 = None; + } + + /// Returns whether the senders send to the same receiver. + pub fn same_receiver(&self, other: &Self) -> bool { + match (&self.0, &other.0) { + (Some(inner), Some(other)) => inner.same_receiver(other), + _ => false, + } + } + + /// Hashes the receiver into the provided hasher + pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + use std::hash::Hash; + + let ptr = self.0.as_ref().map(|inner| inner.ptr()); + ptr.hash(hasher); + } +} + +impl<T> UnboundedSender<T> { + /// Check if the channel is ready to receive a message. + pub fn poll_ready( + &self, + _: &mut Context<'_>, + ) -> Poll<Result<(), SendError>> { + let inner = self.0.as_ref().ok_or(SendError { + kind: SendErrorKind::Disconnected, + })?; + inner.poll_ready_nb() + } + + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true) + } + + /// Closes this channel from the sender side, preventing any new messages. + pub fn close_channel(&self) { + if let Some(inner) = &self.0 { + inner.close_channel(); + } + } + + /// Disconnects this sender from the channel, closing it if there are no more senders left. + pub fn disconnect(&mut self) { + self.0 = None; + } + + // Do the send without parking current task. + fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> { + if let Some(inner) = &self.0 { + if inner.inc_num_messages().is_some() { + inner.queue_push_and_signal(msg); + return Ok(()); + } + } + + Err(TrySendError { + err: SendError { + kind: SendErrorKind::Disconnected, + }, + val: msg, + }) + } + + /// Send a message on the channel. + /// + /// This method should only be called after `poll_ready` has been used to + /// verify that the channel is ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { + self.do_send_nb(msg) + .map_err(|e| e.err) + } + + /// Sends a message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> { + self.do_send_nb(msg) + } + + /// Returns whether the senders send to the same receiver. + pub fn same_receiver(&self, other: &Self) -> bool { + match (&self.0, &other.0) { + (Some(inner), Some(other)) => inner.same_receiver(other), + _ => false, + } + } + + /// Hashes the receiver into the provided hasher + pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + use std::hash::Hash; + + let ptr = self.0.as_ref().map(|inner| inner.ptr()); + ptr.hash(hasher); + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender(self.0.clone()) + } +} + +impl<T> Clone for UnboundedSender<T> { + fn clone(&self) -> UnboundedSender<T> { + UnboundedSender(self.0.clone()) + } +} + +impl<T> Clone for UnboundedSenderInner<T> { + fn clone(&self) -> UnboundedSenderInner<T> { + // Since this atomic op isn't actually guarding any memory and we don't + // care about any orderings besides the ordering on the single atomic + // variable, a relaxed ordering is acceptable. + let mut curr = self.inner.num_senders.load(SeqCst); + + loop { + // If the maximum number of senders has been reached, then fail + if curr == MAX_BUFFER { + panic!("cannot clone `Sender` -- too many outstanding senders"); + } + + debug_assert!(curr < MAX_BUFFER); + + let next = curr + 1; + let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst); + + // The ABA problem doesn't matter here. We only care that the + // number of senders never exceeds the maximum. + if actual == curr { + return UnboundedSenderInner { + inner: self.inner.clone(), + }; + } + + curr = actual; + } + } +} + +impl<T> Clone for BoundedSenderInner<T> { + fn clone(&self) -> BoundedSenderInner<T> { + // Since this atomic op isn't actually guarding any memory and we don't + // care about any orderings besides the ordering on the single atomic + // variable, a relaxed ordering is acceptable. + let mut curr = self.inner.num_senders.load(SeqCst); + + loop { + // If the maximum number of senders has been reached, then fail + if curr == self.inner.max_senders() { + panic!("cannot clone `Sender` -- too many outstanding senders"); + } + + debug_assert!(curr < self.inner.max_senders()); + + let next = curr + 1; + let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst); + + // The ABA problem doesn't matter here. We only care that the + // number of senders never exceeds the maximum. + if actual == curr { + return BoundedSenderInner { + inner: self.inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + } + + curr = actual; + } + } +} + +impl<T> Drop for UnboundedSenderInner<T> { + fn drop(&mut self) { + // Ordering between variables don't matter here + let prev = self.inner.num_senders.fetch_sub(1, SeqCst); + + if prev == 1 { + self.close_channel(); + } + } +} + +impl<T> Drop for BoundedSenderInner<T> { + fn drop(&mut self) { + // Ordering between variables don't matter here + let prev = self.inner.num_senders.fetch_sub(1, SeqCst); + + if prev == 1 { + self.close_channel(); + } + } +} + +/* + * + * ===== impl Receiver ===== + * + */ + +impl<T> Receiver<T> { + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + if let Some(inner) = &mut self.inner { + inner.set_closed(); + + // Wake up any threads waiting as they'll see that we've closed the + // channel and will continue on their merry way. + while let Some(task) = unsafe { inner.parked_queue.pop_spin() } { + task.lock().unwrap().notify(); + } + } + } + + /// Tries to receive the next message without notifying a context if empty. + /// + /// It is not recommended to call this function from inside of a future, + /// only when you've otherwise arranged to be notified when the channel is + /// no longer empty. + /// + /// This function will panic if called after `try_next` or `poll_next` has + /// returned None. + pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { + match self.next_message() { + Poll::Ready(msg) => { + Ok(msg) + }, + Poll::Pending => Err(TryRecvError { _priv: () }), + } + } + + fn next_message(&mut self) -> Poll<Option<T>> { + let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + // Pop off a message + match unsafe { inner.message_queue.pop_spin() } { + Some(msg) => { + // If there are any parked task handles in the parked queue, + // pop one and unpark it. + self.unpark_one(); + + // Decrement number of messages + self.dec_num_messages(); + + Poll::Ready(Some(msg)) + } + None => { + let state = decode_state(inner.state.load(SeqCst)); + if state.is_open || state.num_messages != 0 { + // If queue is open, we need to return Pending + // to be woken up when new messages arrive. + // If queue is closed but num_messages is non-zero, + // it means that senders updated the state, + // but didn't put message to queue yet, + // so we need to park until sender unparks the task + // after queueing the message. + Poll::Pending + } else { + // If closed flag is set AND there are no pending messages + // it means end of stream + self.inner = None; + Poll::Ready(None) + } + } + } + } + + // Unpark a single task handle if there is one pending in the parked queue + fn unpark_one(&mut self) { + if let Some(inner) = &mut self.inner { + if let Some(task) = unsafe { inner.parked_queue.pop_spin() } { + task.lock().unwrap().notify(); + } + } + } + + fn dec_num_messages(&self) { + if let Some(inner) = &self.inner { + // OPEN_MASK is highest bit, so it's unaffected by subtraction + // unless there's underflow, and we know there's no underflow + // because number of messages at this point is always > 0. + inner.state.fetch_sub(1, SeqCst); + } + } +} + +// The receiver does not ever take a Pin to the inner T +impl<T> Unpin for Receiver<T> {} + +impl<T> FusedStream for Receiver<T> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<T> Stream for Receiver<T> { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<T>> { + // Try to read a message off of the message queue. + match self.next_message() { + Poll::Ready(msg) => { + if msg.is_none() { + self.inner = None; + } + Poll::Ready(msg) + }, + Poll::Pending => { + // There are no messages to read, in this case, park. + self.inner.as_ref().unwrap().recv_task.register(cx.waker()); + // Check queue again after parking to prevent race condition: + // a message could be added to the queue after previous `next_message` + // before `register` call. + self.next_message() + } + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + // Drain the channel of all pending messages + self.close(); + if self.inner.is_some() { + while let Poll::Ready(Some(..)) = self.next_message() { + // ... + } + } + } +} + +impl<T> UnboundedReceiver<T> { + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + if let Some(inner) = &mut self.inner { + inner.set_closed(); + } + } + + /// Tries to receive the next message without notifying a context if empty. + /// + /// It is not recommended to call this function from inside of a future, + /// only when you've otherwise arranged to be notified when the channel is + /// no longer empty. + /// + /// This function will panic if called after `try_next` or `poll_next` has + /// returned None. + pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { + match self.next_message() { + Poll::Ready(msg) => { + Ok(msg) + }, + Poll::Pending => Err(TryRecvError { _priv: () }), + } + } + + fn next_message(&mut self) -> Poll<Option<T>> { + let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + // Pop off a message + match unsafe { inner.message_queue.pop_spin() } { + Some(msg) => { + // Decrement number of messages + self.dec_num_messages(); + + Poll::Ready(Some(msg)) + } + None => { + let state = decode_state(inner.state.load(SeqCst)); + if state.is_open || state.num_messages != 0 { + // If queue is open, we need to return Pending + // to be woken up when new messages arrive. + // If queue is closed but num_messages is non-zero, + // it means that senders updated the state, + // but didn't put message to queue yet, + // so we need to park until sender unparks the task + // after queueing the message. + Poll::Pending + } else { + // If closed flag is set AND there are no pending messages + // it means end of stream + self.inner = None; + Poll::Ready(None) + } + } + } + } + + fn dec_num_messages(&self) { + if let Some(inner) = &self.inner { + // OPEN_MASK is highest bit, so it's unaffected by subtraction + // unless there's underflow, and we know there's no underflow + // because number of messages at this point is always > 0. + inner.state.fetch_sub(1, SeqCst); + } + } +} + +impl<T> FusedStream for UnboundedReceiver<T> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<T> Stream for UnboundedReceiver<T> { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<T>> { + // Try to read a message off of the message queue. + match self.next_message() { + Poll::Ready(msg) => { + if msg.is_none() { + self.inner = None; + } + Poll::Ready(msg) + }, + Poll::Pending => { + // There are no messages to read, in this case, park. + self.inner.as_ref().unwrap().recv_task.register(cx.waker()); + // Check queue again after parking to prevent race condition: + // a message could be added to the queue after previous `next_message` + // before `register` call. + self.next_message() + } + } + } +} + +impl<T> Drop for UnboundedReceiver<T> { + fn drop(&mut self) { + // Drain the channel of all pending messages + self.close(); + if self.inner.is_some() { + while let Poll::Ready(Some(..)) = self.next_message() { + // ... + } + } + } +} + +/* + * + * ===== impl Inner ===== + * + */ + +impl<T> UnboundedInner<T> { + // Clear `open` flag in the state, keep `num_messages` intact. + fn set_closed(&self) { + let curr = self.state.load(SeqCst); + if !decode_state(curr).is_open { + return; + } + + self.state.fetch_and(!OPEN_MASK, SeqCst); + } +} + +impl<T> BoundedInner<T> { + // The return value is such that the total number of messages that can be + // enqueued into the channel will never exceed MAX_CAPACITY + fn max_senders(&self) -> usize { + MAX_CAPACITY - self.buffer + } + + // Clear `open` flag in the state, keep `num_messages` intact. + fn set_closed(&self) { + let curr = self.state.load(SeqCst); + if !decode_state(curr).is_open { + return; + } + + self.state.fetch_and(!OPEN_MASK, SeqCst); + } +} + +unsafe impl<T: Send> Send for UnboundedInner<T> {} +unsafe impl<T: Send> Sync for UnboundedInner<T> {} + +unsafe impl<T: Send> Send for BoundedInner<T> {} +unsafe impl<T: Send> Sync for BoundedInner<T> {} + +/* + * + * ===== Helpers ===== + * + */ + +fn decode_state(num: usize) -> State { + State { + is_open: num & OPEN_MASK == OPEN_MASK, + num_messages: num & MAX_CAPACITY, + } +} + +fn encode_state(state: &State) -> usize { + let mut num = state.num_messages; + + if state.is_open { + num |= OPEN_MASK; + } + + num +} |