summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/future
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/future')
-rw-r--r--vendor/futures-util/src/future/either.rs58
-rw-r--r--vendor/futures-util/src/future/future/fuse.rs12
-rw-r--r--vendor/futures-util/src/future/future/shared.rs64
-rw-r--r--vendor/futures-util/src/future/join_all.rs29
-rw-r--r--vendor/futures-util/src/future/pending.rs1
-rw-r--r--vendor/futures-util/src/future/select.rs30
-rw-r--r--vendor/futures-util/src/future/select_all.rs3
-rw-r--r--vendor/futures-util/src/future/select_ok.rs2
-rw-r--r--vendor/futures-util/src/future/try_future/mod.rs6
-rw-r--r--vendor/futures-util/src/future/try_join_all.rs137
-rw-r--r--vendor/futures-util/src/future/try_select.rs13
11 files changed, 248 insertions, 107 deletions
diff --git a/vendor/futures-util/src/future/either.rs b/vendor/futures-util/src/future/either.rs
index 9602de7a4..27e5064df 100644
--- a/vendor/futures-util/src/future/either.rs
+++ b/vendor/futures-util/src/future/either.rs
@@ -33,11 +33,31 @@ pub enum Either<A, B> {
}
impl<A, B> Either<A, B> {
- fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> {
+ // SAFETY: We can use `new_unchecked` because the `inner` parts are
+ // guaranteed to be pinned, as they come from `self` which is pinned.
unsafe {
- match self.get_unchecked_mut() {
- Either::Left(a) => Either::Left(Pin::new_unchecked(a)),
- Either::Right(b) => Either::Right(Pin::new_unchecked(b)),
+ match *Pin::get_ref(self) {
+ Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)),
+ }
+ }
+ }
+
+ /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ // SAFETY: `get_unchecked_mut` is fine because we don't move anything.
+ // We can use `new_unchecked` because the `inner` parts are guaranteed
+ // to be pinned, as they come from `self` which is pinned, and we never
+ // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We
+ // also don't have an implementation of `Drop`, nor manual `Unpin`.
+ unsafe {
+ match *Pin::get_unchecked_mut(self) {
+ Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)),
}
}
}
@@ -85,7 +105,7 @@ where
type Output = A::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll(cx),
Either::Right(x) => x.poll(cx),
}
@@ -113,7 +133,7 @@ where
type Item = A::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_next(cx),
Either::Right(x) => x.poll_next(cx),
}
@@ -149,28 +169,28 @@ where
type Error = A::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_ready(cx),
Either::Right(x) => x.poll_ready(cx),
}
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.start_send(item),
Either::Right(x) => x.start_send(item),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
@@ -198,7 +218,7 @@ mod if_std {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_read(cx, buf),
Either::Right(x) => x.poll_read(cx, buf),
}
@@ -209,7 +229,7 @@ mod if_std {
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_read_vectored(cx, bufs),
Either::Right(x) => x.poll_read_vectored(cx, bufs),
}
@@ -226,7 +246,7 @@ mod if_std {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_write(cx, buf),
Either::Right(x) => x.poll_write(cx, buf),
}
@@ -237,21 +257,21 @@ mod if_std {
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_write_vectored(cx, bufs),
Either::Right(x) => x.poll_write_vectored(cx, bufs),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
@@ -268,7 +288,7 @@ mod if_std {
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_seek(cx, pos),
Either::Right(x) => x.poll_seek(cx, pos),
}
@@ -281,14 +301,14 @@ mod if_std {
B: AsyncBufRead,
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_fill_buf(cx),
Either::Right(x) => x.poll_fill_buf(cx),
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.consume(amt),
Either::Right(x) => x.consume(amt),
}
diff --git a/vendor/futures-util/src/future/future/fuse.rs b/vendor/futures-util/src/future/future/fuse.rs
index 597aec1a4..225790672 100644
--- a/vendor/futures-util/src/future/future/fuse.rs
+++ b/vendor/futures-util/src/future/future/fuse.rs
@@ -1,6 +1,5 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
-use futures_core::ready;
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
@@ -81,13 +80,12 @@ impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
- Poll::Ready(match self.as_mut().project().inner.as_pin_mut() {
- Some(fut) => {
- let output = ready!(fut.poll(cx));
+ match self.as_mut().project().inner.as_pin_mut() {
+ Some(fut) => fut.poll(cx).map(|output| {
self.project().inner.set(None);
output
- }
- None => return Poll::Pending,
- })
+ }),
+ None => Poll::Pending,
+ }
}
}
diff --git a/vendor/futures-util/src/future/future/shared.rs b/vendor/futures-util/src/future/future/shared.rs
index 9b31932fe..ecd1b426d 100644
--- a/vendor/futures-util/src/future/future/shared.rs
+++ b/vendor/futures-util/src/future/future/shared.rs
@@ -4,7 +4,9 @@ use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
use std::cell::UnsafeCell;
use std::fmt;
+use std::hash::Hasher;
use std::pin::Pin;
+use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Mutex, Weak};
@@ -103,7 +105,6 @@ impl<Fut: Future> Shared<Fut> {
impl<Fut> Shared<Fut>
where
Fut: Future,
- Fut::Output: Clone,
{
/// Returns [`Some`] containing a reference to this [`Shared`]'s output if
/// it has already been computed by a clone or [`None`] if it hasn't been
@@ -139,6 +140,7 @@ where
/// This method by itself is safe, but using it correctly requires extra care. Another thread
/// can change the strong count at any time, including potentially between calling this method
/// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
pub fn strong_count(&self) -> Option<usize> {
self.inner.as_ref().map(|arc| Arc::strong_count(arc))
}
@@ -152,15 +154,44 @@ where
/// This method by itself is safe, but using it correctly requires extra care. Another thread
/// can change the weak count at any time, including potentially between calling this method
/// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
pub fn weak_count(&self) -> Option<usize> {
self.inner.as_ref().map(|arc| Arc::weak_count(arc))
}
+
+ /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`.
+ pub fn ptr_hash<H: Hasher>(&self, state: &mut H) {
+ match self.inner.as_ref() {
+ Some(arc) => {
+ state.write_u8(1);
+ ptr::hash(Arc::as_ptr(arc), state);
+ }
+ None => {
+ state.write_u8(0);
+ }
+ }
+ }
+
+ /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to
+ /// `Arc::ptr_eq`).
+ ///
+ /// Returns `false` if either `Shared` has terminated.
+ pub fn ptr_eq(&self, rhs: &Self) -> bool {
+ let lhs = match self.inner.as_ref() {
+ Some(lhs) => lhs,
+ None => return false,
+ };
+ let rhs = match rhs.inner.as_ref() {
+ Some(rhs) => rhs,
+ None => return false,
+ };
+ Arc::ptr_eq(lhs, rhs)
+ }
}
impl<Fut> Inner<Fut>
where
Fut: Future,
- Fut::Output: Clone,
{
/// Safety: callers must first ensure that `self.inner.state`
/// is `COMPLETE`
@@ -170,6 +201,13 @@ where
FutureOrOutput::Future(_) => unreachable!(),
}
}
+}
+
+impl<Fut> Inner<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
/// Registers the current task to receive a wakeup when we are awoken.
fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) {
let mut wakers_guard = self.notifier.wakers.lock().unwrap();
@@ -262,19 +300,20 @@ where
let waker = waker_ref(&inner.notifier);
let mut cx = Context::from_waker(&waker);
- struct Reset<'a>(&'a AtomicUsize);
+ struct Reset<'a> {
+ state: &'a AtomicUsize,
+ did_not_panic: bool,
+ }
impl Drop for Reset<'_> {
fn drop(&mut self) {
- use std::thread;
-
- if thread::panicking() {
- self.0.store(POISONED, SeqCst);
+ if !self.did_not_panic {
+ self.state.store(POISONED, SeqCst);
}
}
}
- let _reset = Reset(&inner.notifier.state);
+ let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false };
let output = {
let future = unsafe {
@@ -284,12 +323,15 @@ where
}
};
- match future.poll(&mut cx) {
+ let poll_result = future.poll(&mut cx);
+ reset.did_not_panic = true;
+
+ match poll_result {
Poll::Pending => {
if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok()
{
// Success
- drop(_reset);
+ drop(reset);
this.inner = Some(inner);
return Poll::Pending;
} else {
@@ -313,7 +355,7 @@ where
waker.wake();
}
- drop(_reset); // Make borrow checker happy
+ drop(reset); // Make borrow checker happy
drop(wakers_guard);
// Safety: We're in the COMPLETE state
diff --git a/vendor/futures-util/src/future/join_all.rs b/vendor/futures-util/src/future/join_all.rs
index 2e52ac17f..7dc159ba0 100644
--- a/vendor/futures-util/src/future/join_all.rs
+++ b/vendor/futures-util/src/future/join_all.rs
@@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone};
#[cfg(not(futures_no_atomic_cas))]
use crate::stream::{Collect, FuturesOrdered, StreamExt};
-fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
+pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
@@ -32,9 +32,9 @@ where
}
#[cfg(not(futures_no_atomic_cas))]
-const SMALL: usize = 30;
+pub(crate) const SMALL: usize = 30;
-pub(crate) enum JoinAllKind<F>
+enum JoinAllKind<F>
where
F: Future,
{
@@ -104,26 +104,25 @@ where
I: IntoIterator,
I::Item: Future,
{
+ let iter = iter.into_iter();
+
#[cfg(futures_no_atomic_cas)]
{
- let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
- let kind = JoinAllKind::Small { elems };
+ let kind =
+ JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() };
+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
+
#[cfg(not(futures_no_atomic_cas))]
{
- let iter = iter.into_iter();
let kind = match iter.size_hint().1 {
- None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
- Some(max) => {
- if max <= SMALL {
- let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
- JoinAllKind::Small { elems }
- } else {
- JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
- }
- }
+ Some(max) if max <= SMALL => JoinAllKind::Small {
+ elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
};
+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
}
diff --git a/vendor/futures-util/src/future/pending.rs b/vendor/futures-util/src/future/pending.rs
index 92c78d52b..b8e28686e 100644
--- a/vendor/futures-util/src/future/pending.rs
+++ b/vendor/futures-util/src/future/pending.rs
@@ -33,6 +33,7 @@ impl<T> FusedFuture for Pending<T> {
/// unreachable!();
/// # });
/// ```
+#[cfg_attr(docsrs, doc(alias = "never"))]
pub fn pending<T>() -> Pending<T> {
assert_future::<T, _>(Pending { _data: marker::PhantomData })
}
diff --git a/vendor/futures-util/src/future/select.rs b/vendor/futures-util/src/future/select.rs
index bd44f20f7..7e33d195f 100644
--- a/vendor/futures-util/src/future/select.rs
+++ b/vendor/futures-util/src/future/select.rs
@@ -99,17 +99,27 @@ where
type Output = Either<(A::Output, B), (B::Output, A)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
- match a.poll_unpin(cx) {
- Poll::Ready(x) => Poll::Ready(Either::Left((x, b))),
- Poll::Pending => match b.poll_unpin(cx) {
- Poll::Ready(x) => Poll::Ready(Either::Right((x, a))),
- Poll::Pending => {
- self.inner = Some((a, b));
- Poll::Pending
- }
- },
+ /// When compiled with `-C opt-level=z`, this function will help the compiler eliminate the `None` branch, where
+ /// `Option::unwrap` does not.
+ #[inline(always)]
+ fn unwrap_option<T>(value: Option<T>) -> T {
+ match value {
+ None => unreachable!(),
+ Some(value) => value,
+ }
}
+
+ let (a, b) = self.inner.as_mut().expect("cannot poll Select twice");
+
+ if let Poll::Ready(val) = a.poll_unpin(cx) {
+ return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1)));
+ }
+
+ if let Poll::Ready(val) = b.poll_unpin(cx) {
+ return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0)));
+ }
+
+ Poll::Pending
}
}
diff --git a/vendor/futures-util/src/future/select_all.rs b/vendor/futures-util/src/future/select_all.rs
index 106e50844..0a51d0da6 100644
--- a/vendor/futures-util/src/future/select_all.rs
+++ b/vendor/futures-util/src/future/select_all.rs
@@ -58,8 +58,9 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
});
match item {
Some((idx, res)) => {
+ #[allow(clippy::let_underscore_future)]
let _ = self.inner.swap_remove(idx);
- let rest = mem::replace(&mut self.inner, Vec::new());
+ let rest = mem::take(&mut self.inner);
Poll::Ready((res, idx, rest))
}
None => Poll::Pending,
diff --git a/vendor/futures-util/src/future/select_ok.rs b/vendor/futures-util/src/future/select_ok.rs
index 0ad83c6db..5d5579930 100644
--- a/vendor/futures-util/src/future/select_ok.rs
+++ b/vendor/futures-util/src/future/select_ok.rs
@@ -59,7 +59,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
drop(self.inner.remove(idx));
match res {
Ok(e) => {
- let rest = mem::replace(&mut self.inner, Vec::new());
+ let rest = mem::take(&mut self.inner);
return Poll::Ready(Ok((e, rest)));
}
Err(e) => {
diff --git a/vendor/futures-util/src/future/try_future/mod.rs b/vendor/futures-util/src/future/try_future/mod.rs
index fb3bdd8a0..e5bc70071 100644
--- a/vendor/futures-util/src/future/try_future/mod.rs
+++ b/vendor/futures-util/src/future/try_future/mod.rs
@@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture {
/// assert_eq!(future.await, Ok(1));
/// # });
/// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
where
F: FnOnce(Self::Error) -> E,
@@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture {
/// let future_err_i32 = future_err_u8.err_into::<i32>();
/// # });
/// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
fn err_into<E>(self) -> ErrInto<Self, E>
where
Self: Sized,
diff --git a/vendor/futures-util/src/future/try_join_all.rs b/vendor/futures-util/src/future/try_join_all.rs
index 29244af83..506f45065 100644
--- a/vendor/futures-util/src/future/try_join_all.rs
+++ b/vendor/futures-util/src/future/try_join_all.rs
@@ -10,14 +10,11 @@ use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
-use super::{assert_future, TryFuture, TryMaybeDone};
+use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone};
-fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
- // Safety: `std` _could_ make this unsound if it were to decide Pin's
- // invariants aren't required to transmit through slices. Otherwise this has
- // the same safety as a normal field pin projection.
- unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
-}
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt};
+use crate::TryFutureExt;
enum FinalState<E = ()> {
Pending,
@@ -31,7 +28,20 @@ pub struct TryJoinAll<F>
where
F: TryFuture,
{
- elems: Pin<Box<[TryMaybeDone<F>]>>,
+ kind: TryJoinAllKind<F>,
+}
+
+enum TryJoinAllKind<F>
+where
+ F: TryFuture,
+{
+ Small {
+ elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>,
+ },
+ #[cfg(not(futures_no_atomic_cas))]
+ Big {
+ fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,
+ },
}
impl<F> fmt::Debug for TryJoinAll<F>
@@ -39,9 +49,16 @@ where
F: TryFuture + fmt::Debug,
F::Ok: fmt::Debug,
F::Error: fmt::Debug,
+ F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TryJoinAll").field("elems", &self.elems).finish()
+ match self.kind {
+ TryJoinAllKind::Small { ref elems } => {
+ f.debug_struct("TryJoinAll").field("elems", elems).finish()
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+ }
}
}
@@ -60,6 +77,20 @@ where
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
+/// # See Also
+///
+/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
+///
+/// Some examples for additional functionality provided by these are:
+///
+/// * Adding new futures to the set even after it has been started.
+///
+/// * Only polling the specific futures that have been woken. In cases where
+/// you have a lot of futures this will result in much more efficient polling.
+///
+///
/// # Examples
///
/// ```
@@ -83,15 +114,37 @@ where
/// assert_eq!(try_join_all(futures).await, Err(2));
/// # });
/// ```
-pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item>
+pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item>
where
I: IntoIterator,
I::Item: TryFuture,
{
- let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect();
- assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
- TryJoinAll { elems: elems.into() },
- )
+ let iter = iter.into_iter().map(TryFutureExt::into_future);
+
+ #[cfg(futures_no_atomic_cas)]
+ {
+ let kind = TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
+
+ #[cfg(not(futures_no_atomic_cas))]
+ {
+ let kind = match iter.size_hint().1 {
+ Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() },
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
}
impl<F> Future for TryJoinAll<F>
@@ -101,36 +154,46 @@ where
type Output = Result<Vec<F::Ok>, F::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut state = FinalState::AllDone;
-
- for elem in iter_pin_mut(self.elems.as_mut()) {
- match elem.try_poll(cx) {
- Poll::Pending => state = FinalState::Pending,
- Poll::Ready(Ok(())) => {}
- Poll::Ready(Err(e)) => {
- state = FinalState::Error(e);
- break;
+ match &mut self.kind {
+ TryJoinAllKind::Small { elems } => {
+ let mut state = FinalState::AllDone;
+
+ for elem in join_all::iter_pin_mut(elems.as_mut()) {
+ match elem.try_poll(cx) {
+ Poll::Pending => state = FinalState::Pending,
+ Poll::Ready(Ok(())) => {}
+ Poll::Ready(Err(e)) => {
+ state = FinalState::Error(e);
+ break;
+ }
+ }
}
- }
- }
- match state {
- FinalState::Pending => Poll::Pending,
- FinalState::AllDone => {
- let mut elems = mem::replace(&mut self.elems, Box::pin([]));
- let results =
- iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
- Poll::Ready(Ok(results))
- }
- FinalState::Error(e) => {
- let _ = mem::replace(&mut self.elems, Box::pin([]));
- Poll::Ready(Err(e))
+ match state {
+ FinalState::Pending => Poll::Pending,
+ FinalState::AllDone => {
+ let mut elems = mem::replace(elems, Box::pin([]));
+ let results = join_all::iter_pin_mut(elems.as_mut())
+ .map(|e| e.take_output().unwrap())
+ .collect();
+ Poll::Ready(Ok(results))
+ }
+ FinalState::Error(e) => {
+ let _ = mem::replace(elems, Box::pin([]));
+ Poll::Ready(Err(e))
+ }
+ }
}
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
}
-impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> {
+impl<F> FromIterator<F> for TryJoinAll<F>
+where
+ F: TryFuture,
+{
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
try_join_all(iter)
}
diff --git a/vendor/futures-util/src/future/try_select.rs b/vendor/futures-util/src/future/try_select.rs
index 4d0b7ff13..bc282f7db 100644
--- a/vendor/futures-util/src/future/try_select.rs
+++ b/vendor/futures-util/src/future/try_select.rs
@@ -12,6 +12,9 @@ pub struct TrySelect<A, B> {
impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
+type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
+type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;
+
/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
@@ -52,10 +55,9 @@ where
A: TryFuture + Unpin,
B: TryFuture + Unpin,
{
- super::assert_future::<
- Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
- _,
- >(TrySelect { inner: Some((future1, future2)) })
+ super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
+ inner: Some((future1, future2)),
+ })
}
impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
@@ -63,8 +65,7 @@ where
A: TryFuture,
B: TryFuture,
{
- #[allow(clippy::type_complexity)]
- type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
+ type Output = Result<EitherOk<A, B>, EitherErr<A, B>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");