diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/futures-util/src/future | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-util/src/future')
30 files changed, 4700 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/future/abortable.rs b/third_party/rust/futures-util/src/future/abortable.rs new file mode 100644 index 0000000000..d017ab7340 --- /dev/null +++ b/third_party/rust/futures-util/src/future/abortable.rs @@ -0,0 +1,19 @@ +use super::assert_future; +use crate::future::{AbortHandle, Abortable, Aborted}; +use futures_core::future::Future; + +/// Creates a new `Abortable` future and an `AbortHandle` which can be used to stop it. +/// +/// This function is a convenient (but less flexible) alternative to calling +/// `AbortHandle::new` and `Abortable::new` manually. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle) +where + Fut: Future, +{ + let (handle, reg) = AbortHandle::new_pair(); + let abortable = assert_future::<Result<Fut::Output, Aborted>, _>(Abortable::new(future, reg)); + (abortable, handle) +} diff --git a/third_party/rust/futures-util/src/future/either.rs b/third_party/rust/futures-util/src/future/either.rs new file mode 100644 index 0000000000..27e5064dfb --- /dev/null +++ b/third_party/rust/futures-util/src/future/either.rs @@ -0,0 +1,317 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures_core::future::{FusedFuture, Future}; +use futures_core::stream::{FusedStream, Stream}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +/// Combines two different futures, streams, or sinks having the same associated types into a single type. +/// +/// This is useful when conditionally choosing between two distinct future types: +/// +/// ```rust +/// use futures::future::Either; +/// +/// # futures::executor::block_on(async { +/// let cond = true; +/// +/// let fut = if cond { +/// Either::Left(async move { 12 }) +/// } else { +/// Either::Right(async move { 44 }) +/// }; +/// +/// assert_eq!(fut.await, 12); +/// # }) +/// ``` +#[derive(Debug, Clone)] +pub enum Either<A, B> { + /// First branch of the type + Left(/* #[pin] */ A), + /// Second branch of the type + Right(/* #[pin] */ B), +} + +impl<A, B> Either<A, B> { + /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> { + // SAFETY: We can use `new_unchecked` because the `inner` parts are + // guaranteed to be pinned, as they come from `self` which is pinned. + unsafe { + match *Pin::get_ref(self) { + Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)), + } + } + } + + /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + // SAFETY: `get_unchecked_mut` is fine because we don't move anything. + // We can use `new_unchecked` because the `inner` parts are guaranteed + // to be pinned, as they come from `self` which is pinned, and we never + // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We + // also don't have an implementation of `Drop`, nor manual `Unpin`. + unsafe { + match *Pin::get_unchecked_mut(self) { + Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)), + } + } + } +} + +impl<A, B, T> Either<(T, A), (T, B)> { + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the first element of the pairs. + pub fn factor_first(self) -> (T, Either<A, B>) { + match self { + Either::Left((x, a)) => (x, Either::Left(a)), + Either::Right((x, b)) => (x, Either::Right(b)), + } + } +} + +impl<A, B, T> Either<(A, T), (B, T)> { + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the second element of the pairs. + pub fn factor_second(self) -> (Either<A, B>, T) { + match self { + Either::Left((a, x)) => (Either::Left(a), x), + Either::Right((b, x)) => (Either::Right(b), x), + } + } +} + +impl<T> Either<T, T> { + /// Extract the value of an either over two equivalent types. + pub fn into_inner(self) -> T { + match self { + Either::Left(x) => x, + Either::Right(x) => x, + } + } +} + +impl<A, B> Future for Either<A, B> +where + A: Future, + B: Future<Output = A::Output>, +{ + type Output = A::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match self.as_pin_mut() { + Either::Left(x) => x.poll(cx), + Either::Right(x) => x.poll(cx), + } + } +} + +impl<A, B> FusedFuture for Either<A, B> +where + A: FusedFuture, + B: FusedFuture<Output = A::Output>, +{ + fn is_terminated(&self) -> bool { + match self { + Either::Left(x) => x.is_terminated(), + Either::Right(x) => x.is_terminated(), + } + } +} + +impl<A, B> Stream for Either<A, B> +where + A: Stream, + B: Stream<Item = A::Item>, +{ + type Item = A::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_next(cx), + Either::Right(x) => x.poll_next(cx), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self { + Either::Left(x) => x.size_hint(), + Either::Right(x) => x.size_hint(), + } + } +} + +impl<A, B> FusedStream for Either<A, B> +where + A: FusedStream, + B: FusedStream<Item = A::Item>, +{ + fn is_terminated(&self) -> bool { + match self { + Either::Left(x) => x.is_terminated(), + Either::Right(x) => x.is_terminated(), + } + } +} + +#[cfg(feature = "sink")] +impl<A, B, Item> Sink<Item> for Either<A, B> +where + A: Sink<Item>, + B: Sink<Item, Error = A::Error>, +{ + type Error = A::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_ready(cx), + Either::Right(x) => x.poll_ready(cx), + } + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + match self.as_pin_mut() { + Either::Left(x) => x.start_send(item), + Either::Right(x) => x.start_send(item), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_flush(cx), + Either::Right(x) => x.poll_flush(cx), + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_close(cx), + Either::Right(x) => x.poll_close(cx), + } + } +} + +#[cfg(feature = "io")] +#[cfg(feature = "std")] +mod if_std { + use super::*; + + use core::pin::Pin; + use core::task::{Context, Poll}; + use futures_io::{ + AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom, + }; + + impl<A, B> AsyncRead for Either<A, B> + where + A: AsyncRead, + B: AsyncRead, + { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<Result<usize>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_read(cx, buf), + Either::Right(x) => x.poll_read(cx, buf), + } + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<Result<usize>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_read_vectored(cx, bufs), + Either::Right(x) => x.poll_read_vectored(cx, bufs), + } + } + } + + impl<A, B> AsyncWrite for Either<A, B> + where + A: AsyncWrite, + B: AsyncWrite, + { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_write(cx, buf), + Either::Right(x) => x.poll_write(cx, buf), + } + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<Result<usize>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_write_vectored(cx, bufs), + Either::Right(x) => x.poll_write_vectored(cx, bufs), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_flush(cx), + Either::Right(x) => x.poll_flush(cx), + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_close(cx), + Either::Right(x) => x.poll_close(cx), + } + } + } + + impl<A, B> AsyncSeek for Either<A, B> + where + A: AsyncSeek, + B: AsyncSeek, + { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<Result<u64>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_seek(cx, pos), + Either::Right(x) => x.poll_seek(cx, pos), + } + } + } + + impl<A, B> AsyncBufRead for Either<A, B> + where + A: AsyncBufRead, + B: AsyncBufRead, + { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { + match self.as_pin_mut() { + Either::Left(x) => x.poll_fill_buf(cx), + Either::Right(x) => x.poll_fill_buf(cx), + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + match self.as_pin_mut() { + Either::Left(x) => x.consume(amt), + Either::Right(x) => x.consume(amt), + } + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/catch_unwind.rs b/third_party/rust/futures-util/src/future/future/catch_unwind.rs new file mode 100644 index 0000000000..0e09d6eeb0 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/catch_unwind.rs @@ -0,0 +1,38 @@ +use core::any::Any; +use core::pin::Pin; +use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}; + +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CatchUnwind<Fut> { + #[pin] + future: Fut, + } +} + +impl<Fut> CatchUnwind<Fut> +where + Fut: Future + UnwindSafe, +{ + pub(super) fn new(future: Fut) -> Self { + Self { future } + } +} + +impl<Fut> Future for CatchUnwind<Fut> +where + Fut: Future + UnwindSafe, +{ + type Output = Result<Fut::Output, Box<dyn Any + Send>>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let f = self.project().future; + catch_unwind(AssertUnwindSafe(|| f.poll(cx)))?.map(Ok) + } +} diff --git a/third_party/rust/futures-util/src/future/future/flatten.rs b/third_party/rust/futures-util/src/future/future/flatten.rs new file mode 100644 index 0000000000..bd767af344 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/flatten.rs @@ -0,0 +1,153 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + #[project = FlattenProj] + #[derive(Debug)] + pub enum Flatten<Fut1, Fut2> { + First { #[pin] f: Fut1 }, + Second { #[pin] f: Fut2 }, + Empty, + } +} + +impl<Fut1, Fut2> Flatten<Fut1, Fut2> { + pub(crate) fn new(future: Fut1) -> Self { + Self::First { f: future } + } +} + +impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> +where + Fut: Future, + Fut::Output: Future, +{ + fn is_terminated(&self) -> bool { + match self { + Self::Empty => true, + _ => false, + } + } +} + +impl<Fut> Future for Flatten<Fut, Fut::Output> +where + Fut: Future, + Fut::Output: Future, +{ + type Output = <Fut::Output as Future>::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Poll::Ready(loop { + match self.as_mut().project() { + FlattenProj::First { f } => { + let f = ready!(f.poll(cx)); + self.set(Self::Second { f }); + } + FlattenProj::Second { f } => { + let output = ready!(f.poll(cx)); + self.set(Self::Empty); + break output; + } + FlattenProj::Empty => panic!("Flatten polled after completion"), + } + }) + } +} + +impl<Fut> FusedStream for Flatten<Fut, Fut::Output> +where + Fut: Future, + Fut::Output: Stream, +{ + fn is_terminated(&self) -> bool { + match self { + Self::Empty => true, + _ => false, + } + } +} + +impl<Fut> Stream for Flatten<Fut, Fut::Output> +where + Fut: Future, + Fut::Output: Stream, +{ + type Item = <Fut::Output as Stream>::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(loop { + match self.as_mut().project() { + FlattenProj::First { f } => { + let f = ready!(f.poll(cx)); + self.set(Self::Second { f }); + } + FlattenProj::Second { f } => { + let output = ready!(f.poll_next(cx)); + if output.is_none() { + self.set(Self::Empty); + } + break output; + } + FlattenProj::Empty => break None, + } + }) + } +} + +#[cfg(feature = "sink")] +impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output> +where + Fut: Future, + Fut::Output: Sink<Item>, +{ + type Error = <Fut::Output as Sink<Item>>::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(loop { + match self.as_mut().project() { + FlattenProj::First { f } => { + let f = ready!(f.poll(cx)); + self.set(Self::Second { f }); + } + FlattenProj::Second { f } => { + break ready!(f.poll_ready(cx)); + } + FlattenProj::Empty => panic!("poll_ready called after eof"), + } + }) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + match self.project() { + FlattenProj::First { .. } => panic!("poll_ready not called first"), + FlattenProj::Second { f } => f.start_send(item), + FlattenProj::Empty => panic!("start_send called after eof"), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.project() { + FlattenProj::First { .. } => Poll::Ready(Ok(())), + FlattenProj::Second { f } => f.poll_flush(cx), + FlattenProj::Empty => panic!("poll_flush called after eof"), + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + let res = match self.as_mut().project() { + FlattenProj::Second { f } => f.poll_close(cx), + _ => Poll::Ready(Ok(())), + }; + if res.is_ready() { + self.set(Self::Empty); + } + res + } +} diff --git a/third_party/rust/futures-util/src/future/future/fuse.rs b/third_party/rust/futures-util/src/future/future/fuse.rs new file mode 100644 index 0000000000..597aec1a40 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/fuse.rs @@ -0,0 +1,93 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`fuse`](super::FutureExt::fuse) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Fuse<Fut> { + #[pin] + inner: Option<Fut>, + } +} + +impl<Fut> Fuse<Fut> { + pub(super) fn new(f: Fut) -> Self { + Self { inner: Some(f) } + } +} + +impl<Fut: Future> Fuse<Fut> { + /// Creates a new `Fuse`-wrapped future which is already terminated. + /// + /// This can be useful in combination with looping and the `select!` + /// macro, which bypasses terminated futures. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::future::{Fuse, FusedFuture, FutureExt}; + /// use futures::select; + /// use futures::stream::StreamExt; + /// use futures::pin_mut; + /// + /// let (sender, mut stream) = mpsc::unbounded(); + /// + /// // Send a few messages into the stream + /// sender.unbounded_send(()).unwrap(); + /// sender.unbounded_send(()).unwrap(); + /// drop(sender); + /// + /// // Use `Fuse::terminated()` to create an already-terminated future + /// // which may be instantiated later. + /// let foo_printer = Fuse::terminated(); + /// pin_mut!(foo_printer); + /// + /// loop { + /// select! { + /// _ = foo_printer => {}, + /// () = stream.select_next_some() => { + /// if !foo_printer.is_terminated() { + /// println!("Foo is already being printed!"); + /// } else { + /// foo_printer.set(async { + /// // do some other async operations + /// println!("Printing foo from `foo_printer` future"); + /// }.fuse()); + /// } + /// }, + /// complete => break, // `foo_printer` is terminated and the stream is done + /// } + /// } + /// # }); + /// ``` + pub fn terminated() -> Self { + Self { inner: None } + } +} + +impl<Fut: Future> FusedFuture for Fuse<Fut> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<Fut: Future> Future for Fuse<Fut> { + type Output = Fut::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { + Poll::Ready(match self.as_mut().project().inner.as_pin_mut() { + Some(fut) => { + let output = ready!(fut.poll(cx)); + self.project().inner.set(None); + output + } + None => return Poll::Pending, + }) + } +} diff --git a/third_party/rust/futures-util/src/future/future/map.rs b/third_party/rust/futures-util/src/future/future/map.rs new file mode 100644 index 0000000000..7471aba000 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/map.rs @@ -0,0 +1,66 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +use crate::fns::FnOnce1; + +pin_project! { + /// Internal Map future + #[project = MapProj] + #[project_replace = MapProjReplace] + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub enum Map<Fut, F> { + Incomplete { + #[pin] + future: Fut, + f: F, + }, + Complete, + } +} + +impl<Fut, F> Map<Fut, F> { + /// Creates a new Map. + pub(crate) fn new(future: Fut, f: F) -> Self { + Self::Incomplete { future, f } + } +} + +impl<Fut, F, T> FusedFuture for Map<Fut, F> +where + Fut: Future, + F: FnOnce1<Fut::Output, Output = T>, +{ + fn is_terminated(&self) -> bool { + match self { + Self::Incomplete { .. } => false, + Self::Complete => true, + } + } +} + +impl<Fut, F, T> Future for Map<Fut, F> +where + Fut: Future, + F: FnOnce1<Fut::Output, Output = T>, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + match self.as_mut().project() { + MapProj::Incomplete { future, .. } => { + let output = ready!(future.poll(cx)); + match self.project_replace(Map::Complete) { + MapProjReplace::Incomplete { f, .. } => Poll::Ready(f.call_once(output)), + MapProjReplace::Complete => unreachable!(), + } + } + MapProj::Complete => { + panic!("Map must not be polled after it returned `Poll::Ready`") + } + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/mod.rs b/third_party/rust/futures-util/src/future/future/mod.rs new file mode 100644 index 0000000000..c11d108207 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/mod.rs @@ -0,0 +1,610 @@ +//! Futures +//! +//! This module contains a number of functions for working with `Future`s, +//! including the `FutureExt` trait which adds methods to `Future` types. + +#[cfg(feature = "alloc")] +use alloc::boxed::Box; +use core::pin::Pin; + +use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; +use crate::future::{assert_future, Either}; +use crate::never::Never; +use crate::stream::assert_stream; +#[cfg(feature = "alloc")] +use futures_core::future::{BoxFuture, LocalBoxFuture}; +use futures_core::{ + future::Future, + stream::Stream, + task::{Context, Poll}, +}; +use pin_utils::pin_mut; + +// Combinators + +mod flatten; +mod fuse; +mod map; + +delegate_all!( + /// Future for the [`flatten`](super::FutureExt::flatten) method. + Flatten<F>( + flatten::Flatten<F, <F as Future>::Output> + ): Debug + Future + FusedFuture + New[|x: F| flatten::Flatten::new(x)] + where F: Future +); + +delegate_all!( + /// Stream for the [`flatten_stream`](FutureExt::flatten_stream) method. + FlattenStream<F>( + flatten::Flatten<F, <F as Future>::Output> + ): Debug + Sink + Stream + FusedStream + New[|x: F| flatten::Flatten::new(x)] + where F: Future +); + +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use fuse::Fuse; + +delegate_all!( + /// Future for the [`map`](super::FutureExt::map) method. + Map<Fut, F>( + map::Map<Fut, F> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)] +); + +delegate_all!( + /// Stream for the [`into_stream`](FutureExt::into_stream) method. + IntoStream<F>( + crate::stream::Once<F> + ): Debug + Stream + FusedStream + New[|x: F| crate::stream::Once::new(x)] +); + +delegate_all!( + /// Future for the [`map_into`](FutureExt::map_into) combinator. + MapInto<Fut, T>( + Map<Fut, IntoFn<T>> + ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, into_fn())] +); + +delegate_all!( + /// Future for the [`then`](FutureExt::then) method. + Then<Fut1, Fut2, F>( + flatten::Flatten<Map<Fut1, F>, Fut2> + ): Debug + Future + FusedFuture + New[|x: Fut1, y: F| flatten::Flatten::new(Map::new(x, y))] +); + +delegate_all!( + /// Future for the [`inspect`](FutureExt::inspect) method. + Inspect<Fut, F>( + map::Map<Fut, InspectFn<F>> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, inspect_fn(f))] +); + +delegate_all!( + /// Future for the [`never_error`](super::FutureExt::never_error) combinator. + NeverError<Fut>( + Map<Fut, OkFn<Never>> + ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())] +); + +delegate_all!( + /// Future for the [`unit_error`](super::FutureExt::unit_error) combinator. + UnitError<Fut>( + Map<Fut, OkFn<()>> + ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())] +); + +#[cfg(feature = "std")] +mod catch_unwind; +#[cfg(feature = "std")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::catch_unwind::CatchUnwind; + +#[cfg(feature = "channel")] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] +#[cfg(feature = "std")] +mod remote_handle; +#[cfg(feature = "channel")] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] +#[cfg(feature = "std")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::remote_handle::{Remote, RemoteHandle}; + +#[cfg(feature = "std")] +mod shared; +#[cfg(feature = "std")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::shared::{Shared, WeakShared}; + +impl<T: ?Sized> FutureExt for T where T: Future {} + +/// An extension trait for `Future`s that provides a variety of convenient +/// adapters. +pub trait FutureExt: Future { + /// Map this future's output to a different type, returning a new future of + /// the resulting type. + /// + /// This function is similar to the `Option::map` or `Iterator::map` where + /// it will change the type of the underlying future. This is useful to + /// chain along a computation once a future has been resolved. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future = async { 1 }; + /// let new_future = future.map(|x| x + 3); + /// assert_eq!(new_future.await, 4); + /// # }); + /// ``` + fn map<U, F>(self, f: F) -> Map<Self, F> + where + F: FnOnce(Self::Output) -> U, + Self: Sized, + { + assert_future::<U, _>(Map::new(self, f)) + } + + /// Map this future's output to a different type, returning a new future of + /// the resulting type. + /// + /// This function is equivalent to calling `map(Into::into)` but allows naming + /// the return type. + fn map_into<U>(self) -> MapInto<Self, U> + where + Self::Output: Into<U>, + Self: Sized, + { + assert_future::<U, _>(MapInto::new(self)) + } + + /// Chain on a computation for when a future finished, passing the result of + /// the future to the provided closure `f`. + /// + /// The returned value of the closure must implement the `Future` trait + /// and can represent some more work to be done before the composed future + /// is finished. + /// + /// The closure `f` is only run *after* successful completion of the `self` + /// future. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future_of_1 = async { 1 }; + /// let future_of_4 = future_of_1.then(|x| async move { x + 3 }); + /// assert_eq!(future_of_4.await, 4); + /// # }); + /// ``` + fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> + where + F: FnOnce(Self::Output) -> Fut, + Fut: Future, + Self: Sized, + { + assert_future::<Fut::Output, _>(Then::new(self, f)) + } + + /// Wrap this future in an `Either` future, making it the left-hand variant + /// of that `Either`. + /// + /// This can be used in combination with the `right_future` method to write `if` + /// statements that evaluate to different futures in different branches. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let x = 6; + /// let future = if x < 10 { + /// async { true }.left_future() + /// } else { + /// async { false }.right_future() + /// }; + /// + /// assert_eq!(future.await, true); + /// # }); + /// ``` + fn left_future<B>(self) -> Either<Self, B> + where + B: Future<Output = Self::Output>, + Self: Sized, + { + assert_future::<Self::Output, _>(Either::Left(self)) + } + + /// Wrap this future in an `Either` future, making it the right-hand variant + /// of that `Either`. + /// + /// This can be used in combination with the `left_future` method to write `if` + /// statements that evaluate to different futures in different branches. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let x = 6; + /// let future = if x > 10 { + /// async { true }.left_future() + /// } else { + /// async { false }.right_future() + /// }; + /// + /// assert_eq!(future.await, false); + /// # }); + /// ``` + fn right_future<A>(self) -> Either<A, Self> + where + A: Future<Output = Self::Output>, + Self: Sized, + { + assert_future::<Self::Output, _>(Either::Right(self)) + } + + /// Convert this future into a single element stream. + /// + /// The returned stream contains single success if this future resolves to + /// success or single error if this future resolves into error. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::stream::StreamExt; + /// + /// let future = async { 17 }; + /// let stream = future.into_stream(); + /// let collected: Vec<_> = stream.collect().await; + /// assert_eq!(collected, vec![17]); + /// # }); + /// ``` + fn into_stream(self) -> IntoStream<Self> + where + Self: Sized, + { + assert_stream::<Self::Output, _>(IntoStream::new(self)) + } + + /// Flatten the execution of this future when the output of this + /// future is itself another future. + /// + /// This can be useful when combining futures together to flatten the + /// computation out the final result. + /// + /// This method is roughly equivalent to `self.then(|x| x)`. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let nested_future = async { async { 1 } }; + /// let future = nested_future.flatten(); + /// assert_eq!(future.await, 1); + /// # }); + /// ``` + fn flatten(self) -> Flatten<Self> + where + Self::Output: Future, + Self: Sized, + { + let f = Flatten::new(self); + assert_future::<<<Self as Future>::Output as Future>::Output, _>(f) + } + + /// Flatten the execution of this future when the successful result of this + /// future is a stream. + /// + /// This can be useful when stream initialization is deferred, and it is + /// convenient to work with that stream as if stream was available at the + /// call site. + /// + /// Note that this function consumes this future and returns a wrapped + /// version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::stream::{self, StreamExt}; + /// + /// let stream_items = vec![17, 18, 19]; + /// let future_of_a_stream = async { stream::iter(stream_items) }; + /// + /// let stream = future_of_a_stream.flatten_stream(); + /// let list: Vec<_> = stream.collect().await; + /// assert_eq!(list, vec![17, 18, 19]); + /// # }); + /// ``` + fn flatten_stream(self) -> FlattenStream<Self> + where + Self::Output: Stream, + Self: Sized, + { + assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self)) + } + + /// Fuse a future such that `poll` will never again be called once it has + /// completed. This method can be used to turn any `Future` into a + /// `FusedFuture`. + /// + /// Normally, once a future has returned `Poll::Ready` from `poll`, + /// any further calls could exhibit bad behavior such as blocking + /// forever, panicking, never returning, etc. If it is known that `poll` + /// may be called too often then this method can be used to ensure that it + /// has defined semantics. + /// + /// If a `fuse`d future is `poll`ed after having returned `Poll::Ready` + /// previously, it will return `Poll::Pending`, from `poll` again (and will + /// continue to do so for all future calls to `poll`). + /// + /// This combinator will drop the underlying future as soon as it has been + /// completed to ensure resources are reclaimed as soon as possible. + fn fuse(self) -> Fuse<Self> + where + Self: Sized, + { + let f = Fuse::new(self); + assert_future::<Self::Output, _>(f) + } + + /// Do something with the output of a future before passing it on. + /// + /// When using futures, you'll often chain several of them together. While + /// working on such code, you might want to check out what's happening at + /// various parts in the pipeline, without consuming the intermediate + /// value. To do that, insert a call to `inspect`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future = async { 1 }; + /// let new_future = future.inspect(|&x| println!("about to resolve: {}", x)); + /// assert_eq!(new_future.await, 1); + /// # }); + /// ``` + fn inspect<F>(self, f: F) -> Inspect<Self, F> + where + F: FnOnce(&Self::Output), + Self: Sized, + { + assert_future::<Self::Output, _>(Inspect::new(self, f)) + } + + /// Catches unwinding panics while polling the future. + /// + /// In general, panics within a future can propagate all the way out to the + /// task level. This combinator makes it possible to halt unwinding within + /// the future itself. It's most commonly used within task executors. It's + /// not recommended to use this for error handling. + /// + /// Note that this method requires the `UnwindSafe` bound from the standard + /// library. This isn't always applied automatically, and the standard + /// library provides an `AssertUnwindSafe` wrapper type to apply it + /// after-the fact. To assist using this method, the `Future` trait is also + /// implemented for `AssertUnwindSafe<F>` where `F` implements `Future`. + /// + /// This method is only available when the `std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::{self, FutureExt, Ready}; + /// + /// let future = future::ready(2); + /// assert!(future.catch_unwind().await.is_ok()); + /// + /// let future = future::lazy(|_| -> Ready<i32> { + /// unimplemented!() + /// }); + /// assert!(future.catch_unwind().await.is_err()); + /// # }); + /// ``` + #[cfg(feature = "std")] + fn catch_unwind(self) -> CatchUnwind<Self> + where + Self: Sized + ::std::panic::UnwindSafe, + { + assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new( + self, + )) + } + + /// Create a cloneable handle to this future where all handles will resolve + /// to the same result. + /// + /// The `shared` combinator method provides a method to convert any future + /// into a cloneable future. It enables a future to be polled by multiple + /// threads. + /// + /// This method is only available when the `std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future = async { 6 }; + /// let shared1 = future.shared(); + /// let shared2 = shared1.clone(); + /// + /// assert_eq!(6, shared1.await); + /// assert_eq!(6, shared2.await); + /// # }); + /// ``` + /// + /// ``` + /// // Note, unlike most examples this is written in the context of a + /// // synchronous function to better illustrate the cross-thread aspect of + /// // the `shared` combinator. + /// + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::executor::block_on; + /// use std::thread; + /// + /// let future = async { 6 }; + /// let shared1 = future.shared(); + /// let shared2 = shared1.clone(); + /// let join_handle = thread::spawn(move || { + /// assert_eq!(6, block_on(shared2)); + /// }); + /// assert_eq!(6, shared1.await); + /// join_handle.join().unwrap(); + /// # }); + /// ``` + #[cfg(feature = "std")] + fn shared(self) -> Shared<Self> + where + Self: Sized, + Self::Output: Clone, + { + assert_future::<Self::Output, _>(Shared::new(self)) + } + + /// Turn this future into a future that yields `()` on completion and sends + /// its output to another future on a separate task. + /// + /// This can be used with spawning executors to easily retrieve the result + /// of a future executing on a separate task or thread. + /// + /// This method is only available when the `std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "channel")] + #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] + #[cfg(feature = "std")] + fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) + where + Self: Sized, + { + let (wrapped, handle) = remote_handle::remote_handle(self); + (assert_future::<(), _>(wrapped), handle) + } + + /// Wrap the future in a Box, pinning it. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "alloc")] + fn boxed<'a>(self) -> BoxFuture<'a, Self::Output> + where + Self: Sized + Send + 'a, + { + assert_future::<Self::Output, _>(Box::pin(self)) + } + + /// Wrap the future in a Box, pinning it. + /// + /// Similar to `boxed`, but without the `Send` requirement. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "alloc")] + fn boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output> + where + Self: Sized + 'a, + { + assert_future::<Self::Output, _>(Box::pin(self)) + } + + /// Turns a [`Future<Output = T>`](Future) into a + /// [`TryFuture<Ok = T, Error = ()`>](futures_core::future::TryFuture). + fn unit_error(self) -> UnitError<Self> + where + Self: Sized, + { + assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self)) + } + + /// Turns a [`Future<Output = T>`](Future) into a + /// [`TryFuture<Ok = T, Error = Never`>](futures_core::future::TryFuture). + fn never_error(self) -> NeverError<Self> + where + Self: Sized, + { + assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self)) + } + + /// A convenience for calling `Future::poll` on `Unpin` future types. + fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> + where + Self: Unpin, + { + Pin::new(self).poll(cx) + } + + /// Evaluates and consumes the future, returning the resulting output if + /// the future is ready after the first call to `Future::poll`. + /// + /// If `poll` instead returns `Poll::Pending`, `None` is returned. + /// + /// This method is useful in cases where immediacy is more important than + /// waiting for a result. It is also convenient for quickly obtaining + /// the value of a future that is known to always resolve immediately. + /// + /// # Examples + /// + /// ``` + /// # use futures::prelude::*; + /// use futures::{future::ready, future::pending}; + /// let future_ready = ready("foobar"); + /// let future_pending = pending::<&'static str>(); + /// + /// assert_eq!(future_ready.now_or_never(), Some("foobar")); + /// assert_eq!(future_pending.now_or_never(), None); + /// ``` + /// + /// In cases where it is absolutely known that a future should always + /// resolve immediately and never return `Poll::Pending`, this method can + /// be combined with `expect()`: + /// + /// ``` + /// # use futures::{prelude::*, future::ready}; + /// let future_ready = ready("foobar"); + /// + /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar"); + /// ``` + fn now_or_never(self) -> Option<Self::Output> + where + Self: Sized, + { + let noop_waker = crate::task::noop_waker(); + let mut cx = Context::from_waker(&noop_waker); + + let this = self; + pin_mut!(this); + match this.poll(&mut cx) { + Poll::Ready(x) => Some(x), + _ => None, + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/remote_handle.rs b/third_party/rust/futures-util/src/future/future/remote_handle.rs new file mode 100644 index 0000000000..1358902cab --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/remote_handle.rs @@ -0,0 +1,126 @@ +use { + crate::future::{CatchUnwind, FutureExt}, + futures_channel::oneshot::{self, Receiver, Sender}, + futures_core::{ + future::Future, + ready, + task::{Context, Poll}, + }, + pin_project_lite::pin_project, + std::{ + any::Any, + fmt, + panic::{self, AssertUnwindSafe}, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread, + }, +}; + +/// The handle to a remote future returned by +/// [`remote_handle`](crate::future::FutureExt::remote_handle). When you drop this, +/// the remote future will be woken up to be dropped by the executor. +/// +/// ## Unwind safety +/// +/// When the remote future panics, [Remote] will catch the unwind and transfer it to +/// the thread where `RemoteHandle` is being awaited. This is good for the common +/// case where [Remote] is spawned on a threadpool. It is unlikely that other code +/// in the executor working thread shares mutable data with the spawned future and we +/// preserve the executor from losing its working threads. +/// +/// If you run the future locally and send the handle of to be awaited elsewhere, you +/// must be careful with regard to unwind safety because the thread in which the future +/// is polled will keep running after the panic and the thread running the [RemoteHandle] +/// will unwind. +#[must_use = "dropping a remote handle cancels the underlying future"] +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] +pub struct RemoteHandle<T> { + rx: Receiver<thread::Result<T>>, + keep_running: Arc<AtomicBool>, +} + +impl<T> RemoteHandle<T> { + /// Drops this handle *without* canceling the underlying future. + /// + /// This method can be used if you want to drop the handle, but let the + /// execution continue. + pub fn forget(self) { + self.keep_running.store(true, Ordering::SeqCst); + } +} + +impl<T: 'static> Future for RemoteHandle<T> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + match ready!(self.rx.poll_unpin(cx)) { + Ok(Ok(output)) => Poll::Ready(output), + // the remote future panicked. + Ok(Err(e)) => panic::resume_unwind(e), + // The oneshot sender was dropped. + Err(e) => panic::resume_unwind(Box::new(e)), + } + } +} + +type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'static)>>; + +pin_project! { + /// A future which sends its output to the corresponding `RemoteHandle`. + /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle). + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] + pub struct Remote<Fut: Future> { + tx: Option<Sender<SendMsg<Fut>>>, + keep_running: Arc<AtomicBool>, + #[pin] + future: CatchUnwind<AssertUnwindSafe<Fut>>, + } +} + +impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Remote").field(&self.future).finish() + } +} + +impl<Fut: Future> Future for Remote<Fut> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let this = self.project(); + + if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready() + && !this.keep_running.load(Ordering::SeqCst) + { + // Cancelled, bail out + return Poll::Ready(()); + } + + let output = ready!(this.future.poll(cx)); + + // if the receiving end has gone away then that's ok, we just ignore the + // send error here. + drop(this.tx.take().unwrap().send(output)); + Poll::Ready(()) + } +} + +pub(super) fn remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>) { + let (tx, rx) = oneshot::channel(); + let keep_running = Arc::new(AtomicBool::new(false)); + + // Unwind Safety: See the docs for RemoteHandle. + let wrapped = Remote { + future: AssertUnwindSafe(future).catch_unwind(), + tx: Some(tx), + keep_running: keep_running.clone(), + }; + + (wrapped, RemoteHandle { rx, keep_running }) +} diff --git a/third_party/rust/futures-util/src/future/future/shared.rs b/third_party/rust/futures-util/src/future/future/shared.rs new file mode 100644 index 0000000000..ecd1b426db --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/shared.rs @@ -0,0 +1,413 @@ +use crate::task::{waker_ref, ArcWake}; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll, Waker}; +use slab::Slab; +use std::cell::UnsafeCell; +use std::fmt; +use std::hash::Hasher; +use std::pin::Pin; +use std::ptr; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Acquire, SeqCst}; +use std::sync::{Arc, Mutex, Weak}; + +/// Future for the [`shared`](super::FutureExt::shared) method. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Shared<Fut: Future> { + inner: Option<Arc<Inner<Fut>>>, + waker_key: usize, +} + +struct Inner<Fut: Future> { + future_or_output: UnsafeCell<FutureOrOutput<Fut>>, + notifier: Arc<Notifier>, +} + +struct Notifier { + state: AtomicUsize, + wakers: Mutex<Option<Slab<Option<Waker>>>>, +} + +/// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`. +pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>); + +impl<Fut: Future> Clone for WeakShared<Fut> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +// The future itself is polled behind the `Arc`, so it won't be moved +// when `Shared` is moved. +impl<Fut: Future> Unpin for Shared<Fut> {} + +impl<Fut: Future> fmt::Debug for Shared<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Shared") + .field("inner", &self.inner) + .field("waker_key", &self.waker_key) + .finish() + } +} + +impl<Fut: Future> fmt::Debug for Inner<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Inner").finish() + } +} + +impl<Fut: Future> fmt::Debug for WeakShared<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WeakShared").finish() + } +} + +enum FutureOrOutput<Fut: Future> { + Future(Fut), + Output(Fut::Output), +} + +unsafe impl<Fut> Send for Inner<Fut> +where + Fut: Future + Send, + Fut::Output: Send + Sync, +{ +} + +unsafe impl<Fut> Sync for Inner<Fut> +where + Fut: Future + Send, + Fut::Output: Send + Sync, +{ +} + +const IDLE: usize = 0; +const POLLING: usize = 1; +const COMPLETE: usize = 2; +const POISONED: usize = 3; + +const NULL_WAKER_KEY: usize = usize::max_value(); + +impl<Fut: Future> Shared<Fut> { + pub(super) fn new(future: Fut) -> Self { + let inner = Inner { + future_or_output: UnsafeCell::new(FutureOrOutput::Future(future)), + notifier: Arc::new(Notifier { + state: AtomicUsize::new(IDLE), + wakers: Mutex::new(Some(Slab::new())), + }), + }; + + Self { inner: Some(Arc::new(inner)), waker_key: NULL_WAKER_KEY } + } +} + +impl<Fut> Shared<Fut> +where + Fut: Future, +{ + /// Returns [`Some`] containing a reference to this [`Shared`]'s output if + /// it has already been computed by a clone or [`None`] if it hasn't been + /// computed yet or this [`Shared`] already returned its output from + /// [`poll`](Future::poll). + pub fn peek(&self) -> Option<&Fut::Output> { + if let Some(inner) = self.inner.as_ref() { + match inner.notifier.state.load(SeqCst) { + COMPLETE => unsafe { return Some(inner.output()) }, + POISONED => panic!("inner future panicked during poll"), + _ => {} + } + } + None + } + + /// Creates a new [`WeakShared`] for this [`Shared`]. + /// + /// Returns [`None`] if it has already been polled to completion. + pub fn downgrade(&self) -> Option<WeakShared<Fut>> { + if let Some(inner) = self.inner.as_ref() { + return Some(WeakShared(Arc::downgrade(inner))); + } + None + } + + /// Gets the number of strong pointers to this allocation. + /// + /// Returns [`None`] if it has already been polled to completion. + /// + /// # Safety + /// + /// This method by itself is safe, but using it correctly requires extra care. Another thread + /// can change the strong count at any time, including potentially between calling this method + /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] + pub fn strong_count(&self) -> Option<usize> { + self.inner.as_ref().map(|arc| Arc::strong_count(arc)) + } + + /// Gets the number of weak pointers to this allocation. + /// + /// Returns [`None`] if it has already been polled to completion. + /// + /// # Safety + /// + /// This method by itself is safe, but using it correctly requires extra care. Another thread + /// can change the weak count at any time, including potentially between calling this method + /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] + pub fn weak_count(&self) -> Option<usize> { + self.inner.as_ref().map(|arc| Arc::weak_count(arc)) + } + + /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`. + pub fn ptr_hash<H: Hasher>(&self, state: &mut H) { + match self.inner.as_ref() { + Some(arc) => { + state.write_u8(1); + ptr::hash(Arc::as_ptr(arc), state); + } + None => { + state.write_u8(0); + } + } + } + + /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to + /// `Arc::ptr_eq`). + /// + /// Returns `false` if either `Shared` has terminated. + pub fn ptr_eq(&self, rhs: &Self) -> bool { + let lhs = match self.inner.as_ref() { + Some(lhs) => lhs, + None => return false, + }; + let rhs = match rhs.inner.as_ref() { + Some(rhs) => rhs, + None => return false, + }; + Arc::ptr_eq(lhs, rhs) + } +} + +impl<Fut> Inner<Fut> +where + Fut: Future, +{ + /// Safety: callers must first ensure that `self.inner.state` + /// is `COMPLETE` + unsafe fn output(&self) -> &Fut::Output { + match &*self.future_or_output.get() { + FutureOrOutput::Output(ref item) => item, + FutureOrOutput::Future(_) => unreachable!(), + } + } +} + +impl<Fut> Inner<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ + /// Registers the current task to receive a wakeup when we are awoken. + fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) { + let mut wakers_guard = self.notifier.wakers.lock().unwrap(); + + let wakers = match wakers_guard.as_mut() { + Some(wakers) => wakers, + None => return, + }; + + let new_waker = cx.waker(); + + if *waker_key == NULL_WAKER_KEY { + *waker_key = wakers.insert(Some(new_waker.clone())); + } else { + match wakers[*waker_key] { + Some(ref old_waker) if new_waker.will_wake(old_waker) => {} + // Could use clone_from here, but Waker doesn't specialize it. + ref mut slot => *slot = Some(new_waker.clone()), + } + } + debug_assert!(*waker_key != NULL_WAKER_KEY); + } + + /// Safety: callers must first ensure that `inner.state` + /// is `COMPLETE` + unsafe fn take_or_clone_output(self: Arc<Self>) -> Fut::Output { + match Arc::try_unwrap(self) { + Ok(inner) => match inner.future_or_output.into_inner() { + FutureOrOutput::Output(item) => item, + FutureOrOutput::Future(_) => unreachable!(), + }, + Err(inner) => inner.output().clone(), + } + } +} + +impl<Fut> FusedFuture for Shared<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<Fut> Future for Shared<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ + type Output = Fut::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + + let inner = this.inner.take().expect("Shared future polled again after completion"); + + // Fast path for when the wrapped future has already completed + if inner.notifier.state.load(Acquire) == COMPLETE { + // Safety: We're in the COMPLETE state + return unsafe { Poll::Ready(inner.take_or_clone_output()) }; + } + + inner.record_waker(&mut this.waker_key, cx); + + match inner + .notifier + .state + .compare_exchange(IDLE, POLLING, SeqCst, SeqCst) + .unwrap_or_else(|x| x) + { + IDLE => { + // Lock acquired, fall through + } + POLLING => { + // Another task is currently polling, at this point we just want + // to ensure that the waker for this task is registered + this.inner = Some(inner); + return Poll::Pending; + } + COMPLETE => { + // Safety: We're in the COMPLETE state + return unsafe { Poll::Ready(inner.take_or_clone_output()) }; + } + POISONED => panic!("inner future panicked during poll"), + _ => unreachable!(), + } + + let waker = waker_ref(&inner.notifier); + let mut cx = Context::from_waker(&waker); + + struct Reset<'a> { + state: &'a AtomicUsize, + did_not_panic: bool, + } + + impl Drop for Reset<'_> { + fn drop(&mut self) { + if !self.did_not_panic { + self.state.store(POISONED, SeqCst); + } + } + } + + let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false }; + + let output = { + let future = unsafe { + match &mut *inner.future_or_output.get() { + FutureOrOutput::Future(fut) => Pin::new_unchecked(fut), + _ => unreachable!(), + } + }; + + let poll_result = future.poll(&mut cx); + reset.did_not_panic = true; + + match poll_result { + Poll::Pending => { + if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() + { + // Success + drop(reset); + this.inner = Some(inner); + return Poll::Pending; + } else { + unreachable!() + } + } + Poll::Ready(output) => output, + } + }; + + unsafe { + *inner.future_or_output.get() = FutureOrOutput::Output(output); + } + + inner.notifier.state.store(COMPLETE, SeqCst); + + // Wake all tasks and drop the slab + let mut wakers_guard = inner.notifier.wakers.lock().unwrap(); + let mut wakers = wakers_guard.take().unwrap(); + for waker in wakers.drain().flatten() { + waker.wake(); + } + + drop(reset); // Make borrow checker happy + drop(wakers_guard); + + // Safety: We're in the COMPLETE state + unsafe { Poll::Ready(inner.take_or_clone_output()) } + } +} + +impl<Fut> Clone for Shared<Fut> +where + Fut: Future, +{ + fn clone(&self) -> Self { + Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY } + } +} + +impl<Fut> Drop for Shared<Fut> +where + Fut: Future, +{ + fn drop(&mut self) { + if self.waker_key != NULL_WAKER_KEY { + if let Some(ref inner) = self.inner { + if let Ok(mut wakers) = inner.notifier.wakers.lock() { + if let Some(wakers) = wakers.as_mut() { + wakers.remove(self.waker_key); + } + } + } + } + } +} + +impl ArcWake for Notifier { + fn wake_by_ref(arc_self: &Arc<Self>) { + let wakers = &mut *arc_self.wakers.lock().unwrap(); + if let Some(wakers) = wakers.as_mut() { + for (_key, opt_waker) in wakers { + if let Some(waker) = opt_waker.take() { + waker.wake(); + } + } + } + } +} + +impl<Fut: Future> WeakShared<Fut> { + /// Attempts to upgrade this [`WeakShared`] into a [`Shared`]. + /// + /// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled + /// to completion. + pub fn upgrade(&self) -> Option<Shared<Fut>> { + Some(Shared { inner: Some(self.0.upgrade()?), waker_key: NULL_WAKER_KEY }) + } +} diff --git a/third_party/rust/futures-util/src/future/join.rs b/third_party/rust/futures-util/src/future/join.rs new file mode 100644 index 0000000000..740ffbc988 --- /dev/null +++ b/third_party/rust/futures-util/src/future/join.rs @@ -0,0 +1,217 @@ +#![allow(non_snake_case)] + +use super::assert_future; +use crate::future::{maybe_done, MaybeDone}; +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +macro_rules! generate { + ($( + $(#[$doc:meta])* + ($Join:ident, <$($Fut:ident),*>), + )*) => ($( + pin_project! { + $(#[$doc])* + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct $Join<$($Fut: Future),*> { + $(#[pin] $Fut: MaybeDone<$Fut>,)* + } + } + + impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*> + where + $( + $Fut: Future + fmt::Debug, + $Fut::Output: fmt::Debug, + )* + { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(stringify!($Join)) + $(.field(stringify!($Fut), &self.$Fut))* + .finish() + } + } + + impl<$($Fut: Future),*> $Join<$($Fut),*> { + fn new($($Fut: $Fut),*) -> Self { + Self { + $($Fut: maybe_done($Fut)),* + } + } + } + + impl<$($Fut: Future),*> Future for $Join<$($Fut),*> { + type Output = ($($Fut::Output),*); + + fn poll( + self: Pin<&mut Self>, cx: &mut Context<'_> + ) -> Poll<Self::Output> { + let mut all_done = true; + let mut futures = self.project(); + $( + all_done &= futures.$Fut.as_mut().poll(cx).is_ready(); + )* + + if all_done { + Poll::Ready(($(futures.$Fut.take_output().unwrap()), *)) + } else { + Poll::Pending + } + } + } + + impl<$($Fut: FusedFuture),*> FusedFuture for $Join<$($Fut),*> { + fn is_terminated(&self) -> bool { + $( + self.$Fut.is_terminated() + ) && * + } + } + )*) +} + +generate! { + /// Future for the [`join`](join()) function. + (Join, <Fut1, Fut2>), + + /// Future for the [`join3`] function. + (Join3, <Fut1, Fut2, Fut3>), + + /// Future for the [`join4`] function. + (Join4, <Fut1, Fut2, Fut3, Fut4>), + + /// Future for the [`join5`] function. + (Join5, <Fut1, Fut2, Fut3, Fut4, Fut5>), +} + +/// Joins the result of two futures, waiting for them both to complete. +/// +/// This function will return a new future which awaits both futures to +/// complete. The returned future will finish with a tuple of both results. +/// +/// Note that this function consumes the passed futures and returns a +/// wrapped version of it. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let pair = future::join(a, b); +/// +/// assert_eq!(pair.await, (1, 2)); +/// # }); +/// ``` +pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2> +where + Fut1: Future, + Fut2: Future, +{ + let f = Join::new(future1, future2); + assert_future::<(Fut1::Output, Fut2::Output), _>(f) +} + +/// Same as [`join`](join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let c = async { 3 }; +/// let tuple = future::join3(a, b, c); +/// +/// assert_eq!(tuple.await, (1, 2, 3)); +/// # }); +/// ``` +pub fn join3<Fut1, Fut2, Fut3>( + future1: Fut1, + future2: Fut2, + future3: Fut3, +) -> Join3<Fut1, Fut2, Fut3> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, +{ + let f = Join3::new(future1, future2, future3); + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output), _>(f) +} + +/// Same as [`join`](join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let c = async { 3 }; +/// let d = async { 4 }; +/// let tuple = future::join4(a, b, c, d); +/// +/// assert_eq!(tuple.await, (1, 2, 3, 4)); +/// # }); +/// ``` +pub fn join4<Fut1, Fut2, Fut3, Fut4>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, +) -> Join4<Fut1, Fut2, Fut3, Fut4> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, + Fut4: Future, +{ + let f = Join4::new(future1, future2, future3, future4); + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output), _>(f) +} + +/// Same as [`join`](join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let c = async { 3 }; +/// let d = async { 4 }; +/// let e = async { 5 }; +/// let tuple = future::join5(a, b, c, d, e); +/// +/// assert_eq!(tuple.await, (1, 2, 3, 4, 5)); +/// # }); +/// ``` +pub fn join5<Fut1, Fut2, Fut3, Fut4, Fut5>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, + future5: Fut5, +) -> Join5<Fut1, Fut2, Fut3, Fut4, Fut5> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, + Fut4: Future, + Fut5: Future, +{ + let f = Join5::new(future1, future2, future3, future4, future5); + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output, Fut5::Output), _>(f) +} diff --git a/third_party/rust/futures-util/src/future/join_all.rs b/third_party/rust/futures-util/src/future/join_all.rs new file mode 100644 index 0000000000..7dc159ba07 --- /dev/null +++ b/third_party/rust/futures-util/src/future/join_all.rs @@ -0,0 +1,166 @@ +//! Definition of the `JoinAll` combinator, waiting for all of a list of futures +//! to finish. + +use alloc::boxed::Box; +use alloc::vec::Vec; +use core::fmt; +use core::future::Future; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use super::{assert_future, MaybeDone}; + +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{Collect, FuturesOrdered, StreamExt}; + +pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { + // Safety: `std` _could_ make this unsound if it were to decide Pin's + // invariants aren't required to transmit through slices. Otherwise this has + // the same safety as a normal field pin projection. + unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) +} + +#[must_use = "futures do nothing unless you `.await` or poll them"] +/// Future for the [`join_all`] function. +pub struct JoinAll<F> +where + F: Future, +{ + kind: JoinAllKind<F>, +} + +#[cfg(not(futures_no_atomic_cas))] +pub(crate) const SMALL: usize = 30; + +enum JoinAllKind<F> +where + F: Future, +{ + Small { + elems: Pin<Box<[MaybeDone<F>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: Collect<FuturesOrdered<F>, Vec<F::Output>>, + }, +} + +impl<F> fmt::Debug for JoinAll<F> +where + F: Future + fmt::Debug, + F::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.kind { + JoinAllKind::Small { ref elems } => { + f.debug_struct("JoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } + } +} + +/// Creates a future which represents a collection of the outputs of the futures +/// given. +/// +/// The returned future will drive execution for all of its underlying futures, +/// collecting the results into a destination `Vec<T>` in the same order as they +/// were provided. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # See Also +/// +/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance +/// reasons if the number of futures is large. You may want to look into using it or +/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// +/// Some examples for additional functionality provided by these are: +/// +/// * Adding new futures to the set even after it has been started. +/// +/// * Only polling the specific futures that have been woken. In cases where +/// you have a lot of futures this will result in much more efficient polling. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::join_all; +/// +/// async fn foo(i: u32) -> u32 { i } +/// +/// let futures = vec![foo(1), foo(2), foo(3)]; +/// +/// assert_eq!(join_all(futures).await, [1, 2, 3]); +/// # }); +/// ``` +pub fn join_all<I>(iter: I) -> JoinAll<I::Item> +where + I: IntoIterator, + I::Item: Future, +{ + let iter = iter.into_iter(); + + #[cfg(futures_no_atomic_cas)] + { + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() }; + + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, + }; + + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) + } +} + +impl<F> Future for JoinAll<F> +where + F: Future, +{ + type Output = Vec<F::Output>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match &mut self.kind { + JoinAllKind::Small { elems } => { + let mut all_done = true; + + for elem in iter_pin_mut(elems.as_mut()) { + if elem.poll(cx).is_pending() { + all_done = false; + } + } + + if all_done { + let mut elems = mem::replace(elems, Box::pin([])); + let result = + iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); + Poll::Ready(result) + } else { + Poll::Pending + } + } + #[cfg(not(futures_no_atomic_cas))] + JoinAllKind::Big { fut } => Pin::new(fut).poll(cx), + } + } +} + +impl<F: Future> FromIterator<F> for JoinAll<F> { + fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { + join_all(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/lazy.rs b/third_party/rust/futures-util/src/future/lazy.rs new file mode 100644 index 0000000000..e9a8cf2fa9 --- /dev/null +++ b/third_party/rust/futures-util/src/future/lazy.rs @@ -0,0 +1,60 @@ +use super::assert_future; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`lazy`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Lazy<F> { + f: Option<F>, +} + +// safe because we never generate `Pin<&mut F>` +impl<F> Unpin for Lazy<F> {} + +/// Creates a new future that allows delayed execution of a closure. +/// +/// The provided closure is only run once the future is polled. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::lazy(|_| 1); +/// assert_eq!(a.await, 1); +/// +/// let b = future::lazy(|_| -> i32 { +/// panic!("oh no!") +/// }); +/// drop(b); // closure is never run +/// # }); +/// ``` +pub fn lazy<F, R>(f: F) -> Lazy<F> +where + F: FnOnce(&mut Context<'_>) -> R, +{ + assert_future::<R, _>(Lazy { f: Some(f) }) +} + +impl<F, R> FusedFuture for Lazy<F> +where + F: FnOnce(&mut Context<'_>) -> R, +{ + fn is_terminated(&self) -> bool { + self.f.is_none() + } +} + +impl<F, R> Future for Lazy<F> +where + F: FnOnce(&mut Context<'_>) -> R, +{ + type Output = R; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> { + Poll::Ready((self.f.take().expect("Lazy polled after completion"))(cx)) + } +} diff --git a/third_party/rust/futures-util/src/future/maybe_done.rs b/third_party/rust/futures-util/src/future/maybe_done.rs new file mode 100644 index 0000000000..26e6c27588 --- /dev/null +++ b/third_party/rust/futures-util/src/future/maybe_done.rs @@ -0,0 +1,104 @@ +//! Definition of the MaybeDone combinator + +use super::assert_future; +use core::mem; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::task::{Context, Poll}; + +/// A future that may have completed. +/// +/// This is created by the [`maybe_done()`] function. +#[derive(Debug)] +pub enum MaybeDone<Fut: Future> { + /// A not-yet-completed future + Future(/* #[pin] */ Fut), + /// The output of the completed future + Done(Fut::Output), + /// The empty variant after the result of a [`MaybeDone`] has been + /// taken using the [`take_output`](MaybeDone::take_output) method. + Gone, +} + +impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} + +/// Wraps a future into a `MaybeDone` +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// use futures::pin_mut; +/// +/// let future = future::maybe_done(async { 5 }); +/// pin_mut!(future); +/// assert_eq!(future.as_mut().take_output(), None); +/// let () = future.as_mut().await; +/// assert_eq!(future.as_mut().take_output(), Some(5)); +/// assert_eq!(future.as_mut().take_output(), None); +/// # }); +/// ``` +pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { + assert_future::<(), _>(MaybeDone::Future(future)) +} + +impl<Fut: Future> MaybeDone<Fut> { + /// Returns an [`Option`] containing a mutable reference to the output of the future. + /// The output of this method will be [`Some`] if and only if the inner + /// future has been completed and [`take_output`](MaybeDone::take_output) + /// has not yet been called. + #[inline] + pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { + unsafe { + match self.get_unchecked_mut() { + MaybeDone::Done(res) => Some(res), + _ => None, + } + } + } + + /// Attempt to take the output of a `MaybeDone` without driving it + /// towards completion. + #[inline] + pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { + match &*self { + Self::Done(_) => {} + Self::Future(_) | Self::Gone => return None, + } + unsafe { + match mem::replace(self.get_unchecked_mut(), Self::Gone) { + MaybeDone::Done(output) => Some(output), + _ => unreachable!(), + } + } + } +} + +impl<Fut: Future> FusedFuture for MaybeDone<Fut> { + fn is_terminated(&self) -> bool { + match self { + Self::Future(_) => false, + Self::Done(_) | Self::Gone => true, + } + } +} + +impl<Fut: Future> Future for MaybeDone<Fut> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + unsafe { + match self.as_mut().get_unchecked_mut() { + MaybeDone::Future(f) => { + let res = ready!(Pin::new_unchecked(f).poll(cx)); + self.set(Self::Done(res)); + } + MaybeDone::Done(_) => {} + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + } + Poll::Ready(()) + } +} diff --git a/third_party/rust/futures-util/src/future/mod.rs b/third_party/rust/futures-util/src/future/mod.rs new file mode 100644 index 0000000000..374e36512f --- /dev/null +++ b/third_party/rust/futures-util/src/future/mod.rs @@ -0,0 +1,131 @@ +//! Asynchronous values. +//! +//! This module contains: +//! +//! - The [`Future`] trait. +//! - The [`FutureExt`] and [`TryFutureExt`] trait, which provides adapters for +//! chaining and composing futures. +//! - Top-level future combinators like [`lazy`](lazy()) which creates a future +//! from a closure that defines its return value, and [`ready`](ready()), +//! which constructs a future with an immediate defined value. + +#[doc(no_inline)] +pub use core::future::Future; + +#[cfg(feature = "alloc")] +pub use futures_core::future::{BoxFuture, LocalBoxFuture}; +pub use futures_core::future::{FusedFuture, TryFuture}; +pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj}; + +// Extension traits and combinators +#[allow(clippy::module_inception)] +mod future; +pub use self::future::{ + Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, MapInto, NeverError, Then, UnitError, +}; + +#[deprecated(note = "This is now an alias for [Flatten](Flatten)")] +pub use self::future::FlattenStream; + +#[cfg(feature = "std")] +pub use self::future::CatchUnwind; + +#[cfg(feature = "channel")] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] +#[cfg(feature = "std")] +pub use self::future::{Remote, RemoteHandle}; + +#[cfg(feature = "std")] +pub use self::future::{Shared, WeakShared}; + +mod try_future; +pub use self::try_future::{ + AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, MapOkOrElse, OkInto, + OrElse, TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse, +}; + +#[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] +pub use self::try_future::FlattenSink; + +// Primitive futures + +mod lazy; +pub use self::lazy::{lazy, Lazy}; + +mod pending; +pub use self::pending::{pending, Pending}; + +mod maybe_done; +pub use self::maybe_done::{maybe_done, MaybeDone}; + +mod try_maybe_done; +pub use self::try_maybe_done::{try_maybe_done, TryMaybeDone}; + +mod option; +pub use self::option::OptionFuture; + +mod poll_fn; +pub use self::poll_fn::{poll_fn, PollFn}; + +mod poll_immediate; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; + +mod ready; +pub use self::ready::{err, ok, ready, Ready}; + +mod join; +pub use self::join::{join, join3, join4, join5, Join, Join3, Join4, Join5}; + +#[cfg(feature = "alloc")] +mod join_all; +#[cfg(feature = "alloc")] +pub use self::join_all::{join_all, JoinAll}; + +mod select; +pub use self::select::{select, Select}; + +#[cfg(feature = "alloc")] +mod select_all; +#[cfg(feature = "alloc")] +pub use self::select_all::{select_all, SelectAll}; + +mod try_join; +pub use self::try_join::{ + try_join, try_join3, try_join4, try_join5, TryJoin, TryJoin3, TryJoin4, TryJoin5, +}; + +#[cfg(feature = "alloc")] +mod try_join_all; +#[cfg(feature = "alloc")] +pub use self::try_join_all::{try_join_all, TryJoinAll}; + +mod try_select; +pub use self::try_select::{try_select, TrySelect}; + +#[cfg(feature = "alloc")] +mod select_ok; +#[cfg(feature = "alloc")] +pub use self::select_ok::{select_ok, SelectOk}; + +mod either; +pub use self::either::Either; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod abortable; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use abortable::abortable; + +// Just a helper function to ensure the futures we're returning all have the +// right implementations. +pub(crate) fn assert_future<T, F>(future: F) -> F +where + F: Future<Output = T>, +{ + future +} diff --git a/third_party/rust/futures-util/src/future/option.rs b/third_party/rust/futures-util/src/future/option.rs new file mode 100644 index 0000000000..0bc377758a --- /dev/null +++ b/third_party/rust/futures-util/src/future/option.rs @@ -0,0 +1,64 @@ +//! Definition of the `Option` (optional step) combinator + +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// A future representing a value which may or may not be present. + /// + /// Created by the [`From`] implementation for [`Option`](std::option::Option). + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::OptionFuture; + /// + /// let mut a: OptionFuture<_> = Some(async { 123 }).into(); + /// assert_eq!(a.await, Some(123)); + /// + /// a = None.into(); + /// assert_eq!(a.await, None); + /// # }); + /// ``` + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct OptionFuture<F> { + #[pin] + inner: Option<F>, + } +} + +impl<F> Default for OptionFuture<F> { + fn default() -> Self { + Self { inner: None } + } +} + +impl<F: Future> Future for OptionFuture<F> { + type Output = Option<F::Output>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match self.project().inner.as_pin_mut() { + Some(x) => x.poll(cx).map(Some), + None => Poll::Ready(None), + } + } +} + +impl<F: FusedFuture> FusedFuture for OptionFuture<F> { + fn is_terminated(&self) -> bool { + match &self.inner { + Some(x) => x.is_terminated(), + None => true, + } + } +} + +impl<T> From<Option<T>> for OptionFuture<T> { + fn from(option: Option<T>) -> Self { + Self { inner: option } + } +} diff --git a/third_party/rust/futures-util/src/future/pending.rs b/third_party/rust/futures-util/src/future/pending.rs new file mode 100644 index 0000000000..b8e28686e1 --- /dev/null +++ b/third_party/rust/futures-util/src/future/pending.rs @@ -0,0 +1,55 @@ +use super::assert_future; +use core::marker; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`pending()`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Pending<T> { + _data: marker::PhantomData<T>, +} + +impl<T> FusedFuture for Pending<T> { + fn is_terminated(&self) -> bool { + true + } +} + +/// Creates a future which never resolves, representing a computation that never +/// finishes. +/// +/// The returned future will forever return [`Poll::Pending`]. +/// +/// # Examples +/// +/// ```ignore +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let future = future::pending(); +/// let () = future.await; +/// unreachable!(); +/// # }); +/// ``` +#[cfg_attr(docsrs, doc(alias = "never"))] +pub fn pending<T>() -> Pending<T> { + assert_future::<T, _>(Pending { _data: marker::PhantomData }) +} + +impl<T> Future for Pending<T> { + type Output = T; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> { + Poll::Pending + } +} + +impl<T> Unpin for Pending<T> {} + +impl<T> Clone for Pending<T> { + fn clone(&self) -> Self { + pending() + } +} diff --git a/third_party/rust/futures-util/src/future/poll_fn.rs b/third_party/rust/futures-util/src/future/poll_fn.rs new file mode 100644 index 0000000000..19311570b5 --- /dev/null +++ b/third_party/rust/futures-util/src/future/poll_fn.rs @@ -0,0 +1,58 @@ +//! Definition of the `PollFn` adapter combinator + +use super::assert_future; +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; + +/// Future for the [`poll_fn`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PollFn<F> { + f: F, +} + +impl<F> Unpin for PollFn<F> {} + +/// Creates a new future wrapping around a function returning [`Poll`]. +/// +/// Polling the returned future delegates to the wrapped function. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::poll_fn; +/// use futures::task::{Context, Poll}; +/// +/// fn read_line(_cx: &mut Context<'_>) -> Poll<String> { +/// Poll::Ready("Hello, World!".into()) +/// } +/// +/// let read_future = poll_fn(read_line); +/// assert_eq!(read_future.await, "Hello, World!".to_owned()); +/// # }); +/// ``` +pub fn poll_fn<T, F>(f: F) -> PollFn<F> +where + F: FnMut(&mut Context<'_>) -> Poll<T>, +{ + assert_future::<T, _>(PollFn { f }) +} + +impl<F> fmt::Debug for PollFn<F> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollFn").finish() + } +} + +impl<T, F> Future for PollFn<F> +where + F: FnMut(&mut Context<'_>) -> Poll<T>, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + (&mut self.f)(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/poll_immediate.rs b/third_party/rust/futures-util/src/future/poll_immediate.rs new file mode 100644 index 0000000000..5ae555c73e --- /dev/null +++ b/third_party/rust/futures-util/src/future/poll_immediate.rs @@ -0,0 +1,126 @@ +use super::assert_future; +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_core::{FusedFuture, Future, Stream}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`poll_immediate`](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate<T> { + #[pin] + future: Option<T> + } +} + +impl<T, F> Future for PollImmediate<F> +where + F: Future<Output = T>, +{ + type Output = Option<T>; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let mut this = self.project(); + let inner = + this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion"); + match inner.poll(cx) { + Poll::Ready(t) => { + this.future.set(None); + Poll::Ready(Some(t)) + } + Poll::Pending => Poll::Ready(None), + } + } +} + +impl<T: Future> FusedFuture for PollImmediate<T> { + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done. +/// The stream will never return [Poll::Pending](core::task::Poll::Pending) +/// so polling it in a tight loop is worse than using a blocking synchronous function. +/// ``` +/// # futures::executor::block_on(async { +/// use futures::task::Poll; +/// use futures::{StreamExt, future, pin_mut}; +/// use future::FusedFuture; +/// +/// let f = async { 1_u32 }; +/// pin_mut!(f); +/// let mut r = future::poll_immediate(f); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// let mut p = future::poll_immediate(f); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert!(!p.is_terminated()); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert!(p.is_terminated()); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +impl<T, F> Stream for PollImmediate<F> +where + F: Future<Output = T>, +{ + type Item = Poll<T>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + match this.future.as_mut().as_pin_mut() { + // inner is gone, so we can signal that the stream is closed. + None => Poll::Ready(None), + Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| { + this.future.set(None); + t + }))), + } + } +} + +/// Creates a future that is immediately ready with an Option of a value. +/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready). +/// +/// # Caution +/// +/// When consuming the future by this function, note the following: +/// +/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value. +/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let r = future::poll_immediate(async { 1_u32 }); +/// assert_eq!(r.await, Some(1)); +/// +/// let p = future::poll_immediate(future::pending::<i32>()); +/// assert_eq!(p.await, None); +/// # }); +/// ``` +/// +/// ### Reusing a future +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{future, pin_mut}; +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// assert_eq!(None, future::poll_immediate(&mut f).await); +/// assert_eq!(42, f.await); +/// # }); +/// ``` +pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> { + assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) }) +} diff --git a/third_party/rust/futures-util/src/future/ready.rs b/third_party/rust/futures-util/src/future/ready.rs new file mode 100644 index 0000000000..e3d791b3cf --- /dev/null +++ b/third_party/rust/futures-util/src/future/ready.rs @@ -0,0 +1,82 @@ +use super::assert_future; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`ready`](ready()) function. +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Ready<T>(Option<T>); + +impl<T> Ready<T> { + /// Unwraps the value from this immediately ready future. + #[inline] + pub fn into_inner(mut self) -> T { + self.0.take().unwrap() + } +} + +impl<T> Unpin for Ready<T> {} + +impl<T> FusedFuture for Ready<T> { + fn is_terminated(&self) -> bool { + self.0.is_none() + } +} + +impl<T> Future for Ready<T> { + type Output = T; + + #[inline] + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> { + Poll::Ready(self.0.take().expect("Ready polled after completion")) + } +} + +/// Creates a future that is immediately ready with a value. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(1); +/// assert_eq!(a.await, 1); +/// # }); +/// ``` +pub fn ready<T>(t: T) -> Ready<T> { + assert_future::<T, _>(Ready(Some(t))) +} + +/// Create a future that is immediately ready with a success value. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ok::<i32, i32>(1); +/// assert_eq!(a.await, Ok(1)); +/// # }); +/// ``` +pub fn ok<T, E>(t: T) -> Ready<Result<T, E>> { + Ready(Some(Ok(t))) +} + +/// Create a future that is immediately ready with an error value. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::err::<i32, i32>(1); +/// assert_eq!(a.await, Err(1)); +/// # }); +/// ``` +pub fn err<T, E>(err: E) -> Ready<Result<T, E>> { + Ready(Some(Err(err))) +} diff --git a/third_party/rust/futures-util/src/future/select.rs b/third_party/rust/futures-util/src/future/select.rs new file mode 100644 index 0000000000..e693a30b00 --- /dev/null +++ b/third_party/rust/futures-util/src/future/select.rs @@ -0,0 +1,125 @@ +use super::assert_future; +use crate::future::{Either, FutureExt}; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`select()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Select<A, B> { + inner: Option<(A, B)>, +} + +impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} + +/// Waits for either one of two differently-typed futures to complete. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// Also note that if both this and the second future have the same +/// output type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// A simple example +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{ +/// pin_mut, +/// future::Either, +/// future::self, +/// }; +/// +/// // These two futures have different types even though their outputs have the same type. +/// let future1 = async { +/// future::pending::<()>().await; // will never finish +/// 1 +/// }; +/// let future2 = async { +/// future::ready(2).await +/// }; +/// +/// // 'select' requires Future + Unpin bounds +/// pin_mut!(future1); +/// pin_mut!(future2); +/// +/// let value = match future::select(future1, future2).await { +/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1` +/// // `_` represents `future2` +/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2` +/// // `_` represents `future1` +/// }; +/// +/// assert!(value == 2); +/// # }); +/// ``` +/// +/// A more complex example +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt}; +/// +/// // A poor-man's join implemented on top of select +/// +/// fn join<A, B>(a: A, b: B) -> impl Future<Output=(A::Output, B::Output)> +/// where A: Future + Unpin, +/// B: Future + Unpin, +/// { +/// future::select(a, b).then(|either| { +/// match either { +/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(), +/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(), +/// } +/// }) +/// } +/// ``` +pub fn select<A, B>(future1: A, future2: B) -> Select<A, B> +where + A: Future + Unpin, + B: Future + Unpin, +{ + assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { + inner: Some((future1, future2)), + }) +} + +impl<A, B> Future for Select<A, B> +where + A: Future + Unpin, + B: Future + Unpin, +{ + type Output = Either<(A::Output, B), (B::Output, A)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + + if let Poll::Ready(val) = a.poll_unpin(cx) { + return Poll::Ready(Either::Left((val, b))); + } + + if let Poll::Ready(val) = b.poll_unpin(cx) { + return Poll::Ready(Either::Right((val, a))); + } + + self.inner = Some((a, b)); + Poll::Pending + } +} + +impl<A, B> FusedFuture for Select<A, B> +where + A: Future + Unpin, + B: Future + Unpin, +{ + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} diff --git a/third_party/rust/futures-util/src/future/select_all.rs b/third_party/rust/futures-util/src/future/select_all.rs new file mode 100644 index 0000000000..0a51d0da6c --- /dev/null +++ b/third_party/rust/futures-util/src/future/select_all.rs @@ -0,0 +1,75 @@ +use super::assert_future; +use crate::future::FutureExt; +use alloc::vec::Vec; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; + +/// Future for the [`select_all`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SelectAll<Fut> { + inner: Vec<Fut>, +} + +impl<Fut: Unpin> Unpin for SelectAll<Fut> {} + +/// Creates a new future which will select over a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready. Upon +/// completion the item resolved will be returned, along with the index of the +/// future that was ready and the list of all the remaining futures. +/// +/// There are no guarantees provided on the order of the list with the remaining +/// futures. They might be swapped around, reversed, or completely random. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no items. +pub fn select_all<I>(iter: I) -> SelectAll<I::Item> +where + I: IntoIterator, + I::Item: Future + Unpin, +{ + let ret = SelectAll { inner: iter.into_iter().collect() }; + assert!(!ret.inner.is_empty()); + assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret) +} + +impl<Fut> SelectAll<Fut> { + /// Consumes this combinator, returning the underlying futures. + pub fn into_inner(self) -> Vec<Fut> { + self.inner + } +} + +impl<Fut: Future + Unpin> Future for SelectAll<Fut> { + type Output = (Fut::Output, usize, Vec<Fut>); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), + }); + match item { + Some((idx, res)) => { + #[allow(clippy::let_underscore_future)] + let _ = self.inner.swap_remove(idx); + let rest = mem::take(&mut self.inner); + Poll::Ready((res, idx, rest)) + } + None => Poll::Pending, + } + } +} + +impl<Fut: Future + Unpin> FromIterator<Fut> for SelectAll<Fut> { + fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self { + select_all(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/select_ok.rs b/third_party/rust/futures-util/src/future/select_ok.rs new file mode 100644 index 0000000000..5d5579930b --- /dev/null +++ b/third_party/rust/futures-util/src/future/select_ok.rs @@ -0,0 +1,85 @@ +use super::assert_future; +use crate::future::TryFutureExt; +use alloc::vec::Vec; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use futures_core::future::{Future, TryFuture}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`select_ok`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SelectOk<Fut> { + inner: Vec<Fut>, +} + +impl<Fut: Unpin> Unpin for SelectOk<Fut> {} + +/// Creates a new future which will select the first successful future over a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike +/// `select_all`, this will only return the first successful completion, or the last +/// failure. This is useful in contexts where any success is desired and failures +/// are ignored, unless all the futures fail. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no items. +pub fn select_ok<I>(iter: I) -> SelectOk<I::Item> +where + I: IntoIterator, + I::Item: TryFuture + Unpin, +{ + let ret = SelectOk { inner: iter.into_iter().collect() }; + assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); + assert_future::< + Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, + _, + >(ret) +} + +impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { + type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // loop until we've either exhausted all errors, a success was hit, or nothing is ready + loop { + let item = + self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), + }); + match item { + Some((idx, res)) => { + // always remove Ok or Err, if it's not the last Err continue looping + drop(self.inner.remove(idx)); + match res { + Ok(e) => { + let rest = mem::take(&mut self.inner); + return Poll::Ready(Ok((e, rest))); + } + Err(e) => { + if self.inner.is_empty() { + return Poll::Ready(Err(e)); + } + } + } + } + None => { + // based on the filter above, nothing is ready, return + return Poll::Pending; + } + } + } + } +} + +impl<Fut: TryFuture + Unpin> FromIterator<Fut> for SelectOk<Fut> { + fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self { + select_ok(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/into_future.rs b/third_party/rust/futures-util/src/future/try_future/into_future.rs new file mode 100644 index 0000000000..9f093d0e2e --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/into_future.rs @@ -0,0 +1,36 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`into_future`](super::TryFutureExt::into_future) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct IntoFuture<Fut> { + #[pin] + future: Fut, + } +} + +impl<Fut> IntoFuture<Fut> { + #[inline] + pub(crate) fn new(future: Fut) -> Self { + Self { future } + } +} + +impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> { + fn is_terminated(&self) -> bool { + self.future.is_terminated() + } +} + +impl<Fut: TryFuture> Future for IntoFuture<Fut> { + type Output = Result<Fut::Ok, Fut::Error>; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.project().future.try_poll(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/mod.rs b/third_party/rust/futures-util/src/future/try_future/mod.rs new file mode 100644 index 0000000000..e5bc700714 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/mod.rs @@ -0,0 +1,625 @@ +//! Futures +//! +//! This module contains a number of functions for working with `Future`s, +//! including the `FutureExt` trait which adds methods to `Future` types. + +#[cfg(feature = "compat")] +use crate::compat::Compat; +use core::pin::Pin; +use futures_core::{ + future::TryFuture, + stream::TryStream, + task::{Context, Poll}, +}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +use crate::fns::{ + inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn, + unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn, + UnwrapOrElseFn, +}; +use crate::future::{assert_future, Inspect, Map}; +use crate::stream::assert_stream; + +// Combinators +mod into_future; +mod try_flatten; +mod try_flatten_err; + +delegate_all!( + /// Future for the [`try_flatten`](TryFutureExt::try_flatten) method. + TryFlatten<Fut1, Fut2>( + try_flatten::TryFlatten<Fut1, Fut2> + ): Debug + Future + FusedFuture + New[|x: Fut1| try_flatten::TryFlatten::new(x)] +); + +delegate_all!( + /// Future for the [`try_flatten_err`](TryFutureExt::try_flatten_err) method. + TryFlattenErr<Fut1, Fut2>( + try_flatten_err::TryFlattenErr<Fut1, Fut2> + ): Debug + Future + FusedFuture + New[|x: Fut1| try_flatten_err::TryFlattenErr::new(x)] +); + +delegate_all!( + /// Future for the [`try_flatten_stream`](TryFutureExt::try_flatten_stream) method. + TryFlattenStream<Fut>( + try_flatten::TryFlatten<Fut, Fut::Ok> + ): Debug + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)] + where Fut: TryFuture +); + +#[cfg(feature = "sink")] +delegate_all!( + /// Sink for the [`flatten_sink`](TryFutureExt::flatten_sink) method. + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] + FlattenSink<Fut, Si>( + try_flatten::TryFlatten<Fut, Si> + ): Debug + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)] +); + +delegate_all!( + /// Future for the [`and_then`](TryFutureExt::and_then) method. + AndThen<Fut1, Fut2, F>( + TryFlatten<MapOk<Fut1, F>, Fut2> + ): Debug + Future + FusedFuture + New[|x: Fut1, f: F| TryFlatten::new(MapOk::new(x, f))] +); + +delegate_all!( + /// Future for the [`or_else`](TryFutureExt::or_else) method. + OrElse<Fut1, Fut2, F>( + TryFlattenErr<MapErr<Fut1, F>, Fut2> + ): Debug + Future + FusedFuture + New[|x: Fut1, f: F| TryFlattenErr::new(MapErr::new(x, f))] +); + +delegate_all!( + /// Future for the [`err_into`](TryFutureExt::err_into) method. + ErrInto<Fut, E>( + MapErr<Fut, IntoFn<E>> + ): Debug + Future + FusedFuture + New[|x: Fut| MapErr::new(x, into_fn())] +); + +delegate_all!( + /// Future for the [`ok_into`](TryFutureExt::ok_into) method. + OkInto<Fut, E>( + MapOk<Fut, IntoFn<E>> + ): Debug + Future + FusedFuture + New[|x: Fut| MapOk::new(x, into_fn())] +); + +delegate_all!( + /// Future for the [`inspect_ok`](super::TryFutureExt::inspect_ok) method. + InspectOk<Fut, F>( + Inspect<IntoFuture<Fut>, InspectOkFn<F>> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_ok_fn(f))] +); + +delegate_all!( + /// Future for the [`inspect_err`](super::TryFutureExt::inspect_err) method. + InspectErr<Fut, F>( + Inspect<IntoFuture<Fut>, InspectErrFn<F>> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_err_fn(f))] +); + +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::into_future::IntoFuture; + +delegate_all!( + /// Future for the [`map_ok`](TryFutureExt::map_ok) method. + MapOk<Fut, F>( + Map<IntoFuture<Fut>, MapOkFn<F>> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_ok_fn(f))] +); + +delegate_all!( + /// Future for the [`map_err`](TryFutureExt::map_err) method. + MapErr<Fut, F>( + Map<IntoFuture<Fut>, MapErrFn<F>> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_err_fn(f))] +); + +delegate_all!( + /// Future for the [`map_ok_or_else`](TryFutureExt::map_ok_or_else) method. + MapOkOrElse<Fut, F, G>( + Map<IntoFuture<Fut>, MapOkOrElseFn<F, G>> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F, g: G| Map::new(IntoFuture::new(x), map_ok_or_else_fn(f, g))] +); + +delegate_all!( + /// Future for the [`unwrap_or_else`](TryFutureExt::unwrap_or_else) method. + UnwrapOrElse<Fut, F>( + Map<IntoFuture<Fut>, UnwrapOrElseFn<F>> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), unwrap_or_else_fn(f))] +); + +impl<Fut: ?Sized + TryFuture> TryFutureExt for Fut {} + +/// Adapters specific to [`Result`]-returning futures +pub trait TryFutureExt: TryFuture { + /// Flattens the execution of this future when the successful result of this + /// future is a [`Sink`]. + /// + /// This can be useful when sink initialization is deferred, and it is + /// convenient to work with that sink as if the sink was available at the + /// call site. + /// + /// Note that this function consumes this future and returns a wrapped + /// version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::{Future, TryFutureExt}; + /// use futures::sink::Sink; + /// # use futures::channel::mpsc::{self, SendError}; + /// # type T = i32; + /// # type E = SendError; + /// + /// fn make_sink_async() -> impl Future<Output = Result< + /// impl Sink<T, Error = E>, + /// E, + /// >> { // ... } + /// # let (tx, _rx) = mpsc::unbounded::<i32>(); + /// # futures::future::ready(Ok(tx)) + /// # } + /// fn take_sink(sink: impl Sink<T, Error = E>) { /* ... */ } + /// + /// let fut = make_sink_async(); + /// take_sink(fut.flatten_sink()) + /// ``` + #[cfg(feature = "sink")] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] + fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok> + where + Self::Ok: Sink<Item, Error = Self::Error>, + Self: Sized, + { + crate::sink::assert_sink::<Item, Self::Error, _>(FlattenSink::new(self)) + } + + /// Maps this future's success value to a different value. + /// + /// This method can be used to change the [`Ok`](TryFuture::Ok) type of the + /// future into a different type. It is similar to the [`Result::map`] + /// method. You can use this method to chain along a computation once the + /// future has been resolved. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.map_ok(|x| x + 3); + /// assert_eq!(future.await, Ok(4)); + /// # }); + /// ``` + /// + /// Calling [`map_ok`](TryFutureExt::map_ok) on an errored future has no + /// effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.map_ok(|x| x + 3); + /// assert_eq!(future.await, Err(1)); + /// # }); + /// ``` + fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> + where + F: FnOnce(Self::Ok) -> T, + Self: Sized, + { + assert_future::<Result<T, Self::Error>, _>(MapOk::new(self, f)) + } + + /// Maps this future's success value to a different value, and permits for error handling resulting in the same type. + /// + /// This method can be used to coalesce your [`Ok`](TryFuture::Ok) type and [`Error`](TryFuture::Error) into another type, + /// where that type is the same for both outcomes. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// The provided closure `e` will only be called if this future is resolved + /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(5) }; + /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); + /// assert_eq!(future.await, 8); + /// + /// let future = async { Err::<i32, i32>(5) }; + /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); + /// assert_eq!(future.await, 10); + /// # }); + /// ``` + /// + fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E> + where + F: FnOnce(Self::Ok) -> T, + E: FnOnce(Self::Error) -> T, + Self: Sized, + { + assert_future::<T, _>(MapOkOrElse::new(self, f, e)) + } + + /// Maps this future's error value to a different value. + /// + /// This method can be used to change the [`Error`](TryFuture::Error) type + /// of the future into a different type. It is similar to the + /// [`Result::map_err`] method. You can use this method for example to + /// ensure that futures have the same [`Error`](TryFuture::Error) type when + /// using [`select!`] or [`join!`]. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.map_err(|x| x + 3); + /// assert_eq!(future.await, Err(4)); + /// # }); + /// ``` + /// + /// Calling [`map_err`](TryFutureExt::map_err) on a successful future has + /// no effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.map_err(|x| x + 3); + /// assert_eq!(future.await, Ok(1)); + /// # }); + /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select + fn map_err<E, F>(self, f: F) -> MapErr<Self, F> + where + F: FnOnce(Self::Error) -> E, + Self: Sized, + { + assert_future::<Result<Self::Ok, E>, _>(MapErr::new(self, f)) + } + + /// Maps this future's [`Error`](TryFuture::Error) to a new error type + /// using the [`Into`](std::convert::Into) trait. + /// + /// This method does for futures what the `?`-operator does for + /// [`Result`]: It lets the compiler infer the type of the resulting + /// error. Just as [`map_err`](TryFutureExt::map_err), this is useful for + /// example to ensure that futures have the same [`Error`](TryFuture::Error) + /// type when using [`select!`] or [`join!`]. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future_err_u8 = async { Err::<(), u8>(1) }; + /// let future_err_i32 = future_err_u8.err_into::<i32>(); + /// # }); + /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select + fn err_into<E>(self) -> ErrInto<Self, E> + where + Self: Sized, + Self::Error: Into<E>, + { + assert_future::<Result<Self::Ok, E>, _>(ErrInto::new(self)) + } + + /// Maps this future's [`Ok`](TryFuture::Ok) to a new type + /// using the [`Into`](std::convert::Into) trait. + fn ok_into<U>(self) -> OkInto<Self, U> + where + Self: Sized, + Self::Ok: Into<U>, + { + assert_future::<Result<U, Self::Error>, _>(OkInto::new(self)) + } + + /// Executes another future after this one resolves successfully. The + /// success value is passed to a closure to create this subsequent future. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Ok`]. If this future resolves to an [`Err`], panics, or is + /// dropped, then the provided closure will never be invoked. The + /// [`Error`](TryFuture::Error) type of this future and the future + /// returned by `f` have to match. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.and_then(|x| async move { Ok::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Ok(4)); + /// # }); + /// ``` + /// + /// Calling [`and_then`](TryFutureExt::and_then) on an errored future has no + /// effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.and_then(|x| async move { Err::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Err(1)); + /// # }); + /// ``` + fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> + where + F: FnOnce(Self::Ok) -> Fut, + Fut: TryFuture<Error = Self::Error>, + Self: Sized, + { + assert_future::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f)) + } + + /// Executes another future if this one resolves to an error. The + /// error value is passed to a closure to create this subsequent future. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Err`]. If this future resolves to an [`Ok`], panics, or is + /// dropped, then the provided closure will never be invoked. The + /// [`Ok`](TryFuture::Ok) type of this future and the future returned by `f` + /// have to match. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.or_else(|x| async move { Err::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Err(4)); + /// # }); + /// ``` + /// + /// Calling [`or_else`](TryFutureExt::or_else) on a successful future has + /// no effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.or_else(|x| async move { Ok::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Ok(1)); + /// # }); + /// ``` + fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> + where + F: FnOnce(Self::Error) -> Fut, + Fut: TryFuture<Ok = Self::Ok>, + Self: Sized, + { + assert_future::<Result<Fut::Ok, Fut::Error>, _>(OrElse::new(self, f)) + } + + /// Do something with the success value of a future before passing it on. + /// + /// When using futures, you'll often chain several of them together. While + /// working on such code, you might want to check out what's happening at + /// various parts in the pipeline, without consuming the intermediate + /// value. To do that, insert a call to `inspect_ok`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::TryFutureExt; + /// + /// let future = async { Ok::<_, ()>(1) }; + /// let new_future = future.inspect_ok(|&x| println!("about to resolve: {}", x)); + /// assert_eq!(new_future.await, Ok(1)); + /// # }); + /// ``` + fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> + where + F: FnOnce(&Self::Ok), + Self: Sized, + { + assert_future::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f)) + } + + /// Do something with the error value of a future before passing it on. + /// + /// When using futures, you'll often chain several of them together. While + /// working on such code, you might want to check out what's happening at + /// various parts in the pipeline, without consuming the intermediate + /// value. To do that, insert a call to `inspect_err`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::TryFutureExt; + /// + /// let future = async { Err::<(), _>(1) }; + /// let new_future = future.inspect_err(|&x| println!("about to error: {}", x)); + /// assert_eq!(new_future.await, Err(1)); + /// # }); + /// ``` + fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> + where + F: FnOnce(&Self::Error), + Self: Sized, + { + assert_future::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f)) + } + + /// Flatten the execution of this future when the successful result of this + /// future is another future. + /// + /// This is equivalent to `future.and_then(|x| x)`. + fn try_flatten(self) -> TryFlatten<Self, Self::Ok> + where + Self::Ok: TryFuture<Error = Self::Error>, + Self: Sized, + { + assert_future::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryFlatten::new(self)) + } + + /// Flatten the execution of this future when the successful result of this + /// future is a stream. + /// + /// This can be useful when stream initialization is deferred, and it is + /// convenient to work with that stream as if stream was available at the + /// call site. + /// + /// Note that this function consumes this future and returns a wrapped + /// version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::TryFutureExt; + /// use futures::stream::{self, TryStreamExt}; + /// + /// let stream_items = vec![17, 18, 19].into_iter().map(Ok); + /// let future_of_a_stream = async { Ok::<_, ()>(stream::iter(stream_items)) }; + /// + /// let stream = future_of_a_stream.try_flatten_stream(); + /// let list = stream.try_collect::<Vec<_>>().await; + /// assert_eq!(list, Ok(vec![17, 18, 19])); + /// # }); + /// ``` + fn try_flatten_stream(self) -> TryFlattenStream<Self> + where + Self::Ok: TryStream<Error = Self::Error>, + Self: Sized, + { + assert_stream::<Result<<Self::Ok as TryStream>::Ok, Self::Error>, _>(TryFlattenStream::new( + self, + )) + } + + /// Unwraps this future's output, producing a future with this future's + /// [`Ok`](TryFuture::Ok) type as its + /// [`Output`](std::future::Future::Output) type. + /// + /// If this future is resolved successfully, the returned future will + /// contain the original future's success value as output. Otherwise, the + /// closure `f` is called with the error value to produce an alternate + /// success value. + /// + /// This method is similar to the [`Result::unwrap_or_else`] method. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<(), &str>("Boom!") }; + /// let future = future.unwrap_or_else(|_| ()); + /// assert_eq!(future.await, ()); + /// # }); + /// ``` + fn unwrap_or_else<F>(self, f: F) -> UnwrapOrElse<Self, F> + where + Self: Sized, + F: FnOnce(Self::Error) -> Self::Ok, + { + assert_future::<Self::Ok, _>(UnwrapOrElse::new(self, f)) + } + + /// Wraps a [`TryFuture`] into a future compatible with libraries using + /// futures 0.1 future definitions. Requires the `compat` feature to enable. + #[cfg(feature = "compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] + fn compat(self) -> Compat<Self> + where + Self: Sized + Unpin, + { + Compat::new(self) + } + + /// Wraps a [`TryFuture`] into a type that implements + /// [`Future`](std::future::Future). + /// + /// [`TryFuture`]s currently do not implement the + /// [`Future`](std::future::Future) trait due to limitations of the + /// compiler. + /// + /// # Examples + /// + /// ``` + /// use futures::future::{Future, TryFuture, TryFutureExt}; + /// + /// # type T = i32; + /// # type E = (); + /// fn make_try_future() -> impl TryFuture<Ok = T, Error = E> { // ... } + /// # async { Ok::<i32, ()>(1) } + /// # } + /// fn take_future(future: impl Future<Output = Result<T, E>>) { /* ... */ } + /// + /// take_future(make_try_future().into_future()); + /// ``` + fn into_future(self) -> IntoFuture<Self> + where + Self: Sized, + { + assert_future::<Result<Self::Ok, Self::Error>, _>(IntoFuture::new(self)) + } + + /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`] + /// future types. + fn try_poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>> + where + Self: Unpin, + { + Pin::new(self).try_poll(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/try_flatten.rs b/third_party/rust/futures-util/src/future/try_future/try_flatten.rs new file mode 100644 index 0000000000..1ce4559ac2 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/try_flatten.rs @@ -0,0 +1,162 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + #[project = TryFlattenProj] + #[derive(Debug)] + pub enum TryFlatten<Fut1, Fut2> { + First { #[pin] f: Fut1 }, + Second { #[pin] f: Fut2 }, + Empty, + } +} + +impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> { + pub(crate) fn new(future: Fut1) -> Self { + Self::First { f: future } + } +} + +impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> +where + Fut: TryFuture, + Fut::Ok: TryFuture<Error = Fut::Error>, +{ + fn is_terminated(&self) -> bool { + match self { + Self::Empty => true, + _ => false, + } + } +} + +impl<Fut> Future for TryFlatten<Fut, Fut::Ok> +where + Fut: TryFuture, + Fut::Ok: TryFuture<Error = Fut::Error>, +{ + type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Poll::Ready(loop { + match self.as_mut().project() { + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Err(e); + } + }, + TryFlattenProj::Second { f } => { + let output = ready!(f.try_poll(cx)); + self.set(Self::Empty); + break output; + } + TryFlattenProj::Empty => panic!("TryFlatten polled after completion"), + } + }) + } +} + +impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, +{ + fn is_terminated(&self) -> bool { + match self { + Self::Empty => true, + _ => false, + } + } +} + +impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, +{ + type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(loop { + match self.as_mut().project() { + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Some(Err(e)); + } + }, + TryFlattenProj::Second { f } => { + let output = ready!(f.try_poll_next(cx)); + if output.is_none() { + self.set(Self::Empty); + } + break output; + } + TryFlattenProj::Empty => break None, + } + }) + } +} + +#[cfg(feature = "sink")] +impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok> +where + Fut: TryFuture, + Fut::Ok: Sink<Item, Error = Fut::Error>, +{ + type Error = Fut::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(loop { + match self.as_mut().project() { + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Err(e); + } + }, + TryFlattenProj::Second { f } => { + break ready!(f.poll_ready(cx)); + } + TryFlattenProj::Empty => panic!("poll_ready called after eof"), + } + }) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + match self.project() { + TryFlattenProj::First { .. } => panic!("poll_ready not called first"), + TryFlattenProj::Second { f } => f.start_send(item), + TryFlattenProj::Empty => panic!("start_send called after eof"), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.project() { + TryFlattenProj::First { .. } => Poll::Ready(Ok(())), + TryFlattenProj::Second { f } => f.poll_flush(cx), + TryFlattenProj::Empty => panic!("poll_flush called after eof"), + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + let res = match self.as_mut().project() { + TryFlattenProj::Second { f } => f.poll_close(cx), + _ => Poll::Ready(Ok(())), + }; + if res.is_ready() { + self.set(Self::Empty); + } + res + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/try_flatten_err.rs b/third_party/rust/futures-util/src/future/try_future/try_flatten_err.rs new file mode 100644 index 0000000000..39b7d9f5f6 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/try_flatten_err.rs @@ -0,0 +1,62 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + #[project = TryFlattenErrProj] + #[derive(Debug)] + pub enum TryFlattenErr<Fut1, Fut2> { + First { #[pin] f: Fut1 }, + Second { #[pin] f: Fut2 }, + Empty, + } +} + +impl<Fut1, Fut2> TryFlattenErr<Fut1, Fut2> { + pub(crate) fn new(future: Fut1) -> Self { + Self::First { f: future } + } +} + +impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error> +where + Fut: TryFuture, + Fut::Error: TryFuture<Ok = Fut::Ok>, +{ + fn is_terminated(&self) -> bool { + match self { + Self::Empty => true, + _ => false, + } + } +} + +impl<Fut> Future for TryFlattenErr<Fut, Fut::Error> +where + Fut: TryFuture, + Fut::Error: TryFuture<Ok = Fut::Ok>, +{ + type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Poll::Ready(loop { + match self.as_mut().project() { + TryFlattenErrProj::First { f } => match ready!(f.try_poll(cx)) { + Err(f) => self.set(Self::Second { f }), + Ok(e) => { + self.set(Self::Empty); + break Ok(e); + } + }, + TryFlattenErrProj::Second { f } => { + let output = ready!(f.try_poll(cx)); + self.set(Self::Empty); + break output; + } + TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"), + } + }) + } +} diff --git a/third_party/rust/futures-util/src/future/try_join.rs b/third_party/rust/futures-util/src/future/try_join.rs new file mode 100644 index 0000000000..6af1f0ccbf --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_join.rs @@ -0,0 +1,256 @@ +#![allow(non_snake_case)] + +use crate::future::{assert_future, try_maybe_done, TryMaybeDone}; +use core::fmt; +use core::pin::Pin; +use futures_core::future::{Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +macro_rules! generate { + ($( + $(#[$doc:meta])* + ($Join:ident, <Fut1, $($Fut:ident),*>), + )*) => ($( + pin_project! { + $(#[$doc])* + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> { + #[pin] Fut1: TryMaybeDone<Fut1>, + $(#[pin] $Fut: TryMaybeDone<$Fut>,)* + } + } + + impl<Fut1, $($Fut),*> fmt::Debug for $Join<Fut1, $($Fut),*> + where + Fut1: TryFuture + fmt::Debug, + Fut1::Ok: fmt::Debug, + Fut1::Error: fmt::Debug, + $( + $Fut: TryFuture + fmt::Debug, + $Fut::Ok: fmt::Debug, + $Fut::Error: fmt::Debug, + )* + { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(stringify!($Join)) + .field("Fut1", &self.Fut1) + $(.field(stringify!($Fut), &self.$Fut))* + .finish() + } + } + + impl<Fut1, $($Fut),*> $Join<Fut1, $($Fut),*> + where + Fut1: TryFuture, + $( + $Fut: TryFuture<Error=Fut1::Error> + ),* + { + fn new(Fut1: Fut1, $($Fut: $Fut),*) -> Self { + Self { + Fut1: try_maybe_done(Fut1), + $($Fut: try_maybe_done($Fut)),* + } + } + } + + impl<Fut1, $($Fut),*> Future for $Join<Fut1, $($Fut),*> + where + Fut1: TryFuture, + $( + $Fut: TryFuture<Error=Fut1::Error> + ),* + { + type Output = Result<(Fut1::Ok, $($Fut::Ok),*), Fut1::Error>; + + fn poll( + self: Pin<&mut Self>, cx: &mut Context<'_> + ) -> Poll<Self::Output> { + let mut all_done = true; + let mut futures = self.project(); + all_done &= futures.Fut1.as_mut().poll(cx)?.is_ready(); + $( + all_done &= futures.$Fut.as_mut().poll(cx)?.is_ready(); + )* + + if all_done { + Poll::Ready(Ok(( + futures.Fut1.take_output().unwrap(), + $( + futures.$Fut.take_output().unwrap() + ),* + ))) + } else { + Poll::Pending + } + } + } + )*) +} + +generate! { + /// Future for the [`try_join`](try_join()) function. + (TryJoin, <Fut1, Fut2>), + + /// Future for the [`try_join3`] function. + (TryJoin3, <Fut1, Fut2, Fut3>), + + /// Future for the [`try_join4`] function. + (TryJoin4, <Fut1, Fut2, Fut3, Fut4>), + + /// Future for the [`try_join5`] function. + (TryJoin5, <Fut1, Fut2, Fut3, Fut4, Fut5>), +} + +/// Joins the result of two futures, waiting for them both to complete or +/// for one to produce an error. +/// +/// This function will return a new future which awaits both futures to +/// complete. If successful, the returned future will finish with a tuple of +/// both results. If unsuccessful, it will complete with the first error +/// encountered. +/// +/// Note that this function consumes the passed futures and returns a +/// wrapped version of it. +/// +/// # Examples +/// +/// When used on multiple futures that return [`Ok`], `try_join` will return +/// [`Ok`] of a tuple of the values: +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let pair = future::try_join(a, b); +/// +/// assert_eq!(pair.await, Ok((1, 2))); +/// # }); +/// ``` +/// +/// If one of the futures resolves to an error, `try_join` will return +/// that error: +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Err::<i32, i32>(2)); +/// let pair = future::try_join(a, b); +/// +/// assert_eq!(pair.await, Err(2)); +/// # }); +/// ``` +pub fn try_join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> TryJoin<Fut1, Fut2> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, +{ + assert_future::<Result<(Fut1::Ok, Fut2::Ok), Fut1::Error>, _>(TryJoin::new(future1, future2)) +} + +/// Same as [`try_join`](try_join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let c = future::ready(Ok::<i32, i32>(3)); +/// let tuple = future::try_join3(a, b, c); +/// +/// assert_eq!(tuple.await, Ok((1, 2, 3))); +/// # }); +/// ``` +pub fn try_join3<Fut1, Fut2, Fut3>( + future1: Fut1, + future2: Fut2, + future3: Fut3, +) -> TryJoin3<Fut1, Fut2, Fut3> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + Fut3: TryFuture<Error = Fut1::Error>, +{ + assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok), Fut1::Error>, _>(TryJoin3::new( + future1, future2, future3, + )) +} + +/// Same as [`try_join`](try_join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let c = future::ready(Ok::<i32, i32>(3)); +/// let d = future::ready(Ok::<i32, i32>(4)); +/// let tuple = future::try_join4(a, b, c, d); +/// +/// assert_eq!(tuple.await, Ok((1, 2, 3, 4))); +/// # }); +/// ``` +pub fn try_join4<Fut1, Fut2, Fut3, Fut4>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, +) -> TryJoin4<Fut1, Fut2, Fut3, Fut4> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + Fut3: TryFuture<Error = Fut1::Error>, + Fut4: TryFuture<Error = Fut1::Error>, +{ + assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok), Fut1::Error>, _>( + TryJoin4::new(future1, future2, future3, future4), + ) +} + +/// Same as [`try_join`](try_join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let c = future::ready(Ok::<i32, i32>(3)); +/// let d = future::ready(Ok::<i32, i32>(4)); +/// let e = future::ready(Ok::<i32, i32>(5)); +/// let tuple = future::try_join5(a, b, c, d, e); +/// +/// assert_eq!(tuple.await, Ok((1, 2, 3, 4, 5))); +/// # }); +/// ``` +pub fn try_join5<Fut1, Fut2, Fut3, Fut4, Fut5>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, + future5: Fut5, +) -> TryJoin5<Fut1, Fut2, Fut3, Fut4, Fut5> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + Fut3: TryFuture<Error = Fut1::Error>, + Fut4: TryFuture<Error = Fut1::Error>, + Fut5: TryFuture<Error = Fut1::Error>, +{ + assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok, Fut5::Ok), Fut1::Error>, _>( + TryJoin5::new(future1, future2, future3, future4, future5), + ) +} diff --git a/third_party/rust/futures-util/src/future/try_join_all.rs b/third_party/rust/futures-util/src/future/try_join_all.rs new file mode 100644 index 0000000000..506f450657 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_join_all.rs @@ -0,0 +1,200 @@ +//! Definition of the `TryJoinAll` combinator, waiting for all of a list of +//! futures to finish with either success or error. + +use alloc::boxed::Box; +use alloc::vec::Vec; +use core::fmt; +use core::future::Future; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; + +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; + +enum FinalState<E = ()> { + Pending, + AllDone, + Error(E), +} + +/// Future for the [`try_join_all`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct TryJoinAll<F> +where + F: TryFuture, +{ + kind: TryJoinAllKind<F>, +} + +enum TryJoinAllKind<F> +where + F: TryFuture, +{ + Small { + elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>, + }, +} + +impl<F> fmt::Debug for TryJoinAll<F> +where + F: TryFuture + fmt::Debug, + F::Ok: fmt::Debug, + F::Error: fmt::Debug, + F::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } + } +} + +/// Creates a future which represents either a collection of the results of the +/// futures given or an error. +/// +/// The returned future will drive execution for all of its underlying futures, +/// collecting the results into a destination `Vec<T>` in the same order as they +/// were provided. +/// +/// If any future returns an error then all other futures will be canceled and +/// an error will be returned immediately. If all futures complete successfully, +/// however, then the returned future will succeed with a `Vec` of all the +/// successful results. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # See Also +/// +/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance +/// reasons if the number of futures is large. You may want to look into using it or +/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// +/// Some examples for additional functionality provided by these are: +/// +/// * Adding new futures to the set even after it has been started. +/// +/// * Only polling the specific futures that have been woken. In cases where +/// you have a lot of futures this will result in much more efficient polling. +/// +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::{self, try_join_all}; +/// +/// let futures = vec![ +/// future::ok::<u32, u32>(1), +/// future::ok::<u32, u32>(2), +/// future::ok::<u32, u32>(3), +/// ]; +/// +/// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3])); +/// +/// let futures = vec![ +/// future::ok::<u32, u32>(1), +/// future::err::<u32, u32>(2), +/// future::ok::<u32, u32>(3), +/// ]; +/// +/// assert_eq!(try_join_all(futures).await, Err(2)); +/// # }); +/// ``` +pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item> +where + I: IntoIterator, + I::Item: TryFuture, +{ + let iter = iter.into_iter().map(TryFutureExt::into_future); + + #[cfg(futures_no_atomic_cas)] + { + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() }, + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } +} + +impl<F> Future for TryJoinAll<F> +where + F: TryFuture, +{ + type Output = Result<Vec<F::Ok>, F::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; + + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } + } + + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), + } + } +} + +impl<F> FromIterator<F> for TryJoinAll<F> +where + F: TryFuture, +{ + fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { + try_join_all(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/try_maybe_done.rs b/third_party/rust/futures-util/src/future/try_maybe_done.rs new file mode 100644 index 0000000000..24044d2c27 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_maybe_done.rs @@ -0,0 +1,92 @@ +//! Definition of the TryMaybeDone combinator + +use super::assert_future; +use core::mem; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::ready; +use futures_core::task::{Context, Poll}; + +/// A future that may have completed with an error. +/// +/// This is created by the [`try_maybe_done()`] function. +#[derive(Debug)] +pub enum TryMaybeDone<Fut: TryFuture> { + /// A not-yet-completed future + Future(/* #[pin] */ Fut), + /// The output of the completed future + Done(Fut::Ok), + /// The empty variant after the result of a [`TryMaybeDone`] has been + /// taken using the [`take_output`](TryMaybeDone::take_output) method, + /// or if the future returned an error. + Gone, +} + +impl<Fut: TryFuture + Unpin> Unpin for TryMaybeDone<Fut> {} + +/// Wraps a future into a `TryMaybeDone` +pub fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut> { + assert_future::<Result<(), Fut::Error>, _>(TryMaybeDone::Future(future)) +} + +impl<Fut: TryFuture> TryMaybeDone<Fut> { + /// Returns an [`Option`] containing a mutable reference to the output of the future. + /// The output of this method will be [`Some`] if and only if the inner + /// future has completed successfully and [`take_output`](TryMaybeDone::take_output) + /// has not yet been called. + #[inline] + pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Ok> { + unsafe { + match self.get_unchecked_mut() { + TryMaybeDone::Done(res) => Some(res), + _ => None, + } + } + } + + /// Attempt to take the output of a `TryMaybeDone` without driving it + /// towards completion. + #[inline] + pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> { + match &*self { + Self::Done(_) => {} + Self::Future(_) | Self::Gone => return None, + } + unsafe { + match mem::replace(self.get_unchecked_mut(), Self::Gone) { + TryMaybeDone::Done(output) => Some(output), + _ => unreachable!(), + } + } + } +} + +impl<Fut: TryFuture> FusedFuture for TryMaybeDone<Fut> { + fn is_terminated(&self) -> bool { + match self { + Self::Future(_) => false, + Self::Done(_) | Self::Gone => true, + } + } +} + +impl<Fut: TryFuture> Future for TryMaybeDone<Fut> { + type Output = Result<(), Fut::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + unsafe { + match self.as_mut().get_unchecked_mut() { + TryMaybeDone::Future(f) => match ready!(Pin::new_unchecked(f).try_poll(cx)) { + Ok(res) => self.set(Self::Done(res)), + Err(e) => { + self.set(Self::Gone); + return Poll::Ready(Err(e)); + } + }, + TryMaybeDone::Done(_) => {} + TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"), + } + } + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/future/try_select.rs b/third_party/rust/futures-util/src/future/try_select.rs new file mode 100644 index 0000000000..4d0b7ff135 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_select.rs @@ -0,0 +1,84 @@ +use crate::future::{Either, TryFutureExt}; +use core::pin::Pin; +use futures_core::future::{Future, TryFuture}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`try_select()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct TrySelect<A, B> { + inner: Option<(A, B)>, +} + +impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {} + +/// Waits for either one of two differently-typed futures to complete. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// Also note that if both this and the second future have the same +/// success/error type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt}; +/// +/// // A poor-man's try_join implemented on top of select +/// +/// fn try_join<A, B, E>(a: A, b: B) -> impl TryFuture<Ok=(A::Ok, B::Ok), Error=E> +/// where A: TryFuture<Error = E> + Unpin + 'static, +/// B: TryFuture<Error = E> + Unpin + 'static, +/// E: 'static, +/// { +/// future::try_select(a, b).then(|res| -> Box<dyn Future<Output = Result<_, _>> + Unpin> { +/// match res { +/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))), +/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))), +/// Err(Either::Left((e, _))) => Box::new(future::err(e)), +/// Err(Either::Right((e, _))) => Box::new(future::err(e)), +/// } +/// }) +/// } +/// ``` +pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B> +where + A: TryFuture + Unpin, + B: TryFuture + Unpin, +{ + super::assert_future::< + Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>, + _, + >(TrySelect { inner: Some((future1, future2)) }) +} + +impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> +where + A: TryFuture, + B: TryFuture, +{ + #[allow(clippy::type_complexity)] + type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + match a.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))), + Poll::Pending => match b.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))), + Poll::Pending => { + self.inner = Some((a, b)); + Poll::Pending + } + }, + } + } +} |