summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/src/stream/try_stream
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/futures-util/src/stream/try_stream
parentInitial commit. (diff)
downloadfirefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz
firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-util/src/stream/try_stream')
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/and_then.rs105
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/into_async_read.rs166
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/into_stream.rs52
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/mod.rs1064
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/or_else.rs109
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_buffer_unordered.rs86
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_buffered.rs87
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_chunks.rs131
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_collect.rs52
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_concat.rs51
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_filter.rs112
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_filter_map.rs106
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_flatten.rs84
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_fold.rs93
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_for_each.rs68
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_for_each_concurrent.rs133
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_next.rs34
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_skip_while.rs120
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_take_while.rs129
-rw-r--r--third_party/rust/futures-util/src/stream/try_stream/try_unfold.rs122
20 files changed, 2904 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/stream/try_stream/and_then.rs b/third_party/rust/futures-util/src/stream/try_stream/and_then.rs
new file mode 100644
index 0000000000..2f8b6f2589
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/and_then.rs
@@ -0,0 +1,105 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct AndThen<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AndThen")
+ .field("stream", &self.stream)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> AndThen<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: TryFuture<Error = St::Error>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, future: None, f }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F> Stream for AndThen<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: TryFuture<Error = St::Error>,
+{
+ type Item = Result<Fut::Ok, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ let item = ready!(fut.try_poll(cx));
+ this.future.set(None);
+ break Some(item);
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.future.set(Some((this.f)(item)));
+ } else {
+ break None;
+ }
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let future_len = usize::from(self.future.is_some());
+ let (lower, upper) = self.stream.size_hint();
+ let lower = lower.saturating_add(future_len);
+ let upper = match upper {
+ Some(x) => x.checked_add(future_len),
+ None => None,
+ };
+ (lower, upper)
+ }
+}
+
+impl<St, Fut, F> FusedStream for AndThen<St, Fut, F>
+where
+ St: TryStream + FusedStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: TryFuture<Error = St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.future.is_none() && self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item> Sink<Item> for AndThen<S, Fut, F>
+where
+ S: Sink<Item>,
+{
+ type Error = S::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/into_async_read.rs b/third_party/rust/futures-util/src/stream/try_stream/into_async_read.rs
new file mode 100644
index 0000000000..ffbfc7eae9
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/into_async_read.rs
@@ -0,0 +1,166 @@
+use core::pin::Pin;
+use futures_core::ready;
+use futures_core::stream::TryStream;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use pin_project_lite::pin_project;
+use std::cmp;
+use std::io::{Error, Result};
+
+pin_project! {
+ /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
+ #[derive(Debug)]
+ #[must_use = "readers do nothing unless polled"]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
+ pub struct IntoAsyncRead<St>
+ where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+ {
+ #[pin]
+ stream: St,
+ state: ReadState<St::Ok>,
+ }
+}
+
+#[derive(Debug)]
+enum ReadState<T: AsRef<[u8]>> {
+ Ready { chunk: T, chunk_start: usize },
+ PendingChunk,
+ Eof,
+}
+
+impl<St> IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+{
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, state: ReadState::PendingChunk }
+ }
+}
+
+impl<St> AsyncRead for IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize>> {
+ let mut this = self.project();
+
+ loop {
+ match this.state {
+ ReadState::Ready { chunk, chunk_start } => {
+ let chunk = chunk.as_ref();
+ let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
+
+ buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]);
+ *chunk_start += len;
+
+ if chunk.len() == *chunk_start {
+ *this.state = ReadState::PendingChunk;
+ }
+
+ return Poll::Ready(Ok(len));
+ }
+ ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
+ Some(Ok(chunk)) => {
+ if !chunk.as_ref().is_empty() {
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
+ }
+ }
+ Some(Err(err)) => {
+ *this.state = ReadState::Eof;
+ return Poll::Ready(Err(err));
+ }
+ None => {
+ *this.state = ReadState::Eof;
+ return Poll::Ready(Ok(0));
+ }
+ },
+ ReadState::Eof => {
+ return Poll::Ready(Ok(0));
+ }
+ }
+ }
+ }
+}
+
+impl<St> AsyncWrite for IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error> + AsyncWrite,
+ St::Ok: AsRef<[u8]>,
+{
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
+ let this = self.project();
+ this.stream.poll_write(cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_close(cx)
+ }
+}
+
+impl<St> AsyncBufRead for IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+{
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
+ let mut this = self.project();
+
+ while let ReadState::PendingChunk = this.state {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
+ Some(Ok(chunk)) => {
+ if !chunk.as_ref().is_empty() {
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
+ }
+ }
+ Some(Err(err)) => {
+ *this.state = ReadState::Eof;
+ return Poll::Ready(Err(err));
+ }
+ None => {
+ *this.state = ReadState::Eof;
+ return Poll::Ready(Ok(&[]));
+ }
+ }
+ }
+
+ if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
+ let chunk = chunk.as_ref();
+ return Poll::Ready(Ok(&chunk[chunk_start..]));
+ }
+
+ // To get to this point we must be in ReadState::Eof
+ Poll::Ready(Ok(&[]))
+ }
+
+ fn consume(self: Pin<&mut Self>, amount: usize) {
+ let this = self.project();
+
+ // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
+ if amount == 0 {
+ return;
+ }
+ if let ReadState::Ready { chunk, chunk_start } = this.state {
+ *chunk_start += amount;
+ debug_assert!(*chunk_start <= chunk.as_ref().len());
+ if *chunk_start >= chunk.as_ref().len() {
+ *this.state = ReadState::PendingChunk;
+ }
+ } else {
+ debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/into_stream.rs b/third_party/rust/futures-util/src/stream/try_stream/into_stream.rs
new file mode 100644
index 0000000000..2126258af7
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/into_stream.rs
@@ -0,0 +1,52 @@
+use core::pin::Pin;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct IntoStream<St> {
+ #[pin]
+ stream: St,
+ }
+}
+
+impl<St> IntoStream<St> {
+ #[inline]
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+impl<St: TryStream> Stream for IntoStream<St> {
+ type Item = Result<St::Ok, St::Error>;
+
+ #[inline]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project().stream.try_poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S: Sink<Item>, Item> Sink<Item> for IntoStream<S> {
+ type Error = S::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/mod.rs b/third_party/rust/futures-util/src/stream/try_stream/mod.rs
new file mode 100644
index 0000000000..bc4c6e4f6a
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/mod.rs
@@ -0,0 +1,1064 @@
+//! Streams
+//!
+//! This module contains a number of functions for working with `Streams`s
+//! that return `Result`s, allowing for short-circuiting computations.
+
+#[cfg(feature = "compat")]
+use crate::compat::Compat;
+use crate::fns::{
+ inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
+ IntoFn, MapErrFn, MapOkFn,
+};
+use crate::future::assert_future;
+use crate::stream::assert_stream;
+use crate::stream::{Inspect, Map};
+#[cfg(feature = "alloc")]
+use alloc::vec::Vec;
+use core::pin::Pin;
+use futures_core::{
+ future::{Future, TryFuture},
+ stream::TryStream,
+ task::{Context, Poll},
+};
+
+mod and_then;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::and_then::AndThen;
+
+delegate_all!(
+ /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
+ ErrInto<St, E>(
+ MapErr<St, IntoFn<E>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
+);
+
+delegate_all!(
+ /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
+ InspectOk<St, F>(
+ Inspect<IntoStream<St>, InspectOkFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
+);
+
+delegate_all!(
+ /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
+ InspectErr<St, F>(
+ Inspect<IntoStream<St>, InspectErrFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
+);
+
+mod into_stream;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::into_stream::IntoStream;
+
+delegate_all!(
+ /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
+ MapOk<St, F>(
+ Map<IntoStream<St>, MapOkFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
+);
+
+delegate_all!(
+ /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
+ MapErr<St, F>(
+ Map<IntoStream<St>, MapErrFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
+);
+
+mod or_else;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::or_else::OrElse;
+
+mod try_next;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_next::TryNext;
+
+mod try_for_each;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_for_each::TryForEach;
+
+mod try_filter;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_filter::TryFilter;
+
+mod try_filter_map;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_filter_map::TryFilterMap;
+
+mod try_flatten;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_flatten::TryFlatten;
+
+mod try_collect;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_collect::TryCollect;
+
+mod try_concat;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_concat::TryConcat;
+
+#[cfg(feature = "alloc")]
+mod try_chunks;
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_chunks::{TryChunks, TryChunksError};
+
+mod try_fold;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_fold::TryFold;
+
+mod try_unfold;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_unfold::{try_unfold, TryUnfold};
+
+mod try_skip_while;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_skip_while::TrySkipWhile;
+
+mod try_take_while;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_take_while::TryTakeWhile;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_buffer_unordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_buffer_unordered::TryBufferUnordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_buffered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_buffered::TryBuffered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_for_each_concurrent;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_for_each_concurrent::TryForEachConcurrent;
+
+#[cfg(feature = "io")]
+#[cfg(feature = "std")]
+mod into_async_read;
+#[cfg(feature = "io")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::into_async_read::IntoAsyncRead;
+
+impl<S: ?Sized + TryStream> TryStreamExt for S {}
+
+/// Adapters specific to `Result`-returning streams
+pub trait TryStreamExt: TryStream {
+ /// Wraps the current stream in a new stream which converts the error type
+ /// into the one provided.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let mut stream =
+ /// stream::iter(vec![Ok(()), Err(5i32)])
+ /// .err_into::<i64>();
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(())));
+ /// assert_eq!(stream.try_next().await, Err(5i64));
+ /// # })
+ /// ```
+ fn err_into<E>(self) -> ErrInto<Self, E>
+ where
+ Self: Sized,
+ Self::Error: Into<E>,
+ {
+ assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
+ }
+
+ /// Wraps the current stream in a new stream which maps the success value
+ /// using the provided closure.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let mut stream =
+ /// stream::iter(vec![Ok(5), Err(0)])
+ /// .map_ok(|x| x + 2);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(7)));
+ /// assert_eq!(stream.try_next().await, Err(0));
+ /// # })
+ /// ```
+ fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
+ where
+ Self: Sized,
+ F: FnMut(Self::Ok) -> T,
+ {
+ assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
+ }
+
+ /// Wraps the current stream in a new stream which maps the error value
+ /// using the provided closure.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let mut stream =
+ /// stream::iter(vec![Ok(5), Err(0)])
+ /// .map_err(|x| x + 2);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(5)));
+ /// assert_eq!(stream.try_next().await, Err(2));
+ /// # })
+ /// ```
+ fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
+ where
+ Self: Sized,
+ F: FnMut(Self::Error) -> E,
+ {
+ assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
+ }
+
+ /// Chain on a computation for when a value is ready, passing the successful
+ /// results to the provided closure `f`.
+ ///
+ /// This function can be used to run a unit of work when the next successful
+ /// value on a stream is ready. The closure provided will be yielded a value
+ /// when ready, and the returned future will then be run to completion to
+ /// produce the next value on this stream.
+ ///
+ /// Any errors produced by this stream will not be passed to the closure,
+ /// and will be passed through.
+ ///
+ /// The returned value of the closure must implement the `TryFuture` trait
+ /// and can represent some more work to be done before the composed stream
+ /// is finished.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it.
+ ///
+ /// To process the entire stream and return a single future representing
+ /// success or error, use `try_for_each` instead.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::channel::mpsc;
+ /// use futures::future;
+ /// use futures::stream::TryStreamExt;
+ ///
+ /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
+ ///
+ /// let rx = rx.and_then(|result| {
+ /// future::ok(if result % 2 == 0 {
+ /// Some(result)
+ /// } else {
+ /// None
+ /// })
+ /// });
+ /// ```
+ fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
+ where
+ F: FnMut(Self::Ok) -> Fut,
+ Fut: TryFuture<Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
+ }
+
+ /// Chain on a computation for when an error happens, passing the
+ /// erroneous result to the provided closure `f`.
+ ///
+ /// This function can be used to run a unit of work and attempt to recover from
+ /// an error if one happens. The closure provided will be yielded an error
+ /// when one appears, and the returned future will then be run to completion
+ /// to produce the next value on this stream.
+ ///
+ /// Any successful values produced by this stream will not be passed to the
+ /// closure, and will be passed through.
+ ///
+ /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
+ /// and can represent some more work to be done before the composed stream
+ /// is finished.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it.
+ fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
+ where
+ F: FnMut(Self::Error) -> Fut,
+ Fut: TryFuture<Ok = Self::Ok>,
+ Self: Sized,
+ {
+ assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
+ }
+
+ /// Do something with the success value of this stream, afterwards passing
+ /// it on.
+ ///
+ /// This is similar to the `StreamExt::inspect` method where it allows
+ /// easily inspecting the success value as it passes through the stream, for
+ /// example to debug what's going on.
+ fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
+ where
+ F: FnMut(&Self::Ok),
+ Self: Sized,
+ {
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
+ }
+
+ /// Do something with the error value of this stream, afterwards passing it on.
+ ///
+ /// This is similar to the `StreamExt::inspect` method where it allows
+ /// easily inspecting the error value as it passes through the stream, for
+ /// example to debug what's going on.
+ fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
+ where
+ F: FnMut(&Self::Error),
+ Self: Sized,
+ {
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
+ }
+
+ /// Wraps a [`TryStream`] into a type that implements
+ /// [`Stream`](futures_core::stream::Stream)
+ ///
+ /// [`TryStream`]s currently do not implement the
+ /// [`Stream`](futures_core::stream::Stream) trait because of limitations
+ /// of the compiler.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::stream::{Stream, TryStream, TryStreamExt};
+ ///
+ /// # type T = i32;
+ /// # type E = ();
+ /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
+ /// # futures::stream::empty()
+ /// # }
+ /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
+ ///
+ /// take_stream(make_try_stream().into_stream());
+ /// ```
+ fn into_stream(self) -> IntoStream<Self>
+ where
+ Self: Sized,
+ {
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
+ }
+
+ /// Creates a future that attempts to resolve the next item in the stream.
+ /// If an error is encountered before the next item, the error is returned
+ /// instead.
+ ///
+ /// This is similar to the `Stream::next` combinator, but returns a
+ /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
+ /// for easy use with the `?` operator.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(())));
+ /// assert_eq!(stream.try_next().await, Err(()));
+ /// # })
+ /// ```
+ fn try_next(&mut self) -> TryNext<'_, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
+ }
+
+ /// Attempts to run this stream to completion, executing the provided
+ /// asynchronous closure for each element on the stream.
+ ///
+ /// The provided closure will be called for each item this stream produces,
+ /// yielding a future. That future will then be executed to completion
+ /// before moving on to the next item.
+ ///
+ /// The returned value is a [`Future`](futures_core::future::Future) where the
+ /// [`Output`](futures_core::future::Future::Output) type is
+ /// `Result<(), Self::Error>`. If any of the intermediate
+ /// futures or the stream returns an error, this future will return
+ /// immediately with an error.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future;
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let mut x = 0i32;
+ ///
+ /// {
+ /// let fut = stream::repeat(Ok(1)).try_for_each(|item| {
+ /// x += item;
+ /// future::ready(if x == 3 { Err(()) } else { Ok(()) })
+ /// });
+ /// assert_eq!(fut.await, Err(()));
+ /// }
+ ///
+ /// assert_eq!(x, 3);
+ /// # })
+ /// ```
+ fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
+ where
+ F: FnMut(Self::Ok) -> Fut,
+ Fut: TryFuture<Ok = (), Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
+ }
+
+ /// Skip elements on this stream while the provided asynchronous predicate
+ /// resolves to `true`.
+ ///
+ /// This function is similar to
+ /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
+ /// early if an error occurs.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future;
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
+ /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
+ ///
+ /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
+ /// assert_eq!(output, Ok(vec![3, 2]));
+ /// # })
+ /// ```
+ fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
+ where
+ F: FnMut(&Self::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
+ }
+
+ /// Take elements on this stream while the provided asynchronous predicate
+ /// resolves to `true`.
+ ///
+ /// This function is similar to
+ /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
+ /// early if an error occurs.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future;
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
+ /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
+ ///
+ /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
+ /// assert_eq!(output, Ok(vec![1, 2]));
+ /// # })
+ /// ```
+ fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
+ where
+ F: FnMut(&Self::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
+ }
+
+ /// Attempts to run this stream to completion, executing the provided asynchronous
+ /// closure for each element on the stream concurrently as elements become
+ /// available, exiting as soon as an error occurs.
+ ///
+ /// This is similar to
+ /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
+ /// but will resolve to an error immediately if the underlying stream or the provided
+ /// closure return an error.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::oneshot;
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ ///
+ /// let (tx1, rx1) = oneshot::channel();
+ /// let (tx2, rx2) = oneshot::channel();
+ /// let (_tx3, rx3) = oneshot::channel();
+ ///
+ /// let stream = stream::iter(vec![rx1, rx2, rx3]);
+ /// let fut = stream.map(Ok).try_for_each_concurrent(
+ /// /* limit */ 2,
+ /// |rx| async move {
+ /// let res: Result<(), oneshot::Canceled> = rx.await;
+ /// res
+ /// }
+ /// );
+ ///
+ /// tx1.send(()).unwrap();
+ /// // Drop the second sender so that `rx2` resolves to `Canceled`.
+ /// drop(tx2);
+ ///
+ /// // The final result is an error because the second future
+ /// // resulted in an error.
+ /// assert_eq!(Err(oneshot::Canceled), fut.await);
+ /// # })
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn try_for_each_concurrent<Fut, F>(
+ self,
+ limit: impl Into<Option<usize>>,
+ f: F,
+ ) -> TryForEachConcurrent<Self, Fut, F>
+ where
+ F: FnMut(Self::Ok) -> Fut,
+ Fut: Future<Output = Result<(), Self::Error>>,
+ Self: Sized,
+ {
+ assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
+ self,
+ limit.into(),
+ f,
+ ))
+ }
+
+ /// Attempt to transform a stream into a collection,
+ /// returning a future representing the result of that computation.
+ ///
+ /// This combinator will collect all successful results of this stream and
+ /// collect them into the specified collection type. If an error happens then all
+ /// collected elements will be dropped and the error will be returned.
+ ///
+ /// The returned future will be resolved when the stream terminates.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::TryStreamExt;
+ /// use std::thread;
+ ///
+ /// let (tx, rx) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// for i in 1..=5 {
+ /// tx.unbounded_send(Ok(i)).unwrap();
+ /// }
+ /// tx.unbounded_send(Err(6)).unwrap();
+ /// });
+ ///
+ /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
+ /// assert_eq!(output, Err(6));
+ /// # })
+ /// ```
+ fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
+ where
+ Self: Sized,
+ {
+ assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
+ }
+
+ /// An adaptor for chunking up successful items of the stream inside a vector.
+ ///
+ /// This combinator will attempt to pull successful items from this stream and buffer
+ /// them into a local vector. At most `capacity` items will get buffered
+ /// before they're yielded from the returned stream.
+ ///
+ /// Note that the vectors returned from this iterator may not always have
+ /// `capacity` elements. If the underlying stream ended and only a partial
+ /// vector was created, it'll be returned. Additionally if an error happens
+ /// from the underlying stream then the currently buffered items will be
+ /// yielded.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// This function is similar to
+ /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
+ /// early if an error occurs.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryChunksError, TryStreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
+ /// let mut stream = stream.try_chunks(2);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
+ /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
+ /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
+ /// # })
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `capacity` is zero.
+ #[cfg(feature = "alloc")]
+ fn try_chunks(self, capacity: usize) -> TryChunks<Self>
+ where
+ Self: Sized,
+ {
+ assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
+ TryChunks::new(self, capacity),
+ )
+ }
+
+ /// Attempt to filter the values produced by this stream according to the
+ /// provided asynchronous closure.
+ ///
+ /// As values of this stream are made available, the provided predicate `f`
+ /// will be run on them. If the predicate returns a `Future` which resolves
+ /// to `true`, then the stream will yield the value, but if the predicate
+ /// return a `Future` which resolves to `false`, then the value will be
+ /// discarded and the next value will be produced.
+ ///
+ /// All errors are passed through without filtering in this combinator.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `filter` methods in
+ /// the standard library.
+ ///
+ /// # Examples
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future;
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
+ /// let mut evens = stream.try_filter(|x| {
+ /// future::ready(x % 2 == 0)
+ /// });
+ ///
+ /// assert_eq!(evens.next().await, Some(Ok(2)));
+ /// assert_eq!(evens.next().await, Some(Err("error")));
+ /// # })
+ /// ```
+ fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
+ where
+ Fut: Future<Output = bool>,
+ F: FnMut(&Self::Ok) -> Fut,
+ Self: Sized,
+ {
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
+ }
+
+ /// Attempt to filter the values produced by this stream while
+ /// simultaneously mapping them to a different type according to the
+ /// provided asynchronous closure.
+ ///
+ /// As values of this stream are made available, the provided function will
+ /// be run on them. If the future returned by the predicate `f` resolves to
+ /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
+ /// it resolves to [`None`] then the next value will be produced.
+ ///
+ /// All errors are passed through without filtering in this combinator.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `filter_map` methods in
+ /// the standard library.
+ ///
+ /// # Examples
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ /// use futures::pin_mut;
+ ///
+ /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
+ /// let halves = stream.try_filter_map(|x| async move {
+ /// let ret = if x % 2 == 0 { Some(x / 2) } else { None };
+ /// Ok(ret)
+ /// });
+ ///
+ /// pin_mut!(halves);
+ /// assert_eq!(halves.next().await, Some(Ok(3)));
+ /// assert_eq!(halves.next().await, Some(Err("error")));
+ /// # })
+ /// ```
+ fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
+ where
+ Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
+ F: FnMut(Self::Ok) -> Fut,
+ Self: Sized,
+ {
+ assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
+ }
+
+ /// Flattens a stream of streams into just one continuous stream.
+ ///
+ /// If this stream's elements are themselves streams then this combinator
+ /// will flatten out the entire stream to one long chain of elements. Any
+ /// errors are passed through without looking at them, but otherwise each
+ /// individual stream will get exhausted before moving on to the next.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(Ok(1)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(Ok(2)).unwrap();
+ /// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(Ok(rx1)).unwrap();
+ /// tx3.unbounded_send(Ok(rx2)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
+ /// });
+ ///
+ /// let mut stream = rx3.try_flatten();
+ /// assert_eq!(stream.next().await, Some(Ok(1)));
+ /// assert_eq!(stream.next().await, Some(Ok(2)));
+ /// assert_eq!(stream.next().await, Some(Err(3)));
+ /// assert_eq!(stream.next().await, Some(Ok(4)));
+ /// assert_eq!(stream.next().await, Some(Err(5)));
+ /// assert_eq!(stream.next().await, None);
+ /// # });
+ /// ```
+ fn try_flatten(self) -> TryFlatten<Self>
+ where
+ Self::Ok: TryStream,
+ <Self::Ok as TryStream>::Error: From<Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
+ TryFlatten::new(self),
+ )
+ }
+
+ /// Attempt to execute an accumulating asynchronous computation over a
+ /// stream, collecting all the values into one final result.
+ ///
+ /// This combinator will accumulate all values returned by this stream
+ /// according to the closure provided. The initial state is also provided to
+ /// this method and then is returned again by each execution of the closure.
+ /// Once the entire stream has been exhausted the returned future will
+ /// resolve to this value.
+ ///
+ /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
+ /// exit early if an error is encountered in either the stream or the
+ /// provided closure.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
+ /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
+ /// assert_eq!(sum.await, Ok(3));
+ ///
+ /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
+ /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
+ /// assert_eq!(sum.await, Err(2));
+ /// # })
+ /// ```
+ fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
+ where
+ F: FnMut(T, Self::Ok) -> Fut,
+ Fut: TryFuture<Ok = T, Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
+ }
+
+ /// Attempt to concatenate all items of a stream into a single
+ /// extendable destination, returning a future representing the end result.
+ ///
+ /// This combinator will extend the first item with the contents of all
+ /// the subsequent successful results of the stream. If the stream is empty,
+ /// the default value will be returned.
+ ///
+ /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
+ ///
+ /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
+ /// exit early if an error is encountered in the stream.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::TryStreamExt;
+ /// use std::thread;
+ ///
+ /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
+ ///
+ /// thread::spawn(move || {
+ /// for i in (0..3).rev() {
+ /// let n = i * 3;
+ /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
+ /// }
+ /// });
+ ///
+ /// let result = rx.try_concat().await;
+ ///
+ /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
+ /// # });
+ /// ```
+ fn try_concat(self) -> TryConcat<Self>
+ where
+ Self: Sized,
+ Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
+ {
+ assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
+ }
+
+ /// Attempt to execute several futures from a stream concurrently (unordered).
+ ///
+ /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
+ /// that matches the stream's `Error` type.
+ ///
+ /// This adaptor will buffer up to `n` futures and then return their
+ /// outputs in the order in which they complete. If the underlying stream
+ /// returns an error, it will be immediately propagated.
+ ///
+ /// The returned stream will be a stream of results, each containing either
+ /// an error or a future's output. An error can be produced either by the
+ /// underlying stream itself or by one of the futures it yielded.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// Results are returned in the order of completion:
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::oneshot;
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ ///
+ /// let (send_one, recv_one) = oneshot::channel();
+ /// let (send_two, recv_two) = oneshot::channel();
+ ///
+ /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
+ ///
+ /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
+ ///
+ /// send_two.send(2i32)?;
+ /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
+ ///
+ /// send_one.send(1i32)?;
+ /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
+ ///
+ /// assert_eq!(buffered.next().await, None);
+ /// # Ok::<(), i32>(()) }).unwrap();
+ /// ```
+ ///
+ /// Errors from the underlying stream itself are propagated:
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ ///
+ /// let (sink, stream_of_futures) = mpsc::unbounded();
+ /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
+ ///
+ /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
+ /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
+ ///
+ /// sink.unbounded_send(Err("error in the stream"))?;
+ /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
+ where
+ Self::Ok: TryFuture<Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
+ TryBufferUnordered::new(self, n),
+ )
+ }
+
+ /// Attempt to execute several futures from a stream concurrently.
+ ///
+ /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
+ /// that matches the stream's `Error` type.
+ ///
+ /// This adaptor will buffer up to `n` futures and then return their
+ /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
+ /// be immediately propagated.
+ ///
+ /// The returned stream will be a stream of results, each containing either
+ /// an error or a future's output. An error can be produced either by the
+ /// underlying stream itself or by one of the futures it yielded.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// Results are returned in the order of addition:
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::oneshot;
+ /// use futures::future::lazy;
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ ///
+ /// let (send_one, recv_one) = oneshot::channel();
+ /// let (send_two, recv_two) = oneshot::channel();
+ ///
+ /// let mut buffered = lazy(move |cx| {
+ /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
+ ///
+ /// let mut buffered = stream_of_futures.try_buffered(10);
+ ///
+ /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
+ ///
+ /// send_two.send(2i32)?;
+ /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
+ /// Ok::<_, i32>(buffered)
+ /// }).await?;
+ ///
+ /// send_one.send(1i32)?;
+ /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
+ /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
+ ///
+ /// assert_eq!(buffered.next().await, None);
+ /// # Ok::<(), i32>(()) }).unwrap();
+ /// ```
+ ///
+ /// Errors from the underlying stream itself are propagated:
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ ///
+ /// let (sink, stream_of_futures) = mpsc::unbounded();
+ /// let mut buffered = stream_of_futures.try_buffered(10);
+ ///
+ /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
+ /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
+ ///
+ /// sink.unbounded_send(Err("error in the stream"))?;
+ /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn try_buffered(self, n: usize) -> TryBuffered<Self>
+ where
+ Self::Ok: TryFuture<Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
+ self, n,
+ ))
+ }
+
+ // TODO: false positive warning from rustdoc. Verify once #43466 settles
+ //
+ /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
+ /// stream types.
+ fn try_poll_next_unpin(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).try_poll_next(cx)
+ }
+
+ /// Wraps a [`TryStream`] into a stream compatible with libraries using
+ /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
+ /// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
+ /// use futures::future::{FutureExt, TryFutureExt};
+ /// # let (tx, rx) = futures::channel::oneshot::channel();
+ ///
+ /// let future03 = async {
+ /// println!("Running on the pool");
+ /// tx.send(42).unwrap();
+ /// };
+ ///
+ /// let future01 = future03
+ /// .unit_error() // Make it a TryFuture
+ /// .boxed() // Make it Unpin
+ /// .compat();
+ ///
+ /// tokio::run(future01);
+ /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
+ /// ```
+ #[cfg(feature = "compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
+ fn compat(self) -> Compat<Self>
+ where
+ Self: Sized + Unpin,
+ {
+ Compat::new(self)
+ }
+
+ /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
+ ///
+ /// This method is only available when the `std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryStreamExt};
+ /// use futures::io::AsyncReadExt;
+ ///
+ /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
+ /// let mut reader = stream.into_async_read();
+ ///
+ /// let mut buf = Vec::new();
+ /// reader.read_to_end(&mut buf).await.unwrap();
+ /// assert_eq!(buf, [1, 2, 3, 4, 5]);
+ /// # })
+ /// ```
+ #[cfg(feature = "io")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
+ #[cfg(feature = "std")]
+ fn into_async_read(self) -> IntoAsyncRead<Self>
+ where
+ Self: Sized + TryStreamExt<Error = std::io::Error>,
+ Self::Ok: AsRef<[u8]>,
+ {
+ crate::io::assert_read(IntoAsyncRead::new(self))
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/or_else.rs b/third_party/rust/futures-util/src/stream/try_stream/or_else.rs
new file mode 100644
index 0000000000..53aceb8e64
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/or_else.rs
@@ -0,0 +1,109 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct OrElse<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("OrElse")
+ .field("stream", &self.stream)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> OrElse<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Error) -> Fut,
+ Fut: TryFuture<Ok = St::Ok>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, future: None, f }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F> Stream for OrElse<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Error) -> Fut,
+ Fut: TryFuture<Ok = St::Ok>,
+{
+ type Item = Result<St::Ok, Fut::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ let item = ready!(fut.try_poll(cx));
+ this.future.set(None);
+ break Some(item);
+ } else {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
+ Some(Ok(item)) => break Some(Ok(item)),
+ Some(Err(e)) => {
+ this.future.set(Some((this.f)(e)));
+ }
+ None => break None,
+ }
+ }
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let future_len = usize::from(self.future.is_some());
+ let (lower, upper) = self.stream.size_hint();
+ let lower = lower.saturating_add(future_len);
+ let upper = match upper {
+ Some(x) => x.checked_add(future_len),
+ None => None,
+ };
+ (lower, upper)
+ }
+}
+
+impl<St, Fut, F> FusedStream for OrElse<St, Fut, F>
+where
+ St: TryStream + FusedStream,
+ F: FnMut(St::Error) -> Fut,
+ Fut: TryFuture<Ok = St::Ok>,
+{
+ fn is_terminated(&self) -> bool {
+ self.future.is_none() && self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F>
+where
+ S: Sink<Item>,
+{
+ type Error = S::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_buffer_unordered.rs b/third_party/rust/futures-util/src/stream/try_stream/try_buffer_unordered.rs
new file mode 100644
index 0000000000..9a899d4ea6
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_buffer_unordered.rs
@@ -0,0 +1,86 @@
+use crate::future::{IntoFuture, TryFutureExt};
+use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt};
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::stream::{Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the
+ /// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryBufferUnordered<St>
+ where St: TryStream
+ {
+ #[pin]
+ stream: Fuse<IntoStream<St>>,
+ in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>,
+ max: usize,
+ }
+}
+
+impl<St> TryBufferUnordered<St>
+where
+ St: TryStream,
+ St::Ok: TryFuture,
+{
+ pub(super) fn new(stream: St, n: usize) -> Self {
+ Self {
+ stream: IntoStream::new(stream).fuse(),
+ in_progress_queue: FuturesUnordered::new(),
+ max: n,
+ }
+ }
+
+ delegate_access_inner!(stream, St, (. .));
+}
+
+impl<St> Stream for TryBufferUnordered<St>
+where
+ St: TryStream,
+ St::Ok: TryFuture<Error = St::Error>,
+{
+ type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ // First up, try to spawn off as many futures as possible by filling up
+ // our queue of futures. Propagate errors from the stream immediately.
+ while this.in_progress_queue.len() < *this.max {
+ match this.stream.as_mut().poll_next(cx)? {
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
+ Poll::Ready(None) | Poll::Pending => break,
+ }
+ }
+
+ // Attempt to pull the next value from the in_progress_queue
+ match this.in_progress_queue.poll_next_unpin(cx) {
+ x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
+ Poll::Ready(None) => {}
+ }
+
+ // If more values are still coming from the stream, we're not done yet
+ if this.stream.is_done() {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item, E> Sink<Item> for TryBufferUnordered<S>
+where
+ S: TryStream + Sink<Item, Error = E>,
+ S::Ok: TryFuture<Error = E>,
+{
+ type Error = E;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_buffered.rs b/third_party/rust/futures-util/src/stream/try_stream/try_buffered.rs
new file mode 100644
index 0000000000..9f48e5c0a7
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_buffered.rs
@@ -0,0 +1,87 @@
+use crate::future::{IntoFuture, TryFutureExt};
+use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt};
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::stream::{Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryBuffered<St>
+ where
+ St: TryStream,
+ St::Ok: TryFuture,
+ {
+ #[pin]
+ stream: Fuse<IntoStream<St>>,
+ in_progress_queue: FuturesOrdered<IntoFuture<St::Ok>>,
+ max: usize,
+ }
+}
+
+impl<St> TryBuffered<St>
+where
+ St: TryStream,
+ St::Ok: TryFuture,
+{
+ pub(super) fn new(stream: St, n: usize) -> Self {
+ Self {
+ stream: IntoStream::new(stream).fuse(),
+ in_progress_queue: FuturesOrdered::new(),
+ max: n,
+ }
+ }
+
+ delegate_access_inner!(stream, St, (. .));
+}
+
+impl<St> Stream for TryBuffered<St>
+where
+ St: TryStream,
+ St::Ok: TryFuture<Error = St::Error>,
+{
+ type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ // First up, try to spawn off as many futures as possible by filling up
+ // our queue of futures. Propagate errors from the stream immediately.
+ while this.in_progress_queue.len() < *this.max {
+ match this.stream.as_mut().poll_next(cx)? {
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()),
+ Poll::Ready(None) | Poll::Pending => break,
+ }
+ }
+
+ // Attempt to pull the next value from the in_progress_queue
+ match this.in_progress_queue.poll_next_unpin(cx) {
+ x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
+ Poll::Ready(None) => {}
+ }
+
+ // If more values are still coming from the stream, we're not done yet
+ if this.stream.is_done() {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item, E> Sink<Item> for TryBuffered<S>
+where
+ S: TryStream + Sink<Item, Error = E>,
+ S::Ok: TryFuture<Error = E>,
+{
+ type Error = E;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_chunks.rs b/third_party/rust/futures-util/src/stream/try_stream/try_chunks.rs
new file mode 100644
index 0000000000..3bb253a714
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_chunks.rs
@@ -0,0 +1,131 @@
+use crate::stream::{Fuse, IntoStream, StreamExt};
+
+use alloc::vec::Vec;
+use core::pin::Pin;
+use core::{fmt, mem};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryChunks<St: TryStream> {
+ #[pin]
+ stream: Fuse<IntoStream<St>>,
+ items: Vec<St::Ok>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
+}
+
+impl<St: TryStream> TryChunks<St> {
+ pub(super) fn new(stream: St, capacity: usize) -> Self {
+ assert!(capacity > 0);
+
+ Self {
+ stream: IntoStream::new(stream).fuse(),
+ items: Vec::with_capacity(capacity),
+ cap: capacity,
+ }
+ }
+
+ fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
+ let cap = self.cap;
+ mem::replace(self.project().items, Vec::with_capacity(cap))
+ }
+
+ delegate_access_inner!(stream, St, (. .));
+}
+
+impl<St: TryStream> Stream for TryChunks<St> {
+ #[allow(clippy::type_complexity)]
+ type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.as_mut().project();
+ loop {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
+ // Push the item into the buffer and check whether it is full.
+ // If so, replace our buffer with a new and empty one and return
+ // the full one.
+ Some(item) => match item {
+ Ok(item) => {
+ this.items.push(item);
+ if this.items.len() >= *this.cap {
+ return Poll::Ready(Some(Ok(self.take())));
+ }
+ }
+ Err(e) => {
+ return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
+ }
+ },
+
+ // Since the underlying stream ran out of values, return what we
+ // have buffered, if we have anything.
+ None => {
+ let last = if this.items.is_empty() {
+ None
+ } else {
+ let full_buf = mem::take(this.items);
+ Some(full_buf)
+ };
+
+ return Poll::Ready(last.map(Ok));
+ }
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let chunk_len = usize::from(!self.items.is_empty());
+ let (lower, upper) = self.stream.size_hint();
+ let lower = (lower / self.cap).saturating_add(chunk_len);
+ let upper = match upper {
+ Some(x) => x.checked_add(chunk_len),
+ None => None,
+ };
+ (lower, upper)
+ }
+}
+
+impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated() && self.items.is_empty()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item> Sink<Item> for TryChunks<S>
+where
+ S: TryStream + Sink<Item>,
+{
+ type Error = <S as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}
+
+/// Error indicating, that while chunk was collected inner stream produced an error.
+///
+/// Contains all items that were collected before an error occurred, and the stream error itself.
+#[derive(PartialEq, Eq)]
+pub struct TryChunksError<T, E>(pub Vec<T>, pub E);
+
+impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.1.fmt(f)
+ }
+}
+
+impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.1.fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_collect.rs b/third_party/rust/futures-util/src/stream/try_stream/try_collect.rs
new file mode 100644
index 0000000000..3e5963f033
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_collect.rs
@@ -0,0 +1,52 @@
+use core::mem;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, TryStream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryCollect<St, C> {
+ #[pin]
+ stream: St,
+ items: C,
+ }
+}
+
+impl<St: TryStream, C: Default> TryCollect<St, C> {
+ pub(super) fn new(s: St) -> Self {
+ Self { stream: s, items: Default::default() }
+ }
+}
+
+impl<St, C> FusedFuture for TryCollect<St, C>
+where
+ St: TryStream + FusedStream,
+ C: Default + Extend<St::Ok>,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+impl<St, C> Future for TryCollect<St, C>
+where
+ St: TryStream,
+ C: Default + Extend<St::Ok>,
+{
+ type Output = Result<C, St::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ Poll::Ready(Ok(loop {
+ match ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ Some(x) => this.items.extend(Some(x)),
+ None => break mem::take(this.items),
+ }
+ }))
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_concat.rs b/third_party/rust/futures-util/src/stream/try_stream/try_concat.rs
new file mode 100644
index 0000000000..58fb6a5413
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_concat.rs
@@ -0,0 +1,51 @@
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::stream::TryStream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryConcat<St: TryStream> {
+ #[pin]
+ stream: St,
+ accum: Option<St::Ok>,
+ }
+}
+
+impl<St> TryConcat<St>
+where
+ St: TryStream,
+ St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
+{
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, accum: None }
+ }
+}
+
+impl<St> Future for TryConcat<St>
+where
+ St: TryStream,
+ St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
+{
+ type Output = Result<St::Ok, St::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+
+ Poll::Ready(Ok(loop {
+ if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ if let Some(a) = this.accum {
+ a.extend(x)
+ } else {
+ *this.accum = Some(x)
+ }
+ } else {
+ break this.accum.take().unwrap_or_default();
+ }
+ }))
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_filter.rs b/third_party/rust/futures-util/src/stream/try_stream/try_filter.rs
new file mode 100644
index 0000000000..11d58243fd
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_filter.rs
@@ -0,0 +1,112 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryFilter<St, Fut, F>
+ where St: TryStream
+ {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Ok>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F>
+where
+ St: TryStream + fmt::Debug,
+ St::Ok: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryFilter")
+ .field("stream", &self.stream)
+ .field("pending_fut", &self.pending_fut)
+ .field("pending_item", &self.pending_item)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TryFilter<St, Fut, F>
+where
+ St: TryStream,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, pending_fut: None, pending_item: None }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F>
+where
+ St: TryStream + FusedStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ fn is_terminated(&self) -> bool {
+ self.pending_fut.is_none() && self.stream.is_terminated()
+ }
+}
+
+impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
+where
+ St: TryStream,
+ Fut: Future<Output = bool>,
+ F: FnMut(&St::Ok) -> Fut,
+{
+ type Item = Result<St::Ok, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
+ let res = ready!(fut.poll(cx));
+ this.pending_fut.set(None);
+ if res {
+ break this.pending_item.take().map(Ok);
+ }
+ *this.pending_item = None;
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
+ } else {
+ break None;
+ }
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let pending_len = usize::from(self.pending_fut.is_some());
+ let (_, upper) = self.stream.size_hint();
+ let upper = match upper {
+ Some(x) => x.checked_add(pending_len),
+ None => None,
+ };
+ (0, upper) // can't know a lower bound, due to the predicate
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item, E> Sink<Item> for TryFilter<S, Fut, F>
+where
+ S: TryStream + Sink<Item, Error = E>,
+{
+ type Error = E;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_filter_map.rs b/third_party/rust/futures-util/src/stream/try_stream/try_filter_map.rs
new file mode 100644
index 0000000000..ed1201732b
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_filter_map.rs
@@ -0,0 +1,106 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryFilterMap<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending: Option<Fut>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryFilterMap")
+ .field("stream", &self.stream)
+ .field("pending", &self.pending)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TryFilterMap<St, Fut, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, pending: None }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F>
+where
+ St: TryStream + FusedStream,
+ Fut: TryFuture<Ok = Option<T>, Error = St::Error>,
+ F: FnMut(St::Ok) -> Fut,
+{
+ fn is_terminated(&self) -> bool {
+ self.pending.is_none() && self.stream.is_terminated()
+ }
+}
+
+impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
+where
+ St: TryStream,
+ Fut: TryFuture<Ok = Option<T>, Error = St::Error>,
+ F: FnMut(St::Ok) -> Fut,
+{
+ type Item = Result<T, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(p) = this.pending.as_mut().as_pin_mut() {
+ // We have an item in progress, poll that until it's done
+ let res = ready!(p.try_poll(cx));
+ this.pending.set(None);
+ let item = res?;
+ if item.is_some() {
+ break item.map(Ok);
+ }
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ // No item in progress, but the stream is still going
+ this.pending.set(Some((this.f)(item)));
+ } else {
+ // The stream is done
+ break None;
+ }
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let pending_len = usize::from(self.pending.is_some());
+ let (_, upper) = self.stream.size_hint();
+ let upper = match upper {
+ Some(x) => x.checked_add(pending_len),
+ None => None,
+ };
+ (0, upper) // can't know a lower bound, due to the predicate
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item> Sink<Item> for TryFilterMap<S, Fut, F>
+where
+ S: Sink<Item>,
+{
+ type Error = S::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_flatten.rs b/third_party/rust/futures-util/src/stream/try_stream/try_flatten.rs
new file mode 100644
index 0000000000..4fc04a07bb
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_flatten.rs
@@ -0,0 +1,84 @@
+use core::pin::Pin;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryFlatten<St>
+ where
+ St: TryStream,
+ {
+ #[pin]
+ stream: St,
+ #[pin]
+ next: Option<St::Ok>,
+ }
+}
+
+impl<St> TryFlatten<St>
+where
+ St: TryStream,
+ St::Ok: TryStream,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, next: None }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St> FusedStream for TryFlatten<St>
+where
+ St: TryStream + FusedStream,
+ St::Ok: TryStream,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.next.is_none() && self.stream.is_terminated()
+ }
+}
+
+impl<St> Stream for TryFlatten<St>
+where
+ St: TryStream,
+ St::Ok: TryStream,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(s) = this.next.as_mut().as_pin_mut() {
+ if let Some(item) = ready!(s.try_poll_next(cx)?) {
+ break Some(Ok(item));
+ } else {
+ this.next.set(None);
+ }
+ } else if let Some(s) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.next.set(Some(s));
+ } else {
+ break None;
+ }
+ })
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item> Sink<Item> for TryFlatten<S>
+where
+ S: TryStream + Sink<Item>,
+{
+ type Error = <S as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_fold.rs b/third_party/rust/futures-util/src/stream/try_stream/try_fold.rs
new file mode 100644
index 0000000000..d344d96e7d
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_fold.rs
@@ -0,0 +1,93 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
+use futures_core::stream::TryStream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryFold<St, Fut, T, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ accum: Option<T>,
+ #[pin]
+ future: Option<Fut>,
+ }
+}
+
+impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+ T: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryFold")
+ .field("stream", &self.stream)
+ .field("accum", &self.accum)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, T, F> TryFold<St, Fut, T, F>
+where
+ St: TryStream,
+ F: FnMut(T, St::Ok) -> Fut,
+ Fut: TryFuture<Ok = T, Error = St::Error>,
+{
+ pub(super) fn new(stream: St, f: F, t: T) -> Self {
+ Self { stream, f, accum: Some(t), future: None }
+ }
+}
+
+impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F>
+where
+ St: TryStream,
+ F: FnMut(T, St::Ok) -> Fut,
+ Fut: TryFuture<Ok = T, Error = St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.accum.is_none() && self.future.is_none()
+ }
+}
+
+impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
+where
+ St: TryStream,
+ F: FnMut(T, St::Ok) -> Fut,
+ Fut: TryFuture<Ok = T, Error = St::Error>,
+{
+ type Output = Result<T, St::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new accum value
+ let res = ready!(fut.try_poll(cx));
+ this.future.set(None);
+ match res {
+ Ok(a) => *this.accum = Some(a),
+ Err(e) => break Err(e),
+ }
+ } else if this.accum.is_some() {
+ // we're waiting on a new item from the stream
+ let res = ready!(this.stream.as_mut().try_poll_next(cx));
+ let a = this.accum.take().unwrap();
+ match res {
+ Some(Ok(item)) => this.future.set(Some((this.f)(a, item))),
+ Some(Err(e)) => break Err(e),
+ None => break Ok(a),
+ }
+ } else {
+ panic!("Fold polled after completion")
+ }
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_for_each.rs b/third_party/rust/futures-util/src/stream/try_stream/try_for_each.rs
new file mode 100644
index 0000000000..6a081d84e7
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_for_each.rs
@@ -0,0 +1,68 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{Future, TryFuture};
+use futures_core::ready;
+use futures_core::stream::TryStream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryForEach<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ future: Option<Fut>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryForEach")
+ .field("stream", &self.stream)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TryForEach<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: TryFuture<Ok = (), Error = St::Error>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, future: None }
+ }
+}
+
+impl<St, Fut, F> Future for TryForEach<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: TryFuture<Ok = (), Error = St::Error>,
+{
+ type Output = Result<(), St::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ ready!(fut.try_poll(cx))?;
+ this.future.set(None);
+ } else {
+ match ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ Some(e) => this.future.set(Some((this.f)(e))),
+ None => break,
+ }
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_for_each_concurrent.rs b/third_party/rust/futures-util/src/stream/try_stream/try_for_each_concurrent.rs
new file mode 100644
index 0000000000..62734c746b
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_for_each_concurrent.rs
@@ -0,0 +1,133 @@
+use crate::stream::{FuturesUnordered, StreamExt};
+use core::fmt;
+use core::mem;
+use core::num::NonZeroUsize;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::stream::TryStream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the
+ /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
+ /// method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryForEachConcurrent<St, Fut, F> {
+ #[pin]
+ stream: Option<St>,
+ f: F,
+ futures: FuturesUnordered<Fut>,
+ limit: Option<NonZeroUsize>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryForEachConcurrent")
+ .field("stream", &self.stream)
+ .field("futures", &self.futures)
+ .field("limit", &self.limit)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = Result<(), St::Error>>,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_none() && self.futures.is_empty()
+ }
+}
+
+impl<St, Fut, F> TryForEachConcurrent<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = Result<(), St::Error>>,
+{
+ pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
+ Self {
+ stream: Some(stream),
+ // Note: `limit` = 0 gets ignored.
+ limit: limit.and_then(NonZeroUsize::new),
+ f,
+ futures: FuturesUnordered::new(),
+ }
+ }
+}
+
+impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = Result<(), St::Error>>,
+{
+ type Output = Result<(), St::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ let mut made_progress_this_iter = false;
+
+ // Check if we've already created a number of futures greater than `limit`
+ if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
+ let poll_res = match this.stream.as_mut().as_pin_mut() {
+ Some(stream) => stream.try_poll_next(cx),
+ None => Poll::Ready(None),
+ };
+
+ let elem = match poll_res {
+ Poll::Ready(Some(Ok(elem))) => {
+ made_progress_this_iter = true;
+ Some(elem)
+ }
+ Poll::Ready(None) => {
+ this.stream.set(None);
+ None
+ }
+ Poll::Pending => None,
+ Poll::Ready(Some(Err(e))) => {
+ // Empty the stream and futures so that we know
+ // the future has completed.
+ this.stream.set(None);
+ drop(mem::replace(this.futures, FuturesUnordered::new()));
+ return Poll::Ready(Err(e));
+ }
+ };
+
+ if let Some(elem) = elem {
+ this.futures.push((this.f)(elem));
+ }
+ }
+
+ match this.futures.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
+ Poll::Ready(None) => {
+ if this.stream.is_none() {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ Poll::Pending => {}
+ Poll::Ready(Some(Err(e))) => {
+ // Empty the stream and futures so that we know
+ // the future has completed.
+ this.stream.set(None);
+ drop(mem::replace(this.futures, FuturesUnordered::new()));
+ return Poll::Ready(Err(e));
+ }
+ }
+
+ if !made_progress_this_iter {
+ return Poll::Pending;
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_next.rs b/third_party/rust/futures-util/src/stream/try_stream/try_next.rs
new file mode 100644
index 0000000000..13fcf80cae
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_next.rs
@@ -0,0 +1,34 @@
+use crate::stream::TryStreamExt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::stream::{FusedStream, TryStream};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`try_next`](super::TryStreamExt::try_next) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct TryNext<'a, St: ?Sized> {
+ stream: &'a mut St,
+}
+
+impl<St: ?Sized + Unpin> Unpin for TryNext<'_, St> {}
+
+impl<'a, St: ?Sized + TryStream + Unpin> TryNext<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Self { stream }
+ }
+}
+
+impl<St: ?Sized + TryStream + Unpin + FusedStream> FusedFuture for TryNext<'_, St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+impl<St: ?Sized + TryStream + Unpin> Future for TryNext<'_, St> {
+ type Output = Result<Option<St::Ok>, St::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.stream.try_poll_next_unpin(cx)?.map(Ok)
+ }
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_skip_while.rs b/third_party/rust/futures-util/src/stream/try_stream/try_skip_while.rs
new file mode 100644
index 0000000000..52aa2d478b
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_skip_while.rs
@@ -0,0 +1,120 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TrySkipWhile<St, Fut, F> where St: TryStream {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Ok>,
+ done_skipping: bool,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F>
+where
+ St: TryStream + fmt::Debug,
+ St::Ok: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TrySkipWhile")
+ .field("stream", &self.stream)
+ .field("pending_fut", &self.pending_fut)
+ .field("pending_item", &self.pending_item)
+ .field("done_skipping", &self.done_skipping)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TrySkipWhile<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, pending_fut: None, pending_item: None, done_skipping: false }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ type Item = Result<St::Ok, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ if *this.done_skipping {
+ return this.stream.try_poll_next(cx);
+ }
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
+ let res = ready!(fut.try_poll(cx));
+ this.pending_fut.set(None);
+ let skipped = res?;
+ let item = this.pending_item.take();
+ if !skipped {
+ *this.done_skipping = true;
+ break item.map(Ok);
+ }
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
+ } else {
+ break None;
+ }
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let pending_len = usize::from(self.pending_item.is_some());
+ let (_, upper) = self.stream.size_hint();
+ let upper = match upper {
+ Some(x) => x.checked_add(pending_len),
+ None => None,
+ };
+ (0, upper) // can't know a lower bound, due to the predicate
+ }
+}
+
+impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F>
+where
+ St: TryStream + FusedStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.pending_item.is_none() && self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item, E> Sink<Item> for TrySkipWhile<S, Fut, F>
+where
+ S: TryStream + Sink<Item, Error = E>,
+{
+ type Error = E;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_take_while.rs b/third_party/rust/futures-util/src/stream/try_stream/try_take_while.rs
new file mode 100644
index 0000000000..4b5ff1ad38
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_take_while.rs
@@ -0,0 +1,129 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryTakeWhile<St, Fut, F>
+ where
+ St: TryStream,
+ {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Ok>,
+ done_taking: bool,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F>
+where
+ St: TryStream + fmt::Debug,
+ St::Ok: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryTakeWhile")
+ .field("stream", &self.stream)
+ .field("pending_fut", &self.pending_fut)
+ .field("pending_item", &self.pending_item)
+ .field("done_taking", &self.done_taking)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TryTakeWhile<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, pending_fut: None, pending_item: None, done_taking: false }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ type Item = Result<St::Ok, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ if *this.done_taking {
+ return Poll::Ready(None);
+ }
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
+ let res = ready!(fut.try_poll(cx));
+ this.pending_fut.set(None);
+ let take = res?;
+ let item = this.pending_item.take();
+ if take {
+ break item.map(Ok);
+ } else {
+ *this.done_taking = true;
+ break None;
+ }
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
+ } else {
+ break None;
+ }
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.done_taking {
+ return (0, Some(0));
+ }
+
+ let pending_len = usize::from(self.pending_item.is_some());
+ let (_, upper) = self.stream.size_hint();
+ let upper = match upper {
+ Some(x) => x.checked_add(pending_len),
+ None => None,
+ };
+ (0, upper) // can't know a lower bound, due to the predicate
+ }
+}
+
+impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F>
+where
+ St: TryStream + FusedStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F>
+where
+ S: TryStream + Sink<Item, Error = E>,
+{
+ type Error = E;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/third_party/rust/futures-util/src/stream/try_stream/try_unfold.rs b/third_party/rust/futures-util/src/stream/try_stream/try_unfold.rs
new file mode 100644
index 0000000000..fd9cdf1d8c
--- /dev/null
+++ b/third_party/rust/futures-util/src/stream/try_stream/try_unfold.rs
@@ -0,0 +1,122 @@
+use super::assert_stream;
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::ready;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
+///
+/// This function is the dual for the `TryStream::try_fold()` adapter: while
+/// `TryStream::try_fold()` reduces a `TryStream` to one single value,
+/// `try_unfold()` creates a `TryStream` from a seed value.
+///
+/// `try_unfold()` will call the provided closure with the provided seed, then
+/// wait for the returned `TryFuture` to complete with `(a, b)`. It will then
+/// yield the value `a`, and use `b` as the next internal state.
+///
+/// If the closure returns `None` instead of `Some(TryFuture)`, then the
+/// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in
+/// future calls to `poll()`.
+///
+/// In case of error generated by the returned `TryFuture`, the error will be
+/// returned by the `TryStream`. The `TryStream` will then yield
+/// `Poll::Ready(None)` in future calls to `poll()`.
+///
+/// This function can typically be used when wanting to go from the "world of
+/// futures" to the "world of streams": the provided closure can build a
+/// `TryFuture` using other library functions working on futures, and
+/// `try_unfold()` will turn it into a `TryStream` by repeating the operation.
+///
+/// # Example
+///
+/// ```
+/// # #[derive(Debug, PartialEq)]
+/// # struct SomeError;
+/// # futures::executor::block_on(async {
+/// use futures::stream::{self, TryStreamExt};
+///
+/// let stream = stream::try_unfold(0, |state| async move {
+/// if state < 0 {
+/// return Err(SomeError);
+/// }
+///
+/// if state <= 2 {
+/// let next_state = state + 1;
+/// let yielded = state * 2;
+/// Ok(Some((yielded, next_state)))
+/// } else {
+/// Ok(None)
+/// }
+/// });
+///
+/// let result: Result<Vec<i32>, _> = stream.try_collect().await;
+/// assert_eq!(result, Ok(vec![0, 2, 4]));
+/// # });
+/// ```
+pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
+where
+ F: FnMut(T) -> Fut,
+ Fut: TryFuture<Ok = Option<(Item, T)>>,
+{
+ assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None })
+}
+
+pin_project! {
+ /// Stream for the [`try_unfold`] function.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryUnfold<T, F, Fut> {
+ f: F,
+ state: Option<T>,
+ #[pin]
+ fut: Option<Fut>,
+ }
+}
+
+impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
+where
+ T: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryUnfold").field("state", &self.state).field("fut", &self.fut).finish()
+ }
+}
+
+impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
+where
+ F: FnMut(T) -> Fut,
+ Fut: TryFuture<Ok = Option<(Item, T)>>,
+{
+ type Item = Result<Item, Fut::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ if let Some(state) = this.state.take() {
+ this.fut.set(Some((this.f)(state)));
+ }
+
+ match this.fut.as_mut().as_pin_mut() {
+ None => {
+ // The future previously errored
+ Poll::Ready(None)
+ }
+ Some(future) => {
+ let step = ready!(future.try_poll(cx));
+ this.fut.set(None);
+
+ match step {
+ Ok(Some((item, next_state))) => {
+ *this.state = Some(next_state);
+ Poll::Ready(Some(Ok(item)))
+ }
+ Ok(None) => Poll::Ready(None),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ }
+ }
+ }
+}