use {Async, Poll}; use stream::{Stream, Fuse}; /// An adapter for merging the output of two streams. /// /// The merged stream produces items from one or both of the underlying /// streams as they become available. Errors, however, are not merged: you #[derive(Debug)] /// get at most one error at a time. #[must_use = "streams do nothing unless polled"] pub struct Zip { stream1: Fuse, stream2: Fuse, queued1: Option, queued2: Option, } pub fn new(stream1: S1, stream2: S2) -> Zip where S1: Stream, S2: Stream { Zip { stream1: stream1.fuse(), stream2: stream2.fuse(), queued1: None, queued2: None, } } impl Stream for Zip where S1: Stream, S2: Stream { type Item = (S1::Item, S2::Item); type Error = S1::Error; fn poll(&mut self) -> Poll, Self::Error> { if self.queued1.is_none() { match self.stream1.poll()? { Async::Ready(Some(item1)) => self.queued1 = Some(item1), Async::Ready(None) | Async::NotReady => {} } } if self.queued2.is_none() { match self.stream2.poll()? { Async::Ready(Some(item2)) => self.queued2 = Some(item2), Async::Ready(None) | Async::NotReady => {} } } if self.queued1.is_some() && self.queued2.is_some() { let pair = (self.queued1.take().unwrap(), self.queued2.take().unwrap()); Ok(Async::Ready(Some(pair))) } else if self.stream1.is_done() || self.stream2.is_done() { Ok(Async::Ready(None)) } else { Ok(Async::NotReady) } } }