summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-stream/src
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-stream/src')
-rw-r--r--third_party/rust/tokio-stream/src/empty.rs50
-rw-r--r--third_party/rust/tokio-stream/src/iter.rs67
-rw-r--r--third_party/rust/tokio-stream/src/lib.rs100
-rw-r--r--third_party/rust/tokio-stream/src/macros.rs68
-rw-r--r--third_party/rust/tokio-stream/src/once.rs52
-rw-r--r--third_party/rust/tokio-stream/src/pending.rs54
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext.rs1084
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/all.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/any.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/chain.rs50
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs86
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/collect.rs229
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/filter.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/filter_map.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/fold.rs57
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/fuse.rs53
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/map.rs51
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/map_while.rs52
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/merge.rs90
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/next.rs44
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/skip.rs63
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/skip_while.rs73
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/take.rs76
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/take_while.rs79
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/then.rs83
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/throttle.rs96
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/timeout.rs107
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/try_next.rs45
-rw-r--r--third_party/rust/tokio-stream/src/stream_map.rs690
-rw-r--r--third_party/rust/tokio-stream/src/wrappers.rs62
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/broadcast.rs79
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/interval.rs50
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/lines.rs60
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs65
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs59
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/read_dir.rs47
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/signal_unix.rs46
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/signal_windows.rs88
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/split.rs60
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs54
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/unix_listener.rs54
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/watch.rs132
42 files changed, 4587 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/empty.rs b/third_party/rust/tokio-stream/src/empty.rs
new file mode 100644
index 0000000000..965dcf5da7
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/empty.rs
@@ -0,0 +1,50 @@
+use crate::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`empty`](fn@empty) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Empty<T>(PhantomData<T>);
+
+impl<T> Unpin for Empty<T> {}
+unsafe impl<T> Send for Empty<T> {}
+unsafe impl<T> Sync for Empty<T> {}
+
+/// Creates a stream that yields nothing.
+///
+/// The returned stream is immediately ready and returns `None`. Use
+/// [`stream::pending()`](super::pending()) to obtain a stream that is never
+/// ready.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut none = stream::empty::<i32>();
+///
+/// assert_eq!(None, none.next().await);
+/// }
+/// ```
+pub const fn empty<T>() -> Empty<T> {
+ Empty(PhantomData)
+}
+
+impl<T> Stream for Empty<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+ Poll::Ready(None)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, Some(0))
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/iter.rs b/third_party/rust/tokio-stream/src/iter.rs
new file mode 100644
index 0000000000..128be616fc
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/iter.rs
@@ -0,0 +1,67 @@
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`iter`](fn@iter) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Iter<I> {
+ iter: I,
+ yield_amt: usize,
+}
+
+impl<I> Unpin for Iter<I> {}
+
+/// Converts an `Iterator` into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter
+/// simply always calls `iter.next()` and returns that.
+///
+/// ```
+/// # async fn dox() {
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// let mut stream = stream::iter(vec![17, 19]);
+///
+/// assert_eq!(stream.next().await, Some(17));
+/// assert_eq!(stream.next().await, Some(19));
+/// assert_eq!(stream.next().await, None);
+/// # }
+/// ```
+pub fn iter<I>(i: I) -> Iter<I::IntoIter>
+where
+ I: IntoIterator,
+{
+ Iter {
+ iter: i.into_iter(),
+ yield_amt: 0,
+ }
+}
+
+impl<I> Stream for Iter<I>
+where
+ I: Iterator,
+{
+ type Item = I::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
+ // TODO: add coop back
+ if self.yield_amt >= 32 {
+ self.yield_amt = 0;
+
+ cx.waker().wake_by_ref();
+
+ Poll::Pending
+ } else {
+ self.yield_amt += 1;
+
+ Poll::Ready(self.iter.next())
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/lib.rs b/third_party/rust/tokio-stream/src/lib.rs
new file mode 100644
index 0000000000..bbd4cef03e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/lib.rs
@@ -0,0 +1,100 @@
+#![allow(
+ clippy::cognitive_complexity,
+ clippy::large_enum_variant,
+ clippy::needless_doctest_main
+)]
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ unreachable_pub
+)]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+#![doc(test(
+ no_crate_inject,
+ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
+))]
+
+//! Stream utilities for Tokio.
+//!
+//! A `Stream` is an asynchronous sequence of values. It can be thought of as
+//! an asynchronous version of the standard library's `Iterator` trait.
+//!
+//! This crate provides helpers to work with them. For examples of usage and a more in-depth
+//! description of streams you can also refer to the [streams
+//! tutorial](https://tokio.rs/tokio/tutorial/streams) on the tokio website.
+//!
+//! # Iterating over a Stream
+//!
+//! Due to similarities with the standard library's `Iterator` trait, some new
+//! users may assume that they can use `for in` syntax to iterate over a
+//! `Stream`, but this is unfortunately not possible. Instead, you can use a
+//! `while let` loop as follows:
+//!
+//! ```rust
+//! use tokio_stream::{self as stream, StreamExt};
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let mut stream = stream::iter(vec![0, 1, 2]);
+//!
+//! while let Some(value) = stream.next().await {
+//! println!("Got {}", value);
+//! }
+//! }
+//! ```
+//!
+//! # Returning a Stream from a function
+//!
+//! A common way to stream values from a function is to pass in the sender
+//! half of a channel and use the receiver as the stream. This requires awaiting
+//! both futures to ensure progress is made. Another alternative is the
+//! [async-stream] crate, which contains macros that provide a `yield` keyword
+//! and allow you to return an `impl Stream`.
+//!
+//! [async-stream]: https://docs.rs/async-stream
+//!
+//! # Conversion to and from AsyncRead/AsyncWrite
+//!
+//! It is often desirable to convert a `Stream` into an [`AsyncRead`],
+//! especially when dealing with plaintext formats streamed over the network.
+//! The opposite conversion from an [`AsyncRead`] into a `Stream` is also
+//! another commonly required feature. To enable these conversions,
+//! [`tokio-util`] provides the [`StreamReader`] and [`ReaderStream`]
+//! types when the io feature is enabled.
+//!
+//! [`tokio-util`]: https://docs.rs/tokio-util/0.4/tokio_util/codec/index.html
+//! [`tokio::io`]: https://docs.rs/tokio/1.0/tokio/io/index.html
+//! [`AsyncRead`]: https://docs.rs/tokio/1.0/tokio/io/trait.AsyncRead.html
+//! [`AsyncWrite`]: https://docs.rs/tokio/1.0/tokio/io/trait.AsyncWrite.html
+//! [`ReaderStream`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.ReaderStream.html
+//! [`StreamReader`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.StreamReader.html
+
+#[macro_use]
+mod macros;
+
+pub mod wrappers;
+
+mod stream_ext;
+pub use stream_ext::{collect::FromStream, StreamExt};
+cfg_time! {
+ pub use stream_ext::timeout::{Elapsed, Timeout};
+}
+
+mod empty;
+pub use empty::{empty, Empty};
+
+mod iter;
+pub use iter::{iter, Iter};
+
+mod once;
+pub use once::{once, Once};
+
+mod pending;
+pub use pending::{pending, Pending};
+
+mod stream_map;
+pub use stream_map::StreamMap;
+
+#[doc(no_inline)]
+pub use futures_core::Stream;
diff --git a/third_party/rust/tokio-stream/src/macros.rs b/third_party/rust/tokio-stream/src/macros.rs
new file mode 100644
index 0000000000..1e3b61bac7
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/macros.rs
@@ -0,0 +1,68 @@
+macro_rules! cfg_fs {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "fs")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
+ $item
+ )*
+ }
+}
+
+macro_rules! cfg_io_util {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "io-util")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+ $item
+ )*
+ }
+}
+
+macro_rules! cfg_net {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "net")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+ $item
+ )*
+ }
+}
+
+macro_rules! cfg_time {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "time")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ $item
+ )*
+ }
+}
+
+macro_rules! cfg_sync {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "sync")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+ $item
+ )*
+ }
+}
+
+macro_rules! cfg_signal {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "signal")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "signal")))]
+ $item
+ )*
+ }
+}
+
+macro_rules! ready {
+ ($e:expr $(,)?) => {
+ match $e {
+ std::task::Poll::Ready(t) => t,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }
+ };
+}
diff --git a/third_party/rust/tokio-stream/src/once.rs b/third_party/rust/tokio-stream/src/once.rs
new file mode 100644
index 0000000000..04b4c052b8
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/once.rs
@@ -0,0 +1,52 @@
+use crate::{Iter, Stream};
+
+use core::option;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`once`](fn@once) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Once<T> {
+ iter: Iter<option::IntoIter<T>>,
+}
+
+impl<I> Unpin for Once<I> {}
+
+/// Creates a stream that emits an element exactly once.
+///
+/// The returned stream is immediately ready and emits the provided value once.
+///
+/// # Examples
+///
+/// ```
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// // one is the loneliest number
+/// let mut one = stream::once(1);
+///
+/// assert_eq!(Some(1), one.next().await);
+///
+/// // just one, that's all we get
+/// assert_eq!(None, one.next().await);
+/// }
+/// ```
+pub fn once<T>(value: T) -> Once<T> {
+ Once {
+ iter: crate::iter(Some(value).into_iter()),
+ }
+}
+
+impl<T> Stream for Once<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Pin::new(&mut self.iter).poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/pending.rs b/third_party/rust/tokio-stream/src/pending.rs
new file mode 100644
index 0000000000..b50fd33354
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/pending.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`pending`](fn@pending) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Pending<T>(PhantomData<T>);
+
+impl<T> Unpin for Pending<T> {}
+unsafe impl<T> Send for Pending<T> {}
+unsafe impl<T> Sync for Pending<T> {}
+
+/// Creates a stream that is never ready
+///
+/// The returned stream is never ready. Attempting to call
+/// [`next()`](crate::StreamExt::next) will never complete. Use
+/// [`stream::empty()`](super::empty()) to obtain a stream that is is
+/// immediately empty but returns no values.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```no_run
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut never = stream::pending::<i32>();
+///
+/// // This will never complete
+/// never.next().await;
+///
+/// unreachable!();
+/// }
+/// ```
+pub const fn pending<T>() -> Pending<T> {
+ Pending(PhantomData)
+}
+
+impl<T> Stream for Pending<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, None)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext.rs b/third_party/rust/tokio-stream/src/stream_ext.rs
new file mode 100644
index 0000000000..52d32024ff
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext.rs
@@ -0,0 +1,1084 @@
+use core::future::Future;
+use futures_core::Stream;
+
+mod all;
+use all::AllFuture;
+
+mod any;
+use any::AnyFuture;
+
+mod chain;
+use chain::Chain;
+
+pub(crate) mod collect;
+use collect::{Collect, FromStream};
+
+mod filter;
+use filter::Filter;
+
+mod filter_map;
+use filter_map::FilterMap;
+
+mod fold;
+use fold::FoldFuture;
+
+mod fuse;
+use fuse::Fuse;
+
+mod map;
+use map::Map;
+
+mod map_while;
+use map_while::MapWhile;
+
+mod merge;
+use merge::Merge;
+
+mod next;
+use next::Next;
+
+mod skip;
+use skip::Skip;
+
+mod skip_while;
+use skip_while::SkipWhile;
+
+mod take;
+use take::Take;
+
+mod take_while;
+use take_while::TakeWhile;
+
+mod then;
+use then::Then;
+
+mod try_next;
+use try_next::TryNext;
+
+cfg_time! {
+ pub(crate) mod timeout;
+ use timeout::Timeout;
+ use tokio::time::Duration;
+ mod throttle;
+ use throttle::{throttle, Throttle};
+ mod chunks_timeout;
+ use chunks_timeout::ChunksTimeout;
+}
+
+/// An extension trait for the [`Stream`] trait that provides a variety of
+/// convenient combinator functions.
+///
+/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
+/// in the [futures] crate, however both Tokio and futures provide separate
+/// `StreamExt` utility traits, and some utilities are only available on one of
+/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
+/// trait in the futures crate.
+///
+/// If you need utilities from both `StreamExt` traits, you should prefer to
+/// import one of them, and use the other through the fully qualified call
+/// syntax. For example:
+/// ```
+/// // import one of the traits:
+/// use futures::stream::StreamExt;
+/// # #[tokio::main(flavor = "current_thread")]
+/// # async fn main() {
+///
+/// let a = tokio_stream::iter(vec![1, 3, 5]);
+/// let b = tokio_stream::iter(vec![2, 4, 6]);
+///
+/// // use the fully qualified call syntax for the other trait:
+/// let merged = tokio_stream::StreamExt::merge(a, b);
+///
+/// // use normal call notation for futures::stream::StreamExt::collect
+/// let output: Vec<_> = merged.collect().await;
+/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
+/// # }
+/// ```
+///
+/// [`Stream`]: crate::Stream
+/// [futures]: https://docs.rs/futures
+/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
+pub trait StreamExt: Stream {
+ /// Consumes and returns the next value in the stream or `None` if the
+ /// stream is finished.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn next(&mut self) -> Option<Self::Item>;
+ /// ```
+ ///
+ /// Note that because `next` doesn't take ownership over the stream,
+ /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
+ /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
+ /// be done by boxing the stream using [`Box::pin`] or
+ /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
+ /// crate.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. The returned future only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=3);
+ ///
+ /// assert_eq!(stream.next().await, Some(1));
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, Some(3));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn next(&mut self) -> Next<'_, Self>
+ where
+ Self: Unpin,
+ {
+ Next::new(self)
+ }
+
+ /// Consumes and returns the next item in the stream. If an error is
+ /// encountered before the next item, the error is returned instead.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn try_next(&mut self) -> Result<Option<T>, E>;
+ /// ```
+ ///
+ /// This is similar to the [`next`](StreamExt::next) combinator,
+ /// but returns a [`Result<Option<T>, E>`](Result) rather than
+ /// an [`Option<Result<T, E>>`](Option), making for easy use
+ /// with the [`?`](std::ops::Try) operator.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. The returned future only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(stream.try_next().await, Err("nope"));
+ /// # }
+ /// ```
+ fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
+ where
+ Self: Stream<Item = Result<T, E>> + Unpin,
+ {
+ TryNext::new(self)
+ }
+
+ /// Maps this stream's items to a different type, returning a new stream of
+ /// the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available. It is executed inline with calls to
+ /// [`poll_next`](Stream::poll_next).
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let mut stream = stream.map(|x| x + 3);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn map<T, F>(self, f: F) -> Map<Self, F>
+ where
+ F: FnMut(Self::Item) -> T,
+ Self: Sized,
+ {
+ Map::new(self, f)
+ }
+
+ /// Map this stream's items to a different type for as long as determined by
+ /// the provided closure. A stream of the target type will be returned,
+ /// which will yield elements until the closure returns `None`.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available, until it returns `None`. It is executed inline
+ /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
+ /// the underlying stream will not be polled again.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=10);
+ /// let mut stream = stream.map_while(|x| {
+ /// if x < 4 {
+ /// Some(x + 3)
+ /// } else {
+ /// None
+ /// }
+ /// });
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
+ where
+ F: FnMut(Self::Item) -> Option<T>,
+ Self: Sized,
+ {
+ MapWhile::new(self, f)
+ }
+
+ /// Maps this stream's items asynchronously to a different type, returning a
+ /// new stream of the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available, and the returned future is executed. Only one
+ /// future is executed at the time.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `then` methods in the
+ /// standard library.
+ ///
+ /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
+ /// returned by this method. To handle this, you can use `tokio::pin!` as in
+ /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// async fn do_async_work(value: i32) -> i32 {
+ /// value + 3
+ /// }
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let stream = stream.then(do_async_work);
+ ///
+ /// tokio::pin!(stream);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
+ where
+ F: FnMut(Self::Item) -> Fut,
+ Fut: Future,
+ Self: Sized,
+ {
+ Then::new(self, f)
+ }
+
+ /// Combine two streams into one by interleaving the output of both as it
+ /// is produced.
+ ///
+ /// Values are produced from the merged stream in the order they arrive from
+ /// the two source streams. If both source streams provide values
+ /// simultaneously, the merge stream alternates between them. This provides
+ /// some level of fairness. You should not chain calls to `merge`, as this
+ /// will break the fairness of the merging.
+ ///
+ /// The merged stream completes once **both** source streams complete. When
+ /// one source stream completes before the other, the merge stream
+ /// exclusively polls the remaining stream.
+ ///
+ /// For merging multiple streams, consider using [`StreamMap`] instead.
+ ///
+ /// [`StreamMap`]: crate::StreamMap
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamExt, Stream};
+ /// use tokio::sync::mpsc;
+ /// use tokio::time;
+ ///
+ /// use std::time::Duration;
+ /// use std::pin::Pin;
+ ///
+ /// # /*
+ /// #[tokio::main]
+ /// # */
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// async fn main() {
+ /// # time::pause();
+ /// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
+ /// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
+ ///
+ /// // Convert the channels to a `Stream`.
+ /// let rx1 = Box::pin(async_stream::stream! {
+ /// while let Some(item) = rx1.recv().await {
+ /// yield item;
+ /// }
+ /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+ ///
+ /// let rx2 = Box::pin(async_stream::stream! {
+ /// while let Some(item) = rx2.recv().await {
+ /// yield item;
+ /// }
+ /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+ ///
+ /// let mut rx = rx1.merge(rx2);
+ ///
+ /// tokio::spawn(async move {
+ /// // Send some values immediately
+ /// tx1.send(1).await.unwrap();
+ /// tx1.send(2).await.unwrap();
+ ///
+ /// // Let the other task send values
+ /// time::sleep(Duration::from_millis(20)).await;
+ ///
+ /// tx1.send(4).await.unwrap();
+ /// });
+ ///
+ /// tokio::spawn(async move {
+ /// // Wait for the first task to send values
+ /// time::sleep(Duration::from_millis(5)).await;
+ ///
+ /// tx2.send(3).await.unwrap();
+ ///
+ /// time::sleep(Duration::from_millis(25)).await;
+ ///
+ /// // Send the final value
+ /// tx2.send(5).await.unwrap();
+ /// });
+ ///
+ /// assert_eq!(1, rx.next().await.unwrap());
+ /// assert_eq!(2, rx.next().await.unwrap());
+ /// assert_eq!(3, rx.next().await.unwrap());
+ /// assert_eq!(4, rx.next().await.unwrap());
+ /// assert_eq!(5, rx.next().await.unwrap());
+ ///
+ /// // The merged stream is consumed
+ /// assert!(rx.next().await.is_none());
+ /// }
+ /// ```
+ fn merge<U>(self, other: U) -> Merge<Self, U>
+ where
+ U: Stream<Item = Self::Item>,
+ Self: Sized,
+ {
+ Merge::new(self, other)
+ }
+
+ /// Filters the values produced by this stream according to the provided
+ /// predicate.
+ ///
+ /// As values of this stream are made available, the provided predicate `f`
+ /// will be run against them. If the predicate
+ /// resolves to `true`, then the stream will yield the value, but if the
+ /// predicate resolves to `false`, then the value
+ /// will be discarded and the next value will be produced.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to [`Iterator::filter`] method in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=8);
+ /// let mut evens = stream.filter(|x| x % 2 == 0);
+ ///
+ /// assert_eq!(Some(2), evens.next().await);
+ /// assert_eq!(Some(4), evens.next().await);
+ /// assert_eq!(Some(6), evens.next().await);
+ /// assert_eq!(Some(8), evens.next().await);
+ /// assert_eq!(None, evens.next().await);
+ /// # }
+ /// ```
+ fn filter<F>(self, f: F) -> Filter<Self, F>
+ where
+ F: FnMut(&Self::Item) -> bool,
+ Self: Sized,
+ {
+ Filter::new(self, f)
+ }
+
+ /// Filters the values produced by this stream while simultaneously mapping
+ /// them to a different type according to the provided closure.
+ ///
+ /// As values of this stream are made available, the provided function will
+ /// be run on them. If the predicate `f` resolves to
+ /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
+ /// it resolves to [`None`], then the value will be skipped.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
+ /// standard library.
+ ///
+ /// # Examples
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=8);
+ /// let mut evens = stream.filter_map(|x| {
+ /// if x % 2 == 0 { Some(x + 1) } else { None }
+ /// });
+ ///
+ /// assert_eq!(Some(3), evens.next().await);
+ /// assert_eq!(Some(5), evens.next().await);
+ /// assert_eq!(Some(7), evens.next().await);
+ /// assert_eq!(Some(9), evens.next().await);
+ /// assert_eq!(None, evens.next().await);
+ /// # }
+ /// ```
+ fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
+ where
+ F: FnMut(Self::Item) -> Option<T>,
+ Self: Sized,
+ {
+ FilterMap::new(self, f)
+ }
+
+ /// Creates a stream which ends after the first `None`.
+ ///
+ /// After a stream returns `None`, behavior is undefined. Future calls to
+ /// `poll_next` may or may not return `Some(T)` again or they may panic.
+ /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
+ /// return `None` forever.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{Stream, StreamExt};
+ ///
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ ///
+ /// // a stream which alternates between Some and None
+ /// struct Alternate {
+ /// state: i32,
+ /// }
+ ///
+ /// impl Stream for Alternate {
+ /// type Item = i32;
+ ///
+ /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
+ /// let val = self.state;
+ /// self.state = self.state + 1;
+ ///
+ /// // if it's even, Some(i32), else None
+ /// if val % 2 == 0 {
+ /// Poll::Ready(Some(val))
+ /// } else {
+ /// Poll::Ready(None)
+ /// }
+ /// }
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut stream = Alternate { state: 0 };
+ ///
+ /// // the stream goes back and forth
+ /// assert_eq!(stream.next().await, Some(0));
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, None);
+ ///
+ /// // however, once it is fused
+ /// let mut stream = stream.fuse();
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, None);
+ ///
+ /// // it will always return `None` after the first time.
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, None);
+ /// }
+ /// ```
+ fn fuse(self) -> Fuse<Self>
+ where
+ Self: Sized,
+ {
+ Fuse::new(self)
+ }
+
+ /// Creates a new stream of at most `n` items of the underlying stream.
+ ///
+ /// Once `n` items have been yielded from this stream then it will always
+ /// return that the stream is done.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=10).take(3);
+ ///
+ /// assert_eq!(Some(1), stream.next().await);
+ /// assert_eq!(Some(2), stream.next().await);
+ /// assert_eq!(Some(3), stream.next().await);
+ /// assert_eq!(None, stream.next().await);
+ /// # }
+ /// ```
+ fn take(self, n: usize) -> Take<Self>
+ where
+ Self: Sized,
+ {
+ Take::new(self, n)
+ }
+
+ /// Take elements from this stream while the provided predicate
+ /// resolves to `true`.
+ ///
+ /// This function, like `Iterator::take_while`, will take elements from the
+ /// stream until the predicate `f` resolves to `false`. Once one element
+ /// returns false it will always return that the stream is done.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
+ ///
+ /// assert_eq!(Some(1), stream.next().await);
+ /// assert_eq!(Some(2), stream.next().await);
+ /// assert_eq!(Some(3), stream.next().await);
+ /// assert_eq!(None, stream.next().await);
+ /// # }
+ /// ```
+ fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
+ where
+ F: FnMut(&Self::Item) -> bool,
+ Self: Sized,
+ {
+ TakeWhile::new(self, f)
+ }
+
+ /// Creates a new stream that will skip the `n` first items of the
+ /// underlying stream.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=10).skip(7);
+ ///
+ /// assert_eq!(Some(8), stream.next().await);
+ /// assert_eq!(Some(9), stream.next().await);
+ /// assert_eq!(Some(10), stream.next().await);
+ /// assert_eq!(None, stream.next().await);
+ /// # }
+ /// ```
+ fn skip(self, n: usize) -> Skip<Self>
+ where
+ Self: Sized,
+ {
+ Skip::new(self, n)
+ }
+
+ /// Skip elements from the underlying stream while the provided predicate
+ /// resolves to `true`.
+ ///
+ /// This function, like [`Iterator::skip_while`], will ignore elements from the
+ /// stream until the predicate `f` resolves to `false`. Once one element
+ /// returns false, the rest of the elements will be yielded.
+ ///
+ /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
+ ///
+ /// assert_eq!(Some(3), stream.next().await);
+ /// assert_eq!(Some(4), stream.next().await);
+ /// assert_eq!(Some(1), stream.next().await);
+ /// assert_eq!(None, stream.next().await);
+ /// # }
+ /// ```
+ fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
+ where
+ F: FnMut(&Self::Item) -> bool,
+ Self: Sized,
+ {
+ SkipWhile::new(self, f)
+ }
+
+ /// Tests if every element of the stream matches a predicate.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn all<F>(&mut self, f: F) -> bool;
+ /// ```
+ ///
+ /// `all()` takes a closure that returns `true` or `false`. It applies
+ /// this closure to each element of the stream, and if they all return
+ /// `true`, then so does `all`. If any of them return `false`, it
+ /// returns `false`. An empty stream returns `true`.
+ ///
+ /// `all()` is short-circuiting; in other words, it will stop processing
+ /// as soon as it finds a `false`, given that no matter what else happens,
+ /// the result will also be `false`.
+ ///
+ /// An empty stream returns `true`.
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let a = [1, 2, 3];
+ ///
+ /// assert!(stream::iter(&a).all(|&x| x > 0).await);
+ ///
+ /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
+ /// # }
+ /// ```
+ ///
+ /// Stopping at the first `false`:
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let a = [1, 2, 3];
+ ///
+ /// let mut iter = stream::iter(&a);
+ ///
+ /// assert!(!iter.all(|&x| x != 2).await);
+ ///
+ /// // we can still use `iter`, as there are more elements.
+ /// assert_eq!(iter.next().await, Some(&3));
+ /// # }
+ /// ```
+ fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
+ where
+ Self: Unpin,
+ F: FnMut(Self::Item) -> bool,
+ {
+ AllFuture::new(self, f)
+ }
+
+ /// Tests if any element of the stream matches a predicate.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn any<F>(&mut self, f: F) -> bool;
+ /// ```
+ ///
+ /// `any()` takes a closure that returns `true` or `false`. It applies
+ /// this closure to each element of the stream, and if any of them return
+ /// `true`, then so does `any()`. If they all return `false`, it
+ /// returns `false`.
+ ///
+ /// `any()` is short-circuiting; in other words, it will stop processing
+ /// as soon as it finds a `true`, given that no matter what else happens,
+ /// the result will also be `true`.
+ ///
+ /// An empty stream returns `false`.
+ ///
+ /// Basic usage:
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let a = [1, 2, 3];
+ ///
+ /// assert!(stream::iter(&a).any(|&x| x > 0).await);
+ ///
+ /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
+ /// # }
+ /// ```
+ ///
+ /// Stopping at the first `true`:
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let a = [1, 2, 3];
+ ///
+ /// let mut iter = stream::iter(&a);
+ ///
+ /// assert!(iter.any(|&x| x != 2).await);
+ ///
+ /// // we can still use `iter`, as there are more elements.
+ /// assert_eq!(iter.next().await, Some(&2));
+ /// # }
+ /// ```
+ fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
+ where
+ Self: Unpin,
+ F: FnMut(Self::Item) -> bool,
+ {
+ AnyFuture::new(self, f)
+ }
+
+ /// Combine two streams into one by first returning all values from the
+ /// first stream then all values from the second stream.
+ ///
+ /// As long as `self` still has values to emit, no values from `other` are
+ /// emitted, even if some are ready.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let one = stream::iter(vec![1, 2, 3]);
+ /// let two = stream::iter(vec![4, 5, 6]);
+ ///
+ /// let mut stream = one.chain(two);
+ ///
+ /// assert_eq!(stream.next().await, Some(1));
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, Some(3));
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// assert_eq!(stream.next().await, None);
+ /// }
+ /// ```
+ fn chain<U>(self, other: U) -> Chain<Self, U>
+ where
+ U: Stream<Item = Self::Item>,
+ Self: Sized,
+ {
+ Chain::new(self, other)
+ }
+
+ /// A combinator that applies a function to every element in a stream
+ /// producing a single, final value.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn fold<B, F>(self, init: B, f: F) -> B;
+ /// ```
+ ///
+ /// # Examples
+ /// Basic usage:
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, *};
+ ///
+ /// let s = stream::iter(vec![1u8, 2, 3]);
+ /// let sum = s.fold(0, |acc, x| acc + x).await;
+ ///
+ /// assert_eq!(sum, 6);
+ /// # }
+ /// ```
+ fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
+ where
+ Self: Sized,
+ F: FnMut(B, Self::Item) -> B,
+ {
+ FoldFuture::new(self, init, f)
+ }
+
+ /// Drain stream pushing all emitted values into a collection.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn collect<T>(self) -> T;
+ /// ```
+ ///
+ /// `collect` streams all values, awaiting as needed. Values are pushed into
+ /// a collection. A number of different target collection types are
+ /// supported, including [`Vec`](std::vec::Vec),
+ /// [`String`](std::string::String), and [`Bytes`].
+ ///
+ /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
+ ///
+ /// # `Result`
+ ///
+ /// `collect()` can also be used with streams of type `Result<T, E>` where
+ /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
+ /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
+ /// streaming is terminated and `collect()` returns the `Err`.
+ ///
+ /// # Notes
+ ///
+ /// `FromStream` is currently a sealed trait. Stabilization is pending
+ /// enhancements to the Rust language.
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let doubled: Vec<i32> =
+ /// stream::iter(vec![1, 2, 3])
+ /// .map(|x| x * 2)
+ /// .collect()
+ /// .await;
+ ///
+ /// assert_eq!(vec![2, 4, 6], doubled);
+ /// }
+ /// ```
+ ///
+ /// Collecting a stream of `Result` values
+ ///
+ /// ```
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// // A stream containing only `Ok` values will be collected
+ /// let values: Result<Vec<i32>, &str> =
+ /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
+ /// .collect()
+ /// .await;
+ ///
+ /// assert_eq!(Ok(vec![1, 2, 3]), values);
+ ///
+ /// // A stream containing `Err` values will return the first error.
+ /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
+ ///
+ /// let values: Result<Vec<i32>, &str> =
+ /// stream::iter(results)
+ /// .collect()
+ /// .await;
+ ///
+ /// assert_eq!(Err("no"), values);
+ /// }
+ /// ```
+ fn collect<T>(self) -> Collect<Self, T>
+ where
+ T: FromStream<Self::Item>,
+ Self: Sized,
+ {
+ Collect::new(self)
+ }
+
+ /// Applies a per-item timeout to the passed stream.
+ ///
+ /// `timeout()` takes a `Duration` that represents the maximum amount of
+ /// time each element of the stream has to complete before timing out.
+ ///
+ /// If the wrapped stream yields a value before the deadline is reached, the
+ /// value is returned. Otherwise, an error is returned. The caller may decide
+ /// to continue consuming the stream and will eventually get the next source
+ /// stream value once it becomes available.
+ ///
+ /// # Notes
+ ///
+ /// This function consumes the stream passed into it and returns a
+ /// wrapped version of it.
+ ///
+ /// Polling the returned stream will continue to poll the inner stream even
+ /// if one or more items time out.
+ ///
+ /// # Examples
+ ///
+ /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ /// use std::time::Duration;
+ /// # let int_stream = stream::iter(1..=3);
+ ///
+ /// let int_stream = int_stream.timeout(Duration::from_secs(1));
+ /// tokio::pin!(int_stream);
+ ///
+ /// // When no items time out, we get the 3 elements in succession:
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If the second item times out, we get an error and continue polling the stream:
+ /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert!(int_stream.try_next().await.is_err());
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If we want to stop consuming the source stream the first time an
+ /// // element times out, we can use the `take_while` operator:
+ /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// let mut int_stream = int_stream.take_while(Result::is_ok);
+ ///
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ /// # }
+ /// ```
+ #[cfg(all(feature = "time"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ fn timeout(self, duration: Duration) -> Timeout<Self>
+ where
+ Self: Sized,
+ {
+ Timeout::new(self, duration)
+ }
+
+ /// Slows down a stream by enforcing a delay between items.
+ ///
+ /// The underlying timer behind this utility has a granularity of one millisecond.
+ ///
+ /// # Example
+ ///
+ /// Create a throttled stream.
+ /// ```rust,no_run
+ /// use std::time::Duration;
+ /// use tokio_stream::StreamExt;
+ ///
+ /// # async fn dox() {
+ /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
+ /// tokio::pin!(item_stream);
+ ///
+ /// loop {
+ /// // The string will be produced at most every 2 seconds
+ /// println!("{:?}", item_stream.next().await);
+ /// }
+ /// # }
+ /// ```
+ #[cfg(all(feature = "time"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ fn throttle(self, duration: Duration) -> Throttle<Self>
+ where
+ Self: Sized,
+ {
+ throttle(duration, self)
+ }
+
+ /// Batches the items in the given stream using a maximum duration and size for each batch.
+ ///
+ /// This stream returns the next batch of items in the following situations:
+ /// 1. The inner stream has returned at least `max_size` many items since the last batch.
+ /// 2. The time since the first item of a batch is greater than the given duration.
+ /// 3. The end of the stream is reached.
+ ///
+ /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
+ /// will not be emitted if no items are received upstream.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if `max_size` is zero
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// use std::time::Duration;
+ /// use tokio::time;
+ /// use tokio_stream::{self as stream, StreamExt};
+ /// use futures::FutureExt;
+ ///
+ /// #[tokio::main]
+ /// # async fn _unused() {}
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// async fn main() {
+ /// let iter = vec![1, 2, 3, 4].into_iter();
+ /// let stream0 = stream::iter(iter);
+ ///
+ /// let iter = vec![5].into_iter();
+ /// let stream1 = stream::iter(iter)
+ /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+ ///
+ /// let chunk_stream = stream0
+ /// .chain(stream1)
+ /// .chunks_timeout(3, Duration::from_secs(2));
+ /// tokio::pin!(chunk_stream);
+ ///
+ /// // a full batch was received
+ /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
+ /// // deadline was reached before max_size was reached
+ /// assert_eq!(chunk_stream.next().await, Some(vec![4]));
+ /// // last element in the stream
+ /// assert_eq!(chunk_stream.next().await, Some(vec![5]));
+ /// }
+ /// ```
+ #[cfg(feature = "time")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ #[track_caller]
+ fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
+ where
+ Self: Sized,
+ {
+ assert!(max_size > 0, "`max_size` must be non-zero.");
+ ChunksTimeout::new(self, max_size, duration)
+ }
+}
+
+impl<St: ?Sized> StreamExt for St where St: Stream {}
+
+/// Merge the size hints from two streams.
+fn merge_size_hints(
+ (left_low, left_high): (usize, Option<usize>),
+ (right_low, right_high): (usize, Option<usize>),
+) -> (usize, Option<usize>) {
+ let low = left_low.saturating_add(right_low);
+ let high = match (left_high, right_high) {
+ (Some(h1), Some(h2)) => h1.checked_add(h2),
+ _ => None,
+ };
+ (low, high)
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/all.rs b/third_party/rust/tokio-stream/src/stream_ext/all.rs
new file mode 100644
index 0000000000..b4dbc1e97c
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/all.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`all`](super::StreamExt::all) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct AllFuture<'a, St: ?Sized, F> {
+ stream: &'a mut St,
+ f: F,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized, F> AllFuture<'a, St, F> {
+ pub(super) fn new(stream: &'a mut St, f: F) -> Self {
+ Self {
+ stream,
+ f,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St, F> Future for AllFuture<'_, St, F>
+where
+ St: ?Sized + Stream + Unpin,
+ F: FnMut(St::Item) -> bool,
+{
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ let mut stream = Pin::new(me.stream);
+
+ // Take a maximum of 32 items from the stream before yielding.
+ for _ in 0..32 {
+ match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+ Some(v) => {
+ if !(me.f)(v) {
+ return Poll::Ready(false);
+ }
+ }
+ None => return Poll::Ready(true),
+ }
+ }
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/any.rs b/third_party/rust/tokio-stream/src/stream_ext/any.rs
new file mode 100644
index 0000000000..31394f249b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/any.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`any`](super::StreamExt::any) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct AnyFuture<'a, St: ?Sized, F> {
+ stream: &'a mut St,
+ f: F,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> {
+ pub(super) fn new(stream: &'a mut St, f: F) -> Self {
+ Self {
+ stream,
+ f,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St, F> Future for AnyFuture<'_, St, F>
+where
+ St: ?Sized + Stream + Unpin,
+ F: FnMut(St::Item) -> bool,
+{
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ let mut stream = Pin::new(me.stream);
+
+ // Take a maximum of 32 items from the stream before yielding.
+ for _ in 0..32 {
+ match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+ Some(v) => {
+ if (me.f)(v) {
+ return Poll::Ready(true);
+ }
+ }
+ None => return Poll::Ready(false),
+ }
+ }
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/chain.rs b/third_party/rust/tokio-stream/src/stream_ext/chain.rs
new file mode 100644
index 0000000000..bd64f33ce4
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/chain.rs
@@ -0,0 +1,50 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`chain`](super::StreamExt::chain) method.
+ pub struct Chain<T, U> {
+ #[pin]
+ a: Fuse<T>,
+ #[pin]
+ b: U,
+ }
+}
+
+impl<T, U> Chain<T, U> {
+ pub(super) fn new(a: T, b: U) -> Chain<T, U>
+ where
+ T: Stream,
+ U: Stream,
+ {
+ Chain { a: Fuse::new(a), b }
+ }
+}
+
+impl<T, U> Stream for Chain<T, U>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ use Poll::Ready;
+
+ let me = self.project();
+
+ if let Some(v) = ready!(me.a.poll_next(cx)) {
+ return Ready(Some(v));
+ }
+
+ me.b.poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs
new file mode 100644
index 0000000000..48acd9328b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,86 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{sleep, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::time::Duration;
+
+pin_project! {
+ /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct ChunksTimeout<S: Stream> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ deadline: Option<Sleep>,
+ duration: Duration,
+ items: Vec<S::Item>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
+}
+
+impl<S: Stream> ChunksTimeout<S> {
+ pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
+ ChunksTimeout {
+ stream: Fuse::new(stream),
+ deadline: None,
+ duration,
+ items: Vec::with_capacity(max_size),
+ cap: max_size,
+ }
+ }
+}
+
+impl<S: Stream> Stream for ChunksTimeout<S> {
+ type Item = Vec<S::Item>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.as_mut().project();
+ loop {
+ match me.stream.as_mut().poll_next(cx) {
+ Poll::Pending => break,
+ Poll::Ready(Some(item)) => {
+ if me.items.is_empty() {
+ me.deadline.set(Some(sleep(*me.duration)));
+ me.items.reserve_exact(*me.cap);
+ }
+ me.items.push(item);
+ if me.items.len() >= *me.cap {
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+ }
+ Poll::Ready(None) => {
+ // Returning Some here is only correct because we fuse the inner stream.
+ let last = if me.items.is_empty() {
+ None
+ } else {
+ Some(std::mem::take(me.items))
+ };
+
+ return Poll::Ready(last);
+ }
+ }
+ }
+
+ if !me.items.is_empty() {
+ if let Some(deadline) = me.deadline.as_pin_mut() {
+ ready!(deadline.poll(cx));
+ }
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let (lower, upper) = self.stream.size_hint();
+ let lower = (lower / self.cap).saturating_add(chunk_len);
+ let upper = upper.and_then(|x| x.checked_add(chunk_len));
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/collect.rs b/third_party/rust/tokio-stream/src/stream_ext/collect.rs
new file mode 100644
index 0000000000..8548b74556
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/collect.rs
@@ -0,0 +1,229 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+// Do not export this struct until `FromStream` can be unsealed.
+pin_project! {
+ /// Future returned by the [`collect`](super::StreamExt::collect) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ #[derive(Debug)]
+ pub struct Collect<T, U>
+ where
+ T: Stream,
+ U: FromStream<T::Item>,
+ {
+ #[pin]
+ stream: T,
+ collection: U::InternalCollection,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+/// Convert from a [`Stream`](crate::Stream).
+///
+/// This trait is not intended to be used directly. Instead, call
+/// [`StreamExt::collect()`](super::StreamExt::collect).
+///
+/// # Implementing
+///
+/// Currently, this trait may not be implemented by third parties. The trait is
+/// sealed in order to make changes in the future. Stabilization is pending
+/// enhancements to the Rust language.
+pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
+
+impl<T, U> Collect<T, U>
+where
+ T: Stream,
+ U: FromStream<T::Item>,
+{
+ pub(super) fn new(stream: T) -> Collect<T, U> {
+ let (lower, upper) = stream.size_hint();
+ let collection = U::initialize(sealed::Internal, lower, upper);
+
+ Collect {
+ stream,
+ collection,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<T, U> Future for Collect<T, U>
+where
+ T: Stream,
+ U: FromStream<T::Item>,
+{
+ type Output = U;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
+ use Poll::Ready;
+
+ loop {
+ let me = self.as_mut().project();
+
+ let item = match ready!(me.stream.poll_next(cx)) {
+ Some(item) => item,
+ None => {
+ return Ready(U::finalize(sealed::Internal, me.collection));
+ }
+ };
+
+ if !U::extend(sealed::Internal, me.collection, item) {
+ return Ready(U::finalize(sealed::Internal, me.collection));
+ }
+ }
+ }
+}
+
+// ===== FromStream implementations
+
+impl FromStream<()> for () {}
+
+impl sealed::FromStreamPriv<()> for () {
+ type InternalCollection = ();
+
+ fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
+
+ fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
+ true
+ }
+
+ fn finalize(_: sealed::Internal, _collection: &mut ()) {}
+}
+
+impl<T: AsRef<str>> FromStream<T> for String {}
+
+impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
+ type InternalCollection = String;
+
+ fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
+ String::new()
+ }
+
+ fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
+ collection.push_str(item.as_ref());
+ true
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut String) -> String {
+ mem::take(collection)
+ }
+}
+
+impl<T> FromStream<T> for Vec<T> {}
+
+impl<T> sealed::FromStreamPriv<T> for Vec<T> {
+ type InternalCollection = Vec<T>;
+
+ fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
+ Vec::with_capacity(lower)
+ }
+
+ fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
+ collection.push(item);
+ true
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
+ mem::take(collection)
+ }
+}
+
+impl<T> FromStream<T> for Box<[T]> {}
+
+impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
+ type InternalCollection = Vec<T>;
+
+ fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
+ <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
+ }
+
+ fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
+ <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
+ <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
+ .into_boxed_slice()
+ }
+}
+
+impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
+
+impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
+where
+ U: FromStream<T>,
+{
+ type InternalCollection = Result<U::InternalCollection, E>;
+
+ fn initialize(
+ _: sealed::Internal,
+ lower: usize,
+ upper: Option<usize>,
+ ) -> Result<U::InternalCollection, E> {
+ Ok(U::initialize(sealed::Internal, lower, upper))
+ }
+
+ fn extend(
+ _: sealed::Internal,
+ collection: &mut Self::InternalCollection,
+ item: Result<T, E>,
+ ) -> bool {
+ assert!(collection.is_ok());
+ match item {
+ Ok(item) => {
+ let collection = collection.as_mut().ok().expect("invalid state");
+ U::extend(sealed::Internal, collection, item)
+ }
+ Err(err) => {
+ *collection = Err(err);
+ false
+ }
+ }
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
+ if let Ok(collection) = collection.as_mut() {
+ Ok(U::finalize(sealed::Internal, collection))
+ } else {
+ let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
+
+ Err(res.map(drop).unwrap_err())
+ }
+ }
+}
+
+pub(crate) mod sealed {
+ #[doc(hidden)]
+ pub trait FromStreamPriv<T> {
+ /// Intermediate type used during collection process
+ ///
+ /// The name of this type is internal and cannot be relied upon.
+ type InternalCollection;
+
+ /// Initialize the collection
+ fn initialize(
+ internal: Internal,
+ lower: usize,
+ upper: Option<usize>,
+ ) -> Self::InternalCollection;
+
+ /// Extend the collection with the received item
+ ///
+ /// Return `true` to continue streaming, `false` complete collection.
+ fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;
+
+ /// Finalize collection into target type.
+ fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
+ }
+
+ #[allow(missing_debug_implementations)]
+ pub struct Internal;
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter.rs b/third_party/rust/tokio-stream/src/stream_ext/filter.rs
new file mode 100644
index 0000000000..f3dd8716b4
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/filter.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`filter`](super::StreamExt::filter) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Filter<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for Filter<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Filter")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> Filter<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f }
+ }
+}
+
+impl<St, F> Stream for Filter<St, F>
+where
+ St: Stream,
+ F: FnMut(&St::Item) -> bool,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
+ loop {
+ match ready!(self.as_mut().project().stream.poll_next(cx)) {
+ Some(e) => {
+ if (self.as_mut().project().f)(&e) {
+ return Poll::Ready(Some(e));
+ }
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs
new file mode 100644
index 0000000000..fe604a6f4b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`filter_map`](super::StreamExt::filter_map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct FilterMap<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for FilterMap<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FilterMap")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> FilterMap<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for FilterMap<St, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Option<T>,
+{
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ loop {
+ match ready!(self.as_mut().project().stream.poll_next(cx)) {
+ Some(e) => {
+ if let Some(e) = (self.as_mut().project().f)(e) {
+ return Poll::Ready(Some(e));
+ }
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/fold.rs b/third_party/rust/tokio-stream/src/stream_ext/fold.rs
new file mode 100644
index 0000000000..e2e97d8f37
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/fold.rs
@@ -0,0 +1,57 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future returned by the [`fold`](super::StreamExt::fold) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct FoldFuture<St, B, F> {
+ #[pin]
+ stream: St,
+ acc: Option<B>,
+ f: F,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<St, B, F> FoldFuture<St, B, F> {
+ pub(super) fn new(stream: St, init: B, f: F) -> Self {
+ Self {
+ stream,
+ acc: Some(init),
+ f,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St, B, F> Future for FoldFuture<St, B, F>
+where
+ St: Stream,
+ F: FnMut(B, St::Item) -> B,
+{
+ type Output = B;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ let next = ready!(me.stream.as_mut().poll_next(cx));
+
+ match next {
+ Some(v) => {
+ let old = me.acc.take().unwrap();
+ let new = (me.f)(old, v);
+ *me.acc = Some(new);
+ }
+ None => return Poll::Ready(me.acc.take().unwrap()),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/fuse.rs b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs
new file mode 100644
index 0000000000..2500641d95
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs
@@ -0,0 +1,53 @@
+use crate::Stream;
+
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// Stream returned by [`fuse()`][super::StreamExt::fuse].
+ #[derive(Debug)]
+ pub struct Fuse<T> {
+ #[pin]
+ stream: Option<T>,
+ }
+}
+
+impl<T> Fuse<T>
+where
+ T: Stream,
+{
+ pub(crate) fn new(stream: T) -> Fuse<T> {
+ Fuse {
+ stream: Some(stream),
+ }
+ }
+}
+
+impl<T> Stream for Fuse<T>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let res = match Option::as_pin_mut(self.as_mut().project().stream) {
+ Some(stream) => ready!(stream.poll_next(cx)),
+ None => return Poll::Ready(None),
+ };
+
+ if res.is_none() {
+ // Do not poll the stream anymore
+ self.as_mut().project().stream.set(None);
+ }
+
+ Poll::Ready(res)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self.stream {
+ Some(ref stream) => stream.size_hint(),
+ None => (0, Some(0)),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/map.rs b/third_party/rust/tokio-stream/src/stream_ext/map.rs
new file mode 100644
index 0000000000..e6b47cd258
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/map.rs
@@ -0,0 +1,51 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map`](super::StreamExt::map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Map<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for Map<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Map").field("stream", &self.stream).finish()
+ }
+}
+
+impl<St, F> Map<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Map { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for Map<St, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.as_mut()
+ .project()
+ .stream
+ .poll_next(cx)
+ .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/map_while.rs b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs
new file mode 100644
index 0000000000..d4fd825656
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs
@@ -0,0 +1,52 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map_while`](super::StreamExt::map_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct MapWhile<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for MapWhile<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapWhile")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> MapWhile<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ MapWhile { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for MapWhile<St, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Option<T>,
+{
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let me = self.project();
+ let f = me.f;
+ me.stream.poll_next(cx).map(|opt| opt.and_then(f))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (_, upper) = self.stream.size_hint();
+ (0, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/merge.rs b/third_party/rust/tokio-stream/src/stream_ext/merge.rs
new file mode 100644
index 0000000000..9d5123c85a
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/merge.rs
@@ -0,0 +1,90 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`merge`](super::StreamExt::merge) method.
+ pub struct Merge<T, U> {
+ #[pin]
+ a: Fuse<T>,
+ #[pin]
+ b: Fuse<U>,
+ // When `true`, poll `a` first, otherwise, `poll` b`.
+ a_first: bool,
+ }
+}
+
+impl<T, U> Merge<T, U> {
+ pub(super) fn new(a: T, b: U) -> Merge<T, U>
+ where
+ T: Stream,
+ U: Stream,
+ {
+ Merge {
+ a: Fuse::new(a),
+ b: Fuse::new(b),
+ a_first: true,
+ }
+ }
+}
+
+impl<T, U> Stream for Merge<T, U>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let me = self.project();
+ let a_first = *me.a_first;
+
+ // Toggle the flag
+ *me.a_first = !a_first;
+
+ if a_first {
+ poll_next(me.a, me.b, cx)
+ } else {
+ poll_next(me.b, me.a, cx)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
+ }
+}
+
+fn poll_next<T, U>(
+ first: Pin<&mut T>,
+ second: Pin<&mut U>,
+ cx: &mut Context<'_>,
+) -> Poll<Option<T::Item>>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ use Poll::*;
+
+ let mut done = true;
+
+ match first.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ match second.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ if done {
+ Ready(None)
+ } else {
+ Pending
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/next.rs b/third_party/rust/tokio-stream/src/stream_ext/next.rs
new file mode 100644
index 0000000000..706069fa6e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/next.rs
@@ -0,0 +1,44 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`next`](super::StreamExt::next) method.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. It only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Next<'a, St: ?Sized> {
+ stream: &'a mut St,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized> Next<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Next {
+ stream,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
+ type Output = Option<St::Item>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ Pin::new(me.stream).poll_next(cx)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip.rs b/third_party/rust/tokio-stream/src/stream_ext/skip.rs
new file mode 100644
index 0000000000..80a0a0aff0
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/skip.rs
@@ -0,0 +1,63 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`skip`](super::StreamExt::skip) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Skip<St> {
+ #[pin]
+ stream: St,
+ remaining: usize,
+ }
+}
+
+impl<St> fmt::Debug for Skip<St>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Skip")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St> Skip<St> {
+ pub(super) fn new(stream: St, remaining: usize) -> Self {
+ Self { stream, remaining }
+ }
+}
+
+impl<St> Stream for Skip<St>
+where
+ St: Stream,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ loop {
+ match ready!(self.as_mut().project().stream.poll_next(cx)) {
+ Some(e) => {
+ if self.remaining == 0 {
+ return Poll::Ready(Some(e));
+ }
+ *self.as_mut().project().remaining -= 1;
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, upper) = self.stream.size_hint();
+
+ let lower = lower.saturating_sub(self.remaining);
+ let upper = upper.map(|x| x.saturating_sub(self.remaining));
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs
new file mode 100644
index 0000000000..985a92666e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs
@@ -0,0 +1,73 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct SkipWhile<St, F> {
+ #[pin]
+ stream: St,
+ predicate: Option<F>,
+ }
+}
+
+impl<St, F> fmt::Debug for SkipWhile<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("SkipWhile")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> SkipWhile<St, F> {
+ pub(super) fn new(stream: St, predicate: F) -> Self {
+ Self {
+ stream,
+ predicate: Some(predicate),
+ }
+ }
+}
+
+impl<St, F> Stream for SkipWhile<St, F>
+where
+ St: Stream,
+ F: FnMut(&St::Item) -> bool,
+{
+ type Item = St::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ if let Some(predicate) = this.predicate {
+ loop {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(item) => {
+ if !(predicate)(&item) {
+ *this.predicate = None;
+ return Poll::Ready(Some(item));
+ }
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ } else {
+ this.stream.poll_next(cx)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, upper) = self.stream.size_hint();
+
+ if self.predicate.is_some() {
+ return (0, upper);
+ }
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/take.rs b/third_party/rust/tokio-stream/src/stream_ext/take.rs
new file mode 100644
index 0000000000..c75648f606
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/take.rs
@@ -0,0 +1,76 @@
+use crate::Stream;
+
+use core::cmp;
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`take`](super::StreamExt::take) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Take<St> {
+ #[pin]
+ stream: St,
+ remaining: usize,
+ }
+}
+
+impl<St> fmt::Debug for Take<St>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Take")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St> Take<St> {
+ pub(super) fn new(stream: St, remaining: usize) -> Self {
+ Self { stream, remaining }
+ }
+}
+
+impl<St> Stream for Take<St>
+where
+ St: Stream,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if *self.as_mut().project().remaining > 0 {
+ self.as_mut().project().stream.poll_next(cx).map(|ready| {
+ match &ready {
+ Some(_) => {
+ *self.as_mut().project().remaining -= 1;
+ }
+ None => {
+ *self.as_mut().project().remaining = 0;
+ }
+ }
+ ready
+ })
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.remaining == 0 {
+ return (0, Some(0));
+ }
+
+ let (lower, upper) = self.stream.size_hint();
+
+ let lower = cmp::min(lower, self.remaining as usize);
+
+ let upper = match upper {
+ Some(x) if x < self.remaining as usize => Some(x),
+ _ => Some(self.remaining as usize),
+ };
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/take_while.rs b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs
new file mode 100644
index 0000000000..5ce4dd98a9
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs
@@ -0,0 +1,79 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`take_while`](super::StreamExt::take_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TakeWhile<St, F> {
+ #[pin]
+ stream: St,
+ predicate: F,
+ done: bool,
+ }
+}
+
+impl<St, F> fmt::Debug for TakeWhile<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TakeWhile")
+ .field("stream", &self.stream)
+ .field("done", &self.done)
+ .finish()
+ }
+}
+
+impl<St, F> TakeWhile<St, F> {
+ pub(super) fn new(stream: St, predicate: F) -> Self {
+ Self {
+ stream,
+ predicate,
+ done: false,
+ }
+ }
+}
+
+impl<St, F> Stream for TakeWhile<St, F>
+where
+ St: Stream,
+ F: FnMut(&St::Item) -> bool,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if !*self.as_mut().project().done {
+ self.as_mut().project().stream.poll_next(cx).map(|ready| {
+ let ready = ready.and_then(|item| {
+ if !(self.as_mut().project().predicate)(&item) {
+ None
+ } else {
+ Some(item)
+ }
+ });
+
+ if ready.is_none() {
+ *self.as_mut().project().done = true;
+ }
+
+ ready
+ })
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.done {
+ return (0, Some(0));
+ }
+
+ let (_, upper) = self.stream.size_hint();
+
+ (0, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/then.rs b/third_party/rust/tokio-stream/src/stream_ext/then.rs
new file mode 100644
index 0000000000..cc7caa721e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/then.rs
@@ -0,0 +1,83 @@
+use crate::Stream;
+
+use core::fmt;
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`then`](super::StreamExt::then) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Then<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Then")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> Then<St, Fut, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Then {
+ stream,
+ future: None,
+ f,
+ }
+ }
+}
+
+impl<St, F, Fut> Stream for Then<St, Fut, F>
+where
+ St: Stream,
+ Fut: Future,
+ F: FnMut(St::Item) -> Fut,
+{
+ type Item = Fut::Output;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
+ let mut me = self.project();
+
+ loop {
+ if let Some(future) = me.future.as_mut().as_pin_mut() {
+ match future.poll(cx) {
+ Poll::Ready(item) => {
+ me.future.set(None);
+ return Poll::Ready(Some(item));
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+
+ match me.stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item)) => {
+ me.future.set(Some((me.f)(item)));
+ }
+ Poll::Ready(None) => return Poll::Ready(None),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let future_len = usize::from(self.future.is_some());
+ let (lower, upper) = self.stream.size_hint();
+
+ let lower = lower.saturating_add(future_len);
+ let upper = upper.and_then(|upper| upper.checked_add(future_len));
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/throttle.rs b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs
new file mode 100644
index 0000000000..50001392ee
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs
@@ -0,0 +1,96 @@
+//! Slow down a stream by enforcing a delay between items.
+
+use crate::Stream;
+use tokio::time::{Duration, Instant, Sleep};
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+use pin_project_lite::pin_project;
+
+pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
+where
+ T: Stream,
+{
+ Throttle {
+ delay: tokio::time::sleep_until(Instant::now() + duration),
+ duration,
+ has_delayed: true,
+ stream,
+ }
+}
+
+pin_project! {
+ /// Stream for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to
+ /// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Throttle<T> {
+ #[pin]
+ delay: Sleep,
+ duration: Duration,
+
+ // Set to true when `delay` has returned ready, but `stream` hasn't.
+ has_delayed: bool,
+
+ // The stream to throttle
+ #[pin]
+ stream: T,
+ }
+}
+
+impl<T> Throttle<T> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &T {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this combinator
+ /// is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the stream
+ /// which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so care
+ /// should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> T {
+ self.stream
+ }
+}
+
+impl<T: Stream> Stream for Throttle<T> {
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.project();
+ let dur = *me.duration;
+
+ if !*me.has_delayed && !is_zero(dur) {
+ ready!(me.delay.as_mut().poll(cx));
+ *me.has_delayed = true;
+ }
+
+ let value = ready!(me.stream.poll_next(cx));
+
+ if value.is_some() {
+ if !is_zero(dur) {
+ me.delay.reset(Instant::now() + dur);
+ }
+
+ *me.has_delayed = false;
+ }
+
+ Poll::Ready(value)
+ }
+}
+
+fn is_zero(dur: Duration) -> bool {
+ dur == Duration::from_millis(0)
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs
new file mode 100644
index 0000000000..a440d203ec
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs
@@ -0,0 +1,107 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{Instant, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::time::Duration;
+
+pin_project! {
+ /// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct Timeout<S> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ deadline: Sleep,
+ duration: Duration,
+ poll_deadline: bool,
+ }
+}
+
+/// Error returned by `Timeout`.
+#[derive(Debug, PartialEq, Eq)]
+pub struct Elapsed(());
+
+impl<S: Stream> Timeout<S> {
+ pub(super) fn new(stream: S, duration: Duration) -> Self {
+ let next = Instant::now() + duration;
+ let deadline = tokio::time::sleep_until(next);
+
+ Timeout {
+ stream: Fuse::new(stream),
+ deadline,
+ duration,
+ poll_deadline: true,
+ }
+ }
+}
+
+impl<S: Stream> Stream for Timeout<S> {
+ type Item = Result<S::Item, Elapsed>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let me = self.project();
+
+ match me.stream.poll_next(cx) {
+ Poll::Ready(v) => {
+ if v.is_some() {
+ let next = Instant::now() + *me.duration;
+ me.deadline.reset(next);
+ *me.poll_deadline = true;
+ }
+ return Poll::Ready(v.map(Ok));
+ }
+ Poll::Pending => {}
+ };
+
+ if *me.poll_deadline {
+ ready!(me.deadline.poll(cx));
+ *me.poll_deadline = false;
+ return Poll::Ready(Some(Err(Elapsed::new())));
+ }
+
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, upper) = self.stream.size_hint();
+
+ // The timeout stream may insert an error before and after each message
+ // from the underlying stream, but no more than one error between each
+ // message. Hence the upper bound is computed as 2x+1.
+
+ // Using a helper function to enable use of question mark operator.
+ fn twice_plus_one(value: Option<usize>) -> Option<usize> {
+ value?.checked_mul(2)?.checked_add(1)
+ }
+
+ (lower, twice_plus_one(upper))
+ }
+}
+
+// ===== impl Elapsed =====
+
+impl Elapsed {
+ pub(crate) fn new() -> Self {
+ Elapsed(())
+ }
+}
+
+impl fmt::Display for Elapsed {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "deadline has elapsed".fmt(fmt)
+ }
+}
+
+impl std::error::Error for Elapsed {}
+
+impl From<Elapsed> for std::io::Error {
+ fn from(_err: Elapsed) -> std::io::Error {
+ std::io::ErrorKind::TimedOut.into()
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/try_next.rs b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs
new file mode 100644
index 0000000000..93aa3bc15f
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs
@@ -0,0 +1,45 @@
+use crate::stream_ext::Next;
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_next`](super::StreamExt::try_next) method.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. It only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryNext<'a, St: ?Sized> {
+ #[pin]
+ inner: Next<'a, St>,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized> TryNext<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Self {
+ inner: Next::new(stream),
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<T, E, St: ?Sized + Stream<Item = Result<T, E>> + Unpin> Future for TryNext<'_, St> {
+ type Output = Result<Option<T>, E>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ me.inner.poll(cx).map(Option::transpose)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_map.rs b/third_party/rust/tokio-stream/src/stream_map.rs
new file mode 100644
index 0000000000..215980474b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_map.rs
@@ -0,0 +1,690 @@
+use crate::Stream;
+
+use std::borrow::Borrow;
+use std::hash::Hash;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Combine many streams into one, indexing each source stream with a unique
+/// key.
+///
+/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
+/// streams into a single merged stream that yields values in the order that
+/// they arrive from the source streams. However, `StreamMap` has a lot more
+/// flexibility in usage patterns.
+///
+/// `StreamMap` can:
+///
+/// * Merge an arbitrary number of streams.
+/// * Track which source stream the value was received from.
+/// * Handle inserting and removing streams from the set of managed streams at
+/// any point during iteration.
+///
+/// All source streams held by `StreamMap` are indexed using a key. This key is
+/// included with the value when a source stream yields a value. The key is also
+/// used to remove the stream from the `StreamMap` before the stream has
+/// completed streaming.
+///
+/// # `Unpin`
+///
+/// Because the `StreamMap` API moves streams during runtime, both streams and
+/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
+/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
+/// pin the stream in the heap.
+///
+/// # Implementation
+///
+/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
+/// internal implementation detail will persist in future versions, but it is
+/// important to know the runtime implications. In general, `StreamMap` works
+/// best with a "smallish" number of streams as all entries are scanned on
+/// insert, remove, and polling. In cases where a large number of streams need
+/// to be merged, it may be advisable to use tasks sending values on a shared
+/// [`mpsc`] channel.
+///
+/// [`StreamExt::merge`]: crate::StreamExt::merge
+/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
+/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
+/// [`Box::pin`]: std::boxed::Box::pin
+///
+/// # Examples
+///
+/// Merging two streams, then remove them after receiving the first value
+///
+/// ```
+/// use tokio_stream::{StreamExt, StreamMap, Stream};
+/// use tokio::sync::mpsc;
+/// use std::pin::Pin;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
+/// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
+///
+/// // Convert the channels to a `Stream`.
+/// let rx1 = Box::pin(async_stream::stream! {
+/// while let Some(item) = rx1.recv().await {
+/// yield item;
+/// }
+/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+///
+/// let rx2 = Box::pin(async_stream::stream! {
+/// while let Some(item) = rx2.recv().await {
+/// yield item;
+/// }
+/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+///
+/// tokio::spawn(async move {
+/// tx1.send(1).await.unwrap();
+///
+/// // This value will never be received. The send may or may not return
+/// // `Err` depending on if the remote end closed first or not.
+/// let _ = tx1.send(2).await;
+/// });
+///
+/// tokio::spawn(async move {
+/// tx2.send(3).await.unwrap();
+/// let _ = tx2.send(4).await;
+/// });
+///
+/// let mut map = StreamMap::new();
+///
+/// // Insert both streams
+/// map.insert("one", rx1);
+/// map.insert("two", rx2);
+///
+/// // Read twice
+/// for _ in 0..2 {
+/// let (key, val) = map.next().await.unwrap();
+///
+/// if key == "one" {
+/// assert_eq!(val, 1);
+/// } else {
+/// assert_eq!(val, 3);
+/// }
+///
+/// // Remove the stream to prevent reading the next value
+/// map.remove(key);
+/// }
+/// }
+/// ```
+///
+/// This example models a read-only client to a chat system with channels. The
+/// client sends commands to join and leave channels. `StreamMap` is used to
+/// manage active channel subscriptions.
+///
+/// For simplicity, messages are displayed with `println!`, but they could be
+/// sent to the client over a socket.
+///
+/// ```no_run
+/// use tokio_stream::{Stream, StreamExt, StreamMap};
+///
+/// enum Command {
+/// Join(String),
+/// Leave(String),
+/// }
+///
+/// fn commands() -> impl Stream<Item = Command> {
+/// // Streams in user commands by parsing `stdin`.
+/// # tokio_stream::pending()
+/// }
+///
+/// // Join a channel, returns a stream of messages received on the channel.
+/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
+/// // left as an exercise to the reader
+/// # tokio_stream::pending()
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut channels = StreamMap::new();
+///
+/// // Input commands (join / leave channels).
+/// let cmds = commands();
+/// tokio::pin!(cmds);
+///
+/// loop {
+/// tokio::select! {
+/// Some(cmd) = cmds.next() => {
+/// match cmd {
+/// Command::Join(chan) => {
+/// // Join the channel and add it to the `channels`
+/// // stream map
+/// let msgs = join(&chan);
+/// channels.insert(chan, msgs);
+/// }
+/// Command::Leave(chan) => {
+/// channels.remove(&chan);
+/// }
+/// }
+/// }
+/// Some((chan, msg)) = channels.next() => {
+/// // Received a message, display it on stdout with the channel
+/// // it originated from.
+/// println!("{}: {}", chan, msg);
+/// }
+/// // Both the `commands` stream and the `channels` stream are
+/// // complete. There is no more work to do, so leave the loop.
+/// else => break,
+/// }
+/// }
+/// }
+/// ```
+#[derive(Debug)]
+pub struct StreamMap<K, V> {
+ /// Streams stored in the map
+ entries: Vec<(K, V)>,
+}
+
+impl<K, V> StreamMap<K, V> {
+ /// An iterator visiting all key-value pairs in arbitrary order.
+ ///
+ /// The iterator element type is &'a (K, V).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for (key, stream) in map.iter() {
+ /// println!("({}, {:?})", key, stream);
+ /// }
+ /// ```
+ pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
+ self.entries.iter()
+ }
+
+ /// An iterator visiting all key-value pairs mutably in arbitrary order.
+ ///
+ /// The iterator element type is &'a mut (K, V).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for (key, stream) in map.iter_mut() {
+ /// println!("({}, {:?})", key, stream);
+ /// }
+ /// ```
+ pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
+ self.entries.iter_mut()
+ }
+
+ /// Creates an empty `StreamMap`.
+ ///
+ /// The stream map is initially created with a capacity of `0`, so it will
+ /// not allocate until it is first inserted into.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
+ /// ```
+ pub fn new() -> StreamMap<K, V> {
+ StreamMap { entries: vec![] }
+ }
+
+ /// Creates an empty `StreamMap` with the specified capacity.
+ ///
+ /// The stream map will be able to hold at least `capacity` elements without
+ /// reallocating. If `capacity` is 0, the stream map will not allocate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
+ /// ```
+ pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
+ StreamMap {
+ entries: Vec::with_capacity(capacity),
+ }
+ }
+
+ /// Returns an iterator visiting all keys in arbitrary order.
+ ///
+ /// The iterator element type is &'a K.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for key in map.keys() {
+ /// println!("{}", key);
+ /// }
+ /// ```
+ pub fn keys(&self) -> impl Iterator<Item = &K> {
+ self.iter().map(|(k, _)| k)
+ }
+
+ /// An iterator visiting all values in arbitrary order.
+ ///
+ /// The iterator element type is &'a V.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for stream in map.values() {
+ /// println!("{:?}", stream);
+ /// }
+ /// ```
+ pub fn values(&self) -> impl Iterator<Item = &V> {
+ self.iter().map(|(_, v)| v)
+ }
+
+ /// An iterator visiting all values mutably in arbitrary order.
+ ///
+ /// The iterator element type is &'a mut V.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for stream in map.values_mut() {
+ /// println!("{:?}", stream);
+ /// }
+ /// ```
+ pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
+ self.iter_mut().map(|(_, v)| v)
+ }
+
+ /// Returns the number of streams the map can hold without reallocating.
+ ///
+ /// This number is a lower bound; the `StreamMap` might be able to hold
+ /// more, but is guaranteed to be able to hold at least this many.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
+ /// assert!(map.capacity() >= 100);
+ /// ```
+ pub fn capacity(&self) -> usize {
+ self.entries.capacity()
+ }
+
+ /// Returns the number of streams in the map.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut a = StreamMap::new();
+ /// assert_eq!(a.len(), 0);
+ /// a.insert(1, pending::<i32>());
+ /// assert_eq!(a.len(), 1);
+ /// ```
+ pub fn len(&self) -> usize {
+ self.entries.len()
+ }
+
+ /// Returns `true` if the map contains no elements.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut a = StreamMap::new();
+ /// assert!(a.is_empty());
+ /// a.insert(1, pending::<i32>());
+ /// assert!(!a.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ self.entries.is_empty()
+ }
+
+ /// Clears the map, removing all key-stream pairs. Keeps the allocated
+ /// memory for reuse.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut a = StreamMap::new();
+ /// a.insert(1, pending::<i32>());
+ /// a.clear();
+ /// assert!(a.is_empty());
+ /// ```
+ pub fn clear(&mut self) {
+ self.entries.clear();
+ }
+
+ /// Insert a key-stream pair into the map.
+ ///
+ /// If the map did not have this key present, `None` is returned.
+ ///
+ /// If the map did have this key present, the new `stream` replaces the old
+ /// one and the old stream is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// assert!(map.insert(37, pending::<i32>()).is_none());
+ /// assert!(!map.is_empty());
+ ///
+ /// map.insert(37, pending());
+ /// assert!(map.insert(37, pending()).is_some());
+ /// ```
+ pub fn insert(&mut self, k: K, stream: V) -> Option<V>
+ where
+ K: Hash + Eq,
+ {
+ let ret = self.remove(&k);
+ self.entries.push((k, stream));
+
+ ret
+ }
+
+ /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
+ ///
+ /// The key may be any borrowed form of the map's key type, but `Hash` and
+ /// `Eq` on the borrowed form must match those for the key type.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ /// map.insert(1, pending::<i32>());
+ /// assert!(map.remove(&1).is_some());
+ /// assert!(map.remove(&1).is_none());
+ /// ```
+ pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
+ where
+ K: Borrow<Q>,
+ Q: Hash + Eq,
+ {
+ for i in 0..self.entries.len() {
+ if self.entries[i].0.borrow() == k {
+ return Some(self.entries.swap_remove(i).1);
+ }
+ }
+
+ None
+ }
+
+ /// Returns `true` if the map contains a stream for the specified key.
+ ///
+ /// The key may be any borrowed form of the map's key type, but `Hash` and
+ /// `Eq` on the borrowed form must match those for the key type.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ /// map.insert(1, pending::<i32>());
+ /// assert_eq!(map.contains_key(&1), true);
+ /// assert_eq!(map.contains_key(&2), false);
+ /// ```
+ pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
+ where
+ K: Borrow<Q>,
+ Q: Hash + Eq,
+ {
+ for i in 0..self.entries.len() {
+ if self.entries[i].0.borrow() == k {
+ return true;
+ }
+ }
+
+ false
+ }
+}
+
+impl<K, V> StreamMap<K, V>
+where
+ K: Unpin,
+ V: Stream + Unpin,
+{
+ /// Polls the next value, includes the vec entry index
+ fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
+ use Poll::*;
+
+ let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
+ let mut idx = start;
+
+ for _ in 0..self.entries.len() {
+ let (_, stream) = &mut self.entries[idx];
+
+ match Pin::new(stream).poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some((idx, val))),
+ Ready(None) => {
+ // Remove the entry
+ self.entries.swap_remove(idx);
+
+ // Check if this was the last entry, if so the cursor needs
+ // to wrap
+ if idx == self.entries.len() {
+ idx = 0;
+ } else if idx < start && start <= self.entries.len() {
+ // The stream being swapped into the current index has
+ // already been polled, so skip it.
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ Pending => {
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ }
+
+ // If the map is empty, then the stream is complete.
+ if self.entries.is_empty() {
+ Ready(None)
+ } else {
+ Pending
+ }
+ }
+}
+
+impl<K, V> Default for StreamMap<K, V> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl<K, V> Stream for StreamMap<K, V>
+where
+ K: Clone + Unpin,
+ V: Stream + Unpin,
+{
+ type Item = (K, V::Item);
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
+ let key = self.entries[idx].0.clone();
+ Poll::Ready(Some((key, val)))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let mut ret = (0, Some(0));
+
+ for (_, stream) in &self.entries {
+ let hint = stream.size_hint();
+
+ ret.0 += hint.0;
+
+ match (ret.1, hint.1) {
+ (Some(a), Some(b)) => ret.1 = Some(a + b),
+ (Some(_), None) => ret.1 = None,
+ _ => {}
+ }
+ }
+
+ ret
+ }
+}
+
+impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
+where
+ K: Hash + Eq,
+{
+ fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
+ let iterator = iter.into_iter();
+ let (lower_bound, _) = iterator.size_hint();
+ let mut stream_map = Self::with_capacity(lower_bound);
+
+ for (key, value) in iterator {
+ stream_map.insert(key, value);
+ }
+
+ stream_map
+ }
+}
+
+impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
+ fn extend<T>(&mut self, iter: T)
+ where
+ T: IntoIterator<Item = (K, V)>,
+ {
+ self.entries.extend(iter);
+ }
+}
+
+mod rand {
+ use std::cell::Cell;
+
+ mod loom {
+ #[cfg(not(loom))]
+ pub(crate) mod rand {
+ use std::collections::hash_map::RandomState;
+ use std::hash::{BuildHasher, Hash, Hasher};
+ use std::sync::atomic::AtomicU32;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ static COUNTER: AtomicU32 = AtomicU32::new(1);
+
+ pub(crate) fn seed() -> u64 {
+ let rand_state = RandomState::new();
+
+ let mut hasher = rand_state.build_hasher();
+
+ // Hash some unique-ish data to generate some new state
+ COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
+
+ // Get the seed
+ hasher.finish()
+ }
+ }
+
+ #[cfg(loom)]
+ pub(crate) mod rand {
+ pub(crate) fn seed() -> u64 {
+ 1
+ }
+ }
+ }
+
+ /// Fast random number generate
+ ///
+ /// Implement xorshift64+: 2 32-bit xorshift sequences added together.
+ /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
+ /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
+ /// This generator passes the SmallCrush suite, part of TestU01 framework:
+ /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
+ #[derive(Debug)]
+ pub(crate) struct FastRand {
+ one: Cell<u32>,
+ two: Cell<u32>,
+ }
+
+ impl FastRand {
+ /// Initialize a new, thread-local, fast random number generator.
+ pub(crate) fn new(seed: u64) -> FastRand {
+ let one = (seed >> 32) as u32;
+ let mut two = seed as u32;
+
+ if two == 0 {
+ // This value cannot be zero
+ two = 1;
+ }
+
+ FastRand {
+ one: Cell::new(one),
+ two: Cell::new(two),
+ }
+ }
+
+ pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
+ // This is similar to fastrand() % n, but faster.
+ // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
+ let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
+ (mul >> 32) as u32
+ }
+
+ fn fastrand(&self) -> u32 {
+ let mut s1 = self.one.get();
+ let s0 = self.two.get();
+
+ s1 ^= s1 << 17;
+ s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
+
+ self.one.set(s0);
+ self.two.set(s1);
+
+ s0.wrapping_add(s1)
+ }
+ }
+
+ // Used by `StreamMap`
+ pub(crate) fn thread_rng_n(n: u32) -> u32 {
+ thread_local! {
+ static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
+ }
+
+ THREAD_RNG.with(|rng| rng.fastrand_n(n))
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers.rs b/third_party/rust/tokio-stream/src/wrappers.rs
new file mode 100644
index 0000000000..62cabe4f7d
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers.rs
@@ -0,0 +1,62 @@
+//! Wrappers for Tokio types that implement `Stream`.
+
+/// Error types for the wrappers.
+pub mod errors {
+ cfg_sync! {
+ pub use crate::wrappers::broadcast::BroadcastStreamRecvError;
+ }
+}
+
+mod mpsc_bounded;
+pub use mpsc_bounded::ReceiverStream;
+
+mod mpsc_unbounded;
+pub use mpsc_unbounded::UnboundedReceiverStream;
+
+cfg_sync! {
+ mod broadcast;
+ pub use broadcast::BroadcastStream;
+
+ mod watch;
+ pub use watch::WatchStream;
+}
+
+cfg_signal! {
+ #[cfg(unix)]
+ mod signal_unix;
+ #[cfg(unix)]
+ pub use signal_unix::SignalStream;
+
+ #[cfg(any(windows, docsrs))]
+ mod signal_windows;
+ #[cfg(any(windows, docsrs))]
+ pub use signal_windows::{CtrlCStream, CtrlBreakStream};
+}
+
+cfg_time! {
+ mod interval;
+ pub use interval::IntervalStream;
+}
+
+cfg_net! {
+ mod tcp_listener;
+ pub use tcp_listener::TcpListenerStream;
+
+ #[cfg(unix)]
+ mod unix_listener;
+ #[cfg(unix)]
+ pub use unix_listener::UnixListenerStream;
+}
+
+cfg_io_util! {
+ mod split;
+ pub use split::SplitStream;
+
+ mod lines;
+ pub use lines::LinesStream;
+}
+
+cfg_fs! {
+ mod read_dir;
+ pub use read_dir::ReadDirStream;
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/broadcast.rs b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs
new file mode 100644
index 0000000000..711066466a
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs
@@ -0,0 +1,79 @@
+use std::pin::Pin;
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::broadcast::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct BroadcastStream<T> {
+ inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>,
+}
+
+/// An error returned from the inner stream of a [`BroadcastStream`].
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum BroadcastStreamRecvError {
+ /// The receiver lagged too far behind. Attempting to receive again will
+ /// return the oldest message still retained by the channel.
+ ///
+ /// Includes the number of skipped messages.
+ Lagged(u64),
+}
+
+impl fmt::Display for BroadcastStreamRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ BroadcastStreamRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
+ }
+ }
+}
+
+impl std::error::Error for BroadcastStreamRecvError {}
+
+async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
+ let result = rx.recv().await;
+ (result, rx)
+}
+
+impl<T: 'static + Clone + Send> BroadcastStream<T> {
+ /// Create a new `BroadcastStream`.
+ pub fn new(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
+}
+
+impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
+ type Item = Result<T, BroadcastStreamRecvError>;
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, rx) = ready!(self.inner.poll(cx));
+ self.inner.set(make_future(rx));
+ match result {
+ Ok(item) => Poll::Ready(Some(Ok(item))),
+ Err(RecvError::Closed) => Poll::Ready(None),
+ Err(RecvError::Lagged(n)) => {
+ Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
+ }
+ }
+ }
+}
+
+impl<T> fmt::Debug for BroadcastStream<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BroadcastStream").finish()
+ }
+}
+
+impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/interval.rs b/third_party/rust/tokio-stream/src/wrappers/interval.rs
new file mode 100644
index 0000000000..2bf0194bd0
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/interval.rs
@@ -0,0 +1,50 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::time::{Instant, Interval};
+
+/// A wrapper around [`Interval`] that implements [`Stream`].
+///
+/// [`Interval`]: struct@tokio::time::Interval
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+pub struct IntervalStream {
+ inner: Interval,
+}
+
+impl IntervalStream {
+ /// Create a new `IntervalStream`.
+ pub fn new(interval: Interval) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `Interval`.
+ pub fn into_inner(self) -> Interval {
+ self.inner
+ }
+}
+
+impl Stream for IntervalStream {
+ type Item = Instant;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
+ self.inner.poll_tick(cx).map(Some)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (std::usize::MAX, None)
+ }
+}
+
+impl AsRef<Interval> for IntervalStream {
+ fn as_ref(&self) -> &Interval {
+ &self.inner
+ }
+}
+
+impl AsMut<Interval> for IntervalStream {
+ fn as_mut(&mut self) -> &mut Interval {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/lines.rs b/third_party/rust/tokio-stream/src/wrappers/lines.rs
new file mode 100644
index 0000000000..ad3c25349f
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/lines.rs
@@ -0,0 +1,60 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, Lines};
+
+pin_project! {
+ /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
+ ///
+ /// [`tokio::io::Lines`]: struct@tokio::io::Lines
+ /// [`Stream`]: trait@crate::Stream
+ #[derive(Debug)]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+ pub struct LinesStream<R> {
+ #[pin]
+ inner: Lines<R>,
+ }
+}
+
+impl<R> LinesStream<R> {
+ /// Create a new `LinesStream`.
+ pub fn new(lines: Lines<R>) -> Self {
+ Self { inner: lines }
+ }
+
+ /// Get back the inner `Lines`.
+ pub fn into_inner(self) -> Lines<R> {
+ self.inner
+ }
+
+ /// Obtain a pinned reference to the inner `Lines<R>`.
+ #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> {
+ self.project().inner
+ }
+}
+
+impl<R: AsyncBufRead> Stream for LinesStream<R> {
+ type Item = io::Result<String>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project()
+ .inner
+ .poll_next_line(cx)
+ .map(Result::transpose)
+ }
+}
+
+impl<R> AsRef<Lines<R>> for LinesStream<R> {
+ fn as_ref(&self) -> &Lines<R> {
+ &self.inner
+ }
+}
+
+impl<R> AsMut<Lines<R>> for LinesStream<R> {
+ fn as_mut(&mut self) -> &mut Lines<R> {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs
new file mode 100644
index 0000000000..b5362680ee
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs
@@ -0,0 +1,65 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::Receiver;
+
+/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+pub struct ReceiverStream<T> {
+ inner: Receiver<T>,
+}
+
+impl<T> ReceiverStream<T> {
+ /// Create a new `ReceiverStream`.
+ pub fn new(recv: Receiver<T>) -> Self {
+ Self { inner: recv }
+ }
+
+ /// Get back the inner `Receiver`.
+ pub fn into_inner(self) -> Receiver<T> {
+ self.inner
+ }
+
+ /// Closes the receiving half of a channel without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered. Any
+ /// outstanding [`Permit`] values will still be able to send messages.
+ ///
+ /// To guarantee no messages are dropped, after calling `close()`, you must
+ /// receive all items from the stream until `None` is returned.
+ ///
+ /// [`Permit`]: struct@tokio::sync::mpsc::Permit
+ pub fn close(&mut self) {
+ self.inner.close()
+ }
+}
+
+impl<T> Stream for ReceiverStream<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
+ fn as_ref(&self) -> &Receiver<T> {
+ &self.inner
+ }
+}
+
+impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
+ fn as_mut(&mut self) -> &mut Receiver<T> {
+ &mut self.inner
+ }
+}
+
+impl<T> From<Receiver<T>> for ReceiverStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs
new file mode 100644
index 0000000000..54597b7f6f
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs
@@ -0,0 +1,59 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::UnboundedReceiver;
+
+/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+pub struct UnboundedReceiverStream<T> {
+ inner: UnboundedReceiver<T>,
+}
+
+impl<T> UnboundedReceiverStream<T> {
+ /// Create a new `UnboundedReceiverStream`.
+ pub fn new(recv: UnboundedReceiver<T>) -> Self {
+ Self { inner: recv }
+ }
+
+ /// Get back the inner `UnboundedReceiver`.
+ pub fn into_inner(self) -> UnboundedReceiver<T> {
+ self.inner
+ }
+
+ /// Closes the receiving half of a channel without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.inner.close()
+ }
+}
+
+impl<T> Stream for UnboundedReceiverStream<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn as_ref(&self) -> &UnboundedReceiver<T> {
+ &self.inner
+ }
+}
+
+impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
+ &mut self.inner
+ }
+}
+
+impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn from(recv: UnboundedReceiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/read_dir.rs b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs
new file mode 100644
index 0000000000..b5cf54f79e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs
@@ -0,0 +1,47 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::fs::{DirEntry, ReadDir};
+
+/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
+///
+/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
+pub struct ReadDirStream {
+ inner: ReadDir,
+}
+
+impl ReadDirStream {
+ /// Create a new `ReadDirStream`.
+ pub fn new(read_dir: ReadDir) -> Self {
+ Self { inner: read_dir }
+ }
+
+ /// Get back the inner `ReadDir`.
+ pub fn into_inner(self) -> ReadDir {
+ self.inner
+ }
+}
+
+impl Stream for ReadDirStream {
+ type Item = io::Result<DirEntry>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner.poll_next_entry(cx).map(Result::transpose)
+ }
+}
+
+impl AsRef<ReadDir> for ReadDirStream {
+ fn as_ref(&self) -> &ReadDir {
+ &self.inner
+ }
+}
+
+impl AsMut<ReadDir> for ReadDirStream {
+ fn as_mut(&mut self) -> &mut ReadDir {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs
new file mode 100644
index 0000000000..2f74e7d152
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs
@@ -0,0 +1,46 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::signal::unix::Signal;
+
+/// A wrapper around [`Signal`] that implements [`Stream`].
+///
+/// [`Signal`]: struct@tokio::signal::unix::Signal
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "signal"))))]
+pub struct SignalStream {
+ inner: Signal,
+}
+
+impl SignalStream {
+ /// Create a new `SignalStream`.
+ pub fn new(interval: Signal) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `Signal`.
+ pub fn into_inner(self) -> Signal {
+ self.inner
+ }
+}
+
+impl Stream for SignalStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<Signal> for SignalStream {
+ fn as_ref(&self) -> &Signal {
+ &self.inner
+ }
+}
+
+impl AsMut<Signal> for SignalStream {
+ fn as_mut(&mut self) -> &mut Signal {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs
new file mode 100644
index 0000000000..4631fbad8d
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs
@@ -0,0 +1,88 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::signal::windows::{CtrlBreak, CtrlC};
+
+/// A wrapper around [`CtrlC`] that implements [`Stream`].
+///
+/// [`CtrlC`]: struct@tokio::signal::windows::CtrlC
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))]
+pub struct CtrlCStream {
+ inner: CtrlC,
+}
+
+impl CtrlCStream {
+ /// Create a new `CtrlCStream`.
+ pub fn new(interval: CtrlC) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `CtrlC`.
+ pub fn into_inner(self) -> CtrlC {
+ self.inner
+ }
+}
+
+impl Stream for CtrlCStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<CtrlC> for CtrlCStream {
+ fn as_ref(&self) -> &CtrlC {
+ &self.inner
+ }
+}
+
+impl AsMut<CtrlC> for CtrlCStream {
+ fn as_mut(&mut self) -> &mut CtrlC {
+ &mut self.inner
+ }
+}
+
+/// A wrapper around [`CtrlBreak`] that implements [`Stream`].
+///
+/// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))]
+pub struct CtrlBreakStream {
+ inner: CtrlBreak,
+}
+
+impl CtrlBreakStream {
+ /// Create a new `CtrlBreakStream`.
+ pub fn new(interval: CtrlBreak) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `CtrlBreak`.
+ pub fn into_inner(self) -> CtrlBreak {
+ self.inner
+ }
+}
+
+impl Stream for CtrlBreakStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<CtrlBreak> for CtrlBreakStream {
+ fn as_ref(&self) -> &CtrlBreak {
+ &self.inner
+ }
+}
+
+impl AsMut<CtrlBreak> for CtrlBreakStream {
+ fn as_mut(&mut self) -> &mut CtrlBreak {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/split.rs b/third_party/rust/tokio-stream/src/wrappers/split.rs
new file mode 100644
index 0000000000..5a6bb2d408
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/split.rs
@@ -0,0 +1,60 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, Split};
+
+pin_project! {
+ /// A wrapper around [`tokio::io::Split`] that implements [`Stream`].
+ ///
+ /// [`tokio::io::Split`]: struct@tokio::io::Split
+ /// [`Stream`]: trait@crate::Stream
+ #[derive(Debug)]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+ pub struct SplitStream<R> {
+ #[pin]
+ inner: Split<R>,
+ }
+}
+
+impl<R> SplitStream<R> {
+ /// Create a new `SplitStream`.
+ pub fn new(split: Split<R>) -> Self {
+ Self { inner: split }
+ }
+
+ /// Get back the inner `Split`.
+ pub fn into_inner(self) -> Split<R> {
+ self.inner
+ }
+
+ /// Obtain a pinned reference to the inner `Split<R>`.
+ #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split<R>> {
+ self.project().inner
+ }
+}
+
+impl<R: AsyncBufRead> Stream for SplitStream<R> {
+ type Item = io::Result<Vec<u8>>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project()
+ .inner
+ .poll_next_segment(cx)
+ .map(Result::transpose)
+ }
+}
+
+impl<R> AsRef<Split<R>> for SplitStream<R> {
+ fn as_ref(&self) -> &Split<R> {
+ &self.inner
+ }
+}
+
+impl<R> AsMut<Split<R>> for SplitStream<R> {
+ fn as_mut(&mut self) -> &mut Split<R> {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs
new file mode 100644
index 0000000000..ce7cb16350
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::net::{TcpListener, TcpStream};
+
+/// A wrapper around [`TcpListener`] that implements [`Stream`].
+///
+/// [`TcpListener`]: struct@tokio::net::TcpListener
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+pub struct TcpListenerStream {
+ inner: TcpListener,
+}
+
+impl TcpListenerStream {
+ /// Create a new `TcpListenerStream`.
+ pub fn new(listener: TcpListener) -> Self {
+ Self { inner: listener }
+ }
+
+ /// Get back the inner `TcpListener`.
+ pub fn into_inner(self) -> TcpListener {
+ self.inner
+ }
+}
+
+impl Stream for TcpListenerStream {
+ type Item = io::Result<TcpStream>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<io::Result<TcpStream>>> {
+ match self.inner.poll_accept(cx) {
+ Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
+ Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+impl AsRef<TcpListener> for TcpListenerStream {
+ fn as_ref(&self) -> &TcpListener {
+ &self.inner
+ }
+}
+
+impl AsMut<TcpListener> for TcpListenerStream {
+ fn as_mut(&mut self) -> &mut TcpListener {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs
new file mode 100644
index 0000000000..0beba588c2
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::net::{UnixListener, UnixStream};
+
+/// A wrapper around [`UnixListener`] that implements [`Stream`].
+///
+/// [`UnixListener`]: struct@tokio::net::UnixListener
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))]
+pub struct UnixListenerStream {
+ inner: UnixListener,
+}
+
+impl UnixListenerStream {
+ /// Create a new `UnixListenerStream`.
+ pub fn new(listener: UnixListener) -> Self {
+ Self { inner: listener }
+ }
+
+ /// Get back the inner `UnixListener`.
+ pub fn into_inner(self) -> UnixListener {
+ self.inner
+ }
+}
+
+impl Stream for UnixListenerStream {
+ type Item = io::Result<UnixStream>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<io::Result<UnixStream>>> {
+ match self.inner.poll_accept(cx) {
+ Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
+ Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+impl AsRef<UnixListener> for UnixListenerStream {
+ fn as_ref(&self) -> &UnixListener {
+ &self.inner
+ }
+}
+
+impl AsMut<UnixListener> for UnixListenerStream {
+ fn as_mut(&mut self) -> &mut UnixListener {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/watch.rs b/third_party/rust/tokio-stream/src/wrappers/watch.rs
new file mode 100644
index 0000000000..ec8ead06da
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/watch.rs
@@ -0,0 +1,132 @@
+use std::pin::Pin;
+use tokio::sync::watch::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+use tokio::sync::watch::error::RecvError;
+
+/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
+///
+/// This stream will start by yielding the current value when the WatchStream is polled,
+/// regardless of whether it was the initial value or sent afterwards,
+/// unless you use [`WatchStream<T>::from_changes`].
+///
+/// # Examples
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// assert_eq!(rx.next().await, Some("hello"));
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// // existing rx output with "hello" is ignored here
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// Example with [`WatchStream<T>::from_changes`]:
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::future::FutureExt;
+/// use tokio::sync::watch;
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::from_changes(rx);
+///
+/// // no output from rx is available at this point - let's check this:
+/// assert!(rx.next().now_or_never().is_none());
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct WatchStream<T> {
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>,
+}
+
+async fn make_future<T: Clone + Send + Sync>(
+ mut rx: Receiver<T>,
+) -> (Result<(), RecvError>, Receiver<T>) {
+ let result = rx.changed().await;
+ (result, rx)
+}
+
+impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
+ /// Create a new `WatchStream`.
+ pub fn new(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ /// Create a new `WatchStream` that waits for the value to be changed.
+ pub fn from_changes(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
+}
+
+impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(make_future(rx));
+ Poll::Ready(Some(received))
+ }
+ Err(_) => {
+ self.inner.set(make_future(rx));
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl<T> Unpin for WatchStream<T> {}
+
+impl<T> fmt::Debug for WatchStream<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WatchStream").finish()
+ }
+}
+
+impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}