summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/stream/zip.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/zip.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/zip.rs59
1 files changed, 59 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/zip.rs b/third_party/rust/futures-0.1.31/src/stream/zip.rs
new file mode 100644
index 0000000000..17e3c69ffe
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/zip.rs
@@ -0,0 +1,59 @@
+use {Async, Poll};
+use stream::{Stream, Fuse};
+
+/// An adapter for merging the output of two streams.
+///
+/// The merged stream produces items from one or both of the underlying
+/// streams as they become available. Errors, however, are not merged: you
+#[derive(Debug)]
+/// get at most one error at a time.
+#[must_use = "streams do nothing unless polled"]
+pub struct Zip<S1: Stream, S2: Stream> {
+ stream1: Fuse<S1>,
+ stream2: Fuse<S2>,
+ queued1: Option<S1::Item>,
+ queued2: Option<S2::Item>,
+}
+
+pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Zip<S1, S2>
+ where S1: Stream, S2: Stream<Error = S1::Error>
+{
+ Zip {
+ stream1: stream1.fuse(),
+ stream2: stream2.fuse(),
+ queued1: None,
+ queued2: None,
+ }
+}
+
+impl<S1, S2> Stream for Zip<S1, S2>
+ where S1: Stream, S2: Stream<Error = S1::Error>
+{
+ type Item = (S1::Item, S2::Item);
+ type Error = S1::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if self.queued1.is_none() {
+ match self.stream1.poll()? {
+ Async::Ready(Some(item1)) => self.queued1 = Some(item1),
+ Async::Ready(None) | Async::NotReady => {}
+ }
+ }
+ if self.queued2.is_none() {
+ match self.stream2.poll()? {
+ Async::Ready(Some(item2)) => self.queued2 = Some(item2),
+ Async::Ready(None) | Async::NotReady => {}
+ }
+ }
+
+ if self.queued1.is_some() && self.queued2.is_some() {
+ let pair = (self.queued1.take().unwrap(),
+ self.queued2.take().unwrap());
+ Ok(Async::Ready(Some(pair)))
+ } else if self.stream1.is_done() || self.stream2.is_done() {
+ Ok(Async::Ready(None))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}