summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/stream/collect.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/collect.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/collect.rs52
1 files changed, 52 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/collect.rs b/third_party/rust/futures-0.1.31/src/stream/collect.rs
new file mode 100644
index 0000000000..8bd9d0e1dc
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/collect.rs
@@ -0,0 +1,52 @@
+use std::prelude::v1::*;
+
+use std::mem;
+
+use {Future, Poll, Async};
+use stream::Stream;
+
+/// A future which collects all of the values of a stream into a vector.
+///
+/// This future is created by the `Stream::collect` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Collect<S> where S: Stream {
+ stream: S,
+ items: Vec<S::Item>,
+}
+
+pub fn new<S>(s: S) -> Collect<S>
+ where S: Stream,
+{
+ Collect {
+ stream: s,
+ items: Vec::new(),
+ }
+}
+
+impl<S: Stream> Collect<S> {
+ fn finish(&mut self) -> Vec<S::Item> {
+ mem::replace(&mut self.items, Vec::new())
+ }
+}
+
+impl<S> Future for Collect<S>
+ where S: Stream,
+{
+ type Item = Vec<S::Item>;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Vec<S::Item>, S::Error> {
+ loop {
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(e))) => self.items.push(e),
+ Ok(Async::Ready(None)) => return Ok(Async::Ready(self.finish())),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => {
+ self.finish();
+ return Err(e)
+ }
+ }
+ }
+ }
+}