Diffstat (limited to 'third_party/rust/tokio/src/time/driver')
-rw-r--r--  third_party/rust/tokio/src/time/driver/entry.rs        633
-rw-r--r--  third_party/rust/tokio/src/time/driver/handle.rs        94
-rw-r--r--  third_party/rust/tokio/src/time/driver/mod.rs          528
-rw-r--r--  third_party/rust/tokio/src/time/driver/sleep.rs        438
-rw-r--r--  third_party/rust/tokio/src/time/driver/tests/mod.rs    301
-rw-r--r--  third_party/rust/tokio/src/time/driver/wheel/level.rs  276
-rw-r--r--  third_party/rust/tokio/src/time/driver/wheel/mod.rs    359
-rw-r--r--  third_party/rust/tokio/src/time/driver/wheel/stack.rs  112
8 files changed, 2741 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/driver/entry.rs b/third_party/rust/tokio/src/time/driver/entry.rs
new file mode 100644
index 0000000000..f0ea898e12
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/entry.rs
@@ -0,0 +1,633 @@
+//! Timer state structures.
+//!
+//! This module contains the heart of the intrusive timer implementation, and as
+//! such the structures inside are full of tricky concurrency and unsafe code.
+//!
+//! # Ground rules
+//!
+//! The heart of the timer implementation here is the [`TimerShared`] structure,
+//! shared between the [`TimerEntry`] and the driver. Generally, we permit access
+//! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or
+//! 2) a held driver lock.
+//!
+//! It follows from this that any changes made while holding BOTH 1 and 2 will
+//! be reliably visible, regardless of ordering. This is because of the acq/rel
+//! fences on the driver lock ensuring ordering with 2, and Rust's mutable
+//! reference rules for 1 (a mutable reference to an object can't be passed
+//! between threads without an acq/rel barrier, and same-thread we have local
+//! happens-before ordering).
+//!
+//! # State field
+//!
+//! Each timer has a state field associated with it. This field contains either
+//! the current scheduled time, or a special flag value indicating its state.
+//! This state can either indicate that the timer is on the 'pending' queue (and
+//! thus will be fired with an `Ok(())` result soon) or that it has already been
+//! fired/deregistered.
+//!
+//! This single state field allows for code that is firing the timer to
+//! synchronize with any racing `reset` calls reliably.
+//!
+//! # Cached vs true timeouts
+//!
+//! To allow for the use case of a timeout that is periodically reset before
+//! expiration to be as lightweight as possible, we support optimistically
+//! lock-free timer resets, in the case where a timer is rescheduled to a later
+//! point than it was originally scheduled for.
+//!
+//! This is accomplished by lazily rescheduling timers. That is, we update the
+//! state field with the true expiration of the timer from the holder of
+//! the [`TimerEntry`]. When the driver services timers (i.e., whenever it's
+//! walking lists of timers), it checks this "true when" value, and reschedules
+//! based on it.
+//!
+//! We do, however, also need to track what the expiration time was when we
+//! originally registered the timer; this is used to locate the right linked
+//! list when the timer is being cancelled. This is referred to as the "cached
+//! when" internally.
+//!
+//! There is of course a race condition between timer reset and timer
+//! expiration. If the driver fails to observe the updated expiration time, it
+//! could trigger expiration of the timer too early. However, because
+//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and
+//! refuse to mark the timer as pending.
+//!
+//! [mark_pending]: TimerHandle::mark_pending
+
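As a concrete illustration of the scheme described above: a single `AtomicU64` holds either a scheduled tick or a sentinel, and compare-and-swap loops provide both the lock-free reset and the race detection in the firing path. The following is a minimal standalone sketch of that idea with illustrative names (`MiniState`, `try_fire`), not the driver code below:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const FIRED: u64 = u64::MAX; // sentinel: no longer scheduled

// The atomic holds the scheduled tick, or FIRED once deregistered.
struct MiniState(AtomicU64);

impl MiniState {
    /// Driver side: claim the timer for firing, but only if its
    /// (possibly lazily updated) deadline is still <= `not_after`.
    fn try_fire(&self, not_after: u64) -> Result<(), u64> {
        let mut cur = self.0.load(Ordering::Relaxed);
        loop {
            if cur > not_after {
                // A racing lock-free reset moved the deadline later
                // (or the timer already fired): refuse.
                return Err(cur);
            }
            match self.0.compare_exchange(
                cur, FIRED, Ordering::AcqRel, Ordering::Acquire,
            ) {
                Ok(_) => return Ok(()),
                Err(actual) => cur = actual, // raced; re-evaluate
            }
        }
    }

    /// Future side: lock-free "extend" used when resetting to a later
    /// tick; on failure the caller takes the locked slow path.
    fn extend(&self, new_tick: u64) -> Result<(), ()> {
        let cur = self.0.load(Ordering::Relaxed);
        if cur == FIRED || new_tick < cur {
            return Err(());
        }
        self.0
            .compare_exchange(cur, new_tick, Ordering::AcqRel, Ordering::Acquire)
            .map(|_| ())
            .map_err(|_| ())
    }
}
```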
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::AtomicU64;
+use crate::loom::sync::atomic::Ordering;
+
+use crate::sync::AtomicWaker;
+use crate::time::Instant;
+use crate::util::linked_list;
+
+use super::Handle;
+
+use std::cell::UnsafeCell as StdUnsafeCell;
+use std::task::{Context, Poll, Waker};
+use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
+
+type TimerResult = Result<(), crate::time::error::Error>;
+
+const STATE_DEREGISTERED: u64 = u64::MAX;
+const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
+const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
+
+/// This structure holds the current shared state of the timer - its scheduled
+/// time (if registered), or otherwise the result of the timer completing, as
+/// well as the registered waker.
+///
+/// Generally, the StateCell is only permitted to be accessed from two contexts:
+/// Either a thread holding the corresponding &mut TimerEntry, or a thread
+/// holding the timer driver lock. The write actions on the StateCell amount to
+/// passing "ownership" of the StateCell between these contexts; moving a timer
+/// from the TimerEntry to the driver requires _both_ holding the &mut
+/// TimerEntry and the driver lock, while moving it back (firing the timer)
+/// requires only the driver lock.
+pub(super) struct StateCell {
+ /// Holds either the scheduled expiration time for this timer, or (if the
+ /// timer has been fired and is unregistered), `u64::MAX`.
+ state: AtomicU64,
+ /// If the timer is fired (an Acquire order read on state shows
+ /// `u64::MAX`), holds the result that should be returned from
+ /// polling the timer. Otherwise, the contents are unspecified and reading
+ /// without holding the driver lock is undefined behavior.
+ result: UnsafeCell<TimerResult>,
+ /// The currently-registered waker
+ waker: CachePadded<AtomicWaker>,
+}
+
+impl Default for StateCell {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl std::fmt::Debug for StateCell {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "StateCell({:?})", self.read_state())
+ }
+}
+
+impl StateCell {
+ fn new() -> Self {
+ Self {
+ state: AtomicU64::new(STATE_DEREGISTERED),
+ result: UnsafeCell::new(Ok(())),
+ waker: CachePadded(AtomicWaker::new()),
+ }
+ }
+
+ fn is_pending(&self) -> bool {
+ self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
+ }
+
+ /// Returns the current expiration time, or None if not currently scheduled.
+ fn when(&self) -> Option<u64> {
+ let cur_state = self.state.load(Ordering::Relaxed);
+
+ if cur_state == STATE_DEREGISTERED {
+ None
+ } else {
+ Some(cur_state)
+ }
+ }
+
+ /// If the timer is completed, returns the result of the timer. Otherwise,
+ /// returns None and registers the waker.
+ fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
+ // We must register first. This ensures that either `fire` will
+ // observe the new waker, or we will observe a racing fire to have set
+ // the state, or both.
+ self.waker.0.register_by_ref(waker);
+
+ self.read_state()
+ }
+
+ fn read_state(&self) -> Poll<TimerResult> {
+ let cur_state = self.state.load(Ordering::Acquire);
+
+ if cur_state == STATE_DEREGISTERED {
+ // SAFETY: The driver has fired this timer; this involves writing
+ // the result, and then writing (with release ordering) the state
+ // field.
+ Poll::Ready(unsafe { self.result.with(|p| *p) })
+ } else {
+ Poll::Pending
+ }
+ }
+
+ /// Marks this timer as being moved to the pending list, if its scheduled
+ /// time is not after `not_after`.
+ ///
+ /// If the timer is scheduled for a time after `not_after`, returns an `Err`
+ /// containing the current scheduled time.
+ ///
+ /// SAFETY: Must hold the driver lock.
+ unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
+ // Quick initial debug check to see if the timer is already fired. Since
+ // firing the timer can only happen with the driver lock held, we know
+ // we shouldn't be able to "miss" a transition to a fired state, even
+ // with relaxed ordering.
+ let mut cur_state = self.state.load(Ordering::Relaxed);
+
+ loop {
+ debug_assert!(cur_state < STATE_MIN_VALUE);
+
+ if cur_state > not_after {
+ break Err(cur_state);
+ }
+
+ match self.state.compare_exchange(
+ cur_state,
+ STATE_PENDING_FIRE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ break Ok(());
+ }
+ Err(actual_state) => {
+ cur_state = actual_state;
+ }
+ }
+ }
+ }
+
+ /// Fires the timer, setting the result to the provided result.
+ ///
+ /// Returns:
+ /// * `Some(waker)` - if fired and a waker needs to be invoked once the
+ /// driver lock is released
+ /// * `None` - if fired and a waker does not need to be invoked, or if
+ /// already fired
+ ///
+ /// SAFETY: The driver lock must be held.
+ unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
+ // Quick initial check to see if the timer is already fired. Since
+ // firing the timer can only happen with the driver lock held, we know
+ // we shouldn't be able to "miss" a transition to a fired state, even
+ // with relaxed ordering.
+ let cur_state = self.state.load(Ordering::Relaxed);
+ if cur_state == STATE_DEREGISTERED {
+ return None;
+ }
+
+ // SAFETY: We assume the driver lock is held and the timer is not
+ // fired, so only the driver is accessing this field.
+ //
+ // We perform a release-ordered store to state below, to ensure this
+ // write is visible before the state update is visible.
+ unsafe { self.result.with_mut(|p| *p = result) };
+
+ self.state.store(STATE_DEREGISTERED, Ordering::Release);
+
+ self.waker.0.take_waker()
+ }
+
+ /// Marks the timer as registered (poll will return None) and sets the
+ /// expiration time.
+ ///
+ /// While this function is memory-safe, it should only be called from a
+ /// context holding both `&mut TimerEntry` and the driver lock.
+ fn set_expiration(&self, timestamp: u64) {
+ debug_assert!(timestamp < STATE_MIN_VALUE);
+
+ // We can use relaxed ordering because we hold the driver lock and will
+ // fence when we release the lock.
+ self.state.store(timestamp, Ordering::Relaxed);
+ }
+
+ /// Attempts to adjust the timer to a new timestamp.
+ ///
+ /// If the timer has already been fired, is pending firing, or the new
+ /// timestamp is earlier than the old timestamp, (or occasionally
+ /// spuriously) returns Err without changing the timer's state. In this
+ /// case, the timer must be deregistered and re-registered.
+ fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
+ let mut prior = self.state.load(Ordering::Relaxed);
+ loop {
+ if new_timestamp < prior || prior >= STATE_MIN_VALUE {
+ return Err(());
+ }
+
+ match self.state.compare_exchange_weak(
+ prior,
+ new_timestamp,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ return Ok(());
+ }
+ Err(true_prior) => {
+ prior = true_prior;
+ }
+ }
+ }
+ }
+
+ /// Returns true if the state of this timer indicates that the timer might
+ /// be registered with the driver. This check is performed with relaxed
+ /// ordering, but is conservative - if it returns false, the timer is
+ /// definitely _not_ registered.
+ pub(super) fn might_be_registered(&self) -> bool {
+ self.state.load(Ordering::Relaxed) != STATE_DEREGISTERED
+ }
+}
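The ordering inside `poll` above (register the waker first, then re-read the state) is the classic defense against lost wakeups: whichever of `poll` and `fire` runs second is guaranteed to see the other's effect. A reduced sketch of the same handshake, using a plain flag and a hypothetical `MiniTimer` rather than the `AtomicWaker` machinery:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};

struct MiniTimer {
    fired: AtomicBool,
    // The real driver uses AtomicWaker; a mutex-guarded slot is enough
    // to show the ordering.
    waker: Mutex<Option<Waker>>,
}

impl MiniTimer {
    fn poll(&self, cx: &mut Context<'_>) -> Poll<()> {
        // 1) Register first...
        *self.waker.lock().unwrap() = Some(cx.waker().clone());
        // 2) ...then re-check. If fire() ran in between, it either saw
        // our waker (and will wake us) or we observe `fired` here.
        if self.fired.load(Ordering::Acquire) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    fn fire(&self) {
        self.fired.store(true, Ordering::Release);
        if let Some(waker) = self.waker.lock().unwrap().take() {
            waker.wake();
        }
    }
}
```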
+
+/// A timer entry.
+///
+/// This is the handle to a timer that is controlled by the requester of the
+/// timer. As this participates in intrusive data structures, it must be pinned
+/// before polling.
+#[derive(Debug)]
+pub(super) struct TimerEntry {
+ /// Arc reference to the driver. We can only free the driver after
+ /// deregistering everything from their respective timer wheels.
+ driver: Handle,
+ /// Shared inner structure; this is part of an intrusive linked list, and
+ /// therefore other references can exist to it while mutable references to
+ /// Entry exist.
+ ///
+ /// This is manipulated only under the inner mutex. TODO: Can we use loom
+ /// cells for this?
+ inner: StdUnsafeCell<TimerShared>,
+ /// Initial deadline for the timer. This is used to register on the first
+ /// poll, as we can't register prior to being pinned.
+ initial_deadline: Option<Instant>,
+ /// Ensure the type is !Unpin
+ _m: std::marker::PhantomPinned,
+}
+
+unsafe impl Send for TimerEntry {}
+unsafe impl Sync for TimerEntry {}
+
+/// A TimerHandle is the (non-enforced) "unique" pointer from the driver to the
+/// timer entry. Generally, at most one TimerHandle exists for a timer at a time
+/// (enforced by the timer state machine).
+///
+/// SAFETY: A TimerHandle is essentially a raw pointer, and the usual caveats
+/// of pointer safety apply. In particular, TimerHandle does not itself enforce
+/// that the timer still exists; however, normally a TimerHandle is created
+/// immediately before registering the timer, and is consumed when firing the
+/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce
+/// memory safety, all operations are unsafe.
+#[derive(Debug)]
+pub(crate) struct TimerHandle {
+ inner: NonNull<TimerShared>,
+}
+
+pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
+
+/// The shared state structure of a timer. This structure is shared between the
+/// frontend (`Entry`) and driver backend.
+///
+/// Note that this structure is located inside the `TimerEntry` structure.
+#[derive(Debug)]
+#[repr(C)] // required by the `linked_list::Link` impl
+pub(crate) struct TimerShared {
+ /// Data manipulated by the driver thread itself, only.
+ driver_state: CachePadded<TimerSharedPadded>,
+
+ /// Current state. This records whether the timer entry is currently under
+ /// the ownership of the driver, and if not, its current state (not
+ /// complete, fired, error, etc).
+ state: StateCell,
+
+ _p: PhantomPinned,
+}
+
+impl TimerShared {
+ pub(super) fn new() -> Self {
+ Self {
+ state: StateCell::default(),
+ driver_state: CachePadded(TimerSharedPadded::new()),
+ _p: PhantomPinned,
+ }
+ }
+
+ /// Gets the cached time-of-expiration value.
+ pub(super) fn cached_when(&self) -> u64 {
+ // Cached-when is only accessed under the driver lock, so we can use relaxed
+ self.driver_state.0.cached_when.load(Ordering::Relaxed)
+ }
+
+ /// Gets the true time-of-expiration value, and copies it into the cached
+ /// time-of-expiration value.
+ ///
+ /// SAFETY: Must be called with the driver lock held, and when this entry is
+ /// not in any timer wheel lists.
+ pub(super) unsafe fn sync_when(&self) -> u64 {
+ let true_when = self.true_when();
+
+ self.driver_state
+ .0
+ .cached_when
+ .store(true_when, Ordering::Relaxed);
+
+ true_when
+ }
+
+ /// Sets the cached time-of-expiration value.
+ ///
+ /// SAFETY: Must be called with the driver lock held, and when this entry is
+ /// not in any timer wheel lists.
+ unsafe fn set_cached_when(&self, when: u64) {
+ self.driver_state
+ .0
+ .cached_when
+ .store(when, Ordering::Relaxed);
+ }
+
+ /// Returns the true time-of-expiration value, with relaxed memory ordering.
+ pub(super) fn true_when(&self) -> u64 {
+ self.state.when().expect("Timer already fired")
+ }
+
+ /// Sets the true time-of-expiration value, even if it is less than the
+ /// current expiration or the timer is deregistered.
+ ///
+ /// SAFETY: Must only be called with the driver lock held and the entry not
+ /// in the timer wheel.
+ pub(super) unsafe fn set_expiration(&self, t: u64) {
+ self.state.set_expiration(t);
+ self.driver_state.0.cached_when.store(t, Ordering::Relaxed);
+ }
+
+ /// Sets the true time-of-expiration only if it is after the current.
+ pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
+ self.state.extend_expiration(t)
+ }
+
+ /// Returns a TimerHandle for this timer.
+ pub(super) fn handle(&self) -> TimerHandle {
+ TimerHandle {
+ inner: NonNull::from(self),
+ }
+ }
+
+ /// Returns true if the state of this timer indicates that the timer might
+ /// be registered with the driver. This check is performed with relaxed
+ /// ordering, but is conservative - if it returns false, the timer is
+ /// definitely _not_ registered.
+ pub(super) fn might_be_registered(&self) -> bool {
+ self.state.might_be_registered()
+ }
+}
+
+/// Additional shared state between the driver and the timer which is cache
+/// padded. This contains the information that the driver thread accesses most
+/// frequently to minimize contention. In particular, we move it away from the
+/// waker, as the waker is updated on every poll.
+#[repr(C)] // required by the `linked_list::Link` impl
+struct TimerSharedPadded {
+ /// A link within the doubly-linked list of timers on a particular level and
+ /// slot. Valid only if state is equal to Registered.
+ ///
+ /// Only accessed under the entry lock.
+ pointers: linked_list::Pointers<TimerShared>,
+
+ /// The expiration time for which this entry is currently registered.
+ /// Generally owned by the driver, but is accessed by the entry when not
+ /// registered.
+ cached_when: AtomicU64,
+
+ /// The true expiration time. Set by the timer future, read by the driver.
+ true_when: AtomicU64,
+}
+
+impl std::fmt::Debug for TimerSharedPadded {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TimerSharedPadded")
+ .field("when", &self.true_when.load(Ordering::Relaxed))
+ .field("cached_when", &self.cached_when.load(Ordering::Relaxed))
+ .finish()
+ }
+}
+
+impl TimerSharedPadded {
+ fn new() -> Self {
+ Self {
+ cached_when: AtomicU64::new(0),
+ true_when: AtomicU64::new(0),
+ pointers: linked_list::Pointers::new(),
+ }
+ }
+}
+
+unsafe impl Send for TimerShared {}
+unsafe impl Sync for TimerShared {}
+
+unsafe impl linked_list::Link for TimerShared {
+ type Handle = TimerHandle;
+
+ type Target = TimerShared;
+
+ fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> {
+ handle.inner
+ }
+
+ unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
+ TimerHandle { inner: ptr }
+ }
+
+ unsafe fn pointers(
+ target: NonNull<Self::Target>,
+ ) -> NonNull<linked_list::Pointers<Self::Target>> {
+ target.cast()
+ }
+}
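The `target.cast()` above is only sound because `pointers` ends up at offset 0 of `TimerShared`: both `TimerShared` and `TimerSharedPadded` are `#[repr(C)]` with the padded driver state (and, inside it, `pointers`) as the first field, so a pointer to the whole struct is also a pointer to its link pointers. A reduced sketch of the same layout trick, with illustrative types:

```rust
use std::ptr::NonNull;

#[repr(C)]
struct Links {
    prev: Option<NonNull<Node>>,
    next: Option<NonNull<Node>>,
}

#[repr(C)]
struct Node {
    links: Links, // must remain the first field for the cast below
    value: u64,
}

// With #[repr(C)], `links` sits at offset 0, so NonNull<Node> and
// NonNull<Links> refer to the same address and the cast is valid.
unsafe fn links_of(node: NonNull<Node>) -> NonNull<Links> {
    node.cast()
}
```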
+
+// ===== impl Entry =====
+
+impl TimerEntry {
+ pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self {
+ let driver = handle.clone();
+
+ Self {
+ driver,
+ inner: StdUnsafeCell::new(TimerShared::new()),
+ initial_deadline: Some(deadline),
+ _m: std::marker::PhantomPinned,
+ }
+ }
+
+ fn inner(&self) -> &TimerShared {
+ unsafe { &*self.inner.get() }
+ }
+
+ pub(crate) fn is_elapsed(&self) -> bool {
+ !self.inner().state.might_be_registered() && self.initial_deadline.is_none()
+ }
+
+ /// Cancels and deregisters the timer. This operation is irreversible.
+ pub(crate) fn cancel(self: Pin<&mut Self>) {
+ // We need to perform an acq/rel fence with the driver thread, and the
+ // simplest way to do so is to grab the driver lock.
+ //
+ // Why is this necessary? We're about to release this timer's memory for
+ // some other non-timer use. However, we've been doing a bunch of
+ // relaxed (or even non-atomic) writes from the driver thread, and we'll
+ // be doing more from _this thread_ (as this memory is interpreted as
+ // something else).
+ //
+ // It is critical to ensure that, from the point of view of the driver,
+ // those future non-timer writes happen-after the timer is fully fired,
+ // and from the purpose of this thread, the driver's writes all
+ // happen-before we drop the timer. This in turn requires us to perform
+ // an acquire-release barrier in _both_ directions between the driver
+ // and dropping thread.
+ //
+ // The lock acquisition in clear_entry serves this purpose. All of the
+ // driver manipulations happen with the lock held, so we can just take
+ // the lock and be sure that this drop happens-after everything the
+ // driver did so far and happens-before everything the driver does in
+ // the future. While we have the lock held, we also go ahead and
+ // deregister the entry if necessary.
+ unsafe { self.driver.clear_entry(NonNull::from(self.inner())) };
+ }
+
+ pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
+ unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None;
+
+ let tick = self.driver.time_source().deadline_to_tick(new_time);
+
+ if self.inner().extend_expiration(tick).is_ok() {
+ return;
+ }
+
+ unsafe {
+ self.driver.reregister(tick, self.inner().into());
+ }
+ }
+
+ pub(crate) fn poll_elapsed(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), super::Error>> {
+ if self.driver.is_shutdown() {
+ panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
+ }
+
+ if let Some(deadline) = self.initial_deadline {
+ self.as_mut().reset(deadline);
+ }
+
+ let this = unsafe { self.get_unchecked_mut() };
+
+ this.inner().state.poll(cx.waker())
+ }
+}
+
+impl TimerHandle {
+ pub(super) unsafe fn cached_when(&self) -> u64 {
+ unsafe { self.inner.as_ref().cached_when() }
+ }
+
+ pub(super) unsafe fn sync_when(&self) -> u64 {
+ unsafe { self.inner.as_ref().sync_when() }
+ }
+
+ pub(super) unsafe fn is_pending(&self) -> bool {
+ unsafe { self.inner.as_ref().state.is_pending() }
+ }
+
+ /// Forcibly sets the true and cached expiration times to the given tick.
+ ///
+ /// SAFETY: The caller must ensure that the handle remains valid, the driver
+ /// lock is held, and that the timer is not in any wheel linked lists.
+ pub(super) unsafe fn set_expiration(&self, tick: u64) {
+ self.inner.as_ref().set_expiration(tick);
+ }
+
+ /// Attempts to mark this entry as pending. If the expiration time is after
+ /// `not_after`, however, returns an Err with the current expiration time.
+ ///
+ /// If an `Err` is returned, the `cached_when` value will be updated to this
+ /// new expiration time.
+ ///
+ /// SAFETY: The caller must ensure that the handle remains valid, the driver
+ /// lock is held, and that the timer is not in any wheel linked lists.
+ /// After returning Ok, the entry must be added to the pending list.
+ pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
+ match self.inner.as_ref().state.mark_pending(not_after) {
+ Ok(()) => {
+ // mark this as being on the pending queue in cached_when
+ self.inner.as_ref().set_cached_when(u64::MAX);
+ Ok(())
+ }
+ Err(tick) => {
+ self.inner.as_ref().set_cached_when(tick);
+ Err(tick)
+ }
+ }
+ }
+
+ /// Attempts to transition to a terminal state. If the state is already a
+ /// terminal state, does nothing.
+ ///
+ /// Because the entry might be dropped after the state is moved to a
+ /// terminal state, this function consumes the handle to ensure we don't
+ /// access the entry afterwards.
+ ///
+ /// Returns the last-registered waker, if any.
+ ///
+ /// SAFETY: The driver lock must be held while invoking this function, and
+ /// the entry must not be in any wheel linked lists.
+ pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
+ self.inner.as_ref().state.fire(completed_state)
+ }
+}
+
+impl Drop for TimerEntry {
+ fn drop(&mut self) {
+ unsafe { Pin::new_unchecked(self) }.as_mut().cancel()
+ }
+}
+
+#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
+#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
+#[derive(Debug, Default)]
+struct CachePadded<T>(T);
diff --git a/third_party/rust/tokio/src/time/driver/handle.rs b/third_party/rust/tokio/src/time/driver/handle.rs
new file mode 100644
index 0000000000..b61c0476e1
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/handle.rs
@@ -0,0 +1,94 @@
+use crate::loom::sync::Arc;
+use crate::time::driver::ClockTime;
+use std::fmt;
+
+/// Handle to time driver instance.
+#[derive(Clone)]
+pub(crate) struct Handle {
+ time_source: ClockTime,
+ inner: Arc<super::Inner>,
+}
+
+impl Handle {
+ /// Creates a new timer `Handle` from a shared `Inner` timer state.
+ pub(super) fn new(inner: Arc<super::Inner>) -> Self {
+ let time_source = inner.state.lock().time_source.clone();
+ Handle { time_source, inner }
+ }
+
+ /// Returns the time source associated with this handle.
+ pub(super) fn time_source(&self) -> &ClockTime {
+ &self.time_source
+ }
+
+ /// Access the driver's inner structure.
+ pub(super) fn get(&self) -> &super::Inner {
+ &*self.inner
+ }
+
+ /// Checks whether the driver has been shut down.
+ pub(super) fn is_shutdown(&self) -> bool {
+ self.inner.is_shutdown()
+ }
+}
+
+cfg_rt! {
+ impl Handle {
+ /// Tries to get a handle to the current timer.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current timer set.
+ ///
+ /// It can be triggered when [`Builder::enable_time`] or
+ /// [`Builder::enable_all`] are not included in the builder.
+ ///
+ /// It can also panic whenever a timer is created outside of a
+ /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+ /// since the function is executed outside of the runtime, whereas
+ /// `rt.block_on(async {sleep(...).await})` doesn't panic, because
+ /// wrapping the call in an async block makes it lazy, so it is
+ /// executed inside the runtime.
+ ///
+ /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+ /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+ pub(crate) fn current() -> Self {
+ crate::runtime::context::time_handle()
+ .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
+ }
+ }
+}
+
+cfg_not_rt! {
+ impl Handle {
+ /// Tries to get a handle to the current timer.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current timer set.
+ ///
+ /// It can be triggered when [`Builder::enable_time`] or
+ /// [`Builder::enable_all`] are not included in the builder.
+ ///
+ /// It can also panic whenever a timer is created outside of a
+ /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+ /// since the function is executed outside of the runtime, whereas
+ /// `rt.block_on(async {sleep(...).await})` doesn't panic, because
+ /// wrapping the call in an async block makes it lazy, so it is
+ /// executed inside the runtime.
+ ///
+ /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+ /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+ pub(crate) fn current() -> Self {
+ panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
+ }
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Handle")
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/mod.rs b/third_party/rust/tokio/src/time/driver/mod.rs
new file mode 100644
index 0000000000..9971877479
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/mod.rs
@@ -0,0 +1,528 @@
+// Currently, rust warns when an unsafe fn contains an unsafe {} block. However,
+// in the future, this will change to the reverse. For now, suppress this
+// warning and generally stick with being explicit about unsafety.
+#![allow(unused_unsafe)]
+#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+
+//! Time driver.
+
+mod entry;
+pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
+
+mod handle;
+pub(crate) use self::handle::Handle;
+
+mod wheel;
+
+pub(super) mod sleep;
+
+use crate::loom::sync::atomic::{AtomicBool, Ordering};
+use crate::loom::sync::{Arc, Mutex};
+use crate::park::{Park, Unpark};
+use crate::time::error::Error;
+use crate::time::{Clock, Duration, Instant};
+
+use std::convert::TryInto;
+use std::fmt;
+use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
+
+/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
+///
+/// A `Driver` instance tracks the state necessary for managing time and
+/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
+///
+/// It is expected that a single instance manages many individual [`Sleep`][sleep]
+/// instances. The `Driver` implementation is thread-safe and, as such, is able
+/// to handle callers from across threads.
+///
+/// After creating the `Driver` instance, the caller must repeatedly call `park`
+/// or `park_timeout`. The time driver will perform no work unless `park` or
+/// `park_timeout` is called repeatedly.
+///
+/// The driver has a resolution of one millisecond. Any unit of time that falls
+/// between milliseconds is rounded up to the next millisecond.
+///
+/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
+/// elapsed will be notified with an error. At this point, calling `poll` on the
+/// [`Sleep`][sleep] instance will result in a panic.
+///
+/// # Implementation
+///
+/// The time driver is based on the [paper by Varghese and Lauck][paper].
+///
+/// A hashed timing wheel is a vector of slots, where each slot handles a time
+/// slice. As time progresses, the timer walks over the slot for the current
+/// instant, and processes each entry for that slot. When the timer reaches the
+/// end of the wheel, it starts again at the beginning.
+///
+/// The implementation maintains six wheels arranged in a set of levels. As the
+/// levels go up, the slots of the associated wheel represent larger intervals
+/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
+/// time equal to the wheel at the lower level. At level zero, each slot
+/// represents one millisecond of time.
+///
+/// The wheels are:
+///
+/// * Level 0: 64 x 1 millisecond slots.
+/// * Level 1: 64 x 64 millisecond slots.
+/// * Level 2: 64 x ~4 second slots.
+/// * Level 3: 64 x ~4 minute slots.
+/// * Level 4: 64 x ~4 hour slots.
+/// * Level 5: 64 x ~12 day slots.
+///
+/// When the timer processes entries at level zero, it will notify all the
+/// `Sleep` instances as their deadlines have been reached. For all higher
+/// levels, all entries will be redistributed across the wheel at the next level
+/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
+/// either be canceled (dropped) or their associated entries will reach level
+/// zero and be notified.
+///
+/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
+/// [sleep]: crate::time::Sleep
+/// [timeout]: crate::time::Timeout
+/// [interval]: crate::time::Interval
+#[derive(Debug)]
+pub(crate) struct Driver<P: Park + 'static> {
+ /// Timing backend in use.
+ time_source: ClockTime,
+
+ /// Shared state.
+ handle: Handle,
+
+ /// Parker to delegate to.
+ park: P,
+
+ // When `true`, a call to `park_timeout` should immediately return and time
+ // should not advance. One reason for this to be `true` is if the task
+ // passed to `Runtime::block_on` called `task::yield_now()`.
+ //
+ // While it may look racy, it only has any effect when the clock is paused
+ // and pausing the clock is restricted to a single-threaded runtime.
+ #[cfg(feature = "test-util")]
+ did_wake: Arc<AtomicBool>,
+}
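Since each level spans 64 times the level below it, the level an entry belongs on is determined by the highest bit in which its expiration tick differs from the wheel's current elapsed tick. A self-contained sketch of that computation (the constants mirror the layout described above; the exact clamping is an assumption, not the vendored `wheel` code):

```rust
const NUM_LEVELS: usize = 6;
const SLOT_BITS: usize = 6; // 64 slots per level

fn level_for(elapsed: u64, when: u64) -> usize {
    // Forcing the low 6 bits on keeps very near deadlines on level 0;
    // beyond that, the highest differing bit picks the level.
    let masked = (elapsed ^ when) | ((1u64 << SLOT_BITS) - 1);
    let significant = 63 - masked.leading_zeros() as usize;
    (significant / SLOT_BITS).min(NUM_LEVELS - 1)
}

fn main() {
    // With nothing elapsed: < 64 ms lands on level 0, < 4.096 s on
    // level 1, and so on, 64x per level.
    assert_eq!(level_for(0, 50), 0);
    assert_eq!(level_for(0, 1_000), 1);
    assert_eq!(level_for(0, 300_000), 3); // ~5 min -> level 3
}
```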
+
+/// A structure which handles conversion from Instants to u64 timestamps.
+#[derive(Debug, Clone)]
+pub(self) struct ClockTime {
+ clock: super::clock::Clock,
+ start_time: Instant,
+}
+
+impl ClockTime {
+ pub(self) fn new(clock: Clock) -> Self {
+ Self {
+ start_time: clock.now(),
+ clock,
+ }
+ }
+
+ pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
+ // Round up to the end of a ms
+ self.instant_to_tick(t + Duration::from_nanos(999_999))
+ }
+
+ pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
+ // round up
+ let dur: Duration = t
+ .checked_duration_since(self.start_time)
+ .unwrap_or_else(|| Duration::from_secs(0));
+ let ms = dur.as_millis();
+
+ ms.try_into().unwrap_or(u64::MAX)
+ }
+
+ pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
+ Duration::from_millis(t)
+ }
+
+ pub(self) fn now(&self) -> u64 {
+ self.instant_to_tick(self.clock.now())
+ }
+}
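The conversion above charges a deadline to the millisecond in which it ends: `deadline_to_tick` pads the instant by 999,999 ns and then truncates to whole milliseconds. A worked example of just the rounding arithmetic (standalone, not driver code):

```rust
use std::time::Duration;

// Mirror of the rounding in deadline_to_tick/instant_to_tick: pad by
// 999_999 ns, then truncate to whole milliseconds.
fn round_up_to_ms(since_start: Duration) -> u64 {
    (since_start + Duration::from_nanos(999_999)).as_millis() as u64
}

fn main() {
    assert_eq!(round_up_to_ms(Duration::from_micros(0)), 0);
    assert_eq!(round_up_to_ms(Duration::from_micros(1)), 1); // 1 us -> tick 1
    assert_eq!(round_up_to_ms(Duration::from_micros(1_500)), 2); // 1.5 ms -> tick 2
    assert_eq!(round_up_to_ms(Duration::from_millis(2)), 2); // exact ms stays put
}
```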
+
+/// Timer state shared between `Driver`, `Handle`, and `Registration`.
+struct Inner {
+ // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
+ pub(super) state: Mutex<InnerState>,
+
+ /// True if the driver is being shut down.
+ pub(super) is_shutdown: AtomicBool,
+}
+
+/// Timer state shared between threads, which must be protected by a `Mutex`.
+struct InnerState {
+ /// Timing backend in use.
+ time_source: ClockTime,
+
+ /// The last published timer `elapsed` value.
+ elapsed: u64,
+
+ /// The earliest time at which we promise to wake up without unparking.
+ next_wake: Option<NonZeroU64>,
+
+ /// Timer wheel.
+ wheel: wheel::Wheel,
+
+ /// Unparker that can be used to wake the time driver.
+ unpark: Box<dyn Unpark>,
+}
+
+// ===== impl Driver =====
+
+impl<P> Driver<P>
+where
+ P: Park + 'static,
+{
+ /// Creates a new `Driver` instance that uses `park` to block the current
+ /// thread and `time_source` to get the current time and convert to ticks.
+ ///
+ /// Specifying the source of time is useful when testing.
+ pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
+ let time_source = ClockTime::new(clock);
+
+ let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));
+
+ Driver {
+ time_source,
+ handle: Handle::new(Arc::new(inner)),
+ park,
+ #[cfg(feature = "test-util")]
+ did_wake: Arc::new(AtomicBool::new(false)),
+ }
+ }
+
+ /// Returns a handle to the timer.
+ ///
+ /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances
+ /// can either be created directly or the `Handle` instance can be passed to
+ /// `with_default`, setting the timer as the default timer for the execution
+ /// context.
+ pub(crate) fn handle(&self) -> Handle {
+ self.handle.clone()
+ }
+
+ fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
+ let mut lock = self.handle.get().state.lock();
+
+ assert!(!self.handle.is_shutdown());
+
+ let next_wake = lock.wheel.next_expiration_time();
+ lock.next_wake =
+ next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
+
+ drop(lock);
+
+ match next_wake {
+ Some(when) => {
+ let now = self.time_source.now();
+ // Note that we effectively round up to 1ms here - this avoids
+ // very short-duration microsecond-resolution sleeps that the OS
+ // might treat as zero-length.
+ let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
+
+ if duration > Duration::from_millis(0) {
+ if let Some(limit) = limit {
+ duration = std::cmp::min(limit, duration);
+ }
+
+ self.park_timeout(duration)?;
+ } else {
+ self.park.park_timeout(Duration::from_secs(0))?;
+ }
+ }
+ None => {
+ if let Some(duration) = limit {
+ self.park_timeout(duration)?;
+ } else {
+ self.park.park()?;
+ }
+ }
+ }
+
+ // Process pending timers after waking up
+ self.handle.process();
+
+ Ok(())
+ }
+
+ cfg_test_util! {
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
+ let clock = &self.time_source.clock;
+
+ if clock.is_paused() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+
+ // If the time driver was woken, then the park completed
+ // before the "duration" elapsed (usually caused by a
+ // yield in `Runtime::block_on`). In this case, we don't
+ // advance the clock.
+ if !self.did_wake() {
+ // Simulate advancing time
+ clock.advance(duration);
+ }
+ } else {
+ self.park.park_timeout(duration)?;
+ }
+
+ Ok(())
+ }
+
+ fn did_wake(&self) -> bool {
+ self.did_wake.swap(false, Ordering::SeqCst)
+ }
+ }
+
+ cfg_not_test_util! {
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
+ self.park.park_timeout(duration)
+ }
+ }
+}
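As the `Driver` docs above stress, timers make no progress unless something parks the driver. Conceptually, the thread owning the driver runs a loop like the following; this is a hedged sketch against this module's `Park` trait, not code that appears in the patch:

```rust
// Hypothetical outermost drive loop (Driver and Park as defined in
// this module). Each park() blocks until the next timer deadline or
// an unpark(), after which park_internal() calls handle.process() to
// fire any elapsed timers before the loop parks again.
fn drive<P: Park + 'static>(mut driver: Driver<P>) -> Result<(), P::Error> {
    loop {
        driver.park()?;
    }
}
```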
+
+impl Handle {
+ /// Runs timer-related logic, firing any timers whose deadlines have been reached.
+ pub(self) fn process(&self) {
+ let now = self.time_source().now();
+
+ self.process_at_time(now)
+ }
+
+ pub(self) fn process_at_time(&self, mut now: u64) {
+ let mut waker_list: [Option<Waker>; 32] = Default::default();
+ let mut waker_idx = 0;
+
+ let mut lock = self.get().lock();
+
+ if now < lock.elapsed {
+ // Time went backwards! This normally shouldn't happen as the Rust language
+ // guarantees that an Instant is monotonic, but can happen when running
+ // Linux in a VM on a Windows host due to std incorrectly trusting the
+ // hardware clock to be monotonic.
+ //
+ // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
+ now = lock.elapsed;
+ }
+
+ while let Some(entry) = lock.wheel.poll(now) {
+ debug_assert!(unsafe { entry.is_pending() });
+
+ // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
+ if let Some(waker) = unsafe { entry.fire(Ok(())) } {
+ waker_list[waker_idx] = Some(waker);
+
+ waker_idx += 1;
+
+ if waker_idx == waker_list.len() {
+ // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
+ drop(lock);
+
+ for waker in waker_list.iter_mut() {
+ waker.take().unwrap().wake();
+ }
+
+ waker_idx = 0;
+
+ lock = self.get().lock();
+ }
+ }
+ }
+
+ // Update the elapsed cache
+ lock.elapsed = lock.wheel.elapsed();
+ lock.next_wake = lock
+ .wheel
+ .poll_at()
+ .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
+
+ drop(lock);
+
+ for waker in waker_list[0..waker_idx].iter_mut() {
+ waker.take().unwrap().wake();
+ }
+ }
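The 32-entry batching above is an instance of a general rule: never invoke foreign wakers while holding the driver lock, because `wake()` can run arbitrary code that re-enters the driver (for example a `Sleep::reset`) and deadlock. The pattern in miniature, with hypothetical names:

```rust
use std::sync::Mutex;
use std::task::Waker;

// Collect wakers under the lock, then invoke them only after the
// guard is dropped: wake() may call back into this same lock.
fn fire_all(wakers: &Mutex<Vec<Waker>>) {
    let batch: Vec<Waker> = {
        let mut guard = wakers.lock().unwrap();
        guard.drain(..).collect()
    }; // lock released here
    for waker in batch {
        waker.wake();
    }
}
```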
+
+ /// Removes a registered timer from the driver.
+ ///
+ /// The timer will be moved to the cancelled state. Wakers will _not_ be
+ /// invoked. If the timer is already completed, this function is a no-op.
+ ///
+ /// This function always acquires the driver lock, even if the entry does
+ /// not appear to be registered.
+ ///
+ /// SAFETY: The timer must not be registered with some other driver, and
+ /// `add_entry` must not be called concurrently.
+ pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
+ unsafe {
+ let mut lock = self.get().lock();
+
+ if entry.as_ref().might_be_registered() {
+ lock.wheel.remove(entry);
+ }
+
+ entry.as_ref().handle().fire(Ok(()));
+ }
+ }
+
+ /// Removes and re-adds an entry to the driver.
+ ///
+ /// SAFETY: The timer must be either unregistered, or registered with this
+ /// driver. No other threads are allowed to concurrently manipulate the
+ /// timer at all (the current thread should hold an exclusive reference to
+ /// the `TimerEntry`)
+ pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
+ let waker = unsafe {
+ let mut lock = self.get().lock();
+
+ // We may have raced with a firing/deregistration, so check before
+ // deregistering.
+ if unsafe { entry.as_ref().might_be_registered() } {
+ lock.wheel.remove(entry);
+ }
+
+ // Now that we have exclusive control of this entry, mint a handle to reinsert it.
+ let entry = entry.as_ref().handle();
+
+ if self.is_shutdown() {
+ unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
+ } else {
+ entry.set_expiration(new_tick);
+
+ // Note: We don't have to worry about racing with some other resetting
+ // thread, because add_entry and reregister require exclusive control of
+ // the timer entry.
+ match unsafe { lock.wheel.insert(entry) } {
+ Ok(when) => {
+ if lock
+ .next_wake
+ .map(|next_wake| when < next_wake.get())
+ .unwrap_or(true)
+ {
+ lock.unpark.unpark();
+ }
+
+ None
+ }
+ Err((entry, super::error::InsertError::Elapsed)) => unsafe {
+ entry.fire(Ok(()))
+ },
+ }
+ }
+
+ // Must release lock before invoking waker to avoid the risk of deadlock.
+ };
+
+ // The timer was fired synchronously as a result of the reregistration.
+ // Wake the waker; this is needed because we might reset _after_ a poll,
+ // and otherwise the task won't be awoken to poll again.
+ if let Some(waker) = waker {
+ waker.wake();
+ }
+ }
+}
+
+impl<P> Park for Driver<P>
+where
+ P: Park + 'static,
+{
+ type Unpark = TimerUnpark<P>;
+ type Error = P::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ TimerUnpark::new(self)
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park_internal(None)
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park_internal(Some(duration))
+ }
+
+ fn shutdown(&mut self) {
+ if self.handle.is_shutdown() {
+ return;
+ }
+
+ self.handle.get().is_shutdown.store(true, Ordering::SeqCst);
+
+ // Advance time forward to the end of time.
+
+ self.handle.process_at_time(u64::MAX);
+
+ self.park.shutdown();
+ }
+}
+
+impl<P> Drop for Driver<P>
+where
+ P: Park + 'static,
+{
+ fn drop(&mut self) {
+ self.shutdown();
+ }
+}
+
+pub(crate) struct TimerUnpark<P: Park + 'static> {
+ inner: P::Unpark,
+
+ #[cfg(feature = "test-util")]
+ did_wake: Arc<AtomicBool>,
+}
+
+impl<P: Park + 'static> TimerUnpark<P> {
+ fn new(driver: &Driver<P>) -> TimerUnpark<P> {
+ TimerUnpark {
+ inner: driver.park.unpark(),
+
+ #[cfg(feature = "test-util")]
+ did_wake: driver.did_wake.clone(),
+ }
+ }
+}
+
+impl<P: Park + 'static> Unpark for TimerUnpark<P> {
+ fn unpark(&self) {
+ #[cfg(feature = "test-util")]
+ self.did_wake.store(true, Ordering::SeqCst);
+
+ self.inner.unpark();
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
+ Inner {
+ state: Mutex::new(InnerState {
+ time_source,
+ elapsed: 0,
+ next_wake: None,
+ unpark,
+ wheel: wheel::Wheel::new(),
+ }),
+ is_shutdown: AtomicBool::new(false),
+ }
+ }
+
+ /// Locks the driver's inner structure
+ pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
+ self.state.lock()
+ }
+
+ // Checks whether the driver has been shut down
+ pub(super) fn is_shutdown(&self) -> bool {
+ self.is_shutdown.load(Ordering::SeqCst)
+ }
+}
+
+impl fmt::Debug for Inner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Inner").finish()
+ }
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/third_party/rust/tokio/src/time/driver/sleep.rs b/third_party/rust/tokio/src/time/driver/sleep.rs
new file mode 100644
index 0000000000..7f27ef201f
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/sleep.rs
@@ -0,0 +1,438 @@
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+use crate::time::driver::ClockTime;
+use crate::time::driver::{Handle, TimerEntry};
+use crate::time::{error::Error, Duration, Instant};
+use crate::util::trace;
+
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::panic::Location;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+/// Waits until `deadline` is reached.
+///
+/// No work is performed while awaiting on the sleep future to complete. `Sleep`
+/// operates at millisecond granularity and should not be used for tasks that
+/// require high-resolution timers.
+///
+/// To run something regularly on a schedule, see [`interval`].
+///
+/// # Cancellation
+///
+/// Canceling a sleep instance is done by dropping the returned future. No additional
+/// cleanup work is required.
+///
+/// # Examples
+///
+/// Wait 100ms and print "100 ms have elapsed".
+///
+/// ```
+/// use tokio::time::{sleep_until, Instant, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// sleep_until(Instant::now() + Duration::from_millis(100)).await;
+/// println!("100 ms have elapsed");
+/// }
+/// ```
+///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime, whereas
+/// `rt.block_on(async {sleep(...).await})` doesn't panic, because
+/// wrapping the call in an async block makes it lazy, so it is
+/// executed inside the runtime.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
+/// [`interval`]: crate::time::interval()
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+// Alias for old name in 0.x
+#[cfg_attr(docsrs, doc(alias = "delay_until"))]
+#[track_caller]
+pub fn sleep_until(deadline: Instant) -> Sleep {
+ Sleep::new_timeout(deadline, trace::caller_location())
+}
+
+/// Waits until `duration` has elapsed.
+///
+/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
+/// analog to `std::thread::sleep`.
+///
+/// No work is performed while awaiting on the sleep future to complete. `Sleep`
+/// operates at millisecond granularity and should not be used for tasks that
+/// require high-resolution timers.
+///
+/// To run something regularly on a schedule, see [`interval`].
+///
+/// The maximum duration for a sleep is 68719476734 milliseconds (approximately
+/// 2.2 years), just under the 64^6 = 2^36 millisecond span covered by the
+/// driver's six wheel levels.
+///
+/// # Cancellation
+///
+/// Canceling a sleep instance is done by dropping the returned future. No additional
+/// cleanup work is required.
+///
+/// # Examples
+///
+/// Wait 100ms and print "100 ms have elapsed".
+///
+/// ```
+/// use tokio::time::{sleep, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// sleep(Duration::from_millis(100)).await;
+/// println!("100 ms have elapsed");
+/// }
+/// ```
+///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime, whereas
+/// `rt.block_on(async {sleep(...).await})` doesn't panic, because
+/// wrapping the call in an async block makes it lazy, so it is
+/// executed inside the runtime.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
+/// [`interval`]: crate::time::interval()
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+// Alias for old name in 0.x
+#[cfg_attr(docsrs, doc(alias = "delay_for"))]
+#[cfg_attr(docsrs, doc(alias = "wait"))]
+#[track_caller]
+pub fn sleep(duration: Duration) -> Sleep {
+ let location = trace::caller_location();
+
+ match Instant::now().checked_add(duration) {
+ Some(deadline) => Sleep::new_timeout(deadline, location),
+ None => Sleep::new_timeout(Instant::far_future(), location),
+ }
+}
+
+pin_project! {
+ /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
+ ///
+ /// This type does not implement the `Unpin` trait, which means that if you
+ /// use it with [`select!`] or by calling `poll`, you have to pin it first.
+ /// If you use it with `.await`, this does not apply.
+ ///
+ /// # Examples
+ ///
+ /// Wait 100ms and print "100 ms have elapsed".
+ ///
+ /// ```
+ /// use tokio::time::{sleep, Duration};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// sleep(Duration::from_millis(100)).await;
+ /// println!("100 ms have elapsed");
+ /// }
+ /// ```
+ ///
+ /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
+ /// necessary when the same `Sleep` is selected on multiple times.
+ /// ```no_run
+ /// use tokio::time::{self, Duration, Instant};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let sleep = time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// loop {
+ /// tokio::select! {
+ /// () = &mut sleep => {
+ /// println!("timer elapsed");
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
+ /// },
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
+ /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ ///
+ /// struct HasSleep {
+ /// sleep: Pin<Box<Sleep>>,
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.sleep.as_mut().poll(cx)
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with pin projection. This method avoids the `Box`, but
+ /// the `HasSleep` struct will not be `Unpin` as a consequence.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ /// use pin_project_lite::pin_project;
+ ///
+ /// pin_project! {
+ /// struct HasSleep {
+ /// #[pin]
+ /// sleep: Sleep,
+ /// }
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.project().sleep.poll(cx)
+ /// }
+ /// }
+ /// ```
+ ///
+ /// [`select!`]: ../macro.select.html
+ /// [`tokio::pin!`]: ../macro.pin.html
+ // Alias for old name in 0.2
+ #[cfg_attr(docsrs, doc(alias = "Delay"))]
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Sleep {
+ inner: Inner,
+
+ // The link between the `Sleep` instance and the timer that drives it.
+ #[pin]
+ entry: TimerEntry,
+ }
+}
+
+cfg_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ deadline: Instant,
+ ctx: trace::AsyncOpTracingCtx,
+ time_source: ClockTime,
+ }
+}
+
+cfg_not_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ deadline: Instant,
+ }
+}
+
+impl Sleep {
+ #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
+ pub(crate) fn new_timeout(
+ deadline: Instant,
+ location: Option<&'static Location<'static>>,
+ ) -> Sleep {
+ let handle = Handle::current();
+ let entry = TimerEntry::new(&handle, deadline);
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = {
+ let time_source = handle.time_source().clone();
+ let deadline_tick = time_source.deadline_to_tick(deadline);
+ let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0);
+
+ let location = location.expect("should have location if tracing");
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Sleep",
+ kind = "timer",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ let async_op_span = resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ duration = duration,
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout")
+ });
+
+ let async_op_poll_span =
+ async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
+
+ let ctx = trace::AsyncOpTracingCtx {
+ async_op_span,
+ async_op_poll_span,
+ resource_span,
+ };
+
+ Inner {
+ deadline,
+ ctx,
+ time_source,
+ }
+ };
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = Inner { deadline };
+
+ Sleep { inner, entry }
+ }
+
+ pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
+ Self::new_timeout(Instant::far_future(), location)
+ }
+
+ /// Returns the instant at which the future will complete.
+ pub fn deadline(&self) -> Instant {
+ self.inner.deadline
+ }
+
+ /// Returns `true` if `Sleep` has elapsed.
+ ///
+ /// A `Sleep` instance is elapsed when the requested duration has elapsed.
+ pub fn is_elapsed(&self) -> bool {
+ self.entry.is_elapsed()
+ }
+
+ /// Resets the `Sleep` instance to a new deadline.
+ ///
+ /// Calling this function allows changing the instant at which the `Sleep`
+ /// future completes without having to create new associated state.
+ ///
+ /// This function can be called both before and after the future has
+ /// completed.
+ ///
+ /// To call this method, you will usually combine the call with
+ /// [`Pin::as_mut`], which lets you call the method without consuming the
+ /// `Sleep` itself.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant};
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let sleep = tokio::time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
+ /// # }
+ /// ```
+ ///
+ /// See also the top-level examples.
+ ///
+ /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
+ pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
+ self.reset_inner(deadline)
+ }
+
+ fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
+ let me = self.project();
+ me.entry.reset(deadline);
+ (*me.inner).deadline = deadline;
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ {
+ let _resource_enter = me.inner.ctx.resource_span.enter();
+ me.inner.ctx.async_op_span =
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
+ let _async_op_enter = me.inner.ctx.async_op_span.enter();
+
+ me.inner.ctx.async_op_poll_span =
+ tracing::trace_span!("runtime.resource.async_op.poll");
+
+ let duration = {
+ let now = me.inner.time_source.now();
+ let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
+ deadline_tick.checked_sub(now).unwrap_or(0)
+ };
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ duration = duration,
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+ }
+ }
+
+ fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ let me = self.project();
+
+ // Keep track of task budget
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let coop = ready!(trace_poll_op!(
+ "poll_elapsed",
+ crate::coop::poll_proceed(cx),
+ ));
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ let result = me.entry.poll_elapsed(cx).map(move |r| {
+ coop.made_progress();
+ r
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ return trace_poll_op!("poll_elapsed", result);
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ return result;
+ }
+}
+
+impl Future for Sleep {
+ type Output = ();
+
+ // `poll_elapsed` can return an error in two cases:
+ //
+ // - AtCapacity: this is a pathological case where far too many
+ // sleep instances have been scheduled.
+ // - Shutdown: No timer has been set up, which is a misuse error.
+ //
+ // Both cases are extremely rare, and pretty accurately fit into
+ // "logic errors", so we just panic in this case. A user couldn't
+ // really do much better if we passed the error onwards.
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _res_span = self.inner.ctx.resource_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_span = self.inner.ctx.async_op_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered();
+ match ready!(self.as_mut().poll_elapsed(cx)) {
+ Ok(()) => Poll::Ready(()),
+ Err(e) => panic!("timer error: {}", e),
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/tests/mod.rs b/third_party/rust/tokio/src/time/driver/tests/mod.rs
new file mode 100644
index 0000000000..3ac8c75643
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/tests/mod.rs
@@ -0,0 +1,301 @@
+use std::{task::Context, time::Duration};
+
+#[cfg(not(loom))]
+use futures::task::noop_waker_ref;
+
+use crate::loom::sync::Arc;
+use crate::loom::thread;
+use crate::{
+ loom::sync::atomic::{AtomicBool, Ordering},
+ park::Unpark,
+};
+
+use super::{Handle, TimerEntry};
+
+struct MockUnpark {}
+impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+}
+impl MockUnpark {
+ fn mock() -> Box<dyn Unpark> {
+ Box::new(Self {})
+ }
+}
+
+fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
+ #[cfg(loom)]
+ return loom::future::block_on(f);
+
+ #[cfg(not(loom))]
+ {
+ let rt = crate::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+ rt.block_on(f)
+ }
+}
+
+fn model(f: impl Fn() + Send + Sync + 'static) {
+ #[cfg(loom)]
+ loom::model(f);
+
+ #[cfg(not(loom))]
+ f();
+}
+
+#[test]
+fn single_timer() {
+ model(|| {
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
+ pin!(entry);
+
+ block_on(futures::future::poll_fn(|cx| {
+ entry.as_mut().poll_elapsed(cx)
+ }))
+ .unwrap();
+ });
+
+ thread::yield_now();
+
+ // Fire timers well past the deadline. Regardless of how this
+ // interleaves with the spawned thread's polls, that thread's
+ // block_on must eventually complete with Ok(()).
+ handle.process_at_time(time_source.now() + 2_000_000_000);
+
+ jh.join().unwrap();
+ })
+}
+
+#[test]
+fn drop_timer() {
+ model(|| {
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
+ pin!(entry);
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+ });
+
+ thread::yield_now();
+
+        // Advance the driver well past the timer's deadline.
+ handle.process_at_time(time_source.now() + 2_000_000_000);
+
+ jh.join().unwrap();
+ })
+}
+
+#[test]
+fn change_waker() {
+ model(|| {
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
+ pin!(entry);
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+
+ block_on(futures::future::poll_fn(|cx| {
+ entry.as_mut().poll_elapsed(cx)
+ }))
+ .unwrap();
+ });
+
+ thread::yield_now();
+
+        // Advance the driver well past the timer's deadline.
+ handle.process_at_time(time_source.now() + 2_000_000_000);
+
+ jh.join().unwrap();
+ })
+}
+
+#[test]
+fn reset_future() {
+ model(|| {
+ let finished_early = Arc::new(AtomicBool::new(false));
+
+ let clock = crate::time::clock::Clock::new(true, false);
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let handle_ = handle.clone();
+ let finished_early_ = finished_early.clone();
+ let start = clock.now();
+
+ let jh = thread::spawn(move || {
+ let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1));
+ pin!(entry);
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
+
+ entry.as_mut().reset(start + Duration::from_secs(2));
+
+ // shouldn't complete before 2s
+ block_on(futures::future::poll_fn(|cx| {
+ entry.as_mut().poll_elapsed(cx)
+ }))
+ .unwrap();
+
+ finished_early_.store(true, Ordering::Relaxed);
+ });
+
+ thread::yield_now();
+
+ // This may or may not return a wakeup time.
+ handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500)));
+
+ assert!(!finished_early.load(Ordering::Relaxed));
+
+ handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500)));
+
+ jh.join().unwrap();
+
+ assert!(finished_early.load(Ordering::Relaxed));
+ })
+}
+
+#[cfg(not(loom))]
+fn normal_or_miri<T>(normal: T, miri: T) -> T {
+ if cfg!(miri) {
+ miri
+ } else {
+ normal
+ }
+}
+
+#[test]
+#[cfg(not(loom))]
+fn poll_process_levels() {
+ let clock = crate::time::clock::Clock::new(true, false);
+ clock.pause();
+
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source, MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let mut entries = vec![];
+
+ for i in 0..normal_or_miri(1024, 64) {
+ let mut entry = Box::pin(TimerEntry::new(
+ &handle,
+ clock.now() + Duration::from_millis(i),
+ ));
+
+ let _ = entry
+ .as_mut()
+ .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));
+
+ entries.push(entry);
+ }
+
+ for t in 1..normal_or_miri(1024, 64) {
+ handle.process_at_time(t as u64);
+ for (deadline, future) in entries.iter_mut().enumerate() {
+ let mut context = Context::from_waker(noop_waker_ref());
+ if deadline <= t {
+ assert!(future.as_mut().poll_elapsed(&mut context).is_ready());
+ } else {
+ assert!(future.as_mut().poll_elapsed(&mut context).is_pending());
+ }
+ }
+ }
+}
+
+#[test]
+#[cfg(not(loom))]
+fn poll_process_levels_targeted() {
+ let mut context = Context::from_waker(noop_waker_ref());
+
+ let clock = crate::time::clock::Clock::new(true, false);
+ clock.pause();
+
+ let time_source = super::ClockTime::new(clock.clone());
+
+ let inner = super::Inner::new(time_source, MockUnpark::mock());
+ let handle = Handle::new(Arc::new(inner));
+
+ let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193));
+ pin!(e1);
+
+ handle.process_at_time(62);
+ assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
+ handle.process_at_time(192);
+ handle.process_at_time(192);
+}
+
+/*
+#[test]
+fn balanced_incr_and_decr() {
+ const OPS: usize = 5;
+
+ fn incr(inner: Arc<Inner>) {
+ for _ in 0..OPS {
+ inner.increment().expect("increment should not have failed");
+ thread::yield_now();
+ }
+ }
+
+ fn decr(inner: Arc<Inner>) {
+ let mut ops_performed = 0;
+ while ops_performed < OPS {
+ if inner.num(Ordering::Relaxed) > 0 {
+ ops_performed += 1;
+ inner.decrement();
+ }
+ thread::yield_now();
+ }
+ }
+
+ loom::model(|| {
+ let unpark = Box::new(MockUnpark);
+ let instant = Instant::now();
+
+ let inner = Arc::new(Inner::new(instant, unpark));
+
+ let incr_inner = inner.clone();
+ let decr_inner = inner.clone();
+
+ let incr_hndle = thread::spawn(move || incr(incr_inner));
+ let decr_hndle = thread::spawn(move || decr(decr_inner));
+
+ incr_hndle.join().expect("should never fail");
+ decr_hndle.join().expect("should never fail");
+
+ assert_eq!(inner.num(Ordering::SeqCst), 0);
+ })
+}
+*/
diff --git a/third_party/rust/tokio/src/time/driver/wheel/level.rs b/third_party/rust/tokio/src/time/driver/wheel/level.rs
new file mode 100644
index 0000000000..878754177b
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/level.rs
@@ -0,0 +1,276 @@
+use crate::time::driver::{EntryList, TimerHandle, TimerShared};
+
+use std::{fmt, ptr::NonNull};
+
+/// Wheel for a single level in the timer. This wheel contains 64 slots.
+pub(crate) struct Level {
+ level: usize,
+
+ /// Bit field tracking which slots currently contain entries.
+ ///
+ /// Using a bit field to track slots that contain entries allows avoiding a
+ /// scan to find entries. This field is updated when entries are added or
+ /// removed from a slot.
+ ///
+ /// The least-significant bit represents slot zero.
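+    ///
+    /// For example, `occupied == 0b101` means that slots 0 and 2 currently
+    /// contain entries.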
+ occupied: u64,
+
+    /// Slots: one intrusive list per slot, holding the entries scheduled for
+    /// that slot at this level.
+ slot: [EntryList; LEVEL_MULT],
+}
+
+/// Indicates when a slot must be processed next.
+#[derive(Debug)]
+pub(crate) struct Expiration {
+ /// The level containing the slot.
+ pub(crate) level: usize,
+
+ /// The slot index.
+ pub(crate) slot: usize,
+
+ /// The instant at which the slot needs to be processed.
+ pub(crate) deadline: u64,
+}
+
+/// Level multiplier.
+///
+/// Being a power of 2 is very important: the slot and level calculations rely
+/// on six-bit shifts and masks rather than division.
+const LEVEL_MULT: usize = 64;
+
+impl Level {
+ pub(crate) fn new(level: usize) -> Level {
+        // A value has to be Copy in order to use array-repeat syntax like:
+        //     let list = EntryList::default();
+        //     ...
+        //     slot: [list; 64],
+        //
+        // Alternatively, since EntryList is Default, one could write:
+        //     let slot: [EntryList; 64] = Default::default();
+        //
+        // However, that is only supported for arrays of size
+        // 32 or fewer. So in our case we have to explicitly
+        // invoke the constructor for each array element.
+ let ctor = EntryList::default;
+
+ Level {
+ level,
+ occupied: 0,
+ slot: [
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ],
+ }
+ }
+
+    /// Finds the slot that needs to be processed next and returns an
+    /// [`Expiration`] describing that slot and the tick at which it must be
+    /// processed.
+ pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
+ // Use the `occupied` bit field to get the index of the next slot that
+ // needs to be processed.
+ let slot = match self.next_occupied_slot(now) {
+ Some(slot) => slot,
+ None => return None,
+ };
+
+        // From the slot index, calculate the tick at which the slot needs to
+        // be processed. This value *must* be at or after `now`.
+
+ let level_range = level_range(self.level);
+ let slot_range = slot_range(self.level);
+
+ // Compute the start date of the current level by masking the low bits
+ // of `now` (`level_range` is a power of 2).
+ let level_start = now & !(level_range - 1);
+ let mut deadline = level_start + slot as u64 * slot_range;
+
+ if deadline <= now {
+ // A timer is in a slot "prior" to the current time. This can occur
+ // because we do not have an infinite hierarchy of timer levels, and
+ // eventually a timer scheduled for a very distant time might end up
+ // being placed in a slot that is beyond the end of all of the
+ // arrays.
+ //
+ // To deal with this, we first limit timers to being scheduled no
+ // more than MAX_DURATION ticks in the future; that is, they're at
+ // most one rotation of the top level away. Then, we force timers
+ // that logically would go into the top+1 level, to instead go into
+ // the top level's slots.
+ //
+ // What this means is that the top level's slots act as a
+ // pseudo-ring buffer, and we rotate around them indefinitely. If we
+ // compute a deadline before now, and it's the top level, it
+ // therefore means we're actually looking at a slot in the future.
+ debug_assert_eq!(self.level, super::NUM_LEVELS - 1);
+
+ deadline += level_range;
+ }
+
+ debug_assert!(
+ deadline >= now,
+ "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}",
+ deadline,
+ now,
+ self.level,
+ level_range,
+ slot_range,
+ slot,
+ self.occupied
+ );
+
+ Some(Expiration {
+ level: self.level,
+ slot,
+ deadline,
+ })
+ }
+
+ fn next_occupied_slot(&self, now: u64) -> Option<usize> {
+ if self.occupied == 0 {
+ return None;
+ }
+
+        // Compute the slot that `now` falls into at this level.
+        let now_slot = (now / slot_range(self.level)) as usize;
+        // Rotate the bit field so that `now_slot` becomes bit zero; the
+        // number of trailing zeros is then the ring distance from `now_slot`
+        // to the next occupied slot, wrapping around the level.
+        let occupied = self.occupied.rotate_right(now_slot as u32);
+        let zeros = occupied.trailing_zeros() as usize;
+        let slot = (zeros + now_slot) % 64;
+
+ Some(slot)
+ }
+
+ pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
+ let slot = slot_for(item.cached_when(), self.level);
+
+ self.slot[slot].push_front(item);
+
+ self.occupied |= occupied_bit(slot);
+ }
+
+ pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) {
+ let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level);
+
+ unsafe { self.slot[slot].remove(item) };
+ if self.slot[slot].is_empty() {
+ // The bit is currently set
+ debug_assert!(self.occupied & occupied_bit(slot) != 0);
+
+ // Unset the bit
+ self.occupied ^= occupied_bit(slot);
+ }
+ }
+
+ pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList {
+ self.occupied &= !occupied_bit(slot);
+
+ std::mem::take(&mut self.slot[slot])
+ }
+}
+
+impl fmt::Debug for Level {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Level")
+ .field("occupied", &self.occupied)
+ .finish()
+ }
+}
+
+fn occupied_bit(slot: usize) -> u64 {
+ 1 << slot
+}
+
+fn slot_range(level: usize) -> u64 {
+ LEVEL_MULT.pow(level as u32) as u64
+}
+
+fn level_range(level: usize) -> u64 {
+ LEVEL_MULT as u64 * slot_range(level)
+}
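+
+// Worked example (editor's note): `slot_range(0)` is 1 tick and
+// `level_range(0)` is 64 ticks; `slot_range(1)` is 64 ticks and
+// `level_range(1)` is 4096 ticks. Each additional level multiplies both by
+// LEVEL_MULT (64).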
+
+/// Converts a tick count (`duration`, in milliseconds) and a level to a slot
+/// position.
+fn slot_for(duration: u64, level: usize) -> usize {
+ ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_slot_for() {
+ for pos in 0..64 {
+ assert_eq!(pos as usize, slot_for(pos, 0));
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(pos as usize, slot_for(a as u64, level));
+ }
+ }
+ }
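+
+    // Editor's added sketches (not from upstream), illustrating the slot
+    // arithmetic and the rotate trick in `next_occupied_slot`.
+    #[test]
+    fn test_slot_for_wrap() {
+        // Level 0 slots advance one per tick and wrap at 64.
+        assert_eq!(2, slot_for(64 + 2, 0));
+        // Each level-1 slot covers 64 ticks.
+        assert_eq!(2, slot_for(2 * 64, 1));
+    }
+
+    #[test]
+    fn test_next_occupied_slot_wraps() {
+        let mut level = Level::new(0);
+        level.occupied = occupied_bit(2); // only slot 2 is occupied
+        // Scanning from a slot past 2 wraps around the ring back to slot 2.
+        assert_eq!(Some(2), level.next_occupied_slot(40));
+    }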
+}
diff --git a/third_party/rust/tokio/src/time/driver/wheel/mod.rs b/third_party/rust/tokio/src/time/driver/wheel/mod.rs
new file mode 100644
index 0000000000..f088f2cfd6
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/mod.rs
@@ -0,0 +1,359 @@
+use crate::time::driver::{TimerHandle, TimerShared};
+use crate::time::error::InsertError;
+
+mod level;
+pub(crate) use self::level::Expiration;
+use self::level::Level;
+
+use std::ptr::NonNull;
+
+use super::EntryList;
+
+/// Timing wheel implementation.
+///
+/// This type provides the hashed timing wheel implementation that backs `Timer`
+/// and `DelayQueue`.
+///
+/// Entries are stored intrusively in per-slot [`EntryList`]s, so the wheel
+/// performs no per-timer allocation of its own; the caller must keep each
+/// entry pinned and alive while it is registered (see the safety notes on
+/// [`Wheel::insert`]).
+///
+/// See `Timer` documentation for some implementation notes.
+#[derive(Debug)]
+pub(crate) struct Wheel {
+ /// The number of milliseconds elapsed since the wheel started.
+ elapsed: u64,
+
+ /// Timer wheel.
+ ///
+ /// Levels:
+ ///
+ /// * 1 ms slots / 64 ms range
+ /// * 64 ms slots / ~ 4 sec range
+ /// * ~ 4 sec slots / ~ 4 min range
+ /// * ~ 4 min slots / ~ 4 hr range
+ /// * ~ 4 hr slots / ~ 12 day range
+ /// * ~ 12 day slots / ~ 2 yr range
+ levels: Vec<Level>,
+
+ /// Entries queued for firing
+ pending: EntryList,
+}
+
+/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
+/// each, the timer is able to track time up to 2 years into the future with a
+/// precision of 1 millisecond.
+const NUM_LEVELS: usize = 6;
+
+/// The maximum duration of a `Sleep`.
+pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
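+
+// Editor's note on the arithmetic: 6 levels * 6 bits = 36 bits, so
+// MAX_DURATION = 2^36 - 1 ticks. At one millisecond per tick that is about
+// 68,719,476 seconds, i.e. roughly 795 days, or a little over two years.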
+
+impl Wheel {
+ /// Creates a new timing wheel.
+ pub(crate) fn new() -> Wheel {
+ let levels = (0..NUM_LEVELS).map(Level::new).collect();
+
+ Wheel {
+ elapsed: 0,
+ levels,
+ pending: EntryList::new(),
+ }
+ }
+
+ /// Returns the number of milliseconds that have elapsed since the timing
+ /// wheel's creation.
+ pub(crate) fn elapsed(&self) -> u64 {
+ self.elapsed
+ }
+
+ /// Inserts an entry into the timing wheel.
+ ///
+ /// # Arguments
+ ///
+ /// * `item`: The item to insert into the wheel.
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
+ ///
+ /// `Err(Elapsed)` indicates that `when` represents an instant that has
+ /// already passed. In this case, the caller should fire the timeout
+ /// immediately.
+ ///
+    /// `Err(Invalid)` indicates an invalid `when` argument has been supplied.
+ ///
+ /// # Safety
+ ///
+    /// This function registers `item` into an intrusive linked list. The caller
+ /// must ensure that `item` is pinned and will not be dropped without first
+ /// being deregistered.
+ pub(crate) unsafe fn insert(
+ &mut self,
+ item: TimerHandle,
+ ) -> Result<u64, (TimerHandle, InsertError)> {
+ let when = item.sync_when();
+
+ if when <= self.elapsed {
+ return Err((item, InsertError::Elapsed));
+ }
+
+ // Get the level at which the entry should be stored
+ let level = self.level_for(when);
+
+ unsafe {
+ self.levels[level].add_entry(item);
+ }
+
+ debug_assert!({
+ self.levels[level]
+ .next_expiration(self.elapsed)
+ .map(|e| e.deadline >= self.elapsed)
+ .unwrap_or(true)
+ });
+
+ Ok(when)
+ }
+
+ /// Removes `item` from the timing wheel.
+ pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
+ unsafe {
+ let when = item.as_ref().cached_when();
+ if when == u64::MAX {
+ self.pending.remove(item);
+ } else {
+ debug_assert!(
+ self.elapsed <= when,
+ "elapsed={}; when={}",
+ self.elapsed,
+ when
+ );
+
+ let level = self.level_for(when);
+
+ self.levels[level].remove_entry(item);
+ }
+ }
+ }
+
+ /// Instant at which to poll.
+ pub(crate) fn poll_at(&self) -> Option<u64> {
+ self.next_expiration().map(|expiration| expiration.deadline)
+ }
+
+ /// Advances the timer up to the instant represented by `now`.
+ pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
+ loop {
+ if let Some(handle) = self.pending.pop_back() {
+ return Some(handle);
+ }
+
+            // `expiration` is `Some` only when a slot's deadline has already
+            // been reached (i.e. is at or before `now`); otherwise it is
+            // filtered to `None` below.
+ let expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
+
+ match expiration {
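+                // Defensive: the `and_then` filter above already mapped
+                // deadlines after `now` to `None`, so this arm should be
+                // unreachable.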
+ Some(ref expiration) if expiration.deadline > now => return None,
+ Some(ref expiration) => {
+ self.process_expiration(expiration);
+
+ self.set_elapsed(expiration.deadline);
+ }
+ None => {
+ // in this case the poll did not indicate an expiration
+ // _and_ we were not able to find a next expiration in
+ // the current list of timers. advance to the poll's
+ // current time and do nothing else.
+ self.set_elapsed(now);
+ break;
+ }
+ }
+ }
+
+ self.pending.pop_back()
+ }
+
+ /// Returns the instant at which the next timeout expires.
+ fn next_expiration(&self) -> Option<Expiration> {
+ if !self.pending.is_empty() {
+            // Expire immediately, since entries are already pending firing.
+ return Some(Expiration {
+ level: 0,
+ slot: 0,
+ deadline: self.elapsed,
+ });
+ }
+
+ // Check all levels
+ for level in 0..NUM_LEVELS {
+ if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
+ // There cannot be any expirations at a higher level that happen
+ // before this one.
+ debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
+
+ return Some(expiration);
+ }
+ }
+
+ None
+ }
+
+ /// Returns the tick at which this timer wheel next needs to perform some
+ /// processing, or None if there are no timers registered.
+ pub(super) fn next_expiration_time(&self) -> Option<u64> {
+ self.next_expiration().map(|ex| ex.deadline)
+ }
+
+ /// Used for debug assertions
+ fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
+ let mut res = true;
+
+ for l2 in start_level..NUM_LEVELS {
+ if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
+ if e2.deadline < before {
+ res = false;
+ }
+ }
+ }
+
+ res
+ }
+
+    /// Iteratively find entries that are between the wheel's current
+    /// time and the expiration time. For each entry in that population,
+    /// either queue it for notification (in the case of the last level)
+    /// or tier it down to the next level (in all other cases).
+ pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
+ // Note that we need to take _all_ of the entries off the list before
+ // processing any of them. This is important because it's possible that
+ // those entries might need to be reinserted into the same slot.
+ //
+ // This happens only on the highest level, when an entry is inserted
+ // more than MAX_DURATION into the future. When this happens, we wrap
+ // around, and process some entries a multiple of MAX_DURATION before
+ // they actually need to be dropped down a level. We then reinsert them
+ // back into the same position; we must make sure we don't then process
+ // those entries again or we'll end up in an infinite loop.
+ let mut entries = self.take_entries(expiration);
+
+ while let Some(item) = entries.pop_back() {
+ if expiration.level == 0 {
+ debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline);
+ }
+
+ // Try to expire the entry; this is cheap (doesn't synchronize) if
+ // the timer is not expired, and updates cached_when.
+ match unsafe { item.mark_pending(expiration.deadline) } {
+ Ok(()) => {
+ // Item was expired
+ self.pending.push_front(item);
+ }
+ Err(expiration_tick) => {
+ let level = level_for(expiration.deadline, expiration_tick);
+ unsafe {
+ self.levels[level].add_entry(item);
+ }
+ }
+ }
+ }
+ }
+
+ fn set_elapsed(&mut self, when: u64) {
+ assert!(
+ self.elapsed <= when,
+ "elapsed={:?}; when={:?}",
+ self.elapsed,
+ when
+ );
+
+ if when > self.elapsed {
+ self.elapsed = when;
+ }
+ }
+
+    /// Obtains the list of entries that need processing for the given expiration.
+ fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
+ self.levels[expiration.level].take_slot(expiration.slot)
+ }
+
+ fn level_for(&self, when: u64) -> usize {
+ level_for(self.elapsed, when)
+ }
+}
+
+fn level_for(elapsed: u64, when: u64) -> usize {
+ const SLOT_MASK: u64 = (1 << 6) - 1;
+
+    // The level is determined by the most significant bit in which `elapsed`
+    // and `when` differ. Mask in the trailing bits ignored by the level
+    // calculation so that `masked` is never zero, capping the possible
+    // leading zeros.
+    let mut masked = elapsed ^ when | SLOT_MASK;
+
+ if masked >= MAX_DURATION {
+ // Fudge the timer into the top level
+ masked = MAX_DURATION - 1;
+ }
+
+ let leading_zeros = masked.leading_zeros() as usize;
+ let significant = 63 - leading_zeros;
+
+ significant / 6
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_level_for() {
+ for pos in 0..64 {
+ assert_eq!(
+ 0,
+ level_for(0, pos),
+ "level_for({}) -- binary = {:b}",
+ pos,
+ pos
+ );
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+
+ if pos > level {
+ let a = a - 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+
+ if pos < 64 {
+ let a = a + 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+ }
+ }
+ }
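+
+    // Editor's added sketches (not from upstream): boundary arithmetic for
+    // `level_for`, and the behavior of an empty wheel.
+    #[test]
+    fn test_level_for_boundaries() {
+        assert_eq!(0, level_for(0, 63)); // masked = 63, highest bit 5
+        assert_eq!(1, level_for(0, 64)); // masked = 127, highest bit 6
+        assert_eq!(1, level_for(0, 4095));
+        assert_eq!(2, level_for(0, 4096));
+    }
+
+    #[test]
+    fn test_empty_wheel() {
+        let mut wheel = Wheel::new();
+        assert_eq!(None, wheel.poll_at());
+        // Polling an empty wheel fires nothing and just advances `elapsed`.
+        assert!(wheel.poll(100).is_none());
+        assert_eq!(100, wheel.elapsed());
+    }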
+}
diff --git a/third_party/rust/tokio/src/time/driver/wheel/stack.rs b/third_party/rust/tokio/src/time/driver/wheel/stack.rs
new file mode 100644
index 0000000000..80651c309e
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/stack.rs
@@ -0,0 +1,112 @@
+use super::{Item, OwnedItem};
+use crate::time::driver::Entry;
+
+use std::ptr;
+
+/// A doubly linked stack: pushes and pops happen at the head, and the back
+/// links allow an arbitrary entry to be unlinked in O(1).
+#[derive(Debug)]
+pub(crate) struct Stack {
+ head: Option<OwnedItem>,
+}
+
+impl Default for Stack {
+ fn default() -> Stack {
+ Stack { head: None }
+ }
+}
+
+impl Stack {
+ pub(crate) fn is_empty(&self) -> bool {
+ self.head.is_none()
+ }
+
+ pub(crate) fn push(&mut self, entry: OwnedItem) {
+        // Get a raw pointer to the entry, for use as the `prev` link
+ let ptr: *const Entry = &*entry as *const _;
+
+ // Remove the old head entry
+ let old = self.head.take();
+
+ unsafe {
+ // Ensure the entry is not already in a stack.
+ debug_assert!((*entry.next_stack.get()).is_none());
+ debug_assert!((*entry.prev_stack.get()).is_null());
+
+ if let Some(ref entry) = old.as_ref() {
+ debug_assert!({
+ // The head is not already set to the entry
+ ptr != &***entry as *const _
+ });
+
+ // Set the previous link on the old head
+ *entry.prev_stack.get() = ptr;
+ }
+
+ // Set this entry's next pointer
+ *entry.next_stack.get() = old;
+ }
+
+ // Update the head pointer
+ self.head = Some(entry);
+ }
+
+ /// Pops an item from the stack.
+ pub(crate) fn pop(&mut self) -> Option<OwnedItem> {
+ let entry = self.head.take();
+
+ unsafe {
+ if let Some(entry) = entry.as_ref() {
+ self.head = (*entry.next_stack.get()).take();
+
+ if let Some(entry) = self.head.as_ref() {
+ *entry.prev_stack.get() = ptr::null();
+ }
+
+ *entry.prev_stack.get() = ptr::null();
+ }
+ }
+
+ entry
+ }
+
+ pub(crate) fn remove(&mut self, entry: &Item) {
+ unsafe {
+ // Ensure that the entry is in fact contained by the stack
+ debug_assert!({
+ // This walks the full linked list even if an entry is found.
+ let mut next = self.head.as_ref();
+ let mut contains = false;
+
+ while let Some(n) = next {
+ if entry as *const _ == &**n as *const _ {
+ debug_assert!(!contains);
+ contains = true;
+ }
+
+ next = (*n.next_stack.get()).as_ref();
+ }
+
+ contains
+ });
+
+ // Unlink `entry` from the next node
+ let next = (*entry.next_stack.get()).take();
+
+ if let Some(next) = next.as_ref() {
+ (*next.prev_stack.get()) = *entry.prev_stack.get();
+ }
+
+ // Unlink `entry` from the prev node
+
+ if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
+ *prev.next_stack.get() = next;
+ } else {
+ // It is the head
+ self.head = next;
+ }
+
+ // Unset the prev pointer
+ *entry.prev_stack.get() = ptr::null();
+ }
+ }
+}