diff options
Diffstat (limited to 'third_party/rust/tokio/src/time/timeout.rs')
-rw-r--r-- | third_party/rust/tokio/src/time/timeout.rs | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/timeout.rs b/third_party/rust/tokio/src/time/timeout.rs new file mode 100644 index 0000000000..4a93089e8e --- /dev/null +++ b/third_party/rust/tokio/src/time/timeout.rs @@ -0,0 +1,202 @@ +//! Allows a future to execute for a maximum amount of time. +//! +//! See [`Timeout`] documentation for more details. +//! +//! [`Timeout`]: struct@Timeout + +use crate::{ + coop, + time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, + util::trace, +}; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +/// Requires a `Future` to complete before the specified duration has elapsed. +/// +/// If the future completes before the duration has elapsed, then the completed +/// value is returned. Otherwise, an error is returned and the future is +/// canceled. +/// +/// # Cancelation +/// +/// Cancelling a timeout is done by dropping the future. No additional cleanup +/// or other work is required. +/// +/// The original future may be obtained by calling [`Timeout::into_inner`]. This +/// consumes the `Timeout`. +/// +/// # Examples +/// +/// Create a new `Timeout` set to expire in 10 milliseconds. +/// +/// ```rust +/// use tokio::time::timeout; +/// use tokio::sync::oneshot; +/// +/// use std::time::Duration; +/// +/// # async fn dox() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); +/// +/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. +/// if let Err(_) = timeout(Duration::from_millis(10), rx).await { +/// println!("did not receive value within 10 ms"); +/// } +/// # } +/// ``` +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +#[track_caller] +pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T> +where + T: Future, +{ + let location = trace::caller_location(); + + let deadline = Instant::now().checked_add(duration); + let delay = match deadline { + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::far_future(location), + }; + Timeout::new_with_delay(future, delay) +} + +/// Requires a `Future` to complete before the specified instant in time. +/// +/// If the future completes before the instant is reached, then the completed +/// value is returned. Otherwise, an error is returned. +/// +/// # Cancelation +/// +/// Cancelling a timeout is done by dropping the future. No additional cleanup +/// or other work is required. +/// +/// The original future may be obtained by calling [`Timeout::into_inner`]. This +/// consumes the `Timeout`. +/// +/// # Examples +/// +/// Create a new `Timeout` set to expire in 10 milliseconds. +/// +/// ```rust +/// use tokio::time::{Instant, timeout_at}; +/// use tokio::sync::oneshot; +/// +/// use std::time::Duration; +/// +/// # async fn dox() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); +/// +/// // Wrap the future with a `Timeout` set to expire 10 milliseconds into the +/// // future. +/// if let Err(_) = timeout_at(Instant::now() + Duration::from_millis(10), rx).await { +/// println!("did not receive value within 10 ms"); +/// } +/// # } +/// ``` +pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T> +where + T: Future, +{ + let delay = sleep_until(deadline); + + Timeout { + value: future, + delay, + } +} + +pin_project! { + /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at). + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct Timeout<T> { + #[pin] + value: T, + #[pin] + delay: Sleep, + } +} + +impl<T> Timeout<T> { + pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> { + Timeout { value, delay } + } + + /// Gets a reference to the underlying value in this timeout. + pub fn get_ref(&self) -> &T { + &self.value + } + + /// Gets a mutable reference to the underlying value in this timeout. + pub fn get_mut(&mut self) -> &mut T { + &mut self.value + } + + /// Consumes this timeout, returning the underlying value. + pub fn into_inner(self) -> T { + self.value + } +} + +impl<T> Future for Timeout<T> +where + T: Future, +{ + type Output = Result<T::Output, Elapsed>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + + let had_budget_before = coop::has_budget_remaining(); + + // First, try polling the future + if let Poll::Ready(v) = me.value.poll(cx) { + return Poll::Ready(Ok(v)); + } + + let has_budget_now = coop::has_budget_remaining(); + + let delay = me.delay; + + let poll_delay = || -> Poll<Self::Output> { + match delay.poll(cx) { + Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), + Poll::Pending => Poll::Pending, + } + }; + + if let (true, false) = (had_budget_before, has_budget_now) { + // if it is the underlying future that exhausted the budget, we poll + // the `delay` with an unconstrained one. This prevents pathological + // cases where the underlying future always exhausts the budget and + // we never get a chance to evaluate whether the timeout was hit or + // not. + coop::with_unconstrained(poll_delay) + } else { + poll_delay() + } + } +} |