diff options
Diffstat (limited to 'vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs')
-rw-r--r-- | vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs new file mode 100644 index 000000000..a74dfc451 --- /dev/null +++ b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -0,0 +1,176 @@ +use core::marker::PhantomData; +use core::pin::Pin; + +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +use pin_project_lite::pin_project; + +use crate::future::Either; +use crate::stream::stream::flatten_unordered::{ + FlattenUnorderedWithFlowController, FlowController, FlowStep, +}; +use crate::stream::IntoStream; +use crate::TryStreamExt; + +delegate_all!( + /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. + TryFlattenUnordered<St>( + FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + + New[ + |stream: St, limit: impl Into<Option<usize>>| + FlattenUnorderedWithFlowController::new( + NestedTryStreamIntoEitherTryStream::new(stream), + limit.into() + ) + ] + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> +); + +pin_project! { + /// Emits either successful streams or single-item streams containing the underlying errors. + /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct NestedTryStreamIntoEitherTryStream<St> + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> + { + #[pin] + stream: St + } +} + +impl<St> NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn new(stream: St) -> Self { + Self { stream } + } + + delegate_access_inner!(stream, St, ()); +} + +/// Emits a single item immediately, then stream will be terminated. +#[derive(Debug, Clone)] +pub struct Single<T>(Option<T>); + +impl<T> Single<T> { + /// Constructs new `Single` with the given value. + fn new(val: T) -> Self { + Self(Some(val)) + } + + /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated. + fn next_immediate(&mut self) -> Option<T> { + self.0.take() + } +} + +impl<T> Unpin for Single<T> {} + +impl<T> Stream for Single<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(self.0.take()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) + } +} + +/// Immediately propagates errors occurred in the base stream. +#[derive(Debug, Clone, Copy)] +pub struct PropagateBaseStreamError<St>(PhantomData<St>); + +type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item; +type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item; + +impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> { + match item { + // A new successful inner stream received + st @ Either::Left(_) => FlowStep::Continue(st), + // An error encountered + Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), + } + } +} + +type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>; + +impl<St> Stream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + // Item is either an inner stream or a stream containing a single error. + // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. + type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let item = ready!(self.project().stream.try_poll_next(cx)); + + let out = match item { + Some(res) => match res { + // Emit successful inner stream as is + Ok(stream) => Either::Left(stream.into_stream()), + // Wrap an error into a stream containing a single item + err @ Err(_) => { + let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); + + Either::Right(Single::new(res)) + } + }, + None => return Poll::Ready(None), + }; + + Poll::Ready(Some(out)) + } +} + +impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + FusedStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + Sink<Item>, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>, +{ + type Error = <St as Sink<Item>>::Error; + + delegate_sink!(stream, Item); +} |