summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/time
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio/src/time
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/time')
-rw-r--r--third_party/rust/tokio/src/time/clock.rs233
-rw-r--r--third_party/rust/tokio/src/time/driver/entry.rs633
-rw-r--r--third_party/rust/tokio/src/time/driver/handle.rs94
-rw-r--r--third_party/rust/tokio/src/time/driver/mod.rs528
-rw-r--r--third_party/rust/tokio/src/time/driver/sleep.rs438
-rw-r--r--third_party/rust/tokio/src/time/driver/tests/mod.rs301
-rw-r--r--third_party/rust/tokio/src/time/driver/wheel/level.rs276
-rw-r--r--third_party/rust/tokio/src/time/driver/wheel/mod.rs359
-rw-r--r--third_party/rust/tokio/src/time/driver/wheel/stack.rs112
-rw-r--r--third_party/rust/tokio/src/time/error.rs120
-rw-r--r--third_party/rust/tokio/src/time/instant.rs223
-rw-r--r--third_party/rust/tokio/src/time/interval.rs531
-rw-r--r--third_party/rust/tokio/src/time/mod.rs114
-rw-r--r--third_party/rust/tokio/src/time/tests/mod.rs22
-rw-r--r--third_party/rust/tokio/src/time/tests/test_sleep.rs443
-rw-r--r--third_party/rust/tokio/src/time/timeout.rs202
16 files changed, 4629 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/clock.rs b/third_party/rust/tokio/src/time/clock.rs
new file mode 100644
index 0000000000..41be9bac48
--- /dev/null
+++ b/third_party/rust/tokio/src/time/clock.rs
@@ -0,0 +1,233 @@
+#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+
+//! Source of time abstraction.
+//!
+//! By default, `std::time::Instant::now()` is used. However, when the
+//! `test-util` feature flag is enabled, the values returned for `now()` are
+//! configurable.
+
+cfg_not_test_util! {
+ use crate::time::{Instant};
+
+ #[derive(Debug, Clone)]
+ pub(crate) struct Clock {}
+
+ pub(crate) fn now() -> Instant {
+ Instant::from_std(std::time::Instant::now())
+ }
+
+ impl Clock {
+ pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock {
+ Clock {}
+ }
+
+ pub(crate) fn now(&self) -> Instant {
+ now()
+ }
+ }
+}
+
+cfg_test_util! {
+ use crate::time::{Duration, Instant};
+ use crate::loom::sync::{Arc, Mutex};
+
+ cfg_rt! {
+ fn clock() -> Option<Clock> {
+ crate::runtime::context::clock()
+ }
+ }
+
+ cfg_not_rt! {
+ fn clock() -> Option<Clock> {
+ None
+ }
+ }
+
+ /// A handle to a source of time.
+ #[derive(Debug, Clone)]
+ pub(crate) struct Clock {
+ inner: Arc<Mutex<Inner>>,
+ }
+
+ #[derive(Debug)]
+ struct Inner {
+ /// True if the ability to pause time is enabled.
+ enable_pausing: bool,
+
+ /// Instant to use as the clock's base instant.
+ base: std::time::Instant,
+
+ /// Instant at which the clock was last unfrozen.
+ unfrozen: Option<std::time::Instant>,
+ }
+
+ /// Pauses time.
+ ///
+ /// The current value of `Instant::now()` is saved and all subsequent calls
+ /// to `Instant::now()` will return the saved value. The saved value can be
+ /// changed by [`advance`] or by the time auto-advancing once the runtime
+ /// has no work to do. This only affects the `Instant` type in Tokio, and
+ /// the `Instant` in std continues to work as normal.
+ ///
+ /// Pausing time requires the `current_thread` Tokio runtime. This is the
+ /// default runtime used by `#[tokio::test]`. The runtime can be initialized
+ /// with time in a paused state using the `Builder::start_paused` method.
+ ///
+ /// For cases where time is immediately paused, it is better to pause
+ /// the time using the `main` or `test` macro:
+ /// ```
+ /// #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// async fn main() {
+ /// println!("Hello world");
+ /// }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// Panics if time is already frozen or if called from outside of a
+ /// `current_thread` Tokio runtime.
+ ///
+ /// # Auto-advance
+ ///
+ /// If time is paused and the runtime has no work to do, the clock is
+ /// auto-advanced to the next pending timer. This means that [`Sleep`] or
+ /// other timer-backed primitives can cause the runtime to advance the
+ /// current time when awaited.
+ ///
+ /// [`Sleep`]: crate::time::Sleep
+ /// [`advance`]: crate::time::advance
+ pub fn pause() {
+ let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
+ clock.pause();
+ }
+
+ /// Resumes time.
+ ///
+ /// Clears the saved `Instant::now()` value. Subsequent calls to
+ /// `Instant::now()` will return the value returned by the system call.
+ ///
+ /// # Panics
+ ///
+ /// Panics if time is not frozen or if called from outside of the Tokio
+ /// runtime.
+ pub fn resume() {
+ let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
+ let mut inner = clock.inner.lock();
+
+ if inner.unfrozen.is_some() {
+ panic!("time is not frozen");
+ }
+
+ inner.unfrozen = Some(std::time::Instant::now());
+ }
+
+ /// Advances time.
+ ///
+ /// Increments the saved `Instant::now()` value by `duration`. Subsequent
+ /// calls to `Instant::now()` will return the result of the increment.
+ ///
+ /// This function will make the current time jump forward by the given
+ /// duration in one jump. This means that all `sleep` calls with a deadline
+ /// before the new time will immediately complete "at the same time", and
+ /// the runtime is free to poll them in any order. Additionally, this
+ /// method will not wait for the `sleep` calls it advanced past to complete.
+ /// If you want to do that, you should instead call [`sleep`] and rely on
+ /// the runtime's auto-advance feature.
+ ///
+ /// Note that calls to `sleep` are not guaranteed to complete the first time
+ /// they are polled after a call to `advance`. For example, this can happen
+ /// if the runtime has not yet touched the timer driver after the call to
+ /// `advance`. However if they don't, the runtime will poll the task again
+ /// shortly.
+ ///
+ /// # Panics
+ ///
+ /// Panics if time is not frozen or if called from outside of the Tokio
+ /// runtime.
+ ///
+ /// # Auto-advance
+ ///
+ /// If the time is paused and there is no work to do, the runtime advances
+ /// time to the next timer. See [`pause`](pause#auto-advance) for more
+ /// details.
+ ///
+ /// [`sleep`]: fn@crate::time::sleep
+ pub async fn advance(duration: Duration) {
+ let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
+ clock.advance(duration);
+
+ crate::task::yield_now().await;
+ }
+
+ /// Returns the current instant, factoring in frozen time.
+ pub(crate) fn now() -> Instant {
+ if let Some(clock) = clock() {
+ clock.now()
+ } else {
+ Instant::from_std(std::time::Instant::now())
+ }
+ }
+
+ impl Clock {
+ /// Returns a new `Clock` instance that uses the current execution context's
+ /// source of time.
+ pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
+ let now = std::time::Instant::now();
+
+ let clock = Clock {
+ inner: Arc::new(Mutex::new(Inner {
+ enable_pausing,
+ base: now,
+ unfrozen: Some(now),
+ })),
+ };
+
+ if start_paused {
+ clock.pause();
+ }
+
+ clock
+ }
+
+ pub(crate) fn pause(&self) {
+ let mut inner = self.inner.lock();
+
+ if !inner.enable_pausing {
+ drop(inner); // avoid poisoning the lock
+ panic!("`time::pause()` requires the `current_thread` Tokio runtime. \
+ This is the default Runtime used by `#[tokio::test].");
+ }
+
+ let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed();
+ inner.base += elapsed;
+ inner.unfrozen = None;
+ }
+
+ pub(crate) fn is_paused(&self) -> bool {
+ let inner = self.inner.lock();
+ inner.unfrozen.is_none()
+ }
+
+ pub(crate) fn advance(&self, duration: Duration) {
+ let mut inner = self.inner.lock();
+
+ if inner.unfrozen.is_some() {
+ panic!("time is not frozen");
+ }
+
+ inner.base += duration;
+ }
+
+ pub(crate) fn now(&self) -> Instant {
+ let inner = self.inner.lock();
+
+ let mut ret = inner.base;
+
+ if let Some(unfrozen) = inner.unfrozen {
+ ret += unfrozen.elapsed();
+ }
+
+ Instant::from_std(ret)
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/entry.rs b/third_party/rust/tokio/src/time/driver/entry.rs
new file mode 100644
index 0000000000..f0ea898e12
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/entry.rs
@@ -0,0 +1,633 @@
+//! Timer state structures.
+//!
+//! This module contains the heart of the intrusive timer implementation, and as
+//! such the structures inside are full of tricky concurrency and unsafe code.
+//!
+//! # Ground rules
+//!
+//! The heart of the timer implementation here is the [`TimerShared`] structure,
+//! shared between the [`TimerEntry`] and the driver. Generally, we permit access
+//! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or
+//! 2) a held driver lock.
+//!
+//! It follows from this that any changes made while holding BOTH 1 and 2 will
+//! be reliably visible, regardless of ordering. This is because of the acq/rel
+//! fences on the driver lock ensuring ordering with 2, and rust mutable
+//! reference rules for 1 (a mutable reference to an object can't be passed
+//! between threads without an acq/rel barrier, and same-thread we have local
+//! happens-before ordering).
+//!
+//! # State field
+//!
+//! Each timer has a state field associated with it. This field contains either
+//! the current scheduled time, or a special flag value indicating its state.
+//! This state can either indicate that the timer is on the 'pending' queue (and
+//! thus will be fired with an `Ok(())` result soon) or that it has already been
+//! fired/deregistered.
+//!
+//! This single state field allows for code that is firing the timer to
+//! synchronize with any racing `reset` calls reliably.
+//!
+//! # Cached vs true timeouts
+//!
+//! To allow for the use case of a timeout that is periodically reset before
+//! expiration to be as lightweight as possible, we support optimistically
+//! lock-free timer resets, in the case where a timer is rescheduled to a later
+//! point than it was originally scheduled for.
+//!
+//! This is accomplished by lazily rescheduling timers. That is, we update the
+//! state field field with the true expiration of the timer from the holder of
+//! the [`TimerEntry`]. When the driver services timers (ie, whenever it's
+//! walking lists of timers), it checks this "true when" value, and reschedules
+//! based on it.
+//!
+//! We do, however, also need to track what the expiration time was when we
+//! originally registered the timer; this is used to locate the right linked
+//! list when the timer is being cancelled. This is referred to as the "cached
+//! when" internally.
+//!
+//! There is of course a race condition between timer reset and timer
+//! expiration. If the driver fails to observe the updated expiration time, it
+//! could trigger expiration of the timer too early. However, because
+//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and
+//! refuse to mark the timer as pending.
+//!
+//! [mark_pending]: TimerHandle::mark_pending
+
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::AtomicU64;
+use crate::loom::sync::atomic::Ordering;
+
+use crate::sync::AtomicWaker;
+use crate::time::Instant;
+use crate::util::linked_list;
+
+use super::Handle;
+
+use std::cell::UnsafeCell as StdUnsafeCell;
+use std::task::{Context, Poll, Waker};
+use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
+
+type TimerResult = Result<(), crate::time::error::Error>;
+
+const STATE_DEREGISTERED: u64 = u64::MAX;
+const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
+const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
+
+/// This structure holds the current shared state of the timer - its scheduled
+/// time (if registered), or otherwise the result of the timer completing, as
+/// well as the registered waker.
+///
+/// Generally, the StateCell is only permitted to be accessed from two contexts:
+/// Either a thread holding the corresponding &mut TimerEntry, or a thread
+/// holding the timer driver lock. The write actions on the StateCell amount to
+/// passing "ownership" of the StateCell between these contexts; moving a timer
+/// from the TimerEntry to the driver requires _both_ holding the &mut
+/// TimerEntry and the driver lock, while moving it back (firing the timer)
+/// requires only the driver lock.
+pub(super) struct StateCell {
+ /// Holds either the scheduled expiration time for this timer, or (if the
+ /// timer has been fired and is unregistered), `u64::MAX`.
+ state: AtomicU64,
+ /// If the timer is fired (an Acquire order read on state shows
+ /// `u64::MAX`), holds the result that should be returned from
+ /// polling the timer. Otherwise, the contents are unspecified and reading
+ /// without holding the driver lock is undefined behavior.
+ result: UnsafeCell<TimerResult>,
+ /// The currently-registered waker
+ waker: CachePadded<AtomicWaker>,
+}
+
+impl Default for StateCell {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl std::fmt::Debug for StateCell {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "StateCell({:?})", self.read_state())
+ }
+}
+
+impl StateCell {
+ fn new() -> Self {
+ Self {
+ state: AtomicU64::new(STATE_DEREGISTERED),
+ result: UnsafeCell::new(Ok(())),
+ waker: CachePadded(AtomicWaker::new()),
+ }
+ }
+
+ fn is_pending(&self) -> bool {
+ self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
+ }
+
+ /// Returns the current expiration time, or None if not currently scheduled.
+ fn when(&self) -> Option<u64> {
+ let cur_state = self.state.load(Ordering::Relaxed);
+
+ if cur_state == u64::MAX {
+ None
+ } else {
+ Some(cur_state)
+ }
+ }
+
+ /// If the timer is completed, returns the result of the timer. Otherwise,
+ /// returns None and registers the waker.
+ fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
+ // We must register first. This ensures that either `fire` will
+ // observe the new waker, or we will observe a racing fire to have set
+ // the state, or both.
+ self.waker.0.register_by_ref(waker);
+
+ self.read_state()
+ }
+
+ fn read_state(&self) -> Poll<TimerResult> {
+ let cur_state = self.state.load(Ordering::Acquire);
+
+ if cur_state == STATE_DEREGISTERED {
+ // SAFETY: The driver has fired this timer; this involves writing
+ // the result, and then writing (with release ordering) the state
+ // field.
+ Poll::Ready(unsafe { self.result.with(|p| *p) })
+ } else {
+ Poll::Pending
+ }
+ }
+
+ /// Marks this timer as being moved to the pending list, if its scheduled
+ /// time is not after `not_after`.
+ ///
+ /// If the timer is scheduled for a time after not_after, returns an Err
+ /// containing the current scheduled time.
+ ///
+ /// SAFETY: Must hold the driver lock.
+ unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
+ // Quick initial debug check to see if the timer is already fired. Since
+ // firing the timer can only happen with the driver lock held, we know
+ // we shouldn't be able to "miss" a transition to a fired state, even
+ // with relaxed ordering.
+ let mut cur_state = self.state.load(Ordering::Relaxed);
+
+ loop {
+ debug_assert!(cur_state < STATE_MIN_VALUE);
+
+ if cur_state > not_after {
+ break Err(cur_state);
+ }
+
+ match self.state.compare_exchange(
+ cur_state,
+ STATE_PENDING_FIRE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ break Ok(());
+ }
+ Err(actual_state) => {
+ cur_state = actual_state;
+ }
+ }
+ }
+ }
+
+ /// Fires the timer, setting the result to the provided result.
+ ///
+ /// Returns:
+ /// * `Some(waker) - if fired and a waker needs to be invoked once the
+ /// driver lock is released
+ /// * `None` - if fired and a waker does not need to be invoked, or if
+ /// already fired
+ ///
+ /// SAFETY: The driver lock must be held.
+ unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
+ // Quick initial check to see if the timer is already fired. Since
+ // firing the timer can only happen with the driver lock held, we know
+ // we shouldn't be able to "miss" a transition to a fired state, even
+ // with relaxed ordering.
+ let cur_state = self.state.load(Ordering::Relaxed);
+ if cur_state == STATE_DEREGISTERED {
+ return None;
+ }
+
+ // SAFETY: We assume the driver lock is held and the timer is not
+ // fired, so only the driver is accessing this field.
+ //
+ // We perform a release-ordered store to state below, to ensure this
+ // write is visible before the state update is visible.
+ unsafe { self.result.with_mut(|p| *p = result) };
+
+ self.state.store(STATE_DEREGISTERED, Ordering::Release);
+
+ self.waker.0.take_waker()
+ }
+
+ /// Marks the timer as registered (poll will return None) and sets the
+ /// expiration time.
+ ///
+ /// While this function is memory-safe, it should only be called from a
+ /// context holding both `&mut TimerEntry` and the driver lock.
+ fn set_expiration(&self, timestamp: u64) {
+ debug_assert!(timestamp < STATE_MIN_VALUE);
+
+ // We can use relaxed ordering because we hold the driver lock and will
+ // fence when we release the lock.
+ self.state.store(timestamp, Ordering::Relaxed);
+ }
+
+ /// Attempts to adjust the timer to a new timestamp.
+ ///
+ /// If the timer has already been fired, is pending firing, or the new
+ /// timestamp is earlier than the old timestamp, (or occasionally
+ /// spuriously) returns Err without changing the timer's state. In this
+ /// case, the timer must be deregistered and re-registered.
+ fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
+ let mut prior = self.state.load(Ordering::Relaxed);
+ loop {
+ if new_timestamp < prior || prior >= STATE_MIN_VALUE {
+ return Err(());
+ }
+
+ match self.state.compare_exchange_weak(
+ prior,
+ new_timestamp,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ return Ok(());
+ }
+ Err(true_prior) => {
+ prior = true_prior;
+ }
+ }
+ }
+ }
+
+ /// Returns true if the state of this timer indicates that the timer might
+ /// be registered with the driver. This check is performed with relaxed
+ /// ordering, but is conservative - if it returns false, the timer is
+ /// definitely _not_ registered.
+ pub(super) fn might_be_registered(&self) -> bool {
+ self.state.load(Ordering::Relaxed) != u64::MAX
+ }
+}
+
+/// A timer entry.
+///
+/// This is the handle to a timer that is controlled by the requester of the
+/// timer. As this participates in intrusive data structures, it must be pinned
+/// before polling.
+#[derive(Debug)]
+pub(super) struct TimerEntry {
+ /// Arc reference to the driver. We can only free the driver after
+ /// deregistering everything from their respective timer wheels.
+ driver: Handle,
+ /// Shared inner structure; this is part of an intrusive linked list, and
+ /// therefore other references can exist to it while mutable references to
+ /// Entry exist.
+ ///
+ /// This is manipulated only under the inner mutex. TODO: Can we use loom
+ /// cells for this?
+ inner: StdUnsafeCell<TimerShared>,
+ /// Initial deadline for the timer. This is used to register on the first
+ /// poll, as we can't register prior to being pinned.
+ initial_deadline: Option<Instant>,
+ /// Ensure the type is !Unpin
+ _m: std::marker::PhantomPinned,
+}
+
+unsafe impl Send for TimerEntry {}
+unsafe impl Sync for TimerEntry {}
+
+/// An TimerHandle is the (non-enforced) "unique" pointer from the driver to the
+/// timer entry. Generally, at most one TimerHandle exists for a timer at a time
+/// (enforced by the timer state machine).
+///
+/// SAFETY: An TimerHandle is essentially a raw pointer, and the usual caveats
+/// of pointer safety apply. In particular, TimerHandle does not itself enforce
+/// that the timer does still exist; however, normally an TimerHandle is created
+/// immediately before registering the timer, and is consumed when firing the
+/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce
+/// memory safety, all operations are unsafe.
+#[derive(Debug)]
+pub(crate) struct TimerHandle {
+ inner: NonNull<TimerShared>,
+}
+
+pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
+
+/// The shared state structure of a timer. This structure is shared between the
+/// frontend (`Entry`) and driver backend.
+///
+/// Note that this structure is located inside the `TimerEntry` structure.
+#[derive(Debug)]
+#[repr(C)] // required by `link_list::Link` impl
+pub(crate) struct TimerShared {
+ /// Data manipulated by the driver thread itself, only.
+ driver_state: CachePadded<TimerSharedPadded>,
+
+ /// Current state. This records whether the timer entry is currently under
+ /// the ownership of the driver, and if not, its current state (not
+ /// complete, fired, error, etc).
+ state: StateCell,
+
+ _p: PhantomPinned,
+}
+
+impl TimerShared {
+ pub(super) fn new() -> Self {
+ Self {
+ state: StateCell::default(),
+ driver_state: CachePadded(TimerSharedPadded::new()),
+ _p: PhantomPinned,
+ }
+ }
+
+ /// Gets the cached time-of-expiration value.
+ pub(super) fn cached_when(&self) -> u64 {
+ // Cached-when is only accessed under the driver lock, so we can use relaxed
+ self.driver_state.0.cached_when.load(Ordering::Relaxed)
+ }
+
+ /// Gets the true time-of-expiration value, and copies it into the cached
+ /// time-of-expiration value.
+ ///
+ /// SAFETY: Must be called with the driver lock held, and when this entry is
+ /// not in any timer wheel lists.
+ pub(super) unsafe fn sync_when(&self) -> u64 {
+ let true_when = self.true_when();
+
+ self.driver_state
+ .0
+ .cached_when
+ .store(true_when, Ordering::Relaxed);
+
+ true_when
+ }
+
+ /// Sets the cached time-of-expiration value.
+ ///
+ /// SAFETY: Must be called with the driver lock held, and when this entry is
+ /// not in any timer wheel lists.
+ unsafe fn set_cached_when(&self, when: u64) {
+ self.driver_state
+ .0
+ .cached_when
+ .store(when, Ordering::Relaxed);
+ }
+
+ /// Returns the true time-of-expiration value, with relaxed memory ordering.
+ pub(super) fn true_when(&self) -> u64 {
+ self.state.when().expect("Timer already fired")
+ }
+
+ /// Sets the true time-of-expiration value, even if it is less than the
+ /// current expiration or the timer is deregistered.
+ ///
+ /// SAFETY: Must only be called with the driver lock held and the entry not
+ /// in the timer wheel.
+ pub(super) unsafe fn set_expiration(&self, t: u64) {
+ self.state.set_expiration(t);
+ self.driver_state.0.cached_when.store(t, Ordering::Relaxed);
+ }
+
+ /// Sets the true time-of-expiration only if it is after the current.
+ pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
+ self.state.extend_expiration(t)
+ }
+
+ /// Returns a TimerHandle for this timer.
+ pub(super) fn handle(&self) -> TimerHandle {
+ TimerHandle {
+ inner: NonNull::from(self),
+ }
+ }
+
+ /// Returns true if the state of this timer indicates that the timer might
+ /// be registered with the driver. This check is performed with relaxed
+ /// ordering, but is conservative - if it returns false, the timer is
+ /// definitely _not_ registered.
+ pub(super) fn might_be_registered(&self) -> bool {
+ self.state.might_be_registered()
+ }
+}
+
+/// Additional shared state between the driver and the timer which is cache
+/// padded. This contains the information that the driver thread accesses most
+/// frequently to minimize contention. In particular, we move it away from the
+/// waker, as the waker is updated on every poll.
+#[repr(C)] // required by `link_list::Link` impl
+struct TimerSharedPadded {
+ /// A link within the doubly-linked list of timers on a particular level and
+ /// slot. Valid only if state is equal to Registered.
+ ///
+ /// Only accessed under the entry lock.
+ pointers: linked_list::Pointers<TimerShared>,
+
+ /// The expiration time for which this entry is currently registered.
+ /// Generally owned by the driver, but is accessed by the entry when not
+ /// registered.
+ cached_when: AtomicU64,
+
+ /// The true expiration time. Set by the timer future, read by the driver.
+ true_when: AtomicU64,
+}
+
+impl std::fmt::Debug for TimerSharedPadded {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TimerSharedPadded")
+ .field("when", &self.true_when.load(Ordering::Relaxed))
+ .field("cached_when", &self.cached_when.load(Ordering::Relaxed))
+ .finish()
+ }
+}
+
+impl TimerSharedPadded {
+ fn new() -> Self {
+ Self {
+ cached_when: AtomicU64::new(0),
+ true_when: AtomicU64::new(0),
+ pointers: linked_list::Pointers::new(),
+ }
+ }
+}
+
+unsafe impl Send for TimerShared {}
+unsafe impl Sync for TimerShared {}
+
+unsafe impl linked_list::Link for TimerShared {
+ type Handle = TimerHandle;
+
+ type Target = TimerShared;
+
+ fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> {
+ handle.inner
+ }
+
+ unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
+ TimerHandle { inner: ptr }
+ }
+
+ unsafe fn pointers(
+ target: NonNull<Self::Target>,
+ ) -> NonNull<linked_list::Pointers<Self::Target>> {
+ target.cast()
+ }
+}
+
+// ===== impl Entry =====
+
+impl TimerEntry {
+ pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self {
+ let driver = handle.clone();
+
+ Self {
+ driver,
+ inner: StdUnsafeCell::new(TimerShared::new()),
+ initial_deadline: Some(deadline),
+ _m: std::marker::PhantomPinned,
+ }
+ }
+
+ fn inner(&self) -> &TimerShared {
+ unsafe { &*self.inner.get() }
+ }
+
+ pub(crate) fn is_elapsed(&self) -> bool {
+ !self.inner().state.might_be_registered() && self.initial_deadline.is_none()
+ }
+
+ /// Cancels and deregisters the timer. This operation is irreversible.
+ pub(crate) fn cancel(self: Pin<&mut Self>) {
+ // We need to perform an acq/rel fence with the driver thread, and the
+ // simplest way to do so is to grab the driver lock.
+ //
+ // Why is this necessary? We're about to release this timer's memory for
+ // some other non-timer use. However, we've been doing a bunch of
+ // relaxed (or even non-atomic) writes from the driver thread, and we'll
+ // be doing more from _this thread_ (as this memory is interpreted as
+ // something else).
+ //
+ // It is critical to ensure that, from the point of view of the driver,
+ // those future non-timer writes happen-after the timer is fully fired,
+ // and from the purpose of this thread, the driver's writes all
+ // happen-before we drop the timer. This in turn requires us to perform
+ // an acquire-release barrier in _both_ directions between the driver
+ // and dropping thread.
+ //
+ // The lock acquisition in clear_entry serves this purpose. All of the
+ // driver manipulations happen with the lock held, so we can just take
+ // the lock and be sure that this drop happens-after everything the
+ // driver did so far and happens-before everything the driver does in
+ // the future. While we have the lock held, we also go ahead and
+ // deregister the entry if necessary.
+ unsafe { self.driver.clear_entry(NonNull::from(self.inner())) };
+ }
+
+ pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
+ unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None;
+
+ let tick = self.driver.time_source().deadline_to_tick(new_time);
+
+ if self.inner().extend_expiration(tick).is_ok() {
+ return;
+ }
+
+ unsafe {
+ self.driver.reregister(tick, self.inner().into());
+ }
+ }
+
+ pub(crate) fn poll_elapsed(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), super::Error>> {
+ if self.driver.is_shutdown() {
+ panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
+ }
+
+ if let Some(deadline) = self.initial_deadline {
+ self.as_mut().reset(deadline);
+ }
+
+ let this = unsafe { self.get_unchecked_mut() };
+
+ this.inner().state.poll(cx.waker())
+ }
+}
+
+impl TimerHandle {
+ pub(super) unsafe fn cached_when(&self) -> u64 {
+ unsafe { self.inner.as_ref().cached_when() }
+ }
+
+ pub(super) unsafe fn sync_when(&self) -> u64 {
+ unsafe { self.inner.as_ref().sync_when() }
+ }
+
+ pub(super) unsafe fn is_pending(&self) -> bool {
+ unsafe { self.inner.as_ref().state.is_pending() }
+ }
+
+ /// Forcibly sets the true and cached expiration times to the given tick.
+ ///
+ /// SAFETY: The caller must ensure that the handle remains valid, the driver
+ /// lock is held, and that the timer is not in any wheel linked lists.
+ pub(super) unsafe fn set_expiration(&self, tick: u64) {
+ self.inner.as_ref().set_expiration(tick);
+ }
+
+ /// Attempts to mark this entry as pending. If the expiration time is after
+ /// `not_after`, however, returns an Err with the current expiration time.
+ ///
+ /// If an `Err` is returned, the `cached_when` value will be updated to this
+ /// new expiration time.
+ ///
+ /// SAFETY: The caller must ensure that the handle remains valid, the driver
+ /// lock is held, and that the timer is not in any wheel linked lists.
+ /// After returning Ok, the entry must be added to the pending list.
+ pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
+ match self.inner.as_ref().state.mark_pending(not_after) {
+ Ok(()) => {
+ // mark this as being on the pending queue in cached_when
+ self.inner.as_ref().set_cached_when(u64::MAX);
+ Ok(())
+ }
+ Err(tick) => {
+ self.inner.as_ref().set_cached_when(tick);
+ Err(tick)
+ }
+ }
+ }
+
+ /// Attempts to transition to a terminal state. If the state is already a
+ /// terminal state, does nothing.
+ ///
+ /// Because the entry might be dropped after the state is moved to a
+ /// terminal state, this function consumes the handle to ensure we don't
+ /// access the entry afterwards.
+ ///
+ /// Returns the last-registered waker, if any.
+ ///
+ /// SAFETY: The driver lock must be held while invoking this function, and
+ /// the entry must not be in any wheel linked lists.
+ pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
+ self.inner.as_ref().state.fire(completed_state)
+ }
+}
+
+impl Drop for TimerEntry {
+ fn drop(&mut self) {
+ unsafe { Pin::new_unchecked(self) }.as_mut().cancel()
+ }
+}
+
+#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
+#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
+#[derive(Debug, Default)]
+struct CachePadded<T>(T);
diff --git a/third_party/rust/tokio/src/time/driver/handle.rs b/third_party/rust/tokio/src/time/driver/handle.rs
new file mode 100644
index 0000000000..b61c0476e1
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/handle.rs
@@ -0,0 +1,94 @@
+use crate::loom::sync::Arc;
+use crate::time::driver::ClockTime;
+use std::fmt;
+
+/// Handle to time driver instance.
+#[derive(Clone)]
+pub(crate) struct Handle {
+ time_source: ClockTime,
+ inner: Arc<super::Inner>,
+}
+
+impl Handle {
+ /// Creates a new timer `Handle` from a shared `Inner` timer state.
+ pub(super) fn new(inner: Arc<super::Inner>) -> Self {
+ let time_source = inner.state.lock().time_source.clone();
+ Handle { time_source, inner }
+ }
+
+ /// Returns the time source associated with this handle.
+ pub(super) fn time_source(&self) -> &ClockTime {
+ &self.time_source
+ }
+
+ /// Access the driver's inner structure.
+ pub(super) fn get(&self) -> &super::Inner {
+ &*self.inner
+ }
+
+ /// Checks whether the driver has been shutdown.
+ pub(super) fn is_shutdown(&self) -> bool {
+ self.inner.is_shutdown()
+ }
+}
+
+cfg_rt! {
+ impl Handle {
+ /// Tries to get a handle to the current timer.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current timer set.
+ ///
+ /// It can be triggered when [`Builder::enable_time`] or
+ /// [`Builder::enable_all`] are not included in the builder.
+ ///
+ /// It can also panic whenever a timer is created outside of a
+ /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+ /// since the function is executed outside of the runtime.
+ /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+ /// And this is because wrapping the function on an async makes it lazy,
+ /// and so gets executed inside the runtime successfully without
+ /// panicking.
+ ///
+ /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+ /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+ pub(crate) fn current() -> Self {
+ crate::runtime::context::time_handle()
+ .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
+ }
+ }
+}
+
+cfg_not_rt! {
+ impl Handle {
+ /// Tries to get a handle to the current timer.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current timer set.
+ ///
+ /// It can be triggered when [`Builder::enable_time`] or
+ /// [`Builder::enable_all`] are not included in the builder.
+ ///
+ /// It can also panic whenever a timer is created outside of a
+ /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+ /// since the function is executed outside of the runtime.
+ /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+ /// And this is because wrapping the function on an async makes it lazy,
+ /// and so gets executed inside the runtime successfully without
+ /// panicking.
+ ///
+ /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+ /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+ pub(crate) fn current() -> Self {
+ panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
+ }
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Handle")
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/mod.rs b/third_party/rust/tokio/src/time/driver/mod.rs
new file mode 100644
index 0000000000..9971877479
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/mod.rs
@@ -0,0 +1,528 @@
+// Currently, rust warns when an unsafe fn contains an unsafe {} block. However,
+// in the future, this will change to the reverse. For now, suppress this
+// warning and generally stick with being explicit about unsafety.
+#![allow(unused_unsafe)]
+#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+
+//! Time driver.
+
+mod entry;
+pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
+
+mod handle;
+pub(crate) use self::handle::Handle;
+
+mod wheel;
+
+pub(super) mod sleep;
+
+use crate::loom::sync::atomic::{AtomicBool, Ordering};
+use crate::loom::sync::{Arc, Mutex};
+use crate::park::{Park, Unpark};
+use crate::time::error::Error;
+use crate::time::{Clock, Duration, Instant};
+
+use std::convert::TryInto;
+use std::fmt;
+use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
+
+/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
+///
+/// A `Driver` instance tracks the state necessary for managing time and
+/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
+///
+/// It is expected that a single instance manages many individual [`Sleep`][sleep]
+/// instances. The `Driver` implementation is thread-safe and, as such, is able
+/// to handle callers from across threads.
+///
+/// After creating the `Driver` instance, the caller must repeatedly call `park`
+/// or `park_timeout`. The time driver will perform no work unless `park` or
+/// `park_timeout` is called repeatedly.
+///
+/// The driver has a resolution of one millisecond. Any unit of time that falls
+/// between milliseconds are rounded up to the next millisecond.
+///
+/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
+/// elapsed will be notified with an error. At this point, calling `poll` on the
+/// [`Sleep`][sleep] instance will result in panic.
+///
+/// # Implementation
+///
+/// The time driver is based on the [paper by Varghese and Lauck][paper].
+///
+/// A hashed timing wheel is a vector of slots, where each slot handles a time
+/// slice. As time progresses, the timer walks over the slot for the current
+/// instant, and processes each entry for that slot. When the timer reaches the
+/// end of the wheel, it starts again at the beginning.
+///
+/// The implementation maintains six wheels arranged in a set of levels. As the
+/// levels go up, the slots of the associated wheel represent larger intervals
+/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
+/// time equal to the wheel at the lower level. At level zero, each slot
+/// represents one millisecond of time.
+///
+/// The wheels are:
+///
+/// * Level 0: 64 x 1 millisecond slots.
+/// * Level 1: 64 x 64 millisecond slots.
+/// * Level 2: 64 x ~4 second slots.
+/// * Level 3: 64 x ~4 minute slots.
+/// * Level 4: 64 x ~4 hour slots.
+/// * Level 5: 64 x ~12 day slots.
+///
+/// When the timer processes entries at level zero, it will notify all the
+/// `Sleep` instances as their deadlines have been reached. For all higher
+/// levels, all entries will be redistributed across the wheel at the next level
+/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
+/// either be canceled (dropped) or their associated entries will reach level
+/// zero and be notified.
+///
+/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
+/// [sleep]: crate::time::Sleep
+/// [timeout]: crate::time::Timeout
+/// [interval]: crate::time::Interval
+#[derive(Debug)]
+pub(crate) struct Driver<P: Park + 'static> {
+ /// Timing backend in use.
+ time_source: ClockTime,
+
+ /// Shared state.
+ handle: Handle,
+
+ /// Parker to delegate to.
+ park: P,
+
+ // When `true`, a call to `park_timeout` should immediately return and time
+ // should not advance. One reason for this to be `true` is if the task
+ // passed to `Runtime::block_on` called `task::yield_now()`.
+ //
+ // While it may look racy, it only has any effect when the clock is paused
+ // and pausing the clock is restricted to a single-threaded runtime.
+ #[cfg(feature = "test-util")]
+ did_wake: Arc<AtomicBool>,
+}
+
+/// A structure which handles conversion from Instants to u64 timestamps.
+#[derive(Debug, Clone)]
+pub(self) struct ClockTime {
+ clock: super::clock::Clock,
+ start_time: Instant,
+}
+
+impl ClockTime {
+ pub(self) fn new(clock: Clock) -> Self {
+ Self {
+ start_time: clock.now(),
+ clock,
+ }
+ }
+
+ pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
+ // Round up to the end of a ms
+ self.instant_to_tick(t + Duration::from_nanos(999_999))
+ }
+
+ pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
+ // round up
+ let dur: Duration = t
+ .checked_duration_since(self.start_time)
+ .unwrap_or_else(|| Duration::from_secs(0));
+ let ms = dur.as_millis();
+
+ ms.try_into().unwrap_or(u64::MAX)
+ }
+
+ pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
+ Duration::from_millis(t)
+ }
+
+ pub(self) fn now(&self) -> u64 {
+ self.instant_to_tick(self.clock.now())
+ }
+}
+
+/// Timer state shared between `Driver`, `Handle`, and `Registration`.
+struct Inner {
+ // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
+ pub(super) state: Mutex<InnerState>,
+
+ /// True if the driver is being shutdown.
+ pub(super) is_shutdown: AtomicBool,
+}
+
+/// Time state shared which must be protected by a `Mutex`
+struct InnerState {
+ /// Timing backend in use.
+ time_source: ClockTime,
+
+ /// The last published timer `elapsed` value.
+ elapsed: u64,
+
+ /// The earliest time at which we promise to wake up without unparking.
+ next_wake: Option<NonZeroU64>,
+
+ /// Timer wheel.
+ wheel: wheel::Wheel,
+
+ /// Unparker that can be used to wake the time driver.
+ unpark: Box<dyn Unpark>,
+}
+
+// ===== impl Driver =====
+
+impl<P> Driver<P>
+where
+ P: Park + 'static,
+{
+ /// Creates a new `Driver` instance that uses `park` to block the current
+ /// thread and `time_source` to get the current time and convert to ticks.
+ ///
+ /// Specifying the source of time is useful when testing.
+ pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
+ let time_source = ClockTime::new(clock);
+
+ let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));
+
+ Driver {
+ time_source,
+ handle: Handle::new(Arc::new(inner)),
+ park,
+ #[cfg(feature = "test-util")]
+ did_wake: Arc::new(AtomicBool::new(false)),
+ }
+ }
+
+ /// Returns a handle to the timer.
+ ///
+ /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances
+ /// can either be created directly or the `Handle` instance can be passed to
+ /// `with_default`, setting the timer as the default timer for the execution
+ /// context.
+ pub(crate) fn handle(&self) -> Handle {
+ self.handle.clone()
+ }
+
+ fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
+ let mut lock = self.handle.get().state.lock();
+
+ assert!(!self.handle.is_shutdown());
+
+ let next_wake = lock.wheel.next_expiration_time();
+ lock.next_wake =
+ next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
+
+ drop(lock);
+
+ match next_wake {
+ Some(when) => {
+ let now = self.time_source.now();
+ // Note that we effectively round up to 1ms here - this avoids
+ // very short-duration microsecond-resolution sleeps that the OS
+ // might treat as zero-length.
+ let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
+
+ if duration > Duration::from_millis(0) {
+ if let Some(limit) = limit {
+ duration = std::cmp::min(limit, duration);
+ }
+
+ self.park_timeout(duration)?;
+ } else {
+ self.park.park_timeout(Duration::from_secs(0))?;
+ }
+ }
+ None => {
+ if let Some(duration) = limit {
+ self.park_timeout(duration)?;
+ } else {
+ self.park.park()?;
+ }
+ }
+ }
+
+ // Process pending timers after waking up
+ self.handle.process();
+
+ Ok(())
+ }
+
+ cfg_test_util! {
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
+ let clock = &self.time_source.clock;
+
+ if clock.is_paused() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+
+ // If the time driver was woken, then the park completed
+ // before the "duration" elapsed (usually caused by a
+ // yield in `Runtime::block_on`). In this case, we don't
+ // advance the clock.
+ if !self.did_wake() {
+ // Simulate advancing time
+ clock.advance(duration);
+ }
+ } else {
+ self.park.park_timeout(duration)?;
+ }
+
+ Ok(())
+ }
+
+ fn did_wake(&self) -> bool {
+ self.did_wake.swap(false, Ordering::SeqCst)
+ }
+ }
+
+ cfg_not_test_util! {
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
+ self.park.park_timeout(duration)
+ }
+ }
+}
+
+impl Handle {
+ /// Runs timer related logic, and returns the next wakeup time
+ pub(self) fn process(&self) {
+ let now = self.time_source().now();
+
+ self.process_at_time(now)
+ }
+
+ pub(self) fn process_at_time(&self, mut now: u64) {
+ let mut waker_list: [Option<Waker>; 32] = Default::default();
+ let mut waker_idx = 0;
+
+ let mut lock = self.get().lock();
+
+ if now < lock.elapsed {
+ // Time went backwards! This normally shouldn't happen as the Rust language
+ // guarantees that an Instant is monotonic, but can happen when running
+ // Linux in a VM on a Windows host due to std incorrectly trusting the
+ // hardware clock to be monotonic.
+ //
+ // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
+ now = lock.elapsed;
+ }
+
+ while let Some(entry) = lock.wheel.poll(now) {
+ debug_assert!(unsafe { entry.is_pending() });
+
+ // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
+ if let Some(waker) = unsafe { entry.fire(Ok(())) } {
+ waker_list[waker_idx] = Some(waker);
+
+ waker_idx += 1;
+
+ if waker_idx == waker_list.len() {
+ // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
+ drop(lock);
+
+ for waker in waker_list.iter_mut() {
+ waker.take().unwrap().wake();
+ }
+
+ waker_idx = 0;
+
+ lock = self.get().lock();
+ }
+ }
+ }
+
+ // Update the elapsed cache
+ lock.elapsed = lock.wheel.elapsed();
+ lock.next_wake = lock
+ .wheel
+ .poll_at()
+ .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
+
+ drop(lock);
+
+ for waker in waker_list[0..waker_idx].iter_mut() {
+ waker.take().unwrap().wake();
+ }
+ }
+
+ /// Removes a registered timer from the driver.
+ ///
+ /// The timer will be moved to the cancelled state. Wakers will _not_ be
+ /// invoked. If the timer is already completed, this function is a no-op.
+ ///
+ /// This function always acquires the driver lock, even if the entry does
+ /// not appear to be registered.
+ ///
+ /// SAFETY: The timer must not be registered with some other driver, and
+ /// `add_entry` must not be called concurrently.
+ pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
+ unsafe {
+ let mut lock = self.get().lock();
+
+ if entry.as_ref().might_be_registered() {
+ lock.wheel.remove(entry);
+ }
+
+ entry.as_ref().handle().fire(Ok(()));
+ }
+ }
+
+ /// Removes and re-adds an entry to the driver.
+ ///
+ /// SAFETY: The timer must be either unregistered, or registered with this
+ /// driver. No other threads are allowed to concurrently manipulate the
+ /// timer at all (the current thread should hold an exclusive reference to
+ /// the `TimerEntry`)
+ pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
+ let waker = unsafe {
+ let mut lock = self.get().lock();
+
+ // We may have raced with a firing/deregistration, so check before
+ // deregistering.
+ if unsafe { entry.as_ref().might_be_registered() } {
+ lock.wheel.remove(entry);
+ }
+
+ // Now that we have exclusive control of this entry, mint a handle to reinsert it.
+ let entry = entry.as_ref().handle();
+
+ if self.is_shutdown() {
+ unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
+ } else {
+ entry.set_expiration(new_tick);
+
+ // Note: We don't have to worry about racing with some other resetting
+ // thread, because add_entry and reregister require exclusive control of
+ // the timer entry.
+ match unsafe { lock.wheel.insert(entry) } {
+ Ok(when) => {
+ if lock
+ .next_wake
+ .map(|next_wake| when < next_wake.get())
+ .unwrap_or(true)
+ {
+ lock.unpark.unpark();
+ }
+
+ None
+ }
+ Err((entry, super::error::InsertError::Elapsed)) => unsafe {
+ entry.fire(Ok(()))
+ },
+ }
+ }
+
+ // Must release lock before invoking waker to avoid the risk of deadlock.
+ };
+
+ // The timer was fired synchronously as a result of the reregistration.
+ // Wake the waker; this is needed because we might reset _after_ a poll,
+ // and otherwise the task won't be awoken to poll again.
+ if let Some(waker) = waker {
+ waker.wake();
+ }
+ }
+}
+
+impl<P> Park for Driver<P>
+where
+ P: Park + 'static,
+{
+ type Unpark = TimerUnpark<P>;
+ type Error = P::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ TimerUnpark::new(self)
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park_internal(None)
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park_internal(Some(duration))
+ }
+
+ fn shutdown(&mut self) {
+ if self.handle.is_shutdown() {
+ return;
+ }
+
+ self.handle.get().is_shutdown.store(true, Ordering::SeqCst);
+
+ // Advance time forward to the end of time.
+
+ self.handle.process_at_time(u64::MAX);
+
+ self.park.shutdown();
+ }
+}
+
+impl<P> Drop for Driver<P>
+where
+ P: Park + 'static,
+{
+ fn drop(&mut self) {
+ self.shutdown();
+ }
+}
+
+pub(crate) struct TimerUnpark<P: Park + 'static> {
+ inner: P::Unpark,
+
+ #[cfg(feature = "test-util")]
+ did_wake: Arc<AtomicBool>,
+}
+
+impl<P: Park + 'static> TimerUnpark<P> {
+ fn new(driver: &Driver<P>) -> TimerUnpark<P> {
+ TimerUnpark {
+ inner: driver.park.unpark(),
+
+ #[cfg(feature = "test-util")]
+ did_wake: driver.did_wake.clone(),
+ }
+ }
+}
+
+impl<P: Park + 'static> Unpark for TimerUnpark<P> {
+ fn unpark(&self) {
+ #[cfg(feature = "test-util")]
+ self.did_wake.store(true, Ordering::SeqCst);
+
+ self.inner.unpark();
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
+ Inner {
+ state: Mutex::new(InnerState {
+ time_source,
+ elapsed: 0,
+ next_wake: None,
+ unpark,
+ wheel: wheel::Wheel::new(),
+ }),
+ is_shutdown: AtomicBool::new(false),
+ }
+ }
+
+ /// Locks the driver's inner structure
+ pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
+ self.state.lock()
+ }
+
+ // Check whether the driver has been shutdown
+ pub(super) fn is_shutdown(&self) -> bool {
+ self.is_shutdown.load(Ordering::SeqCst)
+ }
+}
+
+impl fmt::Debug for Inner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Inner").finish()
+ }
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/third_party/rust/tokio/src/time/driver/sleep.rs b/third_party/rust/tokio/src/time/driver/sleep.rs
new file mode 100644
index 0000000000..7f27ef201f
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/sleep.rs
@@ -0,0 +1,438 @@
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+use crate::time::driver::ClockTime;
+use crate::time::driver::{Handle, TimerEntry};
+use crate::time::{error::Error, Duration, Instant};
+use crate::util::trace;
+
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::panic::Location;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+/// Waits until `deadline` is reached.
+///
+/// No work is performed while awaiting on the sleep future to complete. `Sleep`
+/// operates at millisecond granularity and should not be used for tasks that
+/// require high-resolution timers.
+///
+/// To run something regularly on a schedule, see [`interval`].
+///
+/// # Cancellation
+///
+/// Canceling a sleep instance is done by dropping the returned future. No additional
+/// cleanup work is required.
+///
+/// # Examples
+///
+/// Wait 100ms and print "100 ms have elapsed".
+///
+/// ```
+/// use tokio::time::{sleep_until, Instant, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// sleep_until(Instant::now() + Duration::from_millis(100)).await;
+/// println!("100 ms have elapsed");
+/// }
+/// ```
+///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime.
+/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+/// And this is because wrapping the function on an async makes it lazy,
+/// and so gets executed inside the runtime successfully without
+/// panicking.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
+/// [`interval`]: crate::time::interval()
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+// Alias for old name in 0.x
+#[cfg_attr(docsrs, doc(alias = "delay_until"))]
+#[track_caller]
+pub fn sleep_until(deadline: Instant) -> Sleep {
+ return Sleep::new_timeout(deadline, trace::caller_location());
+}
+
+/// Waits until `duration` has elapsed.
+///
+/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
+/// analog to `std::thread::sleep`.
+///
+/// No work is performed while awaiting on the sleep future to complete. `Sleep`
+/// operates at millisecond granularity and should not be used for tasks that
+/// require high-resolution timers.
+///
+/// To run something regularly on a schedule, see [`interval`].
+///
+/// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years).
+///
+/// # Cancellation
+///
+/// Canceling a sleep instance is done by dropping the returned future. No additional
+/// cleanup work is required.
+///
+/// # Examples
+///
+/// Wait 100ms and print "100 ms have elapsed".
+///
+/// ```
+/// use tokio::time::{sleep, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// sleep(Duration::from_millis(100)).await;
+/// println!("100 ms have elapsed");
+/// }
+/// ```
+///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime.
+/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+/// And this is because wrapping the function on an async makes it lazy,
+/// and so gets executed inside the runtime successfully without
+/// panicking.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
+/// [`interval`]: crate::time::interval()
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+// Alias for old name in 0.x
+#[cfg_attr(docsrs, doc(alias = "delay_for"))]
+#[cfg_attr(docsrs, doc(alias = "wait"))]
+#[track_caller]
+pub fn sleep(duration: Duration) -> Sleep {
+ let location = trace::caller_location();
+
+ match Instant::now().checked_add(duration) {
+ Some(deadline) => Sleep::new_timeout(deadline, location),
+ None => Sleep::new_timeout(Instant::far_future(), location),
+ }
+}
+
+pin_project! {
+ /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
+ ///
+ /// This type does not implement the `Unpin` trait, which means that if you
+ /// use it with [`select!`] or by calling `poll`, you have to pin it first.
+ /// If you use it with `.await`, this does not apply.
+ ///
+ /// # Examples
+ ///
+ /// Wait 100ms and print "100 ms have elapsed".
+ ///
+ /// ```
+ /// use tokio::time::{sleep, Duration};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// sleep(Duration::from_millis(100)).await;
+ /// println!("100 ms have elapsed");
+ /// }
+ /// ```
+ ///
+ /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
+ /// necessary when the same `Sleep` is selected on multiple times.
+ /// ```no_run
+ /// use tokio::time::{self, Duration, Instant};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let sleep = time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// loop {
+ /// tokio::select! {
+ /// () = &mut sleep => {
+ /// println!("timer elapsed");
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
+ /// },
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
+ /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ ///
+ /// struct HasSleep {
+ /// sleep: Pin<Box<Sleep>>,
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.sleep.as_mut().poll(cx)
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with pin projection. This method avoids the `Box`, but
+ /// the `HasSleep` struct will not be `Unpin` as a consequence.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ /// use pin_project_lite::pin_project;
+ ///
+ /// pin_project! {
+ /// struct HasSleep {
+ /// #[pin]
+ /// sleep: Sleep,
+ /// }
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.project().sleep.poll(cx)
+ /// }
+ /// }
+ /// ```
+ ///
+ /// [`select!`]: ../macro.select.html
+ /// [`tokio::pin!`]: ../macro.pin.html
+ // Alias for old name in 0.2
+ #[cfg_attr(docsrs, doc(alias = "Delay"))]
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Sleep {
+ inner: Inner,
+
+ // The link between the `Sleep` instance and the timer that drives it.
+ #[pin]
+ entry: TimerEntry,
+ }
+}
+
+cfg_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ deadline: Instant,
+ ctx: trace::AsyncOpTracingCtx,
+ time_source: ClockTime,
+ }
+}
+
+cfg_not_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ deadline: Instant,
+ }
+}
+
+impl Sleep {
+ #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
+ pub(crate) fn new_timeout(
+ deadline: Instant,
+ location: Option<&'static Location<'static>>,
+ ) -> Sleep {
+ let handle = Handle::current();
+ let entry = TimerEntry::new(&handle, deadline);
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = {
+ let time_source = handle.time_source().clone();
+ let deadline_tick = time_source.deadline_to_tick(deadline);
+ let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0);
+
+ let location = location.expect("should have location if tracing");
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Sleep",
+ kind = "timer",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ let async_op_span = resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ duration = duration,
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout")
+ });
+
+ let async_op_poll_span =
+ async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
+
+ let ctx = trace::AsyncOpTracingCtx {
+ async_op_span,
+ async_op_poll_span,
+ resource_span,
+ };
+
+ Inner {
+ deadline,
+ ctx,
+ time_source,
+ }
+ };
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = Inner { deadline };
+
+ Sleep { inner, entry }
+ }
+
+ pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
+ Self::new_timeout(Instant::far_future(), location)
+ }
+
+ /// Returns the instant at which the future will complete.
+ pub fn deadline(&self) -> Instant {
+ self.inner.deadline
+ }
+
+ /// Returns `true` if `Sleep` has elapsed.
+ ///
+ /// A `Sleep` instance is elapsed when the requested duration has elapsed.
+ pub fn is_elapsed(&self) -> bool {
+ self.entry.is_elapsed()
+ }
+
+ /// Resets the `Sleep` instance to a new deadline.
+ ///
+ /// Calling this function allows changing the instant at which the `Sleep`
+ /// future completes without having to create new associated state.
+ ///
+ /// This function can be called both before and after the future has
+ /// completed.
+ ///
+ /// To call this method, you will usually combine the call with
+ /// [`Pin::as_mut`], which lets you call the method without consuming the
+ /// `Sleep` itself.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant};
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let sleep = tokio::time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
+ /// # }
+ /// ```
+ ///
+ /// See also the top-level examples.
+ ///
+ /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
+ pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
+ self.reset_inner(deadline)
+ }
+
+ fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
+ let me = self.project();
+ me.entry.reset(deadline);
+ (*me.inner).deadline = deadline;
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ {
+ let _resource_enter = me.inner.ctx.resource_span.enter();
+ me.inner.ctx.async_op_span =
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
+ let _async_op_enter = me.inner.ctx.async_op_span.enter();
+
+ me.inner.ctx.async_op_poll_span =
+ tracing::trace_span!("runtime.resource.async_op.poll");
+
+ let duration = {
+ let now = me.inner.time_source.now();
+ let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
+ deadline_tick.checked_sub(now).unwrap_or(0)
+ };
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ duration = duration,
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+ }
+ }
+
+ fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ let me = self.project();
+
+ // Keep track of task budget
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let coop = ready!(trace_poll_op!(
+ "poll_elapsed",
+ crate::coop::poll_proceed(cx),
+ ));
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ let result = me.entry.poll_elapsed(cx).map(move |r| {
+ coop.made_progress();
+ r
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ return trace_poll_op!("poll_elapsed", result);
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ return result;
+ }
+}
+
+impl Future for Sleep {
+ type Output = ();
+
+ // `poll_elapsed` can return an error in two cases:
+ //
+ // - AtCapacity: this is a pathological case where far too many
+ // sleep instances have been scheduled.
+ // - Shutdown: No timer has been setup, which is a mis-use error.
+ //
+ // Both cases are extremely rare, and pretty accurately fit into
+ // "logic errors", so we just panic in this case. A user couldn't
+ // really do much better if we passed the error onwards.
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _res_span = self.inner.ctx.resource_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_span = self.inner.ctx.async_op_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered();
+ match ready!(self.as_mut().poll_elapsed(cx)) {
+ Ok(()) => Poll::Ready(()),
+ Err(e) => panic!("timer error: {}", e),
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/tests/mod.rs b/third_party/rust/tokio/src/time/driver/tests/mod.rs
new file mode 100644
index 0000000000..3ac8c75643
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/tests/mod.rs
@@ -0,0 +1,301 @@
+use std::{task::Context, time::Duration};
+
+#[cfg(not(loom))]
+use futures::task::noop_waker_ref;
+
+use crate::loom::sync::Arc;
+use crate::loom::thread;
+use crate::{
+ loom::sync::atomic::{AtomicBool, Ordering},
+ park::Unpark,
+};
+
+use super::{Handle, TimerEntry};
+
+struct MockUnpark {}
+impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+}
+impl MockUnpark {
+ fn mock() -> Box<dyn Unpark> {
+ Box::new(Self {})
+ }
+}
+
+fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
+ #[cfg(loom)]
+ return loom::future::block_on(f);
+
+ #[cfg(not(loom))]
+ {
+ let rt = crate::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+ rt.block_on(f)
+ }
+}
+
+fn model(f: impl Fn() + Send + Sync + 'static) {
+ #[cfg(loom)]
+ loom::model(f);
+
+ #[cfg(not(loom))]
+ f();
+}
+
+#[test]
+fn single_timer() {
+ model(|| {
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
+ pin!(entry);
+
+ block_on(futures::future::poll_fn(|cx| {
+ entry.as_mut().poll_elapsed(cx)
+ }))
+ .unwrap();
+ });
+
+ thread::yield_now();
+
+ // This may or may not return Some (depending on how it races with the
+ // thread). If it does return None, however, the timer should complete
+ // synchronously.
+ handle.process_at_time(time_source.now() + 2_000_000_000);
+
+ jh.join().unwrap();
+ })
+}
+
+#[test]
+fn drop_timer() {
+ model(|| {
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
+ pin!(entry);
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+ });
+
+ thread::yield_now();
+
+ // advance 2s in the future.
+ handle.process_at_time(time_source.now() + 2_000_000_000);
+
+ jh.join().unwrap();
+ })
+}
+
+#[test]
+fn change_waker() {
+ model(|| {
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
+ pin!(entry);
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+
+ block_on(futures::future::poll_fn(|cx| {
+ entry.as_mut().poll_elapsed(cx)
+ }))
+ .unwrap();
+ });
+
+ thread::yield_now();
+
+ // advance 2s
+ handle.process_at_time(time_source.now() + 2_000_000_000);
+
+ jh.join().unwrap();
+ })
+}
+
+#[test]
+fn reset_future() {
+ model(|| {
+ let finished_early = Arc::new(AtomicBool::new(false));
+
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let finished_early_ = finished_early.clone();
+ let start = clock.now();
+
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1));
+ pin!(entry);
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+
+ entry.as_mut().reset(start + Duration::from_secs(2));
+
+ // shouldn't complete before 2s
+ block_on(futures::future::poll_fn(|cx| {
+ entry.as_mut().poll_elapsed(cx)
+ }))
+ .unwrap();
+
+ finished_early_.store(true, Ordering::Relaxed);
+ });
+
+ thread::yield_now();
+
+ // This may or may not return a wakeup time.
+ handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500)));
+
+ assert!(!finished_early.load(Ordering::Relaxed));
+
+ handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500)));
+
+ jh.join().unwrap();
+
+ assert!(finished_early.load(Ordering::Relaxed));
+ })
+}
+
+#[cfg(not(loom))]
+fn normal_or_miri<T>(normal: T, miri: T) -> T {
+ if cfg!(miri) {
+ miri
+ } else {
+ normal
+ }
+}
+
+#[test]
+#[cfg(not(loom))]
+fn poll_process_levels() {
+ let clock = crate::time::clock::Clock::new(true, false);
+ clock.pause();
+
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source, MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let mut entries = vec![];
+
+ for i in 0..normal_or_miri(1024, 64) {
+ let mut entry = Box::pin(TimerEntry::new(
+ &handle,
+ clock.now() + Duration::from_millis(i),
+ ));
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));
+
+ entries.push(entry);
+ }
+
+ for t in 1..normal_or_miri(1024, 64) {
+ handle.process_at_time(t as u64);
+ for (deadline, future) in entries.iter_mut().enumerate() {
+ let mut context = Context::from_waker(noop_waker_ref());
+ if deadline <= t {
+ assert!(future.as_mut().poll_elapsed(&mut context).is_ready());
+ } else {
+ assert!(future.as_mut().poll_elapsed(&mut context).is_pending());
+ }
+ }
+ }
+}
+
+#[test]
+#[cfg(not(loom))]
+fn poll_process_levels_targeted() {
+ let mut context = Context::from_waker(noop_waker_ref());
+
+ let clock = crate::time::clock::Clock::new(true, false);
+ clock.pause();
+
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source, MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193));
+ pin!(e1);
+
+ handle.process_at_time(62);
+ assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
+ handle.process_at_time(192);
+ handle.process_at_time(192);
+}
+
+/*
+#[test]
+fn balanced_incr_and_decr() {
+ const OPS: usize = 5;
+
+ fn incr(inner: Arc<Inner>) {
+ for _ in 0..OPS {
+ inner.increment().expect("increment should not have failed");
+ thread::yield_now();
+ }
+ }
+
+ fn decr(inner: Arc<Inner>) {
+ let mut ops_performed = 0;
+ while ops_performed < OPS {
+ if inner.num(Ordering::Relaxed) > 0 {
+ ops_performed += 1;
+ inner.decrement();
+ }
+ thread::yield_now();
+ }
+ }
+
+ loom::model(|| {
+ let unpark = Box::new(MockUnpark);
+ let instant = Instant::now();
+
+ let inner = Arc::new(Inner::new(instant, unpark));
+
+ let incr_inner = inner.clone();
+ let decr_inner = inner.clone();
+
+ let incr_hndle = thread::spawn(move || incr(incr_inner));
+ let decr_hndle = thread::spawn(move || decr(decr_inner));
+
+ incr_hndle.join().expect("should never fail");
+ decr_hndle.join().expect("should never fail");
+
+ assert_eq!(inner.num(Ordering::SeqCst), 0);
+ })
+}
+*/
diff --git a/third_party/rust/tokio/src/time/driver/wheel/level.rs b/third_party/rust/tokio/src/time/driver/wheel/level.rs
new file mode 100644
index 0000000000..878754177b
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/level.rs
@@ -0,0 +1,276 @@
+use crate::time::driver::TimerHandle;
+
+use crate::time::driver::{EntryList, TimerShared};
+
+use std::{fmt, ptr::NonNull};
+
+/// Wheel for a single level in the timer. This wheel contains 64 slots.
+pub(crate) struct Level {
+ level: usize,
+
+ /// Bit field tracking which slots currently contain entries.
+ ///
+ /// Using a bit field to track slots that contain entries allows avoiding a
+ /// scan to find entries. This field is updated when entries are added or
+ /// removed from a slot.
+ ///
+ /// The least-significant bit represents slot zero.
+ occupied: u64,
+
+ /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell.
+ slot: [EntryList; LEVEL_MULT],
+}
+
+/// Indicates when a slot must be processed next.
+#[derive(Debug)]
+pub(crate) struct Expiration {
+ /// The level containing the slot.
+ pub(crate) level: usize,
+
+ /// The slot index.
+ pub(crate) slot: usize,
+
+ /// The instant at which the slot needs to be processed.
+ pub(crate) deadline: u64,
+}
+
+/// Level multiplier.
+///
+/// Being a power of 2 is very important.
+const LEVEL_MULT: usize = 64;
+
+impl Level {
+ pub(crate) fn new(level: usize) -> Level {
+ // A value has to be Copy in order to use syntax like:
+ // let stack = Stack::default();
+ // ...
+ // slots: [stack; 64],
+ //
+ // Alternatively, since Stack is Default one can
+ // use syntax like:
+ // let slots: [Stack; 64] = Default::default();
+ //
+ // However, that is only supported for arrays of size
+ // 32 or fewer. So in our case we have to explicitly
+ // invoke the constructor for each array element.
+ let ctor = EntryList::default;
+
+ Level {
+ level,
+ occupied: 0,
+ slot: [
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ],
+ }
+ }
+
+ /// Finds the slot that needs to be processed next and returns the slot and
+ /// `Instant` at which this slot must be processed.
+ pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
+ // Use the `occupied` bit field to get the index of the next slot that
+ // needs to be processed.
+ let slot = match self.next_occupied_slot(now) {
+ Some(slot) => slot,
+ None => return None,
+ };
+
+ // From the slot index, calculate the `Instant` at which it needs to be
+ // processed. This value *must* be in the future with respect to `now`.
+
+ let level_range = level_range(self.level);
+ let slot_range = slot_range(self.level);
+
+ // Compute the start date of the current level by masking the low bits
+ // of `now` (`level_range` is a power of 2).
+ let level_start = now & !(level_range - 1);
+ let mut deadline = level_start + slot as u64 * slot_range;
+
+ if deadline <= now {
+ // A timer is in a slot "prior" to the current time. This can occur
+ // because we do not have an infinite hierarchy of timer levels, and
+ // eventually a timer scheduled for a very distant time might end up
+ // being placed in a slot that is beyond the end of all of the
+ // arrays.
+ //
+ // To deal with this, we first limit timers to being scheduled no
+ // more than MAX_DURATION ticks in the future; that is, they're at
+ // most one rotation of the top level away. Then, we force timers
+ // that logically would go into the top+1 level, to instead go into
+ // the top level's slots.
+ //
+ // What this means is that the top level's slots act as a
+ // pseudo-ring buffer, and we rotate around them indefinitely. If we
+ // compute a deadline before now, and it's the top level, it
+ // therefore means we're actually looking at a slot in the future.
+ debug_assert_eq!(self.level, super::NUM_LEVELS - 1);
+
+ deadline += level_range;
+ }
+
+ debug_assert!(
+ deadline >= now,
+ "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}",
+ deadline,
+ now,
+ self.level,
+ level_range,
+ slot_range,
+ slot,
+ self.occupied
+ );
+
+ Some(Expiration {
+ level: self.level,
+ slot,
+ deadline,
+ })
+ }
+
+ fn next_occupied_slot(&self, now: u64) -> Option<usize> {
+ if self.occupied == 0 {
+ return None;
+ }
+
+ // Get the slot for now using Maths
+ let now_slot = (now / slot_range(self.level)) as usize;
+ let occupied = self.occupied.rotate_right(now_slot as u32);
+ let zeros = occupied.trailing_zeros() as usize;
+ let slot = (zeros + now_slot) % 64;
+
+ Some(slot)
+ }
+
+ pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
+ let slot = slot_for(item.cached_when(), self.level);
+
+ self.slot[slot].push_front(item);
+
+ self.occupied |= occupied_bit(slot);
+ }
+
+ pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) {
+ let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level);
+
+ unsafe { self.slot[slot].remove(item) };
+ if self.slot[slot].is_empty() {
+ // The bit is currently set
+ debug_assert!(self.occupied & occupied_bit(slot) != 0);
+
+ // Unset the bit
+ self.occupied ^= occupied_bit(slot);
+ }
+ }
+
+ pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList {
+ self.occupied &= !occupied_bit(slot);
+
+ std::mem::take(&mut self.slot[slot])
+ }
+}
+
+impl fmt::Debug for Level {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Level")
+ .field("occupied", &self.occupied)
+ .finish()
+ }
+}
+
+fn occupied_bit(slot: usize) -> u64 {
+ 1 << slot
+}
+
+fn slot_range(level: usize) -> u64 {
+ LEVEL_MULT.pow(level as u32) as u64
+}
+
+fn level_range(level: usize) -> u64 {
+ LEVEL_MULT as u64 * slot_range(level)
+}
+
+/// Converts a duration (milliseconds) and a level to a slot position.
+fn slot_for(duration: u64, level: usize) -> usize {
+ ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_slot_for() {
+ for pos in 0..64 {
+ assert_eq!(pos as usize, slot_for(pos, 0));
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(pos as usize, slot_for(a as u64, level));
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/wheel/mod.rs b/third_party/rust/tokio/src/time/driver/wheel/mod.rs
new file mode 100644
index 0000000000..f088f2cfd6
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/mod.rs
@@ -0,0 +1,359 @@
+use crate::time::driver::{TimerHandle, TimerShared};
+use crate::time::error::InsertError;
+
+mod level;
+pub(crate) use self::level::Expiration;
+use self::level::Level;
+
+use std::ptr::NonNull;
+
+use super::EntryList;
+
+/// Timing wheel implementation.
+///
+/// This type provides the hashed timing wheel implementation that backs `Timer`
+/// and `DelayQueue`.
+///
+/// The structure is generic over `T: Stack`. This allows handling timeout data
+/// being stored on the heap or in a slab. In order to support the latter case,
+/// the slab must be passed into each function allowing the implementation to
+/// lookup timer entries.
+///
+/// See `Timer` documentation for some implementation notes.
+#[derive(Debug)]
+pub(crate) struct Wheel {
+ /// The number of milliseconds elapsed since the wheel started.
+ elapsed: u64,
+
+ /// Timer wheel.
+ ///
+ /// Levels:
+ ///
+ /// * 1 ms slots / 64 ms range
+ /// * 64 ms slots / ~ 4 sec range
+ /// * ~ 4 sec slots / ~ 4 min range
+ /// * ~ 4 min slots / ~ 4 hr range
+ /// * ~ 4 hr slots / ~ 12 day range
+ /// * ~ 12 day slots / ~ 2 yr range
+ levels: Vec<Level>,
+
+ /// Entries queued for firing
+ pending: EntryList,
+}
+
+/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
+/// each, the timer is able to track time up to 2 years into the future with a
+/// precision of 1 millisecond.
+const NUM_LEVELS: usize = 6;
+
+/// The maximum duration of a `Sleep`.
+pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
+
+impl Wheel {
+ /// Creates a new timing wheel.
+ pub(crate) fn new() -> Wheel {
+ let levels = (0..NUM_LEVELS).map(Level::new).collect();
+
+ Wheel {
+ elapsed: 0,
+ levels,
+ pending: EntryList::new(),
+ }
+ }
+
+ /// Returns the number of milliseconds that have elapsed since the timing
+ /// wheel's creation.
+ pub(crate) fn elapsed(&self) -> u64 {
+ self.elapsed
+ }
+
+ /// Inserts an entry into the timing wheel.
+ ///
+ /// # Arguments
+ ///
+ /// * `item`: The item to insert into the wheel.
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
+ ///
+ /// `Err(Elapsed)` indicates that `when` represents an instant that has
+ /// already passed. In this case, the caller should fire the timeout
+ /// immediately.
+ ///
+ /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
+ ///
+ /// # Safety
+ ///
+ /// This function registers item into an intrusive linked list. The caller
+ /// must ensure that `item` is pinned and will not be dropped without first
+ /// being deregistered.
+ pub(crate) unsafe fn insert(
+ &mut self,
+ item: TimerHandle,
+ ) -> Result<u64, (TimerHandle, InsertError)> {
+ let when = item.sync_when();
+
+ if when <= self.elapsed {
+ return Err((item, InsertError::Elapsed));
+ }
+
+ // Get the level at which the entry should be stored
+ let level = self.level_for(when);
+
+ unsafe {
+ self.levels[level].add_entry(item);
+ }
+
+ debug_assert!({
+ self.levels[level]
+ .next_expiration(self.elapsed)
+ .map(|e| e.deadline >= self.elapsed)
+ .unwrap_or(true)
+ });
+
+ Ok(when)
+ }
+
+ /// Removes `item` from the timing wheel.
+ pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
+ unsafe {
+ let when = item.as_ref().cached_when();
+ if when == u64::MAX {
+ self.pending.remove(item);
+ } else {
+ debug_assert!(
+ self.elapsed <= when,
+ "elapsed={}; when={}",
+ self.elapsed,
+ when
+ );
+
+ let level = self.level_for(when);
+
+ self.levels[level].remove_entry(item);
+ }
+ }
+ }
+
+ /// Instant at which to poll.
+ pub(crate) fn poll_at(&self) -> Option<u64> {
+ self.next_expiration().map(|expiration| expiration.deadline)
+ }
+
+ /// Advances the timer up to the instant represented by `now`.
+ pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
+ loop {
+ if let Some(handle) = self.pending.pop_back() {
+ return Some(handle);
+ }
+
+ // under what circumstances is poll.expiration Some vs. None?
+ let expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
+
+ match expiration {
+ Some(ref expiration) if expiration.deadline > now => return None,
+ Some(ref expiration) => {
+ self.process_expiration(expiration);
+
+ self.set_elapsed(expiration.deadline);
+ }
+ None => {
+ // in this case the poll did not indicate an expiration
+ // _and_ we were not able to find a next expiration in
+ // the current list of timers. advance to the poll's
+ // current time and do nothing else.
+ self.set_elapsed(now);
+ break;
+ }
+ }
+ }
+
+ self.pending.pop_back()
+ }
+
+ /// Returns the instant at which the next timeout expires.
+ fn next_expiration(&self) -> Option<Expiration> {
+ if !self.pending.is_empty() {
+ // Expire immediately as we have things pending firing
+ return Some(Expiration {
+ level: 0,
+ slot: 0,
+ deadline: self.elapsed,
+ });
+ }
+
+ // Check all levels
+ for level in 0..NUM_LEVELS {
+ if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
+ // There cannot be any expirations at a higher level that happen
+ // before this one.
+ debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
+
+ return Some(expiration);
+ }
+ }
+
+ None
+ }
+
+ /// Returns the tick at which this timer wheel next needs to perform some
+ /// processing, or None if there are no timers registered.
+ pub(super) fn next_expiration_time(&self) -> Option<u64> {
+ self.next_expiration().map(|ex| ex.deadline)
+ }
+
+ /// Used for debug assertions
+ fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
+ let mut res = true;
+
+ for l2 in start_level..NUM_LEVELS {
+ if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
+ if e2.deadline < before {
+ res = false;
+ }
+ }
+ }
+
+ res
+ }
+
+ /// iteratively find entries that are between the wheel's current
+ /// time and the expiration time. for each in that population either
+ /// queue it for notification (in the case of the last level) or tier
+ /// it down to the next level (in all other cases).
+ pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
+ // Note that we need to take _all_ of the entries off the list before
+ // processing any of them. This is important because it's possible that
+ // those entries might need to be reinserted into the same slot.
+ //
+ // This happens only on the highest level, when an entry is inserted
+ // more than MAX_DURATION into the future. When this happens, we wrap
+ // around, and process some entries a multiple of MAX_DURATION before
+ // they actually need to be dropped down a level. We then reinsert them
+ // back into the same position; we must make sure we don't then process
+ // those entries again or we'll end up in an infinite loop.
+ let mut entries = self.take_entries(expiration);
+
+ while let Some(item) = entries.pop_back() {
+ if expiration.level == 0 {
+ debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline);
+ }
+
+ // Try to expire the entry; this is cheap (doesn't synchronize) if
+ // the timer is not expired, and updates cached_when.
+ match unsafe { item.mark_pending(expiration.deadline) } {
+ Ok(()) => {
+ // Item was expired
+ self.pending.push_front(item);
+ }
+ Err(expiration_tick) => {
+ let level = level_for(expiration.deadline, expiration_tick);
+ unsafe {
+ self.levels[level].add_entry(item);
+ }
+ }
+ }
+ }
+ }
+
+ fn set_elapsed(&mut self, when: u64) {
+ assert!(
+ self.elapsed <= when,
+ "elapsed={:?}; when={:?}",
+ self.elapsed,
+ when
+ );
+
+ if when > self.elapsed {
+ self.elapsed = when;
+ }
+ }
+
+ /// Obtains the list of entries that need processing for the given expiration.
+ ///
+ fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
+ self.levels[expiration.level].take_slot(expiration.slot)
+ }
+
+ fn level_for(&self, when: u64) -> usize {
+ level_for(self.elapsed, when)
+ }
+}
+
+fn level_for(elapsed: u64, when: u64) -> usize {
+ const SLOT_MASK: u64 = (1 << 6) - 1;
+
+ // Mask in the trailing bits ignored by the level calculation in order to cap
+ // the possible leading zeros
+ let mut masked = elapsed ^ when | SLOT_MASK;
+
+ if masked >= MAX_DURATION {
+ // Fudge the timer into the top level
+ masked = MAX_DURATION - 1;
+ }
+
+ let leading_zeros = masked.leading_zeros() as usize;
+ let significant = 63 - leading_zeros;
+
+ significant / 6
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_level_for() {
+ for pos in 0..64 {
+ assert_eq!(
+ 0,
+ level_for(0, pos),
+ "level_for({}) -- binary = {:b}",
+ pos,
+ pos
+ );
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+
+ if pos > level {
+ let a = a - 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+
+ if pos < 64 {
+ let a = a + 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/wheel/stack.rs b/third_party/rust/tokio/src/time/driver/wheel/stack.rs
new file mode 100644
index 0000000000..80651c309e
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/stack.rs
@@ -0,0 +1,112 @@
+use super::{Item, OwnedItem};
+use crate::time::driver::Entry;
+
+use std::ptr;
+
+/// A doubly linked stack.
+#[derive(Debug)]
+pub(crate) struct Stack {
+ head: Option<OwnedItem>,
+}
+
+impl Default for Stack {
+ fn default() -> Stack {
+ Stack { head: None }
+ }
+}
+
+impl Stack {
+ pub(crate) fn is_empty(&self) -> bool {
+ self.head.is_none()
+ }
+
+ pub(crate) fn push(&mut self, entry: OwnedItem) {
+ // Get a pointer to the entry to for the prev link
+ let ptr: *const Entry = &*entry as *const _;
+
+ // Remove the old head entry
+ let old = self.head.take();
+
+ unsafe {
+ // Ensure the entry is not already in a stack.
+ debug_assert!((*entry.next_stack.get()).is_none());
+ debug_assert!((*entry.prev_stack.get()).is_null());
+
+ if let Some(ref entry) = old.as_ref() {
+ debug_assert!({
+ // The head is not already set to the entry
+ ptr != &***entry as *const _
+ });
+
+ // Set the previous link on the old head
+ *entry.prev_stack.get() = ptr;
+ }
+
+ // Set this entry's next pointer
+ *entry.next_stack.get() = old;
+ }
+
+ // Update the head pointer
+ self.head = Some(entry);
+ }
+
+ /// Pops an item from the stack.
+ pub(crate) fn pop(&mut self) -> Option<OwnedItem> {
+ let entry = self.head.take();
+
+ unsafe {
+ if let Some(entry) = entry.as_ref() {
+ self.head = (*entry.next_stack.get()).take();
+
+ if let Some(entry) = self.head.as_ref() {
+ *entry.prev_stack.get() = ptr::null();
+ }
+
+ *entry.prev_stack.get() = ptr::null();
+ }
+ }
+
+ entry
+ }
+
+ pub(crate) fn remove(&mut self, entry: &Item) {
+ unsafe {
+ // Ensure that the entry is in fact contained by the stack
+ debug_assert!({
+ // This walks the full linked list even if an entry is found.
+ let mut next = self.head.as_ref();
+ let mut contains = false;
+
+ while let Some(n) = next {
+ if entry as *const _ == &**n as *const _ {
+ debug_assert!(!contains);
+ contains = true;
+ }
+
+ next = (*n.next_stack.get()).as_ref();
+ }
+
+ contains
+ });
+
+ // Unlink `entry` from the next node
+ let next = (*entry.next_stack.get()).take();
+
+ if let Some(next) = next.as_ref() {
+ (*next.prev_stack.get()) = *entry.prev_stack.get();
+ }
+
+ // Unlink `entry` from the prev node
+
+ if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
+ *prev.next_stack.get() = next;
+ } else {
+ // It is the head
+ self.head = next;
+ }
+
+ // Unset the prev pointer
+ *entry.prev_stack.get() = ptr::null();
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/error.rs b/third_party/rust/tokio/src/time/error.rs
new file mode 100644
index 0000000000..63f0a3b0bd
--- /dev/null
+++ b/third_party/rust/tokio/src/time/error.rs
@@ -0,0 +1,120 @@
+//! Time error types.
+
+use self::Kind::*;
+use std::error;
+use std::fmt;
+
+/// Errors encountered by the timer implementation.
+///
+/// Currently, there are two different errors that can occur:
+///
+/// * `shutdown` occurs when a timer operation is attempted, but the timer
+/// instance has been dropped. In this case, the operation will never be able
+/// to complete and the `shutdown` error is returned. This is a permanent
+/// error, i.e., once this error is observed, timer operations will never
+/// succeed in the future.
+///
+/// * `at_capacity` occurs when a timer operation is attempted, but the timer
+/// instance is currently handling its maximum number of outstanding sleep instances.
+/// In this case, the operation is not able to be performed at the current
+/// moment, and `at_capacity` is returned. This is a transient error, i.e., at
+/// some point in the future, if the operation is attempted again, it might
+/// succeed. Callers that observe this error should attempt to [shed load]. One
+/// way to do this would be dropping the future that issued the timer operation.
+///
+/// [shed load]: https://en.wikipedia.org/wiki/Load_Shedding
+#[derive(Debug, Copy, Clone)]
+pub struct Error(Kind);
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+#[repr(u8)]
+pub(crate) enum Kind {
+ Shutdown = 1,
+ AtCapacity = 2,
+ Invalid = 3,
+}
+
+impl From<Kind> for Error {
+ fn from(k: Kind) -> Self {
+ Error(k)
+ }
+}
+
+/// Errors returned by `Timeout`.
+#[derive(Debug, PartialEq)]
+pub struct Elapsed(());
+
+#[derive(Debug)]
+pub(crate) enum InsertError {
+ Elapsed,
+}
+
+// ===== impl Error =====
+
+impl Error {
+ /// Creates an error representing a shutdown timer.
+ pub fn shutdown() -> Error {
+ Error(Shutdown)
+ }
+
+ /// Returns `true` if the error was caused by the timer being shutdown.
+ pub fn is_shutdown(&self) -> bool {
+ matches!(self.0, Kind::Shutdown)
+ }
+
+ /// Creates an error representing a timer at capacity.
+ pub fn at_capacity() -> Error {
+ Error(AtCapacity)
+ }
+
+ /// Returns `true` if the error was caused by the timer being at capacity.
+ pub fn is_at_capacity(&self) -> bool {
+ matches!(self.0, Kind::AtCapacity)
+ }
+
+ /// Creates an error representing a misconfigured timer.
+ pub fn invalid() -> Error {
+ Error(Invalid)
+ }
+
+ /// Returns `true` if the error was caused by the timer being misconfigured.
+ pub fn is_invalid(&self) -> bool {
+ matches!(self.0, Kind::Invalid)
+ }
+}
+
+impl error::Error for Error {}
+
+impl fmt::Display for Error {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use self::Kind::*;
+ let descr = match self.0 {
+ Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime",
+ AtCapacity => "timer is at capacity and cannot create a new entry",
+ Invalid => "timer duration exceeds maximum duration",
+ };
+ write!(fmt, "{}", descr)
+ }
+}
+
+// ===== impl Elapsed =====
+
+impl Elapsed {
+ pub(crate) fn new() -> Self {
+ Elapsed(())
+ }
+}
+
+impl fmt::Display for Elapsed {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "deadline has elapsed".fmt(fmt)
+ }
+}
+
+impl std::error::Error for Elapsed {}
+
+impl From<Elapsed> for std::io::Error {
+ fn from(_err: Elapsed) -> std::io::Error {
+ std::io::ErrorKind::TimedOut.into()
+ }
+}
diff --git a/third_party/rust/tokio/src/time/instant.rs b/third_party/rust/tokio/src/time/instant.rs
new file mode 100644
index 0000000000..f18492930a
--- /dev/null
+++ b/third_party/rust/tokio/src/time/instant.rs
@@ -0,0 +1,223 @@
+#![allow(clippy::trivially_copy_pass_by_ref)]
+
+use std::fmt;
+use std::ops;
+use std::time::Duration;
+
+/// A measurement of a monotonically nondecreasing clock.
+/// Opaque and useful only with `Duration`.
+///
+/// Instants are always guaranteed to be no less than any previously measured
+/// instant when created, and are often useful for tasks such as measuring
+/// benchmarks or timing how long an operation takes.
+///
+/// Note, however, that instants are not guaranteed to be **steady**. In other
+/// words, each tick of the underlying clock may not be the same length (e.g.
+/// some seconds may be longer than others). An instant may jump forwards or
+/// experience time dilation (slow down or speed up), but it will never go
+/// backwards.
+///
+/// Instants are opaque types that can only be compared to one another. There is
+/// no method to get "the number of seconds" from an instant. Instead, it only
+/// allows measuring the duration between two instants (or comparing two
+/// instants).
+///
+/// The size of an `Instant` struct may vary depending on the target operating
+/// system.
+///
+/// # Note
+///
+/// This type wraps the inner `std` variant and is used to align the Tokio
+/// clock for uses of `now()`. This can be useful for testing where you can
+/// take advantage of `time::pause()` and `time::advance()`.
+#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash)]
+pub struct Instant {
+ std: std::time::Instant,
+}
+
+impl Instant {
+ /// Returns an instant corresponding to "now".
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::time::Instant;
+ ///
+ /// let now = Instant::now();
+ /// ```
+ pub fn now() -> Instant {
+ variant::now()
+ }
+
+ /// Create a `tokio::time::Instant` from a `std::time::Instant`.
+ pub fn from_std(std: std::time::Instant) -> Instant {
+ Instant { std }
+ }
+
+ pub(crate) fn far_future() -> Instant {
+ // Roughly 30 years from now.
+ // API does not provide a way to obtain max `Instant`
+ // or convert specific date in the future to instant.
+ // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
+ Self::now() + Duration::from_secs(86400 * 365 * 30)
+ }
+
+ /// Convert the value into a `std::time::Instant`.
+ pub fn into_std(self) -> std::time::Instant {
+ self.std
+ }
+
+ /// Returns the amount of time elapsed from another instant to this one, or
+ /// zero duration if that instant is later than this one.
+ pub fn duration_since(&self, earlier: Instant) -> Duration {
+ self.std.saturating_duration_since(earlier.std)
+ }
+
+ /// Returns the amount of time elapsed from another instant to this one, or
+ /// None if that instant is later than this one.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant, sleep};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let now = Instant::now();
+ /// sleep(Duration::new(1, 0)).await;
+ /// let new_now = Instant::now();
+ /// println!("{:?}", new_now.checked_duration_since(now));
+ /// println!("{:?}", now.checked_duration_since(new_now)); // None
+ /// }
+ /// ```
+ pub fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> {
+ self.std.checked_duration_since(earlier.std)
+ }
+
+ /// Returns the amount of time elapsed from another instant to this one, or
+ /// zero duration if that instant is later than this one.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant, sleep};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let now = Instant::now();
+ /// sleep(Duration::new(1, 0)).await;
+ /// let new_now = Instant::now();
+ /// println!("{:?}", new_now.saturating_duration_since(now));
+ /// println!("{:?}", now.saturating_duration_since(new_now)); // 0ns
+ /// }
+ /// ```
+ pub fn saturating_duration_since(&self, earlier: Instant) -> Duration {
+ self.std.saturating_duration_since(earlier.std)
+ }
+
+ /// Returns the amount of time elapsed since this instant was created,
+ /// or zero duration if that this instant is in the future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant, sleep};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let instant = Instant::now();
+ /// let three_secs = Duration::from_secs(3);
+ /// sleep(three_secs).await;
+ /// assert!(instant.elapsed() >= three_secs);
+ /// }
+ /// ```
+ pub fn elapsed(&self) -> Duration {
+ Instant::now().saturating_duration_since(*self)
+ }
+
+ /// Returns `Some(t)` where `t` is the time `self + duration` if `t` can be
+ /// represented as `Instant` (which means it's inside the bounds of the
+ /// underlying data structure), `None` otherwise.
+ pub fn checked_add(&self, duration: Duration) -> Option<Instant> {
+ self.std.checked_add(duration).map(Instant::from_std)
+ }
+
+ /// Returns `Some(t)` where `t` is the time `self - duration` if `t` can be
+ /// represented as `Instant` (which means it's inside the bounds of the
+ /// underlying data structure), `None` otherwise.
+ pub fn checked_sub(&self, duration: Duration) -> Option<Instant> {
+ self.std.checked_sub(duration).map(Instant::from_std)
+ }
+}
+
+impl From<std::time::Instant> for Instant {
+ fn from(time: std::time::Instant) -> Instant {
+ Instant::from_std(time)
+ }
+}
+
+impl From<Instant> for std::time::Instant {
+ fn from(time: Instant) -> std::time::Instant {
+ time.into_std()
+ }
+}
+
+impl ops::Add<Duration> for Instant {
+ type Output = Instant;
+
+ fn add(self, other: Duration) -> Instant {
+ Instant::from_std(self.std + other)
+ }
+}
+
+impl ops::AddAssign<Duration> for Instant {
+ fn add_assign(&mut self, rhs: Duration) {
+ *self = *self + rhs;
+ }
+}
+
+impl ops::Sub for Instant {
+ type Output = Duration;
+
+ fn sub(self, rhs: Instant) -> Duration {
+ self.std.saturating_duration_since(rhs.std)
+ }
+}
+
+impl ops::Sub<Duration> for Instant {
+ type Output = Instant;
+
+ fn sub(self, rhs: Duration) -> Instant {
+ Instant::from_std(self.std - rhs)
+ }
+}
+
+impl ops::SubAssign<Duration> for Instant {
+ fn sub_assign(&mut self, rhs: Duration) {
+ *self = *self - rhs;
+ }
+}
+
+impl fmt::Debug for Instant {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.std.fmt(fmt)
+ }
+}
+
+#[cfg(not(feature = "test-util"))]
+mod variant {
+ use super::Instant;
+
+ pub(super) fn now() -> Instant {
+ Instant::from_std(std::time::Instant::now())
+ }
+}
+
+#[cfg(feature = "test-util")]
+mod variant {
+ use super::Instant;
+
+ pub(super) fn now() -> Instant {
+ crate::time::clock::now()
+ }
+}
diff --git a/third_party/rust/tokio/src/time/interval.rs b/third_party/rust/tokio/src/time/interval.rs
new file mode 100644
index 0000000000..8ecb15b389
--- /dev/null
+++ b/third_party/rust/tokio/src/time/interval.rs
@@ -0,0 +1,531 @@
+use crate::future::poll_fn;
+use crate::time::{sleep_until, Duration, Instant, Sleep};
+use crate::util::trace;
+
+use std::panic::Location;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{convert::TryInto, future::Future};
+
+/// Creates new [`Interval`] that yields with interval of `period`. The first
+/// tick completes immediately. The default [`MissedTickBehavior`] is
+/// [`Burst`](MissedTickBehavior::Burst), but this can be configured
+/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
+///
+/// An interval will tick indefinitely. At any time, the [`Interval`] value can
+/// be dropped. This cancels the interval.
+///
+/// This function is equivalent to
+/// [`interval_at(Instant::now(), period)`](interval_at).
+///
+/// # Panics
+///
+/// This function panics if `period` is zero.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::time::{self, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut interval = time::interval(Duration::from_millis(10));
+///
+/// interval.tick().await; // ticks immediately
+/// interval.tick().await; // ticks after 10ms
+/// interval.tick().await; // ticks after 10ms
+///
+/// // approximately 20ms have elapsed.
+/// }
+/// ```
+///
+/// A simple example using `interval` to execute a task every two seconds.
+///
+/// The difference between `interval` and [`sleep`] is that an [`Interval`]
+/// measures the time since the last tick, which means that [`.tick().await`]
+/// may wait for a shorter time than the duration specified for the interval
+/// if some time has passed between calls to [`.tick().await`].
+///
+/// If the tick in the example below was replaced with [`sleep`], the task
+/// would only be executed once every three seconds, and not every two
+/// seconds.
+///
+/// ```
+/// use tokio::time;
+///
+/// async fn task_that_takes_a_second() {
+/// println!("hello");
+/// time::sleep(time::Duration::from_secs(1)).await
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut interval = time::interval(time::Duration::from_secs(2));
+/// for _i in 0..5 {
+/// interval.tick().await;
+/// task_that_takes_a_second().await;
+/// }
+/// }
+/// ```
+///
+/// [`sleep`]: crate::time::sleep()
+/// [`.tick().await`]: Interval::tick
+#[track_caller]
+pub fn interval(period: Duration) -> Interval {
+ assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
+ internal_interval_at(Instant::now(), period, trace::caller_location())
+}
+
+/// Creates new [`Interval`] that yields with interval of `period` with the
+/// first tick completing at `start`. The default [`MissedTickBehavior`] is
+/// [`Burst`](MissedTickBehavior::Burst), but this can be configured
+/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
+///
+/// An interval will tick indefinitely. At any time, the [`Interval`] value can
+/// be dropped. This cancels the interval.
+///
+/// # Panics
+///
+/// This function panics if `period` is zero.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::time::{interval_at, Duration, Instant};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let start = Instant::now() + Duration::from_millis(50);
+/// let mut interval = interval_at(start, Duration::from_millis(10));
+///
+/// interval.tick().await; // ticks after 50ms
+/// interval.tick().await; // ticks after 10ms
+/// interval.tick().await; // ticks after 10ms
+///
+/// // approximately 70ms have elapsed.
+/// }
+/// ```
+#[track_caller]
+pub fn interval_at(start: Instant, period: Duration) -> Interval {
+ assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
+ internal_interval_at(start, period, trace::caller_location())
+}
+
+#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
+fn internal_interval_at(
+ start: Instant,
+ period: Duration,
+ location: Option<&'static Location<'static>>,
+) -> Interval {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = location.expect("should have location if tracing");
+
+ tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Interval",
+ kind = "timer",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ )
+ };
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let delay = resource_span.in_scope(|| Box::pin(sleep_until(start)));
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let delay = Box::pin(sleep_until(start));
+
+ Interval {
+ delay,
+ period,
+ missed_tick_behavior: Default::default(),
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ }
+}
+
+/// Defines the behavior of an [`Interval`] when it misses a tick.
+///
+/// Sometimes, an [`Interval`]'s tick is missed. For example, consider the
+/// following:
+///
+/// ```
+/// use tokio::time::{self, Duration};
+/// # async fn task_that_takes_one_to_three_millis() {}
+///
+/// #[tokio::main]
+/// async fn main() {
+/// // ticks every 2 milliseconds
+/// let mut interval = time::interval(Duration::from_millis(2));
+/// for _ in 0..5 {
+/// interval.tick().await;
+/// // if this takes more than 2 milliseconds, a tick will be delayed
+/// task_that_takes_one_to_three_millis().await;
+/// }
+/// }
+/// ```
+///
+/// Generally, a tick is missed if too much time is spent without calling
+/// [`Interval::tick()`].
+///
+/// By default, when a tick is missed, [`Interval`] fires ticks as quickly as it
+/// can until it is "caught up" in time to where it should be.
+/// `MissedTickBehavior` can be used to specify a different behavior for
+/// [`Interval`] to exhibit. Each variant represents a different strategy.
+///
+/// Note that because the executor cannot guarantee exact precision with timers,
+/// these strategies will only apply when the delay is greater than 5
+/// milliseconds.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum MissedTickBehavior {
+ /// Ticks as fast as possible until caught up.
+ ///
+ /// When this strategy is used, [`Interval`] schedules ticks "normally" (the
+ /// same as it would have if the ticks hadn't been delayed), which results
+ /// in it firing ticks as fast as possible until it is caught up in time to
+ /// where it should be. Unlike [`Delay`] and [`Skip`], the ticks yielded
+ /// when `Burst` is used (the [`Instant`]s that [`tick`](Interval::tick)
+ /// yields) aren't different than they would have been if a tick had not
+ /// been missed. Like [`Skip`], and unlike [`Delay`], the ticks may be
+ /// shortened.
+ ///
+ /// This looks something like this:
+ /// ```text
+ /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
+ /// Actual ticks: | work -----| delay | work | work | work -| work -----|
+ /// ```
+ ///
+ /// In code:
+ ///
+ /// ```
+ /// use tokio::time::{interval, Duration};
+ /// # async fn task_that_takes_200_millis() {}
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let mut interval = interval(Duration::from_millis(50));
+ ///
+ /// task_that_takes_200_millis().await;
+ /// // The `Interval` has missed a tick
+ ///
+ /// // Since we have exceeded our timeout, this will resolve immediately
+ /// interval.tick().await;
+ ///
+ /// // Since we are more than 100ms after the start of `interval`, this will
+ /// // also resolve immediately.
+ /// interval.tick().await;
+ ///
+ /// // Also resolves immediately, because it was supposed to resolve at
+ /// // 150ms after the start of `interval`
+ /// interval.tick().await;
+ ///
+ /// // Resolves immediately
+ /// interval.tick().await;
+ ///
+ /// // Since we have gotten to 200ms after the start of `interval`, this
+ /// // will resolve after 50ms
+ /// interval.tick().await;
+ /// # }
+ /// ```
+ ///
+ /// This is the default behavior when [`Interval`] is created with
+ /// [`interval`] and [`interval_at`].
+ ///
+ /// [`Delay`]: MissedTickBehavior::Delay
+ /// [`Skip`]: MissedTickBehavior::Skip
+ Burst,
+
+ /// Tick at multiples of `period` from when [`tick`] was called, rather than
+ /// from `start`.
+ ///
+ /// When this strategy is used and [`Interval`] has missed a tick, instead
+ /// of scheduling ticks to fire at multiples of `period` from `start` (the
+ /// time when the first tick was fired), it schedules all future ticks to
+ /// happen at a regular `period` from the point when [`tick`] was called.
+ /// Unlike [`Burst`] and [`Skip`], ticks are not shortened, and they aren't
+ /// guaranteed to happen at a multiple of `period` from `start` any longer.
+ ///
+ /// This looks something like this:
+ /// ```text
+ /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
+ /// Actual ticks: | work -----| delay | work -----| work -----| work -----|
+ /// ```
+ ///
+ /// In code:
+ ///
+ /// ```
+ /// use tokio::time::{interval, Duration, MissedTickBehavior};
+ /// # async fn task_that_takes_more_than_50_millis() {}
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let mut interval = interval(Duration::from_millis(50));
+ /// interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+ ///
+ /// task_that_takes_more_than_50_millis().await;
+ /// // The `Interval` has missed a tick
+ ///
+ /// // Since we have exceeded our timeout, this will resolve immediately
+ /// interval.tick().await;
+ ///
+ /// // But this one, rather than also resolving immediately, as might happen
+ /// // with the `Burst` or `Skip` behaviors, will not resolve until
+ /// // 50ms after the call to `tick` up above. That is, in `tick`, when we
+ /// // recognize that we missed a tick, we schedule the next tick to happen
+ /// // 50ms (or whatever the `period` is) from right then, not from when
+ /// // were were *supposed* to tick
+ /// interval.tick().await;
+ /// # }
+ /// ```
+ ///
+ /// [`Burst`]: MissedTickBehavior::Burst
+ /// [`Skip`]: MissedTickBehavior::Skip
+ /// [`tick`]: Interval::tick
+ Delay,
+
+ /// Skips missed ticks and tick on the next multiple of `period` from
+ /// `start`.
+ ///
+ /// When this strategy is used, [`Interval`] schedules the next tick to fire
+ /// at the next-closest tick that is a multiple of `period` away from
+ /// `start` (the point where [`Interval`] first ticked). Like [`Burst`], all
+ /// ticks remain multiples of `period` away from `start`, but unlike
+ /// [`Burst`], the ticks may not be *one* multiple of `period` away from the
+ /// last tick. Like [`Delay`], the ticks are no longer the same as they
+ /// would have been if ticks had not been missed, but unlike [`Delay`], and
+ /// like [`Burst`], the ticks may be shortened to be less than one `period`
+ /// away from each other.
+ ///
+ /// This looks something like this:
+ /// ```text
+ /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
+ /// Actual ticks: | work -----| delay | work ---| work -----| work -----|
+ /// ```
+ ///
+ /// In code:
+ ///
+ /// ```
+ /// use tokio::time::{interval, Duration, MissedTickBehavior};
+ /// # async fn task_that_takes_75_millis() {}
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let mut interval = interval(Duration::from_millis(50));
+ /// interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+ ///
+ /// task_that_takes_75_millis().await;
+ /// // The `Interval` has missed a tick
+ ///
+ /// // Since we have exceeded our timeout, this will resolve immediately
+ /// interval.tick().await;
+ ///
+ /// // This one will resolve after 25ms, 100ms after the start of
+ /// // `interval`, which is the closest multiple of `period` from the start
+ /// // of `interval` after the call to `tick` up above.
+ /// interval.tick().await;
+ /// # }
+ /// ```
+ ///
+ /// [`Burst`]: MissedTickBehavior::Burst
+ /// [`Delay`]: MissedTickBehavior::Delay
+ Skip,
+}
+
+impl MissedTickBehavior {
+ /// If a tick is missed, this method is called to determine when the next tick should happen.
+ fn next_timeout(&self, timeout: Instant, now: Instant, period: Duration) -> Instant {
+ match self {
+ Self::Burst => timeout + period,
+ Self::Delay => now + period,
+ Self::Skip => {
+ now + period
+ - Duration::from_nanos(
+ ((now - timeout).as_nanos() % period.as_nanos())
+ .try_into()
+ // This operation is practically guaranteed not to
+ // fail, as in order for it to fail, `period` would
+ // have to be longer than `now - timeout`, and both
+ // would have to be longer than 584 years.
+ //
+ // If it did fail, there's not a good way to pass
+ // the error along to the user, so we just panic.
+ .expect(
+ "too much time has elapsed since the interval was supposed to tick",
+ ),
+ )
+ }
+ }
+ }
+}
+
+impl Default for MissedTickBehavior {
+ /// Returns [`MissedTickBehavior::Burst`].
+ ///
+ /// For most usecases, the [`Burst`] strategy is what is desired.
+ /// Additionally, to preserve backwards compatibility, the [`Burst`]
+ /// strategy must be the default. For these reasons,
+ /// [`MissedTickBehavior::Burst`] is the default for [`MissedTickBehavior`].
+ /// See [`Burst`] for more details.
+ ///
+ /// [`Burst`]: MissedTickBehavior::Burst
+ fn default() -> Self {
+ Self::Burst
+ }
+}
+
+/// Interval returned by [`interval`] and [`interval_at`].
+///
+/// This type allows you to wait on a sequence of instants with a certain
+/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
+/// you count the time spent between the calls to [`sleep`] as well.
+///
+/// An `Interval` can be turned into a `Stream` with [`IntervalStream`].
+///
+/// [`IntervalStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.IntervalStream.html
+/// [`sleep`]: crate::time::sleep
+#[derive(Debug)]
+pub struct Interval {
+ /// Future that completes the next time the `Interval` yields a value.
+ delay: Pin<Box<Sleep>>,
+
+ /// The duration between values yielded by `Interval`.
+ period: Duration,
+
+ /// The strategy `Interval` should use when a tick is missed.
+ missed_tick_behavior: MissedTickBehavior,
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
+}
+
+impl Interval {
+ /// Completes when the next instant in the interval has been reached.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancellation safe. If `tick` is used as the branch in a `tokio::select!` and
+ /// another branch completes first, then no tick has been consumed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::time;
+ ///
+ /// use std::time::Duration;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut interval = time::interval(Duration::from_millis(10));
+ ///
+ /// interval.tick().await;
+ /// interval.tick().await;
+ /// interval.tick().await;
+ ///
+ /// // approximately 20ms have elapsed.
+ /// }
+ /// ```
+ pub async fn tick(&mut self) -> Instant {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let instant = trace::async_op(
+ || poll_fn(|cx| self.poll_tick(cx)),
+ resource_span,
+ "Interval::tick",
+ "poll_tick",
+ false,
+ );
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let instant = poll_fn(|cx| self.poll_tick(cx));
+
+ instant.await
+ }
+
+ /// Polls for the next instant in the interval to be reached.
+ ///
+ /// This method can return the following values:
+ ///
+ /// * `Poll::Pending` if the next instant has not yet been reached.
+ /// * `Poll::Ready(instant)` if the next instant has been reached.
+ ///
+ /// When this method returns `Poll::Pending`, the current task is scheduled
+ /// to receive a wakeup when the instant has elapsed. Note that on multiple
+ /// calls to `poll_tick`, only the [`Waker`](std::task::Waker) from the
+ /// [`Context`] passed to the most recent call is scheduled to receive a
+ /// wakeup.
+ pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
+ // Wait for the delay to be done
+ ready!(Pin::new(&mut self.delay).poll(cx));
+
+ // Get the time when we were scheduled to tick
+ let timeout = self.delay.deadline();
+
+ let now = Instant::now();
+
+ // If a tick was not missed, and thus we are being called before the
+ // next tick is due, just schedule the next tick normally, one `period`
+ // after `timeout`
+ //
+ // However, if a tick took excessively long and we are now behind,
+ // schedule the next tick according to how the user specified with
+ // `MissedTickBehavior`
+ let next = if now > timeout + Duration::from_millis(5) {
+ self.missed_tick_behavior
+ .next_timeout(timeout, now, self.period)
+ } else {
+ timeout + self.period
+ };
+
+ self.delay.as_mut().reset(next);
+
+ // Return the time when we were scheduled to tick
+ Poll::Ready(timeout)
+ }
+
+ /// Resets the interval to complete one period after the current time.
+ ///
+ /// This method ignores [`MissedTickBehavior`] strategy.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::time;
+ ///
+ /// use std::time::Duration;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut interval = time::interval(Duration::from_millis(100));
+ ///
+ /// interval.tick().await;
+ ///
+ /// time::sleep(Duration::from_millis(50)).await;
+ /// interval.reset();
+ ///
+ /// interval.tick().await;
+ /// interval.tick().await;
+ ///
+ /// // approximately 250ms have elapsed.
+ /// }
+ /// ```
+ pub fn reset(&mut self) {
+ self.delay.as_mut().reset(Instant::now() + self.period);
+ }
+
+ /// Returns the [`MissedTickBehavior`] strategy currently being used.
+ pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
+ self.missed_tick_behavior
+ }
+
+ /// Sets the [`MissedTickBehavior`] strategy that should be used.
+ pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
+ self.missed_tick_behavior = behavior;
+ }
+
+ /// Returns the period of the interval.
+ pub fn period(&self) -> Duration {
+ self.period
+ }
+}
diff --git a/third_party/rust/tokio/src/time/mod.rs b/third_party/rust/tokio/src/time/mod.rs
new file mode 100644
index 0000000000..281990ef9a
--- /dev/null
+++ b/third_party/rust/tokio/src/time/mod.rs
@@ -0,0 +1,114 @@
+//! Utilities for tracking time.
+//!
+//! This module provides a number of types for executing code after a set period
+//! of time.
+//!
+//! * [`Sleep`] is a future that does no work and completes at a specific [`Instant`]
+//! in time.
+//!
+//! * [`Interval`] is a stream yielding a value at a fixed period. It is
+//! initialized with a [`Duration`] and repeatedly yields each time the duration
+//! elapses.
+//!
+//! * [`Timeout`]: Wraps a future or stream, setting an upper bound to the amount
+//! of time it is allowed to execute. If the future or stream does not
+//! complete in time, then it is canceled and an error is returned.
+//!
+//! These types are sufficient for handling a large number of scenarios
+//! involving time.
+//!
+//! These types must be used from within the context of the [`Runtime`](crate::runtime::Runtime).
+//!
+//! # Examples
+//!
+//! Wait 100ms and print "100 ms have elapsed"
+//!
+//! ```
+//! use std::time::Duration;
+//! use tokio::time::sleep;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! sleep(Duration::from_millis(100)).await;
+//! println!("100 ms have elapsed");
+//! }
+//! ```
+//!
+//! Require that an operation takes no more than 1s.
+//!
+//! ```
+//! use tokio::time::{timeout, Duration};
+//!
+//! async fn long_future() {
+//! // do work here
+//! }
+//!
+//! # async fn dox() {
+//! let res = timeout(Duration::from_secs(1), long_future()).await;
+//!
+//! if res.is_err() {
+//! println!("operation timed out");
+//! }
+//! # }
+//! ```
+//!
+//! A simple example using [`interval`] to execute a task every two seconds.
+//!
+//! The difference between [`interval`] and [`sleep`] is that an [`interval`]
+//! measures the time since the last tick, which means that `.tick().await` may
+//! wait for a shorter time than the duration specified for the interval
+//! if some time has passed between calls to `.tick().await`.
+//!
+//! If the tick in the example below was replaced with [`sleep`], the task
+//! would only be executed once every three seconds, and not every two
+//! seconds.
+//!
+//! ```
+//! use tokio::time;
+//!
+//! async fn task_that_takes_a_second() {
+//! println!("hello");
+//! time::sleep(time::Duration::from_secs(1)).await
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let mut interval = time::interval(time::Duration::from_secs(2));
+//! for _i in 0..5 {
+//! interval.tick().await;
+//! task_that_takes_a_second().await;
+//! }
+//! }
+//! ```
+//!
+//! [`interval`]: crate::time::interval()
+
+mod clock;
+pub(crate) use self::clock::Clock;
+#[cfg(feature = "test-util")]
+pub use clock::{advance, pause, resume};
+
+pub(crate) mod driver;
+
+#[doc(inline)]
+pub use driver::sleep::{sleep, sleep_until, Sleep};
+
+pub mod error;
+
+mod instant;
+pub use self::instant::Instant;
+
+mod interval;
+pub use interval::{interval, interval_at, Interval, MissedTickBehavior};
+
+mod timeout;
+#[doc(inline)]
+pub use timeout::{timeout, timeout_at, Timeout};
+
+#[cfg(test)]
+#[cfg(not(loom))]
+mod tests;
+
+// Re-export for convenience
+#[doc(no_inline)]
+pub use std::time::Duration;
diff --git a/third_party/rust/tokio/src/time/tests/mod.rs b/third_party/rust/tokio/src/time/tests/mod.rs
new file mode 100644
index 0000000000..35e1060aca
--- /dev/null
+++ b/third_party/rust/tokio/src/time/tests/mod.rs
@@ -0,0 +1,22 @@
+mod test_sleep;
+
+use crate::time::{self, Instant};
+use std::time::Duration;
+
+fn assert_send<T: Send>() {}
+fn assert_sync<T: Sync>() {}
+
+#[test]
+fn registration_is_send_and_sync() {
+ use crate::time::Sleep;
+
+ assert_send::<Sleep>();
+ assert_sync::<Sleep>();
+}
+
+#[test]
+#[should_panic]
+fn sleep_is_eager() {
+ let when = Instant::now() + Duration::from_millis(100);
+ let _ = time::sleep_until(when);
+}
diff --git a/third_party/rust/tokio/src/time/tests/test_sleep.rs b/third_party/rust/tokio/src/time/tests/test_sleep.rs
new file mode 100644
index 0000000000..77ca07e319
--- /dev/null
+++ b/third_party/rust/tokio/src/time/tests/test_sleep.rs
@@ -0,0 +1,443 @@
+//use crate::time::driver::{Driver, Entry, Handle};
+
+/*
+macro_rules! poll {
+ ($e:expr) => {
+ $e.enter(|cx, e| e.poll_elapsed(cx))
+ };
+}
+
+#[test]
+fn frozen_utility_returns_correct_advanced_duration() {
+ let clock = Clock::new();
+ clock.pause();
+ let start = clock.now();
+
+ clock.advance(ms(10));
+ assert_eq!(clock.now() - start, ms(10));
+}
+
+#[test]
+fn immediate_sleep() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ let when = clock.now();
+ let mut e = task::spawn(sleep_until(&handle, when));
+
+ assert_ready_ok!(poll!(e));
+
+ assert_ok!(driver.park_timeout(Duration::from_millis(1000)));
+
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(clock.now() - start, ms(1000));
+}
+
+#[test]
+fn delayed_sleep_level_0() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ for &i in &[1, 10, 60] {
+ // Create a `Sleep` that elapses in the future
+ let mut e = task::spawn(sleep_until(&handle, start + ms(i)));
+
+ // The sleep instance has not elapsed.
+ assert_pending!(poll!(e));
+
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(i));
+
+ assert_ready_ok!(poll!(e));
+ }
+}
+
+#[test]
+fn sub_ms_delayed_sleep() {
+ let (mut driver, clock, handle) = setup();
+
+ for _ in 0..5 {
+ let deadline = clock.now() + ms(1) + Duration::new(0, 1);
+
+ let mut e = task::spawn(sleep_until(&handle, deadline));
+
+ assert_pending!(poll!(e));
+
+ assert_ok!(driver.park());
+ assert_ready_ok!(poll!(e));
+
+ assert!(clock.now() >= deadline);
+
+ clock.advance(Duration::new(0, 1));
+ }
+}
+
+#[test]
+fn delayed_sleep_wrapping_level_0() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ assert_ok!(driver.park_timeout(ms(5)));
+ assert_eq!(clock.now() - start, ms(5));
+
+ let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(60)));
+
+ assert_pending!(poll!(e));
+
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(64));
+ assert_pending!(poll!(e));
+
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(65));
+
+ assert_ready_ok!(poll!(e));
+}
+
+#[test]
+fn timer_wrapping_with_higher_levels() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ // Set sleep to hit level 1
+ let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(64)));
+ assert_pending!(poll!(e1));
+
+ // Turn a bit
+ assert_ok!(driver.park_timeout(ms(5)));
+
+ // Set timeout such that it will hit level 0, but wrap
+ let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(60)));
+ assert_pending!(poll!(e2));
+
+ // This should result in s1 firing
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(64));
+
+ assert_ready_ok!(poll!(e1));
+ assert_pending!(poll!(e2));
+
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(65));
+
+ assert_ready_ok!(poll!(e1));
+}
+
+#[test]
+fn sleep_with_deadline_in_past() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ // Create `Sleep` that elapsed immediately.
+ let mut e = task::spawn(sleep_until(&handle, clock.now() - ms(100)));
+
+ // Even though the `Sleep` expires in the past, it is not ready yet
+ // because the timer must observe it.
+ assert_ready_ok!(poll!(e));
+
+ // Turn the timer, it runs for the elapsed time
+ assert_ok!(driver.park_timeout(ms(1000)));
+
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(clock.now() - start, ms(1000));
+}
+
+#[test]
+fn delayed_sleep_level_1() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ // Create a `Sleep` that elapses in the future
+ let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234)));
+
+ // The sleep has not elapsed.
+ assert_pending!(poll!(e));
+
+ // Turn the timer, this will wake up to cascade the timer down.
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.now() - start, ms(192));
+
+ // The sleep has not elapsed.
+ assert_pending!(poll!(e));
+
+ // Turn the timer again
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.now() - start, ms(234));
+
+ // The sleep has elapsed.
+ assert_ready_ok!(poll!(e));
+
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ // Create a `Sleep` that elapses in the future
+ let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234)));
+
+ // The sleep has not elapsed.
+ assert_pending!(poll!(e));
+
+ // Turn the timer with a smaller timeout than the cascade.
+ assert_ok!(driver.park_timeout(ms(100)));
+ assert_eq!(clock.now() - start, ms(100));
+
+ assert_pending!(poll!(e));
+
+ // Turn the timer, this will wake up to cascade the timer down.
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.now() - start, ms(192));
+
+ // The sleep has not elapsed.
+ assert_pending!(poll!(e));
+
+ // Turn the timer again
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.now() - start, ms(234));
+
+ // The sleep has elapsed.
+ assert_ready_ok!(poll!(e));
+}
+
+#[test]
+fn concurrently_set_two_timers_second_one_shorter() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(500)));
+ let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(200)));
+
+ // The sleep has not elapsed
+ assert_pending!(poll!(e1));
+ assert_pending!(poll!(e2));
+
+ // Sleep until a cascade
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(192));
+
+ // Sleep until the second timer.
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(200));
+
+ // The shorter sleep fires
+ assert_ready_ok!(poll!(e2));
+ assert_pending!(poll!(e1));
+
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(448));
+
+ assert_pending!(poll!(e1));
+
+ // Turn again, this time the time will advance to the second sleep
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(500));
+
+ assert_ready_ok!(poll!(e1));
+}
+
+#[test]
+fn short_sleep() {
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ // Create a `Sleep` that elapses in the future
+ let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1)));
+
+ // The sleep has not elapsed.
+ assert_pending!(poll!(e));
+
+ // Turn the timer, but not enough time will go by.
+ assert_ok!(driver.park());
+
+ // The sleep has elapsed.
+ assert_ready_ok!(poll!(e));
+
+ // The time has advanced to the point of the sleep elapsing.
+ assert_eq!(clock.now() - start, ms(1));
+}
+
+#[test]
+fn sorta_long_sleep_until() {
+ const MIN_5: u64 = 5 * 60 * 1000;
+
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ // Create a `Sleep` that elapses in the future
+ let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MIN_5)));
+
+ // The sleep has not elapsed.
+ assert_pending!(poll!(e));
+
+ let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
+
+ for &elapsed in cascades {
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(elapsed));
+
+ assert_pending!(poll!(e));
+ }
+
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(MIN_5));
+
+ // The sleep has elapsed.
+ assert_ready_ok!(poll!(e));
+}
+
+#[test]
+fn very_long_sleep() {
+ const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
+
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ // Create a `Sleep` that elapses in the future
+ let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MO_5)));
+
+ // The sleep has not elapsed.
+ assert_pending!(poll!(e));
+
+ let cascades = &[
+ 12_884_901_888,
+ 12_952_010_752,
+ 12_959_875_072,
+ 12_959_997_952,
+ ];
+
+ for &elapsed in cascades {
+ assert_ok!(driver.park());
+ assert_eq!(clock.now() - start, ms(elapsed));
+
+ assert_pending!(poll!(e));
+ }
+
+ // Turn the timer, but not enough time will go by.
+ assert_ok!(driver.park());
+
+ // The time has advanced to the point of the sleep elapsing.
+ assert_eq!(clock.now() - start, ms(MO_5));
+
+ // The sleep has elapsed.
+ assert_ready_ok!(poll!(e));
+}
+
+#[test]
+fn unpark_is_delayed() {
+ // A special park that will take much longer than the requested duration
+ struct MockPark(Clock);
+
+ struct MockUnpark;
+
+ impl Park for MockPark {
+ type Unpark = MockUnpark;
+ type Error = ();
+
+ fn unpark(&self) -> Self::Unpark {
+ MockUnpark
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ panic!("parking forever");
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ assert_eq!(duration, ms(0));
+ self.0.advance(ms(436));
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {}
+ }
+
+ impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+ }
+
+ let clock = Clock::new();
+ clock.pause();
+ let start = clock.now();
+ let mut driver = Driver::new(MockPark(clock.clone()), clock.clone());
+ let handle = driver.handle();
+
+ let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(100)));
+ let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(101)));
+ let mut e3 = task::spawn(sleep_until(&handle, clock.now() + ms(200)));
+
+ assert_pending!(poll!(e1));
+ assert_pending!(poll!(e2));
+ assert_pending!(poll!(e3));
+
+ assert_ok!(driver.park());
+
+ assert_eq!(clock.now() - start, ms(500));
+
+ assert_ready_ok!(poll!(e1));
+ assert_ready_ok!(poll!(e2));
+ assert_ready_ok!(poll!(e3));
+}
+
+#[test]
+fn set_timeout_at_deadline_greater_than_max_timer() {
+ const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
+ const YR_5: u64 = 5 * YR_1;
+
+ let (mut driver, clock, handle) = setup();
+ let start = clock.now();
+
+ for _ in 0..5 {
+ assert_ok!(driver.park_timeout(ms(YR_1)));
+ }
+
+ let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1)));
+ assert_pending!(poll!(e));
+
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.now() - start, ms(YR_5) + ms(1));
+
+ assert_ready_ok!(poll!(e));
+}
+
+fn setup() -> (Driver<MockPark>, Clock, Handle) {
+ let clock = Clock::new();
+ clock.pause();
+ let driver = Driver::new(MockPark(clock.clone()), clock.clone());
+ let handle = driver.handle();
+
+ (driver, clock, handle)
+}
+
+fn sleep_until(handle: &Handle, when: Instant) -> Arc<Entry> {
+ Entry::new(&handle, when, ms(0))
+}
+
+struct MockPark(Clock);
+
+struct MockUnpark;
+
+impl Park for MockPark {
+ type Unpark = MockUnpark;
+ type Error = ();
+
+ fn unpark(&self) -> Self::Unpark {
+ MockUnpark
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ panic!("parking forever");
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.0.advance(duration);
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {}
+}
+
+impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+}
+
+fn ms(n: u64) -> Duration {
+ Duration::from_millis(n)
+}
+*/
diff --git a/third_party/rust/tokio/src/time/timeout.rs b/third_party/rust/tokio/src/time/timeout.rs
new file mode 100644
index 0000000000..4a93089e8e
--- /dev/null
+++ b/third_party/rust/tokio/src/time/timeout.rs
@@ -0,0 +1,202 @@
+//! Allows a future to execute for a maximum amount of time.
+//!
+//! See [`Timeout`] documentation for more details.
+//!
+//! [`Timeout`]: struct@Timeout
+
+use crate::{
+ coop,
+ time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
+ util::trace,
+};
+
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+/// Requires a `Future` to complete before the specified duration has elapsed.
+///
+/// If the future completes before the duration has elapsed, then the completed
+/// value is returned. Otherwise, an error is returned and the future is
+/// canceled.
+///
+/// # Cancelation
+///
+/// Cancelling a timeout is done by dropping the future. No additional cleanup
+/// or other work is required.
+///
+/// The original future may be obtained by calling [`Timeout::into_inner`]. This
+/// consumes the `Timeout`.
+///
+/// # Examples
+///
+/// Create a new `Timeout` set to expire in 10 milliseconds.
+///
+/// ```rust
+/// use tokio::time::timeout;
+/// use tokio::sync::oneshot;
+///
+/// use std::time::Duration;
+///
+/// # async fn dox() {
+/// let (tx, rx) = oneshot::channel();
+/// # tx.send(()).unwrap();
+///
+/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
+/// if let Err(_) = timeout(Duration::from_millis(10), rx).await {
+/// println!("did not receive value within 10 ms");
+/// }
+/// # }
+/// ```
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime.
+/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+/// And this is because wrapping the function on an async makes it lazy,
+/// and so gets executed inside the runtime successfully without
+/// panicking.
+///
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+#[track_caller]
+pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
+where
+ T: Future,
+{
+ let location = trace::caller_location();
+
+ let deadline = Instant::now().checked_add(duration);
+ let delay = match deadline {
+ Some(deadline) => Sleep::new_timeout(deadline, location),
+ None => Sleep::far_future(location),
+ };
+ Timeout::new_with_delay(future, delay)
+}
+
+/// Requires a `Future` to complete before the specified instant in time.
+///
+/// If the future completes before the instant is reached, then the completed
+/// value is returned. Otherwise, an error is returned.
+///
+/// # Cancelation
+///
+/// Cancelling a timeout is done by dropping the future. No additional cleanup
+/// or other work is required.
+///
+/// The original future may be obtained by calling [`Timeout::into_inner`]. This
+/// consumes the `Timeout`.
+///
+/// # Examples
+///
+/// Create a new `Timeout` set to expire in 10 milliseconds.
+///
+/// ```rust
+/// use tokio::time::{Instant, timeout_at};
+/// use tokio::sync::oneshot;
+///
+/// use std::time::Duration;
+///
+/// # async fn dox() {
+/// let (tx, rx) = oneshot::channel();
+/// # tx.send(()).unwrap();
+///
+/// // Wrap the future with a `Timeout` set to expire 10 milliseconds into the
+/// // future.
+/// if let Err(_) = timeout_at(Instant::now() + Duration::from_millis(10), rx).await {
+/// println!("did not receive value within 10 ms");
+/// }
+/// # }
+/// ```
+pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T>
+where
+ T: Future,
+{
+ let delay = sleep_until(deadline);
+
+ Timeout {
+ value: future,
+ delay,
+ }
+}
+
+pin_project! {
+ /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ #[derive(Debug)]
+ pub struct Timeout<T> {
+ #[pin]
+ value: T,
+ #[pin]
+ delay: Sleep,
+ }
+}
+
+impl<T> Timeout<T> {
+ pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
+ Timeout { value, delay }
+ }
+
+ /// Gets a reference to the underlying value in this timeout.
+ pub fn get_ref(&self) -> &T {
+ &self.value
+ }
+
+ /// Gets a mutable reference to the underlying value in this timeout.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.value
+ }
+
+ /// Consumes this timeout, returning the underlying value.
+ pub fn into_inner(self) -> T {
+ self.value
+ }
+}
+
+impl<T> Future for Timeout<T>
+where
+ T: Future,
+{
+ type Output = Result<T::Output, Elapsed>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+
+ let had_budget_before = coop::has_budget_remaining();
+
+ // First, try polling the future
+ if let Poll::Ready(v) = me.value.poll(cx) {
+ return Poll::Ready(Ok(v));
+ }
+
+ let has_budget_now = coop::has_budget_remaining();
+
+ let delay = me.delay;
+
+ let poll_delay = || -> Poll<Self::Output> {
+ match delay.poll(cx) {
+ Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
+ Poll::Pending => Poll::Pending,
+ }
+ };
+
+ if let (true, false) = (had_budget_before, has_budget_now) {
+ // if it is the underlying future that exhausted the budget, we poll
+ // the `delay` with an unconstrained one. This prevents pathological
+ // cases where the underlying future always exhausts the budget and
+ // we never get a chance to evaluate whether the timeout was hit or
+ // not.
+ coop::with_unconstrained(poll_delay)
+ } else {
+ poll_delay()
+ }
+ }
+}