//! Definition of the `JoinAll` combinator, waiting for all of a list of futures //! to finish. use alloc::boxed::Box; use alloc::vec::Vec; use core::fmt; use core::future::Future; use core::iter::FromIterator; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) } #[must_use = "futures do nothing unless you `.await` or poll them"] /// Future for the [`join_all`] function. pub struct JoinAll where F: Future, { kind: JoinAllKind, } #[cfg(not(futures_no_atomic_cas))] pub(crate) const SMALL: usize = 30; enum JoinAllKind where F: Future, { Small { elems: Pin]>>, }, #[cfg(not(futures_no_atomic_cas))] Big { fut: Collect, Vec>, }, } impl fmt::Debug for JoinAll where F: Future + fmt::Debug, F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.kind { JoinAllKind::Small { ref elems } => { f.debug_struct("JoinAll").field("elems", elems).finish() } #[cfg(not(futures_no_atomic_cas))] JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), } } } /// Creates a future which represents a collection of the outputs of the futures /// given. /// /// The returned future will drive execution for all of its underlying futures, /// collecting the results into a destination `Vec` in the same order as they /// were provided. /// /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. /// /// # See Also /// /// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance /// reasons if the number of futures is large. You may want to look into using it or /// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. /// /// Some examples for additional functionality provided by these are: /// /// * Adding new futures to the set even after it has been started. /// /// * Only polling the specific futures that have been woken. In cases where /// you have a lot of futures this will result in much more efficient polling. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::future::join_all; /// /// async fn foo(i: u32) -> u32 { i } /// /// let futures = vec![foo(1), foo(2), foo(3)]; /// /// assert_eq!(join_all(futures).await, [1, 2, 3]); /// # }); /// ``` pub fn join_all(iter: I) -> JoinAll where I: IntoIterator, I::Item: Future, { let iter = iter.into_iter(); #[cfg(futures_no_atomic_cas)] { let kind = JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into() }; assert_future::::Output>, _>(JoinAll { kind }) } #[cfg(not(futures_no_atomic_cas))] { let kind = match iter.size_hint().1 { Some(max) if max <= SMALL => JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into(), }, _ => JoinAllKind::Big { fut: iter.collect::>().collect() }, }; assert_future::::Output>, _>(JoinAll { kind }) } } impl Future for JoinAll where F: Future, { type Output = Vec; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match &mut self.kind { JoinAllKind::Small { elems } => { let mut all_done = true; for elem in iter_pin_mut(elems.as_mut()) { if elem.poll(cx).is_pending() { all_done = false; } } if all_done { let mut elems = mem::replace(elems, Box::pin([])); let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); Poll::Ready(result) } else { Poll::Pending } } #[cfg(not(futures_no_atomic_cas))] JoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } impl FromIterator for JoinAll { fn from_iter>(iter: T) -> Self { join_all(iter) } }