use crate::loom::sync::atomic::AtomicUsize; use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{SendError, TryRecvError}; use std::fmt; use std::task::{Context, Poll}; /// Send values to the associated `UnboundedReceiver`. /// /// Instances are created by the /// [`unbounded_channel`](unbounded_channel) function. pub struct UnboundedSender { chan: chan::Tx, } impl Clone for UnboundedSender { fn clone(&self) -> Self { UnboundedSender { chan: self.chan.clone(), } } } impl fmt::Debug for UnboundedSender { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("UnboundedSender") .field("chan", &self.chan) .finish() } } /// Receive values from the associated `UnboundedSender`. /// /// Instances are created by the /// [`unbounded_channel`](unbounded_channel) function. pub struct UnboundedReceiver { /// The channel receiver chan: chan::Rx, } impl fmt::Debug for UnboundedReceiver { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("UnboundedReceiver") .field("chan", &self.chan) .finish() } } /// Creates an unbounded mpsc channel for communicating between asynchronous /// tasks. /// /// A `send` on this channel will always succeed as long as the receive half has /// not been closed. If the receiver falls behind, messages will be arbitrarily /// buffered. /// /// **Note** that the amount of available system memory is an implicit bound to /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { let (tx, rx) = chan::channel(AtomicUsize::new(0)); let tx = UnboundedSender::new(tx); let rx = UnboundedReceiver::new(rx); (tx, rx) } /// No capacity type Semaphore = AtomicUsize; impl UnboundedReceiver { pub(crate) fn new(chan: chan::Rx) -> UnboundedReceiver { UnboundedReceiver { chan } } #[doc(hidden)] // TODO: doc pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.chan.recv(cx) } /// Receives the next value for this receiver. /// /// `None` is returned when all `Sender` halves have dropped, indicating /// that no further values can be sent on the channel. /// /// # Examples /// /// ``` /// use tokio::sync::mpsc; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = mpsc::unbounded_channel(); /// /// tokio::spawn(async move { /// tx.send("hello").unwrap(); /// }); /// /// assert_eq!(Some("hello"), rx.recv().await); /// assert_eq!(None, rx.recv().await); /// } /// ``` /// /// Values are buffered: /// /// ``` /// use tokio::sync::mpsc; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = mpsc::unbounded_channel(); /// /// tx.send("hello").unwrap(); /// tx.send("world").unwrap(); /// /// assert_eq!(Some("hello"), rx.recv().await); /// assert_eq!(Some("world"), rx.recv().await); /// } /// ``` pub async fn recv(&mut self) -> Option { use crate::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } /// Attempts to return a pending value on this receiver without blocking. /// /// This method will never block the caller in order to wait for data to /// become available. Instead, this will always return immediately with /// a possible option of pending data on the channel. /// /// This is useful for a flavor of "optimistic check" before deciding to /// block on a receiver. /// /// Compared with recv, this function has two failure cases instead of /// one (one for disconnection, one for an empty buffer). pub fn try_recv(&mut self) -> Result { self.chan.try_recv() } /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. pub fn close(&mut self) { self.chan.close(); } } #[cfg(feature = "stream")] impl crate::stream::Stream for UnboundedReceiver { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_recv(cx) } } impl UnboundedSender { pub(crate) fn new(chan: chan::Tx) -> UnboundedSender { UnboundedSender { chan } } /// Attempts to send a message on this `UnboundedSender` without blocking. /// /// If the receive half of the channel is closed, either due to [`close`] /// being called or the [`UnboundedReceiver`] having been dropped, /// the function returns an error. The error includes the value passed to `send`. /// /// [`close`]: UnboundedReceiver::close /// [`UnboundedReceiver`]: UnboundedReceiver pub fn send(&self, message: T) -> Result<(), SendError> { self.chan.send_unbounded(message)?; Ok(()) } }