summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs')
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs176
1 files changed, 176 insertions, 0 deletions
diff --git a/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs
new file mode 100644
index 000000000..a74dfc451
--- /dev/null
+++ b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs
@@ -0,0 +1,176 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+use pin_project_lite::pin_project;
+
+use crate::future::Either;
+use crate::stream::stream::flatten_unordered::{
+ FlattenUnorderedWithFlowController, FlowController, FlowStep,
+};
+use crate::stream::IntoStream;
+use crate::TryStreamExt;
+
+delegate_all!(
+ /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
+ TryFlattenUnordered<St>(
+ FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ + New[
+ |stream: St, limit: impl Into<Option<usize>>|
+ FlattenUnorderedWithFlowController::new(
+ NestedTryStreamIntoEitherTryStream::new(stream),
+ limit.into()
+ )
+ ]
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+);
+
+pin_project! {
+ /// Emits either successful streams or single-item streams containing the underlying errors.
+ /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct NestedTryStreamIntoEitherTryStream<St>
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+ {
+ #[pin]
+ stream: St
+ }
+}
+
+impl<St> NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn new(stream: St) -> Self {
+ Self { stream }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+/// Emits a single item immediately, then stream will be terminated.
+#[derive(Debug, Clone)]
+pub struct Single<T>(Option<T>);
+
+impl<T> Single<T> {
+ /// Constructs new `Single` with the given value.
+ fn new(val: T) -> Self {
+ Self(Some(val))
+ }
+
+ /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated.
+ fn next_immediate(&mut self) -> Option<T> {
+ self.0.take()
+ }
+}
+
+impl<T> Unpin for Single<T> {}
+
+impl<T> Stream for Single<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(self.0.take())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
+ }
+}
+
+/// Immediately propagates errors occurred in the base stream.
+#[derive(Debug, Clone, Copy)]
+pub struct PropagateBaseStreamError<St>(PhantomData<St>);
+
+type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
+type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item;
+
+impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
+ match item {
+ // A new successful inner stream received
+ st @ Either::Left(_) => FlowStep::Continue(st),
+ // An error encountered
+ Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
+ }
+ }
+}
+
+type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;
+
+impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ // Item is either an inner stream or a stream containing a single error.
+ // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
+ type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let item = ready!(self.project().stream.try_poll_next(cx));
+
+ let out = match item {
+ Some(res) => match res {
+ // Emit successful inner stream as is
+ Ok(stream) => Either::Left(stream.into_stream()),
+ // Wrap an error into a stream containing a single item
+ err @ Err(_) => {
+ let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
+
+ Either::Right(Single::new(res))
+ }
+ },
+ None => return Poll::Ready(None),
+ };
+
+ Poll::Ready(Some(out))
+ }
+}
+
+impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + FusedStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + Sink<Item>,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
+{
+ type Error = <St as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}