summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/mpsc/unbounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r--third_party/rust/tokio/src/sync/mpsc/unbounded.rs176
1 files changed, 176 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/mpsc/unbounded.rs b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs
new file mode 100644
index 0000000000..ba543fe4c8
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/unbounded.rs
@@ -0,0 +1,176 @@
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::sync::mpsc::chan;
+use crate::sync::mpsc::error::{SendError, TryRecvError};
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+/// Send values to the associated `UnboundedReceiver`.
+///
+/// Instances are created by the
+/// [`unbounded_channel`](unbounded_channel) function.
+pub struct UnboundedSender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for UnboundedSender<T> {
+ fn clone(&self) -> Self {
+ UnboundedSender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for UnboundedSender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("UnboundedSender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Receive values from the associated `UnboundedSender`.
+///
+/// Instances are created by the
+/// [`unbounded_channel`](unbounded_channel) function.
+pub struct UnboundedReceiver<T> {
+ /// The channel receiver
+ chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for UnboundedReceiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("UnboundedReceiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Creates an unbounded mpsc channel for communicating between asynchronous
+/// tasks.
+///
+/// A `send` on this channel will always succeed as long as the receive half has
+/// not been closed. If the receiver falls behind, messages will be arbitrarily
+/// buffered.
+///
+/// **Note** that the amount of available system memory is an implicit bound to
+/// the channel. Using an `unbounded` channel has the ability of causing the
+/// process to run out of memory. In this case, the process will be aborted.
+pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+ let (tx, rx) = chan::channel(AtomicUsize::new(0));
+
+ let tx = UnboundedSender::new(tx);
+ let rx = UnboundedReceiver::new(rx);
+
+ (tx, rx)
+}
+
+/// No capacity
+type Semaphore = AtomicUsize;
+
+impl<T> UnboundedReceiver<T> {
+ pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
+ UnboundedReceiver { chan }
+ }
+
+ #[doc(hidden)] // TODO: doc
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
+ /// Receives the next value for this receiver.
+ ///
+ /// `None` is returned when all `Sender` halves have dropped, indicating
+ /// that no further values can be sent on the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tokio::spawn(async move {
+ /// tx.send("hello").unwrap();
+ /// });
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(None, rx.recv().await);
+ /// }
+ /// ```
+ ///
+ /// Values are buffered:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tx.send("hello").unwrap();
+ /// tx.send("world").unwrap();
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(Some("world"), rx.recv().await);
+ /// }
+ /// ```
+ pub async fn recv(&mut self) -> Option<T> {
+ use crate::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
+ /// Attempts to return a pending value on this receiver without blocking.
+ ///
+ /// This method will never block the caller in order to wait for data to
+ /// become available. Instead, this will always return immediately with
+ /// a possible option of pending data on the channel.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// block on a receiver.
+ ///
+ /// Compared with recv, this function has two failure cases instead of
+ /// one (one for disconnection, one for an empty buffer).
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.chan.close();
+ }
+}
+
+#[cfg(feature = "stream")]
+impl<T> crate::stream::Stream for UnboundedReceiver<T> {
+ type Item = T;
+
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.poll_recv(cx)
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
+ UnboundedSender { chan }
+ }
+
+ /// Attempts to send a message on this `UnboundedSender` without blocking.
+ ///
+ /// If the receive half of the channel is closed, either due to [`close`]
+ /// being called or the [`UnboundedReceiver`] having been dropped,
+ /// the function returns an error. The error includes the value passed to `send`.
+ ///
+ /// [`close`]: UnboundedReceiver::close
+ /// [`UnboundedReceiver`]: UnboundedReceiver
+ pub fn send(&self, message: T) -> Result<(), SendError<T>> {
+ self.chan.send_unbounded(message)?;
+ Ok(())
+ }
+}