summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/src/future
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-util/src/future')
-rw-r--r--third_party/rust/futures-util/src/future/abortable.rs19
-rw-r--r--third_party/rust/futures-util/src/future/either.rs317
-rw-r--r--third_party/rust/futures-util/src/future/future/catch_unwind.rs38
-rw-r--r--third_party/rust/futures-util/src/future/future/flatten.rs153
-rw-r--r--third_party/rust/futures-util/src/future/future/fuse.rs93
-rw-r--r--third_party/rust/futures-util/src/future/future/map.rs66
-rw-r--r--third_party/rust/futures-util/src/future/future/mod.rs610
-rw-r--r--third_party/rust/futures-util/src/future/future/remote_handle.rs126
-rw-r--r--third_party/rust/futures-util/src/future/future/shared.rs413
-rw-r--r--third_party/rust/futures-util/src/future/join.rs217
-rw-r--r--third_party/rust/futures-util/src/future/join_all.rs166
-rw-r--r--third_party/rust/futures-util/src/future/lazy.rs60
-rw-r--r--third_party/rust/futures-util/src/future/maybe_done.rs104
-rw-r--r--third_party/rust/futures-util/src/future/mod.rs131
-rw-r--r--third_party/rust/futures-util/src/future/option.rs64
-rw-r--r--third_party/rust/futures-util/src/future/pending.rs55
-rw-r--r--third_party/rust/futures-util/src/future/poll_fn.rs58
-rw-r--r--third_party/rust/futures-util/src/future/poll_immediate.rs126
-rw-r--r--third_party/rust/futures-util/src/future/ready.rs82
-rw-r--r--third_party/rust/futures-util/src/future/select.rs125
-rw-r--r--third_party/rust/futures-util/src/future/select_all.rs75
-rw-r--r--third_party/rust/futures-util/src/future/select_ok.rs85
-rw-r--r--third_party/rust/futures-util/src/future/try_future/into_future.rs36
-rw-r--r--third_party/rust/futures-util/src/future/try_future/mod.rs625
-rw-r--r--third_party/rust/futures-util/src/future/try_future/try_flatten.rs162
-rw-r--r--third_party/rust/futures-util/src/future/try_future/try_flatten_err.rs62
-rw-r--r--third_party/rust/futures-util/src/future/try_join.rs256
-rw-r--r--third_party/rust/futures-util/src/future/try_join_all.rs200
-rw-r--r--third_party/rust/futures-util/src/future/try_maybe_done.rs92
-rw-r--r--third_party/rust/futures-util/src/future/try_select.rs84
30 files changed, 4700 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/future/abortable.rs b/third_party/rust/futures-util/src/future/abortable.rs
new file mode 100644
index 0000000000..d017ab7340
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/abortable.rs
@@ -0,0 +1,19 @@
+use super::assert_future;
+use crate::future::{AbortHandle, Abortable, Aborted};
+use futures_core::future::Future;
+
+/// Creates a new `Abortable` future and an `AbortHandle` which can be used to stop it.
+///
+/// This function is a convenient (but less flexible) alternative to calling
+/// `AbortHandle::new` and `Abortable::new` manually.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
+where
+ Fut: Future,
+{
+ let (handle, reg) = AbortHandle::new_pair();
+ let abortable = assert_future::<Result<Fut::Output, Aborted>, _>(Abortable::new(future, reg));
+ (abortable, handle)
+}
diff --git a/third_party/rust/futures-util/src/future/either.rs b/third_party/rust/futures-util/src/future/either.rs
new file mode 100644
index 0000000000..27e5064dfb
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/either.rs
@@ -0,0 +1,317 @@
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use futures_core::future::{FusedFuture, Future};
+use futures_core::stream::{FusedStream, Stream};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+/// Combines two different futures, streams, or sinks having the same associated types into a single type.
+///
+/// This is useful when conditionally choosing between two distinct future types:
+///
+/// ```rust
+/// use futures::future::Either;
+///
+/// # futures::executor::block_on(async {
+/// let cond = true;
+///
+/// let fut = if cond {
+/// Either::Left(async move { 12 })
+/// } else {
+/// Either::Right(async move { 44 })
+/// };
+///
+/// assert_eq!(fut.await, 12);
+/// # })
+/// ```
+#[derive(Debug, Clone)]
+pub enum Either<A, B> {
+ /// First branch of the type
+ Left(/* #[pin] */ A),
+ /// Second branch of the type
+ Right(/* #[pin] */ B),
+}
+
+impl<A, B> Either<A, B> {
+ /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> {
+ // SAFETY: We can use `new_unchecked` because the `inner` parts are
+ // guaranteed to be pinned, as they come from `self` which is pinned.
+ unsafe {
+ match *Pin::get_ref(self) {
+ Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)),
+ }
+ }
+ }
+
+ /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ // SAFETY: `get_unchecked_mut` is fine because we don't move anything.
+ // We can use `new_unchecked` because the `inner` parts are guaranteed
+ // to be pinned, as they come from `self` which is pinned, and we never
+ // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We
+ // also don't have an implementation of `Drop`, nor manual `Unpin`.
+ unsafe {
+ match *Pin::get_unchecked_mut(self) {
+ Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)),
+ }
+ }
+ }
+}
+
+impl<A, B, T> Either<(T, A), (T, B)> {
+ /// Factor out a homogeneous type from an either of pairs.
+ ///
+ /// Here, the homogeneous type is the first element of the pairs.
+ pub fn factor_first(self) -> (T, Either<A, B>) {
+ match self {
+ Either::Left((x, a)) => (x, Either::Left(a)),
+ Either::Right((x, b)) => (x, Either::Right(b)),
+ }
+ }
+}
+
+impl<A, B, T> Either<(A, T), (B, T)> {
+ /// Factor out a homogeneous type from an either of pairs.
+ ///
+ /// Here, the homogeneous type is the second element of the pairs.
+ pub fn factor_second(self) -> (Either<A, B>, T) {
+ match self {
+ Either::Left((a, x)) => (Either::Left(a), x),
+ Either::Right((b, x)) => (Either::Right(b), x),
+ }
+ }
+}
+
+impl<T> Either<T, T> {
+ /// Extract the value of an either over two equivalent types.
+ pub fn into_inner(self) -> T {
+ match self {
+ Either::Left(x) => x,
+ Either::Right(x) => x,
+ }
+ }
+}
+
+impl<A, B> Future for Either<A, B>
+where
+ A: Future,
+ B: Future<Output = A::Output>,
+{
+ type Output = A::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll(cx),
+ Either::Right(x) => x.poll(cx),
+ }
+ }
+}
+
+impl<A, B> FusedFuture for Either<A, B>
+where
+ A: FusedFuture,
+ B: FusedFuture<Output = A::Output>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Either::Left(x) => x.is_terminated(),
+ Either::Right(x) => x.is_terminated(),
+ }
+ }
+}
+
+impl<A, B> Stream for Either<A, B>
+where
+ A: Stream,
+ B: Stream<Item = A::Item>,
+{
+ type Item = A::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_next(cx),
+ Either::Right(x) => x.poll_next(cx),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self {
+ Either::Left(x) => x.size_hint(),
+ Either::Right(x) => x.size_hint(),
+ }
+ }
+}
+
+impl<A, B> FusedStream for Either<A, B>
+where
+ A: FusedStream,
+ B: FusedStream<Item = A::Item>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Either::Left(x) => x.is_terminated(),
+ Either::Right(x) => x.is_terminated(),
+ }
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<A, B, Item> Sink<Item> for Either<A, B>
+where
+ A: Sink<Item>,
+ B: Sink<Item, Error = A::Error>,
+{
+ type Error = A::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_ready(cx),
+ Either::Right(x) => x.poll_ready(cx),
+ }
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.start_send(item),
+ Either::Right(x) => x.start_send(item),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_flush(cx),
+ Either::Right(x) => x.poll_flush(cx),
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_close(cx),
+ Either::Right(x) => x.poll_close(cx),
+ }
+ }
+}
+
+#[cfg(feature = "io")]
+#[cfg(feature = "std")]
+mod if_std {
+ use super::*;
+
+ use core::pin::Pin;
+ use core::task::{Context, Poll};
+ use futures_io::{
+ AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom,
+ };
+
+ impl<A, B> AsyncRead for Either<A, B>
+ where
+ A: AsyncRead,
+ B: AsyncRead,
+ {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_read(cx, buf),
+ Either::Right(x) => x.poll_read(cx, buf),
+ }
+ }
+
+ fn poll_read_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<Result<usize>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_read_vectored(cx, bufs),
+ Either::Right(x) => x.poll_read_vectored(cx, bufs),
+ }
+ }
+ }
+
+ impl<A, B> AsyncWrite for Either<A, B>
+ where
+ A: AsyncWrite,
+ B: AsyncWrite,
+ {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_write(cx, buf),
+ Either::Right(x) => x.poll_write(cx, buf),
+ }
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<Result<usize>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_write_vectored(cx, bufs),
+ Either::Right(x) => x.poll_write_vectored(cx, bufs),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_flush(cx),
+ Either::Right(x) => x.poll_flush(cx),
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_close(cx),
+ Either::Right(x) => x.poll_close(cx),
+ }
+ }
+ }
+
+ impl<A, B> AsyncSeek for Either<A, B>
+ where
+ A: AsyncSeek,
+ B: AsyncSeek,
+ {
+ fn poll_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<Result<u64>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_seek(cx, pos),
+ Either::Right(x) => x.poll_seek(cx, pos),
+ }
+ }
+ }
+
+ impl<A, B> AsyncBufRead for Either<A, B>
+ where
+ A: AsyncBufRead,
+ B: AsyncBufRead,
+ {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.poll_fill_buf(cx),
+ Either::Right(x) => x.poll_fill_buf(cx),
+ }
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ match self.as_pin_mut() {
+ Either::Left(x) => x.consume(amt),
+ Either::Right(x) => x.consume(amt),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/catch_unwind.rs b/third_party/rust/futures-util/src/future/future/catch_unwind.rs
new file mode 100644
index 0000000000..0e09d6eeb0
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/catch_unwind.rs
@@ -0,0 +1,38 @@
+use core::any::Any;
+use core::pin::Pin;
+use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe};
+
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct CatchUnwind<Fut> {
+ #[pin]
+ future: Fut,
+ }
+}
+
+impl<Fut> CatchUnwind<Fut>
+where
+ Fut: Future + UnwindSafe,
+{
+ pub(super) fn new(future: Fut) -> Self {
+ Self { future }
+ }
+}
+
+impl<Fut> Future for CatchUnwind<Fut>
+where
+ Fut: Future + UnwindSafe,
+{
+ type Output = Result<Fut::Output, Box<dyn Any + Send>>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let f = self.project().future;
+ catch_unwind(AssertUnwindSafe(|| f.poll(cx)))?.map(Ok)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/flatten.rs b/third_party/rust/futures-util/src/future/future/flatten.rs
new file mode 100644
index 0000000000..bd767af344
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/flatten.rs
@@ -0,0 +1,153 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ #[project = FlattenProj]
+ #[derive(Debug)]
+ pub enum Flatten<Fut1, Fut2> {
+ First { #[pin] f: Fut1 },
+ Second { #[pin] f: Fut2 },
+ Empty,
+ }
+}
+
+impl<Fut1, Fut2> Flatten<Fut1, Fut2> {
+ pub(crate) fn new(future: Fut1) -> Self {
+ Self::First { f: future }
+ }
+}
+
+impl<Fut> FusedFuture for Flatten<Fut, Fut::Output>
+where
+ Fut: Future,
+ Fut::Output: Future,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Empty => true,
+ _ => false,
+ }
+ }
+}
+
+impl<Fut> Future for Flatten<Fut, Fut::Output>
+where
+ Fut: Future,
+ Fut::Output: Future,
+{
+ type Output = <Fut::Output as Future>::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Poll::Ready(loop {
+ match self.as_mut().project() {
+ FlattenProj::First { f } => {
+ let f = ready!(f.poll(cx));
+ self.set(Self::Second { f });
+ }
+ FlattenProj::Second { f } => {
+ let output = ready!(f.poll(cx));
+ self.set(Self::Empty);
+ break output;
+ }
+ FlattenProj::Empty => panic!("Flatten polled after completion"),
+ }
+ })
+ }
+}
+
+impl<Fut> FusedStream for Flatten<Fut, Fut::Output>
+where
+ Fut: Future,
+ Fut::Output: Stream,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Empty => true,
+ _ => false,
+ }
+ }
+}
+
+impl<Fut> Stream for Flatten<Fut, Fut::Output>
+where
+ Fut: Future,
+ Fut::Output: Stream,
+{
+ type Item = <Fut::Output as Stream>::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(loop {
+ match self.as_mut().project() {
+ FlattenProj::First { f } => {
+ let f = ready!(f.poll(cx));
+ self.set(Self::Second { f });
+ }
+ FlattenProj::Second { f } => {
+ let output = ready!(f.poll_next(cx));
+ if output.is_none() {
+ self.set(Self::Empty);
+ }
+ break output;
+ }
+ FlattenProj::Empty => break None,
+ }
+ })
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output>
+where
+ Fut: Future,
+ Fut::Output: Sink<Item>,
+{
+ type Error = <Fut::Output as Sink<Item>>::Error;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(loop {
+ match self.as_mut().project() {
+ FlattenProj::First { f } => {
+ let f = ready!(f.poll(cx));
+ self.set(Self::Second { f });
+ }
+ FlattenProj::Second { f } => {
+ break ready!(f.poll_ready(cx));
+ }
+ FlattenProj::Empty => panic!("poll_ready called after eof"),
+ }
+ })
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ match self.project() {
+ FlattenProj::First { .. } => panic!("poll_ready not called first"),
+ FlattenProj::Second { f } => f.start_send(item),
+ FlattenProj::Empty => panic!("start_send called after eof"),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.project() {
+ FlattenProj::First { .. } => Poll::Ready(Ok(())),
+ FlattenProj::Second { f } => f.poll_flush(cx),
+ FlattenProj::Empty => panic!("poll_flush called after eof"),
+ }
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let res = match self.as_mut().project() {
+ FlattenProj::Second { f } => f.poll_close(cx),
+ _ => Poll::Ready(Ok(())),
+ };
+ if res.is_ready() {
+ self.set(Self::Empty);
+ }
+ res
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/fuse.rs b/third_party/rust/futures-util/src/future/future/fuse.rs
new file mode 100644
index 0000000000..597aec1a40
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/fuse.rs
@@ -0,0 +1,93 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`fuse`](super::FutureExt::fuse) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Fuse<Fut> {
+ #[pin]
+ inner: Option<Fut>,
+ }
+}
+
+impl<Fut> Fuse<Fut> {
+ pub(super) fn new(f: Fut) -> Self {
+ Self { inner: Some(f) }
+ }
+}
+
+impl<Fut: Future> Fuse<Fut> {
+ /// Creates a new `Fuse`-wrapped future which is already terminated.
+ ///
+ /// This can be useful in combination with looping and the `select!`
+ /// macro, which bypasses terminated futures.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::future::{Fuse, FusedFuture, FutureExt};
+ /// use futures::select;
+ /// use futures::stream::StreamExt;
+ /// use futures::pin_mut;
+ ///
+ /// let (sender, mut stream) = mpsc::unbounded();
+ ///
+ /// // Send a few messages into the stream
+ /// sender.unbounded_send(()).unwrap();
+ /// sender.unbounded_send(()).unwrap();
+ /// drop(sender);
+ ///
+ /// // Use `Fuse::terminated()` to create an already-terminated future
+ /// // which may be instantiated later.
+ /// let foo_printer = Fuse::terminated();
+ /// pin_mut!(foo_printer);
+ ///
+ /// loop {
+ /// select! {
+ /// _ = foo_printer => {},
+ /// () = stream.select_next_some() => {
+ /// if !foo_printer.is_terminated() {
+ /// println!("Foo is already being printed!");
+ /// } else {
+ /// foo_printer.set(async {
+ /// // do some other async operations
+ /// println!("Printing foo from `foo_printer` future");
+ /// }.fuse());
+ /// }
+ /// },
+ /// complete => break, // `foo_printer` is terminated and the stream is done
+ /// }
+ /// }
+ /// # });
+ /// ```
+ pub fn terminated() -> Self {
+ Self { inner: None }
+ }
+}
+
+impl<Fut: Future> FusedFuture for Fuse<Fut> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
+
+impl<Fut: Future> Future for Fuse<Fut> {
+ type Output = Fut::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
+ Poll::Ready(match self.as_mut().project().inner.as_pin_mut() {
+ Some(fut) => {
+ let output = ready!(fut.poll(cx));
+ self.project().inner.set(None);
+ output
+ }
+ None => return Poll::Pending,
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/map.rs b/third_party/rust/futures-util/src/future/future/map.rs
new file mode 100644
index 0000000000..7471aba000
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/map.rs
@@ -0,0 +1,66 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+use crate::fns::FnOnce1;
+
+pin_project! {
+ /// Internal Map future
+ #[project = MapProj]
+ #[project_replace = MapProjReplace]
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub enum Map<Fut, F> {
+ Incomplete {
+ #[pin]
+ future: Fut,
+ f: F,
+ },
+ Complete,
+ }
+}
+
+impl<Fut, F> Map<Fut, F> {
+ /// Creates a new Map.
+ pub(crate) fn new(future: Fut, f: F) -> Self {
+ Self::Incomplete { future, f }
+ }
+}
+
+impl<Fut, F, T> FusedFuture for Map<Fut, F>
+where
+ Fut: Future,
+ F: FnOnce1<Fut::Output, Output = T>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Incomplete { .. } => false,
+ Self::Complete => true,
+ }
+ }
+}
+
+impl<Fut, F, T> Future for Map<Fut, F>
+where
+ Fut: Future,
+ F: FnOnce1<Fut::Output, Output = T>,
+{
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ match self.as_mut().project() {
+ MapProj::Incomplete { future, .. } => {
+ let output = ready!(future.poll(cx));
+ match self.project_replace(Map::Complete) {
+ MapProjReplace::Incomplete { f, .. } => Poll::Ready(f.call_once(output)),
+ MapProjReplace::Complete => unreachable!(),
+ }
+ }
+ MapProj::Complete => {
+ panic!("Map must not be polled after it returned `Poll::Ready`")
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/mod.rs b/third_party/rust/futures-util/src/future/future/mod.rs
new file mode 100644
index 0000000000..c11d108207
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/mod.rs
@@ -0,0 +1,610 @@
+//! Futures
+//!
+//! This module contains a number of functions for working with `Future`s,
+//! including the `FutureExt` trait which adds methods to `Future` types.
+
+#[cfg(feature = "alloc")]
+use alloc::boxed::Box;
+use core::pin::Pin;
+
+use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn};
+use crate::future::{assert_future, Either};
+use crate::never::Never;
+use crate::stream::assert_stream;
+#[cfg(feature = "alloc")]
+use futures_core::future::{BoxFuture, LocalBoxFuture};
+use futures_core::{
+ future::Future,
+ stream::Stream,
+ task::{Context, Poll},
+};
+use pin_utils::pin_mut;
+
+// Combinators
+
+mod flatten;
+mod fuse;
+mod map;
+
+delegate_all!(
+ /// Future for the [`flatten`](super::FutureExt::flatten) method.
+ Flatten<F>(
+ flatten::Flatten<F, <F as Future>::Output>
+ ): Debug + Future + FusedFuture + New[|x: F| flatten::Flatten::new(x)]
+ where F: Future
+);
+
+delegate_all!(
+ /// Stream for the [`flatten_stream`](FutureExt::flatten_stream) method.
+ FlattenStream<F>(
+ flatten::Flatten<F, <F as Future>::Output>
+ ): Debug + Sink + Stream + FusedStream + New[|x: F| flatten::Flatten::new(x)]
+ where F: Future
+);
+
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use fuse::Fuse;
+
+delegate_all!(
+ /// Future for the [`map`](super::FutureExt::map) method.
+ Map<Fut, F>(
+ map::Map<Fut, F>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)]
+);
+
+delegate_all!(
+ /// Stream for the [`into_stream`](FutureExt::into_stream) method.
+ IntoStream<F>(
+ crate::stream::Once<F>
+ ): Debug + Stream + FusedStream + New[|x: F| crate::stream::Once::new(x)]
+);
+
+delegate_all!(
+ /// Future for the [`map_into`](FutureExt::map_into) combinator.
+ MapInto<Fut, T>(
+ Map<Fut, IntoFn<T>>
+ ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, into_fn())]
+);
+
+delegate_all!(
+ /// Future for the [`then`](FutureExt::then) method.
+ Then<Fut1, Fut2, F>(
+ flatten::Flatten<Map<Fut1, F>, Fut2>
+ ): Debug + Future + FusedFuture + New[|x: Fut1, y: F| flatten::Flatten::new(Map::new(x, y))]
+);
+
+delegate_all!(
+ /// Future for the [`inspect`](FutureExt::inspect) method.
+ Inspect<Fut, F>(
+ map::Map<Fut, InspectFn<F>>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, inspect_fn(f))]
+);
+
+delegate_all!(
+ /// Future for the [`never_error`](super::FutureExt::never_error) combinator.
+ NeverError<Fut>(
+ Map<Fut, OkFn<Never>>
+ ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())]
+);
+
+delegate_all!(
+ /// Future for the [`unit_error`](super::FutureExt::unit_error) combinator.
+ UnitError<Fut>(
+ Map<Fut, OkFn<()>>
+ ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())]
+);
+
+#[cfg(feature = "std")]
+mod catch_unwind;
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::catch_unwind::CatchUnwind;
+
+#[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
+#[cfg(feature = "std")]
+mod remote_handle;
+#[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::remote_handle::{Remote, RemoteHandle};
+
+#[cfg(feature = "std")]
+mod shared;
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::shared::{Shared, WeakShared};
+
+impl<T: ?Sized> FutureExt for T where T: Future {}
+
+/// An extension trait for `Future`s that provides a variety of convenient
+/// adapters.
+pub trait FutureExt: Future {
+ /// Map this future's output to a different type, returning a new future of
+ /// the resulting type.
+ ///
+ /// This function is similar to the `Option::map` or `Iterator::map` where
+ /// it will change the type of the underlying future. This is useful to
+ /// chain along a computation once a future has been resolved.
+ ///
+ /// Note that this function consumes the receiving future and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future = async { 1 };
+ /// let new_future = future.map(|x| x + 3);
+ /// assert_eq!(new_future.await, 4);
+ /// # });
+ /// ```
+ fn map<U, F>(self, f: F) -> Map<Self, F>
+ where
+ F: FnOnce(Self::Output) -> U,
+ Self: Sized,
+ {
+ assert_future::<U, _>(Map::new(self, f))
+ }
+
+ /// Map this future's output to a different type, returning a new future of
+ /// the resulting type.
+ ///
+ /// This function is equivalent to calling `map(Into::into)` but allows naming
+ /// the return type.
+ fn map_into<U>(self) -> MapInto<Self, U>
+ where
+ Self::Output: Into<U>,
+ Self: Sized,
+ {
+ assert_future::<U, _>(MapInto::new(self))
+ }
+
+ /// Chain on a computation for when a future finished, passing the result of
+ /// the future to the provided closure `f`.
+ ///
+ /// The returned value of the closure must implement the `Future` trait
+ /// and can represent some more work to be done before the composed future
+ /// is finished.
+ ///
+ /// The closure `f` is only run *after* successful completion of the `self`
+ /// future.
+ ///
+ /// Note that this function consumes the receiving future and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future_of_1 = async { 1 };
+ /// let future_of_4 = future_of_1.then(|x| async move { x + 3 });
+ /// assert_eq!(future_of_4.await, 4);
+ /// # });
+ /// ```
+ fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
+ where
+ F: FnOnce(Self::Output) -> Fut,
+ Fut: Future,
+ Self: Sized,
+ {
+ assert_future::<Fut::Output, _>(Then::new(self, f))
+ }
+
+ /// Wrap this future in an `Either` future, making it the left-hand variant
+ /// of that `Either`.
+ ///
+ /// This can be used in combination with the `right_future` method to write `if`
+ /// statements that evaluate to different futures in different branches.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let x = 6;
+ /// let future = if x < 10 {
+ /// async { true }.left_future()
+ /// } else {
+ /// async { false }.right_future()
+ /// };
+ ///
+ /// assert_eq!(future.await, true);
+ /// # });
+ /// ```
+ fn left_future<B>(self) -> Either<Self, B>
+ where
+ B: Future<Output = Self::Output>,
+ Self: Sized,
+ {
+ assert_future::<Self::Output, _>(Either::Left(self))
+ }
+
+ /// Wrap this future in an `Either` future, making it the right-hand variant
+ /// of that `Either`.
+ ///
+ /// This can be used in combination with the `left_future` method to write `if`
+ /// statements that evaluate to different futures in different branches.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let x = 6;
+ /// let future = if x > 10 {
+ /// async { true }.left_future()
+ /// } else {
+ /// async { false }.right_future()
+ /// };
+ ///
+ /// assert_eq!(future.await, false);
+ /// # });
+ /// ```
+ fn right_future<A>(self) -> Either<A, Self>
+ where
+ A: Future<Output = Self::Output>,
+ Self: Sized,
+ {
+ assert_future::<Self::Output, _>(Either::Right(self))
+ }
+
+ /// Convert this future into a single element stream.
+ ///
+ /// The returned stream contains single success if this future resolves to
+ /// success or single error if this future resolves into error.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ /// use futures::stream::StreamExt;
+ ///
+ /// let future = async { 17 };
+ /// let stream = future.into_stream();
+ /// let collected: Vec<_> = stream.collect().await;
+ /// assert_eq!(collected, vec![17]);
+ /// # });
+ /// ```
+ fn into_stream(self) -> IntoStream<Self>
+ where
+ Self: Sized,
+ {
+ assert_stream::<Self::Output, _>(IntoStream::new(self))
+ }
+
+ /// Flatten the execution of this future when the output of this
+ /// future is itself another future.
+ ///
+ /// This can be useful when combining futures together to flatten the
+ /// computation out the final result.
+ ///
+ /// This method is roughly equivalent to `self.then(|x| x)`.
+ ///
+ /// Note that this function consumes the receiving future and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let nested_future = async { async { 1 } };
+ /// let future = nested_future.flatten();
+ /// assert_eq!(future.await, 1);
+ /// # });
+ /// ```
+ fn flatten(self) -> Flatten<Self>
+ where
+ Self::Output: Future,
+ Self: Sized,
+ {
+ let f = Flatten::new(self);
+ assert_future::<<<Self as Future>::Output as Future>::Output, _>(f)
+ }
+
+ /// Flatten the execution of this future when the successful result of this
+ /// future is a stream.
+ ///
+ /// This can be useful when stream initialization is deferred, and it is
+ /// convenient to work with that stream as if stream was available at the
+ /// call site.
+ ///
+ /// Note that this function consumes this future and returns a wrapped
+ /// version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream_items = vec![17, 18, 19];
+ /// let future_of_a_stream = async { stream::iter(stream_items) };
+ ///
+ /// let stream = future_of_a_stream.flatten_stream();
+ /// let list: Vec<_> = stream.collect().await;
+ /// assert_eq!(list, vec![17, 18, 19]);
+ /// # });
+ /// ```
+ fn flatten_stream(self) -> FlattenStream<Self>
+ where
+ Self::Output: Stream,
+ Self: Sized,
+ {
+ assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self))
+ }
+
+ /// Fuse a future such that `poll` will never again be called once it has
+ /// completed. This method can be used to turn any `Future` into a
+ /// `FusedFuture`.
+ ///
+ /// Normally, once a future has returned `Poll::Ready` from `poll`,
+ /// any further calls could exhibit bad behavior such as blocking
+ /// forever, panicking, never returning, etc. If it is known that `poll`
+ /// may be called too often then this method can be used to ensure that it
+ /// has defined semantics.
+ ///
+ /// If a `fuse`d future is `poll`ed after having returned `Poll::Ready`
+ /// previously, it will return `Poll::Pending`, from `poll` again (and will
+ /// continue to do so for all future calls to `poll`).
+ ///
+ /// This combinator will drop the underlying future as soon as it has been
+ /// completed to ensure resources are reclaimed as soon as possible.
+ fn fuse(self) -> Fuse<Self>
+ where
+ Self: Sized,
+ {
+ let f = Fuse::new(self);
+ assert_future::<Self::Output, _>(f)
+ }
+
+ /// Do something with the output of a future before passing it on.
+ ///
+ /// When using futures, you'll often chain several of them together. While
+ /// working on such code, you might want to check out what's happening at
+ /// various parts in the pipeline, without consuming the intermediate
+ /// value. To do that, insert a call to `inspect`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future = async { 1 };
+ /// let new_future = future.inspect(|&x| println!("about to resolve: {}", x));
+ /// assert_eq!(new_future.await, 1);
+ /// # });
+ /// ```
+ fn inspect<F>(self, f: F) -> Inspect<Self, F>
+ where
+ F: FnOnce(&Self::Output),
+ Self: Sized,
+ {
+ assert_future::<Self::Output, _>(Inspect::new(self, f))
+ }
+
+ /// Catches unwinding panics while polling the future.
+ ///
+ /// In general, panics within a future can propagate all the way out to the
+ /// task level. This combinator makes it possible to halt unwinding within
+ /// the future itself. It's most commonly used within task executors. It's
+ /// not recommended to use this for error handling.
+ ///
+ /// Note that this method requires the `UnwindSafe` bound from the standard
+ /// library. This isn't always applied automatically, and the standard
+ /// library provides an `AssertUnwindSafe` wrapper type to apply it
+ /// after-the fact. To assist using this method, the `Future` trait is also
+ /// implemented for `AssertUnwindSafe<F>` where `F` implements `Future`.
+ ///
+ /// This method is only available when the `std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::{self, FutureExt, Ready};
+ ///
+ /// let future = future::ready(2);
+ /// assert!(future.catch_unwind().await.is_ok());
+ ///
+ /// let future = future::lazy(|_| -> Ready<i32> {
+ /// unimplemented!()
+ /// });
+ /// assert!(future.catch_unwind().await.is_err());
+ /// # });
+ /// ```
+ #[cfg(feature = "std")]
+ fn catch_unwind(self) -> CatchUnwind<Self>
+ where
+ Self: Sized + ::std::panic::UnwindSafe,
+ {
+ assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new(
+ self,
+ ))
+ }
+
+ /// Create a cloneable handle to this future where all handles will resolve
+ /// to the same result.
+ ///
+ /// The `shared` combinator method provides a method to convert any future
+ /// into a cloneable future. It enables a future to be polled by multiple
+ /// threads.
+ ///
+ /// This method is only available when the `std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future = async { 6 };
+ /// let shared1 = future.shared();
+ /// let shared2 = shared1.clone();
+ ///
+ /// assert_eq!(6, shared1.await);
+ /// assert_eq!(6, shared2.await);
+ /// # });
+ /// ```
+ ///
+ /// ```
+ /// // Note, unlike most examples this is written in the context of a
+ /// // synchronous function to better illustrate the cross-thread aspect of
+ /// // the `shared` combinator.
+ ///
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ /// use futures::executor::block_on;
+ /// use std::thread;
+ ///
+ /// let future = async { 6 };
+ /// let shared1 = future.shared();
+ /// let shared2 = shared1.clone();
+ /// let join_handle = thread::spawn(move || {
+ /// assert_eq!(6, block_on(shared2));
+ /// });
+ /// assert_eq!(6, shared1.await);
+ /// join_handle.join().unwrap();
+ /// # });
+ /// ```
+ #[cfg(feature = "std")]
+ fn shared(self) -> Shared<Self>
+ where
+ Self: Sized,
+ Self::Output: Clone,
+ {
+ assert_future::<Self::Output, _>(Shared::new(self))
+ }
+
+ /// Turn this future into a future that yields `()` on completion and sends
+ /// its output to another future on a separate task.
+ ///
+ /// This can be used with spawning executors to easily retrieve the result
+ /// of a future executing on a separate task or thread.
+ ///
+ /// This method is only available when the `std` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "channel")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
+ #[cfg(feature = "std")]
+ fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
+ where
+ Self: Sized,
+ {
+ let (wrapped, handle) = remote_handle::remote_handle(self);
+ (assert_future::<(), _>(wrapped), handle)
+ }
+
+ /// Wrap the future in a Box, pinning it.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "alloc")]
+ fn boxed<'a>(self) -> BoxFuture<'a, Self::Output>
+ where
+ Self: Sized + Send + 'a,
+ {
+ assert_future::<Self::Output, _>(Box::pin(self))
+ }
+
+ /// Wrap the future in a Box, pinning it.
+ ///
+ /// Similar to `boxed`, but without the `Send` requirement.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "alloc")]
+ fn boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output>
+ where
+ Self: Sized + 'a,
+ {
+ assert_future::<Self::Output, _>(Box::pin(self))
+ }
+
+ /// Turns a [`Future<Output = T>`](Future) into a
+ /// [`TryFuture<Ok = T, Error = ()`>](futures_core::future::TryFuture).
+ fn unit_error(self) -> UnitError<Self>
+ where
+ Self: Sized,
+ {
+ assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self))
+ }
+
+ /// Turns a [`Future<Output = T>`](Future) into a
+ /// [`TryFuture<Ok = T, Error = Never`>](futures_core::future::TryFuture).
+ fn never_error(self) -> NeverError<Self>
+ where
+ Self: Sized,
+ {
+ assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self))
+ }
+
+ /// A convenience for calling `Future::poll` on `Unpin` future types.
+ fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).poll(cx)
+ }
+
+ /// Evaluates and consumes the future, returning the resulting output if
+ /// the future is ready after the first call to `Future::poll`.
+ ///
+ /// If `poll` instead returns `Poll::Pending`, `None` is returned.
+ ///
+ /// This method is useful in cases where immediacy is more important than
+ /// waiting for a result. It is also convenient for quickly obtaining
+ /// the value of a future that is known to always resolve immediately.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use futures::prelude::*;
+ /// use futures::{future::ready, future::pending};
+ /// let future_ready = ready("foobar");
+ /// let future_pending = pending::<&'static str>();
+ ///
+ /// assert_eq!(future_ready.now_or_never(), Some("foobar"));
+ /// assert_eq!(future_pending.now_or_never(), None);
+ /// ```
+ ///
+ /// In cases where it is absolutely known that a future should always
+ /// resolve immediately and never return `Poll::Pending`, this method can
+ /// be combined with `expect()`:
+ ///
+ /// ```
+ /// # use futures::{prelude::*, future::ready};
+ /// let future_ready = ready("foobar");
+ ///
+ /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar");
+ /// ```
+ fn now_or_never(self) -> Option<Self::Output>
+ where
+ Self: Sized,
+ {
+ let noop_waker = crate::task::noop_waker();
+ let mut cx = Context::from_waker(&noop_waker);
+
+ let this = self;
+ pin_mut!(this);
+ match this.poll(&mut cx) {
+ Poll::Ready(x) => Some(x),
+ _ => None,
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/remote_handle.rs b/third_party/rust/futures-util/src/future/future/remote_handle.rs
new file mode 100644
index 0000000000..1358902cab
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/remote_handle.rs
@@ -0,0 +1,126 @@
+use {
+ crate::future::{CatchUnwind, FutureExt},
+ futures_channel::oneshot::{self, Receiver, Sender},
+ futures_core::{
+ future::Future,
+ ready,
+ task::{Context, Poll},
+ },
+ pin_project_lite::pin_project,
+ std::{
+ any::Any,
+ fmt,
+ panic::{self, AssertUnwindSafe},
+ pin::Pin,
+ sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+ },
+ thread,
+ },
+};
+
+/// The handle to a remote future returned by
+/// [`remote_handle`](crate::future::FutureExt::remote_handle). When you drop this,
+/// the remote future will be woken up to be dropped by the executor.
+///
+/// ## Unwind safety
+///
+/// When the remote future panics, [Remote] will catch the unwind and transfer it to
+/// the thread where `RemoteHandle` is being awaited. This is good for the common
+/// case where [Remote] is spawned on a threadpool. It is unlikely that other code
+/// in the executor working thread shares mutable data with the spawned future and we
+/// preserve the executor from losing its working threads.
+///
+/// If you run the future locally and send the handle of to be awaited elsewhere, you
+/// must be careful with regard to unwind safety because the thread in which the future
+/// is polled will keep running after the panic and the thread running the [RemoteHandle]
+/// will unwind.
+#[must_use = "dropping a remote handle cancels the underlying future"]
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
+pub struct RemoteHandle<T> {
+ rx: Receiver<thread::Result<T>>,
+ keep_running: Arc<AtomicBool>,
+}
+
+impl<T> RemoteHandle<T> {
+ /// Drops this handle *without* canceling the underlying future.
+ ///
+ /// This method can be used if you want to drop the handle, but let the
+ /// execution continue.
+ pub fn forget(self) {
+ self.keep_running.store(true, Ordering::SeqCst);
+ }
+}
+
+impl<T: 'static> Future for RemoteHandle<T> {
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ match ready!(self.rx.poll_unpin(cx)) {
+ Ok(Ok(output)) => Poll::Ready(output),
+ // the remote future panicked.
+ Ok(Err(e)) => panic::resume_unwind(e),
+ // The oneshot sender was dropped.
+ Err(e) => panic::resume_unwind(Box::new(e)),
+ }
+ }
+}
+
+type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'static)>>;
+
+pin_project! {
+ /// A future which sends its output to the corresponding `RemoteHandle`.
+ /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
+ pub struct Remote<Fut: Future> {
+ tx: Option<Sender<SendMsg<Fut>>>,
+ keep_running: Arc<AtomicBool>,
+ #[pin]
+ future: CatchUnwind<AssertUnwindSafe<Fut>>,
+ }
+}
+
+impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("Remote").field(&self.future).finish()
+ }
+}
+
+impl<Fut: Future> Future for Remote<Fut> {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ let this = self.project();
+
+ if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready()
+ && !this.keep_running.load(Ordering::SeqCst)
+ {
+ // Cancelled, bail out
+ return Poll::Ready(());
+ }
+
+ let output = ready!(this.future.poll(cx));
+
+ // if the receiving end has gone away then that's ok, we just ignore the
+ // send error here.
+ drop(this.tx.take().unwrap().send(output));
+ Poll::Ready(())
+ }
+}
+
+pub(super) fn remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>) {
+ let (tx, rx) = oneshot::channel();
+ let keep_running = Arc::new(AtomicBool::new(false));
+
+ // Unwind Safety: See the docs for RemoteHandle.
+ let wrapped = Remote {
+ future: AssertUnwindSafe(future).catch_unwind(),
+ tx: Some(tx),
+ keep_running: keep_running.clone(),
+ };
+
+ (wrapped, RemoteHandle { rx, keep_running })
+}
diff --git a/third_party/rust/futures-util/src/future/future/shared.rs b/third_party/rust/futures-util/src/future/future/shared.rs
new file mode 100644
index 0000000000..ecd1b426db
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/shared.rs
@@ -0,0 +1,413 @@
+use crate::task::{waker_ref, ArcWake};
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll, Waker};
+use slab::Slab;
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::hash::Hasher;
+use std::pin::Pin;
+use std::ptr;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{Acquire, SeqCst};
+use std::sync::{Arc, Mutex, Weak};
+
+/// Future for the [`shared`](super::FutureExt::shared) method.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Shared<Fut: Future> {
+ inner: Option<Arc<Inner<Fut>>>,
+ waker_key: usize,
+}
+
+struct Inner<Fut: Future> {
+ future_or_output: UnsafeCell<FutureOrOutput<Fut>>,
+ notifier: Arc<Notifier>,
+}
+
+struct Notifier {
+ state: AtomicUsize,
+ wakers: Mutex<Option<Slab<Option<Waker>>>>,
+}
+
+/// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`.
+pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>);
+
+impl<Fut: Future> Clone for WeakShared<Fut> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
+// The future itself is polled behind the `Arc`, so it won't be moved
+// when `Shared` is moved.
+impl<Fut: Future> Unpin for Shared<Fut> {}
+
+impl<Fut: Future> fmt::Debug for Shared<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shared")
+ .field("inner", &self.inner)
+ .field("waker_key", &self.waker_key)
+ .finish()
+ }
+}
+
+impl<Fut: Future> fmt::Debug for Inner<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Inner").finish()
+ }
+}
+
+impl<Fut: Future> fmt::Debug for WeakShared<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WeakShared").finish()
+ }
+}
+
+enum FutureOrOutput<Fut: Future> {
+ Future(Fut),
+ Output(Fut::Output),
+}
+
+unsafe impl<Fut> Send for Inner<Fut>
+where
+ Fut: Future + Send,
+ Fut::Output: Send + Sync,
+{
+}
+
+unsafe impl<Fut> Sync for Inner<Fut>
+where
+ Fut: Future + Send,
+ Fut::Output: Send + Sync,
+{
+}
+
+const IDLE: usize = 0;
+const POLLING: usize = 1;
+const COMPLETE: usize = 2;
+const POISONED: usize = 3;
+
+const NULL_WAKER_KEY: usize = usize::max_value();
+
+impl<Fut: Future> Shared<Fut> {
+ pub(super) fn new(future: Fut) -> Self {
+ let inner = Inner {
+ future_or_output: UnsafeCell::new(FutureOrOutput::Future(future)),
+ notifier: Arc::new(Notifier {
+ state: AtomicUsize::new(IDLE),
+ wakers: Mutex::new(Some(Slab::new())),
+ }),
+ };
+
+ Self { inner: Some(Arc::new(inner)), waker_key: NULL_WAKER_KEY }
+ }
+}
+
+impl<Fut> Shared<Fut>
+where
+ Fut: Future,
+{
+ /// Returns [`Some`] containing a reference to this [`Shared`]'s output if
+ /// it has already been computed by a clone or [`None`] if it hasn't been
+ /// computed yet or this [`Shared`] already returned its output from
+ /// [`poll`](Future::poll).
+ pub fn peek(&self) -> Option<&Fut::Output> {
+ if let Some(inner) = self.inner.as_ref() {
+ match inner.notifier.state.load(SeqCst) {
+ COMPLETE => unsafe { return Some(inner.output()) },
+ POISONED => panic!("inner future panicked during poll"),
+ _ => {}
+ }
+ }
+ None
+ }
+
+ /// Creates a new [`WeakShared`] for this [`Shared`].
+ ///
+ /// Returns [`None`] if it has already been polled to completion.
+ pub fn downgrade(&self) -> Option<WeakShared<Fut>> {
+ if let Some(inner) = self.inner.as_ref() {
+ return Some(WeakShared(Arc::downgrade(inner)));
+ }
+ None
+ }
+
+ /// Gets the number of strong pointers to this allocation.
+ ///
+ /// Returns [`None`] if it has already been polled to completion.
+ ///
+ /// # Safety
+ ///
+ /// This method by itself is safe, but using it correctly requires extra care. Another thread
+ /// can change the strong count at any time, including potentially between calling this method
+ /// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
+ pub fn strong_count(&self) -> Option<usize> {
+ self.inner.as_ref().map(|arc| Arc::strong_count(arc))
+ }
+
+ /// Gets the number of weak pointers to this allocation.
+ ///
+ /// Returns [`None`] if it has already been polled to completion.
+ ///
+ /// # Safety
+ ///
+ /// This method by itself is safe, but using it correctly requires extra care. Another thread
+ /// can change the weak count at any time, including potentially between calling this method
+ /// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
+ pub fn weak_count(&self) -> Option<usize> {
+ self.inner.as_ref().map(|arc| Arc::weak_count(arc))
+ }
+
+ /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`.
+ pub fn ptr_hash<H: Hasher>(&self, state: &mut H) {
+ match self.inner.as_ref() {
+ Some(arc) => {
+ state.write_u8(1);
+ ptr::hash(Arc::as_ptr(arc), state);
+ }
+ None => {
+ state.write_u8(0);
+ }
+ }
+ }
+
+ /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to
+ /// `Arc::ptr_eq`).
+ ///
+ /// Returns `false` if either `Shared` has terminated.
+ pub fn ptr_eq(&self, rhs: &Self) -> bool {
+ let lhs = match self.inner.as_ref() {
+ Some(lhs) => lhs,
+ None => return false,
+ };
+ let rhs = match rhs.inner.as_ref() {
+ Some(rhs) => rhs,
+ None => return false,
+ };
+ Arc::ptr_eq(lhs, rhs)
+ }
+}
+
+impl<Fut> Inner<Fut>
+where
+ Fut: Future,
+{
+ /// Safety: callers must first ensure that `self.inner.state`
+ /// is `COMPLETE`
+ unsafe fn output(&self) -> &Fut::Output {
+ match &*self.future_or_output.get() {
+ FutureOrOutput::Output(ref item) => item,
+ FutureOrOutput::Future(_) => unreachable!(),
+ }
+ }
+}
+
+impl<Fut> Inner<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
+ /// Registers the current task to receive a wakeup when we are awoken.
+ fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) {
+ let mut wakers_guard = self.notifier.wakers.lock().unwrap();
+
+ let wakers = match wakers_guard.as_mut() {
+ Some(wakers) => wakers,
+ None => return,
+ };
+
+ let new_waker = cx.waker();
+
+ if *waker_key == NULL_WAKER_KEY {
+ *waker_key = wakers.insert(Some(new_waker.clone()));
+ } else {
+ match wakers[*waker_key] {
+ Some(ref old_waker) if new_waker.will_wake(old_waker) => {}
+ // Could use clone_from here, but Waker doesn't specialize it.
+ ref mut slot => *slot = Some(new_waker.clone()),
+ }
+ }
+ debug_assert!(*waker_key != NULL_WAKER_KEY);
+ }
+
+ /// Safety: callers must first ensure that `inner.state`
+ /// is `COMPLETE`
+ unsafe fn take_or_clone_output(self: Arc<Self>) -> Fut::Output {
+ match Arc::try_unwrap(self) {
+ Ok(inner) => match inner.future_or_output.into_inner() {
+ FutureOrOutput::Output(item) => item,
+ FutureOrOutput::Future(_) => unreachable!(),
+ },
+ Err(inner) => inner.output().clone(),
+ }
+ }
+}
+
+impl<Fut> FusedFuture for Shared<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
+
+impl<Fut> Future for Shared<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
+ type Output = Fut::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+
+ let inner = this.inner.take().expect("Shared future polled again after completion");
+
+ // Fast path for when the wrapped future has already completed
+ if inner.notifier.state.load(Acquire) == COMPLETE {
+ // Safety: We're in the COMPLETE state
+ return unsafe { Poll::Ready(inner.take_or_clone_output()) };
+ }
+
+ inner.record_waker(&mut this.waker_key, cx);
+
+ match inner
+ .notifier
+ .state
+ .compare_exchange(IDLE, POLLING, SeqCst, SeqCst)
+ .unwrap_or_else(|x| x)
+ {
+ IDLE => {
+ // Lock acquired, fall through
+ }
+ POLLING => {
+ // Another task is currently polling, at this point we just want
+ // to ensure that the waker for this task is registered
+ this.inner = Some(inner);
+ return Poll::Pending;
+ }
+ COMPLETE => {
+ // Safety: We're in the COMPLETE state
+ return unsafe { Poll::Ready(inner.take_or_clone_output()) };
+ }
+ POISONED => panic!("inner future panicked during poll"),
+ _ => unreachable!(),
+ }
+
+ let waker = waker_ref(&inner.notifier);
+ let mut cx = Context::from_waker(&waker);
+
+ struct Reset<'a> {
+ state: &'a AtomicUsize,
+ did_not_panic: bool,
+ }
+
+ impl Drop for Reset<'_> {
+ fn drop(&mut self) {
+ if !self.did_not_panic {
+ self.state.store(POISONED, SeqCst);
+ }
+ }
+ }
+
+ let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false };
+
+ let output = {
+ let future = unsafe {
+ match &mut *inner.future_or_output.get() {
+ FutureOrOutput::Future(fut) => Pin::new_unchecked(fut),
+ _ => unreachable!(),
+ }
+ };
+
+ let poll_result = future.poll(&mut cx);
+ reset.did_not_panic = true;
+
+ match poll_result {
+ Poll::Pending => {
+ if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok()
+ {
+ // Success
+ drop(reset);
+ this.inner = Some(inner);
+ return Poll::Pending;
+ } else {
+ unreachable!()
+ }
+ }
+ Poll::Ready(output) => output,
+ }
+ };
+
+ unsafe {
+ *inner.future_or_output.get() = FutureOrOutput::Output(output);
+ }
+
+ inner.notifier.state.store(COMPLETE, SeqCst);
+
+ // Wake all tasks and drop the slab
+ let mut wakers_guard = inner.notifier.wakers.lock().unwrap();
+ let mut wakers = wakers_guard.take().unwrap();
+ for waker in wakers.drain().flatten() {
+ waker.wake();
+ }
+
+ drop(reset); // Make borrow checker happy
+ drop(wakers_guard);
+
+ // Safety: We're in the COMPLETE state
+ unsafe { Poll::Ready(inner.take_or_clone_output()) }
+ }
+}
+
+impl<Fut> Clone for Shared<Fut>
+where
+ Fut: Future,
+{
+ fn clone(&self) -> Self {
+ Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY }
+ }
+}
+
+impl<Fut> Drop for Shared<Fut>
+where
+ Fut: Future,
+{
+ fn drop(&mut self) {
+ if self.waker_key != NULL_WAKER_KEY {
+ if let Some(ref inner) = self.inner {
+ if let Ok(mut wakers) = inner.notifier.wakers.lock() {
+ if let Some(wakers) = wakers.as_mut() {
+ wakers.remove(self.waker_key);
+ }
+ }
+ }
+ }
+ }
+}
+
+impl ArcWake for Notifier {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ let wakers = &mut *arc_self.wakers.lock().unwrap();
+ if let Some(wakers) = wakers.as_mut() {
+ for (_key, opt_waker) in wakers {
+ if let Some(waker) = opt_waker.take() {
+ waker.wake();
+ }
+ }
+ }
+ }
+}
+
+impl<Fut: Future> WeakShared<Fut> {
+ /// Attempts to upgrade this [`WeakShared`] into a [`Shared`].
+ ///
+ /// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled
+ /// to completion.
+ pub fn upgrade(&self) -> Option<Shared<Fut>> {
+ Some(Shared { inner: Some(self.0.upgrade()?), waker_key: NULL_WAKER_KEY })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/join.rs b/third_party/rust/futures-util/src/future/join.rs
new file mode 100644
index 0000000000..740ffbc988
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/join.rs
@@ -0,0 +1,217 @@
+#![allow(non_snake_case)]
+
+use super::assert_future;
+use crate::future::{maybe_done, MaybeDone};
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+macro_rules! generate {
+ ($(
+ $(#[$doc:meta])*
+ ($Join:ident, <$($Fut:ident),*>),
+ )*) => ($(
+ pin_project! {
+ $(#[$doc])*
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct $Join<$($Fut: Future),*> {
+ $(#[pin] $Fut: MaybeDone<$Fut>,)*
+ }
+ }
+
+ impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*>
+ where
+ $(
+ $Fut: Future + fmt::Debug,
+ $Fut::Output: fmt::Debug,
+ )*
+ {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct(stringify!($Join))
+ $(.field(stringify!($Fut), &self.$Fut))*
+ .finish()
+ }
+ }
+
+ impl<$($Fut: Future),*> $Join<$($Fut),*> {
+ fn new($($Fut: $Fut),*) -> Self {
+ Self {
+ $($Fut: maybe_done($Fut)),*
+ }
+ }
+ }
+
+ impl<$($Fut: Future),*> Future for $Join<$($Fut),*> {
+ type Output = ($($Fut::Output),*);
+
+ fn poll(
+ self: Pin<&mut Self>, cx: &mut Context<'_>
+ ) -> Poll<Self::Output> {
+ let mut all_done = true;
+ let mut futures = self.project();
+ $(
+ all_done &= futures.$Fut.as_mut().poll(cx).is_ready();
+ )*
+
+ if all_done {
+ Poll::Ready(($(futures.$Fut.take_output().unwrap()), *))
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+
+ impl<$($Fut: FusedFuture),*> FusedFuture for $Join<$($Fut),*> {
+ fn is_terminated(&self) -> bool {
+ $(
+ self.$Fut.is_terminated()
+ ) && *
+ }
+ }
+ )*)
+}
+
+generate! {
+ /// Future for the [`join`](join()) function.
+ (Join, <Fut1, Fut2>),
+
+ /// Future for the [`join3`] function.
+ (Join3, <Fut1, Fut2, Fut3>),
+
+ /// Future for the [`join4`] function.
+ (Join4, <Fut1, Fut2, Fut3, Fut4>),
+
+ /// Future for the [`join5`] function.
+ (Join5, <Fut1, Fut2, Fut3, Fut4, Fut5>),
+}
+
+/// Joins the result of two futures, waiting for them both to complete.
+///
+/// This function will return a new future which awaits both futures to
+/// complete. The returned future will finish with a tuple of both results.
+///
+/// Note that this function consumes the passed futures and returns a
+/// wrapped version of it.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let pair = future::join(a, b);
+///
+/// assert_eq!(pair.await, (1, 2));
+/// # });
+/// ```
+pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2>
+where
+ Fut1: Future,
+ Fut2: Future,
+{
+ let f = Join::new(future1, future2);
+ assert_future::<(Fut1::Output, Fut2::Output), _>(f)
+}
+
+/// Same as [`join`](join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let c = async { 3 };
+/// let tuple = future::join3(a, b, c);
+///
+/// assert_eq!(tuple.await, (1, 2, 3));
+/// # });
+/// ```
+pub fn join3<Fut1, Fut2, Fut3>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+) -> Join3<Fut1, Fut2, Fut3>
+where
+ Fut1: Future,
+ Fut2: Future,
+ Fut3: Future,
+{
+ let f = Join3::new(future1, future2, future3);
+ assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output), _>(f)
+}
+
+/// Same as [`join`](join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let c = async { 3 };
+/// let d = async { 4 };
+/// let tuple = future::join4(a, b, c, d);
+///
+/// assert_eq!(tuple.await, (1, 2, 3, 4));
+/// # });
+/// ```
+pub fn join4<Fut1, Fut2, Fut3, Fut4>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+) -> Join4<Fut1, Fut2, Fut3, Fut4>
+where
+ Fut1: Future,
+ Fut2: Future,
+ Fut3: Future,
+ Fut4: Future,
+{
+ let f = Join4::new(future1, future2, future3, future4);
+ assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output), _>(f)
+}
+
+/// Same as [`join`](join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let c = async { 3 };
+/// let d = async { 4 };
+/// let e = async { 5 };
+/// let tuple = future::join5(a, b, c, d, e);
+///
+/// assert_eq!(tuple.await, (1, 2, 3, 4, 5));
+/// # });
+/// ```
+pub fn join5<Fut1, Fut2, Fut3, Fut4, Fut5>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+ future5: Fut5,
+) -> Join5<Fut1, Fut2, Fut3, Fut4, Fut5>
+where
+ Fut1: Future,
+ Fut2: Future,
+ Fut3: Future,
+ Fut4: Future,
+ Fut5: Future,
+{
+ let f = Join5::new(future1, future2, future3, future4, future5);
+ assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output, Fut5::Output), _>(f)
+}
diff --git a/third_party/rust/futures-util/src/future/join_all.rs b/third_party/rust/futures-util/src/future/join_all.rs
new file mode 100644
index 0000000000..7dc159ba07
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/join_all.rs
@@ -0,0 +1,166 @@
+//! Definition of the `JoinAll` combinator, waiting for all of a list of futures
+//! to finish.
+
+use alloc::boxed::Box;
+use alloc::vec::Vec;
+use core::fmt;
+use core::future::Future;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+use super::{assert_future, MaybeDone};
+
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{Collect, FuturesOrdered, StreamExt};
+
+pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
+ // Safety: `std` _could_ make this unsound if it were to decide Pin's
+ // invariants aren't required to transmit through slices. Otherwise this has
+ // the same safety as a normal field pin projection.
+ unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
+}
+
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+/// Future for the [`join_all`] function.
+pub struct JoinAll<F>
+where
+ F: Future,
+{
+ kind: JoinAllKind<F>,
+}
+
+#[cfg(not(futures_no_atomic_cas))]
+pub(crate) const SMALL: usize = 30;
+
+enum JoinAllKind<F>
+where
+ F: Future,
+{
+ Small {
+ elems: Pin<Box<[MaybeDone<F>]>>,
+ },
+ #[cfg(not(futures_no_atomic_cas))]
+ Big {
+ fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
+ },
+}
+
+impl<F> fmt::Debug for JoinAll<F>
+where
+ F: Future + fmt::Debug,
+ F::Output: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.kind {
+ JoinAllKind::Small { ref elems } => {
+ f.debug_struct("JoinAll").field("elems", elems).finish()
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+ }
+ }
+}
+
+/// Creates a future which represents a collection of the outputs of the futures
+/// given.
+///
+/// The returned future will drive execution for all of its underlying futures,
+/// collecting the results into a destination `Vec<T>` in the same order as they
+/// were provided.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # See Also
+///
+/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
+///
+/// Some examples for additional functionality provided by these are:
+///
+/// * Adding new futures to the set even after it has been started.
+///
+/// * Only polling the specific futures that have been woken. In cases where
+/// you have a lot of futures this will result in much more efficient polling.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::join_all;
+///
+/// async fn foo(i: u32) -> u32 { i }
+///
+/// let futures = vec![foo(1), foo(2), foo(3)];
+///
+/// assert_eq!(join_all(futures).await, [1, 2, 3]);
+/// # });
+/// ```
+pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
+where
+ I: IntoIterator,
+ I::Item: Future,
+{
+ let iter = iter.into_iter();
+
+ #[cfg(futures_no_atomic_cas)]
+ {
+ let kind =
+ JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() };
+
+ assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+ }
+
+ #[cfg(not(futures_no_atomic_cas))]
+ {
+ let kind = match iter.size_hint().1 {
+ Some(max) if max <= SMALL => JoinAllKind::Small {
+ elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
+ };
+
+ assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+ }
+}
+
+impl<F> Future for JoinAll<F>
+where
+ F: Future,
+{
+ type Output = Vec<F::Output>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match &mut self.kind {
+ JoinAllKind::Small { elems } => {
+ let mut all_done = true;
+
+ for elem in iter_pin_mut(elems.as_mut()) {
+ if elem.poll(cx).is_pending() {
+ all_done = false;
+ }
+ }
+
+ if all_done {
+ let mut elems = mem::replace(elems, Box::pin([]));
+ let result =
+ iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
+ Poll::Ready(result)
+ } else {
+ Poll::Pending
+ }
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
+ }
+ }
+}
+
+impl<F: Future> FromIterator<F> for JoinAll<F> {
+ fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
+ join_all(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/lazy.rs b/third_party/rust/futures-util/src/future/lazy.rs
new file mode 100644
index 0000000000..e9a8cf2fa9
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/lazy.rs
@@ -0,0 +1,60 @@
+use super::assert_future;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`lazy`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Lazy<F> {
+ f: Option<F>,
+}
+
+// safe because we never generate `Pin<&mut F>`
+impl<F> Unpin for Lazy<F> {}
+
+/// Creates a new future that allows delayed execution of a closure.
+///
+/// The provided closure is only run once the future is polled.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::lazy(|_| 1);
+/// assert_eq!(a.await, 1);
+///
+/// let b = future::lazy(|_| -> i32 {
+/// panic!("oh no!")
+/// });
+/// drop(b); // closure is never run
+/// # });
+/// ```
+pub fn lazy<F, R>(f: F) -> Lazy<F>
+where
+ F: FnOnce(&mut Context<'_>) -> R,
+{
+ assert_future::<R, _>(Lazy { f: Some(f) })
+}
+
+impl<F, R> FusedFuture for Lazy<F>
+where
+ F: FnOnce(&mut Context<'_>) -> R,
+{
+ fn is_terminated(&self) -> bool {
+ self.f.is_none()
+ }
+}
+
+impl<F, R> Future for Lazy<F>
+where
+ F: FnOnce(&mut Context<'_>) -> R,
+{
+ type Output = R;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
+ Poll::Ready((self.f.take().expect("Lazy polled after completion"))(cx))
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/maybe_done.rs b/third_party/rust/futures-util/src/future/maybe_done.rs
new file mode 100644
index 0000000000..26e6c27588
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/maybe_done.rs
@@ -0,0 +1,104 @@
+//! Definition of the MaybeDone combinator
+
+use super::assert_future;
+use core::mem;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+
+/// A future that may have completed.
+///
+/// This is created by the [`maybe_done()`] function.
+#[derive(Debug)]
+pub enum MaybeDone<Fut: Future> {
+ /// A not-yet-completed future
+ Future(/* #[pin] */ Fut),
+ /// The output of the completed future
+ Done(Fut::Output),
+ /// The empty variant after the result of a [`MaybeDone`] has been
+ /// taken using the [`take_output`](MaybeDone::take_output) method.
+ Gone,
+}
+
+impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
+
+/// Wraps a future into a `MaybeDone`
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+/// use futures::pin_mut;
+///
+/// let future = future::maybe_done(async { 5 });
+/// pin_mut!(future);
+/// assert_eq!(future.as_mut().take_output(), None);
+/// let () = future.as_mut().await;
+/// assert_eq!(future.as_mut().take_output(), Some(5));
+/// assert_eq!(future.as_mut().take_output(), None);
+/// # });
+/// ```
+pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
+ assert_future::<(), _>(MaybeDone::Future(future))
+}
+
+impl<Fut: Future> MaybeDone<Fut> {
+ /// Returns an [`Option`] containing a mutable reference to the output of the future.
+ /// The output of this method will be [`Some`] if and only if the inner
+ /// future has been completed and [`take_output`](MaybeDone::take_output)
+ /// has not yet been called.
+ #[inline]
+ pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ MaybeDone::Done(res) => Some(res),
+ _ => None,
+ }
+ }
+ }
+
+ /// Attempt to take the output of a `MaybeDone` without driving it
+ /// towards completion.
+ #[inline]
+ pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
+ match &*self {
+ Self::Done(_) => {}
+ Self::Future(_) | Self::Gone => return None,
+ }
+ unsafe {
+ match mem::replace(self.get_unchecked_mut(), Self::Gone) {
+ MaybeDone::Done(output) => Some(output),
+ _ => unreachable!(),
+ }
+ }
+ }
+}
+
+impl<Fut: Future> FusedFuture for MaybeDone<Fut> {
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Future(_) => false,
+ Self::Done(_) | Self::Gone => true,
+ }
+ }
+}
+
+impl<Fut: Future> Future for MaybeDone<Fut> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ unsafe {
+ match self.as_mut().get_unchecked_mut() {
+ MaybeDone::Future(f) => {
+ let res = ready!(Pin::new_unchecked(f).poll(cx));
+ self.set(Self::Done(res));
+ }
+ MaybeDone::Done(_) => {}
+ MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
+ }
+ }
+ Poll::Ready(())
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/mod.rs b/third_party/rust/futures-util/src/future/mod.rs
new file mode 100644
index 0000000000..374e36512f
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/mod.rs
@@ -0,0 +1,131 @@
+//! Asynchronous values.
+//!
+//! This module contains:
+//!
+//! - The [`Future`] trait.
+//! - The [`FutureExt`] and [`TryFutureExt`] trait, which provides adapters for
+//! chaining and composing futures.
+//! - Top-level future combinators like [`lazy`](lazy()) which creates a future
+//! from a closure that defines its return value, and [`ready`](ready()),
+//! which constructs a future with an immediate defined value.
+
+#[doc(no_inline)]
+pub use core::future::Future;
+
+#[cfg(feature = "alloc")]
+pub use futures_core::future::{BoxFuture, LocalBoxFuture};
+pub use futures_core::future::{FusedFuture, TryFuture};
+pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj};
+
+// Extension traits and combinators
+#[allow(clippy::module_inception)]
+mod future;
+pub use self::future::{
+ Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, MapInto, NeverError, Then, UnitError,
+};
+
+#[deprecated(note = "This is now an alias for [Flatten](Flatten)")]
+pub use self::future::FlattenStream;
+
+#[cfg(feature = "std")]
+pub use self::future::CatchUnwind;
+
+#[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
+#[cfg(feature = "std")]
+pub use self::future::{Remote, RemoteHandle};
+
+#[cfg(feature = "std")]
+pub use self::future::{Shared, WeakShared};
+
+mod try_future;
+pub use self::try_future::{
+ AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, MapOkOrElse, OkInto,
+ OrElse, TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse,
+};
+
+#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+pub use self::try_future::FlattenSink;
+
+// Primitive futures
+
+mod lazy;
+pub use self::lazy::{lazy, Lazy};
+
+mod pending;
+pub use self::pending::{pending, Pending};
+
+mod maybe_done;
+pub use self::maybe_done::{maybe_done, MaybeDone};
+
+mod try_maybe_done;
+pub use self::try_maybe_done::{try_maybe_done, TryMaybeDone};
+
+mod option;
+pub use self::option::OptionFuture;
+
+mod poll_fn;
+pub use self::poll_fn::{poll_fn, PollFn};
+
+mod poll_immediate;
+pub use self::poll_immediate::{poll_immediate, PollImmediate};
+
+mod ready;
+pub use self::ready::{err, ok, ready, Ready};
+
+mod join;
+pub use self::join::{join, join3, join4, join5, Join, Join3, Join4, Join5};
+
+#[cfg(feature = "alloc")]
+mod join_all;
+#[cfg(feature = "alloc")]
+pub use self::join_all::{join_all, JoinAll};
+
+mod select;
+pub use self::select::{select, Select};
+
+#[cfg(feature = "alloc")]
+mod select_all;
+#[cfg(feature = "alloc")]
+pub use self::select_all::{select_all, SelectAll};
+
+mod try_join;
+pub use self::try_join::{
+ try_join, try_join3, try_join4, try_join5, TryJoin, TryJoin3, TryJoin4, TryJoin5,
+};
+
+#[cfg(feature = "alloc")]
+mod try_join_all;
+#[cfg(feature = "alloc")]
+pub use self::try_join_all::{try_join_all, TryJoinAll};
+
+mod try_select;
+pub use self::try_select::{try_select, TrySelect};
+
+#[cfg(feature = "alloc")]
+mod select_ok;
+#[cfg(feature = "alloc")]
+pub use self::select_ok::{select_ok, SelectOk};
+
+mod either;
+pub use self::either::Either;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod abortable;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted};
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use abortable::abortable;
+
+// Just a helper function to ensure the futures we're returning all have the
+// right implementations.
+pub(crate) fn assert_future<T, F>(future: F) -> F
+where
+ F: Future<Output = T>,
+{
+ future
+}
diff --git a/third_party/rust/futures-util/src/future/option.rs b/third_party/rust/futures-util/src/future/option.rs
new file mode 100644
index 0000000000..0bc377758a
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/option.rs
@@ -0,0 +1,64 @@
+//! Definition of the `Option` (optional step) combinator
+
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// A future representing a value which may or may not be present.
+ ///
+ /// Created by the [`From`] implementation for [`Option`](std::option::Option).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::OptionFuture;
+ ///
+ /// let mut a: OptionFuture<_> = Some(async { 123 }).into();
+ /// assert_eq!(a.await, Some(123));
+ ///
+ /// a = None.into();
+ /// assert_eq!(a.await, None);
+ /// # });
+ /// ```
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct OptionFuture<F> {
+ #[pin]
+ inner: Option<F>,
+ }
+}
+
+impl<F> Default for OptionFuture<F> {
+ fn default() -> Self {
+ Self { inner: None }
+ }
+}
+
+impl<F: Future> Future for OptionFuture<F> {
+ type Output = Option<F::Output>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.project().inner.as_pin_mut() {
+ Some(x) => x.poll(cx).map(Some),
+ None => Poll::Ready(None),
+ }
+ }
+}
+
+impl<F: FusedFuture> FusedFuture for OptionFuture<F> {
+ fn is_terminated(&self) -> bool {
+ match &self.inner {
+ Some(x) => x.is_terminated(),
+ None => true,
+ }
+ }
+}
+
+impl<T> From<Option<T>> for OptionFuture<T> {
+ fn from(option: Option<T>) -> Self {
+ Self { inner: option }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/pending.rs b/third_party/rust/futures-util/src/future/pending.rs
new file mode 100644
index 0000000000..b8e28686e1
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/pending.rs
@@ -0,0 +1,55 @@
+use super::assert_future;
+use core::marker;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`pending()`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Pending<T> {
+ _data: marker::PhantomData<T>,
+}
+
+impl<T> FusedFuture for Pending<T> {
+ fn is_terminated(&self) -> bool {
+ true
+ }
+}
+
+/// Creates a future which never resolves, representing a computation that never
+/// finishes.
+///
+/// The returned future will forever return [`Poll::Pending`].
+///
+/// # Examples
+///
+/// ```ignore
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let future = future::pending();
+/// let () = future.await;
+/// unreachable!();
+/// # });
+/// ```
+#[cfg_attr(docsrs, doc(alias = "never"))]
+pub fn pending<T>() -> Pending<T> {
+ assert_future::<T, _>(Pending { _data: marker::PhantomData })
+}
+
+impl<T> Future for Pending<T> {
+ type Output = T;
+
+ fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
+ Poll::Pending
+ }
+}
+
+impl<T> Unpin for Pending<T> {}
+
+impl<T> Clone for Pending<T> {
+ fn clone(&self) -> Self {
+ pending()
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/poll_fn.rs b/third_party/rust/futures-util/src/future/poll_fn.rs
new file mode 100644
index 0000000000..19311570b5
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/poll_fn.rs
@@ -0,0 +1,58 @@
+//! Definition of the `PollFn` adapter combinator
+
+use super::assert_future;
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`poll_fn`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct PollFn<F> {
+ f: F,
+}
+
+impl<F> Unpin for PollFn<F> {}
+
+/// Creates a new future wrapping around a function returning [`Poll`].
+///
+/// Polling the returned future delegates to the wrapped function.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::poll_fn;
+/// use futures::task::{Context, Poll};
+///
+/// fn read_line(_cx: &mut Context<'_>) -> Poll<String> {
+/// Poll::Ready("Hello, World!".into())
+/// }
+///
+/// let read_future = poll_fn(read_line);
+/// assert_eq!(read_future.await, "Hello, World!".to_owned());
+/// # });
+/// ```
+pub fn poll_fn<T, F>(f: F) -> PollFn<F>
+where
+ F: FnMut(&mut Context<'_>) -> Poll<T>,
+{
+ assert_future::<T, _>(PollFn { f })
+}
+
+impl<F> fmt::Debug for PollFn<F> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("PollFn").finish()
+ }
+}
+
+impl<T, F> Future for PollFn<F>
+where
+ F: FnMut(&mut Context<'_>) -> Poll<T>,
+{
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ (&mut self.f)(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/poll_immediate.rs b/third_party/rust/futures-util/src/future/poll_immediate.rs
new file mode 100644
index 0000000000..5ae555c73e
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/poll_immediate.rs
@@ -0,0 +1,126 @@
+use super::assert_future;
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_core::{FusedFuture, Future, Stream};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`poll_immediate`](poll_immediate()) function.
+ ///
+ /// It will never return [Poll::Pending](core::task::Poll::Pending)
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct PollImmediate<T> {
+ #[pin]
+ future: Option<T>
+ }
+}
+
+impl<T, F> Future for PollImmediate<F>
+where
+ F: Future<Output = T>,
+{
+ type Output = Option<T>;
+
+ #[inline]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let mut this = self.project();
+ let inner =
+ this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
+ match inner.poll(cx) {
+ Poll::Ready(t) => {
+ this.future.set(None);
+ Poll::Ready(Some(t))
+ }
+ Poll::Pending => Poll::Ready(None),
+ }
+ }
+}
+
+impl<T: Future> FusedFuture for PollImmediate<T> {
+ fn is_terminated(&self) -> bool {
+ self.future.is_none()
+ }
+}
+
+/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done.
+/// The stream will never return [Poll::Pending](core::task::Poll::Pending)
+/// so polling it in a tight loop is worse than using a blocking synchronous function.
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::task::Poll;
+/// use futures::{StreamExt, future, pin_mut};
+/// use future::FusedFuture;
+///
+/// let f = async { 1_u32 };
+/// pin_mut!(f);
+/// let mut r = future::poll_immediate(f);
+/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
+///
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// let mut p = future::poll_immediate(f);
+/// assert_eq!(p.next().await, Some(Poll::Pending));
+/// assert!(!p.is_terminated());
+/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
+/// assert!(p.is_terminated());
+/// assert_eq!(p.next().await, None);
+/// # });
+/// ```
+impl<T, F> Stream for PollImmediate<F>
+where
+ F: Future<Output = T>,
+{
+ type Item = Poll<T>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ match this.future.as_mut().as_pin_mut() {
+ // inner is gone, so we can signal that the stream is closed.
+ None => Poll::Ready(None),
+ Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
+ this.future.set(None);
+ t
+ }))),
+ }
+ }
+}
+
+/// Creates a future that is immediately ready with an Option of a value.
+/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready).
+///
+/// # Caution
+///
+/// When consuming the future by this function, note the following:
+///
+/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value.
+/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let r = future::poll_immediate(async { 1_u32 });
+/// assert_eq!(r.await, Some(1));
+///
+/// let p = future::poll_immediate(future::pending::<i32>());
+/// assert_eq!(p.await, None);
+/// # });
+/// ```
+///
+/// ### Reusing a future
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::{future, pin_mut};
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// assert_eq!(None, future::poll_immediate(&mut f).await);
+/// assert_eq!(42, f.await);
+/// # });
+/// ```
+pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
+ assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
+}
diff --git a/third_party/rust/futures-util/src/future/ready.rs b/third_party/rust/futures-util/src/future/ready.rs
new file mode 100644
index 0000000000..e3d791b3cf
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/ready.rs
@@ -0,0 +1,82 @@
+use super::assert_future;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`ready`](ready()) function.
+#[derive(Debug, Clone)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Ready<T>(Option<T>);
+
+impl<T> Ready<T> {
+ /// Unwraps the value from this immediately ready future.
+ #[inline]
+ pub fn into_inner(mut self) -> T {
+ self.0.take().unwrap()
+ }
+}
+
+impl<T> Unpin for Ready<T> {}
+
+impl<T> FusedFuture for Ready<T> {
+ fn is_terminated(&self) -> bool {
+ self.0.is_none()
+ }
+}
+
+impl<T> Future for Ready<T> {
+ type Output = T;
+
+ #[inline]
+ fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
+ Poll::Ready(self.0.take().expect("Ready polled after completion"))
+ }
+}
+
+/// Creates a future that is immediately ready with a value.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(1);
+/// assert_eq!(a.await, 1);
+/// # });
+/// ```
+pub fn ready<T>(t: T) -> Ready<T> {
+ assert_future::<T, _>(Ready(Some(t)))
+}
+
+/// Create a future that is immediately ready with a success value.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ok::<i32, i32>(1);
+/// assert_eq!(a.await, Ok(1));
+/// # });
+/// ```
+pub fn ok<T, E>(t: T) -> Ready<Result<T, E>> {
+ Ready(Some(Ok(t)))
+}
+
+/// Create a future that is immediately ready with an error value.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::err::<i32, i32>(1);
+/// assert_eq!(a.await, Err(1));
+/// # });
+/// ```
+pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
+ Ready(Some(Err(err)))
+}
diff --git a/third_party/rust/futures-util/src/future/select.rs b/third_party/rust/futures-util/src/future/select.rs
new file mode 100644
index 0000000000..e693a30b00
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/select.rs
@@ -0,0 +1,125 @@
+use super::assert_future;
+use crate::future::{Either, FutureExt};
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`select()`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct Select<A, B> {
+ inner: Option<(A, B)>,
+}
+
+impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
+
+/// Waits for either one of two differently-typed futures to complete.
+///
+/// This function will return a new future which awaits for either one of both
+/// futures to complete. The returned future will finish with both the value
+/// resolved and a future representing the completion of the other work.
+///
+/// Note that this function consumes the receiving futures and returns a
+/// wrapped version of them.
+///
+/// Also note that if both this and the second future have the same
+/// output type you can use the `Either::factor_first` method to
+/// conveniently extract out the value at the end.
+///
+/// # Examples
+///
+/// A simple example
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::{
+/// pin_mut,
+/// future::Either,
+/// future::self,
+/// };
+///
+/// // These two futures have different types even though their outputs have the same type.
+/// let future1 = async {
+/// future::pending::<()>().await; // will never finish
+/// 1
+/// };
+/// let future2 = async {
+/// future::ready(2).await
+/// };
+///
+/// // 'select' requires Future + Unpin bounds
+/// pin_mut!(future1);
+/// pin_mut!(future2);
+///
+/// let value = match future::select(future1, future2).await {
+/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
+/// // `_` represents `future2`
+/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2`
+/// // `_` represents `future1`
+/// };
+///
+/// assert!(value == 2);
+/// # });
+/// ```
+///
+/// A more complex example
+///
+/// ```
+/// use futures::future::{self, Either, Future, FutureExt};
+///
+/// // A poor-man's join implemented on top of select
+///
+/// fn join<A, B>(a: A, b: B) -> impl Future<Output=(A::Output, B::Output)>
+/// where A: Future + Unpin,
+/// B: Future + Unpin,
+/// {
+/// future::select(a, b).then(|either| {
+/// match either {
+/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(),
+/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(),
+/// }
+/// })
+/// }
+/// ```
+pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>
+where
+ A: Future + Unpin,
+ B: Future + Unpin,
+{
+ assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
+ inner: Some((future1, future2)),
+ })
+}
+
+impl<A, B> Future for Select<A, B>
+where
+ A: Future + Unpin,
+ B: Future + Unpin,
+{
+ type Output = Either<(A::Output, B), (B::Output, A)>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
+
+ if let Poll::Ready(val) = a.poll_unpin(cx) {
+ return Poll::Ready(Either::Left((val, b)));
+ }
+
+ if let Poll::Ready(val) = b.poll_unpin(cx) {
+ return Poll::Ready(Either::Right((val, a)));
+ }
+
+ self.inner = Some((a, b));
+ Poll::Pending
+ }
+}
+
+impl<A, B> FusedFuture for Select<A, B>
+where
+ A: Future + Unpin,
+ B: Future + Unpin,
+{
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/select_all.rs b/third_party/rust/futures-util/src/future/select_all.rs
new file mode 100644
index 0000000000..0a51d0da6c
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/select_all.rs
@@ -0,0 +1,75 @@
+use super::assert_future;
+use crate::future::FutureExt;
+use alloc::vec::Vec;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`select_all`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct SelectAll<Fut> {
+ inner: Vec<Fut>,
+}
+
+impl<Fut: Unpin> Unpin for SelectAll<Fut> {}
+
+/// Creates a new future which will select over a list of futures.
+///
+/// The returned future will wait for any future within `iter` to be ready. Upon
+/// completion the item resolved will be returned, along with the index of the
+/// future that was ready and the list of all the remaining futures.
+///
+/// There are no guarantees provided on the order of the list with the remaining
+/// futures. They might be swapped around, reversed, or completely random.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # Panics
+///
+/// This function will panic if the iterator specified contains no items.
+pub fn select_all<I>(iter: I) -> SelectAll<I::Item>
+where
+ I: IntoIterator,
+ I::Item: Future + Unpin,
+{
+ let ret = SelectAll { inner: iter.into_iter().collect() };
+ assert!(!ret.inner.is_empty());
+ assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret)
+}
+
+impl<Fut> SelectAll<Fut> {
+ /// Consumes this combinator, returning the underlying futures.
+ pub fn into_inner(self) -> Vec<Fut> {
+ self.inner
+ }
+}
+
+impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
+ type Output = (Fut::Output, usize, Vec<Fut>);
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.poll_unpin(cx) {
+ Poll::Pending => None,
+ Poll::Ready(e) => Some((i, e)),
+ });
+ match item {
+ Some((idx, res)) => {
+ #[allow(clippy::let_underscore_future)]
+ let _ = self.inner.swap_remove(idx);
+ let rest = mem::take(&mut self.inner);
+ Poll::Ready((res, idx, rest))
+ }
+ None => Poll::Pending,
+ }
+ }
+}
+
+impl<Fut: Future + Unpin> FromIterator<Fut> for SelectAll<Fut> {
+ fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
+ select_all(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/select_ok.rs b/third_party/rust/futures-util/src/future/select_ok.rs
new file mode 100644
index 0000000000..5d5579930b
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/select_ok.rs
@@ -0,0 +1,85 @@
+use super::assert_future;
+use crate::future::TryFutureExt;
+use alloc::vec::Vec;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use futures_core::future::{Future, TryFuture};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`select_ok`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct SelectOk<Fut> {
+ inner: Vec<Fut>,
+}
+
+impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
+
+/// Creates a new future which will select the first successful future over a list of futures.
+///
+/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike
+/// `select_all`, this will only return the first successful completion, or the last
+/// failure. This is useful in contexts where any success is desired and failures
+/// are ignored, unless all the futures fail.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # Panics
+///
+/// This function will panic if the iterator specified contains no items.
+pub fn select_ok<I>(iter: I) -> SelectOk<I::Item>
+where
+ I: IntoIterator,
+ I::Item: TryFuture + Unpin,
+{
+ let ret = SelectOk { inner: iter.into_iter().collect() };
+ assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
+ assert_future::<
+ Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
+ _,
+ >(ret)
+}
+
+impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
+ type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // loop until we've either exhausted all errors, a success was hit, or nothing is ready
+ loop {
+ let item =
+ self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
+ Poll::Pending => None,
+ Poll::Ready(e) => Some((i, e)),
+ });
+ match item {
+ Some((idx, res)) => {
+ // always remove Ok or Err, if it's not the last Err continue looping
+ drop(self.inner.remove(idx));
+ match res {
+ Ok(e) => {
+ let rest = mem::take(&mut self.inner);
+ return Poll::Ready(Ok((e, rest)));
+ }
+ Err(e) => {
+ if self.inner.is_empty() {
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ }
+ None => {
+ // based on the filter above, nothing is ready, return
+ return Poll::Pending;
+ }
+ }
+ }
+ }
+}
+
+impl<Fut: TryFuture + Unpin> FromIterator<Fut> for SelectOk<Fut> {
+ fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
+ select_ok(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/into_future.rs b/third_party/rust/futures-util/src/future/try_future/into_future.rs
new file mode 100644
index 0000000000..9f093d0e2e
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/into_future.rs
@@ -0,0 +1,36 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`into_future`](super::TryFutureExt::into_future) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct IntoFuture<Fut> {
+ #[pin]
+ future: Fut,
+ }
+}
+
+impl<Fut> IntoFuture<Fut> {
+ #[inline]
+ pub(crate) fn new(future: Fut) -> Self {
+ Self { future }
+ }
+}
+
+impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> {
+ fn is_terminated(&self) -> bool {
+ self.future.is_terminated()
+ }
+}
+
+impl<Fut: TryFuture> Future for IntoFuture<Fut> {
+ type Output = Result<Fut::Ok, Fut::Error>;
+
+ #[inline]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.project().future.try_poll(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/mod.rs b/third_party/rust/futures-util/src/future/try_future/mod.rs
new file mode 100644
index 0000000000..e5bc700714
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/mod.rs
@@ -0,0 +1,625 @@
+//! Futures
+//!
+//! This module contains a number of functions for working with `Future`s,
+//! including the `FutureExt` trait which adds methods to `Future` types.
+
+#[cfg(feature = "compat")]
+use crate::compat::Compat;
+use core::pin::Pin;
+use futures_core::{
+ future::TryFuture,
+ stream::TryStream,
+ task::{Context, Poll},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+use crate::fns::{
+ inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn,
+ unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn,
+ UnwrapOrElseFn,
+};
+use crate::future::{assert_future, Inspect, Map};
+use crate::stream::assert_stream;
+
+// Combinators
+mod into_future;
+mod try_flatten;
+mod try_flatten_err;
+
+delegate_all!(
+ /// Future for the [`try_flatten`](TryFutureExt::try_flatten) method.
+ TryFlatten<Fut1, Fut2>(
+ try_flatten::TryFlatten<Fut1, Fut2>
+ ): Debug + Future + FusedFuture + New[|x: Fut1| try_flatten::TryFlatten::new(x)]
+);
+
+delegate_all!(
+ /// Future for the [`try_flatten_err`](TryFutureExt::try_flatten_err) method.
+ TryFlattenErr<Fut1, Fut2>(
+ try_flatten_err::TryFlattenErr<Fut1, Fut2>
+ ): Debug + Future + FusedFuture + New[|x: Fut1| try_flatten_err::TryFlattenErr::new(x)]
+);
+
+delegate_all!(
+ /// Future for the [`try_flatten_stream`](TryFutureExt::try_flatten_stream) method.
+ TryFlattenStream<Fut>(
+ try_flatten::TryFlatten<Fut, Fut::Ok>
+ ): Debug + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)]
+ where Fut: TryFuture
+);
+
+#[cfg(feature = "sink")]
+delegate_all!(
+ /// Sink for the [`flatten_sink`](TryFutureExt::flatten_sink) method.
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+ FlattenSink<Fut, Si>(
+ try_flatten::TryFlatten<Fut, Si>
+ ): Debug + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)]
+);
+
+delegate_all!(
+ /// Future for the [`and_then`](TryFutureExt::and_then) method.
+ AndThen<Fut1, Fut2, F>(
+ TryFlatten<MapOk<Fut1, F>, Fut2>
+ ): Debug + Future + FusedFuture + New[|x: Fut1, f: F| TryFlatten::new(MapOk::new(x, f))]
+);
+
+delegate_all!(
+ /// Future for the [`or_else`](TryFutureExt::or_else) method.
+ OrElse<Fut1, Fut2, F>(
+ TryFlattenErr<MapErr<Fut1, F>, Fut2>
+ ): Debug + Future + FusedFuture + New[|x: Fut1, f: F| TryFlattenErr::new(MapErr::new(x, f))]
+);
+
+delegate_all!(
+ /// Future for the [`err_into`](TryFutureExt::err_into) method.
+ ErrInto<Fut, E>(
+ MapErr<Fut, IntoFn<E>>
+ ): Debug + Future + FusedFuture + New[|x: Fut| MapErr::new(x, into_fn())]
+);
+
+delegate_all!(
+ /// Future for the [`ok_into`](TryFutureExt::ok_into) method.
+ OkInto<Fut, E>(
+ MapOk<Fut, IntoFn<E>>
+ ): Debug + Future + FusedFuture + New[|x: Fut| MapOk::new(x, into_fn())]
+);
+
+delegate_all!(
+ /// Future for the [`inspect_ok`](super::TryFutureExt::inspect_ok) method.
+ InspectOk<Fut, F>(
+ Inspect<IntoFuture<Fut>, InspectOkFn<F>>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_ok_fn(f))]
+);
+
+delegate_all!(
+ /// Future for the [`inspect_err`](super::TryFutureExt::inspect_err) method.
+ InspectErr<Fut, F>(
+ Inspect<IntoFuture<Fut>, InspectErrFn<F>>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_err_fn(f))]
+);
+
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::into_future::IntoFuture;
+
+delegate_all!(
+ /// Future for the [`map_ok`](TryFutureExt::map_ok) method.
+ MapOk<Fut, F>(
+ Map<IntoFuture<Fut>, MapOkFn<F>>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_ok_fn(f))]
+);
+
+delegate_all!(
+ /// Future for the [`map_err`](TryFutureExt::map_err) method.
+ MapErr<Fut, F>(
+ Map<IntoFuture<Fut>, MapErrFn<F>>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_err_fn(f))]
+);
+
+delegate_all!(
+ /// Future for the [`map_ok_or_else`](TryFutureExt::map_ok_or_else) method.
+ MapOkOrElse<Fut, F, G>(
+ Map<IntoFuture<Fut>, MapOkOrElseFn<F, G>>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F, g: G| Map::new(IntoFuture::new(x), map_ok_or_else_fn(f, g))]
+);
+
+delegate_all!(
+ /// Future for the [`unwrap_or_else`](TryFutureExt::unwrap_or_else) method.
+ UnwrapOrElse<Fut, F>(
+ Map<IntoFuture<Fut>, UnwrapOrElseFn<F>>
+ ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), unwrap_or_else_fn(f))]
+);
+
+impl<Fut: ?Sized + TryFuture> TryFutureExt for Fut {}
+
+/// Adapters specific to [`Result`]-returning futures
+pub trait TryFutureExt: TryFuture {
+ /// Flattens the execution of this future when the successful result of this
+ /// future is a [`Sink`].
+ ///
+ /// This can be useful when sink initialization is deferred, and it is
+ /// convenient to work with that sink as if the sink was available at the
+ /// call site.
+ ///
+ /// Note that this function consumes this future and returns a wrapped
+ /// version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::{Future, TryFutureExt};
+ /// use futures::sink::Sink;
+ /// # use futures::channel::mpsc::{self, SendError};
+ /// # type T = i32;
+ /// # type E = SendError;
+ ///
+ /// fn make_sink_async() -> impl Future<Output = Result<
+ /// impl Sink<T, Error = E>,
+ /// E,
+ /// >> { // ... }
+ /// # let (tx, _rx) = mpsc::unbounded::<i32>();
+ /// # futures::future::ready(Ok(tx))
+ /// # }
+ /// fn take_sink(sink: impl Sink<T, Error = E>) { /* ... */ }
+ ///
+ /// let fut = make_sink_async();
+ /// take_sink(fut.flatten_sink())
+ /// ```
+ #[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+ fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok>
+ where
+ Self::Ok: Sink<Item, Error = Self::Error>,
+ Self: Sized,
+ {
+ crate::sink::assert_sink::<Item, Self::Error, _>(FlattenSink::new(self))
+ }
+
+ /// Maps this future's success value to a different value.
+ ///
+ /// This method can be used to change the [`Ok`](TryFuture::Ok) type of the
+ /// future into a different type. It is similar to the [`Result::map`]
+ /// method. You can use this method to chain along a computation once the
+ /// future has been resolved.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.map_ok(|x| x + 3);
+ /// assert_eq!(future.await, Ok(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`map_ok`](TryFutureExt::map_ok) on an errored future has no
+ /// effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.map_ok(|x| x + 3);
+ /// assert_eq!(future.await, Err(1));
+ /// # });
+ /// ```
+ fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
+ where
+ F: FnOnce(Self::Ok) -> T,
+ Self: Sized,
+ {
+ assert_future::<Result<T, Self::Error>, _>(MapOk::new(self, f))
+ }
+
+ /// Maps this future's success value to a different value, and permits for error handling resulting in the same type.
+ ///
+ /// This method can be used to coalesce your [`Ok`](TryFuture::Ok) type and [`Error`](TryFuture::Error) into another type,
+ /// where that type is the same for both outcomes.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// The provided closure `e` will only be called if this future is resolved
+ /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(5) };
+ /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
+ /// assert_eq!(future.await, 8);
+ ///
+ /// let future = async { Err::<i32, i32>(5) };
+ /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
+ /// assert_eq!(future.await, 10);
+ /// # });
+ /// ```
+ ///
+ fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E>
+ where
+ F: FnOnce(Self::Ok) -> T,
+ E: FnOnce(Self::Error) -> T,
+ Self: Sized,
+ {
+ assert_future::<T, _>(MapOkOrElse::new(self, f, e))
+ }
+
+ /// Maps this future's error value to a different value.
+ ///
+ /// This method can be used to change the [`Error`](TryFuture::Error) type
+ /// of the future into a different type. It is similar to the
+ /// [`Result::map_err`] method. You can use this method for example to
+ /// ensure that futures have the same [`Error`](TryFuture::Error) type when
+ /// using [`select!`] or [`join!`].
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.map_err(|x| x + 3);
+ /// assert_eq!(future.await, Err(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`map_err`](TryFutureExt::map_err) on a successful future has
+ /// no effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.map_err(|x| x + 3);
+ /// assert_eq!(future.await, Ok(1));
+ /// # });
+ /// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
+ fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
+ where
+ F: FnOnce(Self::Error) -> E,
+ Self: Sized,
+ {
+ assert_future::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
+ }
+
+ /// Maps this future's [`Error`](TryFuture::Error) to a new error type
+ /// using the [`Into`](std::convert::Into) trait.
+ ///
+ /// This method does for futures what the `?`-operator does for
+ /// [`Result`]: It lets the compiler infer the type of the resulting
+ /// error. Just as [`map_err`](TryFutureExt::map_err), this is useful for
+ /// example to ensure that futures have the same [`Error`](TryFuture::Error)
+ /// type when using [`select!`] or [`join!`].
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future_err_u8 = async { Err::<(), u8>(1) };
+ /// let future_err_i32 = future_err_u8.err_into::<i32>();
+ /// # });
+ /// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
+ fn err_into<E>(self) -> ErrInto<Self, E>
+ where
+ Self: Sized,
+ Self::Error: Into<E>,
+ {
+ assert_future::<Result<Self::Ok, E>, _>(ErrInto::new(self))
+ }
+
+ /// Maps this future's [`Ok`](TryFuture::Ok) to a new type
+ /// using the [`Into`](std::convert::Into) trait.
+ fn ok_into<U>(self) -> OkInto<Self, U>
+ where
+ Self: Sized,
+ Self::Ok: Into<U>,
+ {
+ assert_future::<Result<U, Self::Error>, _>(OkInto::new(self))
+ }
+
+ /// Executes another future after this one resolves successfully. The
+ /// success value is passed to a closure to create this subsequent future.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Ok`]. If this future resolves to an [`Err`], panics, or is
+ /// dropped, then the provided closure will never be invoked. The
+ /// [`Error`](TryFuture::Error) type of this future and the future
+ /// returned by `f` have to match.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.and_then(|x| async move { Ok::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Ok(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`and_then`](TryFutureExt::and_then) on an errored future has no
+ /// effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.and_then(|x| async move { Err::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Err(1));
+ /// # });
+ /// ```
+ fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
+ where
+ F: FnOnce(Self::Ok) -> Fut,
+ Fut: TryFuture<Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_future::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
+ }
+
+ /// Executes another future if this one resolves to an error. The
+ /// error value is passed to a closure to create this subsequent future.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Err`]. If this future resolves to an [`Ok`], panics, or is
+ /// dropped, then the provided closure will never be invoked. The
+ /// [`Ok`](TryFuture::Ok) type of this future and the future returned by `f`
+ /// have to match.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.or_else(|x| async move { Err::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Err(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`or_else`](TryFutureExt::or_else) on a successful future has
+ /// no effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.or_else(|x| async move { Ok::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Ok(1));
+ /// # });
+ /// ```
+ fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
+ where
+ F: FnOnce(Self::Error) -> Fut,
+ Fut: TryFuture<Ok = Self::Ok>,
+ Self: Sized,
+ {
+ assert_future::<Result<Fut::Ok, Fut::Error>, _>(OrElse::new(self, f))
+ }
+
+ /// Do something with the success value of a future before passing it on.
+ ///
+ /// When using futures, you'll often chain several of them together. While
+ /// working on such code, you might want to check out what's happening at
+ /// various parts in the pipeline, without consuming the intermediate
+ /// value. To do that, insert a call to `inspect_ok`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::TryFutureExt;
+ ///
+ /// let future = async { Ok::<_, ()>(1) };
+ /// let new_future = future.inspect_ok(|&x| println!("about to resolve: {}", x));
+ /// assert_eq!(new_future.await, Ok(1));
+ /// # });
+ /// ```
+ fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
+ where
+ F: FnOnce(&Self::Ok),
+ Self: Sized,
+ {
+ assert_future::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
+ }
+
+ /// Do something with the error value of a future before passing it on.
+ ///
+ /// When using futures, you'll often chain several of them together. While
+ /// working on such code, you might want to check out what's happening at
+ /// various parts in the pipeline, without consuming the intermediate
+ /// value. To do that, insert a call to `inspect_err`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::TryFutureExt;
+ ///
+ /// let future = async { Err::<(), _>(1) };
+ /// let new_future = future.inspect_err(|&x| println!("about to error: {}", x));
+ /// assert_eq!(new_future.await, Err(1));
+ /// # });
+ /// ```
+ fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
+ where
+ F: FnOnce(&Self::Error),
+ Self: Sized,
+ {
+ assert_future::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
+ }
+
+ /// Flatten the execution of this future when the successful result of this
+ /// future is another future.
+ ///
+ /// This is equivalent to `future.and_then(|x| x)`.
+ fn try_flatten(self) -> TryFlatten<Self, Self::Ok>
+ where
+ Self::Ok: TryFuture<Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_future::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryFlatten::new(self))
+ }
+
+ /// Flatten the execution of this future when the successful result of this
+ /// future is a stream.
+ ///
+ /// This can be useful when stream initialization is deferred, and it is
+ /// convenient to work with that stream as if stream was available at the
+ /// call site.
+ ///
+ /// Note that this function consumes this future and returns a wrapped
+ /// version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::TryFutureExt;
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let stream_items = vec![17, 18, 19].into_iter().map(Ok);
+ /// let future_of_a_stream = async { Ok::<_, ()>(stream::iter(stream_items)) };
+ ///
+ /// let stream = future_of_a_stream.try_flatten_stream();
+ /// let list = stream.try_collect::<Vec<_>>().await;
+ /// assert_eq!(list, Ok(vec![17, 18, 19]));
+ /// # });
+ /// ```
+ fn try_flatten_stream(self) -> TryFlattenStream<Self>
+ where
+ Self::Ok: TryStream<Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, Self::Error>, _>(TryFlattenStream::new(
+ self,
+ ))
+ }
+
+ /// Unwraps this future's output, producing a future with this future's
+ /// [`Ok`](TryFuture::Ok) type as its
+ /// [`Output`](std::future::Future::Output) type.
+ ///
+ /// If this future is resolved successfully, the returned future will
+ /// contain the original future's success value as output. Otherwise, the
+ /// closure `f` is called with the error value to produce an alternate
+ /// success value.
+ ///
+ /// This method is similar to the [`Result::unwrap_or_else`] method.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<(), &str>("Boom!") };
+ /// let future = future.unwrap_or_else(|_| ());
+ /// assert_eq!(future.await, ());
+ /// # });
+ /// ```
+ fn unwrap_or_else<F>(self, f: F) -> UnwrapOrElse<Self, F>
+ where
+ Self: Sized,
+ F: FnOnce(Self::Error) -> Self::Ok,
+ {
+ assert_future::<Self::Ok, _>(UnwrapOrElse::new(self, f))
+ }
+
+ /// Wraps a [`TryFuture`] into a future compatible with libraries using
+ /// futures 0.1 future definitions. Requires the `compat` feature to enable.
+ #[cfg(feature = "compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
+ fn compat(self) -> Compat<Self>
+ where
+ Self: Sized + Unpin,
+ {
+ Compat::new(self)
+ }
+
+ /// Wraps a [`TryFuture`] into a type that implements
+ /// [`Future`](std::future::Future).
+ ///
+ /// [`TryFuture`]s currently do not implement the
+ /// [`Future`](std::future::Future) trait due to limitations of the
+ /// compiler.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::{Future, TryFuture, TryFutureExt};
+ ///
+ /// # type T = i32;
+ /// # type E = ();
+ /// fn make_try_future() -> impl TryFuture<Ok = T, Error = E> { // ... }
+ /// # async { Ok::<i32, ()>(1) }
+ /// # }
+ /// fn take_future(future: impl Future<Output = Result<T, E>>) { /* ... */ }
+ ///
+ /// take_future(make_try_future().into_future());
+ /// ```
+ fn into_future(self) -> IntoFuture<Self>
+ where
+ Self: Sized,
+ {
+ assert_future::<Result<Self::Ok, Self::Error>, _>(IntoFuture::new(self))
+ }
+
+ /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`]
+ /// future types.
+ fn try_poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).try_poll(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/try_flatten.rs b/third_party/rust/futures-util/src/future/try_future/try_flatten.rs
new file mode 100644
index 0000000000..1ce4559ac2
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/try_flatten.rs
@@ -0,0 +1,162 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ #[project = TryFlattenProj]
+ #[derive(Debug)]
+ pub enum TryFlatten<Fut1, Fut2> {
+ First { #[pin] f: Fut1 },
+ Second { #[pin] f: Fut2 },
+ Empty,
+ }
+}
+
+impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> {
+ pub(crate) fn new(future: Fut1) -> Self {
+ Self::First { f: future }
+ }
+}
+
+impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryFuture<Error = Fut::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Empty => true,
+ _ => false,
+ }
+ }
+}
+
+impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryFuture<Error = Fut::Error>,
+{
+ type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Poll::Ready(loop {
+ match self.as_mut().project() {
+ TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
+ Ok(f) => self.set(Self::Second { f }),
+ Err(e) => {
+ self.set(Self::Empty);
+ break Err(e);
+ }
+ },
+ TryFlattenProj::Second { f } => {
+ let output = ready!(f.try_poll(cx));
+ self.set(Self::Empty);
+ break output;
+ }
+ TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
+ }
+ })
+ }
+}
+
+impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Empty => true,
+ _ => false,
+ }
+ }
+}
+
+impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error>,
+{
+ type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(loop {
+ match self.as_mut().project() {
+ TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
+ Ok(f) => self.set(Self::Second { f }),
+ Err(e) => {
+ self.set(Self::Empty);
+ break Some(Err(e));
+ }
+ },
+ TryFlattenProj::Second { f } => {
+ let output = ready!(f.try_poll_next(cx));
+ if output.is_none() {
+ self.set(Self::Empty);
+ }
+ break output;
+ }
+ TryFlattenProj::Empty => break None,
+ }
+ })
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok>
+where
+ Fut: TryFuture,
+ Fut::Ok: Sink<Item, Error = Fut::Error>,
+{
+ type Error = Fut::Error;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(loop {
+ match self.as_mut().project() {
+ TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
+ Ok(f) => self.set(Self::Second { f }),
+ Err(e) => {
+ self.set(Self::Empty);
+ break Err(e);
+ }
+ },
+ TryFlattenProj::Second { f } => {
+ break ready!(f.poll_ready(cx));
+ }
+ TryFlattenProj::Empty => panic!("poll_ready called after eof"),
+ }
+ })
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ match self.project() {
+ TryFlattenProj::First { .. } => panic!("poll_ready not called first"),
+ TryFlattenProj::Second { f } => f.start_send(item),
+ TryFlattenProj::Empty => panic!("start_send called after eof"),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.project() {
+ TryFlattenProj::First { .. } => Poll::Ready(Ok(())),
+ TryFlattenProj::Second { f } => f.poll_flush(cx),
+ TryFlattenProj::Empty => panic!("poll_flush called after eof"),
+ }
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let res = match self.as_mut().project() {
+ TryFlattenProj::Second { f } => f.poll_close(cx),
+ _ => Poll::Ready(Ok(())),
+ };
+ if res.is_ready() {
+ self.set(Self::Empty);
+ }
+ res
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/try_flatten_err.rs b/third_party/rust/futures-util/src/future/try_future/try_flatten_err.rs
new file mode 100644
index 0000000000..39b7d9f5f6
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/try_flatten_err.rs
@@ -0,0 +1,62 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ #[project = TryFlattenErrProj]
+ #[derive(Debug)]
+ pub enum TryFlattenErr<Fut1, Fut2> {
+ First { #[pin] f: Fut1 },
+ Second { #[pin] f: Fut2 },
+ Empty,
+ }
+}
+
+impl<Fut1, Fut2> TryFlattenErr<Fut1, Fut2> {
+ pub(crate) fn new(future: Fut1) -> Self {
+ Self::First { f: future }
+ }
+}
+
+impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error>
+where
+ Fut: TryFuture,
+ Fut::Error: TryFuture<Ok = Fut::Ok>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Empty => true,
+ _ => false,
+ }
+ }
+}
+
+impl<Fut> Future for TryFlattenErr<Fut, Fut::Error>
+where
+ Fut: TryFuture,
+ Fut::Error: TryFuture<Ok = Fut::Ok>,
+{
+ type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Poll::Ready(loop {
+ match self.as_mut().project() {
+ TryFlattenErrProj::First { f } => match ready!(f.try_poll(cx)) {
+ Err(f) => self.set(Self::Second { f }),
+ Ok(e) => {
+ self.set(Self::Empty);
+ break Ok(e);
+ }
+ },
+ TryFlattenErrProj::Second { f } => {
+ let output = ready!(f.try_poll(cx));
+ self.set(Self::Empty);
+ break output;
+ }
+ TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"),
+ }
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_join.rs b/third_party/rust/futures-util/src/future/try_join.rs
new file mode 100644
index 0000000000..6af1f0ccbf
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_join.rs
@@ -0,0 +1,256 @@
+#![allow(non_snake_case)]
+
+use crate::future::{assert_future, try_maybe_done, TryMaybeDone};
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+macro_rules! generate {
+ ($(
+ $(#[$doc:meta])*
+ ($Join:ident, <Fut1, $($Fut:ident),*>),
+ )*) => ($(
+ pin_project! {
+ $(#[$doc])*
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> {
+ #[pin] Fut1: TryMaybeDone<Fut1>,
+ $(#[pin] $Fut: TryMaybeDone<$Fut>,)*
+ }
+ }
+
+ impl<Fut1, $($Fut),*> fmt::Debug for $Join<Fut1, $($Fut),*>
+ where
+ Fut1: TryFuture + fmt::Debug,
+ Fut1::Ok: fmt::Debug,
+ Fut1::Error: fmt::Debug,
+ $(
+ $Fut: TryFuture + fmt::Debug,
+ $Fut::Ok: fmt::Debug,
+ $Fut::Error: fmt::Debug,
+ )*
+ {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct(stringify!($Join))
+ .field("Fut1", &self.Fut1)
+ $(.field(stringify!($Fut), &self.$Fut))*
+ .finish()
+ }
+ }
+
+ impl<Fut1, $($Fut),*> $Join<Fut1, $($Fut),*>
+ where
+ Fut1: TryFuture,
+ $(
+ $Fut: TryFuture<Error=Fut1::Error>
+ ),*
+ {
+ fn new(Fut1: Fut1, $($Fut: $Fut),*) -> Self {
+ Self {
+ Fut1: try_maybe_done(Fut1),
+ $($Fut: try_maybe_done($Fut)),*
+ }
+ }
+ }
+
+ impl<Fut1, $($Fut),*> Future for $Join<Fut1, $($Fut),*>
+ where
+ Fut1: TryFuture,
+ $(
+ $Fut: TryFuture<Error=Fut1::Error>
+ ),*
+ {
+ type Output = Result<(Fut1::Ok, $($Fut::Ok),*), Fut1::Error>;
+
+ fn poll(
+ self: Pin<&mut Self>, cx: &mut Context<'_>
+ ) -> Poll<Self::Output> {
+ let mut all_done = true;
+ let mut futures = self.project();
+ all_done &= futures.Fut1.as_mut().poll(cx)?.is_ready();
+ $(
+ all_done &= futures.$Fut.as_mut().poll(cx)?.is_ready();
+ )*
+
+ if all_done {
+ Poll::Ready(Ok((
+ futures.Fut1.take_output().unwrap(),
+ $(
+ futures.$Fut.take_output().unwrap()
+ ),*
+ )))
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+ )*)
+}
+
+generate! {
+ /// Future for the [`try_join`](try_join()) function.
+ (TryJoin, <Fut1, Fut2>),
+
+ /// Future for the [`try_join3`] function.
+ (TryJoin3, <Fut1, Fut2, Fut3>),
+
+ /// Future for the [`try_join4`] function.
+ (TryJoin4, <Fut1, Fut2, Fut3, Fut4>),
+
+ /// Future for the [`try_join5`] function.
+ (TryJoin5, <Fut1, Fut2, Fut3, Fut4, Fut5>),
+}
+
+/// Joins the result of two futures, waiting for them both to complete or
+/// for one to produce an error.
+///
+/// This function will return a new future which awaits both futures to
+/// complete. If successful, the returned future will finish with a tuple of
+/// both results. If unsuccessful, it will complete with the first error
+/// encountered.
+///
+/// Note that this function consumes the passed futures and returns a
+/// wrapped version of it.
+///
+/// # Examples
+///
+/// When used on multiple futures that return [`Ok`], `try_join` will return
+/// [`Ok`] of a tuple of the values:
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let pair = future::try_join(a, b);
+///
+/// assert_eq!(pair.await, Ok((1, 2)));
+/// # });
+/// ```
+///
+/// If one of the futures resolves to an error, `try_join` will return
+/// that error:
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Err::<i32, i32>(2));
+/// let pair = future::try_join(a, b);
+///
+/// assert_eq!(pair.await, Err(2));
+/// # });
+/// ```
+pub fn try_join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> TryJoin<Fut1, Fut2>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+{
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok), Fut1::Error>, _>(TryJoin::new(future1, future2))
+}
+
+/// Same as [`try_join`](try_join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let c = future::ready(Ok::<i32, i32>(3));
+/// let tuple = future::try_join3(a, b, c);
+///
+/// assert_eq!(tuple.await, Ok((1, 2, 3)));
+/// # });
+/// ```
+pub fn try_join3<Fut1, Fut2, Fut3>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+) -> TryJoin3<Fut1, Fut2, Fut3>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ Fut3: TryFuture<Error = Fut1::Error>,
+{
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok), Fut1::Error>, _>(TryJoin3::new(
+ future1, future2, future3,
+ ))
+}
+
+/// Same as [`try_join`](try_join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let c = future::ready(Ok::<i32, i32>(3));
+/// let d = future::ready(Ok::<i32, i32>(4));
+/// let tuple = future::try_join4(a, b, c, d);
+///
+/// assert_eq!(tuple.await, Ok((1, 2, 3, 4)));
+/// # });
+/// ```
+pub fn try_join4<Fut1, Fut2, Fut3, Fut4>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+) -> TryJoin4<Fut1, Fut2, Fut3, Fut4>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ Fut3: TryFuture<Error = Fut1::Error>,
+ Fut4: TryFuture<Error = Fut1::Error>,
+{
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok), Fut1::Error>, _>(
+ TryJoin4::new(future1, future2, future3, future4),
+ )
+}
+
+/// Same as [`try_join`](try_join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let c = future::ready(Ok::<i32, i32>(3));
+/// let d = future::ready(Ok::<i32, i32>(4));
+/// let e = future::ready(Ok::<i32, i32>(5));
+/// let tuple = future::try_join5(a, b, c, d, e);
+///
+/// assert_eq!(tuple.await, Ok((1, 2, 3, 4, 5)));
+/// # });
+/// ```
+pub fn try_join5<Fut1, Fut2, Fut3, Fut4, Fut5>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+ future5: Fut5,
+) -> TryJoin5<Fut1, Fut2, Fut3, Fut4, Fut5>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ Fut3: TryFuture<Error = Fut1::Error>,
+ Fut4: TryFuture<Error = Fut1::Error>,
+ Fut5: TryFuture<Error = Fut1::Error>,
+{
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok, Fut5::Ok), Fut1::Error>, _>(
+ TryJoin5::new(future1, future2, future3, future4, future5),
+ )
+}
diff --git a/third_party/rust/futures-util/src/future/try_join_all.rs b/third_party/rust/futures-util/src/future/try_join_all.rs
new file mode 100644
index 0000000000..506f450657
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_join_all.rs
@@ -0,0 +1,200 @@
+//! Definition of the `TryJoinAll` combinator, waiting for all of a list of
+//! futures to finish with either success or error.
+
+use alloc::boxed::Box;
+use alloc::vec::Vec;
+use core::fmt;
+use core::future::Future;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone};
+
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt};
+use crate::TryFutureExt;
+
+enum FinalState<E = ()> {
+ Pending,
+ AllDone,
+ Error(E),
+}
+
+/// Future for the [`try_join_all`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct TryJoinAll<F>
+where
+ F: TryFuture,
+{
+ kind: TryJoinAllKind<F>,
+}
+
+enum TryJoinAllKind<F>
+where
+ F: TryFuture,
+{
+ Small {
+ elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>,
+ },
+ #[cfg(not(futures_no_atomic_cas))]
+ Big {
+ fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,
+ },
+}
+
+impl<F> fmt::Debug for TryJoinAll<F>
+where
+ F: TryFuture + fmt::Debug,
+ F::Ok: fmt::Debug,
+ F::Error: fmt::Debug,
+ F::Output: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.kind {
+ TryJoinAllKind::Small { ref elems } => {
+ f.debug_struct("TryJoinAll").field("elems", elems).finish()
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+ }
+ }
+}
+
+/// Creates a future which represents either a collection of the results of the
+/// futures given or an error.
+///
+/// The returned future will drive execution for all of its underlying futures,
+/// collecting the results into a destination `Vec<T>` in the same order as they
+/// were provided.
+///
+/// If any future returns an error then all other futures will be canceled and
+/// an error will be returned immediately. If all futures complete successfully,
+/// however, then the returned future will succeed with a `Vec` of all the
+/// successful results.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # See Also
+///
+/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
+///
+/// Some examples for additional functionality provided by these are:
+///
+/// * Adding new futures to the set even after it has been started.
+///
+/// * Only polling the specific futures that have been woken. In cases where
+/// you have a lot of futures this will result in much more efficient polling.
+///
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::{self, try_join_all};
+///
+/// let futures = vec![
+/// future::ok::<u32, u32>(1),
+/// future::ok::<u32, u32>(2),
+/// future::ok::<u32, u32>(3),
+/// ];
+///
+/// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3]));
+///
+/// let futures = vec![
+/// future::ok::<u32, u32>(1),
+/// future::err::<u32, u32>(2),
+/// future::ok::<u32, u32>(3),
+/// ];
+///
+/// assert_eq!(try_join_all(futures).await, Err(2));
+/// # });
+/// ```
+pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item>
+where
+ I: IntoIterator,
+ I::Item: TryFuture,
+{
+ let iter = iter.into_iter().map(TryFutureExt::into_future);
+
+ #[cfg(futures_no_atomic_cas)]
+ {
+ let kind = TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
+
+ #[cfg(not(futures_no_atomic_cas))]
+ {
+ let kind = match iter.size_hint().1 {
+ Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() },
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
+}
+
+impl<F> Future for TryJoinAll<F>
+where
+ F: TryFuture,
+{
+ type Output = Result<Vec<F::Ok>, F::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match &mut self.kind {
+ TryJoinAllKind::Small { elems } => {
+ let mut state = FinalState::AllDone;
+
+ for elem in join_all::iter_pin_mut(elems.as_mut()) {
+ match elem.try_poll(cx) {
+ Poll::Pending => state = FinalState::Pending,
+ Poll::Ready(Ok(())) => {}
+ Poll::Ready(Err(e)) => {
+ state = FinalState::Error(e);
+ break;
+ }
+ }
+ }
+
+ match state {
+ FinalState::Pending => Poll::Pending,
+ FinalState::AllDone => {
+ let mut elems = mem::replace(elems, Box::pin([]));
+ let results = join_all::iter_pin_mut(elems.as_mut())
+ .map(|e| e.take_output().unwrap())
+ .collect();
+ Poll::Ready(Ok(results))
+ }
+ FinalState::Error(e) => {
+ let _ = mem::replace(elems, Box::pin([]));
+ Poll::Ready(Err(e))
+ }
+ }
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
+ }
+ }
+}
+
+impl<F> FromIterator<F> for TryJoinAll<F>
+where
+ F: TryFuture,
+{
+ fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
+ try_join_all(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_maybe_done.rs b/third_party/rust/futures-util/src/future/try_maybe_done.rs
new file mode 100644
index 0000000000..24044d2c27
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_maybe_done.rs
@@ -0,0 +1,92 @@
+//! Definition of the TryMaybeDone combinator
+
+use super::assert_future;
+use core::mem;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+
+/// A future that may have completed with an error.
+///
+/// This is created by the [`try_maybe_done()`] function.
+#[derive(Debug)]
+pub enum TryMaybeDone<Fut: TryFuture> {
+ /// A not-yet-completed future
+ Future(/* #[pin] */ Fut),
+ /// The output of the completed future
+ Done(Fut::Ok),
+ /// The empty variant after the result of a [`TryMaybeDone`] has been
+ /// taken using the [`take_output`](TryMaybeDone::take_output) method,
+ /// or if the future returned an error.
+ Gone,
+}
+
+impl<Fut: TryFuture + Unpin> Unpin for TryMaybeDone<Fut> {}
+
+/// Wraps a future into a `TryMaybeDone`
+pub fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut> {
+ assert_future::<Result<(), Fut::Error>, _>(TryMaybeDone::Future(future))
+}
+
+impl<Fut: TryFuture> TryMaybeDone<Fut> {
+ /// Returns an [`Option`] containing a mutable reference to the output of the future.
+ /// The output of this method will be [`Some`] if and only if the inner
+ /// future has completed successfully and [`take_output`](TryMaybeDone::take_output)
+ /// has not yet been called.
+ #[inline]
+ pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Ok> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ TryMaybeDone::Done(res) => Some(res),
+ _ => None,
+ }
+ }
+ }
+
+ /// Attempt to take the output of a `TryMaybeDone` without driving it
+ /// towards completion.
+ #[inline]
+ pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> {
+ match &*self {
+ Self::Done(_) => {}
+ Self::Future(_) | Self::Gone => return None,
+ }
+ unsafe {
+ match mem::replace(self.get_unchecked_mut(), Self::Gone) {
+ TryMaybeDone::Done(output) => Some(output),
+ _ => unreachable!(),
+ }
+ }
+ }
+}
+
+impl<Fut: TryFuture> FusedFuture for TryMaybeDone<Fut> {
+ fn is_terminated(&self) -> bool {
+ match self {
+ Self::Future(_) => false,
+ Self::Done(_) | Self::Gone => true,
+ }
+ }
+}
+
+impl<Fut: TryFuture> Future for TryMaybeDone<Fut> {
+ type Output = Result<(), Fut::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ unsafe {
+ match self.as_mut().get_unchecked_mut() {
+ TryMaybeDone::Future(f) => match ready!(Pin::new_unchecked(f).try_poll(cx)) {
+ Ok(res) => self.set(Self::Done(res)),
+ Err(e) => {
+ self.set(Self::Gone);
+ return Poll::Ready(Err(e));
+ }
+ },
+ TryMaybeDone::Done(_) => {}
+ TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"),
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_select.rs b/third_party/rust/futures-util/src/future/try_select.rs
new file mode 100644
index 0000000000..4d0b7ff135
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_select.rs
@@ -0,0 +1,84 @@
+use crate::future::{Either, TryFutureExt};
+use core::pin::Pin;
+use futures_core::future::{Future, TryFuture};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`try_select()`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct TrySelect<A, B> {
+ inner: Option<(A, B)>,
+}
+
+impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
+
+/// Waits for either one of two differently-typed futures to complete.
+///
+/// This function will return a new future which awaits for either one of both
+/// futures to complete. The returned future will finish with both the value
+/// resolved and a future representing the completion of the other work.
+///
+/// Note that this function consumes the receiving futures and returns a
+/// wrapped version of them.
+///
+/// Also note that if both this and the second future have the same
+/// success/error type you can use the `Either::factor_first` method to
+/// conveniently extract out the value at the end.
+///
+/// # Examples
+///
+/// ```
+/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt};
+///
+/// // A poor-man's try_join implemented on top of select
+///
+/// fn try_join<A, B, E>(a: A, b: B) -> impl TryFuture<Ok=(A::Ok, B::Ok), Error=E>
+/// where A: TryFuture<Error = E> + Unpin + 'static,
+/// B: TryFuture<Error = E> + Unpin + 'static,
+/// E: 'static,
+/// {
+/// future::try_select(a, b).then(|res| -> Box<dyn Future<Output = Result<_, _>> + Unpin> {
+/// match res {
+/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))),
+/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))),
+/// Err(Either::Left((e, _))) => Box::new(future::err(e)),
+/// Err(Either::Right((e, _))) => Box::new(future::err(e)),
+/// }
+/// })
+/// }
+/// ```
+pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B>
+where
+ A: TryFuture + Unpin,
+ B: TryFuture + Unpin,
+{
+ super::assert_future::<
+ Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
+ _,
+ >(TrySelect { inner: Some((future1, future2)) })
+}
+
+impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
+where
+ A: TryFuture,
+ B: TryFuture,
+{
+ #[allow(clippy::type_complexity)]
+ type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
+ match a.try_poll_unpin(cx) {
+ Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))),
+ Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))),
+ Poll::Pending => match b.try_poll_unpin(cx) {
+ Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))),
+ Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))),
+ Poll::Pending => {
+ self.inner = Some((a, b));
+ Poll::Pending
+ }
+ },
+ }
+ }
+}