use core::mem; use {Future, Poll, IntoFuture, Async}; use stream::Stream; /// A future used to collect all the results of a stream into one generic type. /// /// This future is returned by the `Stream::fold` method. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Fold where Fut: IntoFuture { stream: S, f: F, state: State, } #[derive(Debug)] enum State where F: Future { /// Placeholder state when doing work Empty, /// Ready to process the next stream item; current accumulator is the `T` Ready(T), /// Working on a future the process the previous stream item Processing(F), } pub fn new(s: S, f: F, t: T) -> Fold where S: Stream, F: FnMut(T, S::Item) -> Fut, Fut: IntoFuture, S::Error: From, { Fold { stream: s, f: f, state: State::Ready(t), } } impl Future for Fold where S: Stream, F: FnMut(T, S::Item) -> Fut, Fut: IntoFuture, S::Error: From, { type Item = T; type Error = S::Error; fn poll(&mut self) -> Poll { loop { match mem::replace(&mut self.state, State::Empty) { State::Empty => panic!("cannot poll Fold twice"), State::Ready(state) => { match self.stream.poll()? { Async::Ready(Some(e)) => { let future = (self.f)(state, e); let future = future.into_future(); self.state = State::Processing(future); } Async::Ready(None) => return Ok(Async::Ready(state)), Async::NotReady => { self.state = State::Ready(state); return Ok(Async::NotReady) } } } State::Processing(mut fut) => { match fut.poll()? { Async::Ready(state) => self.state = State::Ready(state), Async::NotReady => { self.state = State::Processing(fut); return Ok(Async::NotReady) } } } } } } }