use core::mem; use {Future, IntoFuture, Async, Poll}; use stream::Stream; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// /// This function is the dual for the `Stream::fold()` adapter: while /// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a /// `Stream` from a seed value. /// /// `unfold()` will call the provided closure with the provided seed, then wait /// for the returned `Future` to complete with `(a, b)`. It will then yield the /// value `a`, and use `b` as the next internal state. /// /// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` /// will stop producing items and return `Ok(Async::Ready(None))` in future /// calls to `poll()`. /// /// In case of error generated by the returned `Future`, the error will be /// returned by the `Stream`. The `Stream` will then yield /// `Ok(Async::Ready(None))` in future calls to `poll()`. /// /// This function can typically be used when wanting to go from the "world of /// futures" to the "world of streams": the provided closure can build a /// `Future` using other library functions working on futures, and `unfold()` /// will turn it into a `Stream` by repeating the operation. /// /// # Example /// /// ```rust /// use futures::stream::{self, Stream}; /// use futures::future::{self, Future}; /// /// let mut stream = stream::unfold(0, |state| { /// if state <= 2 { /// let next_state = state + 1; /// let yielded = state * 2; /// let fut = future::ok::<_, u32>((yielded, next_state)); /// Some(fut) /// } else { /// None /// } /// }); /// /// let result = stream.collect().wait(); /// assert_eq!(result, Ok(vec![0, 2, 4])); /// ``` pub fn unfold(init: T, f: F) -> Unfold where F: FnMut(T) -> Option, Fut: IntoFuture, { Unfold { f: f, state: State::Ready(init), } } /// A stream which creates futures, polls them and return their result /// /// This stream is returned by the `futures::stream::unfold` method #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Unfold where Fut: IntoFuture { f: F, state: State, } impl Stream for Unfold where F: FnMut(T) -> Option, Fut: IntoFuture, { type Item = It; type Error = Fut::Error; fn poll(&mut self) -> Poll, Fut::Error> { loop { match mem::replace(&mut self.state, State::Empty) { // State::Empty may happen if the future returned an error State::Empty => { return Ok(Async::Ready(None)); } State::Ready(state) => { match (self.f)(state) { Some(fut) => { self.state = State::Processing(fut.into_future()); } None => { return Ok(Async::Ready(None)); } } } State::Processing(mut fut) => { match fut.poll()? { Async:: Ready((item, next_state)) => { self.state = State::Ready(next_state); return Ok(Async::Ready(Some(item))); } Async::NotReady => { self.state = State::Processing(fut); return Ok(Async::NotReady); } } } } } } } #[derive(Debug)] enum State where F: Future { /// Placeholder state when doing work, or when the returned Future generated an error Empty, /// Ready to generate new future; current internal state is the `T` Ready(T), /// Working on a future generated previously Processing(F), }