diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio-stream/src/wrappers/watch.rs | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-stream/src/wrappers/watch.rs')
-rw-r--r-- | third_party/rust/tokio-stream/src/wrappers/watch.rs | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/wrappers/watch.rs b/third_party/rust/tokio-stream/src/wrappers/watch.rs new file mode 100644 index 0000000000..ec8ead06da --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/watch.rs @@ -0,0 +1,132 @@ +use std::pin::Pin; +use tokio::sync::watch::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; +use tokio::sync::watch::error::RecvError; + +/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. +/// +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream<T>::from_changes`]. +/// +/// # Examples +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// assert_eq!(rx.next().await, Some("hello")); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream<T>::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct WatchStream<T> { + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>, +} + +async fn make_future<T: Clone + Send + Sync>( + mut rx: Receiver<T>, +) -> (Result<(), RecvError>, Receiver<T>) { + let result = rx.changed().await; + (result, rx) +} + +impl<T: 'static + Clone + Send + Sync> WatchStream<T> { + /// Create a new `WatchStream`. + pub fn new(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(make_future(rx)); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(make_future(rx)); + Poll::Ready(None) + } + } + } +} + +impl<T> Unpin for WatchStream<T> {} + +impl<T> fmt::Debug for WatchStream<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WatchStream").finish() + } +} + +impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} |