diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
commit | 26a029d407be480d791972afb5975cf62c9360a6 (patch) | |
tree | f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/tokio-stream/src/wrappers | |
parent | Initial commit. (diff) | |
download | firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz firefox-26a029d407be480d791972afb5975cf62c9360a6.zip |
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-stream/src/wrappers')
12 files changed, 794 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/wrappers/broadcast.rs b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs new file mode 100644 index 0000000000..711066466a --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs @@ -0,0 +1,79 @@ +use std::pin::Pin; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; + +/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct BroadcastStream<T> { + inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>, +} + +/// An error returned from the inner stream of a [`BroadcastStream`]. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum BroadcastStreamRecvError { + /// The receiver lagged too far behind. Attempting to receive again will + /// return the oldest message still retained by the channel. + /// + /// Includes the number of skipped messages. + Lagged(u64), +} + +impl fmt::Display for BroadcastStreamRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BroadcastStreamRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), + } + } +} + +impl std::error::Error for BroadcastStreamRecvError {} + +async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) { + let result = rx.recv().await; + (result, rx) +} + +impl<T: 'static + Clone + Send> BroadcastStream<T> { + /// Create a new `BroadcastStream`. + pub fn new(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> { + type Item = Result<T, BroadcastStreamRecvError>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Ok(item) => Poll::Ready(Some(Ok(item))), + Err(RecvError::Closed) => Poll::Ready(None), + Err(RecvError::Lagged(n)) => { + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) + } + } + } +} + +impl<T> fmt::Debug for BroadcastStream<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BroadcastStream").finish() + } +} + +impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/interval.rs b/third_party/rust/tokio-stream/src/wrappers/interval.rs new file mode 100644 index 0000000000..2bf0194bd0 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/interval.rs @@ -0,0 +1,50 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::time::{Instant, Interval}; + +/// A wrapper around [`Interval`] that implements [`Stream`]. +/// +/// [`Interval`]: struct@tokio::time::Interval +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "time")))] +pub struct IntervalStream { + inner: Interval, +} + +impl IntervalStream { + /// Create a new `IntervalStream`. + pub fn new(interval: Interval) -> Self { + Self { inner: interval } + } + + /// Get back the inner `Interval`. + pub fn into_inner(self) -> Interval { + self.inner + } +} + +impl Stream for IntervalStream { + type Item = Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> { + self.inner.poll_tick(cx).map(Some) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (std::usize::MAX, None) + } +} + +impl AsRef<Interval> for IntervalStream { + fn as_ref(&self) -> &Interval { + &self.inner + } +} + +impl AsMut<Interval> for IntervalStream { + fn as_mut(&mut self) -> &mut Interval { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/lines.rs b/third_party/rust/tokio-stream/src/wrappers/lines.rs new file mode 100644 index 0000000000..ad3c25349f --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/lines.rs @@ -0,0 +1,60 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Lines}; + +pin_project! { + /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`]. + /// + /// [`tokio::io::Lines`]: struct@tokio::io::Lines + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct LinesStream<R> { + #[pin] + inner: Lines<R>, + } +} + +impl<R> LinesStream<R> { + /// Create a new `LinesStream`. + pub fn new(lines: Lines<R>) -> Self { + Self { inner: lines } + } + + /// Get back the inner `Lines`. + pub fn into_inner(self) -> Lines<R> { + self.inner + } + + /// Obtain a pinned reference to the inner `Lines<R>`. + #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546 + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> { + self.project().inner + } +} + +impl<R: AsyncBufRead> Stream for LinesStream<R> { + type Item = io::Result<String>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.project() + .inner + .poll_next_line(cx) + .map(Result::transpose) + } +} + +impl<R> AsRef<Lines<R>> for LinesStream<R> { + fn as_ref(&self) -> &Lines<R> { + &self.inner + } +} + +impl<R> AsMut<Lines<R>> for LinesStream<R> { + fn as_mut(&mut self) -> &mut Lines<R> { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs new file mode 100644 index 0000000000..b5362680ee --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -0,0 +1,65 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::Receiver; + +/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct ReceiverStream<T> { + inner: Receiver<T>, +} + +impl<T> ReceiverStream<T> { + /// Create a new `ReceiverStream`. + pub fn new(recv: Receiver<T>) -> Self { + Self { inner: recv } + } + + /// Get back the inner `Receiver`. + pub fn into_inner(self) -> Receiver<T> { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. Any + /// outstanding [`Permit`] values will still be able to send messages. + /// + /// To guarantee no messages are dropped, after calling `close()`, you must + /// receive all items from the stream until `None` is returned. + /// + /// [`Permit`]: struct@tokio::sync::mpsc::Permit + pub fn close(&mut self) { + self.inner.close() + } +} + +impl<T> Stream for ReceiverStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner.poll_recv(cx) + } +} + +impl<T> AsRef<Receiver<T>> for ReceiverStream<T> { + fn as_ref(&self) -> &Receiver<T> { + &self.inner + } +} + +impl<T> AsMut<Receiver<T>> for ReceiverStream<T> { + fn as_mut(&mut self) -> &mut Receiver<T> { + &mut self.inner + } +} + +impl<T> From<Receiver<T>> for ReceiverStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs new file mode 100644 index 0000000000..54597b7f6f --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -0,0 +1,59 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::UnboundedReceiver; + +/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct UnboundedReceiverStream<T> { + inner: UnboundedReceiver<T>, +} + +impl<T> UnboundedReceiverStream<T> { + /// Create a new `UnboundedReceiverStream`. + pub fn new(recv: UnboundedReceiver<T>) -> Self { + Self { inner: recv } + } + + /// Get back the inner `UnboundedReceiver`. + pub fn into_inner(self) -> UnboundedReceiver<T> { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.inner.close() + } +} + +impl<T> Stream for UnboundedReceiverStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner.poll_recv(cx) + } +} + +impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { + fn as_ref(&self) -> &UnboundedReceiver<T> { + &self.inner + } +} + +impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { + fn as_mut(&mut self) -> &mut UnboundedReceiver<T> { + &mut self.inner + } +} + +impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { + fn from(recv: UnboundedReceiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/read_dir.rs b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs new file mode 100644 index 0000000000..b5cf54f79e --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs @@ -0,0 +1,47 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::fs::{DirEntry, ReadDir}; + +/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`]. +/// +/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "fs")))] +pub struct ReadDirStream { + inner: ReadDir, +} + +impl ReadDirStream { + /// Create a new `ReadDirStream`. + pub fn new(read_dir: ReadDir) -> Self { + Self { inner: read_dir } + } + + /// Get back the inner `ReadDir`. + pub fn into_inner(self) -> ReadDir { + self.inner + } +} + +impl Stream for ReadDirStream { + type Item = io::Result<DirEntry>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner.poll_next_entry(cx).map(Result::transpose) + } +} + +impl AsRef<ReadDir> for ReadDirStream { + fn as_ref(&self) -> &ReadDir { + &self.inner + } +} + +impl AsMut<ReadDir> for ReadDirStream { + fn as_mut(&mut self) -> &mut ReadDir { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs new file mode 100644 index 0000000000..2f74e7d152 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs @@ -0,0 +1,46 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::signal::unix::Signal; + +/// A wrapper around [`Signal`] that implements [`Stream`]. +/// +/// [`Signal`]: struct@tokio::signal::unix::Signal +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "signal"))))] +pub struct SignalStream { + inner: Signal, +} + +impl SignalStream { + /// Create a new `SignalStream`. + pub fn new(interval: Signal) -> Self { + Self { inner: interval } + } + + /// Get back the inner `Signal`. + pub fn into_inner(self) -> Signal { + self.inner + } +} + +impl Stream for SignalStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +impl AsRef<Signal> for SignalStream { + fn as_ref(&self) -> &Signal { + &self.inner + } +} + +impl AsMut<Signal> for SignalStream { + fn as_mut(&mut self) -> &mut Signal { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs new file mode 100644 index 0000000000..4631fbad8d --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs @@ -0,0 +1,88 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::signal::windows::{CtrlBreak, CtrlC}; + +/// A wrapper around [`CtrlC`] that implements [`Stream`]. +/// +/// [`CtrlC`]: struct@tokio::signal::windows::CtrlC +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] +pub struct CtrlCStream { + inner: CtrlC, +} + +impl CtrlCStream { + /// Create a new `CtrlCStream`. + pub fn new(interval: CtrlC) -> Self { + Self { inner: interval } + } + + /// Get back the inner `CtrlC`. + pub fn into_inner(self) -> CtrlC { + self.inner + } +} + +impl Stream for CtrlCStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +impl AsRef<CtrlC> for CtrlCStream { + fn as_ref(&self) -> &CtrlC { + &self.inner + } +} + +impl AsMut<CtrlC> for CtrlCStream { + fn as_mut(&mut self) -> &mut CtrlC { + &mut self.inner + } +} + +/// A wrapper around [`CtrlBreak`] that implements [`Stream`]. +/// +/// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] +pub struct CtrlBreakStream { + inner: CtrlBreak, +} + +impl CtrlBreakStream { + /// Create a new `CtrlBreakStream`. + pub fn new(interval: CtrlBreak) -> Self { + Self { inner: interval } + } + + /// Get back the inner `CtrlBreak`. + pub fn into_inner(self) -> CtrlBreak { + self.inner + } +} + +impl Stream for CtrlBreakStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +impl AsRef<CtrlBreak> for CtrlBreakStream { + fn as_ref(&self) -> &CtrlBreak { + &self.inner + } +} + +impl AsMut<CtrlBreak> for CtrlBreakStream { + fn as_mut(&mut self) -> &mut CtrlBreak { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/split.rs b/third_party/rust/tokio-stream/src/wrappers/split.rs new file mode 100644 index 0000000000..5a6bb2d408 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/split.rs @@ -0,0 +1,60 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Split}; + +pin_project! { + /// A wrapper around [`tokio::io::Split`] that implements [`Stream`]. + /// + /// [`tokio::io::Split`]: struct@tokio::io::Split + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct SplitStream<R> { + #[pin] + inner: Split<R>, + } +} + +impl<R> SplitStream<R> { + /// Create a new `SplitStream`. + pub fn new(split: Split<R>) -> Self { + Self { inner: split } + } + + /// Get back the inner `Split`. + pub fn into_inner(self) -> Split<R> { + self.inner + } + + /// Obtain a pinned reference to the inner `Split<R>`. + #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546 + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split<R>> { + self.project().inner + } +} + +impl<R: AsyncBufRead> Stream for SplitStream<R> { + type Item = io::Result<Vec<u8>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.project() + .inner + .poll_next_segment(cx) + .map(Result::transpose) + } +} + +impl<R> AsRef<Split<R>> for SplitStream<R> { + fn as_ref(&self) -> &Split<R> { + &self.inner + } +} + +impl<R> AsMut<Split<R>> for SplitStream<R> { + fn as_mut(&mut self) -> &mut Split<R> { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs new file mode 100644 index 0000000000..ce7cb16350 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs @@ -0,0 +1,54 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{TcpListener, TcpStream}; + +/// A wrapper around [`TcpListener`] that implements [`Stream`]. +/// +/// [`TcpListener`]: struct@tokio::net::TcpListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +pub struct TcpListenerStream { + inner: TcpListener, +} + +impl TcpListenerStream { + /// Create a new `TcpListenerStream`. + pub fn new(listener: TcpListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `TcpListener`. + pub fn into_inner(self) -> TcpListener { + self.inner + } +} + +impl Stream for TcpListenerStream { + type Item = io::Result<TcpStream>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<io::Result<TcpStream>>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsRef<TcpListener> for TcpListenerStream { + fn as_ref(&self) -> &TcpListener { + &self.inner + } +} + +impl AsMut<TcpListener> for TcpListenerStream { + fn as_mut(&mut self) -> &mut TcpListener { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs new file mode 100644 index 0000000000..0beba588c2 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs @@ -0,0 +1,54 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{UnixListener, UnixStream}; + +/// A wrapper around [`UnixListener`] that implements [`Stream`]. +/// +/// [`UnixListener`]: struct@tokio::net::UnixListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))] +pub struct UnixListenerStream { + inner: UnixListener, +} + +impl UnixListenerStream { + /// Create a new `UnixListenerStream`. + pub fn new(listener: UnixListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `UnixListener`. + pub fn into_inner(self) -> UnixListener { + self.inner + } +} + +impl Stream for UnixListenerStream { + type Item = io::Result<UnixStream>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<io::Result<UnixStream>>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsRef<UnixListener> for UnixListenerStream { + fn as_ref(&self) -> &UnixListener { + &self.inner + } +} + +impl AsMut<UnixListener> for UnixListenerStream { + fn as_mut(&mut self) -> &mut UnixListener { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/watch.rs b/third_party/rust/tokio-stream/src/wrappers/watch.rs new file mode 100644 index 0000000000..ec8ead06da --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/watch.rs @@ -0,0 +1,132 @@ +use std::pin::Pin; +use tokio::sync::watch::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; +use tokio::sync::watch::error::RecvError; + +/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. +/// +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream<T>::from_changes`]. +/// +/// # Examples +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// assert_eq!(rx.next().await, Some("hello")); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream<T>::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct WatchStream<T> { + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>, +} + +async fn make_future<T: Clone + Send + Sync>( + mut rx: Receiver<T>, +) -> (Result<(), RecvError>, Receiver<T>) { + let result = rx.changed().await; + (result, rx) +} + +impl<T: 'static + Clone + Send + Sync> WatchStream<T> { + /// Create a new `WatchStream`. + pub fn new(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(make_future(rx)); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(make_future(rx)); + Poll::Ready(None) + } + } + } +} + +impl<T> Unpin for WatchStream<T> {} + +impl<T> fmt::Debug for WatchStream<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WatchStream").finish() + } +} + +impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} |