diff options
Diffstat (limited to 'third_party/rust/tokio/src/time')
-rw-r--r-- | third_party/rust/tokio/src/time/clock.rs | 233 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/entry.rs | 633 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/handle.rs | 94 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/mod.rs | 528 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/sleep.rs | 438 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/tests/mod.rs | 301 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/level.rs | 276 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/mod.rs | 359 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/stack.rs | 112 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/error.rs | 120 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/instant.rs | 223 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/interval.rs | 531 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/mod.rs | 114 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/tests/mod.rs | 22 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/tests/test_sleep.rs | 443 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/timeout.rs | 202 |
16 files changed, 4629 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/clock.rs b/third_party/rust/tokio/src/time/clock.rs new file mode 100644 index 0000000000..41be9bac48 --- /dev/null +++ b/third_party/rust/tokio/src/time/clock.rs @@ -0,0 +1,233 @@ +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +//! Source of time abstraction. +//! +//! By default, `std::time::Instant::now()` is used. However, when the +//! `test-util` feature flag is enabled, the values returned for `now()` are +//! configurable. + +cfg_not_test_util! { + use crate::time::{Instant}; + + #[derive(Debug, Clone)] + pub(crate) struct Clock {} + + pub(crate) fn now() -> Instant { + Instant::from_std(std::time::Instant::now()) + } + + impl Clock { + pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock { + Clock {} + } + + pub(crate) fn now(&self) -> Instant { + now() + } + } +} + +cfg_test_util! { + use crate::time::{Duration, Instant}; + use crate::loom::sync::{Arc, Mutex}; + + cfg_rt! { + fn clock() -> Option<Clock> { + crate::runtime::context::clock() + } + } + + cfg_not_rt! { + fn clock() -> Option<Clock> { + None + } + } + + /// A handle to a source of time. + #[derive(Debug, Clone)] + pub(crate) struct Clock { + inner: Arc<Mutex<Inner>>, + } + + #[derive(Debug)] + struct Inner { + /// True if the ability to pause time is enabled. + enable_pausing: bool, + + /// Instant to use as the clock's base instant. + base: std::time::Instant, + + /// Instant at which the clock was last unfrozen. + unfrozen: Option<std::time::Instant>, + } + + /// Pauses time. + /// + /// The current value of `Instant::now()` is saved and all subsequent calls + /// to `Instant::now()` will return the saved value. The saved value can be + /// changed by [`advance`] or by the time auto-advancing once the runtime + /// has no work to do. This only affects the `Instant` type in Tokio, and + /// the `Instant` in std continues to work as normal. + /// + /// Pausing time requires the `current_thread` Tokio runtime. This is the + /// default runtime used by `#[tokio::test]`. The runtime can be initialized + /// with time in a paused state using the `Builder::start_paused` method. + /// + /// For cases where time is immediately paused, it is better to pause + /// the time using the `main` or `test` macro: + /// ``` + /// #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// println!("Hello world"); + /// } + /// ``` + /// + /// # Panics + /// + /// Panics if time is already frozen or if called from outside of a + /// `current_thread` Tokio runtime. + /// + /// # Auto-advance + /// + /// If time is paused and the runtime has no work to do, the clock is + /// auto-advanced to the next pending timer. This means that [`Sleep`] or + /// other timer-backed primitives can cause the runtime to advance the + /// current time when awaited. + /// + /// [`Sleep`]: crate::time::Sleep + /// [`advance`]: crate::time::advance + pub fn pause() { + let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + clock.pause(); + } + + /// Resumes time. + /// + /// Clears the saved `Instant::now()` value. Subsequent calls to + /// `Instant::now()` will return the value returned by the system call. + /// + /// # Panics + /// + /// Panics if time is not frozen or if called from outside of the Tokio + /// runtime. + pub fn resume() { + let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + let mut inner = clock.inner.lock(); + + if inner.unfrozen.is_some() { + panic!("time is not frozen"); + } + + inner.unfrozen = Some(std::time::Instant::now()); + } + + /// Advances time. + /// + /// Increments the saved `Instant::now()` value by `duration`. Subsequent + /// calls to `Instant::now()` will return the result of the increment. + /// + /// This function will make the current time jump forward by the given + /// duration in one jump. This means that all `sleep` calls with a deadline + /// before the new time will immediately complete "at the same time", and + /// the runtime is free to poll them in any order. Additionally, this + /// method will not wait for the `sleep` calls it advanced past to complete. + /// If you want to do that, you should instead call [`sleep`] and rely on + /// the runtime's auto-advance feature. + /// + /// Note that calls to `sleep` are not guaranteed to complete the first time + /// they are polled after a call to `advance`. For example, this can happen + /// if the runtime has not yet touched the timer driver after the call to + /// `advance`. However if they don't, the runtime will poll the task again + /// shortly. + /// + /// # Panics + /// + /// Panics if time is not frozen or if called from outside of the Tokio + /// runtime. + /// + /// # Auto-advance + /// + /// If the time is paused and there is no work to do, the runtime advances + /// time to the next timer. See [`pause`](pause#auto-advance) for more + /// details. + /// + /// [`sleep`]: fn@crate::time::sleep + pub async fn advance(duration: Duration) { + let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + clock.advance(duration); + + crate::task::yield_now().await; + } + + /// Returns the current instant, factoring in frozen time. + pub(crate) fn now() -> Instant { + if let Some(clock) = clock() { + clock.now() + } else { + Instant::from_std(std::time::Instant::now()) + } + } + + impl Clock { + /// Returns a new `Clock` instance that uses the current execution context's + /// source of time. + pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock { + let now = std::time::Instant::now(); + + let clock = Clock { + inner: Arc::new(Mutex::new(Inner { + enable_pausing, + base: now, + unfrozen: Some(now), + })), + }; + + if start_paused { + clock.pause(); + } + + clock + } + + pub(crate) fn pause(&self) { + let mut inner = self.inner.lock(); + + if !inner.enable_pausing { + drop(inner); // avoid poisoning the lock + panic!("`time::pause()` requires the `current_thread` Tokio runtime. \ + This is the default Runtime used by `#[tokio::test]."); + } + + let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed(); + inner.base += elapsed; + inner.unfrozen = None; + } + + pub(crate) fn is_paused(&self) -> bool { + let inner = self.inner.lock(); + inner.unfrozen.is_none() + } + + pub(crate) fn advance(&self, duration: Duration) { + let mut inner = self.inner.lock(); + + if inner.unfrozen.is_some() { + panic!("time is not frozen"); + } + + inner.base += duration; + } + + pub(crate) fn now(&self) -> Instant { + let inner = self.inner.lock(); + + let mut ret = inner.base; + + if let Some(unfrozen) = inner.unfrozen { + ret += unfrozen.elapsed(); + } + + Instant::from_std(ret) + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/entry.rs b/third_party/rust/tokio/src/time/driver/entry.rs new file mode 100644 index 0000000000..f0ea898e12 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/entry.rs @@ -0,0 +1,633 @@ +//! Timer state structures. +//! +//! This module contains the heart of the intrusive timer implementation, and as +//! such the structures inside are full of tricky concurrency and unsafe code. +//! +//! # Ground rules +//! +//! The heart of the timer implementation here is the [`TimerShared`] structure, +//! shared between the [`TimerEntry`] and the driver. Generally, we permit access +//! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or +//! 2) a held driver lock. +//! +//! It follows from this that any changes made while holding BOTH 1 and 2 will +//! be reliably visible, regardless of ordering. This is because of the acq/rel +//! fences on the driver lock ensuring ordering with 2, and rust mutable +//! reference rules for 1 (a mutable reference to an object can't be passed +//! between threads without an acq/rel barrier, and same-thread we have local +//! happens-before ordering). +//! +//! # State field +//! +//! Each timer has a state field associated with it. This field contains either +//! the current scheduled time, or a special flag value indicating its state. +//! This state can either indicate that the timer is on the 'pending' queue (and +//! thus will be fired with an `Ok(())` result soon) or that it has already been +//! fired/deregistered. +//! +//! This single state field allows for code that is firing the timer to +//! synchronize with any racing `reset` calls reliably. +//! +//! # Cached vs true timeouts +//! +//! To allow for the use case of a timeout that is periodically reset before +//! expiration to be as lightweight as possible, we support optimistically +//! lock-free timer resets, in the case where a timer is rescheduled to a later +//! point than it was originally scheduled for. +//! +//! This is accomplished by lazily rescheduling timers. That is, we update the +//! state field field with the true expiration of the timer from the holder of +//! the [`TimerEntry`]. When the driver services timers (ie, whenever it's +//! walking lists of timers), it checks this "true when" value, and reschedules +//! based on it. +//! +//! We do, however, also need to track what the expiration time was when we +//! originally registered the timer; this is used to locate the right linked +//! list when the timer is being cancelled. This is referred to as the "cached +//! when" internally. +//! +//! There is of course a race condition between timer reset and timer +//! expiration. If the driver fails to observe the updated expiration time, it +//! could trigger expiration of the timer too early. However, because +//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and +//! refuse to mark the timer as pending. +//! +//! [mark_pending]: TimerHandle::mark_pending + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::AtomicU64; +use crate::loom::sync::atomic::Ordering; + +use crate::sync::AtomicWaker; +use crate::time::Instant; +use crate::util::linked_list; + +use super::Handle; + +use std::cell::UnsafeCell as StdUnsafeCell; +use std::task::{Context, Poll, Waker}; +use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; + +type TimerResult = Result<(), crate::time::error::Error>; + +const STATE_DEREGISTERED: u64 = u64::MAX; +const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; +const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; + +/// This structure holds the current shared state of the timer - its scheduled +/// time (if registered), or otherwise the result of the timer completing, as +/// well as the registered waker. +/// +/// Generally, the StateCell is only permitted to be accessed from two contexts: +/// Either a thread holding the corresponding &mut TimerEntry, or a thread +/// holding the timer driver lock. The write actions on the StateCell amount to +/// passing "ownership" of the StateCell between these contexts; moving a timer +/// from the TimerEntry to the driver requires _both_ holding the &mut +/// TimerEntry and the driver lock, while moving it back (firing the timer) +/// requires only the driver lock. +pub(super) struct StateCell { + /// Holds either the scheduled expiration time for this timer, or (if the + /// timer has been fired and is unregistered), `u64::MAX`. + state: AtomicU64, + /// If the timer is fired (an Acquire order read on state shows + /// `u64::MAX`), holds the result that should be returned from + /// polling the timer. Otherwise, the contents are unspecified and reading + /// without holding the driver lock is undefined behavior. + result: UnsafeCell<TimerResult>, + /// The currently-registered waker + waker: CachePadded<AtomicWaker>, +} + +impl Default for StateCell { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for StateCell { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "StateCell({:?})", self.read_state()) + } +} + +impl StateCell { + fn new() -> Self { + Self { + state: AtomicU64::new(STATE_DEREGISTERED), + result: UnsafeCell::new(Ok(())), + waker: CachePadded(AtomicWaker::new()), + } + } + + fn is_pending(&self) -> bool { + self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE + } + + /// Returns the current expiration time, or None if not currently scheduled. + fn when(&self) -> Option<u64> { + let cur_state = self.state.load(Ordering::Relaxed); + + if cur_state == u64::MAX { + None + } else { + Some(cur_state) + } + } + + /// If the timer is completed, returns the result of the timer. Otherwise, + /// returns None and registers the waker. + fn poll(&self, waker: &Waker) -> Poll<TimerResult> { + // We must register first. This ensures that either `fire` will + // observe the new waker, or we will observe a racing fire to have set + // the state, or both. + self.waker.0.register_by_ref(waker); + + self.read_state() + } + + fn read_state(&self) -> Poll<TimerResult> { + let cur_state = self.state.load(Ordering::Acquire); + + if cur_state == STATE_DEREGISTERED { + // SAFETY: The driver has fired this timer; this involves writing + // the result, and then writing (with release ordering) the state + // field. + Poll::Ready(unsafe { self.result.with(|p| *p) }) + } else { + Poll::Pending + } + } + + /// Marks this timer as being moved to the pending list, if its scheduled + /// time is not after `not_after`. + /// + /// If the timer is scheduled for a time after not_after, returns an Err + /// containing the current scheduled time. + /// + /// SAFETY: Must hold the driver lock. + unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { + // Quick initial debug check to see if the timer is already fired. Since + // firing the timer can only happen with the driver lock held, we know + // we shouldn't be able to "miss" a transition to a fired state, even + // with relaxed ordering. + let mut cur_state = self.state.load(Ordering::Relaxed); + + loop { + debug_assert!(cur_state < STATE_MIN_VALUE); + + if cur_state > not_after { + break Err(cur_state); + } + + match self.state.compare_exchange( + cur_state, + STATE_PENDING_FIRE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + break Ok(()); + } + Err(actual_state) => { + cur_state = actual_state; + } + } + } + } + + /// Fires the timer, setting the result to the provided result. + /// + /// Returns: + /// * `Some(waker) - if fired and a waker needs to be invoked once the + /// driver lock is released + /// * `None` - if fired and a waker does not need to be invoked, or if + /// already fired + /// + /// SAFETY: The driver lock must be held. + unsafe fn fire(&self, result: TimerResult) -> Option<Waker> { + // Quick initial check to see if the timer is already fired. Since + // firing the timer can only happen with the driver lock held, we know + // we shouldn't be able to "miss" a transition to a fired state, even + // with relaxed ordering. + let cur_state = self.state.load(Ordering::Relaxed); + if cur_state == STATE_DEREGISTERED { + return None; + } + + // SAFETY: We assume the driver lock is held and the timer is not + // fired, so only the driver is accessing this field. + // + // We perform a release-ordered store to state below, to ensure this + // write is visible before the state update is visible. + unsafe { self.result.with_mut(|p| *p = result) }; + + self.state.store(STATE_DEREGISTERED, Ordering::Release); + + self.waker.0.take_waker() + } + + /// Marks the timer as registered (poll will return None) and sets the + /// expiration time. + /// + /// While this function is memory-safe, it should only be called from a + /// context holding both `&mut TimerEntry` and the driver lock. + fn set_expiration(&self, timestamp: u64) { + debug_assert!(timestamp < STATE_MIN_VALUE); + + // We can use relaxed ordering because we hold the driver lock and will + // fence when we release the lock. + self.state.store(timestamp, Ordering::Relaxed); + } + + /// Attempts to adjust the timer to a new timestamp. + /// + /// If the timer has already been fired, is pending firing, or the new + /// timestamp is earlier than the old timestamp, (or occasionally + /// spuriously) returns Err without changing the timer's state. In this + /// case, the timer must be deregistered and re-registered. + fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> { + let mut prior = self.state.load(Ordering::Relaxed); + loop { + if new_timestamp < prior || prior >= STATE_MIN_VALUE { + return Err(()); + } + + match self.state.compare_exchange_weak( + prior, + new_timestamp, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + return Ok(()); + } + Err(true_prior) => { + prior = true_prior; + } + } + } + } + + /// Returns true if the state of this timer indicates that the timer might + /// be registered with the driver. This check is performed with relaxed + /// ordering, but is conservative - if it returns false, the timer is + /// definitely _not_ registered. + pub(super) fn might_be_registered(&self) -> bool { + self.state.load(Ordering::Relaxed) != u64::MAX + } +} + +/// A timer entry. +/// +/// This is the handle to a timer that is controlled by the requester of the +/// timer. As this participates in intrusive data structures, it must be pinned +/// before polling. +#[derive(Debug)] +pub(super) struct TimerEntry { + /// Arc reference to the driver. We can only free the driver after + /// deregistering everything from their respective timer wheels. + driver: Handle, + /// Shared inner structure; this is part of an intrusive linked list, and + /// therefore other references can exist to it while mutable references to + /// Entry exist. + /// + /// This is manipulated only under the inner mutex. TODO: Can we use loom + /// cells for this? + inner: StdUnsafeCell<TimerShared>, + /// Initial deadline for the timer. This is used to register on the first + /// poll, as we can't register prior to being pinned. + initial_deadline: Option<Instant>, + /// Ensure the type is !Unpin + _m: std::marker::PhantomPinned, +} + +unsafe impl Send for TimerEntry {} +unsafe impl Sync for TimerEntry {} + +/// An TimerHandle is the (non-enforced) "unique" pointer from the driver to the +/// timer entry. Generally, at most one TimerHandle exists for a timer at a time +/// (enforced by the timer state machine). +/// +/// SAFETY: An TimerHandle is essentially a raw pointer, and the usual caveats +/// of pointer safety apply. In particular, TimerHandle does not itself enforce +/// that the timer does still exist; however, normally an TimerHandle is created +/// immediately before registering the timer, and is consumed when firing the +/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce +/// memory safety, all operations are unsafe. +#[derive(Debug)] +pub(crate) struct TimerHandle { + inner: NonNull<TimerShared>, +} + +pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>; + +/// The shared state structure of a timer. This structure is shared between the +/// frontend (`Entry`) and driver backend. +/// +/// Note that this structure is located inside the `TimerEntry` structure. +#[derive(Debug)] +#[repr(C)] // required by `link_list::Link` impl +pub(crate) struct TimerShared { + /// Data manipulated by the driver thread itself, only. + driver_state: CachePadded<TimerSharedPadded>, + + /// Current state. This records whether the timer entry is currently under + /// the ownership of the driver, and if not, its current state (not + /// complete, fired, error, etc). + state: StateCell, + + _p: PhantomPinned, +} + +impl TimerShared { + pub(super) fn new() -> Self { + Self { + state: StateCell::default(), + driver_state: CachePadded(TimerSharedPadded::new()), + _p: PhantomPinned, + } + } + + /// Gets the cached time-of-expiration value. + pub(super) fn cached_when(&self) -> u64 { + // Cached-when is only accessed under the driver lock, so we can use relaxed + self.driver_state.0.cached_when.load(Ordering::Relaxed) + } + + /// Gets the true time-of-expiration value, and copies it into the cached + /// time-of-expiration value. + /// + /// SAFETY: Must be called with the driver lock held, and when this entry is + /// not in any timer wheel lists. + pub(super) unsafe fn sync_when(&self) -> u64 { + let true_when = self.true_when(); + + self.driver_state + .0 + .cached_when + .store(true_when, Ordering::Relaxed); + + true_when + } + + /// Sets the cached time-of-expiration value. + /// + /// SAFETY: Must be called with the driver lock held, and when this entry is + /// not in any timer wheel lists. + unsafe fn set_cached_when(&self, when: u64) { + self.driver_state + .0 + .cached_when + .store(when, Ordering::Relaxed); + } + + /// Returns the true time-of-expiration value, with relaxed memory ordering. + pub(super) fn true_when(&self) -> u64 { + self.state.when().expect("Timer already fired") + } + + /// Sets the true time-of-expiration value, even if it is less than the + /// current expiration or the timer is deregistered. + /// + /// SAFETY: Must only be called with the driver lock held and the entry not + /// in the timer wheel. + pub(super) unsafe fn set_expiration(&self, t: u64) { + self.state.set_expiration(t); + self.driver_state.0.cached_when.store(t, Ordering::Relaxed); + } + + /// Sets the true time-of-expiration only if it is after the current. + pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> { + self.state.extend_expiration(t) + } + + /// Returns a TimerHandle for this timer. + pub(super) fn handle(&self) -> TimerHandle { + TimerHandle { + inner: NonNull::from(self), + } + } + + /// Returns true if the state of this timer indicates that the timer might + /// be registered with the driver. This check is performed with relaxed + /// ordering, but is conservative - if it returns false, the timer is + /// definitely _not_ registered. + pub(super) fn might_be_registered(&self) -> bool { + self.state.might_be_registered() + } +} + +/// Additional shared state between the driver and the timer which is cache +/// padded. This contains the information that the driver thread accesses most +/// frequently to minimize contention. In particular, we move it away from the +/// waker, as the waker is updated on every poll. +#[repr(C)] // required by `link_list::Link` impl +struct TimerSharedPadded { + /// A link within the doubly-linked list of timers on a particular level and + /// slot. Valid only if state is equal to Registered. + /// + /// Only accessed under the entry lock. + pointers: linked_list::Pointers<TimerShared>, + + /// The expiration time for which this entry is currently registered. + /// Generally owned by the driver, but is accessed by the entry when not + /// registered. + cached_when: AtomicU64, + + /// The true expiration time. Set by the timer future, read by the driver. + true_when: AtomicU64, +} + +impl std::fmt::Debug for TimerSharedPadded { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TimerSharedPadded") + .field("when", &self.true_when.load(Ordering::Relaxed)) + .field("cached_when", &self.cached_when.load(Ordering::Relaxed)) + .finish() + } +} + +impl TimerSharedPadded { + fn new() -> Self { + Self { + cached_when: AtomicU64::new(0), + true_when: AtomicU64::new(0), + pointers: linked_list::Pointers::new(), + } + } +} + +unsafe impl Send for TimerShared {} +unsafe impl Sync for TimerShared {} + +unsafe impl linked_list::Link for TimerShared { + type Handle = TimerHandle; + + type Target = TimerShared; + + fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> { + handle.inner + } + + unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle { + TimerHandle { inner: ptr } + } + + unsafe fn pointers( + target: NonNull<Self::Target>, + ) -> NonNull<linked_list::Pointers<Self::Target>> { + target.cast() + } +} + +// ===== impl Entry ===== + +impl TimerEntry { + pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self { + let driver = handle.clone(); + + Self { + driver, + inner: StdUnsafeCell::new(TimerShared::new()), + initial_deadline: Some(deadline), + _m: std::marker::PhantomPinned, + } + } + + fn inner(&self) -> &TimerShared { + unsafe { &*self.inner.get() } + } + + pub(crate) fn is_elapsed(&self) -> bool { + !self.inner().state.might_be_registered() && self.initial_deadline.is_none() + } + + /// Cancels and deregisters the timer. This operation is irreversible. + pub(crate) fn cancel(self: Pin<&mut Self>) { + // We need to perform an acq/rel fence with the driver thread, and the + // simplest way to do so is to grab the driver lock. + // + // Why is this necessary? We're about to release this timer's memory for + // some other non-timer use. However, we've been doing a bunch of + // relaxed (or even non-atomic) writes from the driver thread, and we'll + // be doing more from _this thread_ (as this memory is interpreted as + // something else). + // + // It is critical to ensure that, from the point of view of the driver, + // those future non-timer writes happen-after the timer is fully fired, + // and from the purpose of this thread, the driver's writes all + // happen-before we drop the timer. This in turn requires us to perform + // an acquire-release barrier in _both_ directions between the driver + // and dropping thread. + // + // The lock acquisition in clear_entry serves this purpose. All of the + // driver manipulations happen with the lock held, so we can just take + // the lock and be sure that this drop happens-after everything the + // driver did so far and happens-before everything the driver does in + // the future. While we have the lock held, we also go ahead and + // deregister the entry if necessary. + unsafe { self.driver.clear_entry(NonNull::from(self.inner())) }; + } + + pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) { + unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None; + + let tick = self.driver.time_source().deadline_to_tick(new_time); + + if self.inner().extend_expiration(tick).is_ok() { + return; + } + + unsafe { + self.driver.reregister(tick, self.inner().into()); + } + } + + pub(crate) fn poll_elapsed( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), super::Error>> { + if self.driver.is_shutdown() { + panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR); + } + + if let Some(deadline) = self.initial_deadline { + self.as_mut().reset(deadline); + } + + let this = unsafe { self.get_unchecked_mut() }; + + this.inner().state.poll(cx.waker()) + } +} + +impl TimerHandle { + pub(super) unsafe fn cached_when(&self) -> u64 { + unsafe { self.inner.as_ref().cached_when() } + } + + pub(super) unsafe fn sync_when(&self) -> u64 { + unsafe { self.inner.as_ref().sync_when() } + } + + pub(super) unsafe fn is_pending(&self) -> bool { + unsafe { self.inner.as_ref().state.is_pending() } + } + + /// Forcibly sets the true and cached expiration times to the given tick. + /// + /// SAFETY: The caller must ensure that the handle remains valid, the driver + /// lock is held, and that the timer is not in any wheel linked lists. + pub(super) unsafe fn set_expiration(&self, tick: u64) { + self.inner.as_ref().set_expiration(tick); + } + + /// Attempts to mark this entry as pending. If the expiration time is after + /// `not_after`, however, returns an Err with the current expiration time. + /// + /// If an `Err` is returned, the `cached_when` value will be updated to this + /// new expiration time. + /// + /// SAFETY: The caller must ensure that the handle remains valid, the driver + /// lock is held, and that the timer is not in any wheel linked lists. + /// After returning Ok, the entry must be added to the pending list. + pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { + match self.inner.as_ref().state.mark_pending(not_after) { + Ok(()) => { + // mark this as being on the pending queue in cached_when + self.inner.as_ref().set_cached_when(u64::MAX); + Ok(()) + } + Err(tick) => { + self.inner.as_ref().set_cached_when(tick); + Err(tick) + } + } + } + + /// Attempts to transition to a terminal state. If the state is already a + /// terminal state, does nothing. + /// + /// Because the entry might be dropped after the state is moved to a + /// terminal state, this function consumes the handle to ensure we don't + /// access the entry afterwards. + /// + /// Returns the last-registered waker, if any. + /// + /// SAFETY: The driver lock must be held while invoking this function, and + /// the entry must not be in any wheel linked lists. + pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> { + self.inner.as_ref().state.fire(completed_state) + } +} + +impl Drop for TimerEntry { + fn drop(&mut self) { + unsafe { Pin::new_unchecked(self) }.as_mut().cancel() + } +} + +#[cfg_attr(target_arch = "x86_64", repr(align(128)))] +#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] +#[derive(Debug, Default)] +struct CachePadded<T>(T); diff --git a/third_party/rust/tokio/src/time/driver/handle.rs b/third_party/rust/tokio/src/time/driver/handle.rs new file mode 100644 index 0000000000..b61c0476e1 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/handle.rs @@ -0,0 +1,94 @@ +use crate::loom::sync::Arc; +use crate::time::driver::ClockTime; +use std::fmt; + +/// Handle to time driver instance. +#[derive(Clone)] +pub(crate) struct Handle { + time_source: ClockTime, + inner: Arc<super::Inner>, +} + +impl Handle { + /// Creates a new timer `Handle` from a shared `Inner` timer state. + pub(super) fn new(inner: Arc<super::Inner>) -> Self { + let time_source = inner.state.lock().time_source.clone(); + Handle { time_source, inner } + } + + /// Returns the time source associated with this handle. + pub(super) fn time_source(&self) -> &ClockTime { + &self.time_source + } + + /// Access the driver's inner structure. + pub(super) fn get(&self) -> &super::Inner { + &*self.inner + } + + /// Checks whether the driver has been shutdown. + pub(super) fn is_shutdown(&self) -> bool { + self.inner.is_shutdown() + } +} + +cfg_rt! { + impl Handle { + /// Tries to get a handle to the current timer. + /// + /// # Panics + /// + /// This function panics if there is no current timer set. + /// + /// It can be triggered when [`Builder::enable_time`] or + /// [`Builder::enable_all`] are not included in the builder. + /// + /// It can also panic whenever a timer is created outside of a + /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, + /// since the function is executed outside of the runtime. + /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. + /// And this is because wrapping the function on an async makes it lazy, + /// and so gets executed inside the runtime successfully without + /// panicking. + /// + /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time + /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all + pub(crate) fn current() -> Self { + crate::runtime::context::time_handle() + .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + } + } +} + +cfg_not_rt! { + impl Handle { + /// Tries to get a handle to the current timer. + /// + /// # Panics + /// + /// This function panics if there is no current timer set. + /// + /// It can be triggered when [`Builder::enable_time`] or + /// [`Builder::enable_all`] are not included in the builder. + /// + /// It can also panic whenever a timer is created outside of a + /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, + /// since the function is executed outside of the runtime. + /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. + /// And this is because wrapping the function on an async makes it lazy, + /// and so gets executed inside the runtime successfully without + /// panicking. + /// + /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time + /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all + pub(crate) fn current() -> Self { + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) + } + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} diff --git a/third_party/rust/tokio/src/time/driver/mod.rs b/third_party/rust/tokio/src/time/driver/mod.rs new file mode 100644 index 0000000000..9971877479 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/mod.rs @@ -0,0 +1,528 @@ +// Currently, rust warns when an unsafe fn contains an unsafe {} block. However, +// in the future, this will change to the reverse. For now, suppress this +// warning and generally stick with being explicit about unsafety. +#![allow(unused_unsafe)] +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +//! Time driver. + +mod entry; +pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared}; + +mod handle; +pub(crate) use self::handle::Handle; + +mod wheel; + +pub(super) mod sleep; + +use crate::loom::sync::atomic::{AtomicBool, Ordering}; +use crate::loom::sync::{Arc, Mutex}; +use crate::park::{Park, Unpark}; +use crate::time::error::Error; +use crate::time::{Clock, Duration, Instant}; + +use std::convert::TryInto; +use std::fmt; +use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; + +/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. +/// +/// A `Driver` instance tracks the state necessary for managing time and +/// notifying the [`Sleep`][sleep] instances once their deadlines are reached. +/// +/// It is expected that a single instance manages many individual [`Sleep`][sleep] +/// instances. The `Driver` implementation is thread-safe and, as such, is able +/// to handle callers from across threads. +/// +/// After creating the `Driver` instance, the caller must repeatedly call `park` +/// or `park_timeout`. The time driver will perform no work unless `park` or +/// `park_timeout` is called repeatedly. +/// +/// The driver has a resolution of one millisecond. Any unit of time that falls +/// between milliseconds are rounded up to the next millisecond. +/// +/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not +/// elapsed will be notified with an error. At this point, calling `poll` on the +/// [`Sleep`][sleep] instance will result in panic. +/// +/// # Implementation +/// +/// The time driver is based on the [paper by Varghese and Lauck][paper]. +/// +/// A hashed timing wheel is a vector of slots, where each slot handles a time +/// slice. As time progresses, the timer walks over the slot for the current +/// instant, and processes each entry for that slot. When the timer reaches the +/// end of the wheel, it starts again at the beginning. +/// +/// The implementation maintains six wheels arranged in a set of levels. As the +/// levels go up, the slots of the associated wheel represent larger intervals +/// of time. At each level, the wheel has 64 slots. Each slot covers a range of +/// time equal to the wheel at the lower level. At level zero, each slot +/// represents one millisecond of time. +/// +/// The wheels are: +/// +/// * Level 0: 64 x 1 millisecond slots. +/// * Level 1: 64 x 64 millisecond slots. +/// * Level 2: 64 x ~4 second slots. +/// * Level 3: 64 x ~4 minute slots. +/// * Level 4: 64 x ~4 hour slots. +/// * Level 5: 64 x ~12 day slots. +/// +/// When the timer processes entries at level zero, it will notify all the +/// `Sleep` instances as their deadlines have been reached. For all higher +/// levels, all entries will be redistributed across the wheel at the next level +/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will +/// either be canceled (dropped) or their associated entries will reach level +/// zero and be notified. +/// +/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf +/// [sleep]: crate::time::Sleep +/// [timeout]: crate::time::Timeout +/// [interval]: crate::time::Interval +#[derive(Debug)] +pub(crate) struct Driver<P: Park + 'static> { + /// Timing backend in use. + time_source: ClockTime, + + /// Shared state. + handle: Handle, + + /// Parker to delegate to. + park: P, + + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. + #[cfg(feature = "test-util")] + did_wake: Arc<AtomicBool>, +} + +/// A structure which handles conversion from Instants to u64 timestamps. +#[derive(Debug, Clone)] +pub(self) struct ClockTime { + clock: super::clock::Clock, + start_time: Instant, +} + +impl ClockTime { + pub(self) fn new(clock: Clock) -> Self { + Self { + start_time: clock.now(), + clock, + } + } + + pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 { + // Round up to the end of a ms + self.instant_to_tick(t + Duration::from_nanos(999_999)) + } + + pub(self) fn instant_to_tick(&self, t: Instant) -> u64 { + // round up + let dur: Duration = t + .checked_duration_since(self.start_time) + .unwrap_or_else(|| Duration::from_secs(0)); + let ms = dur.as_millis(); + + ms.try_into().unwrap_or(u64::MAX) + } + + pub(self) fn tick_to_duration(&self, t: u64) -> Duration { + Duration::from_millis(t) + } + + pub(self) fn now(&self) -> u64 { + self.instant_to_tick(self.clock.now()) + } +} + +/// Timer state shared between `Driver`, `Handle`, and `Registration`. +struct Inner { + // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex + pub(super) state: Mutex<InnerState>, + + /// True if the driver is being shutdown. + pub(super) is_shutdown: AtomicBool, +} + +/// Time state shared which must be protected by a `Mutex` +struct InnerState { + /// Timing backend in use. + time_source: ClockTime, + + /// The last published timer `elapsed` value. + elapsed: u64, + + /// The earliest time at which we promise to wake up without unparking. + next_wake: Option<NonZeroU64>, + + /// Timer wheel. + wheel: wheel::Wheel, + + /// Unparker that can be used to wake the time driver. + unpark: Box<dyn Unpark>, +} + +// ===== impl Driver ===== + +impl<P> Driver<P> +where + P: Park + 'static, +{ + /// Creates a new `Driver` instance that uses `park` to block the current + /// thread and `time_source` to get the current time and convert to ticks. + /// + /// Specifying the source of time is useful when testing. + pub(crate) fn new(park: P, clock: Clock) -> Driver<P> { + let time_source = ClockTime::new(clock); + + let inner = Inner::new(time_source.clone(), Box::new(park.unpark())); + + Driver { + time_source, + handle: Handle::new(Arc::new(inner)), + park, + #[cfg(feature = "test-util")] + did_wake: Arc::new(AtomicBool::new(false)), + } + } + + /// Returns a handle to the timer. + /// + /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances + /// can either be created directly or the `Handle` instance can be passed to + /// `with_default`, setting the timer as the default timer for the execution + /// context. + pub(crate) fn handle(&self) -> Handle { + self.handle.clone() + } + + fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> { + let mut lock = self.handle.get().state.lock(); + + assert!(!self.handle.is_shutdown()); + + let next_wake = lock.wheel.next_expiration_time(); + lock.next_wake = + next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + + drop(lock); + + match next_wake { + Some(when) => { + let now = self.time_source.now(); + // Note that we effectively round up to 1ms here - this avoids + // very short-duration microsecond-resolution sleeps that the OS + // might treat as zero-length. + let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now)); + + if duration > Duration::from_millis(0) { + if let Some(limit) = limit { + duration = std::cmp::min(limit, duration); + } + + self.park_timeout(duration)?; + } else { + self.park.park_timeout(Duration::from_secs(0))?; + } + } + None => { + if let Some(duration) = limit { + self.park_timeout(duration)?; + } else { + self.park.park()?; + } + } + } + + // Process pending timers after waking up + self.handle.process(); + + Ok(()) + } + + cfg_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + let clock = &self.time_source.clock; + + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + + // If the time driver was woken, then the park completed + // before the "duration" elapsed (usually caused by a + // yield in `Runtime::block_on`). In this case, we don't + // advance the clock. + if !self.did_wake() { + // Simulate advancing time + clock.advance(duration); + } + } else { + self.park.park_timeout(duration)?; + } + + Ok(()) + } + + fn did_wake(&self) -> bool { + self.did_wake.swap(false, Ordering::SeqCst) + } + } + + cfg_not_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + self.park.park_timeout(duration) + } + } +} + +impl Handle { + /// Runs timer related logic, and returns the next wakeup time + pub(self) fn process(&self) { + let now = self.time_source().now(); + + self.process_at_time(now) + } + + pub(self) fn process_at_time(&self, mut now: u64) { + let mut waker_list: [Option<Waker>; 32] = Default::default(); + let mut waker_idx = 0; + + let mut lock = self.get().lock(); + + if now < lock.elapsed { + // Time went backwards! This normally shouldn't happen as the Rust language + // guarantees that an Instant is monotonic, but can happen when running + // Linux in a VM on a Windows host due to std incorrectly trusting the + // hardware clock to be monotonic. + // + // See <https://github.com/tokio-rs/tokio/issues/3619> for more information. + now = lock.elapsed; + } + + while let Some(entry) = lock.wheel.poll(now) { + debug_assert!(unsafe { entry.is_pending() }); + + // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. + if let Some(waker) = unsafe { entry.fire(Ok(())) } { + waker_list[waker_idx] = Some(waker); + + waker_idx += 1; + + if waker_idx == waker_list.len() { + // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. + drop(lock); + + for waker in waker_list.iter_mut() { + waker.take().unwrap().wake(); + } + + waker_idx = 0; + + lock = self.get().lock(); + } + } + } + + // Update the elapsed cache + lock.elapsed = lock.wheel.elapsed(); + lock.next_wake = lock + .wheel + .poll_at() + .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + + drop(lock); + + for waker in waker_list[0..waker_idx].iter_mut() { + waker.take().unwrap().wake(); + } + } + + /// Removes a registered timer from the driver. + /// + /// The timer will be moved to the cancelled state. Wakers will _not_ be + /// invoked. If the timer is already completed, this function is a no-op. + /// + /// This function always acquires the driver lock, even if the entry does + /// not appear to be registered. + /// + /// SAFETY: The timer must not be registered with some other driver, and + /// `add_entry` must not be called concurrently. + pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) { + unsafe { + let mut lock = self.get().lock(); + + if entry.as_ref().might_be_registered() { + lock.wheel.remove(entry); + } + + entry.as_ref().handle().fire(Ok(())); + } + } + + /// Removes and re-adds an entry to the driver. + /// + /// SAFETY: The timer must be either unregistered, or registered with this + /// driver. No other threads are allowed to concurrently manipulate the + /// timer at all (the current thread should hold an exclusive reference to + /// the `TimerEntry`) + pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) { + let waker = unsafe { + let mut lock = self.get().lock(); + + // We may have raced with a firing/deregistration, so check before + // deregistering. + if unsafe { entry.as_ref().might_be_registered() } { + lock.wheel.remove(entry); + } + + // Now that we have exclusive control of this entry, mint a handle to reinsert it. + let entry = entry.as_ref().handle(); + + if self.is_shutdown() { + unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } + } else { + entry.set_expiration(new_tick); + + // Note: We don't have to worry about racing with some other resetting + // thread, because add_entry and reregister require exclusive control of + // the timer entry. + match unsafe { lock.wheel.insert(entry) } { + Ok(when) => { + if lock + .next_wake + .map(|next_wake| when < next_wake.get()) + .unwrap_or(true) + { + lock.unpark.unpark(); + } + + None + } + Err((entry, super::error::InsertError::Elapsed)) => unsafe { + entry.fire(Ok(())) + }, + } + } + + // Must release lock before invoking waker to avoid the risk of deadlock. + }; + + // The timer was fired synchronously as a result of the reregistration. + // Wake the waker; this is needed because we might reset _after_ a poll, + // and otherwise the task won't be awoken to poll again. + if let Some(waker) = waker { + waker.wake(); + } + } +} + +impl<P> Park for Driver<P> +where + P: Park + 'static, +{ + type Unpark = TimerUnpark<P>; + type Error = P::Error; + + fn unpark(&self) -> Self::Unpark { + TimerUnpark::new(self) + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park_internal(None) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park_internal(Some(duration)) + } + + fn shutdown(&mut self) { + if self.handle.is_shutdown() { + return; + } + + self.handle.get().is_shutdown.store(true, Ordering::SeqCst); + + // Advance time forward to the end of time. + + self.handle.process_at_time(u64::MAX); + + self.park.shutdown(); + } +} + +impl<P> Drop for Driver<P> +where + P: Park + 'static, +{ + fn drop(&mut self) { + self.shutdown(); + } +} + +pub(crate) struct TimerUnpark<P: Park + 'static> { + inner: P::Unpark, + + #[cfg(feature = "test-util")] + did_wake: Arc<AtomicBool>, +} + +impl<P: Park + 'static> TimerUnpark<P> { + fn new(driver: &Driver<P>) -> TimerUnpark<P> { + TimerUnpark { + inner: driver.park.unpark(), + + #[cfg(feature = "test-util")] + did_wake: driver.did_wake.clone(), + } + } +} + +impl<P: Park + 'static> Unpark for TimerUnpark<P> { + fn unpark(&self) { + #[cfg(feature = "test-util")] + self.did_wake.store(true, Ordering::SeqCst); + + self.inner.unpark(); + } +} + +// ===== impl Inner ===== + +impl Inner { + pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self { + Inner { + state: Mutex::new(InnerState { + time_source, + elapsed: 0, + next_wake: None, + unpark, + wheel: wheel::Wheel::new(), + }), + is_shutdown: AtomicBool::new(false), + } + } + + /// Locks the driver's inner structure + pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { + self.state.lock() + } + + // Check whether the driver has been shutdown + pub(super) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::SeqCst) + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Inner").finish() + } +} + +#[cfg(test)] +mod tests; diff --git a/third_party/rust/tokio/src/time/driver/sleep.rs b/third_party/rust/tokio/src/time/driver/sleep.rs new file mode 100644 index 0000000000..7f27ef201f --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/sleep.rs @@ -0,0 +1,438 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::time::driver::ClockTime; +use crate::time::driver::{Handle, TimerEntry}; +use crate::time::{error::Error, Duration, Instant}; +use crate::util::trace; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::panic::Location; +use std::pin::Pin; +use std::task::{self, Poll}; + +/// Waits until `deadline` is reached. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. +/// +/// To run something regularly on a schedule, see [`interval`]. +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep_until, Instant, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep_until(Instant::now() + Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Sleep`]: struct@crate::time::Sleep +/// [`interval`]: crate::time::interval() +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +// Alias for old name in 0.x +#[cfg_attr(docsrs, doc(alias = "delay_until"))] +#[track_caller] +pub fn sleep_until(deadline: Instant) -> Sleep { + return Sleep::new_timeout(deadline, trace::caller_location()); +} + +/// Waits until `duration` has elapsed. +/// +/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous +/// analog to `std::thread::sleep`. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. +/// +/// To run something regularly on a schedule, see [`interval`]. +/// +/// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years). +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep(Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Sleep`]: struct@crate::time::Sleep +/// [`interval`]: crate::time::interval() +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +// Alias for old name in 0.x +#[cfg_attr(docsrs, doc(alias = "delay_for"))] +#[cfg_attr(docsrs, doc(alias = "wait"))] +#[track_caller] +pub fn sleep(duration: Duration) -> Sleep { + let location = trace::caller_location(); + + match Instant::now().checked_add(duration) { + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::new_timeout(Instant::far_future(), location), + } +} + +pin_project! { + /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). + /// + /// This type does not implement the `Unpin` trait, which means that if you + /// use it with [`select!`] or by calling `poll`, you have to pin it first. + /// If you use it with `.await`, this does not apply. + /// + /// # Examples + /// + /// Wait 100ms and print "100 ms have elapsed". + /// + /// ``` + /// use tokio::time::{sleep, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// sleep(Duration::from_millis(100)).await; + /// println!("100 ms have elapsed"); + /// } + /// ``` + /// + /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is + /// necessary when the same `Sleep` is selected on multiple times. + /// ```no_run + /// use tokio::time::{self, Duration, Instant}; + /// + /// #[tokio::main] + /// async fn main() { + /// let sleep = time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// loop { + /// tokio::select! { + /// () = &mut sleep => { + /// println!("timer elapsed"); + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50)); + /// }, + /// } + /// } + /// } + /// ``` + /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the + /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// + /// struct HasSleep { + /// sleep: Pin<Box<Sleep>>, + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.sleep.as_mut().poll(cx) + /// } + /// } + /// ``` + /// Use in a struct with pin projection. This method avoids the `Box`, but + /// the `HasSleep` struct will not be `Unpin` as a consequence. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// use pin_project_lite::pin_project; + /// + /// pin_project! { + /// struct HasSleep { + /// #[pin] + /// sleep: Sleep, + /// } + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.project().sleep.poll(cx) + /// } + /// } + /// ``` + /// + /// [`select!`]: ../macro.select.html + /// [`tokio::pin!`]: ../macro.pin.html + // Alias for old name in 0.2 + #[cfg_attr(docsrs, doc(alias = "Delay"))] + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Sleep { + inner: Inner, + + // The link between the `Sleep` instance and the timer that drives it. + #[pin] + entry: TimerEntry, + } +} + +cfg_trace! { + #[derive(Debug)] + struct Inner { + deadline: Instant, + ctx: trace::AsyncOpTracingCtx, + time_source: ClockTime, + } +} + +cfg_not_trace! { + #[derive(Debug)] + struct Inner { + deadline: Instant, + } +} + +impl Sleep { + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + pub(crate) fn new_timeout( + deadline: Instant, + location: Option<&'static Location<'static>>, + ) -> Sleep { + let handle = Handle::current(); + let entry = TimerEntry::new(&handle, deadline); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = { + let time_source = handle.time_source().clone(); + let deadline_tick = time_source.deadline_to_tick(deadline); + let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0); + + let location = location.expect("should have location if tracing"); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sleep", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + let async_op_span = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span, + }; + + Inner { + deadline, + ctx, + time_source, + } + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = Inner { deadline }; + + Sleep { inner, entry } + } + + pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep { + Self::new_timeout(Instant::far_future(), location) + } + + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.inner.deadline + } + + /// Returns `true` if `Sleep` has elapsed. + /// + /// A `Sleep` instance is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.entry.is_elapsed() + } + + /// Resets the `Sleep` instance to a new deadline. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state. + /// + /// This function can be called both before and after the future has + /// completed. + /// + /// To call this method, you will usually combine the call with + /// [`Pin::as_mut`], which lets you call the method without consuming the + /// `Sleep` itself. + /// + /// # Example + /// + /// ``` + /// use tokio::time::{Duration, Instant}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let sleep = tokio::time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20)); + /// # } + /// ``` + /// + /// See also the top-level examples. + /// + /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.reset_inner(deadline) + } + + fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { + let me = self.project(); + me.entry.reset(deadline); + (*me.inner).deadline = deadline; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + { + let _resource_enter = me.inner.ctx.resource_span.enter(); + me.inner.ctx.async_op_span = + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); + let _async_op_enter = me.inner.ctx.async_op_span.enter(); + + me.inner.ctx.async_op_poll_span = + tracing::trace_span!("runtime.resource.async_op.poll"); + + let duration = { + let now = me.inner.time_source.now(); + let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); + deadline_tick.checked_sub(now).unwrap_or(0) + }; + + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + } + } + + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + let me = self.project(); + + // Keep track of task budget + #[cfg(all(tokio_unstable, feature = "tracing"))] + let coop = ready!(trace_poll_op!( + "poll_elapsed", + crate::coop::poll_proceed(cx), + )); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let coop = ready!(crate::coop::poll_proceed(cx)); + + let result = me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace_poll_op!("poll_elapsed", result); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return result; + } +} + +impl Future for Sleep { + type Output = (); + + // `poll_elapsed` can return an error in two cases: + // + // - AtCapacity: this is a pathological case where far too many + // sleep instances have been scheduled. + // - Shutdown: No timer has been setup, which is a mis-use error. + // + // Both cases are extremely rare, and pretty accurately fit into + // "logic errors", so we just panic in this case. A user couldn't + // really do much better if we passed the error onwards. + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _res_span = self.inner.ctx.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.inner.ctx.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered(); + match ready!(self.as_mut().poll_elapsed(cx)) { + Ok(()) => Poll::Ready(()), + Err(e) => panic!("timer error: {}", e), + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/tests/mod.rs b/third_party/rust/tokio/src/time/driver/tests/mod.rs new file mode 100644 index 0000000000..3ac8c75643 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/tests/mod.rs @@ -0,0 +1,301 @@ +use std::{task::Context, time::Duration}; + +#[cfg(not(loom))] +use futures::task::noop_waker_ref; + +use crate::loom::sync::Arc; +use crate::loom::thread; +use crate::{ + loom::sync::atomic::{AtomicBool, Ordering}, + park::Unpark, +}; + +use super::{Handle, TimerEntry}; + +struct MockUnpark {} +impl Unpark for MockUnpark { + fn unpark(&self) {} +} +impl MockUnpark { + fn mock() -> Box<dyn Unpark> { + Box::new(Self {}) + } +} + +fn block_on<T>(f: impl std::future::Future<Output = T>) -> T { + #[cfg(loom)] + return loom::future::block_on(f); + + #[cfg(not(loom))] + { + let rt = crate::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(f) + } +} + +fn model(f: impl Fn() + Send + Sync + 'static) { + #[cfg(loom)] + loom::model(f); + + #[cfg(not(loom))] + f(); +} + +#[test] +fn single_timer() { + model(|| { + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + }); + + thread::yield_now(); + + // This may or may not return Some (depending on how it races with the + // thread). If it does return None, however, the timer should complete + // synchronously. + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn drop_timer() { + model(|| { + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + }); + + thread::yield_now(); + + // advance 2s in the future. + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn change_waker() { + model(|| { + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + }); + + thread::yield_now(); + + // advance 2s + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn reset_future() { + model(|| { + let finished_early = Arc::new(AtomicBool::new(false)); + + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let finished_early_ = finished_early.clone(); + let start = clock.now(); + + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + + entry.as_mut().reset(start + Duration::from_secs(2)); + + // shouldn't complete before 2s + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + + finished_early_.store(true, Ordering::Relaxed); + }); + + thread::yield_now(); + + // This may or may not return a wakeup time. + handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500))); + + assert!(!finished_early.load(Ordering::Relaxed)); + + handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500))); + + jh.join().unwrap(); + + assert!(finished_early.load(Ordering::Relaxed)); + }) +} + +#[cfg(not(loom))] +fn normal_or_miri<T>(normal: T, miri: T) -> T { + if cfg!(miri) { + miri + } else { + normal + } +} + +#[test] +#[cfg(not(loom))] +fn poll_process_levels() { + let clock = crate::time::clock::Clock::new(true, false); + clock.pause(); + + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source, MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let mut entries = vec![]; + + for i in 0..normal_or_miri(1024, 64) { + let mut entry = Box::pin(TimerEntry::new( + &handle, + clock.now() + Duration::from_millis(i), + )); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(noop_waker_ref())); + + entries.push(entry); + } + + for t in 1..normal_or_miri(1024, 64) { + handle.process_at_time(t as u64); + for (deadline, future) in entries.iter_mut().enumerate() { + let mut context = Context::from_waker(noop_waker_ref()); + if deadline <= t { + assert!(future.as_mut().poll_elapsed(&mut context).is_ready()); + } else { + assert!(future.as_mut().poll_elapsed(&mut context).is_pending()); + } + } + } +} + +#[test] +#[cfg(not(loom))] +fn poll_process_levels_targeted() { + let mut context = Context::from_waker(noop_waker_ref()); + + let clock = crate::time::clock::Clock::new(true, false); + clock.pause(); + + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source, MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193)); + pin!(e1); + + handle.process_at_time(62); + assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); + handle.process_at_time(192); + handle.process_at_time(192); +} + +/* +#[test] +fn balanced_incr_and_decr() { + const OPS: usize = 5; + + fn incr(inner: Arc<Inner>) { + for _ in 0..OPS { + inner.increment().expect("increment should not have failed"); + thread::yield_now(); + } + } + + fn decr(inner: Arc<Inner>) { + let mut ops_performed = 0; + while ops_performed < OPS { + if inner.num(Ordering::Relaxed) > 0 { + ops_performed += 1; + inner.decrement(); + } + thread::yield_now(); + } + } + + loom::model(|| { + let unpark = Box::new(MockUnpark); + let instant = Instant::now(); + + let inner = Arc::new(Inner::new(instant, unpark)); + + let incr_inner = inner.clone(); + let decr_inner = inner.clone(); + + let incr_hndle = thread::spawn(move || incr(incr_inner)); + let decr_hndle = thread::spawn(move || decr(decr_inner)); + + incr_hndle.join().expect("should never fail"); + decr_hndle.join().expect("should never fail"); + + assert_eq!(inner.num(Ordering::SeqCst), 0); + }) +} +*/ diff --git a/third_party/rust/tokio/src/time/driver/wheel/level.rs b/third_party/rust/tokio/src/time/driver/wheel/level.rs new file mode 100644 index 0000000000..878754177b --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/level.rs @@ -0,0 +1,276 @@ +use crate::time::driver::TimerHandle; + +use crate::time::driver::{EntryList, TimerShared}; + +use std::{fmt, ptr::NonNull}; + +/// Wheel for a single level in the timer. This wheel contains 64 slots. +pub(crate) struct Level { + level: usize, + + /// Bit field tracking which slots currently contain entries. + /// + /// Using a bit field to track slots that contain entries allows avoiding a + /// scan to find entries. This field is updated when entries are added or + /// removed from a slot. + /// + /// The least-significant bit represents slot zero. + occupied: u64, + + /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell. + slot: [EntryList; LEVEL_MULT], +} + +/// Indicates when a slot must be processed next. +#[derive(Debug)] +pub(crate) struct Expiration { + /// The level containing the slot. + pub(crate) level: usize, + + /// The slot index. + pub(crate) slot: usize, + + /// The instant at which the slot needs to be processed. + pub(crate) deadline: u64, +} + +/// Level multiplier. +/// +/// Being a power of 2 is very important. +const LEVEL_MULT: usize = 64; + +impl Level { + pub(crate) fn new(level: usize) -> Level { + // A value has to be Copy in order to use syntax like: + // let stack = Stack::default(); + // ... + // slots: [stack; 64], + // + // Alternatively, since Stack is Default one can + // use syntax like: + // let slots: [Stack; 64] = Default::default(); + // + // However, that is only supported for arrays of size + // 32 or fewer. So in our case we have to explicitly + // invoke the constructor for each array element. + let ctor = EntryList::default; + + Level { + level, + occupied: 0, + slot: [ + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ], + } + } + + /// Finds the slot that needs to be processed next and returns the slot and + /// `Instant` at which this slot must be processed. + pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> { + // Use the `occupied` bit field to get the index of the next slot that + // needs to be processed. + let slot = match self.next_occupied_slot(now) { + Some(slot) => slot, + None => return None, + }; + + // From the slot index, calculate the `Instant` at which it needs to be + // processed. This value *must* be in the future with respect to `now`. + + let level_range = level_range(self.level); + let slot_range = slot_range(self.level); + + // Compute the start date of the current level by masking the low bits + // of `now` (`level_range` is a power of 2). + let level_start = now & !(level_range - 1); + let mut deadline = level_start + slot as u64 * slot_range; + + if deadline <= now { + // A timer is in a slot "prior" to the current time. This can occur + // because we do not have an infinite hierarchy of timer levels, and + // eventually a timer scheduled for a very distant time might end up + // being placed in a slot that is beyond the end of all of the + // arrays. + // + // To deal with this, we first limit timers to being scheduled no + // more than MAX_DURATION ticks in the future; that is, they're at + // most one rotation of the top level away. Then, we force timers + // that logically would go into the top+1 level, to instead go into + // the top level's slots. + // + // What this means is that the top level's slots act as a + // pseudo-ring buffer, and we rotate around them indefinitely. If we + // compute a deadline before now, and it's the top level, it + // therefore means we're actually looking at a slot in the future. + debug_assert_eq!(self.level, super::NUM_LEVELS - 1); + + deadline += level_range; + } + + debug_assert!( + deadline >= now, + "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}", + deadline, + now, + self.level, + level_range, + slot_range, + slot, + self.occupied + ); + + Some(Expiration { + level: self.level, + slot, + deadline, + }) + } + + fn next_occupied_slot(&self, now: u64) -> Option<usize> { + if self.occupied == 0 { + return None; + } + + // Get the slot for now using Maths + let now_slot = (now / slot_range(self.level)) as usize; + let occupied = self.occupied.rotate_right(now_slot as u32); + let zeros = occupied.trailing_zeros() as usize; + let slot = (zeros + now_slot) % 64; + + Some(slot) + } + + pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) { + let slot = slot_for(item.cached_when(), self.level); + + self.slot[slot].push_front(item); + + self.occupied |= occupied_bit(slot); + } + + pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) { + let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level); + + unsafe { self.slot[slot].remove(item) }; + if self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + // Unset the bit + self.occupied ^= occupied_bit(slot); + } + } + + pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList { + self.occupied &= !occupied_bit(slot); + + std::mem::take(&mut self.slot[slot]) + } +} + +impl fmt::Debug for Level { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Level") + .field("occupied", &self.occupied) + .finish() + } +} + +fn occupied_bit(slot: usize) -> u64 { + 1 << slot +} + +fn slot_range(level: usize) -> u64 { + LEVEL_MULT.pow(level as u32) as u64 +} + +fn level_range(level: usize) -> u64 { + LEVEL_MULT as u64 * slot_range(level) +} + +/// Converts a duration (milliseconds) and a level to a slot position. +fn slot_for(duration: u64, level: usize) -> usize { + ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_slot_for() { + for pos in 0..64 { + assert_eq!(pos as usize, slot_for(pos, 0)); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!(pos as usize, slot_for(a as u64, level)); + } + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/wheel/mod.rs b/third_party/rust/tokio/src/time/driver/wheel/mod.rs new file mode 100644 index 0000000000..f088f2cfd6 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/mod.rs @@ -0,0 +1,359 @@ +use crate::time::driver::{TimerHandle, TimerShared}; +use crate::time::error::InsertError; + +mod level; +pub(crate) use self::level::Expiration; +use self::level::Level; + +use std::ptr::NonNull; + +use super::EntryList; + +/// Timing wheel implementation. +/// +/// This type provides the hashed timing wheel implementation that backs `Timer` +/// and `DelayQueue`. +/// +/// The structure is generic over `T: Stack`. This allows handling timeout data +/// being stored on the heap or in a slab. In order to support the latter case, +/// the slab must be passed into each function allowing the implementation to +/// lookup timer entries. +/// +/// See `Timer` documentation for some implementation notes. +#[derive(Debug)] +pub(crate) struct Wheel { + /// The number of milliseconds elapsed since the wheel started. + elapsed: u64, + + /// Timer wheel. + /// + /// Levels: + /// + /// * 1 ms slots / 64 ms range + /// * 64 ms slots / ~ 4 sec range + /// * ~ 4 sec slots / ~ 4 min range + /// * ~ 4 min slots / ~ 4 hr range + /// * ~ 4 hr slots / ~ 12 day range + /// * ~ 12 day slots / ~ 2 yr range + levels: Vec<Level>, + + /// Entries queued for firing + pending: EntryList, +} + +/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots +/// each, the timer is able to track time up to 2 years into the future with a +/// precision of 1 millisecond. +const NUM_LEVELS: usize = 6; + +/// The maximum duration of a `Sleep`. +pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; + +impl Wheel { + /// Creates a new timing wheel. + pub(crate) fn new() -> Wheel { + let levels = (0..NUM_LEVELS).map(Level::new).collect(); + + Wheel { + elapsed: 0, + levels, + pending: EntryList::new(), + } + } + + /// Returns the number of milliseconds that have elapsed since the timing + /// wheel's creation. + pub(crate) fn elapsed(&self) -> u64 { + self.elapsed + } + + /// Inserts an entry into the timing wheel. + /// + /// # Arguments + /// + /// * `item`: The item to insert into the wheel. + /// + /// # Return + /// + /// Returns `Ok` when the item is successfully inserted, `Err` otherwise. + /// + /// `Err(Elapsed)` indicates that `when` represents an instant that has + /// already passed. In this case, the caller should fire the timeout + /// immediately. + /// + /// `Err(Invalid)` indicates an invalid `when` argument as been supplied. + /// + /// # Safety + /// + /// This function registers item into an intrusive linked list. The caller + /// must ensure that `item` is pinned and will not be dropped without first + /// being deregistered. + pub(crate) unsafe fn insert( + &mut self, + item: TimerHandle, + ) -> Result<u64, (TimerHandle, InsertError)> { + let when = item.sync_when(); + + if when <= self.elapsed { + return Err((item, InsertError::Elapsed)); + } + + // Get the level at which the entry should be stored + let level = self.level_for(when); + + unsafe { + self.levels[level].add_entry(item); + } + + debug_assert!({ + self.levels[level] + .next_expiration(self.elapsed) + .map(|e| e.deadline >= self.elapsed) + .unwrap_or(true) + }); + + Ok(when) + } + + /// Removes `item` from the timing wheel. + pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) { + unsafe { + let when = item.as_ref().cached_when(); + if when == u64::MAX { + self.pending.remove(item); + } else { + debug_assert!( + self.elapsed <= when, + "elapsed={}; when={}", + self.elapsed, + when + ); + + let level = self.level_for(when); + + self.levels[level].remove_entry(item); + } + } + } + + /// Instant at which to poll. + pub(crate) fn poll_at(&self) -> Option<u64> { + self.next_expiration().map(|expiration| expiration.deadline) + } + + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> { + loop { + if let Some(handle) = self.pending.pop_back() { + return Some(handle); + } + + // under what circumstances is poll.expiration Some vs. None? + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); + + match expiration { + Some(ref expiration) if expiration.deadline > now => return None, + Some(ref expiration) => { + self.process_expiration(expiration); + + self.set_elapsed(expiration.deadline); + } + None => { + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); + break; + } + } + } + + self.pending.pop_back() + } + + /// Returns the instant at which the next timeout expires. + fn next_expiration(&self) -> Option<Expiration> { + if !self.pending.is_empty() { + // Expire immediately as we have things pending firing + return Some(Expiration { + level: 0, + slot: 0, + deadline: self.elapsed, + }); + } + + // Check all levels + for level in 0..NUM_LEVELS { + if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { + // There cannot be any expirations at a higher level that happen + // before this one. + debug_assert!(self.no_expirations_before(level + 1, expiration.deadline)); + + return Some(expiration); + } + } + + None + } + + /// Returns the tick at which this timer wheel next needs to perform some + /// processing, or None if there are no timers registered. + pub(super) fn next_expiration_time(&self) -> Option<u64> { + self.next_expiration().map(|ex| ex.deadline) + } + + /// Used for debug assertions + fn no_expirations_before(&self, start_level: usize, before: u64) -> bool { + let mut res = true; + + for l2 in start_level..NUM_LEVELS { + if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { + if e2.deadline < before { + res = false; + } + } + } + + res + } + + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// queue it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). + pub(crate) fn process_expiration(&mut self, expiration: &Expiration) { + // Note that we need to take _all_ of the entries off the list before + // processing any of them. This is important because it's possible that + // those entries might need to be reinserted into the same slot. + // + // This happens only on the highest level, when an entry is inserted + // more than MAX_DURATION into the future. When this happens, we wrap + // around, and process some entries a multiple of MAX_DURATION before + // they actually need to be dropped down a level. We then reinsert them + // back into the same position; we must make sure we don't then process + // those entries again or we'll end up in an infinite loop. + let mut entries = self.take_entries(expiration); + + while let Some(item) = entries.pop_back() { + if expiration.level == 0 { + debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline); + } + + // Try to expire the entry; this is cheap (doesn't synchronize) if + // the timer is not expired, and updates cached_when. + match unsafe { item.mark_pending(expiration.deadline) } { + Ok(()) => { + // Item was expired + self.pending.push_front(item); + } + Err(expiration_tick) => { + let level = level_for(expiration.deadline, expiration_tick); + unsafe { + self.levels[level].add_entry(item); + } + } + } + } + } + + fn set_elapsed(&mut self, when: u64) { + assert!( + self.elapsed <= when, + "elapsed={:?}; when={:?}", + self.elapsed, + when + ); + + if when > self.elapsed { + self.elapsed = when; + } + } + + /// Obtains the list of entries that need processing for the given expiration. + /// + fn take_entries(&mut self, expiration: &Expiration) -> EntryList { + self.levels[expiration.level].take_slot(expiration.slot) + } + + fn level_for(&self, when: u64) -> usize { + level_for(self.elapsed, when) + } +} + +fn level_for(elapsed: u64, when: u64) -> usize { + const SLOT_MASK: u64 = (1 << 6) - 1; + + // Mask in the trailing bits ignored by the level calculation in order to cap + // the possible leading zeros + let mut masked = elapsed ^ when | SLOT_MASK; + + if masked >= MAX_DURATION { + // Fudge the timer into the top level + masked = MAX_DURATION - 1; + } + + let leading_zeros = masked.leading_zeros() as usize; + let significant = 63 - leading_zeros; + + significant / 6 +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_level_for() { + for pos in 0..64 { + assert_eq!( + 0, + level_for(0, pos), + "level_for({}) -- binary = {:b}", + pos, + pos + ); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + + if pos > level { + let a = a - 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + + if pos < 64 { + let a = a + 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + } + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/wheel/stack.rs b/third_party/rust/tokio/src/time/driver/wheel/stack.rs new file mode 100644 index 0000000000..80651c309e --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/stack.rs @@ -0,0 +1,112 @@ +use super::{Item, OwnedItem}; +use crate::time::driver::Entry; + +use std::ptr; + +/// A doubly linked stack. +#[derive(Debug)] +pub(crate) struct Stack { + head: Option<OwnedItem>, +} + +impl Default for Stack { + fn default() -> Stack { + Stack { head: None } + } +} + +impl Stack { + pub(crate) fn is_empty(&self) -> bool { + self.head.is_none() + } + + pub(crate) fn push(&mut self, entry: OwnedItem) { + // Get a pointer to the entry to for the prev link + let ptr: *const Entry = &*entry as *const _; + + // Remove the old head entry + let old = self.head.take(); + + unsafe { + // Ensure the entry is not already in a stack. + debug_assert!((*entry.next_stack.get()).is_none()); + debug_assert!((*entry.prev_stack.get()).is_null()); + + if let Some(ref entry) = old.as_ref() { + debug_assert!({ + // The head is not already set to the entry + ptr != &***entry as *const _ + }); + + // Set the previous link on the old head + *entry.prev_stack.get() = ptr; + } + + // Set this entry's next pointer + *entry.next_stack.get() = old; + } + + // Update the head pointer + self.head = Some(entry); + } + + /// Pops an item from the stack. + pub(crate) fn pop(&mut self) -> Option<OwnedItem> { + let entry = self.head.take(); + + unsafe { + if let Some(entry) = entry.as_ref() { + self.head = (*entry.next_stack.get()).take(); + + if let Some(entry) = self.head.as_ref() { + *entry.prev_stack.get() = ptr::null(); + } + + *entry.prev_stack.get() = ptr::null(); + } + } + + entry + } + + pub(crate) fn remove(&mut self, entry: &Item) { + unsafe { + // Ensure that the entry is in fact contained by the stack + debug_assert!({ + // This walks the full linked list even if an entry is found. + let mut next = self.head.as_ref(); + let mut contains = false; + + while let Some(n) = next { + if entry as *const _ == &**n as *const _ { + debug_assert!(!contains); + contains = true; + } + + next = (*n.next_stack.get()).as_ref(); + } + + contains + }); + + // Unlink `entry` from the next node + let next = (*entry.next_stack.get()).take(); + + if let Some(next) = next.as_ref() { + (*next.prev_stack.get()) = *entry.prev_stack.get(); + } + + // Unlink `entry` from the prev node + + if let Some(prev) = (*entry.prev_stack.get()).as_ref() { + *prev.next_stack.get() = next; + } else { + // It is the head + self.head = next; + } + + // Unset the prev pointer + *entry.prev_stack.get() = ptr::null(); + } + } +} diff --git a/third_party/rust/tokio/src/time/error.rs b/third_party/rust/tokio/src/time/error.rs new file mode 100644 index 0000000000..63f0a3b0bd --- /dev/null +++ b/third_party/rust/tokio/src/time/error.rs @@ -0,0 +1,120 @@ +//! Time error types. + +use self::Kind::*; +use std::error; +use std::fmt; + +/// Errors encountered by the timer implementation. +/// +/// Currently, there are two different errors that can occur: +/// +/// * `shutdown` occurs when a timer operation is attempted, but the timer +/// instance has been dropped. In this case, the operation will never be able +/// to complete and the `shutdown` error is returned. This is a permanent +/// error, i.e., once this error is observed, timer operations will never +/// succeed in the future. +/// +/// * `at_capacity` occurs when a timer operation is attempted, but the timer +/// instance is currently handling its maximum number of outstanding sleep instances. +/// In this case, the operation is not able to be performed at the current +/// moment, and `at_capacity` is returned. This is a transient error, i.e., at +/// some point in the future, if the operation is attempted again, it might +/// succeed. Callers that observe this error should attempt to [shed load]. One +/// way to do this would be dropping the future that issued the timer operation. +/// +/// [shed load]: https://en.wikipedia.org/wiki/Load_Shedding +#[derive(Debug, Copy, Clone)] +pub struct Error(Kind); + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[repr(u8)] +pub(crate) enum Kind { + Shutdown = 1, + AtCapacity = 2, + Invalid = 3, +} + +impl From<Kind> for Error { + fn from(k: Kind) -> Self { + Error(k) + } +} + +/// Errors returned by `Timeout`. +#[derive(Debug, PartialEq)] +pub struct Elapsed(()); + +#[derive(Debug)] +pub(crate) enum InsertError { + Elapsed, +} + +// ===== impl Error ===== + +impl Error { + /// Creates an error representing a shutdown timer. + pub fn shutdown() -> Error { + Error(Shutdown) + } + + /// Returns `true` if the error was caused by the timer being shutdown. + pub fn is_shutdown(&self) -> bool { + matches!(self.0, Kind::Shutdown) + } + + /// Creates an error representing a timer at capacity. + pub fn at_capacity() -> Error { + Error(AtCapacity) + } + + /// Returns `true` if the error was caused by the timer being at capacity. + pub fn is_at_capacity(&self) -> bool { + matches!(self.0, Kind::AtCapacity) + } + + /// Creates an error representing a misconfigured timer. + pub fn invalid() -> Error { + Error(Invalid) + } + + /// Returns `true` if the error was caused by the timer being misconfigured. + pub fn is_invalid(&self) -> bool { + matches!(self.0, Kind::Invalid) + } +} + +impl error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + use self::Kind::*; + let descr = match self.0 { + Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime", + AtCapacity => "timer is at capacity and cannot create a new entry", + Invalid => "timer duration exceeds maximum duration", + }; + write!(fmt, "{}", descr) + } +} + +// ===== impl Elapsed ===== + +impl Elapsed { + pub(crate) fn new() -> Self { + Elapsed(()) + } +} + +impl fmt::Display for Elapsed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + "deadline has elapsed".fmt(fmt) + } +} + +impl std::error::Error for Elapsed {} + +impl From<Elapsed> for std::io::Error { + fn from(_err: Elapsed) -> std::io::Error { + std::io::ErrorKind::TimedOut.into() + } +} diff --git a/third_party/rust/tokio/src/time/instant.rs b/third_party/rust/tokio/src/time/instant.rs new file mode 100644 index 0000000000..f18492930a --- /dev/null +++ b/third_party/rust/tokio/src/time/instant.rs @@ -0,0 +1,223 @@ +#![allow(clippy::trivially_copy_pass_by_ref)] + +use std::fmt; +use std::ops; +use std::time::Duration; + +/// A measurement of a monotonically nondecreasing clock. +/// Opaque and useful only with `Duration`. +/// +/// Instants are always guaranteed to be no less than any previously measured +/// instant when created, and are often useful for tasks such as measuring +/// benchmarks or timing how long an operation takes. +/// +/// Note, however, that instants are not guaranteed to be **steady**. In other +/// words, each tick of the underlying clock may not be the same length (e.g. +/// some seconds may be longer than others). An instant may jump forwards or +/// experience time dilation (slow down or speed up), but it will never go +/// backwards. +/// +/// Instants are opaque types that can only be compared to one another. There is +/// no method to get "the number of seconds" from an instant. Instead, it only +/// allows measuring the duration between two instants (or comparing two +/// instants). +/// +/// The size of an `Instant` struct may vary depending on the target operating +/// system. +/// +/// # Note +/// +/// This type wraps the inner `std` variant and is used to align the Tokio +/// clock for uses of `now()`. This can be useful for testing where you can +/// take advantage of `time::pause()` and `time::advance()`. +#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub struct Instant { + std: std::time::Instant, +} + +impl Instant { + /// Returns an instant corresponding to "now". + /// + /// # Examples + /// + /// ``` + /// use tokio::time::Instant; + /// + /// let now = Instant::now(); + /// ``` + pub fn now() -> Instant { + variant::now() + } + + /// Create a `tokio::time::Instant` from a `std::time::Instant`. + pub fn from_std(std: std::time::Instant) -> Instant { + Instant { std } + } + + pub(crate) fn far_future() -> Instant { + // Roughly 30 years from now. + // API does not provide a way to obtain max `Instant` + // or convert specific date in the future to instant. + // 1000 years overflows on macOS, 100 years overflows on FreeBSD. + Self::now() + Duration::from_secs(86400 * 365 * 30) + } + + /// Convert the value into a `std::time::Instant`. + pub fn into_std(self) -> std::time::Instant { + self.std + } + + /// Returns the amount of time elapsed from another instant to this one, or + /// zero duration if that instant is later than this one. + pub fn duration_since(&self, earlier: Instant) -> Duration { + self.std.saturating_duration_since(earlier.std) + } + + /// Returns the amount of time elapsed from another instant to this one, or + /// None if that instant is later than this one. + /// + /// # Examples + /// + /// ``` + /// use tokio::time::{Duration, Instant, sleep}; + /// + /// #[tokio::main] + /// async fn main() { + /// let now = Instant::now(); + /// sleep(Duration::new(1, 0)).await; + /// let new_now = Instant::now(); + /// println!("{:?}", new_now.checked_duration_since(now)); + /// println!("{:?}", now.checked_duration_since(new_now)); // None + /// } + /// ``` + pub fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> { + self.std.checked_duration_since(earlier.std) + } + + /// Returns the amount of time elapsed from another instant to this one, or + /// zero duration if that instant is later than this one. + /// + /// # Examples + /// + /// ``` + /// use tokio::time::{Duration, Instant, sleep}; + /// + /// #[tokio::main] + /// async fn main() { + /// let now = Instant::now(); + /// sleep(Duration::new(1, 0)).await; + /// let new_now = Instant::now(); + /// println!("{:?}", new_now.saturating_duration_since(now)); + /// println!("{:?}", now.saturating_duration_since(new_now)); // 0ns + /// } + /// ``` + pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { + self.std.saturating_duration_since(earlier.std) + } + + /// Returns the amount of time elapsed since this instant was created, + /// or zero duration if that this instant is in the future. + /// + /// # Examples + /// + /// ``` + /// use tokio::time::{Duration, Instant, sleep}; + /// + /// #[tokio::main] + /// async fn main() { + /// let instant = Instant::now(); + /// let three_secs = Duration::from_secs(3); + /// sleep(three_secs).await; + /// assert!(instant.elapsed() >= three_secs); + /// } + /// ``` + pub fn elapsed(&self) -> Duration { + Instant::now().saturating_duration_since(*self) + } + + /// Returns `Some(t)` where `t` is the time `self + duration` if `t` can be + /// represented as `Instant` (which means it's inside the bounds of the + /// underlying data structure), `None` otherwise. + pub fn checked_add(&self, duration: Duration) -> Option<Instant> { + self.std.checked_add(duration).map(Instant::from_std) + } + + /// Returns `Some(t)` where `t` is the time `self - duration` if `t` can be + /// represented as `Instant` (which means it's inside the bounds of the + /// underlying data structure), `None` otherwise. + pub fn checked_sub(&self, duration: Duration) -> Option<Instant> { + self.std.checked_sub(duration).map(Instant::from_std) + } +} + +impl From<std::time::Instant> for Instant { + fn from(time: std::time::Instant) -> Instant { + Instant::from_std(time) + } +} + +impl From<Instant> for std::time::Instant { + fn from(time: Instant) -> std::time::Instant { + time.into_std() + } +} + +impl ops::Add<Duration> for Instant { + type Output = Instant; + + fn add(self, other: Duration) -> Instant { + Instant::from_std(self.std + other) + } +} + +impl ops::AddAssign<Duration> for Instant { + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl ops::Sub for Instant { + type Output = Duration; + + fn sub(self, rhs: Instant) -> Duration { + self.std.saturating_duration_since(rhs.std) + } +} + +impl ops::Sub<Duration> for Instant { + type Output = Instant; + + fn sub(self, rhs: Duration) -> Instant { + Instant::from_std(self.std - rhs) + } +} + +impl ops::SubAssign<Duration> for Instant { + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; + } +} + +impl fmt::Debug for Instant { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.std.fmt(fmt) + } +} + +#[cfg(not(feature = "test-util"))] +mod variant { + use super::Instant; + + pub(super) fn now() -> Instant { + Instant::from_std(std::time::Instant::now()) + } +} + +#[cfg(feature = "test-util")] +mod variant { + use super::Instant; + + pub(super) fn now() -> Instant { + crate::time::clock::now() + } +} diff --git a/third_party/rust/tokio/src/time/interval.rs b/third_party/rust/tokio/src/time/interval.rs new file mode 100644 index 0000000000..8ecb15b389 --- /dev/null +++ b/third_party/rust/tokio/src/time/interval.rs @@ -0,0 +1,531 @@ +use crate::future::poll_fn; +use crate::time::{sleep_until, Duration, Instant, Sleep}; +use crate::util::trace; + +use std::panic::Location; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{convert::TryInto, future::Future}; + +/// Creates new [`Interval`] that yields with interval of `period`. The first +/// tick completes immediately. The default [`MissedTickBehavior`] is +/// [`Burst`](MissedTickBehavior::Burst), but this can be configured +/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior). +/// +/// An interval will tick indefinitely. At any time, the [`Interval`] value can +/// be dropped. This cancels the interval. +/// +/// This function is equivalent to +/// [`interval_at(Instant::now(), period)`](interval_at). +/// +/// # Panics +/// +/// This function panics if `period` is zero. +/// +/// # Examples +/// +/// ``` +/// use tokio::time::{self, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut interval = time::interval(Duration::from_millis(10)); +/// +/// interval.tick().await; // ticks immediately +/// interval.tick().await; // ticks after 10ms +/// interval.tick().await; // ticks after 10ms +/// +/// // approximately 20ms have elapsed. +/// } +/// ``` +/// +/// A simple example using `interval` to execute a task every two seconds. +/// +/// The difference between `interval` and [`sleep`] is that an [`Interval`] +/// measures the time since the last tick, which means that [`.tick().await`] +/// may wait for a shorter time than the duration specified for the interval +/// if some time has passed between calls to [`.tick().await`]. +/// +/// If the tick in the example below was replaced with [`sleep`], the task +/// would only be executed once every three seconds, and not every two +/// seconds. +/// +/// ``` +/// use tokio::time; +/// +/// async fn task_that_takes_a_second() { +/// println!("hello"); +/// time::sleep(time::Duration::from_secs(1)).await +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let mut interval = time::interval(time::Duration::from_secs(2)); +/// for _i in 0..5 { +/// interval.tick().await; +/// task_that_takes_a_second().await; +/// } +/// } +/// ``` +/// +/// [`sleep`]: crate::time::sleep() +/// [`.tick().await`]: Interval::tick +#[track_caller] +pub fn interval(period: Duration) -> Interval { + assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + internal_interval_at(Instant::now(), period, trace::caller_location()) +} + +/// Creates new [`Interval`] that yields with interval of `period` with the +/// first tick completing at `start`. The default [`MissedTickBehavior`] is +/// [`Burst`](MissedTickBehavior::Burst), but this can be configured +/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior). +/// +/// An interval will tick indefinitely. At any time, the [`Interval`] value can +/// be dropped. This cancels the interval. +/// +/// # Panics +/// +/// This function panics if `period` is zero. +/// +/// # Examples +/// +/// ``` +/// use tokio::time::{interval_at, Duration, Instant}; +/// +/// #[tokio::main] +/// async fn main() { +/// let start = Instant::now() + Duration::from_millis(50); +/// let mut interval = interval_at(start, Duration::from_millis(10)); +/// +/// interval.tick().await; // ticks after 50ms +/// interval.tick().await; // ticks after 10ms +/// interval.tick().await; // ticks after 10ms +/// +/// // approximately 70ms have elapsed. +/// } +/// ``` +#[track_caller] +pub fn interval_at(start: Instant, period: Duration) -> Interval { + assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + internal_interval_at(start, period, trace::caller_location()) +} + +#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] +fn internal_interval_at( + start: Instant, + period: Duration, + location: Option<&'static Location<'static>>, +) -> Interval { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = location.expect("should have location if tracing"); + + tracing::trace_span!( + "runtime.resource", + concrete_type = "Interval", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let delay = resource_span.in_scope(|| Box::pin(sleep_until(start))); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let delay = Box::pin(sleep_until(start)); + + Interval { + delay, + period, + missed_tick_behavior: Default::default(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } +} + +/// Defines the behavior of an [`Interval`] when it misses a tick. +/// +/// Sometimes, an [`Interval`]'s tick is missed. For example, consider the +/// following: +/// +/// ``` +/// use tokio::time::{self, Duration}; +/// # async fn task_that_takes_one_to_three_millis() {} +/// +/// #[tokio::main] +/// async fn main() { +/// // ticks every 2 milliseconds +/// let mut interval = time::interval(Duration::from_millis(2)); +/// for _ in 0..5 { +/// interval.tick().await; +/// // if this takes more than 2 milliseconds, a tick will be delayed +/// task_that_takes_one_to_three_millis().await; +/// } +/// } +/// ``` +/// +/// Generally, a tick is missed if too much time is spent without calling +/// [`Interval::tick()`]. +/// +/// By default, when a tick is missed, [`Interval`] fires ticks as quickly as it +/// can until it is "caught up" in time to where it should be. +/// `MissedTickBehavior` can be used to specify a different behavior for +/// [`Interval`] to exhibit. Each variant represents a different strategy. +/// +/// Note that because the executor cannot guarantee exact precision with timers, +/// these strategies will only apply when the delay is greater than 5 +/// milliseconds. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MissedTickBehavior { + /// Ticks as fast as possible until caught up. + /// + /// When this strategy is used, [`Interval`] schedules ticks "normally" (the + /// same as it would have if the ticks hadn't been delayed), which results + /// in it firing ticks as fast as possible until it is caught up in time to + /// where it should be. Unlike [`Delay`] and [`Skip`], the ticks yielded + /// when `Burst` is used (the [`Instant`]s that [`tick`](Interval::tick) + /// yields) aren't different than they would have been if a tick had not + /// been missed. Like [`Skip`], and unlike [`Delay`], the ticks may be + /// shortened. + /// + /// This looks something like this: + /// ```text + /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | + /// Actual ticks: | work -----| delay | work | work | work -| work -----| + /// ``` + /// + /// In code: + /// + /// ``` + /// use tokio::time::{interval, Duration}; + /// # async fn task_that_takes_200_millis() {} + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let mut interval = interval(Duration::from_millis(50)); + /// + /// task_that_takes_200_millis().await; + /// // The `Interval` has missed a tick + /// + /// // Since we have exceeded our timeout, this will resolve immediately + /// interval.tick().await; + /// + /// // Since we are more than 100ms after the start of `interval`, this will + /// // also resolve immediately. + /// interval.tick().await; + /// + /// // Also resolves immediately, because it was supposed to resolve at + /// // 150ms after the start of `interval` + /// interval.tick().await; + /// + /// // Resolves immediately + /// interval.tick().await; + /// + /// // Since we have gotten to 200ms after the start of `interval`, this + /// // will resolve after 50ms + /// interval.tick().await; + /// # } + /// ``` + /// + /// This is the default behavior when [`Interval`] is created with + /// [`interval`] and [`interval_at`]. + /// + /// [`Delay`]: MissedTickBehavior::Delay + /// [`Skip`]: MissedTickBehavior::Skip + Burst, + + /// Tick at multiples of `period` from when [`tick`] was called, rather than + /// from `start`. + /// + /// When this strategy is used and [`Interval`] has missed a tick, instead + /// of scheduling ticks to fire at multiples of `period` from `start` (the + /// time when the first tick was fired), it schedules all future ticks to + /// happen at a regular `period` from the point when [`tick`] was called. + /// Unlike [`Burst`] and [`Skip`], ticks are not shortened, and they aren't + /// guaranteed to happen at a multiple of `period` from `start` any longer. + /// + /// This looks something like this: + /// ```text + /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | + /// Actual ticks: | work -----| delay | work -----| work -----| work -----| + /// ``` + /// + /// In code: + /// + /// ``` + /// use tokio::time::{interval, Duration, MissedTickBehavior}; + /// # async fn task_that_takes_more_than_50_millis() {} + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let mut interval = interval(Duration::from_millis(50)); + /// interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + /// + /// task_that_takes_more_than_50_millis().await; + /// // The `Interval` has missed a tick + /// + /// // Since we have exceeded our timeout, this will resolve immediately + /// interval.tick().await; + /// + /// // But this one, rather than also resolving immediately, as might happen + /// // with the `Burst` or `Skip` behaviors, will not resolve until + /// // 50ms after the call to `tick` up above. That is, in `tick`, when we + /// // recognize that we missed a tick, we schedule the next tick to happen + /// // 50ms (or whatever the `period` is) from right then, not from when + /// // were were *supposed* to tick + /// interval.tick().await; + /// # } + /// ``` + /// + /// [`Burst`]: MissedTickBehavior::Burst + /// [`Skip`]: MissedTickBehavior::Skip + /// [`tick`]: Interval::tick + Delay, + + /// Skips missed ticks and tick on the next multiple of `period` from + /// `start`. + /// + /// When this strategy is used, [`Interval`] schedules the next tick to fire + /// at the next-closest tick that is a multiple of `period` away from + /// `start` (the point where [`Interval`] first ticked). Like [`Burst`], all + /// ticks remain multiples of `period` away from `start`, but unlike + /// [`Burst`], the ticks may not be *one* multiple of `period` away from the + /// last tick. Like [`Delay`], the ticks are no longer the same as they + /// would have been if ticks had not been missed, but unlike [`Delay`], and + /// like [`Burst`], the ticks may be shortened to be less than one `period` + /// away from each other. + /// + /// This looks something like this: + /// ```text + /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | + /// Actual ticks: | work -----| delay | work ---| work -----| work -----| + /// ``` + /// + /// In code: + /// + /// ``` + /// use tokio::time::{interval, Duration, MissedTickBehavior}; + /// # async fn task_that_takes_75_millis() {} + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let mut interval = interval(Duration::from_millis(50)); + /// interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + /// + /// task_that_takes_75_millis().await; + /// // The `Interval` has missed a tick + /// + /// // Since we have exceeded our timeout, this will resolve immediately + /// interval.tick().await; + /// + /// // This one will resolve after 25ms, 100ms after the start of + /// // `interval`, which is the closest multiple of `period` from the start + /// // of `interval` after the call to `tick` up above. + /// interval.tick().await; + /// # } + /// ``` + /// + /// [`Burst`]: MissedTickBehavior::Burst + /// [`Delay`]: MissedTickBehavior::Delay + Skip, +} + +impl MissedTickBehavior { + /// If a tick is missed, this method is called to determine when the next tick should happen. + fn next_timeout(&self, timeout: Instant, now: Instant, period: Duration) -> Instant { + match self { + Self::Burst => timeout + period, + Self::Delay => now + period, + Self::Skip => { + now + period + - Duration::from_nanos( + ((now - timeout).as_nanos() % period.as_nanos()) + .try_into() + // This operation is practically guaranteed not to + // fail, as in order for it to fail, `period` would + // have to be longer than `now - timeout`, and both + // would have to be longer than 584 years. + // + // If it did fail, there's not a good way to pass + // the error along to the user, so we just panic. + .expect( + "too much time has elapsed since the interval was supposed to tick", + ), + ) + } + } + } +} + +impl Default for MissedTickBehavior { + /// Returns [`MissedTickBehavior::Burst`]. + /// + /// For most usecases, the [`Burst`] strategy is what is desired. + /// Additionally, to preserve backwards compatibility, the [`Burst`] + /// strategy must be the default. For these reasons, + /// [`MissedTickBehavior::Burst`] is the default for [`MissedTickBehavior`]. + /// See [`Burst`] for more details. + /// + /// [`Burst`]: MissedTickBehavior::Burst + fn default() -> Self { + Self::Burst + } +} + +/// Interval returned by [`interval`] and [`interval_at`]. +/// +/// This type allows you to wait on a sequence of instants with a certain +/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets +/// you count the time spent between the calls to [`sleep`] as well. +/// +/// An `Interval` can be turned into a `Stream` with [`IntervalStream`]. +/// +/// [`IntervalStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.IntervalStream.html +/// [`sleep`]: crate::time::sleep +#[derive(Debug)] +pub struct Interval { + /// Future that completes the next time the `Interval` yields a value. + delay: Pin<Box<Sleep>>, + + /// The duration between values yielded by `Interval`. + period: Duration, + + /// The strategy `Interval` should use when a tick is missed. + missed_tick_behavior: MissedTickBehavior, + + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, +} + +impl Interval { + /// Completes when the next instant in the interval has been reached. + /// + /// # Cancel safety + /// + /// This method is cancellation safe. If `tick` is used as the branch in a `tokio::select!` and + /// another branch completes first, then no tick has been consumed. + /// + /// # Examples + /// + /// ``` + /// use tokio::time; + /// + /// use std::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut interval = time::interval(Duration::from_millis(10)); + /// + /// interval.tick().await; + /// interval.tick().await; + /// interval.tick().await; + /// + /// // approximately 20ms have elapsed. + /// } + /// ``` + pub async fn tick(&mut self) -> Instant { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let instant = trace::async_op( + || poll_fn(|cx| self.poll_tick(cx)), + resource_span, + "Interval::tick", + "poll_tick", + false, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let instant = poll_fn(|cx| self.poll_tick(cx)); + + instant.await + } + + /// Polls for the next instant in the interval to be reached. + /// + /// This method can return the following values: + /// + /// * `Poll::Pending` if the next instant has not yet been reached. + /// * `Poll::Ready(instant)` if the next instant has been reached. + /// + /// When this method returns `Poll::Pending`, the current task is scheduled + /// to receive a wakeup when the instant has elapsed. Note that on multiple + /// calls to `poll_tick`, only the [`Waker`](std::task::Waker) from the + /// [`Context`] passed to the most recent call is scheduled to receive a + /// wakeup. + pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> { + // Wait for the delay to be done + ready!(Pin::new(&mut self.delay).poll(cx)); + + // Get the time when we were scheduled to tick + let timeout = self.delay.deadline(); + + let now = Instant::now(); + + // If a tick was not missed, and thus we are being called before the + // next tick is due, just schedule the next tick normally, one `period` + // after `timeout` + // + // However, if a tick took excessively long and we are now behind, + // schedule the next tick according to how the user specified with + // `MissedTickBehavior` + let next = if now > timeout + Duration::from_millis(5) { + self.missed_tick_behavior + .next_timeout(timeout, now, self.period) + } else { + timeout + self.period + }; + + self.delay.as_mut().reset(next); + + // Return the time when we were scheduled to tick + Poll::Ready(timeout) + } + + /// Resets the interval to complete one period after the current time. + /// + /// This method ignores [`MissedTickBehavior`] strategy. + /// + /// # Examples + /// + /// ``` + /// use tokio::time; + /// + /// use std::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut interval = time::interval(Duration::from_millis(100)); + /// + /// interval.tick().await; + /// + /// time::sleep(Duration::from_millis(50)).await; + /// interval.reset(); + /// + /// interval.tick().await; + /// interval.tick().await; + /// + /// // approximately 250ms have elapsed. + /// } + /// ``` + pub fn reset(&mut self) { + self.delay.as_mut().reset(Instant::now() + self.period); + } + + /// Returns the [`MissedTickBehavior`] strategy currently being used. + pub fn missed_tick_behavior(&self) -> MissedTickBehavior { + self.missed_tick_behavior + } + + /// Sets the [`MissedTickBehavior`] strategy that should be used. + pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) { + self.missed_tick_behavior = behavior; + } + + /// Returns the period of the interval. + pub fn period(&self) -> Duration { + self.period + } +} diff --git a/third_party/rust/tokio/src/time/mod.rs b/third_party/rust/tokio/src/time/mod.rs new file mode 100644 index 0000000000..281990ef9a --- /dev/null +++ b/third_party/rust/tokio/src/time/mod.rs @@ -0,0 +1,114 @@ +//! Utilities for tracking time. +//! +//! This module provides a number of types for executing code after a set period +//! of time. +//! +//! * [`Sleep`] is a future that does no work and completes at a specific [`Instant`] +//! in time. +//! +//! * [`Interval`] is a stream yielding a value at a fixed period. It is +//! initialized with a [`Duration`] and repeatedly yields each time the duration +//! elapses. +//! +//! * [`Timeout`]: Wraps a future or stream, setting an upper bound to the amount +//! of time it is allowed to execute. If the future or stream does not +//! complete in time, then it is canceled and an error is returned. +//! +//! These types are sufficient for handling a large number of scenarios +//! involving time. +//! +//! These types must be used from within the context of the [`Runtime`](crate::runtime::Runtime). +//! +//! # Examples +//! +//! Wait 100ms and print "100 ms have elapsed" +//! +//! ``` +//! use std::time::Duration; +//! use tokio::time::sleep; +//! +//! #[tokio::main] +//! async fn main() { +//! sleep(Duration::from_millis(100)).await; +//! println!("100 ms have elapsed"); +//! } +//! ``` +//! +//! Require that an operation takes no more than 1s. +//! +//! ``` +//! use tokio::time::{timeout, Duration}; +//! +//! async fn long_future() { +//! // do work here +//! } +//! +//! # async fn dox() { +//! let res = timeout(Duration::from_secs(1), long_future()).await; +//! +//! if res.is_err() { +//! println!("operation timed out"); +//! } +//! # } +//! ``` +//! +//! A simple example using [`interval`] to execute a task every two seconds. +//! +//! The difference between [`interval`] and [`sleep`] is that an [`interval`] +//! measures the time since the last tick, which means that `.tick().await` may +//! wait for a shorter time than the duration specified for the interval +//! if some time has passed between calls to `.tick().await`. +//! +//! If the tick in the example below was replaced with [`sleep`], the task +//! would only be executed once every three seconds, and not every two +//! seconds. +//! +//! ``` +//! use tokio::time; +//! +//! async fn task_that_takes_a_second() { +//! println!("hello"); +//! time::sleep(time::Duration::from_secs(1)).await +//! } +//! +//! #[tokio::main] +//! async fn main() { +//! let mut interval = time::interval(time::Duration::from_secs(2)); +//! for _i in 0..5 { +//! interval.tick().await; +//! task_that_takes_a_second().await; +//! } +//! } +//! ``` +//! +//! [`interval`]: crate::time::interval() + +mod clock; +pub(crate) use self::clock::Clock; +#[cfg(feature = "test-util")] +pub use clock::{advance, pause, resume}; + +pub(crate) mod driver; + +#[doc(inline)] +pub use driver::sleep::{sleep, sleep_until, Sleep}; + +pub mod error; + +mod instant; +pub use self::instant::Instant; + +mod interval; +pub use interval::{interval, interval_at, Interval, MissedTickBehavior}; + +mod timeout; +#[doc(inline)] +pub use timeout::{timeout, timeout_at, Timeout}; + +#[cfg(test)] +#[cfg(not(loom))] +mod tests; + +// Re-export for convenience +#[doc(no_inline)] +pub use std::time::Duration; diff --git a/third_party/rust/tokio/src/time/tests/mod.rs b/third_party/rust/tokio/src/time/tests/mod.rs new file mode 100644 index 0000000000..35e1060aca --- /dev/null +++ b/third_party/rust/tokio/src/time/tests/mod.rs @@ -0,0 +1,22 @@ +mod test_sleep; + +use crate::time::{self, Instant}; +use std::time::Duration; + +fn assert_send<T: Send>() {} +fn assert_sync<T: Sync>() {} + +#[test] +fn registration_is_send_and_sync() { + use crate::time::Sleep; + + assert_send::<Sleep>(); + assert_sync::<Sleep>(); +} + +#[test] +#[should_panic] +fn sleep_is_eager() { + let when = Instant::now() + Duration::from_millis(100); + let _ = time::sleep_until(when); +} diff --git a/third_party/rust/tokio/src/time/tests/test_sleep.rs b/third_party/rust/tokio/src/time/tests/test_sleep.rs new file mode 100644 index 0000000000..77ca07e319 --- /dev/null +++ b/third_party/rust/tokio/src/time/tests/test_sleep.rs @@ -0,0 +1,443 @@ +//use crate::time::driver::{Driver, Entry, Handle}; + +/* +macro_rules! poll { + ($e:expr) => { + $e.enter(|cx, e| e.poll_elapsed(cx)) + }; +} + +#[test] +fn frozen_utility_returns_correct_advanced_duration() { + let clock = Clock::new(); + clock.pause(); + let start = clock.now(); + + clock.advance(ms(10)); + assert_eq!(clock.now() - start, ms(10)); +} + +#[test] +fn immediate_sleep() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + let when = clock.now(); + let mut e = task::spawn(sleep_until(&handle, when)); + + assert_ready_ok!(poll!(e)); + + assert_ok!(driver.park_timeout(Duration::from_millis(1000))); + + // The time has not advanced. The `turn` completed immediately. + assert_eq!(clock.now() - start, ms(1000)); +} + +#[test] +fn delayed_sleep_level_0() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + for &i in &[1, 10, 60] { + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, start + ms(i))); + + // The sleep instance has not elapsed. + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(i)); + + assert_ready_ok!(poll!(e)); + } +} + +#[test] +fn sub_ms_delayed_sleep() { + let (mut driver, clock, handle) = setup(); + + for _ in 0..5 { + let deadline = clock.now() + ms(1) + Duration::new(0, 1); + + let mut e = task::spawn(sleep_until(&handle, deadline)); + + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_ready_ok!(poll!(e)); + + assert!(clock.now() >= deadline); + + clock.advance(Duration::new(0, 1)); + } +} + +#[test] +fn delayed_sleep_wrapping_level_0() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + assert_ok!(driver.park_timeout(ms(5))); + assert_eq!(clock.now() - start, ms(5)); + + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(60))); + + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(64)); + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(65)); + + assert_ready_ok!(poll!(e)); +} + +#[test] +fn timer_wrapping_with_higher_levels() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Set sleep to hit level 1 + let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(64))); + assert_pending!(poll!(e1)); + + // Turn a bit + assert_ok!(driver.park_timeout(ms(5))); + + // Set timeout such that it will hit level 0, but wrap + let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(60))); + assert_pending!(poll!(e2)); + + // This should result in s1 firing + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(64)); + + assert_ready_ok!(poll!(e1)); + assert_pending!(poll!(e2)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(65)); + + assert_ready_ok!(poll!(e1)); +} + +#[test] +fn sleep_with_deadline_in_past() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create `Sleep` that elapsed immediately. + let mut e = task::spawn(sleep_until(&handle, clock.now() - ms(100))); + + // Even though the `Sleep` expires in the past, it is not ready yet + // because the timer must observe it. + assert_ready_ok!(poll!(e)); + + // Turn the timer, it runs for the elapsed time + assert_ok!(driver.park_timeout(ms(1000))); + + // The time has not advanced. The `turn` completed immediately. + assert_eq!(clock.now() - start, ms(1000)); +} + +#[test] +fn delayed_sleep_level_1() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer, this will wake up to cascade the timer down. + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(192)); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer again + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(234)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer with a smaller timeout than the cascade. + assert_ok!(driver.park_timeout(ms(100))); + assert_eq!(clock.now() - start, ms(100)); + + assert_pending!(poll!(e)); + + // Turn the timer, this will wake up to cascade the timer down. + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(192)); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer again + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(234)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); +} + +#[test] +fn concurrently_set_two_timers_second_one_shorter() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(500))); + let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(200))); + + // The sleep has not elapsed + assert_pending!(poll!(e1)); + assert_pending!(poll!(e2)); + + // Sleep until a cascade + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(192)); + + // Sleep until the second timer. + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(200)); + + // The shorter sleep fires + assert_ready_ok!(poll!(e2)); + assert_pending!(poll!(e1)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(448)); + + assert_pending!(poll!(e1)); + + // Turn again, this time the time will advance to the second sleep + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(500)); + + assert_ready_ok!(poll!(e1)); +} + +#[test] +fn short_sleep() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer, but not enough time will go by. + assert_ok!(driver.park()); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); + + // The time has advanced to the point of the sleep elapsing. + assert_eq!(clock.now() - start, ms(1)); +} + +#[test] +fn sorta_long_sleep_until() { + const MIN_5: u64 = 5 * 60 * 1000; + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MIN_5))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64]; + + for &elapsed in cascades { + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(elapsed)); + + assert_pending!(poll!(e)); + } + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(MIN_5)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); +} + +#[test] +fn very_long_sleep() { + const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000; + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MO_5))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + let cascades = &[ + 12_884_901_888, + 12_952_010_752, + 12_959_875_072, + 12_959_997_952, + ]; + + for &elapsed in cascades { + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(elapsed)); + + assert_pending!(poll!(e)); + } + + // Turn the timer, but not enough time will go by. + assert_ok!(driver.park()); + + // The time has advanced to the point of the sleep elapsing. + assert_eq!(clock.now() - start, ms(MO_5)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); +} + +#[test] +fn unpark_is_delayed() { + // A special park that will take much longer than the requested duration + struct MockPark(Clock); + + struct MockUnpark; + + impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + MockUnpark + } + + fn park(&mut self) -> Result<(), Self::Error> { + panic!("parking forever"); + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + assert_eq!(duration, ms(0)); + self.0.advance(ms(436)); + Ok(()) + } + + fn shutdown(&mut self) {} + } + + impl Unpark for MockUnpark { + fn unpark(&self) {} + } + + let clock = Clock::new(); + clock.pause(); + let start = clock.now(); + let mut driver = Driver::new(MockPark(clock.clone()), clock.clone()); + let handle = driver.handle(); + + let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(100))); + let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(101))); + let mut e3 = task::spawn(sleep_until(&handle, clock.now() + ms(200))); + + assert_pending!(poll!(e1)); + assert_pending!(poll!(e2)); + assert_pending!(poll!(e3)); + + assert_ok!(driver.park()); + + assert_eq!(clock.now() - start, ms(500)); + + assert_ready_ok!(poll!(e1)); + assert_ready_ok!(poll!(e2)); + assert_ready_ok!(poll!(e3)); +} + +#[test] +fn set_timeout_at_deadline_greater_than_max_timer() { + const YR_1: u64 = 365 * 24 * 60 * 60 * 1000; + const YR_5: u64 = 5 * YR_1; + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + for _ in 0..5 { + assert_ok!(driver.park_timeout(ms(YR_1))); + } + + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1))); + assert_pending!(poll!(e)); + + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(YR_5) + ms(1)); + + assert_ready_ok!(poll!(e)); +} + +fn setup() -> (Driver<MockPark>, Clock, Handle) { + let clock = Clock::new(); + clock.pause(); + let driver = Driver::new(MockPark(clock.clone()), clock.clone()); + let handle = driver.handle(); + + (driver, clock, handle) +} + +fn sleep_until(handle: &Handle, when: Instant) -> Arc<Entry> { + Entry::new(&handle, when, ms(0)) +} + +struct MockPark(Clock); + +struct MockUnpark; + +impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + MockUnpark + } + + fn park(&mut self) -> Result<(), Self::Error> { + panic!("parking forever"); + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.0.advance(duration); + Ok(()) + } + + fn shutdown(&mut self) {} +} + +impl Unpark for MockUnpark { + fn unpark(&self) {} +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} +*/ diff --git a/third_party/rust/tokio/src/time/timeout.rs b/third_party/rust/tokio/src/time/timeout.rs new file mode 100644 index 0000000000..4a93089e8e --- /dev/null +++ b/third_party/rust/tokio/src/time/timeout.rs @@ -0,0 +1,202 @@ +//! Allows a future to execute for a maximum amount of time. +//! +//! See [`Timeout`] documentation for more details. +//! +//! [`Timeout`]: struct@Timeout + +use crate::{ + coop, + time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, + util::trace, +}; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +/// Requires a `Future` to complete before the specified duration has elapsed. +/// +/// If the future completes before the duration has elapsed, then the completed +/// value is returned. Otherwise, an error is returned and the future is +/// canceled. +/// +/// # Cancelation +/// +/// Cancelling a timeout is done by dropping the future. No additional cleanup +/// or other work is required. +/// +/// The original future may be obtained by calling [`Timeout::into_inner`]. This +/// consumes the `Timeout`. +/// +/// # Examples +/// +/// Create a new `Timeout` set to expire in 10 milliseconds. +/// +/// ```rust +/// use tokio::time::timeout; +/// use tokio::sync::oneshot; +/// +/// use std::time::Duration; +/// +/// # async fn dox() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); +/// +/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. +/// if let Err(_) = timeout(Duration::from_millis(10), rx).await { +/// println!("did not receive value within 10 ms"); +/// } +/// # } +/// ``` +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +#[track_caller] +pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T> +where + T: Future, +{ + let location = trace::caller_location(); + + let deadline = Instant::now().checked_add(duration); + let delay = match deadline { + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::far_future(location), + }; + Timeout::new_with_delay(future, delay) +} + +/// Requires a `Future` to complete before the specified instant in time. +/// +/// If the future completes before the instant is reached, then the completed +/// value is returned. Otherwise, an error is returned. +/// +/// # Cancelation +/// +/// Cancelling a timeout is done by dropping the future. No additional cleanup +/// or other work is required. +/// +/// The original future may be obtained by calling [`Timeout::into_inner`]. This +/// consumes the `Timeout`. +/// +/// # Examples +/// +/// Create a new `Timeout` set to expire in 10 milliseconds. +/// +/// ```rust +/// use tokio::time::{Instant, timeout_at}; +/// use tokio::sync::oneshot; +/// +/// use std::time::Duration; +/// +/// # async fn dox() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); +/// +/// // Wrap the future with a `Timeout` set to expire 10 milliseconds into the +/// // future. +/// if let Err(_) = timeout_at(Instant::now() + Duration::from_millis(10), rx).await { +/// println!("did not receive value within 10 ms"); +/// } +/// # } +/// ``` +pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T> +where + T: Future, +{ + let delay = sleep_until(deadline); + + Timeout { + value: future, + delay, + } +} + +pin_project! { + /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at). + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct Timeout<T> { + #[pin] + value: T, + #[pin] + delay: Sleep, + } +} + +impl<T> Timeout<T> { + pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> { + Timeout { value, delay } + } + + /// Gets a reference to the underlying value in this timeout. + pub fn get_ref(&self) -> &T { + &self.value + } + + /// Gets a mutable reference to the underlying value in this timeout. + pub fn get_mut(&mut self) -> &mut T { + &mut self.value + } + + /// Consumes this timeout, returning the underlying value. + pub fn into_inner(self) -> T { + self.value + } +} + +impl<T> Future for Timeout<T> +where + T: Future, +{ + type Output = Result<T::Output, Elapsed>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + + let had_budget_before = coop::has_budget_remaining(); + + // First, try polling the future + if let Poll::Ready(v) = me.value.poll(cx) { + return Poll::Ready(Ok(v)); + } + + let has_budget_now = coop::has_budget_remaining(); + + let delay = me.delay; + + let poll_delay = || -> Poll<Self::Output> { + match delay.poll(cx) { + Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), + Poll::Pending => Poll::Pending, + } + }; + + if let (true, false) = (had_budget_before, has_budget_now) { + // if it is the underlying future that exhausted the budget, we poll + // the `delay` with an unconstrained one. This prevents pathological + // cases where the underlying future always exhausts the budget and + // we never get a chance to evaluate whether the timeout was hit or + // not. + coop::with_unconstrained(poll_delay) + } else { + poll_delay() + } + } +} |