diff options
Diffstat (limited to 'third_party/rust/tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r-- | third_party/rust/tokio/src/sync/mpsc/unbounded.rs | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/mpsc/unbounded.rs b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs new file mode 100644 index 0000000000..ba543fe4c8 --- /dev/null +++ b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs @@ -0,0 +1,176 @@ +use crate::loom::sync::atomic::AtomicUsize; +use crate::sync::mpsc::chan; +use crate::sync::mpsc::error::{SendError, TryRecvError}; + +use std::fmt; +use std::task::{Context, Poll}; + +/// Send values to the associated `UnboundedReceiver`. +/// +/// Instances are created by the +/// [`unbounded_channel`](unbounded_channel) function. +pub struct UnboundedSender<T> { + chan: chan::Tx<T, Semaphore>, +} + +impl<T> Clone for UnboundedSender<T> { + fn clone(&self) -> Self { + UnboundedSender { + chan: self.chan.clone(), + } + } +} + +impl<T> fmt::Debug for UnboundedSender<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedSender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `UnboundedSender`. +/// +/// Instances are created by the +/// [`unbounded_channel`](unbounded_channel) function. +pub struct UnboundedReceiver<T> { + /// The channel receiver + chan: chan::Rx<T, Semaphore>, +} + +impl<T> fmt::Debug for UnboundedReceiver<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedReceiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Creates an unbounded mpsc channel for communicating between asynchronous +/// tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + let (tx, rx) = chan::channel(AtomicUsize::new(0)); + + let tx = UnboundedSender::new(tx); + let rx = UnboundedReceiver::new(rx); + + (tx, rx) +} + +/// No capacity +type Semaphore = AtomicUsize; + +impl<T> UnboundedReceiver<T> { + pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> { + UnboundedReceiver { chan } + } + + #[doc(hidden)] // TODO: doc + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } + + /// Receives the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tokio::spawn(async move { + /// tx.send("hello").unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tx.send("hello").unwrap(); + /// tx.send("world").unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option<T> { + use crate::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with + /// a possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with recv, this function has two failure cases instead of + /// one (one for disconnection, one for an empty buffer). + pub fn try_recv(&mut self) -> Result<T, TryRecvError> { + self.chan.try_recv() + } + + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +#[cfg(feature = "stream")] +impl<T> crate::stream::Stream for UnboundedReceiver<T> { + type Item = T; + + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.poll_recv(cx) + } +} + +impl<T> UnboundedSender<T> { + pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> { + UnboundedSender { chan } + } + + /// Attempts to send a message on this `UnboundedSender` without blocking. + /// + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`UnboundedReceiver`] having been dropped, + /// the function returns an error. The error includes the value passed to `send`. + /// + /// [`close`]: UnboundedReceiver::close + /// [`UnboundedReceiver`]: UnboundedReceiver + pub fn send(&self, message: T) -> Result<(), SendError<T>> { + self.chan.send_unbounded(message)?; + Ok(()) + } +} |