diff options
Diffstat (limited to 'third_party/rust/futures-util/src/future')
46 files changed, 5231 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/future/abortable.rs b/third_party/rust/futures-util/src/future/abortable.rs new file mode 100644 index 0000000000..281cf6b481 --- /dev/null +++ b/third_party/rust/futures-util/src/future/abortable.rs @@ -0,0 +1,177 @@ +use crate::task::AtomicWaker; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; +use core::fmt; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, Ordering}; +use alloc::sync::Arc; + +/// A future which can be remotely short-circuited using an `AbortHandle`. +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Abortable<Fut> { + future: Fut, + inner: Arc<AbortInner>, +} + +impl<Fut: Unpin> Unpin for Abortable<Fut> {} + +impl<Fut> Abortable<Fut> where Fut: Future { + unsafe_pinned!(future: Fut); + + /// Creates a new `Abortable` future using an existing `AbortRegistration`. + /// `AbortRegistration`s can be acquired through `AbortHandle::new`. + /// + /// When `abort` is called on the handle tied to `reg` or if `abort` has + /// already been called, the future will complete immediately without making + /// any further progress. + /// + /// Example: + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::{Abortable, AbortHandle, Aborted}; + /// + /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); + /// let future = Abortable::new(async { 2 }, abort_registration); + /// abort_handle.abort(); + /// assert_eq!(future.await, Err(Aborted)); + /// # }); + /// ``` + pub fn new(future: Fut, reg: AbortRegistration) -> Self { + Abortable { + future, + inner: reg.inner, + } + } +} + +/// A registration handle for a `Abortable` future. +/// Values of this type can be acquired from `AbortHandle::new` and are used +/// in calls to `Abortable::new`. +#[derive(Debug)] +pub struct AbortRegistration { + inner: Arc<AbortInner>, +} + +/// A handle to a `Abortable` future. +#[derive(Debug, Clone)] +pub struct AbortHandle { + inner: Arc<AbortInner>, +} + +impl AbortHandle { + /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used + /// to abort a running future. + /// + /// This function is usually paired with a call to `Abortable::new`. + /// + /// Example: + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::{Abortable, AbortHandle, Aborted}; + /// + /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); + /// let future = Abortable::new(async { 2 }, abort_registration); + /// abort_handle.abort(); + /// assert_eq!(future.await, Err(Aborted)); + /// # }); + /// ``` + pub fn new_pair() -> (Self, AbortRegistration) { + let inner = Arc::new(AbortInner { + waker: AtomicWaker::new(), + cancel: AtomicBool::new(false), + }); + + ( + AbortHandle { + inner: inner.clone(), + }, + AbortRegistration { + inner, + }, + ) + } +} + +// Inner type storing the waker to awaken and a bool indicating that it +// should be cancelled. +#[derive(Debug)] +struct AbortInner { + waker: AtomicWaker, + cancel: AtomicBool, +} + +/// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it. +/// +/// This function is a convenient (but less flexible) alternative to calling +/// `AbortHandle::new` and `Abortable::new` manually. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle) + where Fut: Future +{ + let (handle, reg) = AbortHandle::new_pair(); + ( + Abortable::new(future, reg), + handle, + ) +} + +/// Indicator that the `Abortable` future was aborted. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct Aborted; + +impl fmt::Display for Aborted { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "`Abortable` future has been aborted") + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Aborted {} + +impl<Fut> Future for Abortable<Fut> where Fut: Future { + type Output = Result<Fut::Output, Aborted>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // Check if the future has been aborted + if self.inner.cancel.load(Ordering::Relaxed) { + return Poll::Ready(Err(Aborted)) + } + + // attempt to complete the future + if let Poll::Ready(x) = self.as_mut().future().poll(cx) { + return Poll::Ready(Ok(x)) + } + + // Register to receive a wakeup if the future is aborted in the... future + self.inner.waker.register(cx.waker()); + + // Check to see if the future was aborted between the first check and + // registration. + // Checking with `Relaxed` is sufficient because `register` introduces an + // `AcqRel` barrier. + if self.inner.cancel.load(Ordering::Relaxed) { + return Poll::Ready(Err(Aborted)) + } + + Poll::Pending + } +} + +impl AbortHandle { + /// Abort the `Abortable` future associated with this handle. + /// + /// Notifies the Abortable future associated with this handle that it + /// should abort. Note that if the future is currently being polled on + /// another thread, it will not immediately stop running. Instead, it will + /// continue to run until its poll method returns. + pub fn abort(&self) { + self.inner.cancel.store(true, Ordering::Relaxed); + self.inner.waker.wake(); + } +} diff --git a/third_party/rust/futures-util/src/future/either.rs b/third_party/rust/futures-util/src/future/either.rs new file mode 100644 index 0000000000..24fbbe79d8 --- /dev/null +++ b/third_party/rust/futures-util/src/future/either.rs @@ -0,0 +1,304 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures_core::future::{FusedFuture, Future}; +use futures_core::stream::{FusedStream, Stream}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +/// Combines two different futures, streams, or sinks having the same associated types into a single +/// type. +#[derive(Debug, Clone)] +pub enum Either<A, B> { + /// First branch of the type + Left(A), + /// Second branch of the type + Right(B), +} + +impl<A, B, T> Either<(T, A), (T, B)> { + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the first element of the pairs. + pub fn factor_first(self) -> (T, Either<A, B>) { + match self { + Either::Left((x, a)) => (x, Either::Left(a)), + Either::Right((x, b)) => (x, Either::Right(b)), + } + } +} + +impl<A, B, T> Either<(A, T), (B, T)> { + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the second element of the pairs. + pub fn factor_second(self) -> (Either<A, B>, T) { + match self { + Either::Left((a, x)) => (Either::Left(a), x), + Either::Right((b, x)) => (Either::Right(b), x), + } + } +} + +impl<T> Either<T, T> { + /// Extract the value of an either over two equivalent types. + pub fn into_inner(self) -> T { + match self { + Either::Left(x) => x, + Either::Right(x) => x, + } + } +} + +impl<A, B> Future for Either<A, B> +where + A: Future, + B: Future<Output = A::Output>, +{ + type Output = A::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<A::Output> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll(cx), + Either::Right(x) => Pin::new_unchecked(x).poll(cx), + } + } + } +} + +impl<A, B> FusedFuture for Either<A, B> +where + A: FusedFuture, + B: FusedFuture<Output = A::Output>, +{ + fn is_terminated(&self) -> bool { + match self { + Either::Left(x) => x.is_terminated(), + Either::Right(x) => x.is_terminated(), + } + } +} + +impl<A, B> Stream for Either<A, B> +where + A: Stream, + B: Stream<Item = A::Item>, +{ + type Item = A::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<A::Item>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_next(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_next(cx), + } + } + } +} + +impl<A, B> FusedStream for Either<A, B> +where + A: FusedStream, + B: FusedStream<Item = A::Item>, +{ + fn is_terminated(&self) -> bool { + match self { + Either::Left(x) => x.is_terminated(), + Either::Right(x) => x.is_terminated(), + } + } +} + +#[cfg(feature = "sink")] +impl<A, B, Item> Sink<Item> for Either<A, B> +where + A: Sink<Item>, + B: Sink<Item, Error = A::Error>, +{ + type Error = A::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_ready(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_ready(cx), + } + } + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).start_send(item), + Either::Right(x) => Pin::new_unchecked(x).start_send(item), + } + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_flush(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_flush(cx), + } + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_close(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_close(cx), + } + } + } +} + +#[cfg(feature = "io")] +#[cfg(feature = "std")] +mod if_std { + use super::Either; + use core::pin::Pin; + use core::task::{Context, Poll}; + #[cfg(feature = "read-initializer")] + use futures_io::Initializer; + use futures_io::{ + AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom, + }; + + impl<A, B> AsyncRead for Either<A, B> + where + A: AsyncRead, + B: AsyncRead, + { + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + match self { + Either::Left(x) => x.initializer(), + Either::Right(x) => x.initializer(), + } + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<Result<usize>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_read(cx, buf), + Either::Right(x) => Pin::new_unchecked(x).poll_read(cx, buf), + } + } + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<Result<usize>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_read_vectored(cx, bufs), + Either::Right(x) => Pin::new_unchecked(x).poll_read_vectored(cx, bufs), + } + } + } + } + + impl<A, B> AsyncWrite for Either<A, B> + where + A: AsyncWrite, + B: AsyncWrite, + { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_write(cx, buf), + Either::Right(x) => Pin::new_unchecked(x).poll_write(cx, buf), + } + } + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<Result<usize>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_write_vectored(cx, bufs), + Either::Right(x) => Pin::new_unchecked(x).poll_write_vectored(cx, bufs), + } + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_flush(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_flush(cx), + } + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_close(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_close(cx), + } + } + } + } + + impl<A, B> AsyncSeek for Either<A, B> + where + A: AsyncSeek, + B: AsyncSeek, + { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<Result<u64>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_seek(cx, pos), + Either::Right(x) => Pin::new_unchecked(x).poll_seek(cx, pos), + } + } + } + } + + impl<A, B> AsyncBufRead for Either<A, B> + where + A: AsyncBufRead, + B: AsyncBufRead, + { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<&[u8]>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).poll_fill_buf(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_fill_buf(cx), + } + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + unsafe { + match self.get_unchecked_mut() { + Either::Left(x) => Pin::new_unchecked(x).consume(amt), + Either::Right(x) => Pin::new_unchecked(x).consume(amt), + } + } + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/catch_unwind.rs b/third_party/rust/futures-util/src/future/future/catch_unwind.rs new file mode 100644 index 0000000000..e88cce7e9d --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/catch_unwind.rs @@ -0,0 +1,31 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; +use std::any::Any; +use std::pin::Pin; +use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; + +/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct CatchUnwind<Fut> { + future: Fut, +} + +impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe { + unsafe_pinned!(future: Fut); + + pub(super) fn new(future: Fut) -> CatchUnwind<Fut> { + CatchUnwind { future } + } +} + +impl<Fut> Future for CatchUnwind<Fut> + where Fut: Future + UnwindSafe, +{ + type Output = Result<Fut::Output, Box<dyn Any + Send>>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok) + } +} diff --git a/third_party/rust/futures-util/src/future/future/chain.rs b/third_party/rust/futures-util/src/future/future/chain.rs new file mode 100644 index 0000000000..3f248e80fe --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/chain.rs @@ -0,0 +1,58 @@ +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; + +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub(crate) enum Chain<Fut1, Fut2, Data> { + First(Fut1, Option<Data>), + Second(Fut2), + Empty, +} + +impl<Fut1: Unpin, Fut2: Unpin, Data> Unpin for Chain<Fut1, Fut2, Data> {} + +impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data> { + pub(crate)fn is_terminated(&self) -> bool { + if let Chain::Empty = *self { true } else { false } + } +} + +impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data> + where Fut1: Future, + Fut2: Future, +{ + pub(crate) fn new(fut1: Fut1, data: Data) -> Chain<Fut1, Fut2, Data> { + Chain::First(fut1, Some(data)) + } + + pub(crate) fn poll<F>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + f: F, + ) -> Poll<Fut2::Output> + where F: FnOnce(Fut1::Output, Data) -> Fut2, + { + let mut f = Some(f); + + // Safe to call `get_unchecked_mut` because we won't move the futures. + let this = unsafe { self.get_unchecked_mut() }; + + loop { + let (output, data) = match this { + Chain::First(fut1, data) => { + let output = ready!(unsafe { Pin::new_unchecked(fut1) }.poll(cx)); + (output, data.take().unwrap()) + } + Chain::Second(fut2) => { + return unsafe { Pin::new_unchecked(fut2) }.poll(cx); + } + Chain::Empty => unreachable!() + }; + + *this = Chain::Empty; // Drop fut1 + let fut2 = (f.take().unwrap())(output, data); + *this = Chain::Second(fut2) + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/flatten.rs b/third_party/rust/futures-util/src/future/future/flatten.rs new file mode 100644 index 0000000000..16b3a19de9 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/flatten.rs @@ -0,0 +1,56 @@ +use super::chain::Chain; +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`flatten`](super::FutureExt::flatten) method. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Flatten<Fut> + where Fut: Future, +{ + state: Chain<Fut, Fut::Output, ()>, +} + +impl<Fut> Flatten<Fut> + where Fut: Future, + Fut::Output: Future, +{ + unsafe_pinned!(state: Chain<Fut, Fut::Output, ()>); + + pub(super) fn new(future: Fut) -> Flatten<Fut> { + Flatten { + state: Chain::new(future, ()), + } + } +} + +impl<Fut> fmt::Debug for Flatten<Fut> + where Fut: Future + fmt::Debug, + Fut::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Flatten") + .field("state", &self.state) + .finish() + } +} + +impl<Fut> FusedFuture for Flatten<Fut> + where Fut: Future, + Fut::Output: Future, +{ + fn is_terminated(&self) -> bool { self.state.is_terminated() } +} + +impl<Fut> Future for Flatten<Fut> + where Fut: Future, + Fut::Output: Future, +{ + type Output = <Fut::Output as Future>::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.state().poll(cx, |a, ()| a) + } +} diff --git a/third_party/rust/futures-util/src/future/future/flatten_stream.rs b/third_party/rust/futures-util/src/future/future/flatten_stream.rs new file mode 100644 index 0000000000..d1108866ca --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/flatten_stream.rs @@ -0,0 +1,89 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Stream for the [`flatten_stream`](super::FutureExt::flatten_stream) method. +#[must_use = "streams do nothing unless polled"] +pub struct FlattenStream<Fut: Future> { + state: State<Fut, Fut::Output>, +} + +impl<Fut: Future> FlattenStream<Fut> { + unsafe_pinned!(state: State<Fut, Fut::Output>); + + pub(super) fn new(future: Fut) -> FlattenStream<Fut> { + FlattenStream { + state: State::Future(future) + } + } +} + +impl<Fut> fmt::Debug for FlattenStream<Fut> + where Fut: Future + fmt::Debug, + Fut::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlattenStream") + .field("state", &self.state) + .finish() + } +} + +#[derive(Debug)] +enum State<Fut, St> { + // future is not yet called or called and not ready + Future(Fut), + // future resolved to Stream + Stream(St), +} + +impl<Fut, St> State<Fut, St> { + fn get_pin_mut(self: Pin<&mut Self>) -> State<Pin<&mut Fut>, Pin<&mut St>> { + // safety: data is never moved via the resulting &mut reference + match unsafe { self.get_unchecked_mut() } { + // safety: the future we're re-pinning here will never be moved; + // it will just be polled, then dropped in place + State::Future(f) => State::Future(unsafe { Pin::new_unchecked(f) }), + // safety: the stream we're repinning here will never be moved; + // it will just be polled, then dropped in place + State::Stream(s) => State::Stream(unsafe { Pin::new_unchecked(s) }), + } + } +} + +impl<Fut> FusedStream for FlattenStream<Fut> + where Fut: Future, + Fut::Output: Stream + FusedStream, +{ + fn is_terminated(&self) -> bool { + match &self.state { + State::Future(_) => false, + State::Stream(stream) => stream.is_terminated(), + } + } +} + +impl<Fut> Stream for FlattenStream<Fut> + where Fut: Future, + Fut::Output: Stream, +{ + type Item = <Fut::Output as Stream>::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + loop { + match self.as_mut().state().get_pin_mut() { + State::Future(f) => { + let stream = ready!(f.poll(cx)); + // Future resolved to stream. + // We do not return, but poll that + // stream in the next loop iteration. + self.as_mut().state().set(State::Stream(stream)); + } + State::Stream(s) => return s.poll_next(cx), + } + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/fuse.rs b/third_party/rust/futures-util/src/future/future/fuse.rs new file mode 100644 index 0000000000..b5ef913034 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/fuse.rs @@ -0,0 +1,90 @@ +use core::pin::Pin; +use futures_core::future::{Future, FusedFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`fuse`](super::FutureExt::fuse) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Fuse<Fut> { + future: Option<Fut>, +} + +impl<Fut: Future> Fuse<Fut> { + unsafe_pinned!(future: Option<Fut>); + + pub(super) fn new(f: Fut) -> Fuse<Fut> { + Fuse { + future: Some(f), + } + } + + /// Creates a new `Fuse`-wrapped future which is already terminated. + /// + /// This can be useful in combination with looping and the `select!` + /// macro, which bypasses terminated futures. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::future::{Fuse, FusedFuture, FutureExt}; + /// use futures::select; + /// use futures::stream::StreamExt; + /// use futures::pin_mut; + /// + /// let (sender, mut stream) = mpsc::unbounded(); + /// + /// // Send a few messages into the stream + /// sender.unbounded_send(()).unwrap(); + /// sender.unbounded_send(()).unwrap(); + /// drop(sender); + /// + /// // Use `Fuse::termianted()` to create an already-terminated future + /// // which may be instantiated later. + /// let foo_printer = Fuse::terminated(); + /// pin_mut!(foo_printer); + /// + /// loop { + /// select! { + /// _ = foo_printer => {}, + /// () = stream.select_next_some() => { + /// if !foo_printer.is_terminated() { + /// println!("Foo is already being printed!"); + /// } else { + /// foo_printer.set(async { + /// // do some other async operations + /// println!("Printing foo from `foo_printer` future"); + /// }.fuse()); + /// } + /// }, + /// complete => break, // `foo_printer` is terminated and the stream is done + /// } + /// } + /// # }); + /// ``` + pub fn terminated() -> Fuse<Fut> { + Fuse { future: None } + } +} + +impl<Fut: Future> FusedFuture for Fuse<Fut> { + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +impl<Fut: Future> Future for Fuse<Fut> { + type Output = Fut::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { + let v = match self.as_mut().future().as_pin_mut() { + Some(fut) => ready!(fut.poll(cx)), + None => return Poll::Pending, + }; + + self.as_mut().future().set(None); + Poll::Ready(v) + } +} diff --git a/third_party/rust/futures-util/src/future/future/inspect.rs b/third_party/rust/futures-util/src/future/future/inspect.rs new file mode 100644 index 0000000000..d67455aa6d --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/inspect.rs @@ -0,0 +1,47 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`inspect`](super::FutureExt::inspect) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Inspect<Fut, F> { + future: Fut, + f: Option<F>, +} + +impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + + pub(super) fn new(future: Fut, f: F) -> Inspect<Fut, F> { + Inspect { + future, + f: Some(f), + } + } +} + +impl<Fut: Future + Unpin, F> Unpin for Inspect<Fut, F> {} + +impl<Fut, F> FusedFuture for Inspect<Fut, F> + where Fut: FusedFuture, + F: FnOnce(&Fut::Output), +{ + fn is_terminated(&self) -> bool { self.future.is_terminated() } +} + +impl<Fut, F> Future for Inspect<Fut, F> + where Fut: Future, + F: FnOnce(&Fut::Output), +{ + type Output = Fut::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { + let e = ready!(self.as_mut().future().poll(cx)); + let f = self.as_mut().f().take().expect("cannot poll Inspect twice"); + f(&e); + Poll::Ready(e) + } +} diff --git a/third_party/rust/futures-util/src/future/future/into_stream.rs b/third_party/rust/futures-util/src/future/future/into_stream.rs new file mode 100644 index 0000000000..616c4cbb57 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/into_stream.rs @@ -0,0 +1,43 @@ +use crate::stream::{self, Once}; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::stream::{Stream, FusedStream}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Stream for the [`into_stream`](super::FutureExt::into_stream) method. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct IntoStream<Fut> { + inner: Once<Fut> +} + +impl<Fut: Future> IntoStream<Fut> { + unsafe_pinned!(inner: Once<Fut>); + + pub(super) fn new(future: Fut) -> IntoStream<Fut> { + IntoStream { + inner: stream::once(future) + } + } +} + +impl<Fut: Future> Stream for IntoStream<Fut> { + type Item = Fut::Output; + + #[inline] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner().poll_next(cx) + } + + #[inline] + fn size_hint(&self) -> (usize, Option<usize>) { + self.inner.size_hint() + } +} + +impl<Fut: Future> FusedStream for IntoStream<Fut> { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} diff --git a/third_party/rust/futures-util/src/future/future/map.rs b/third_party/rust/futures-util/src/future/future/map.rs new file mode 100644 index 0000000000..b5fbfb1384 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/map.rs @@ -0,0 +1,49 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`map`](super::FutureExt::map) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Map<Fut, F> { + future: Fut, + f: Option<F>, +} + +impl<Fut, F> Map<Fut, F> { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + + /// Creates a new Map. + pub(super) fn new(future: Fut, f: F) -> Map<Fut, F> { + Map { future, f: Some(f) } + } +} + +impl<Fut: Unpin, F> Unpin for Map<Fut, F> {} + +impl<Fut, F, T> FusedFuture for Map<Fut, F> + where Fut: Future, + F: FnOnce(Fut::Output) -> T, +{ + fn is_terminated(&self) -> bool { self.f.is_none() } +} + +impl<Fut, F, T> Future for Map<Fut, F> + where Fut: Future, + F: FnOnce(Fut::Output) -> T, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + self.as_mut() + .future() + .poll(cx) + .map(|output| { + let f = self.f().take() + .expect("Map must not be polled after it returned `Poll::Ready`"); + f(output) + }) + } +} diff --git a/third_party/rust/futures-util/src/future/future/mod.rs b/third_party/rust/futures-util/src/future/future/mod.rs new file mode 100644 index 0000000000..e58cafc8c0 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/mod.rs @@ -0,0 +1,558 @@ +//! Futures +//! +//! This module contains a number of functions for working with `Future`s, +//! including the `FutureExt` trait which adds methods to `Future` types. + +use super::{assert_future, Either}; +#[cfg(feature = "alloc")] +use alloc::boxed::Box; +use core::pin::Pin; +#[cfg(feature = "alloc")] +use futures_core::future::{BoxFuture, LocalBoxFuture}; +use futures_core::{ + future::Future, + stream::Stream, + task::{Context, Poll}, +}; + +// Combinators + +mod flatten; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::flatten::Flatten; + +mod flatten_stream; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::flatten_stream::FlattenStream; + +mod fuse; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::fuse::Fuse; + +mod into_stream; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::into_stream::IntoStream; + +mod map; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::map::Map; + +mod then; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::then::Then; + +mod inspect; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::inspect::Inspect; + +mod unit_error; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::unit_error::UnitError; + +mod never_error; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::never_error::NeverError; + +#[cfg(feature = "std")] +mod catch_unwind; +#[cfg(feature = "std")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::catch_unwind::CatchUnwind; + +#[cfg(feature = "channel")] +#[cfg(feature = "std")] +mod remote_handle; +#[cfg(feature = "channel")] +#[cfg(feature = "std")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::remote_handle::{Remote, RemoteHandle}; + +#[cfg(feature = "std")] +mod shared; +#[cfg(feature = "std")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::shared::Shared; + +// Implementation details + +mod chain; +pub(crate) use self::chain::Chain; + +impl<T: ?Sized> FutureExt for T where T: Future {} + +/// An extension trait for `Future`s that provides a variety of convenient +/// adapters. +pub trait FutureExt: Future { + /// Map this future's output to a different type, returning a new future of + /// the resulting type. + /// + /// This function is similar to the `Option::map` or `Iterator::map` where + /// it will change the type of the underlying future. This is useful to + /// chain along a computation once a future has been resolved. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future = async { 1 }; + /// let new_future = future.map(|x| x + 3); + /// assert_eq!(new_future.await, 4); + /// # }); + /// ``` + fn map<U, F>(self, f: F) -> Map<Self, F> + where + F: FnOnce(Self::Output) -> U, + Self: Sized, + { + assert_future::<U, _>(Map::new(self, f)) + } + + /// Chain on a computation for when a future finished, passing the result of + /// the future to the provided closure `f`. + /// + /// The returned value of the closure must implement the `Future` trait + /// and can represent some more work to be done before the composed future + /// is finished. + /// + /// The closure `f` is only run *after* successful completion of the `self` + /// future. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future_of_1 = async { 1 }; + /// let future_of_4 = future_of_1.then(|x| async move { x + 3 }); + /// assert_eq!(future_of_4.await, 4); + /// # }); + /// ``` + fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> + where + F: FnOnce(Self::Output) -> Fut, + Fut: Future, + Self: Sized, + { + assert_future::<Fut::Output, _>(Then::new(self, f)) + } + + /// Wrap this future in an `Either` future, making it the left-hand variant + /// of that `Either`. + /// + /// This can be used in combination with the `right_future` method to write `if` + /// statements that evaluate to different futures in different branches. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let x = 6; + /// let future = if x < 10 { + /// async { true }.left_future() + /// } else { + /// async { false }.right_future() + /// }; + /// + /// assert_eq!(future.await, true); + /// # }); + /// ``` + fn left_future<B>(self) -> Either<Self, B> + where + B: Future<Output = Self::Output>, + Self: Sized, + { + Either::Left(self) + } + + /// Wrap this future in an `Either` future, making it the right-hand variant + /// of that `Either`. + /// + /// This can be used in combination with the `left_future` method to write `if` + /// statements that evaluate to different futures in different branches. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let x = 6; + /// let future = if x > 10 { + /// async { true }.left_future() + /// } else { + /// async { false }.right_future() + /// }; + /// + /// assert_eq!(future.await, false); + /// # }); + /// ``` + fn right_future<A>(self) -> Either<A, Self> + where + A: Future<Output = Self::Output>, + Self: Sized, + { + Either::Right(self) + } + + /// Convert this future into a single element stream. + /// + /// The returned stream contains single success if this future resolves to + /// success or single error if this future resolves into error. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::stream::StreamExt; + /// + /// let future = async { 17 }; + /// let stream = future.into_stream(); + /// let collected: Vec<_> = stream.collect().await; + /// assert_eq!(collected, vec![17]); + /// # }); + /// ``` + fn into_stream(self) -> IntoStream<Self> + where + Self: Sized, + { + IntoStream::new(self) + } + + /// Flatten the execution of this future when the output of this + /// future is itself another future. + /// + /// This can be useful when combining futures together to flatten the + /// computation out the final result. + /// + /// This method is roughly equivalent to `self.then(|x| x)`. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let nested_future = async { async { 1 } }; + /// let future = nested_future.flatten(); + /// assert_eq!(future.await, 1); + /// # }); + /// ``` + fn flatten(self) -> Flatten<Self> + where + Self::Output: Future, + Self: Sized, + { + let f = Flatten::new(self); + assert_future::<<<Self as Future>::Output as Future>::Output, _>(f) + } + + /// Flatten the execution of this future when the successful result of this + /// future is a stream. + /// + /// This can be useful when stream initialization is deferred, and it is + /// convenient to work with that stream as if stream was available at the + /// call site. + /// + /// Note that this function consumes this future and returns a wrapped + /// version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::stream::{self, StreamExt}; + /// + /// let stream_items = vec![17, 18, 19]; + /// let future_of_a_stream = async { stream::iter(stream_items) }; + /// + /// let stream = future_of_a_stream.flatten_stream(); + /// let list: Vec<_> = stream.collect().await; + /// assert_eq!(list, vec![17, 18, 19]); + /// # }); + /// ``` + fn flatten_stream(self) -> FlattenStream<Self> + where + Self::Output: Stream, + Self: Sized, + { + FlattenStream::new(self) + } + + /// Fuse a future such that `poll` will never again be called once it has + /// completed. This method can be used to turn any `Future` into a + /// `FusedFuture`. + /// + /// Normally, once a future has returned `Poll::Ready` from `poll`, + /// any further calls could exhibit bad behavior such as blocking + /// forever, panicking, never returning, etc. If it is known that `poll` + /// may be called too often then this method can be used to ensure that it + /// has defined semantics. + /// + /// If a `fuse`d future is `poll`ed after having returned `Poll::Ready` + /// previously, it will return `Poll::Pending`, from `poll` again (and will + /// continue to do so for all future calls to `poll`). + /// + /// This combinator will drop the underlying future as soon as it has been + /// completed to ensure resources are reclaimed as soon as possible. + fn fuse(self) -> Fuse<Self> + where + Self: Sized, + { + let f = Fuse::new(self); + assert_future::<Self::Output, _>(f) + } + + /// Do something with the output of a future before passing it on. + /// + /// When using futures, you'll often chain several of them together. While + /// working on such code, you might want to check out what's happening at + /// various parts in the pipeline, without consuming the intermediate + /// value. To do that, insert a call to `inspect`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future = async { 1 }; + /// let new_future = future.inspect(|&x| println!("about to resolve: {}", x)); + /// assert_eq!(new_future.await, 1); + /// # }); + /// ``` + fn inspect<F>(self, f: F) -> Inspect<Self, F> + where + F: FnOnce(&Self::Output), + Self: Sized, + { + assert_future::<Self::Output, _>(Inspect::new(self, f)) + } + + /// Catches unwinding panics while polling the future. + /// + /// In general, panics within a future can propagate all the way out to the + /// task level. This combinator makes it possible to halt unwinding within + /// the future itself. It's most commonly used within task executors. It's + /// not recommended to use this for error handling. + /// + /// Note that this method requires the `UnwindSafe` bound from the standard + /// library. This isn't always applied automatically, and the standard + /// library provides an `AssertUnwindSafe` wrapper type to apply it + /// after-the fact. To assist using this method, the `Future` trait is also + /// implemented for `AssertUnwindSafe<F>` where `F` implements `Future`. + /// + /// This method is only available when the `std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::{self, FutureExt, Ready}; + /// + /// let future = future::ready(2); + /// assert!(future.catch_unwind().await.is_ok()); + /// + /// let future = future::lazy(|_| -> Ready<i32> { + /// unimplemented!() + /// }); + /// assert!(future.catch_unwind().await.is_err()); + /// # }); + /// ``` + #[cfg(feature = "std")] + fn catch_unwind(self) -> CatchUnwind<Self> + where + Self: Sized + ::std::panic::UnwindSafe, + { + CatchUnwind::new(self) + } + + /// Create a cloneable handle to this future where all handles will resolve + /// to the same result. + /// + /// The `shared` combinator method provides a method to convert any future + /// into a cloneable future. It enables a future to be polled by multiple + /// threads. + /// + /// This method is only available when the `std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// + /// let future = async { 6 }; + /// let shared1 = future.shared(); + /// let shared2 = shared1.clone(); + /// + /// assert_eq!(6, shared1.await); + /// assert_eq!(6, shared2.await); + /// # }); + /// ``` + /// + /// ``` + /// // Note, unlike most examples this is written in the context of a + /// // synchronous function to better illustrate the cross-thread aspect of + /// // the `shared` combinator. + /// + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::executor::block_on; + /// use std::thread; + /// + /// let future = async { 6 }; + /// let shared1 = future.shared(); + /// let shared2 = shared1.clone(); + /// let join_handle = thread::spawn(move || { + /// assert_eq!(6, block_on(shared2)); + /// }); + /// assert_eq!(6, shared1.await); + /// join_handle.join().unwrap(); + /// # }); + /// ``` + #[cfg(feature = "std")] + fn shared(self) -> Shared<Self> + where + Self: Sized, + Self::Output: Clone, + { + Shared::new(self) + } + + /// Turn this future into a future that yields `()` on completion and sends + /// its output to another future on a separate task. + /// + /// This can be used with spawning executors to easily retrieve the result + /// of a future executing on a separate task or thread. + /// + /// This method is only available when the `std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "channel")] + #[cfg(feature = "std")] + fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) + where + Self: Sized, + { + remote_handle::remote_handle(self) + } + + /// Wrap the future in a Box, pinning it. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "alloc")] + fn boxed<'a>(self) -> BoxFuture<'a, Self::Output> + where + Self: Sized + Send + 'a, + { + Box::pin(self) + } + + /// Wrap the future in a Box, pinning it. + /// + /// Similar to `boxed`, but without the `Send` requirement. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "alloc")] + fn boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output> + where + Self: Sized + 'a, + { + Box::pin(self) + } + + /// Turns a [`Future<Output = T>`](Future) into a + /// [`TryFuture<Ok = T, Error = ()`>](futures_core::future::TryFuture). + fn unit_error(self) -> UnitError<Self> + where + Self: Sized, + { + UnitError::new(self) + } + + /// Turns a [`Future<Output = T>`](Future) into a + /// [`TryFuture<Ok = T, Error = Never`>](futures_core::future::TryFuture). + fn never_error(self) -> NeverError<Self> + where + Self: Sized, + { + NeverError::new(self) + } + + /// A convenience for calling `Future::poll` on `Unpin` future types. + fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> + where + Self: Unpin, + { + Pin::new(self).poll(cx) + } + + /// Evaluates and consumes the future, returning the resulting output if + /// the future is ready after the first call to `Future::poll`. + /// + /// If `poll` instead returns `Poll::Pending`, `None` is returned. + /// + /// This method is useful in cases where immediacy is more important than + /// waiting for a result. It is also convenient for quickly obtaining + /// the value of a future that is known to always resolve immediately. + /// + /// # Examples + /// + /// ``` + /// # use futures::prelude::*; + /// use futures::{future::ready, future::pending}; + /// let future_ready = ready("foobar"); + /// let future_pending = pending::<&'static str>(); + /// + /// assert_eq!(future_ready.now_or_never(), Some("foobar")); + /// assert_eq!(future_pending.now_or_never(), None); + /// ``` + /// + /// In cases where it is absolutely known that a future should always + /// resolve immediately and never return `Poll::Pending`, this method can + /// be combined with `expect()`: + /// + /// ``` + /// # use futures::{prelude::*, future::ready}; + /// let future_ready = ready("foobar"); + /// + /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar"); + /// ``` + fn now_or_never(mut self) -> Option<Self::Output> + where + Self: Sized, + { + let noop_waker = crate::task::noop_waker(); + let mut cx = Context::from_waker(&noop_waker); + + // SAFETY: This is safe because this method consumes the future, so `poll` is + // only going to be called once. Thus it doesn't matter to us if the + // future is `Unpin` or not. + let pinned = unsafe { Pin::new_unchecked(&mut self) }; + + match pinned.poll(&mut cx) { + Poll::Ready(x) => Some(x), + _ => None, + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/never_error.rs b/third_party/rust/futures-util/src/future/future/never_error.rs new file mode 100644 index 0000000000..5a68e6f952 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/never_error.rs @@ -0,0 +1,36 @@ +use crate::never::Never; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{self, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`never_error`](super::FutureExt::never_error) combinator. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct NeverError<Fut> { + future: Fut, +} + +impl<Fut> NeverError<Fut> { + unsafe_pinned!(future: Fut); + + pub(super) fn new(future: Fut) -> NeverError<Fut> { + NeverError { future } + } +} + +impl<Fut: Unpin> Unpin for NeverError<Fut> {} + +impl<Fut: FusedFuture> FusedFuture for NeverError<Fut> { + fn is_terminated(&self) -> bool { self.future.is_terminated() } +} + +impl<Fut, T> Future for NeverError<Fut> + where Fut: Future<Output = T>, +{ + type Output = Result<T, Never>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + self.future().poll(cx).map(Ok) + } +} diff --git a/third_party/rust/futures-util/src/future/future/remote_handle.rs b/third_party/rust/futures-util/src/future/future/remote_handle.rs new file mode 100644 index 0000000000..11b2a65af7 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/remote_handle.rs @@ -0,0 +1,114 @@ +use { + crate::future::{CatchUnwind, FutureExt}, + futures_channel::oneshot::{self, Sender, Receiver}, + futures_core::{ + future::Future, + task::{Context, Poll}, + }, + pin_utils::{unsafe_pinned, unsafe_unpinned}, + std::{ + any::Any, + fmt, + panic::{self, AssertUnwindSafe}, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + thread, + }, +}; + +/// The handle to a remote future returned by +/// [`remote_handle`](crate::future::FutureExt::remote_handle). When you drop this, +/// the remote future will be woken up to be dropped by the executor. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct RemoteHandle<T> { + rx: Receiver<thread::Result<T>>, + keep_running: Arc<AtomicBool>, +} + +impl<T> RemoteHandle<T> { + /// Drops this handle *without* canceling the underlying future. + /// + /// This method can be used if you want to drop the handle, but let the + /// execution continue. + pub fn forget(self) { + self.keep_running.store(true, Ordering::SeqCst); + } +} + +impl<T: Send + 'static> Future for RemoteHandle<T> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + match ready!(self.rx.poll_unpin(cx)) { + Ok(Ok(output)) => Poll::Ready(output), + Ok(Err(e)) => panic::resume_unwind(e), + Err(e) => panic::resume_unwind(Box::new(e)), + } + } +} + +type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'static)>>; + +/// A future which sends its output to the corresponding `RemoteHandle`. +/// Created by [`remote_handle`](crate::future::FutureExt::remote_handle). +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Remote<Fut: Future> { + tx: Option<Sender<SendMsg<Fut>>>, + keep_running: Arc<AtomicBool>, + future: CatchUnwind<AssertUnwindSafe<Fut>>, +} + +impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Remote") + .field(&self.future) + .finish() + } +} + +impl<Fut: Future + Unpin> Unpin for Remote<Fut> {} + +impl<Fut: Future> Remote<Fut> { + unsafe_pinned!(future: CatchUnwind<AssertUnwindSafe<Fut>>); + unsafe_unpinned!(tx: Option<Sender<SendMsg<Fut>>>); +} + +impl<Fut: Future> Future for Remote<Fut> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if let Poll::Ready(_) = self.as_mut().tx().as_mut().unwrap().poll_canceled(cx) { + if !self.keep_running.load(Ordering::SeqCst) { + // Cancelled, bail out + return Poll::Ready(()) + } + } + + let output = ready!(self.as_mut().future().poll(cx)); + + // if the receiving end has gone away then that's ok, we just ignore the + // send error here. + drop(self.as_mut().tx().take().unwrap().send(output)); + Poll::Ready(()) + } +} + +pub(super) fn remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>) { + let (tx, rx) = oneshot::channel(); + let keep_running = Arc::new(AtomicBool::new(false)); + + // AssertUnwindSafe is used here because `Send + 'static` is basically + // an alias for an implementation of the `UnwindSafe` trait but we can't + // express that in the standard library right now. + let wrapped = Remote { + future: AssertUnwindSafe(future).catch_unwind(), + tx: Some(tx), + keep_running: keep_running.clone(), + }; + + (wrapped, RemoteHandle { rx, keep_running }) +} diff --git a/third_party/rust/futures-util/src/future/future/shared.rs b/third_party/rust/futures-util/src/future/future/shared.rs new file mode 100644 index 0000000000..816f5dd007 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/shared.rs @@ -0,0 +1,335 @@ +use crate::task::{ArcWake, waker_ref}; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll, Waker}; +use slab::Slab; +use std::cell::UnsafeCell; +use std::fmt; +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; + +/// Future for the [`shared`](super::FutureExt::shared) method. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Shared<Fut: Future> { + inner: Option<Arc<Inner<Fut>>>, + waker_key: usize, +} + +struct Inner<Fut: Future> { + future_or_output: UnsafeCell<FutureOrOutput<Fut>>, + notifier: Arc<Notifier>, +} + +struct Notifier { + state: AtomicUsize, + wakers: Mutex<Option<Slab<Option<Waker>>>>, +} + +// The future itself is polled behind the `Arc`, so it won't be moved +// when `Shared` is moved. +impl<Fut: Future> Unpin for Shared<Fut> {} + +impl<Fut: Future> fmt::Debug for Shared<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Shared") + .field("inner", &self.inner) + .field("waker_key", &self.waker_key) + .finish() + } +} + +impl<Fut: Future> fmt::Debug for Inner<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Inner").finish() + } +} + +enum FutureOrOutput<Fut: Future> { + Future(Fut), + Output(Fut::Output), +} + +unsafe impl<Fut> Send for Inner<Fut> +where + Fut: Future + Send, + Fut::Output: Send + Sync, +{} + +unsafe impl<Fut> Sync for Inner<Fut> +where + Fut: Future + Send, + Fut::Output: Send + Sync, +{} + +const IDLE: usize = 0; +const POLLING: usize = 1; +const REPOLL: usize = 2; +const COMPLETE: usize = 3; +const POISONED: usize = 4; + +const NULL_WAKER_KEY: usize = usize::max_value(); + +impl<Fut: Future> Shared<Fut> { + pub(super) fn new(future: Fut) -> Shared<Fut> { + let inner = Inner { + future_or_output: UnsafeCell::new(FutureOrOutput::Future(future)), + notifier: Arc::new(Notifier { + state: AtomicUsize::new(IDLE), + wakers: Mutex::new(Some(Slab::new())), + }), + }; + + Shared { + inner: Some(Arc::new(inner)), + waker_key: NULL_WAKER_KEY, + } + } +} + +impl<Fut> Shared<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ + /// Returns [`Some`] containing a reference to this [`Shared`]'s output if + /// it has already been computed by a clone or [`None`] if it hasn't been + /// computed yet or this [`Shared`] already returned its output from + /// [`poll`](Future::poll). + pub fn peek(&self) -> Option<&Fut::Output> { + if let Some(inner) = self.inner.as_ref() { + match inner.notifier.state.load(SeqCst) { + COMPLETE => unsafe { return Some(inner.output()) }, + POISONED => panic!("inner future panicked during poll"), + _ => {} + } + } + None + } + + /// Registers the current task to receive a wakeup when `Inner` is awoken. + fn set_waker(&mut self, cx: &mut Context<'_>) { + // Acquire the lock first before checking COMPLETE to ensure there + // isn't a race. + let mut wakers_guard = if let Some(inner) = self.inner.as_ref() { + inner.notifier.wakers.lock().unwrap() + } else { + return; + }; + + let wakers = if let Some(wakers) = wakers_guard.as_mut() { + wakers + } else { + return; + }; + + if self.waker_key == NULL_WAKER_KEY { + self.waker_key = wakers.insert(Some(cx.waker().clone())); + } else { + let waker_slot = &mut wakers[self.waker_key]; + let needs_replacement = if let Some(_old_waker) = waker_slot { + // If there's still an unwoken waker in the slot, only replace + // if the current one wouldn't wake the same task. + // TODO: This API is currently not available, so replace always + // !waker.will_wake_nonlocal(old_waker) + true + } else { + true + }; + if needs_replacement { + *waker_slot = Some(cx.waker().clone()); + } + } + debug_assert!(self.waker_key != NULL_WAKER_KEY); + } + + /// Safety: callers must first ensure that `self.inner.state` + /// is `COMPLETE` + unsafe fn take_or_clone_output(&mut self) -> Fut::Output { + let inner = self.inner.take().unwrap(); + + match Arc::try_unwrap(inner) { + Ok(inner) => match inner.future_or_output.into_inner() { + FutureOrOutput::Output(item) => item, + FutureOrOutput::Future(_) => unreachable!(), + }, + Err(inner) => inner.output().clone(), + } + } +} + +impl<Fut> Inner<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ + /// Safety: callers must first ensure that `self.inner.state` + /// is `COMPLETE` + unsafe fn output(&self) -> &Fut::Output { + match &*self.future_or_output.get() { + FutureOrOutput::Output(ref item) => &item, + FutureOrOutput::Future(_) => unreachable!(), + } + } +} + +impl<Fut> FusedFuture for Shared<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<Fut> Future for Shared<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ + type Output = Fut::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + + this.set_waker(cx); + + let inner = if let Some(inner) = this.inner.as_ref() { + inner + } else { + panic!("Shared future polled again after completion"); + }; + + match inner.notifier.state.compare_and_swap(IDLE, POLLING, SeqCst) { + IDLE => { + // Lock acquired, fall through + } + POLLING | REPOLL => { + // Another task is currently polling, at this point we just want + // to ensure that the waker for this task is registered + + return Poll::Pending; + } + COMPLETE => { + // Safety: We're in the COMPLETE state + return unsafe { Poll::Ready(this.take_or_clone_output()) }; + } + POISONED => panic!("inner future panicked during poll"), + _ => unreachable!(), + } + + let waker = waker_ref(&inner.notifier); + let mut cx = Context::from_waker(&waker); + + struct Reset<'a>(&'a AtomicUsize); + + impl Drop for Reset<'_> { + fn drop(&mut self) { + use std::thread; + + if thread::panicking() { + self.0.store(POISONED, SeqCst); + } + } + } + + let _reset = Reset(&inner.notifier.state); + + let output = loop { + let future = unsafe { + match &mut *inner.future_or_output.get() { + FutureOrOutput::Future(fut) => Pin::new_unchecked(fut), + _ => unreachable!(), + } + }; + + let poll = future.poll(&mut cx); + + match poll { + Poll::Pending => { + let state = &inner.notifier.state; + match state.compare_and_swap(POLLING, IDLE, SeqCst) { + POLLING => { + // Success + return Poll::Pending; + } + REPOLL => { + // Was woken since: Gotta poll again! + let prev = state.swap(POLLING, SeqCst); + assert_eq!(prev, REPOLL); + } + _ => unreachable!(), + } + } + Poll::Ready(output) => break output, + } + }; + + unsafe { + *inner.future_or_output.get() = + FutureOrOutput::Output(output); + } + + inner.notifier.state.store(COMPLETE, SeqCst); + + // Wake all tasks and drop the slab + let mut wakers_guard = inner.notifier.wakers.lock().unwrap(); + let wakers = &mut wakers_guard.take().unwrap(); + for (_key, opt_waker) in wakers { + if let Some(waker) = opt_waker.take() { + waker.wake(); + } + } + + drop(_reset); // Make borrow checker happy + drop(wakers_guard); + + // Safety: We're in the COMPLETE state + unsafe { Poll::Ready(this.take_or_clone_output()) } + } +} + +impl<Fut> Clone for Shared<Fut> +where + Fut: Future, +{ + fn clone(&self) -> Self { + Shared { + inner: self.inner.clone(), + waker_key: NULL_WAKER_KEY, + } + } +} + +impl<Fut> Drop for Shared<Fut> +where + Fut: Future, +{ + fn drop(&mut self) { + if self.waker_key != NULL_WAKER_KEY { + if let Some(ref inner) = self.inner { + if let Ok(mut wakers) = inner.notifier.wakers.lock() { + if let Some(wakers) = wakers.as_mut() { + wakers.remove(self.waker_key); + } + } + } + } + } +} + +impl ArcWake for Notifier { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst); + + let wakers = &mut *arc_self.wakers.lock().unwrap(); + if let Some(wakers) = wakers.as_mut() { + for (_key, opt_waker) in wakers { + if let Some(waker) = opt_waker.take() { + waker.wake(); + } + } + } + } +} diff --git a/third_party/rust/futures-util/src/future/future/then.rs b/third_party/rust/futures-util/src/future/future/then.rs new file mode 100644 index 0000000000..9f30f09864 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/then.rs @@ -0,0 +1,46 @@ +use super::Chain; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`then`](super::FutureExt::then) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Then<Fut1, Fut2, F> { + chain: Chain<Fut1, Fut2, F>, +} + +impl<Fut1, Fut2, F> Then<Fut1, Fut2, F> + where Fut1: Future, + Fut2: Future, +{ + unsafe_pinned!(chain: Chain<Fut1, Fut2, F>); + + /// Creates a new `Then`. + pub(super) fn new(future: Fut1, f: F) -> Then<Fut1, Fut2, F> { + Then { + chain: Chain::new(future, f), + } + } +} + +impl<Fut1, Fut2, F> FusedFuture for Then<Fut1, Fut2, F> + where Fut1: Future, + Fut2: Future, + F: FnOnce(Fut1::Output) -> Fut2, +{ + fn is_terminated(&self) -> bool { self.chain.is_terminated() } +} + +impl<Fut1, Fut2, F> Future for Then<Fut1, Fut2, F> + where Fut1: Future, + Fut2: Future, + F: FnOnce(Fut1::Output) -> Fut2, +{ + type Output = Fut2::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut2::Output> { + self.as_mut().chain().poll(cx, |output, f| f(output)) + } +} diff --git a/third_party/rust/futures-util/src/future/future/unit_error.rs b/third_party/rust/futures-util/src/future/future/unit_error.rs new file mode 100644 index 0000000000..679e988b16 --- /dev/null +++ b/third_party/rust/futures-util/src/future/future/unit_error.rs @@ -0,0 +1,35 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`unit_error`](super::FutureExt::unit_error) combinator. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct UnitError<Fut> { + future: Fut, +} + +impl<Fut> UnitError<Fut> { + unsafe_pinned!(future: Fut); + + pub(super) fn new(future: Fut) -> UnitError<Fut> { + UnitError { future } + } +} + +impl<Fut: Unpin> Unpin for UnitError<Fut> {} + +impl<Fut: FusedFuture> FusedFuture for UnitError<Fut> { + fn is_terminated(&self) -> bool { self.future.is_terminated() } +} + +impl<Fut, T> Future for UnitError<Fut> + where Fut: Future<Output = T>, +{ + type Output = Result<T, ()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, ()>> { + self.future().poll(cx).map(Ok) + } +} diff --git a/third_party/rust/futures-util/src/future/join.rs b/third_party/rust/futures-util/src/future/join.rs new file mode 100644 index 0000000000..5af5b408e9 --- /dev/null +++ b/third_party/rust/futures-util/src/future/join.rs @@ -0,0 +1,214 @@ +#![allow(non_snake_case)] + +use crate::future::{MaybeDone, maybe_done}; +use core::fmt; +use core::pin::Pin; +use futures_core::future::{Future, FusedFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; +use super::assert_future; + +macro_rules! generate { + ($( + $(#[$doc:meta])* + ($Join:ident, <$($Fut:ident),*>), + )*) => ($( + $(#[$doc])* + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct $Join<$($Fut: Future),*> { + $($Fut: MaybeDone<$Fut>,)* + } + + impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*> + where + $( + $Fut: Future + fmt::Debug, + $Fut::Output: fmt::Debug, + )* + { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(stringify!($Join)) + $(.field(stringify!($Fut), &self.$Fut))* + .finish() + } + } + + impl<$($Fut: Future),*> $Join<$($Fut),*> { + fn new($($Fut: $Fut),*) -> $Join<$($Fut),*> { + $Join { + $($Fut: maybe_done($Fut)),* + } + } + $( + unsafe_pinned!($Fut: MaybeDone<$Fut>); + )* + } + + impl<$($Fut: Future),*> Future for $Join<$($Fut),*> { + type Output = ($($Fut::Output),*); + + fn poll( + mut self: Pin<&mut Self>, cx: &mut Context<'_> + ) -> Poll<Self::Output> { + let mut all_done = true; + $( + all_done &= self.as_mut().$Fut().poll(cx).is_ready(); + )* + + if all_done { + Poll::Ready(($(self.as_mut().$Fut().take_output().unwrap()), *)) + } else { + Poll::Pending + } + } + } + + impl<$($Fut: FusedFuture),*> FusedFuture for $Join<$($Fut),*> { + fn is_terminated(&self) -> bool { + $( + self.$Fut.is_terminated() + ) && * + } + } + )*) +} + +generate! { + /// Future for the [`join`](join()) function. + (Join, <Fut1, Fut2>), + + /// Future for the [`join3`] function. + (Join3, <Fut1, Fut2, Fut3>), + + /// Future for the [`join4`] function. + (Join4, <Fut1, Fut2, Fut3, Fut4>), + + /// Future for the [`join5`] function. + (Join5, <Fut1, Fut2, Fut3, Fut4, Fut5>), +} + +/// Joins the result of two futures, waiting for them both to complete. +/// +/// This function will return a new future which awaits both futures to +/// complete. The returned future will finish with a tuple of both results. +/// +/// Note that this function consumes the passed futures and returns a +/// wrapped version of it. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let pair = future::join(a, b); +/// +/// assert_eq!(pair.await, (1, 2)); +/// # }); +/// ``` +pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2> +where + Fut1: Future, + Fut2: Future, +{ + let f = Join::new(future1, future2); + assert_future::<(Fut1::Output, Fut2::Output), _>(f) +} + +/// Same as [`join`](join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let c = async { 3 }; +/// let tuple = future::join3(a, b, c); +/// +/// assert_eq!(tuple.await, (1, 2, 3)); +/// # }); +/// ``` +pub fn join3<Fut1, Fut2, Fut3>( + future1: Fut1, + future2: Fut2, + future3: Fut3, +) -> Join3<Fut1, Fut2, Fut3> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, +{ + Join3::new(future1, future2, future3) +} + +/// Same as [`join`](join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let c = async { 3 }; +/// let d = async { 4 }; +/// let tuple = future::join4(a, b, c, d); +/// +/// assert_eq!(tuple.await, (1, 2, 3, 4)); +/// # }); +/// ``` +pub fn join4<Fut1, Fut2, Fut3, Fut4>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, +) -> Join4<Fut1, Fut2, Fut3, Fut4> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, + Fut4: Future, +{ + Join4::new(future1, future2, future3, future4) +} + +/// Same as [`join`](join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = async { 1 }; +/// let b = async { 2 }; +/// let c = async { 3 }; +/// let d = async { 4 }; +/// let e = async { 5 }; +/// let tuple = future::join5(a, b, c, d, e); +/// +/// assert_eq!(tuple.await, (1, 2, 3, 4, 5)); +/// # }); +/// ``` +pub fn join5<Fut1, Fut2, Fut3, Fut4, Fut5>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, + future5: Fut5, +) -> Join5<Fut1, Fut2, Fut3, Fut4, Fut5> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, + Fut4: Future, + Fut5: Future, +{ + Join5::new(future1, future2, future3, future4, future5) +} diff --git a/third_party/rust/futures-util/src/future/join_all.rs b/third_party/rust/futures-util/src/future/join_all.rs new file mode 100644 index 0000000000..07408856a4 --- /dev/null +++ b/third_party/rust/futures-util/src/future/join_all.rs @@ -0,0 +1,159 @@ +//! Definition of the `JoinAll` combinator, waiting for all of a list of futures +//! to finish. + +use core::fmt; +use core::future::Future; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use alloc::boxed::Box; +use alloc::vec::Vec; + +#[derive(Debug)] +enum ElemState<F> +where + F: Future, +{ + Pending(F), + Done(Option<F::Output>), +} + +impl<F> ElemState<F> +where + F: Future, +{ + fn pending_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut F>> { + // Safety: Basic enum pin projection, no drop + optionally Unpin based + // on the type of this variant + match unsafe { self.get_unchecked_mut() } { + ElemState::Pending(f) => Some(unsafe { Pin::new_unchecked(f) }), + ElemState::Done(_) => None, + } + } + + fn take_done(self: Pin<&mut Self>) -> Option<F::Output> { + // Safety: Going from pin to a variant we never pin-project + match unsafe { self.get_unchecked_mut() } { + ElemState::Pending(_) => None, + ElemState::Done(output) => output.take(), + } + } +} + +impl<F> Unpin for ElemState<F> +where + F: Future + Unpin, +{ +} + +fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { + // Safety: `std` _could_ make this unsound if it were to decide Pin's + // invariants aren't required to transmit through slices. Otherwise this has + // the same safety as a normal field pin projection. + unsafe { slice.get_unchecked_mut() } + .iter_mut() + .map(|t| unsafe { Pin::new_unchecked(t) }) +} + +/// Future for the [`join_all`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct JoinAll<F> +where + F: Future, +{ + elems: Pin<Box<[ElemState<F>]>>, +} + +impl<F> fmt::Debug for JoinAll<F> +where + F: Future + fmt::Debug, + F::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JoinAll") + .field("elems", &self.elems) + .finish() + } +} + +/// Creates a future which represents a collection of the outputs of the futures +/// given. +/// +/// The returned future will drive execution for all of its underlying futures, +/// collecting the results into a destination `Vec<T>` in the same order as they +/// were provided. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # See Also +/// +/// This is purposefully a very simple API for basic use-cases. In a lot of +/// cases you will want to use the more powerful +/// [`FuturesUnordered`][crate::stream::FuturesUnordered] APIs, some +/// examples of additional functionality that provides: +/// +/// * Adding new futures to the set even after it has been started. +/// +/// * Only polling the specific futures that have been woken. In cases where +/// you have a lot of futures this will result in much more efficient polling. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::join_all; +/// +/// async fn foo(i: u32) -> u32 { i } +/// +/// let futures = vec![foo(1), foo(2), foo(3)]; +/// +/// assert_eq!(join_all(futures).await, [1, 2, 3]); +/// # }); +/// ``` +pub fn join_all<I>(i: I) -> JoinAll<I::Item> +where + I: IntoIterator, + I::Item: Future, +{ + let elems: Box<[_]> = i.into_iter().map(ElemState::Pending).collect(); + JoinAll { elems: elems.into() } +} + +impl<F> Future for JoinAll<F> +where + F: Future, +{ + type Output = Vec<F::Output>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut all_done = true; + + for mut elem in iter_pin_mut(self.elems.as_mut()) { + if let Some(pending) = elem.as_mut().pending_pin_mut() { + if let Poll::Ready(output) = pending.poll(cx) { + elem.set(ElemState::Done(Some(output))); + } else { + all_done = false; + } + } + } + + if all_done { + let mut elems = mem::replace(&mut self.elems, Box::pin([])); + let result = iter_pin_mut(elems.as_mut()) + .map(|e| e.take_done().unwrap()) + .collect(); + Poll::Ready(result) + } else { + Poll::Pending + } + } +} + +impl<F: Future> FromIterator<F> for JoinAll<F> { + fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { + join_all(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/lazy.rs b/third_party/rust/futures-util/src/future/lazy.rs new file mode 100644 index 0000000000..5e72218d1f --- /dev/null +++ b/third_party/rust/futures-util/src/future/lazy.rs @@ -0,0 +1,54 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`lazy`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Lazy<F> { + f: Option<F> +} + +// safe because we never generate `Pin<&mut F>` +impl<F> Unpin for Lazy<F> {} + +/// Creates a new future that allows delayed execution of a closure. +/// +/// The provided closure is only run once the future is polled. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::lazy(|_| 1); +/// assert_eq!(a.await, 1); +/// +/// let b = future::lazy(|_| -> i32 { +/// panic!("oh no!") +/// }); +/// drop(b); // closure is never run +/// # }); +/// ``` +pub fn lazy<F, R>(f: F) -> Lazy<F> + where F: FnOnce(&mut Context<'_>) -> R, +{ + Lazy { f: Some(f) } +} + +impl<F, R> FusedFuture for Lazy<F> + where F: FnOnce(&mut Context<'_>) -> R, +{ + fn is_terminated(&self) -> bool { self.f.is_none() } +} + +impl<F, R> Future for Lazy<F> + where F: FnOnce(&mut Context<'_>) -> R, +{ + type Output = R; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> { + Poll::Ready((self.f.take().unwrap())(cx)) + } +} diff --git a/third_party/rust/futures-util/src/future/maybe_done.rs b/third_party/rust/futures-util/src/future/maybe_done.rs new file mode 100644 index 0000000000..f16f889781 --- /dev/null +++ b/third_party/rust/futures-util/src/future/maybe_done.rs @@ -0,0 +1,104 @@ +//! Definition of the MaybeDone combinator + +use core::mem; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// A future that may have completed. +/// +/// This is created by the [`maybe_done()`] function. +#[derive(Debug)] +pub enum MaybeDone<Fut: Future> { + /// A not-yet-completed future + Future(Fut), + /// The output of the completed future + Done(Fut::Output), + /// The empty variant after the result of a [`MaybeDone`] has been + /// taken using the [`take_output`](MaybeDone::take_output) method. + Gone, +} + +// Safe because we never generate `Pin<&mut Fut::Output>` +impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} + +/// Wraps a future into a `MaybeDone` +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// use futures::pin_mut; +/// +/// let future = future::maybe_done(async { 5 }); +/// pin_mut!(future); +/// assert_eq!(future.as_mut().take_output(), None); +/// let () = future.as_mut().await; +/// assert_eq!(future.as_mut().take_output(), Some(5)); +/// assert_eq!(future.as_mut().take_output(), None); +/// # }); +/// ``` +pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { + MaybeDone::Future(future) +} + +impl<Fut: Future> MaybeDone<Fut> { + /// Returns an [`Option`] containing a mutable reference to the output of the future. + /// The output of this method will be [`Some`] if and only if the inner + /// future has been completed and [`take_output`](MaybeDone::take_output) + /// has not yet been called. + #[inline] + pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(res) => Some(res), + _ => None, + } + } + } + + /// Attempt to take the output of a `MaybeDone` without driving it + /// towards completion. + #[inline] + pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(_) => {}, + MaybeDone::Future(_) | MaybeDone::Gone => return None, + }; + if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) { + Some(output) + } else { + unreachable!() + } + } + } +} + +impl<Fut: Future> FusedFuture for MaybeDone<Fut> { + fn is_terminated(&self) -> bool { + match self { + MaybeDone::Future(_) => false, + MaybeDone::Done(_) | MaybeDone::Gone => true, + } + } +} + +impl<Fut: Future> Future for MaybeDone<Fut> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let res = unsafe { + match self.as_mut().get_unchecked_mut() { + MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)), + MaybeDone::Done(_) => return Poll::Ready(()), + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + }; + self.set(MaybeDone::Done(res)); + Poll::Ready(()) + } +} diff --git a/third_party/rust/futures-util/src/future/mod.rs b/third_party/rust/futures-util/src/future/mod.rs new file mode 100644 index 0000000000..3f4bb01436 --- /dev/null +++ b/third_party/rust/futures-util/src/future/mod.rs @@ -0,0 +1,110 @@ +//! Futures +//! +//! This module contains a number of functions for working with `Future`s, +//! including the [`FutureExt`] trait and the [`TryFutureExt`] trait which add +//! methods to `Future` types. + +#[cfg(feature = "alloc")] +pub use futures_core::future::{BoxFuture, LocalBoxFuture}; +pub use futures_core::future::{FusedFuture, Future, TryFuture}; +pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj}; + +// Extension traits and combinators + +#[allow(clippy::module_inception)] +mod future; +pub use self::future::{ + Flatten, FlattenStream, Fuse, FutureExt, Inspect, IntoStream, Map, NeverError, Then, UnitError, +}; + +#[cfg(feature = "std")] +pub use self::future::CatchUnwind; + +#[cfg(feature = "channel")] +#[cfg(feature = "std")] +pub use self::future::{Remote, RemoteHandle}; + +#[cfg(feature = "std")] +pub use self::future::Shared; + +mod try_future; +pub use self::try_future::{ + AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, OrElse, TryFlattenStream, + TryFutureExt, UnwrapOrElse, +}; + +#[cfg(feature = "sink")] +pub use self::try_future::FlattenSink; + +// Primitive futures + +mod lazy; +pub use self::lazy::{lazy, Lazy}; + +mod pending; +pub use self::pending::{pending, Pending}; + +mod maybe_done; +pub use self::maybe_done::{maybe_done, MaybeDone}; + +mod option; +pub use self::option::OptionFuture; + +mod poll_fn; +pub use self::poll_fn::{poll_fn, PollFn}; + +mod ready; +pub use self::ready::{err, ok, ready, Ready}; + +mod join; +pub use self::join::{join, join3, join4, join5, Join, Join3, Join4, Join5}; + +#[cfg(feature = "alloc")] +mod join_all; +#[cfg(feature = "alloc")] +pub use self::join_all::{join_all, JoinAll}; + +mod select; +pub use self::select::{select, Select}; + +#[cfg(feature = "alloc")] +mod select_all; +#[cfg(feature = "alloc")] +pub use self::select_all::{select_all, SelectAll}; + +mod try_join; +pub use self::try_join::{ + try_join, try_join3, try_join4, try_join5, TryJoin, TryJoin3, TryJoin4, TryJoin5, +}; + +#[cfg(feature = "alloc")] +mod try_join_all; +#[cfg(feature = "alloc")] +pub use self::try_join_all::{try_join_all, TryJoinAll}; + +mod try_select; +pub use self::try_select::{try_select, TrySelect}; + +#[cfg(feature = "alloc")] +mod select_ok; +#[cfg(feature = "alloc")] +pub use self::select_ok::{select_ok, SelectOk}; + +mod either; +pub use self::either::Either; + +cfg_target_has_atomic! { + #[cfg(feature = "alloc")] + mod abortable; + #[cfg(feature = "alloc")] + pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted}; +} + +// Just a helper function to ensure the futures we're returning all have the +// right implementations. +fn assert_future<T, F>(future: F) -> F +where + F: Future<Output = T>, +{ + future +} diff --git a/third_party/rust/futures-util/src/future/option.rs b/third_party/rust/futures-util/src/future/option.rs new file mode 100644 index 0000000000..21413525d0 --- /dev/null +++ b/third_party/rust/futures-util/src/future/option.rs @@ -0,0 +1,62 @@ +//! Definition of the `Option` (optional step) combinator + +use core::pin::Pin; +use futures_core::future::{Future, FusedFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// A future representing a value which may or may not be present. +/// +/// Created by the [`From`] implementation for [`Option`](std::option::Option). +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::OptionFuture; +/// +/// let mut a: OptionFuture<_> = Some(async { 123 }).into(); +/// assert_eq!(a.await, Some(123)); +/// +/// a = None.into(); +/// assert_eq!(a.await, None); +/// # }); +/// ``` +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct OptionFuture<F> { + option: Option<F>, +} + +impl<F> OptionFuture<F> { + unsafe_pinned!(option: Option<F>); +} + +impl<F: Future> Future for OptionFuture<F> { + type Output = Option<F::Output>; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + match self.option().as_pin_mut() { + Some(x) => x.poll(cx).map(Some), + None => Poll::Ready(None), + } + } +} + +impl<F: FusedFuture> FusedFuture for OptionFuture<F> { + fn is_terminated(&self) -> bool { + match &self.option { + Some(x) => x.is_terminated(), + None => true, + } + } +} + +impl<T> From<Option<T>> for OptionFuture<T> { + fn from(option: Option<T>) -> Self { + OptionFuture { option } + } +} diff --git a/third_party/rust/futures-util/src/future/pending.rs b/third_party/rust/futures-util/src/future/pending.rs new file mode 100644 index 0000000000..5a7bbb8d59 --- /dev/null +++ b/third_party/rust/futures-util/src/future/pending.rs @@ -0,0 +1,56 @@ +use core::marker; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`pending()`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Pending<T> { + _data: marker::PhantomData<T>, +} + +impl<T> FusedFuture for Pending<T> { + fn is_terminated(&self) -> bool { + true + } +} + +/// Creates a future which never resolves, representing a computation that never +/// finishes. +/// +/// The returned future will forever return [`Poll::Pending`]. +/// +/// # Examples +/// +/// ```ignore +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let future = future::pending(); +/// let () = future.await; +/// unreachable!(); +/// # }); +/// ``` +pub fn pending<T>() -> Pending<T> { + Pending { + _data: marker::PhantomData, + } +} + +impl<T> Future for Pending<T> { + type Output = T; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> { + Poll::Pending + } +} + +impl<T> Unpin for Pending<T> { +} + +impl<T> Clone for Pending<T> { + fn clone(&self) -> Self { + pending() + } +} diff --git a/third_party/rust/futures-util/src/future/poll_fn.rs b/third_party/rust/futures-util/src/future/poll_fn.rs new file mode 100644 index 0000000000..b7b10be85d --- /dev/null +++ b/third_party/rust/futures-util/src/future/poll_fn.rs @@ -0,0 +1,56 @@ +//! Definition of the `PollFn` adapter combinator + +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; + +/// Future for the [`poll_fn`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PollFn<F> { + f: F, +} + +impl<F> Unpin for PollFn<F> {} + +/// Creates a new future wrapping around a function returning [`Poll`]. +/// +/// Polling the returned future delegates to the wrapped function. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::poll_fn; +/// use futures::task::{Context, Poll}; +/// +/// fn read_line(_cx: &mut Context<'_>) -> Poll<String> { +/// Poll::Ready("Hello, World!".into()) +/// } +/// +/// let read_future = poll_fn(read_line); +/// assert_eq!(read_future.await, "Hello, World!".to_owned()); +/// # }); +/// ``` +pub fn poll_fn<T, F>(f: F) -> PollFn<F> +where + F: FnMut(&mut Context<'_>) -> Poll<T> +{ + PollFn { f } +} + +impl<F> fmt::Debug for PollFn<F> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollFn").finish() + } +} + +impl<T, F> Future for PollFn<F> + where F: FnMut(&mut Context<'_>) -> Poll<T>, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + (&mut self.f)(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/ready.rs b/third_party/rust/futures-util/src/future/ready.rs new file mode 100644 index 0000000000..48661b3d84 --- /dev/null +++ b/third_party/rust/futures-util/src/future/ready.rs @@ -0,0 +1,81 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`ready`](ready()) function. +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Ready<T>(Option<T>); + +impl<T> Ready<T> { + /// Unwraps the value from this immediately ready future. + #[inline] + pub fn into_inner(mut self) -> T { + self.0.take().unwrap() + } +} + +impl<T> Unpin for Ready<T> {} + +impl<T> FusedFuture for Ready<T> { + fn is_terminated(&self) -> bool { + self.0.is_none() + } +} + +impl<T> Future for Ready<T> { + type Output = T; + + #[inline] + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> { + Poll::Ready(self.0.take().unwrap()) + } +} + +/// Creates a future that is immediately ready with a value. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(1); +/// assert_eq!(a.await, 1); +/// # }); +/// ``` +pub fn ready<T>(t: T) -> Ready<T> { + Ready(Some(t)) +} + +/// Create a future that is immediately ready with a success value. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ok::<i32, i32>(1); +/// assert_eq!(a.await, Ok(1)); +/// # }); +/// ``` +pub fn ok<T, E>(t: T) -> Ready<Result<T, E>> { + Ready(Some(Ok(t))) +} + +/// Create a future that is immediately ready with an error value. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::err::<i32, i32>(1); +/// assert_eq!(a.await, Err(1)); +/// # }); +/// ``` +pub fn err<T, E>(err: E) -> Ready<Result<T, E>> { + Ready(Some(Err(err))) +} diff --git a/third_party/rust/futures-util/src/future/select.rs b/third_party/rust/futures-util/src/future/select.rs new file mode 100644 index 0000000000..91b467dca6 --- /dev/null +++ b/third_party/rust/futures-util/src/future/select.rs @@ -0,0 +1,83 @@ +use core::pin::Pin; +use futures_core::future::{Future, FusedFuture}; +use futures_core::task::{Context, Poll}; +use crate::future::{Either, FutureExt}; + +/// Future for the [`select()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Select<A, B> { + inner: Option<(A, B)>, +} + +impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} + +/// Waits for either one of two differently-typed futures to complete. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// Also note that if both this and the second future have the same +/// output type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt}; +/// +/// // A poor-man's join implemented on top of select +/// +/// fn join<A, B, E>(a: A, b: B) -> impl Future<Output=(A::Output, B::Output)> +/// where A: Future + Unpin, +/// B: Future + Unpin, +/// { +/// future::select(a, b).then(|either| { +/// match either { +/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(), +/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(), +/// } +/// }) +/// } +/// ``` +pub fn select<A, B>(future1: A, future2: B) -> Select<A, B> + where A: Future + Unpin, B: Future + Unpin +{ + Select { inner: Some((future1, future2)) } +} + +impl<A, B> Future for Select<A, B> +where + A: Future + Unpin, + B: Future + Unpin, +{ + type Output = Either<(A::Output, B), (B::Output, A)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + match a.poll_unpin(cx) { + Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), + Poll::Pending => match b.poll_unpin(cx) { + Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), + Poll::Pending => { + self.inner = Some((a, b)); + Poll::Pending + } + } + } + } +} + +impl<A, B> FusedFuture for Select<A, B> +where + A: Future + Unpin, + B: Future + Unpin, +{ + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} diff --git a/third_party/rust/futures-util/src/future/select_all.rs b/third_party/rust/futures-util/src/future/select_all.rs new file mode 100644 index 0000000000..9f7fb245bf --- /dev/null +++ b/third_party/rust/futures-util/src/future/select_all.rs @@ -0,0 +1,69 @@ +use crate::future::FutureExt; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use alloc::vec::Vec; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; + +/// Future for the [`select_all`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SelectAll<Fut> { + inner: Vec<Fut>, +} + +impl<Fut: Unpin> Unpin for SelectAll<Fut> {} + +/// Creates a new future which will select over a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready. Upon +/// completion the item resolved will be returned, along with the index of the +/// future that was ready and the list of all the remaining futures. +/// +/// There are no guarantees provided on the order of the list with the remaining +/// futures. They might be swapped around, reversed, or completely random. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no items. +pub fn select_all<I>(iter: I) -> SelectAll<I::Item> + where I: IntoIterator, + I::Item: Future + Unpin, +{ + let ret = SelectAll { + inner: iter.into_iter().collect() + }; + assert!(!ret.inner.is_empty()); + ret +} + +impl<Fut: Future + Unpin> Future for SelectAll<Fut> { + type Output = (Fut::Output, usize, Vec<Fut>); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| { + match f.poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), + } + }); + match item { + Some((idx, res)) => { + let _ = self.inner.swap_remove(idx); + let rest = mem::replace(&mut self.inner, Vec::new()); + Poll::Ready((res, idx, rest)) + } + None => Poll::Pending, + } + } +} + +impl<Fut: Future + Unpin> FromIterator<Fut> for SelectAll<Fut> { + fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self { + select_all(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/select_ok.rs b/third_party/rust/futures-util/src/future/select_ok.rs new file mode 100644 index 0000000000..7f4f4d65f4 --- /dev/null +++ b/third_party/rust/futures-util/src/future/select_ok.rs @@ -0,0 +1,83 @@ +use crate::future::TryFutureExt; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use alloc::vec::Vec; +use futures_core::future::{Future, TryFuture}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`select_ok`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SelectOk<Fut> { + inner: Vec<Fut>, +} + +impl<Fut: Unpin> Unpin for SelectOk<Fut> {} + +/// Creates a new future which will select the first successful future over a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike +/// `select_all`, this will only return the first successful completion, or the last +/// failure. This is useful in contexts where any success is desired and failures +/// are ignored, unless all the futures fail. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no items. +pub fn select_ok<I>(iter: I) -> SelectOk<I::Item> + where I: IntoIterator, + I::Item: TryFuture + Unpin, +{ + let ret = SelectOk { + inner: iter.into_iter().collect() + }; + assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); + ret +} + +impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { + type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // loop until we've either exhausted all errors, a success was hit, or nothing is ready + loop { + let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| { + match f.try_poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), + } + }); + match item { + Some((idx, res)) => { + // always remove Ok or Err, if it's not the last Err continue looping + drop(self.inner.remove(idx)); + match res { + Ok(e) => { + let rest = mem::replace(&mut self.inner, Vec::new()); + return Poll::Ready(Ok((e, rest))) + } + Err(e) => { + if self.inner.is_empty() { + return Poll::Ready(Err(e)) + } + } + } + } + None => { + // based on the filter above, nothing is ready, return + return Poll::Pending + } + } + } + } +} + +impl<Fut: TryFuture + Unpin> FromIterator<Fut> for SelectOk<Fut> { + fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self { + select_ok(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/and_then.rs b/third_party/rust/futures-util/src/future/try_future/and_then.rs new file mode 100644 index 0000000000..37333e0503 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/and_then.rs @@ -0,0 +1,53 @@ +use super::{TryChain, TryChainAction}; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`and_then`](super::TryFutureExt::and_then) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct AndThen<Fut1, Fut2, F> { + try_chain: TryChain<Fut1, Fut2, F>, +} + +impl<Fut1, Fut2, F> AndThen<Fut1, Fut2, F> + where Fut1: TryFuture, + Fut2: TryFuture, +{ + unsafe_pinned!(try_chain: TryChain<Fut1, Fut2, F>); + + /// Creates a new `Then`. + pub(super) fn new(future: Fut1, f: F) -> AndThen<Fut1, Fut2, F> { + AndThen { + try_chain: TryChain::new(future, f), + } + } +} + +impl<Fut1, Fut2, F> FusedFuture for AndThen<Fut1, Fut2, F> + where Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + F: FnOnce(Fut1::Ok) -> Fut2, +{ + fn is_terminated(&self) -> bool { + self.try_chain.is_terminated() + } +} + +impl<Fut1, Fut2, F> Future for AndThen<Fut1, Fut2, F> + where Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + F: FnOnce(Fut1::Ok) -> Fut2, +{ + type Output = Result<Fut2::Ok, Fut2::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.try_chain().poll(cx, |result, async_op| { + match result { + Ok(ok) => TryChainAction::Future(async_op(ok)), + Err(err) => TryChainAction::Output(Err(err)), + } + }) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/err_into.rs b/third_party/rust/futures-util/src/future/try_future/err_into.rs new file mode 100644 index 0000000000..731fcae39e --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/err_into.rs @@ -0,0 +1,48 @@ +use core::marker::PhantomData; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`err_into`](super::TryFutureExt::err_into) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ErrInto<Fut, E> { + future: Fut, + _marker: PhantomData<E>, +} + +impl<Fut: Unpin, E> Unpin for ErrInto<Fut, E> {} + +impl<Fut, E> ErrInto<Fut, E> { + unsafe_pinned!(future: Fut); + + pub(super) fn new(future: Fut) -> ErrInto<Fut, E> { + ErrInto { + future, + _marker: PhantomData, + } + } +} + +impl<Fut, E> FusedFuture for ErrInto<Fut, E> + where Fut: TryFuture + FusedFuture, + Fut::Error: Into<E>, +{ + fn is_terminated(&self) -> bool { self.future.is_terminated() } +} + +impl<Fut, E> Future for ErrInto<Fut, E> + where Fut: TryFuture, + Fut::Error: Into<E>, +{ + type Output = Result<Fut::Ok, E>; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + self.future().try_poll(cx) + .map(|res| res.map_err(Into::into)) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/flatten_sink.rs b/third_party/rust/futures-util/src/future/try_future/flatten_sink.rs new file mode 100644 index 0000000000..d6863dd2cd --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/flatten_sink.rs @@ -0,0 +1,76 @@ +use super::FlattenStreamSink; +use core::pin::Pin; +use futures_core::future::TryFuture; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_utils::unsafe_pinned; + +/// Sink for the [`flatten_sink`](super::TryFutureExt::flatten_sink) method. +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct FlattenSink<Fut, Si> +where + Fut: TryFuture<Ok = Si>, +{ + inner: FlattenStreamSink<Fut>, +} + +impl<Fut, Si> FlattenSink<Fut, Si> +where + Fut: TryFuture<Ok = Si>, +{ + unsafe_pinned!(inner: FlattenStreamSink<Fut>); + + pub(super) fn new(future: Fut) -> Self { + Self { + inner: FlattenStreamSink::new(future), + } + } +} + +impl<Fut, S> FusedStream for FlattenSink<Fut, S> +where + Fut: TryFuture<Ok = S>, + S: TryStream<Error = Fut::Error> + FusedStream, +{ + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +impl<Fut, S> Stream for FlattenSink<Fut, S> +where + Fut: TryFuture<Ok = S>, + S: TryStream<Error = Fut::Error>, +{ + type Item = Result<S::Ok, Fut::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner().poll_next(cx) + } +} + +impl<Fut, Si, Item> Sink<Item> for FlattenSink<Fut, Si> +where + Fut: TryFuture<Ok = Si>, + Si: Sink<Item, Error = Fut::Error>, +{ + type Error = Fut::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner().poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.inner().start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner().poll_close(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/flatten_stream_sink.rs b/third_party/rust/futures-util/src/future/try_future/flatten_stream_sink.rs new file mode 100644 index 0000000000..5a56bf708d --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/flatten_stream_sink.rs @@ -0,0 +1,181 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::TryFuture; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_utils::unsafe_pinned; + +#[must_use = "streams do nothing unless polled"] +pub(crate) struct FlattenStreamSink<Fut> +where + Fut: TryFuture, +{ + state: State<Fut, Fut::Ok>, +} + +impl<Fut> Unpin for FlattenStreamSink<Fut> +where + Fut: TryFuture + Unpin, + Fut::Ok: Unpin, +{ +} + +impl<Fut> fmt::Debug for FlattenStreamSink<Fut> +where + Fut: TryFuture + fmt::Debug, + Fut::Ok: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlattenStreamSink") + .field("state", &self.state) + .finish() + } +} + +impl<Fut> FlattenStreamSink<Fut> +where + Fut: TryFuture, +{ + unsafe_pinned!(state: State<Fut, Fut::Ok>); + + pub(crate) fn new(future: Fut) -> Self { + Self { + state: State::Future(future), + } + } +} + +#[derive(Debug)] +enum State<Fut, S> { + // future is not yet called or called and not ready + Future(Fut), + // future resolved to Stream or Sink + StreamOrSink(S), + // future resolved to error + Done, +} + +impl<Fut, S> State<Fut, S> { + fn get_pin_mut(self: Pin<&mut Self>) -> State<Pin<&mut Fut>, Pin<&mut S>> { + // safety: data is never moved via the resulting &mut reference + match unsafe { self.get_unchecked_mut() } { + // safety: the future we're re-pinning here will never be moved; + // it will just be polled, then dropped in place + State::Future(f) => State::Future(unsafe { Pin::new_unchecked(f) }), + // safety: the stream we're repinning here will never be moved; + // it will just be polled, then dropped in place + State::StreamOrSink(s) => State::StreamOrSink(unsafe { Pin::new_unchecked(s) }), + State::Done => State::Done, + } + } +} + +impl<Fut> State<Fut, Fut::Ok> +where + Fut: TryFuture, +{ + fn poll_future(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Fut::Error>> { + if let State::Future(f) = self.as_mut().get_pin_mut() { + match ready!(f.try_poll(cx)) { + Ok(s) => { + // Future resolved to stream. + // We do not return, but poll that + // stream in the next loop iteration. + self.set(State::StreamOrSink(s)); + } + Err(e) => { + // Future resolved to error. + // We have neither a pollable stream nor a future. + self.set(State::Done); + return Poll::Ready(Err(e)); + } + } + } + Poll::Ready(Ok(())) + } +} + +impl<Fut> FusedStream for FlattenStreamSink<Fut> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error> + FusedStream, +{ + fn is_terminated(&self) -> bool { + match &self.state { + State::Future(_) => false, + State::StreamOrSink(stream) => stream.is_terminated(), + State::Done => true, + } + } +} + +impl<Fut> Stream for FlattenStreamSink<Fut> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, +{ + type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + ready!(self.as_mut().state().poll_future(cx)?); + match self.as_mut().state().get_pin_mut() { + State::StreamOrSink(s) => s.try_poll_next(cx), + State::Done => Poll::Ready(None), + State::Future(_) => unreachable!(), + } + } +} + +#[cfg(feature = "sink")] +impl<Fut, Item> Sink<Item> for FlattenStreamSink<Fut> +where + Fut: TryFuture, + Fut::Ok: Sink<Item, Error = Fut::Error>, +{ + type Error = Fut::Error; + + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().state().poll_future(cx)?); + match self.as_mut().state().get_pin_mut() { + State::StreamOrSink(s) => s.poll_ready(cx), + State::Done => panic!("poll_ready called after eof"), + State::Future(_) => unreachable!(), + } + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + match self.state().get_pin_mut() { + State::StreamOrSink(s) => s.start_send(item), + State::Future(_) => panic!("poll_ready not called first"), + State::Done => panic!("start_send called after eof"), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.state().get_pin_mut() { + State::StreamOrSink(s) => s.poll_flush(cx), + // if sink not yet resolved, nothing written ==> everything flushed + State::Future(_) => Poll::Ready(Ok(())), + State::Done => panic!("poll_flush called after eof"), + } + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + let res = match self.as_mut().state().get_pin_mut() { + State::StreamOrSink(s) => s.poll_close(cx), + State::Future(_) | State::Done => Poll::Ready(Ok(())), + }; + if res.is_ready() { + self.as_mut().state().set(State::Done); + } + res + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/inspect_err.rs b/third_party/rust/futures-util/src/future/try_future/inspect_err.rs new file mode 100644 index 0000000000..8700337bb2 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/inspect_err.rs @@ -0,0 +1,53 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`inspect_err`](super::TryFutureExt::inspect_err) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct InspectErr<Fut, F> { + future: Fut, + f: Option<F>, +} + +impl<Fut: Unpin, F> Unpin for InspectErr<Fut, F> {} + +impl<Fut, F> InspectErr<Fut, F> +where + Fut: TryFuture, + F: FnOnce(&Fut::Error), +{ + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + + pub(super) fn new(future: Fut, f: F) -> Self { + Self { future, f: Some(f) } + } +} + +impl<Fut, F> FusedFuture for InspectErr<Fut, F> +where + Fut: TryFuture + FusedFuture, + F: FnOnce(&Fut::Error), +{ + fn is_terminated(&self) -> bool { + self.future.is_terminated() + } +} + +impl<Fut, F> Future for InspectErr<Fut, F> +where + Fut: TryFuture, + F: FnOnce(&Fut::Error), +{ + type Output = Result<Fut::Ok, Fut::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let e = ready!(self.as_mut().future().try_poll(cx)); + if let Err(e) = &e { + self.as_mut().f().take().expect("cannot poll InspectErr twice")(e); + } + Poll::Ready(e) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/inspect_ok.rs b/third_party/rust/futures-util/src/future/try_future/inspect_ok.rs new file mode 100644 index 0000000000..3d0a972226 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/inspect_ok.rs @@ -0,0 +1,53 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`inspect_ok`](super::TryFutureExt::inspect_ok) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct InspectOk<Fut, F> { + future: Fut, + f: Option<F>, +} + +impl<Fut: Unpin, F> Unpin for InspectOk<Fut, F> {} + +impl<Fut, F> InspectOk<Fut, F> +where + Fut: TryFuture, + F: FnOnce(&Fut::Ok), +{ + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + + pub(super) fn new(future: Fut, f: F) -> Self { + Self { future, f: Some(f) } + } +} + +impl<Fut, F> FusedFuture for InspectOk<Fut, F> +where + Fut: TryFuture + FusedFuture, + F: FnOnce(&Fut::Ok), +{ + fn is_terminated(&self) -> bool { + self.future.is_terminated() + } +} + +impl<Fut, F> Future for InspectOk<Fut, F> +where + Fut: TryFuture, + F: FnOnce(&Fut::Ok), +{ + type Output = Result<Fut::Ok, Fut::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let e = ready!(self.as_mut().future().try_poll(cx)); + if let Ok(e) = &e { + self.as_mut().f().take().expect("cannot poll InspectOk twice")(e); + } + Poll::Ready(e) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/into_future.rs b/third_party/rust/futures-util/src/future/try_future/into_future.rs new file mode 100644 index 0000000000..a766d5b66d --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/into_future.rs @@ -0,0 +1,36 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`into_future`](super::TryFutureExt::into_future) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct IntoFuture<Fut> { + future: Fut, +} + +impl<Fut> IntoFuture<Fut> { + unsafe_pinned!(future: Fut); + + #[inline] + pub(super) fn new(future: Fut) -> IntoFuture<Fut> { + IntoFuture { future } + } +} + +impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> { + fn is_terminated(&self) -> bool { self.future.is_terminated() } +} + +impl<Fut: TryFuture> Future for IntoFuture<Fut> { + type Output = Result<Fut::Ok, Fut::Error>; + + #[inline] + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + self.future().try_poll(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/map_err.rs b/third_party/rust/futures-util/src/future/try_future/map_err.rs new file mode 100644 index 0000000000..8edebad86d --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/map_err.rs @@ -0,0 +1,52 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`map_err`](super::TryFutureExt::map_err) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct MapErr<Fut, F> { + future: Fut, + f: Option<F>, +} + +impl<Fut, F> MapErr<Fut, F> { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + + /// Creates a new MapErr. + pub(super) fn new(future: Fut, f: F) -> MapErr<Fut, F> { + MapErr { future, f: Some(f) } + } +} + +impl<Fut: Unpin, F> Unpin for MapErr<Fut, F> {} + +impl<Fut, F, E> FusedFuture for MapErr<Fut, F> + where Fut: TryFuture, + F: FnOnce(Fut::Error) -> E, +{ + fn is_terminated(&self) -> bool { self.f.is_none() } +} + +impl<Fut, F, E> Future for MapErr<Fut, F> + where Fut: TryFuture, + F: FnOnce(Fut::Error) -> E, +{ + type Output = Result<Fut::Ok, E>; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + self.as_mut() + .future() + .try_poll(cx) + .map(|result| { + let f = self.as_mut().f().take() + .expect("MapErr must not be polled after it returned `Poll::Ready`"); + result.map_err(f) + }) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/map_ok.rs b/third_party/rust/futures-util/src/future/try_future/map_ok.rs new file mode 100644 index 0000000000..ab28f1443f --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/map_ok.rs @@ -0,0 +1,54 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`map_ok`](super::TryFutureExt::map_ok) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct MapOk<Fut, F> { + future: Fut, + f: Option<F>, +} + +impl<Fut, F> MapOk<Fut, F> { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + + /// Creates a new MapOk. + pub(super) fn new(future: Fut, f: F) -> MapOk<Fut, F> { + MapOk { future, f: Some(f) } + } +} + +impl<Fut: Unpin, F> Unpin for MapOk<Fut, F> {} + +impl<Fut, F, T> FusedFuture for MapOk<Fut, F> + where Fut: TryFuture, + F: FnOnce(Fut::Ok) -> T, +{ + fn is_terminated(&self) -> bool { + self.f.is_none() + } +} + +impl<Fut, F, T> Future for MapOk<Fut, F> + where Fut: TryFuture, + F: FnOnce(Fut::Ok) -> T, +{ + type Output = Result<T, Fut::Error>; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + self.as_mut() + .future() + .try_poll(cx) + .map(|result| { + let op = self.as_mut().f().take() + .expect("MapOk must not be polled after it returned `Poll::Ready`"); + result.map(op) + }) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/map_ok_or_else.rs b/third_party/rust/futures-util/src/future/try_future/map_ok_or_else.rs new file mode 100644 index 0000000000..730b67922c --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/map_ok_or_else.rs @@ -0,0 +1,59 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`map_ok_or_else`](super::TryFutureExt::map_ok_or_else) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct MapOkOrElse<Fut, F, E> { + future: Fut, + f: Option<F>, + e: Option<E>, +} + +impl<Fut, F, E> MapOkOrElse<Fut, F, E> { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + unsafe_unpinned!(e: Option<E>); + + /// Creates a new MapOkOrElse. + pub(super) fn new(future: Fut, e: E, f: F) -> Self { + Self { future, f: Some(f), e: Some(e) } + } +} + +impl<Fut: Unpin, F, E> Unpin for MapOkOrElse<Fut, F, E> {} + +impl<Fut, F, E, T> FusedFuture for MapOkOrElse<Fut, F, E> + where Fut: TryFuture, + F: FnOnce(Fut::Ok) -> T, + E: FnOnce(Fut::Error) -> T, +{ + fn is_terminated(&self) -> bool { + self.f.is_none() || self.e.is_none() + } +} + +impl<Fut, F, E, T> Future for MapOkOrElse<Fut, F, E> + where Fut: TryFuture, + F: FnOnce(Fut::Ok) -> T, + E: FnOnce(Fut::Error) -> T, +{ + type Output = T; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + self.as_mut() + .future() + .try_poll(cx) + .map(|result| { + match result { + Ok(i) => (self.as_mut().f().take().expect("MapOkOrElse must not be polled after it returned `Poll::Ready`"))(i), + Err(e) => (self.as_mut().e().take().expect("MapOkOrElse must not be polled after it returned `Poll::Ready`"))(e), + } + }) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/mod.rs b/third_party/rust/futures-util/src/future/try_future/mod.rs new file mode 100644 index 0000000000..e8e059e373 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/mod.rs @@ -0,0 +1,536 @@ +//! Futures +//! +//! This module contains a number of functions for working with `Future`s, +//! including the `FutureExt` trait which adds methods to `Future` types. + +#[cfg(feature = "compat")] +use crate::compat::Compat; +use core::pin::Pin; +use futures_core::{ + future::TryFuture, + stream::TryStream, + task::{Context, Poll}, +}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +// Combinators + +mod and_then; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::and_then::AndThen; + +mod err_into; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::err_into::ErrInto; + +#[cfg(feature = "sink")] +mod flatten_sink; +#[cfg(feature = "sink")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::flatten_sink::FlattenSink; + +mod inspect_ok; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::inspect_ok::InspectOk; + +mod inspect_err; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::inspect_err::InspectErr; + +mod into_future; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::into_future::IntoFuture; + +mod map_err; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::map_err::MapErr; + +mod map_ok; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::map_ok::MapOk; + +mod map_ok_or_else; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::map_ok_or_else::MapOkOrElse; + +mod or_else; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::or_else::OrElse; + +mod try_flatten_stream; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_flatten_stream::TryFlattenStream; + +mod unwrap_or_else; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::unwrap_or_else::UnwrapOrElse; + +// Implementation details + +mod flatten_stream_sink; +pub(crate) use self::flatten_stream_sink::FlattenStreamSink; + +mod try_chain; +pub(crate) use self::try_chain::{TryChain, TryChainAction}; + +impl<Fut: ?Sized + TryFuture> TryFutureExt for Fut {} + +/// Adapters specific to [`Result`]-returning futures +pub trait TryFutureExt: TryFuture { + /// Flattens the execution of this future when the successful result of this + /// future is a [`Sink`]. + /// + /// This can be useful when sink initialization is deferred, and it is + /// convenient to work with that sink as if the sink was available at the + /// call site. + /// + /// Note that this function consumes this future and returns a wrapped + /// version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::{Future, TryFutureExt}; + /// use futures::sink::Sink; + /// # use futures::channel::mpsc::{self, SendError}; + /// # type T = i32; + /// # type E = SendError; + /// + /// fn make_sink_async() -> impl Future<Output = Result< + /// impl Sink<T, Error = E>, + /// E, + /// >> { // ... } + /// # let (tx, _rx) = mpsc::unbounded::<i32>(); + /// # futures::future::ready(Ok(tx)) + /// # } + /// fn take_sink(sink: impl Sink<T, Error = E>) { /* ... */ } + /// + /// let fut = make_sink_async(); + /// take_sink(fut.flatten_sink()) + /// ``` + #[cfg(feature = "sink")] + fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok> + where + Self::Ok: Sink<Item, Error = Self::Error>, + Self: Sized, + { + FlattenSink::new(self) + } + + /// Maps this future's success value to a different value. + /// + /// This method can be used to change the [`Ok`](TryFuture::Ok) type of the + /// future into a different type. It is similar to the [`Result::map`] + /// method. You can use this method to chain along a computation once the + /// future has been resolved. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.map_ok(|x| x + 3); + /// assert_eq!(future.await, Ok(4)); + /// # }); + /// ``` + /// + /// Calling [`map_ok`](TryFutureExt::map_ok) on an errored future has no + /// effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.map_ok(|x| x + 3); + /// assert_eq!(future.await, Err(1)); + /// # }); + /// ``` + fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> + where + F: FnOnce(Self::Ok) -> T, + Self: Sized, + { + MapOk::new(self, f) + } + + /// Maps this future's success value to a different value, and permits for error handling resulting in the same type. + /// + /// This method can be used to coalesce your [`Ok`](TryFuture::Ok) type and [`Error`](TryFuture::Error) into another type, + /// where that type is the same for both outcomes. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// The provided closure `e` will only be called if this future is resolved + /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(5) }; + /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); + /// assert_eq!(future.await, 8); + /// + /// let future = async { Err::<i32, i32>(5) }; + /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); + /// assert_eq!(future.await, 10); + /// # }); + /// ``` + /// + fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E> + where + F: FnOnce(Self::Ok) -> T, + E: FnOnce(Self::Error) -> T, + Self: Sized, + { + MapOkOrElse::new(self, e, f) + } + + /// Maps this future's error value to a different value. + /// + /// This method can be used to change the [`Error`](TryFuture::Error) type + /// of the future into a different type. It is similar to the + /// [`Result::map_err`] method. You can use this method for example to + /// ensure that futures have the same [`Error`](TryFuture::Error) type when + /// using [`select!`] or [`join!`]. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then + /// the provided closure will never be invoked. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.map_err(|x| x + 3); + /// assert_eq!(future.await, Err(4)); + /// # }); + /// ``` + /// + /// Calling [`map_err`](TryFutureExt::map_err) on a successful future has + /// no effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.map_err(|x| x + 3); + /// assert_eq!(future.await, Ok(1)); + /// # }); + /// ``` + fn map_err<E, F>(self, f: F) -> MapErr<Self, F> + where + F: FnOnce(Self::Error) -> E, + Self: Sized, + { + MapErr::new(self, f) + } + + /// Maps this future's [`Error`](TryFuture::Error) to a new error type + /// using the [`Into`](std::convert::Into) trait. + /// + /// This method does for futures what the `?`-operator does for + /// [`Result`]: It lets the compiler infer the type of the resulting + /// error. Just as [`map_err`](TryFutureExt::map_err), this is useful for + /// example to ensure that futures have the same [`Error`](TryFuture::Error) + /// type when using [`select!`] or [`join!`]. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future_err_u8 = async { Err::<(), u8>(1) }; + /// let future_err_i32 = future_err_u8.err_into::<i32>(); + /// # }); + /// ``` + fn err_into<E>(self) -> ErrInto<Self, E> + where + Self: Sized, + Self::Error: Into<E>, + { + ErrInto::new(self) + } + + /// Executes another future after this one resolves successfully. The + /// success value is passed to a closure to create this subsequent future. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Ok`]. If this future resolves to an [`Err`], panics, or is + /// dropped, then the provided closure will never be invoked. The + /// [`Error`](TryFuture::Error) type of this future and the future + /// returned by `f` have to match. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.and_then(|x| async move { Ok::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Ok(4)); + /// # }); + /// ``` + /// + /// Calling [`and_then`](TryFutureExt::and_then) on an errored future has no + /// effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.and_then(|x| async move { Err::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Err(1)); + /// # }); + /// ``` + fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> + where + F: FnOnce(Self::Ok) -> Fut, + Fut: TryFuture<Error = Self::Error>, + Self: Sized, + { + AndThen::new(self, f) + } + + /// Executes another future if this one resolves to an error. The + /// error value is passed to a closure to create this subsequent future. + /// + /// The provided closure `f` will only be called if this future is resolved + /// to an [`Err`]. If this future resolves to an [`Ok`], panics, or is + /// dropped, then the provided closure will never be invoked. The + /// [`Ok`](TryFuture::Ok) type of this future and the future returned by `f` + /// have to match. + /// + /// Note that this method consumes the future it is called on and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<i32, i32>(1) }; + /// let future = future.or_else(|x| async move { Err::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Err(4)); + /// # }); + /// ``` + /// + /// Calling [`or_else`](TryFutureExt::or_else) on a successful future has + /// no effect: + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Ok::<i32, i32>(1) }; + /// let future = future.or_else(|x| async move { Ok::<i32, i32>(x + 3) }); + /// assert_eq!(future.await, Ok(1)); + /// # }); + /// ``` + fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> + where + F: FnOnce(Self::Error) -> Fut, + Fut: TryFuture<Ok = Self::Ok>, + Self: Sized, + { + OrElse::new(self, f) + } + + /// Do something with the success value of a future before passing it on. + /// + /// When using futures, you'll often chain several of them together. While + /// working on such code, you might want to check out what's happening at + /// various parts in the pipeline, without consuming the intermediate + /// value. To do that, insert a call to `inspect_ok`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::TryFutureExt; + /// + /// let future = async { Ok::<_, ()>(1) }; + /// let new_future = future.inspect_ok(|&x| println!("about to resolve: {}", x)); + /// assert_eq!(new_future.await, Ok(1)); + /// # }); + /// ``` + fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> + where + F: FnOnce(&Self::Ok), + Self: Sized, + { + InspectOk::new(self, f) + } + + /// Do something with the error value of a future before passing it on. + /// + /// When using futures, you'll often chain several of them together. While + /// working on such code, you might want to check out what's happening at + /// various parts in the pipeline, without consuming the intermediate + /// value. To do that, insert a call to `inspect_err`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::TryFutureExt; + /// + /// let future = async { Err::<(), _>(1) }; + /// let new_future = future.inspect_err(|&x| println!("about to error: {}", x)); + /// assert_eq!(new_future.await, Err(1)); + /// # }); + /// ``` + fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> + where + F: FnOnce(&Self::Error), + Self: Sized, + { + InspectErr::new(self, f) + } + + /// Flatten the execution of this future when the successful result of this + /// future is a stream. + /// + /// This can be useful when stream initialization is deferred, and it is + /// convenient to work with that stream as if stream was available at the + /// call site. + /// + /// Note that this function consumes this future and returns a wrapped + /// version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::TryFutureExt; + /// use futures::stream::{self, TryStreamExt}; + /// + /// let stream_items = vec![17, 18, 19].into_iter().map(Ok); + /// let future_of_a_stream = async { Ok::<_, ()>(stream::iter(stream_items)) }; + /// + /// let stream = future_of_a_stream.try_flatten_stream(); + /// let list = stream.try_collect::<Vec<_>>().await; + /// assert_eq!(list, Ok(vec![17, 18, 19])); + /// # }); + /// ``` + fn try_flatten_stream(self) -> TryFlattenStream<Self> + where + Self::Ok: TryStream<Error = Self::Error>, + Self: Sized, + { + TryFlattenStream::new(self) + } + + /// Unwraps this future's ouput, producing a future with this future's + /// [`Ok`](TryFuture::Ok) type as its + /// [`Output`](std::future::Future::Output) type. + /// + /// If this future is resolved successfully, the returned future will + /// contain the original future's success value as output. Otherwise, the + /// closure `f` is called with the error value to produce an alternate + /// success value. + /// + /// This method is similar to the [`Result::unwrap_or_else`] method. + /// + /// # Examples + /// + /// ``` + /// use futures::future::TryFutureExt; + /// + /// # futures::executor::block_on(async { + /// let future = async { Err::<(), &str>("Boom!") }; + /// let future = future.unwrap_or_else(|_| ()); + /// assert_eq!(future.await, ()); + /// # }); + /// ``` + fn unwrap_or_else<F>(self, f: F) -> UnwrapOrElse<Self, F> + where + Self: Sized, + F: FnOnce(Self::Error) -> Self::Ok, + { + UnwrapOrElse::new(self, f) + } + + /// Wraps a [`TryFuture`] into a future compatable with libraries using + /// futures 0.1 future definitons. Requires the `compat` feature to enable. + #[cfg(feature = "compat")] + fn compat(self) -> Compat<Self> + where + Self: Sized + Unpin, + { + Compat::new(self) + } + + /// Wraps a [`TryFuture`] into a type that implements + /// [`Future`](std::future::Future). + /// + /// [`TryFuture`]s currently do not implement the + /// [`Future`](std::future::Future) trait due to limitations of the + /// compiler. + /// + /// # Examples + /// + /// ``` + /// use futures::future::{Future, TryFuture, TryFutureExt}; + /// + /// # type T = i32; + /// # type E = (); + /// fn make_try_future() -> impl TryFuture<Ok = T, Error = E> { // ... } + /// # async { Ok::<i32, ()>(1) } + /// # } + /// fn take_future(future: impl Future<Output = Result<T, E>>) { /* ... */ } + /// + /// take_future(make_try_future().into_future()); + /// ``` + fn into_future(self) -> IntoFuture<Self> + where + Self: Sized, + { + IntoFuture::new(self) + } + + /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`] + /// future types. + fn try_poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>> + where + Self: Unpin, + { + Pin::new(self).try_poll(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/or_else.rs b/third_party/rust/futures-util/src/future/try_future/or_else.rs new file mode 100644 index 0000000000..a9c006fa9f --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/or_else.rs @@ -0,0 +1,56 @@ +use super::{TryChain, TryChainAction}; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`or_else`](super::TryFutureExt::or_else) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct OrElse<Fut1, Fut2, F> { + try_chain: TryChain<Fut1, Fut2, F>, +} + +impl<Fut1, Fut2, F> OrElse<Fut1, Fut2, F> + where Fut1: TryFuture, + Fut2: TryFuture, +{ + unsafe_pinned!(try_chain: TryChain<Fut1, Fut2, F>); + + /// Creates a new `Then`. + pub(super) fn new(future: Fut1, f: F) -> OrElse<Fut1, Fut2, F> { + OrElse { + try_chain: TryChain::new(future, f), + } + } +} + +impl<Fut1, Fut2, F> FusedFuture for OrElse<Fut1, Fut2, F> + where Fut1: TryFuture, + Fut2: TryFuture<Ok = Fut1::Ok>, + F: FnOnce(Fut1::Error) -> Fut2, +{ + fn is_terminated(&self) -> bool { + self.try_chain.is_terminated() + } +} + +impl<Fut1, Fut2, F> Future for OrElse<Fut1, Fut2, F> + where Fut1: TryFuture, + Fut2: TryFuture<Ok = Fut1::Ok>, + F: FnOnce(Fut1::Error) -> Fut2, +{ + type Output = Result<Fut2::Ok, Fut2::Error>; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + self.try_chain().poll(cx, |result, async_op| { + match result { + Ok(ok) => TryChainAction::Output(Ok(ok)), + Err(err) => TryChainAction::Future(async_op(err)), + } + }) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/try_chain.rs b/third_party/rust/futures-util/src/future/try_future/try_chain.rs new file mode 100644 index 0000000000..662bdf2d26 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/try_chain.rs @@ -0,0 +1,108 @@ +use core::pin::Pin; +use futures_core::future::TryFuture; +use futures_core::task::{Context, Poll}; + +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub(crate) enum TryChain<Fut1, Fut2, Data> { + First(Fut1, Option<Data>), + Second(Fut2), + Empty, +} + +impl<Fut1: Unpin, Fut2: Unpin, Data> Unpin for TryChain<Fut1, Fut2, Data> {} + +pub(crate) enum TryChainAction<Fut2> + where Fut2: TryFuture, +{ + Future(Fut2), + Output(Result<Fut2::Ok, Fut2::Error>), +} + +impl<Fut1, Fut2, Data> TryChain<Fut1, Fut2, Data> + where Fut1: TryFuture, + Fut2: TryFuture, +{ + pub(crate) fn new(fut1: Fut1, data: Data) -> TryChain<Fut1, Fut2, Data> { + TryChain::First(fut1, Some(data)) + } + + pub(crate) fn is_terminated(&self) -> bool { + match self { + TryChain::First(..) | TryChain::Second(_) => false, + TryChain::Empty => true, + } + } + + pub(crate) fn poll<F>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + f: F, + ) -> Poll<Result<Fut2::Ok, Fut2::Error>> + where F: FnOnce(Result<Fut1::Ok, Fut1::Error>, Data) -> TryChainAction<Fut2>, + { + let mut f = Some(f); + + // Safe to call `get_unchecked_mut` because we won't move the futures. + let this = unsafe { self.get_unchecked_mut() }; + + loop { + let (output, data) = match this { + TryChain::First(fut1, data) => { + // Poll the first future + let output = ready!(unsafe { Pin::new_unchecked(fut1) }.try_poll(cx)); + (output, data.take().unwrap()) + } + TryChain::Second(fut2) => { + // Poll the second future + return unsafe { Pin::new_unchecked(fut2) } + .try_poll(cx) + .map(|res| { + *this = TryChain::Empty; // Drop fut2. + res + }); + } + TryChain::Empty => { + panic!("future must not be polled after it returned `Poll::Ready`"); + } + }; + + *this = TryChain::Empty; // Drop fut1 + let f = f.take().unwrap(); + match f(output, data) { + TryChainAction::Future(fut2) => *this = TryChain::Second(fut2), + TryChainAction::Output(output) => return Poll::Ready(output), + } + } + } +} + +#[cfg(test)] +mod tests { + use std::pin::Pin; + use std::task::Poll; + + use futures_test::task::noop_context; + + use crate::future::ready; + + use super::{TryChain, TryChainAction}; + + #[test] + fn try_chain_is_terminated() { + let mut cx = noop_context(); + + let mut future = TryChain::new(ready(Ok(1)), ()); + assert!(!future.is_terminated()); + + let res = Pin::new(&mut future).poll( + &mut cx, + |res: Result<usize, ()>, ()| { + assert!(res.is_ok()); + TryChainAction::Future(ready(Ok(2))) + }, + ); + assert_eq!(res, Poll::Ready::<Result<usize, ()>>(Ok(2))); + assert!(future.is_terminated()); + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/try_flatten_stream.rs b/third_party/rust/futures-util/src/future/try_future/try_flatten_stream.rs new file mode 100644 index 0000000000..24624314c0 --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/try_flatten_stream.rs @@ -0,0 +1,91 @@ +use super::FlattenStreamSink; +use core::fmt; +use core::pin::Pin; +use futures_core::future::TryFuture; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_utils::unsafe_pinned; + +/// Stream for the [`try_flatten_stream`](super::TryFutureExt::try_flatten_stream) method. +#[must_use = "streams do nothing unless polled"] +pub struct TryFlattenStream<Fut> +where + Fut: TryFuture, +{ + inner: FlattenStreamSink<Fut>, +} + +impl<Fut: TryFuture> TryFlattenStream<Fut> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, +{ + unsafe_pinned!(inner: FlattenStreamSink<Fut>); + + pub(super) fn new(future: Fut) -> Self { + Self { + inner: FlattenStreamSink::new(future), + } + } +} + +impl<Fut> fmt::Debug for TryFlattenStream<Fut> +where + Fut: TryFuture + fmt::Debug, + Fut::Ok: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryFlattenStream") + .field("inner", &self.inner) + .finish() + } +} + +impl<Fut> FusedStream for TryFlattenStream<Fut> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error> + FusedStream, +{ + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +impl<Fut> Stream for TryFlattenStream<Fut> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, +{ + type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner().poll_next(cx) + } +} + +#[cfg(feature = "sink")] +impl<Fut, Item> Sink<Item> for TryFlattenStream<Fut> +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error> + Sink<Item, Error = Fut::Error>, +{ + type Error = Fut::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner().poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.inner().start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner().poll_close(cx) + } +} diff --git a/third_party/rust/futures-util/src/future/try_future/unwrap_or_else.rs b/third_party/rust/futures-util/src/future/try_future/unwrap_or_else.rs new file mode 100644 index 0000000000..286cc009fb --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_future/unwrap_or_else.rs @@ -0,0 +1,55 @@ +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Future for the [`unwrap_or_else`](super::TryFutureExt::unwrap_or_else) +/// method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct UnwrapOrElse<Fut, F> { + future: Fut, + f: Option<F>, +} + +impl<Fut, F> UnwrapOrElse<Fut, F> { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(f: Option<F>); + + /// Creates a new UnwrapOrElse. + pub(super) fn new(future: Fut, f: F) -> UnwrapOrElse<Fut, F> { + UnwrapOrElse { future, f: Some(f) } + } +} + +impl<Fut: Unpin, F> Unpin for UnwrapOrElse<Fut, F> {} + +impl<Fut, F> FusedFuture for UnwrapOrElse<Fut, F> + where Fut: TryFuture, + F: FnOnce(Fut::Error) -> Fut::Ok, +{ + fn is_terminated(&self) -> bool { + self.f.is_none() + } +} + +impl<Fut, F> Future for UnwrapOrElse<Fut, F> + where Fut: TryFuture, + F: FnOnce(Fut::Error) -> Fut::Ok, +{ + type Output = Fut::Ok; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + self.as_mut() + .future() + .try_poll(cx) + .map(|result| { + let op = self.as_mut().f().take() + .expect("UnwrapOrElse already returned `Poll::Ready` before"); + result.unwrap_or_else(op) + }) + } +} diff --git a/third_party/rust/futures-util/src/future/try_join.rs b/third_party/rust/futures-util/src/future/try_join.rs new file mode 100644 index 0000000000..da85eff91d --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_join.rs @@ -0,0 +1,262 @@ +#![allow(non_snake_case)] + +use crate::future::{MaybeDone, maybe_done, TryFutureExt, IntoFuture}; +use core::fmt; +use core::pin::Pin; +use futures_core::future::{Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +macro_rules! generate { + ($( + $(#[$doc:meta])* + ($Join:ident, <Fut1, $($Fut:ident),*>), + )*) => ($( + $(#[$doc])* + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> { + Fut1: MaybeDone<IntoFuture<Fut1>>, + $($Fut: MaybeDone<IntoFuture<$Fut>>,)* + } + + impl<Fut1, $($Fut),*> fmt::Debug for $Join<Fut1, $($Fut),*> + where + Fut1: TryFuture + fmt::Debug, + Fut1::Ok: fmt::Debug, + Fut1::Error: fmt::Debug, + $( + $Fut: TryFuture + fmt::Debug, + $Fut::Ok: fmt::Debug, + $Fut::Error: fmt::Debug, + )* + { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(stringify!($Join)) + .field("Fut1", &self.Fut1) + $(.field(stringify!($Fut), &self.$Fut))* + .finish() + } + } + + impl<Fut1, $($Fut),*> $Join<Fut1, $($Fut),*> + where + Fut1: TryFuture, + $( + $Fut: TryFuture<Error=Fut1::Error> + ),* + { + fn new(Fut1: Fut1, $($Fut: $Fut),*) -> $Join<Fut1, $($Fut),*> { + $Join { + Fut1: maybe_done(TryFutureExt::into_future(Fut1)), + $($Fut: maybe_done(TryFutureExt::into_future($Fut))),* + } + } + + unsafe_pinned!(Fut1: MaybeDone<IntoFuture<Fut1>>); + $( + unsafe_pinned!($Fut: MaybeDone<IntoFuture<$Fut>>); + )* + } + + impl<Fut1, $($Fut),*> Future for $Join<Fut1, $($Fut),*> + where + Fut1: TryFuture, + $( + $Fut: TryFuture<Error=Fut1::Error> + ),* + { + type Output = Result<(Fut1::Ok, $($Fut::Ok),*), Fut1::Error>; + + fn poll( + mut self: Pin<&mut Self>, cx: &mut Context<'_> + ) -> Poll<Self::Output> { + let mut all_done = true; + if self.as_mut().Fut1().poll(cx).is_pending() { + all_done = false; + } else if self.as_mut().Fut1().output_mut().unwrap().is_err() { + return Poll::Ready(Err( + self.as_mut().Fut1().take_output().unwrap().err().unwrap())); + } + $( + if self.as_mut().$Fut().poll(cx).is_pending() { + all_done = false; + } else if self.as_mut().$Fut().output_mut().unwrap().is_err() { + return Poll::Ready(Err( + self.as_mut().$Fut().take_output().unwrap().err().unwrap())); + } + )* + + if all_done { + Poll::Ready(Ok(( + self.as_mut().Fut1().take_output().unwrap().ok().unwrap(), + $( + self.as_mut().$Fut().take_output().unwrap().ok().unwrap() + ),* + ))) + } else { + Poll::Pending + } + } + } + )*) +} + +generate! { + /// Future for the [`try_join`](try_join()) function. + (TryJoin, <Fut1, Fut2>), + + /// Future for the [`try_join3`] function. + (TryJoin3, <Fut1, Fut2, Fut3>), + + /// Future for the [`try_join4`] function. + (TryJoin4, <Fut1, Fut2, Fut3, Fut4>), + + /// Future for the [`try_join5`] function. + (TryJoin5, <Fut1, Fut2, Fut3, Fut4, Fut5>), +} + +/// Joins the result of two futures, waiting for them both to complete or +/// for one to produce an error. +/// +/// This function will return a new future which awaits both futures to +/// complete. If successful, the returned future will finish with a tuple of +/// both results. If unsuccesful, it will complete with the first error +/// encountered. +/// +/// Note that this function consumes the passed futures and returns a +/// wrapped version of it. +/// +/// # Examples +/// +/// When used on multiple futures that return [`Ok`], `try_join` will return +/// [`Ok`] of a tuple of the values: +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let pair = future::try_join(a, b); +/// +/// assert_eq!(pair.await, Ok((1, 2))); +/// # }); +/// ``` +/// +/// If one of the futures resolves to an error, `try_join` will return +/// that error: +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Err::<i32, i32>(2)); +/// let pair = future::try_join(a, b); +/// +/// assert_eq!(pair.await, Err(2)); +/// # }); +/// ``` +pub fn try_join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> TryJoin<Fut1, Fut2> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, +{ + TryJoin::new(future1, future2) +} + +/// Same as [`try_join`](try_join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let c = future::ready(Ok::<i32, i32>(3)); +/// let tuple = future::try_join3(a, b, c); +/// +/// assert_eq!(tuple.await, Ok((1, 2, 3))); +/// # }); +/// ``` +pub fn try_join3<Fut1, Fut2, Fut3>( + future1: Fut1, + future2: Fut2, + future3: Fut3, +) -> TryJoin3<Fut1, Fut2, Fut3> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + Fut3: TryFuture<Error = Fut1::Error>, +{ + TryJoin3::new(future1, future2, future3) +} + +/// Same as [`try_join`](try_join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let c = future::ready(Ok::<i32, i32>(3)); +/// let d = future::ready(Ok::<i32, i32>(4)); +/// let tuple = future::try_join4(a, b, c, d); +/// +/// assert_eq!(tuple.await, Ok((1, 2, 3, 4))); +/// # }); +/// ``` +pub fn try_join4<Fut1, Fut2, Fut3, Fut4>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, +) -> TryJoin4<Fut1, Fut2, Fut3, Fut4> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + Fut3: TryFuture<Error = Fut1::Error>, + Fut4: TryFuture<Error = Fut1::Error>, +{ + TryJoin4::new(future1, future2, future3, future4) +} + +/// Same as [`try_join`](try_join()), but with more futures. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let a = future::ready(Ok::<i32, i32>(1)); +/// let b = future::ready(Ok::<i32, i32>(2)); +/// let c = future::ready(Ok::<i32, i32>(3)); +/// let d = future::ready(Ok::<i32, i32>(4)); +/// let e = future::ready(Ok::<i32, i32>(5)); +/// let tuple = future::try_join5(a, b, c, d, e); +/// +/// assert_eq!(tuple.await, Ok((1, 2, 3, 4, 5))); +/// # }); +/// ``` +pub fn try_join5<Fut1, Fut2, Fut3, Fut4, Fut5>( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, + future5: Fut5, +) -> TryJoin5<Fut1, Fut2, Fut3, Fut4, Fut5> +where + Fut1: TryFuture, + Fut2: TryFuture<Error = Fut1::Error>, + Fut3: TryFuture<Error = Fut1::Error>, + Fut4: TryFuture<Error = Fut1::Error>, + Fut5: TryFuture<Error = Fut1::Error>, +{ + TryJoin5::new(future1, future2, future3, future4, future5) +} diff --git a/third_party/rust/futures-util/src/future/try_join_all.rs b/third_party/rust/futures-util/src/future/try_join_all.rs new file mode 100644 index 0000000000..30300e4e3e --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_join_all.rs @@ -0,0 +1,179 @@ +//! Definition of the `TryJoinAll` combinator, waiting for all of a list of +//! futures to finish with either success or error. + +use core::fmt; +use core::future::Future; +use core::iter::FromIterator; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use alloc::boxed::Box; +use alloc::vec::Vec; + +use super::TryFuture; + +#[derive(Debug)] +enum ElemState<F> +where + F: TryFuture, +{ + Pending(F), + Done(Option<F::Ok>), +} + +impl<F> ElemState<F> +where + F: TryFuture, +{ + fn pending_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut F>> { + // Safety: Basic enum pin projection, no drop + optionally Unpin based + // on the type of this variant + match unsafe { self.get_unchecked_mut() } { + ElemState::Pending(f) => Some(unsafe { Pin::new_unchecked(f) }), + ElemState::Done(_) => None, + } + } + + fn take_done(self: Pin<&mut Self>) -> Option<F::Ok> { + // Safety: Going from pin to a variant we never pin-project + match unsafe { self.get_unchecked_mut() } { + ElemState::Pending(_) => None, + ElemState::Done(output) => output.take(), + } + } +} + +impl<F> Unpin for ElemState<F> where F: TryFuture + Unpin {} + +fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { + // Safety: `std` _could_ make this unsound if it were to decide Pin's + // invariants aren't required to transmit through slices. Otherwise this has + // the same safety as a normal field pin projection. + unsafe { slice.get_unchecked_mut() } + .iter_mut() + .map(|t| unsafe { Pin::new_unchecked(t) }) +} + +enum FinalState<E = ()> { + Pending, + AllDone, + Error(E) +} + +/// Future for the [`try_join_all`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct TryJoinAll<F> +where + F: TryFuture, +{ + elems: Pin<Box<[ElemState<F>]>>, +} + +impl<F> fmt::Debug for TryJoinAll<F> +where + F: TryFuture + fmt::Debug, + F::Ok: fmt::Debug, + F::Error: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryJoinAll") + .field("elems", &self.elems) + .finish() + } +} + +/// Creates a future which represents either a collection of the results of the +/// futures given or an error. +/// +/// The returned future will drive execution for all of its underlying futures, +/// collecting the results into a destination `Vec<T>` in the same order as they +/// were provided. +/// +/// If any future returns an error then all other futures will be canceled and +/// an error will be returned immediately. If all futures complete successfully, +/// however, then the returned future will succeed with a `Vec` of all the +/// successful results. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::{self, try_join_all}; +/// +/// let futures = vec![ +/// future::ok::<u32, u32>(1), +/// future::ok::<u32, u32>(2), +/// future::ok::<u32, u32>(3), +/// ]; +/// +/// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3])); +/// +/// let futures = vec![ +/// future::ok::<u32, u32>(1), +/// future::err::<u32, u32>(2), +/// future::ok::<u32, u32>(3), +/// ]; +/// +/// assert_eq!(try_join_all(futures).await, Err(2)); +/// # }); +/// ``` +pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item> +where + I: IntoIterator, + I::Item: TryFuture, +{ + let elems: Box<[_]> = i.into_iter().map(ElemState::Pending).collect(); + TryJoinAll { + elems: elems.into(), + } +} + +impl<F> Future for TryJoinAll<F> +where + F: TryFuture, +{ + type Output = Result<Vec<F::Ok>, F::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut state = FinalState::AllDone; + + for mut elem in iter_pin_mut(self.elems.as_mut()) { + if let Some(pending) = elem.as_mut().pending_pin_mut() { + match pending.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(output) => match output { + Ok(item) => elem.set(ElemState::Done(Some(item))), + Err(e) => { + state = FinalState::Error(e); + break; + } + } + } + } + } + + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(&mut self.elems, Box::pin([])); + let results = iter_pin_mut(elems.as_mut()) + .map(|e| e.take_done().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + }, + FinalState::Error(e) => { + let _ = mem::replace(&mut self.elems, Box::pin([])); + Poll::Ready(Err(e)) + }, + } + } +} + +impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> { + fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { + try_join_all(iter) + } +} diff --git a/third_party/rust/futures-util/src/future/try_select.rs b/third_party/rust/futures-util/src/future/try_select.rs new file mode 100644 index 0000000000..56564f5b5c --- /dev/null +++ b/third_party/rust/futures-util/src/future/try_select.rs @@ -0,0 +1,80 @@ +use core::pin::Pin; +use futures_core::future::{Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use crate::future::{Either, TryFutureExt}; + +/// Future for the [`try_select()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct TrySelect<A, B> { + inner: Option<(A, B)>, +} + +impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {} + +/// Waits for either one of two differently-typed futures to complete. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// Also note that if both this and the second future have the same +/// success/error type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt}; +/// +/// // A poor-man's try_join implemented on top of select +/// +/// fn try_join<A, B, E>(a: A, b: B) -> impl TryFuture<Ok=(A::Ok, B::Ok), Error=E> +/// where A: TryFuture<Error = E> + Unpin + 'static, +/// B: TryFuture<Error = E> + Unpin + 'static, +/// E: 'static, +/// { +/// future::try_select(a, b).then(|res| -> Box<dyn Future<Output = Result<_, _>> + Unpin> { +/// match res { +/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))), +/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))), +/// Err(Either::Left((e, _))) => Box::new(future::err(e)), +/// Err(Either::Right((e, _))) => Box::new(future::err(e)), +/// } +/// }) +/// } +/// ``` +pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B> + where A: TryFuture + Unpin, B: TryFuture + Unpin +{ + TrySelect { inner: Some((future1, future2)) } +} + +impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> + where A: TryFuture, B: TryFuture +{ + #[allow(clippy::type_complexity)] + type Output = Result< + Either<(A::Ok, B), (B::Ok, A)>, + Either<(A::Error, B), (B::Error, A)>, + >; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + match a.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))), + Poll::Pending => match b.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))), + Poll::Pending => { + self.inner = Some((a, b)); + Poll::Pending + } + } + } + } +} |