summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/stream/try_stream/try_unfold.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/stream/try_stream/try_unfold.rs')
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_unfold.rs122
1 files changed, 122 insertions, 0 deletions
diff --git a/vendor/futures-util/src/stream/try_stream/try_unfold.rs b/vendor/futures-util/src/stream/try_stream/try_unfold.rs
new file mode 100644
index 000000000..fd9cdf1d8
--- /dev/null
+++ b/vendor/futures-util/src/stream/try_stream/try_unfold.rs
@@ -0,0 +1,122 @@
+use super::assert_stream;
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::ready;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
+///
+/// This function is the dual for the `TryStream::try_fold()` adapter: while
+/// `TryStream::try_fold()` reduces a `TryStream` to one single value,
+/// `try_unfold()` creates a `TryStream` from a seed value.
+///
+/// `try_unfold()` will call the provided closure with the provided seed, then
+/// wait for the returned `TryFuture` to complete with `(a, b)`. It will then
+/// yield the value `a`, and use `b` as the next internal state.
+///
+/// If the closure returns `None` instead of `Some(TryFuture)`, then the
+/// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in
+/// future calls to `poll()`.
+///
+/// In case of error generated by the returned `TryFuture`, the error will be
+/// returned by the `TryStream`. The `TryStream` will then yield
+/// `Poll::Ready(None)` in future calls to `poll()`.
+///
+/// This function can typically be used when wanting to go from the "world of
+/// futures" to the "world of streams": the provided closure can build a
+/// `TryFuture` using other library functions working on futures, and
+/// `try_unfold()` will turn it into a `TryStream` by repeating the operation.
+///
+/// # Example
+///
+/// ```
+/// # #[derive(Debug, PartialEq)]
+/// # struct SomeError;
+/// # futures::executor::block_on(async {
+/// use futures::stream::{self, TryStreamExt};
+///
+/// let stream = stream::try_unfold(0, |state| async move {
+/// if state < 0 {
+/// return Err(SomeError);
+/// }
+///
+/// if state <= 2 {
+/// let next_state = state + 1;
+/// let yielded = state * 2;
+/// Ok(Some((yielded, next_state)))
+/// } else {
+/// Ok(None)
+/// }
+/// });
+///
+/// let result: Result<Vec<i32>, _> = stream.try_collect().await;
+/// assert_eq!(result, Ok(vec![0, 2, 4]));
+/// # });
+/// ```
+pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
+where
+ F: FnMut(T) -> Fut,
+ Fut: TryFuture<Ok = Option<(Item, T)>>,
+{
+ assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None })
+}
+
+pin_project! {
+ /// Stream for the [`try_unfold`] function.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryUnfold<T, F, Fut> {
+ f: F,
+ state: Option<T>,
+ #[pin]
+ fut: Option<Fut>,
+ }
+}
+
+impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
+where
+ T: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryUnfold").field("state", &self.state).field("fut", &self.fut).finish()
+ }
+}
+
+impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
+where
+ F: FnMut(T) -> Fut,
+ Fut: TryFuture<Ok = Option<(Item, T)>>,
+{
+ type Item = Result<Item, Fut::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ if let Some(state) = this.state.take() {
+ this.fut.set(Some((this.f)(state)));
+ }
+
+ match this.fut.as_mut().as_pin_mut() {
+ None => {
+ // The future previously errored
+ Poll::Ready(None)
+ }
+ Some(future) => {
+ let step = ready!(future.try_poll(cx));
+ this.fut.set(None);
+
+ match step {
+ Ok(Some((item, next_state))) => {
+ *this.state = Some(next_state);
+ Poll::Ready(Some(Ok(item)))
+ }
+ Ok(None) => Poll::Ready(None),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ }
+ }
+ }
+}