use {Async, Future, IntoFuture, Poll}; use stream::Stream; /// A stream combinator which executes a unit closure over each item on a /// stream. /// /// This structure is returned by the `Stream::for_each` method. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct ForEach where U: IntoFuture { stream: S, f: F, fut: Option, } pub fn new(s: S, f: F) -> ForEach where S: Stream, F: FnMut(S::Item) -> U, U: IntoFuture, { ForEach { stream: s, f: f, fut: None, } } impl Future for ForEach where S: Stream, F: FnMut(S::Item) -> U, U: IntoFuture, { type Item = (); type Error = S::Error; fn poll(&mut self) -> Poll<(), S::Error> { loop { if let Some(mut fut) = self.fut.take() { if fut.poll()?.is_not_ready() { self.fut = Some(fut); return Ok(Async::NotReady); } } match try_ready!(self.stream.poll()) { Some(e) => self.fut = Some((self.f)(e).into_future()), None => return Ok(Async::Ready(())), } } } }