diff options
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext.rs')
-rw-r--r-- | third_party/rust/tokio-stream/src/stream_ext.rs | 1084 |
1 files changed, 1084 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext.rs b/third_party/rust/tokio-stream/src/stream_ext.rs new file mode 100644 index 0000000000..52d32024ff --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext.rs @@ -0,0 +1,1084 @@ +use core::future::Future; +use futures_core::Stream; + +mod all; +use all::AllFuture; + +mod any; +use any::AnyFuture; + +mod chain; +use chain::Chain; + +pub(crate) mod collect; +use collect::{Collect, FromStream}; + +mod filter; +use filter::Filter; + +mod filter_map; +use filter_map::FilterMap; + +mod fold; +use fold::FoldFuture; + +mod fuse; +use fuse::Fuse; + +mod map; +use map::Map; + +mod map_while; +use map_while::MapWhile; + +mod merge; +use merge::Merge; + +mod next; +use next::Next; + +mod skip; +use skip::Skip; + +mod skip_while; +use skip_while::SkipWhile; + +mod take; +use take::Take; + +mod take_while; +use take_while::TakeWhile; + +mod then; +use then::Then; + +mod try_next; +use try_next::TryNext; + +cfg_time! { + pub(crate) mod timeout; + use timeout::Timeout; + use tokio::time::Duration; + mod throttle; + use throttle::{throttle, Throttle}; + mod chunks_timeout; + use chunks_timeout::ChunksTimeout; +} + +/// An extension trait for the [`Stream`] trait that provides a variety of +/// convenient combinator functions. +/// +/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found +/// in the [futures] crate, however both Tokio and futures provide separate +/// `StreamExt` utility traits, and some utilities are only available on one of +/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` +/// trait in the futures crate. +/// +/// If you need utilities from both `StreamExt` traits, you should prefer to +/// import one of them, and use the other through the fully qualified call +/// syntax. For example: +/// ``` +/// // import one of the traits: +/// use futures::stream::StreamExt; +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() { +/// +/// let a = tokio_stream::iter(vec![1, 3, 5]); +/// let b = tokio_stream::iter(vec![2, 4, 6]); +/// +/// // use the fully qualified call syntax for the other trait: +/// let merged = tokio_stream::StreamExt::merge(a, b); +/// +/// // use normal call notation for futures::stream::StreamExt::collect +/// let output: Vec<_> = merged.collect().await; +/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); +/// # } +/// ``` +/// +/// [`Stream`]: crate::Stream +/// [futures]: https://docs.rs/futures +/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html +pub trait StreamExt: Stream { + /// Consumes and returns the next value in the stream or `None` if the + /// stream is finished. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option<Self::Item>; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Consumes and returns the next item in the stream. If an error is + /// encountered before the next item, the error is returned instead. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn try_next(&mut self) -> Result<Option<T>, E>; + /// ``` + /// + /// This is similar to the [`next`](StreamExt::next) combinator, + /// but returns a [`Result<Option<T>, E>`](Result) rather than + /// an [`Option<Result<T, E>>`](Option), making for easy use + /// with the [`?`](std::ops::Try) operator. + /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(1))); + /// assert_eq!(stream.try_next().await, Ok(Some(2))); + /// assert_eq!(stream.try_next().await, Err("nope")); + /// # } + /// ``` + fn try_next<T, E>(&mut self) -> TryNext<'_, Self> + where + Self: Stream<Item = Result<T, E>> + Unpin, + { + TryNext::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map<T, F>(self, f: F) -> Map<Self, F> + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } + + /// Map this stream's items to a different type for as long as determined by + /// the provided closure. A stream of the target type will be returned, + /// which will yield elements until the closure returns `None`. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, until it returns `None`. It is executed inline + /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned, + /// the underlying stream will not be polled again. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the [`Iterator::map_while`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let mut stream = stream.map_while(|x| { + /// if x < 4 { + /// Some(x + 3) + /// } else { + /// None + /// } + /// }); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn map_while<T, F>(self, f: F) -> MapWhile<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + MapWhile::new(self, f) + } + + /// Maps this stream's items asynchronously to a different type, returning a + /// new stream of the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, and the returned future is executed. Only one + /// future is executed at the time. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `then` methods in the + /// standard library. + /// + /// Be aware that if the future is not `Unpin`, then neither is the `Stream` + /// returned by this method. To handle this, you can use `tokio::pin!` as in + /// the example below or put the stream in a `Box` with `Box::pin(stream)`. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// async fn do_async_work(value: i32) -> i32 { + /// value + 3 + /// } + /// + /// let stream = stream::iter(1..=3); + /// let stream = stream.then(do_async_work); + /// + /// tokio::pin!(stream); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F> + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + Then::new(self, f) + } + + /// Combine two streams into one by interleaving the output of both as it + /// is produced. + /// + /// Values are produced from the merged stream in the order they arrive from + /// the two source streams. If both source streams provide values + /// simultaneously, the merge stream alternates between them. This provides + /// some level of fairness. You should not chain calls to `merge`, as this + /// will break the fairness of the merging. + /// + /// The merged stream completes once **both** source streams complete. When + /// one source stream completes before the other, the merge stream + /// exclusively polls the remaining stream. + /// + /// For merging multiple streams, consider using [`StreamMap`] instead. + /// + /// [`StreamMap`]: crate::StreamMap + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamExt, Stream}; + /// use tokio::sync::mpsc; + /// use tokio::time; + /// + /// use std::time::Duration; + /// use std::pin::Pin; + /// + /// # /* + /// #[tokio::main] + /// # */ + /// # #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// # time::pause(); + /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); + /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); + /// + /// // Convert the channels to a `Stream`. + /// let rx1 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx1.recv().await { + /// yield item; + /// } + /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; + /// + /// let rx2 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx2.recv().await { + /// yield item; + /// } + /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; + /// + /// let mut rx = rx1.merge(rx2); + /// + /// tokio::spawn(async move { + /// // Send some values immediately + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// + /// // Let the other task send values + /// time::sleep(Duration::from_millis(20)).await; + /// + /// tx1.send(4).await.unwrap(); + /// }); + /// + /// tokio::spawn(async move { + /// // Wait for the first task to send values + /// time::sleep(Duration::from_millis(5)).await; + /// + /// tx2.send(3).await.unwrap(); + /// + /// time::sleep(Duration::from_millis(25)).await; + /// + /// // Send the final value + /// tx2.send(5).await.unwrap(); + /// }); + /// + /// assert_eq!(1, rx.next().await.unwrap()); + /// assert_eq!(2, rx.next().await.unwrap()); + /// assert_eq!(3, rx.next().await.unwrap()); + /// assert_eq!(4, rx.next().await.unwrap()); + /// assert_eq!(5, rx.next().await.unwrap()); + /// + /// // The merged stream is consumed + /// assert!(rx.next().await.is_none()); + /// } + /// ``` + fn merge<U>(self, other: U) -> Merge<Self, U> + where + U: Stream<Item = Self::Item>, + Self: Sized, + { + Merge::new(self, other) + } + + /// Filters the values produced by this stream according to the provided + /// predicate. + /// + /// As values of this stream are made available, the provided predicate `f` + /// will be run against them. If the predicate + /// resolves to `true`, then the stream will yield the value, but if the + /// predicate resolves to `false`, then the value + /// will be discarded and the next value will be produced. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter(|x| x % 2 == 0); + /// + /// assert_eq!(Some(2), evens.next().await); + /// assert_eq!(Some(4), evens.next().await); + /// assert_eq!(Some(6), evens.next().await); + /// assert_eq!(Some(8), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter<F>(self, f: F) -> Filter<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + Filter::new(self, f) + } + + /// Filters the values produced by this stream while simultaneously mapping + /// them to a different type according to the provided closure. + /// + /// As values of this stream are made available, the provided function will + /// be run on them. If the predicate `f` resolves to + /// [`Some(item)`](Some) then the stream will yield the value `item`, but if + /// it resolves to [`None`], then the value will be skipped. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter_map`] method in the + /// standard library. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter_map(|x| { + /// if x % 2 == 0 { Some(x + 1) } else { None } + /// }); + /// + /// assert_eq!(Some(3), evens.next().await); + /// assert_eq!(Some(5), evens.next().await); + /// assert_eq!(Some(7), evens.next().await); + /// assert_eq!(Some(9), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + FilterMap::new(self, f) + } + + /// Creates a stream which ends after the first `None`. + /// + /// After a stream returns `None`, behavior is undefined. Future calls to + /// `poll_next` may or may not return `Some(T)` again or they may panic. + /// `fuse()` adapts a stream, ensuring that after `None` is given, it will + /// return `None` forever. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{Stream, StreamExt}; + /// + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// // a stream which alternates between Some and None + /// struct Alternate { + /// state: i32, + /// } + /// + /// impl Stream for Alternate { + /// type Item = i32; + /// + /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + /// let val = self.state; + /// self.state = self.state + 1; + /// + /// // if it's even, Some(i32), else None + /// if val % 2 == 0 { + /// Poll::Ready(Some(val)) + /// } else { + /// Poll::Ready(None) + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let mut stream = Alternate { state: 0 }; + /// + /// // the stream goes back and forth + /// assert_eq!(stream.next().await, Some(0)); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, None); + /// + /// // however, once it is fused + /// let mut stream = stream.fuse(); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, None); + /// + /// // it will always return `None` after the first time. + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn fuse(self) -> Fuse<Self> + where + Self: Sized, + { + Fuse::new(self) + } + + /// Creates a new stream of at most `n` items of the underlying stream. + /// + /// Once `n` items have been yielded from this stream then it will always + /// return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take(3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take(self, n: usize) -> Take<Self> + where + Self: Sized, + { + Take::new(self, n) + } + + /// Take elements from this stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like `Iterator::take_while`, will take elements from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false it will always return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take_while<F>(self, f: F) -> TakeWhile<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + TakeWhile::new(self, f) + } + + /// Creates a new stream that will skip the `n` first items of the + /// underlying stream. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).skip(7); + /// + /// assert_eq!(Some(8), stream.next().await); + /// assert_eq!(Some(9), stream.next().await); + /// assert_eq!(Some(10), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip(self, n: usize) -> Skip<Self> + where + Self: Sized, + { + Skip::new(self, n) + } + + /// Skip elements from the underlying stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like [`Iterator::skip_while`], will ignore elements from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false, the rest of the elements will be yielded. + /// + /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); + /// + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(Some(4), stream.next().await); + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + SkipWhile::new(self, f) + } + + /// Tests if every element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn all<F>(&mut self, f: F) -> bool; + /// ``` + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all`. If any of them return `false`, it + /// returns `false`. An empty stream returns `true`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).all(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).all(|&x| x > 2).await); + /// # } + /// ``` + /// + /// Stopping at the first `false`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(!iter.all(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&3)); + /// # } + /// ``` + fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AllFuture::new(self, f) + } + + /// Tests if any element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn any<F>(&mut self, f: F) -> bool; + /// ``` + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).any(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).any(|&x| x > 5).await); + /// # } + /// ``` + /// + /// Stopping at the first `true`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(iter.any(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&2)); + /// # } + /// ``` + fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AnyFuture::new(self, f) + } + + /// Combine two streams into one by first returning all values from the + /// first stream then all values from the second stream. + /// + /// As long as `self` still has values to emit, no values from `other` are + /// emitted, even if some are ready. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let one = stream::iter(vec![1, 2, 3]); + /// let two = stream::iter(vec![4, 5, 6]); + /// + /// let mut stream = one.chain(two); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn chain<U>(self, other: U) -> Chain<Self, U> + where + U: Stream<Item = Self::Item>, + Self: Sized, + { + Chain::new(self, other) + } + + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fold<B, F>(self, init: B, f: F) -> B; + /// ``` + /// + /// # Examples + /// Basic usage: + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, *}; + /// + /// let s = stream::iter(vec![1u8, 2, 3]); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # } + /// ``` + fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + + /// Drain stream pushing all emitted values into a collection. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn collect<T>(self) -> T; + /// ``` + /// + /// `collect` streams all values, awaiting as needed. Values are pushed into + /// a collection. A number of different target collection types are + /// supported, including [`Vec`](std::vec::Vec), + /// [`String`](std::string::String), and [`Bytes`]. + /// + /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html + /// + /// # `Result` + /// + /// `collect()` can also be used with streams of type `Result<T, E>` where + /// `T: FromStream<_>`. In this case, `collect()` will stream as long as + /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, + /// streaming is terminated and `collect()` returns the `Err`. + /// + /// # Notes + /// + /// `FromStream` is currently a sealed trait. Stabilization is pending + /// enhancements to the Rust language. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let doubled: Vec<i32> = + /// stream::iter(vec![1, 2, 3]) + /// .map(|x| x * 2) + /// .collect() + /// .await; + /// + /// assert_eq!(vec![2, 4, 6], doubled); + /// } + /// ``` + /// + /// Collecting a stream of `Result` values + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// // A stream containing only `Ok` values will be collected + /// let values: Result<Vec<i32>, &str> = + /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) + /// .collect() + /// .await; + /// + /// assert_eq!(Ok(vec![1, 2, 3]), values); + /// + /// // A stream containing `Err` values will return the first error. + /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; + /// + /// let values: Result<Vec<i32>, &str> = + /// stream::iter(results) + /// .collect() + /// .await; + /// + /// assert_eq!(Err("no"), values); + /// } + /// ``` + fn collect<T>(self) -> Collect<Self, T> + where + T: FromStream<Self::Item>, + Self: Sized, + { + Collect::new(self) + } + + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout()` takes a `Duration` that represents the maximum amount of + /// time each element of the stream has to complete before timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let int_stream = int_stream.timeout(Duration::from_secs(1)); + /// tokio::pin!(int_stream); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout(self, duration: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, duration) + } + + /// Slows down a stream by enforcing a delay between items. + /// + /// The underlying timer behind this utility has a granularity of one millisecond. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio_stream::StreamExt; + /// + /// # async fn dox() { + /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// tokio::pin!(item_stream); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle<Self> + where + Self: Sized, + { + throttle(duration, self) + } + + /// Batches the items in the given stream using a maximum duration and size for each batch. + /// + /// This stream returns the next batch of items in the following situations: + /// 1. The inner stream has returned at least `max_size` many items since the last batch. + /// 2. The time since the first item of a batch is greater than the given duration. + /// 3. The end of the stream is reached. + /// + /// The length of the returned vector is never empty or greater than the maximum size. Empty batches + /// will not be emitted if no items are received upstream. + /// + /// # Panics + /// + /// This function panics if `max_size` is zero + /// + /// # Example + /// + /// ```rust + /// use std::time::Duration; + /// use tokio::time; + /// use tokio_stream::{self as stream, StreamExt}; + /// use futures::FutureExt; + /// + /// #[tokio::main] + /// # async fn _unused() {} + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// let iter = vec![1, 2, 3, 4].into_iter(); + /// let stream0 = stream::iter(iter); + /// + /// let iter = vec![5].into_iter(); + /// let stream1 = stream::iter(iter) + /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + /// + /// let chunk_stream = stream0 + /// .chain(stream1) + /// .chunks_timeout(3, Duration::from_secs(2)); + /// tokio::pin!(chunk_stream); + /// + /// // a full batch was received + /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + /// // deadline was reached before max_size was reached + /// assert_eq!(chunk_stream.next().await, Some(vec![4])); + /// // last element in the stream + /// assert_eq!(chunk_stream.next().await, Some(vec![5])); + /// } + /// ``` + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + #[track_caller] + fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self> + where + Self: Sized, + { + assert!(max_size > 0, "`max_size` must be non-zero."); + ChunksTimeout::new(self, max_size, duration) + } +} + +impl<St: ?Sized> StreamExt for St where St: Stream {} + +/// Merge the size hints from two streams. +fn merge_size_hints( + (left_low, left_high): (usize, Option<usize>), + (right_low, right_high): (usize, Option<usize>), +) -> (usize, Option<usize>) { + let low = left_low.saturating_add(right_low); + let high = match (left_high, right_high) { + (Some(h1), Some(h2)) => h1.checked_add(h2), + _ => None, + }; + (low, high) +} |