diff options
Diffstat (limited to 'third_party/rust/tokio/src/future')
-rw-r--r-- | third_party/rust/tokio/src/future/block_on.rs | 15 | ||||
-rw-r--r-- | third_party/rust/tokio/src/future/maybe_done.rs | 76 | ||||
-rw-r--r-- | third_party/rust/tokio/src/future/mod.rs | 30 | ||||
-rw-r--r-- | third_party/rust/tokio/src/future/poll_fn.rs | 40 | ||||
-rw-r--r-- | third_party/rust/tokio/src/future/trace.rs | 11 | ||||
-rw-r--r-- | third_party/rust/tokio/src/future/try_join.rs | 82 |
6 files changed, 254 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/future/block_on.rs b/third_party/rust/tokio/src/future/block_on.rs new file mode 100644 index 0000000000..91f9cc0055 --- /dev/null +++ b/third_party/rust/tokio/src/future/block_on.rs @@ -0,0 +1,15 @@ +use std::future::Future; + +cfg_rt! { + pub(crate) fn block_on<F: Future>(f: F) -> F::Output { + let mut e = crate::runtime::enter::enter(false); + e.block_on(f).unwrap() + } +} + +cfg_not_rt! { + pub(crate) fn block_on<F: Future>(f: F) -> F::Output { + let mut park = crate::park::thread::CachedParkThread::new(); + park.block_on(f).unwrap() + } +} diff --git a/third_party/rust/tokio/src/future/maybe_done.rs b/third_party/rust/tokio/src/future/maybe_done.rs new file mode 100644 index 0000000000..486efbe01a --- /dev/null +++ b/third_party/rust/tokio/src/future/maybe_done.rs @@ -0,0 +1,76 @@ +//! Definition of the MaybeDone combinator. + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that may have completed. +#[derive(Debug)] +pub enum MaybeDone<Fut: Future> { + /// A not-yet-completed future. + Future(Fut), + /// The output of the completed future. + Done(Fut::Output), + /// The empty variant after the result of a [`MaybeDone`] has been + /// taken using the [`take_output`](MaybeDone::take_output) method. + Gone, +} + +// Safe because we never generate `Pin<&mut Fut::Output>` +impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} + +/// Wraps a future into a `MaybeDone`. +pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { + MaybeDone::Future(future) +} + +impl<Fut: Future> MaybeDone<Fut> { + /// Returns an [`Option`] containing a mutable reference to the output of the future. + /// The output of this method will be [`Some`] if and only if the inner + /// future has been completed and [`take_output`](MaybeDone::take_output) + /// has not yet been called. + pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(res) => Some(res), + _ => None, + } + } + } + + /// Attempts to take the output of a `MaybeDone` without driving it + /// towards completion. + #[inline] + pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(_) => {} + MaybeDone::Future(_) | MaybeDone::Gone => return None, + }; + if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) { + Some(output) + } else { + unreachable!() + } + } + } +} + +impl<Fut: Future> Future for MaybeDone<Fut> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let res = unsafe { + match self.as_mut().get_unchecked_mut() { + MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)), + MaybeDone::Done(_) => return Poll::Ready(()), + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + }; + self.set(MaybeDone::Done(res)); + Poll::Ready(()) + } +} diff --git a/third_party/rust/tokio/src/future/mod.rs b/third_party/rust/tokio/src/future/mod.rs new file mode 100644 index 0000000000..084ddc571f --- /dev/null +++ b/third_party/rust/tokio/src/future/mod.rs @@ -0,0 +1,30 @@ +#![cfg_attr(not(feature = "macros"), allow(unreachable_pub))] + +//! Asynchronous values. + +#[cfg(any(feature = "macros", feature = "process"))] +pub(crate) mod maybe_done; + +mod poll_fn; +pub use poll_fn::poll_fn; + +cfg_process! { + mod try_join; + pub(crate) use try_join::try_join3; +} + +cfg_sync! { + mod block_on; + pub(crate) use block_on::block_on; +} + +cfg_trace! { + mod trace; + pub(crate) use trace::InstrumentedFuture as Future; +} + +cfg_not_trace! { + cfg_rt! { + pub(crate) use std::future::Future; + } +} diff --git a/third_party/rust/tokio/src/future/poll_fn.rs b/third_party/rust/tokio/src/future/poll_fn.rs new file mode 100644 index 0000000000..d82ce8961d --- /dev/null +++ b/third_party/rust/tokio/src/future/poll_fn.rs @@ -0,0 +1,40 @@ +#![allow(dead_code)] + +//! Definition of the `PollFn` adapter combinator. + +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Future for the [`poll_fn`] function. +pub struct PollFn<F> { + f: F, +} + +impl<F> Unpin for PollFn<F> {} + +/// Creates a new future wrapping around a function returning [`Poll`]. +pub fn poll_fn<T, F>(f: F) -> PollFn<F> +where + F: FnMut(&mut Context<'_>) -> Poll<T>, +{ + PollFn { f } +} + +impl<F> fmt::Debug for PollFn<F> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollFn").finish() + } +} + +impl<T, F> Future for PollFn<F> +where + F: FnMut(&mut Context<'_>) -> Poll<T>, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + (&mut self.f)(cx) + } +} diff --git a/third_party/rust/tokio/src/future/trace.rs b/third_party/rust/tokio/src/future/trace.rs new file mode 100644 index 0000000000..28789a604d --- /dev/null +++ b/third_party/rust/tokio/src/future/trace.rs @@ -0,0 +1,11 @@ +use std::future::Future; + +pub(crate) trait InstrumentedFuture: Future { + fn id(&self) -> Option<tracing::Id>; +} + +impl<F: Future> InstrumentedFuture for tracing::instrument::Instrumented<F> { + fn id(&self) -> Option<tracing::Id> { + self.span().id() + } +} diff --git a/third_party/rust/tokio/src/future/try_join.rs b/third_party/rust/tokio/src/future/try_join.rs new file mode 100644 index 0000000000..8943f61a1e --- /dev/null +++ b/third_party/rust/tokio/src/future/try_join.rs @@ -0,0 +1,82 @@ +use crate::future::maybe_done::{maybe_done, MaybeDone}; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub(crate) fn try_join3<T1, F1, T2, F2, T3, F3, E>( + future1: F1, + future2: F2, + future3: F3, +) -> TryJoin3<F1, F2, F3> +where + F1: Future<Output = Result<T1, E>>, + F2: Future<Output = Result<T2, E>>, + F3: Future<Output = Result<T3, E>>, +{ + TryJoin3 { + future1: maybe_done(future1), + future2: maybe_done(future2), + future3: maybe_done(future3), + } +} + +pin_project! { + pub(crate) struct TryJoin3<F1, F2, F3> + where + F1: Future, + F2: Future, + F3: Future, + { + #[pin] + future1: MaybeDone<F1>, + #[pin] + future2: MaybeDone<F2>, + #[pin] + future3: MaybeDone<F3>, + } +} + +impl<T1, F1, T2, F2, T3, F3, E> Future for TryJoin3<F1, F2, F3> +where + F1: Future<Output = Result<T1, E>>, + F2: Future<Output = Result<T2, E>>, + F3: Future<Output = Result<T3, E>>, +{ + type Output = Result<(T1, T2, T3), E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut all_done = true; + + let mut me = self.project(); + + if me.future1.as_mut().poll(cx).is_pending() { + all_done = false; + } else if me.future1.as_mut().output_mut().unwrap().is_err() { + return Poll::Ready(Err(me.future1.take_output().unwrap().err().unwrap())); + } + + if me.future2.as_mut().poll(cx).is_pending() { + all_done = false; + } else if me.future2.as_mut().output_mut().unwrap().is_err() { + return Poll::Ready(Err(me.future2.take_output().unwrap().err().unwrap())); + } + + if me.future3.as_mut().poll(cx).is_pending() { + all_done = false; + } else if me.future3.as_mut().output_mut().unwrap().is_err() { + return Poll::Ready(Err(me.future3.take_output().unwrap().err().unwrap())); + } + + if all_done { + Poll::Ready(Ok(( + me.future1.take_output().unwrap().ok().unwrap(), + me.future2.take_output().unwrap().ok().unwrap(), + me.future3.take_output().unwrap().ok().unwrap(), + ))) + } else { + Poll::Pending + } + } +} |