From 698f8c2f01ea549d77d7dc3338a12e04c11057b9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 14:02:58 +0200 Subject: Adding upstream version 1.64.0+dfsg1. Signed-off-by: Daniel Baumann --- vendor/futures-util/src/stream/unfold.rs | 119 +++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 vendor/futures-util/src/stream/unfold.rs (limited to 'vendor/futures-util/src/stream/unfold.rs') diff --git a/vendor/futures-util/src/stream/unfold.rs b/vendor/futures-util/src/stream/unfold.rs new file mode 100644 index 000000000..7d8ef6bab --- /dev/null +++ b/vendor/futures-util/src/stream/unfold.rs @@ -0,0 +1,119 @@ +use super::assert_stream; +use crate::unfold_state::UnfoldState; +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +/// Creates a `Stream` from a seed and a closure returning a `Future`. +/// +/// This function is the dual for the `Stream::fold()` adapter: while +/// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a +/// `Stream` from a seed value. +/// +/// `unfold()` will call the provided closure with the provided seed, then wait +/// for the returned `Future` to complete with `(a, b)`. It will then yield the +/// value `a`, and use `b` as the next internal state. +/// +/// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` +/// will stop producing items and return `Poll::Ready(None)` in future +/// calls to `poll()`. +/// +/// This function can typically be used when wanting to go from the "world of +/// futures" to the "world of streams": the provided closure can build a +/// `Future` using other library functions working on futures, and `unfold()` +/// will turn it into a `Stream` by repeating the operation. +/// +/// # Example +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::stream::{self, StreamExt}; +/// +/// let stream = stream::unfold(0, |state| async move { +/// if state <= 2 { +/// let next_state = state + 1; +/// let yielded = state * 2; +/// Some((yielded, next_state)) +/// } else { +/// None +/// } +/// }); +/// +/// let result = stream.collect::>().await; +/// assert_eq!(result, vec![0, 2, 4]); +/// # }); +/// ``` +pub fn unfold(init: T, f: F) -> Unfold +where + F: FnMut(T) -> Fut, + Fut: Future>, +{ + assert_stream::(Unfold { f, state: UnfoldState::Value { value: init } }) +} + +pin_project! { + /// Stream for the [`unfold`] function. + #[must_use = "streams do nothing unless polled"] + pub struct Unfold { + f: F, + #[pin] + state: UnfoldState, + } +} + +impl fmt::Debug for Unfold +where + T: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Unfold").field("state", &self.state).finish() + } +} + +impl FusedStream for Unfold +where + F: FnMut(T) -> Fut, + Fut: Future>, +{ + fn is_terminated(&self) -> bool { + if let UnfoldState::Empty = self.state { + true + } else { + false + } + } +} + +impl Stream for Unfold +where + F: FnMut(T) -> Fut, + Fut: Future>, +{ + type Item = Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + if let Some(state) = this.state.as_mut().take_value() { + this.state.set(UnfoldState::Future { future: (this.f)(state) }); + } + + let step = match this.state.as_mut().project_future() { + Some(fut) => ready!(fut.poll(cx)), + None => panic!("Unfold must not be polled after it returned `Poll::Ready(None)`"), + }; + + if let Some((item, next_state)) = step { + this.state.set(UnfoldState::Value { value: next_state }); + Poll::Ready(Some(item)) + } else { + this.state.set(UnfoldState::Empty); + Poll::Ready(None) + } + } +} -- cgit v1.2.3