summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/stream
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream')
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/and_then.rs106
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/buffer_unordered.rs130
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/buffered.rs132
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs71
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/chain.rs57
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/channel.rs114
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/chunks.rs136
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/collect.rs52
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/concat.rs172
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/empty.rs29
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/filter.rs89
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/filter_map.rs89
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/flatten.rs96
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/fold.rs81
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/for_each.rs51
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/forward.rs110
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/from_err.rs80
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/fuse.rs89
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/future.rs76
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/futures_ordered.rs219
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/futures_unordered.rs707
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/inspect.rs84
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/inspect_err.rs81
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/iter.rs46
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/iter_ok.rs48
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/iter_result.rs51
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/map.rs81
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/map_err.rs80
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/merge.rs82
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/mod.rs1146
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/once.rs35
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/or_else.rs80
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/peek.rs74
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/poll_fn.rs49
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/repeat.rs53
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/select.rs64
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/skip.rs84
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/skip_while.rs113
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/split.rs105
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/take.rs86
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/take_while.rs113
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/then.rs81
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/unfold.rs114
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/wait.rs53
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/zip.rs59
45 files changed, 5548 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/and_then.rs b/third_party/rust/futures-0.1.31/src/stream/and_then.rs
new file mode 100644
index 0000000000..1fac8b952d
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/and_then.rs
@@ -0,0 +1,106 @@
+use {IntoFuture, Future, Poll, Async};
+use stream::Stream;
+
+/// A stream combinator which chains a computation onto values produced by a
+/// stream.
+///
+/// This structure is produced by the `Stream::and_then` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct AndThen<S, F, U>
+ where U: IntoFuture,
+{
+ stream: S,
+ future: Option<U::Future>,
+ f: F,
+}
+
+pub fn new<S, F, U>(s: S, f: F) -> AndThen<S, F, U>
+ where S: Stream,
+ F: FnMut(S::Item) -> U,
+ U: IntoFuture<Error=S::Error>,
+{
+ AndThen {
+ stream: s,
+ future: None,
+ f: f,
+ }
+}
+
+impl<S, F, U> AndThen<S, F, U>
+ where U: IntoFuture,
+{
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F, U: IntoFuture> ::sink::Sink for AndThen<S, F, U>
+ where S: ::sink::Sink
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F, U> Stream for AndThen<S, F, U>
+ where S: Stream,
+ F: FnMut(S::Item) -> U,
+ U: IntoFuture<Error=S::Error>,
+{
+ type Item = U::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<U::Item>, S::Error> {
+ if self.future.is_none() {
+ let item = match try_ready!(self.stream.poll()) {
+ None => return Ok(Async::Ready(None)),
+ Some(e) => e,
+ };
+ self.future = Some((self.f)(item).into_future());
+ }
+ assert!(self.future.is_some());
+ match self.future.as_mut().unwrap().poll() {
+ Ok(Async::Ready(e)) => {
+ self.future = None;
+ Ok(Async::Ready(Some(e)))
+ }
+ Err(e) => {
+ self.future = None;
+ Err(e)
+ }
+ Ok(Async::NotReady) => Ok(Async::NotReady)
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/buffer_unordered.rs b/third_party/rust/futures-0.1.31/src/stream/buffer_unordered.rs
new file mode 100644
index 0000000000..3011108cf3
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/buffer_unordered.rs
@@ -0,0 +1,130 @@
+use std::fmt;
+
+use {Async, IntoFuture, Poll};
+use stream::{Stream, Fuse, FuturesUnordered};
+
+/// An adaptor for a stream of futures to execute the futures concurrently, if
+/// possible, delivering results as they become available.
+///
+/// This adaptor will buffer up a list of pending futures, and then return their
+/// results in the order that they complete. This is created by the
+/// `Stream::buffer_unordered` method.
+#[must_use = "streams do nothing unless polled"]
+pub struct BufferUnordered<S>
+ where S: Stream,
+ S::Item: IntoFuture,
+{
+ stream: Fuse<S>,
+ queue: FuturesUnordered<<S::Item as IntoFuture>::Future>,
+ max: usize,
+}
+
+impl<S> fmt::Debug for BufferUnordered<S>
+ where S: Stream + fmt::Debug,
+ S::Item: IntoFuture,
+ <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BufferUnordered")
+ .field("stream", &self.stream)
+ .field("queue", &self.queue)
+ .field("max", &self.max)
+ .finish()
+ }
+}
+
+pub fn new<S>(s: S, amt: usize) -> BufferUnordered<S>
+ where S: Stream,
+ S::Item: IntoFuture<Error=<S as Stream>::Error>,
+{
+ BufferUnordered {
+ stream: super::fuse::new(s),
+ queue: FuturesUnordered::new(),
+ max: amt,
+ }
+}
+
+impl<S> BufferUnordered<S>
+ where S: Stream,
+ S::Item: IntoFuture<Error=<S as Stream>::Error>,
+{
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ self.stream.get_ref()
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ self.stream.get_mut()
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream.into_inner()
+ }
+}
+
+impl<S> Stream for BufferUnordered<S>
+ where S: Stream,
+ S::Item: IntoFuture<Error=<S as Stream>::Error>,
+{
+ type Item = <S::Item as IntoFuture>::Item;
+ type Error = <S as Stream>::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ // First up, try to spawn off as many futures as possible by filling up
+ // our slab of futures.
+ while self.queue.len() < self.max {
+ let future = match self.stream.poll()? {
+ Async::Ready(Some(s)) => s.into_future(),
+ Async::Ready(None) |
+ Async::NotReady => break,
+ };
+
+ self.queue.push(future);
+ }
+
+ // Try polling a new future
+ if let Some(val) = try_ready!(self.queue.poll()) {
+ return Ok(Async::Ready(Some(val)));
+ }
+
+ // If we've gotten this far, then there are no events for us to process
+ // and nothing was ready, so figure out if we're not done yet or if
+ // we've reached the end.
+ if self.stream.is_done() {
+ Ok(Async::Ready(None))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for BufferUnordered<S>
+ where S: ::sink::Sink + Stream,
+ S::Item: IntoFuture,
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/buffered.rs b/third_party/rust/futures-0.1.31/src/stream/buffered.rs
new file mode 100644
index 0000000000..5616b73d7a
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/buffered.rs
@@ -0,0 +1,132 @@
+use std::fmt;
+
+use {Async, IntoFuture, Poll};
+use stream::{Stream, Fuse, FuturesOrdered};
+
+/// An adaptor for a stream of futures to execute the futures concurrently, if
+/// possible.
+///
+/// This adaptor will buffer up a list of pending futures, and then return their
+/// results in the order that they were pulled out of the original stream. This
+/// is created by the `Stream::buffered` method.
+#[must_use = "streams do nothing unless polled"]
+pub struct Buffered<S>
+ where S: Stream,
+ S::Item: IntoFuture,
+{
+ stream: Fuse<S>,
+ queue: FuturesOrdered<<S::Item as IntoFuture>::Future>,
+ max: usize,
+}
+
+impl<S> fmt::Debug for Buffered<S>
+ where S: Stream + fmt::Debug,
+ S::Item: IntoFuture,
+ <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
+ <<S as Stream>::Item as IntoFuture>::Item: fmt::Debug,
+ <<S as Stream>::Item as IntoFuture>::Error: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Buffered")
+ .field("stream", &self.stream)
+ .field("queue", &self.queue)
+ .field("max", &self.max)
+ .finish()
+ }
+}
+
+pub fn new<S>(s: S, amt: usize) -> Buffered<S>
+ where S: Stream,
+ S::Item: IntoFuture<Error=<S as Stream>::Error>,
+{
+ Buffered {
+ stream: super::fuse::new(s),
+ queue: FuturesOrdered::new(),
+ max: amt,
+ }
+}
+
+impl<S> Buffered<S>
+ where S: Stream,
+ S::Item: IntoFuture<Error=<S as Stream>::Error>,
+{
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ self.stream.get_ref()
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ self.stream.get_mut()
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream.into_inner()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for Buffered<S>
+ where S: ::sink::Sink + Stream,
+ S::Item: IntoFuture,
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S> Stream for Buffered<S>
+ where S: Stream,
+ S::Item: IntoFuture<Error=<S as Stream>::Error>,
+{
+ type Item = <S::Item as IntoFuture>::Item;
+ type Error = <S as Stream>::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ // First up, try to spawn off as many futures as possible by filling up
+ // our slab of futures.
+ while self.queue.len() < self.max {
+ let future = match self.stream.poll()? {
+ Async::Ready(Some(s)) => s.into_future(),
+ Async::Ready(None) |
+ Async::NotReady => break,
+ };
+
+ self.queue.push(future);
+ }
+
+ // Try polling a new future
+ if let Some(val) = try_ready!(self.queue.poll()) {
+ return Ok(Async::Ready(Some(val)));
+ }
+
+ // If we've gotten this far, then there are no events for us to process
+ // and nothing was ready, so figure out if we're not done yet or if
+ // we've reached the end.
+ if self.stream.is_done() {
+ Ok(Async::Ready(None))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs
new file mode 100644
index 0000000000..d3244946e5
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs
@@ -0,0 +1,71 @@
+use std::prelude::v1::*;
+use std::any::Any;
+use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
+use std::mem;
+
+use super::super::{Poll, Async};
+use super::Stream;
+
+/// Stream for the `catch_unwind` combinator.
+///
+/// This is created by the `Stream::catch_unwind` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct CatchUnwind<S> where S: Stream {
+ state: CatchUnwindState<S>,
+}
+
+pub fn new<S>(stream: S) -> CatchUnwind<S>
+ where S: Stream + UnwindSafe,
+{
+ CatchUnwind {
+ state: CatchUnwindState::Stream(stream),
+ }
+}
+
+#[derive(Debug)]
+enum CatchUnwindState<S> {
+ Stream(S),
+ Eof,
+ Done,
+}
+
+impl<S> Stream for CatchUnwind<S>
+ where S: Stream + UnwindSafe,
+{
+ type Item = Result<S::Item, S::Error>;
+ type Error = Box<Any + Send>;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ let mut stream = match mem::replace(&mut self.state, CatchUnwindState::Eof) {
+ CatchUnwindState::Done => panic!("cannot poll after eof"),
+ CatchUnwindState::Eof => {
+ self.state = CatchUnwindState::Done;
+ return Ok(Async::Ready(None));
+ }
+ CatchUnwindState::Stream(stream) => stream,
+ };
+ let res = catch_unwind(|| (stream.poll(), stream));
+ match res {
+ Err(e) => Err(e), // and state is already Eof
+ Ok((poll, stream)) => {
+ self.state = CatchUnwindState::Stream(stream);
+ match poll {
+ Err(e) => Ok(Async::Ready(Some(Err(e)))),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Ok(Async::Ready(Some(r))) => Ok(Async::Ready(Some(Ok(r)))),
+ Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
+ }
+ }
+ }
+ }
+}
+
+impl<S: Stream> Stream for AssertUnwindSafe<S> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.0.poll()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/chain.rs b/third_party/rust/futures-0.1.31/src/stream/chain.rs
new file mode 100644
index 0000000000..0ff0e5ce6f
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/chain.rs
@@ -0,0 +1,57 @@
+use core::mem;
+
+use stream::Stream;
+use {Async, Poll};
+
+
+/// State of chain stream.
+#[derive(Debug)]
+enum State<S1, S2> {
+ /// Emitting elements of first stream
+ First(S1, S2),
+ /// Emitting elements of second stream
+ Second(S2),
+ /// Temporary value to replace first with second
+ Temp,
+}
+
+/// An adapter for chaining the output of two streams.
+///
+/// The resulting stream produces items from first stream and then
+/// from second stream.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Chain<S1, S2> {
+ state: State<S1, S2>
+}
+
+pub fn new<S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2>
+ where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
+{
+ Chain { state: State::First(s1, s2) }
+}
+
+impl<S1, S2> Stream for Chain<S1, S2>
+ where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
+{
+ type Item = S1::Item;
+ type Error = S1::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ match self.state {
+ State::First(ref mut s1, ref _s2) => match s1.poll() {
+ Ok(Async::Ready(None)) => (), // roll
+ x => return x,
+ },
+ State::Second(ref mut s2) => return s2.poll(),
+ State::Temp => unreachable!(),
+ }
+
+ self.state = match mem::replace(&mut self.state, State::Temp) {
+ State::First(_s1, s2) => State::Second(s2),
+ _ => unreachable!(),
+ };
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/channel.rs b/third_party/rust/futures-0.1.31/src/stream/channel.rs
new file mode 100644
index 0000000000..89a419d150
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/channel.rs
@@ -0,0 +1,114 @@
+#![cfg(feature = "with-deprecated")]
+#![deprecated(since = "0.1.4", note = "use sync::mpsc::channel instead")]
+#![allow(deprecated)]
+
+use std::any::Any;
+use std::error::Error;
+use std::fmt;
+
+use {Poll, Async, Stream, Future, Sink};
+use sink::Send;
+use sync::mpsc;
+
+/// Creates an in-memory channel implementation of the `Stream` trait.
+///
+/// This method creates a concrete implementation of the `Stream` trait which
+/// can be used to send values across threads in a streaming fashion. This
+/// channel is unique in that it implements back pressure to ensure that the
+/// sender never outpaces the receiver. The `Sender::send` method will only
+/// allow sending one message and the next message can only be sent once the
+/// first was consumed.
+///
+/// The `Receiver` returned implements the `Stream` trait and has access to any
+/// number of the associated combinators for transforming the result.
+pub fn channel<T, E>() -> (Sender<T, E>, Receiver<T, E>) {
+ let (tx, rx) = mpsc::channel(0);
+ (Sender { inner: tx }, Receiver { inner: rx })
+}
+
+/// The transmission end of a channel which is used to send values.
+///
+/// This is created by the `channel` method in the `stream` module.
+#[derive(Debug)]
+pub struct Sender<T, E> {
+ inner: mpsc::Sender<Result<T, E>>,
+}
+
+/// The receiving end of a channel which implements the `Stream` trait.
+///
+/// This is a concrete implementation of a stream which can be used to represent
+/// a stream of values being computed elsewhere. This is created by the
+/// `channel` method in the `stream` module.
+#[must_use = "streams do nothing unless polled"]
+#[derive(Debug)]
+pub struct Receiver<T, E> {
+ inner: mpsc::Receiver<Result<T, E>>,
+}
+
+/// Error type for sending, used when the receiving end of the channel is dropped
+pub struct SendError<T, E>(Result<T, E>);
+
+/// Future returned by `Sender::send`.
+#[derive(Debug)]
+pub struct FutureSender<T, E> {
+ inner: Send<mpsc::Sender<Result<T, E>>>,
+}
+
+impl<T, E> fmt::Debug for SendError<T, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("SendError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T, E> fmt::Display for SendError<T, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "send failed because receiver is gone")
+ }
+}
+
+impl<T, E> Error for SendError<T, E>
+ where T: Any, E: Any
+{
+ fn description(&self) -> &str {
+ "send failed because receiver is gone"
+ }
+}
+
+
+impl<T, E> Stream for Receiver<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<T>, E> {
+ match self.inner.poll().expect("cannot fail") {
+ Async::Ready(Some(Ok(e))) => Ok(Async::Ready(Some(e))),
+ Async::Ready(Some(Err(e))) => Err(e),
+ Async::Ready(None) => Ok(Async::Ready(None)),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+}
+
+impl<T, E> Sender<T, E> {
+ /// Sends a new value along this channel to the receiver.
+ ///
+ /// This method consumes the sender and returns a future which will resolve
+ /// to the sender again when the value sent has been consumed.
+ pub fn send(self, t: Result<T, E>) -> FutureSender<T, E> {
+ FutureSender { inner: self.inner.send(t) }
+ }
+}
+
+impl<T, E> Future for FutureSender<T, E> {
+ type Item = Sender<T, E>;
+ type Error = SendError<T, E>;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ match self.inner.poll() {
+ Ok(a) => Ok(a.map(|a| Sender { inner: a })),
+ Err(e) => Err(SendError(e.into_inner())),
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/chunks.rs b/third_party/rust/futures-0.1.31/src/stream/chunks.rs
new file mode 100644
index 0000000000..dbfaeb89ec
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/chunks.rs
@@ -0,0 +1,136 @@
+use std::mem;
+use std::prelude::v1::*;
+
+use {Async, Poll};
+use stream::{Stream, Fuse};
+
+/// An adaptor that chunks up elements in a vector.
+///
+/// This adaptor will buffer up a list of items in the stream and pass on the
+/// vector used for buffering when a specified capacity has been reached. This
+/// is created by the `Stream::chunks` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Chunks<S>
+ where S: Stream
+{
+ items: Vec<S::Item>,
+ err: Option<S::Error>,
+ stream: Fuse<S>,
+ cap: usize, // https://github.com/rust-lang-nursery/futures-rs/issues/1475
+}
+
+pub fn new<S>(s: S, capacity: usize) -> Chunks<S>
+ where S: Stream
+{
+ assert!(capacity > 0);
+
+ Chunks {
+ items: Vec::with_capacity(capacity),
+ err: None,
+ stream: super::fuse::new(s),
+ cap: capacity,
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for Chunks<S>
+ where S: ::sink::Sink + Stream
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+
+impl<S> Chunks<S> where S: Stream {
+ fn take(&mut self) -> Vec<S::Item> {
+ let cap = self.cap;
+ mem::replace(&mut self.items, Vec::with_capacity(cap))
+ }
+
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ self.stream.get_ref()
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ self.stream.get_mut()
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream.into_inner()
+ }
+}
+
+impl<S> Stream for Chunks<S>
+ where S: Stream
+{
+ type Item = Vec<<S as Stream>::Item>;
+ type Error = <S as Stream>::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if let Some(err) = self.err.take() {
+ return Err(err)
+ }
+
+ loop {
+ match self.stream.poll() {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+
+ // Push the item into the buffer and check whether it is full.
+ // If so, replace our buffer with a new and empty one and return
+ // the full one.
+ Ok(Async::Ready(Some(item))) => {
+ self.items.push(item);
+ if self.items.len() >= self.cap {
+ return Ok(Some(self.take()).into())
+ }
+ }
+
+ // Since the underlying stream ran out of values, return what we
+ // have buffered, if we have anything.
+ Ok(Async::Ready(None)) => {
+ return if self.items.len() > 0 {
+ let full_buf = mem::replace(&mut self.items, Vec::new());
+ Ok(Some(full_buf).into())
+ } else {
+ Ok(Async::Ready(None))
+ }
+ }
+
+ // If we've got buffered items be sure to return them first,
+ // we'll defer our error for later.
+ Err(e) => {
+ if self.items.len() == 0 {
+ return Err(e)
+ } else {
+ self.err = Some(e);
+ return Ok(Some(self.take()).into())
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/collect.rs b/third_party/rust/futures-0.1.31/src/stream/collect.rs
new file mode 100644
index 0000000000..8bd9d0e1dc
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/collect.rs
@@ -0,0 +1,52 @@
+use std::prelude::v1::*;
+
+use std::mem;
+
+use {Future, Poll, Async};
+use stream::Stream;
+
+/// A future which collects all of the values of a stream into a vector.
+///
+/// This future is created by the `Stream::collect` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Collect<S> where S: Stream {
+ stream: S,
+ items: Vec<S::Item>,
+}
+
+pub fn new<S>(s: S) -> Collect<S>
+ where S: Stream,
+{
+ Collect {
+ stream: s,
+ items: Vec::new(),
+ }
+}
+
+impl<S: Stream> Collect<S> {
+ fn finish(&mut self) -> Vec<S::Item> {
+ mem::replace(&mut self.items, Vec::new())
+ }
+}
+
+impl<S> Future for Collect<S>
+ where S: Stream,
+{
+ type Item = Vec<S::Item>;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Vec<S::Item>, S::Error> {
+ loop {
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(e))) => self.items.push(e),
+ Ok(Async::Ready(None)) => return Ok(Async::Ready(self.finish())),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => {
+ self.finish();
+ return Err(e)
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/concat.rs b/third_party/rust/futures-0.1.31/src/stream/concat.rs
new file mode 100644
index 0000000000..a0da71bdd5
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/concat.rs
@@ -0,0 +1,172 @@
+use core::mem;
+use core::fmt::{Debug, Formatter, Result as FmtResult};
+use core::default::Default;
+
+use {Poll, Async};
+use future::Future;
+use stream::Stream;
+
+/// A stream combinator to concatenate the results of a stream into the first
+/// yielded item.
+///
+/// This structure is produced by the `Stream::concat2` method.
+#[must_use = "streams do nothing unless polled"]
+pub struct Concat2<S>
+ where S: Stream,
+{
+ inner: ConcatSafe<S>
+}
+
+impl<S: Debug> Debug for Concat2<S> where S: Stream, S::Item: Debug {
+ fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
+ fmt.debug_struct("Concat2")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+pub fn new2<S>(s: S) -> Concat2<S>
+ where S: Stream,
+ S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
+{
+ Concat2 {
+ inner: new_safe(s)
+ }
+}
+
+impl<S> Future for Concat2<S>
+ where S: Stream,
+ S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
+
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ self.inner.poll().map(|a| {
+ match a {
+ Async::NotReady => Async::NotReady,
+ Async::Ready(None) => Async::Ready(Default::default()),
+ Async::Ready(Some(e)) => Async::Ready(e)
+ }
+ })
+ }
+}
+
+
+/// A stream combinator to concatenate the results of a stream into the first
+/// yielded item.
+///
+/// This structure is produced by the `Stream::concat` method.
+#[deprecated(since="0.1.18", note="please use `Stream::Concat2` instead")]
+#[must_use = "streams do nothing unless polled"]
+pub struct Concat<S>
+ where S: Stream,
+{
+ inner: ConcatSafe<S>
+}
+
+#[allow(deprecated)]
+impl<S: Debug> Debug for Concat<S> where S: Stream, S::Item: Debug {
+ fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
+ fmt.debug_struct("Concat")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+#[allow(deprecated)]
+pub fn new<S>(s: S) -> Concat<S>
+ where S: Stream,
+ S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
+{
+ Concat {
+ inner: new_safe(s)
+ }
+}
+
+#[allow(deprecated)]
+impl<S> Future for Concat<S>
+ where S: Stream,
+ S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
+
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ self.inner.poll().map(|a| {
+ match a {
+ Async::NotReady => Async::NotReady,
+ Async::Ready(None) => panic!("attempted concatenation of empty stream"),
+ Async::Ready(Some(e)) => Async::Ready(e)
+ }
+ })
+ }
+}
+
+
+#[derive(Debug)]
+struct ConcatSafe<S>
+ where S: Stream,
+{
+ stream: S,
+ extend: Inner<S::Item>,
+}
+
+fn new_safe<S>(s: S) -> ConcatSafe<S>
+ where S: Stream,
+ S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
+{
+ ConcatSafe {
+ stream: s,
+ extend: Inner::First,
+ }
+}
+
+impl<S> Future for ConcatSafe<S>
+ where S: Stream,
+ S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
+
+{
+ type Item = Option<S::Item>;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ loop {
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(i))) => {
+ match self.extend {
+ Inner::First => {
+ self.extend = Inner::Extending(i);
+ },
+ Inner::Extending(ref mut e) => {
+ e.extend(i);
+ },
+ Inner::Done => unreachable!(),
+ }
+ },
+ Ok(Async::Ready(None)) => {
+ match mem::replace(&mut self.extend, Inner::Done) {
+ Inner::First => return Ok(Async::Ready(None)),
+ Inner::Extending(e) => return Ok(Async::Ready(Some(e))),
+ Inner::Done => panic!("cannot poll Concat again")
+ }
+ },
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => {
+ self.extend = Inner::Done;
+ return Err(e)
+ }
+ }
+ }
+ }
+}
+
+
+#[derive(Debug)]
+enum Inner<E> {
+ First,
+ Extending(E),
+ Done,
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/empty.rs b/third_party/rust/futures-0.1.31/src/stream/empty.rs
new file mode 100644
index 0000000000..c53fb80238
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/empty.rs
@@ -0,0 +1,29 @@
+use core::marker;
+
+use stream::Stream;
+use {Poll, Async};
+
+/// A stream which contains no elements.
+///
+/// This stream can be created with the `stream::empty` function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Empty<T, E> {
+ _data: marker::PhantomData<(T, E)>,
+}
+
+/// Creates a stream which contains no elements.
+///
+/// The returned stream will always return `Ready(None)` when polled.
+pub fn empty<T, E>() -> Empty<T, E> {
+ Empty { _data: marker::PhantomData }
+}
+
+impl<T, E> Stream for Empty<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ Ok(Async::Ready(None))
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/filter.rs b/third_party/rust/futures-0.1.31/src/stream/filter.rs
new file mode 100644
index 0000000000..99c4abd657
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/filter.rs
@@ -0,0 +1,89 @@
+use {Async, Poll};
+use stream::Stream;
+
+/// A stream combinator used to filter the results of a stream and only yield
+/// some values.
+///
+/// This structure is produced by the `Stream::filter` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Filter<S, F> {
+ stream: S,
+ f: F,
+}
+
+pub fn new<S, F>(s: S, f: F) -> Filter<S, F>
+ where S: Stream,
+ F: FnMut(&S::Item) -> bool,
+{
+ Filter {
+ stream: s,
+ f: f,
+ }
+}
+
+impl<S, F> Filter<S, F> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F> ::sink::Sink for Filter<S, F>
+ where S: ::sink::Sink
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F> Stream for Filter<S, F>
+ where S: Stream,
+ F: FnMut(&S::Item) -> bool,
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ loop {
+ match try_ready!(self.stream.poll()) {
+ Some(e) => {
+ if (self.f)(&e) {
+ return Ok(Async::Ready(Some(e)))
+ }
+ }
+ None => return Ok(Async::Ready(None)),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/filter_map.rs b/third_party/rust/futures-0.1.31/src/stream/filter_map.rs
new file mode 100644
index 0000000000..f91d26a45c
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/filter_map.rs
@@ -0,0 +1,89 @@
+use {Async, Poll};
+use stream::Stream;
+
+/// A combinator used to filter the results of a stream and simultaneously map
+/// them to a different type.
+///
+/// This structure is returned by the `Stream::filter_map` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct FilterMap<S, F> {
+ stream: S,
+ f: F,
+}
+
+pub fn new<S, F, B>(s: S, f: F) -> FilterMap<S, F>
+ where S: Stream,
+ F: FnMut(S::Item) -> Option<B>,
+{
+ FilterMap {
+ stream: s,
+ f: f,
+ }
+}
+
+impl<S, F> FilterMap<S, F> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F> ::sink::Sink for FilterMap<S, F>
+ where S: ::sink::Sink
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F, B> Stream for FilterMap<S, F>
+ where S: Stream,
+ F: FnMut(S::Item) -> Option<B>,
+{
+ type Item = B;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<B>, S::Error> {
+ loop {
+ match try_ready!(self.stream.poll()) {
+ Some(e) => {
+ if let Some(e) = (self.f)(e) {
+ return Ok(Async::Ready(Some(e)))
+ }
+ }
+ None => return Ok(Async::Ready(None)),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/flatten.rs b/third_party/rust/futures-0.1.31/src/stream/flatten.rs
new file mode 100644
index 0000000000..4baf9045a0
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/flatten.rs
@@ -0,0 +1,96 @@
+use {Poll, Async};
+use stream::Stream;
+
+/// A combinator used to flatten a stream-of-streams into one long stream of
+/// elements.
+///
+/// This combinator is created by the `Stream::flatten` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Flatten<S>
+ where S: Stream,
+{
+ stream: S,
+ next: Option<S::Item>,
+}
+
+pub fn new<S>(s: S) -> Flatten<S>
+ where S: Stream,
+ S::Item: Stream,
+ <S::Item as Stream>::Error: From<S::Error>,
+{
+ Flatten {
+ stream: s,
+ next: None,
+ }
+}
+
+impl<S: Stream> Flatten<S> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for Flatten<S>
+ where S: ::sink::Sink + Stream
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S> Stream for Flatten<S>
+ where S: Stream,
+ S::Item: Stream,
+ <S::Item as Stream>::Error: From<S::Error>,
+{
+ type Item = <S::Item as Stream>::Item;
+ type Error = <S::Item as Stream>::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ if self.next.is_none() {
+ match try_ready!(self.stream.poll()) {
+ Some(e) => self.next = Some(e),
+ None => return Ok(Async::Ready(None)),
+ }
+ }
+ assert!(self.next.is_some());
+ match self.next.as_mut().unwrap().poll() {
+ Ok(Async::Ready(None)) => self.next = None,
+ other => return other,
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/fold.rs b/third_party/rust/futures-0.1.31/src/stream/fold.rs
new file mode 100644
index 0000000000..7fa24b449d
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/fold.rs
@@ -0,0 +1,81 @@
+use core::mem;
+
+use {Future, Poll, IntoFuture, Async};
+use stream::Stream;
+
+/// A future used to collect all the results of a stream into one generic type.
+///
+/// This future is returned by the `Stream::fold` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Fold<S, F, Fut, T> where Fut: IntoFuture {
+ stream: S,
+ f: F,
+ state: State<T, Fut::Future>,
+}
+
+#[derive(Debug)]
+enum State<T, F> where F: Future {
+ /// Placeholder state when doing work
+ Empty,
+
+ /// Ready to process the next stream item; current accumulator is the `T`
+ Ready(T),
+
+ /// Working on a future the process the previous stream item
+ Processing(F),
+}
+
+pub fn new<S, F, Fut, T>(s: S, f: F, t: T) -> Fold<S, F, Fut, T>
+ where S: Stream,
+ F: FnMut(T, S::Item) -> Fut,
+ Fut: IntoFuture<Item = T>,
+ S::Error: From<Fut::Error>,
+{
+ Fold {
+ stream: s,
+ f: f,
+ state: State::Ready(t),
+ }
+}
+
+impl<S, F, Fut, T> Future for Fold<S, F, Fut, T>
+ where S: Stream,
+ F: FnMut(T, S::Item) -> Fut,
+ Fut: IntoFuture<Item = T>,
+ S::Error: From<Fut::Error>,
+{
+ type Item = T;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<T, S::Error> {
+ loop {
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Empty => panic!("cannot poll Fold twice"),
+ State::Ready(state) => {
+ match self.stream.poll()? {
+ Async::Ready(Some(e)) => {
+ let future = (self.f)(state, e);
+ let future = future.into_future();
+ self.state = State::Processing(future);
+ }
+ Async::Ready(None) => return Ok(Async::Ready(state)),
+ Async::NotReady => {
+ self.state = State::Ready(state);
+ return Ok(Async::NotReady)
+ }
+ }
+ }
+ State::Processing(mut fut) => {
+ match fut.poll()? {
+ Async::Ready(state) => self.state = State::Ready(state),
+ Async::NotReady => {
+ self.state = State::Processing(fut);
+ return Ok(Async::NotReady)
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/for_each.rs b/third_party/rust/futures-0.1.31/src/stream/for_each.rs
new file mode 100644
index 0000000000..c7e1cde5bb
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/for_each.rs
@@ -0,0 +1,51 @@
+use {Async, Future, IntoFuture, Poll};
+use stream::Stream;
+
+/// A stream combinator which executes a unit closure over each item on a
+/// stream.
+///
+/// This structure is returned by the `Stream::for_each` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct ForEach<S, F, U> where U: IntoFuture {
+ stream: S,
+ f: F,
+ fut: Option<U::Future>,
+}
+
+pub fn new<S, F, U>(s: S, f: F) -> ForEach<S, F, U>
+ where S: Stream,
+ F: FnMut(S::Item) -> U,
+ U: IntoFuture<Item = (), Error = S::Error>,
+{
+ ForEach {
+ stream: s,
+ f: f,
+ fut: None,
+ }
+}
+
+impl<S, F, U> Future for ForEach<S, F, U>
+ where S: Stream,
+ F: FnMut(S::Item) -> U,
+ U: IntoFuture<Item= (), Error = S::Error>,
+{
+ type Item = ();
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<(), S::Error> {
+ loop {
+ if let Some(mut fut) = self.fut.take() {
+ if fut.poll()?.is_not_ready() {
+ self.fut = Some(fut);
+ return Ok(Async::NotReady);
+ }
+ }
+
+ match try_ready!(self.stream.poll()) {
+ Some(e) => self.fut = Some((self.f)(e).into_future()),
+ None => return Ok(Async::Ready(())),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/forward.rs b/third_party/rust/futures-0.1.31/src/stream/forward.rs
new file mode 100644
index 0000000000..6722af8c20
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/forward.rs
@@ -0,0 +1,110 @@
+use {Poll, Async, Future, AsyncSink};
+use stream::{Stream, Fuse};
+use sink::Sink;
+
+/// Future for the `Stream::forward` combinator, which sends a stream of values
+/// to a sink and then waits until the sink has fully flushed those values.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct Forward<T: Stream, U> {
+ sink: Option<U>,
+ stream: Option<Fuse<T>>,
+ buffered: Option<T::Item>,
+}
+
+
+pub fn new<T, U>(stream: T, sink: U) -> Forward<T, U>
+ where U: Sink<SinkItem=T::Item>,
+ T: Stream,
+ T::Error: From<U::SinkError>,
+{
+ Forward {
+ sink: Some(sink),
+ stream: Some(stream.fuse()),
+ buffered: None,
+ }
+}
+
+impl<T, U> Forward<T, U>
+ where U: Sink<SinkItem=T::Item>,
+ T: Stream,
+ T::Error: From<U::SinkError>,
+{
+ /// Get a shared reference to the inner sink.
+ /// If this combinator has already been polled to completion, None will be returned.
+ pub fn sink_ref(&self) -> Option<&U> {
+ self.sink.as_ref()
+ }
+
+ /// Get a mutable reference to the inner sink.
+ /// If this combinator has already been polled to completion, None will be returned.
+ pub fn sink_mut(&mut self) -> Option<&mut U> {
+ self.sink.as_mut()
+ }
+
+ /// Get a shared reference to the inner stream.
+ /// If this combinator has already been polled to completion, None will be returned.
+ pub fn stream_ref(&self) -> Option<&T> {
+ self.stream.as_ref().map(|x| x.get_ref())
+ }
+
+ /// Get a mutable reference to the inner stream.
+ /// If this combinator has already been polled to completion, None will be returned.
+ pub fn stream_mut(&mut self) -> Option<&mut T> {
+ self.stream.as_mut().map(|x| x.get_mut())
+ }
+
+ fn take_result(&mut self) -> (T, U) {
+ let sink = self.sink.take()
+ .expect("Attempted to poll Forward after completion");
+ let fuse = self.stream.take()
+ .expect("Attempted to poll Forward after completion");
+ (fuse.into_inner(), sink)
+ }
+
+ fn try_start_send(&mut self, item: T::Item) -> Poll<(), U::SinkError> {
+ debug_assert!(self.buffered.is_none());
+ if let AsyncSink::NotReady(item) = self.sink_mut()
+ .expect("Attempted to poll Forward after completion")
+ .start_send(item)?
+ {
+ self.buffered = Some(item);
+ return Ok(Async::NotReady)
+ }
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T, U> Future for Forward<T, U>
+ where U: Sink<SinkItem=T::Item>,
+ T: Stream,
+ T::Error: From<U::SinkError>,
+{
+ type Item = (T, U);
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<(T, U), T::Error> {
+ // If we've got an item buffered already, we need to write it to the
+ // sink before we can do anything else
+ if let Some(item) = self.buffered.take() {
+ try_ready!(self.try_start_send(item))
+ }
+
+ loop {
+ match self.stream.as_mut()
+ .expect("Attempted to poll Forward after completion")
+ .poll()?
+ {
+ Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)),
+ Async::Ready(None) => {
+ try_ready!(self.sink_mut().expect("Attempted to poll Forward after completion").close());
+ return Ok(Async::Ready(self.take_result()))
+ }
+ Async::NotReady => {
+ try_ready!(self.sink_mut().expect("Attempted to poll Forward after completion").poll_complete());
+ return Ok(Async::NotReady)
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/from_err.rs b/third_party/rust/futures-0.1.31/src/stream/from_err.rs
new file mode 100644
index 0000000000..4028542dfc
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/from_err.rs
@@ -0,0 +1,80 @@
+use core::marker::PhantomData;
+use poll::Poll;
+use Async;
+use stream::Stream;
+
+/// A stream combinator to change the error type of a stream.
+///
+/// This is created by the `Stream::from_err` method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct FromErr<S, E> {
+ stream: S,
+ f: PhantomData<E>
+}
+
+pub fn new<S, E>(stream: S) -> FromErr<S, E>
+ where S: Stream
+{
+ FromErr {
+ stream: stream,
+ f: PhantomData
+ }
+}
+
+impl<S, E> FromErr<S, E> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+
+impl<S: Stream, E: From<S::Error>> Stream for FromErr<S, E> {
+ type Item = S::Item;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, E> {
+ let e = match self.stream.poll() {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ other => other,
+ };
+ e.map_err(From::from)
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S: Stream + ::sink::Sink, E> ::sink::Sink for FromErr<S, E> {
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> ::StartSend<Self::SinkItem, Self::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.stream.close()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/fuse.rs b/third_party/rust/futures-0.1.31/src/stream/fuse.rs
new file mode 100644
index 0000000000..e39c31f348
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/fuse.rs
@@ -0,0 +1,89 @@
+use {Poll, Async};
+use stream::Stream;
+
+/// A stream which "fuse"s a stream once it's terminated.
+///
+/// Normally streams can behave unpredictably when used after they have already
+/// finished, but `Fuse` continues to return `None` from `poll` forever when
+/// finished.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Fuse<S> {
+ stream: S,
+ done: bool,
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for Fuse<S>
+ where S: ::sink::Sink
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+pub fn new<S: Stream>(s: S) -> Fuse<S> {
+ Fuse { stream: s, done: false }
+}
+
+impl<S: Stream> Stream for Fuse<S> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ if self.done {
+ Ok(Async::Ready(None))
+ } else {
+ let r = self.stream.poll();
+ if let Ok(Async::Ready(None)) = r {
+ self.done = true;
+ }
+ r
+ }
+ }
+}
+
+impl<S> Fuse<S> {
+ /// Returns whether the underlying stream has finished or not.
+ ///
+ /// If this method returns `true`, then all future calls to poll are
+ /// guaranteed to return `None`. If this returns `false`, then the
+ /// underlying stream is still in use.
+ pub fn is_done(&self) -> bool {
+ self.done
+ }
+
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/future.rs b/third_party/rust/futures-0.1.31/src/stream/future.rs
new file mode 100644
index 0000000000..5b052ee4d3
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/future.rs
@@ -0,0 +1,76 @@
+use {Future, Poll, Async};
+use stream::Stream;
+
+/// A combinator used to temporarily convert a stream into a future.
+///
+/// This future is returned by the `Stream::into_future` method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct StreamFuture<S> {
+ stream: Option<S>,
+}
+
+pub fn new<S: Stream>(s: S) -> StreamFuture<S> {
+ StreamFuture { stream: Some(s) }
+}
+
+impl<S> StreamFuture<S> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ ///
+ /// This method returns an `Option` to account for the fact that `StreamFuture`'s
+ /// implementation of `Future::poll` consumes the underlying stream during polling
+ /// in order to return it to the caller of `Future::poll` if the stream yielded
+ /// an element.
+ pub fn get_ref(&self) -> Option<&S> {
+ self.stream.as_ref()
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ ///
+ /// This method returns an `Option` to account for the fact that `StreamFuture`'s
+ /// implementation of `Future::poll` consumes the underlying stream during polling
+ /// in order to return it to the caller of `Future::poll` if the stream yielded
+ /// an element.
+ pub fn get_mut(&mut self) -> Option<&mut S> {
+ self.stream.as_mut()
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ ///
+ /// This method returns an `Option` to account for the fact that `StreamFuture`'s
+ /// implementation of `Future::poll` consumes the underlying stream during polling
+ /// in order to return it to the caller of `Future::poll` if the stream yielded
+ /// an element.
+ pub fn into_inner(self) -> Option<S> {
+ self.stream
+ }
+}
+
+impl<S: Stream> Future for StreamFuture<S> {
+ type Item = (Option<S::Item>, S);
+ type Error = (S::Error, S);
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let item = {
+ let s = self.stream.as_mut().expect("polling StreamFuture twice");
+ match s.poll() {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Ok(Async::Ready(e)) => Ok(e),
+ Err(e) => Err(e),
+ }
+ };
+ let stream = self.stream.take().unwrap();
+ match item {
+ Ok(e) => Ok(Async::Ready((e, stream))),
+ Err(e) => Err((e, stream)),
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/futures_ordered.rs b/third_party/rust/futures-0.1.31/src/stream/futures_ordered.rs
new file mode 100644
index 0000000000..561bbb5189
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/futures_ordered.rs
@@ -0,0 +1,219 @@
+use std::cmp::{Eq, PartialEq, PartialOrd, Ord, Ordering};
+use std::collections::BinaryHeap;
+use std::fmt::{self, Debug};
+use std::iter::FromIterator;
+
+use {Async, Future, IntoFuture, Poll, Stream};
+use stream::FuturesUnordered;
+
+#[derive(Debug)]
+struct OrderWrapper<T> {
+ item: T,
+ index: usize,
+}
+
+impl<T> PartialEq for OrderWrapper<T> {
+ fn eq(&self, other: &Self) -> bool {
+ self.index == other.index
+ }
+}
+
+impl<T> Eq for OrderWrapper<T> {}
+
+impl<T> PartialOrd for OrderWrapper<T> {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl<T> Ord for OrderWrapper<T> {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // BinaryHeap is a max heap, so compare backwards here.
+ other.index.cmp(&self.index)
+ }
+}
+
+impl<T> Future for OrderWrapper<T>
+ where T: Future
+{
+ type Item = OrderWrapper<T::Item>;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let result = try_ready!(self.item.poll());
+ Ok(Async::Ready(OrderWrapper {
+ item: result,
+ index: self.index
+ }))
+ }
+}
+
+/// An unbounded queue of futures.
+///
+/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
+/// on top of the set of futures. While futures in the set will race to
+/// completion in parallel, results will only be returned in the order their
+/// originating futures were added to the queue.
+///
+/// Futures are pushed into this queue and their realized values are yielded in
+/// order. This structure is optimized to manage a large number of futures.
+/// Futures managed by `FuturesOrdered` will only be polled when they generate
+/// notifications. This reduces the required amount of work needed to coordinate
+/// large numbers of futures.
+///
+/// When a `FuturesOrdered` is first created, it does not contain any futures.
+/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` to be
+/// returned. Futures are submitted to the queue using `push`; however, the
+/// future will **not** be polled at this point. `FuturesOrdered` will only
+/// poll managed futures when `FuturesOrdered::poll` is called. As such, it
+/// is important to call `poll` after pushing new futures.
+///
+/// If `FuturesOrdered::poll` returns `Ok(Async::Ready(None))` this means that
+/// the queue is currently not managing any futures. A future may be submitted
+/// to the queue at a later time. At that point, a call to
+/// `FuturesOrdered::poll` will either return the future's resolved value
+/// **or** `Ok(Async::NotReady)` if the future has not yet completed. When
+/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will
+/// return `Ok(Async::NotReady)` until the first future completes, even if
+/// some of the later futures have already completed.
+///
+/// Note that you can create a ready-made `FuturesOrdered` via the
+/// `futures_ordered` function in the `stream` module, or you can start with an
+/// empty queue with the `FuturesOrdered::new` constructor.
+#[must_use = "streams do nothing unless polled"]
+pub struct FuturesOrdered<T>
+ where T: Future
+{
+ in_progress: FuturesUnordered<OrderWrapper<T>>,
+ queued_results: BinaryHeap<OrderWrapper<T::Item>>,
+ next_incoming_index: usize,
+ next_outgoing_index: usize,
+}
+
+/// Converts a list of futures into a `Stream` of results from the futures.
+///
+/// This function will take an list of futures (e.g. a vector, an iterator,
+/// etc), and return a stream. The stream will yield items as they become
+/// available on the futures internally, in the order that their originating
+/// futures were submitted to the queue. If the futures complete out of order,
+/// items will be stored internally within `FuturesOrdered` until all preceding
+/// items have been yielded.
+///
+/// Note that the returned queue can also be used to dynamically push more
+/// futures into the queue as they become available.
+pub fn futures_ordered<I>(futures: I) -> FuturesOrdered<<I::Item as IntoFuture>::Future>
+ where I: IntoIterator,
+ I::Item: IntoFuture
+{
+ let mut queue = FuturesOrdered::new();
+
+ for future in futures {
+ queue.push(future.into_future());
+ }
+
+ return queue
+}
+
+impl<T> Default for FuturesOrdered<T> where T: Future {
+ fn default() -> Self {
+ FuturesOrdered::new()
+ }
+}
+
+impl<T> FuturesOrdered<T>
+ where T: Future
+{
+ /// Constructs a new, empty `FuturesOrdered`
+ ///
+ /// The returned `FuturesOrdered` does not contain any futures and, in this
+ /// state, `FuturesOrdered::poll` will return `Ok(Async::Ready(None))`.
+ pub fn new() -> FuturesOrdered<T> {
+ FuturesOrdered {
+ in_progress: FuturesUnordered::new(),
+ queued_results: BinaryHeap::new(),
+ next_incoming_index: 0,
+ next_outgoing_index: 0,
+ }
+ }
+
+ /// Returns the number of futures contained in the queue.
+ ///
+ /// This represents the total number of in-flight futures, both
+ /// those currently processing and those that have completed but
+ /// which are waiting for earlier futures to complete.
+ pub fn len(&self) -> usize {
+ self.in_progress.len() + self.queued_results.len()
+ }
+
+ /// Returns `true` if the queue contains no futures
+ pub fn is_empty(&self) -> bool {
+ self.in_progress.is_empty() && self.queued_results.is_empty()
+ }
+
+ /// Push a future into the queue.
+ ///
+ /// This function submits the given future to the internal set for managing.
+ /// This function will not call `poll` on the submitted future. The caller
+ /// must ensure that `FuturesOrdered::poll` is called in order to receive
+ /// task notifications.
+ pub fn push(&mut self, future: T) {
+ let wrapped = OrderWrapper {
+ item: future,
+ index: self.next_incoming_index,
+ };
+ self.next_incoming_index += 1;
+ self.in_progress.push(wrapped);
+ }
+}
+
+impl<T> Stream for FuturesOrdered<T>
+ where T: Future
+{
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ // Get any completed futures from the unordered set.
+ loop {
+ match self.in_progress.poll()? {
+ Async::Ready(Some(result)) => self.queued_results.push(result),
+ Async::Ready(None) | Async::NotReady => break,
+ }
+ }
+
+ if let Some(next_result) = self.queued_results.peek() {
+ // PeekMut::pop is not stable yet QQ
+ if next_result.index != self.next_outgoing_index {
+ return Ok(Async::NotReady);
+ }
+ } else if !self.in_progress.is_empty() {
+ return Ok(Async::NotReady);
+ } else {
+ return Ok(Async::Ready(None));
+ }
+
+ let next_result = self.queued_results.pop().unwrap();
+ self.next_outgoing_index += 1;
+ Ok(Async::Ready(Some(next_result.item)))
+ }
+}
+
+impl<T: Debug> Debug for FuturesOrdered<T>
+ where T: Future
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "FuturesOrdered {{ ... }}")
+ }
+}
+
+impl<F: Future> FromIterator<F> for FuturesOrdered<F> {
+ fn from_iter<T>(iter: T) -> Self
+ where T: IntoIterator<Item = F>
+ {
+ let mut new = FuturesOrdered::new();
+ for future in iter.into_iter() {
+ new.push(future);
+ }
+ new
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/futures_unordered.rs b/third_party/rust/futures-0.1.31/src/stream/futures_unordered.rs
new file mode 100644
index 0000000000..3f25c86f39
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/futures_unordered.rs
@@ -0,0 +1,707 @@
+//! An unbounded set of futures.
+
+use std::cell::UnsafeCell;
+use std::fmt::{self, Debug};
+use std::iter::FromIterator;
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
+use std::sync::atomic::{AtomicPtr, AtomicBool};
+use std::sync::{Arc, Weak};
+use std::usize;
+
+use {task, Stream, Future, Poll, Async};
+use executor::{Notify, UnsafeNotify, NotifyHandle};
+use task_impl::{self, AtomicTask};
+
+/// An unbounded set of futures.
+///
+/// This "combinator" also serves a special function in this library, providing
+/// the ability to maintain a set of futures that and manage driving them all
+/// to completion.
+///
+/// Futures are pushed into this set and their realized values are yielded as
+/// they are ready. This structure is optimized to manage a large number of
+/// futures. Futures managed by `FuturesUnordered` will only be polled when they
+/// generate notifications. This reduces the required amount of work needed to
+/// coordinate large numbers of futures.
+///
+/// When a `FuturesUnordered` is first created, it does not contain any futures.
+/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` to be
+/// returned. Futures are submitted to the set using `push`; however, the
+/// future will **not** be polled at this point. `FuturesUnordered` will only
+/// poll managed futures when `FuturesUnordered::poll` is called. As such, it
+/// is important to call `poll` after pushing new futures.
+///
+/// If `FuturesUnordered::poll` returns `Ok(Async::Ready(None))` this means that
+/// the set is currently not managing any futures. A future may be submitted
+/// to the set at a later time. At that point, a call to
+/// `FuturesUnordered::poll` will either return the future's resolved value
+/// **or** `Ok(Async::NotReady)` if the future has not yet completed.
+///
+/// Note that you can create a ready-made `FuturesUnordered` via the
+/// `futures_unordered` function in the `stream` module, or you can start with an
+/// empty set with the `FuturesUnordered::new` constructor.
+#[must_use = "streams do nothing unless polled"]
+pub struct FuturesUnordered<F> {
+ inner: Arc<Inner<F>>,
+ len: usize,
+ head_all: *const Node<F>,
+}
+
+unsafe impl<T: Send> Send for FuturesUnordered<T> {}
+unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}
+
+// FuturesUnordered is implemented using two linked lists. One which links all
+// futures managed by a `FuturesUnordered` and one that tracks futures that have
+// been scheduled for polling. The first linked list is not thread safe and is
+// only accessed by the thread that owns the `FuturesUnordered` value. The
+// second linked list is an implementation of the intrusive MPSC queue algorithm
+// described by 1024cores.net.
+//
+// When a future is submitted to the set a node is allocated and inserted in
+// both linked lists. The next call to `poll` will (eventually) see this node
+// and call `poll` on the future.
+//
+// Before a managed future is polled, the current task's `Notify` is replaced
+// with one that is aware of the specific future being run. This ensures that
+// task notifications generated by that specific future are visible to
+// `FuturesUnordered`. When a notification is received, the node is scheduled
+// for polling by being inserted into the concurrent linked list.
+//
+// Each node uses an `AtomicUsize` to track it's state. The node state is the
+// reference count (the number of outstanding handles to the node) as well as a
+// flag tracking if the node is currently inserted in the atomic queue. When the
+// future is notified, it will only insert itself into the linked list if it
+// isn't currently inserted.
+
+#[allow(missing_debug_implementations)]
+struct Inner<T> {
+ // The task using `FuturesUnordered`.
+ parent: AtomicTask,
+
+ // Head/tail of the readiness queue
+ head_readiness: AtomicPtr<Node<T>>,
+ tail_readiness: UnsafeCell<*const Node<T>>,
+ stub: Arc<Node<T>>,
+}
+
+struct Node<T> {
+ // The future
+ future: UnsafeCell<Option<T>>,
+
+ // Next pointer for linked list tracking all active nodes
+ next_all: UnsafeCell<*const Node<T>>,
+
+ // Previous node in linked list tracking all active nodes
+ prev_all: UnsafeCell<*const Node<T>>,
+
+ // Next pointer in readiness queue
+ next_readiness: AtomicPtr<Node<T>>,
+
+ // Queue that we'll be enqueued to when notified
+ queue: Weak<Inner<T>>,
+
+ // Whether or not this node is currently in the mpsc queue.
+ queued: AtomicBool,
+}
+
+enum Dequeue<T> {
+ Data(*const Node<T>),
+ Empty,
+ Inconsistent,
+}
+
+impl<T> Default for FuturesUnordered<T> where T: Future {
+ fn default() -> Self {
+ FuturesUnordered::new()
+ }
+}
+
+impl<T> FuturesUnordered<T>
+ where T: Future,
+{
+ /// Constructs a new, empty `FuturesUnordered`
+ ///
+ /// The returned `FuturesUnordered` does not contain any futures and, in this
+ /// state, `FuturesUnordered::poll` will return `Ok(Async::Ready(None))`.
+ pub fn new() -> FuturesUnordered<T> {
+ let stub = Arc::new(Node {
+ future: UnsafeCell::new(None),
+ next_all: UnsafeCell::new(ptr::null()),
+ prev_all: UnsafeCell::new(ptr::null()),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ queued: AtomicBool::new(true),
+ queue: Weak::new(),
+ });
+ let stub_ptr = &*stub as *const Node<T>;
+ let inner = Arc::new(Inner {
+ parent: AtomicTask::new(),
+ head_readiness: AtomicPtr::new(stub_ptr as *mut _),
+ tail_readiness: UnsafeCell::new(stub_ptr),
+ stub: stub,
+ });
+
+ FuturesUnordered {
+ len: 0,
+ head_all: ptr::null_mut(),
+ inner: inner,
+ }
+ }
+}
+
+impl<T> FuturesUnordered<T> {
+ /// Returns the number of futures contained in the set.
+ ///
+ /// This represents the total number of in-flight futures.
+ pub fn len(&self) -> usize {
+ self.len
+ }
+
+ /// Returns `true` if the set contains no futures
+ pub fn is_empty(&self) -> bool {
+ self.len == 0
+ }
+
+ /// Push a future into the set.
+ ///
+ /// This function submits the given future to the set for managing. This
+ /// function will not call `poll` on the submitted future. The caller must
+ /// ensure that `FuturesUnordered::poll` is called in order to receive task
+ /// notifications.
+ pub fn push(&mut self, future: T) {
+ let node = Arc::new(Node {
+ future: UnsafeCell::new(Some(future)),
+ next_all: UnsafeCell::new(ptr::null_mut()),
+ prev_all: UnsafeCell::new(ptr::null_mut()),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ queued: AtomicBool::new(true),
+ queue: Arc::downgrade(&self.inner),
+ });
+
+ // Right now our node has a strong reference count of 1. We transfer
+ // ownership of this reference count to our internal linked list
+ // and we'll reclaim ownership through the `unlink` function below.
+ let ptr = self.link(node);
+
+ // We'll need to get the future "into the system" to start tracking it,
+ // e.g. getting its unpark notifications going to us tracking which
+ // futures are ready. To do that we unconditionally enqueue it for
+ // polling here.
+ self.inner.enqueue(ptr);
+ }
+
+ /// Returns an iterator that allows modifying each future in the set.
+ pub fn iter_mut(&mut self) -> IterMut<T> {
+ IterMut {
+ node: self.head_all,
+ len: self.len,
+ _marker: PhantomData
+ }
+ }
+
+ fn release_node(&mut self, node: Arc<Node<T>>) {
+ // The future is done, try to reset the queued flag. This will prevent
+ // `notify` from doing any work in the future
+ let prev = node.queued.swap(true, SeqCst);
+
+ // Drop the future, even if it hasn't finished yet. This is safe
+ // because we're dropping the future on the thread that owns
+ // `FuturesUnordered`, which correctly tracks T's lifetimes and such.
+ unsafe {
+ drop((*node.future.get()).take());
+ }
+
+ // If the queued flag was previously set then it means that this node
+ // is still in our internal mpsc queue. We then transfer ownership
+ // of our reference count to the mpsc queue, and it'll come along and
+ // free it later, noticing that the future is `None`.
+ //
+ // If, however, the queued flag was *not* set then we're safe to
+ // release our reference count on the internal node. The queued flag
+ // was set above so all future `enqueue` operations will not actually
+ // enqueue the node, so our node will never see the mpsc queue again.
+ // The node itself will be deallocated once all reference counts have
+ // been dropped by the various owning tasks elsewhere.
+ if prev {
+ mem::forget(node);
+ }
+ }
+
+ /// Insert a new node into the internal linked list.
+ fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
+ let ptr = arc2ptr(node);
+ unsafe {
+ *(*ptr).next_all.get() = self.head_all;
+ if !self.head_all.is_null() {
+ *(*self.head_all).prev_all.get() = ptr;
+ }
+ }
+
+ self.head_all = ptr;
+ self.len += 1;
+ return ptr
+ }
+
+ /// Remove the node from the linked list tracking all nodes currently
+ /// managed by `FuturesUnordered`.
+ unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
+ let node = ptr2arc(node);
+ let next = *node.next_all.get();
+ let prev = *node.prev_all.get();
+ *node.next_all.get() = ptr::null_mut();
+ *node.prev_all.get() = ptr::null_mut();
+
+ if !next.is_null() {
+ *(*next).prev_all.get() = prev;
+ }
+
+ if !prev.is_null() {
+ *(*prev).next_all.get() = next;
+ } else {
+ self.head_all = next;
+ }
+ self.len -= 1;
+ return node
+ }
+}
+
+impl<T> Stream for FuturesUnordered<T>
+ where T: Future
+{
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
+ // Variable to determine how many times it is allowed to poll underlying
+ // futures without yielding.
+ //
+ // A single call to `poll_next` may potentially do a lot of work before
+ // yielding. This happens in particular if the underlying futures are awoken
+ // frequently but continue to return `Pending`. This is problematic if other
+ // tasks are waiting on the executor, since they do not get to run. This value
+ // caps the number of calls to `poll` on underlying futures a single call to
+ // `poll_next` is allowed to make.
+ //
+ // The value is the length of FuturesUnordered. This ensures that each
+ // future is polled only once at most per iteration.
+ //
+ // See also https://github.com/rust-lang/futures-rs/issues/2047.
+ let yield_every = self.len();
+
+ // Keep track of how many child futures we have polled,
+ // in case we want to forcibly yield.
+ let mut polled = 0;
+
+ // Ensure `parent` is correctly set.
+ self.inner.parent.register();
+
+ loop {
+ let node = match unsafe { self.inner.dequeue() } {
+ Dequeue::Empty => {
+ if self.is_empty() {
+ return Ok(Async::Ready(None));
+ } else {
+ return Ok(Async::NotReady)
+ }
+ }
+ Dequeue::Inconsistent => {
+ // At this point, it may be worth yielding the thread &
+ // spinning a few times... but for now, just yield using the
+ // task system.
+ task::current().notify();
+ return Ok(Async::NotReady);
+ }
+ Dequeue::Data(node) => node,
+ };
+
+ debug_assert!(node != self.inner.stub());
+
+ unsafe {
+ let mut future = match (*(*node).future.get()).take() {
+ Some(future) => future,
+
+ // If the future has already gone away then we're just
+ // cleaning out this node. See the comment in
+ // `release_node` for more information, but we're basically
+ // just taking ownership of our reference count here.
+ None => {
+ let node = ptr2arc(node);
+ assert!((*node.next_all.get()).is_null());
+ assert!((*node.prev_all.get()).is_null());
+ continue
+ }
+ };
+
+ // Unset queued flag... this must be done before
+ // polling. This ensures that the future gets
+ // rescheduled if it is notified **during** a call
+ // to `poll`.
+ let prev = (*node).queued.swap(false, SeqCst);
+ assert!(prev);
+
+ // We're going to need to be very careful if the `poll`
+ // function below panics. We need to (a) not leak memory and
+ // (b) ensure that we still don't have any use-after-frees. To
+ // manage this we do a few things:
+ //
+ // * This "bomb" here will call `release_node` if dropped
+ // abnormally. That way we'll be sure the memory management
+ // of the `node` is managed correctly.
+ // * The future was extracted above (taken ownership). That way
+ // if it panics we're guaranteed that the future is
+ // dropped on this thread and doesn't accidentally get
+ // dropped on a different thread (bad).
+ // * We unlink the node from our internal queue to preemptively
+ // assume it'll panic, in which case we'll want to discard it
+ // regardless.
+ struct Bomb<'a, T: 'a> {
+ queue: &'a mut FuturesUnordered<T>,
+ node: Option<Arc<Node<T>>>,
+ }
+ impl<'a, T> Drop for Bomb<'a, T> {
+ fn drop(&mut self) {
+ if let Some(node) = self.node.take() {
+ self.queue.release_node(node);
+ }
+ }
+ }
+ let mut bomb = Bomb {
+ node: Some(self.unlink(node)),
+ queue: self,
+ };
+
+ // Poll the underlying future with the appropriate `notify`
+ // implementation. This is where a large bit of the unsafety
+ // starts to stem from internally. The `notify` instance itself
+ // is basically just our `Arc<Node<T>>` and tracks the mpsc
+ // queue of ready futures.
+ //
+ // Critically though `Node<T>` won't actually access `T`, the
+ // future, while it's floating around inside of `Task`
+ // instances. These structs will basically just use `T` to size
+ // the internal allocation, appropriately accessing fields and
+ // deallocating the node if need be.
+ let res = {
+ let notify = NodeToHandle(bomb.node.as_ref().unwrap());
+ task_impl::with_notify(&notify, 0, || {
+ future.poll()
+ })
+ };
+ polled += 1;
+
+ let ret = match res {
+ Ok(Async::NotReady) => {
+ let node = bomb.node.take().unwrap();
+ *node.future.get() = Some(future);
+ bomb.queue.link(node);
+
+ if polled == yield_every {
+ // We have polled a large number of futures in a row without yielding.
+ // To ensure we do not starve other tasks waiting on the executor,
+ // we yield here, but immediately wake ourselves up to continue.
+ task_impl::current().notify();
+ return Ok(Async::NotReady);
+ }
+ continue
+ }
+ Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
+ Err(e) => Err(e),
+ };
+ return ret
+ }
+ }
+ }
+}
+
+impl<T: Debug> Debug for FuturesUnordered<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "FuturesUnordered {{ ... }}")
+ }
+}
+
+impl<T> Drop for FuturesUnordered<T> {
+ fn drop(&mut self) {
+ // When a `FuturesUnordered` is dropped we want to drop all futures associated
+ // with it. At the same time though there may be tons of `Task` handles
+ // flying around which contain `Node<T>` references inside them. We'll
+ // let those naturally get deallocated when the `Task` itself goes out
+ // of scope or gets notified.
+ unsafe {
+ while !self.head_all.is_null() {
+ let head = self.head_all;
+ let node = self.unlink(head);
+ self.release_node(node);
+ }
+ }
+
+ // Note that at this point we could still have a bunch of nodes in the
+ // mpsc queue. None of those nodes, however, have futures associated
+ // with them so they're safe to destroy on any thread. At this point
+ // the `FuturesUnordered` struct, the owner of the one strong reference
+ // to `Inner<T>` will drop the strong reference. At that point
+ // whichever thread releases the strong refcount last (be it this
+ // thread or some other thread as part of an `upgrade`) will clear out
+ // the mpsc queue and free all remaining nodes.
+ //
+ // While that freeing operation isn't guaranteed to happen here, it's
+ // guaranteed to happen "promptly" as no more "blocking work" will
+ // happen while there's a strong refcount held.
+ }
+}
+
+impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
+ fn from_iter<T>(iter: T) -> Self
+ where T: IntoIterator<Item = F>
+ {
+ let mut new = FuturesUnordered::new();
+ for future in iter.into_iter() {
+ new.push(future);
+ }
+ new
+ }
+}
+
+#[derive(Debug)]
+/// Mutable iterator over all futures in the unordered set.
+pub struct IterMut<'a, F: 'a> {
+ node: *const Node<F>,
+ len: usize,
+ _marker: PhantomData<&'a mut FuturesUnordered<F>>
+}
+
+impl<'a, F> Iterator for IterMut<'a, F> {
+ type Item = &'a mut F;
+
+ fn next(&mut self) -> Option<&'a mut F> {
+ if self.node.is_null() {
+ return None;
+ }
+ unsafe {
+ let future = (*(*self.node).future.get()).as_mut().unwrap();
+ let next = *(*self.node).next_all.get();
+ self.node = next;
+ self.len -= 1;
+ return Some(future);
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (self.len, Some(self.len))
+ }
+}
+
+impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
+
+impl<T> Inner<T> {
+ /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
+ fn enqueue(&self, node: *const Node<T>) {
+ unsafe {
+ debug_assert!((*node).queued.load(Relaxed));
+
+ // This action does not require any coordination
+ (*node).next_readiness.store(ptr::null_mut(), Relaxed);
+
+ // Note that these atomic orderings come from 1024cores
+ let node = node as *mut _;
+ let prev = self.head_readiness.swap(node, AcqRel);
+ (*prev).next_readiness.store(node, Release);
+ }
+ }
+
+ /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
+ ///
+ /// Note that this unsafe as it required mutual exclusion (only one thread
+ /// can call this) to be guaranteed elsewhere.
+ unsafe fn dequeue(&self) -> Dequeue<T> {
+ let mut tail = *self.tail_readiness.get();
+ let mut next = (*tail).next_readiness.load(Acquire);
+
+ if tail == self.stub() {
+ if next.is_null() {
+ return Dequeue::Empty;
+ }
+
+ *self.tail_readiness.get() = next;
+ tail = next;
+ next = (*next).next_readiness.load(Acquire);
+ }
+
+ if !next.is_null() {
+ *self.tail_readiness.get() = next;
+ debug_assert!(tail != self.stub());
+ return Dequeue::Data(tail);
+ }
+
+ if self.head_readiness.load(Acquire) as *const _ != tail {
+ return Dequeue::Inconsistent;
+ }
+
+ self.enqueue(self.stub());
+
+ next = (*tail).next_readiness.load(Acquire);
+
+ if !next.is_null() {
+ *self.tail_readiness.get() = next;
+ return Dequeue::Data(tail);
+ }
+
+ Dequeue::Inconsistent
+ }
+
+ fn stub(&self) -> *const Node<T> {
+ &*self.stub
+ }
+}
+
+impl<T> Drop for Inner<T> {
+ fn drop(&mut self) {
+ // Once we're in the destructor for `Inner<T>` we need to clear out the
+ // mpsc queue of nodes if there's anything left in there.
+ //
+ // Note that each node has a strong reference count associated with it
+ // which is owned by the mpsc queue. All nodes should have had their
+ // futures dropped already by the `FuturesUnordered` destructor above,
+ // so we're just pulling out nodes and dropping their refcounts.
+ unsafe {
+ loop {
+ match self.dequeue() {
+ Dequeue::Empty => break,
+ Dequeue::Inconsistent => abort("inconsistent in drop"),
+ Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
+ }
+ }
+ }
+ }
+}
+
+#[allow(missing_debug_implementations)]
+struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);
+
+impl<'a, T> Clone for NodeToHandle<'a, T> {
+ fn clone(&self) -> Self {
+ NodeToHandle(self.0)
+ }
+}
+
+impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle {
+ fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle {
+ unsafe {
+ let ptr = handle.0.clone();
+ let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
+ NotifyHandle::new(hide_lt(ptr))
+ }
+ }
+}
+
+struct ArcNode<T>(PhantomData<T>);
+
+// We should never touch `T` on any thread other than the one owning
+// `FuturesUnordered`, so this should be a safe operation.
+unsafe impl<T> Send for ArcNode<T> {}
+unsafe impl<T> Sync for ArcNode<T> {}
+
+impl<T> Notify for ArcNode<T> {
+ fn notify(&self, _id: usize) {
+ unsafe {
+ let me: *const ArcNode<T> = self;
+ let me: *const *const ArcNode<T> = &me;
+ let me = me as *const Arc<Node<T>>;
+ Node::notify(&*me)
+ }
+ }
+}
+
+unsafe impl<T> UnsafeNotify for ArcNode<T> {
+ unsafe fn clone_raw(&self) -> NotifyHandle {
+ let me: *const ArcNode<T> = self;
+ let me: *const *const ArcNode<T> = &me;
+ let me = &*(me as *const Arc<Node<T>>);
+ NodeToHandle(me).into()
+ }
+
+ unsafe fn drop_raw(&self) {
+ let mut me: *const ArcNode<T> = self;
+ let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
+ ptr::drop_in_place(me);
+ }
+}
+
+unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify {
+ mem::transmute(p as *mut UnsafeNotify)
+}
+
+impl<T> Node<T> {
+ fn notify(me: &Arc<Node<T>>) {
+ let inner = match me.queue.upgrade() {
+ Some(inner) => inner,
+ None => return,
+ };
+
+ // It's our job to notify the node that it's ready to get polled,
+ // meaning that we need to enqueue it into the readiness queue. To
+ // do this we flag that we're ready to be queued, and if successful
+ // we then do the literal queueing operation, ensuring that we're
+ // only queued once.
+ //
+ // Once the node is inserted we be sure to notify the parent task,
+ // as it'll want to come along and pick up our node now.
+ //
+ // Note that we don't change the reference count of the node here,
+ // we're just enqueueing the raw pointer. The `FuturesUnordered`
+ // implementation guarantees that if we set the `queued` flag true that
+ // there's a reference count held by the main `FuturesUnordered` queue
+ // still.
+ let prev = me.queued.swap(true, SeqCst);
+ if !prev {
+ inner.enqueue(&**me);
+ inner.parent.notify();
+ }
+ }
+}
+
+impl<T> Drop for Node<T> {
+ fn drop(&mut self) {
+ // Currently a `Node<T>` is sent across all threads for any lifetime,
+ // regardless of `T`. This means that for memory safety we can't
+ // actually touch `T` at any time except when we have a reference to the
+ // `FuturesUnordered` itself.
+ //
+ // Consequently it *should* be the case that we always drop futures from
+ // the `FuturesUnordered` instance, but this is a bomb in place to catch
+ // any bugs in that logic.
+ unsafe {
+ if (*self.future.get()).is_some() {
+ abort("future still here when dropping");
+ }
+ }
+ }
+}
+
+fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
+ let addr = &*ptr as *const T;
+ mem::forget(ptr);
+ return addr
+}
+
+unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
+ let anchor = mem::transmute::<usize, Arc<T>>(0x10);
+ let addr = &*anchor as *const T;
+ mem::forget(anchor);
+ let offset = addr as isize - 0x10;
+ mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
+}
+
+fn abort(s: &str) -> ! {
+ struct DoublePanic;
+
+ impl Drop for DoublePanic {
+ fn drop(&mut self) {
+ panic!("panicking twice to abort the program");
+ }
+ }
+
+ let _bomb = DoublePanic;
+ panic!("{}", s);
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/inspect.rs b/third_party/rust/futures-0.1.31/src/stream/inspect.rs
new file mode 100644
index 0000000000..fc8f7f4ea2
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/inspect.rs
@@ -0,0 +1,84 @@
+use {Stream, Poll, Async};
+
+/// Do something with the items of a stream, passing it on.
+///
+/// This is created by the `Stream::inspect` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Inspect<S, F> where S: Stream {
+ stream: S,
+ inspect: F,
+}
+
+pub fn new<S, F>(stream: S, f: F) -> Inspect<S, F>
+ where S: Stream,
+ F: FnMut(&S::Item) -> (),
+{
+ Inspect {
+ stream: stream,
+ inspect: f,
+ }
+}
+
+impl<S: Stream, F> Inspect<S, F> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F> ::sink::Sink for Inspect<S, F>
+ where S: ::sink::Sink + Stream
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F> Stream for Inspect<S, F>
+ where S: Stream,
+ F: FnMut(&S::Item),
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ match try_ready!(self.stream.poll()) {
+ Some(e) => {
+ (self.inspect)(&e);
+ Ok(Async::Ready(Some(e)))
+ }
+ None => Ok(Async::Ready(None)),
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/inspect_err.rs b/third_party/rust/futures-0.1.31/src/stream/inspect_err.rs
new file mode 100644
index 0000000000..5c56a217ff
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/inspect_err.rs
@@ -0,0 +1,81 @@
+use {Stream, Poll};
+
+/// Do something with the error of a stream, passing it on.
+///
+/// This is created by the `Stream::inspect_err` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct InspectErr<S, F> where S: Stream {
+ stream: S,
+ inspect: F,
+}
+
+pub fn new<S, F>(stream: S, f: F) -> InspectErr<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error) -> (),
+{
+ InspectErr {
+ stream: stream,
+ inspect: f,
+ }
+}
+
+impl<S: Stream, F> InspectErr<S, F> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F> ::sink::Sink for InspectErr<S, F>
+ where S: ::sink::Sink + Stream
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F> Stream for InspectErr<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error),
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.stream.poll().map_err(|e| {
+ (self.inspect)(&e);
+ e
+ })
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/iter.rs b/third_party/rust/futures-0.1.31/src/stream/iter.rs
new file mode 100644
index 0000000000..e0b9379353
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/iter.rs
@@ -0,0 +1,46 @@
+#![deprecated(note = "implementation moved to `iter_ok` and `iter_result`")]
+#![allow(deprecated)]
+
+use Poll;
+use stream::{iter_result, IterResult, Stream};
+
+/// A stream which is just a shim over an underlying instance of `Iterator`.
+///
+/// This stream will never block and is always ready.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Iter<I>(IterResult<I>);
+
+/// Converts an `Iterator` over `Result`s into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter simply
+/// always calls `iter.next()` and returns that.
+///
+/// ```rust
+/// use futures::*;
+///
+/// let mut stream = stream::iter(vec![Ok(17), Err(false), Ok(19)]);
+/// assert_eq!(Ok(Async::Ready(Some(17))), stream.poll());
+/// assert_eq!(Err(false), stream.poll());
+/// assert_eq!(Ok(Async::Ready(Some(19))), stream.poll());
+/// assert_eq!(Ok(Async::Ready(None)), stream.poll());
+/// ```
+#[inline]
+pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
+ where J: IntoIterator<Item=Result<T, E>>,
+{
+ Iter(iter_result(i))
+}
+
+impl<I, T, E> Stream for Iter<I>
+ where I: Iterator<Item=Result<T, E>>,
+{
+ type Item = T;
+ type Error = E;
+
+ #[inline]
+ fn poll(&mut self) -> Poll<Option<T>, E> {
+ self.0.poll()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/iter_ok.rs b/third_party/rust/futures-0.1.31/src/stream/iter_ok.rs
new file mode 100644
index 0000000000..9c8d871399
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/iter_ok.rs
@@ -0,0 +1,48 @@
+use core::marker;
+
+use {Async, Poll};
+use stream::Stream;
+
+/// A stream which is just a shim over an underlying instance of `Iterator`.
+///
+/// This stream will never block and is always ready.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct IterOk<I, E> {
+ iter: I,
+ _marker: marker::PhantomData<fn() -> E>,
+}
+
+/// Converts an `Iterator` into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter
+/// simply always calls `iter.next()` and returns that.
+///
+/// ```rust
+/// use futures::*;
+///
+/// let mut stream = stream::iter_ok::<_, ()>(vec![17, 19]);
+/// assert_eq!(Ok(Async::Ready(Some(17))), stream.poll());
+/// assert_eq!(Ok(Async::Ready(Some(19))), stream.poll());
+/// assert_eq!(Ok(Async::Ready(None)), stream.poll());
+/// ```
+pub fn iter_ok<I, E>(i: I) -> IterOk<I::IntoIter, E>
+ where I: IntoIterator,
+{
+ IterOk {
+ iter: i.into_iter(),
+ _marker: marker::PhantomData,
+ }
+}
+
+impl<I, E> Stream for IterOk<I, E>
+ where I: Iterator,
+{
+ type Item = I::Item;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<I::Item>, E> {
+ Ok(Async::Ready(self.iter.next()))
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/iter_result.rs b/third_party/rust/futures-0.1.31/src/stream/iter_result.rs
new file mode 100644
index 0000000000..4eef5da08e
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/iter_result.rs
@@ -0,0 +1,51 @@
+use {Async, Poll};
+use stream::Stream;
+
+/// A stream which is just a shim over an underlying instance of `Iterator`.
+///
+/// This stream will never block and is always ready.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct IterResult<I> {
+ iter: I,
+}
+
+/// Converts an `Iterator` over `Result`s into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter simply
+/// always calls `iter.next()` and returns that.
+///
+/// ```rust
+/// use futures::*;
+///
+/// let mut stream = stream::iter_result(vec![Ok(17), Err(false), Ok(19)]);
+/// assert_eq!(Ok(Async::Ready(Some(17))), stream.poll());
+/// assert_eq!(Err(false), stream.poll());
+/// assert_eq!(Ok(Async::Ready(Some(19))), stream.poll());
+/// assert_eq!(Ok(Async::Ready(None)), stream.poll());
+/// ```
+pub fn iter_result<J, T, E>(i: J) -> IterResult<J::IntoIter>
+where
+ J: IntoIterator<Item = Result<T, E>>,
+{
+ IterResult {
+ iter: i.into_iter(),
+ }
+}
+
+impl<I, T, E> Stream for IterResult<I>
+where
+ I: Iterator<Item = Result<T, E>>,
+{
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<T>, E> {
+ match self.iter.next() {
+ Some(Ok(e)) => Ok(Async::Ready(Some(e))),
+ Some(Err(e)) => Err(e),
+ None => Ok(Async::Ready(None)),
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/map.rs b/third_party/rust/futures-0.1.31/src/stream/map.rs
new file mode 100644
index 0000000000..702e980b3f
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/map.rs
@@ -0,0 +1,81 @@
+use {Async, Poll};
+use stream::Stream;
+
+/// A stream combinator which will change the type of a stream from one
+/// type to another.
+///
+/// This is produced by the `Stream::map` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Map<S, F> {
+ stream: S,
+ f: F,
+}
+
+pub fn new<S, F, U>(s: S, f: F) -> Map<S, F>
+ where S: Stream,
+ F: FnMut(S::Item) -> U,
+{
+ Map {
+ stream: s,
+ f: f,
+ }
+}
+
+impl<S, F> Map<S, F> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F> ::sink::Sink for Map<S, F>
+ where S: ::sink::Sink
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F, U> Stream for Map<S, F>
+ where S: Stream,
+ F: FnMut(S::Item) -> U,
+{
+ type Item = U;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<U>, S::Error> {
+ let option = try_ready!(self.stream.poll());
+ Ok(Async::Ready(option.map(&mut self.f)))
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/map_err.rs b/third_party/rust/futures-0.1.31/src/stream/map_err.rs
new file mode 100644
index 0000000000..8d1c0fc083
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/map_err.rs
@@ -0,0 +1,80 @@
+use Poll;
+use stream::Stream;
+
+/// A stream combinator which will change the error type of a stream from one
+/// type to another.
+///
+/// This is produced by the `Stream::map_err` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct MapErr<S, F> {
+ stream: S,
+ f: F,
+}
+
+pub fn new<S, F, U>(s: S, f: F) -> MapErr<S, F>
+ where S: Stream,
+ F: FnMut(S::Error) -> U,
+{
+ MapErr {
+ stream: s,
+ f: f,
+ }
+}
+
+impl<S, F> MapErr<S, F> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F> ::sink::Sink for MapErr<S, F>
+ where S: ::sink::Sink
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F, U> Stream for MapErr<S, F>
+ where S: Stream,
+ F: FnMut(S::Error) -> U,
+{
+ type Item = S::Item;
+ type Error = U;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, U> {
+ self.stream.poll().map_err(&mut self.f)
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/merge.rs b/third_party/rust/futures-0.1.31/src/stream/merge.rs
new file mode 100644
index 0000000000..af7505e69a
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/merge.rs
@@ -0,0 +1,82 @@
+#![deprecated(note = "functionality provided by `select` now")]
+#![allow(deprecated)]
+
+use {Poll, Async};
+use stream::{Stream, Fuse};
+
+/// An adapter for merging the output of two streams.
+///
+/// The merged stream produces items from one or both of the underlying
+/// streams as they become available. Errors, however, are not merged: you
+/// get at most one error at a time.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Merge<S1, S2: Stream> {
+ stream1: Fuse<S1>,
+ stream2: Fuse<S2>,
+ queued_error: Option<S2::Error>,
+}
+
+pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Merge<S1, S2>
+ where S1: Stream, S2: Stream<Error = S1::Error>
+{
+ Merge {
+ stream1: stream1.fuse(),
+ stream2: stream2.fuse(),
+ queued_error: None,
+ }
+}
+
+/// An item returned from a merge stream, which represents an item from one or
+/// both of the underlying streams.
+#[derive(Debug)]
+pub enum MergedItem<I1, I2> {
+ /// An item from the first stream
+ First(I1),
+ /// An item from the second stream
+ Second(I2),
+ /// Items from both streams
+ Both(I1, I2),
+}
+
+impl<S1, S2> Stream for Merge<S1, S2>
+ where S1: Stream, S2: Stream<Error = S1::Error>
+{
+ type Item = MergedItem<S1::Item, S2::Item>;
+ type Error = S1::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if let Some(e) = self.queued_error.take() {
+ return Err(e)
+ }
+
+ match self.stream1.poll()? {
+ Async::NotReady => {
+ match try_ready!(self.stream2.poll()) {
+ Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))),
+ None => Ok(Async::NotReady),
+ }
+ }
+ Async::Ready(None) => {
+ match try_ready!(self.stream2.poll()) {
+ Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))),
+ None => Ok(Async::Ready(None)),
+ }
+ }
+ Async::Ready(Some(item1)) => {
+ match self.stream2.poll() {
+ Err(e) => {
+ self.queued_error = Some(e);
+ Ok(Async::Ready(Some(MergedItem::First(item1))))
+ }
+ Ok(Async::NotReady) | Ok(Async::Ready(None)) => {
+ Ok(Async::Ready(Some(MergedItem::First(item1))))
+ }
+ Ok(Async::Ready(Some(item2))) => {
+ Ok(Async::Ready(Some(MergedItem::Both(item1, item2))))
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/mod.rs b/third_party/rust/futures-0.1.31/src/stream/mod.rs
new file mode 100644
index 0000000000..2d90362470
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/mod.rs
@@ -0,0 +1,1146 @@
+//! Asynchronous streams
+//!
+//! This module contains the `Stream` trait and a number of adaptors for this
+//! trait. This trait is very similar to the `Iterator` trait in the standard
+//! library except that it expresses the concept of blocking as well. A stream
+//! here is a sequential sequence of values which may take some amount of time
+//! in between to produce.
+//!
+//! A stream may request that it is blocked between values while the next value
+//! is calculated, and provides a way to get notified once the next value is
+//! ready as well.
+//!
+//! You can find more information/tutorials about streams [online at
+//! https://tokio.rs][online]
+//!
+//! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
+
+use {IntoFuture, Poll};
+
+mod iter;
+#[allow(deprecated)]
+pub use self::iter::{iter, Iter};
+#[cfg(feature = "with-deprecated")]
+#[allow(deprecated)]
+pub use self::Iter as IterStream;
+mod iter_ok;
+pub use self::iter_ok::{iter_ok, IterOk};
+mod iter_result;
+pub use self::iter_result::{iter_result, IterResult};
+
+mod repeat;
+pub use self::repeat::{repeat, Repeat};
+
+mod and_then;
+mod chain;
+mod concat;
+mod empty;
+mod filter;
+mod filter_map;
+mod flatten;
+mod fold;
+mod for_each;
+mod from_err;
+mod fuse;
+mod future;
+mod inspect;
+mod inspect_err;
+mod map;
+mod map_err;
+mod merge;
+mod once;
+mod or_else;
+mod peek;
+mod poll_fn;
+mod select;
+mod skip;
+mod skip_while;
+mod take;
+mod take_while;
+mod then;
+mod unfold;
+mod zip;
+mod forward;
+pub use self::and_then::AndThen;
+pub use self::chain::Chain;
+#[allow(deprecated)]
+pub use self::concat::Concat;
+pub use self::concat::Concat2;
+pub use self::empty::{Empty, empty};
+pub use self::filter::Filter;
+pub use self::filter_map::FilterMap;
+pub use self::flatten::Flatten;
+pub use self::fold::Fold;
+pub use self::for_each::ForEach;
+pub use self::from_err::FromErr;
+pub use self::fuse::Fuse;
+pub use self::future::StreamFuture;
+pub use self::inspect::Inspect;
+pub use self::inspect_err::InspectErr;
+pub use self::map::Map;
+pub use self::map_err::MapErr;
+#[allow(deprecated)]
+pub use self::merge::{Merge, MergedItem};
+pub use self::once::{Once, once};
+pub use self::or_else::OrElse;
+pub use self::peek::Peekable;
+pub use self::poll_fn::{poll_fn, PollFn};
+pub use self::select::Select;
+pub use self::skip::Skip;
+pub use self::skip_while::SkipWhile;
+pub use self::take::Take;
+pub use self::take_while::TakeWhile;
+pub use self::then::Then;
+pub use self::unfold::{Unfold, unfold};
+pub use self::zip::Zip;
+pub use self::forward::Forward;
+use sink::{Sink};
+
+if_std! {
+ use std;
+
+ mod buffered;
+ mod buffer_unordered;
+ mod catch_unwind;
+ mod chunks;
+ mod collect;
+ mod wait;
+ mod channel;
+ mod split;
+ pub mod futures_unordered;
+ mod futures_ordered;
+ pub use self::buffered::Buffered;
+ pub use self::buffer_unordered::BufferUnordered;
+ pub use self::catch_unwind::CatchUnwind;
+ pub use self::chunks::Chunks;
+ pub use self::collect::Collect;
+ pub use self::wait::Wait;
+ pub use self::split::{SplitStream, SplitSink, ReuniteError};
+ pub use self::futures_unordered::FuturesUnordered;
+ pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
+
+ #[doc(hidden)]
+ #[cfg(feature = "with-deprecated")]
+ #[allow(deprecated)]
+ pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError};
+
+ /// A type alias for `Box<Stream + Send>`
+ #[doc(hidden)]
+ #[deprecated(note = "removed without replacement, recommended to use a \
+ local extension trait or function if needed, more \
+ details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
+ pub type BoxStream<T, E> = ::std::boxed::Box<Stream<Item = T, Error = E> + Send>;
+
+ impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ (**self).poll()
+ }
+ }
+}
+
+/// A stream of values, not all of which may have been produced yet.
+///
+/// `Stream` is a trait to represent any source of sequential events or items
+/// which acts like an iterator but long periods of time may pass between
+/// items. Like `Future` the methods of `Stream` never block and it is thus
+/// suitable for programming in an asynchronous fashion. This trait is very
+/// similar to the `Iterator` trait in the standard library where `Some` is
+/// used to signal elements of the stream and `None` is used to indicate that
+/// the stream is finished.
+///
+/// Like futures a stream has basic combinators to transform the stream, perform
+/// more work on each item, etc.
+///
+/// You can find more information/tutorials about streams [online at
+/// https://tokio.rs][online]
+///
+/// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
+///
+/// # Streams as Futures
+///
+/// Any instance of `Stream` can also be viewed as a `Future` where the resolved
+/// value is the next item in the stream along with the rest of the stream. The
+/// `into_future` adaptor can be used here to convert any stream into a future
+/// for use with other future methods like `join` and `select`.
+///
+/// # Errors
+///
+/// Streams, like futures, can also model errors in their computation. All
+/// streams have an associated `Error` type like with futures. Currently as of
+/// the 0.1 release of this library an error on a stream **does not terminate
+/// the stream**. That is, after one error is received, another error may be
+/// received from the same stream (it's valid to keep polling).
+///
+/// This property of streams, however, is [being considered] for change in 0.2
+/// where an error on a stream is similar to `None`, it terminates the stream
+/// entirely. If one of these use cases suits you perfectly and not the other,
+/// please feel welcome to comment on [the issue][being considered]!
+///
+/// [being considered]: https://github.com/rust-lang-nursery/futures-rs/issues/206
+#[must_use = "streams do nothing unless polled"]
+pub trait Stream {
+ /// The type of item this stream will yield on success.
+ type Item;
+
+ /// The type of error this stream may generate.
+ type Error;
+
+ /// Attempt to pull out the next value of this stream, returning `None` if
+ /// the stream is finished.
+ ///
+ /// This method, like `Future::poll`, is the sole method of pulling out a
+ /// value from a stream. This method must also be run within the context of
+ /// a task typically and implementors of this trait must ensure that
+ /// implementations of this method do not block, as it may cause consumers
+ /// to behave badly.
+ ///
+ /// # Return value
+ ///
+ /// If `NotReady` is returned then this stream's next value is not ready
+ /// yet and implementations will ensure that the current task will be
+ /// notified when the next value may be ready. If `Some` is returned then
+ /// the returned value represents the next value on the stream. `Err`
+ /// indicates an error happened, while `Ok` indicates whether there was a
+ /// new item on the stream or whether the stream has terminated.
+ ///
+ /// # Panics
+ ///
+ /// Once a stream is finished, that is `Ready(None)` has been returned,
+ /// further calls to `poll` may result in a panic or other "bad behavior".
+ /// If this is difficult to guard against then the `fuse` adapter can be
+ /// used to ensure that `poll` always has well-defined semantics.
+ // TODO: more here
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
+
+ // TODO: should there also be a method like `poll` but doesn't return an
+ // item? basically just says "please make more progress internally"
+ // seems crucial for buffering to actually make any sense.
+
+ /// Creates an iterator which blocks the current thread until each item of
+ /// this stream is resolved.
+ ///
+ /// This method will consume ownership of this stream, returning an
+ /// implementation of a standard iterator. This iterator will *block the
+ /// current thread* on each call to `next` if the item in the stream isn't
+ /// ready yet.
+ ///
+ /// > **Note:** This method is not appropriate to call on event loops or
+ /// > similar I/O situations because it will prevent the event
+ /// > loop from making progress (this blocks the thread). This
+ /// > method should only be called when it's guaranteed that the
+ /// > blocking work associated with this stream will be completed
+ /// > by another thread.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Panics
+ ///
+ /// The returned iterator does not attempt to catch panics. If the `poll`
+ /// function panics, panics will be propagated to the caller of `next`.
+ #[cfg(feature = "use_std")]
+ fn wait(self) -> Wait<Self>
+ where Self: Sized
+ {
+ wait::new(self)
+ }
+
+ /// Convenience function for turning this stream into a trait object.
+ ///
+ /// This simply avoids the need to write `Box::new` and can often help with
+ /// type inference as well by always returning a trait object. Note that
+ /// this method requires the `Send` bound and returns a `BoxStream`, which
+ /// also encodes this. If you'd like to create a `Box<Stream>` without the
+ /// `Send` bound, then the `Box::new` function can be used instead.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::stream::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (_tx, rx) = mpsc::channel(1);
+ /// let a: BoxStream<i32, ()> = rx.boxed();
+ /// ```
+ #[cfg(feature = "use_std")]
+ #[doc(hidden)]
+ #[deprecated(note = "removed without replacement, recommended to use a \
+ local extension trait or function if needed, more \
+ details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
+ #[allow(deprecated)]
+ fn boxed(self) -> BoxStream<Self::Item, Self::Error>
+ where Self: Sized + Send + 'static,
+ {
+ ::std::boxed::Box::new(self)
+ }
+
+ /// Converts this stream into a `Future`.
+ ///
+ /// A stream can be viewed as a future which will resolve to a pair containing
+ /// the next element of the stream plus the remaining stream. If the stream
+ /// terminates, then the next element is `None` and the remaining stream is
+ /// still passed back, to allow reclamation of its resources.
+ ///
+ /// The returned future can be used to compose streams and futures together by
+ /// placing everything into the "world of futures".
+ fn into_future(self) -> StreamFuture<Self>
+ where Self: Sized
+ {
+ future::new(self)
+ }
+
+ /// Converts a stream of type `T` to a stream of type `U`.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available, and the callback will be executed inline with
+ /// calls to `poll`.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (_tx, rx) = mpsc::channel::<i32>(1);
+ /// let rx = rx.map(|x| x + 3);
+ /// ```
+ fn map<U, F>(self, f: F) -> Map<Self, F>
+ where F: FnMut(Self::Item) -> U,
+ Self: Sized
+ {
+ map::new(self, f)
+ }
+
+ /// Converts a stream of error type `T` to a stream of error type `U`.
+ ///
+ /// The provided closure is executed over all errors of this stream as
+ /// they are made available, and the callback will be executed inline with
+ /// calls to `poll`.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it, similar to the existing `map_err` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (_tx, rx) = mpsc::channel::<i32>(1);
+ /// let rx = rx.map_err(|()| 3);
+ /// ```
+ fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
+ where F: FnMut(Self::Error) -> U,
+ Self: Sized
+ {
+ map_err::new(self, f)
+ }
+
+ /// Filters the values produced by this stream according to the provided
+ /// predicate.
+ ///
+ /// As values of this stream are made available, the provided predicate will
+ /// be run against them. If the predicate returns `true` then the stream
+ /// will yield the value, but if the predicate returns `false` then the
+ /// value will be discarded and the next value will be produced.
+ ///
+ /// All errors are passed through without filtering in this combinator.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it, similar to the existing `filter` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (_tx, rx) = mpsc::channel::<i32>(1);
+ /// let evens = rx.filter(|x| x % 2 == 0);
+ /// ```
+ fn filter<F>(self, f: F) -> Filter<Self, F>
+ where F: FnMut(&Self::Item) -> bool,
+ Self: Sized
+ {
+ filter::new(self, f)
+ }
+
+ /// Filters the values produced by this stream while simultaneously mapping
+ /// them to a different type.
+ ///
+ /// As values of this stream are made available, the provided function will
+ /// be run on them. If the predicate returns `Some(e)` then the stream will
+ /// yield the value `e`, but if the predicate returns `None` then the next
+ /// value will be produced.
+ ///
+ /// All errors are passed through without filtering in this combinator.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it, similar to the existing `filter_map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (_tx, rx) = mpsc::channel::<i32>(1);
+ /// let evens_plus_one = rx.filter_map(|x| {
+ /// if x % 0 == 2 {
+ /// Some(x + 1)
+ /// } else {
+ /// None
+ /// }
+ /// });
+ /// ```
+ fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F>
+ where F: FnMut(Self::Item) -> Option<B>,
+ Self: Sized
+ {
+ filter_map::new(self, f)
+ }
+
+ /// Chain on a computation for when a value is ready, passing the resulting
+ /// item to the provided closure `f`.
+ ///
+ /// This function can be used to ensure a computation runs regardless of
+ /// the next value on the stream. The closure provided will be yielded a
+ /// `Result` once a value is ready, and the returned future will then be run
+ /// to completion to produce the next value on this stream.
+ ///
+ /// The returned value of the closure must implement the `IntoFuture` trait
+ /// and can represent some more work to be done before the composed stream
+ /// is finished. Note that the `Result` type implements the `IntoFuture`
+ /// trait so it is possible to simply alter the `Result` yielded to the
+ /// closure and return it.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (_tx, rx) = mpsc::channel::<i32>(1);
+ ///
+ /// let rx = rx.then(|result| {
+ /// match result {
+ /// Ok(e) => Ok(e + 3),
+ /// Err(()) => Err(4),
+ /// }
+ /// });
+ /// ```
+ fn then<F, U>(self, f: F) -> Then<Self, F, U>
+ where F: FnMut(Result<Self::Item, Self::Error>) -> U,
+ U: IntoFuture,
+ Self: Sized
+ {
+ then::new(self, f)
+ }
+
+ /// Chain on a computation for when a value is ready, passing the successful
+ /// results to the provided closure `f`.
+ ///
+ /// This function can be used to run a unit of work when the next successful
+ /// value on a stream is ready. The closure provided will be yielded a value
+ /// when ready, and the returned future will then be run to completion to
+ /// produce the next value on this stream.
+ ///
+ /// Any errors produced by this stream will not be passed to the closure,
+ /// and will be passed through.
+ ///
+ /// The returned value of the closure must implement the `IntoFuture` trait
+ /// and can represent some more work to be done before the composed stream
+ /// is finished. Note that the `Result` type implements the `IntoFuture`
+ /// trait so it is possible to simply alter the `Result` yielded to the
+ /// closure and return it.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it.
+ ///
+ /// To process the entire stream and return a single future representing
+ /// success or error, use `for_each` instead.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (_tx, rx) = mpsc::channel::<i32>(1);
+ ///
+ /// let rx = rx.and_then(|result| {
+ /// if result % 2 == 0 {
+ /// Ok(result)
+ /// } else {
+ /// Err(())
+ /// }
+ /// });
+ /// ```
+ fn and_then<F, U>(self, f: F) -> AndThen<Self, F, U>
+ where F: FnMut(Self::Item) -> U,
+ U: IntoFuture<Error = Self::Error>,
+ Self: Sized
+ {
+ and_then::new(self, f)
+ }
+
+ /// Chain on a computation for when an error happens, passing the
+ /// erroneous result to the provided closure `f`.
+ ///
+ /// This function can be used to run a unit of work and attempt to recover from
+ /// an error if one happens. The closure provided will be yielded an error
+ /// when one appears, and the returned future will then be run to completion
+ /// to produce the next value on this stream.
+ ///
+ /// Any successful values produced by this stream will not be passed to the
+ /// closure, and will be passed through.
+ ///
+ /// The returned value of the closure must implement the `IntoFuture` trait
+ /// and can represent some more work to be done before the composed stream
+ /// is finished. Note that the `Result` type implements the `IntoFuture`
+ /// trait so it is possible to simply alter the `Result` yielded to the
+ /// closure and return it.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it.
+ fn or_else<F, U>(self, f: F) -> OrElse<Self, F, U>
+ where F: FnMut(Self::Error) -> U,
+ U: IntoFuture<Item = Self::Item>,
+ Self: Sized
+ {
+ or_else::new(self, f)
+ }
+
+ /// Collect all of the values of this stream into a vector, returning a
+ /// future representing the result of that computation.
+ ///
+ /// This combinator will collect all successful results of this stream and
+ /// collect them into a `Vec<Self::Item>`. If an error happens then all
+ /// collected elements will be dropped and the error will be returned.
+ ///
+ /// The returned future will be resolved whenever an error happens or when
+ /// the stream returns `Ok(None)`.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ ///
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (mut tx, rx) = mpsc::channel(1);
+ ///
+ /// thread::spawn(|| {
+ /// for i in (0..5).rev() {
+ /// tx = tx.send(i + 1).wait().unwrap();
+ /// }
+ /// });
+ ///
+ /// let mut result = rx.collect();
+ /// assert_eq!(result.wait(), Ok(vec![5, 4, 3, 2, 1]));
+ /// ```
+ #[cfg(feature = "use_std")]
+ fn collect(self) -> Collect<Self>
+ where Self: Sized
+ {
+ collect::new(self)
+ }
+
+ /// Concatenate all results of a stream into a single extendable
+ /// destination, returning a future representing the end result.
+ ///
+ /// This combinator will extend the first item with the contents
+ /// of all the successful results of the stream. If the stream is
+ /// empty, the default value will be returned. If an error occurs,
+ /// all the results will be dropped and the error will be returned.
+ ///
+ /// The name `concat2` is an intermediate measure until the release of
+ /// futures 0.2, at which point it will be renamed back to `concat`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ ///
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (mut tx, rx) = mpsc::channel(1);
+ ///
+ /// thread::spawn(move || {
+ /// for i in (0..3).rev() {
+ /// let n = i * 3;
+ /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
+ /// }
+ /// });
+ /// let result = rx.concat2();
+ /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
+ /// ```
+ fn concat2(self) -> Concat2<Self>
+ where Self: Sized,
+ Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
+ {
+ concat::new2(self)
+ }
+
+ /// Concatenate all results of a stream into a single extendable
+ /// destination, returning a future representing the end result.
+ ///
+ /// This combinator will extend the first item with the contents
+ /// of all the successful results of the stream. If an error occurs,
+ /// all the results will be dropped and the error will be returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ ///
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (mut tx, rx) = mpsc::channel(1);
+ ///
+ /// thread::spawn(move || {
+ /// for i in (0..3).rev() {
+ /// let n = i * 3;
+ /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
+ /// }
+ /// });
+ /// let result = rx.concat();
+ /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// It's important to note that this function will panic if the stream
+ /// is empty, which is the reason for its deprecation.
+ #[deprecated(since="0.1.14", note="please use `Stream::concat2` instead")]
+ #[allow(deprecated)]
+ fn concat(self) -> Concat<Self>
+ where Self: Sized,
+ Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,
+ {
+ concat::new(self)
+ }
+
+ /// Execute an accumulating computation over a stream, collecting all the
+ /// values into one final result.
+ ///
+ /// This combinator will collect all successful results of this stream
+ /// according to the closure provided. The initial state is also provided to
+ /// this method and then is returned again by each execution of the closure.
+ /// Once the entire stream has been exhausted the returned future will
+ /// resolve to this value.
+ ///
+ /// If an error happens then collected state will be dropped and the error
+ /// will be returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::stream;
+ /// use futures::future;
+ ///
+ /// let number_stream = stream::iter_ok::<_, ()>(0..6);
+ /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x));
+ /// assert_eq!(sum.wait(), Ok(15));
+ /// ```
+ fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
+ where F: FnMut(T, Self::Item) -> Fut,
+ Fut: IntoFuture<Item = T>,
+ Self::Error: From<Fut::Error>,
+ Self: Sized
+ {
+ fold::new(self, f, init)
+ }
+
+ /// Flattens a stream of streams into just one continuous stream.
+ ///
+ /// If this stream's elements are themselves streams then this combinator
+ /// will flatten out the entire stream to one long chain of elements. Any
+ /// errors are passed through without looking at them, but otherwise each
+ /// individual stream will get exhausted before moving on to the next.
+ ///
+ /// ```
+ /// use std::thread;
+ ///
+ /// use futures::prelude::*;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (tx1, rx1) = mpsc::channel::<i32>(1);
+ /// let (tx2, rx2) = mpsc::channel::<i32>(1);
+ /// let (tx3, rx3) = mpsc::channel(1);
+ ///
+ /// thread::spawn(|| {
+ /// tx1.send(1).wait().unwrap()
+ /// .send(2).wait().unwrap();
+ /// });
+ /// thread::spawn(|| {
+ /// tx2.send(3).wait().unwrap()
+ /// .send(4).wait().unwrap();
+ /// });
+ /// thread::spawn(|| {
+ /// tx3.send(rx1).wait().unwrap()
+ /// .send(rx2).wait().unwrap();
+ /// });
+ ///
+ /// let mut result = rx3.flatten().collect();
+ /// assert_eq!(result.wait(), Ok(vec![1, 2, 3, 4]));
+ /// ```
+ fn flatten(self) -> Flatten<Self>
+ where Self::Item: Stream,
+ <Self::Item as Stream>::Error: From<Self::Error>,
+ Self: Sized
+ {
+ flatten::new(self)
+ }
+
+ /// Skip elements on this stream while the predicate provided resolves to
+ /// `true`.
+ ///
+ /// This function, like `Iterator::skip_while`, will skip elements on the
+ /// stream until the `predicate` resolves to `false`. Once one element
+ /// returns false all future elements will be returned from the underlying
+ /// stream.
+ fn skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R>
+ where P: FnMut(&Self::Item) -> R,
+ R: IntoFuture<Item=bool, Error=Self::Error>,
+ Self: Sized
+ {
+ skip_while::new(self, pred)
+ }
+
+ /// Take elements from this stream while the predicate provided resolves to
+ /// `true`.
+ ///
+ /// This function, like `Iterator::take_while`, will take elements from the
+ /// stream until the `predicate` resolves to `false`. Once one element
+ /// returns false it will always return that the stream is done.
+ fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
+ where P: FnMut(&Self::Item) -> R,
+ R: IntoFuture<Item=bool, Error=Self::Error>,
+ Self: Sized
+ {
+ take_while::new(self, pred)
+ }
+
+ /// Runs this stream to completion, executing the provided closure for each
+ /// element on the stream.
+ ///
+ /// The closure provided will be called for each item this stream resolves
+ /// to successfully, producing a future. That future will then be executed
+ /// to completion before moving on to the next item.
+ ///
+ /// The returned value is a `Future` where the `Item` type is `()` and
+ /// errors are otherwise threaded through. Any error on the stream or in the
+ /// closure will cause iteration to be halted immediately and the future
+ /// will resolve to that error.
+ ///
+ /// To process each item in the stream and produce another stream instead
+ /// of a single future, use `and_then` instead.
+ fn for_each<F, U>(self, f: F) -> ForEach<Self, F, U>
+ where F: FnMut(Self::Item) -> U,
+ U: IntoFuture<Item=(), Error = Self::Error>,
+ Self: Sized
+ {
+ for_each::new(self, f)
+ }
+
+ /// Map this stream's error to any error implementing `From` for
+ /// this stream's `Error`, returning a new stream.
+ ///
+ /// This function does for streams what `try!` does for `Result`,
+ /// by letting the compiler infer the type of the resulting error.
+ /// Just as `map_err` above, this is useful for example to ensure
+ /// that streams have the same error type when used with
+ /// combinators.
+ ///
+ /// Note that this function consumes the receiving stream and returns a
+ /// wrapped version of it.
+ fn from_err<E: From<Self::Error>>(self) -> FromErr<Self, E>
+ where Self: Sized,
+ {
+ from_err::new(self)
+ }
+
+ /// Creates a new stream of at most `amt` items of the underlying stream.
+ ///
+ /// Once `amt` items have been yielded from this stream then it will always
+ /// return that the stream is done.
+ ///
+ /// # Errors
+ ///
+ /// Any errors yielded from underlying stream, before the desired amount of
+ /// items is reached, are passed through and do not affect the total number
+ /// of items taken.
+ fn take(self, amt: u64) -> Take<Self>
+ where Self: Sized
+ {
+ take::new(self, amt)
+ }
+
+ /// Creates a new stream which skips `amt` items of the underlying stream.
+ ///
+ /// Once `amt` items have been skipped from this stream then it will always
+ /// return the remaining items on this stream.
+ ///
+ /// # Errors
+ ///
+ /// All errors yielded from underlying stream are passed through and do not
+ /// affect the total number of items skipped.
+ fn skip(self, amt: u64) -> Skip<Self>
+ where Self: Sized
+ {
+ skip::new(self, amt)
+ }
+
+ /// Fuse a stream such that `poll` will never again be called once it has
+ /// finished.
+ ///
+ /// Currently once a stream has returned `None` from `poll` any further
+ /// calls could exhibit bad behavior such as block forever, panic, never
+ /// return, etc. If it is known that `poll` may be called after stream has
+ /// already finished, then this method can be used to ensure that it has
+ /// defined semantics.
+ ///
+ /// Once a stream has been `fuse`d and it finishes, then it will forever
+ /// return `None` from `poll`. This, unlike for the traits `poll` method,
+ /// is guaranteed.
+ ///
+ /// Also note that as soon as this stream returns `None` it will be dropped
+ /// to reclaim resources associated with it.
+ fn fuse(self) -> Fuse<Self>
+ where Self: Sized
+ {
+ fuse::new(self)
+ }
+
+ /// Borrows a stream, rather than consuming it.
+ ///
+ /// This is useful to allow applying stream adaptors while still retaining
+ /// ownership of the original stream.
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::stream;
+ /// use futures::future;
+ ///
+ /// let mut stream = stream::iter_ok::<_, ()>(1..5);
+ ///
+ /// let sum = stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)).wait();
+ /// assert_eq!(sum, Ok(3));
+ ///
+ /// // You can use the stream again
+ /// let sum = stream.take(2).fold(0, |a, b| future::ok(a + b)).wait();
+ /// assert_eq!(sum, Ok(7));
+ /// ```
+ fn by_ref(&mut self) -> &mut Self
+ where Self: Sized
+ {
+ self
+ }
+
+ /// Catches unwinding panics while polling the stream.
+ ///
+ /// Caught panic (if any) will be the last element of the resulting stream.
+ ///
+ /// In general, panics within a stream can propagate all the way out to the
+ /// task level. This combinator makes it possible to halt unwinding within
+ /// the stream itself. It's most commonly used within task executors. This
+ /// method should not be used for error handling.
+ ///
+ /// Note that this method requires the `UnwindSafe` bound from the standard
+ /// library. This isn't always applied automatically, and the standard
+ /// library provides an `AssertUnwindSafe` wrapper type to apply it
+ /// after-the fact. To assist using this method, the `Stream` trait is also
+ /// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use futures::prelude::*;
+ /// use futures::stream;
+ ///
+ /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
+ /// // panic on second element
+ /// let stream_panicking = stream.map(|o| o.unwrap());
+ /// let mut iter = stream_panicking.catch_unwind().wait();
+ ///
+ /// assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
+ /// assert!(iter.next().unwrap().is_err());
+ /// assert!(iter.next().is_none());
+ /// ```
+ #[cfg(feature = "use_std")]
+ fn catch_unwind(self) -> CatchUnwind<Self>
+ where Self: Sized + std::panic::UnwindSafe
+ {
+ catch_unwind::new(self)
+ }
+
+ /// An adaptor for creating a buffered list of pending futures.
+ ///
+ /// If this stream's item can be converted into a future, then this adaptor
+ /// will buffer up to at most `amt` futures and then return results in the
+ /// same order as the underlying stream. No more than `amt` futures will be
+ /// buffered at any point in time, and less than `amt` may also be buffered
+ /// depending on the state of each future.
+ ///
+ /// The returned stream will be a stream of each future's result, with
+ /// errors passed through whenever they occur.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "use_std")]
+ fn buffered(self, amt: usize) -> Buffered<Self>
+ where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
+ Self: Sized
+ {
+ buffered::new(self, amt)
+ }
+
+ /// An adaptor for creating a buffered list of pending futures (unordered).
+ ///
+ /// If this stream's item can be converted into a future, then this adaptor
+ /// will buffer up to `amt` futures and then return results in the order
+ /// in which they complete. No more than `amt` futures will be buffered at
+ /// any point in time, and less than `amt` may also be buffered depending on
+ /// the state of each future.
+ ///
+ /// The returned stream will be a stream of each future's result, with
+ /// errors passed through whenever they occur.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "use_std")]
+ fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
+ where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
+ Self: Sized
+ {
+ buffer_unordered::new(self, amt)
+ }
+
+ /// An adapter for merging the output of two streams.
+ ///
+ /// The merged stream produces items from one or both of the underlying
+ /// streams as they become available. Errors, however, are not merged: you
+ /// get at most one error at a time.
+ #[deprecated(note = "functionality provided by `select` now")]
+ #[allow(deprecated)]
+ fn merge<S>(self, other: S) -> Merge<Self, S>
+ where S: Stream<Error = Self::Error>,
+ Self: Sized,
+ {
+ merge::new(self, other)
+ }
+
+ /// An adapter for zipping two streams together.
+ ///
+ /// The zipped stream waits for both streams to produce an item, and then
+ /// returns that pair. If an error happens, then that error will be returned
+ /// immediately. If either stream ends then the zipped stream will also end.
+ fn zip<S>(self, other: S) -> Zip<Self, S>
+ where S: Stream<Error = Self::Error>,
+ Self: Sized,
+ {
+ zip::new(self, other)
+ }
+
+ /// Adapter for chaining two stream.
+ ///
+ /// The resulting stream emits elements from the first stream, and when
+ /// first stream reaches the end, emits the elements from the second stream.
+ ///
+ /// ```rust
+ /// use futures::prelude::*;
+ /// use futures::stream;
+ ///
+ /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]);
+ /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]);
+ /// let mut chain = stream1.chain(stream2).wait();
+ ///
+ /// assert_eq!(Some(Ok(10)), chain.next());
+ /// assert_eq!(Some(Err(false)), chain.next());
+ /// assert_eq!(Some(Err(true)), chain.next());
+ /// assert_eq!(Some(Ok(20)), chain.next());
+ /// assert_eq!(None, chain.next());
+ /// ```
+ fn chain<S>(self, other: S) -> Chain<Self, S>
+ where S: Stream<Item = Self::Item, Error = Self::Error>,
+ Self: Sized
+ {
+ chain::new(self, other)
+ }
+
+ /// Creates a new stream which exposes a `peek` method.
+ ///
+ /// Calling `peek` returns a reference to the next item in the stream.
+ fn peekable(self) -> Peekable<Self>
+ where Self: Sized
+ {
+ peek::new(self)
+ }
+
+ /// An adaptor for chunking up items of the stream inside a vector.
+ ///
+ /// This combinator will attempt to pull items from this stream and buffer
+ /// them into a local vector. At most `capacity` items will get buffered
+ /// before they're yielded from the returned stream.
+ ///
+ /// Note that the vectors returned from this iterator may not always have
+ /// `capacity` elements. If the underlying stream ended and only a partial
+ /// vector was created, it'll be returned. Additionally if an error happens
+ /// from the underlying stream then the currently buffered items will be
+ /// yielded.
+ ///
+ /// Errors are passed through the stream unbuffered.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic of `capacity` is zero.
+ #[cfg(feature = "use_std")]
+ fn chunks(self, capacity: usize) -> Chunks<Self>
+ where Self: Sized
+ {
+ chunks::new(self, capacity)
+ }
+
+ /// Creates a stream that selects the next element from either this stream
+ /// or the provided one, whichever is ready first.
+ ///
+ /// This combinator will attempt to pull items from both streams. Each
+ /// stream will be polled in a round-robin fashion, and whenever a stream is
+ /// ready to yield an item that item is yielded.
+ ///
+ /// The `select` function is similar to `merge` except that it requires both
+ /// streams to have the same item and error types.
+ ///
+ /// Error are passed through from either stream.
+ fn select<S>(self, other: S) -> Select<Self, S>
+ where S: Stream<Item = Self::Item, Error = Self::Error>,
+ Self: Sized,
+ {
+ select::new(self, other)
+ }
+
+ /// A future that completes after the given stream has been fully processed
+ /// into the sink, including flushing.
+ ///
+ /// This future will drive the stream to keep producing items until it is
+ /// exhausted, sending each item to the sink. It will complete once both the
+ /// stream is exhausted, and the sink has fully processed received item,
+ /// flushed successfully, and closed successfully.
+ ///
+ /// Doing `stream.forward(sink)` is roughly equivalent to
+ /// `sink.send_all(stream)`. The returned future will exhaust all items from
+ /// `self`, sending them all to `sink`. Furthermore the `sink` will be
+ /// closed and flushed.
+ ///
+ /// On completion, the pair `(stream, sink)` is returned.
+ fn forward<S>(self, sink: S) -> Forward<Self, S>
+ where S: Sink<SinkItem = Self::Item>,
+ Self::Error: From<S::SinkError>,
+ Self: Sized
+ {
+ forward::new(self, sink)
+ }
+
+ /// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
+ /// objects.
+ ///
+ /// This can be useful when you want to split ownership between tasks, or
+ /// allow direct interaction between the two objects (e.g. via
+ /// `Sink::send_all`).
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "use_std")]
+ fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
+ where Self: super::sink::Sink + Sized
+ {
+ split::split(self)
+ }
+
+ /// Do something with each item of this stream, afterwards passing it on.
+ ///
+ /// This is similar to the `Iterator::inspect` method in the standard
+ /// library where it allows easily inspecting each value as it passes
+ /// through the stream, for example to debug what's going on.
+ fn inspect<F>(self, f: F) -> Inspect<Self, F>
+ where F: FnMut(&Self::Item),
+ Self: Sized,
+ {
+ inspect::new(self, f)
+ }
+
+ /// Do something with the error of this stream, afterwards passing it on.
+ ///
+ /// This is similar to the `Stream::inspect` method where it allows
+ /// easily inspecting the error as it passes through the stream, for
+ /// example to debug what's going on.
+ fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
+ where F: FnMut(&Self::Error),
+ Self: Sized,
+ {
+ inspect_err::new(self, f)
+ }
+}
+
+impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ (**self).poll()
+ }
+}
+
+/// Converts a list of futures into a `Stream` of results from the futures.
+///
+/// This function will take an list of futures (e.g. a vector, an iterator,
+/// etc), and return a stream. The stream will yield items as they become
+/// available on the futures internally, in the order that they become
+/// available. This function is similar to `buffer_unordered` in that it may
+/// return items in a different order than in the list specified.
+///
+/// Note that the returned set can also be used to dynamically push more
+/// futures into the set as they become available.
+#[cfg(feature = "use_std")]
+pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
+ where I: IntoIterator,
+ I::Item: IntoFuture
+{
+ let mut set = FuturesUnordered::new();
+
+ for future in futures {
+ set.push(future.into_future());
+ }
+
+ return set
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/once.rs b/third_party/rust/futures-0.1.31/src/stream/once.rs
new file mode 100644
index 0000000000..24fb327bd6
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/once.rs
@@ -0,0 +1,35 @@
+use {Poll, Async};
+use stream::Stream;
+
+/// A stream which emits single element and then EOF.
+///
+/// This stream will never block and is always ready.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Once<T, E>(Option<Result<T, E>>);
+
+/// Creates a stream of single element
+///
+/// ```rust
+/// use futures::*;
+///
+/// let mut stream = stream::once::<(), _>(Err(17));
+/// assert_eq!(Err(17), stream.poll());
+/// assert_eq!(Ok(Async::Ready(None)), stream.poll());
+/// ```
+pub fn once<T, E>(item: Result<T, E>) -> Once<T, E> {
+ Once(Some(item))
+}
+
+impl<T, E> Stream for Once<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<T>, E> {
+ match self.0.take() {
+ Some(Ok(e)) => Ok(Async::Ready(Some(e))),
+ Some(Err(e)) => Err(e),
+ None => Ok(Async::Ready(None)),
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/or_else.rs b/third_party/rust/futures-0.1.31/src/stream/or_else.rs
new file mode 100644
index 0000000000..2d15fa2b70
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/or_else.rs
@@ -0,0 +1,80 @@
+use {IntoFuture, Future, Poll, Async};
+use stream::Stream;
+
+/// A stream combinator which chains a computation onto errors produced by a
+/// stream.
+///
+/// This structure is produced by the `Stream::or_else` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct OrElse<S, F, U>
+ where U: IntoFuture,
+{
+ stream: S,
+ future: Option<U::Future>,
+ f: F,
+}
+
+pub fn new<S, F, U>(s: S, f: F) -> OrElse<S, F, U>
+ where S: Stream,
+ F: FnMut(S::Error) -> U,
+ U: IntoFuture<Item=S::Item>,
+{
+ OrElse {
+ stream: s,
+ future: None,
+ f: f,
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F, U> ::sink::Sink for OrElse<S, F, U>
+ where S: ::sink::Sink, U: IntoFuture
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F, U> Stream for OrElse<S, F, U>
+ where S: Stream,
+ F: FnMut(S::Error) -> U,
+ U: IntoFuture<Item=S::Item>,
+{
+ type Item = S::Item;
+ type Error = U::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, U::Error> {
+ if self.future.is_none() {
+ let item = match self.stream.poll() {
+ Ok(Async::Ready(e)) => return Ok(Async::Ready(e)),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => e,
+ };
+ self.future = Some((self.f)(item).into_future());
+ }
+ assert!(self.future.is_some());
+ match self.future.as_mut().unwrap().poll() {
+ Ok(Async::Ready(e)) => {
+ self.future = None;
+ Ok(Async::Ready(Some(e)))
+ }
+ Err(e) => {
+ self.future = None;
+ Err(e)
+ }
+ Ok(Async::NotReady) => Ok(Async::NotReady)
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/peek.rs b/third_party/rust/futures-0.1.31/src/stream/peek.rs
new file mode 100644
index 0000000000..96e657663b
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/peek.rs
@@ -0,0 +1,74 @@
+use {Async, Poll};
+use stream::{Stream, Fuse};
+
+/// A `Stream` that implements a `peek` method.
+///
+/// The `peek` method can be used to retrieve a reference
+/// to the next `Stream::Item` if available. A subsequent
+/// call to `poll` will return the owned item.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Peekable<S: Stream> {
+ stream: Fuse<S>,
+ peeked: Option<S::Item>,
+}
+
+
+pub fn new<S: Stream>(stream: S) -> Peekable<S> {
+ Peekable {
+ stream: stream.fuse(),
+ peeked: None
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for Peekable<S>
+ where S: ::sink::Sink + Stream
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S: Stream> Stream for Peekable<S> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if let Some(item) = self.peeked.take() {
+ return Ok(Async::Ready(Some(item)))
+ }
+ self.stream.poll()
+ }
+}
+
+
+impl<S: Stream> Peekable<S> {
+ /// Peek retrieves a reference to the next item in the stream.
+ ///
+ /// This method polls the underlying stream and return either a reference
+ /// to the next item if the stream is ready or passes through any errors.
+ pub fn peek(&mut self) -> Poll<Option<&S::Item>, S::Error> {
+ if self.peeked.is_some() {
+ return Ok(Async::Ready(self.peeked.as_ref()))
+ }
+ match try_ready!(self.poll()) {
+ None => Ok(Async::Ready(None)),
+ Some(item) => {
+ self.peeked = Some(item);
+ Ok(Async::Ready(self.peeked.as_ref()))
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/poll_fn.rs b/third_party/rust/futures-0.1.31/src/stream/poll_fn.rs
new file mode 100644
index 0000000000..fbc7df0844
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/poll_fn.rs
@@ -0,0 +1,49 @@
+//! Definition of the `PollFn` combinator
+
+use {Stream, Poll};
+
+/// A stream which adapts a function returning `Poll`.
+///
+/// Created by the `poll_fn` function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct PollFn<F> {
+ inner: F,
+}
+
+/// Creates a new stream wrapping around a function returning `Poll`.
+///
+/// Polling the returned stream delegates to the wrapped function.
+///
+/// # Examples
+///
+/// ```
+/// use futures::stream::poll_fn;
+/// use futures::{Async, Poll};
+///
+/// let mut counter = 1usize;
+///
+/// let read_stream = poll_fn(move || -> Poll<Option<String>, std::io::Error> {
+/// if counter == 0 { return Ok(Async::Ready(None)); }
+/// counter -= 1;
+/// Ok(Async::Ready(Some("Hello, World!".to_owned())))
+/// });
+/// ```
+pub fn poll_fn<T, E, F>(f: F) -> PollFn<F>
+where
+ F: FnMut() -> Poll<Option<T>, E>,
+{
+ PollFn { inner: f }
+}
+
+impl<T, E, F> Stream for PollFn<F>
+where
+ F: FnMut() -> Poll<Option<T>, E>,
+{
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<T>, E> {
+ (self.inner)()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/repeat.rs b/third_party/rust/futures-0.1.31/src/stream/repeat.rs
new file mode 100644
index 0000000000..e3cb5ff49c
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/repeat.rs
@@ -0,0 +1,53 @@
+use core::marker;
+
+
+use stream::Stream;
+
+use {Async, Poll};
+
+
+/// Stream that produces the same element repeatedly.
+///
+/// This structure is created by the `stream::repeat` function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Repeat<T, E>
+ where T: Clone
+{
+ item: T,
+ error: marker::PhantomData<E>,
+}
+
+/// Create a stream which produces the same item repeatedly.
+///
+/// Stream never produces an error or EOF. Note that you likely want to avoid
+/// usage of `collect` or such on the returned stream as it will exhaust
+/// available memory as it tries to just fill up all RAM.
+///
+/// ```rust
+/// use futures::*;
+///
+/// let mut stream = stream::repeat::<_, bool>(10);
+/// assert_eq!(Ok(Async::Ready(Some(10))), stream.poll());
+/// assert_eq!(Ok(Async::Ready(Some(10))), stream.poll());
+/// assert_eq!(Ok(Async::Ready(Some(10))), stream.poll());
+/// ```
+pub fn repeat<T, E>(item: T) -> Repeat<T, E>
+ where T: Clone
+{
+ Repeat {
+ item: item,
+ error: marker::PhantomData,
+ }
+}
+
+impl<T, E> Stream for Repeat<T, E>
+ where T: Clone
+{
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ Ok(Async::Ready(Some(self.item.clone())))
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/select.rs b/third_party/rust/futures-0.1.31/src/stream/select.rs
new file mode 100644
index 0000000000..ae6b66cf14
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/select.rs
@@ -0,0 +1,64 @@
+use {Poll, Async};
+use stream::{Stream, Fuse};
+
+/// An adapter for merging the output of two streams.
+///
+/// The merged stream produces items from either of the underlying streams as
+/// they become available, and the streams are polled in a round-robin fashion.
+/// Errors, however, are not merged: you get at most one error at a time.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Select<S1, S2> {
+ stream1: Fuse<S1>,
+ stream2: Fuse<S2>,
+ flag: bool,
+}
+
+pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2>
+ where S1: Stream,
+ S2: Stream<Item = S1::Item, Error = S1::Error>
+{
+ Select {
+ stream1: stream1.fuse(),
+ stream2: stream2.fuse(),
+ flag: false,
+ }
+}
+
+impl<S1, S2> Stream for Select<S1, S2>
+ where S1: Stream,
+ S2: Stream<Item = S1::Item, Error = S1::Error>
+{
+ type Item = S1::Item;
+ type Error = S1::Error;
+
+ fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
+ let (a, b) = if self.flag {
+ (&mut self.stream2 as &mut Stream<Item=_, Error=_>,
+ &mut self.stream1 as &mut Stream<Item=_, Error=_>)
+ } else {
+ (&mut self.stream1 as &mut Stream<Item=_, Error=_>,
+ &mut self.stream2 as &mut Stream<Item=_, Error=_>)
+ };
+ self.flag = !self.flag;
+
+ let a_done = match a.poll()? {
+ Async::Ready(Some(item)) => return Ok(Some(item).into()),
+ Async::Ready(None) => true,
+ Async::NotReady => false,
+ };
+
+ match b.poll()? {
+ Async::Ready(Some(item)) => {
+ // If the other stream isn't finished yet, give them a chance to
+ // go first next time as we pulled something off `b`.
+ if !a_done {
+ self.flag = !self.flag;
+ }
+ Ok(Some(item).into())
+ }
+ Async::Ready(None) if a_done => Ok(None.into()),
+ Async::Ready(None) | Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/skip.rs b/third_party/rust/futures-0.1.31/src/stream/skip.rs
new file mode 100644
index 0000000000..a1d7b49797
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/skip.rs
@@ -0,0 +1,84 @@
+use {Poll, Async};
+use stream::Stream;
+
+/// A stream combinator which skips a number of elements before continuing.
+///
+/// This structure is produced by the `Stream::skip` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Skip<S> {
+ stream: S,
+ remaining: u64,
+}
+
+pub fn new<S>(s: S, amt: u64) -> Skip<S>
+ where S: Stream,
+{
+ Skip {
+ stream: s,
+ remaining: amt,
+ }
+}
+
+impl<S> Skip<S> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for Skip<S>
+ where S: ::sink::Sink
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S> Stream for Skip<S>
+ where S: Stream,
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ while self.remaining > 0 {
+ match try_ready!(self.stream.poll()) {
+ Some(_) => self.remaining -= 1,
+ None => return Ok(Async::Ready(None)),
+ }
+ }
+
+ self.stream.poll()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/skip_while.rs b/third_party/rust/futures-0.1.31/src/stream/skip_while.rs
new file mode 100644
index 0000000000..b571996c24
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/skip_while.rs
@@ -0,0 +1,113 @@
+use {Async, Poll, IntoFuture, Future};
+use stream::Stream;
+
+/// A stream combinator which skips elements of a stream while a predicate
+/// holds.
+///
+/// This structure is produced by the `Stream::skip_while` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct SkipWhile<S, P, R> where S: Stream, R: IntoFuture {
+ stream: S,
+ pred: P,
+ pending: Option<(R::Future, S::Item)>,
+ done_skipping: bool,
+}
+
+pub fn new<S, P, R>(s: S, p: P) -> SkipWhile<S, P, R>
+ where S: Stream,
+ P: FnMut(&S::Item) -> R,
+ R: IntoFuture<Item=bool, Error=S::Error>,
+{
+ SkipWhile {
+ stream: s,
+ pred: p,
+ pending: None,
+ done_skipping: false,
+ }
+}
+
+impl<S, P, R> SkipWhile<S, P, R> where S: Stream, R: IntoFuture {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, P, R> ::sink::Sink for SkipWhile<S, P, R>
+ where S: ::sink::Sink + Stream, R: IntoFuture
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, P, R> Stream for SkipWhile<S, P, R>
+ where S: Stream,
+ P: FnMut(&S::Item) -> R,
+ R: IntoFuture<Item=bool, Error=S::Error>,
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ if self.done_skipping {
+ return self.stream.poll();
+ }
+
+ loop {
+ if self.pending.is_none() {
+ let item = match try_ready!(self.stream.poll()) {
+ Some(e) => e,
+ None => return Ok(Async::Ready(None)),
+ };
+ self.pending = Some(((self.pred)(&item).into_future(), item));
+ }
+
+ assert!(self.pending.is_some());
+ match self.pending.as_mut().unwrap().0.poll() {
+ Ok(Async::Ready(true)) => self.pending = None,
+ Ok(Async::Ready(false)) => {
+ let (_, item) = self.pending.take().unwrap();
+ self.done_skipping = true;
+ return Ok(Async::Ready(Some(item)))
+ }
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => {
+ self.pending = None;
+ return Err(e)
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/split.rs b/third_party/rust/futures-0.1.31/src/stream/split.rs
new file mode 100644
index 0000000000..ddaa52997d
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/split.rs
@@ -0,0 +1,105 @@
+use std::any::Any;
+use std::error::Error;
+use std::fmt;
+
+use {StartSend, Sink, Stream, Poll, Async, AsyncSink};
+use sync::BiLock;
+
+/// A `Stream` part of the split pair
+#[derive(Debug)]
+pub struct SplitStream<S>(BiLock<S>);
+
+impl<S> SplitStream<S> {
+ /// Attempts to put the two "halves" of a split `Stream + Sink` back
+ /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
+ /// a matching pair originating from the same call to `Stream::split`.
+ pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> {
+ other.reunite(self)
+ }
+}
+
+impl<S: Stream> Stream for SplitStream<S> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.poll(),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+}
+
+/// A `Sink` part of the split pair
+#[derive(Debug)]
+pub struct SplitSink<S>(BiLock<S>);
+
+impl<S> SplitSink<S> {
+ /// Attempts to put the two "halves" of a split `Stream + Sink` back
+ /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
+ /// a matching pair originating from the same call to `Stream::split`.
+ pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S>> {
+ self.0.reunite(other.0).map_err(|err| {
+ ReuniteError(SplitSink(err.0), SplitStream(err.1))
+ })
+ }
+}
+
+impl<S: Sink> Sink for SplitSink<S> {
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem)
+ -> StartSend<S::SinkItem, S::SinkError>
+ {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.start_send(item),
+ Async::NotReady => Ok(AsyncSink::NotReady(item)),
+ }
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.poll_complete(),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.close(),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+}
+
+pub fn split<S: Stream + Sink>(s: S) -> (SplitSink<S>, SplitStream<S>) {
+ let (a, b) = BiLock::new(s);
+ let read = SplitStream(a);
+ let write = SplitSink(b);
+ (write, read)
+}
+
+/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
+/// of a `Stream + Split`, and thus could not be `reunite`d.
+pub struct ReuniteError<T>(pub SplitSink<T>, pub SplitStream<T>);
+
+impl<T> fmt::Debug for ReuniteError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("ReuniteError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for ReuniteError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair")
+ }
+}
+
+impl<T: Any> Error for ReuniteError<T> {
+ fn description(&self) -> &str {
+ "tried to reunite a SplitStream and SplitSink that don't form a pair"
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/take.rs b/third_party/rust/futures-0.1.31/src/stream/take.rs
new file mode 100644
index 0000000000..0ca68496eb
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/take.rs
@@ -0,0 +1,86 @@
+use {Async, Poll};
+use stream::Stream;
+
+/// A stream combinator which returns a maximum number of elements.
+///
+/// This structure is produced by the `Stream::take` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Take<S> {
+ stream: S,
+ remaining: u64,
+}
+
+pub fn new<S>(s: S, amt: u64) -> Take<S>
+ where S: Stream,
+{
+ Take {
+ stream: s,
+ remaining: amt,
+ }
+}
+
+impl<S> Take<S> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S> ::sink::Sink for Take<S>
+ where S: ::sink::Sink + Stream
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S> Stream for Take<S>
+ where S: Stream,
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ if self.remaining == 0 {
+ Ok(Async::Ready(None))
+ } else {
+ let next = try_ready!(self.stream.poll());
+ match next {
+ Some(_) => self.remaining -= 1,
+ None => self.remaining = 0,
+ }
+ Ok(Async::Ready(next))
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/take_while.rs b/third_party/rust/futures-0.1.31/src/stream/take_while.rs
new file mode 100644
index 0000000000..732ae855de
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/take_while.rs
@@ -0,0 +1,113 @@
+use {Async, Poll, IntoFuture, Future};
+use stream::Stream;
+
+/// A stream combinator which takes elements from a stream while a predicate
+/// holds.
+///
+/// This structure is produced by the `Stream::take_while` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct TakeWhile<S, P, R> where S: Stream, R: IntoFuture {
+ stream: S,
+ pred: P,
+ pending: Option<(R::Future, S::Item)>,
+ done_taking: bool,
+}
+
+pub fn new<S, P, R>(s: S, p: P) -> TakeWhile<S, P, R>
+ where S: Stream,
+ P: FnMut(&S::Item) -> R,
+ R: IntoFuture<Item=bool, Error=S::Error>,
+{
+ TakeWhile {
+ stream: s,
+ pred: p,
+ pending: None,
+ done_taking: false,
+ }
+}
+
+impl<S, P, R> TakeWhile<S, P, R> where S: Stream, R: IntoFuture {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, P, R> ::sink::Sink for TakeWhile<S, P, R>
+ where S: ::sink::Sink + Stream, R: IntoFuture
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, P, R> Stream for TakeWhile<S, P, R>
+ where S: Stream,
+ P: FnMut(&S::Item) -> R,
+ R: IntoFuture<Item=bool, Error=S::Error>,
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ if self.done_taking {
+ return Ok(Async::Ready(None));
+ }
+
+ if self.pending.is_none() {
+ let item = match try_ready!(self.stream.poll()) {
+ Some(e) => e,
+ None => return Ok(Async::Ready(None)),
+ };
+ self.pending = Some(((self.pred)(&item).into_future(), item));
+ }
+
+ assert!(self.pending.is_some());
+ match self.pending.as_mut().unwrap().0.poll() {
+ Ok(Async::Ready(true)) => {
+ let (_, item) = self.pending.take().unwrap();
+ Ok(Async::Ready(Some(item)))
+ },
+ Ok(Async::Ready(false)) => {
+ self.done_taking = true;
+ Ok(Async::Ready(None))
+ }
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Err(e) => {
+ self.pending = None;
+ Err(e)
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/then.rs b/third_party/rust/futures-0.1.31/src/stream/then.rs
new file mode 100644
index 0000000000..cab338e922
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/then.rs
@@ -0,0 +1,81 @@
+use {Async, IntoFuture, Future, Poll};
+use stream::Stream;
+
+/// A stream combinator which chains a computation onto each item produced by a
+/// stream.
+///
+/// This structure is produced by the `Stream::then` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Then<S, F, U>
+ where U: IntoFuture,
+{
+ stream: S,
+ future: Option<U::Future>,
+ f: F,
+}
+
+pub fn new<S, F, U>(s: S, f: F) -> Then<S, F, U>
+ where S: Stream,
+ F: FnMut(Result<S::Item, S::Error>) -> U,
+ U: IntoFuture,
+{
+ Then {
+ stream: s,
+ future: None,
+ f: f,
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<S, F, U> ::sink::Sink for Then<S, F, U>
+ where S: ::sink::Sink, U: IntoFuture,
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
+ self.stream.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.stream.close()
+ }
+}
+
+impl<S, F, U> Stream for Then<S, F, U>
+ where S: Stream,
+ F: FnMut(Result<S::Item, S::Error>) -> U,
+ U: IntoFuture,
+{
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn poll(&mut self) -> Poll<Option<U::Item>, U::Error> {
+ if self.future.is_none() {
+ let item = match self.stream.poll() {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
+ Ok(Async::Ready(Some(e))) => Ok(e),
+ Err(e) => Err(e),
+ };
+ self.future = Some((self.f)(item).into_future());
+ }
+ assert!(self.future.is_some());
+ match self.future.as_mut().unwrap().poll() {
+ Ok(Async::Ready(e)) => {
+ self.future = None;
+ Ok(Async::Ready(Some(e)))
+ }
+ Err(e) => {
+ self.future = None;
+ Err(e)
+ }
+ Ok(Async::NotReady) => Ok(Async::NotReady)
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/unfold.rs b/third_party/rust/futures-0.1.31/src/stream/unfold.rs
new file mode 100644
index 0000000000..ac427b8c3b
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/unfold.rs
@@ -0,0 +1,114 @@
+use core::mem;
+
+use {Future, IntoFuture, Async, Poll};
+use stream::Stream;
+
+/// Creates a `Stream` from a seed and a closure returning a `Future`.
+///
+/// This function is the dual for the `Stream::fold()` adapter: while
+/// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a
+/// `Stream` from a seed value.
+///
+/// `unfold()` will call the provided closure with the provided seed, then wait
+/// for the returned `Future` to complete with `(a, b)`. It will then yield the
+/// value `a`, and use `b` as the next internal state.
+///
+/// If the closure returns `None` instead of `Some(Future)`, then the `unfold()`
+/// will stop producing items and return `Ok(Async::Ready(None))` in future
+/// calls to `poll()`.
+///
+/// In case of error generated by the returned `Future`, the error will be
+/// returned by the `Stream`. The `Stream` will then yield
+/// `Ok(Async::Ready(None))` in future calls to `poll()`.
+///
+/// This function can typically be used when wanting to go from the "world of
+/// futures" to the "world of streams": the provided closure can build a
+/// `Future` using other library functions working on futures, and `unfold()`
+/// will turn it into a `Stream` by repeating the operation.
+///
+/// # Example
+///
+/// ```rust
+/// use futures::stream::{self, Stream};
+/// use futures::future::{self, Future};
+///
+/// let mut stream = stream::unfold(0, |state| {
+/// if state <= 2 {
+/// let next_state = state + 1;
+/// let yielded = state * 2;
+/// let fut = future::ok::<_, u32>((yielded, next_state));
+/// Some(fut)
+/// } else {
+/// None
+/// }
+/// });
+///
+/// let result = stream.collect().wait();
+/// assert_eq!(result, Ok(vec![0, 2, 4]));
+/// ```
+pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
+ where F: FnMut(T) -> Option<Fut>,
+ Fut: IntoFuture<Item = (It, T)>,
+{
+ Unfold {
+ f: f,
+ state: State::Ready(init),
+ }
+}
+
+/// A stream which creates futures, polls them and return their result
+///
+/// This stream is returned by the `futures::stream::unfold` method
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Unfold<T, F, Fut> where Fut: IntoFuture {
+ f: F,
+ state: State<T, Fut::Future>,
+}
+
+impl <T, F, Fut, It> Stream for Unfold<T, F, Fut>
+ where F: FnMut(T) -> Option<Fut>,
+ Fut: IntoFuture<Item = (It, T)>,
+{
+ type Item = It;
+ type Error = Fut::Error;
+
+ fn poll(&mut self) -> Poll<Option<It>, Fut::Error> {
+ loop {
+ match mem::replace(&mut self.state, State::Empty) {
+ // State::Empty may happen if the future returned an error
+ State::Empty => { return Ok(Async::Ready(None)); }
+ State::Ready(state) => {
+ match (self.f)(state) {
+ Some(fut) => { self.state = State::Processing(fut.into_future()); }
+ None => { return Ok(Async::Ready(None)); }
+ }
+ }
+ State::Processing(mut fut) => {
+ match fut.poll()? {
+ Async:: Ready((item, next_state)) => {
+ self.state = State::Ready(next_state);
+ return Ok(Async::Ready(Some(item)));
+ }
+ Async::NotReady => {
+ self.state = State::Processing(fut);
+ return Ok(Async::NotReady);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+enum State<T, F> where F: Future {
+ /// Placeholder state when doing work, or when the returned Future generated an error
+ Empty,
+
+ /// Ready to generate new future; current internal state is the `T`
+ Ready(T),
+
+ /// Working on a future generated previously
+ Processing(F),
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/wait.rs b/third_party/rust/futures-0.1.31/src/stream/wait.rs
new file mode 100644
index 0000000000..80acb6c2a6
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/wait.rs
@@ -0,0 +1,53 @@
+use stream::Stream;
+use executor;
+
+/// A stream combinator which converts an asynchronous stream to a **blocking
+/// iterator**.
+///
+/// Created by the `Stream::wait` method, this function transforms any stream
+/// into a standard iterator. This is implemented by blocking the current thread
+/// while items on the underlying stream aren't ready yet.
+#[must_use = "iterators do nothing unless advanced"]
+#[derive(Debug)]
+pub struct Wait<S> {
+ stream: executor::Spawn<S>,
+}
+
+impl<S> Wait<S> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &S {
+ self.stream.get_ref()
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut S {
+ self.stream.get_mut()
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.stream.into_inner()
+ }
+}
+
+pub fn new<S: Stream>(s: S) -> Wait<S> {
+ Wait {
+ stream: executor::spawn(s),
+ }
+}
+
+impl<S: Stream> Iterator for Wait<S> {
+ type Item = Result<S::Item, S::Error>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.stream.wait_stream()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/stream/zip.rs b/third_party/rust/futures-0.1.31/src/stream/zip.rs
new file mode 100644
index 0000000000..17e3c69ffe
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/zip.rs
@@ -0,0 +1,59 @@
+use {Async, Poll};
+use stream::{Stream, Fuse};
+
+/// An adapter for merging the output of two streams.
+///
+/// The merged stream produces items from one or both of the underlying
+/// streams as they become available. Errors, however, are not merged: you
+#[derive(Debug)]
+/// get at most one error at a time.
+#[must_use = "streams do nothing unless polled"]
+pub struct Zip<S1: Stream, S2: Stream> {
+ stream1: Fuse<S1>,
+ stream2: Fuse<S2>,
+ queued1: Option<S1::Item>,
+ queued2: Option<S2::Item>,
+}
+
+pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Zip<S1, S2>
+ where S1: Stream, S2: Stream<Error = S1::Error>
+{
+ Zip {
+ stream1: stream1.fuse(),
+ stream2: stream2.fuse(),
+ queued1: None,
+ queued2: None,
+ }
+}
+
+impl<S1, S2> Stream for Zip<S1, S2>
+ where S1: Stream, S2: Stream<Error = S1::Error>
+{
+ type Item = (S1::Item, S2::Item);
+ type Error = S1::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if self.queued1.is_none() {
+ match self.stream1.poll()? {
+ Async::Ready(Some(item1)) => self.queued1 = Some(item1),
+ Async::Ready(None) | Async::NotReady => {}
+ }
+ }
+ if self.queued2.is_none() {
+ match self.stream2.poll()? {
+ Async::Ready(Some(item2)) => self.queued2 = Some(item2),
+ Async::Ready(None) | Async::NotReady => {}
+ }
+ }
+
+ if self.queued1.is_some() && self.queued2.is_some() {
+ let pair = (self.queued1.take().unwrap(),
+ self.queued2.take().unwrap());
+ Ok(Async::Ready(Some(pair)))
+ } else if self.stream1.is_done() || self.stream2.is_done() {
+ Ok(Async::Ready(None))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}