diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/future/join_all.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/future/join_all.rs | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/future/join_all.rs b/third_party/rust/futures-0.1.31/src/future/join_all.rs new file mode 100644 index 0000000000..398a7a4736 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/future/join_all.rs @@ -0,0 +1,136 @@ +//! Definition of the `JoinAll` combinator, waiting for all of a list of futures +//! to finish. + +use std::prelude::v1::*; + +use std::fmt; +use std::mem; + +use {Future, IntoFuture, Poll, Async}; + +#[derive(Debug)] +enum ElemState<T> where T: Future { + Pending(T), + Done(T::Item), +} + +/// A future which takes a list of futures and resolves with a vector of the +/// completed values. +/// +/// This future is created with the `join_all` method. +#[must_use = "futures do nothing unless polled"] +pub struct JoinAll<I> + where I: IntoIterator, + I::Item: IntoFuture, +{ + elems: Vec<ElemState<<I::Item as IntoFuture>::Future>>, +} + +impl<I> fmt::Debug for JoinAll<I> + where I: IntoIterator, + I::Item: IntoFuture, + <<I as IntoIterator>::Item as IntoFuture>::Future: fmt::Debug, + <<I as IntoIterator>::Item as IntoFuture>::Item: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("JoinAll") + .field("elems", &self.elems) + .finish() + } +} + +/// Creates a future which represents a collection of the results of the futures +/// given. +/// +/// The returned future will drive execution for all of its underlying futures, +/// collecting the results into a destination `Vec<T>` in the same order as they +/// were provided. If any future returns an error then all other futures will be +/// canceled and an error will be returned immediately. If all futures complete +/// successfully, however, then the returned future will succeed with a `Vec` of +/// all the successful results. +/// +/// # Examples +/// +/// ``` +/// use futures::future::*; +/// +/// let f = join_all(vec![ +/// ok::<u32, u32>(1), +/// ok::<u32, u32>(2), +/// ok::<u32, u32>(3), +/// ]); +/// let f = f.map(|x| { +/// assert_eq!(x, [1, 2, 3]); +/// }); +/// +/// let f = join_all(vec![ +/// Box::new(ok::<u32, u32>(1)), +/// Box::new(err::<u32, u32>(2)), +/// Box::new(ok::<u32, u32>(3)), +/// ]); +/// let f = f.then(|x| { +/// assert_eq!(x, Err(2)); +/// x +/// }); +/// ``` +pub fn join_all<I>(i: I) -> JoinAll<I> + where I: IntoIterator, + I::Item: IntoFuture, +{ + let elems = i.into_iter().map(|f| { + ElemState::Pending(f.into_future()) + }).collect(); + JoinAll { elems: elems } +} + +impl<I> Future for JoinAll<I> + where I: IntoIterator, + I::Item: IntoFuture, +{ + type Item = Vec<<I::Item as IntoFuture>::Item>; + type Error = <I::Item as IntoFuture>::Error; + + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let mut all_done = true; + + for idx in 0 .. self.elems.len() { + let done_val = match self.elems[idx] { + ElemState::Pending(ref mut t) => { + match t.poll() { + Ok(Async::Ready(v)) => Ok(v), + Ok(Async::NotReady) => { + all_done = false; + continue + } + Err(e) => Err(e), + } + } + ElemState::Done(ref mut _v) => continue, + }; + + match done_val { + Ok(v) => self.elems[idx] = ElemState::Done(v), + Err(e) => { + // On completion drop all our associated resources + // ASAP. + self.elems = Vec::new(); + return Err(e) + } + } + } + + if all_done { + let elems = mem::replace(&mut self.elems, Vec::new()); + let result = elems.into_iter().map(|e| { + match e { + ElemState::Done(t) => t, + _ => unreachable!(), + } + }).collect(); + Ok(Async::Ready(result)) + } else { + Ok(Async::NotReady) + } + } +} |