diff options
Diffstat (limited to 'vendor/futures-util/src/future')
-rw-r--r-- | vendor/futures-util/src/future/either.rs | 58 | ||||
-rw-r--r-- | vendor/futures-util/src/future/future/fuse.rs | 12 | ||||
-rw-r--r-- | vendor/futures-util/src/future/future/shared.rs | 64 | ||||
-rw-r--r-- | vendor/futures-util/src/future/join_all.rs | 29 | ||||
-rw-r--r-- | vendor/futures-util/src/future/pending.rs | 1 | ||||
-rw-r--r-- | vendor/futures-util/src/future/select.rs | 30 | ||||
-rw-r--r-- | vendor/futures-util/src/future/select_all.rs | 3 | ||||
-rw-r--r-- | vendor/futures-util/src/future/select_ok.rs | 2 | ||||
-rw-r--r-- | vendor/futures-util/src/future/try_future/mod.rs | 6 | ||||
-rw-r--r-- | vendor/futures-util/src/future/try_join_all.rs | 137 | ||||
-rw-r--r-- | vendor/futures-util/src/future/try_select.rs | 13 |
11 files changed, 248 insertions, 107 deletions
diff --git a/vendor/futures-util/src/future/either.rs b/vendor/futures-util/src/future/either.rs index 9602de7a4..27e5064df 100644 --- a/vendor/futures-util/src/future/either.rs +++ b/vendor/futures-util/src/future/either.rs @@ -33,11 +33,31 @@ pub enum Either<A, B> { } impl<A, B> Either<A, B> { - fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> { + // SAFETY: We can use `new_unchecked` because the `inner` parts are + // guaranteed to be pinned, as they come from `self` which is pinned. unsafe { - match self.get_unchecked_mut() { - Either::Left(a) => Either::Left(Pin::new_unchecked(a)), - Either::Right(b) => Either::Right(Pin::new_unchecked(b)), + match *Pin::get_ref(self) { + Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)), + } + } + } + + /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + // SAFETY: `get_unchecked_mut` is fine because we don't move anything. + // We can use `new_unchecked` because the `inner` parts are guaranteed + // to be pinned, as they come from `self` which is pinned, and we never + // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We + // also don't have an implementation of `Drop`, nor manual `Unpin`. + unsafe { + match *Pin::get_unchecked_mut(self) { + Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)), } } } @@ -85,7 +105,7 @@ where type Output = A::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll(cx), Either::Right(x) => x.poll(cx), } @@ -113,7 +133,7 @@ where type Item = A::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_next(cx), Either::Right(x) => x.poll_next(cx), } @@ -149,28 +169,28 @@ where type Error = A::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_ready(cx), Either::Right(x) => x.poll_ready(cx), } } fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.start_send(item), Either::Right(x) => x.start_send(item), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_flush(cx), Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_close(cx), Either::Right(x) => x.poll_close(cx), } @@ -198,7 +218,7 @@ mod if_std { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_read(cx, buf), Either::Right(x) => x.poll_read(cx, buf), } @@ -209,7 +229,7 @@ mod if_std { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_read_vectored(cx, bufs), Either::Right(x) => x.poll_read_vectored(cx, bufs), } @@ -226,7 +246,7 @@ mod if_std { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_write(cx, buf), Either::Right(x) => x.poll_write(cx, buf), } @@ -237,21 +257,21 @@ mod if_std { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_write_vectored(cx, bufs), Either::Right(x) => x.poll_write_vectored(cx, bufs), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_flush(cx), Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_close(cx), Either::Right(x) => x.poll_close(cx), } @@ -268,7 +288,7 @@ mod if_std { cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_seek(cx, pos), Either::Right(x) => x.poll_seek(cx, pos), } @@ -281,14 +301,14 @@ mod if_std { B: AsyncBufRead, { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_fill_buf(cx), Either::Right(x) => x.poll_fill_buf(cx), } } fn consume(self: Pin<&mut Self>, amt: usize) { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.consume(amt), Either::Right(x) => x.consume(amt), } diff --git a/vendor/futures-util/src/future/future/fuse.rs b/vendor/futures-util/src/future/future/fuse.rs index 597aec1a4..225790672 100644 --- a/vendor/futures-util/src/future/future/fuse.rs +++ b/vendor/futures-util/src/future/future/fuse.rs @@ -1,6 +1,5 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; -use futures_core::ready; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -81,13 +80,12 @@ impl<Fut: Future> Future for Fuse<Fut> { type Output = Fut::Output; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { - Poll::Ready(match self.as_mut().project().inner.as_pin_mut() { - Some(fut) => { - let output = ready!(fut.poll(cx)); + match self.as_mut().project().inner.as_pin_mut() { + Some(fut) => fut.poll(cx).map(|output| { self.project().inner.set(None); output - } - None => return Poll::Pending, - }) + }), + None => Poll::Pending, + } } } diff --git a/vendor/futures-util/src/future/future/shared.rs b/vendor/futures-util/src/future/future/shared.rs index 9b31932fe..ecd1b426d 100644 --- a/vendor/futures-util/src/future/future/shared.rs +++ b/vendor/futures-util/src/future/future/shared.rs @@ -4,7 +4,9 @@ use futures_core::task::{Context, Poll, Waker}; use slab::Slab; use std::cell::UnsafeCell; use std::fmt; +use std::hash::Hasher; use std::pin::Pin; +use std::ptr; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, SeqCst}; use std::sync::{Arc, Mutex, Weak}; @@ -103,7 +105,6 @@ impl<Fut: Future> Shared<Fut> { impl<Fut> Shared<Fut> where Fut: Future, - Fut::Output: Clone, { /// Returns [`Some`] containing a reference to this [`Shared`]'s output if /// it has already been computed by a clone or [`None`] if it hasn't been @@ -139,6 +140,7 @@ where /// This method by itself is safe, but using it correctly requires extra care. Another thread /// can change the strong count at any time, including potentially between calling this method /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] pub fn strong_count(&self) -> Option<usize> { self.inner.as_ref().map(|arc| Arc::strong_count(arc)) } @@ -152,15 +154,44 @@ where /// This method by itself is safe, but using it correctly requires extra care. Another thread /// can change the weak count at any time, including potentially between calling this method /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] pub fn weak_count(&self) -> Option<usize> { self.inner.as_ref().map(|arc| Arc::weak_count(arc)) } + + /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`. + pub fn ptr_hash<H: Hasher>(&self, state: &mut H) { + match self.inner.as_ref() { + Some(arc) => { + state.write_u8(1); + ptr::hash(Arc::as_ptr(arc), state); + } + None => { + state.write_u8(0); + } + } + } + + /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to + /// `Arc::ptr_eq`). + /// + /// Returns `false` if either `Shared` has terminated. + pub fn ptr_eq(&self, rhs: &Self) -> bool { + let lhs = match self.inner.as_ref() { + Some(lhs) => lhs, + None => return false, + }; + let rhs = match rhs.inner.as_ref() { + Some(rhs) => rhs, + None => return false, + }; + Arc::ptr_eq(lhs, rhs) + } } impl<Fut> Inner<Fut> where Fut: Future, - Fut::Output: Clone, { /// Safety: callers must first ensure that `self.inner.state` /// is `COMPLETE` @@ -170,6 +201,13 @@ where FutureOrOutput::Future(_) => unreachable!(), } } +} + +impl<Fut> Inner<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ /// Registers the current task to receive a wakeup when we are awoken. fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) { let mut wakers_guard = self.notifier.wakers.lock().unwrap(); @@ -262,19 +300,20 @@ where let waker = waker_ref(&inner.notifier); let mut cx = Context::from_waker(&waker); - struct Reset<'a>(&'a AtomicUsize); + struct Reset<'a> { + state: &'a AtomicUsize, + did_not_panic: bool, + } impl Drop for Reset<'_> { fn drop(&mut self) { - use std::thread; - - if thread::panicking() { - self.0.store(POISONED, SeqCst); + if !self.did_not_panic { + self.state.store(POISONED, SeqCst); } } } - let _reset = Reset(&inner.notifier.state); + let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false }; let output = { let future = unsafe { @@ -284,12 +323,15 @@ where } }; - match future.poll(&mut cx) { + let poll_result = future.poll(&mut cx); + reset.did_not_panic = true; + + match poll_result { Poll::Pending => { if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success - drop(_reset); + drop(reset); this.inner = Some(inner); return Poll::Pending; } else { @@ -313,7 +355,7 @@ where waker.wake(); } - drop(_reset); // Make borrow checker happy + drop(reset); // Make borrow checker happy drop(wakers_guard); // Safety: We're in the COMPLETE state diff --git a/vendor/futures-util/src/future/join_all.rs b/vendor/futures-util/src/future/join_all.rs index 2e52ac17f..7dc159ba0 100644 --- a/vendor/futures-util/src/future/join_all.rs +++ b/vendor/futures-util/src/future/join_all.rs @@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; -fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { +pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. @@ -32,9 +32,9 @@ where } #[cfg(not(futures_no_atomic_cas))] -const SMALL: usize = 30; +pub(crate) const SMALL: usize = 30; -pub(crate) enum JoinAllKind<F> +enum JoinAllKind<F> where F: Future, { @@ -104,26 +104,25 @@ where I: IntoIterator, I::Item: Future, { + let iter = iter.into_iter(); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into(); - let kind = JoinAllKind::Small { elems }; + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { - None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, - Some(max) => { - if max <= SMALL { - let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(); - JoinAllKind::Small { elems } - } else { - JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() } - } - } + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) } } diff --git a/vendor/futures-util/src/future/pending.rs b/vendor/futures-util/src/future/pending.rs index 92c78d52b..b8e28686e 100644 --- a/vendor/futures-util/src/future/pending.rs +++ b/vendor/futures-util/src/future/pending.rs @@ -33,6 +33,7 @@ impl<T> FusedFuture for Pending<T> { /// unreachable!(); /// # }); /// ``` +#[cfg_attr(docsrs, doc(alias = "never"))] pub fn pending<T>() -> Pending<T> { assert_future::<T, _>(Pending { _data: marker::PhantomData }) } diff --git a/vendor/futures-util/src/future/select.rs b/vendor/futures-util/src/future/select.rs index bd44f20f7..7e33d195f 100644 --- a/vendor/futures-util/src/future/select.rs +++ b/vendor/futures-util/src/future/select.rs @@ -99,17 +99,27 @@ where type Output = Either<(A::Output, B), (B::Output, A)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), - Poll::Pending => match b.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending - } - }, + /// When compiled with `-C opt-level=z`, this function will help the compiler eliminate the `None` branch, where + /// `Option::unwrap` does not. + #[inline(always)] + fn unwrap_option<T>(value: Option<T>) -> T { + match value { + None => unreachable!(), + Some(value) => value, + } } + + let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); + + if let Poll::Ready(val) = a.poll_unpin(cx) { + return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1))); + } + + if let Poll::Ready(val) = b.poll_unpin(cx) { + return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0))); + } + + Poll::Pending } } diff --git a/vendor/futures-util/src/future/select_all.rs b/vendor/futures-util/src/future/select_all.rs index 106e50844..0a51d0da6 100644 --- a/vendor/futures-util/src/future/select_all.rs +++ b/vendor/futures-util/src/future/select_all.rs @@ -58,8 +58,9 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> { }); match item { Some((idx, res)) => { + #[allow(clippy::let_underscore_future)] let _ = self.inner.swap_remove(idx); - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); Poll::Ready((res, idx, rest)) } None => Poll::Pending, diff --git a/vendor/futures-util/src/future/select_ok.rs b/vendor/futures-util/src/future/select_ok.rs index 0ad83c6db..5d5579930 100644 --- a/vendor/futures-util/src/future/select_ok.rs +++ b/vendor/futures-util/src/future/select_ok.rs @@ -59,7 +59,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { drop(self.inner.remove(idx)); match res { Ok(e) => { - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { diff --git a/vendor/futures-util/src/future/try_future/mod.rs b/vendor/futures-util/src/future/try_future/mod.rs index fb3bdd8a0..e5bc70071 100644 --- a/vendor/futures-util/src/future/try_future/mod.rs +++ b/vendor/futures-util/src/future/try_future/mod.rs @@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture { /// assert_eq!(future.await, Ok(1)); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn map_err<E, F>(self, f: F) -> MapErr<Self, F> where F: FnOnce(Self::Error) -> E, @@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture { /// let future_err_i32 = future_err_u8.err_into::<i32>(); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, diff --git a/vendor/futures-util/src/future/try_join_all.rs b/vendor/futures-util/src/future/try_join_all.rs index 29244af83..506f45065 100644 --- a/vendor/futures-util/src/future/try_join_all.rs +++ b/vendor/futures-util/src/future/try_join_all.rs @@ -10,14 +10,11 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { - // Safety: `std` _could_ make this unsound if it were to decide Pin's - // invariants aren't required to transmit through slices. Otherwise this has - // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) -} +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; enum FinalState<E = ()> { Pending, @@ -31,7 +28,20 @@ pub struct TryJoinAll<F> where F: TryFuture, { - elems: Pin<Box<[TryMaybeDone<F>]>>, + kind: TryJoinAllKind<F>, +} + +enum TryJoinAllKind<F> +where + F: TryFuture, +{ + Small { + elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>, + }, } impl<F> fmt::Debug for TryJoinAll<F> @@ -39,9 +49,16 @@ where F: TryFuture + fmt::Debug, F::Ok: fmt::Debug, F::Error: fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -60,6 +77,20 @@ where /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. /// +/// # See Also +/// +/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance +/// reasons if the number of futures is large. You may want to look into using it or +/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// +/// Some examples for additional functionality provided by these are: +/// +/// * Adding new futures to the set even after it has been started. +/// +/// * Only polling the specific futures that have been woken. In cases where +/// you have a lot of futures this will result in much more efficient polling. +/// +/// /// # Examples /// /// ``` @@ -83,15 +114,37 @@ where /// assert_eq!(try_join_all(futures).await, Err(2)); /// # }); /// ``` -pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item> +pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item> where I: IntoIterator, I::Item: TryFuture, { - let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( - TryJoinAll { elems: elems.into() }, - ) + let iter = iter.into_iter().map(TryFutureExt::into_future); + + #[cfg(futures_no_atomic_cas)] + { + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() }, + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } } impl<F> Future for TryJoinAll<F> @@ -101,36 +154,46 @@ where type Output = Result<Vec<F::Ok>, F::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut state = FinalState::AllDone; - - for elem in iter_pin_mut(self.elems.as_mut()) { - match elem.try_poll(cx) { - Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => { - state = FinalState::Error(e); - break; + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; + + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } } - } - } - match state { - FinalState::Pending => Poll::Pending, - FinalState::AllDone => { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = - iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(Ok(results)) - } - FinalState::Error(e) => { - let _ = mem::replace(&mut self.elems, Box::pin([])); - Poll::Ready(Err(e)) + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } -impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> { +impl<F> FromIterator<F> for TryJoinAll<F> +where + F: TryFuture, +{ fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { try_join_all(iter) } diff --git a/vendor/futures-util/src/future/try_select.rs b/vendor/futures-util/src/future/try_select.rs index 4d0b7ff13..bc282f7db 100644 --- a/vendor/futures-util/src/future/try_select.rs +++ b/vendor/futures-util/src/future/try_select.rs @@ -12,6 +12,9 @@ pub struct TrySelect<A, B> { impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {} +type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>; +type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>; + /// Waits for either one of two differently-typed futures to complete. /// /// This function will return a new future which awaits for either one of both @@ -52,10 +55,9 @@ where A: TryFuture + Unpin, B: TryFuture + Unpin, { - super::assert_future::< - Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>, - _, - >(TrySelect { inner: Some((future1, future2)) }) + super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect { + inner: Some((future1, future2)), + }) } impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> @@ -63,8 +65,7 @@ where A: TryFuture, B: TryFuture, { - #[allow(clippy::type_complexity)] - type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>; + type Output = Result<EitherOk<A, B>, EitherErr<A, B>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); |