diff options
Diffstat (limited to 'vendor/futures-util/src/stream/stream/take_until.rs')
-rw-r--r-- | vendor/futures-util/src/stream/stream/take_until.rs | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/vendor/futures-util/src/stream/stream/take_until.rs b/vendor/futures-util/src/stream/stream/take_until.rs new file mode 100644 index 000000000..d14f9ce10 --- /dev/null +++ b/vendor/futures-util/src/stream/stream/take_until.rs @@ -0,0 +1,170 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +// FIXME: docs, tests + +pin_project! { + /// Stream for the [`take_until`](super::StreamExt::take_until) method. + #[must_use = "streams do nothing unless polled"] + pub struct TakeUntil<St: Stream, Fut: Future> { + #[pin] + stream: St, + // Contains the inner Future on start and None once the inner Future is resolved + // or taken out by the user. + #[pin] + fut: Option<Fut>, + // Contains fut's return value once fut is resolved + fut_result: Option<Fut::Output>, + // Whether the future was taken out by the user. + free: bool, + } +} + +impl<St, Fut> fmt::Debug for TakeUntil<St, Fut> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, + Fut: Future + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish() + } +} + +impl<St, Fut> TakeUntil<St, Fut> +where + St: Stream, + Fut: Future, +{ + pub(super) fn new(stream: St, fut: Fut) -> Self { + Self { stream, fut: Some(fut), fut_result: None, free: false } + } + + delegate_access_inner!(stream, St, ()); + + /// Extract the stopping future out of the combinator. + /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet. + /// Taking out the future means the combinator will be yielding + /// elements from the wrapped stream without ever stopping it. + pub fn take_future(&mut self) -> Option<Fut> { + if self.fut.is_some() { + self.free = true; + } + + self.fut.take() + } + + /// Once the stopping future is resolved, this method can be used + /// to extract the value returned by the stopping future. + /// + /// This may be used to retrieve arbitrary data from the stopping + /// future, for example a reason why the stream was stopped. + /// + /// This method will return `None` if the future isn't resolved yet, + /// or if the result was already taken out. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, StreamExt}; + /// use futures::task::Poll; + /// + /// let stream = stream::iter(1..=10); + /// + /// let mut i = 0; + /// let stop_fut = future::poll_fn(|_cx| { + /// i += 1; + /// if i <= 5 { + /// Poll::Pending + /// } else { + /// Poll::Ready("reason") + /// } + /// }); + /// + /// let mut stream = stream.take_until(stop_fut); + /// let _ = stream.by_ref().collect::<Vec<_>>().await; + /// + /// let result = stream.take_result().unwrap(); + /// assert_eq!(result, "reason"); + /// # }); + /// ``` + pub fn take_result(&mut self) -> Option<Fut::Output> { + self.fut_result.take() + } + + /// Whether the stream was stopped yet by the stopping future + /// being resolved. + pub fn is_stopped(&self) -> bool { + !self.free && self.fut.is_none() + } +} + +impl<St, Fut> Stream for TakeUntil<St, Fut> +where + St: Stream, + Fut: Future, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { + let mut this = self.project(); + + if let Some(f) = this.fut.as_mut().as_pin_mut() { + if let Poll::Ready(result) = f.poll(cx) { + this.fut.set(None); + *this.fut_result = Some(result); + } + } + + if !*this.free && this.fut.is_none() { + // Future resolved, inner stream stopped + Poll::Ready(None) + } else { + // Future either not resolved yet or taken out by the user + let item = ready!(this.stream.poll_next(cx)); + if item.is_none() { + this.fut.set(None); + } + Poll::Ready(item) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.is_stopped() { + return (0, Some(0)); + } + + self.stream.size_hint() + } +} + +impl<St, Fut> FusedStream for TakeUntil<St, Fut> +where + St: Stream, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.is_stopped() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut> +where + S: Stream + Sink<Item>, + Fut: Future, +{ + type Error = S::Error; + + delegate_sink!(stream, Item); +} |