summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/src/future
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-util/src/future')
-rw-r--r--third_party/rust/futures-util/src/future/abortable.rs177
-rw-r--r--third_party/rust/futures-util/src/future/either.rs304
-rw-r--r--third_party/rust/futures-util/src/future/future/catch_unwind.rs31
-rw-r--r--third_party/rust/futures-util/src/future/future/chain.rs58
-rw-r--r--third_party/rust/futures-util/src/future/future/flatten.rs56
-rw-r--r--third_party/rust/futures-util/src/future/future/flatten_stream.rs89
-rw-r--r--third_party/rust/futures-util/src/future/future/fuse.rs90
-rw-r--r--third_party/rust/futures-util/src/future/future/inspect.rs47
-rw-r--r--third_party/rust/futures-util/src/future/future/into_stream.rs43
-rw-r--r--third_party/rust/futures-util/src/future/future/map.rs49
-rw-r--r--third_party/rust/futures-util/src/future/future/mod.rs558
-rw-r--r--third_party/rust/futures-util/src/future/future/never_error.rs36
-rw-r--r--third_party/rust/futures-util/src/future/future/remote_handle.rs114
-rw-r--r--third_party/rust/futures-util/src/future/future/shared.rs335
-rw-r--r--third_party/rust/futures-util/src/future/future/then.rs46
-rw-r--r--third_party/rust/futures-util/src/future/future/unit_error.rs35
-rw-r--r--third_party/rust/futures-util/src/future/join.rs214
-rw-r--r--third_party/rust/futures-util/src/future/join_all.rs159
-rw-r--r--third_party/rust/futures-util/src/future/lazy.rs54
-rw-r--r--third_party/rust/futures-util/src/future/maybe_done.rs104
-rw-r--r--third_party/rust/futures-util/src/future/mod.rs110
-rw-r--r--third_party/rust/futures-util/src/future/option.rs62
-rw-r--r--third_party/rust/futures-util/src/future/pending.rs56
-rw-r--r--third_party/rust/futures-util/src/future/poll_fn.rs56
-rw-r--r--third_party/rust/futures-util/src/future/ready.rs81
-rw-r--r--third_party/rust/futures-util/src/future/select.rs83
-rw-r--r--third_party/rust/futures-util/src/future/select_all.rs69
-rw-r--r--third_party/rust/futures-util/src/future/select_ok.rs83
-rw-r--r--third_party/rust/futures-util/src/future/try_future/and_then.rs53
-rw-r--r--third_party/rust/futures-util/src/future/try_future/err_into.rs48
-rw-r--r--third_party/rust/futures-util/src/future/try_future/flatten_sink.rs76
-rw-r--r--third_party/rust/futures-util/src/future/try_future/flatten_stream_sink.rs181
-rw-r--r--third_party/rust/futures-util/src/future/try_future/inspect_err.rs53
-rw-r--r--third_party/rust/futures-util/src/future/try_future/inspect_ok.rs53
-rw-r--r--third_party/rust/futures-util/src/future/try_future/into_future.rs36
-rw-r--r--third_party/rust/futures-util/src/future/try_future/map_err.rs52
-rw-r--r--third_party/rust/futures-util/src/future/try_future/map_ok.rs54
-rw-r--r--third_party/rust/futures-util/src/future/try_future/map_ok_or_else.rs59
-rw-r--r--third_party/rust/futures-util/src/future/try_future/mod.rs536
-rw-r--r--third_party/rust/futures-util/src/future/try_future/or_else.rs56
-rw-r--r--third_party/rust/futures-util/src/future/try_future/try_chain.rs108
-rw-r--r--third_party/rust/futures-util/src/future/try_future/try_flatten_stream.rs91
-rw-r--r--third_party/rust/futures-util/src/future/try_future/unwrap_or_else.rs55
-rw-r--r--third_party/rust/futures-util/src/future/try_join.rs262
-rw-r--r--third_party/rust/futures-util/src/future/try_join_all.rs179
-rw-r--r--third_party/rust/futures-util/src/future/try_select.rs80
46 files changed, 5231 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/future/abortable.rs b/third_party/rust/futures-util/src/future/abortable.rs
new file mode 100644
index 0000000000..281cf6b481
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/abortable.rs
@@ -0,0 +1,177 @@
+use crate::task::AtomicWaker;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+use core::fmt;
+use core::pin::Pin;
+use core::sync::atomic::{AtomicBool, Ordering};
+use alloc::sync::Arc;
+
+/// A future which can be remotely short-circuited using an `AbortHandle`.
+#[derive(Debug, Clone)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Abortable<Fut> {
+ future: Fut,
+ inner: Arc<AbortInner>,
+}
+
+impl<Fut: Unpin> Unpin for Abortable<Fut> {}
+
+impl<Fut> Abortable<Fut> where Fut: Future {
+ unsafe_pinned!(future: Fut);
+
+ /// Creates a new `Abortable` future using an existing `AbortRegistration`.
+ /// `AbortRegistration`s can be acquired through `AbortHandle::new`.
+ ///
+ /// When `abort` is called on the handle tied to `reg` or if `abort` has
+ /// already been called, the future will complete immediately without making
+ /// any further progress.
+ ///
+ /// Example:
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::{Abortable, AbortHandle, Aborted};
+ ///
+ /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
+ /// let future = Abortable::new(async { 2 }, abort_registration);
+ /// abort_handle.abort();
+ /// assert_eq!(future.await, Err(Aborted));
+ /// # });
+ /// ```
+ pub fn new(future: Fut, reg: AbortRegistration) -> Self {
+ Abortable {
+ future,
+ inner: reg.inner,
+ }
+ }
+}
+
+/// A registration handle for a `Abortable` future.
+/// Values of this type can be acquired from `AbortHandle::new` and are used
+/// in calls to `Abortable::new`.
+#[derive(Debug)]
+pub struct AbortRegistration {
+ inner: Arc<AbortInner>,
+}
+
+/// A handle to a `Abortable` future.
+#[derive(Debug, Clone)]
+pub struct AbortHandle {
+ inner: Arc<AbortInner>,
+}
+
+impl AbortHandle {
+ /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used
+ /// to abort a running future.
+ ///
+ /// This function is usually paired with a call to `Abortable::new`.
+ ///
+ /// Example:
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::{Abortable, AbortHandle, Aborted};
+ ///
+ /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
+ /// let future = Abortable::new(async { 2 }, abort_registration);
+ /// abort_handle.abort();
+ /// assert_eq!(future.await, Err(Aborted));
+ /// # });
+ /// ```
+ pub fn new_pair() -> (Self, AbortRegistration) {
+ let inner = Arc::new(AbortInner {
+ waker: AtomicWaker::new(),
+ cancel: AtomicBool::new(false),
+ });
+
+ (
+ AbortHandle {
+ inner: inner.clone(),
+ },
+ AbortRegistration {
+ inner,
+ },
+ )
+ }
+}
+
+// Inner type storing the waker to awaken and a bool indicating that it
+// should be cancelled.
+#[derive(Debug)]
+struct AbortInner {
+ waker: AtomicWaker,
+ cancel: AtomicBool,
+}
+
+/// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it.
+///
+/// This function is a convenient (but less flexible) alternative to calling
+/// `AbortHandle::new` and `Abortable::new` manually.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
+ where Fut: Future
+{
+ let (handle, reg) = AbortHandle::new_pair();
+ (
+ Abortable::new(future, reg),
+ handle,
+ )
+}
+
+/// Indicator that the `Abortable` future was aborted.
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+pub struct Aborted;
+
+impl fmt::Display for Aborted {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "`Abortable` future has been aborted")
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for Aborted {}
+
+impl<Fut> Future for Abortable<Fut> where Fut: Future {
+ type Output = Result<Fut::Output, Aborted>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // Check if the future has been aborted
+ if self.inner.cancel.load(Ordering::Relaxed) {
+ return Poll::Ready(Err(Aborted))
+ }
+
+ // attempt to complete the future
+ if let Poll::Ready(x) = self.as_mut().future().poll(cx) {
+ return Poll::Ready(Ok(x))
+ }
+
+ // Register to receive a wakeup if the future is aborted in the... future
+ self.inner.waker.register(cx.waker());
+
+ // Check to see if the future was aborted between the first check and
+ // registration.
+ // Checking with `Relaxed` is sufficient because `register` introduces an
+ // `AcqRel` barrier.
+ if self.inner.cancel.load(Ordering::Relaxed) {
+ return Poll::Ready(Err(Aborted))
+ }
+
+ Poll::Pending
+ }
+}
+
+impl AbortHandle {
+ /// Abort the `Abortable` future associated with this handle.
+ ///
+ /// Notifies the Abortable future associated with this handle that it
+ /// should abort. Note that if the future is currently being polled on
+ /// another thread, it will not immediately stop running. Instead, it will
+ /// continue to run until its poll method returns.
+ pub fn abort(&self) {
+ self.inner.cancel.store(true, Ordering::Relaxed);
+ self.inner.waker.wake();
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/either.rs b/third_party/rust/futures-util/src/future/either.rs
new file mode 100644
index 0000000000..24fbbe79d8
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/either.rs
@@ -0,0 +1,304 @@
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use futures_core::future::{FusedFuture, Future};
+use futures_core::stream::{FusedStream, Stream};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+/// Combines two different futures, streams, or sinks having the same associated types into a single
+/// type.
+#[derive(Debug, Clone)]
+pub enum Either<A, B> {
+ /// First branch of the type
+ Left(A),
+ /// Second branch of the type
+ Right(B),
+}
+
+impl<A, B, T> Either<(T, A), (T, B)> {
+ /// Factor out a homogeneous type from an either of pairs.
+ ///
+ /// Here, the homogeneous type is the first element of the pairs.
+ pub fn factor_first(self) -> (T, Either<A, B>) {
+ match self {
+ Either::Left((x, a)) => (x, Either::Left(a)),
+ Either::Right((x, b)) => (x, Either::Right(b)),
+ }
+ }
+}
+
+impl<A, B, T> Either<(A, T), (B, T)> {
+ /// Factor out a homogeneous type from an either of pairs.
+ ///
+ /// Here, the homogeneous type is the second element of the pairs.
+ pub fn factor_second(self) -> (Either<A, B>, T) {
+ match self {
+ Either::Left((a, x)) => (Either::Left(a), x),
+ Either::Right((b, x)) => (Either::Right(b), x),
+ }
+ }
+}
+
+impl<T> Either<T, T> {
+ /// Extract the value of an either over two equivalent types.
+ pub fn into_inner(self) -> T {
+ match self {
+ Either::Left(x) => x,
+ Either::Right(x) => x,
+ }
+ }
+}
+
+impl<A, B> Future for Either<A, B>
+where
+ A: Future,
+ B: Future<Output = A::Output>,
+{
+ type Output = A::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<A::Output> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll(cx),
+ }
+ }
+ }
+}
+
+impl<A, B> FusedFuture for Either<A, B>
+where
+ A: FusedFuture,
+ B: FusedFuture<Output = A::Output>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Either::Left(x) => x.is_terminated(),
+ Either::Right(x) => x.is_terminated(),
+ }
+ }
+}
+
+impl<A, B> Stream for Either<A, B>
+where
+ A: Stream,
+ B: Stream<Item = A::Item>,
+{
+ type Item = A::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<A::Item>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_next(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll_next(cx),
+ }
+ }
+ }
+}
+
+impl<A, B> FusedStream for Either<A, B>
+where
+ A: FusedStream,
+ B: FusedStream<Item = A::Item>,
+{
+ fn is_terminated(&self) -> bool {
+ match self {
+ Either::Left(x) => x.is_terminated(),
+ Either::Right(x) => x.is_terminated(),
+ }
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<A, B, Item> Sink<Item> for Either<A, B>
+where
+ A: Sink<Item>,
+ B: Sink<Item, Error = A::Error>,
+{
+ type Error = A::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_ready(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll_ready(cx),
+ }
+ }
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).start_send(item),
+ Either::Right(x) => Pin::new_unchecked(x).start_send(item),
+ }
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_flush(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll_flush(cx),
+ }
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_close(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll_close(cx),
+ }
+ }
+ }
+}
+
+#[cfg(feature = "io")]
+#[cfg(feature = "std")]
+mod if_std {
+ use super::Either;
+ use core::pin::Pin;
+ use core::task::{Context, Poll};
+ #[cfg(feature = "read-initializer")]
+ use futures_io::Initializer;
+ use futures_io::{
+ AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom,
+ };
+
+ impl<A, B> AsyncRead for Either<A, B>
+ where
+ A: AsyncRead,
+ B: AsyncRead,
+ {
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ match self {
+ Either::Left(x) => x.initializer(),
+ Either::Right(x) => x.initializer(),
+ }
+ }
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_read(cx, buf),
+ Either::Right(x) => Pin::new_unchecked(x).poll_read(cx, buf),
+ }
+ }
+ }
+
+ fn poll_read_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<Result<usize>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_read_vectored(cx, bufs),
+ Either::Right(x) => Pin::new_unchecked(x).poll_read_vectored(cx, bufs),
+ }
+ }
+ }
+ }
+
+ impl<A, B> AsyncWrite for Either<A, B>
+ where
+ A: AsyncWrite,
+ B: AsyncWrite,
+ {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_write(cx, buf),
+ Either::Right(x) => Pin::new_unchecked(x).poll_write(cx, buf),
+ }
+ }
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<Result<usize>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_write_vectored(cx, bufs),
+ Either::Right(x) => Pin::new_unchecked(x).poll_write_vectored(cx, bufs),
+ }
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_flush(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll_flush(cx),
+ }
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_close(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll_close(cx),
+ }
+ }
+ }
+ }
+
+ impl<A, B> AsyncSeek for Either<A, B>
+ where
+ A: AsyncSeek,
+ B: AsyncSeek,
+ {
+ fn poll_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<Result<u64>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_seek(cx, pos),
+ Either::Right(x) => Pin::new_unchecked(x).poll_seek(cx, pos),
+ }
+ }
+ }
+ }
+
+ impl<A, B> AsyncBufRead for Either<A, B>
+ where
+ A: AsyncBufRead,
+ B: AsyncBufRead,
+ {
+ fn poll_fill_buf(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<&[u8]>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).poll_fill_buf(cx),
+ Either::Right(x) => Pin::new_unchecked(x).poll_fill_buf(cx),
+ }
+ }
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(x) => Pin::new_unchecked(x).consume(amt),
+ Either::Right(x) => Pin::new_unchecked(x).consume(amt),
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/catch_unwind.rs b/third_party/rust/futures-util/src/future/future/catch_unwind.rs
new file mode 100644
index 0000000000..e88cce7e9d
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/catch_unwind.rs
@@ -0,0 +1,31 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+use std::any::Any;
+use std::pin::Pin;
+use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
+
+/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct CatchUnwind<Fut> {
+ future: Fut,
+}
+
+impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe {
+ unsafe_pinned!(future: Fut);
+
+ pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
+ CatchUnwind { future }
+ }
+}
+
+impl<Fut> Future for CatchUnwind<Fut>
+ where Fut: Future + UnwindSafe,
+{
+ type Output = Result<Fut::Output, Box<dyn Any + Send>>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/chain.rs b/third_party/rust/futures-util/src/future/future/chain.rs
new file mode 100644
index 0000000000..3f248e80fe
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/chain.rs
@@ -0,0 +1,58 @@
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub(crate) enum Chain<Fut1, Fut2, Data> {
+ First(Fut1, Option<Data>),
+ Second(Fut2),
+ Empty,
+}
+
+impl<Fut1: Unpin, Fut2: Unpin, Data> Unpin for Chain<Fut1, Fut2, Data> {}
+
+impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data> {
+ pub(crate)fn is_terminated(&self) -> bool {
+ if let Chain::Empty = *self { true } else { false }
+ }
+}
+
+impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data>
+ where Fut1: Future,
+ Fut2: Future,
+{
+ pub(crate) fn new(fut1: Fut1, data: Data) -> Chain<Fut1, Fut2, Data> {
+ Chain::First(fut1, Some(data))
+ }
+
+ pub(crate) fn poll<F>(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ f: F,
+ ) -> Poll<Fut2::Output>
+ where F: FnOnce(Fut1::Output, Data) -> Fut2,
+ {
+ let mut f = Some(f);
+
+ // Safe to call `get_unchecked_mut` because we won't move the futures.
+ let this = unsafe { self.get_unchecked_mut() };
+
+ loop {
+ let (output, data) = match this {
+ Chain::First(fut1, data) => {
+ let output = ready!(unsafe { Pin::new_unchecked(fut1) }.poll(cx));
+ (output, data.take().unwrap())
+ }
+ Chain::Second(fut2) => {
+ return unsafe { Pin::new_unchecked(fut2) }.poll(cx);
+ }
+ Chain::Empty => unreachable!()
+ };
+
+ *this = Chain::Empty; // Drop fut1
+ let fut2 = (f.take().unwrap())(output, data);
+ *this = Chain::Second(fut2)
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/flatten.rs b/third_party/rust/futures-util/src/future/future/flatten.rs
new file mode 100644
index 0000000000..16b3a19de9
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/flatten.rs
@@ -0,0 +1,56 @@
+use super::chain::Chain;
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`flatten`](super::FutureExt::flatten) method.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Flatten<Fut>
+ where Fut: Future,
+{
+ state: Chain<Fut, Fut::Output, ()>,
+}
+
+impl<Fut> Flatten<Fut>
+ where Fut: Future,
+ Fut::Output: Future,
+{
+ unsafe_pinned!(state: Chain<Fut, Fut::Output, ()>);
+
+ pub(super) fn new(future: Fut) -> Flatten<Fut> {
+ Flatten {
+ state: Chain::new(future, ()),
+ }
+ }
+}
+
+impl<Fut> fmt::Debug for Flatten<Fut>
+ where Fut: Future + fmt::Debug,
+ Fut::Output: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Flatten")
+ .field("state", &self.state)
+ .finish()
+ }
+}
+
+impl<Fut> FusedFuture for Flatten<Fut>
+ where Fut: Future,
+ Fut::Output: Future,
+{
+ fn is_terminated(&self) -> bool { self.state.is_terminated() }
+}
+
+impl<Fut> Future for Flatten<Fut>
+ where Fut: Future,
+ Fut::Output: Future,
+{
+ type Output = <Fut::Output as Future>::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.state().poll(cx, |a, ()| a)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/flatten_stream.rs b/third_party/rust/futures-util/src/future/future/flatten_stream.rs
new file mode 100644
index 0000000000..d1108866ca
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/flatten_stream.rs
@@ -0,0 +1,89 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Stream for the [`flatten_stream`](super::FutureExt::flatten_stream) method.
+#[must_use = "streams do nothing unless polled"]
+pub struct FlattenStream<Fut: Future> {
+ state: State<Fut, Fut::Output>,
+}
+
+impl<Fut: Future> FlattenStream<Fut> {
+ unsafe_pinned!(state: State<Fut, Fut::Output>);
+
+ pub(super) fn new(future: Fut) -> FlattenStream<Fut> {
+ FlattenStream {
+ state: State::Future(future)
+ }
+ }
+}
+
+impl<Fut> fmt::Debug for FlattenStream<Fut>
+ where Fut: Future + fmt::Debug,
+ Fut::Output: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FlattenStream")
+ .field("state", &self.state)
+ .finish()
+ }
+}
+
+#[derive(Debug)]
+enum State<Fut, St> {
+ // future is not yet called or called and not ready
+ Future(Fut),
+ // future resolved to Stream
+ Stream(St),
+}
+
+impl<Fut, St> State<Fut, St> {
+ fn get_pin_mut(self: Pin<&mut Self>) -> State<Pin<&mut Fut>, Pin<&mut St>> {
+ // safety: data is never moved via the resulting &mut reference
+ match unsafe { self.get_unchecked_mut() } {
+ // safety: the future we're re-pinning here will never be moved;
+ // it will just be polled, then dropped in place
+ State::Future(f) => State::Future(unsafe { Pin::new_unchecked(f) }),
+ // safety: the stream we're repinning here will never be moved;
+ // it will just be polled, then dropped in place
+ State::Stream(s) => State::Stream(unsafe { Pin::new_unchecked(s) }),
+ }
+ }
+}
+
+impl<Fut> FusedStream for FlattenStream<Fut>
+ where Fut: Future,
+ Fut::Output: Stream + FusedStream,
+{
+ fn is_terminated(&self) -> bool {
+ match &self.state {
+ State::Future(_) => false,
+ State::Stream(stream) => stream.is_terminated(),
+ }
+ }
+}
+
+impl<Fut> Stream for FlattenStream<Fut>
+ where Fut: Future,
+ Fut::Output: Stream,
+{
+ type Item = <Fut::Output as Stream>::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ loop {
+ match self.as_mut().state().get_pin_mut() {
+ State::Future(f) => {
+ let stream = ready!(f.poll(cx));
+ // Future resolved to stream.
+ // We do not return, but poll that
+ // stream in the next loop iteration.
+ self.as_mut().state().set(State::Stream(stream));
+ }
+ State::Stream(s) => return s.poll_next(cx),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/fuse.rs b/third_party/rust/futures-util/src/future/future/fuse.rs
new file mode 100644
index 0000000000..b5ef913034
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/fuse.rs
@@ -0,0 +1,90 @@
+use core::pin::Pin;
+use futures_core::future::{Future, FusedFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`fuse`](super::FutureExt::fuse) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Fuse<Fut> {
+ future: Option<Fut>,
+}
+
+impl<Fut: Future> Fuse<Fut> {
+ unsafe_pinned!(future: Option<Fut>);
+
+ pub(super) fn new(f: Fut) -> Fuse<Fut> {
+ Fuse {
+ future: Some(f),
+ }
+ }
+
+ /// Creates a new `Fuse`-wrapped future which is already terminated.
+ ///
+ /// This can be useful in combination with looping and the `select!`
+ /// macro, which bypasses terminated futures.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::future::{Fuse, FusedFuture, FutureExt};
+ /// use futures::select;
+ /// use futures::stream::StreamExt;
+ /// use futures::pin_mut;
+ ///
+ /// let (sender, mut stream) = mpsc::unbounded();
+ ///
+ /// // Send a few messages into the stream
+ /// sender.unbounded_send(()).unwrap();
+ /// sender.unbounded_send(()).unwrap();
+ /// drop(sender);
+ ///
+ /// // Use `Fuse::termianted()` to create an already-terminated future
+ /// // which may be instantiated later.
+ /// let foo_printer = Fuse::terminated();
+ /// pin_mut!(foo_printer);
+ ///
+ /// loop {
+ /// select! {
+ /// _ = foo_printer => {},
+ /// () = stream.select_next_some() => {
+ /// if !foo_printer.is_terminated() {
+ /// println!("Foo is already being printed!");
+ /// } else {
+ /// foo_printer.set(async {
+ /// // do some other async operations
+ /// println!("Printing foo from `foo_printer` future");
+ /// }.fuse());
+ /// }
+ /// },
+ /// complete => break, // `foo_printer` is terminated and the stream is done
+ /// }
+ /// }
+ /// # });
+ /// ```
+ pub fn terminated() -> Fuse<Fut> {
+ Fuse { future: None }
+ }
+}
+
+impl<Fut: Future> FusedFuture for Fuse<Fut> {
+ fn is_terminated(&self) -> bool {
+ self.future.is_none()
+ }
+}
+
+impl<Fut: Future> Future for Fuse<Fut> {
+ type Output = Fut::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
+ let v = match self.as_mut().future().as_pin_mut() {
+ Some(fut) => ready!(fut.poll(cx)),
+ None => return Poll::Pending,
+ };
+
+ self.as_mut().future().set(None);
+ Poll::Ready(v)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/inspect.rs b/third_party/rust/futures-util/src/future/future/inspect.rs
new file mode 100644
index 0000000000..d67455aa6d
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/inspect.rs
@@ -0,0 +1,47 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`inspect`](super::FutureExt::inspect) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Inspect<Fut, F> {
+ future: Fut,
+ f: Option<F>,
+}
+
+impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+
+ pub(super) fn new(future: Fut, f: F) -> Inspect<Fut, F> {
+ Inspect {
+ future,
+ f: Some(f),
+ }
+ }
+}
+
+impl<Fut: Future + Unpin, F> Unpin for Inspect<Fut, F> {}
+
+impl<Fut, F> FusedFuture for Inspect<Fut, F>
+ where Fut: FusedFuture,
+ F: FnOnce(&Fut::Output),
+{
+ fn is_terminated(&self) -> bool { self.future.is_terminated() }
+}
+
+impl<Fut, F> Future for Inspect<Fut, F>
+ where Fut: Future,
+ F: FnOnce(&Fut::Output),
+{
+ type Output = Fut::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
+ let e = ready!(self.as_mut().future().poll(cx));
+ let f = self.as_mut().f().take().expect("cannot poll Inspect twice");
+ f(&e);
+ Poll::Ready(e)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/into_stream.rs b/third_party/rust/futures-util/src/future/future/into_stream.rs
new file mode 100644
index 0000000000..616c4cbb57
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/into_stream.rs
@@ -0,0 +1,43 @@
+use crate::stream::{self, Once};
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::stream::{Stream, FusedStream};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Stream for the [`into_stream`](super::FutureExt::into_stream) method.
+#[must_use = "streams do nothing unless polled"]
+#[derive(Debug)]
+pub struct IntoStream<Fut> {
+ inner: Once<Fut>
+}
+
+impl<Fut: Future> IntoStream<Fut> {
+ unsafe_pinned!(inner: Once<Fut>);
+
+ pub(super) fn new(future: Fut) -> IntoStream<Fut> {
+ IntoStream {
+ inner: stream::once(future)
+ }
+ }
+}
+
+impl<Fut: Future> Stream for IntoStream<Fut> {
+ type Item = Fut::Output;
+
+ #[inline]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner().poll_next(cx)
+ }
+
+ #[inline]
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.inner.size_hint()
+ }
+}
+
+impl<Fut: Future> FusedStream for IntoStream<Fut> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_terminated()
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/map.rs b/third_party/rust/futures-util/src/future/future/map.rs
new file mode 100644
index 0000000000..b5fbfb1384
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/map.rs
@@ -0,0 +1,49 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`map`](super::FutureExt::map) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Map<Fut, F> {
+ future: Fut,
+ f: Option<F>,
+}
+
+impl<Fut, F> Map<Fut, F> {
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+
+ /// Creates a new Map.
+ pub(super) fn new(future: Fut, f: F) -> Map<Fut, F> {
+ Map { future, f: Some(f) }
+ }
+}
+
+impl<Fut: Unpin, F> Unpin for Map<Fut, F> {}
+
+impl<Fut, F, T> FusedFuture for Map<Fut, F>
+ where Fut: Future,
+ F: FnOnce(Fut::Output) -> T,
+{
+ fn is_terminated(&self) -> bool { self.f.is_none() }
+}
+
+impl<Fut, F, T> Future for Map<Fut, F>
+ where Fut: Future,
+ F: FnOnce(Fut::Output) -> T,
+{
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ self.as_mut()
+ .future()
+ .poll(cx)
+ .map(|output| {
+ let f = self.f().take()
+ .expect("Map must not be polled after it returned `Poll::Ready`");
+ f(output)
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/mod.rs b/third_party/rust/futures-util/src/future/future/mod.rs
new file mode 100644
index 0000000000..e58cafc8c0
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/mod.rs
@@ -0,0 +1,558 @@
+//! Futures
+//!
+//! This module contains a number of functions for working with `Future`s,
+//! including the `FutureExt` trait which adds methods to `Future` types.
+
+use super::{assert_future, Either};
+#[cfg(feature = "alloc")]
+use alloc::boxed::Box;
+use core::pin::Pin;
+#[cfg(feature = "alloc")]
+use futures_core::future::{BoxFuture, LocalBoxFuture};
+use futures_core::{
+ future::Future,
+ stream::Stream,
+ task::{Context, Poll},
+};
+
+// Combinators
+
+mod flatten;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::flatten::Flatten;
+
+mod flatten_stream;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::flatten_stream::FlattenStream;
+
+mod fuse;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::fuse::Fuse;
+
+mod into_stream;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::into_stream::IntoStream;
+
+mod map;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::map::Map;
+
+mod then;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::then::Then;
+
+mod inspect;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::inspect::Inspect;
+
+mod unit_error;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::unit_error::UnitError;
+
+mod never_error;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::never_error::NeverError;
+
+#[cfg(feature = "std")]
+mod catch_unwind;
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::catch_unwind::CatchUnwind;
+
+#[cfg(feature = "channel")]
+#[cfg(feature = "std")]
+mod remote_handle;
+#[cfg(feature = "channel")]
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::remote_handle::{Remote, RemoteHandle};
+
+#[cfg(feature = "std")]
+mod shared;
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::shared::Shared;
+
+// Implementation details
+
+mod chain;
+pub(crate) use self::chain::Chain;
+
+impl<T: ?Sized> FutureExt for T where T: Future {}
+
+/// An extension trait for `Future`s that provides a variety of convenient
+/// adapters.
+pub trait FutureExt: Future {
+ /// Map this future's output to a different type, returning a new future of
+ /// the resulting type.
+ ///
+ /// This function is similar to the `Option::map` or `Iterator::map` where
+ /// it will change the type of the underlying future. This is useful to
+ /// chain along a computation once a future has been resolved.
+ ///
+ /// Note that this function consumes the receiving future and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future = async { 1 };
+ /// let new_future = future.map(|x| x + 3);
+ /// assert_eq!(new_future.await, 4);
+ /// # });
+ /// ```
+ fn map<U, F>(self, f: F) -> Map<Self, F>
+ where
+ F: FnOnce(Self::Output) -> U,
+ Self: Sized,
+ {
+ assert_future::<U, _>(Map::new(self, f))
+ }
+
+ /// Chain on a computation for when a future finished, passing the result of
+ /// the future to the provided closure `f`.
+ ///
+ /// The returned value of the closure must implement the `Future` trait
+ /// and can represent some more work to be done before the composed future
+ /// is finished.
+ ///
+ /// The closure `f` is only run *after* successful completion of the `self`
+ /// future.
+ ///
+ /// Note that this function consumes the receiving future and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future_of_1 = async { 1 };
+ /// let future_of_4 = future_of_1.then(|x| async move { x + 3 });
+ /// assert_eq!(future_of_4.await, 4);
+ /// # });
+ /// ```
+ fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
+ where
+ F: FnOnce(Self::Output) -> Fut,
+ Fut: Future,
+ Self: Sized,
+ {
+ assert_future::<Fut::Output, _>(Then::new(self, f))
+ }
+
+ /// Wrap this future in an `Either` future, making it the left-hand variant
+ /// of that `Either`.
+ ///
+ /// This can be used in combination with the `right_future` method to write `if`
+ /// statements that evaluate to different futures in different branches.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let x = 6;
+ /// let future = if x < 10 {
+ /// async { true }.left_future()
+ /// } else {
+ /// async { false }.right_future()
+ /// };
+ ///
+ /// assert_eq!(future.await, true);
+ /// # });
+ /// ```
+ fn left_future<B>(self) -> Either<Self, B>
+ where
+ B: Future<Output = Self::Output>,
+ Self: Sized,
+ {
+ Either::Left(self)
+ }
+
+ /// Wrap this future in an `Either` future, making it the right-hand variant
+ /// of that `Either`.
+ ///
+ /// This can be used in combination with the `left_future` method to write `if`
+ /// statements that evaluate to different futures in different branches.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let x = 6;
+ /// let future = if x > 10 {
+ /// async { true }.left_future()
+ /// } else {
+ /// async { false }.right_future()
+ /// };
+ ///
+ /// assert_eq!(future.await, false);
+ /// # });
+ /// ```
+ fn right_future<A>(self) -> Either<A, Self>
+ where
+ A: Future<Output = Self::Output>,
+ Self: Sized,
+ {
+ Either::Right(self)
+ }
+
+ /// Convert this future into a single element stream.
+ ///
+ /// The returned stream contains single success if this future resolves to
+ /// success or single error if this future resolves into error.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ /// use futures::stream::StreamExt;
+ ///
+ /// let future = async { 17 };
+ /// let stream = future.into_stream();
+ /// let collected: Vec<_> = stream.collect().await;
+ /// assert_eq!(collected, vec![17]);
+ /// # });
+ /// ```
+ fn into_stream(self) -> IntoStream<Self>
+ where
+ Self: Sized,
+ {
+ IntoStream::new(self)
+ }
+
+ /// Flatten the execution of this future when the output of this
+ /// future is itself another future.
+ ///
+ /// This can be useful when combining futures together to flatten the
+ /// computation out the final result.
+ ///
+ /// This method is roughly equivalent to `self.then(|x| x)`.
+ ///
+ /// Note that this function consumes the receiving future and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let nested_future = async { async { 1 } };
+ /// let future = nested_future.flatten();
+ /// assert_eq!(future.await, 1);
+ /// # });
+ /// ```
+ fn flatten(self) -> Flatten<Self>
+ where
+ Self::Output: Future,
+ Self: Sized,
+ {
+ let f = Flatten::new(self);
+ assert_future::<<<Self as Future>::Output as Future>::Output, _>(f)
+ }
+
+ /// Flatten the execution of this future when the successful result of this
+ /// future is a stream.
+ ///
+ /// This can be useful when stream initialization is deferred, and it is
+ /// convenient to work with that stream as if stream was available at the
+ /// call site.
+ ///
+ /// Note that this function consumes this future and returns a wrapped
+ /// version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream_items = vec![17, 18, 19];
+ /// let future_of_a_stream = async { stream::iter(stream_items) };
+ ///
+ /// let stream = future_of_a_stream.flatten_stream();
+ /// let list: Vec<_> = stream.collect().await;
+ /// assert_eq!(list, vec![17, 18, 19]);
+ /// # });
+ /// ```
+ fn flatten_stream(self) -> FlattenStream<Self>
+ where
+ Self::Output: Stream,
+ Self: Sized,
+ {
+ FlattenStream::new(self)
+ }
+
+ /// Fuse a future such that `poll` will never again be called once it has
+ /// completed. This method can be used to turn any `Future` into a
+ /// `FusedFuture`.
+ ///
+ /// Normally, once a future has returned `Poll::Ready` from `poll`,
+ /// any further calls could exhibit bad behavior such as blocking
+ /// forever, panicking, never returning, etc. If it is known that `poll`
+ /// may be called too often then this method can be used to ensure that it
+ /// has defined semantics.
+ ///
+ /// If a `fuse`d future is `poll`ed after having returned `Poll::Ready`
+ /// previously, it will return `Poll::Pending`, from `poll` again (and will
+ /// continue to do so for all future calls to `poll`).
+ ///
+ /// This combinator will drop the underlying future as soon as it has been
+ /// completed to ensure resources are reclaimed as soon as possible.
+ fn fuse(self) -> Fuse<Self>
+ where
+ Self: Sized,
+ {
+ let f = Fuse::new(self);
+ assert_future::<Self::Output, _>(f)
+ }
+
+ /// Do something with the output of a future before passing it on.
+ ///
+ /// When using futures, you'll often chain several of them together. While
+ /// working on such code, you might want to check out what's happening at
+ /// various parts in the pipeline, without consuming the intermediate
+ /// value. To do that, insert a call to `inspect`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future = async { 1 };
+ /// let new_future = future.inspect(|&x| println!("about to resolve: {}", x));
+ /// assert_eq!(new_future.await, 1);
+ /// # });
+ /// ```
+ fn inspect<F>(self, f: F) -> Inspect<Self, F>
+ where
+ F: FnOnce(&Self::Output),
+ Self: Sized,
+ {
+ assert_future::<Self::Output, _>(Inspect::new(self, f))
+ }
+
+ /// Catches unwinding panics while polling the future.
+ ///
+ /// In general, panics within a future can propagate all the way out to the
+ /// task level. This combinator makes it possible to halt unwinding within
+ /// the future itself. It's most commonly used within task executors. It's
+ /// not recommended to use this for error handling.
+ ///
+ /// Note that this method requires the `UnwindSafe` bound from the standard
+ /// library. This isn't always applied automatically, and the standard
+ /// library provides an `AssertUnwindSafe` wrapper type to apply it
+ /// after-the fact. To assist using this method, the `Future` trait is also
+ /// implemented for `AssertUnwindSafe<F>` where `F` implements `Future`.
+ ///
+ /// This method is only available when the `std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::{self, FutureExt, Ready};
+ ///
+ /// let future = future::ready(2);
+ /// assert!(future.catch_unwind().await.is_ok());
+ ///
+ /// let future = future::lazy(|_| -> Ready<i32> {
+ /// unimplemented!()
+ /// });
+ /// assert!(future.catch_unwind().await.is_err());
+ /// # });
+ /// ```
+ #[cfg(feature = "std")]
+ fn catch_unwind(self) -> CatchUnwind<Self>
+ where
+ Self: Sized + ::std::panic::UnwindSafe,
+ {
+ CatchUnwind::new(self)
+ }
+
+ /// Create a cloneable handle to this future where all handles will resolve
+ /// to the same result.
+ ///
+ /// The `shared` combinator method provides a method to convert any future
+ /// into a cloneable future. It enables a future to be polled by multiple
+ /// threads.
+ ///
+ /// This method is only available when the `std` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ ///
+ /// let future = async { 6 };
+ /// let shared1 = future.shared();
+ /// let shared2 = shared1.clone();
+ ///
+ /// assert_eq!(6, shared1.await);
+ /// assert_eq!(6, shared2.await);
+ /// # });
+ /// ```
+ ///
+ /// ```
+ /// // Note, unlike most examples this is written in the context of a
+ /// // synchronous function to better illustrate the cross-thread aspect of
+ /// // the `shared` combinator.
+ ///
+ /// # futures::executor::block_on(async {
+ /// use futures::future::FutureExt;
+ /// use futures::executor::block_on;
+ /// use std::thread;
+ ///
+ /// let future = async { 6 };
+ /// let shared1 = future.shared();
+ /// let shared2 = shared1.clone();
+ /// let join_handle = thread::spawn(move || {
+ /// assert_eq!(6, block_on(shared2));
+ /// });
+ /// assert_eq!(6, shared1.await);
+ /// join_handle.join().unwrap();
+ /// # });
+ /// ```
+ #[cfg(feature = "std")]
+ fn shared(self) -> Shared<Self>
+ where
+ Self: Sized,
+ Self::Output: Clone,
+ {
+ Shared::new(self)
+ }
+
+ /// Turn this future into a future that yields `()` on completion and sends
+ /// its output to another future on a separate task.
+ ///
+ /// This can be used with spawning executors to easily retrieve the result
+ /// of a future executing on a separate task or thread.
+ ///
+ /// This method is only available when the `std` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "channel")]
+ #[cfg(feature = "std")]
+ fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
+ where
+ Self: Sized,
+ {
+ remote_handle::remote_handle(self)
+ }
+
+ /// Wrap the future in a Box, pinning it.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "alloc")]
+ fn boxed<'a>(self) -> BoxFuture<'a, Self::Output>
+ where
+ Self: Sized + Send + 'a,
+ {
+ Box::pin(self)
+ }
+
+ /// Wrap the future in a Box, pinning it.
+ ///
+ /// Similar to `boxed`, but without the `Send` requirement.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "alloc")]
+ fn boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output>
+ where
+ Self: Sized + 'a,
+ {
+ Box::pin(self)
+ }
+
+ /// Turns a [`Future<Output = T>`](Future) into a
+ /// [`TryFuture<Ok = T, Error = ()`>](futures_core::future::TryFuture).
+ fn unit_error(self) -> UnitError<Self>
+ where
+ Self: Sized,
+ {
+ UnitError::new(self)
+ }
+
+ /// Turns a [`Future<Output = T>`](Future) into a
+ /// [`TryFuture<Ok = T, Error = Never`>](futures_core::future::TryFuture).
+ fn never_error(self) -> NeverError<Self>
+ where
+ Self: Sized,
+ {
+ NeverError::new(self)
+ }
+
+ /// A convenience for calling `Future::poll` on `Unpin` future types.
+ fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).poll(cx)
+ }
+
+ /// Evaluates and consumes the future, returning the resulting output if
+ /// the future is ready after the first call to `Future::poll`.
+ ///
+ /// If `poll` instead returns `Poll::Pending`, `None` is returned.
+ ///
+ /// This method is useful in cases where immediacy is more important than
+ /// waiting for a result. It is also convenient for quickly obtaining
+ /// the value of a future that is known to always resolve immediately.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use futures::prelude::*;
+ /// use futures::{future::ready, future::pending};
+ /// let future_ready = ready("foobar");
+ /// let future_pending = pending::<&'static str>();
+ ///
+ /// assert_eq!(future_ready.now_or_never(), Some("foobar"));
+ /// assert_eq!(future_pending.now_or_never(), None);
+ /// ```
+ ///
+ /// In cases where it is absolutely known that a future should always
+ /// resolve immediately and never return `Poll::Pending`, this method can
+ /// be combined with `expect()`:
+ ///
+ /// ```
+ /// # use futures::{prelude::*, future::ready};
+ /// let future_ready = ready("foobar");
+ ///
+ /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar");
+ /// ```
+ fn now_or_never(mut self) -> Option<Self::Output>
+ where
+ Self: Sized,
+ {
+ let noop_waker = crate::task::noop_waker();
+ let mut cx = Context::from_waker(&noop_waker);
+
+ // SAFETY: This is safe because this method consumes the future, so `poll` is
+ // only going to be called once. Thus it doesn't matter to us if the
+ // future is `Unpin` or not.
+ let pinned = unsafe { Pin::new_unchecked(&mut self) };
+
+ match pinned.poll(&mut cx) {
+ Poll::Ready(x) => Some(x),
+ _ => None,
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/never_error.rs b/third_party/rust/futures-util/src/future/future/never_error.rs
new file mode 100644
index 0000000000..5a68e6f952
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/never_error.rs
@@ -0,0 +1,36 @@
+use crate::never::Never;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{self, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`never_error`](super::FutureExt::never_error) combinator.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct NeverError<Fut> {
+ future: Fut,
+}
+
+impl<Fut> NeverError<Fut> {
+ unsafe_pinned!(future: Fut);
+
+ pub(super) fn new(future: Fut) -> NeverError<Fut> {
+ NeverError { future }
+ }
+}
+
+impl<Fut: Unpin> Unpin for NeverError<Fut> {}
+
+impl<Fut: FusedFuture> FusedFuture for NeverError<Fut> {
+ fn is_terminated(&self) -> bool { self.future.is_terminated() }
+}
+
+impl<Fut, T> Future for NeverError<Fut>
+ where Fut: Future<Output = T>,
+{
+ type Output = Result<T, Never>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ self.future().poll(cx).map(Ok)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/remote_handle.rs b/third_party/rust/futures-util/src/future/future/remote_handle.rs
new file mode 100644
index 0000000000..11b2a65af7
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/remote_handle.rs
@@ -0,0 +1,114 @@
+use {
+ crate::future::{CatchUnwind, FutureExt},
+ futures_channel::oneshot::{self, Sender, Receiver},
+ futures_core::{
+ future::Future,
+ task::{Context, Poll},
+ },
+ pin_utils::{unsafe_pinned, unsafe_unpinned},
+ std::{
+ any::Any,
+ fmt,
+ panic::{self, AssertUnwindSafe},
+ pin::Pin,
+ sync::{
+ Arc,
+ atomic::{AtomicBool, Ordering},
+ },
+ thread,
+ },
+};
+
+/// The handle to a remote future returned by
+/// [`remote_handle`](crate::future::FutureExt::remote_handle). When you drop this,
+/// the remote future will be woken up to be dropped by the executor.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct RemoteHandle<T> {
+ rx: Receiver<thread::Result<T>>,
+ keep_running: Arc<AtomicBool>,
+}
+
+impl<T> RemoteHandle<T> {
+ /// Drops this handle *without* canceling the underlying future.
+ ///
+ /// This method can be used if you want to drop the handle, but let the
+ /// execution continue.
+ pub fn forget(self) {
+ self.keep_running.store(true, Ordering::SeqCst);
+ }
+}
+
+impl<T: Send + 'static> Future for RemoteHandle<T> {
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ match ready!(self.rx.poll_unpin(cx)) {
+ Ok(Ok(output)) => Poll::Ready(output),
+ Ok(Err(e)) => panic::resume_unwind(e),
+ Err(e) => panic::resume_unwind(Box::new(e)),
+ }
+ }
+}
+
+type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'static)>>;
+
+/// A future which sends its output to the corresponding `RemoteHandle`.
+/// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Remote<Fut: Future> {
+ tx: Option<Sender<SendMsg<Fut>>>,
+ keep_running: Arc<AtomicBool>,
+ future: CatchUnwind<AssertUnwindSafe<Fut>>,
+}
+
+impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("Remote")
+ .field(&self.future)
+ .finish()
+ }
+}
+
+impl<Fut: Future + Unpin> Unpin for Remote<Fut> {}
+
+impl<Fut: Future> Remote<Fut> {
+ unsafe_pinned!(future: CatchUnwind<AssertUnwindSafe<Fut>>);
+ unsafe_unpinned!(tx: Option<Sender<SendMsg<Fut>>>);
+}
+
+impl<Fut: Future> Future for Remote<Fut> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ if let Poll::Ready(_) = self.as_mut().tx().as_mut().unwrap().poll_canceled(cx) {
+ if !self.keep_running.load(Ordering::SeqCst) {
+ // Cancelled, bail out
+ return Poll::Ready(())
+ }
+ }
+
+ let output = ready!(self.as_mut().future().poll(cx));
+
+ // if the receiving end has gone away then that's ok, we just ignore the
+ // send error here.
+ drop(self.as_mut().tx().take().unwrap().send(output));
+ Poll::Ready(())
+ }
+}
+
+pub(super) fn remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>) {
+ let (tx, rx) = oneshot::channel();
+ let keep_running = Arc::new(AtomicBool::new(false));
+
+ // AssertUnwindSafe is used here because `Send + 'static` is basically
+ // an alias for an implementation of the `UnwindSafe` trait but we can't
+ // express that in the standard library right now.
+ let wrapped = Remote {
+ future: AssertUnwindSafe(future).catch_unwind(),
+ tx: Some(tx),
+ keep_running: keep_running.clone(),
+ };
+
+ (wrapped, RemoteHandle { rx, keep_running })
+}
diff --git a/third_party/rust/futures-util/src/future/future/shared.rs b/third_party/rust/futures-util/src/future/future/shared.rs
new file mode 100644
index 0000000000..816f5dd007
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/shared.rs
@@ -0,0 +1,335 @@
+use crate::task::{ArcWake, waker_ref};
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll, Waker};
+use slab::Slab;
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::pin::Pin;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Mutex};
+
+/// Future for the [`shared`](super::FutureExt::shared) method.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Shared<Fut: Future> {
+ inner: Option<Arc<Inner<Fut>>>,
+ waker_key: usize,
+}
+
+struct Inner<Fut: Future> {
+ future_or_output: UnsafeCell<FutureOrOutput<Fut>>,
+ notifier: Arc<Notifier>,
+}
+
+struct Notifier {
+ state: AtomicUsize,
+ wakers: Mutex<Option<Slab<Option<Waker>>>>,
+}
+
+// The future itself is polled behind the `Arc`, so it won't be moved
+// when `Shared` is moved.
+impl<Fut: Future> Unpin for Shared<Fut> {}
+
+impl<Fut: Future> fmt::Debug for Shared<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shared")
+ .field("inner", &self.inner)
+ .field("waker_key", &self.waker_key)
+ .finish()
+ }
+}
+
+impl<Fut: Future> fmt::Debug for Inner<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Inner").finish()
+ }
+}
+
+enum FutureOrOutput<Fut: Future> {
+ Future(Fut),
+ Output(Fut::Output),
+}
+
+unsafe impl<Fut> Send for Inner<Fut>
+where
+ Fut: Future + Send,
+ Fut::Output: Send + Sync,
+{}
+
+unsafe impl<Fut> Sync for Inner<Fut>
+where
+ Fut: Future + Send,
+ Fut::Output: Send + Sync,
+{}
+
+const IDLE: usize = 0;
+const POLLING: usize = 1;
+const REPOLL: usize = 2;
+const COMPLETE: usize = 3;
+const POISONED: usize = 4;
+
+const NULL_WAKER_KEY: usize = usize::max_value();
+
+impl<Fut: Future> Shared<Fut> {
+ pub(super) fn new(future: Fut) -> Shared<Fut> {
+ let inner = Inner {
+ future_or_output: UnsafeCell::new(FutureOrOutput::Future(future)),
+ notifier: Arc::new(Notifier {
+ state: AtomicUsize::new(IDLE),
+ wakers: Mutex::new(Some(Slab::new())),
+ }),
+ };
+
+ Shared {
+ inner: Some(Arc::new(inner)),
+ waker_key: NULL_WAKER_KEY,
+ }
+ }
+}
+
+impl<Fut> Shared<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
+ /// Returns [`Some`] containing a reference to this [`Shared`]'s output if
+ /// it has already been computed by a clone or [`None`] if it hasn't been
+ /// computed yet or this [`Shared`] already returned its output from
+ /// [`poll`](Future::poll).
+ pub fn peek(&self) -> Option<&Fut::Output> {
+ if let Some(inner) = self.inner.as_ref() {
+ match inner.notifier.state.load(SeqCst) {
+ COMPLETE => unsafe { return Some(inner.output()) },
+ POISONED => panic!("inner future panicked during poll"),
+ _ => {}
+ }
+ }
+ None
+ }
+
+ /// Registers the current task to receive a wakeup when `Inner` is awoken.
+ fn set_waker(&mut self, cx: &mut Context<'_>) {
+ // Acquire the lock first before checking COMPLETE to ensure there
+ // isn't a race.
+ let mut wakers_guard = if let Some(inner) = self.inner.as_ref() {
+ inner.notifier.wakers.lock().unwrap()
+ } else {
+ return;
+ };
+
+ let wakers = if let Some(wakers) = wakers_guard.as_mut() {
+ wakers
+ } else {
+ return;
+ };
+
+ if self.waker_key == NULL_WAKER_KEY {
+ self.waker_key = wakers.insert(Some(cx.waker().clone()));
+ } else {
+ let waker_slot = &mut wakers[self.waker_key];
+ let needs_replacement = if let Some(_old_waker) = waker_slot {
+ // If there's still an unwoken waker in the slot, only replace
+ // if the current one wouldn't wake the same task.
+ // TODO: This API is currently not available, so replace always
+ // !waker.will_wake_nonlocal(old_waker)
+ true
+ } else {
+ true
+ };
+ if needs_replacement {
+ *waker_slot = Some(cx.waker().clone());
+ }
+ }
+ debug_assert!(self.waker_key != NULL_WAKER_KEY);
+ }
+
+ /// Safety: callers must first ensure that `self.inner.state`
+ /// is `COMPLETE`
+ unsafe fn take_or_clone_output(&mut self) -> Fut::Output {
+ let inner = self.inner.take().unwrap();
+
+ match Arc::try_unwrap(inner) {
+ Ok(inner) => match inner.future_or_output.into_inner() {
+ FutureOrOutput::Output(item) => item,
+ FutureOrOutput::Future(_) => unreachable!(),
+ },
+ Err(inner) => inner.output().clone(),
+ }
+ }
+}
+
+impl<Fut> Inner<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
+ /// Safety: callers must first ensure that `self.inner.state`
+ /// is `COMPLETE`
+ unsafe fn output(&self) -> &Fut::Output {
+ match &*self.future_or_output.get() {
+ FutureOrOutput::Output(ref item) => &item,
+ FutureOrOutput::Future(_) => unreachable!(),
+ }
+ }
+}
+
+impl<Fut> FusedFuture for Shared<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
+
+impl<Fut> Future for Shared<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
+ type Output = Fut::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+
+ this.set_waker(cx);
+
+ let inner = if let Some(inner) = this.inner.as_ref() {
+ inner
+ } else {
+ panic!("Shared future polled again after completion");
+ };
+
+ match inner.notifier.state.compare_and_swap(IDLE, POLLING, SeqCst) {
+ IDLE => {
+ // Lock acquired, fall through
+ }
+ POLLING | REPOLL => {
+ // Another task is currently polling, at this point we just want
+ // to ensure that the waker for this task is registered
+
+ return Poll::Pending;
+ }
+ COMPLETE => {
+ // Safety: We're in the COMPLETE state
+ return unsafe { Poll::Ready(this.take_or_clone_output()) };
+ }
+ POISONED => panic!("inner future panicked during poll"),
+ _ => unreachable!(),
+ }
+
+ let waker = waker_ref(&inner.notifier);
+ let mut cx = Context::from_waker(&waker);
+
+ struct Reset<'a>(&'a AtomicUsize);
+
+ impl Drop for Reset<'_> {
+ fn drop(&mut self) {
+ use std::thread;
+
+ if thread::panicking() {
+ self.0.store(POISONED, SeqCst);
+ }
+ }
+ }
+
+ let _reset = Reset(&inner.notifier.state);
+
+ let output = loop {
+ let future = unsafe {
+ match &mut *inner.future_or_output.get() {
+ FutureOrOutput::Future(fut) => Pin::new_unchecked(fut),
+ _ => unreachable!(),
+ }
+ };
+
+ let poll = future.poll(&mut cx);
+
+ match poll {
+ Poll::Pending => {
+ let state = &inner.notifier.state;
+ match state.compare_and_swap(POLLING, IDLE, SeqCst) {
+ POLLING => {
+ // Success
+ return Poll::Pending;
+ }
+ REPOLL => {
+ // Was woken since: Gotta poll again!
+ let prev = state.swap(POLLING, SeqCst);
+ assert_eq!(prev, REPOLL);
+ }
+ _ => unreachable!(),
+ }
+ }
+ Poll::Ready(output) => break output,
+ }
+ };
+
+ unsafe {
+ *inner.future_or_output.get() =
+ FutureOrOutput::Output(output);
+ }
+
+ inner.notifier.state.store(COMPLETE, SeqCst);
+
+ // Wake all tasks and drop the slab
+ let mut wakers_guard = inner.notifier.wakers.lock().unwrap();
+ let wakers = &mut wakers_guard.take().unwrap();
+ for (_key, opt_waker) in wakers {
+ if let Some(waker) = opt_waker.take() {
+ waker.wake();
+ }
+ }
+
+ drop(_reset); // Make borrow checker happy
+ drop(wakers_guard);
+
+ // Safety: We're in the COMPLETE state
+ unsafe { Poll::Ready(this.take_or_clone_output()) }
+ }
+}
+
+impl<Fut> Clone for Shared<Fut>
+where
+ Fut: Future,
+{
+ fn clone(&self) -> Self {
+ Shared {
+ inner: self.inner.clone(),
+ waker_key: NULL_WAKER_KEY,
+ }
+ }
+}
+
+impl<Fut> Drop for Shared<Fut>
+where
+ Fut: Future,
+{
+ fn drop(&mut self) {
+ if self.waker_key != NULL_WAKER_KEY {
+ if let Some(ref inner) = self.inner {
+ if let Ok(mut wakers) = inner.notifier.wakers.lock() {
+ if let Some(wakers) = wakers.as_mut() {
+ wakers.remove(self.waker_key);
+ }
+ }
+ }
+ }
+ }
+}
+
+impl ArcWake for Notifier {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst);
+
+ let wakers = &mut *arc_self.wakers.lock().unwrap();
+ if let Some(wakers) = wakers.as_mut() {
+ for (_key, opt_waker) in wakers {
+ if let Some(waker) = opt_waker.take() {
+ waker.wake();
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/then.rs b/third_party/rust/futures-util/src/future/future/then.rs
new file mode 100644
index 0000000000..9f30f09864
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/then.rs
@@ -0,0 +1,46 @@
+use super::Chain;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`then`](super::FutureExt::then) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Then<Fut1, Fut2, F> {
+ chain: Chain<Fut1, Fut2, F>,
+}
+
+impl<Fut1, Fut2, F> Then<Fut1, Fut2, F>
+ where Fut1: Future,
+ Fut2: Future,
+{
+ unsafe_pinned!(chain: Chain<Fut1, Fut2, F>);
+
+ /// Creates a new `Then`.
+ pub(super) fn new(future: Fut1, f: F) -> Then<Fut1, Fut2, F> {
+ Then {
+ chain: Chain::new(future, f),
+ }
+ }
+}
+
+impl<Fut1, Fut2, F> FusedFuture for Then<Fut1, Fut2, F>
+ where Fut1: Future,
+ Fut2: Future,
+ F: FnOnce(Fut1::Output) -> Fut2,
+{
+ fn is_terminated(&self) -> bool { self.chain.is_terminated() }
+}
+
+impl<Fut1, Fut2, F> Future for Then<Fut1, Fut2, F>
+ where Fut1: Future,
+ Fut2: Future,
+ F: FnOnce(Fut1::Output) -> Fut2,
+{
+ type Output = Fut2::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut2::Output> {
+ self.as_mut().chain().poll(cx, |output, f| f(output))
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/future/unit_error.rs b/third_party/rust/futures-util/src/future/future/unit_error.rs
new file mode 100644
index 0000000000..679e988b16
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/future/unit_error.rs
@@ -0,0 +1,35 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`unit_error`](super::FutureExt::unit_error) combinator.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct UnitError<Fut> {
+ future: Fut,
+}
+
+impl<Fut> UnitError<Fut> {
+ unsafe_pinned!(future: Fut);
+
+ pub(super) fn new(future: Fut) -> UnitError<Fut> {
+ UnitError { future }
+ }
+}
+
+impl<Fut: Unpin> Unpin for UnitError<Fut> {}
+
+impl<Fut: FusedFuture> FusedFuture for UnitError<Fut> {
+ fn is_terminated(&self) -> bool { self.future.is_terminated() }
+}
+
+impl<Fut, T> Future for UnitError<Fut>
+ where Fut: Future<Output = T>,
+{
+ type Output = Result<T, ()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, ()>> {
+ self.future().poll(cx).map(Ok)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/join.rs b/third_party/rust/futures-util/src/future/join.rs
new file mode 100644
index 0000000000..5af5b408e9
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/join.rs
@@ -0,0 +1,214 @@
+#![allow(non_snake_case)]
+
+use crate::future::{MaybeDone, maybe_done};
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{Future, FusedFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+use super::assert_future;
+
+macro_rules! generate {
+ ($(
+ $(#[$doc:meta])*
+ ($Join:ident, <$($Fut:ident),*>),
+ )*) => ($(
+ $(#[$doc])*
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct $Join<$($Fut: Future),*> {
+ $($Fut: MaybeDone<$Fut>,)*
+ }
+
+ impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*>
+ where
+ $(
+ $Fut: Future + fmt::Debug,
+ $Fut::Output: fmt::Debug,
+ )*
+ {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct(stringify!($Join))
+ $(.field(stringify!($Fut), &self.$Fut))*
+ .finish()
+ }
+ }
+
+ impl<$($Fut: Future),*> $Join<$($Fut),*> {
+ fn new($($Fut: $Fut),*) -> $Join<$($Fut),*> {
+ $Join {
+ $($Fut: maybe_done($Fut)),*
+ }
+ }
+ $(
+ unsafe_pinned!($Fut: MaybeDone<$Fut>);
+ )*
+ }
+
+ impl<$($Fut: Future),*> Future for $Join<$($Fut),*> {
+ type Output = ($($Fut::Output),*);
+
+ fn poll(
+ mut self: Pin<&mut Self>, cx: &mut Context<'_>
+ ) -> Poll<Self::Output> {
+ let mut all_done = true;
+ $(
+ all_done &= self.as_mut().$Fut().poll(cx).is_ready();
+ )*
+
+ if all_done {
+ Poll::Ready(($(self.as_mut().$Fut().take_output().unwrap()), *))
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+
+ impl<$($Fut: FusedFuture),*> FusedFuture for $Join<$($Fut),*> {
+ fn is_terminated(&self) -> bool {
+ $(
+ self.$Fut.is_terminated()
+ ) && *
+ }
+ }
+ )*)
+}
+
+generate! {
+ /// Future for the [`join`](join()) function.
+ (Join, <Fut1, Fut2>),
+
+ /// Future for the [`join3`] function.
+ (Join3, <Fut1, Fut2, Fut3>),
+
+ /// Future for the [`join4`] function.
+ (Join4, <Fut1, Fut2, Fut3, Fut4>),
+
+ /// Future for the [`join5`] function.
+ (Join5, <Fut1, Fut2, Fut3, Fut4, Fut5>),
+}
+
+/// Joins the result of two futures, waiting for them both to complete.
+///
+/// This function will return a new future which awaits both futures to
+/// complete. The returned future will finish with a tuple of both results.
+///
+/// Note that this function consumes the passed futures and returns a
+/// wrapped version of it.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let pair = future::join(a, b);
+///
+/// assert_eq!(pair.await, (1, 2));
+/// # });
+/// ```
+pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2>
+where
+ Fut1: Future,
+ Fut2: Future,
+{
+ let f = Join::new(future1, future2);
+ assert_future::<(Fut1::Output, Fut2::Output), _>(f)
+}
+
+/// Same as [`join`](join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let c = async { 3 };
+/// let tuple = future::join3(a, b, c);
+///
+/// assert_eq!(tuple.await, (1, 2, 3));
+/// # });
+/// ```
+pub fn join3<Fut1, Fut2, Fut3>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+) -> Join3<Fut1, Fut2, Fut3>
+where
+ Fut1: Future,
+ Fut2: Future,
+ Fut3: Future,
+{
+ Join3::new(future1, future2, future3)
+}
+
+/// Same as [`join`](join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let c = async { 3 };
+/// let d = async { 4 };
+/// let tuple = future::join4(a, b, c, d);
+///
+/// assert_eq!(tuple.await, (1, 2, 3, 4));
+/// # });
+/// ```
+pub fn join4<Fut1, Fut2, Fut3, Fut4>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+) -> Join4<Fut1, Fut2, Fut3, Fut4>
+where
+ Fut1: Future,
+ Fut2: Future,
+ Fut3: Future,
+ Fut4: Future,
+{
+ Join4::new(future1, future2, future3, future4)
+}
+
+/// Same as [`join`](join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = async { 1 };
+/// let b = async { 2 };
+/// let c = async { 3 };
+/// let d = async { 4 };
+/// let e = async { 5 };
+/// let tuple = future::join5(a, b, c, d, e);
+///
+/// assert_eq!(tuple.await, (1, 2, 3, 4, 5));
+/// # });
+/// ```
+pub fn join5<Fut1, Fut2, Fut3, Fut4, Fut5>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+ future5: Fut5,
+) -> Join5<Fut1, Fut2, Fut3, Fut4, Fut5>
+where
+ Fut1: Future,
+ Fut2: Future,
+ Fut3: Future,
+ Fut4: Future,
+ Fut5: Future,
+{
+ Join5::new(future1, future2, future3, future4, future5)
+}
diff --git a/third_party/rust/futures-util/src/future/join_all.rs b/third_party/rust/futures-util/src/future/join_all.rs
new file mode 100644
index 0000000000..07408856a4
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/join_all.rs
@@ -0,0 +1,159 @@
+//! Definition of the `JoinAll` combinator, waiting for all of a list of futures
+//! to finish.
+
+use core::fmt;
+use core::future::Future;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use alloc::boxed::Box;
+use alloc::vec::Vec;
+
+#[derive(Debug)]
+enum ElemState<F>
+where
+ F: Future,
+{
+ Pending(F),
+ Done(Option<F::Output>),
+}
+
+impl<F> ElemState<F>
+where
+ F: Future,
+{
+ fn pending_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut F>> {
+ // Safety: Basic enum pin projection, no drop + optionally Unpin based
+ // on the type of this variant
+ match unsafe { self.get_unchecked_mut() } {
+ ElemState::Pending(f) => Some(unsafe { Pin::new_unchecked(f) }),
+ ElemState::Done(_) => None,
+ }
+ }
+
+ fn take_done(self: Pin<&mut Self>) -> Option<F::Output> {
+ // Safety: Going from pin to a variant we never pin-project
+ match unsafe { self.get_unchecked_mut() } {
+ ElemState::Pending(_) => None,
+ ElemState::Done(output) => output.take(),
+ }
+ }
+}
+
+impl<F> Unpin for ElemState<F>
+where
+ F: Future + Unpin,
+{
+}
+
+fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
+ // Safety: `std` _could_ make this unsound if it were to decide Pin's
+ // invariants aren't required to transmit through slices. Otherwise this has
+ // the same safety as a normal field pin projection.
+ unsafe { slice.get_unchecked_mut() }
+ .iter_mut()
+ .map(|t| unsafe { Pin::new_unchecked(t) })
+}
+
+/// Future for the [`join_all`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct JoinAll<F>
+where
+ F: Future,
+{
+ elems: Pin<Box<[ElemState<F>]>>,
+}
+
+impl<F> fmt::Debug for JoinAll<F>
+where
+ F: Future + fmt::Debug,
+ F::Output: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("JoinAll")
+ .field("elems", &self.elems)
+ .finish()
+ }
+}
+
+/// Creates a future which represents a collection of the outputs of the futures
+/// given.
+///
+/// The returned future will drive execution for all of its underlying futures,
+/// collecting the results into a destination `Vec<T>` in the same order as they
+/// were provided.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # See Also
+///
+/// This is purposefully a very simple API for basic use-cases. In a lot of
+/// cases you will want to use the more powerful
+/// [`FuturesUnordered`][crate::stream::FuturesUnordered] APIs, some
+/// examples of additional functionality that provides:
+///
+/// * Adding new futures to the set even after it has been started.
+///
+/// * Only polling the specific futures that have been woken. In cases where
+/// you have a lot of futures this will result in much more efficient polling.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::join_all;
+///
+/// async fn foo(i: u32) -> u32 { i }
+///
+/// let futures = vec![foo(1), foo(2), foo(3)];
+///
+/// assert_eq!(join_all(futures).await, [1, 2, 3]);
+/// # });
+/// ```
+pub fn join_all<I>(i: I) -> JoinAll<I::Item>
+where
+ I: IntoIterator,
+ I::Item: Future,
+{
+ let elems: Box<[_]> = i.into_iter().map(ElemState::Pending).collect();
+ JoinAll { elems: elems.into() }
+}
+
+impl<F> Future for JoinAll<F>
+where
+ F: Future,
+{
+ type Output = Vec<F::Output>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut all_done = true;
+
+ for mut elem in iter_pin_mut(self.elems.as_mut()) {
+ if let Some(pending) = elem.as_mut().pending_pin_mut() {
+ if let Poll::Ready(output) = pending.poll(cx) {
+ elem.set(ElemState::Done(Some(output)));
+ } else {
+ all_done = false;
+ }
+ }
+ }
+
+ if all_done {
+ let mut elems = mem::replace(&mut self.elems, Box::pin([]));
+ let result = iter_pin_mut(elems.as_mut())
+ .map(|e| e.take_done().unwrap())
+ .collect();
+ Poll::Ready(result)
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
+impl<F: Future> FromIterator<F> for JoinAll<F> {
+ fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
+ join_all(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/lazy.rs b/third_party/rust/futures-util/src/future/lazy.rs
new file mode 100644
index 0000000000..5e72218d1f
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/lazy.rs
@@ -0,0 +1,54 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`lazy`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Lazy<F> {
+ f: Option<F>
+}
+
+// safe because we never generate `Pin<&mut F>`
+impl<F> Unpin for Lazy<F> {}
+
+/// Creates a new future that allows delayed execution of a closure.
+///
+/// The provided closure is only run once the future is polled.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::lazy(|_| 1);
+/// assert_eq!(a.await, 1);
+///
+/// let b = future::lazy(|_| -> i32 {
+/// panic!("oh no!")
+/// });
+/// drop(b); // closure is never run
+/// # });
+/// ```
+pub fn lazy<F, R>(f: F) -> Lazy<F>
+ where F: FnOnce(&mut Context<'_>) -> R,
+{
+ Lazy { f: Some(f) }
+}
+
+impl<F, R> FusedFuture for Lazy<F>
+ where F: FnOnce(&mut Context<'_>) -> R,
+{
+ fn is_terminated(&self) -> bool { self.f.is_none() }
+}
+
+impl<F, R> Future for Lazy<F>
+ where F: FnOnce(&mut Context<'_>) -> R,
+{
+ type Output = R;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
+ Poll::Ready((self.f.take().unwrap())(cx))
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/maybe_done.rs b/third_party/rust/futures-util/src/future/maybe_done.rs
new file mode 100644
index 0000000000..f16f889781
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/maybe_done.rs
@@ -0,0 +1,104 @@
+//! Definition of the MaybeDone combinator
+
+use core::mem;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// A future that may have completed.
+///
+/// This is created by the [`maybe_done()`] function.
+#[derive(Debug)]
+pub enum MaybeDone<Fut: Future> {
+ /// A not-yet-completed future
+ Future(Fut),
+ /// The output of the completed future
+ Done(Fut::Output),
+ /// The empty variant after the result of a [`MaybeDone`] has been
+ /// taken using the [`take_output`](MaybeDone::take_output) method.
+ Gone,
+}
+
+// Safe because we never generate `Pin<&mut Fut::Output>`
+impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
+
+/// Wraps a future into a `MaybeDone`
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+/// use futures::pin_mut;
+///
+/// let future = future::maybe_done(async { 5 });
+/// pin_mut!(future);
+/// assert_eq!(future.as_mut().take_output(), None);
+/// let () = future.as_mut().await;
+/// assert_eq!(future.as_mut().take_output(), Some(5));
+/// assert_eq!(future.as_mut().take_output(), None);
+/// # });
+/// ```
+pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
+ MaybeDone::Future(future)
+}
+
+impl<Fut: Future> MaybeDone<Fut> {
+ /// Returns an [`Option`] containing a mutable reference to the output of the future.
+ /// The output of this method will be [`Some`] if and only if the inner
+ /// future has been completed and [`take_output`](MaybeDone::take_output)
+ /// has not yet been called.
+ #[inline]
+ pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ match this {
+ MaybeDone::Done(res) => Some(res),
+ _ => None,
+ }
+ }
+ }
+
+ /// Attempt to take the output of a `MaybeDone` without driving it
+ /// towards completion.
+ #[inline]
+ pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ match this {
+ MaybeDone::Done(_) => {},
+ MaybeDone::Future(_) | MaybeDone::Gone => return None,
+ };
+ if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
+ Some(output)
+ } else {
+ unreachable!()
+ }
+ }
+ }
+}
+
+impl<Fut: Future> FusedFuture for MaybeDone<Fut> {
+ fn is_terminated(&self) -> bool {
+ match self {
+ MaybeDone::Future(_) => false,
+ MaybeDone::Done(_) | MaybeDone::Gone => true,
+ }
+ }
+}
+
+impl<Fut: Future> Future for MaybeDone<Fut> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let res = unsafe {
+ match self.as_mut().get_unchecked_mut() {
+ MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
+ MaybeDone::Done(_) => return Poll::Ready(()),
+ MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
+ }
+ };
+ self.set(MaybeDone::Done(res));
+ Poll::Ready(())
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/mod.rs b/third_party/rust/futures-util/src/future/mod.rs
new file mode 100644
index 0000000000..3f4bb01436
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/mod.rs
@@ -0,0 +1,110 @@
+//! Futures
+//!
+//! This module contains a number of functions for working with `Future`s,
+//! including the [`FutureExt`] trait and the [`TryFutureExt`] trait which add
+//! methods to `Future` types.
+
+#[cfg(feature = "alloc")]
+pub use futures_core::future::{BoxFuture, LocalBoxFuture};
+pub use futures_core::future::{FusedFuture, Future, TryFuture};
+pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj};
+
+// Extension traits and combinators
+
+#[allow(clippy::module_inception)]
+mod future;
+pub use self::future::{
+ Flatten, FlattenStream, Fuse, FutureExt, Inspect, IntoStream, Map, NeverError, Then, UnitError,
+};
+
+#[cfg(feature = "std")]
+pub use self::future::CatchUnwind;
+
+#[cfg(feature = "channel")]
+#[cfg(feature = "std")]
+pub use self::future::{Remote, RemoteHandle};
+
+#[cfg(feature = "std")]
+pub use self::future::Shared;
+
+mod try_future;
+pub use self::try_future::{
+ AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, OrElse, TryFlattenStream,
+ TryFutureExt, UnwrapOrElse,
+};
+
+#[cfg(feature = "sink")]
+pub use self::try_future::FlattenSink;
+
+// Primitive futures
+
+mod lazy;
+pub use self::lazy::{lazy, Lazy};
+
+mod pending;
+pub use self::pending::{pending, Pending};
+
+mod maybe_done;
+pub use self::maybe_done::{maybe_done, MaybeDone};
+
+mod option;
+pub use self::option::OptionFuture;
+
+mod poll_fn;
+pub use self::poll_fn::{poll_fn, PollFn};
+
+mod ready;
+pub use self::ready::{err, ok, ready, Ready};
+
+mod join;
+pub use self::join::{join, join3, join4, join5, Join, Join3, Join4, Join5};
+
+#[cfg(feature = "alloc")]
+mod join_all;
+#[cfg(feature = "alloc")]
+pub use self::join_all::{join_all, JoinAll};
+
+mod select;
+pub use self::select::{select, Select};
+
+#[cfg(feature = "alloc")]
+mod select_all;
+#[cfg(feature = "alloc")]
+pub use self::select_all::{select_all, SelectAll};
+
+mod try_join;
+pub use self::try_join::{
+ try_join, try_join3, try_join4, try_join5, TryJoin, TryJoin3, TryJoin4, TryJoin5,
+};
+
+#[cfg(feature = "alloc")]
+mod try_join_all;
+#[cfg(feature = "alloc")]
+pub use self::try_join_all::{try_join_all, TryJoinAll};
+
+mod try_select;
+pub use self::try_select::{try_select, TrySelect};
+
+#[cfg(feature = "alloc")]
+mod select_ok;
+#[cfg(feature = "alloc")]
+pub use self::select_ok::{select_ok, SelectOk};
+
+mod either;
+pub use self::either::Either;
+
+cfg_target_has_atomic! {
+ #[cfg(feature = "alloc")]
+ mod abortable;
+ #[cfg(feature = "alloc")]
+ pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted};
+}
+
+// Just a helper function to ensure the futures we're returning all have the
+// right implementations.
+fn assert_future<T, F>(future: F) -> F
+where
+ F: Future<Output = T>,
+{
+ future
+}
diff --git a/third_party/rust/futures-util/src/future/option.rs b/third_party/rust/futures-util/src/future/option.rs
new file mode 100644
index 0000000000..21413525d0
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/option.rs
@@ -0,0 +1,62 @@
+//! Definition of the `Option` (optional step) combinator
+
+use core::pin::Pin;
+use futures_core::future::{Future, FusedFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// A future representing a value which may or may not be present.
+///
+/// Created by the [`From`] implementation for [`Option`](std::option::Option).
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::OptionFuture;
+///
+/// let mut a: OptionFuture<_> = Some(async { 123 }).into();
+/// assert_eq!(a.await, Some(123));
+///
+/// a = None.into();
+/// assert_eq!(a.await, None);
+/// # });
+/// ```
+#[derive(Debug, Clone)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct OptionFuture<F> {
+ option: Option<F>,
+}
+
+impl<F> OptionFuture<F> {
+ unsafe_pinned!(option: Option<F>);
+}
+
+impl<F: Future> Future for OptionFuture<F> {
+ type Output = Option<F::Output>;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ match self.option().as_pin_mut() {
+ Some(x) => x.poll(cx).map(Some),
+ None => Poll::Ready(None),
+ }
+ }
+}
+
+impl<F: FusedFuture> FusedFuture for OptionFuture<F> {
+ fn is_terminated(&self) -> bool {
+ match &self.option {
+ Some(x) => x.is_terminated(),
+ None => true,
+ }
+ }
+}
+
+impl<T> From<Option<T>> for OptionFuture<T> {
+ fn from(option: Option<T>) -> Self {
+ OptionFuture { option }
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/pending.rs b/third_party/rust/futures-util/src/future/pending.rs
new file mode 100644
index 0000000000..5a7bbb8d59
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/pending.rs
@@ -0,0 +1,56 @@
+use core::marker;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`pending()`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Pending<T> {
+ _data: marker::PhantomData<T>,
+}
+
+impl<T> FusedFuture for Pending<T> {
+ fn is_terminated(&self) -> bool {
+ true
+ }
+}
+
+/// Creates a future which never resolves, representing a computation that never
+/// finishes.
+///
+/// The returned future will forever return [`Poll::Pending`].
+///
+/// # Examples
+///
+/// ```ignore
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let future = future::pending();
+/// let () = future.await;
+/// unreachable!();
+/// # });
+/// ```
+pub fn pending<T>() -> Pending<T> {
+ Pending {
+ _data: marker::PhantomData,
+ }
+}
+
+impl<T> Future for Pending<T> {
+ type Output = T;
+
+ fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
+ Poll::Pending
+ }
+}
+
+impl<T> Unpin for Pending<T> {
+}
+
+impl<T> Clone for Pending<T> {
+ fn clone(&self) -> Self {
+ pending()
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/poll_fn.rs b/third_party/rust/futures-util/src/future/poll_fn.rs
new file mode 100644
index 0000000000..b7b10be85d
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/poll_fn.rs
@@ -0,0 +1,56 @@
+//! Definition of the `PollFn` adapter combinator
+
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`poll_fn`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct PollFn<F> {
+ f: F,
+}
+
+impl<F> Unpin for PollFn<F> {}
+
+/// Creates a new future wrapping around a function returning [`Poll`].
+///
+/// Polling the returned future delegates to the wrapped function.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::poll_fn;
+/// use futures::task::{Context, Poll};
+///
+/// fn read_line(_cx: &mut Context<'_>) -> Poll<String> {
+/// Poll::Ready("Hello, World!".into())
+/// }
+///
+/// let read_future = poll_fn(read_line);
+/// assert_eq!(read_future.await, "Hello, World!".to_owned());
+/// # });
+/// ```
+pub fn poll_fn<T, F>(f: F) -> PollFn<F>
+where
+ F: FnMut(&mut Context<'_>) -> Poll<T>
+{
+ PollFn { f }
+}
+
+impl<F> fmt::Debug for PollFn<F> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("PollFn").finish()
+ }
+}
+
+impl<T, F> Future for PollFn<F>
+ where F: FnMut(&mut Context<'_>) -> Poll<T>,
+{
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ (&mut self.f)(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/ready.rs b/third_party/rust/futures-util/src/future/ready.rs
new file mode 100644
index 0000000000..48661b3d84
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/ready.rs
@@ -0,0 +1,81 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`ready`](ready()) function.
+#[derive(Debug, Clone)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Ready<T>(Option<T>);
+
+impl<T> Ready<T> {
+ /// Unwraps the value from this immediately ready future.
+ #[inline]
+ pub fn into_inner(mut self) -> T {
+ self.0.take().unwrap()
+ }
+}
+
+impl<T> Unpin for Ready<T> {}
+
+impl<T> FusedFuture for Ready<T> {
+ fn is_terminated(&self) -> bool {
+ self.0.is_none()
+ }
+}
+
+impl<T> Future for Ready<T> {
+ type Output = T;
+
+ #[inline]
+ fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
+ Poll::Ready(self.0.take().unwrap())
+ }
+}
+
+/// Creates a future that is immediately ready with a value.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(1);
+/// assert_eq!(a.await, 1);
+/// # });
+/// ```
+pub fn ready<T>(t: T) -> Ready<T> {
+ Ready(Some(t))
+}
+
+/// Create a future that is immediately ready with a success value.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ok::<i32, i32>(1);
+/// assert_eq!(a.await, Ok(1));
+/// # });
+/// ```
+pub fn ok<T, E>(t: T) -> Ready<Result<T, E>> {
+ Ready(Some(Ok(t)))
+}
+
+/// Create a future that is immediately ready with an error value.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::err::<i32, i32>(1);
+/// assert_eq!(a.await, Err(1));
+/// # });
+/// ```
+pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
+ Ready(Some(Err(err)))
+}
diff --git a/third_party/rust/futures-util/src/future/select.rs b/third_party/rust/futures-util/src/future/select.rs
new file mode 100644
index 0000000000..91b467dca6
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/select.rs
@@ -0,0 +1,83 @@
+use core::pin::Pin;
+use futures_core::future::{Future, FusedFuture};
+use futures_core::task::{Context, Poll};
+use crate::future::{Either, FutureExt};
+
+/// Future for the [`select()`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct Select<A, B> {
+ inner: Option<(A, B)>,
+}
+
+impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
+
+/// Waits for either one of two differently-typed futures to complete.
+///
+/// This function will return a new future which awaits for either one of both
+/// futures to complete. The returned future will finish with both the value
+/// resolved and a future representing the completion of the other work.
+///
+/// Note that this function consumes the receiving futures and returns a
+/// wrapped version of them.
+///
+/// Also note that if both this and the second future have the same
+/// output type you can use the `Either::factor_first` method to
+/// conveniently extract out the value at the end.
+///
+/// # Examples
+///
+/// ```
+/// use futures::future::{self, Either, Future, FutureExt};
+///
+/// // A poor-man's join implemented on top of select
+///
+/// fn join<A, B, E>(a: A, b: B) -> impl Future<Output=(A::Output, B::Output)>
+/// where A: Future + Unpin,
+/// B: Future + Unpin,
+/// {
+/// future::select(a, b).then(|either| {
+/// match either {
+/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(),
+/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(),
+/// }
+/// })
+/// }
+/// ```
+pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>
+ where A: Future + Unpin, B: Future + Unpin
+{
+ Select { inner: Some((future1, future2)) }
+}
+
+impl<A, B> Future for Select<A, B>
+where
+ A: Future + Unpin,
+ B: Future + Unpin,
+{
+ type Output = Either<(A::Output, B), (B::Output, A)>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
+ match a.poll_unpin(cx) {
+ Poll::Ready(x) => Poll::Ready(Either::Left((x, b))),
+ Poll::Pending => match b.poll_unpin(cx) {
+ Poll::Ready(x) => Poll::Ready(Either::Right((x, a))),
+ Poll::Pending => {
+ self.inner = Some((a, b));
+ Poll::Pending
+ }
+ }
+ }
+ }
+}
+
+impl<A, B> FusedFuture for Select<A, B>
+where
+ A: Future + Unpin,
+ B: Future + Unpin,
+{
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/select_all.rs b/third_party/rust/futures-util/src/future/select_all.rs
new file mode 100644
index 0000000000..9f7fb245bf
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/select_all.rs
@@ -0,0 +1,69 @@
+use crate::future::FutureExt;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use alloc::vec::Vec;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`select_all`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct SelectAll<Fut> {
+ inner: Vec<Fut>,
+}
+
+impl<Fut: Unpin> Unpin for SelectAll<Fut> {}
+
+/// Creates a new future which will select over a list of futures.
+///
+/// The returned future will wait for any future within `iter` to be ready. Upon
+/// completion the item resolved will be returned, along with the index of the
+/// future that was ready and the list of all the remaining futures.
+///
+/// There are no guarantees provided on the order of the list with the remaining
+/// futures. They might be swapped around, reversed, or completely random.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # Panics
+///
+/// This function will panic if the iterator specified contains no items.
+pub fn select_all<I>(iter: I) -> SelectAll<I::Item>
+ where I: IntoIterator,
+ I::Item: Future + Unpin,
+{
+ let ret = SelectAll {
+ inner: iter.into_iter().collect()
+ };
+ assert!(!ret.inner.is_empty());
+ ret
+}
+
+impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
+ type Output = (Fut::Output, usize, Vec<Fut>);
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| {
+ match f.poll_unpin(cx) {
+ Poll::Pending => None,
+ Poll::Ready(e) => Some((i, e)),
+ }
+ });
+ match item {
+ Some((idx, res)) => {
+ let _ = self.inner.swap_remove(idx);
+ let rest = mem::replace(&mut self.inner, Vec::new());
+ Poll::Ready((res, idx, rest))
+ }
+ None => Poll::Pending,
+ }
+ }
+}
+
+impl<Fut: Future + Unpin> FromIterator<Fut> for SelectAll<Fut> {
+ fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
+ select_all(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/select_ok.rs b/third_party/rust/futures-util/src/future/select_ok.rs
new file mode 100644
index 0000000000..7f4f4d65f4
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/select_ok.rs
@@ -0,0 +1,83 @@
+use crate::future::TryFutureExt;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use alloc::vec::Vec;
+use futures_core::future::{Future, TryFuture};
+use futures_core::task::{Context, Poll};
+
+/// Future for the [`select_ok`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct SelectOk<Fut> {
+ inner: Vec<Fut>,
+}
+
+impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
+
+/// Creates a new future which will select the first successful future over a list of futures.
+///
+/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike
+/// `select_all`, this will only return the first successful completion, or the last
+/// failure. This is useful in contexts where any success is desired and failures
+/// are ignored, unless all the futures fail.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # Panics
+///
+/// This function will panic if the iterator specified contains no items.
+pub fn select_ok<I>(iter: I) -> SelectOk<I::Item>
+ where I: IntoIterator,
+ I::Item: TryFuture + Unpin,
+{
+ let ret = SelectOk {
+ inner: iter.into_iter().collect()
+ };
+ assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
+ ret
+}
+
+impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
+ type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // loop until we've either exhausted all errors, a success was hit, or nothing is ready
+ loop {
+ let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| {
+ match f.try_poll_unpin(cx) {
+ Poll::Pending => None,
+ Poll::Ready(e) => Some((i, e)),
+ }
+ });
+ match item {
+ Some((idx, res)) => {
+ // always remove Ok or Err, if it's not the last Err continue looping
+ drop(self.inner.remove(idx));
+ match res {
+ Ok(e) => {
+ let rest = mem::replace(&mut self.inner, Vec::new());
+ return Poll::Ready(Ok((e, rest)))
+ }
+ Err(e) => {
+ if self.inner.is_empty() {
+ return Poll::Ready(Err(e))
+ }
+ }
+ }
+ }
+ None => {
+ // based on the filter above, nothing is ready, return
+ return Poll::Pending
+ }
+ }
+ }
+ }
+}
+
+impl<Fut: TryFuture + Unpin> FromIterator<Fut> for SelectOk<Fut> {
+ fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
+ select_ok(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/and_then.rs b/third_party/rust/futures-util/src/future/try_future/and_then.rs
new file mode 100644
index 0000000000..37333e0503
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/and_then.rs
@@ -0,0 +1,53 @@
+use super::{TryChain, TryChainAction};
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`and_then`](super::TryFutureExt::and_then) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct AndThen<Fut1, Fut2, F> {
+ try_chain: TryChain<Fut1, Fut2, F>,
+}
+
+impl<Fut1, Fut2, F> AndThen<Fut1, Fut2, F>
+ where Fut1: TryFuture,
+ Fut2: TryFuture,
+{
+ unsafe_pinned!(try_chain: TryChain<Fut1, Fut2, F>);
+
+ /// Creates a new `Then`.
+ pub(super) fn new(future: Fut1, f: F) -> AndThen<Fut1, Fut2, F> {
+ AndThen {
+ try_chain: TryChain::new(future, f),
+ }
+ }
+}
+
+impl<Fut1, Fut2, F> FusedFuture for AndThen<Fut1, Fut2, F>
+ where Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ F: FnOnce(Fut1::Ok) -> Fut2,
+{
+ fn is_terminated(&self) -> bool {
+ self.try_chain.is_terminated()
+ }
+}
+
+impl<Fut1, Fut2, F> Future for AndThen<Fut1, Fut2, F>
+ where Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ F: FnOnce(Fut1::Ok) -> Fut2,
+{
+ type Output = Result<Fut2::Ok, Fut2::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.try_chain().poll(cx, |result, async_op| {
+ match result {
+ Ok(ok) => TryChainAction::Future(async_op(ok)),
+ Err(err) => TryChainAction::Output(Err(err)),
+ }
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/err_into.rs b/third_party/rust/futures-util/src/future/try_future/err_into.rs
new file mode 100644
index 0000000000..731fcae39e
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/err_into.rs
@@ -0,0 +1,48 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`err_into`](super::TryFutureExt::err_into) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ErrInto<Fut, E> {
+ future: Fut,
+ _marker: PhantomData<E>,
+}
+
+impl<Fut: Unpin, E> Unpin for ErrInto<Fut, E> {}
+
+impl<Fut, E> ErrInto<Fut, E> {
+ unsafe_pinned!(future: Fut);
+
+ pub(super) fn new(future: Fut) -> ErrInto<Fut, E> {
+ ErrInto {
+ future,
+ _marker: PhantomData,
+ }
+ }
+}
+
+impl<Fut, E> FusedFuture for ErrInto<Fut, E>
+ where Fut: TryFuture + FusedFuture,
+ Fut::Error: Into<E>,
+{
+ fn is_terminated(&self) -> bool { self.future.is_terminated() }
+}
+
+impl<Fut, E> Future for ErrInto<Fut, E>
+ where Fut: TryFuture,
+ Fut::Error: Into<E>,
+{
+ type Output = Result<Fut::Ok, E>;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ self.future().try_poll(cx)
+ .map(|res| res.map_err(Into::into))
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/flatten_sink.rs b/third_party/rust/futures-util/src/future/try_future/flatten_sink.rs
new file mode 100644
index 0000000000..d6863dd2cd
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/flatten_sink.rs
@@ -0,0 +1,76 @@
+use super::FlattenStreamSink;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_utils::unsafe_pinned;
+
+/// Sink for the [`flatten_sink`](super::TryFutureExt::flatten_sink) method.
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct FlattenSink<Fut, Si>
+where
+ Fut: TryFuture<Ok = Si>,
+{
+ inner: FlattenStreamSink<Fut>,
+}
+
+impl<Fut, Si> FlattenSink<Fut, Si>
+where
+ Fut: TryFuture<Ok = Si>,
+{
+ unsafe_pinned!(inner: FlattenStreamSink<Fut>);
+
+ pub(super) fn new(future: Fut) -> Self {
+ Self {
+ inner: FlattenStreamSink::new(future),
+ }
+ }
+}
+
+impl<Fut, S> FusedStream for FlattenSink<Fut, S>
+where
+ Fut: TryFuture<Ok = S>,
+ S: TryStream<Error = Fut::Error> + FusedStream,
+{
+ fn is_terminated(&self) -> bool {
+ self.inner.is_terminated()
+ }
+}
+
+impl<Fut, S> Stream for FlattenSink<Fut, S>
+where
+ Fut: TryFuture<Ok = S>,
+ S: TryStream<Error = Fut::Error>,
+{
+ type Item = Result<S::Ok, Fut::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner().poll_next(cx)
+ }
+}
+
+impl<Fut, Si, Item> Sink<Item> for FlattenSink<Fut, Si>
+where
+ Fut: TryFuture<Ok = Si>,
+ Si: Sink<Item, Error = Fut::Error>,
+{
+ type Error = Fut::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner().poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ self.inner().start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner().poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner().poll_close(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/flatten_stream_sink.rs b/third_party/rust/futures-util/src/future/try_future/flatten_stream_sink.rs
new file mode 100644
index 0000000000..5a56bf708d
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/flatten_stream_sink.rs
@@ -0,0 +1,181 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_utils::unsafe_pinned;
+
+#[must_use = "streams do nothing unless polled"]
+pub(crate) struct FlattenStreamSink<Fut>
+where
+ Fut: TryFuture,
+{
+ state: State<Fut, Fut::Ok>,
+}
+
+impl<Fut> Unpin for FlattenStreamSink<Fut>
+where
+ Fut: TryFuture + Unpin,
+ Fut::Ok: Unpin,
+{
+}
+
+impl<Fut> fmt::Debug for FlattenStreamSink<Fut>
+where
+ Fut: TryFuture + fmt::Debug,
+ Fut::Ok: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FlattenStreamSink")
+ .field("state", &self.state)
+ .finish()
+ }
+}
+
+impl<Fut> FlattenStreamSink<Fut>
+where
+ Fut: TryFuture,
+{
+ unsafe_pinned!(state: State<Fut, Fut::Ok>);
+
+ pub(crate) fn new(future: Fut) -> Self {
+ Self {
+ state: State::Future(future),
+ }
+ }
+}
+
+#[derive(Debug)]
+enum State<Fut, S> {
+ // future is not yet called or called and not ready
+ Future(Fut),
+ // future resolved to Stream or Sink
+ StreamOrSink(S),
+ // future resolved to error
+ Done,
+}
+
+impl<Fut, S> State<Fut, S> {
+ fn get_pin_mut(self: Pin<&mut Self>) -> State<Pin<&mut Fut>, Pin<&mut S>> {
+ // safety: data is never moved via the resulting &mut reference
+ match unsafe { self.get_unchecked_mut() } {
+ // safety: the future we're re-pinning here will never be moved;
+ // it will just be polled, then dropped in place
+ State::Future(f) => State::Future(unsafe { Pin::new_unchecked(f) }),
+ // safety: the stream we're repinning here will never be moved;
+ // it will just be polled, then dropped in place
+ State::StreamOrSink(s) => State::StreamOrSink(unsafe { Pin::new_unchecked(s) }),
+ State::Done => State::Done,
+ }
+ }
+}
+
+impl<Fut> State<Fut, Fut::Ok>
+where
+ Fut: TryFuture,
+{
+ fn poll_future(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Fut::Error>> {
+ if let State::Future(f) = self.as_mut().get_pin_mut() {
+ match ready!(f.try_poll(cx)) {
+ Ok(s) => {
+ // Future resolved to stream.
+ // We do not return, but poll that
+ // stream in the next loop iteration.
+ self.set(State::StreamOrSink(s));
+ }
+ Err(e) => {
+ // Future resolved to error.
+ // We have neither a pollable stream nor a future.
+ self.set(State::Done);
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<Fut> FusedStream for FlattenStreamSink<Fut>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error> + FusedStream,
+{
+ fn is_terminated(&self) -> bool {
+ match &self.state {
+ State::Future(_) => false,
+ State::StreamOrSink(stream) => stream.is_terminated(),
+ State::Done => true,
+ }
+ }
+}
+
+impl<Fut> Stream for FlattenStreamSink<Fut>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error>,
+{
+ type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ ready!(self.as_mut().state().poll_future(cx)?);
+ match self.as_mut().state().get_pin_mut() {
+ State::StreamOrSink(s) => s.try_poll_next(cx),
+ State::Done => Poll::Ready(None),
+ State::Future(_) => unreachable!(),
+ }
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<Fut, Item> Sink<Item> for FlattenStreamSink<Fut>
+where
+ Fut: TryFuture,
+ Fut::Ok: Sink<Item, Error = Fut::Error>,
+{
+ type Error = Fut::Error;
+
+ fn poll_ready(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().state().poll_future(cx)?);
+ match self.as_mut().state().get_pin_mut() {
+ State::StreamOrSink(s) => s.poll_ready(cx),
+ State::Done => panic!("poll_ready called after eof"),
+ State::Future(_) => unreachable!(),
+ }
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ match self.state().get_pin_mut() {
+ State::StreamOrSink(s) => s.start_send(item),
+ State::Future(_) => panic!("poll_ready not called first"),
+ State::Done => panic!("start_send called after eof"),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.state().get_pin_mut() {
+ State::StreamOrSink(s) => s.poll_flush(cx),
+ // if sink not yet resolved, nothing written ==> everything flushed
+ State::Future(_) => Poll::Ready(Ok(())),
+ State::Done => panic!("poll_flush called after eof"),
+ }
+ }
+
+ fn poll_close(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ let res = match self.as_mut().state().get_pin_mut() {
+ State::StreamOrSink(s) => s.poll_close(cx),
+ State::Future(_) | State::Done => Poll::Ready(Ok(())),
+ };
+ if res.is_ready() {
+ self.as_mut().state().set(State::Done);
+ }
+ res
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/inspect_err.rs b/third_party/rust/futures-util/src/future/try_future/inspect_err.rs
new file mode 100644
index 0000000000..8700337bb2
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/inspect_err.rs
@@ -0,0 +1,53 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`inspect_err`](super::TryFutureExt::inspect_err) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct InspectErr<Fut, F> {
+ future: Fut,
+ f: Option<F>,
+}
+
+impl<Fut: Unpin, F> Unpin for InspectErr<Fut, F> {}
+
+impl<Fut, F> InspectErr<Fut, F>
+where
+ Fut: TryFuture,
+ F: FnOnce(&Fut::Error),
+{
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+
+ pub(super) fn new(future: Fut, f: F) -> Self {
+ Self { future, f: Some(f) }
+ }
+}
+
+impl<Fut, F> FusedFuture for InspectErr<Fut, F>
+where
+ Fut: TryFuture + FusedFuture,
+ F: FnOnce(&Fut::Error),
+{
+ fn is_terminated(&self) -> bool {
+ self.future.is_terminated()
+ }
+}
+
+impl<Fut, F> Future for InspectErr<Fut, F>
+where
+ Fut: TryFuture,
+ F: FnOnce(&Fut::Error),
+{
+ type Output = Result<Fut::Ok, Fut::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let e = ready!(self.as_mut().future().try_poll(cx));
+ if let Err(e) = &e {
+ self.as_mut().f().take().expect("cannot poll InspectErr twice")(e);
+ }
+ Poll::Ready(e)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/inspect_ok.rs b/third_party/rust/futures-util/src/future/try_future/inspect_ok.rs
new file mode 100644
index 0000000000..3d0a972226
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/inspect_ok.rs
@@ -0,0 +1,53 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`inspect_ok`](super::TryFutureExt::inspect_ok) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct InspectOk<Fut, F> {
+ future: Fut,
+ f: Option<F>,
+}
+
+impl<Fut: Unpin, F> Unpin for InspectOk<Fut, F> {}
+
+impl<Fut, F> InspectOk<Fut, F>
+where
+ Fut: TryFuture,
+ F: FnOnce(&Fut::Ok),
+{
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+
+ pub(super) fn new(future: Fut, f: F) -> Self {
+ Self { future, f: Some(f) }
+ }
+}
+
+impl<Fut, F> FusedFuture for InspectOk<Fut, F>
+where
+ Fut: TryFuture + FusedFuture,
+ F: FnOnce(&Fut::Ok),
+{
+ fn is_terminated(&self) -> bool {
+ self.future.is_terminated()
+ }
+}
+
+impl<Fut, F> Future for InspectOk<Fut, F>
+where
+ Fut: TryFuture,
+ F: FnOnce(&Fut::Ok),
+{
+ type Output = Result<Fut::Ok, Fut::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let e = ready!(self.as_mut().future().try_poll(cx));
+ if let Ok(e) = &e {
+ self.as_mut().f().take().expect("cannot poll InspectOk twice")(e);
+ }
+ Poll::Ready(e)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/into_future.rs b/third_party/rust/futures-util/src/future/try_future/into_future.rs
new file mode 100644
index 0000000000..a766d5b66d
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/into_future.rs
@@ -0,0 +1,36 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`into_future`](super::TryFutureExt::into_future) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct IntoFuture<Fut> {
+ future: Fut,
+}
+
+impl<Fut> IntoFuture<Fut> {
+ unsafe_pinned!(future: Fut);
+
+ #[inline]
+ pub(super) fn new(future: Fut) -> IntoFuture<Fut> {
+ IntoFuture { future }
+ }
+}
+
+impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> {
+ fn is_terminated(&self) -> bool { self.future.is_terminated() }
+}
+
+impl<Fut: TryFuture> Future for IntoFuture<Fut> {
+ type Output = Result<Fut::Ok, Fut::Error>;
+
+ #[inline]
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ self.future().try_poll(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/map_err.rs b/third_party/rust/futures-util/src/future/try_future/map_err.rs
new file mode 100644
index 0000000000..8edebad86d
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/map_err.rs
@@ -0,0 +1,52 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`map_err`](super::TryFutureExt::map_err) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct MapErr<Fut, F> {
+ future: Fut,
+ f: Option<F>,
+}
+
+impl<Fut, F> MapErr<Fut, F> {
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+
+ /// Creates a new MapErr.
+ pub(super) fn new(future: Fut, f: F) -> MapErr<Fut, F> {
+ MapErr { future, f: Some(f) }
+ }
+}
+
+impl<Fut: Unpin, F> Unpin for MapErr<Fut, F> {}
+
+impl<Fut, F, E> FusedFuture for MapErr<Fut, F>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Error) -> E,
+{
+ fn is_terminated(&self) -> bool { self.f.is_none() }
+}
+
+impl<Fut, F, E> Future for MapErr<Fut, F>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Error) -> E,
+{
+ type Output = Result<Fut::Ok, E>;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ self.as_mut()
+ .future()
+ .try_poll(cx)
+ .map(|result| {
+ let f = self.as_mut().f().take()
+ .expect("MapErr must not be polled after it returned `Poll::Ready`");
+ result.map_err(f)
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/map_ok.rs b/third_party/rust/futures-util/src/future/try_future/map_ok.rs
new file mode 100644
index 0000000000..ab28f1443f
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/map_ok.rs
@@ -0,0 +1,54 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`map_ok`](super::TryFutureExt::map_ok) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct MapOk<Fut, F> {
+ future: Fut,
+ f: Option<F>,
+}
+
+impl<Fut, F> MapOk<Fut, F> {
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+
+ /// Creates a new MapOk.
+ pub(super) fn new(future: Fut, f: F) -> MapOk<Fut, F> {
+ MapOk { future, f: Some(f) }
+ }
+}
+
+impl<Fut: Unpin, F> Unpin for MapOk<Fut, F> {}
+
+impl<Fut, F, T> FusedFuture for MapOk<Fut, F>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Ok) -> T,
+{
+ fn is_terminated(&self) -> bool {
+ self.f.is_none()
+ }
+}
+
+impl<Fut, F, T> Future for MapOk<Fut, F>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Ok) -> T,
+{
+ type Output = Result<T, Fut::Error>;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ self.as_mut()
+ .future()
+ .try_poll(cx)
+ .map(|result| {
+ let op = self.as_mut().f().take()
+ .expect("MapOk must not be polled after it returned `Poll::Ready`");
+ result.map(op)
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/map_ok_or_else.rs b/third_party/rust/futures-util/src/future/try_future/map_ok_or_else.rs
new file mode 100644
index 0000000000..730b67922c
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/map_ok_or_else.rs
@@ -0,0 +1,59 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`map_ok_or_else`](super::TryFutureExt::map_ok_or_else) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct MapOkOrElse<Fut, F, E> {
+ future: Fut,
+ f: Option<F>,
+ e: Option<E>,
+}
+
+impl<Fut, F, E> MapOkOrElse<Fut, F, E> {
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+ unsafe_unpinned!(e: Option<E>);
+
+ /// Creates a new MapOkOrElse.
+ pub(super) fn new(future: Fut, e: E, f: F) -> Self {
+ Self { future, f: Some(f), e: Some(e) }
+ }
+}
+
+impl<Fut: Unpin, F, E> Unpin for MapOkOrElse<Fut, F, E> {}
+
+impl<Fut, F, E, T> FusedFuture for MapOkOrElse<Fut, F, E>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Ok) -> T,
+ E: FnOnce(Fut::Error) -> T,
+{
+ fn is_terminated(&self) -> bool {
+ self.f.is_none() || self.e.is_none()
+ }
+}
+
+impl<Fut, F, E, T> Future for MapOkOrElse<Fut, F, E>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Ok) -> T,
+ E: FnOnce(Fut::Error) -> T,
+{
+ type Output = T;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ self.as_mut()
+ .future()
+ .try_poll(cx)
+ .map(|result| {
+ match result {
+ Ok(i) => (self.as_mut().f().take().expect("MapOkOrElse must not be polled after it returned `Poll::Ready`"))(i),
+ Err(e) => (self.as_mut().e().take().expect("MapOkOrElse must not be polled after it returned `Poll::Ready`"))(e),
+ }
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/mod.rs b/third_party/rust/futures-util/src/future/try_future/mod.rs
new file mode 100644
index 0000000000..e8e059e373
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/mod.rs
@@ -0,0 +1,536 @@
+//! Futures
+//!
+//! This module contains a number of functions for working with `Future`s,
+//! including the `FutureExt` trait which adds methods to `Future` types.
+
+#[cfg(feature = "compat")]
+use crate::compat::Compat;
+use core::pin::Pin;
+use futures_core::{
+ future::TryFuture,
+ stream::TryStream,
+ task::{Context, Poll},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+// Combinators
+
+mod and_then;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::and_then::AndThen;
+
+mod err_into;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::err_into::ErrInto;
+
+#[cfg(feature = "sink")]
+mod flatten_sink;
+#[cfg(feature = "sink")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::flatten_sink::FlattenSink;
+
+mod inspect_ok;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::inspect_ok::InspectOk;
+
+mod inspect_err;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::inspect_err::InspectErr;
+
+mod into_future;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::into_future::IntoFuture;
+
+mod map_err;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::map_err::MapErr;
+
+mod map_ok;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::map_ok::MapOk;
+
+mod map_ok_or_else;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::map_ok_or_else::MapOkOrElse;
+
+mod or_else;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::or_else::OrElse;
+
+mod try_flatten_stream;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_flatten_stream::TryFlattenStream;
+
+mod unwrap_or_else;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::unwrap_or_else::UnwrapOrElse;
+
+// Implementation details
+
+mod flatten_stream_sink;
+pub(crate) use self::flatten_stream_sink::FlattenStreamSink;
+
+mod try_chain;
+pub(crate) use self::try_chain::{TryChain, TryChainAction};
+
+impl<Fut: ?Sized + TryFuture> TryFutureExt for Fut {}
+
+/// Adapters specific to [`Result`]-returning futures
+pub trait TryFutureExt: TryFuture {
+ /// Flattens the execution of this future when the successful result of this
+ /// future is a [`Sink`].
+ ///
+ /// This can be useful when sink initialization is deferred, and it is
+ /// convenient to work with that sink as if the sink was available at the
+ /// call site.
+ ///
+ /// Note that this function consumes this future and returns a wrapped
+ /// version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::{Future, TryFutureExt};
+ /// use futures::sink::Sink;
+ /// # use futures::channel::mpsc::{self, SendError};
+ /// # type T = i32;
+ /// # type E = SendError;
+ ///
+ /// fn make_sink_async() -> impl Future<Output = Result<
+ /// impl Sink<T, Error = E>,
+ /// E,
+ /// >> { // ... }
+ /// # let (tx, _rx) = mpsc::unbounded::<i32>();
+ /// # futures::future::ready(Ok(tx))
+ /// # }
+ /// fn take_sink(sink: impl Sink<T, Error = E>) { /* ... */ }
+ ///
+ /// let fut = make_sink_async();
+ /// take_sink(fut.flatten_sink())
+ /// ```
+ #[cfg(feature = "sink")]
+ fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok>
+ where
+ Self::Ok: Sink<Item, Error = Self::Error>,
+ Self: Sized,
+ {
+ FlattenSink::new(self)
+ }
+
+ /// Maps this future's success value to a different value.
+ ///
+ /// This method can be used to change the [`Ok`](TryFuture::Ok) type of the
+ /// future into a different type. It is similar to the [`Result::map`]
+ /// method. You can use this method to chain along a computation once the
+ /// future has been resolved.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.map_ok(|x| x + 3);
+ /// assert_eq!(future.await, Ok(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`map_ok`](TryFutureExt::map_ok) on an errored future has no
+ /// effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.map_ok(|x| x + 3);
+ /// assert_eq!(future.await, Err(1));
+ /// # });
+ /// ```
+ fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
+ where
+ F: FnOnce(Self::Ok) -> T,
+ Self: Sized,
+ {
+ MapOk::new(self, f)
+ }
+
+ /// Maps this future's success value to a different value, and permits for error handling resulting in the same type.
+ ///
+ /// This method can be used to coalesce your [`Ok`](TryFuture::Ok) type and [`Error`](TryFuture::Error) into another type,
+ /// where that type is the same for both outcomes.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// The provided closure `e` will only be called if this future is resolved
+ /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(5) };
+ /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
+ /// assert_eq!(future.await, 8);
+ ///
+ /// let future = async { Err::<i32, i32>(5) };
+ /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
+ /// assert_eq!(future.await, 10);
+ /// # });
+ /// ```
+ ///
+ fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E>
+ where
+ F: FnOnce(Self::Ok) -> T,
+ E: FnOnce(Self::Error) -> T,
+ Self: Sized,
+ {
+ MapOkOrElse::new(self, e, f)
+ }
+
+ /// Maps this future's error value to a different value.
+ ///
+ /// This method can be used to change the [`Error`](TryFuture::Error) type
+ /// of the future into a different type. It is similar to the
+ /// [`Result::map_err`] method. You can use this method for example to
+ /// ensure that futures have the same [`Error`](TryFuture::Error) type when
+ /// using [`select!`] or [`join!`].
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then
+ /// the provided closure will never be invoked.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.map_err(|x| x + 3);
+ /// assert_eq!(future.await, Err(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`map_err`](TryFutureExt::map_err) on a successful future has
+ /// no effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.map_err(|x| x + 3);
+ /// assert_eq!(future.await, Ok(1));
+ /// # });
+ /// ```
+ fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
+ where
+ F: FnOnce(Self::Error) -> E,
+ Self: Sized,
+ {
+ MapErr::new(self, f)
+ }
+
+ /// Maps this future's [`Error`](TryFuture::Error) to a new error type
+ /// using the [`Into`](std::convert::Into) trait.
+ ///
+ /// This method does for futures what the `?`-operator does for
+ /// [`Result`]: It lets the compiler infer the type of the resulting
+ /// error. Just as [`map_err`](TryFutureExt::map_err), this is useful for
+ /// example to ensure that futures have the same [`Error`](TryFuture::Error)
+ /// type when using [`select!`] or [`join!`].
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future_err_u8 = async { Err::<(), u8>(1) };
+ /// let future_err_i32 = future_err_u8.err_into::<i32>();
+ /// # });
+ /// ```
+ fn err_into<E>(self) -> ErrInto<Self, E>
+ where
+ Self: Sized,
+ Self::Error: Into<E>,
+ {
+ ErrInto::new(self)
+ }
+
+ /// Executes another future after this one resolves successfully. The
+ /// success value is passed to a closure to create this subsequent future.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Ok`]. If this future resolves to an [`Err`], panics, or is
+ /// dropped, then the provided closure will never be invoked. The
+ /// [`Error`](TryFuture::Error) type of this future and the future
+ /// returned by `f` have to match.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.and_then(|x| async move { Ok::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Ok(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`and_then`](TryFutureExt::and_then) on an errored future has no
+ /// effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.and_then(|x| async move { Err::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Err(1));
+ /// # });
+ /// ```
+ fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
+ where
+ F: FnOnce(Self::Ok) -> Fut,
+ Fut: TryFuture<Error = Self::Error>,
+ Self: Sized,
+ {
+ AndThen::new(self, f)
+ }
+
+ /// Executes another future if this one resolves to an error. The
+ /// error value is passed to a closure to create this subsequent future.
+ ///
+ /// The provided closure `f` will only be called if this future is resolved
+ /// to an [`Err`]. If this future resolves to an [`Ok`], panics, or is
+ /// dropped, then the provided closure will never be invoked. The
+ /// [`Ok`](TryFuture::Ok) type of this future and the future returned by `f`
+ /// have to match.
+ ///
+ /// Note that this method consumes the future it is called on and returns a
+ /// wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<i32, i32>(1) };
+ /// let future = future.or_else(|x| async move { Err::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Err(4));
+ /// # });
+ /// ```
+ ///
+ /// Calling [`or_else`](TryFutureExt::or_else) on a successful future has
+ /// no effect:
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Ok::<i32, i32>(1) };
+ /// let future = future.or_else(|x| async move { Ok::<i32, i32>(x + 3) });
+ /// assert_eq!(future.await, Ok(1));
+ /// # });
+ /// ```
+ fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
+ where
+ F: FnOnce(Self::Error) -> Fut,
+ Fut: TryFuture<Ok = Self::Ok>,
+ Self: Sized,
+ {
+ OrElse::new(self, f)
+ }
+
+ /// Do something with the success value of a future before passing it on.
+ ///
+ /// When using futures, you'll often chain several of them together. While
+ /// working on such code, you might want to check out what's happening at
+ /// various parts in the pipeline, without consuming the intermediate
+ /// value. To do that, insert a call to `inspect_ok`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::TryFutureExt;
+ ///
+ /// let future = async { Ok::<_, ()>(1) };
+ /// let new_future = future.inspect_ok(|&x| println!("about to resolve: {}", x));
+ /// assert_eq!(new_future.await, Ok(1));
+ /// # });
+ /// ```
+ fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
+ where
+ F: FnOnce(&Self::Ok),
+ Self: Sized,
+ {
+ InspectOk::new(self, f)
+ }
+
+ /// Do something with the error value of a future before passing it on.
+ ///
+ /// When using futures, you'll often chain several of them together. While
+ /// working on such code, you might want to check out what's happening at
+ /// various parts in the pipeline, without consuming the intermediate
+ /// value. To do that, insert a call to `inspect_err`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::TryFutureExt;
+ ///
+ /// let future = async { Err::<(), _>(1) };
+ /// let new_future = future.inspect_err(|&x| println!("about to error: {}", x));
+ /// assert_eq!(new_future.await, Err(1));
+ /// # });
+ /// ```
+ fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
+ where
+ F: FnOnce(&Self::Error),
+ Self: Sized,
+ {
+ InspectErr::new(self, f)
+ }
+
+ /// Flatten the execution of this future when the successful result of this
+ /// future is a stream.
+ ///
+ /// This can be useful when stream initialization is deferred, and it is
+ /// convenient to work with that stream as if stream was available at the
+ /// call site.
+ ///
+ /// Note that this function consumes this future and returns a wrapped
+ /// version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::TryFutureExt;
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let stream_items = vec![17, 18, 19].into_iter().map(Ok);
+ /// let future_of_a_stream = async { Ok::<_, ()>(stream::iter(stream_items)) };
+ ///
+ /// let stream = future_of_a_stream.try_flatten_stream();
+ /// let list = stream.try_collect::<Vec<_>>().await;
+ /// assert_eq!(list, Ok(vec![17, 18, 19]));
+ /// # });
+ /// ```
+ fn try_flatten_stream(self) -> TryFlattenStream<Self>
+ where
+ Self::Ok: TryStream<Error = Self::Error>,
+ Self: Sized,
+ {
+ TryFlattenStream::new(self)
+ }
+
+ /// Unwraps this future's ouput, producing a future with this future's
+ /// [`Ok`](TryFuture::Ok) type as its
+ /// [`Output`](std::future::Future::Output) type.
+ ///
+ /// If this future is resolved successfully, the returned future will
+ /// contain the original future's success value as output. Otherwise, the
+ /// closure `f` is called with the error value to produce an alternate
+ /// success value.
+ ///
+ /// This method is similar to the [`Result::unwrap_or_else`] method.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::TryFutureExt;
+ ///
+ /// # futures::executor::block_on(async {
+ /// let future = async { Err::<(), &str>("Boom!") };
+ /// let future = future.unwrap_or_else(|_| ());
+ /// assert_eq!(future.await, ());
+ /// # });
+ /// ```
+ fn unwrap_or_else<F>(self, f: F) -> UnwrapOrElse<Self, F>
+ where
+ Self: Sized,
+ F: FnOnce(Self::Error) -> Self::Ok,
+ {
+ UnwrapOrElse::new(self, f)
+ }
+
+ /// Wraps a [`TryFuture`] into a future compatable with libraries using
+ /// futures 0.1 future definitons. Requires the `compat` feature to enable.
+ #[cfg(feature = "compat")]
+ fn compat(self) -> Compat<Self>
+ where
+ Self: Sized + Unpin,
+ {
+ Compat::new(self)
+ }
+
+ /// Wraps a [`TryFuture`] into a type that implements
+ /// [`Future`](std::future::Future).
+ ///
+ /// [`TryFuture`]s currently do not implement the
+ /// [`Future`](std::future::Future) trait due to limitations of the
+ /// compiler.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::future::{Future, TryFuture, TryFutureExt};
+ ///
+ /// # type T = i32;
+ /// # type E = ();
+ /// fn make_try_future() -> impl TryFuture<Ok = T, Error = E> { // ... }
+ /// # async { Ok::<i32, ()>(1) }
+ /// # }
+ /// fn take_future(future: impl Future<Output = Result<T, E>>) { /* ... */ }
+ ///
+ /// take_future(make_try_future().into_future());
+ /// ```
+ fn into_future(self) -> IntoFuture<Self>
+ where
+ Self: Sized,
+ {
+ IntoFuture::new(self)
+ }
+
+ /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`]
+ /// future types.
+ fn try_poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).try_poll(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/or_else.rs b/third_party/rust/futures-util/src/future/try_future/or_else.rs
new file mode 100644
index 0000000000..a9c006fa9f
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/or_else.rs
@@ -0,0 +1,56 @@
+use super::{TryChain, TryChainAction};
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+/// Future for the [`or_else`](super::TryFutureExt::or_else) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct OrElse<Fut1, Fut2, F> {
+ try_chain: TryChain<Fut1, Fut2, F>,
+}
+
+impl<Fut1, Fut2, F> OrElse<Fut1, Fut2, F>
+ where Fut1: TryFuture,
+ Fut2: TryFuture,
+{
+ unsafe_pinned!(try_chain: TryChain<Fut1, Fut2, F>);
+
+ /// Creates a new `Then`.
+ pub(super) fn new(future: Fut1, f: F) -> OrElse<Fut1, Fut2, F> {
+ OrElse {
+ try_chain: TryChain::new(future, f),
+ }
+ }
+}
+
+impl<Fut1, Fut2, F> FusedFuture for OrElse<Fut1, Fut2, F>
+ where Fut1: TryFuture,
+ Fut2: TryFuture<Ok = Fut1::Ok>,
+ F: FnOnce(Fut1::Error) -> Fut2,
+{
+ fn is_terminated(&self) -> bool {
+ self.try_chain.is_terminated()
+ }
+}
+
+impl<Fut1, Fut2, F> Future for OrElse<Fut1, Fut2, F>
+ where Fut1: TryFuture,
+ Fut2: TryFuture<Ok = Fut1::Ok>,
+ F: FnOnce(Fut1::Error) -> Fut2,
+{
+ type Output = Result<Fut2::Ok, Fut2::Error>;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ self.try_chain().poll(cx, |result, async_op| {
+ match result {
+ Ok(ok) => TryChainAction::Output(Ok(ok)),
+ Err(err) => TryChainAction::Future(async_op(err)),
+ }
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/try_chain.rs b/third_party/rust/futures-util/src/future/try_future/try_chain.rs
new file mode 100644
index 0000000000..662bdf2d26
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/try_chain.rs
@@ -0,0 +1,108 @@
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::task::{Context, Poll};
+
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub(crate) enum TryChain<Fut1, Fut2, Data> {
+ First(Fut1, Option<Data>),
+ Second(Fut2),
+ Empty,
+}
+
+impl<Fut1: Unpin, Fut2: Unpin, Data> Unpin for TryChain<Fut1, Fut2, Data> {}
+
+pub(crate) enum TryChainAction<Fut2>
+ where Fut2: TryFuture,
+{
+ Future(Fut2),
+ Output(Result<Fut2::Ok, Fut2::Error>),
+}
+
+impl<Fut1, Fut2, Data> TryChain<Fut1, Fut2, Data>
+ where Fut1: TryFuture,
+ Fut2: TryFuture,
+{
+ pub(crate) fn new(fut1: Fut1, data: Data) -> TryChain<Fut1, Fut2, Data> {
+ TryChain::First(fut1, Some(data))
+ }
+
+ pub(crate) fn is_terminated(&self) -> bool {
+ match self {
+ TryChain::First(..) | TryChain::Second(_) => false,
+ TryChain::Empty => true,
+ }
+ }
+
+ pub(crate) fn poll<F>(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ f: F,
+ ) -> Poll<Result<Fut2::Ok, Fut2::Error>>
+ where F: FnOnce(Result<Fut1::Ok, Fut1::Error>, Data) -> TryChainAction<Fut2>,
+ {
+ let mut f = Some(f);
+
+ // Safe to call `get_unchecked_mut` because we won't move the futures.
+ let this = unsafe { self.get_unchecked_mut() };
+
+ loop {
+ let (output, data) = match this {
+ TryChain::First(fut1, data) => {
+ // Poll the first future
+ let output = ready!(unsafe { Pin::new_unchecked(fut1) }.try_poll(cx));
+ (output, data.take().unwrap())
+ }
+ TryChain::Second(fut2) => {
+ // Poll the second future
+ return unsafe { Pin::new_unchecked(fut2) }
+ .try_poll(cx)
+ .map(|res| {
+ *this = TryChain::Empty; // Drop fut2.
+ res
+ });
+ }
+ TryChain::Empty => {
+ panic!("future must not be polled after it returned `Poll::Ready`");
+ }
+ };
+
+ *this = TryChain::Empty; // Drop fut1
+ let f = f.take().unwrap();
+ match f(output, data) {
+ TryChainAction::Future(fut2) => *this = TryChain::Second(fut2),
+ TryChainAction::Output(output) => return Poll::Ready(output),
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ use futures_test::task::noop_context;
+
+ use crate::future::ready;
+
+ use super::{TryChain, TryChainAction};
+
+ #[test]
+ fn try_chain_is_terminated() {
+ let mut cx = noop_context();
+
+ let mut future = TryChain::new(ready(Ok(1)), ());
+ assert!(!future.is_terminated());
+
+ let res = Pin::new(&mut future).poll(
+ &mut cx,
+ |res: Result<usize, ()>, ()| {
+ assert!(res.is_ok());
+ TryChainAction::Future(ready(Ok(2)))
+ },
+ );
+ assert_eq!(res, Poll::Ready::<Result<usize, ()>>(Ok(2)));
+ assert!(future.is_terminated());
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/try_flatten_stream.rs b/third_party/rust/futures-util/src/future/try_future/try_flatten_stream.rs
new file mode 100644
index 0000000000..24624314c0
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/try_flatten_stream.rs
@@ -0,0 +1,91 @@
+use super::FlattenStreamSink;
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_utils::unsafe_pinned;
+
+/// Stream for the [`try_flatten_stream`](super::TryFutureExt::try_flatten_stream) method.
+#[must_use = "streams do nothing unless polled"]
+pub struct TryFlattenStream<Fut>
+where
+ Fut: TryFuture,
+{
+ inner: FlattenStreamSink<Fut>,
+}
+
+impl<Fut: TryFuture> TryFlattenStream<Fut>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error>,
+{
+ unsafe_pinned!(inner: FlattenStreamSink<Fut>);
+
+ pub(super) fn new(future: Fut) -> Self {
+ Self {
+ inner: FlattenStreamSink::new(future),
+ }
+ }
+}
+
+impl<Fut> fmt::Debug for TryFlattenStream<Fut>
+where
+ Fut: TryFuture + fmt::Debug,
+ Fut::Ok: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryFlattenStream")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+impl<Fut> FusedStream for TryFlattenStream<Fut>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error> + FusedStream,
+{
+ fn is_terminated(&self) -> bool {
+ self.inner.is_terminated()
+ }
+}
+
+impl<Fut> Stream for TryFlattenStream<Fut>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error>,
+{
+ type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner().poll_next(cx)
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<Fut, Item> Sink<Item> for TryFlattenStream<Fut>
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error> + Sink<Item, Error = Fut::Error>,
+{
+ type Error = Fut::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner().poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ self.inner().start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner().poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner().poll_close(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_future/unwrap_or_else.rs b/third_party/rust/futures-util/src/future/try_future/unwrap_or_else.rs
new file mode 100644
index 0000000000..286cc009fb
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_future/unwrap_or_else.rs
@@ -0,0 +1,55 @@
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+/// Future for the [`unwrap_or_else`](super::TryFutureExt::unwrap_or_else)
+/// method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct UnwrapOrElse<Fut, F> {
+ future: Fut,
+ f: Option<F>,
+}
+
+impl<Fut, F> UnwrapOrElse<Fut, F> {
+ unsafe_pinned!(future: Fut);
+ unsafe_unpinned!(f: Option<F>);
+
+ /// Creates a new UnwrapOrElse.
+ pub(super) fn new(future: Fut, f: F) -> UnwrapOrElse<Fut, F> {
+ UnwrapOrElse { future, f: Some(f) }
+ }
+}
+
+impl<Fut: Unpin, F> Unpin for UnwrapOrElse<Fut, F> {}
+
+impl<Fut, F> FusedFuture for UnwrapOrElse<Fut, F>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Error) -> Fut::Ok,
+{
+ fn is_terminated(&self) -> bool {
+ self.f.is_none()
+ }
+}
+
+impl<Fut, F> Future for UnwrapOrElse<Fut, F>
+ where Fut: TryFuture,
+ F: FnOnce(Fut::Error) -> Fut::Ok,
+{
+ type Output = Fut::Ok;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ self.as_mut()
+ .future()
+ .try_poll(cx)
+ .map(|result| {
+ let op = self.as_mut().f().take()
+ .expect("UnwrapOrElse already returned `Poll::Ready` before");
+ result.unwrap_or_else(op)
+ })
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_join.rs b/third_party/rust/futures-util/src/future/try_join.rs
new file mode 100644
index 0000000000..da85eff91d
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_join.rs
@@ -0,0 +1,262 @@
+#![allow(non_snake_case)]
+
+use crate::future::{MaybeDone, maybe_done, TryFutureExt, IntoFuture};
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+
+macro_rules! generate {
+ ($(
+ $(#[$doc:meta])*
+ ($Join:ident, <Fut1, $($Fut:ident),*>),
+ )*) => ($(
+ $(#[$doc])*
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> {
+ Fut1: MaybeDone<IntoFuture<Fut1>>,
+ $($Fut: MaybeDone<IntoFuture<$Fut>>,)*
+ }
+
+ impl<Fut1, $($Fut),*> fmt::Debug for $Join<Fut1, $($Fut),*>
+ where
+ Fut1: TryFuture + fmt::Debug,
+ Fut1::Ok: fmt::Debug,
+ Fut1::Error: fmt::Debug,
+ $(
+ $Fut: TryFuture + fmt::Debug,
+ $Fut::Ok: fmt::Debug,
+ $Fut::Error: fmt::Debug,
+ )*
+ {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct(stringify!($Join))
+ .field("Fut1", &self.Fut1)
+ $(.field(stringify!($Fut), &self.$Fut))*
+ .finish()
+ }
+ }
+
+ impl<Fut1, $($Fut),*> $Join<Fut1, $($Fut),*>
+ where
+ Fut1: TryFuture,
+ $(
+ $Fut: TryFuture<Error=Fut1::Error>
+ ),*
+ {
+ fn new(Fut1: Fut1, $($Fut: $Fut),*) -> $Join<Fut1, $($Fut),*> {
+ $Join {
+ Fut1: maybe_done(TryFutureExt::into_future(Fut1)),
+ $($Fut: maybe_done(TryFutureExt::into_future($Fut))),*
+ }
+ }
+
+ unsafe_pinned!(Fut1: MaybeDone<IntoFuture<Fut1>>);
+ $(
+ unsafe_pinned!($Fut: MaybeDone<IntoFuture<$Fut>>);
+ )*
+ }
+
+ impl<Fut1, $($Fut),*> Future for $Join<Fut1, $($Fut),*>
+ where
+ Fut1: TryFuture,
+ $(
+ $Fut: TryFuture<Error=Fut1::Error>
+ ),*
+ {
+ type Output = Result<(Fut1::Ok, $($Fut::Ok),*), Fut1::Error>;
+
+ fn poll(
+ mut self: Pin<&mut Self>, cx: &mut Context<'_>
+ ) -> Poll<Self::Output> {
+ let mut all_done = true;
+ if self.as_mut().Fut1().poll(cx).is_pending() {
+ all_done = false;
+ } else if self.as_mut().Fut1().output_mut().unwrap().is_err() {
+ return Poll::Ready(Err(
+ self.as_mut().Fut1().take_output().unwrap().err().unwrap()));
+ }
+ $(
+ if self.as_mut().$Fut().poll(cx).is_pending() {
+ all_done = false;
+ } else if self.as_mut().$Fut().output_mut().unwrap().is_err() {
+ return Poll::Ready(Err(
+ self.as_mut().$Fut().take_output().unwrap().err().unwrap()));
+ }
+ )*
+
+ if all_done {
+ Poll::Ready(Ok((
+ self.as_mut().Fut1().take_output().unwrap().ok().unwrap(),
+ $(
+ self.as_mut().$Fut().take_output().unwrap().ok().unwrap()
+ ),*
+ )))
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+ )*)
+}
+
+generate! {
+ /// Future for the [`try_join`](try_join()) function.
+ (TryJoin, <Fut1, Fut2>),
+
+ /// Future for the [`try_join3`] function.
+ (TryJoin3, <Fut1, Fut2, Fut3>),
+
+ /// Future for the [`try_join4`] function.
+ (TryJoin4, <Fut1, Fut2, Fut3, Fut4>),
+
+ /// Future for the [`try_join5`] function.
+ (TryJoin5, <Fut1, Fut2, Fut3, Fut4, Fut5>),
+}
+
+/// Joins the result of two futures, waiting for them both to complete or
+/// for one to produce an error.
+///
+/// This function will return a new future which awaits both futures to
+/// complete. If successful, the returned future will finish with a tuple of
+/// both results. If unsuccesful, it will complete with the first error
+/// encountered.
+///
+/// Note that this function consumes the passed futures and returns a
+/// wrapped version of it.
+///
+/// # Examples
+///
+/// When used on multiple futures that return [`Ok`], `try_join` will return
+/// [`Ok`] of a tuple of the values:
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let pair = future::try_join(a, b);
+///
+/// assert_eq!(pair.await, Ok((1, 2)));
+/// # });
+/// ```
+///
+/// If one of the futures resolves to an error, `try_join` will return
+/// that error:
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Err::<i32, i32>(2));
+/// let pair = future::try_join(a, b);
+///
+/// assert_eq!(pair.await, Err(2));
+/// # });
+/// ```
+pub fn try_join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> TryJoin<Fut1, Fut2>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+{
+ TryJoin::new(future1, future2)
+}
+
+/// Same as [`try_join`](try_join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let c = future::ready(Ok::<i32, i32>(3));
+/// let tuple = future::try_join3(a, b, c);
+///
+/// assert_eq!(tuple.await, Ok((1, 2, 3)));
+/// # });
+/// ```
+pub fn try_join3<Fut1, Fut2, Fut3>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+) -> TryJoin3<Fut1, Fut2, Fut3>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ Fut3: TryFuture<Error = Fut1::Error>,
+{
+ TryJoin3::new(future1, future2, future3)
+}
+
+/// Same as [`try_join`](try_join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let c = future::ready(Ok::<i32, i32>(3));
+/// let d = future::ready(Ok::<i32, i32>(4));
+/// let tuple = future::try_join4(a, b, c, d);
+///
+/// assert_eq!(tuple.await, Ok((1, 2, 3, 4)));
+/// # });
+/// ```
+pub fn try_join4<Fut1, Fut2, Fut3, Fut4>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+) -> TryJoin4<Fut1, Fut2, Fut3, Fut4>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ Fut3: TryFuture<Error = Fut1::Error>,
+ Fut4: TryFuture<Error = Fut1::Error>,
+{
+ TryJoin4::new(future1, future2, future3, future4)
+}
+
+/// Same as [`try_join`](try_join()), but with more futures.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let a = future::ready(Ok::<i32, i32>(1));
+/// let b = future::ready(Ok::<i32, i32>(2));
+/// let c = future::ready(Ok::<i32, i32>(3));
+/// let d = future::ready(Ok::<i32, i32>(4));
+/// let e = future::ready(Ok::<i32, i32>(5));
+/// let tuple = future::try_join5(a, b, c, d, e);
+///
+/// assert_eq!(tuple.await, Ok((1, 2, 3, 4, 5)));
+/// # });
+/// ```
+pub fn try_join5<Fut1, Fut2, Fut3, Fut4, Fut5>(
+ future1: Fut1,
+ future2: Fut2,
+ future3: Fut3,
+ future4: Fut4,
+ future5: Fut5,
+) -> TryJoin5<Fut1, Fut2, Fut3, Fut4, Fut5>
+where
+ Fut1: TryFuture,
+ Fut2: TryFuture<Error = Fut1::Error>,
+ Fut3: TryFuture<Error = Fut1::Error>,
+ Fut4: TryFuture<Error = Fut1::Error>,
+ Fut5: TryFuture<Error = Fut1::Error>,
+{
+ TryJoin5::new(future1, future2, future3, future4, future5)
+}
diff --git a/third_party/rust/futures-util/src/future/try_join_all.rs b/third_party/rust/futures-util/src/future/try_join_all.rs
new file mode 100644
index 0000000000..30300e4e3e
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_join_all.rs
@@ -0,0 +1,179 @@
+//! Definition of the `TryJoinAll` combinator, waiting for all of a list of
+//! futures to finish with either success or error.
+
+use core::fmt;
+use core::future::Future;
+use core::iter::FromIterator;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use alloc::boxed::Box;
+use alloc::vec::Vec;
+
+use super::TryFuture;
+
+#[derive(Debug)]
+enum ElemState<F>
+where
+ F: TryFuture,
+{
+ Pending(F),
+ Done(Option<F::Ok>),
+}
+
+impl<F> ElemState<F>
+where
+ F: TryFuture,
+{
+ fn pending_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut F>> {
+ // Safety: Basic enum pin projection, no drop + optionally Unpin based
+ // on the type of this variant
+ match unsafe { self.get_unchecked_mut() } {
+ ElemState::Pending(f) => Some(unsafe { Pin::new_unchecked(f) }),
+ ElemState::Done(_) => None,
+ }
+ }
+
+ fn take_done(self: Pin<&mut Self>) -> Option<F::Ok> {
+ // Safety: Going from pin to a variant we never pin-project
+ match unsafe { self.get_unchecked_mut() } {
+ ElemState::Pending(_) => None,
+ ElemState::Done(output) => output.take(),
+ }
+ }
+}
+
+impl<F> Unpin for ElemState<F> where F: TryFuture + Unpin {}
+
+fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
+ // Safety: `std` _could_ make this unsound if it were to decide Pin's
+ // invariants aren't required to transmit through slices. Otherwise this has
+ // the same safety as a normal field pin projection.
+ unsafe { slice.get_unchecked_mut() }
+ .iter_mut()
+ .map(|t| unsafe { Pin::new_unchecked(t) })
+}
+
+enum FinalState<E = ()> {
+ Pending,
+ AllDone,
+ Error(E)
+}
+
+/// Future for the [`try_join_all`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct TryJoinAll<F>
+where
+ F: TryFuture,
+{
+ elems: Pin<Box<[ElemState<F>]>>,
+}
+
+impl<F> fmt::Debug for TryJoinAll<F>
+where
+ F: TryFuture + fmt::Debug,
+ F::Ok: fmt::Debug,
+ F::Error: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryJoinAll")
+ .field("elems", &self.elems)
+ .finish()
+ }
+}
+
+/// Creates a future which represents either a collection of the results of the
+/// futures given or an error.
+///
+/// The returned future will drive execution for all of its underlying futures,
+/// collecting the results into a destination `Vec<T>` in the same order as they
+/// were provided.
+///
+/// If any future returns an error then all other futures will be canceled and
+/// an error will be returned immediately. If all futures complete successfully,
+/// however, then the returned future will succeed with a `Vec` of all the
+/// successful results.
+///
+/// This function is only available when the `std` or `alloc` feature of this
+/// library is activated, and it is activated by default.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::{self, try_join_all};
+///
+/// let futures = vec![
+/// future::ok::<u32, u32>(1),
+/// future::ok::<u32, u32>(2),
+/// future::ok::<u32, u32>(3),
+/// ];
+///
+/// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3]));
+///
+/// let futures = vec![
+/// future::ok::<u32, u32>(1),
+/// future::err::<u32, u32>(2),
+/// future::ok::<u32, u32>(3),
+/// ];
+///
+/// assert_eq!(try_join_all(futures).await, Err(2));
+/// # });
+/// ```
+pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item>
+where
+ I: IntoIterator,
+ I::Item: TryFuture,
+{
+ let elems: Box<[_]> = i.into_iter().map(ElemState::Pending).collect();
+ TryJoinAll {
+ elems: elems.into(),
+ }
+}
+
+impl<F> Future for TryJoinAll<F>
+where
+ F: TryFuture,
+{
+ type Output = Result<Vec<F::Ok>, F::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut state = FinalState::AllDone;
+
+ for mut elem in iter_pin_mut(self.elems.as_mut()) {
+ if let Some(pending) = elem.as_mut().pending_pin_mut() {
+ match pending.try_poll(cx) {
+ Poll::Pending => state = FinalState::Pending,
+ Poll::Ready(output) => match output {
+ Ok(item) => elem.set(ElemState::Done(Some(item))),
+ Err(e) => {
+ state = FinalState::Error(e);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ match state {
+ FinalState::Pending => Poll::Pending,
+ FinalState::AllDone => {
+ let mut elems = mem::replace(&mut self.elems, Box::pin([]));
+ let results = iter_pin_mut(elems.as_mut())
+ .map(|e| e.take_done().unwrap())
+ .collect();
+ Poll::Ready(Ok(results))
+ },
+ FinalState::Error(e) => {
+ let _ = mem::replace(&mut self.elems, Box::pin([]));
+ Poll::Ready(Err(e))
+ },
+ }
+ }
+}
+
+impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> {
+ fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
+ try_join_all(iter)
+ }
+}
diff --git a/third_party/rust/futures-util/src/future/try_select.rs b/third_party/rust/futures-util/src/future/try_select.rs
new file mode 100644
index 0000000000..56564f5b5c
--- /dev/null
+++ b/third_party/rust/futures-util/src/future/try_select.rs
@@ -0,0 +1,80 @@
+use core::pin::Pin;
+use futures_core::future::{Future, TryFuture};
+use futures_core::task::{Context, Poll};
+use crate::future::{Either, TryFutureExt};
+
+/// Future for the [`try_select()`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct TrySelect<A, B> {
+ inner: Option<(A, B)>,
+}
+
+impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
+
+/// Waits for either one of two differently-typed futures to complete.
+///
+/// This function will return a new future which awaits for either one of both
+/// futures to complete. The returned future will finish with both the value
+/// resolved and a future representing the completion of the other work.
+///
+/// Note that this function consumes the receiving futures and returns a
+/// wrapped version of them.
+///
+/// Also note that if both this and the second future have the same
+/// success/error type you can use the `Either::factor_first` method to
+/// conveniently extract out the value at the end.
+///
+/// # Examples
+///
+/// ```
+/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt};
+///
+/// // A poor-man's try_join implemented on top of select
+///
+/// fn try_join<A, B, E>(a: A, b: B) -> impl TryFuture<Ok=(A::Ok, B::Ok), Error=E>
+/// where A: TryFuture<Error = E> + Unpin + 'static,
+/// B: TryFuture<Error = E> + Unpin + 'static,
+/// E: 'static,
+/// {
+/// future::try_select(a, b).then(|res| -> Box<dyn Future<Output = Result<_, _>> + Unpin> {
+/// match res {
+/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))),
+/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))),
+/// Err(Either::Left((e, _))) => Box::new(future::err(e)),
+/// Err(Either::Right((e, _))) => Box::new(future::err(e)),
+/// }
+/// })
+/// }
+/// ```
+pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B>
+ where A: TryFuture + Unpin, B: TryFuture + Unpin
+{
+ TrySelect { inner: Some((future1, future2)) }
+}
+
+impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
+ where A: TryFuture, B: TryFuture
+{
+ #[allow(clippy::type_complexity)]
+ type Output = Result<
+ Either<(A::Ok, B), (B::Ok, A)>,
+ Either<(A::Error, B), (B::Error, A)>,
+ >;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
+ match a.try_poll_unpin(cx) {
+ Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))),
+ Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))),
+ Poll::Pending => match b.try_poll_unpin(cx) {
+ Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))),
+ Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))),
+ Poll::Pending => {
+ self.inner = Some((a, b));
+ Poll::Pending
+ }
+ }
+ }
+ }
+}