diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/time | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/time')
-rw-r--r-- | vendor/tokio/src/time/clock.rs | 186 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/entry.rs | 629 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/handle.rs | 88 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/mod.rs | 520 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/sleep.rs | 257 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/tests/mod.rs | 287 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/wheel/level.rs | 275 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/wheel/mod.rs | 359 | ||||
-rw-r--r-- | vendor/tokio/src/time/driver/wheel/stack.rs | 112 | ||||
-rw-r--r-- | vendor/tokio/src/time/error.rs | 9 | ||||
-rw-r--r-- | vendor/tokio/src/time/instant.rs | 24 | ||||
-rw-r--r-- | vendor/tokio/src/time/interval.rs | 117 | ||||
-rw-r--r-- | vendor/tokio/src/time/mod.rs | 13 | ||||
-rw-r--r-- | vendor/tokio/src/time/sleep.rs | 452 | ||||
-rw-r--r-- | vendor/tokio/src/time/tests/mod.rs | 22 | ||||
-rw-r--r-- | vendor/tokio/src/time/tests/test_sleep.rs | 443 | ||||
-rw-r--r-- | vendor/tokio/src/time/timeout.rs | 99 |
17 files changed, 803 insertions, 3089 deletions
diff --git a/vendor/tokio/src/time/clock.rs b/vendor/tokio/src/time/clock.rs index a44d75f3c..091cf4b19 100644 --- a/vendor/tokio/src/time/clock.rs +++ b/vendor/tokio/src/time/clock.rs @@ -29,26 +29,53 @@ cfg_not_test_util! { cfg_test_util! { use crate::time::{Duration, Instant}; - use crate::loom::sync::{Arc, Mutex}; + use crate::loom::sync::Mutex; + use crate::loom::sync::atomic::Ordering; + use std::sync::atomic::AtomicBool as StdAtomicBool; cfg_rt! { - fn clock() -> Option<Clock> { - crate::runtime::context::clock() + #[track_caller] + fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R { + use crate::runtime::Handle; + + let res = match Handle::try_current() { + Ok(handle) => f(Some(handle.inner.driver().clock())), + Err(ref e) if e.is_missing_context() => f(None), + Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + }; + + match res { + Ok(ret) => ret, + Err(msg) => panic!("{}", msg), + } } } cfg_not_rt! { - fn clock() -> Option<Clock> { - None + #[track_caller] + fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R { + match f(None) { + Ok(ret) => ret, + Err(msg) => panic!("{}", msg), + } } } /// A handle to a source of time. - #[derive(Debug, Clone)] + #[derive(Debug)] pub(crate) struct Clock { - inner: Arc<Mutex<Inner>>, + inner: Mutex<Inner>, } + // Used to track if the clock was ever paused. This is an optimization to + // avoid touching the mutex if `test-util` was accidentally enabled in + // release mode. + // + // A static is used so we can avoid accessing the thread-local as well. The + // `std` AtomicBool is used directly because loom does not support static + // atomics. + static DID_PAUSE_CLOCK: StdAtomicBool = StdAtomicBool::new(false); + #[derive(Debug)] struct Inner { /// True if the ability to pause time is enabled. @@ -57,22 +84,34 @@ cfg_test_util! { /// Instant to use as the clock's base instant. base: std::time::Instant, - /// Instant at which the clock was last unfrozen + /// Instant at which the clock was last unfrozen. unfrozen: Option<std::time::Instant>, + + /// Number of `inhibit_auto_advance` calls still in effect. + auto_advance_inhibit_count: usize, } - /// Pause time + /// Pauses time. /// /// The current value of `Instant::now()` is saved and all subsequent calls - /// to `Instant::now()` until the timer wheel is checked again will return - /// the saved value. Once the timer wheel is checked, time will immediately - /// advance to the next registered `Sleep`. This is useful for running tests - /// that depend on time. + /// to `Instant::now()` will return the saved value. The saved value can be + /// changed by [`advance`] or by the time auto-advancing once the runtime + /// has no work to do. This only affects the `Instant` type in Tokio, and + /// the `Instant` in std continues to work as normal. /// /// Pausing time requires the `current_thread` Tokio runtime. This is the /// default runtime used by `#[tokio::test]`. The runtime can be initialized /// with time in a paused state using the `Builder::start_paused` method. /// + /// For cases where time is immediately paused, it is better to pause + /// the time using the `main` or `test` macro: + /// ``` + /// #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// println!("Hello world"); + /// } + /// ``` + /// /// # Panics /// /// Panics if time is already frozen or if called from outside of a @@ -86,12 +125,18 @@ cfg_test_util! { /// current time when awaited. /// /// [`Sleep`]: crate::time::Sleep + /// [`advance`]: crate::time::advance + #[track_caller] pub fn pause() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.pause(); + with_clock(|maybe_clock| { + match maybe_clock { + Some(clock) => clock.pause(), + None => Err("time cannot be frozen from outside the Tokio runtime"), + } + }) } - /// Resume time + /// Resumes time. /// /// Clears the saved `Instant::now()` value. Subsequent calls to /// `Instant::now()` will return the value returned by the system call. @@ -100,22 +145,44 @@ cfg_test_util! { /// /// Panics if time is not frozen or if called from outside of the Tokio /// runtime. + #[track_caller] pub fn resume() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - let mut inner = clock.inner.lock(); + with_clock(|maybe_clock| { + let clock = match maybe_clock { + Some(clock) => clock, + None => return Err("time cannot be frozen from outside the Tokio runtime"), + }; - if inner.unfrozen.is_some() { - panic!("time is not frozen"); - } + let mut inner = clock.inner.lock(); + + if inner.unfrozen.is_some() { + return Err("time is not frozen"); + } - inner.unfrozen = Some(std::time::Instant::now()); + inner.unfrozen = Some(std::time::Instant::now()); + Ok(()) + }) } - /// Advance time + /// Advances time. /// /// Increments the saved `Instant::now()` value by `duration`. Subsequent /// calls to `Instant::now()` will return the result of the increment. /// + /// This function will make the current time jump forward by the given + /// duration in one jump. This means that all `sleep` calls with a deadline + /// before the new time will immediately complete "at the same time", and + /// the runtime is free to poll them in any order. Additionally, this + /// method will not wait for the `sleep` calls it advanced past to complete. + /// If you want to do that, you should instead call [`sleep`] and rely on + /// the runtime's auto-advance feature. + /// + /// Note that calls to `sleep` are not guaranteed to complete the first time + /// they are polled after a call to `advance`. For example, this can happen + /// if the runtime has not yet touched the timer driver after the call to + /// `advance`. However if they don't, the runtime will poll the task again + /// shortly. + /// /// # Panics /// /// Panics if time is not frozen or if called from outside of the Tokio @@ -126,70 +193,107 @@ cfg_test_util! { /// If the time is paused and there is no work to do, the runtime advances /// time to the next timer. See [`pause`](pause#auto-advance) for more /// details. + /// + /// [`sleep`]: fn@crate::time::sleep pub async fn advance(duration: Duration) { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.advance(duration); + with_clock(|maybe_clock| { + let clock = match maybe_clock { + Some(clock) => clock, + None => return Err("time cannot be frozen from outside the Tokio runtime"), + }; + + clock.advance(duration) + }); crate::task::yield_now().await; } - /// Return the current instant, factoring in frozen time. + /// Returns the current instant, factoring in frozen time. pub(crate) fn now() -> Instant { - if let Some(clock) = clock() { - clock.now() - } else { - Instant::from_std(std::time::Instant::now()) + if !DID_PAUSE_CLOCK.load(Ordering::Acquire) { + return Instant::from_std(std::time::Instant::now()); } + + with_clock(|maybe_clock| { + Ok(if let Some(clock) = maybe_clock { + clock.now() + } else { + Instant::from_std(std::time::Instant::now()) + }) + }) } impl Clock { - /// Return a new `Clock` instance that uses the current execution context's + /// Returns a new `Clock` instance that uses the current execution context's /// source of time. pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock { let now = std::time::Instant::now(); let clock = Clock { - inner: Arc::new(Mutex::new(Inner { + inner: Mutex::new(Inner { enable_pausing, base: now, unfrozen: Some(now), - })), + auto_advance_inhibit_count: 0, + }), }; if start_paused { - clock.pause(); + if let Err(msg) = clock.pause() { + panic!("{}", msg); + } } clock } - pub(crate) fn pause(&self) { + pub(crate) fn pause(&self) -> Result<(), &'static str> { let mut inner = self.inner.lock(); if !inner.enable_pausing { drop(inner); // avoid poisoning the lock - panic!("`time::pause()` requires the `current_thread` Tokio runtime. \ + return Err("`time::pause()` requires the `current_thread` Tokio runtime. \ This is the default Runtime used by `#[tokio::test]."); } - let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed(); + // Track that we paused the clock + DID_PAUSE_CLOCK.store(true, Ordering::Release); + + let elapsed = match inner.unfrozen.as_ref() { + Some(v) => v.elapsed(), + None => return Err("time is already frozen") + }; inner.base += elapsed; inner.unfrozen = None; + + Ok(()) + } + + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). + pub(crate) fn inhibit_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count += 1; + } + + pub(crate) fn allow_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count -= 1; } - pub(crate) fn is_paused(&self) -> bool { + pub(crate) fn can_auto_advance(&self) -> bool { let inner = self.inner.lock(); - inner.unfrozen.is_none() + inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0 } - pub(crate) fn advance(&self, duration: Duration) { + pub(crate) fn advance(&self, duration: Duration) -> Result<(), &'static str> { let mut inner = self.inner.lock(); if inner.unfrozen.is_some() { - panic!("time is not frozen"); + return Err("time is not frozen"); } inner.base += duration; + Ok(()) } pub(crate) fn now(&self) -> Instant { diff --git a/vendor/tokio/src/time/driver/entry.rs b/vendor/tokio/src/time/driver/entry.rs deleted file mode 100644 index 168e0b995..000000000 --- a/vendor/tokio/src/time/driver/entry.rs +++ /dev/null @@ -1,629 +0,0 @@ -//! Timer state structures. -//! -//! This module contains the heart of the intrusive timer implementation, and as -//! such the structures inside are full of tricky concurrency and unsafe code. -//! -//! # Ground rules -//! -//! The heart of the timer implementation here is the `TimerShared` structure, -//! shared between the `TimerEntry` and the driver. Generally, we permit access -//! to `TimerShared` ONLY via either 1) a mutable reference to `TimerEntry` or -//! 2) a held driver lock. -//! -//! It follows from this that any changes made while holding BOTH 1 and 2 will -//! be reliably visible, regardless of ordering. This is because of the acq/rel -//! fences on the driver lock ensuring ordering with 2, and rust mutable -//! reference rules for 1 (a mutable reference to an object can't be passed -//! between threads without an acq/rel barrier, and same-thread we have local -//! happens-before ordering). -//! -//! # State field -//! -//! Each timer has a state field associated with it. This field contains either -//! the current scheduled time, or a special flag value indicating its state. -//! This state can either indicate that the timer is on the 'pending' queue (and -//! thus will be fired with an `Ok(())` result soon) or that it has already been -//! fired/deregistered. -//! -//! This single state field allows for code that is firing the timer to -//! synchronize with any racing `reset` calls reliably. -//! -//! # Cached vs true timeouts -//! -//! To allow for the use case of a timeout that is periodically reset before -//! expiration to be as lightweight as possible, we support optimistically -//! lock-free timer resets, in the case where a timer is rescheduled to a later -//! point than it was originally scheduled for. -//! -//! This is accomplished by lazily rescheduling timers. That is, we update the -//! state field field with the true expiration of the timer from the holder of -//! the [`TimerEntry`]. When the driver services timers (ie, whenever it's -//! walking lists of timers), it checks this "true when" value, and reschedules -//! based on it. -//! -//! We do, however, also need to track what the expiration time was when we -//! originally registered the timer; this is used to locate the right linked -//! list when the timer is being cancelled. This is referred to as the "cached -//! when" internally. -//! -//! There is of course a race condition between timer reset and timer -//! expiration. If the driver fails to observe the updated expiration time, it -//! could trigger expiration of the timer too early. However, because -//! `mark_pending` performs a compare-and-swap, it will identify this race and -//! refuse to mark the timer as pending. - -use crate::loom::cell::UnsafeCell; -use crate::loom::sync::atomic::AtomicU64; -use crate::loom::sync::atomic::Ordering; - -use crate::sync::AtomicWaker; -use crate::time::Instant; -use crate::util::linked_list; - -use super::Handle; - -use std::cell::UnsafeCell as StdUnsafeCell; -use std::task::{Context, Poll, Waker}; -use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; - -type TimerResult = Result<(), crate::time::error::Error>; - -const STATE_DEREGISTERED: u64 = u64::MAX; -const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; -const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; - -/// This structure holds the current shared state of the timer - its scheduled -/// time (if registered), or otherwise the result of the timer completing, as -/// well as the registered waker. -/// -/// Generally, the StateCell is only permitted to be accessed from two contexts: -/// Either a thread holding the corresponding &mut TimerEntry, or a thread -/// holding the timer driver lock. The write actions on the StateCell amount to -/// passing "ownership" of the StateCell between these contexts; moving a timer -/// from the TimerEntry to the driver requires _both_ holding the &mut -/// TimerEntry and the driver lock, while moving it back (firing the timer) -/// requires only the driver lock. -pub(super) struct StateCell { - /// Holds either the scheduled expiration time for this timer, or (if the - /// timer has been fired and is unregistered), `u64::MAX`. - state: AtomicU64, - /// If the timer is fired (an Acquire order read on state shows - /// `u64::MAX`), holds the result that should be returned from - /// polling the timer. Otherwise, the contents are unspecified and reading - /// without holding the driver lock is undefined behavior. - result: UnsafeCell<TimerResult>, - /// The currently-registered waker - waker: CachePadded<AtomicWaker>, -} - -impl Default for StateCell { - fn default() -> Self { - Self::new() - } -} - -impl std::fmt::Debug for StateCell { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "StateCell({:?})", self.read_state()) - } -} - -impl StateCell { - fn new() -> Self { - Self { - state: AtomicU64::new(STATE_DEREGISTERED), - result: UnsafeCell::new(Ok(())), - waker: CachePadded(AtomicWaker::new()), - } - } - - fn is_pending(&self) -> bool { - self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE - } - - /// Returns the current expiration time, or None if not currently scheduled. - fn when(&self) -> Option<u64> { - let cur_state = self.state.load(Ordering::Relaxed); - - if cur_state == u64::MAX { - None - } else { - Some(cur_state) - } - } - - /// If the timer is completed, returns the result of the timer. Otherwise, - /// returns None and registers the waker. - fn poll(&self, waker: &Waker) -> Poll<TimerResult> { - // We must register first. This ensures that either `fire` will - // observe the new waker, or we will observe a racing fire to have set - // the state, or both. - self.waker.0.register_by_ref(waker); - - self.read_state() - } - - fn read_state(&self) -> Poll<TimerResult> { - let cur_state = self.state.load(Ordering::Acquire); - - if cur_state == STATE_DEREGISTERED { - // SAFETY: The driver has fired this timer; this involves writing - // the result, and then writing (with release ordering) the state - // field. - Poll::Ready(unsafe { self.result.with(|p| *p) }) - } else { - Poll::Pending - } - } - - /// Marks this timer as being moved to the pending list, if its scheduled - /// time is not after `not_after`. - /// - /// If the timer is scheduled for a time after not_after, returns an Err - /// containing the current scheduled time. - /// - /// SAFETY: Must hold the driver lock. - unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { - // Quick initial debug check to see if the timer is already fired. Since - // firing the timer can only happen with the driver lock held, we know - // we shouldn't be able to "miss" a transition to a fired state, even - // with relaxed ordering. - let mut cur_state = self.state.load(Ordering::Relaxed); - - loop { - debug_assert!(cur_state < STATE_MIN_VALUE); - - if cur_state > not_after { - break Err(cur_state); - } - - match self.state.compare_exchange( - cur_state, - STATE_PENDING_FIRE, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - break Ok(()); - } - Err(actual_state) => { - cur_state = actual_state; - } - } - } - } - - /// Fires the timer, setting the result to the provided result. - /// - /// Returns: - /// * `Some(waker) - if fired and a waker needs to be invoked once the - /// driver lock is released - /// * `None` - if fired and a waker does not need to be invoked, or if - /// already fired - /// - /// SAFETY: The driver lock must be held. - unsafe fn fire(&self, result: TimerResult) -> Option<Waker> { - // Quick initial check to see if the timer is already fired. Since - // firing the timer can only happen with the driver lock held, we know - // we shouldn't be able to "miss" a transition to a fired state, even - // with relaxed ordering. - let cur_state = self.state.load(Ordering::Relaxed); - if cur_state == STATE_DEREGISTERED { - return None; - } - - // SAFETY: We assume the driver lock is held and the timer is not - // fired, so only the driver is accessing this field. - // - // We perform a release-ordered store to state below, to ensure this - // write is visible before the state update is visible. - unsafe { self.result.with_mut(|p| *p = result) }; - - self.state.store(STATE_DEREGISTERED, Ordering::Release); - - self.waker.0.take_waker() - } - - /// Marks the timer as registered (poll will return None) and sets the - /// expiration time. - /// - /// While this function is memory-safe, it should only be called from a - /// context holding both `&mut TimerEntry` and the driver lock. - fn set_expiration(&self, timestamp: u64) { - debug_assert!(timestamp < STATE_MIN_VALUE); - - // We can use relaxed ordering because we hold the driver lock and will - // fence when we release the lock. - self.state.store(timestamp, Ordering::Relaxed); - } - - /// Attempts to adjust the timer to a new timestamp. - /// - /// If the timer has already been fired, is pending firing, or the new - /// timestamp is earlier than the old timestamp, (or occasionally - /// spuriously) returns Err without changing the timer's state. In this - /// case, the timer must be deregistered and re-registered. - fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> { - let mut prior = self.state.load(Ordering::Relaxed); - loop { - if new_timestamp < prior || prior >= STATE_MIN_VALUE { - return Err(()); - } - - match self.state.compare_exchange_weak( - prior, - new_timestamp, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - return Ok(()); - } - Err(true_prior) => { - prior = true_prior; - } - } - } - } - - /// Returns true if the state of this timer indicates that the timer might - /// be registered with the driver. This check is performed with relaxed - /// ordering, but is conservative - if it returns false, the timer is - /// definitely _not_ registered. - pub(super) fn might_be_registered(&self) -> bool { - self.state.load(Ordering::Relaxed) != u64::MAX - } -} - -/// A timer entry. -/// -/// This is the handle to a timer that is controlled by the requester of the -/// timer. As this participates in intrusive data structures, it must be pinned -/// before polling. -#[derive(Debug)] -pub(super) struct TimerEntry { - /// Arc reference to the driver. We can only free the driver after - /// deregistering everything from their respective timer wheels. - driver: Handle, - /// Shared inner structure; this is part of an intrusive linked list, and - /// therefore other references can exist to it while mutable references to - /// Entry exist. - /// - /// This is manipulated only under the inner mutex. TODO: Can we use loom - /// cells for this? - inner: StdUnsafeCell<TimerShared>, - /// Initial deadline for the timer. This is used to register on the first - /// poll, as we can't register prior to being pinned. - initial_deadline: Option<Instant>, - /// Ensure the type is !Unpin - _m: std::marker::PhantomPinned, -} - -unsafe impl Send for TimerEntry {} -unsafe impl Sync for TimerEntry {} - -/// An TimerHandle is the (non-enforced) "unique" pointer from the driver to the -/// timer entry. Generally, at most one TimerHandle exists for a timer at a time -/// (enforced by the timer state machine). -/// -/// SAFETY: An TimerHandle is essentially a raw pointer, and the usual caveats -/// of pointer safety apply. In particular, TimerHandle does not itself enforce -/// that the timer does still exist; however, normally an TimerHandle is created -/// immediately before registering the timer, and is consumed when firing the -/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce -/// memory safety, all operations are unsafe. -#[derive(Debug)] -pub(crate) struct TimerHandle { - inner: NonNull<TimerShared>, -} - -pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>; - -/// The shared state structure of a timer. This structure is shared between the -/// frontend (`Entry`) and driver backend. -/// -/// Note that this structure is located inside the `TimerEntry` structure. -#[derive(Debug)] -pub(crate) struct TimerShared { - /// Current state. This records whether the timer entry is currently under - /// the ownership of the driver, and if not, its current state (not - /// complete, fired, error, etc). - state: StateCell, - - /// Data manipulated by the driver thread itself, only. - driver_state: CachePadded<TimerSharedPadded>, - - _p: PhantomPinned, -} - -impl TimerShared { - pub(super) fn new() -> Self { - Self { - state: StateCell::default(), - driver_state: CachePadded(TimerSharedPadded::new()), - _p: PhantomPinned, - } - } - - /// Gets the cached time-of-expiration value - pub(super) fn cached_when(&self) -> u64 { - // Cached-when is only accessed under the driver lock, so we can use relaxed - self.driver_state.0.cached_when.load(Ordering::Relaxed) - } - - /// Gets the true time-of-expiration value, and copies it into the cached - /// time-of-expiration value. - /// - /// SAFETY: Must be called with the driver lock held, and when this entry is - /// not in any timer wheel lists. - pub(super) unsafe fn sync_when(&self) -> u64 { - let true_when = self.true_when(); - - self.driver_state - .0 - .cached_when - .store(true_when, Ordering::Relaxed); - - true_when - } - - /// Sets the cached time-of-expiration value. - /// - /// SAFETY: Must be called with the driver lock held, and when this entry is - /// not in any timer wheel lists. - unsafe fn set_cached_when(&self, when: u64) { - self.driver_state - .0 - .cached_when - .store(when, Ordering::Relaxed); - } - - /// Returns the true time-of-expiration value, with relaxed memory ordering. - pub(super) fn true_when(&self) -> u64 { - self.state.when().expect("Timer already fired") - } - - /// Sets the true time-of-expiration value, even if it is less than the - /// current expiration or the timer is deregistered. - /// - /// SAFETY: Must only be called with the driver lock held and the entry not - /// in the timer wheel. - pub(super) unsafe fn set_expiration(&self, t: u64) { - self.state.set_expiration(t); - self.driver_state.0.cached_when.store(t, Ordering::Relaxed); - } - - /// Sets the true time-of-expiration only if it is after the current. - pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> { - self.state.extend_expiration(t) - } - - /// Returns a TimerHandle for this timer. - pub(super) fn handle(&self) -> TimerHandle { - TimerHandle { - inner: NonNull::from(self), - } - } - - /// Returns true if the state of this timer indicates that the timer might - /// be registered with the driver. This check is performed with relaxed - /// ordering, but is conservative - if it returns false, the timer is - /// definitely _not_ registered. - pub(super) fn might_be_registered(&self) -> bool { - self.state.might_be_registered() - } -} - -/// Additional shared state between the driver and the timer which is cache -/// padded. This contains the information that the driver thread accesses most -/// frequently to minimize contention. In particular, we move it away from the -/// waker, as the waker is updated on every poll. -struct TimerSharedPadded { - /// The expiration time for which this entry is currently registered. - /// Generally owned by the driver, but is accessed by the entry when not - /// registered. - cached_when: AtomicU64, - - /// The true expiration time. Set by the timer future, read by the driver. - true_when: AtomicU64, - - /// A link within the doubly-linked list of timers on a particular level and - /// slot. Valid only if state is equal to Registered. - /// - /// Only accessed under the entry lock. - pointers: StdUnsafeCell<linked_list::Pointers<TimerShared>>, -} - -impl std::fmt::Debug for TimerSharedPadded { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TimerSharedPadded") - .field("when", &self.true_when.load(Ordering::Relaxed)) - .field("cached_when", &self.cached_when.load(Ordering::Relaxed)) - .finish() - } -} - -impl TimerSharedPadded { - fn new() -> Self { - Self { - cached_when: AtomicU64::new(0), - true_when: AtomicU64::new(0), - pointers: StdUnsafeCell::new(linked_list::Pointers::new()), - } - } -} - -unsafe impl Send for TimerShared {} -unsafe impl Sync for TimerShared {} - -unsafe impl linked_list::Link for TimerShared { - type Handle = TimerHandle; - - type Target = TimerShared; - - fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> { - handle.inner - } - - unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle { - TimerHandle { inner: ptr } - } - - unsafe fn pointers( - target: NonNull<Self::Target>, - ) -> NonNull<linked_list::Pointers<Self::Target>> { - unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() } - } -} - -// ===== impl Entry ===== - -impl TimerEntry { - pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self { - let driver = handle.clone(); - - Self { - driver, - inner: StdUnsafeCell::new(TimerShared::new()), - initial_deadline: Some(deadline), - _m: std::marker::PhantomPinned, - } - } - - fn inner(&self) -> &TimerShared { - unsafe { &*self.inner.get() } - } - - pub(crate) fn is_elapsed(&self) -> bool { - !self.inner().state.might_be_registered() && self.initial_deadline.is_none() - } - - /// Cancels and deregisters the timer. This operation is irreversible. - pub(crate) fn cancel(self: Pin<&mut Self>) { - // We need to perform an acq/rel fence with the driver thread, and the - // simplest way to do so is to grab the driver lock. - // - // Why is this necessary? We're about to release this timer's memory for - // some other non-timer use. However, we've been doing a bunch of - // relaxed (or even non-atomic) writes from the driver thread, and we'll - // be doing more from _this thread_ (as this memory is interpreted as - // something else). - // - // It is critical to ensure that, from the point of view of the driver, - // those future non-timer writes happen-after the timer is fully fired, - // and from the purpose of this thread, the driver's writes all - // happen-before we drop the timer. This in turn requires us to perform - // an acquire-release barrier in _both_ directions between the driver - // and dropping thread. - // - // The lock acquisition in clear_entry serves this purpose. All of the - // driver manipulations happen with the lock held, so we can just take - // the lock and be sure that this drop happens-after everything the - // driver did so far and happens-before everything the driver does in - // the future. While we have the lock held, we also go ahead and - // deregister the entry if necessary. - unsafe { self.driver.clear_entry(NonNull::from(self.inner())) }; - } - - pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) { - unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None; - - let tick = self.driver.time_source().deadline_to_tick(new_time); - - if self.inner().extend_expiration(tick).is_ok() { - return; - } - - unsafe { - self.driver.reregister(tick, self.inner().into()); - } - } - - pub(crate) fn poll_elapsed( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), super::Error>> { - if self.driver.is_shutdown() { - panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR); - } - - if let Some(deadline) = self.initial_deadline { - self.as_mut().reset(deadline); - } - - let this = unsafe { self.get_unchecked_mut() }; - - this.inner().state.poll(cx.waker()) - } -} - -impl TimerHandle { - pub(super) unsafe fn cached_when(&self) -> u64 { - unsafe { self.inner.as_ref().cached_when() } - } - - pub(super) unsafe fn sync_when(&self) -> u64 { - unsafe { self.inner.as_ref().sync_when() } - } - - pub(super) unsafe fn is_pending(&self) -> bool { - unsafe { self.inner.as_ref().state.is_pending() } - } - - /// Forcibly sets the true and cached expiration times to the given tick. - /// - /// SAFETY: The caller must ensure that the handle remains valid, the driver - /// lock is held, and that the timer is not in any wheel linked lists. - pub(super) unsafe fn set_expiration(&self, tick: u64) { - self.inner.as_ref().set_expiration(tick); - } - - /// Attempts to mark this entry as pending. If the expiration time is after - /// `not_after`, however, returns an Err with the current expiration time. - /// - /// If an `Err` is returned, the `cached_when` value will be updated to this - /// new expiration time. - /// - /// SAFETY: The caller must ensure that the handle remains valid, the driver - /// lock is held, and that the timer is not in any wheel linked lists. - /// After returning Ok, the entry must be added to the pending list. - pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { - match self.inner.as_ref().state.mark_pending(not_after) { - Ok(()) => { - // mark this as being on the pending queue in cached_when - self.inner.as_ref().set_cached_when(u64::MAX); - Ok(()) - } - Err(tick) => { - self.inner.as_ref().set_cached_when(tick); - Err(tick) - } - } - } - - /// Attempts to transition to a terminal state. If the state is already a - /// terminal state, does nothing. - /// - /// Because the entry might be dropped after the state is moved to a - /// terminal state, this function consumes the handle to ensure we don't - /// access the entry afterwards. - /// - /// Returns the last-registered waker, if any. - /// - /// SAFETY: The driver lock must be held while invoking this function, and - /// the entry must not be in any wheel linked lists. - pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> { - self.inner.as_ref().state.fire(completed_state) - } -} - -impl Drop for TimerEntry { - fn drop(&mut self) { - unsafe { Pin::new_unchecked(self) }.as_mut().cancel() - } -} - -#[cfg_attr(target_arch = "x86_64", repr(align(128)))] -#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] -#[derive(Debug, Default)] -struct CachePadded<T>(T); diff --git a/vendor/tokio/src/time/driver/handle.rs b/vendor/tokio/src/time/driver/handle.rs deleted file mode 100644 index 77b435873..000000000 --- a/vendor/tokio/src/time/driver/handle.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::loom::sync::Arc; -use crate::time::driver::ClockTime; -use std::fmt; - -/// Handle to time driver instance. -#[derive(Clone)] -pub(crate) struct Handle { - time_source: ClockTime, - inner: Arc<super::Inner>, -} - -impl Handle { - /// Creates a new timer `Handle` from a shared `Inner` timer state. - pub(super) fn new(inner: Arc<super::Inner>) -> Self { - let time_source = inner.state.lock().time_source.clone(); - Handle { time_source, inner } - } - - /// Returns the time source associated with this handle - pub(super) fn time_source(&self) -> &ClockTime { - &self.time_source - } - - /// Access the driver's inner structure - pub(super) fn get(&self) -> &super::Inner { - &*self.inner - } - - // Check whether the driver has been shutdown - pub(super) fn is_shutdown(&self) -> bool { - self.inner.is_shutdown() - } -} - -cfg_rt! { - impl Handle { - /// Tries to get a handle to the current timer. - /// - /// # Panics - /// - /// This function panics if there is no current timer set. - /// - /// It can be triggered when `Builder::enable_time()` or - /// `Builder::enable_all()` are not included in the builder. - /// - /// It can also panic whenever a timer is created outside of a - /// Tokio runtime. That is why `rt.block_on(delay_for(...))` will panic, - /// since the function is executed outside of the runtime. - /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't panic. - /// And this is because wrapping the function on an async makes it lazy, - /// and so gets executed inside the runtime successfully without - /// panicking. - pub(crate) fn current() -> Self { - crate::runtime::context::time_handle() - .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") - } - } -} - -cfg_not_rt! { - impl Handle { - /// Tries to get a handle to the current timer. - /// - /// # Panics - /// - /// This function panics if there is no current timer set. - /// - /// It can be triggered when `Builder::enable_time()` or - /// `Builder::enable_all()` are not included in the builder. - /// - /// It can also panic whenever a timer is created outside of a Tokio - /// runtime. That is why `rt.block_on(delay_for(...))` will panic, - /// since the function is executed outside of the runtime. - /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't - /// panic. And this is because wrapping the function on an async makes it - /// lazy, and so outside executed inside the runtime successfully without - /// panicking. - pub(crate) fn current() -> Self { - panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) - } - } -} - -impl fmt::Debug for Handle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Handle") - } -} diff --git a/vendor/tokio/src/time/driver/mod.rs b/vendor/tokio/src/time/driver/mod.rs deleted file mode 100644 index 37d2231c3..000000000 --- a/vendor/tokio/src/time/driver/mod.rs +++ /dev/null @@ -1,520 +0,0 @@ -// Currently, rust warns when an unsafe fn contains an unsafe {} block. However, -// in the future, this will change to the reverse. For now, suppress this -// warning and generally stick with being explicit about unsafety. -#![allow(unused_unsafe)] -#![cfg_attr(not(feature = "rt"), allow(dead_code))] - -//! Time driver - -mod entry; -pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared}; - -mod handle; -pub(crate) use self::handle::Handle; - -mod wheel; - -pub(super) mod sleep; - -use crate::loom::sync::atomic::{AtomicBool, Ordering}; -use crate::loom::sync::{Arc, Mutex}; -use crate::park::{Park, Unpark}; -use crate::time::error::Error; -use crate::time::{Clock, Duration, Instant}; - -use std::convert::TryInto; -use std::fmt; -use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; - -/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. -/// -/// A `Driver` instance tracks the state necessary for managing time and -/// notifying the [`Sleep`][sleep] instances once their deadlines are reached. -/// -/// It is expected that a single instance manages many individual [`Sleep`][sleep] -/// instances. The `Driver` implementation is thread-safe and, as such, is able -/// to handle callers from across threads. -/// -/// After creating the `Driver` instance, the caller must repeatedly call `park` -/// or `park_timeout`. The time driver will perform no work unless `park` or -/// `park_timeout` is called repeatedly. -/// -/// The driver has a resolution of one millisecond. Any unit of time that falls -/// between milliseconds are rounded up to the next millisecond. -/// -/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not -/// elapsed will be notified with an error. At this point, calling `poll` on the -/// [`Sleep`][sleep] instance will result in panic. -/// -/// # Implementation -/// -/// The time driver is based on the [paper by Varghese and Lauck][paper]. -/// -/// A hashed timing wheel is a vector of slots, where each slot handles a time -/// slice. As time progresses, the timer walks over the slot for the current -/// instant, and processes each entry for that slot. When the timer reaches the -/// end of the wheel, it starts again at the beginning. -/// -/// The implementation maintains six wheels arranged in a set of levels. As the -/// levels go up, the slots of the associated wheel represent larger intervals -/// of time. At each level, the wheel has 64 slots. Each slot covers a range of -/// time equal to the wheel at the lower level. At level zero, each slot -/// represents one millisecond of time. -/// -/// The wheels are: -/// -/// * Level 0: 64 x 1 millisecond slots. -/// * Level 1: 64 x 64 millisecond slots. -/// * Level 2: 64 x ~4 second slots. -/// * Level 3: 64 x ~4 minute slots. -/// * Level 4: 64 x ~4 hour slots. -/// * Level 5: 64 x ~12 day slots. -/// -/// When the timer processes entries at level zero, it will notify all the -/// `Sleep` instances as their deadlines have been reached. For all higher -/// levels, all entries will be redistributed across the wheel at the next level -/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will -/// either be canceled (dropped) or their associated entries will reach level -/// zero and be notified. -/// -/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf -/// [sleep]: crate::time::Sleep -/// [timeout]: crate::time::Timeout -/// [interval]: crate::time::Interval -#[derive(Debug)] -pub(crate) struct Driver<P: Park + 'static> { - /// Timing backend in use - time_source: ClockTime, - - /// Shared state - handle: Handle, - - /// Parker to delegate to - park: P, - - // When `true`, a call to `park_timeout` should immediately return and time - // should not advance. One reason for this to be `true` is if the task - // passed to `Runtime::block_on` called `task::yield_now()`. - // - // While it may look racy, it only has any effect when the clock is paused - // and pausing the clock is restricted to a single-threaded runtime. - #[cfg(feature = "test-util")] - did_wake: Arc<AtomicBool>, -} - -/// A structure which handles conversion from Instants to u64 timestamps. -#[derive(Debug, Clone)] -pub(self) struct ClockTime { - clock: super::clock::Clock, - start_time: Instant, -} - -impl ClockTime { - pub(self) fn new(clock: Clock) -> Self { - Self { - start_time: clock.now(), - clock, - } - } - - pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 { - // Round up to the end of a ms - self.instant_to_tick(t + Duration::from_nanos(999_999)) - } - - pub(self) fn instant_to_tick(&self, t: Instant) -> u64 { - // round up - let dur: Duration = t - .checked_duration_since(self.start_time) - .unwrap_or_else(|| Duration::from_secs(0)); - let ms = dur.as_millis(); - - ms.try_into().expect("Duration too far into the future") - } - - pub(self) fn tick_to_duration(&self, t: u64) -> Duration { - Duration::from_millis(t) - } - - pub(self) fn now(&self) -> u64 { - self.instant_to_tick(self.clock.now()) - } -} - -/// Timer state shared between `Driver`, `Handle`, and `Registration`. -struct Inner { - // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex - pub(super) state: Mutex<InnerState>, - - /// True if the driver is being shutdown - pub(super) is_shutdown: AtomicBool, -} - -/// Time state shared which must be protected by a `Mutex` -struct InnerState { - /// Timing backend in use - time_source: ClockTime, - - /// The last published timer `elapsed` value. - elapsed: u64, - - /// The earliest time at which we promise to wake up without unparking - next_wake: Option<NonZeroU64>, - - /// Timer wheel - wheel: wheel::Wheel, - - /// Unparker that can be used to wake the time driver - unpark: Box<dyn Unpark>, -} - -// ===== impl Driver ===== - -impl<P> Driver<P> -where - P: Park + 'static, -{ - /// Creates a new `Driver` instance that uses `park` to block the current - /// thread and `time_source` to get the current time and convert to ticks. - /// - /// Specifying the source of time is useful when testing. - pub(crate) fn new(park: P, clock: Clock) -> Driver<P> { - let time_source = ClockTime::new(clock); - - let inner = Inner::new(time_source.clone(), Box::new(park.unpark())); - - Driver { - time_source, - handle: Handle::new(Arc::new(inner)), - park, - #[cfg(feature = "test-util")] - did_wake: Arc::new(AtomicBool::new(false)), - } - } - - /// Returns a handle to the timer. - /// - /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances - /// can either be created directly or the `Handle` instance can be passed to - /// `with_default`, setting the timer as the default timer for the execution - /// context. - pub(crate) fn handle(&self) -> Handle { - self.handle.clone() - } - - fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> { - let mut lock = self.handle.get().state.lock(); - - assert!(!self.handle.is_shutdown()); - - let next_wake = lock.wheel.next_expiration_time(); - lock.next_wake = - next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - - drop(lock); - - match next_wake { - Some(when) => { - let now = self.time_source.now(); - // Note that we effectively round up to 1ms here - this avoids - // very short-duration microsecond-resolution sleeps that the OS - // might treat as zero-length. - let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now)); - - if duration > Duration::from_millis(0) { - if let Some(limit) = limit { - duration = std::cmp::min(limit, duration); - } - - self.park_timeout(duration)?; - } else { - self.park.park_timeout(Duration::from_secs(0))?; - } - } - None => { - if let Some(duration) = limit { - self.park_timeout(duration)?; - } else { - self.park.park()?; - } - } - } - - // Process pending timers after waking up - self.handle.process(); - - Ok(()) - } - - cfg_test_util! { - fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { - let clock = &self.time_source.clock; - - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - - // If the time driver was woken, then the park completed - // before the "duration" elapsed (usually caused by a - // yield in `Runtime::block_on`). In this case, we don't - // advance the clock. - if !self.did_wake() { - // Simulate advancing time - clock.advance(duration); - } - } else { - self.park.park_timeout(duration)?; - } - - Ok(()) - } - - fn did_wake(&self) -> bool { - self.did_wake.swap(false, Ordering::SeqCst) - } - } - - cfg_not_test_util! { - fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { - self.park.park_timeout(duration) - } - } -} - -impl Handle { - /// Runs timer related logic, and returns the next wakeup time - pub(self) fn process(&self) { - let now = self.time_source().now(); - - self.process_at_time(now) - } - - pub(self) fn process_at_time(&self, now: u64) { - let mut waker_list: [Option<Waker>; 32] = Default::default(); - let mut waker_idx = 0; - - let mut lock = self.get().lock(); - - assert!(now >= lock.elapsed); - - while let Some(entry) = lock.wheel.poll(now) { - debug_assert!(unsafe { entry.is_pending() }); - - // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. - if let Some(waker) = unsafe { entry.fire(Ok(())) } { - waker_list[waker_idx] = Some(waker); - - waker_idx += 1; - - if waker_idx == waker_list.len() { - // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. - drop(lock); - - for waker in waker_list.iter_mut() { - waker.take().unwrap().wake(); - } - - waker_idx = 0; - - lock = self.get().lock(); - } - } - } - - // Update the elapsed cache - lock.elapsed = lock.wheel.elapsed(); - lock.next_wake = lock - .wheel - .poll_at() - .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - - drop(lock); - - for waker in waker_list[0..waker_idx].iter_mut() { - waker.take().unwrap().wake(); - } - } - - /// Removes a registered timer from the driver. - /// - /// The timer will be moved to the cancelled state. Wakers will _not_ be - /// invoked. If the timer is already completed, this function is a no-op. - /// - /// This function always acquires the driver lock, even if the entry does - /// not appear to be registered. - /// - /// SAFETY: The timer must not be registered with some other driver, and - /// `add_entry` must not be called concurrently. - pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) { - unsafe { - let mut lock = self.get().lock(); - - if entry.as_ref().might_be_registered() { - lock.wheel.remove(entry); - } - - entry.as_ref().handle().fire(Ok(())); - } - } - - /// Removes and re-adds an entry to the driver. - /// - /// SAFETY: The timer must be either unregistered, or registered with this - /// driver. No other threads are allowed to concurrently manipulate the - /// timer at all (the current thread should hold an exclusive reference to - /// the `TimerEntry`) - pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) { - let waker = unsafe { - let mut lock = self.get().lock(); - - // We may have raced with a firing/deregistration, so check before - // deregistering. - if unsafe { entry.as_ref().might_be_registered() } { - lock.wheel.remove(entry); - } - - // Now that we have exclusive control of this entry, mint a handle to reinsert it. - let entry = entry.as_ref().handle(); - - if self.is_shutdown() { - unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } - } else { - entry.set_expiration(new_tick); - - // Note: We don't have to worry about racing with some other resetting - // thread, because add_entry and reregister require exclusive control of - // the timer entry. - match unsafe { lock.wheel.insert(entry) } { - Ok(when) => { - if lock - .next_wake - .map(|next_wake| when < next_wake.get()) - .unwrap_or(true) - { - lock.unpark.unpark(); - } - - None - } - Err((entry, super::error::InsertError::Elapsed)) => unsafe { - entry.fire(Ok(())) - }, - } - } - - // Must release lock before invoking waker to avoid the risk of deadlock. - }; - - // The timer was fired synchronously as a result of the reregistration. - // Wake the waker; this is needed because we might reset _after_ a poll, - // and otherwise the task won't be awoken to poll again. - if let Some(waker) = waker { - waker.wake(); - } - } -} - -impl<P> Park for Driver<P> -where - P: Park + 'static, -{ - type Unpark = TimerUnpark<P>; - type Error = P::Error; - - fn unpark(&self) -> Self::Unpark { - TimerUnpark::new(self) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.park_internal(None) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.park_internal(Some(duration)) - } - - fn shutdown(&mut self) { - if self.handle.is_shutdown() { - return; - } - - self.handle.get().is_shutdown.store(true, Ordering::SeqCst); - - // Advance time forward to the end of time. - - self.handle.process_at_time(u64::MAX); - - self.park.shutdown(); - } -} - -impl<P> Drop for Driver<P> -where - P: Park + 'static, -{ - fn drop(&mut self) { - self.shutdown(); - } -} - -pub(crate) struct TimerUnpark<P: Park + 'static> { - inner: P::Unpark, - - #[cfg(feature = "test-util")] - did_wake: Arc<AtomicBool>, -} - -impl<P: Park + 'static> TimerUnpark<P> { - fn new(driver: &Driver<P>) -> TimerUnpark<P> { - TimerUnpark { - inner: driver.park.unpark(), - - #[cfg(feature = "test-util")] - did_wake: driver.did_wake.clone(), - } - } -} - -impl<P: Park + 'static> Unpark for TimerUnpark<P> { - fn unpark(&self) { - #[cfg(feature = "test-util")] - self.did_wake.store(true, Ordering::SeqCst); - - self.inner.unpark(); - } -} - -// ===== impl Inner ===== - -impl Inner { - pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self { - Inner { - state: Mutex::new(InnerState { - time_source, - elapsed: 0, - next_wake: None, - unpark, - wheel: wheel::Wheel::new(), - }), - is_shutdown: AtomicBool::new(false), - } - } - - /// Locks the driver's inner structure - pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { - self.state.lock() - } - - // Check whether the driver has been shutdown - pub(super) fn is_shutdown(&self) -> bool { - self.is_shutdown.load(Ordering::SeqCst) - } -} - -impl fmt::Debug for Inner { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Inner").finish() - } -} - -#[cfg(test)] -mod tests; diff --git a/vendor/tokio/src/time/driver/sleep.rs b/vendor/tokio/src/time/driver/sleep.rs deleted file mode 100644 index 40f745ad7..000000000 --- a/vendor/tokio/src/time/driver/sleep.rs +++ /dev/null @@ -1,257 +0,0 @@ -use crate::time::driver::{Handle, TimerEntry}; -use crate::time::{error::Error, Duration, Instant}; - -use pin_project_lite::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{self, Poll}; - -/// Waits until `deadline` is reached. -/// -/// No work is performed while awaiting on the sleep future to complete. `Sleep` -/// operates at millisecond granularity and should not be used for tasks that -/// require high-resolution timers. -/// -/// # Cancellation -/// -/// Canceling a sleep instance is done by dropping the returned future. No additional -/// cleanup work is required. -// Alias for old name in 0.x -#[cfg_attr(docsrs, doc(alias = "delay_until"))] -pub fn sleep_until(deadline: Instant) -> Sleep { - Sleep::new_timeout(deadline) -} - -/// Waits until `duration` has elapsed. -/// -/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous -/// analog to `std::thread::sleep`. -/// -/// No work is performed while awaiting on the sleep future to complete. `Sleep` -/// operates at millisecond granularity and should not be used for tasks that -/// require high-resolution timers. -/// -/// To run something regularly on a schedule, see [`interval`]. -/// -/// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years). -/// -/// # Cancellation -/// -/// Canceling a sleep instance is done by dropping the returned future. No additional -/// cleanup work is required. -/// -/// # Examples -/// -/// Wait 100ms and print "100 ms have elapsed". -/// -/// ``` -/// use tokio::time::{sleep, Duration}; -/// -/// #[tokio::main] -/// async fn main() { -/// sleep(Duration::from_millis(100)).await; -/// println!("100 ms have elapsed"); -/// } -/// ``` -/// -/// [`interval`]: crate::time::interval() -// Alias for old name in 0.x -#[cfg_attr(docsrs, doc(alias = "delay_for"))] -#[cfg_attr(docsrs, doc(alias = "wait"))] -pub fn sleep(duration: Duration) -> Sleep { - match Instant::now().checked_add(duration) { - Some(deadline) => sleep_until(deadline), - None => sleep_until(Instant::far_future()), - } -} - -pin_project! { - /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). - /// - /// This type does not implement the `Unpin` trait, which means that if you - /// use it with [`select!`] or by calling `poll`, you have to pin it first. - /// If you use it with `.await`, this does not apply. - /// - /// # Examples - /// - /// Wait 100ms and print "100 ms have elapsed". - /// - /// ``` - /// use tokio::time::{sleep, Duration}; - /// - /// #[tokio::main] - /// async fn main() { - /// sleep(Duration::from_millis(100)).await; - /// println!("100 ms have elapsed"); - /// } - /// ``` - /// - /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is - /// necessary when the same `Sleep` is selected on multiple times. - /// ```no_run - /// use tokio::time::{self, Duration, Instant}; - /// - /// #[tokio::main] - /// async fn main() { - /// let sleep = time::sleep(Duration::from_millis(10)); - /// tokio::pin!(sleep); - /// - /// loop { - /// tokio::select! { - /// () = &mut sleep => { - /// println!("timer elapsed"); - /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50)); - /// }, - /// } - /// } - /// } - /// ``` - /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the - /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not. - /// ``` - /// use std::future::Future; - /// use std::pin::Pin; - /// use std::task::{Context, Poll}; - /// use tokio::time::Sleep; - /// - /// struct HasSleep { - /// sleep: Pin<Box<Sleep>>, - /// } - /// - /// impl Future for HasSleep { - /// type Output = (); - /// - /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - /// self.sleep.as_mut().poll(cx) - /// } - /// } - /// ``` - /// Use in a struct with pin projection. This method avoids the `Box`, but - /// the `HasSleep` struct will not be `Unpin` as a consequence. - /// ``` - /// use std::future::Future; - /// use std::pin::Pin; - /// use std::task::{Context, Poll}; - /// use tokio::time::Sleep; - /// use pin_project_lite::pin_project; - /// - /// pin_project! { - /// struct HasSleep { - /// #[pin] - /// sleep: Sleep, - /// } - /// } - /// - /// impl Future for HasSleep { - /// type Output = (); - /// - /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - /// self.project().sleep.poll(cx) - /// } - /// } - /// ``` - /// - /// [`select!`]: ../macro.select.html - /// [`tokio::pin!`]: ../macro.pin.html - // Alias for old name in 0.2 - #[cfg_attr(docsrs, doc(alias = "Delay"))] - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct Sleep { - deadline: Instant, - - // The link between the `Sleep` instance and the timer that drives it. - #[pin] - entry: TimerEntry, - } -} - -impl Sleep { - pub(crate) fn new_timeout(deadline: Instant) -> Sleep { - let handle = Handle::current(); - let entry = TimerEntry::new(&handle, deadline); - - Sleep { deadline, entry } - } - - pub(crate) fn far_future() -> Sleep { - Self::new_timeout(Instant::far_future()) - } - - /// Returns the instant at which the future will complete. - pub fn deadline(&self) -> Instant { - self.deadline - } - - /// Returns `true` if `Sleep` has elapsed. - /// - /// A `Sleep` instance is elapsed when the requested duration has elapsed. - pub fn is_elapsed(&self) -> bool { - self.entry.is_elapsed() - } - - /// Resets the `Sleep` instance to a new deadline. - /// - /// Calling this function allows changing the instant at which the `Sleep` - /// future completes without having to create new associated state. - /// - /// This function can be called both before and after the future has - /// completed. - /// - /// To call this method, you will usually combine the call with - /// [`Pin::as_mut`], which lets you call the method without consuming the - /// `Sleep` itself. - /// - /// # Example - /// - /// ``` - /// use tokio::time::{Duration, Instant}; - /// - /// # #[tokio::main(flavor = "current_thread")] - /// # async fn main() { - /// let sleep = tokio::time::sleep(Duration::from_millis(10)); - /// tokio::pin!(sleep); - /// - /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20)); - /// # } - /// ``` - /// - /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut - pub fn reset(self: Pin<&mut Self>, deadline: Instant) { - let me = self.project(); - me.entry.reset(deadline); - *me.deadline = deadline; - } - - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { - let me = self.project(); - - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } -} - -impl Future for Sleep { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { - // `poll_elapsed` can return an error in two cases: - // - // - AtCapacity: this is a pathological case where far too many - // sleep instances have been scheduled. - // - Shutdown: No timer has been setup, which is a mis-use error. - // - // Both cases are extremely rare, and pretty accurately fit into - // "logic errors", so we just panic in this case. A user couldn't - // really do much better if we passed the error onwards. - match ready!(self.as_mut().poll_elapsed(cx)) { - Ok(()) => Poll::Ready(()), - Err(e) => panic!("timer error: {}", e), - } - } -} diff --git a/vendor/tokio/src/time/driver/tests/mod.rs b/vendor/tokio/src/time/driver/tests/mod.rs deleted file mode 100644 index 7c5cf1fd0..000000000 --- a/vendor/tokio/src/time/driver/tests/mod.rs +++ /dev/null @@ -1,287 +0,0 @@ -use std::{task::Context, time::Duration}; - -#[cfg(not(loom))] -use futures::task::noop_waker_ref; - -use crate::loom::sync::Arc; -use crate::loom::thread; -use crate::{ - loom::sync::atomic::{AtomicBool, Ordering}, - park::Unpark, -}; - -use super::{Handle, TimerEntry}; - -struct MockUnpark {} -impl Unpark for MockUnpark { - fn unpark(&self) {} -} -impl MockUnpark { - fn mock() -> Box<dyn Unpark> { - Box::new(Self {}) - } -} - -fn block_on<T>(f: impl std::future::Future<Output = T>) -> T { - #[cfg(loom)] - return loom::future::block_on(f); - - #[cfg(not(loom))] - return futures::executor::block_on(f); -} - -fn model(f: impl Fn() + Send + Sync + 'static) { - #[cfg(loom)] - loom::model(f); - - #[cfg(not(loom))] - f(); -} - -#[test] -fn single_timer() { - model(|| { - let clock = crate::time::clock::Clock::new(true, false); - let time_source = super::ClockTime::new(clock.clone()); - - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(inner)); - - let handle_ = handle.clone(); - let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); - pin!(entry); - - block_on(futures::future::poll_fn(|cx| { - entry.as_mut().poll_elapsed(cx) - })) - .unwrap(); - }); - - thread::yield_now(); - - // This may or may not return Some (depending on how it races with the - // thread). If it does return None, however, the timer should complete - // synchronously. - handle.process_at_time(time_source.now() + 2_000_000_000); - - jh.join().unwrap(); - }) -} - -#[test] -fn drop_timer() { - model(|| { - let clock = crate::time::clock::Clock::new(true, false); - let time_source = super::ClockTime::new(clock.clone()); - - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(inner)); - - let handle_ = handle.clone(); - let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); - pin!(entry); - - let _ = entry - .as_mut() - .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - let _ = entry - .as_mut() - .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - }); - - thread::yield_now(); - - // advance 2s in the future. - handle.process_at_time(time_source.now() + 2_000_000_000); - - jh.join().unwrap(); - }) -} - -#[test] -fn change_waker() { - model(|| { - let clock = crate::time::clock::Clock::new(true, false); - let time_source = super::ClockTime::new(clock.clone()); - - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(inner)); - - let handle_ = handle.clone(); - let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); - pin!(entry); - - let _ = entry - .as_mut() - .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - - block_on(futures::future::poll_fn(|cx| { - entry.as_mut().poll_elapsed(cx) - })) - .unwrap(); - }); - - thread::yield_now(); - - // advance 2s - handle.process_at_time(time_source.now() + 2_000_000_000); - - jh.join().unwrap(); - }) -} - -#[test] -fn reset_future() { - model(|| { - let finished_early = Arc::new(AtomicBool::new(false)); - - let clock = crate::time::clock::Clock::new(true, false); - let time_source = super::ClockTime::new(clock.clone()); - - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(inner)); - - let handle_ = handle.clone(); - let finished_early_ = finished_early.clone(); - let start = clock.now(); - - let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1)); - pin!(entry); - - let _ = entry - .as_mut() - .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - - entry.as_mut().reset(start + Duration::from_secs(2)); - - // shouldn't complete before 2s - block_on(futures::future::poll_fn(|cx| { - entry.as_mut().poll_elapsed(cx) - })) - .unwrap(); - - finished_early_.store(true, Ordering::Relaxed); - }); - - thread::yield_now(); - - // This may or may not return a wakeup time. - handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500))); - - assert!(!finished_early.load(Ordering::Relaxed)); - - handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500))); - - jh.join().unwrap(); - - assert!(finished_early.load(Ordering::Relaxed)); - }) -} - -#[test] -#[cfg(not(loom))] -fn poll_process_levels() { - let clock = crate::time::clock::Clock::new(true, false); - clock.pause(); - - let time_source = super::ClockTime::new(clock.clone()); - - let inner = super::Inner::new(time_source, MockUnpark::mock()); - let handle = Handle::new(Arc::new(inner)); - - let mut entries = vec![]; - - for i in 0..1024 { - let mut entry = Box::pin(TimerEntry::new( - &handle, - clock.now() + Duration::from_millis(i), - )); - - let _ = entry - .as_mut() - .poll_elapsed(&mut Context::from_waker(noop_waker_ref())); - - entries.push(entry); - } - - for t in 1..1024 { - handle.process_at_time(t as u64); - for (deadline, future) in entries.iter_mut().enumerate() { - let mut context = Context::from_waker(noop_waker_ref()); - if deadline <= t { - assert!(future.as_mut().poll_elapsed(&mut context).is_ready()); - } else { - assert!(future.as_mut().poll_elapsed(&mut context).is_pending()); - } - } - } -} - -#[test] -#[cfg(not(loom))] -fn poll_process_levels_targeted() { - let mut context = Context::from_waker(noop_waker_ref()); - - let clock = crate::time::clock::Clock::new(true, false); - clock.pause(); - - let time_source = super::ClockTime::new(clock.clone()); - - let inner = super::Inner::new(time_source, MockUnpark::mock()); - let handle = Handle::new(Arc::new(inner)); - - let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193)); - pin!(e1); - - handle.process_at_time(62); - assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); - handle.process_at_time(192); - handle.process_at_time(192); -} - -/* -#[test] -fn balanced_incr_and_decr() { - const OPS: usize = 5; - - fn incr(inner: Arc<Inner>) { - for _ in 0..OPS { - inner.increment().expect("increment should not have failed"); - thread::yield_now(); - } - } - - fn decr(inner: Arc<Inner>) { - let mut ops_performed = 0; - while ops_performed < OPS { - if inner.num(Ordering::Relaxed) > 0 { - ops_performed += 1; - inner.decrement(); - } - thread::yield_now(); - } - } - - loom::model(|| { - let unpark = Box::new(MockUnpark); - let instant = Instant::now(); - - let inner = Arc::new(Inner::new(instant, unpark)); - - let incr_inner = inner.clone(); - let decr_inner = inner.clone(); - - let incr_hndle = thread::spawn(move || incr(incr_inner)); - let decr_hndle = thread::spawn(move || decr(decr_inner)); - - incr_hndle.join().expect("should never fail"); - decr_hndle.join().expect("should never fail"); - - assert_eq!(inner.num(Ordering::SeqCst), 0); - }) -} -*/ diff --git a/vendor/tokio/src/time/driver/wheel/level.rs b/vendor/tokio/src/time/driver/wheel/level.rs deleted file mode 100644 index 81d6b58c7..000000000 --- a/vendor/tokio/src/time/driver/wheel/level.rs +++ /dev/null @@ -1,275 +0,0 @@ -use crate::time::driver::TimerHandle; - -use crate::time::driver::{EntryList, TimerShared}; - -use std::{fmt, ptr::NonNull}; - -/// Wheel for a single level in the timer. This wheel contains 64 slots. -pub(crate) struct Level { - level: usize, - - /// Bit field tracking which slots currently contain entries. - /// - /// Using a bit field to track slots that contain entries allows avoiding a - /// scan to find entries. This field is updated when entries are added or - /// removed from a slot. - /// - /// The least-significant bit represents slot zero. - occupied: u64, - - /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell. - slot: [EntryList; LEVEL_MULT], -} - -/// Indicates when a slot must be processed next. -#[derive(Debug)] -pub(crate) struct Expiration { - /// The level containing the slot. - pub(crate) level: usize, - - /// The slot index. - pub(crate) slot: usize, - - /// The instant at which the slot needs to be processed. - pub(crate) deadline: u64, -} - -/// Level multiplier. -/// -/// Being a power of 2 is very important. -const LEVEL_MULT: usize = 64; - -impl Level { - pub(crate) fn new(level: usize) -> Level { - // A value has to be Copy in order to use syntax like: - // let stack = Stack::default(); - // ... - // slots: [stack; 64], - // - // Alternatively, since Stack is Default one can - // use syntax like: - // let slots: [Stack; 64] = Default::default(); - // - // However, that is only supported for arrays of size - // 32 or fewer. So in our case we have to explicitly - // invoke the constructor for each array element. - let ctor = || EntryList::default(); - - Level { - level, - occupied: 0, - slot: [ - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ], - } - } - - /// Finds the slot that needs to be processed next and returns the slot and - /// `Instant` at which this slot must be processed. - pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> { - // Use the `occupied` bit field to get the index of the next slot that - // needs to be processed. - let slot = match self.next_occupied_slot(now) { - Some(slot) => slot, - None => return None, - }; - - // From the slot index, calculate the `Instant` at which it needs to be - // processed. This value *must* be in the future with respect to `now`. - - let level_range = level_range(self.level); - let slot_range = slot_range(self.level); - - // TODO: This can probably be simplified w/ power of 2 math - let level_start = now - (now % level_range); - let mut deadline = level_start + slot as u64 * slot_range; - - if deadline <= now { - // A timer is in a slot "prior" to the current time. This can occur - // because we do not have an infinite hierarchy of timer levels, and - // eventually a timer scheduled for a very distant time might end up - // being placed in a slot that is beyond the end of all of the - // arrays. - // - // To deal with this, we first limit timers to being scheduled no - // more than MAX_DURATION ticks in the future; that is, they're at - // most one rotation of the top level away. Then, we force timers - // that logically would go into the top+1 level, to instead go into - // the top level's slots. - // - // What this means is that the top level's slots act as a - // pseudo-ring buffer, and we rotate around them indefinitely. If we - // compute a deadline before now, and it's the top level, it - // therefore means we're actually looking at a slot in the future. - debug_assert_eq!(self.level, super::NUM_LEVELS - 1); - - deadline += level_range; - } - - debug_assert!( - deadline >= now, - "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}", - deadline, - now, - self.level, - level_range, - slot_range, - slot, - self.occupied - ); - - Some(Expiration { - level: self.level, - slot, - deadline, - }) - } - - fn next_occupied_slot(&self, now: u64) -> Option<usize> { - if self.occupied == 0 { - return None; - } - - // Get the slot for now using Maths - let now_slot = (now / slot_range(self.level)) as usize; - let occupied = self.occupied.rotate_right(now_slot as u32); - let zeros = occupied.trailing_zeros() as usize; - let slot = (zeros + now_slot) % 64; - - Some(slot) - } - - pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) { - let slot = slot_for(item.cached_when(), self.level); - - self.slot[slot].push_front(item); - - self.occupied |= occupied_bit(slot); - } - - pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) { - let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level); - - unsafe { self.slot[slot].remove(item) }; - if self.slot[slot].is_empty() { - // The bit is currently set - debug_assert!(self.occupied & occupied_bit(slot) != 0); - - // Unset the bit - self.occupied ^= occupied_bit(slot); - } - } - - pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList { - self.occupied &= !occupied_bit(slot); - - std::mem::take(&mut self.slot[slot]) - } -} - -impl fmt::Debug for Level { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Level") - .field("occupied", &self.occupied) - .finish() - } -} - -fn occupied_bit(slot: usize) -> u64 { - 1 << slot -} - -fn slot_range(level: usize) -> u64 { - LEVEL_MULT.pow(level as u32) as u64 -} - -fn level_range(level: usize) -> u64 { - LEVEL_MULT as u64 * slot_range(level) -} - -/// Convert a duration (milliseconds) and a level to a slot position -fn slot_for(duration: u64, level: usize) -> usize { - ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize -} - -#[cfg(all(test, not(loom)))] -mod test { - use super::*; - - #[test] - fn test_slot_for() { - for pos in 0..64 { - assert_eq!(pos as usize, slot_for(pos, 0)); - } - - for level in 1..5 { - for pos in level..64 { - let a = pos * 64_usize.pow(level as u32); - assert_eq!(pos as usize, slot_for(a as u64, level)); - } - } - } -} diff --git a/vendor/tokio/src/time/driver/wheel/mod.rs b/vendor/tokio/src/time/driver/wheel/mod.rs deleted file mode 100644 index 5a40f6db8..000000000 --- a/vendor/tokio/src/time/driver/wheel/mod.rs +++ /dev/null @@ -1,359 +0,0 @@ -use crate::time::driver::{TimerHandle, TimerShared}; -use crate::time::error::InsertError; - -mod level; -pub(crate) use self::level::Expiration; -use self::level::Level; - -use std::ptr::NonNull; - -use super::EntryList; - -/// Timing wheel implementation. -/// -/// This type provides the hashed timing wheel implementation that backs `Timer` -/// and `DelayQueue`. -/// -/// The structure is generic over `T: Stack`. This allows handling timeout data -/// being stored on the heap or in a slab. In order to support the latter case, -/// the slab must be passed into each function allowing the implementation to -/// lookup timer entries. -/// -/// See `Timer` documentation for some implementation notes. -#[derive(Debug)] -pub(crate) struct Wheel { - /// The number of milliseconds elapsed since the wheel started. - elapsed: u64, - - /// Timer wheel. - /// - /// Levels: - /// - /// * 1 ms slots / 64 ms range - /// * 64 ms slots / ~ 4 sec range - /// * ~ 4 sec slots / ~ 4 min range - /// * ~ 4 min slots / ~ 4 hr range - /// * ~ 4 hr slots / ~ 12 day range - /// * ~ 12 day slots / ~ 2 yr range - levels: Vec<Level>, - - /// Entries queued for firing - pending: EntryList, -} - -/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots -/// each, the timer is able to track time up to 2 years into the future with a -/// precision of 1 millisecond. -const NUM_LEVELS: usize = 6; - -/// The maximum duration of a `Sleep` -pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; - -impl Wheel { - /// Create a new timing wheel - pub(crate) fn new() -> Wheel { - let levels = (0..NUM_LEVELS).map(Level::new).collect(); - - Wheel { - elapsed: 0, - levels, - pending: EntryList::new(), - } - } - - /// Return the number of milliseconds that have elapsed since the timing - /// wheel's creation. - pub(crate) fn elapsed(&self) -> u64 { - self.elapsed - } - - /// Insert an entry into the timing wheel. - /// - /// # Arguments - /// - /// * `item`: The item to insert into the wheel. - /// - /// # Return - /// - /// Returns `Ok` when the item is successfully inserted, `Err` otherwise. - /// - /// `Err(Elapsed)` indicates that `when` represents an instant that has - /// already passed. In this case, the caller should fire the timeout - /// immediately. - /// - /// `Err(Invalid)` indicates an invalid `when` argument as been supplied. - /// - /// # Safety - /// - /// This function registers item into an intrusive linked list. The caller - /// must ensure that `item` is pinned and will not be dropped without first - /// being deregistered. - pub(crate) unsafe fn insert( - &mut self, - item: TimerHandle, - ) -> Result<u64, (TimerHandle, InsertError)> { - let when = item.sync_when(); - - if when <= self.elapsed { - return Err((item, InsertError::Elapsed)); - } - - // Get the level at which the entry should be stored - let level = self.level_for(when); - - unsafe { - self.levels[level].add_entry(item); - } - - debug_assert!({ - self.levels[level] - .next_expiration(self.elapsed) - .map(|e| e.deadline >= self.elapsed) - .unwrap_or(true) - }); - - Ok(when) - } - - /// Remove `item` from the timing wheel. - pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) { - unsafe { - let when = item.as_ref().cached_when(); - if when == u64::MAX { - self.pending.remove(item); - } else { - debug_assert!( - self.elapsed <= when, - "elapsed={}; when={}", - self.elapsed, - when - ); - - let level = self.level_for(when); - - self.levels[level].remove_entry(item); - } - } - } - - /// Instant at which to poll - pub(crate) fn poll_at(&self) -> Option<u64> { - self.next_expiration().map(|expiration| expiration.deadline) - } - - /// Advances the timer up to the instant represented by `now`. - pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> { - loop { - if let Some(handle) = self.pending.pop_back() { - return Some(handle); - } - - // under what circumstances is poll.expiration Some vs. None? - let expiration = self.next_expiration().and_then(|expiration| { - if expiration.deadline > now { - None - } else { - Some(expiration) - } - }); - - match expiration { - Some(ref expiration) if expiration.deadline > now => return None, - Some(ref expiration) => { - self.process_expiration(expiration); - - self.set_elapsed(expiration.deadline); - } - None => { - // in this case the poll did not indicate an expiration - // _and_ we were not able to find a next expiration in - // the current list of timers. advance to the poll's - // current time and do nothing else. - self.set_elapsed(now); - break; - } - } - } - - self.pending.pop_back() - } - - /// Returns the instant at which the next timeout expires. - fn next_expiration(&self) -> Option<Expiration> { - if !self.pending.is_empty() { - // Expire immediately as we have things pending firing - return Some(Expiration { - level: 0, - slot: 0, - deadline: self.elapsed, - }); - } - - // Check all levels - for level in 0..NUM_LEVELS { - if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { - // There cannot be any expirations at a higher level that happen - // before this one. - debug_assert!(self.no_expirations_before(level + 1, expiration.deadline)); - - return Some(expiration); - } - } - - None - } - - /// Returns the tick at which this timer wheel next needs to perform some - /// processing, or None if there are no timers registered. - pub(super) fn next_expiration_time(&self) -> Option<u64> { - self.next_expiration().map(|ex| ex.deadline) - } - - /// Used for debug assertions - fn no_expirations_before(&self, start_level: usize, before: u64) -> bool { - let mut res = true; - - for l2 in start_level..NUM_LEVELS { - if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { - if e2.deadline < before { - res = false; - } - } - } - - res - } - - /// iteratively find entries that are between the wheel's current - /// time and the expiration time. for each in that population either - /// queue it for notification (in the case of the last level) or tier - /// it down to the next level (in all other cases). - pub(crate) fn process_expiration(&mut self, expiration: &Expiration) { - // Note that we need to take _all_ of the entries off the list before - // processing any of them. This is important because it's possible that - // those entries might need to be reinserted into the same slot. - // - // This happens only on the highest level, when an entry is inserted - // more than MAX_DURATION into the future. When this happens, we wrap - // around, and process some entries a multiple of MAX_DURATION before - // they actually need to be dropped down a level. We then reinsert them - // back into the same position; we must make sure we don't then process - // those entries again or we'll end up in an infinite loop. - let mut entries = self.take_entries(expiration); - - while let Some(item) = entries.pop_back() { - if expiration.level == 0 { - debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline); - } - - // Try to expire the entry; this is cheap (doesn't synchronize) if - // the timer is not expired, and updates cached_when. - match unsafe { item.mark_pending(expiration.deadline) } { - Ok(()) => { - // Item was expired - self.pending.push_front(item); - } - Err(expiration_tick) => { - let level = level_for(expiration.deadline, expiration_tick); - unsafe { - self.levels[level].add_entry(item); - } - } - } - } - } - - fn set_elapsed(&mut self, when: u64) { - assert!( - self.elapsed <= when, - "elapsed={:?}; when={:?}", - self.elapsed, - when - ); - - if when > self.elapsed { - self.elapsed = when; - } - } - - /// Obtains the list of entries that need processing for the given expiration. - /// - fn take_entries(&mut self, expiration: &Expiration) -> EntryList { - self.levels[expiration.level].take_slot(expiration.slot) - } - - fn level_for(&self, when: u64) -> usize { - level_for(self.elapsed, when) - } -} - -fn level_for(elapsed: u64, when: u64) -> usize { - const SLOT_MASK: u64 = (1 << 6) - 1; - - // Mask in the trailing bits ignored by the level calculation in order to cap - // the possible leading zeros - let mut masked = elapsed ^ when | SLOT_MASK; - - if masked >= MAX_DURATION { - // Fudge the timer into the top level - masked = MAX_DURATION - 1; - } - - let leading_zeros = masked.leading_zeros() as usize; - let significant = 63 - leading_zeros; - - significant / 6 -} - -#[cfg(all(test, not(loom)))] -mod test { - use super::*; - - #[test] - fn test_level_for() { - for pos in 0..64 { - assert_eq!( - 0, - level_for(0, pos), - "level_for({}) -- binary = {:b}", - pos, - pos - ); - } - - for level in 1..5 { - for pos in level..64 { - let a = pos * 64_usize.pow(level as u32); - assert_eq!( - level, - level_for(0, a as u64), - "level_for({}) -- binary = {:b}", - a, - a - ); - - if pos > level { - let a = a - 1; - assert_eq!( - level, - level_for(0, a as u64), - "level_for({}) -- binary = {:b}", - a, - a - ); - } - - if pos < 64 { - let a = a + 1; - assert_eq!( - level, - level_for(0, a as u64), - "level_for({}) -- binary = {:b}", - a, - a - ); - } - } - } - } -} diff --git a/vendor/tokio/src/time/driver/wheel/stack.rs b/vendor/tokio/src/time/driver/wheel/stack.rs deleted file mode 100644 index e7ed137f5..000000000 --- a/vendor/tokio/src/time/driver/wheel/stack.rs +++ /dev/null @@ -1,112 +0,0 @@ -use super::{Item, OwnedItem}; -use crate::time::driver::Entry; - -use std::ptr; - -/// A doubly linked stack -#[derive(Debug)] -pub(crate) struct Stack { - head: Option<OwnedItem>, -} - -impl Default for Stack { - fn default() -> Stack { - Stack { head: None } - } -} - -impl Stack { - pub(crate) fn is_empty(&self) -> bool { - self.head.is_none() - } - - pub(crate) fn push(&mut self, entry: OwnedItem) { - // Get a pointer to the entry to for the prev link - let ptr: *const Entry = &*entry as *const _; - - // Remove the old head entry - let old = self.head.take(); - - unsafe { - // Ensure the entry is not already in a stack. - debug_assert!((*entry.next_stack.get()).is_none()); - debug_assert!((*entry.prev_stack.get()).is_null()); - - if let Some(ref entry) = old.as_ref() { - debug_assert!({ - // The head is not already set to the entry - ptr != &***entry as *const _ - }); - - // Set the previous link on the old head - *entry.prev_stack.get() = ptr; - } - - // Set this entry's next pointer - *entry.next_stack.get() = old; - } - - // Update the head pointer - self.head = Some(entry); - } - - /// Pops an item from the stack - pub(crate) fn pop(&mut self) -> Option<OwnedItem> { - let entry = self.head.take(); - - unsafe { - if let Some(entry) = entry.as_ref() { - self.head = (*entry.next_stack.get()).take(); - - if let Some(entry) = self.head.as_ref() { - *entry.prev_stack.get() = ptr::null(); - } - - *entry.prev_stack.get() = ptr::null(); - } - } - - entry - } - - pub(crate) fn remove(&mut self, entry: &Item) { - unsafe { - // Ensure that the entry is in fact contained by the stack - debug_assert!({ - // This walks the full linked list even if an entry is found. - let mut next = self.head.as_ref(); - let mut contains = false; - - while let Some(n) = next { - if entry as *const _ == &**n as *const _ { - debug_assert!(!contains); - contains = true; - } - - next = (*n.next_stack.get()).as_ref(); - } - - contains - }); - - // Unlink `entry` from the next node - let next = (*entry.next_stack.get()).take(); - - if let Some(next) = next.as_ref() { - (*next.prev_stack.get()) = *entry.prev_stack.get(); - } - - // Unlink `entry` from the prev node - - if let Some(prev) = (*entry.prev_stack.get()).as_ref() { - *prev.next_stack.get() = next; - } else { - // It is the head - self.head = next; - } - - // Unset the prev pointer - *entry.prev_stack.get() = ptr::null(); - } - } -} diff --git a/vendor/tokio/src/time/error.rs b/vendor/tokio/src/time/error.rs index 8674febe9..71344d434 100644 --- a/vendor/tokio/src/time/error.rs +++ b/vendor/tokio/src/time/error.rs @@ -40,8 +40,11 @@ impl From<Kind> for Error { } } -/// Error returned by `Timeout`. -#[derive(Debug, PartialEq)] +/// Errors returned by `Timeout`. +/// +/// This error is returned when a timeout expires before the function was able +/// to finish. +#[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); #[derive(Debug)] @@ -72,7 +75,7 @@ impl Error { matches!(self.0, Kind::AtCapacity) } - /// Create an error representing a misconfigured timer. + /// Creates an error representing a misconfigured timer. pub fn invalid() -> Error { Error(Invalid) } diff --git a/vendor/tokio/src/time/instant.rs b/vendor/tokio/src/time/instant.rs index f7cf12d4a..14cf6e567 100644 --- a/vendor/tokio/src/time/instant.rs +++ b/vendor/tokio/src/time/instant.rs @@ -67,13 +67,10 @@ impl Instant { self.std } - /// Returns the amount of time elapsed from another instant to this one. - /// - /// # Panics - /// - /// This function will panic if `earlier` is later than `self`. + /// Returns the amount of time elapsed from another instant to this one, or + /// zero duration if that instant is later than this one. pub fn duration_since(&self, earlier: Instant) -> Duration { - self.std.duration_since(earlier.std) + self.std.saturating_duration_since(earlier.std) } /// Returns the amount of time elapsed from another instant to this one, or @@ -118,13 +115,8 @@ impl Instant { self.std.saturating_duration_since(earlier.std) } - /// Returns the amount of time elapsed since this instant was created. - /// - /// # Panics - /// - /// This function may panic if the current time is earlier than this - /// instant, which is something that can happen if an `Instant` is - /// produced synthetically. + /// Returns the amount of time elapsed since this instant was created, + /// or zero duration if that this instant is in the future. /// /// # Examples /// @@ -140,7 +132,7 @@ impl Instant { /// } /// ``` pub fn elapsed(&self) -> Duration { - Instant::now() - *self + Instant::now().saturating_duration_since(*self) } /// Returns `Some(t)` where `t` is the time `self + duration` if `t` can be @@ -188,7 +180,7 @@ impl ops::Sub for Instant { type Output = Duration; fn sub(self, rhs: Instant) -> Duration { - self.std - rhs.std + self.std.saturating_duration_since(rhs.std) } } @@ -196,7 +188,7 @@ impl ops::Sub<Duration> for Instant { type Output = Instant; fn sub(self, rhs: Duration) -> Instant { - Instant::from_std(self.std - rhs) + Instant::from_std(std::time::Instant::sub(self.std, rhs)) } } diff --git a/vendor/tokio/src/time/interval.rs b/vendor/tokio/src/time/interval.rs index a63e47b6e..2e153fdbe 100644 --- a/vendor/tokio/src/time/interval.rs +++ b/vendor/tokio/src/time/interval.rs @@ -1,9 +1,11 @@ use crate::future::poll_fn; use crate::time::{sleep_until, Duration, Instant, Sleep}; +use crate::util::trace; +use std::future::Future; +use std::panic::Location; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{convert::TryInto, future::Future}; /// Creates new [`Interval`] that yields with interval of `period`. The first /// tick completes immediately. The default [`MissedTickBehavior`] is @@ -68,10 +70,10 @@ use std::{convert::TryInto, future::Future}; /// /// [`sleep`]: crate::time::sleep() /// [`.tick().await`]: Interval::tick +#[track_caller] pub fn interval(period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); - - interval_at(Instant::now(), period) + internal_interval_at(Instant::now(), period, trace::caller_location()) } /// Creates new [`Interval`] that yields with interval of `period` with the @@ -103,13 +105,44 @@ pub fn interval(period: Duration) -> Interval { /// // approximately 70ms have elapsed. /// } /// ``` +#[track_caller] pub fn interval_at(start: Instant, period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + internal_interval_at(start, period, trace::caller_location()) +} + +#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] +fn internal_interval_at( + start: Instant, + period: Duration, + location: Option<&'static Location<'static>>, +) -> Interval { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = location.expect("should have location if tracing"); + + tracing::trace_span!( + "runtime.resource", + concrete_type = "Interval", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let delay = resource_span.in_scope(|| Box::pin(sleep_until(start))); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let delay = Box::pin(sleep_until(start)); Interval { - delay: Box::pin(sleep_until(start)), + delay, period, missed_tick_behavior: Default::default(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -124,7 +157,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { /// /// #[tokio::main] /// async fn main() { -/// // ticks every 2 seconds +/// // ticks every 2 milliseconds /// let mut interval = time::interval(Duration::from_millis(2)); /// for _ in 0..5 { /// interval.tick().await; @@ -147,7 +180,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { /// milliseconds. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum MissedTickBehavior { - /// Tick as fast as possible until caught up. + /// Ticks as fast as possible until caught up. /// /// When this strategy is used, [`Interval`] schedules ticks "normally" (the /// same as it would have if the ticks hadn't been delayed), which results @@ -174,6 +207,9 @@ pub enum MissedTickBehavior { /// # async fn main() { /// let mut interval = interval(Duration::from_millis(50)); /// + /// // First tick resolves immediately after creation + /// interval.tick().await; + /// /// task_that_takes_200_millis().await; /// // The `Interval` has missed a tick /// @@ -242,7 +278,7 @@ pub enum MissedTickBehavior { /// // 50ms after the call to `tick` up above. That is, in `tick`, when we /// // recognize that we missed a tick, we schedule the next tick to happen /// // 50ms (or whatever the `period` is) from right then, not from when - /// // were were *supposed* to tick + /// // were *supposed* to tick /// interval.tick().await; /// # } /// ``` @@ -252,7 +288,7 @@ pub enum MissedTickBehavior { /// [`tick`]: Interval::tick Delay, - /// Skip missed ticks and tick on the next multiple of `period` from + /// Skips missed ticks and tick on the next multiple of `period` from /// `start`. /// /// When this strategy is used, [`Interval`] schedules the next tick to fire @@ -342,7 +378,7 @@ impl Default for MissedTickBehavior { } } -/// Interval returned by [`interval`] and [`interval_at`] +/// Interval returned by [`interval`] and [`interval_at`]. /// /// This type allows you to wait on a sequence of instants with a certain /// duration between each instant. Unlike calling [`sleep`] in a loop, this lets @@ -351,7 +387,7 @@ impl Default for MissedTickBehavior { /// An `Interval` can be turned into a `Stream` with [`IntervalStream`]. /// /// [`IntervalStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.IntervalStream.html -/// [`sleep`]: crate::time::sleep +/// [`sleep`]: crate::time::sleep() #[derive(Debug)] pub struct Interval { /// Future that completes the next time the `Interval` yields a value. @@ -362,11 +398,19 @@ pub struct Interval { /// The strategy `Interval` should use when a tick is missed. missed_tick_behavior: MissedTickBehavior, + + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } impl Interval { /// Completes when the next instant in the interval has been reached. /// + /// # Cancel safety + /// + /// This method is cancellation safe. If `tick` is used as the branch in a `tokio::select!` and + /// another branch completes first, then no tick has been consumed. + /// /// # Examples /// /// ``` @@ -379,6 +423,7 @@ impl Interval { /// let mut interval = time::interval(Duration::from_millis(10)); /// /// interval.tick().await; + /// // approximately 0ms have elapsed. The first tick completes immediately. /// interval.tick().await; /// interval.tick().await; /// @@ -386,10 +431,23 @@ impl Interval { /// } /// ``` pub async fn tick(&mut self) -> Instant { - poll_fn(|cx| self.poll_tick(cx)).await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let instant = trace::async_op( + || poll_fn(|cx| self.poll_tick(cx)), + resource_span, + "Interval::tick", + "poll_tick", + false, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let instant = poll_fn(|cx| self.poll_tick(cx)); + + instant.await } - /// Poll for the next instant in the interval to be reached. + /// Polls for the next instant in the interval to be reached. /// /// This method can return the following values: /// @@ -424,12 +482,45 @@ impl Interval { timeout + self.period }; - self.delay.as_mut().reset(next); + // When we arrive here, the internal delay returned `Poll::Ready`. + // Reset the delay but do not register it. It should be registered with + // the next call to [`poll_tick`]. + self.delay.as_mut().reset_without_reregister(next); // Return the time when we were scheduled to tick Poll::Ready(timeout) } + /// Resets the interval to complete one period after the current time. + /// + /// This method ignores [`MissedTickBehavior`] strategy. + /// + /// # Examples + /// + /// ``` + /// use tokio::time; + /// + /// use std::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut interval = time::interval(Duration::from_millis(100)); + /// + /// interval.tick().await; + /// + /// time::sleep(Duration::from_millis(50)).await; + /// interval.reset(); + /// + /// interval.tick().await; + /// interval.tick().await; + /// + /// // approximately 250ms have elapsed. + /// } + /// ``` + pub fn reset(&mut self) { + self.delay.as_mut().reset(Instant::now() + self.period); + } + /// Returns the [`MissedTickBehavior`] strategy currently being used. pub fn missed_tick_behavior(&self) -> MissedTickBehavior { self.missed_tick_behavior diff --git a/vendor/tokio/src/time/mod.rs b/vendor/tokio/src/time/mod.rs index 281990ef9..a1f27b839 100644 --- a/vendor/tokio/src/time/mod.rs +++ b/vendor/tokio/src/time/mod.rs @@ -82,17 +82,13 @@ //! ``` //! //! [`interval`]: crate::time::interval() +//! [`sleep`]: sleep() mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] pub use clock::{advance, pause, resume}; -pub(crate) mod driver; - -#[doc(inline)] -pub use driver::sleep::{sleep, sleep_until, Sleep}; - pub mod error; mod instant; @@ -101,14 +97,13 @@ pub use self::instant::Instant; mod interval; pub use interval::{interval, interval_at, Interval, MissedTickBehavior}; +mod sleep; +pub use sleep::{sleep, sleep_until, Sleep}; + mod timeout; #[doc(inline)] pub use timeout::{timeout, timeout_at, Timeout}; -#[cfg(test)] -#[cfg(not(loom))] -mod tests; - // Re-export for convenience #[doc(no_inline)] pub use std::time::Duration; diff --git a/vendor/tokio/src/time/sleep.rs b/vendor/tokio/src/time/sleep.rs new file mode 100644 index 000000000..6c9b33793 --- /dev/null +++ b/vendor/tokio/src/time/sleep.rs @@ -0,0 +1,452 @@ +use crate::runtime::time::TimerEntry; +use crate::time::{error::Error, Duration, Instant}; +use crate::util::trace; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::panic::Location; +use std::pin::Pin; +use std::task::{self, Poll}; + +/// Waits until `deadline` is reached. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. +/// +/// To run something regularly on a schedule, see [`interval`]. +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep_until, Instant, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep_until(Instant::now() + Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Sleep`]: struct@crate::time::Sleep +/// [`interval`]: crate::time::interval() +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +// Alias for old name in 0.x +#[cfg_attr(docsrs, doc(alias = "delay_until"))] +#[track_caller] +pub fn sleep_until(deadline: Instant) -> Sleep { + return Sleep::new_timeout(deadline, trace::caller_location()); +} + +/// Waits until `duration` has elapsed. +/// +/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous +/// analog to `std::thread::sleep`. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. The implementation is platform specific, +/// and some platforms (specifically Windows) will provide timers with a +/// larger resolution than 1 ms. +/// +/// To run something regularly on a schedule, see [`interval`]. +/// +/// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years). +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep(Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Sleep`]: struct@crate::time::Sleep +/// [`interval`]: crate::time::interval() +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +// Alias for old name in 0.x +#[cfg_attr(docsrs, doc(alias = "delay_for"))] +#[cfg_attr(docsrs, doc(alias = "wait"))] +#[track_caller] +pub fn sleep(duration: Duration) -> Sleep { + let location = trace::caller_location(); + + match Instant::now().checked_add(duration) { + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::new_timeout(Instant::far_future(), location), + } +} + +pin_project! { + /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). + /// + /// This type does not implement the `Unpin` trait, which means that if you + /// use it with [`select!`] or by calling `poll`, you have to pin it first. + /// If you use it with `.await`, this does not apply. + /// + /// # Examples + /// + /// Wait 100ms and print "100 ms have elapsed". + /// + /// ``` + /// use tokio::time::{sleep, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// sleep(Duration::from_millis(100)).await; + /// println!("100 ms have elapsed"); + /// } + /// ``` + /// + /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is + /// necessary when the same `Sleep` is selected on multiple times. + /// ```no_run + /// use tokio::time::{self, Duration, Instant}; + /// + /// #[tokio::main] + /// async fn main() { + /// let sleep = time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// loop { + /// tokio::select! { + /// () = &mut sleep => { + /// println!("timer elapsed"); + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50)); + /// }, + /// } + /// } + /// } + /// ``` + /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the + /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// + /// struct HasSleep { + /// sleep: Pin<Box<Sleep>>, + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.sleep.as_mut().poll(cx) + /// } + /// } + /// ``` + /// Use in a struct with pin projection. This method avoids the `Box`, but + /// the `HasSleep` struct will not be `Unpin` as a consequence. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// use pin_project_lite::pin_project; + /// + /// pin_project! { + /// struct HasSleep { + /// #[pin] + /// sleep: Sleep, + /// } + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.project().sleep.poll(cx) + /// } + /// } + /// ``` + /// + /// [`select!`]: ../macro.select.html + /// [`tokio::pin!`]: ../macro.pin.html + // Alias for old name in 0.2 + #[cfg_attr(docsrs, doc(alias = "Delay"))] + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Sleep { + inner: Inner, + + // The link between the `Sleep` instance and the timer that drives it. + #[pin] + entry: TimerEntry, + } +} + +cfg_trace! { + #[derive(Debug)] + struct Inner { + ctx: trace::AsyncOpTracingCtx, + } +} + +cfg_not_trace! { + #[derive(Debug)] + struct Inner { + } +} + +impl Sleep { + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + #[track_caller] + pub(crate) fn new_timeout( + deadline: Instant, + location: Option<&'static Location<'static>>, + ) -> Sleep { + use crate::runtime::scheduler; + + let handle = scheduler::Handle::current(); + let entry = TimerEntry::new(&handle, deadline); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = { + let clock = handle.driver().clock(); + let handle = &handle.driver().time(); + let time_source = handle.time_source(); + let deadline_tick = time_source.deadline_to_tick(deadline); + let duration = deadline_tick.saturating_sub(time_source.now(clock)); + + let location = location.expect("should have location if tracing"); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sleep", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + let async_op_span = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span, + }; + + Inner { ctx } + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = Inner {}; + + Sleep { inner, entry } + } + + pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep { + Self::new_timeout(Instant::far_future(), location) + } + + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.entry.deadline() + } + + /// Returns `true` if `Sleep` has elapsed. + /// + /// A `Sleep` instance is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.entry.is_elapsed() + } + + /// Resets the `Sleep` instance to a new deadline. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state. + /// + /// This function can be called both before and after the future has + /// completed. + /// + /// To call this method, you will usually combine the call with + /// [`Pin::as_mut`], which lets you call the method without consuming the + /// `Sleep` itself. + /// + /// # Example + /// + /// ``` + /// use tokio::time::{Duration, Instant}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let sleep = tokio::time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20)); + /// # } + /// ``` + /// + /// See also the top-level examples. + /// + /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.reset_inner(deadline) + } + + /// Resets the `Sleep` instance to a new deadline without reregistering it + /// to be woken up. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state and + /// without having it registered. This is required in e.g. the + /// [crate::time::Interval] where we want to reset the internal [Sleep] + /// without having it wake up the last task that polled it. + pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) { + let mut me = self.project(); + me.entry.as_mut().reset(deadline, false); + } + + fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { + let mut me = self.project(); + me.entry.as_mut().reset(deadline, true); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + { + let _resource_enter = me.inner.ctx.resource_span.enter(); + me.inner.ctx.async_op_span = + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); + let _async_op_enter = me.inner.ctx.async_op_span.enter(); + + me.inner.ctx.async_op_poll_span = + tracing::trace_span!("runtime.resource.async_op.poll"); + + let duration = { + let clock = me.entry.clock(); + let time_source = me.entry.driver().time_source(); + let now = time_source.now(clock); + let deadline_tick = time_source.deadline_to_tick(deadline); + deadline_tick.saturating_sub(now) + }; + + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + } + } + + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + let me = self.project(); + + ready!(crate::trace::trace_leaf(cx)); + + // Keep track of task budget + #[cfg(all(tokio_unstable, feature = "tracing"))] + let coop = ready!(trace_poll_op!( + "poll_elapsed", + crate::runtime::coop::poll_proceed(cx), + )); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + + let result = me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace_poll_op!("poll_elapsed", result); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return result; + } +} + +impl Future for Sleep { + type Output = (); + + // `poll_elapsed` can return an error in two cases: + // + // - AtCapacity: this is a pathological case where far too many + // sleep instances have been scheduled. + // - Shutdown: No timer has been setup, which is a mis-use error. + // + // Both cases are extremely rare, and pretty accurately fit into + // "logic errors", so we just panic in this case. A user couldn't + // really do much better if we passed the error onwards. + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _res_span = self.inner.ctx.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.inner.ctx.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered(); + match ready!(self.as_mut().poll_elapsed(cx)) { + Ok(()) => Poll::Ready(()), + Err(e) => panic!("timer error: {}", e), + } + } +} diff --git a/vendor/tokio/src/time/tests/mod.rs b/vendor/tokio/src/time/tests/mod.rs deleted file mode 100644 index 35e1060ac..000000000 --- a/vendor/tokio/src/time/tests/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -mod test_sleep; - -use crate::time::{self, Instant}; -use std::time::Duration; - -fn assert_send<T: Send>() {} -fn assert_sync<T: Sync>() {} - -#[test] -fn registration_is_send_and_sync() { - use crate::time::Sleep; - - assert_send::<Sleep>(); - assert_sync::<Sleep>(); -} - -#[test] -#[should_panic] -fn sleep_is_eager() { - let when = Instant::now() + Duration::from_millis(100); - let _ = time::sleep_until(when); -} diff --git a/vendor/tokio/src/time/tests/test_sleep.rs b/vendor/tokio/src/time/tests/test_sleep.rs deleted file mode 100644 index 77ca07e31..000000000 --- a/vendor/tokio/src/time/tests/test_sleep.rs +++ /dev/null @@ -1,443 +0,0 @@ -//use crate::time::driver::{Driver, Entry, Handle}; - -/* -macro_rules! poll { - ($e:expr) => { - $e.enter(|cx, e| e.poll_elapsed(cx)) - }; -} - -#[test] -fn frozen_utility_returns_correct_advanced_duration() { - let clock = Clock::new(); - clock.pause(); - let start = clock.now(); - - clock.advance(ms(10)); - assert_eq!(clock.now() - start, ms(10)); -} - -#[test] -fn immediate_sleep() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - let when = clock.now(); - let mut e = task::spawn(sleep_until(&handle, when)); - - assert_ready_ok!(poll!(e)); - - assert_ok!(driver.park_timeout(Duration::from_millis(1000))); - - // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.now() - start, ms(1000)); -} - -#[test] -fn delayed_sleep_level_0() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - for &i in &[1, 10, 60] { - // Create a `Sleep` that elapses in the future - let mut e = task::spawn(sleep_until(&handle, start + ms(i))); - - // The sleep instance has not elapsed. - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(i)); - - assert_ready_ok!(poll!(e)); - } -} - -#[test] -fn sub_ms_delayed_sleep() { - let (mut driver, clock, handle) = setup(); - - for _ in 0..5 { - let deadline = clock.now() + ms(1) + Duration::new(0, 1); - - let mut e = task::spawn(sleep_until(&handle, deadline)); - - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_ready_ok!(poll!(e)); - - assert!(clock.now() >= deadline); - - clock.advance(Duration::new(0, 1)); - } -} - -#[test] -fn delayed_sleep_wrapping_level_0() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - assert_ok!(driver.park_timeout(ms(5))); - assert_eq!(clock.now() - start, ms(5)); - - let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(60))); - - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(64)); - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(65)); - - assert_ready_ok!(poll!(e)); -} - -#[test] -fn timer_wrapping_with_higher_levels() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Set sleep to hit level 1 - let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(64))); - assert_pending!(poll!(e1)); - - // Turn a bit - assert_ok!(driver.park_timeout(ms(5))); - - // Set timeout such that it will hit level 0, but wrap - let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(60))); - assert_pending!(poll!(e2)); - - // This should result in s1 firing - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(64)); - - assert_ready_ok!(poll!(e1)); - assert_pending!(poll!(e2)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(65)); - - assert_ready_ok!(poll!(e1)); -} - -#[test] -fn sleep_with_deadline_in_past() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create `Sleep` that elapsed immediately. - let mut e = task::spawn(sleep_until(&handle, clock.now() - ms(100))); - - // Even though the `Sleep` expires in the past, it is not ready yet - // because the timer must observe it. - assert_ready_ok!(poll!(e)); - - // Turn the timer, it runs for the elapsed time - assert_ok!(driver.park_timeout(ms(1000))); - - // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.now() - start, ms(1000)); -} - -#[test] -fn delayed_sleep_level_1() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Sleep` that elapses in the future - let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234))); - - // The sleep has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer, this will wake up to cascade the timer down. - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(192)); - - // The sleep has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer again - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(234)); - - // The sleep has elapsed. - assert_ready_ok!(poll!(e)); - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Sleep` that elapses in the future - let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234))); - - // The sleep has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer with a smaller timeout than the cascade. - assert_ok!(driver.park_timeout(ms(100))); - assert_eq!(clock.now() - start, ms(100)); - - assert_pending!(poll!(e)); - - // Turn the timer, this will wake up to cascade the timer down. - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(192)); - - // The sleep has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer again - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(234)); - - // The sleep has elapsed. - assert_ready_ok!(poll!(e)); -} - -#[test] -fn concurrently_set_two_timers_second_one_shorter() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(500))); - let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(200))); - - // The sleep has not elapsed - assert_pending!(poll!(e1)); - assert_pending!(poll!(e2)); - - // Sleep until a cascade - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(192)); - - // Sleep until the second timer. - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(200)); - - // The shorter sleep fires - assert_ready_ok!(poll!(e2)); - assert_pending!(poll!(e1)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(448)); - - assert_pending!(poll!(e1)); - - // Turn again, this time the time will advance to the second sleep - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(500)); - - assert_ready_ok!(poll!(e1)); -} - -#[test] -fn short_sleep() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Sleep` that elapses in the future - let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1))); - - // The sleep has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer, but not enough time will go by. - assert_ok!(driver.park()); - - // The sleep has elapsed. - assert_ready_ok!(poll!(e)); - - // The time has advanced to the point of the sleep elapsing. - assert_eq!(clock.now() - start, ms(1)); -} - -#[test] -fn sorta_long_sleep_until() { - const MIN_5: u64 = 5 * 60 * 1000; - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Sleep` that elapses in the future - let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MIN_5))); - - // The sleep has not elapsed. - assert_pending!(poll!(e)); - - let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64]; - - for &elapsed in cascades { - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(elapsed)); - - assert_pending!(poll!(e)); - } - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(MIN_5)); - - // The sleep has elapsed. - assert_ready_ok!(poll!(e)); -} - -#[test] -fn very_long_sleep() { - const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000; - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Sleep` that elapses in the future - let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MO_5))); - - // The sleep has not elapsed. - assert_pending!(poll!(e)); - - let cascades = &[ - 12_884_901_888, - 12_952_010_752, - 12_959_875_072, - 12_959_997_952, - ]; - - for &elapsed in cascades { - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(elapsed)); - - assert_pending!(poll!(e)); - } - - // Turn the timer, but not enough time will go by. - assert_ok!(driver.park()); - - // The time has advanced to the point of the sleep elapsing. - assert_eq!(clock.now() - start, ms(MO_5)); - - // The sleep has elapsed. - assert_ready_ok!(poll!(e)); -} - -#[test] -fn unpark_is_delayed() { - // A special park that will take much longer than the requested duration - struct MockPark(Clock); - - struct MockUnpark; - - impl Park for MockPark { - type Unpark = MockUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MockUnpark - } - - fn park(&mut self) -> Result<(), Self::Error> { - panic!("parking forever"); - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - assert_eq!(duration, ms(0)); - self.0.advance(ms(436)); - Ok(()) - } - - fn shutdown(&mut self) {} - } - - impl Unpark for MockUnpark { - fn unpark(&self) {} - } - - let clock = Clock::new(); - clock.pause(); - let start = clock.now(); - let mut driver = Driver::new(MockPark(clock.clone()), clock.clone()); - let handle = driver.handle(); - - let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(100))); - let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(101))); - let mut e3 = task::spawn(sleep_until(&handle, clock.now() + ms(200))); - - assert_pending!(poll!(e1)); - assert_pending!(poll!(e2)); - assert_pending!(poll!(e3)); - - assert_ok!(driver.park()); - - assert_eq!(clock.now() - start, ms(500)); - - assert_ready_ok!(poll!(e1)); - assert_ready_ok!(poll!(e2)); - assert_ready_ok!(poll!(e3)); -} - -#[test] -fn set_timeout_at_deadline_greater_than_max_timer() { - const YR_1: u64 = 365 * 24 * 60 * 60 * 1000; - const YR_5: u64 = 5 * YR_1; - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - for _ in 0..5 { - assert_ok!(driver.park_timeout(ms(YR_1))); - } - - let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1))); - assert_pending!(poll!(e)); - - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(YR_5) + ms(1)); - - assert_ready_ok!(poll!(e)); -} - -fn setup() -> (Driver<MockPark>, Clock, Handle) { - let clock = Clock::new(); - clock.pause(); - let driver = Driver::new(MockPark(clock.clone()), clock.clone()); - let handle = driver.handle(); - - (driver, clock, handle) -} - -fn sleep_until(handle: &Handle, when: Instant) -> Arc<Entry> { - Entry::new(&handle, when, ms(0)) -} - -struct MockPark(Clock); - -struct MockUnpark; - -impl Park for MockPark { - type Unpark = MockUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MockUnpark - } - - fn park(&mut self) -> Result<(), Self::Error> { - panic!("parking forever"); - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.0.advance(duration); - Ok(()) - } - - fn shutdown(&mut self) {} -} - -impl Unpark for MockUnpark { - fn unpark(&self) {} -} - -fn ms(n: u64) -> Duration { - Duration::from_millis(n) -} -*/ diff --git a/vendor/tokio/src/time/timeout.rs b/vendor/tokio/src/time/timeout.rs index 61964ad24..52ab9891c 100644 --- a/vendor/tokio/src/time/timeout.rs +++ b/vendor/tokio/src/time/timeout.rs @@ -4,20 +4,39 @@ //! //! [`Timeout`]: struct@Timeout -use crate::time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}; +use crate::{ + runtime::coop, + time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, + util::trace, +}; use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{self, Poll}; -/// Require a `Future` to complete before the specified duration has elapsed. +/// Requires a `Future` to complete before the specified duration has elapsed. /// /// If the future completes before the duration has elapsed, then the completed /// value is returned. Otherwise, an error is returned and the future is /// canceled. /// -/// # Cancelation +/// Note that the timeout is checked before polling the future, so if the future +/// does not yield during execution then it is possible for the future to complete +/// and exceed the timeout _without_ returning an error. +/// +/// This function returns a future whose return type is [`Result`]`<T,`[`Elapsed`]`>`, where `T` is the +/// return type of the provided future. +/// +/// If the provided future completes immediately, then the future returned from +/// this function is guaranteed to complete immediately with an [`Ok`] variant +/// no matter the provided duration. +/// +/// [`Ok`]: std::result::Result::Ok +/// [`Result`]: std::result::Result +/// [`Elapsed`]: crate::time::error::Elapsed +/// +/// # Cancellation /// /// Cancelling a timeout is done by dropping the future. No additional cleanup /// or other work is required. @@ -45,24 +64,56 @@ use std::task::{self, Poll}; /// } /// # } /// ``` -pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T> +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +#[track_caller] +pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F> where - T: Future, + F: Future, { + let location = trace::caller_location(); + let deadline = Instant::now().checked_add(duration); let delay = match deadline { - Some(deadline) => Sleep::new_timeout(deadline), - None => Sleep::far_future(), + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::far_future(location), }; Timeout::new_with_delay(future, delay) } -/// Require a `Future` to complete before the specified instant in time. +/// Requires a `Future` to complete before the specified instant in time. /// /// If the future completes before the instant is reached, then the completed /// value is returned. Otherwise, an error is returned. /// -/// # Cancelation +/// This function returns a future whose return type is [`Result`]`<T,`[`Elapsed`]`>`, where `T` is the +/// return type of the provided future. +/// +/// If the provided future completes immediately, then the future returned from +/// this function is guaranteed to complete immediately with an [`Ok`] variant +/// no matter the provided deadline. +/// +/// [`Ok`]: std::result::Result::Ok +/// [`Result`]: std::result::Result +/// [`Elapsed`]: crate::time::error::Elapsed +/// +/// # Cancellation /// /// Cancelling a timeout is done by dropping the future. No additional cleanup /// or other work is required. @@ -91,9 +142,9 @@ where /// } /// # } /// ``` -pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T> +pub fn timeout_at<F>(deadline: Instant, future: F) -> Timeout<F> where - T: Future, + F: Future, { let delay = sleep_until(deadline); @@ -145,15 +196,33 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { let me = self.project(); + let had_budget_before = coop::has_budget_remaining(); + // First, try polling the future if let Poll::Ready(v) = me.value.poll(cx) { return Poll::Ready(Ok(v)); } - // Now check the timer - match me.delay.poll(cx) { - Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), - Poll::Pending => Poll::Pending, + let has_budget_now = coop::has_budget_remaining(); + + let delay = me.delay; + + let poll_delay = || -> Poll<Self::Output> { + match delay.poll(cx) { + Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), + Poll::Pending => Poll::Pending, + } + }; + + if let (true, false) = (had_budget_before, has_budget_now) { + // if it is the underlying future that exhausted the budget, we poll + // the `delay` with an unconstrained one. This prevents pathological + // cases where the underlying future always exhausts the budget and + // we never get a chance to evaluate whether the timeout was hit or + // not. + coop::with_unconstrained(poll_delay) + } else { + poll_delay() } } } |