use {Async, IntoFuture, Future, Poll}; use stream::Stream; /// A stream combinator which chains a computation onto each item produced by a /// stream. /// /// This structure is produced by the `Stream::then` method. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Then where U: IntoFuture, { stream: S, future: Option, f: F, } pub fn new(s: S, f: F) -> Then where S: Stream, F: FnMut(Result) -> U, U: IntoFuture, { Then { stream: s, future: None, f: f, } } // Forwarding impl of Sink from the underlying stream impl ::sink::Sink for Then where S: ::sink::Sink, U: IntoFuture, { type SinkItem = S::SinkItem; type SinkError = S::SinkError; fn start_send(&mut self, item: S::SinkItem) -> ::StartSend { self.stream.start_send(item) } fn poll_complete(&mut self) -> Poll<(), S::SinkError> { self.stream.poll_complete() } fn close(&mut self) -> Poll<(), S::SinkError> { self.stream.close() } } impl Stream for Then where S: Stream, F: FnMut(Result) -> U, U: IntoFuture, { type Item = U::Item; type Error = U::Error; fn poll(&mut self) -> Poll, U::Error> { if self.future.is_none() { let item = match self.stream.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(e))) => Ok(e), Err(e) => Err(e), }; self.future = Some((self.f)(item).into_future()); } assert!(self.future.is_some()); match self.future.as_mut().unwrap().poll() { Ok(Async::Ready(e)) => { self.future = None; Ok(Async::Ready(Some(e))) } Err(e) => { self.future = None; Err(e) } Ok(Async::NotReady) => Ok(Async::NotReady) } } }