diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream')
45 files changed, 5548 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/and_then.rs b/third_party/rust/futures-0.1.31/src/stream/and_then.rs new file mode 100644 index 0000000000..1fac8b952d --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/and_then.rs @@ -0,0 +1,106 @@ +use {IntoFuture, Future, Poll, Async}; +use stream::Stream; + +/// A stream combinator which chains a computation onto values produced by a +/// stream. +/// +/// This structure is produced by the `Stream::and_then` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct AndThen<S, F, U> + where U: IntoFuture, +{ + stream: S, + future: Option<U::Future>, + f: F, +} + +pub fn new<S, F, U>(s: S, f: F) -> AndThen<S, F, U> + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture<Error=S::Error>, +{ + AndThen { + stream: s, + future: None, + f: f, + } +} + +impl<S, F, U> AndThen<S, F, U> + where U: IntoFuture, +{ + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F, U: IntoFuture> ::sink::Sink for AndThen<S, F, U> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F, U> Stream for AndThen<S, F, U> + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture<Error=S::Error>, +{ + type Item = U::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<U::Item>, S::Error> { + if self.future.is_none() { + let item = match try_ready!(self.stream.poll()) { + None => return Ok(Async::Ready(None)), + Some(e) => e, + }; + self.future = Some((self.f)(item).into_future()); + } + assert!(self.future.is_some()); + match self.future.as_mut().unwrap().poll() { + Ok(Async::Ready(e)) => { + self.future = None; + Ok(Async::Ready(Some(e))) + } + Err(e) => { + self.future = None; + Err(e) + } + Ok(Async::NotReady) => Ok(Async::NotReady) + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/buffer_unordered.rs b/third_party/rust/futures-0.1.31/src/stream/buffer_unordered.rs new file mode 100644 index 0000000000..3011108cf3 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/buffer_unordered.rs @@ -0,0 +1,130 @@ +use std::fmt; + +use {Async, IntoFuture, Poll}; +use stream::{Stream, Fuse, FuturesUnordered}; + +/// An adaptor for a stream of futures to execute the futures concurrently, if +/// possible, delivering results as they become available. +/// +/// This adaptor will buffer up a list of pending futures, and then return their +/// results in the order that they complete. This is created by the +/// `Stream::buffer_unordered` method. +#[must_use = "streams do nothing unless polled"] +pub struct BufferUnordered<S> + where S: Stream, + S::Item: IntoFuture, +{ + stream: Fuse<S>, + queue: FuturesUnordered<<S::Item as IntoFuture>::Future>, + max: usize, +} + +impl<S> fmt::Debug for BufferUnordered<S> + where S: Stream + fmt::Debug, + S::Item: IntoFuture, + <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BufferUnordered") + .field("stream", &self.stream) + .field("queue", &self.queue) + .field("max", &self.max) + .finish() + } +} + +pub fn new<S>(s: S, amt: usize) -> BufferUnordered<S> + where S: Stream, + S::Item: IntoFuture<Error=<S as Stream>::Error>, +{ + BufferUnordered { + stream: super::fuse::new(s), + queue: FuturesUnordered::new(), + max: amt, + } +} + +impl<S> BufferUnordered<S> + where S: Stream, + S::Item: IntoFuture<Error=<S as Stream>::Error>, +{ + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + self.stream.get_ref() + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + self.stream.get_mut() + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +impl<S> Stream for BufferUnordered<S> + where S: Stream, + S::Item: IntoFuture<Error=<S as Stream>::Error>, +{ + type Item = <S::Item as IntoFuture>::Item; + type Error = <S as Stream>::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + // First up, try to spawn off as many futures as possible by filling up + // our slab of futures. + while self.queue.len() < self.max { + let future = match self.stream.poll()? { + Async::Ready(Some(s)) => s.into_future(), + Async::Ready(None) | + Async::NotReady => break, + }; + + self.queue.push(future); + } + + // Try polling a new future + if let Some(val) = try_ready!(self.queue.poll()) { + return Ok(Async::Ready(Some(val))); + } + + // If we've gotten this far, then there are no events for us to process + // and nothing was ready, so figure out if we're not done yet or if + // we've reached the end. + if self.stream.is_done() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for BufferUnordered<S> + where S: ::sink::Sink + Stream, + S::Item: IntoFuture, +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/buffered.rs b/third_party/rust/futures-0.1.31/src/stream/buffered.rs new file mode 100644 index 0000000000..5616b73d7a --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/buffered.rs @@ -0,0 +1,132 @@ +use std::fmt; + +use {Async, IntoFuture, Poll}; +use stream::{Stream, Fuse, FuturesOrdered}; + +/// An adaptor for a stream of futures to execute the futures concurrently, if +/// possible. +/// +/// This adaptor will buffer up a list of pending futures, and then return their +/// results in the order that they were pulled out of the original stream. This +/// is created by the `Stream::buffered` method. +#[must_use = "streams do nothing unless polled"] +pub struct Buffered<S> + where S: Stream, + S::Item: IntoFuture, +{ + stream: Fuse<S>, + queue: FuturesOrdered<<S::Item as IntoFuture>::Future>, + max: usize, +} + +impl<S> fmt::Debug for Buffered<S> + where S: Stream + fmt::Debug, + S::Item: IntoFuture, + <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug, + <<S as Stream>::Item as IntoFuture>::Item: fmt::Debug, + <<S as Stream>::Item as IntoFuture>::Error: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Buffered") + .field("stream", &self.stream) + .field("queue", &self.queue) + .field("max", &self.max) + .finish() + } +} + +pub fn new<S>(s: S, amt: usize) -> Buffered<S> + where S: Stream, + S::Item: IntoFuture<Error=<S as Stream>::Error>, +{ + Buffered { + stream: super::fuse::new(s), + queue: FuturesOrdered::new(), + max: amt, + } +} + +impl<S> Buffered<S> + where S: Stream, + S::Item: IntoFuture<Error=<S as Stream>::Error>, +{ + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + self.stream.get_ref() + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + self.stream.get_mut() + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for Buffered<S> + where S: ::sink::Sink + Stream, + S::Item: IntoFuture, +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S> Stream for Buffered<S> + where S: Stream, + S::Item: IntoFuture<Error=<S as Stream>::Error>, +{ + type Item = <S::Item as IntoFuture>::Item; + type Error = <S as Stream>::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + // First up, try to spawn off as many futures as possible by filling up + // our slab of futures. + while self.queue.len() < self.max { + let future = match self.stream.poll()? { + Async::Ready(Some(s)) => s.into_future(), + Async::Ready(None) | + Async::NotReady => break, + }; + + self.queue.push(future); + } + + // Try polling a new future + if let Some(val) = try_ready!(self.queue.poll()) { + return Ok(Async::Ready(Some(val))); + } + + // If we've gotten this far, then there are no events for us to process + // and nothing was ready, so figure out if we're not done yet or if + // we've reached the end. + if self.stream.is_done() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs new file mode 100644 index 0000000000..d3244946e5 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs @@ -0,0 +1,71 @@ +use std::prelude::v1::*; +use std::any::Any; +use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; +use std::mem; + +use super::super::{Poll, Async}; +use super::Stream; + +/// Stream for the `catch_unwind` combinator. +/// +/// This is created by the `Stream::catch_unwind` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct CatchUnwind<S> where S: Stream { + state: CatchUnwindState<S>, +} + +pub fn new<S>(stream: S) -> CatchUnwind<S> + where S: Stream + UnwindSafe, +{ + CatchUnwind { + state: CatchUnwindState::Stream(stream), + } +} + +#[derive(Debug)] +enum CatchUnwindState<S> { + Stream(S), + Eof, + Done, +} + +impl<S> Stream for CatchUnwind<S> + where S: Stream + UnwindSafe, +{ + type Item = Result<S::Item, S::Error>; + type Error = Box<Any + Send>; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let mut stream = match mem::replace(&mut self.state, CatchUnwindState::Eof) { + CatchUnwindState::Done => panic!("cannot poll after eof"), + CatchUnwindState::Eof => { + self.state = CatchUnwindState::Done; + return Ok(Async::Ready(None)); + } + CatchUnwindState::Stream(stream) => stream, + }; + let res = catch_unwind(|| (stream.poll(), stream)); + match res { + Err(e) => Err(e), // and state is already Eof + Ok((poll, stream)) => { + self.state = CatchUnwindState::Stream(stream); + match poll { + Err(e) => Ok(Async::Ready(Some(Err(e)))), + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(Some(r))) => Ok(Async::Ready(Some(Ok(r)))), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + } + } + } + } +} + +impl<S: Stream> Stream for AssertUnwindSafe<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.0.poll() + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/chain.rs b/third_party/rust/futures-0.1.31/src/stream/chain.rs new file mode 100644 index 0000000000..0ff0e5ce6f --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/chain.rs @@ -0,0 +1,57 @@ +use core::mem; + +use stream::Stream; +use {Async, Poll}; + + +/// State of chain stream. +#[derive(Debug)] +enum State<S1, S2> { + /// Emitting elements of first stream + First(S1, S2), + /// Emitting elements of second stream + Second(S2), + /// Temporary value to replace first with second + Temp, +} + +/// An adapter for chaining the output of two streams. +/// +/// The resulting stream produces items from first stream and then +/// from second stream. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Chain<S1, S2> { + state: State<S1, S2> +} + +pub fn new<S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2> + where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>, +{ + Chain { state: State::First(s1, s2) } +} + +impl<S1, S2> Stream for Chain<S1, S2> + where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>, +{ + type Item = S1::Item; + type Error = S1::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + match self.state { + State::First(ref mut s1, ref _s2) => match s1.poll() { + Ok(Async::Ready(None)) => (), // roll + x => return x, + }, + State::Second(ref mut s2) => return s2.poll(), + State::Temp => unreachable!(), + } + + self.state = match mem::replace(&mut self.state, State::Temp) { + State::First(_s1, s2) => State::Second(s2), + _ => unreachable!(), + }; + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/channel.rs b/third_party/rust/futures-0.1.31/src/stream/channel.rs new file mode 100644 index 0000000000..89a419d150 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/channel.rs @@ -0,0 +1,114 @@ +#![cfg(feature = "with-deprecated")] +#![deprecated(since = "0.1.4", note = "use sync::mpsc::channel instead")] +#![allow(deprecated)] + +use std::any::Any; +use std::error::Error; +use std::fmt; + +use {Poll, Async, Stream, Future, Sink}; +use sink::Send; +use sync::mpsc; + +/// Creates an in-memory channel implementation of the `Stream` trait. +/// +/// This method creates a concrete implementation of the `Stream` trait which +/// can be used to send values across threads in a streaming fashion. This +/// channel is unique in that it implements back pressure to ensure that the +/// sender never outpaces the receiver. The `Sender::send` method will only +/// allow sending one message and the next message can only be sent once the +/// first was consumed. +/// +/// The `Receiver` returned implements the `Stream` trait and has access to any +/// number of the associated combinators for transforming the result. +pub fn channel<T, E>() -> (Sender<T, E>, Receiver<T, E>) { + let (tx, rx) = mpsc::channel(0); + (Sender { inner: tx }, Receiver { inner: rx }) +} + +/// The transmission end of a channel which is used to send values. +/// +/// This is created by the `channel` method in the `stream` module. +#[derive(Debug)] +pub struct Sender<T, E> { + inner: mpsc::Sender<Result<T, E>>, +} + +/// The receiving end of a channel which implements the `Stream` trait. +/// +/// This is a concrete implementation of a stream which can be used to represent +/// a stream of values being computed elsewhere. This is created by the +/// `channel` method in the `stream` module. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Receiver<T, E> { + inner: mpsc::Receiver<Result<T, E>>, +} + +/// Error type for sending, used when the receiving end of the channel is dropped +pub struct SendError<T, E>(Result<T, E>); + +/// Future returned by `Sender::send`. +#[derive(Debug)] +pub struct FutureSender<T, E> { + inner: Send<mpsc::Sender<Result<T, E>>>, +} + +impl<T, E> fmt::Debug for SendError<T, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("SendError") + .field(&"...") + .finish() + } +} + +impl<T, E> fmt::Display for SendError<T, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "send failed because receiver is gone") + } +} + +impl<T, E> Error for SendError<T, E> + where T: Any, E: Any +{ + fn description(&self) -> &str { + "send failed because receiver is gone" + } +} + + +impl<T, E> Stream for Receiver<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<T>, E> { + match self.inner.poll().expect("cannot fail") { + Async::Ready(Some(Ok(e))) => Ok(Async::Ready(Some(e))), + Async::Ready(Some(Err(e))) => Err(e), + Async::Ready(None) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +impl<T, E> Sender<T, E> { + /// Sends a new value along this channel to the receiver. + /// + /// This method consumes the sender and returns a future which will resolve + /// to the sender again when the value sent has been consumed. + pub fn send(self, t: Result<T, E>) -> FutureSender<T, E> { + FutureSender { inner: self.inner.send(t) } + } +} + +impl<T, E> Future for FutureSender<T, E> { + type Item = Sender<T, E>; + type Error = SendError<T, E>; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + match self.inner.poll() { + Ok(a) => Ok(a.map(|a| Sender { inner: a })), + Err(e) => Err(SendError(e.into_inner())), + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/chunks.rs b/third_party/rust/futures-0.1.31/src/stream/chunks.rs new file mode 100644 index 0000000000..dbfaeb89ec --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/chunks.rs @@ -0,0 +1,136 @@ +use std::mem; +use std::prelude::v1::*; + +use {Async, Poll}; +use stream::{Stream, Fuse}; + +/// An adaptor that chunks up elements in a vector. +/// +/// This adaptor will buffer up a list of items in the stream and pass on the +/// vector used for buffering when a specified capacity has been reached. This +/// is created by the `Stream::chunks` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Chunks<S> + where S: Stream +{ + items: Vec<S::Item>, + err: Option<S::Error>, + stream: Fuse<S>, + cap: usize, // https://github.com/rust-lang-nursery/futures-rs/issues/1475 +} + +pub fn new<S>(s: S, capacity: usize) -> Chunks<S> + where S: Stream +{ + assert!(capacity > 0); + + Chunks { + items: Vec::with_capacity(capacity), + err: None, + stream: super::fuse::new(s), + cap: capacity, + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for Chunks<S> + where S: ::sink::Sink + Stream +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + + +impl<S> Chunks<S> where S: Stream { + fn take(&mut self) -> Vec<S::Item> { + let cap = self.cap; + mem::replace(&mut self.items, Vec::with_capacity(cap)) + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + self.stream.get_ref() + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + self.stream.get_mut() + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +impl<S> Stream for Chunks<S> + where S: Stream +{ + type Item = Vec<<S as Stream>::Item>; + type Error = <S as Stream>::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if let Some(err) = self.err.take() { + return Err(err) + } + + loop { + match self.stream.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + + // Push the item into the buffer and check whether it is full. + // If so, replace our buffer with a new and empty one and return + // the full one. + Ok(Async::Ready(Some(item))) => { + self.items.push(item); + if self.items.len() >= self.cap { + return Ok(Some(self.take()).into()) + } + } + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + Ok(Async::Ready(None)) => { + return if self.items.len() > 0 { + let full_buf = mem::replace(&mut self.items, Vec::new()); + Ok(Some(full_buf).into()) + } else { + Ok(Async::Ready(None)) + } + } + + // If we've got buffered items be sure to return them first, + // we'll defer our error for later. + Err(e) => { + if self.items.len() == 0 { + return Err(e) + } else { + self.err = Some(e); + return Ok(Some(self.take()).into()) + } + } + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/collect.rs b/third_party/rust/futures-0.1.31/src/stream/collect.rs new file mode 100644 index 0000000000..8bd9d0e1dc --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/collect.rs @@ -0,0 +1,52 @@ +use std::prelude::v1::*; + +use std::mem; + +use {Future, Poll, Async}; +use stream::Stream; + +/// A future which collects all of the values of a stream into a vector. +/// +/// This future is created by the `Stream::collect` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Collect<S> where S: Stream { + stream: S, + items: Vec<S::Item>, +} + +pub fn new<S>(s: S) -> Collect<S> + where S: Stream, +{ + Collect { + stream: s, + items: Vec::new(), + } +} + +impl<S: Stream> Collect<S> { + fn finish(&mut self) -> Vec<S::Item> { + mem::replace(&mut self.items, Vec::new()) + } +} + +impl<S> Future for Collect<S> + where S: Stream, +{ + type Item = Vec<S::Item>; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Vec<S::Item>, S::Error> { + loop { + match self.stream.poll() { + Ok(Async::Ready(Some(e))) => self.items.push(e), + Ok(Async::Ready(None)) => return Ok(Async::Ready(self.finish())), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => { + self.finish(); + return Err(e) + } + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/concat.rs b/third_party/rust/futures-0.1.31/src/stream/concat.rs new file mode 100644 index 0000000000..a0da71bdd5 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/concat.rs @@ -0,0 +1,172 @@ +use core::mem; +use core::fmt::{Debug, Formatter, Result as FmtResult}; +use core::default::Default; + +use {Poll, Async}; +use future::Future; +use stream::Stream; + +/// A stream combinator to concatenate the results of a stream into the first +/// yielded item. +/// +/// This structure is produced by the `Stream::concat2` method. +#[must_use = "streams do nothing unless polled"] +pub struct Concat2<S> + where S: Stream, +{ + inner: ConcatSafe<S> +} + +impl<S: Debug> Debug for Concat2<S> where S: Stream, S::Item: Debug { + fn fmt(&self, fmt: &mut Formatter) -> FmtResult { + fmt.debug_struct("Concat2") + .field("inner", &self.inner) + .finish() + } +} + +pub fn new2<S>(s: S) -> Concat2<S> + where S: Stream, + S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, +{ + Concat2 { + inner: new_safe(s) + } +} + +impl<S> Future for Concat2<S> + where S: Stream, + S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, + +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + self.inner.poll().map(|a| { + match a { + Async::NotReady => Async::NotReady, + Async::Ready(None) => Async::Ready(Default::default()), + Async::Ready(Some(e)) => Async::Ready(e) + } + }) + } +} + + +/// A stream combinator to concatenate the results of a stream into the first +/// yielded item. +/// +/// This structure is produced by the `Stream::concat` method. +#[deprecated(since="0.1.18", note="please use `Stream::Concat2` instead")] +#[must_use = "streams do nothing unless polled"] +pub struct Concat<S> + where S: Stream, +{ + inner: ConcatSafe<S> +} + +#[allow(deprecated)] +impl<S: Debug> Debug for Concat<S> where S: Stream, S::Item: Debug { + fn fmt(&self, fmt: &mut Formatter) -> FmtResult { + fmt.debug_struct("Concat") + .field("inner", &self.inner) + .finish() + } +} + +#[allow(deprecated)] +pub fn new<S>(s: S) -> Concat<S> + where S: Stream, + S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator, +{ + Concat { + inner: new_safe(s) + } +} + +#[allow(deprecated)] +impl<S> Future for Concat<S> + where S: Stream, + S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator, + +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + self.inner.poll().map(|a| { + match a { + Async::NotReady => Async::NotReady, + Async::Ready(None) => panic!("attempted concatenation of empty stream"), + Async::Ready(Some(e)) => Async::Ready(e) + } + }) + } +} + + +#[derive(Debug)] +struct ConcatSafe<S> + where S: Stream, +{ + stream: S, + extend: Inner<S::Item>, +} + +fn new_safe<S>(s: S) -> ConcatSafe<S> + where S: Stream, + S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator, +{ + ConcatSafe { + stream: s, + extend: Inner::First, + } +} + +impl<S> Future for ConcatSafe<S> + where S: Stream, + S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator, + +{ + type Item = Option<S::Item>; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + loop { + match self.stream.poll() { + Ok(Async::Ready(Some(i))) => { + match self.extend { + Inner::First => { + self.extend = Inner::Extending(i); + }, + Inner::Extending(ref mut e) => { + e.extend(i); + }, + Inner::Done => unreachable!(), + } + }, + Ok(Async::Ready(None)) => { + match mem::replace(&mut self.extend, Inner::Done) { + Inner::First => return Ok(Async::Ready(None)), + Inner::Extending(e) => return Ok(Async::Ready(Some(e))), + Inner::Done => panic!("cannot poll Concat again") + } + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => { + self.extend = Inner::Done; + return Err(e) + } + } + } + } +} + + +#[derive(Debug)] +enum Inner<E> { + First, + Extending(E), + Done, +} diff --git a/third_party/rust/futures-0.1.31/src/stream/empty.rs b/third_party/rust/futures-0.1.31/src/stream/empty.rs new file mode 100644 index 0000000000..c53fb80238 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/empty.rs @@ -0,0 +1,29 @@ +use core::marker; + +use stream::Stream; +use {Poll, Async}; + +/// A stream which contains no elements. +/// +/// This stream can be created with the `stream::empty` function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Empty<T, E> { + _data: marker::PhantomData<(T, E)>, +} + +/// Creates a stream which contains no elements. +/// +/// The returned stream will always return `Ready(None)` when polled. +pub fn empty<T, E>() -> Empty<T, E> { + Empty { _data: marker::PhantomData } +} + +impl<T, E> Stream for Empty<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + Ok(Async::Ready(None)) + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/filter.rs b/third_party/rust/futures-0.1.31/src/stream/filter.rs new file mode 100644 index 0000000000..99c4abd657 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/filter.rs @@ -0,0 +1,89 @@ +use {Async, Poll}; +use stream::Stream; + +/// A stream combinator used to filter the results of a stream and only yield +/// some values. +/// +/// This structure is produced by the `Stream::filter` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Filter<S, F> { + stream: S, + f: F, +} + +pub fn new<S, F>(s: S, f: F) -> Filter<S, F> + where S: Stream, + F: FnMut(&S::Item) -> bool, +{ + Filter { + stream: s, + f: f, + } +} + +impl<S, F> Filter<S, F> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F> ::sink::Sink for Filter<S, F> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F> Stream for Filter<S, F> + where S: Stream, + F: FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + loop { + match try_ready!(self.stream.poll()) { + Some(e) => { + if (self.f)(&e) { + return Ok(Async::Ready(Some(e))) + } + } + None => return Ok(Async::Ready(None)), + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/filter_map.rs b/third_party/rust/futures-0.1.31/src/stream/filter_map.rs new file mode 100644 index 0000000000..f91d26a45c --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/filter_map.rs @@ -0,0 +1,89 @@ +use {Async, Poll}; +use stream::Stream; + +/// A combinator used to filter the results of a stream and simultaneously map +/// them to a different type. +/// +/// This structure is returned by the `Stream::filter_map` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct FilterMap<S, F> { + stream: S, + f: F, +} + +pub fn new<S, F, B>(s: S, f: F) -> FilterMap<S, F> + where S: Stream, + F: FnMut(S::Item) -> Option<B>, +{ + FilterMap { + stream: s, + f: f, + } +} + +impl<S, F> FilterMap<S, F> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F> ::sink::Sink for FilterMap<S, F> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F, B> Stream for FilterMap<S, F> + where S: Stream, + F: FnMut(S::Item) -> Option<B>, +{ + type Item = B; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<B>, S::Error> { + loop { + match try_ready!(self.stream.poll()) { + Some(e) => { + if let Some(e) = (self.f)(e) { + return Ok(Async::Ready(Some(e))) + } + } + None => return Ok(Async::Ready(None)), + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/flatten.rs b/third_party/rust/futures-0.1.31/src/stream/flatten.rs new file mode 100644 index 0000000000..4baf9045a0 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/flatten.rs @@ -0,0 +1,96 @@ +use {Poll, Async}; +use stream::Stream; + +/// A combinator used to flatten a stream-of-streams into one long stream of +/// elements. +/// +/// This combinator is created by the `Stream::flatten` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Flatten<S> + where S: Stream, +{ + stream: S, + next: Option<S::Item>, +} + +pub fn new<S>(s: S) -> Flatten<S> + where S: Stream, + S::Item: Stream, + <S::Item as Stream>::Error: From<S::Error>, +{ + Flatten { + stream: s, + next: None, + } +} + +impl<S: Stream> Flatten<S> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for Flatten<S> + where S: ::sink::Sink + Stream +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S> Stream for Flatten<S> + where S: Stream, + S::Item: Stream, + <S::Item as Stream>::Error: From<S::Error>, +{ + type Item = <S::Item as Stream>::Item; + type Error = <S::Item as Stream>::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + if self.next.is_none() { + match try_ready!(self.stream.poll()) { + Some(e) => self.next = Some(e), + None => return Ok(Async::Ready(None)), + } + } + assert!(self.next.is_some()); + match self.next.as_mut().unwrap().poll() { + Ok(Async::Ready(None)) => self.next = None, + other => return other, + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/fold.rs b/third_party/rust/futures-0.1.31/src/stream/fold.rs new file mode 100644 index 0000000000..7fa24b449d --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/fold.rs @@ -0,0 +1,81 @@ +use core::mem; + +use {Future, Poll, IntoFuture, Async}; +use stream::Stream; + +/// A future used to collect all the results of a stream into one generic type. +/// +/// This future is returned by the `Stream::fold` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Fold<S, F, Fut, T> where Fut: IntoFuture { + stream: S, + f: F, + state: State<T, Fut::Future>, +} + +#[derive(Debug)] +enum State<T, F> where F: Future { + /// Placeholder state when doing work + Empty, + + /// Ready to process the next stream item; current accumulator is the `T` + Ready(T), + + /// Working on a future the process the previous stream item + Processing(F), +} + +pub fn new<S, F, Fut, T>(s: S, f: F, t: T) -> Fold<S, F, Fut, T> + where S: Stream, + F: FnMut(T, S::Item) -> Fut, + Fut: IntoFuture<Item = T>, + S::Error: From<Fut::Error>, +{ + Fold { + stream: s, + f: f, + state: State::Ready(t), + } +} + +impl<S, F, Fut, T> Future for Fold<S, F, Fut, T> + where S: Stream, + F: FnMut(T, S::Item) -> Fut, + Fut: IntoFuture<Item = T>, + S::Error: From<Fut::Error>, +{ + type Item = T; + type Error = S::Error; + + fn poll(&mut self) -> Poll<T, S::Error> { + loop { + match mem::replace(&mut self.state, State::Empty) { + State::Empty => panic!("cannot poll Fold twice"), + State::Ready(state) => { + match self.stream.poll()? { + Async::Ready(Some(e)) => { + let future = (self.f)(state, e); + let future = future.into_future(); + self.state = State::Processing(future); + } + Async::Ready(None) => return Ok(Async::Ready(state)), + Async::NotReady => { + self.state = State::Ready(state); + return Ok(Async::NotReady) + } + } + } + State::Processing(mut fut) => { + match fut.poll()? { + Async::Ready(state) => self.state = State::Ready(state), + Async::NotReady => { + self.state = State::Processing(fut); + return Ok(Async::NotReady) + } + } + } + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/for_each.rs b/third_party/rust/futures-0.1.31/src/stream/for_each.rs new file mode 100644 index 0000000000..c7e1cde5bb --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/for_each.rs @@ -0,0 +1,51 @@ +use {Async, Future, IntoFuture, Poll}; +use stream::Stream; + +/// A stream combinator which executes a unit closure over each item on a +/// stream. +/// +/// This structure is returned by the `Stream::for_each` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct ForEach<S, F, U> where U: IntoFuture { + stream: S, + f: F, + fut: Option<U::Future>, +} + +pub fn new<S, F, U>(s: S, f: F) -> ForEach<S, F, U> + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture<Item = (), Error = S::Error>, +{ + ForEach { + stream: s, + f: f, + fut: None, + } +} + +impl<S, F, U> Future for ForEach<S, F, U> + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture<Item= (), Error = S::Error>, +{ + type Item = (); + type Error = S::Error; + + fn poll(&mut self) -> Poll<(), S::Error> { + loop { + if let Some(mut fut) = self.fut.take() { + if fut.poll()?.is_not_ready() { + self.fut = Some(fut); + return Ok(Async::NotReady); + } + } + + match try_ready!(self.stream.poll()) { + Some(e) => self.fut = Some((self.f)(e).into_future()), + None => return Ok(Async::Ready(())), + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/forward.rs b/third_party/rust/futures-0.1.31/src/stream/forward.rs new file mode 100644 index 0000000000..6722af8c20 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/forward.rs @@ -0,0 +1,110 @@ +use {Poll, Async, Future, AsyncSink}; +use stream::{Stream, Fuse}; +use sink::Sink; + +/// Future for the `Stream::forward` combinator, which sends a stream of values +/// to a sink and then waits until the sink has fully flushed those values. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Forward<T: Stream, U> { + sink: Option<U>, + stream: Option<Fuse<T>>, + buffered: Option<T::Item>, +} + + +pub fn new<T, U>(stream: T, sink: U) -> Forward<T, U> + where U: Sink<SinkItem=T::Item>, + T: Stream, + T::Error: From<U::SinkError>, +{ + Forward { + sink: Some(sink), + stream: Some(stream.fuse()), + buffered: None, + } +} + +impl<T, U> Forward<T, U> + where U: Sink<SinkItem=T::Item>, + T: Stream, + T::Error: From<U::SinkError>, +{ + /// Get a shared reference to the inner sink. + /// If this combinator has already been polled to completion, None will be returned. + pub fn sink_ref(&self) -> Option<&U> { + self.sink.as_ref() + } + + /// Get a mutable reference to the inner sink. + /// If this combinator has already been polled to completion, None will be returned. + pub fn sink_mut(&mut self) -> Option<&mut U> { + self.sink.as_mut() + } + + /// Get a shared reference to the inner stream. + /// If this combinator has already been polled to completion, None will be returned. + pub fn stream_ref(&self) -> Option<&T> { + self.stream.as_ref().map(|x| x.get_ref()) + } + + /// Get a mutable reference to the inner stream. + /// If this combinator has already been polled to completion, None will be returned. + pub fn stream_mut(&mut self) -> Option<&mut T> { + self.stream.as_mut().map(|x| x.get_mut()) + } + + fn take_result(&mut self) -> (T, U) { + let sink = self.sink.take() + .expect("Attempted to poll Forward after completion"); + let fuse = self.stream.take() + .expect("Attempted to poll Forward after completion"); + (fuse.into_inner(), sink) + } + + fn try_start_send(&mut self, item: T::Item) -> Poll<(), U::SinkError> { + debug_assert!(self.buffered.is_none()); + if let AsyncSink::NotReady(item) = self.sink_mut() + .expect("Attempted to poll Forward after completion") + .start_send(item)? + { + self.buffered = Some(item); + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) + } +} + +impl<T, U> Future for Forward<T, U> + where U: Sink<SinkItem=T::Item>, + T: Stream, + T::Error: From<U::SinkError>, +{ + type Item = (T, U); + type Error = T::Error; + + fn poll(&mut self) -> Poll<(T, U), T::Error> { + // If we've got an item buffered already, we need to write it to the + // sink before we can do anything else + if let Some(item) = self.buffered.take() { + try_ready!(self.try_start_send(item)) + } + + loop { + match self.stream.as_mut() + .expect("Attempted to poll Forward after completion") + .poll()? + { + Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)), + Async::Ready(None) => { + try_ready!(self.sink_mut().expect("Attempted to poll Forward after completion").close()); + return Ok(Async::Ready(self.take_result())) + } + Async::NotReady => { + try_ready!(self.sink_mut().expect("Attempted to poll Forward after completion").poll_complete()); + return Ok(Async::NotReady) + } + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/from_err.rs b/third_party/rust/futures-0.1.31/src/stream/from_err.rs new file mode 100644 index 0000000000..4028542dfc --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/from_err.rs @@ -0,0 +1,80 @@ +use core::marker::PhantomData; +use poll::Poll; +use Async; +use stream::Stream; + +/// A stream combinator to change the error type of a stream. +/// +/// This is created by the `Stream::from_err` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct FromErr<S, E> { + stream: S, + f: PhantomData<E> +} + +pub fn new<S, E>(stream: S) -> FromErr<S, E> + where S: Stream +{ + FromErr { + stream: stream, + f: PhantomData + } +} + +impl<S, E> FromErr<S, E> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + + +impl<S: Stream, E: From<S::Error>> Stream for FromErr<S, E> { + type Item = S::Item; + type Error = E; + + fn poll(&mut self) -> Poll<Option<S::Item>, E> { + let e = match self.stream.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + other => other, + }; + e.map_err(From::from) + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S: Stream + ::sink::Sink, E> ::sink::Sink for FromErr<S, E> { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> ::StartSend<Self::SinkItem, Self::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.stream.close() + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/fuse.rs b/third_party/rust/futures-0.1.31/src/stream/fuse.rs new file mode 100644 index 0000000000..e39c31f348 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/fuse.rs @@ -0,0 +1,89 @@ +use {Poll, Async}; +use stream::Stream; + +/// A stream which "fuse"s a stream once it's terminated. +/// +/// Normally streams can behave unpredictably when used after they have already +/// finished, but `Fuse` continues to return `None` from `poll` forever when +/// finished. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Fuse<S> { + stream: S, + done: bool, +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for Fuse<S> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +pub fn new<S: Stream>(s: S) -> Fuse<S> { + Fuse { stream: s, done: false } +} + +impl<S: Stream> Stream for Fuse<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + if self.done { + Ok(Async::Ready(None)) + } else { + let r = self.stream.poll(); + if let Ok(Async::Ready(None)) = r { + self.done = true; + } + r + } + } +} + +impl<S> Fuse<S> { + /// Returns whether the underlying stream has finished or not. + /// + /// If this method returns `true`, then all future calls to poll are + /// guaranteed to return `None`. If this returns `false`, then the + /// underlying stream is still in use. + pub fn is_done(&self) -> bool { + self.done + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/future.rs b/third_party/rust/futures-0.1.31/src/stream/future.rs new file mode 100644 index 0000000000..5b052ee4d3 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/future.rs @@ -0,0 +1,76 @@ +use {Future, Poll, Async}; +use stream::Stream; + +/// A combinator used to temporarily convert a stream into a future. +/// +/// This future is returned by the `Stream::into_future` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct StreamFuture<S> { + stream: Option<S>, +} + +pub fn new<S: Stream>(s: S) -> StreamFuture<S> { + StreamFuture { stream: Some(s) } +} + +impl<S> StreamFuture<S> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + /// + /// This method returns an `Option` to account for the fact that `StreamFuture`'s + /// implementation of `Future::poll` consumes the underlying stream during polling + /// in order to return it to the caller of `Future::poll` if the stream yielded + /// an element. + pub fn get_ref(&self) -> Option<&S> { + self.stream.as_ref() + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + /// + /// This method returns an `Option` to account for the fact that `StreamFuture`'s + /// implementation of `Future::poll` consumes the underlying stream during polling + /// in order to return it to the caller of `Future::poll` if the stream yielded + /// an element. + pub fn get_mut(&mut self) -> Option<&mut S> { + self.stream.as_mut() + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + /// + /// This method returns an `Option` to account for the fact that `StreamFuture`'s + /// implementation of `Future::poll` consumes the underlying stream during polling + /// in order to return it to the caller of `Future::poll` if the stream yielded + /// an element. + pub fn into_inner(self) -> Option<S> { + self.stream + } +} + +impl<S: Stream> Future for StreamFuture<S> { + type Item = (Option<S::Item>, S); + type Error = (S::Error, S); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let item = { + let s = self.stream.as_mut().expect("polling StreamFuture twice"); + match s.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(e)) => Ok(e), + Err(e) => Err(e), + } + }; + let stream = self.stream.take().unwrap(); + match item { + Ok(e) => Ok(Async::Ready((e, stream))), + Err(e) => Err((e, stream)), + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/futures_ordered.rs b/third_party/rust/futures-0.1.31/src/stream/futures_ordered.rs new file mode 100644 index 0000000000..561bbb5189 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/futures_ordered.rs @@ -0,0 +1,219 @@ +use std::cmp::{Eq, PartialEq, PartialOrd, Ord, Ordering}; +use std::collections::BinaryHeap; +use std::fmt::{self, Debug}; +use std::iter::FromIterator; + +use {Async, Future, IntoFuture, Poll, Stream}; +use stream::FuturesUnordered; + +#[derive(Debug)] +struct OrderWrapper<T> { + item: T, + index: usize, +} + +impl<T> PartialEq for OrderWrapper<T> { + fn eq(&self, other: &Self) -> bool { + self.index == other.index + } +} + +impl<T> Eq for OrderWrapper<T> {} + +impl<T> PartialOrd for OrderWrapper<T> { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl<T> Ord for OrderWrapper<T> { + fn cmp(&self, other: &Self) -> Ordering { + // BinaryHeap is a max heap, so compare backwards here. + other.index.cmp(&self.index) + } +} + +impl<T> Future for OrderWrapper<T> + where T: Future +{ + type Item = OrderWrapper<T::Item>; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let result = try_ready!(self.item.poll()); + Ok(Async::Ready(OrderWrapper { + item: result, + index: self.index + })) + } +} + +/// An unbounded queue of futures. +/// +/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order +/// on top of the set of futures. While futures in the set will race to +/// completion in parallel, results will only be returned in the order their +/// originating futures were added to the queue. +/// +/// Futures are pushed into this queue and their realized values are yielded in +/// order. This structure is optimized to manage a large number of futures. +/// Futures managed by `FuturesOrdered` will only be polled when they generate +/// notifications. This reduces the required amount of work needed to coordinate +/// large numbers of futures. +/// +/// When a `FuturesOrdered` is first created, it does not contain any futures. +/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` to be +/// returned. Futures are submitted to the queue using `push`; however, the +/// future will **not** be polled at this point. `FuturesOrdered` will only +/// poll managed futures when `FuturesOrdered::poll` is called. As such, it +/// is important to call `poll` after pushing new futures. +/// +/// If `FuturesOrdered::poll` returns `Ok(Async::Ready(None))` this means that +/// the queue is currently not managing any futures. A future may be submitted +/// to the queue at a later time. At that point, a call to +/// `FuturesOrdered::poll` will either return the future's resolved value +/// **or** `Ok(Async::NotReady)` if the future has not yet completed. When +/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will +/// return `Ok(Async::NotReady)` until the first future completes, even if +/// some of the later futures have already completed. +/// +/// Note that you can create a ready-made `FuturesOrdered` via the +/// `futures_ordered` function in the `stream` module, or you can start with an +/// empty queue with the `FuturesOrdered::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct FuturesOrdered<T> + where T: Future +{ + in_progress: FuturesUnordered<OrderWrapper<T>>, + queued_results: BinaryHeap<OrderWrapper<T::Item>>, + next_incoming_index: usize, + next_outgoing_index: usize, +} + +/// Converts a list of futures into a `Stream` of results from the futures. +/// +/// This function will take an list of futures (e.g. a vector, an iterator, +/// etc), and return a stream. The stream will yield items as they become +/// available on the futures internally, in the order that their originating +/// futures were submitted to the queue. If the futures complete out of order, +/// items will be stored internally within `FuturesOrdered` until all preceding +/// items have been yielded. +/// +/// Note that the returned queue can also be used to dynamically push more +/// futures into the queue as they become available. +pub fn futures_ordered<I>(futures: I) -> FuturesOrdered<<I::Item as IntoFuture>::Future> + where I: IntoIterator, + I::Item: IntoFuture +{ + let mut queue = FuturesOrdered::new(); + + for future in futures { + queue.push(future.into_future()); + } + + return queue +} + +impl<T> Default for FuturesOrdered<T> where T: Future { + fn default() -> Self { + FuturesOrdered::new() + } +} + +impl<T> FuturesOrdered<T> + where T: Future +{ + /// Constructs a new, empty `FuturesOrdered` + /// + /// The returned `FuturesOrdered` does not contain any futures and, in this + /// state, `FuturesOrdered::poll` will return `Ok(Async::Ready(None))`. + pub fn new() -> FuturesOrdered<T> { + FuturesOrdered { + in_progress: FuturesUnordered::new(), + queued_results: BinaryHeap::new(), + next_incoming_index: 0, + next_outgoing_index: 0, + } + } + + /// Returns the number of futures contained in the queue. + /// + /// This represents the total number of in-flight futures, both + /// those currently processing and those that have completed but + /// which are waiting for earlier futures to complete. + pub fn len(&self) -> usize { + self.in_progress.len() + self.queued_results.len() + } + + /// Returns `true` if the queue contains no futures + pub fn is_empty(&self) -> bool { + self.in_progress.is_empty() && self.queued_results.is_empty() + } + + /// Push a future into the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. + pub fn push(&mut self, future: T) { + let wrapped = OrderWrapper { + item: future, + index: self.next_incoming_index, + }; + self.next_incoming_index += 1; + self.in_progress.push(wrapped); + } +} + +impl<T> Stream for FuturesOrdered<T> + where T: Future +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + // Get any completed futures from the unordered set. + loop { + match self.in_progress.poll()? { + Async::Ready(Some(result)) => self.queued_results.push(result), + Async::Ready(None) | Async::NotReady => break, + } + } + + if let Some(next_result) = self.queued_results.peek() { + // PeekMut::pop is not stable yet QQ + if next_result.index != self.next_outgoing_index { + return Ok(Async::NotReady); + } + } else if !self.in_progress.is_empty() { + return Ok(Async::NotReady); + } else { + return Ok(Async::Ready(None)); + } + + let next_result = self.queued_results.pop().unwrap(); + self.next_outgoing_index += 1; + Ok(Async::Ready(Some(next_result.item))) + } +} + +impl<T: Debug> Debug for FuturesOrdered<T> + where T: Future +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "FuturesOrdered {{ ... }}") + } +} + +impl<F: Future> FromIterator<F> for FuturesOrdered<F> { + fn from_iter<T>(iter: T) -> Self + where T: IntoIterator<Item = F> + { + let mut new = FuturesOrdered::new(); + for future in iter.into_iter() { + new.push(future); + } + new + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/futures_unordered.rs b/third_party/rust/futures-0.1.31/src/stream/futures_unordered.rs new file mode 100644 index 0000000000..3f25c86f39 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/futures_unordered.rs @@ -0,0 +1,707 @@ +//! An unbounded set of futures. + +use std::cell::UnsafeCell; +use std::fmt::{self, Debug}; +use std::iter::FromIterator; +use std::marker::PhantomData; +use std::mem; +use std::ptr; +use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel}; +use std::sync::atomic::{AtomicPtr, AtomicBool}; +use std::sync::{Arc, Weak}; +use std::usize; + +use {task, Stream, Future, Poll, Async}; +use executor::{Notify, UnsafeNotify, NotifyHandle}; +use task_impl::{self, AtomicTask}; + +/// An unbounded set of futures. +/// +/// This "combinator" also serves a special function in this library, providing +/// the ability to maintain a set of futures that and manage driving them all +/// to completion. +/// +/// Futures are pushed into this set and their realized values are yielded as +/// they are ready. This structure is optimized to manage a large number of +/// futures. Futures managed by `FuturesUnordered` will only be polled when they +/// generate notifications. This reduces the required amount of work needed to +/// coordinate large numbers of futures. +/// +/// When a `FuturesUnordered` is first created, it does not contain any futures. +/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` to be +/// returned. Futures are submitted to the set using `push`; however, the +/// future will **not** be polled at this point. `FuturesUnordered` will only +/// poll managed futures when `FuturesUnordered::poll` is called. As such, it +/// is important to call `poll` after pushing new futures. +/// +/// If `FuturesUnordered::poll` returns `Ok(Async::Ready(None))` this means that +/// the set is currently not managing any futures. A future may be submitted +/// to the set at a later time. At that point, a call to +/// `FuturesUnordered::poll` will either return the future's resolved value +/// **or** `Ok(Async::NotReady)` if the future has not yet completed. +/// +/// Note that you can create a ready-made `FuturesUnordered` via the +/// `futures_unordered` function in the `stream` module, or you can start with an +/// empty set with the `FuturesUnordered::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct FuturesUnordered<F> { + inner: Arc<Inner<F>>, + len: usize, + head_all: *const Node<F>, +} + +unsafe impl<T: Send> Send for FuturesUnordered<T> {} +unsafe impl<T: Sync> Sync for FuturesUnordered<T> {} + +// FuturesUnordered is implemented using two linked lists. One which links all +// futures managed by a `FuturesUnordered` and one that tracks futures that have +// been scheduled for polling. The first linked list is not thread safe and is +// only accessed by the thread that owns the `FuturesUnordered` value. The +// second linked list is an implementation of the intrusive MPSC queue algorithm +// described by 1024cores.net. +// +// When a future is submitted to the set a node is allocated and inserted in +// both linked lists. The next call to `poll` will (eventually) see this node +// and call `poll` on the future. +// +// Before a managed future is polled, the current task's `Notify` is replaced +// with one that is aware of the specific future being run. This ensures that +// task notifications generated by that specific future are visible to +// `FuturesUnordered`. When a notification is received, the node is scheduled +// for polling by being inserted into the concurrent linked list. +// +// Each node uses an `AtomicUsize` to track it's state. The node state is the +// reference count (the number of outstanding handles to the node) as well as a +// flag tracking if the node is currently inserted in the atomic queue. When the +// future is notified, it will only insert itself into the linked list if it +// isn't currently inserted. + +#[allow(missing_debug_implementations)] +struct Inner<T> { + // The task using `FuturesUnordered`. + parent: AtomicTask, + + // Head/tail of the readiness queue + head_readiness: AtomicPtr<Node<T>>, + tail_readiness: UnsafeCell<*const Node<T>>, + stub: Arc<Node<T>>, +} + +struct Node<T> { + // The future + future: UnsafeCell<Option<T>>, + + // Next pointer for linked list tracking all active nodes + next_all: UnsafeCell<*const Node<T>>, + + // Previous node in linked list tracking all active nodes + prev_all: UnsafeCell<*const Node<T>>, + + // Next pointer in readiness queue + next_readiness: AtomicPtr<Node<T>>, + + // Queue that we'll be enqueued to when notified + queue: Weak<Inner<T>>, + + // Whether or not this node is currently in the mpsc queue. + queued: AtomicBool, +} + +enum Dequeue<T> { + Data(*const Node<T>), + Empty, + Inconsistent, +} + +impl<T> Default for FuturesUnordered<T> where T: Future { + fn default() -> Self { + FuturesUnordered::new() + } +} + +impl<T> FuturesUnordered<T> + where T: Future, +{ + /// Constructs a new, empty `FuturesUnordered` + /// + /// The returned `FuturesUnordered` does not contain any futures and, in this + /// state, `FuturesUnordered::poll` will return `Ok(Async::Ready(None))`. + pub fn new() -> FuturesUnordered<T> { + let stub = Arc::new(Node { + future: UnsafeCell::new(None), + next_all: UnsafeCell::new(ptr::null()), + prev_all: UnsafeCell::new(ptr::null()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Weak::new(), + }); + let stub_ptr = &*stub as *const Node<T>; + let inner = Arc::new(Inner { + parent: AtomicTask::new(), + head_readiness: AtomicPtr::new(stub_ptr as *mut _), + tail_readiness: UnsafeCell::new(stub_ptr), + stub: stub, + }); + + FuturesUnordered { + len: 0, + head_all: ptr::null_mut(), + inner: inner, + } + } +} + +impl<T> FuturesUnordered<T> { + /// Returns the number of futures contained in the set. + /// + /// This represents the total number of in-flight futures. + pub fn len(&self) -> usize { + self.len + } + + /// Returns `true` if the set contains no futures + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Push a future into the set. + /// + /// This function submits the given future to the set for managing. This + /// function will not call `poll` on the submitted future. The caller must + /// ensure that `FuturesUnordered::poll` is called in order to receive task + /// notifications. + pub fn push(&mut self, future: T) { + let node = Arc::new(Node { + future: UnsafeCell::new(Some(future)), + next_all: UnsafeCell::new(ptr::null_mut()), + prev_all: UnsafeCell::new(ptr::null_mut()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Arc::downgrade(&self.inner), + }); + + // Right now our node has a strong reference count of 1. We transfer + // ownership of this reference count to our internal linked list + // and we'll reclaim ownership through the `unlink` function below. + let ptr = self.link(node); + + // We'll need to get the future "into the system" to start tracking it, + // e.g. getting its unpark notifications going to us tracking which + // futures are ready. To do that we unconditionally enqueue it for + // polling here. + self.inner.enqueue(ptr); + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> IterMut<T> { + IterMut { + node: self.head_all, + len: self.len, + _marker: PhantomData + } + } + + fn release_node(&mut self, node: Arc<Node<T>>) { + // The future is done, try to reset the queued flag. This will prevent + // `notify` from doing any work in the future + let prev = node.queued.swap(true, SeqCst); + + // Drop the future, even if it hasn't finished yet. This is safe + // because we're dropping the future on the thread that owns + // `FuturesUnordered`, which correctly tracks T's lifetimes and such. + unsafe { + drop((*node.future.get()).take()); + } + + // If the queued flag was previously set then it means that this node + // is still in our internal mpsc queue. We then transfer ownership + // of our reference count to the mpsc queue, and it'll come along and + // free it later, noticing that the future is `None`. + // + // If, however, the queued flag was *not* set then we're safe to + // release our reference count on the internal node. The queued flag + // was set above so all future `enqueue` operations will not actually + // enqueue the node, so our node will never see the mpsc queue again. + // The node itself will be deallocated once all reference counts have + // been dropped by the various owning tasks elsewhere. + if prev { + mem::forget(node); + } + } + + /// Insert a new node into the internal linked list. + fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> { + let ptr = arc2ptr(node); + unsafe { + *(*ptr).next_all.get() = self.head_all; + if !self.head_all.is_null() { + *(*self.head_all).prev_all.get() = ptr; + } + } + + self.head_all = ptr; + self.len += 1; + return ptr + } + + /// Remove the node from the linked list tracking all nodes currently + /// managed by `FuturesUnordered`. + unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> { + let node = ptr2arc(node); + let next = *node.next_all.get(); + let prev = *node.prev_all.get(); + *node.next_all.get() = ptr::null_mut(); + *node.prev_all.get() = ptr::null_mut(); + + if !next.is_null() { + *(*next).prev_all.get() = prev; + } + + if !prev.is_null() { + *(*prev).next_all.get() = next; + } else { + self.head_all = next; + } + self.len -= 1; + return node + } +} + +impl<T> Stream for FuturesUnordered<T> + where T: Future +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { + // Variable to determine how many times it is allowed to poll underlying + // futures without yielding. + // + // A single call to `poll_next` may potentially do a lot of work before + // yielding. This happens in particular if the underlying futures are awoken + // frequently but continue to return `Pending`. This is problematic if other + // tasks are waiting on the executor, since they do not get to run. This value + // caps the number of calls to `poll` on underlying futures a single call to + // `poll_next` is allowed to make. + // + // The value is the length of FuturesUnordered. This ensures that each + // future is polled only once at most per iteration. + // + // See also https://github.com/rust-lang/futures-rs/issues/2047. + let yield_every = self.len(); + + // Keep track of how many child futures we have polled, + // in case we want to forcibly yield. + let mut polled = 0; + + // Ensure `parent` is correctly set. + self.inner.parent.register(); + + loop { + let node = match unsafe { self.inner.dequeue() } { + Dequeue::Empty => { + if self.is_empty() { + return Ok(Async::Ready(None)); + } else { + return Ok(Async::NotReady) + } + } + Dequeue::Inconsistent => { + // At this point, it may be worth yielding the thread & + // spinning a few times... but for now, just yield using the + // task system. + task::current().notify(); + return Ok(Async::NotReady); + } + Dequeue::Data(node) => node, + }; + + debug_assert!(node != self.inner.stub()); + + unsafe { + let mut future = match (*(*node).future.get()).take() { + Some(future) => future, + + // If the future has already gone away then we're just + // cleaning out this node. See the comment in + // `release_node` for more information, but we're basically + // just taking ownership of our reference count here. + None => { + let node = ptr2arc(node); + assert!((*node.next_all.get()).is_null()); + assert!((*node.prev_all.get()).is_null()); + continue + } + }; + + // Unset queued flag... this must be done before + // polling. This ensures that the future gets + // rescheduled if it is notified **during** a call + // to `poll`. + let prev = (*node).queued.swap(false, SeqCst); + assert!(prev); + + // We're going to need to be very careful if the `poll` + // function below panics. We need to (a) not leak memory and + // (b) ensure that we still don't have any use-after-frees. To + // manage this we do a few things: + // + // * This "bomb" here will call `release_node` if dropped + // abnormally. That way we'll be sure the memory management + // of the `node` is managed correctly. + // * The future was extracted above (taken ownership). That way + // if it panics we're guaranteed that the future is + // dropped on this thread and doesn't accidentally get + // dropped on a different thread (bad). + // * We unlink the node from our internal queue to preemptively + // assume it'll panic, in which case we'll want to discard it + // regardless. + struct Bomb<'a, T: 'a> { + queue: &'a mut FuturesUnordered<T>, + node: Option<Arc<Node<T>>>, + } + impl<'a, T> Drop for Bomb<'a, T> { + fn drop(&mut self) { + if let Some(node) = self.node.take() { + self.queue.release_node(node); + } + } + } + let mut bomb = Bomb { + node: Some(self.unlink(node)), + queue: self, + }; + + // Poll the underlying future with the appropriate `notify` + // implementation. This is where a large bit of the unsafety + // starts to stem from internally. The `notify` instance itself + // is basically just our `Arc<Node<T>>` and tracks the mpsc + // queue of ready futures. + // + // Critically though `Node<T>` won't actually access `T`, the + // future, while it's floating around inside of `Task` + // instances. These structs will basically just use `T` to size + // the internal allocation, appropriately accessing fields and + // deallocating the node if need be. + let res = { + let notify = NodeToHandle(bomb.node.as_ref().unwrap()); + task_impl::with_notify(¬ify, 0, || { + future.poll() + }) + }; + polled += 1; + + let ret = match res { + Ok(Async::NotReady) => { + let node = bomb.node.take().unwrap(); + *node.future.get() = Some(future); + bomb.queue.link(node); + + if polled == yield_every { + // We have polled a large number of futures in a row without yielding. + // To ensure we do not starve other tasks waiting on the executor, + // we yield here, but immediately wake ourselves up to continue. + task_impl::current().notify(); + return Ok(Async::NotReady); + } + continue + } + Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))), + Err(e) => Err(e), + }; + return ret + } + } + } +} + +impl<T: Debug> Debug for FuturesUnordered<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "FuturesUnordered {{ ... }}") + } +} + +impl<T> Drop for FuturesUnordered<T> { + fn drop(&mut self) { + // When a `FuturesUnordered` is dropped we want to drop all futures associated + // with it. At the same time though there may be tons of `Task` handles + // flying around which contain `Node<T>` references inside them. We'll + // let those naturally get deallocated when the `Task` itself goes out + // of scope or gets notified. + unsafe { + while !self.head_all.is_null() { + let head = self.head_all; + let node = self.unlink(head); + self.release_node(node); + } + } + + // Note that at this point we could still have a bunch of nodes in the + // mpsc queue. None of those nodes, however, have futures associated + // with them so they're safe to destroy on any thread. At this point + // the `FuturesUnordered` struct, the owner of the one strong reference + // to `Inner<T>` will drop the strong reference. At that point + // whichever thread releases the strong refcount last (be it this + // thread or some other thread as part of an `upgrade`) will clear out + // the mpsc queue and free all remaining nodes. + // + // While that freeing operation isn't guaranteed to happen here, it's + // guaranteed to happen "promptly" as no more "blocking work" will + // happen while there's a strong refcount held. + } +} + +impl<F: Future> FromIterator<F> for FuturesUnordered<F> { + fn from_iter<T>(iter: T) -> Self + where T: IntoIterator<Item = F> + { + let mut new = FuturesUnordered::new(); + for future in iter.into_iter() { + new.push(future); + } + new + } +} + +#[derive(Debug)] +/// Mutable iterator over all futures in the unordered set. +pub struct IterMut<'a, F: 'a> { + node: *const Node<F>, + len: usize, + _marker: PhantomData<&'a mut FuturesUnordered<F>> +} + +impl<'a, F> Iterator for IterMut<'a, F> { + type Item = &'a mut F; + + fn next(&mut self) -> Option<&'a mut F> { + if self.node.is_null() { + return None; + } + unsafe { + let future = (*(*self.node).future.get()).as_mut().unwrap(); + let next = *(*self.node).next_all.get(); + self.node = next; + self.len -= 1; + return Some(future); + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (self.len, Some(self.len)) + } +} + +impl<'a, F> ExactSizeIterator for IterMut<'a, F> {} + +impl<T> Inner<T> { + /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. + fn enqueue(&self, node: *const Node<T>) { + unsafe { + debug_assert!((*node).queued.load(Relaxed)); + + // This action does not require any coordination + (*node).next_readiness.store(ptr::null_mut(), Relaxed); + + // Note that these atomic orderings come from 1024cores + let node = node as *mut _; + let prev = self.head_readiness.swap(node, AcqRel); + (*prev).next_readiness.store(node, Release); + } + } + + /// The dequeue function from the 1024cores intrusive MPSC queue algorithm + /// + /// Note that this unsafe as it required mutual exclusion (only one thread + /// can call this) to be guaranteed elsewhere. + unsafe fn dequeue(&self) -> Dequeue<T> { + let mut tail = *self.tail_readiness.get(); + let mut next = (*tail).next_readiness.load(Acquire); + + if tail == self.stub() { + if next.is_null() { + return Dequeue::Empty; + } + + *self.tail_readiness.get() = next; + tail = next; + next = (*next).next_readiness.load(Acquire); + } + + if !next.is_null() { + *self.tail_readiness.get() = next; + debug_assert!(tail != self.stub()); + return Dequeue::Data(tail); + } + + if self.head_readiness.load(Acquire) as *const _ != tail { + return Dequeue::Inconsistent; + } + + self.enqueue(self.stub()); + + next = (*tail).next_readiness.load(Acquire); + + if !next.is_null() { + *self.tail_readiness.get() = next; + return Dequeue::Data(tail); + } + + Dequeue::Inconsistent + } + + fn stub(&self) -> *const Node<T> { + &*self.stub + } +} + +impl<T> Drop for Inner<T> { + fn drop(&mut self) { + // Once we're in the destructor for `Inner<T>` we need to clear out the + // mpsc queue of nodes if there's anything left in there. + // + // Note that each node has a strong reference count associated with it + // which is owned by the mpsc queue. All nodes should have had their + // futures dropped already by the `FuturesUnordered` destructor above, + // so we're just pulling out nodes and dropping their refcounts. + unsafe { + loop { + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(ptr2arc(ptr)), + } + } + } + } +} + +#[allow(missing_debug_implementations)] +struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>); + +impl<'a, T> Clone for NodeToHandle<'a, T> { + fn clone(&self) -> Self { + NodeToHandle(self.0) + } +} + +impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle { + fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle { + unsafe { + let ptr = handle.0.clone(); + let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr); + NotifyHandle::new(hide_lt(ptr)) + } + } +} + +struct ArcNode<T>(PhantomData<T>); + +// We should never touch `T` on any thread other than the one owning +// `FuturesUnordered`, so this should be a safe operation. +unsafe impl<T> Send for ArcNode<T> {} +unsafe impl<T> Sync for ArcNode<T> {} + +impl<T> Notify for ArcNode<T> { + fn notify(&self, _id: usize) { + unsafe { + let me: *const ArcNode<T> = self; + let me: *const *const ArcNode<T> = &me; + let me = me as *const Arc<Node<T>>; + Node::notify(&*me) + } + } +} + +unsafe impl<T> UnsafeNotify for ArcNode<T> { + unsafe fn clone_raw(&self) -> NotifyHandle { + let me: *const ArcNode<T> = self; + let me: *const *const ArcNode<T> = &me; + let me = &*(me as *const Arc<Node<T>>); + NodeToHandle(me).into() + } + + unsafe fn drop_raw(&self) { + let mut me: *const ArcNode<T> = self; + let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>; + ptr::drop_in_place(me); + } +} + +unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify { + mem::transmute(p as *mut UnsafeNotify) +} + +impl<T> Node<T> { + fn notify(me: &Arc<Node<T>>) { + let inner = match me.queue.upgrade() { + Some(inner) => inner, + None => return, + }; + + // It's our job to notify the node that it's ready to get polled, + // meaning that we need to enqueue it into the readiness queue. To + // do this we flag that we're ready to be queued, and if successful + // we then do the literal queueing operation, ensuring that we're + // only queued once. + // + // Once the node is inserted we be sure to notify the parent task, + // as it'll want to come along and pick up our node now. + // + // Note that we don't change the reference count of the node here, + // we're just enqueueing the raw pointer. The `FuturesUnordered` + // implementation guarantees that if we set the `queued` flag true that + // there's a reference count held by the main `FuturesUnordered` queue + // still. + let prev = me.queued.swap(true, SeqCst); + if !prev { + inner.enqueue(&**me); + inner.parent.notify(); + } + } +} + +impl<T> Drop for Node<T> { + fn drop(&mut self) { + // Currently a `Node<T>` is sent across all threads for any lifetime, + // regardless of `T`. This means that for memory safety we can't + // actually touch `T` at any time except when we have a reference to the + // `FuturesUnordered` itself. + // + // Consequently it *should* be the case that we always drop futures from + // the `FuturesUnordered` instance, but this is a bomb in place to catch + // any bugs in that logic. + unsafe { + if (*self.future.get()).is_some() { + abort("future still here when dropping"); + } + } + } +} + +fn arc2ptr<T>(ptr: Arc<T>) -> *const T { + let addr = &*ptr as *const T; + mem::forget(ptr); + return addr +} + +unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> { + let anchor = mem::transmute::<usize, Arc<T>>(0x10); + let addr = &*anchor as *const T; + mem::forget(anchor); + let offset = addr as isize - 0x10; + mem::transmute::<isize, Arc<T>>(ptr as isize - offset) +} + +fn abort(s: &str) -> ! { + struct DoublePanic; + + impl Drop for DoublePanic { + fn drop(&mut self) { + panic!("panicking twice to abort the program"); + } + } + + let _bomb = DoublePanic; + panic!("{}", s); +} diff --git a/third_party/rust/futures-0.1.31/src/stream/inspect.rs b/third_party/rust/futures-0.1.31/src/stream/inspect.rs new file mode 100644 index 0000000000..fc8f7f4ea2 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/inspect.rs @@ -0,0 +1,84 @@ +use {Stream, Poll, Async}; + +/// Do something with the items of a stream, passing it on. +/// +/// This is created by the `Stream::inspect` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Inspect<S, F> where S: Stream { + stream: S, + inspect: F, +} + +pub fn new<S, F>(stream: S, f: F) -> Inspect<S, F> + where S: Stream, + F: FnMut(&S::Item) -> (), +{ + Inspect { + stream: stream, + inspect: f, + } +} + +impl<S: Stream, F> Inspect<S, F> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F> ::sink::Sink for Inspect<S, F> + where S: ::sink::Sink + Stream +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F> Stream for Inspect<S, F> + where S: Stream, + F: FnMut(&S::Item), +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + match try_ready!(self.stream.poll()) { + Some(e) => { + (self.inspect)(&e); + Ok(Async::Ready(Some(e))) + } + None => Ok(Async::Ready(None)), + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/inspect_err.rs b/third_party/rust/futures-0.1.31/src/stream/inspect_err.rs new file mode 100644 index 0000000000..5c56a217ff --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/inspect_err.rs @@ -0,0 +1,81 @@ +use {Stream, Poll}; + +/// Do something with the error of a stream, passing it on. +/// +/// This is created by the `Stream::inspect_err` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct InspectErr<S, F> where S: Stream { + stream: S, + inspect: F, +} + +pub fn new<S, F>(stream: S, f: F) -> InspectErr<S, F> + where S: Stream, + F: FnMut(&S::Error) -> (), +{ + InspectErr { + stream: stream, + inspect: f, + } +} + +impl<S: Stream, F> InspectErr<S, F> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F> ::sink::Sink for InspectErr<S, F> + where S: ::sink::Sink + Stream +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F> Stream for InspectErr<S, F> + where S: Stream, + F: FnMut(&S::Error), +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.stream.poll().map_err(|e| { + (self.inspect)(&e); + e + }) + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/iter.rs b/third_party/rust/futures-0.1.31/src/stream/iter.rs new file mode 100644 index 0000000000..e0b9379353 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/iter.rs @@ -0,0 +1,46 @@ +#![deprecated(note = "implementation moved to `iter_ok` and `iter_result`")] +#![allow(deprecated)] + +use Poll; +use stream::{iter_result, IterResult, Stream}; + +/// A stream which is just a shim over an underlying instance of `Iterator`. +/// +/// This stream will never block and is always ready. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Iter<I>(IterResult<I>); + +/// Converts an `Iterator` over `Result`s into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter simply +/// always calls `iter.next()` and returns that. +/// +/// ```rust +/// use futures::*; +/// +/// let mut stream = stream::iter(vec![Ok(17), Err(false), Ok(19)]); +/// assert_eq!(Ok(Async::Ready(Some(17))), stream.poll()); +/// assert_eq!(Err(false), stream.poll()); +/// assert_eq!(Ok(Async::Ready(Some(19))), stream.poll()); +/// assert_eq!(Ok(Async::Ready(None)), stream.poll()); +/// ``` +#[inline] +pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter> + where J: IntoIterator<Item=Result<T, E>>, +{ + Iter(iter_result(i)) +} + +impl<I, T, E> Stream for Iter<I> + where I: Iterator<Item=Result<T, E>>, +{ + type Item = T; + type Error = E; + + #[inline] + fn poll(&mut self) -> Poll<Option<T>, E> { + self.0.poll() + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/iter_ok.rs b/third_party/rust/futures-0.1.31/src/stream/iter_ok.rs new file mode 100644 index 0000000000..9c8d871399 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/iter_ok.rs @@ -0,0 +1,48 @@ +use core::marker; + +use {Async, Poll}; +use stream::Stream; + +/// A stream which is just a shim over an underlying instance of `Iterator`. +/// +/// This stream will never block and is always ready. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct IterOk<I, E> { + iter: I, + _marker: marker::PhantomData<fn() -> E>, +} + +/// Converts an `Iterator` into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter +/// simply always calls `iter.next()` and returns that. +/// +/// ```rust +/// use futures::*; +/// +/// let mut stream = stream::iter_ok::<_, ()>(vec![17, 19]); +/// assert_eq!(Ok(Async::Ready(Some(17))), stream.poll()); +/// assert_eq!(Ok(Async::Ready(Some(19))), stream.poll()); +/// assert_eq!(Ok(Async::Ready(None)), stream.poll()); +/// ``` +pub fn iter_ok<I, E>(i: I) -> IterOk<I::IntoIter, E> + where I: IntoIterator, +{ + IterOk { + iter: i.into_iter(), + _marker: marker::PhantomData, + } +} + +impl<I, E> Stream for IterOk<I, E> + where I: Iterator, +{ + type Item = I::Item; + type Error = E; + + fn poll(&mut self) -> Poll<Option<I::Item>, E> { + Ok(Async::Ready(self.iter.next())) + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/iter_result.rs b/third_party/rust/futures-0.1.31/src/stream/iter_result.rs new file mode 100644 index 0000000000..4eef5da08e --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/iter_result.rs @@ -0,0 +1,51 @@ +use {Async, Poll}; +use stream::Stream; + +/// A stream which is just a shim over an underlying instance of `Iterator`. +/// +/// This stream will never block and is always ready. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct IterResult<I> { + iter: I, +} + +/// Converts an `Iterator` over `Result`s into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter simply +/// always calls `iter.next()` and returns that. +/// +/// ```rust +/// use futures::*; +/// +/// let mut stream = stream::iter_result(vec![Ok(17), Err(false), Ok(19)]); +/// assert_eq!(Ok(Async::Ready(Some(17))), stream.poll()); +/// assert_eq!(Err(false), stream.poll()); +/// assert_eq!(Ok(Async::Ready(Some(19))), stream.poll()); +/// assert_eq!(Ok(Async::Ready(None)), stream.poll()); +/// ``` +pub fn iter_result<J, T, E>(i: J) -> IterResult<J::IntoIter> +where + J: IntoIterator<Item = Result<T, E>>, +{ + IterResult { + iter: i.into_iter(), + } +} + +impl<I, T, E> Stream for IterResult<I> +where + I: Iterator<Item = Result<T, E>>, +{ + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<T>, E> { + match self.iter.next() { + Some(Ok(e)) => Ok(Async::Ready(Some(e))), + Some(Err(e)) => Err(e), + None => Ok(Async::Ready(None)), + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/map.rs b/third_party/rust/futures-0.1.31/src/stream/map.rs new file mode 100644 index 0000000000..702e980b3f --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/map.rs @@ -0,0 +1,81 @@ +use {Async, Poll}; +use stream::Stream; + +/// A stream combinator which will change the type of a stream from one +/// type to another. +/// +/// This is produced by the `Stream::map` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Map<S, F> { + stream: S, + f: F, +} + +pub fn new<S, F, U>(s: S, f: F) -> Map<S, F> + where S: Stream, + F: FnMut(S::Item) -> U, +{ + Map { + stream: s, + f: f, + } +} + +impl<S, F> Map<S, F> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F> ::sink::Sink for Map<S, F> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F, U> Stream for Map<S, F> + where S: Stream, + F: FnMut(S::Item) -> U, +{ + type Item = U; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<U>, S::Error> { + let option = try_ready!(self.stream.poll()); + Ok(Async::Ready(option.map(&mut self.f))) + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/map_err.rs b/third_party/rust/futures-0.1.31/src/stream/map_err.rs new file mode 100644 index 0000000000..8d1c0fc083 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/map_err.rs @@ -0,0 +1,80 @@ +use Poll; +use stream::Stream; + +/// A stream combinator which will change the error type of a stream from one +/// type to another. +/// +/// This is produced by the `Stream::map_err` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct MapErr<S, F> { + stream: S, + f: F, +} + +pub fn new<S, F, U>(s: S, f: F) -> MapErr<S, F> + where S: Stream, + F: FnMut(S::Error) -> U, +{ + MapErr { + stream: s, + f: f, + } +} + +impl<S, F> MapErr<S, F> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F> ::sink::Sink for MapErr<S, F> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F, U> Stream for MapErr<S, F> + where S: Stream, + F: FnMut(S::Error) -> U, +{ + type Item = S::Item; + type Error = U; + + fn poll(&mut self) -> Poll<Option<S::Item>, U> { + self.stream.poll().map_err(&mut self.f) + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/merge.rs b/third_party/rust/futures-0.1.31/src/stream/merge.rs new file mode 100644 index 0000000000..af7505e69a --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/merge.rs @@ -0,0 +1,82 @@ +#![deprecated(note = "functionality provided by `select` now")] +#![allow(deprecated)] + +use {Poll, Async}; +use stream::{Stream, Fuse}; + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from one or both of the underlying +/// streams as they become available. Errors, however, are not merged: you +/// get at most one error at a time. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Merge<S1, S2: Stream> { + stream1: Fuse<S1>, + stream2: Fuse<S2>, + queued_error: Option<S2::Error>, +} + +pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Merge<S1, S2> + where S1: Stream, S2: Stream<Error = S1::Error> +{ + Merge { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + queued_error: None, + } +} + +/// An item returned from a merge stream, which represents an item from one or +/// both of the underlying streams. +#[derive(Debug)] +pub enum MergedItem<I1, I2> { + /// An item from the first stream + First(I1), + /// An item from the second stream + Second(I2), + /// Items from both streams + Both(I1, I2), +} + +impl<S1, S2> Stream for Merge<S1, S2> + where S1: Stream, S2: Stream<Error = S1::Error> +{ + type Item = MergedItem<S1::Item, S2::Item>; + type Error = S1::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if let Some(e) = self.queued_error.take() { + return Err(e) + } + + match self.stream1.poll()? { + Async::NotReady => { + match try_ready!(self.stream2.poll()) { + Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))), + None => Ok(Async::NotReady), + } + } + Async::Ready(None) => { + match try_ready!(self.stream2.poll()) { + Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))), + None => Ok(Async::Ready(None)), + } + } + Async::Ready(Some(item1)) => { + match self.stream2.poll() { + Err(e) => { + self.queued_error = Some(e); + Ok(Async::Ready(Some(MergedItem::First(item1)))) + } + Ok(Async::NotReady) | Ok(Async::Ready(None)) => { + Ok(Async::Ready(Some(MergedItem::First(item1)))) + } + Ok(Async::Ready(Some(item2))) => { + Ok(Async::Ready(Some(MergedItem::Both(item1, item2)))) + } + } + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/mod.rs b/third_party/rust/futures-0.1.31/src/stream/mod.rs new file mode 100644 index 0000000000..2d90362470 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/mod.rs @@ -0,0 +1,1146 @@ +//! Asynchronous streams +//! +//! This module contains the `Stream` trait and a number of adaptors for this +//! trait. This trait is very similar to the `Iterator` trait in the standard +//! library except that it expresses the concept of blocking as well. A stream +//! here is a sequential sequence of values which may take some amount of time +//! in between to produce. +//! +//! A stream may request that it is blocked between values while the next value +//! is calculated, and provides a way to get notified once the next value is +//! ready as well. +//! +//! You can find more information/tutorials about streams [online at +//! https://tokio.rs][online] +//! +//! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ + +use {IntoFuture, Poll}; + +mod iter; +#[allow(deprecated)] +pub use self::iter::{iter, Iter}; +#[cfg(feature = "with-deprecated")] +#[allow(deprecated)] +pub use self::Iter as IterStream; +mod iter_ok; +pub use self::iter_ok::{iter_ok, IterOk}; +mod iter_result; +pub use self::iter_result::{iter_result, IterResult}; + +mod repeat; +pub use self::repeat::{repeat, Repeat}; + +mod and_then; +mod chain; +mod concat; +mod empty; +mod filter; +mod filter_map; +mod flatten; +mod fold; +mod for_each; +mod from_err; +mod fuse; +mod future; +mod inspect; +mod inspect_err; +mod map; +mod map_err; +mod merge; +mod once; +mod or_else; +mod peek; +mod poll_fn; +mod select; +mod skip; +mod skip_while; +mod take; +mod take_while; +mod then; +mod unfold; +mod zip; +mod forward; +pub use self::and_then::AndThen; +pub use self::chain::Chain; +#[allow(deprecated)] +pub use self::concat::Concat; +pub use self::concat::Concat2; +pub use self::empty::{Empty, empty}; +pub use self::filter::Filter; +pub use self::filter_map::FilterMap; +pub use self::flatten::Flatten; +pub use self::fold::Fold; +pub use self::for_each::ForEach; +pub use self::from_err::FromErr; +pub use self::fuse::Fuse; +pub use self::future::StreamFuture; +pub use self::inspect::Inspect; +pub use self::inspect_err::InspectErr; +pub use self::map::Map; +pub use self::map_err::MapErr; +#[allow(deprecated)] +pub use self::merge::{Merge, MergedItem}; +pub use self::once::{Once, once}; +pub use self::or_else::OrElse; +pub use self::peek::Peekable; +pub use self::poll_fn::{poll_fn, PollFn}; +pub use self::select::Select; +pub use self::skip::Skip; +pub use self::skip_while::SkipWhile; +pub use self::take::Take; +pub use self::take_while::TakeWhile; +pub use self::then::Then; +pub use self::unfold::{Unfold, unfold}; +pub use self::zip::Zip; +pub use self::forward::Forward; +use sink::{Sink}; + +if_std! { + use std; + + mod buffered; + mod buffer_unordered; + mod catch_unwind; + mod chunks; + mod collect; + mod wait; + mod channel; + mod split; + pub mod futures_unordered; + mod futures_ordered; + pub use self::buffered::Buffered; + pub use self::buffer_unordered::BufferUnordered; + pub use self::catch_unwind::CatchUnwind; + pub use self::chunks::Chunks; + pub use self::collect::Collect; + pub use self::wait::Wait; + pub use self::split::{SplitStream, SplitSink, ReuniteError}; + pub use self::futures_unordered::FuturesUnordered; + pub use self::futures_ordered::{futures_ordered, FuturesOrdered}; + + #[doc(hidden)] + #[cfg(feature = "with-deprecated")] + #[allow(deprecated)] + pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError}; + + /// A type alias for `Box<Stream + Send>` + #[doc(hidden)] + #[deprecated(note = "removed without replacement, recommended to use a \ + local extension trait or function if needed, more \ + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] + pub type BoxStream<T, E> = ::std::boxed::Box<Stream<Item = T, Error = E> + Send>; + + impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + (**self).poll() + } + } +} + +/// A stream of values, not all of which may have been produced yet. +/// +/// `Stream` is a trait to represent any source of sequential events or items +/// which acts like an iterator but long periods of time may pass between +/// items. Like `Future` the methods of `Stream` never block and it is thus +/// suitable for programming in an asynchronous fashion. This trait is very +/// similar to the `Iterator` trait in the standard library where `Some` is +/// used to signal elements of the stream and `None` is used to indicate that +/// the stream is finished. +/// +/// Like futures a stream has basic combinators to transform the stream, perform +/// more work on each item, etc. +/// +/// You can find more information/tutorials about streams [online at +/// https://tokio.rs][online] +/// +/// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ +/// +/// # Streams as Futures +/// +/// Any instance of `Stream` can also be viewed as a `Future` where the resolved +/// value is the next item in the stream along with the rest of the stream. The +/// `into_future` adaptor can be used here to convert any stream into a future +/// for use with other future methods like `join` and `select`. +/// +/// # Errors +/// +/// Streams, like futures, can also model errors in their computation. All +/// streams have an associated `Error` type like with futures. Currently as of +/// the 0.1 release of this library an error on a stream **does not terminate +/// the stream**. That is, after one error is received, another error may be +/// received from the same stream (it's valid to keep polling). +/// +/// This property of streams, however, is [being considered] for change in 0.2 +/// where an error on a stream is similar to `None`, it terminates the stream +/// entirely. If one of these use cases suits you perfectly and not the other, +/// please feel welcome to comment on [the issue][being considered]! +/// +/// [being considered]: https://github.com/rust-lang-nursery/futures-rs/issues/206 +#[must_use = "streams do nothing unless polled"] +pub trait Stream { + /// The type of item this stream will yield on success. + type Item; + + /// The type of error this stream may generate. + type Error; + + /// Attempt to pull out the next value of this stream, returning `None` if + /// the stream is finished. + /// + /// This method, like `Future::poll`, is the sole method of pulling out a + /// value from a stream. This method must also be run within the context of + /// a task typically and implementors of this trait must ensure that + /// implementations of this method do not block, as it may cause consumers + /// to behave badly. + /// + /// # Return value + /// + /// If `NotReady` is returned then this stream's next value is not ready + /// yet and implementations will ensure that the current task will be + /// notified when the next value may be ready. If `Some` is returned then + /// the returned value represents the next value on the stream. `Err` + /// indicates an error happened, while `Ok` indicates whether there was a + /// new item on the stream or whether the stream has terminated. + /// + /// # Panics + /// + /// Once a stream is finished, that is `Ready(None)` has been returned, + /// further calls to `poll` may result in a panic or other "bad behavior". + /// If this is difficult to guard against then the `fuse` adapter can be + /// used to ensure that `poll` always has well-defined semantics. + // TODO: more here + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>; + + // TODO: should there also be a method like `poll` but doesn't return an + // item? basically just says "please make more progress internally" + // seems crucial for buffering to actually make any sense. + + /// Creates an iterator which blocks the current thread until each item of + /// this stream is resolved. + /// + /// This method will consume ownership of this stream, returning an + /// implementation of a standard iterator. This iterator will *block the + /// current thread* on each call to `next` if the item in the stream isn't + /// ready yet. + /// + /// > **Note:** This method is not appropriate to call on event loops or + /// > similar I/O situations because it will prevent the event + /// > loop from making progress (this blocks the thread). This + /// > method should only be called when it's guaranteed that the + /// > blocking work associated with this stream will be completed + /// > by another thread. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Panics + /// + /// The returned iterator does not attempt to catch panics. If the `poll` + /// function panics, panics will be propagated to the caller of `next`. + #[cfg(feature = "use_std")] + fn wait(self) -> Wait<Self> + where Self: Sized + { + wait::new(self) + } + + /// Convenience function for turning this stream into a trait object. + /// + /// This simply avoids the need to write `Box::new` and can often help with + /// type inference as well by always returning a trait object. Note that + /// this method requires the `Send` bound and returns a `BoxStream`, which + /// also encodes this. If you'd like to create a `Box<Stream>` without the + /// `Send` bound, then the `Box::new` function can be used instead. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// use futures::stream::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel(1); + /// let a: BoxStream<i32, ()> = rx.boxed(); + /// ``` + #[cfg(feature = "use_std")] + #[doc(hidden)] + #[deprecated(note = "removed without replacement, recommended to use a \ + local extension trait or function if needed, more \ + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] + #[allow(deprecated)] + fn boxed(self) -> BoxStream<Self::Item, Self::Error> + where Self: Sized + Send + 'static, + { + ::std::boxed::Box::new(self) + } + + /// Converts this stream into a `Future`. + /// + /// A stream can be viewed as a future which will resolve to a pair containing + /// the next element of the stream plus the remaining stream. If the stream + /// terminates, then the next element is `None` and the remaining stream is + /// still passed back, to allow reclamation of its resources. + /// + /// The returned future can be used to compose streams and futures together by + /// placing everything into the "world of futures". + fn into_future(self) -> StreamFuture<Self> + where Self: Sized + { + future::new(self) + } + + /// Converts a stream of type `T` to a stream of type `U`. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, and the callback will be executed inline with + /// calls to `poll`. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let rx = rx.map(|x| x + 3); + /// ``` + fn map<U, F>(self, f: F) -> Map<Self, F> + where F: FnMut(Self::Item) -> U, + Self: Sized + { + map::new(self, f) + } + + /// Converts a stream of error type `T` to a stream of error type `U`. + /// + /// The provided closure is executed over all errors of this stream as + /// they are made available, and the callback will be executed inline with + /// calls to `poll`. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `map_err` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let rx = rx.map_err(|()| 3); + /// ``` + fn map_err<U, F>(self, f: F) -> MapErr<Self, F> + where F: FnMut(Self::Error) -> U, + Self: Sized + { + map_err::new(self, f) + } + + /// Filters the values produced by this stream according to the provided + /// predicate. + /// + /// As values of this stream are made available, the provided predicate will + /// be run against them. If the predicate returns `true` then the stream + /// will yield the value, but if the predicate returns `false` then the + /// value will be discarded and the next value will be produced. + /// + /// All errors are passed through without filtering in this combinator. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `filter` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let evens = rx.filter(|x| x % 2 == 0); + /// ``` + fn filter<F>(self, f: F) -> Filter<Self, F> + where F: FnMut(&Self::Item) -> bool, + Self: Sized + { + filter::new(self, f) + } + + /// Filters the values produced by this stream while simultaneously mapping + /// them to a different type. + /// + /// As values of this stream are made available, the provided function will + /// be run on them. If the predicate returns `Some(e)` then the stream will + /// yield the value `e`, but if the predicate returns `None` then the next + /// value will be produced. + /// + /// All errors are passed through without filtering in this combinator. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `filter_map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let evens_plus_one = rx.filter_map(|x| { + /// if x % 0 == 2 { + /// Some(x + 1) + /// } else { + /// None + /// } + /// }); + /// ``` + fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F> + where F: FnMut(Self::Item) -> Option<B>, + Self: Sized + { + filter_map::new(self, f) + } + + /// Chain on a computation for when a value is ready, passing the resulting + /// item to the provided closure `f`. + /// + /// This function can be used to ensure a computation runs regardless of + /// the next value on the stream. The closure provided will be yielded a + /// `Result` once a value is ready, and the returned future will then be run + /// to completion to produce the next value on this stream. + /// + /// The returned value of the closure must implement the `IntoFuture` trait + /// and can represent some more work to be done before the composed stream + /// is finished. Note that the `Result` type implements the `IntoFuture` + /// trait so it is possible to simply alter the `Result` yielded to the + /// closure and return it. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// + /// let rx = rx.then(|result| { + /// match result { + /// Ok(e) => Ok(e + 3), + /// Err(()) => Err(4), + /// } + /// }); + /// ``` + fn then<F, U>(self, f: F) -> Then<Self, F, U> + where F: FnMut(Result<Self::Item, Self::Error>) -> U, + U: IntoFuture, + Self: Sized + { + then::new(self, f) + } + + /// Chain on a computation for when a value is ready, passing the successful + /// results to the provided closure `f`. + /// + /// This function can be used to run a unit of work when the next successful + /// value on a stream is ready. The closure provided will be yielded a value + /// when ready, and the returned future will then be run to completion to + /// produce the next value on this stream. + /// + /// Any errors produced by this stream will not be passed to the closure, + /// and will be passed through. + /// + /// The returned value of the closure must implement the `IntoFuture` trait + /// and can represent some more work to be done before the composed stream + /// is finished. Note that the `Result` type implements the `IntoFuture` + /// trait so it is possible to simply alter the `Result` yielded to the + /// closure and return it. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + /// + /// To process the entire stream and return a single future representing + /// success or error, use `for_each` instead. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// + /// let rx = rx.and_then(|result| { + /// if result % 2 == 0 { + /// Ok(result) + /// } else { + /// Err(()) + /// } + /// }); + /// ``` + fn and_then<F, U>(self, f: F) -> AndThen<Self, F, U> + where F: FnMut(Self::Item) -> U, + U: IntoFuture<Error = Self::Error>, + Self: Sized + { + and_then::new(self, f) + } + + /// Chain on a computation for when an error happens, passing the + /// erroneous result to the provided closure `f`. + /// + /// This function can be used to run a unit of work and attempt to recover from + /// an error if one happens. The closure provided will be yielded an error + /// when one appears, and the returned future will then be run to completion + /// to produce the next value on this stream. + /// + /// Any successful values produced by this stream will not be passed to the + /// closure, and will be passed through. + /// + /// The returned value of the closure must implement the `IntoFuture` trait + /// and can represent some more work to be done before the composed stream + /// is finished. Note that the `Result` type implements the `IntoFuture` + /// trait so it is possible to simply alter the `Result` yielded to the + /// closure and return it. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + fn or_else<F, U>(self, f: F) -> OrElse<Self, F, U> + where F: FnMut(Self::Error) -> U, + U: IntoFuture<Item = Self::Item>, + Self: Sized + { + or_else::new(self, f) + } + + /// Collect all of the values of this stream into a vector, returning a + /// future representing the result of that computation. + /// + /// This combinator will collect all successful results of this stream and + /// collect them into a `Vec<Self::Item>`. If an error happens then all + /// collected elements will be dropped and the error will be returned. + /// + /// The returned future will be resolved whenever an error happens or when + /// the stream returns `Ok(None)`. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (mut tx, rx) = mpsc::channel(1); + /// + /// thread::spawn(|| { + /// for i in (0..5).rev() { + /// tx = tx.send(i + 1).wait().unwrap(); + /// } + /// }); + /// + /// let mut result = rx.collect(); + /// assert_eq!(result.wait(), Ok(vec![5, 4, 3, 2, 1])); + /// ``` + #[cfg(feature = "use_std")] + fn collect(self) -> Collect<Self> + where Self: Sized + { + collect::new(self) + } + + /// Concatenate all results of a stream into a single extendable + /// destination, returning a future representing the end result. + /// + /// This combinator will extend the first item with the contents + /// of all the successful results of the stream. If the stream is + /// empty, the default value will be returned. If an error occurs, + /// all the results will be dropped and the error will be returned. + /// + /// The name `concat2` is an intermediate measure until the release of + /// futures 0.2, at which point it will be renamed back to `concat`. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (mut tx, rx) = mpsc::channel(1); + /// + /// thread::spawn(move || { + /// for i in (0..3).rev() { + /// let n = i * 3; + /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap(); + /// } + /// }); + /// let result = rx.concat2(); + /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); + /// ``` + fn concat2(self) -> Concat2<Self> + where Self: Sized, + Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, + { + concat::new2(self) + } + + /// Concatenate all results of a stream into a single extendable + /// destination, returning a future representing the end result. + /// + /// This combinator will extend the first item with the contents + /// of all the successful results of the stream. If an error occurs, + /// all the results will be dropped and the error will be returned. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (mut tx, rx) = mpsc::channel(1); + /// + /// thread::spawn(move || { + /// for i in (0..3).rev() { + /// let n = i * 3; + /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap(); + /// } + /// }); + /// let result = rx.concat(); + /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); + /// ``` + /// + /// # Panics + /// + /// It's important to note that this function will panic if the stream + /// is empty, which is the reason for its deprecation. + #[deprecated(since="0.1.14", note="please use `Stream::concat2` instead")] + #[allow(deprecated)] + fn concat(self) -> Concat<Self> + where Self: Sized, + Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator, + { + concat::new(self) + } + + /// Execute an accumulating computation over a stream, collecting all the + /// values into one final result. + /// + /// This combinator will collect all successful results of this stream + /// according to the closure provided. The initial state is also provided to + /// this method and then is returned again by each execution of the closure. + /// Once the entire stream has been exhausted the returned future will + /// resolve to this value. + /// + /// If an error happens then collected state will be dropped and the error + /// will be returned. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::stream; + /// use futures::future; + /// + /// let number_stream = stream::iter_ok::<_, ()>(0..6); + /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x)); + /// assert_eq!(sum.wait(), Ok(15)); + /// ``` + fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T> + where F: FnMut(T, Self::Item) -> Fut, + Fut: IntoFuture<Item = T>, + Self::Error: From<Fut::Error>, + Self: Sized + { + fold::new(self, f, init) + } + + /// Flattens a stream of streams into just one continuous stream. + /// + /// If this stream's elements are themselves streams then this combinator + /// will flatten out the entire stream to one long chain of elements. Any + /// errors are passed through without looking at them, but otherwise each + /// individual stream will get exhausted before moving on to the next. + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (tx1, rx1) = mpsc::channel::<i32>(1); + /// let (tx2, rx2) = mpsc::channel::<i32>(1); + /// let (tx3, rx3) = mpsc::channel(1); + /// + /// thread::spawn(|| { + /// tx1.send(1).wait().unwrap() + /// .send(2).wait().unwrap(); + /// }); + /// thread::spawn(|| { + /// tx2.send(3).wait().unwrap() + /// .send(4).wait().unwrap(); + /// }); + /// thread::spawn(|| { + /// tx3.send(rx1).wait().unwrap() + /// .send(rx2).wait().unwrap(); + /// }); + /// + /// let mut result = rx3.flatten().collect(); + /// assert_eq!(result.wait(), Ok(vec![1, 2, 3, 4])); + /// ``` + fn flatten(self) -> Flatten<Self> + where Self::Item: Stream, + <Self::Item as Stream>::Error: From<Self::Error>, + Self: Sized + { + flatten::new(self) + } + + /// Skip elements on this stream while the predicate provided resolves to + /// `true`. + /// + /// This function, like `Iterator::skip_while`, will skip elements on the + /// stream until the `predicate` resolves to `false`. Once one element + /// returns false all future elements will be returned from the underlying + /// stream. + fn skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R> + where P: FnMut(&Self::Item) -> R, + R: IntoFuture<Item=bool, Error=Self::Error>, + Self: Sized + { + skip_while::new(self, pred) + } + + /// Take elements from this stream while the predicate provided resolves to + /// `true`. + /// + /// This function, like `Iterator::take_while`, will take elements from the + /// stream until the `predicate` resolves to `false`. Once one element + /// returns false it will always return that the stream is done. + fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R> + where P: FnMut(&Self::Item) -> R, + R: IntoFuture<Item=bool, Error=Self::Error>, + Self: Sized + { + take_while::new(self, pred) + } + + /// Runs this stream to completion, executing the provided closure for each + /// element on the stream. + /// + /// The closure provided will be called for each item this stream resolves + /// to successfully, producing a future. That future will then be executed + /// to completion before moving on to the next item. + /// + /// The returned value is a `Future` where the `Item` type is `()` and + /// errors are otherwise threaded through. Any error on the stream or in the + /// closure will cause iteration to be halted immediately and the future + /// will resolve to that error. + /// + /// To process each item in the stream and produce another stream instead + /// of a single future, use `and_then` instead. + fn for_each<F, U>(self, f: F) -> ForEach<Self, F, U> + where F: FnMut(Self::Item) -> U, + U: IntoFuture<Item=(), Error = Self::Error>, + Self: Sized + { + for_each::new(self, f) + } + + /// Map this stream's error to any error implementing `From` for + /// this stream's `Error`, returning a new stream. + /// + /// This function does for streams what `try!` does for `Result`, + /// by letting the compiler infer the type of the resulting error. + /// Just as `map_err` above, this is useful for example to ensure + /// that streams have the same error type when used with + /// combinators. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + fn from_err<E: From<Self::Error>>(self) -> FromErr<Self, E> + where Self: Sized, + { + from_err::new(self) + } + + /// Creates a new stream of at most `amt` items of the underlying stream. + /// + /// Once `amt` items have been yielded from this stream then it will always + /// return that the stream is done. + /// + /// # Errors + /// + /// Any errors yielded from underlying stream, before the desired amount of + /// items is reached, are passed through and do not affect the total number + /// of items taken. + fn take(self, amt: u64) -> Take<Self> + where Self: Sized + { + take::new(self, amt) + } + + /// Creates a new stream which skips `amt` items of the underlying stream. + /// + /// Once `amt` items have been skipped from this stream then it will always + /// return the remaining items on this stream. + /// + /// # Errors + /// + /// All errors yielded from underlying stream are passed through and do not + /// affect the total number of items skipped. + fn skip(self, amt: u64) -> Skip<Self> + where Self: Sized + { + skip::new(self, amt) + } + + /// Fuse a stream such that `poll` will never again be called once it has + /// finished. + /// + /// Currently once a stream has returned `None` from `poll` any further + /// calls could exhibit bad behavior such as block forever, panic, never + /// return, etc. If it is known that `poll` may be called after stream has + /// already finished, then this method can be used to ensure that it has + /// defined semantics. + /// + /// Once a stream has been `fuse`d and it finishes, then it will forever + /// return `None` from `poll`. This, unlike for the traits `poll` method, + /// is guaranteed. + /// + /// Also note that as soon as this stream returns `None` it will be dropped + /// to reclaim resources associated with it. + fn fuse(self) -> Fuse<Self> + where Self: Sized + { + fuse::new(self) + } + + /// Borrows a stream, rather than consuming it. + /// + /// This is useful to allow applying stream adaptors while still retaining + /// ownership of the original stream. + /// + /// ``` + /// use futures::prelude::*; + /// use futures::stream; + /// use futures::future; + /// + /// let mut stream = stream::iter_ok::<_, ()>(1..5); + /// + /// let sum = stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)).wait(); + /// assert_eq!(sum, Ok(3)); + /// + /// // You can use the stream again + /// let sum = stream.take(2).fold(0, |a, b| future::ok(a + b)).wait(); + /// assert_eq!(sum, Ok(7)); + /// ``` + fn by_ref(&mut self) -> &mut Self + where Self: Sized + { + self + } + + /// Catches unwinding panics while polling the stream. + /// + /// Caught panic (if any) will be the last element of the resulting stream. + /// + /// In general, panics within a stream can propagate all the way out to the + /// task level. This combinator makes it possible to halt unwinding within + /// the stream itself. It's most commonly used within task executors. This + /// method should not be used for error handling. + /// + /// Note that this method requires the `UnwindSafe` bound from the standard + /// library. This isn't always applied automatically, and the standard + /// library provides an `AssertUnwindSafe` wrapper type to apply it + /// after-the fact. To assist using this method, the `Stream` trait is also + /// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ```rust + /// use futures::prelude::*; + /// use futures::stream; + /// + /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]); + /// // panic on second element + /// let stream_panicking = stream.map(|o| o.unwrap()); + /// let mut iter = stream_panicking.catch_unwind().wait(); + /// + /// assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); + /// assert!(iter.next().unwrap().is_err()); + /// assert!(iter.next().is_none()); + /// ``` + #[cfg(feature = "use_std")] + fn catch_unwind(self) -> CatchUnwind<Self> + where Self: Sized + std::panic::UnwindSafe + { + catch_unwind::new(self) + } + + /// An adaptor for creating a buffered list of pending futures. + /// + /// If this stream's item can be converted into a future, then this adaptor + /// will buffer up to at most `amt` futures and then return results in the + /// same order as the underlying stream. No more than `amt` futures will be + /// buffered at any point in time, and less than `amt` may also be buffered + /// depending on the state of each future. + /// + /// The returned stream will be a stream of each future's result, with + /// errors passed through whenever they occur. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "use_std")] + fn buffered(self, amt: usize) -> Buffered<Self> + where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, + Self: Sized + { + buffered::new(self, amt) + } + + /// An adaptor for creating a buffered list of pending futures (unordered). + /// + /// If this stream's item can be converted into a future, then this adaptor + /// will buffer up to `amt` futures and then return results in the order + /// in which they complete. No more than `amt` futures will be buffered at + /// any point in time, and less than `amt` may also be buffered depending on + /// the state of each future. + /// + /// The returned stream will be a stream of each future's result, with + /// errors passed through whenever they occur. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "use_std")] + fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self> + where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, + Self: Sized + { + buffer_unordered::new(self, amt) + } + + /// An adapter for merging the output of two streams. + /// + /// The merged stream produces items from one or both of the underlying + /// streams as they become available. Errors, however, are not merged: you + /// get at most one error at a time. + #[deprecated(note = "functionality provided by `select` now")] + #[allow(deprecated)] + fn merge<S>(self, other: S) -> Merge<Self, S> + where S: Stream<Error = Self::Error>, + Self: Sized, + { + merge::new(self, other) + } + + /// An adapter for zipping two streams together. + /// + /// The zipped stream waits for both streams to produce an item, and then + /// returns that pair. If an error happens, then that error will be returned + /// immediately. If either stream ends then the zipped stream will also end. + fn zip<S>(self, other: S) -> Zip<Self, S> + where S: Stream<Error = Self::Error>, + Self: Sized, + { + zip::new(self, other) + } + + /// Adapter for chaining two stream. + /// + /// The resulting stream emits elements from the first stream, and when + /// first stream reaches the end, emits the elements from the second stream. + /// + /// ```rust + /// use futures::prelude::*; + /// use futures::stream; + /// + /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]); + /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]); + /// let mut chain = stream1.chain(stream2).wait(); + /// + /// assert_eq!(Some(Ok(10)), chain.next()); + /// assert_eq!(Some(Err(false)), chain.next()); + /// assert_eq!(Some(Err(true)), chain.next()); + /// assert_eq!(Some(Ok(20)), chain.next()); + /// assert_eq!(None, chain.next()); + /// ``` + fn chain<S>(self, other: S) -> Chain<Self, S> + where S: Stream<Item = Self::Item, Error = Self::Error>, + Self: Sized + { + chain::new(self, other) + } + + /// Creates a new stream which exposes a `peek` method. + /// + /// Calling `peek` returns a reference to the next item in the stream. + fn peekable(self) -> Peekable<Self> + where Self: Sized + { + peek::new(self) + } + + /// An adaptor for chunking up items of the stream inside a vector. + /// + /// This combinator will attempt to pull items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// Errors are passed through the stream unbuffered. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Panics + /// + /// This method will panic of `capacity` is zero. + #[cfg(feature = "use_std")] + fn chunks(self, capacity: usize) -> Chunks<Self> + where Self: Sized + { + chunks::new(self, capacity) + } + + /// Creates a stream that selects the next element from either this stream + /// or the provided one, whichever is ready first. + /// + /// This combinator will attempt to pull items from both streams. Each + /// stream will be polled in a round-robin fashion, and whenever a stream is + /// ready to yield an item that item is yielded. + /// + /// The `select` function is similar to `merge` except that it requires both + /// streams to have the same item and error types. + /// + /// Error are passed through from either stream. + fn select<S>(self, other: S) -> Select<Self, S> + where S: Stream<Item = Self::Item, Error = Self::Error>, + Self: Sized, + { + select::new(self, other) + } + + /// A future that completes after the given stream has been fully processed + /// into the sink, including flushing. + /// + /// This future will drive the stream to keep producing items until it is + /// exhausted, sending each item to the sink. It will complete once both the + /// stream is exhausted, and the sink has fully processed received item, + /// flushed successfully, and closed successfully. + /// + /// Doing `stream.forward(sink)` is roughly equivalent to + /// `sink.send_all(stream)`. The returned future will exhaust all items from + /// `self`, sending them all to `sink`. Furthermore the `sink` will be + /// closed and flushed. + /// + /// On completion, the pair `(stream, sink)` is returned. + fn forward<S>(self, sink: S) -> Forward<Self, S> + where S: Sink<SinkItem = Self::Item>, + Self::Error: From<S::SinkError>, + Self: Sized + { + forward::new(self, sink) + } + + /// Splits this `Stream + Sink` object into separate `Stream` and `Sink` + /// objects. + /// + /// This can be useful when you want to split ownership between tasks, or + /// allow direct interaction between the two objects (e.g. via + /// `Sink::send_all`). + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "use_std")] + fn split(self) -> (SplitSink<Self>, SplitStream<Self>) + where Self: super::sink::Sink + Sized + { + split::split(self) + } + + /// Do something with each item of this stream, afterwards passing it on. + /// + /// This is similar to the `Iterator::inspect` method in the standard + /// library where it allows easily inspecting each value as it passes + /// through the stream, for example to debug what's going on. + fn inspect<F>(self, f: F) -> Inspect<Self, F> + where F: FnMut(&Self::Item), + Self: Sized, + { + inspect::new(self, f) + } + + /// Do something with the error of this stream, afterwards passing it on. + /// + /// This is similar to the `Stream::inspect` method where it allows + /// easily inspecting the error as it passes through the stream, for + /// example to debug what's going on. + fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> + where F: FnMut(&Self::Error), + Self: Sized, + { + inspect_err::new(self, f) + } +} + +impl<'a, S: ?Sized + Stream> Stream for &'a mut S { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + (**self).poll() + } +} + +/// Converts a list of futures into a `Stream` of results from the futures. +/// +/// This function will take an list of futures (e.g. a vector, an iterator, +/// etc), and return a stream. The stream will yield items as they become +/// available on the futures internally, in the order that they become +/// available. This function is similar to `buffer_unordered` in that it may +/// return items in a different order than in the list specified. +/// +/// Note that the returned set can also be used to dynamically push more +/// futures into the set as they become available. +#[cfg(feature = "use_std")] +pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future> + where I: IntoIterator, + I::Item: IntoFuture +{ + let mut set = FuturesUnordered::new(); + + for future in futures { + set.push(future.into_future()); + } + + return set +} diff --git a/third_party/rust/futures-0.1.31/src/stream/once.rs b/third_party/rust/futures-0.1.31/src/stream/once.rs new file mode 100644 index 0000000000..24fb327bd6 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/once.rs @@ -0,0 +1,35 @@ +use {Poll, Async}; +use stream::Stream; + +/// A stream which emits single element and then EOF. +/// +/// This stream will never block and is always ready. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Once<T, E>(Option<Result<T, E>>); + +/// Creates a stream of single element +/// +/// ```rust +/// use futures::*; +/// +/// let mut stream = stream::once::<(), _>(Err(17)); +/// assert_eq!(Err(17), stream.poll()); +/// assert_eq!(Ok(Async::Ready(None)), stream.poll()); +/// ``` +pub fn once<T, E>(item: Result<T, E>) -> Once<T, E> { + Once(Some(item)) +} + +impl<T, E> Stream for Once<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<T>, E> { + match self.0.take() { + Some(Ok(e)) => Ok(Async::Ready(Some(e))), + Some(Err(e)) => Err(e), + None => Ok(Async::Ready(None)), + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/or_else.rs b/third_party/rust/futures-0.1.31/src/stream/or_else.rs new file mode 100644 index 0000000000..2d15fa2b70 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/or_else.rs @@ -0,0 +1,80 @@ +use {IntoFuture, Future, Poll, Async}; +use stream::Stream; + +/// A stream combinator which chains a computation onto errors produced by a +/// stream. +/// +/// This structure is produced by the `Stream::or_else` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct OrElse<S, F, U> + where U: IntoFuture, +{ + stream: S, + future: Option<U::Future>, + f: F, +} + +pub fn new<S, F, U>(s: S, f: F) -> OrElse<S, F, U> + where S: Stream, + F: FnMut(S::Error) -> U, + U: IntoFuture<Item=S::Item>, +{ + OrElse { + stream: s, + future: None, + f: f, + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F, U> ::sink::Sink for OrElse<S, F, U> + where S: ::sink::Sink, U: IntoFuture +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F, U> Stream for OrElse<S, F, U> + where S: Stream, + F: FnMut(S::Error) -> U, + U: IntoFuture<Item=S::Item>, +{ + type Item = S::Item; + type Error = U::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, U::Error> { + if self.future.is_none() { + let item = match self.stream.poll() { + Ok(Async::Ready(e)) => return Ok(Async::Ready(e)), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => e, + }; + self.future = Some((self.f)(item).into_future()); + } + assert!(self.future.is_some()); + match self.future.as_mut().unwrap().poll() { + Ok(Async::Ready(e)) => { + self.future = None; + Ok(Async::Ready(Some(e))) + } + Err(e) => { + self.future = None; + Err(e) + } + Ok(Async::NotReady) => Ok(Async::NotReady) + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/peek.rs b/third_party/rust/futures-0.1.31/src/stream/peek.rs new file mode 100644 index 0000000000..96e657663b --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/peek.rs @@ -0,0 +1,74 @@ +use {Async, Poll}; +use stream::{Stream, Fuse}; + +/// A `Stream` that implements a `peek` method. +/// +/// The `peek` method can be used to retrieve a reference +/// to the next `Stream::Item` if available. A subsequent +/// call to `poll` will return the owned item. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Peekable<S: Stream> { + stream: Fuse<S>, + peeked: Option<S::Item>, +} + + +pub fn new<S: Stream>(stream: S) -> Peekable<S> { + Peekable { + stream: stream.fuse(), + peeked: None + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for Peekable<S> + where S: ::sink::Sink + Stream +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S: Stream> Stream for Peekable<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if let Some(item) = self.peeked.take() { + return Ok(Async::Ready(Some(item))) + } + self.stream.poll() + } +} + + +impl<S: Stream> Peekable<S> { + /// Peek retrieves a reference to the next item in the stream. + /// + /// This method polls the underlying stream and return either a reference + /// to the next item if the stream is ready or passes through any errors. + pub fn peek(&mut self) -> Poll<Option<&S::Item>, S::Error> { + if self.peeked.is_some() { + return Ok(Async::Ready(self.peeked.as_ref())) + } + match try_ready!(self.poll()) { + None => Ok(Async::Ready(None)), + Some(item) => { + self.peeked = Some(item); + Ok(Async::Ready(self.peeked.as_ref())) + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/poll_fn.rs b/third_party/rust/futures-0.1.31/src/stream/poll_fn.rs new file mode 100644 index 0000000000..fbc7df0844 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/poll_fn.rs @@ -0,0 +1,49 @@ +//! Definition of the `PollFn` combinator + +use {Stream, Poll}; + +/// A stream which adapts a function returning `Poll`. +/// +/// Created by the `poll_fn` function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct PollFn<F> { + inner: F, +} + +/// Creates a new stream wrapping around a function returning `Poll`. +/// +/// Polling the returned stream delegates to the wrapped function. +/// +/// # Examples +/// +/// ``` +/// use futures::stream::poll_fn; +/// use futures::{Async, Poll}; +/// +/// let mut counter = 1usize; +/// +/// let read_stream = poll_fn(move || -> Poll<Option<String>, std::io::Error> { +/// if counter == 0 { return Ok(Async::Ready(None)); } +/// counter -= 1; +/// Ok(Async::Ready(Some("Hello, World!".to_owned()))) +/// }); +/// ``` +pub fn poll_fn<T, E, F>(f: F) -> PollFn<F> +where + F: FnMut() -> Poll<Option<T>, E>, +{ + PollFn { inner: f } +} + +impl<T, E, F> Stream for PollFn<F> +where + F: FnMut() -> Poll<Option<T>, E>, +{ + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<T>, E> { + (self.inner)() + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/repeat.rs b/third_party/rust/futures-0.1.31/src/stream/repeat.rs new file mode 100644 index 0000000000..e3cb5ff49c --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/repeat.rs @@ -0,0 +1,53 @@ +use core::marker; + + +use stream::Stream; + +use {Async, Poll}; + + +/// Stream that produces the same element repeatedly. +/// +/// This structure is created by the `stream::repeat` function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Repeat<T, E> + where T: Clone +{ + item: T, + error: marker::PhantomData<E>, +} + +/// Create a stream which produces the same item repeatedly. +/// +/// Stream never produces an error or EOF. Note that you likely want to avoid +/// usage of `collect` or such on the returned stream as it will exhaust +/// available memory as it tries to just fill up all RAM. +/// +/// ```rust +/// use futures::*; +/// +/// let mut stream = stream::repeat::<_, bool>(10); +/// assert_eq!(Ok(Async::Ready(Some(10))), stream.poll()); +/// assert_eq!(Ok(Async::Ready(Some(10))), stream.poll()); +/// assert_eq!(Ok(Async::Ready(Some(10))), stream.poll()); +/// ``` +pub fn repeat<T, E>(item: T) -> Repeat<T, E> + where T: Clone +{ + Repeat { + item: item, + error: marker::PhantomData, + } +} + +impl<T, E> Stream for Repeat<T, E> + where T: Clone +{ + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + Ok(Async::Ready(Some(self.item.clone()))) + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/select.rs b/third_party/rust/futures-0.1.31/src/stream/select.rs new file mode 100644 index 0000000000..ae6b66cf14 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/select.rs @@ -0,0 +1,64 @@ +use {Poll, Async}; +use stream::{Stream, Fuse}; + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from either of the underlying streams as +/// they become available, and the streams are polled in a round-robin fashion. +/// Errors, however, are not merged: you get at most one error at a time. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Select<S1, S2> { + stream1: Fuse<S1>, + stream2: Fuse<S2>, + flag: bool, +} + +pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2> + where S1: Stream, + S2: Stream<Item = S1::Item, Error = S1::Error> +{ + Select { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + flag: false, + } +} + +impl<S1, S2> Stream for Select<S1, S2> + where S1: Stream, + S2: Stream<Item = S1::Item, Error = S1::Error> +{ + type Item = S1::Item; + type Error = S1::Error; + + fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> { + let (a, b) = if self.flag { + (&mut self.stream2 as &mut Stream<Item=_, Error=_>, + &mut self.stream1 as &mut Stream<Item=_, Error=_>) + } else { + (&mut self.stream1 as &mut Stream<Item=_, Error=_>, + &mut self.stream2 as &mut Stream<Item=_, Error=_>) + }; + self.flag = !self.flag; + + let a_done = match a.poll()? { + Async::Ready(Some(item)) => return Ok(Some(item).into()), + Async::Ready(None) => true, + Async::NotReady => false, + }; + + match b.poll()? { + Async::Ready(Some(item)) => { + // If the other stream isn't finished yet, give them a chance to + // go first next time as we pulled something off `b`. + if !a_done { + self.flag = !self.flag; + } + Ok(Some(item).into()) + } + Async::Ready(None) if a_done => Ok(None.into()), + Async::Ready(None) | Async::NotReady => Ok(Async::NotReady), + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/skip.rs b/third_party/rust/futures-0.1.31/src/stream/skip.rs new file mode 100644 index 0000000000..a1d7b49797 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/skip.rs @@ -0,0 +1,84 @@ +use {Poll, Async}; +use stream::Stream; + +/// A stream combinator which skips a number of elements before continuing. +/// +/// This structure is produced by the `Stream::skip` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Skip<S> { + stream: S, + remaining: u64, +} + +pub fn new<S>(s: S, amt: u64) -> Skip<S> + where S: Stream, +{ + Skip { + stream: s, + remaining: amt, + } +} + +impl<S> Skip<S> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for Skip<S> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S> Stream for Skip<S> + where S: Stream, +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + while self.remaining > 0 { + match try_ready!(self.stream.poll()) { + Some(_) => self.remaining -= 1, + None => return Ok(Async::Ready(None)), + } + } + + self.stream.poll() + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/skip_while.rs b/third_party/rust/futures-0.1.31/src/stream/skip_while.rs new file mode 100644 index 0000000000..b571996c24 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/skip_while.rs @@ -0,0 +1,113 @@ +use {Async, Poll, IntoFuture, Future}; +use stream::Stream; + +/// A stream combinator which skips elements of a stream while a predicate +/// holds. +/// +/// This structure is produced by the `Stream::skip_while` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct SkipWhile<S, P, R> where S: Stream, R: IntoFuture { + stream: S, + pred: P, + pending: Option<(R::Future, S::Item)>, + done_skipping: bool, +} + +pub fn new<S, P, R>(s: S, p: P) -> SkipWhile<S, P, R> + where S: Stream, + P: FnMut(&S::Item) -> R, + R: IntoFuture<Item=bool, Error=S::Error>, +{ + SkipWhile { + stream: s, + pred: p, + pending: None, + done_skipping: false, + } +} + +impl<S, P, R> SkipWhile<S, P, R> where S: Stream, R: IntoFuture { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, P, R> ::sink::Sink for SkipWhile<S, P, R> + where S: ::sink::Sink + Stream, R: IntoFuture +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, P, R> Stream for SkipWhile<S, P, R> + where S: Stream, + P: FnMut(&S::Item) -> R, + R: IntoFuture<Item=bool, Error=S::Error>, +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + if self.done_skipping { + return self.stream.poll(); + } + + loop { + if self.pending.is_none() { + let item = match try_ready!(self.stream.poll()) { + Some(e) => e, + None => return Ok(Async::Ready(None)), + }; + self.pending = Some(((self.pred)(&item).into_future(), item)); + } + + assert!(self.pending.is_some()); + match self.pending.as_mut().unwrap().0.poll() { + Ok(Async::Ready(true)) => self.pending = None, + Ok(Async::Ready(false)) => { + let (_, item) = self.pending.take().unwrap(); + self.done_skipping = true; + return Ok(Async::Ready(Some(item))) + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => { + self.pending = None; + return Err(e) + } + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/split.rs b/third_party/rust/futures-0.1.31/src/stream/split.rs new file mode 100644 index 0000000000..ddaa52997d --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/split.rs @@ -0,0 +1,105 @@ +use std::any::Any; +use std::error::Error; +use std::fmt; + +use {StartSend, Sink, Stream, Poll, Async, AsyncSink}; +use sync::BiLock; + +/// A `Stream` part of the split pair +#[derive(Debug)] +pub struct SplitStream<S>(BiLock<S>); + +impl<S> SplitStream<S> { + /// Attempts to put the two "halves" of a split `Stream + Sink` back + /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are + /// a matching pair originating from the same call to `Stream::split`. + pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> { + other.reunite(self) + } +} + +impl<S: Stream> Stream for SplitStream<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.poll(), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// A `Sink` part of the split pair +#[derive(Debug)] +pub struct SplitSink<S>(BiLock<S>); + +impl<S> SplitSink<S> { + /// Attempts to put the two "halves" of a split `Stream + Sink` back + /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are + /// a matching pair originating from the same call to `Stream::split`. + pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S>> { + self.0.reunite(other.0).map_err(|err| { + ReuniteError(SplitSink(err.0), SplitStream(err.1)) + }) + } +} + +impl<S: Sink> Sink for SplitSink<S> { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) + -> StartSend<S::SinkItem, S::SinkError> + { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.start_send(item), + Async::NotReady => Ok(AsyncSink::NotReady(item)), + } + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.poll_complete(), + Async::NotReady => Ok(Async::NotReady), + } + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.close(), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +pub fn split<S: Stream + Sink>(s: S) -> (SplitSink<S>, SplitStream<S>) { + let (a, b) = BiLock::new(s); + let read = SplitStream(a); + let write = SplitSink(b); + (write, read) +} + +/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves +/// of a `Stream + Split`, and thus could not be `reunite`d. +pub struct ReuniteError<T>(pub SplitSink<T>, pub SplitStream<T>); + +impl<T> fmt::Debug for ReuniteError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("ReuniteError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for ReuniteError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair") + } +} + +impl<T: Any> Error for ReuniteError<T> { + fn description(&self) -> &str { + "tried to reunite a SplitStream and SplitSink that don't form a pair" + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/take.rs b/third_party/rust/futures-0.1.31/src/stream/take.rs new file mode 100644 index 0000000000..0ca68496eb --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/take.rs @@ -0,0 +1,86 @@ +use {Async, Poll}; +use stream::Stream; + +/// A stream combinator which returns a maximum number of elements. +/// +/// This structure is produced by the `Stream::take` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Take<S> { + stream: S, + remaining: u64, +} + +pub fn new<S>(s: S, amt: u64) -> Take<S> + where S: Stream, +{ + Take { + stream: s, + remaining: amt, + } +} + +impl<S> Take<S> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S> ::sink::Sink for Take<S> + where S: ::sink::Sink + Stream +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S> Stream for Take<S> + where S: Stream, +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + if self.remaining == 0 { + Ok(Async::Ready(None)) + } else { + let next = try_ready!(self.stream.poll()); + match next { + Some(_) => self.remaining -= 1, + None => self.remaining = 0, + } + Ok(Async::Ready(next)) + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/take_while.rs b/third_party/rust/futures-0.1.31/src/stream/take_while.rs new file mode 100644 index 0000000000..732ae855de --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/take_while.rs @@ -0,0 +1,113 @@ +use {Async, Poll, IntoFuture, Future}; +use stream::Stream; + +/// A stream combinator which takes elements from a stream while a predicate +/// holds. +/// +/// This structure is produced by the `Stream::take_while` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct TakeWhile<S, P, R> where S: Stream, R: IntoFuture { + stream: S, + pred: P, + pending: Option<(R::Future, S::Item)>, + done_taking: bool, +} + +pub fn new<S, P, R>(s: S, p: P) -> TakeWhile<S, P, R> + where S: Stream, + P: FnMut(&S::Item) -> R, + R: IntoFuture<Item=bool, Error=S::Error>, +{ + TakeWhile { + stream: s, + pred: p, + pending: None, + done_taking: false, + } +} + +impl<S, P, R> TakeWhile<S, P, R> where S: Stream, R: IntoFuture { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, P, R> ::sink::Sink for TakeWhile<S, P, R> + where S: ::sink::Sink + Stream, R: IntoFuture +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, P, R> Stream for TakeWhile<S, P, R> + where S: Stream, + P: FnMut(&S::Item) -> R, + R: IntoFuture<Item=bool, Error=S::Error>, +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + if self.done_taking { + return Ok(Async::Ready(None)); + } + + if self.pending.is_none() { + let item = match try_ready!(self.stream.poll()) { + Some(e) => e, + None => return Ok(Async::Ready(None)), + }; + self.pending = Some(((self.pred)(&item).into_future(), item)); + } + + assert!(self.pending.is_some()); + match self.pending.as_mut().unwrap().0.poll() { + Ok(Async::Ready(true)) => { + let (_, item) = self.pending.take().unwrap(); + Ok(Async::Ready(Some(item))) + }, + Ok(Async::Ready(false)) => { + self.done_taking = true; + Ok(Async::Ready(None)) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + self.pending = None; + Err(e) + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/then.rs b/third_party/rust/futures-0.1.31/src/stream/then.rs new file mode 100644 index 0000000000..cab338e922 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/then.rs @@ -0,0 +1,81 @@ +use {Async, IntoFuture, Future, Poll}; +use stream::Stream; + +/// A stream combinator which chains a computation onto each item produced by a +/// stream. +/// +/// This structure is produced by the `Stream::then` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Then<S, F, U> + where U: IntoFuture, +{ + stream: S, + future: Option<U::Future>, + f: F, +} + +pub fn new<S, F, U>(s: S, f: F) -> Then<S, F, U> + where S: Stream, + F: FnMut(Result<S::Item, S::Error>) -> U, + U: IntoFuture, +{ + Then { + stream: s, + future: None, + f: f, + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F, U> ::sink::Sink for Then<S, F, U> + where S: ::sink::Sink, U: IntoFuture, +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F, U> Stream for Then<S, F, U> + where S: Stream, + F: FnMut(Result<S::Item, S::Error>) -> U, + U: IntoFuture, +{ + type Item = U::Item; + type Error = U::Error; + + fn poll(&mut self) -> Poll<Option<U::Item>, U::Error> { + if self.future.is_none() { + let item = match self.stream.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::Ready(Some(e))) => Ok(e), + Err(e) => Err(e), + }; + self.future = Some((self.f)(item).into_future()); + } + assert!(self.future.is_some()); + match self.future.as_mut().unwrap().poll() { + Ok(Async::Ready(e)) => { + self.future = None; + Ok(Async::Ready(Some(e))) + } + Err(e) => { + self.future = None; + Err(e) + } + Ok(Async::NotReady) => Ok(Async::NotReady) + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/unfold.rs b/third_party/rust/futures-0.1.31/src/stream/unfold.rs new file mode 100644 index 0000000000..ac427b8c3b --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/unfold.rs @@ -0,0 +1,114 @@ +use core::mem; + +use {Future, IntoFuture, Async, Poll}; +use stream::Stream; + +/// Creates a `Stream` from a seed and a closure returning a `Future`. +/// +/// This function is the dual for the `Stream::fold()` adapter: while +/// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a +/// `Stream` from a seed value. +/// +/// `unfold()` will call the provided closure with the provided seed, then wait +/// for the returned `Future` to complete with `(a, b)`. It will then yield the +/// value `a`, and use `b` as the next internal state. +/// +/// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` +/// will stop producing items and return `Ok(Async::Ready(None))` in future +/// calls to `poll()`. +/// +/// In case of error generated by the returned `Future`, the error will be +/// returned by the `Stream`. The `Stream` will then yield +/// `Ok(Async::Ready(None))` in future calls to `poll()`. +/// +/// This function can typically be used when wanting to go from the "world of +/// futures" to the "world of streams": the provided closure can build a +/// `Future` using other library functions working on futures, and `unfold()` +/// will turn it into a `Stream` by repeating the operation. +/// +/// # Example +/// +/// ```rust +/// use futures::stream::{self, Stream}; +/// use futures::future::{self, Future}; +/// +/// let mut stream = stream::unfold(0, |state| { +/// if state <= 2 { +/// let next_state = state + 1; +/// let yielded = state * 2; +/// let fut = future::ok::<_, u32>((yielded, next_state)); +/// Some(fut) +/// } else { +/// None +/// } +/// }); +/// +/// let result = stream.collect().wait(); +/// assert_eq!(result, Ok(vec![0, 2, 4])); +/// ``` +pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut> + where F: FnMut(T) -> Option<Fut>, + Fut: IntoFuture<Item = (It, T)>, +{ + Unfold { + f: f, + state: State::Ready(init), + } +} + +/// A stream which creates futures, polls them and return their result +/// +/// This stream is returned by the `futures::stream::unfold` method +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Unfold<T, F, Fut> where Fut: IntoFuture { + f: F, + state: State<T, Fut::Future>, +} + +impl <T, F, Fut, It> Stream for Unfold<T, F, Fut> + where F: FnMut(T) -> Option<Fut>, + Fut: IntoFuture<Item = (It, T)>, +{ + type Item = It; + type Error = Fut::Error; + + fn poll(&mut self) -> Poll<Option<It>, Fut::Error> { + loop { + match mem::replace(&mut self.state, State::Empty) { + // State::Empty may happen if the future returned an error + State::Empty => { return Ok(Async::Ready(None)); } + State::Ready(state) => { + match (self.f)(state) { + Some(fut) => { self.state = State::Processing(fut.into_future()); } + None => { return Ok(Async::Ready(None)); } + } + } + State::Processing(mut fut) => { + match fut.poll()? { + Async:: Ready((item, next_state)) => { + self.state = State::Ready(next_state); + return Ok(Async::Ready(Some(item))); + } + Async::NotReady => { + self.state = State::Processing(fut); + return Ok(Async::NotReady); + } + } + } + } + } + } +} + +#[derive(Debug)] +enum State<T, F> where F: Future { + /// Placeholder state when doing work, or when the returned Future generated an error + Empty, + + /// Ready to generate new future; current internal state is the `T` + Ready(T), + + /// Working on a future generated previously + Processing(F), +} diff --git a/third_party/rust/futures-0.1.31/src/stream/wait.rs b/third_party/rust/futures-0.1.31/src/stream/wait.rs new file mode 100644 index 0000000000..80acb6c2a6 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/wait.rs @@ -0,0 +1,53 @@ +use stream::Stream; +use executor; + +/// A stream combinator which converts an asynchronous stream to a **blocking +/// iterator**. +/// +/// Created by the `Stream::wait` method, this function transforms any stream +/// into a standard iterator. This is implemented by blocking the current thread +/// while items on the underlying stream aren't ready yet. +#[must_use = "iterators do nothing unless advanced"] +#[derive(Debug)] +pub struct Wait<S> { + stream: executor::Spawn<S>, +} + +impl<S> Wait<S> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + self.stream.get_ref() + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + self.stream.get_mut() + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +pub fn new<S: Stream>(s: S) -> Wait<S> { + Wait { + stream: executor::spawn(s), + } +} + +impl<S: Stream> Iterator for Wait<S> { + type Item = Result<S::Item, S::Error>; + + fn next(&mut self) -> Option<Self::Item> { + self.stream.wait_stream() + } +} diff --git a/third_party/rust/futures-0.1.31/src/stream/zip.rs b/third_party/rust/futures-0.1.31/src/stream/zip.rs new file mode 100644 index 0000000000..17e3c69ffe --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/zip.rs @@ -0,0 +1,59 @@ +use {Async, Poll}; +use stream::{Stream, Fuse}; + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from one or both of the underlying +/// streams as they become available. Errors, however, are not merged: you +#[derive(Debug)] +/// get at most one error at a time. +#[must_use = "streams do nothing unless polled"] +pub struct Zip<S1: Stream, S2: Stream> { + stream1: Fuse<S1>, + stream2: Fuse<S2>, + queued1: Option<S1::Item>, + queued2: Option<S2::Item>, +} + +pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Zip<S1, S2> + where S1: Stream, S2: Stream<Error = S1::Error> +{ + Zip { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + queued1: None, + queued2: None, + } +} + +impl<S1, S2> Stream for Zip<S1, S2> + where S1: Stream, S2: Stream<Error = S1::Error> +{ + type Item = (S1::Item, S2::Item); + type Error = S1::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if self.queued1.is_none() { + match self.stream1.poll()? { + Async::Ready(Some(item1)) => self.queued1 = Some(item1), + Async::Ready(None) | Async::NotReady => {} + } + } + if self.queued2.is_none() { + match self.stream2.poll()? { + Async::Ready(Some(item2)) => self.queued2 = Some(item2), + Async::Ready(None) | Async::NotReady => {} + } + } + + if self.queued1.is_some() && self.queued2.is_some() { + let pair = (self.queued1.take().unwrap(), + self.queued2.take().unwrap()); + Ok(Async::Ready(Some(pair))) + } else if self.stream1.is_done() || self.stream2.is_done() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} |