summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/stream/try_stream/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/stream/try_stream/mod.rs')
-rw-r--r--vendor/futures-util/src/stream/try_stream/mod.rs92
1 files changed, 79 insertions, 13 deletions
diff --git a/vendor/futures-util/src/stream/try_stream/mod.rs b/vendor/futures-util/src/stream/try_stream/mod.rs
index 455ddca3f..414a40dbe 100644
--- a/vendor/futures-util/src/stream/try_stream/mod.rs
+++ b/vendor/futures-util/src/stream/try_stream/mod.rs
@@ -15,6 +15,7 @@ use crate::stream::{Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
+
use futures_core::{
future::{Future, TryFuture},
stream::TryStream,
@@ -88,6 +89,14 @@ mod try_flatten;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_flatten::TryFlatten;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_flatten_unordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_flatten_unordered::TryFlattenUnordered;
+
mod try_collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_collect::TryCollect;
@@ -711,6 +720,63 @@ pub trait TryStreamExt: TryStream {
assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
}
+ /// Flattens a stream of streams into just one continuous stream. Produced streams
+ /// will be polled concurrently and any errors will be passed through without looking at them.
+ /// If the underlying base stream returns an error, it will be **immediately** propagated.
+ ///
+ /// The only argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(Ok(1)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(Ok(2)).unwrap();
+ /// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(Ok(rx1)).unwrap();
+ /// tx3.unbounded_send(Ok(rx2)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
+ /// });
+ ///
+ /// let stream = rx3.try_flatten_unordered(None);
+ /// let mut values: Vec<_> = stream.collect().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
+ where
+ Self::Ok: TryStream + Unpin,
+ <Self::Ok as TryStream>::Error: From<Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
+ TryFlattenUnordered::new(self, limit),
+ )
+ }
+
/// Flattens a stream of streams into just one continuous stream.
///
/// If this stream's elements are themselves streams then this combinator
@@ -736,17 +802,21 @@ pub trait TryStreamExt: TryStream {
/// thread::spawn(move || {
/// tx2.unbounded_send(Ok(2)).unwrap();
/// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
/// });
/// thread::spawn(move || {
/// tx3.unbounded_send(Ok(rx1)).unwrap();
/// tx3.unbounded_send(Ok(rx2)).unwrap();
- /// tx3.unbounded_send(Err(4)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
/// });
///
/// let mut stream = rx3.try_flatten();
/// assert_eq!(stream.next().await, Some(Ok(1)));
/// assert_eq!(stream.next().await, Some(Ok(2)));
/// assert_eq!(stream.next().await, Some(Err(3)));
+ /// assert_eq!(stream.next().await, Some(Ok(4)));
+ /// assert_eq!(stream.next().await, Some(Err(5)));
+ /// assert_eq!(stream.next().await, None);
/// # });
/// ```
fn try_flatten(self) -> TryFlatten<Self>
@@ -914,7 +984,7 @@ pub trait TryStreamExt: TryStream {
/// that matches the stream's `Error` type.
///
/// This adaptor will buffer up to `n` futures and then return their
- /// outputs in the order. If the underlying stream returns an error, it will
+ /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
/// be immediately propagated.
///
/// The returned stream will be a stream of results, each containing either
@@ -1001,6 +1071,7 @@ pub trait TryStreamExt: TryStream {
/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
/// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::future::{FutureExt, TryFutureExt};
/// # let (tx, rx) = futures::channel::oneshot::channel();
///
@@ -1026,12 +1097,7 @@ pub trait TryStreamExt: TryStream {
Compat::new(self)
}
- /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
- ///
- /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
- /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
- /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
- /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
+ /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
///
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
@@ -1043,12 +1109,12 @@ pub trait TryStreamExt: TryStream {
/// use futures::stream::{self, TryStreamExt};
/// use futures::io::AsyncReadExt;
///
- /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
+ /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
/// let mut reader = stream.into_async_read();
- /// let mut buf = Vec::new();
///
- /// assert!(reader.read_to_end(&mut buf).await.is_ok());
- /// assert_eq!(buf, &[1, 2, 3, 4, 5]);
+ /// let mut buf = Vec::new();
+ /// reader.read_to_end(&mut buf).await.unwrap();
+ /// assert_eq!(buf, [1, 2, 3, 4, 5]);
/// # })
/// ```
#[cfg(feature = "io")]
@@ -1056,7 +1122,7 @@ pub trait TryStreamExt: TryStream {
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
- Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
+ Self: Sized + TryStreamExt<Error = std::io::Error>,
Self::Ok: AsRef<[u8]>,
{
crate::io::assert_read(IntoAsyncRead::new(self))