diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/zip.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/stream/zip.rs | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/zip.rs b/third_party/rust/futures-0.1.31/src/stream/zip.rs new file mode 100644 index 0000000000..17e3c69ffe --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/zip.rs @@ -0,0 +1,59 @@ +use {Async, Poll}; +use stream::{Stream, Fuse}; + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from one or both of the underlying +/// streams as they become available. Errors, however, are not merged: you +#[derive(Debug)] +/// get at most one error at a time. +#[must_use = "streams do nothing unless polled"] +pub struct Zip<S1: Stream, S2: Stream> { + stream1: Fuse<S1>, + stream2: Fuse<S2>, + queued1: Option<S1::Item>, + queued2: Option<S2::Item>, +} + +pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Zip<S1, S2> + where S1: Stream, S2: Stream<Error = S1::Error> +{ + Zip { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + queued1: None, + queued2: None, + } +} + +impl<S1, S2> Stream for Zip<S1, S2> + where S1: Stream, S2: Stream<Error = S1::Error> +{ + type Item = (S1::Item, S2::Item); + type Error = S1::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if self.queued1.is_none() { + match self.stream1.poll()? { + Async::Ready(Some(item1)) => self.queued1 = Some(item1), + Async::Ready(None) | Async::NotReady => {} + } + } + if self.queued2.is_none() { + match self.stream2.poll()? { + Async::Ready(Some(item2)) => self.queued2 = Some(item2), + Async::Ready(None) | Async::NotReady => {} + } + } + + if self.queued1.is_some() && self.queued2.is_some() { + let pair = (self.queued1.take().unwrap(), + self.queued2.take().unwrap()); + Ok(Async::Ready(Some(pair))) + } else if self.stream1.is_done() || self.stream2.is_done() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} |