summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/time
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/time
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/time')
-rw-r--r--vendor/tokio/src/time/clock.rs186
-rw-r--r--vendor/tokio/src/time/driver/entry.rs629
-rw-r--r--vendor/tokio/src/time/driver/handle.rs88
-rw-r--r--vendor/tokio/src/time/driver/mod.rs520
-rw-r--r--vendor/tokio/src/time/driver/sleep.rs257
-rw-r--r--vendor/tokio/src/time/driver/tests/mod.rs287
-rw-r--r--vendor/tokio/src/time/driver/wheel/level.rs275
-rw-r--r--vendor/tokio/src/time/driver/wheel/mod.rs359
-rw-r--r--vendor/tokio/src/time/driver/wheel/stack.rs112
-rw-r--r--vendor/tokio/src/time/error.rs9
-rw-r--r--vendor/tokio/src/time/instant.rs24
-rw-r--r--vendor/tokio/src/time/interval.rs117
-rw-r--r--vendor/tokio/src/time/mod.rs13
-rw-r--r--vendor/tokio/src/time/sleep.rs452
-rw-r--r--vendor/tokio/src/time/tests/mod.rs22
-rw-r--r--vendor/tokio/src/time/tests/test_sleep.rs443
-rw-r--r--vendor/tokio/src/time/timeout.rs99
17 files changed, 803 insertions, 3089 deletions
diff --git a/vendor/tokio/src/time/clock.rs b/vendor/tokio/src/time/clock.rs
index a44d75f3c..091cf4b19 100644
--- a/vendor/tokio/src/time/clock.rs
+++ b/vendor/tokio/src/time/clock.rs
@@ -29,26 +29,53 @@ cfg_not_test_util! {
cfg_test_util! {
use crate::time::{Duration, Instant};
- use crate::loom::sync::{Arc, Mutex};
+ use crate::loom::sync::Mutex;
+ use crate::loom::sync::atomic::Ordering;
+ use std::sync::atomic::AtomicBool as StdAtomicBool;
cfg_rt! {
- fn clock() -> Option<Clock> {
- crate::runtime::context::clock()
+ #[track_caller]
+ fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R {
+ use crate::runtime::Handle;
+
+ let res = match Handle::try_current() {
+ Ok(handle) => f(Some(handle.inner.driver().clock())),
+ Err(ref e) if e.is_missing_context() => f(None),
+ Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
+ };
+
+ match res {
+ Ok(ret) => ret,
+ Err(msg) => panic!("{}", msg),
+ }
}
}
cfg_not_rt! {
- fn clock() -> Option<Clock> {
- None
+ #[track_caller]
+ fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R {
+ match f(None) {
+ Ok(ret) => ret,
+ Err(msg) => panic!("{}", msg),
+ }
}
}
/// A handle to a source of time.
- #[derive(Debug, Clone)]
+ #[derive(Debug)]
pub(crate) struct Clock {
- inner: Arc<Mutex<Inner>>,
+ inner: Mutex<Inner>,
}
+ // Used to track if the clock was ever paused. This is an optimization to
+ // avoid touching the mutex if `test-util` was accidentally enabled in
+ // release mode.
+ //
+ // A static is used so we can avoid accessing the thread-local as well. The
+ // `std` AtomicBool is used directly because loom does not support static
+ // atomics.
+ static DID_PAUSE_CLOCK: StdAtomicBool = StdAtomicBool::new(false);
+
#[derive(Debug)]
struct Inner {
/// True if the ability to pause time is enabled.
@@ -57,22 +84,34 @@ cfg_test_util! {
/// Instant to use as the clock's base instant.
base: std::time::Instant,
- /// Instant at which the clock was last unfrozen
+ /// Instant at which the clock was last unfrozen.
unfrozen: Option<std::time::Instant>,
+
+ /// Number of `inhibit_auto_advance` calls still in effect.
+ auto_advance_inhibit_count: usize,
}
- /// Pause time
+ /// Pauses time.
///
/// The current value of `Instant::now()` is saved and all subsequent calls
- /// to `Instant::now()` until the timer wheel is checked again will return
- /// the saved value. Once the timer wheel is checked, time will immediately
- /// advance to the next registered `Sleep`. This is useful for running tests
- /// that depend on time.
+ /// to `Instant::now()` will return the saved value. The saved value can be
+ /// changed by [`advance`] or by the time auto-advancing once the runtime
+ /// has no work to do. This only affects the `Instant` type in Tokio, and
+ /// the `Instant` in std continues to work as normal.
///
/// Pausing time requires the `current_thread` Tokio runtime. This is the
/// default runtime used by `#[tokio::test]`. The runtime can be initialized
/// with time in a paused state using the `Builder::start_paused` method.
///
+ /// For cases where time is immediately paused, it is better to pause
+ /// the time using the `main` or `test` macro:
+ /// ```
+ /// #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// async fn main() {
+ /// println!("Hello world");
+ /// }
+ /// ```
+ ///
/// # Panics
///
/// Panics if time is already frozen or if called from outside of a
@@ -86,12 +125,18 @@ cfg_test_util! {
/// current time when awaited.
///
/// [`Sleep`]: crate::time::Sleep
+ /// [`advance`]: crate::time::advance
+ #[track_caller]
pub fn pause() {
- let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
- clock.pause();
+ with_clock(|maybe_clock| {
+ match maybe_clock {
+ Some(clock) => clock.pause(),
+ None => Err("time cannot be frozen from outside the Tokio runtime"),
+ }
+ })
}
- /// Resume time
+ /// Resumes time.
///
/// Clears the saved `Instant::now()` value. Subsequent calls to
/// `Instant::now()` will return the value returned by the system call.
@@ -100,22 +145,44 @@ cfg_test_util! {
///
/// Panics if time is not frozen or if called from outside of the Tokio
/// runtime.
+ #[track_caller]
pub fn resume() {
- let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
- let mut inner = clock.inner.lock();
+ with_clock(|maybe_clock| {
+ let clock = match maybe_clock {
+ Some(clock) => clock,
+ None => return Err("time cannot be frozen from outside the Tokio runtime"),
+ };
- if inner.unfrozen.is_some() {
- panic!("time is not frozen");
- }
+ let mut inner = clock.inner.lock();
+
+ if inner.unfrozen.is_some() {
+ return Err("time is not frozen");
+ }
- inner.unfrozen = Some(std::time::Instant::now());
+ inner.unfrozen = Some(std::time::Instant::now());
+ Ok(())
+ })
}
- /// Advance time
+ /// Advances time.
///
/// Increments the saved `Instant::now()` value by `duration`. Subsequent
/// calls to `Instant::now()` will return the result of the increment.
///
+ /// This function will make the current time jump forward by the given
+ /// duration in one jump. This means that all `sleep` calls with a deadline
+ /// before the new time will immediately complete "at the same time", and
+ /// the runtime is free to poll them in any order. Additionally, this
+ /// method will not wait for the `sleep` calls it advanced past to complete.
+ /// If you want to do that, you should instead call [`sleep`] and rely on
+ /// the runtime's auto-advance feature.
+ ///
+ /// Note that calls to `sleep` are not guaranteed to complete the first time
+ /// they are polled after a call to `advance`. For example, this can happen
+ /// if the runtime has not yet touched the timer driver after the call to
+ /// `advance`. However if they don't, the runtime will poll the task again
+ /// shortly.
+ ///
/// # Panics
///
/// Panics if time is not frozen or if called from outside of the Tokio
@@ -126,70 +193,107 @@ cfg_test_util! {
/// If the time is paused and there is no work to do, the runtime advances
/// time to the next timer. See [`pause`](pause#auto-advance) for more
/// details.
+ ///
+ /// [`sleep`]: fn@crate::time::sleep
pub async fn advance(duration: Duration) {
- let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
- clock.advance(duration);
+ with_clock(|maybe_clock| {
+ let clock = match maybe_clock {
+ Some(clock) => clock,
+ None => return Err("time cannot be frozen from outside the Tokio runtime"),
+ };
+
+ clock.advance(duration)
+ });
crate::task::yield_now().await;
}
- /// Return the current instant, factoring in frozen time.
+ /// Returns the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
- if let Some(clock) = clock() {
- clock.now()
- } else {
- Instant::from_std(std::time::Instant::now())
+ if !DID_PAUSE_CLOCK.load(Ordering::Acquire) {
+ return Instant::from_std(std::time::Instant::now());
}
+
+ with_clock(|maybe_clock| {
+ Ok(if let Some(clock) = maybe_clock {
+ clock.now()
+ } else {
+ Instant::from_std(std::time::Instant::now())
+ })
+ })
}
impl Clock {
- /// Return a new `Clock` instance that uses the current execution context's
+ /// Returns a new `Clock` instance that uses the current execution context's
/// source of time.
pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
let now = std::time::Instant::now();
let clock = Clock {
- inner: Arc::new(Mutex::new(Inner {
+ inner: Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
- })),
+ auto_advance_inhibit_count: 0,
+ }),
};
if start_paused {
- clock.pause();
+ if let Err(msg) = clock.pause() {
+ panic!("{}", msg);
+ }
}
clock
}
- pub(crate) fn pause(&self) {
+ pub(crate) fn pause(&self) -> Result<(), &'static str> {
let mut inner = self.inner.lock();
if !inner.enable_pausing {
drop(inner); // avoid poisoning the lock
- panic!("`time::pause()` requires the `current_thread` Tokio runtime. \
+ return Err("`time::pause()` requires the `current_thread` Tokio runtime. \
This is the default Runtime used by `#[tokio::test].");
}
- let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed();
+ // Track that we paused the clock
+ DID_PAUSE_CLOCK.store(true, Ordering::Release);
+
+ let elapsed = match inner.unfrozen.as_ref() {
+ Some(v) => v.elapsed(),
+ None => return Err("time is already frozen")
+ };
inner.base += elapsed;
inner.unfrozen = None;
+
+ Ok(())
+ }
+
+ /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
+ pub(crate) fn inhibit_auto_advance(&self) {
+ let mut inner = self.inner.lock();
+ inner.auto_advance_inhibit_count += 1;
+ }
+
+ pub(crate) fn allow_auto_advance(&self) {
+ let mut inner = self.inner.lock();
+ inner.auto_advance_inhibit_count -= 1;
}
- pub(crate) fn is_paused(&self) -> bool {
+ pub(crate) fn can_auto_advance(&self) -> bool {
let inner = self.inner.lock();
- inner.unfrozen.is_none()
+ inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0
}
- pub(crate) fn advance(&self, duration: Duration) {
+ pub(crate) fn advance(&self, duration: Duration) -> Result<(), &'static str> {
let mut inner = self.inner.lock();
if inner.unfrozen.is_some() {
- panic!("time is not frozen");
+ return Err("time is not frozen");
}
inner.base += duration;
+ Ok(())
}
pub(crate) fn now(&self) -> Instant {
diff --git a/vendor/tokio/src/time/driver/entry.rs b/vendor/tokio/src/time/driver/entry.rs
deleted file mode 100644
index 168e0b995..000000000
--- a/vendor/tokio/src/time/driver/entry.rs
+++ /dev/null
@@ -1,629 +0,0 @@
-//! Timer state structures.
-//!
-//! This module contains the heart of the intrusive timer implementation, and as
-//! such the structures inside are full of tricky concurrency and unsafe code.
-//!
-//! # Ground rules
-//!
-//! The heart of the timer implementation here is the `TimerShared` structure,
-//! shared between the `TimerEntry` and the driver. Generally, we permit access
-//! to `TimerShared` ONLY via either 1) a mutable reference to `TimerEntry` or
-//! 2) a held driver lock.
-//!
-//! It follows from this that any changes made while holding BOTH 1 and 2 will
-//! be reliably visible, regardless of ordering. This is because of the acq/rel
-//! fences on the driver lock ensuring ordering with 2, and rust mutable
-//! reference rules for 1 (a mutable reference to an object can't be passed
-//! between threads without an acq/rel barrier, and same-thread we have local
-//! happens-before ordering).
-//!
-//! # State field
-//!
-//! Each timer has a state field associated with it. This field contains either
-//! the current scheduled time, or a special flag value indicating its state.
-//! This state can either indicate that the timer is on the 'pending' queue (and
-//! thus will be fired with an `Ok(())` result soon) or that it has already been
-//! fired/deregistered.
-//!
-//! This single state field allows for code that is firing the timer to
-//! synchronize with any racing `reset` calls reliably.
-//!
-//! # Cached vs true timeouts
-//!
-//! To allow for the use case of a timeout that is periodically reset before
-//! expiration to be as lightweight as possible, we support optimistically
-//! lock-free timer resets, in the case where a timer is rescheduled to a later
-//! point than it was originally scheduled for.
-//!
-//! This is accomplished by lazily rescheduling timers. That is, we update the
-//! state field field with the true expiration of the timer from the holder of
-//! the [`TimerEntry`]. When the driver services timers (ie, whenever it's
-//! walking lists of timers), it checks this "true when" value, and reschedules
-//! based on it.
-//!
-//! We do, however, also need to track what the expiration time was when we
-//! originally registered the timer; this is used to locate the right linked
-//! list when the timer is being cancelled. This is referred to as the "cached
-//! when" internally.
-//!
-//! There is of course a race condition between timer reset and timer
-//! expiration. If the driver fails to observe the updated expiration time, it
-//! could trigger expiration of the timer too early. However, because
-//! `mark_pending` performs a compare-and-swap, it will identify this race and
-//! refuse to mark the timer as pending.
-
-use crate::loom::cell::UnsafeCell;
-use crate::loom::sync::atomic::AtomicU64;
-use crate::loom::sync::atomic::Ordering;
-
-use crate::sync::AtomicWaker;
-use crate::time::Instant;
-use crate::util::linked_list;
-
-use super::Handle;
-
-use std::cell::UnsafeCell as StdUnsafeCell;
-use std::task::{Context, Poll, Waker};
-use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
-
-type TimerResult = Result<(), crate::time::error::Error>;
-
-const STATE_DEREGISTERED: u64 = u64::MAX;
-const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
-const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
-
-/// This structure holds the current shared state of the timer - its scheduled
-/// time (if registered), or otherwise the result of the timer completing, as
-/// well as the registered waker.
-///
-/// Generally, the StateCell is only permitted to be accessed from two contexts:
-/// Either a thread holding the corresponding &mut TimerEntry, or a thread
-/// holding the timer driver lock. The write actions on the StateCell amount to
-/// passing "ownership" of the StateCell between these contexts; moving a timer
-/// from the TimerEntry to the driver requires _both_ holding the &mut
-/// TimerEntry and the driver lock, while moving it back (firing the timer)
-/// requires only the driver lock.
-pub(super) struct StateCell {
- /// Holds either the scheduled expiration time for this timer, or (if the
- /// timer has been fired and is unregistered), `u64::MAX`.
- state: AtomicU64,
- /// If the timer is fired (an Acquire order read on state shows
- /// `u64::MAX`), holds the result that should be returned from
- /// polling the timer. Otherwise, the contents are unspecified and reading
- /// without holding the driver lock is undefined behavior.
- result: UnsafeCell<TimerResult>,
- /// The currently-registered waker
- waker: CachePadded<AtomicWaker>,
-}
-
-impl Default for StateCell {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl std::fmt::Debug for StateCell {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "StateCell({:?})", self.read_state())
- }
-}
-
-impl StateCell {
- fn new() -> Self {
- Self {
- state: AtomicU64::new(STATE_DEREGISTERED),
- result: UnsafeCell::new(Ok(())),
- waker: CachePadded(AtomicWaker::new()),
- }
- }
-
- fn is_pending(&self) -> bool {
- self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
- }
-
- /// Returns the current expiration time, or None if not currently scheduled.
- fn when(&self) -> Option<u64> {
- let cur_state = self.state.load(Ordering::Relaxed);
-
- if cur_state == u64::MAX {
- None
- } else {
- Some(cur_state)
- }
- }
-
- /// If the timer is completed, returns the result of the timer. Otherwise,
- /// returns None and registers the waker.
- fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
- // We must register first. This ensures that either `fire` will
- // observe the new waker, or we will observe a racing fire to have set
- // the state, or both.
- self.waker.0.register_by_ref(waker);
-
- self.read_state()
- }
-
- fn read_state(&self) -> Poll<TimerResult> {
- let cur_state = self.state.load(Ordering::Acquire);
-
- if cur_state == STATE_DEREGISTERED {
- // SAFETY: The driver has fired this timer; this involves writing
- // the result, and then writing (with release ordering) the state
- // field.
- Poll::Ready(unsafe { self.result.with(|p| *p) })
- } else {
- Poll::Pending
- }
- }
-
- /// Marks this timer as being moved to the pending list, if its scheduled
- /// time is not after `not_after`.
- ///
- /// If the timer is scheduled for a time after not_after, returns an Err
- /// containing the current scheduled time.
- ///
- /// SAFETY: Must hold the driver lock.
- unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
- // Quick initial debug check to see if the timer is already fired. Since
- // firing the timer can only happen with the driver lock held, we know
- // we shouldn't be able to "miss" a transition to a fired state, even
- // with relaxed ordering.
- let mut cur_state = self.state.load(Ordering::Relaxed);
-
- loop {
- debug_assert!(cur_state < STATE_MIN_VALUE);
-
- if cur_state > not_after {
- break Err(cur_state);
- }
-
- match self.state.compare_exchange(
- cur_state,
- STATE_PENDING_FIRE,
- Ordering::AcqRel,
- Ordering::Acquire,
- ) {
- Ok(_) => {
- break Ok(());
- }
- Err(actual_state) => {
- cur_state = actual_state;
- }
- }
- }
- }
-
- /// Fires the timer, setting the result to the provided result.
- ///
- /// Returns:
- /// * `Some(waker) - if fired and a waker needs to be invoked once the
- /// driver lock is released
- /// * `None` - if fired and a waker does not need to be invoked, or if
- /// already fired
- ///
- /// SAFETY: The driver lock must be held.
- unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
- // Quick initial check to see if the timer is already fired. Since
- // firing the timer can only happen with the driver lock held, we know
- // we shouldn't be able to "miss" a transition to a fired state, even
- // with relaxed ordering.
- let cur_state = self.state.load(Ordering::Relaxed);
- if cur_state == STATE_DEREGISTERED {
- return None;
- }
-
- // SAFETY: We assume the driver lock is held and the timer is not
- // fired, so only the driver is accessing this field.
- //
- // We perform a release-ordered store to state below, to ensure this
- // write is visible before the state update is visible.
- unsafe { self.result.with_mut(|p| *p = result) };
-
- self.state.store(STATE_DEREGISTERED, Ordering::Release);
-
- self.waker.0.take_waker()
- }
-
- /// Marks the timer as registered (poll will return None) and sets the
- /// expiration time.
- ///
- /// While this function is memory-safe, it should only be called from a
- /// context holding both `&mut TimerEntry` and the driver lock.
- fn set_expiration(&self, timestamp: u64) {
- debug_assert!(timestamp < STATE_MIN_VALUE);
-
- // We can use relaxed ordering because we hold the driver lock and will
- // fence when we release the lock.
- self.state.store(timestamp, Ordering::Relaxed);
- }
-
- /// Attempts to adjust the timer to a new timestamp.
- ///
- /// If the timer has already been fired, is pending firing, or the new
- /// timestamp is earlier than the old timestamp, (or occasionally
- /// spuriously) returns Err without changing the timer's state. In this
- /// case, the timer must be deregistered and re-registered.
- fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
- let mut prior = self.state.load(Ordering::Relaxed);
- loop {
- if new_timestamp < prior || prior >= STATE_MIN_VALUE {
- return Err(());
- }
-
- match self.state.compare_exchange_weak(
- prior,
- new_timestamp,
- Ordering::AcqRel,
- Ordering::Acquire,
- ) {
- Ok(_) => {
- return Ok(());
- }
- Err(true_prior) => {
- prior = true_prior;
- }
- }
- }
- }
-
- /// Returns true if the state of this timer indicates that the timer might
- /// be registered with the driver. This check is performed with relaxed
- /// ordering, but is conservative - if it returns false, the timer is
- /// definitely _not_ registered.
- pub(super) fn might_be_registered(&self) -> bool {
- self.state.load(Ordering::Relaxed) != u64::MAX
- }
-}
-
-/// A timer entry.
-///
-/// This is the handle to a timer that is controlled by the requester of the
-/// timer. As this participates in intrusive data structures, it must be pinned
-/// before polling.
-#[derive(Debug)]
-pub(super) struct TimerEntry {
- /// Arc reference to the driver. We can only free the driver after
- /// deregistering everything from their respective timer wheels.
- driver: Handle,
- /// Shared inner structure; this is part of an intrusive linked list, and
- /// therefore other references can exist to it while mutable references to
- /// Entry exist.
- ///
- /// This is manipulated only under the inner mutex. TODO: Can we use loom
- /// cells for this?
- inner: StdUnsafeCell<TimerShared>,
- /// Initial deadline for the timer. This is used to register on the first
- /// poll, as we can't register prior to being pinned.
- initial_deadline: Option<Instant>,
- /// Ensure the type is !Unpin
- _m: std::marker::PhantomPinned,
-}
-
-unsafe impl Send for TimerEntry {}
-unsafe impl Sync for TimerEntry {}
-
-/// An TimerHandle is the (non-enforced) "unique" pointer from the driver to the
-/// timer entry. Generally, at most one TimerHandle exists for a timer at a time
-/// (enforced by the timer state machine).
-///
-/// SAFETY: An TimerHandle is essentially a raw pointer, and the usual caveats
-/// of pointer safety apply. In particular, TimerHandle does not itself enforce
-/// that the timer does still exist; however, normally an TimerHandle is created
-/// immediately before registering the timer, and is consumed when firing the
-/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce
-/// memory safety, all operations are unsafe.
-#[derive(Debug)]
-pub(crate) struct TimerHandle {
- inner: NonNull<TimerShared>,
-}
-
-pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
-
-/// The shared state structure of a timer. This structure is shared between the
-/// frontend (`Entry`) and driver backend.
-///
-/// Note that this structure is located inside the `TimerEntry` structure.
-#[derive(Debug)]
-pub(crate) struct TimerShared {
- /// Current state. This records whether the timer entry is currently under
- /// the ownership of the driver, and if not, its current state (not
- /// complete, fired, error, etc).
- state: StateCell,
-
- /// Data manipulated by the driver thread itself, only.
- driver_state: CachePadded<TimerSharedPadded>,
-
- _p: PhantomPinned,
-}
-
-impl TimerShared {
- pub(super) fn new() -> Self {
- Self {
- state: StateCell::default(),
- driver_state: CachePadded(TimerSharedPadded::new()),
- _p: PhantomPinned,
- }
- }
-
- /// Gets the cached time-of-expiration value
- pub(super) fn cached_when(&self) -> u64 {
- // Cached-when is only accessed under the driver lock, so we can use relaxed
- self.driver_state.0.cached_when.load(Ordering::Relaxed)
- }
-
- /// Gets the true time-of-expiration value, and copies it into the cached
- /// time-of-expiration value.
- ///
- /// SAFETY: Must be called with the driver lock held, and when this entry is
- /// not in any timer wheel lists.
- pub(super) unsafe fn sync_when(&self) -> u64 {
- let true_when = self.true_when();
-
- self.driver_state
- .0
- .cached_when
- .store(true_when, Ordering::Relaxed);
-
- true_when
- }
-
- /// Sets the cached time-of-expiration value.
- ///
- /// SAFETY: Must be called with the driver lock held, and when this entry is
- /// not in any timer wheel lists.
- unsafe fn set_cached_when(&self, when: u64) {
- self.driver_state
- .0
- .cached_when
- .store(when, Ordering::Relaxed);
- }
-
- /// Returns the true time-of-expiration value, with relaxed memory ordering.
- pub(super) fn true_when(&self) -> u64 {
- self.state.when().expect("Timer already fired")
- }
-
- /// Sets the true time-of-expiration value, even if it is less than the
- /// current expiration or the timer is deregistered.
- ///
- /// SAFETY: Must only be called with the driver lock held and the entry not
- /// in the timer wheel.
- pub(super) unsafe fn set_expiration(&self, t: u64) {
- self.state.set_expiration(t);
- self.driver_state.0.cached_when.store(t, Ordering::Relaxed);
- }
-
- /// Sets the true time-of-expiration only if it is after the current.
- pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
- self.state.extend_expiration(t)
- }
-
- /// Returns a TimerHandle for this timer.
- pub(super) fn handle(&self) -> TimerHandle {
- TimerHandle {
- inner: NonNull::from(self),
- }
- }
-
- /// Returns true if the state of this timer indicates that the timer might
- /// be registered with the driver. This check is performed with relaxed
- /// ordering, but is conservative - if it returns false, the timer is
- /// definitely _not_ registered.
- pub(super) fn might_be_registered(&self) -> bool {
- self.state.might_be_registered()
- }
-}
-
-/// Additional shared state between the driver and the timer which is cache
-/// padded. This contains the information that the driver thread accesses most
-/// frequently to minimize contention. In particular, we move it away from the
-/// waker, as the waker is updated on every poll.
-struct TimerSharedPadded {
- /// The expiration time for which this entry is currently registered.
- /// Generally owned by the driver, but is accessed by the entry when not
- /// registered.
- cached_when: AtomicU64,
-
- /// The true expiration time. Set by the timer future, read by the driver.
- true_when: AtomicU64,
-
- /// A link within the doubly-linked list of timers on a particular level and
- /// slot. Valid only if state is equal to Registered.
- ///
- /// Only accessed under the entry lock.
- pointers: StdUnsafeCell<linked_list::Pointers<TimerShared>>,
-}
-
-impl std::fmt::Debug for TimerSharedPadded {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("TimerSharedPadded")
- .field("when", &self.true_when.load(Ordering::Relaxed))
- .field("cached_when", &self.cached_when.load(Ordering::Relaxed))
- .finish()
- }
-}
-
-impl TimerSharedPadded {
- fn new() -> Self {
- Self {
- cached_when: AtomicU64::new(0),
- true_when: AtomicU64::new(0),
- pointers: StdUnsafeCell::new(linked_list::Pointers::new()),
- }
- }
-}
-
-unsafe impl Send for TimerShared {}
-unsafe impl Sync for TimerShared {}
-
-unsafe impl linked_list::Link for TimerShared {
- type Handle = TimerHandle;
-
- type Target = TimerShared;
-
- fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> {
- handle.inner
- }
-
- unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
- TimerHandle { inner: ptr }
- }
-
- unsafe fn pointers(
- target: NonNull<Self::Target>,
- ) -> NonNull<linked_list::Pointers<Self::Target>> {
- unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() }
- }
-}
-
-// ===== impl Entry =====
-
-impl TimerEntry {
- pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self {
- let driver = handle.clone();
-
- Self {
- driver,
- inner: StdUnsafeCell::new(TimerShared::new()),
- initial_deadline: Some(deadline),
- _m: std::marker::PhantomPinned,
- }
- }
-
- fn inner(&self) -> &TimerShared {
- unsafe { &*self.inner.get() }
- }
-
- pub(crate) fn is_elapsed(&self) -> bool {
- !self.inner().state.might_be_registered() && self.initial_deadline.is_none()
- }
-
- /// Cancels and deregisters the timer. This operation is irreversible.
- pub(crate) fn cancel(self: Pin<&mut Self>) {
- // We need to perform an acq/rel fence with the driver thread, and the
- // simplest way to do so is to grab the driver lock.
- //
- // Why is this necessary? We're about to release this timer's memory for
- // some other non-timer use. However, we've been doing a bunch of
- // relaxed (or even non-atomic) writes from the driver thread, and we'll
- // be doing more from _this thread_ (as this memory is interpreted as
- // something else).
- //
- // It is critical to ensure that, from the point of view of the driver,
- // those future non-timer writes happen-after the timer is fully fired,
- // and from the purpose of this thread, the driver's writes all
- // happen-before we drop the timer. This in turn requires us to perform
- // an acquire-release barrier in _both_ directions between the driver
- // and dropping thread.
- //
- // The lock acquisition in clear_entry serves this purpose. All of the
- // driver manipulations happen with the lock held, so we can just take
- // the lock and be sure that this drop happens-after everything the
- // driver did so far and happens-before everything the driver does in
- // the future. While we have the lock held, we also go ahead and
- // deregister the entry if necessary.
- unsafe { self.driver.clear_entry(NonNull::from(self.inner())) };
- }
-
- pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
- unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None;
-
- let tick = self.driver.time_source().deadline_to_tick(new_time);
-
- if self.inner().extend_expiration(tick).is_ok() {
- return;
- }
-
- unsafe {
- self.driver.reregister(tick, self.inner().into());
- }
- }
-
- pub(crate) fn poll_elapsed(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), super::Error>> {
- if self.driver.is_shutdown() {
- panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
- }
-
- if let Some(deadline) = self.initial_deadline {
- self.as_mut().reset(deadline);
- }
-
- let this = unsafe { self.get_unchecked_mut() };
-
- this.inner().state.poll(cx.waker())
- }
-}
-
-impl TimerHandle {
- pub(super) unsafe fn cached_when(&self) -> u64 {
- unsafe { self.inner.as_ref().cached_when() }
- }
-
- pub(super) unsafe fn sync_when(&self) -> u64 {
- unsafe { self.inner.as_ref().sync_when() }
- }
-
- pub(super) unsafe fn is_pending(&self) -> bool {
- unsafe { self.inner.as_ref().state.is_pending() }
- }
-
- /// Forcibly sets the true and cached expiration times to the given tick.
- ///
- /// SAFETY: The caller must ensure that the handle remains valid, the driver
- /// lock is held, and that the timer is not in any wheel linked lists.
- pub(super) unsafe fn set_expiration(&self, tick: u64) {
- self.inner.as_ref().set_expiration(tick);
- }
-
- /// Attempts to mark this entry as pending. If the expiration time is after
- /// `not_after`, however, returns an Err with the current expiration time.
- ///
- /// If an `Err` is returned, the `cached_when` value will be updated to this
- /// new expiration time.
- ///
- /// SAFETY: The caller must ensure that the handle remains valid, the driver
- /// lock is held, and that the timer is not in any wheel linked lists.
- /// After returning Ok, the entry must be added to the pending list.
- pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
- match self.inner.as_ref().state.mark_pending(not_after) {
- Ok(()) => {
- // mark this as being on the pending queue in cached_when
- self.inner.as_ref().set_cached_when(u64::MAX);
- Ok(())
- }
- Err(tick) => {
- self.inner.as_ref().set_cached_when(tick);
- Err(tick)
- }
- }
- }
-
- /// Attempts to transition to a terminal state. If the state is already a
- /// terminal state, does nothing.
- ///
- /// Because the entry might be dropped after the state is moved to a
- /// terminal state, this function consumes the handle to ensure we don't
- /// access the entry afterwards.
- ///
- /// Returns the last-registered waker, if any.
- ///
- /// SAFETY: The driver lock must be held while invoking this function, and
- /// the entry must not be in any wheel linked lists.
- pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
- self.inner.as_ref().state.fire(completed_state)
- }
-}
-
-impl Drop for TimerEntry {
- fn drop(&mut self) {
- unsafe { Pin::new_unchecked(self) }.as_mut().cancel()
- }
-}
-
-#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
-#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
-#[derive(Debug, Default)]
-struct CachePadded<T>(T);
diff --git a/vendor/tokio/src/time/driver/handle.rs b/vendor/tokio/src/time/driver/handle.rs
deleted file mode 100644
index 77b435873..000000000
--- a/vendor/tokio/src/time/driver/handle.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-use crate::loom::sync::Arc;
-use crate::time::driver::ClockTime;
-use std::fmt;
-
-/// Handle to time driver instance.
-#[derive(Clone)]
-pub(crate) struct Handle {
- time_source: ClockTime,
- inner: Arc<super::Inner>,
-}
-
-impl Handle {
- /// Creates a new timer `Handle` from a shared `Inner` timer state.
- pub(super) fn new(inner: Arc<super::Inner>) -> Self {
- let time_source = inner.state.lock().time_source.clone();
- Handle { time_source, inner }
- }
-
- /// Returns the time source associated with this handle
- pub(super) fn time_source(&self) -> &ClockTime {
- &self.time_source
- }
-
- /// Access the driver's inner structure
- pub(super) fn get(&self) -> &super::Inner {
- &*self.inner
- }
-
- // Check whether the driver has been shutdown
- pub(super) fn is_shutdown(&self) -> bool {
- self.inner.is_shutdown()
- }
-}
-
-cfg_rt! {
- impl Handle {
- /// Tries to get a handle to the current timer.
- ///
- /// # Panics
- ///
- /// This function panics if there is no current timer set.
- ///
- /// It can be triggered when `Builder::enable_time()` or
- /// `Builder::enable_all()` are not included in the builder.
- ///
- /// It can also panic whenever a timer is created outside of a
- /// Tokio runtime. That is why `rt.block_on(delay_for(...))` will panic,
- /// since the function is executed outside of the runtime.
- /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't panic.
- /// And this is because wrapping the function on an async makes it lazy,
- /// and so gets executed inside the runtime successfully without
- /// panicking.
- pub(crate) fn current() -> Self {
- crate::runtime::context::time_handle()
- .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
- }
- }
-}
-
-cfg_not_rt! {
- impl Handle {
- /// Tries to get a handle to the current timer.
- ///
- /// # Panics
- ///
- /// This function panics if there is no current timer set.
- ///
- /// It can be triggered when `Builder::enable_time()` or
- /// `Builder::enable_all()` are not included in the builder.
- ///
- /// It can also panic whenever a timer is created outside of a Tokio
- /// runtime. That is why `rt.block_on(delay_for(...))` will panic,
- /// since the function is executed outside of the runtime.
- /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't
- /// panic. And this is because wrapping the function on an async makes it
- /// lazy, and so outside executed inside the runtime successfully without
- /// panicking.
- pub(crate) fn current() -> Self {
- panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
- }
- }
-}
-
-impl fmt::Debug for Handle {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Handle")
- }
-}
diff --git a/vendor/tokio/src/time/driver/mod.rs b/vendor/tokio/src/time/driver/mod.rs
deleted file mode 100644
index 37d2231c3..000000000
--- a/vendor/tokio/src/time/driver/mod.rs
+++ /dev/null
@@ -1,520 +0,0 @@
-// Currently, rust warns when an unsafe fn contains an unsafe {} block. However,
-// in the future, this will change to the reverse. For now, suppress this
-// warning and generally stick with being explicit about unsafety.
-#![allow(unused_unsafe)]
-#![cfg_attr(not(feature = "rt"), allow(dead_code))]
-
-//! Time driver
-
-mod entry;
-pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
-
-mod handle;
-pub(crate) use self::handle::Handle;
-
-mod wheel;
-
-pub(super) mod sleep;
-
-use crate::loom::sync::atomic::{AtomicBool, Ordering};
-use crate::loom::sync::{Arc, Mutex};
-use crate::park::{Park, Unpark};
-use crate::time::error::Error;
-use crate::time::{Clock, Duration, Instant};
-
-use std::convert::TryInto;
-use std::fmt;
-use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
-
-/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
-///
-/// A `Driver` instance tracks the state necessary for managing time and
-/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
-///
-/// It is expected that a single instance manages many individual [`Sleep`][sleep]
-/// instances. The `Driver` implementation is thread-safe and, as such, is able
-/// to handle callers from across threads.
-///
-/// After creating the `Driver` instance, the caller must repeatedly call `park`
-/// or `park_timeout`. The time driver will perform no work unless `park` or
-/// `park_timeout` is called repeatedly.
-///
-/// The driver has a resolution of one millisecond. Any unit of time that falls
-/// between milliseconds are rounded up to the next millisecond.
-///
-/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
-/// elapsed will be notified with an error. At this point, calling `poll` on the
-/// [`Sleep`][sleep] instance will result in panic.
-///
-/// # Implementation
-///
-/// The time driver is based on the [paper by Varghese and Lauck][paper].
-///
-/// A hashed timing wheel is a vector of slots, where each slot handles a time
-/// slice. As time progresses, the timer walks over the slot for the current
-/// instant, and processes each entry for that slot. When the timer reaches the
-/// end of the wheel, it starts again at the beginning.
-///
-/// The implementation maintains six wheels arranged in a set of levels. As the
-/// levels go up, the slots of the associated wheel represent larger intervals
-/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
-/// time equal to the wheel at the lower level. At level zero, each slot
-/// represents one millisecond of time.
-///
-/// The wheels are:
-///
-/// * Level 0: 64 x 1 millisecond slots.
-/// * Level 1: 64 x 64 millisecond slots.
-/// * Level 2: 64 x ~4 second slots.
-/// * Level 3: 64 x ~4 minute slots.
-/// * Level 4: 64 x ~4 hour slots.
-/// * Level 5: 64 x ~12 day slots.
-///
-/// When the timer processes entries at level zero, it will notify all the
-/// `Sleep` instances as their deadlines have been reached. For all higher
-/// levels, all entries will be redistributed across the wheel at the next level
-/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
-/// either be canceled (dropped) or their associated entries will reach level
-/// zero and be notified.
-///
-/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
-/// [sleep]: crate::time::Sleep
-/// [timeout]: crate::time::Timeout
-/// [interval]: crate::time::Interval
-#[derive(Debug)]
-pub(crate) struct Driver<P: Park + 'static> {
- /// Timing backend in use
- time_source: ClockTime,
-
- /// Shared state
- handle: Handle,
-
- /// Parker to delegate to
- park: P,
-
- // When `true`, a call to `park_timeout` should immediately return and time
- // should not advance. One reason for this to be `true` is if the task
- // passed to `Runtime::block_on` called `task::yield_now()`.
- //
- // While it may look racy, it only has any effect when the clock is paused
- // and pausing the clock is restricted to a single-threaded runtime.
- #[cfg(feature = "test-util")]
- did_wake: Arc<AtomicBool>,
-}
-
-/// A structure which handles conversion from Instants to u64 timestamps.
-#[derive(Debug, Clone)]
-pub(self) struct ClockTime {
- clock: super::clock::Clock,
- start_time: Instant,
-}
-
-impl ClockTime {
- pub(self) fn new(clock: Clock) -> Self {
- Self {
- start_time: clock.now(),
- clock,
- }
- }
-
- pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
- // Round up to the end of a ms
- self.instant_to_tick(t + Duration::from_nanos(999_999))
- }
-
- pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
- // round up
- let dur: Duration = t
- .checked_duration_since(self.start_time)
- .unwrap_or_else(|| Duration::from_secs(0));
- let ms = dur.as_millis();
-
- ms.try_into().expect("Duration too far into the future")
- }
-
- pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
- Duration::from_millis(t)
- }
-
- pub(self) fn now(&self) -> u64 {
- self.instant_to_tick(self.clock.now())
- }
-}
-
-/// Timer state shared between `Driver`, `Handle`, and `Registration`.
-struct Inner {
- // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
- pub(super) state: Mutex<InnerState>,
-
- /// True if the driver is being shutdown
- pub(super) is_shutdown: AtomicBool,
-}
-
-/// Time state shared which must be protected by a `Mutex`
-struct InnerState {
- /// Timing backend in use
- time_source: ClockTime,
-
- /// The last published timer `elapsed` value.
- elapsed: u64,
-
- /// The earliest time at which we promise to wake up without unparking
- next_wake: Option<NonZeroU64>,
-
- /// Timer wheel
- wheel: wheel::Wheel,
-
- /// Unparker that can be used to wake the time driver
- unpark: Box<dyn Unpark>,
-}
-
-// ===== impl Driver =====
-
-impl<P> Driver<P>
-where
- P: Park + 'static,
-{
- /// Creates a new `Driver` instance that uses `park` to block the current
- /// thread and `time_source` to get the current time and convert to ticks.
- ///
- /// Specifying the source of time is useful when testing.
- pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
- let time_source = ClockTime::new(clock);
-
- let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));
-
- Driver {
- time_source,
- handle: Handle::new(Arc::new(inner)),
- park,
- #[cfg(feature = "test-util")]
- did_wake: Arc::new(AtomicBool::new(false)),
- }
- }
-
- /// Returns a handle to the timer.
- ///
- /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances
- /// can either be created directly or the `Handle` instance can be passed to
- /// `with_default`, setting the timer as the default timer for the execution
- /// context.
- pub(crate) fn handle(&self) -> Handle {
- self.handle.clone()
- }
-
- fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
- let mut lock = self.handle.get().state.lock();
-
- assert!(!self.handle.is_shutdown());
-
- let next_wake = lock.wheel.next_expiration_time();
- lock.next_wake =
- next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
-
- drop(lock);
-
- match next_wake {
- Some(when) => {
- let now = self.time_source.now();
- // Note that we effectively round up to 1ms here - this avoids
- // very short-duration microsecond-resolution sleeps that the OS
- // might treat as zero-length.
- let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
-
- if duration > Duration::from_millis(0) {
- if let Some(limit) = limit {
- duration = std::cmp::min(limit, duration);
- }
-
- self.park_timeout(duration)?;
- } else {
- self.park.park_timeout(Duration::from_secs(0))?;
- }
- }
- None => {
- if let Some(duration) = limit {
- self.park_timeout(duration)?;
- } else {
- self.park.park()?;
- }
- }
- }
-
- // Process pending timers after waking up
- self.handle.process();
-
- Ok(())
- }
-
- cfg_test_util! {
- fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
- let clock = &self.time_source.clock;
-
- if clock.is_paused() {
- self.park.park_timeout(Duration::from_secs(0))?;
-
- // If the time driver was woken, then the park completed
- // before the "duration" elapsed (usually caused by a
- // yield in `Runtime::block_on`). In this case, we don't
- // advance the clock.
- if !self.did_wake() {
- // Simulate advancing time
- clock.advance(duration);
- }
- } else {
- self.park.park_timeout(duration)?;
- }
-
- Ok(())
- }
-
- fn did_wake(&self) -> bool {
- self.did_wake.swap(false, Ordering::SeqCst)
- }
- }
-
- cfg_not_test_util! {
- fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
- self.park.park_timeout(duration)
- }
- }
-}
-
-impl Handle {
- /// Runs timer related logic, and returns the next wakeup time
- pub(self) fn process(&self) {
- let now = self.time_source().now();
-
- self.process_at_time(now)
- }
-
- pub(self) fn process_at_time(&self, now: u64) {
- let mut waker_list: [Option<Waker>; 32] = Default::default();
- let mut waker_idx = 0;
-
- let mut lock = self.get().lock();
-
- assert!(now >= lock.elapsed);
-
- while let Some(entry) = lock.wheel.poll(now) {
- debug_assert!(unsafe { entry.is_pending() });
-
- // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
- if let Some(waker) = unsafe { entry.fire(Ok(())) } {
- waker_list[waker_idx] = Some(waker);
-
- waker_idx += 1;
-
- if waker_idx == waker_list.len() {
- // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
- drop(lock);
-
- for waker in waker_list.iter_mut() {
- waker.take().unwrap().wake();
- }
-
- waker_idx = 0;
-
- lock = self.get().lock();
- }
- }
- }
-
- // Update the elapsed cache
- lock.elapsed = lock.wheel.elapsed();
- lock.next_wake = lock
- .wheel
- .poll_at()
- .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
-
- drop(lock);
-
- for waker in waker_list[0..waker_idx].iter_mut() {
- waker.take().unwrap().wake();
- }
- }
-
- /// Removes a registered timer from the driver.
- ///
- /// The timer will be moved to the cancelled state. Wakers will _not_ be
- /// invoked. If the timer is already completed, this function is a no-op.
- ///
- /// This function always acquires the driver lock, even if the entry does
- /// not appear to be registered.
- ///
- /// SAFETY: The timer must not be registered with some other driver, and
- /// `add_entry` must not be called concurrently.
- pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
- unsafe {
- let mut lock = self.get().lock();
-
- if entry.as_ref().might_be_registered() {
- lock.wheel.remove(entry);
- }
-
- entry.as_ref().handle().fire(Ok(()));
- }
- }
-
- /// Removes and re-adds an entry to the driver.
- ///
- /// SAFETY: The timer must be either unregistered, or registered with this
- /// driver. No other threads are allowed to concurrently manipulate the
- /// timer at all (the current thread should hold an exclusive reference to
- /// the `TimerEntry`)
- pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
- let waker = unsafe {
- let mut lock = self.get().lock();
-
- // We may have raced with a firing/deregistration, so check before
- // deregistering.
- if unsafe { entry.as_ref().might_be_registered() } {
- lock.wheel.remove(entry);
- }
-
- // Now that we have exclusive control of this entry, mint a handle to reinsert it.
- let entry = entry.as_ref().handle();
-
- if self.is_shutdown() {
- unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
- } else {
- entry.set_expiration(new_tick);
-
- // Note: We don't have to worry about racing with some other resetting
- // thread, because add_entry and reregister require exclusive control of
- // the timer entry.
- match unsafe { lock.wheel.insert(entry) } {
- Ok(when) => {
- if lock
- .next_wake
- .map(|next_wake| when < next_wake.get())
- .unwrap_or(true)
- {
- lock.unpark.unpark();
- }
-
- None
- }
- Err((entry, super::error::InsertError::Elapsed)) => unsafe {
- entry.fire(Ok(()))
- },
- }
- }
-
- // Must release lock before invoking waker to avoid the risk of deadlock.
- };
-
- // The timer was fired synchronously as a result of the reregistration.
- // Wake the waker; this is needed because we might reset _after_ a poll,
- // and otherwise the task won't be awoken to poll again.
- if let Some(waker) = waker {
- waker.wake();
- }
- }
-}
-
-impl<P> Park for Driver<P>
-where
- P: Park + 'static,
-{
- type Unpark = TimerUnpark<P>;
- type Error = P::Error;
-
- fn unpark(&self) -> Self::Unpark {
- TimerUnpark::new(self)
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.park_internal(None)
- }
-
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.park_internal(Some(duration))
- }
-
- fn shutdown(&mut self) {
- if self.handle.is_shutdown() {
- return;
- }
-
- self.handle.get().is_shutdown.store(true, Ordering::SeqCst);
-
- // Advance time forward to the end of time.
-
- self.handle.process_at_time(u64::MAX);
-
- self.park.shutdown();
- }
-}
-
-impl<P> Drop for Driver<P>
-where
- P: Park + 'static,
-{
- fn drop(&mut self) {
- self.shutdown();
- }
-}
-
-pub(crate) struct TimerUnpark<P: Park + 'static> {
- inner: P::Unpark,
-
- #[cfg(feature = "test-util")]
- did_wake: Arc<AtomicBool>,
-}
-
-impl<P: Park + 'static> TimerUnpark<P> {
- fn new(driver: &Driver<P>) -> TimerUnpark<P> {
- TimerUnpark {
- inner: driver.park.unpark(),
-
- #[cfg(feature = "test-util")]
- did_wake: driver.did_wake.clone(),
- }
- }
-}
-
-impl<P: Park + 'static> Unpark for TimerUnpark<P> {
- fn unpark(&self) {
- #[cfg(feature = "test-util")]
- self.did_wake.store(true, Ordering::SeqCst);
-
- self.inner.unpark();
- }
-}
-
-// ===== impl Inner =====
-
-impl Inner {
- pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
- Inner {
- state: Mutex::new(InnerState {
- time_source,
- elapsed: 0,
- next_wake: None,
- unpark,
- wheel: wheel::Wheel::new(),
- }),
- is_shutdown: AtomicBool::new(false),
- }
- }
-
- /// Locks the driver's inner structure
- pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
- self.state.lock()
- }
-
- // Check whether the driver has been shutdown
- pub(super) fn is_shutdown(&self) -> bool {
- self.is_shutdown.load(Ordering::SeqCst)
- }
-}
-
-impl fmt::Debug for Inner {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Inner").finish()
- }
-}
-
-#[cfg(test)]
-mod tests;
diff --git a/vendor/tokio/src/time/driver/sleep.rs b/vendor/tokio/src/time/driver/sleep.rs
deleted file mode 100644
index 40f745ad7..000000000
--- a/vendor/tokio/src/time/driver/sleep.rs
+++ /dev/null
@@ -1,257 +0,0 @@
-use crate::time::driver::{Handle, TimerEntry};
-use crate::time::{error::Error, Duration, Instant};
-
-use pin_project_lite::pin_project;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{self, Poll};
-
-/// Waits until `deadline` is reached.
-///
-/// No work is performed while awaiting on the sleep future to complete. `Sleep`
-/// operates at millisecond granularity and should not be used for tasks that
-/// require high-resolution timers.
-///
-/// # Cancellation
-///
-/// Canceling a sleep instance is done by dropping the returned future. No additional
-/// cleanup work is required.
-// Alias for old name in 0.x
-#[cfg_attr(docsrs, doc(alias = "delay_until"))]
-pub fn sleep_until(deadline: Instant) -> Sleep {
- Sleep::new_timeout(deadline)
-}
-
-/// Waits until `duration` has elapsed.
-///
-/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
-/// analog to `std::thread::sleep`.
-///
-/// No work is performed while awaiting on the sleep future to complete. `Sleep`
-/// operates at millisecond granularity and should not be used for tasks that
-/// require high-resolution timers.
-///
-/// To run something regularly on a schedule, see [`interval`].
-///
-/// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years).
-///
-/// # Cancellation
-///
-/// Canceling a sleep instance is done by dropping the returned future. No additional
-/// cleanup work is required.
-///
-/// # Examples
-///
-/// Wait 100ms and print "100 ms have elapsed".
-///
-/// ```
-/// use tokio::time::{sleep, Duration};
-///
-/// #[tokio::main]
-/// async fn main() {
-/// sleep(Duration::from_millis(100)).await;
-/// println!("100 ms have elapsed");
-/// }
-/// ```
-///
-/// [`interval`]: crate::time::interval()
-// Alias for old name in 0.x
-#[cfg_attr(docsrs, doc(alias = "delay_for"))]
-#[cfg_attr(docsrs, doc(alias = "wait"))]
-pub fn sleep(duration: Duration) -> Sleep {
- match Instant::now().checked_add(duration) {
- Some(deadline) => sleep_until(deadline),
- None => sleep_until(Instant::far_future()),
- }
-}
-
-pin_project! {
- /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
- ///
- /// This type does not implement the `Unpin` trait, which means that if you
- /// use it with [`select!`] or by calling `poll`, you have to pin it first.
- /// If you use it with `.await`, this does not apply.
- ///
- /// # Examples
- ///
- /// Wait 100ms and print "100 ms have elapsed".
- ///
- /// ```
- /// use tokio::time::{sleep, Duration};
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// sleep(Duration::from_millis(100)).await;
- /// println!("100 ms have elapsed");
- /// }
- /// ```
- ///
- /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
- /// necessary when the same `Sleep` is selected on multiple times.
- /// ```no_run
- /// use tokio::time::{self, Duration, Instant};
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let sleep = time::sleep(Duration::from_millis(10));
- /// tokio::pin!(sleep);
- ///
- /// loop {
- /// tokio::select! {
- /// () = &mut sleep => {
- /// println!("timer elapsed");
- /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
- /// },
- /// }
- /// }
- /// }
- /// ```
- /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
- /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
- /// ```
- /// use std::future::Future;
- /// use std::pin::Pin;
- /// use std::task::{Context, Poll};
- /// use tokio::time::Sleep;
- ///
- /// struct HasSleep {
- /// sleep: Pin<Box<Sleep>>,
- /// }
- ///
- /// impl Future for HasSleep {
- /// type Output = ();
- ///
- /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- /// self.sleep.as_mut().poll(cx)
- /// }
- /// }
- /// ```
- /// Use in a struct with pin projection. This method avoids the `Box`, but
- /// the `HasSleep` struct will not be `Unpin` as a consequence.
- /// ```
- /// use std::future::Future;
- /// use std::pin::Pin;
- /// use std::task::{Context, Poll};
- /// use tokio::time::Sleep;
- /// use pin_project_lite::pin_project;
- ///
- /// pin_project! {
- /// struct HasSleep {
- /// #[pin]
- /// sleep: Sleep,
- /// }
- /// }
- ///
- /// impl Future for HasSleep {
- /// type Output = ();
- ///
- /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- /// self.project().sleep.poll(cx)
- /// }
- /// }
- /// ```
- ///
- /// [`select!`]: ../macro.select.html
- /// [`tokio::pin!`]: ../macro.pin.html
- // Alias for old name in 0.2
- #[cfg_attr(docsrs, doc(alias = "Delay"))]
- #[derive(Debug)]
- #[must_use = "futures do nothing unless you `.await` or poll them"]
- pub struct Sleep {
- deadline: Instant,
-
- // The link between the `Sleep` instance and the timer that drives it.
- #[pin]
- entry: TimerEntry,
- }
-}
-
-impl Sleep {
- pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
- let handle = Handle::current();
- let entry = TimerEntry::new(&handle, deadline);
-
- Sleep { deadline, entry }
- }
-
- pub(crate) fn far_future() -> Sleep {
- Self::new_timeout(Instant::far_future())
- }
-
- /// Returns the instant at which the future will complete.
- pub fn deadline(&self) -> Instant {
- self.deadline
- }
-
- /// Returns `true` if `Sleep` has elapsed.
- ///
- /// A `Sleep` instance is elapsed when the requested duration has elapsed.
- pub fn is_elapsed(&self) -> bool {
- self.entry.is_elapsed()
- }
-
- /// Resets the `Sleep` instance to a new deadline.
- ///
- /// Calling this function allows changing the instant at which the `Sleep`
- /// future completes without having to create new associated state.
- ///
- /// This function can be called both before and after the future has
- /// completed.
- ///
- /// To call this method, you will usually combine the call with
- /// [`Pin::as_mut`], which lets you call the method without consuming the
- /// `Sleep` itself.
- ///
- /// # Example
- ///
- /// ```
- /// use tokio::time::{Duration, Instant};
- ///
- /// # #[tokio::main(flavor = "current_thread")]
- /// # async fn main() {
- /// let sleep = tokio::time::sleep(Duration::from_millis(10));
- /// tokio::pin!(sleep);
- ///
- /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
- /// # }
- /// ```
- ///
- /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
- pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
- let me = self.project();
- me.entry.reset(deadline);
- *me.deadline = deadline;
- }
-
- fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
- let me = self.project();
-
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- me.entry.poll_elapsed(cx).map(move |r| {
- coop.made_progress();
- r
- })
- }
-}
-
-impl Future for Sleep {
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
- // `poll_elapsed` can return an error in two cases:
- //
- // - AtCapacity: this is a pathological case where far too many
- // sleep instances have been scheduled.
- // - Shutdown: No timer has been setup, which is a mis-use error.
- //
- // Both cases are extremely rare, and pretty accurately fit into
- // "logic errors", so we just panic in this case. A user couldn't
- // really do much better if we passed the error onwards.
- match ready!(self.as_mut().poll_elapsed(cx)) {
- Ok(()) => Poll::Ready(()),
- Err(e) => panic!("timer error: {}", e),
- }
- }
-}
diff --git a/vendor/tokio/src/time/driver/tests/mod.rs b/vendor/tokio/src/time/driver/tests/mod.rs
deleted file mode 100644
index 7c5cf1fd0..000000000
--- a/vendor/tokio/src/time/driver/tests/mod.rs
+++ /dev/null
@@ -1,287 +0,0 @@
-use std::{task::Context, time::Duration};
-
-#[cfg(not(loom))]
-use futures::task::noop_waker_ref;
-
-use crate::loom::sync::Arc;
-use crate::loom::thread;
-use crate::{
- loom::sync::atomic::{AtomicBool, Ordering},
- park::Unpark,
-};
-
-use super::{Handle, TimerEntry};
-
-struct MockUnpark {}
-impl Unpark for MockUnpark {
- fn unpark(&self) {}
-}
-impl MockUnpark {
- fn mock() -> Box<dyn Unpark> {
- Box::new(Self {})
- }
-}
-
-fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
- #[cfg(loom)]
- return loom::future::block_on(f);
-
- #[cfg(not(loom))]
- return futures::executor::block_on(f);
-}
-
-fn model(f: impl Fn() + Send + Sync + 'static) {
- #[cfg(loom)]
- loom::model(f);
-
- #[cfg(not(loom))]
- f();
-}
-
-#[test]
-fn single_timer() {
- model(|| {
- let clock = crate::time::clock::Clock::new(true, false);
- let time_source = super::ClockTime::new(clock.clone());
-
- let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
- let handle = Handle::new(Arc::new(inner));
-
- let handle_ = handle.clone();
- let jh = thread::spawn(move || {
- let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
- pin!(entry);
-
- block_on(futures::future::poll_fn(|cx| {
- entry.as_mut().poll_elapsed(cx)
- }))
- .unwrap();
- });
-
- thread::yield_now();
-
- // This may or may not return Some (depending on how it races with the
- // thread). If it does return None, however, the timer should complete
- // synchronously.
- handle.process_at_time(time_source.now() + 2_000_000_000);
-
- jh.join().unwrap();
- })
-}
-
-#[test]
-fn drop_timer() {
- model(|| {
- let clock = crate::time::clock::Clock::new(true, false);
- let time_source = super::ClockTime::new(clock.clone());
-
- let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
- let handle = Handle::new(Arc::new(inner));
-
- let handle_ = handle.clone();
- let jh = thread::spawn(move || {
- let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
- pin!(entry);
-
- let _ = entry
- .as_mut()
- .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
- let _ = entry
- .as_mut()
- .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
- });
-
- thread::yield_now();
-
- // advance 2s in the future.
- handle.process_at_time(time_source.now() + 2_000_000_000);
-
- jh.join().unwrap();
- })
-}
-
-#[test]
-fn change_waker() {
- model(|| {
- let clock = crate::time::clock::Clock::new(true, false);
- let time_source = super::ClockTime::new(clock.clone());
-
- let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
- let handle = Handle::new(Arc::new(inner));
-
- let handle_ = handle.clone();
- let jh = thread::spawn(move || {
- let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
- pin!(entry);
-
- let _ = entry
- .as_mut()
- .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
-
- block_on(futures::future::poll_fn(|cx| {
- entry.as_mut().poll_elapsed(cx)
- }))
- .unwrap();
- });
-
- thread::yield_now();
-
- // advance 2s
- handle.process_at_time(time_source.now() + 2_000_000_000);
-
- jh.join().unwrap();
- })
-}
-
-#[test]
-fn reset_future() {
- model(|| {
- let finished_early = Arc::new(AtomicBool::new(false));
-
- let clock = crate::time::clock::Clock::new(true, false);
- let time_source = super::ClockTime::new(clock.clone());
-
- let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
- let handle = Handle::new(Arc::new(inner));
-
- let handle_ = handle.clone();
- let finished_early_ = finished_early.clone();
- let start = clock.now();
-
- let jh = thread::spawn(move || {
- let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1));
- pin!(entry);
-
- let _ = entry
- .as_mut()
- .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
-
- entry.as_mut().reset(start + Duration::from_secs(2));
-
- // shouldn't complete before 2s
- block_on(futures::future::poll_fn(|cx| {
- entry.as_mut().poll_elapsed(cx)
- }))
- .unwrap();
-
- finished_early_.store(true, Ordering::Relaxed);
- });
-
- thread::yield_now();
-
- // This may or may not return a wakeup time.
- handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500)));
-
- assert!(!finished_early.load(Ordering::Relaxed));
-
- handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500)));
-
- jh.join().unwrap();
-
- assert!(finished_early.load(Ordering::Relaxed));
- })
-}
-
-#[test]
-#[cfg(not(loom))]
-fn poll_process_levels() {
- let clock = crate::time::clock::Clock::new(true, false);
- clock.pause();
-
- let time_source = super::ClockTime::new(clock.clone());
-
- let inner = super::Inner::new(time_source, MockUnpark::mock());
- let handle = Handle::new(Arc::new(inner));
-
- let mut entries = vec![];
-
- for i in 0..1024 {
- let mut entry = Box::pin(TimerEntry::new(
- &handle,
- clock.now() + Duration::from_millis(i),
- ));
-
- let _ = entry
- .as_mut()
- .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));
-
- entries.push(entry);
- }
-
- for t in 1..1024 {
- handle.process_at_time(t as u64);
- for (deadline, future) in entries.iter_mut().enumerate() {
- let mut context = Context::from_waker(noop_waker_ref());
- if deadline <= t {
- assert!(future.as_mut().poll_elapsed(&mut context).is_ready());
- } else {
- assert!(future.as_mut().poll_elapsed(&mut context).is_pending());
- }
- }
- }
-}
-
-#[test]
-#[cfg(not(loom))]
-fn poll_process_levels_targeted() {
- let mut context = Context::from_waker(noop_waker_ref());
-
- let clock = crate::time::clock::Clock::new(true, false);
- clock.pause();
-
- let time_source = super::ClockTime::new(clock.clone());
-
- let inner = super::Inner::new(time_source, MockUnpark::mock());
- let handle = Handle::new(Arc::new(inner));
-
- let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193));
- pin!(e1);
-
- handle.process_at_time(62);
- assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
- handle.process_at_time(192);
- handle.process_at_time(192);
-}
-
-/*
-#[test]
-fn balanced_incr_and_decr() {
- const OPS: usize = 5;
-
- fn incr(inner: Arc<Inner>) {
- for _ in 0..OPS {
- inner.increment().expect("increment should not have failed");
- thread::yield_now();
- }
- }
-
- fn decr(inner: Arc<Inner>) {
- let mut ops_performed = 0;
- while ops_performed < OPS {
- if inner.num(Ordering::Relaxed) > 0 {
- ops_performed += 1;
- inner.decrement();
- }
- thread::yield_now();
- }
- }
-
- loom::model(|| {
- let unpark = Box::new(MockUnpark);
- let instant = Instant::now();
-
- let inner = Arc::new(Inner::new(instant, unpark));
-
- let incr_inner = inner.clone();
- let decr_inner = inner.clone();
-
- let incr_hndle = thread::spawn(move || incr(incr_inner));
- let decr_hndle = thread::spawn(move || decr(decr_inner));
-
- incr_hndle.join().expect("should never fail");
- decr_hndle.join().expect("should never fail");
-
- assert_eq!(inner.num(Ordering::SeqCst), 0);
- })
-}
-*/
diff --git a/vendor/tokio/src/time/driver/wheel/level.rs b/vendor/tokio/src/time/driver/wheel/level.rs
deleted file mode 100644
index 81d6b58c7..000000000
--- a/vendor/tokio/src/time/driver/wheel/level.rs
+++ /dev/null
@@ -1,275 +0,0 @@
-use crate::time::driver::TimerHandle;
-
-use crate::time::driver::{EntryList, TimerShared};
-
-use std::{fmt, ptr::NonNull};
-
-/// Wheel for a single level in the timer. This wheel contains 64 slots.
-pub(crate) struct Level {
- level: usize,
-
- /// Bit field tracking which slots currently contain entries.
- ///
- /// Using a bit field to track slots that contain entries allows avoiding a
- /// scan to find entries. This field is updated when entries are added or
- /// removed from a slot.
- ///
- /// The least-significant bit represents slot zero.
- occupied: u64,
-
- /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell.
- slot: [EntryList; LEVEL_MULT],
-}
-
-/// Indicates when a slot must be processed next.
-#[derive(Debug)]
-pub(crate) struct Expiration {
- /// The level containing the slot.
- pub(crate) level: usize,
-
- /// The slot index.
- pub(crate) slot: usize,
-
- /// The instant at which the slot needs to be processed.
- pub(crate) deadline: u64,
-}
-
-/// Level multiplier.
-///
-/// Being a power of 2 is very important.
-const LEVEL_MULT: usize = 64;
-
-impl Level {
- pub(crate) fn new(level: usize) -> Level {
- // A value has to be Copy in order to use syntax like:
- // let stack = Stack::default();
- // ...
- // slots: [stack; 64],
- //
- // Alternatively, since Stack is Default one can
- // use syntax like:
- // let slots: [Stack; 64] = Default::default();
- //
- // However, that is only supported for arrays of size
- // 32 or fewer. So in our case we have to explicitly
- // invoke the constructor for each array element.
- let ctor = || EntryList::default();
-
- Level {
- level,
- occupied: 0,
- slot: [
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ctor(),
- ],
- }
- }
-
- /// Finds the slot that needs to be processed next and returns the slot and
- /// `Instant` at which this slot must be processed.
- pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
- // Use the `occupied` bit field to get the index of the next slot that
- // needs to be processed.
- let slot = match self.next_occupied_slot(now) {
- Some(slot) => slot,
- None => return None,
- };
-
- // From the slot index, calculate the `Instant` at which it needs to be
- // processed. This value *must* be in the future with respect to `now`.
-
- let level_range = level_range(self.level);
- let slot_range = slot_range(self.level);
-
- // TODO: This can probably be simplified w/ power of 2 math
- let level_start = now - (now % level_range);
- let mut deadline = level_start + slot as u64 * slot_range;
-
- if deadline <= now {
- // A timer is in a slot "prior" to the current time. This can occur
- // because we do not have an infinite hierarchy of timer levels, and
- // eventually a timer scheduled for a very distant time might end up
- // being placed in a slot that is beyond the end of all of the
- // arrays.
- //
- // To deal with this, we first limit timers to being scheduled no
- // more than MAX_DURATION ticks in the future; that is, they're at
- // most one rotation of the top level away. Then, we force timers
- // that logically would go into the top+1 level, to instead go into
- // the top level's slots.
- //
- // What this means is that the top level's slots act as a
- // pseudo-ring buffer, and we rotate around them indefinitely. If we
- // compute a deadline before now, and it's the top level, it
- // therefore means we're actually looking at a slot in the future.
- debug_assert_eq!(self.level, super::NUM_LEVELS - 1);
-
- deadline += level_range;
- }
-
- debug_assert!(
- deadline >= now,
- "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}",
- deadline,
- now,
- self.level,
- level_range,
- slot_range,
- slot,
- self.occupied
- );
-
- Some(Expiration {
- level: self.level,
- slot,
- deadline,
- })
- }
-
- fn next_occupied_slot(&self, now: u64) -> Option<usize> {
- if self.occupied == 0 {
- return None;
- }
-
- // Get the slot for now using Maths
- let now_slot = (now / slot_range(self.level)) as usize;
- let occupied = self.occupied.rotate_right(now_slot as u32);
- let zeros = occupied.trailing_zeros() as usize;
- let slot = (zeros + now_slot) % 64;
-
- Some(slot)
- }
-
- pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
- let slot = slot_for(item.cached_when(), self.level);
-
- self.slot[slot].push_front(item);
-
- self.occupied |= occupied_bit(slot);
- }
-
- pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) {
- let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level);
-
- unsafe { self.slot[slot].remove(item) };
- if self.slot[slot].is_empty() {
- // The bit is currently set
- debug_assert!(self.occupied & occupied_bit(slot) != 0);
-
- // Unset the bit
- self.occupied ^= occupied_bit(slot);
- }
- }
-
- pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList {
- self.occupied &= !occupied_bit(slot);
-
- std::mem::take(&mut self.slot[slot])
- }
-}
-
-impl fmt::Debug for Level {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Level")
- .field("occupied", &self.occupied)
- .finish()
- }
-}
-
-fn occupied_bit(slot: usize) -> u64 {
- 1 << slot
-}
-
-fn slot_range(level: usize) -> u64 {
- LEVEL_MULT.pow(level as u32) as u64
-}
-
-fn level_range(level: usize) -> u64 {
- LEVEL_MULT as u64 * slot_range(level)
-}
-
-/// Convert a duration (milliseconds) and a level to a slot position
-fn slot_for(duration: u64, level: usize) -> usize {
- ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
-}
-
-#[cfg(all(test, not(loom)))]
-mod test {
- use super::*;
-
- #[test]
- fn test_slot_for() {
- for pos in 0..64 {
- assert_eq!(pos as usize, slot_for(pos, 0));
- }
-
- for level in 1..5 {
- for pos in level..64 {
- let a = pos * 64_usize.pow(level as u32);
- assert_eq!(pos as usize, slot_for(a as u64, level));
- }
- }
- }
-}
diff --git a/vendor/tokio/src/time/driver/wheel/mod.rs b/vendor/tokio/src/time/driver/wheel/mod.rs
deleted file mode 100644
index 5a40f6db8..000000000
--- a/vendor/tokio/src/time/driver/wheel/mod.rs
+++ /dev/null
@@ -1,359 +0,0 @@
-use crate::time::driver::{TimerHandle, TimerShared};
-use crate::time::error::InsertError;
-
-mod level;
-pub(crate) use self::level::Expiration;
-use self::level::Level;
-
-use std::ptr::NonNull;
-
-use super::EntryList;
-
-/// Timing wheel implementation.
-///
-/// This type provides the hashed timing wheel implementation that backs `Timer`
-/// and `DelayQueue`.
-///
-/// The structure is generic over `T: Stack`. This allows handling timeout data
-/// being stored on the heap or in a slab. In order to support the latter case,
-/// the slab must be passed into each function allowing the implementation to
-/// lookup timer entries.
-///
-/// See `Timer` documentation for some implementation notes.
-#[derive(Debug)]
-pub(crate) struct Wheel {
- /// The number of milliseconds elapsed since the wheel started.
- elapsed: u64,
-
- /// Timer wheel.
- ///
- /// Levels:
- ///
- /// * 1 ms slots / 64 ms range
- /// * 64 ms slots / ~ 4 sec range
- /// * ~ 4 sec slots / ~ 4 min range
- /// * ~ 4 min slots / ~ 4 hr range
- /// * ~ 4 hr slots / ~ 12 day range
- /// * ~ 12 day slots / ~ 2 yr range
- levels: Vec<Level>,
-
- /// Entries queued for firing
- pending: EntryList,
-}
-
-/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
-/// each, the timer is able to track time up to 2 years into the future with a
-/// precision of 1 millisecond.
-const NUM_LEVELS: usize = 6;
-
-/// The maximum duration of a `Sleep`
-pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
-
-impl Wheel {
- /// Create a new timing wheel
- pub(crate) fn new() -> Wheel {
- let levels = (0..NUM_LEVELS).map(Level::new).collect();
-
- Wheel {
- elapsed: 0,
- levels,
- pending: EntryList::new(),
- }
- }
-
- /// Return the number of milliseconds that have elapsed since the timing
- /// wheel's creation.
- pub(crate) fn elapsed(&self) -> u64 {
- self.elapsed
- }
-
- /// Insert an entry into the timing wheel.
- ///
- /// # Arguments
- ///
- /// * `item`: The item to insert into the wheel.
- ///
- /// # Return
- ///
- /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
- ///
- /// `Err(Elapsed)` indicates that `when` represents an instant that has
- /// already passed. In this case, the caller should fire the timeout
- /// immediately.
- ///
- /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
- ///
- /// # Safety
- ///
- /// This function registers item into an intrusive linked list. The caller
- /// must ensure that `item` is pinned and will not be dropped without first
- /// being deregistered.
- pub(crate) unsafe fn insert(
- &mut self,
- item: TimerHandle,
- ) -> Result<u64, (TimerHandle, InsertError)> {
- let when = item.sync_when();
-
- if when <= self.elapsed {
- return Err((item, InsertError::Elapsed));
- }
-
- // Get the level at which the entry should be stored
- let level = self.level_for(when);
-
- unsafe {
- self.levels[level].add_entry(item);
- }
-
- debug_assert!({
- self.levels[level]
- .next_expiration(self.elapsed)
- .map(|e| e.deadline >= self.elapsed)
- .unwrap_or(true)
- });
-
- Ok(when)
- }
-
- /// Remove `item` from the timing wheel.
- pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
- unsafe {
- let when = item.as_ref().cached_when();
- if when == u64::MAX {
- self.pending.remove(item);
- } else {
- debug_assert!(
- self.elapsed <= when,
- "elapsed={}; when={}",
- self.elapsed,
- when
- );
-
- let level = self.level_for(when);
-
- self.levels[level].remove_entry(item);
- }
- }
- }
-
- /// Instant at which to poll
- pub(crate) fn poll_at(&self) -> Option<u64> {
- self.next_expiration().map(|expiration| expiration.deadline)
- }
-
- /// Advances the timer up to the instant represented by `now`.
- pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
- loop {
- if let Some(handle) = self.pending.pop_back() {
- return Some(handle);
- }
-
- // under what circumstances is poll.expiration Some vs. None?
- let expiration = self.next_expiration().and_then(|expiration| {
- if expiration.deadline > now {
- None
- } else {
- Some(expiration)
- }
- });
-
- match expiration {
- Some(ref expiration) if expiration.deadline > now => return None,
- Some(ref expiration) => {
- self.process_expiration(expiration);
-
- self.set_elapsed(expiration.deadline);
- }
- None => {
- // in this case the poll did not indicate an expiration
- // _and_ we were not able to find a next expiration in
- // the current list of timers. advance to the poll's
- // current time and do nothing else.
- self.set_elapsed(now);
- break;
- }
- }
- }
-
- self.pending.pop_back()
- }
-
- /// Returns the instant at which the next timeout expires.
- fn next_expiration(&self) -> Option<Expiration> {
- if !self.pending.is_empty() {
- // Expire immediately as we have things pending firing
- return Some(Expiration {
- level: 0,
- slot: 0,
- deadline: self.elapsed,
- });
- }
-
- // Check all levels
- for level in 0..NUM_LEVELS {
- if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
- // There cannot be any expirations at a higher level that happen
- // before this one.
- debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
-
- return Some(expiration);
- }
- }
-
- None
- }
-
- /// Returns the tick at which this timer wheel next needs to perform some
- /// processing, or None if there are no timers registered.
- pub(super) fn next_expiration_time(&self) -> Option<u64> {
- self.next_expiration().map(|ex| ex.deadline)
- }
-
- /// Used for debug assertions
- fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
- let mut res = true;
-
- for l2 in start_level..NUM_LEVELS {
- if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
- if e2.deadline < before {
- res = false;
- }
- }
- }
-
- res
- }
-
- /// iteratively find entries that are between the wheel's current
- /// time and the expiration time. for each in that population either
- /// queue it for notification (in the case of the last level) or tier
- /// it down to the next level (in all other cases).
- pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
- // Note that we need to take _all_ of the entries off the list before
- // processing any of them. This is important because it's possible that
- // those entries might need to be reinserted into the same slot.
- //
- // This happens only on the highest level, when an entry is inserted
- // more than MAX_DURATION into the future. When this happens, we wrap
- // around, and process some entries a multiple of MAX_DURATION before
- // they actually need to be dropped down a level. We then reinsert them
- // back into the same position; we must make sure we don't then process
- // those entries again or we'll end up in an infinite loop.
- let mut entries = self.take_entries(expiration);
-
- while let Some(item) = entries.pop_back() {
- if expiration.level == 0 {
- debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline);
- }
-
- // Try to expire the entry; this is cheap (doesn't synchronize) if
- // the timer is not expired, and updates cached_when.
- match unsafe { item.mark_pending(expiration.deadline) } {
- Ok(()) => {
- // Item was expired
- self.pending.push_front(item);
- }
- Err(expiration_tick) => {
- let level = level_for(expiration.deadline, expiration_tick);
- unsafe {
- self.levels[level].add_entry(item);
- }
- }
- }
- }
- }
-
- fn set_elapsed(&mut self, when: u64) {
- assert!(
- self.elapsed <= when,
- "elapsed={:?}; when={:?}",
- self.elapsed,
- when
- );
-
- if when > self.elapsed {
- self.elapsed = when;
- }
- }
-
- /// Obtains the list of entries that need processing for the given expiration.
- ///
- fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
- self.levels[expiration.level].take_slot(expiration.slot)
- }
-
- fn level_for(&self, when: u64) -> usize {
- level_for(self.elapsed, when)
- }
-}
-
-fn level_for(elapsed: u64, when: u64) -> usize {
- const SLOT_MASK: u64 = (1 << 6) - 1;
-
- // Mask in the trailing bits ignored by the level calculation in order to cap
- // the possible leading zeros
- let mut masked = elapsed ^ when | SLOT_MASK;
-
- if masked >= MAX_DURATION {
- // Fudge the timer into the top level
- masked = MAX_DURATION - 1;
- }
-
- let leading_zeros = masked.leading_zeros() as usize;
- let significant = 63 - leading_zeros;
-
- significant / 6
-}
-
-#[cfg(all(test, not(loom)))]
-mod test {
- use super::*;
-
- #[test]
- fn test_level_for() {
- for pos in 0..64 {
- assert_eq!(
- 0,
- level_for(0, pos),
- "level_for({}) -- binary = {:b}",
- pos,
- pos
- );
- }
-
- for level in 1..5 {
- for pos in level..64 {
- let a = pos * 64_usize.pow(level as u32);
- assert_eq!(
- level,
- level_for(0, a as u64),
- "level_for({}) -- binary = {:b}",
- a,
- a
- );
-
- if pos > level {
- let a = a - 1;
- assert_eq!(
- level,
- level_for(0, a as u64),
- "level_for({}) -- binary = {:b}",
- a,
- a
- );
- }
-
- if pos < 64 {
- let a = a + 1;
- assert_eq!(
- level,
- level_for(0, a as u64),
- "level_for({}) -- binary = {:b}",
- a,
- a
- );
- }
- }
- }
- }
-}
diff --git a/vendor/tokio/src/time/driver/wheel/stack.rs b/vendor/tokio/src/time/driver/wheel/stack.rs
deleted file mode 100644
index e7ed137f5..000000000
--- a/vendor/tokio/src/time/driver/wheel/stack.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use super::{Item, OwnedItem};
-use crate::time::driver::Entry;
-
-use std::ptr;
-
-/// A doubly linked stack
-#[derive(Debug)]
-pub(crate) struct Stack {
- head: Option<OwnedItem>,
-}
-
-impl Default for Stack {
- fn default() -> Stack {
- Stack { head: None }
- }
-}
-
-impl Stack {
- pub(crate) fn is_empty(&self) -> bool {
- self.head.is_none()
- }
-
- pub(crate) fn push(&mut self, entry: OwnedItem) {
- // Get a pointer to the entry to for the prev link
- let ptr: *const Entry = &*entry as *const _;
-
- // Remove the old head entry
- let old = self.head.take();
-
- unsafe {
- // Ensure the entry is not already in a stack.
- debug_assert!((*entry.next_stack.get()).is_none());
- debug_assert!((*entry.prev_stack.get()).is_null());
-
- if let Some(ref entry) = old.as_ref() {
- debug_assert!({
- // The head is not already set to the entry
- ptr != &***entry as *const _
- });
-
- // Set the previous link on the old head
- *entry.prev_stack.get() = ptr;
- }
-
- // Set this entry's next pointer
- *entry.next_stack.get() = old;
- }
-
- // Update the head pointer
- self.head = Some(entry);
- }
-
- /// Pops an item from the stack
- pub(crate) fn pop(&mut self) -> Option<OwnedItem> {
- let entry = self.head.take();
-
- unsafe {
- if let Some(entry) = entry.as_ref() {
- self.head = (*entry.next_stack.get()).take();
-
- if let Some(entry) = self.head.as_ref() {
- *entry.prev_stack.get() = ptr::null();
- }
-
- *entry.prev_stack.get() = ptr::null();
- }
- }
-
- entry
- }
-
- pub(crate) fn remove(&mut self, entry: &Item) {
- unsafe {
- // Ensure that the entry is in fact contained by the stack
- debug_assert!({
- // This walks the full linked list even if an entry is found.
- let mut next = self.head.as_ref();
- let mut contains = false;
-
- while let Some(n) = next {
- if entry as *const _ == &**n as *const _ {
- debug_assert!(!contains);
- contains = true;
- }
-
- next = (*n.next_stack.get()).as_ref();
- }
-
- contains
- });
-
- // Unlink `entry` from the next node
- let next = (*entry.next_stack.get()).take();
-
- if let Some(next) = next.as_ref() {
- (*next.prev_stack.get()) = *entry.prev_stack.get();
- }
-
- // Unlink `entry` from the prev node
-
- if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
- *prev.next_stack.get() = next;
- } else {
- // It is the head
- self.head = next;
- }
-
- // Unset the prev pointer
- *entry.prev_stack.get() = ptr::null();
- }
- }
-}
diff --git a/vendor/tokio/src/time/error.rs b/vendor/tokio/src/time/error.rs
index 8674febe9..71344d434 100644
--- a/vendor/tokio/src/time/error.rs
+++ b/vendor/tokio/src/time/error.rs
@@ -40,8 +40,11 @@ impl From<Kind> for Error {
}
}
-/// Error returned by `Timeout`.
-#[derive(Debug, PartialEq)]
+/// Errors returned by `Timeout`.
+///
+/// This error is returned when a timeout expires before the function was able
+/// to finish.
+#[derive(Debug, PartialEq, Eq)]
pub struct Elapsed(());
#[derive(Debug)]
@@ -72,7 +75,7 @@ impl Error {
matches!(self.0, Kind::AtCapacity)
}
- /// Create an error representing a misconfigured timer.
+ /// Creates an error representing a misconfigured timer.
pub fn invalid() -> Error {
Error(Invalid)
}
diff --git a/vendor/tokio/src/time/instant.rs b/vendor/tokio/src/time/instant.rs
index f7cf12d4a..14cf6e567 100644
--- a/vendor/tokio/src/time/instant.rs
+++ b/vendor/tokio/src/time/instant.rs
@@ -67,13 +67,10 @@ impl Instant {
self.std
}
- /// Returns the amount of time elapsed from another instant to this one.
- ///
- /// # Panics
- ///
- /// This function will panic if `earlier` is later than `self`.
+ /// Returns the amount of time elapsed from another instant to this one, or
+ /// zero duration if that instant is later than this one.
pub fn duration_since(&self, earlier: Instant) -> Duration {
- self.std.duration_since(earlier.std)
+ self.std.saturating_duration_since(earlier.std)
}
/// Returns the amount of time elapsed from another instant to this one, or
@@ -118,13 +115,8 @@ impl Instant {
self.std.saturating_duration_since(earlier.std)
}
- /// Returns the amount of time elapsed since this instant was created.
- ///
- /// # Panics
- ///
- /// This function may panic if the current time is earlier than this
- /// instant, which is something that can happen if an `Instant` is
- /// produced synthetically.
+ /// Returns the amount of time elapsed since this instant was created,
+ /// or zero duration if that this instant is in the future.
///
/// # Examples
///
@@ -140,7 +132,7 @@ impl Instant {
/// }
/// ```
pub fn elapsed(&self) -> Duration {
- Instant::now() - *self
+ Instant::now().saturating_duration_since(*self)
}
/// Returns `Some(t)` where `t` is the time `self + duration` if `t` can be
@@ -188,7 +180,7 @@ impl ops::Sub for Instant {
type Output = Duration;
fn sub(self, rhs: Instant) -> Duration {
- self.std - rhs.std
+ self.std.saturating_duration_since(rhs.std)
}
}
@@ -196,7 +188,7 @@ impl ops::Sub<Duration> for Instant {
type Output = Instant;
fn sub(self, rhs: Duration) -> Instant {
- Instant::from_std(self.std - rhs)
+ Instant::from_std(std::time::Instant::sub(self.std, rhs))
}
}
diff --git a/vendor/tokio/src/time/interval.rs b/vendor/tokio/src/time/interval.rs
index a63e47b6e..2e153fdbe 100644
--- a/vendor/tokio/src/time/interval.rs
+++ b/vendor/tokio/src/time/interval.rs
@@ -1,9 +1,11 @@
use crate::future::poll_fn;
use crate::time::{sleep_until, Duration, Instant, Sleep};
+use crate::util::trace;
+use std::future::Future;
+use std::panic::Location;
use std::pin::Pin;
use std::task::{Context, Poll};
-use std::{convert::TryInto, future::Future};
/// Creates new [`Interval`] that yields with interval of `period`. The first
/// tick completes immediately. The default [`MissedTickBehavior`] is
@@ -68,10 +70,10 @@ use std::{convert::TryInto, future::Future};
///
/// [`sleep`]: crate::time::sleep()
/// [`.tick().await`]: Interval::tick
+#[track_caller]
pub fn interval(period: Duration) -> Interval {
assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
-
- interval_at(Instant::now(), period)
+ internal_interval_at(Instant::now(), period, trace::caller_location())
}
/// Creates new [`Interval`] that yields with interval of `period` with the
@@ -103,13 +105,44 @@ pub fn interval(period: Duration) -> Interval {
/// // approximately 70ms have elapsed.
/// }
/// ```
+#[track_caller]
pub fn interval_at(start: Instant, period: Duration) -> Interval {
assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
+ internal_interval_at(start, period, trace::caller_location())
+}
+
+#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
+fn internal_interval_at(
+ start: Instant,
+ period: Duration,
+ location: Option<&'static Location<'static>>,
+) -> Interval {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = location.expect("should have location if tracing");
+
+ tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Interval",
+ kind = "timer",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ )
+ };
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let delay = resource_span.in_scope(|| Box::pin(sleep_until(start)));
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let delay = Box::pin(sleep_until(start));
Interval {
- delay: Box::pin(sleep_until(start)),
+ delay,
period,
missed_tick_behavior: Default::default(),
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
}
}
@@ -124,7 +157,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval {
///
/// #[tokio::main]
/// async fn main() {
-/// // ticks every 2 seconds
+/// // ticks every 2 milliseconds
/// let mut interval = time::interval(Duration::from_millis(2));
/// for _ in 0..5 {
/// interval.tick().await;
@@ -147,7 +180,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval {
/// milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissedTickBehavior {
- /// Tick as fast as possible until caught up.
+ /// Ticks as fast as possible until caught up.
///
/// When this strategy is used, [`Interval`] schedules ticks "normally" (the
/// same as it would have if the ticks hadn't been delayed), which results
@@ -174,6 +207,9 @@ pub enum MissedTickBehavior {
/// # async fn main() {
/// let mut interval = interval(Duration::from_millis(50));
///
+ /// // First tick resolves immediately after creation
+ /// interval.tick().await;
+ ///
/// task_that_takes_200_millis().await;
/// // The `Interval` has missed a tick
///
@@ -242,7 +278,7 @@ pub enum MissedTickBehavior {
/// // 50ms after the call to `tick` up above. That is, in `tick`, when we
/// // recognize that we missed a tick, we schedule the next tick to happen
/// // 50ms (or whatever the `period` is) from right then, not from when
- /// // were were *supposed* to tick
+ /// // were *supposed* to tick
/// interval.tick().await;
/// # }
/// ```
@@ -252,7 +288,7 @@ pub enum MissedTickBehavior {
/// [`tick`]: Interval::tick
Delay,
- /// Skip missed ticks and tick on the next multiple of `period` from
+ /// Skips missed ticks and tick on the next multiple of `period` from
/// `start`.
///
/// When this strategy is used, [`Interval`] schedules the next tick to fire
@@ -342,7 +378,7 @@ impl Default for MissedTickBehavior {
}
}
-/// Interval returned by [`interval`] and [`interval_at`]
+/// Interval returned by [`interval`] and [`interval_at`].
///
/// This type allows you to wait on a sequence of instants with a certain
/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
@@ -351,7 +387,7 @@ impl Default for MissedTickBehavior {
/// An `Interval` can be turned into a `Stream` with [`IntervalStream`].
///
/// [`IntervalStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.IntervalStream.html
-/// [`sleep`]: crate::time::sleep
+/// [`sleep`]: crate::time::sleep()
#[derive(Debug)]
pub struct Interval {
/// Future that completes the next time the `Interval` yields a value.
@@ -362,11 +398,19 @@ pub struct Interval {
/// The strategy `Interval` should use when a tick is missed.
missed_tick_behavior: MissedTickBehavior,
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
}
impl Interval {
/// Completes when the next instant in the interval has been reached.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancellation safe. If `tick` is used as the branch in a `tokio::select!` and
+ /// another branch completes first, then no tick has been consumed.
+ ///
/// # Examples
///
/// ```
@@ -379,6 +423,7 @@ impl Interval {
/// let mut interval = time::interval(Duration::from_millis(10));
///
/// interval.tick().await;
+ /// // approximately 0ms have elapsed. The first tick completes immediately.
/// interval.tick().await;
/// interval.tick().await;
///
@@ -386,10 +431,23 @@ impl Interval {
/// }
/// ```
pub async fn tick(&mut self) -> Instant {
- poll_fn(|cx| self.poll_tick(cx)).await
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let instant = trace::async_op(
+ || poll_fn(|cx| self.poll_tick(cx)),
+ resource_span,
+ "Interval::tick",
+ "poll_tick",
+ false,
+ );
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let instant = poll_fn(|cx| self.poll_tick(cx));
+
+ instant.await
}
- /// Poll for the next instant in the interval to be reached.
+ /// Polls for the next instant in the interval to be reached.
///
/// This method can return the following values:
///
@@ -424,12 +482,45 @@ impl Interval {
timeout + self.period
};
- self.delay.as_mut().reset(next);
+ // When we arrive here, the internal delay returned `Poll::Ready`.
+ // Reset the delay but do not register it. It should be registered with
+ // the next call to [`poll_tick`].
+ self.delay.as_mut().reset_without_reregister(next);
// Return the time when we were scheduled to tick
Poll::Ready(timeout)
}
+ /// Resets the interval to complete one period after the current time.
+ ///
+ /// This method ignores [`MissedTickBehavior`] strategy.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::time;
+ ///
+ /// use std::time::Duration;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut interval = time::interval(Duration::from_millis(100));
+ ///
+ /// interval.tick().await;
+ ///
+ /// time::sleep(Duration::from_millis(50)).await;
+ /// interval.reset();
+ ///
+ /// interval.tick().await;
+ /// interval.tick().await;
+ ///
+ /// // approximately 250ms have elapsed.
+ /// }
+ /// ```
+ pub fn reset(&mut self) {
+ self.delay.as_mut().reset(Instant::now() + self.period);
+ }
+
/// Returns the [`MissedTickBehavior`] strategy currently being used.
pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
self.missed_tick_behavior
diff --git a/vendor/tokio/src/time/mod.rs b/vendor/tokio/src/time/mod.rs
index 281990ef9..a1f27b839 100644
--- a/vendor/tokio/src/time/mod.rs
+++ b/vendor/tokio/src/time/mod.rs
@@ -82,17 +82,13 @@
//! ```
//!
//! [`interval`]: crate::time::interval()
+//! [`sleep`]: sleep()
mod clock;
pub(crate) use self::clock::Clock;
#[cfg(feature = "test-util")]
pub use clock::{advance, pause, resume};
-pub(crate) mod driver;
-
-#[doc(inline)]
-pub use driver::sleep::{sleep, sleep_until, Sleep};
-
pub mod error;
mod instant;
@@ -101,14 +97,13 @@ pub use self::instant::Instant;
mod interval;
pub use interval::{interval, interval_at, Interval, MissedTickBehavior};
+mod sleep;
+pub use sleep::{sleep, sleep_until, Sleep};
+
mod timeout;
#[doc(inline)]
pub use timeout::{timeout, timeout_at, Timeout};
-#[cfg(test)]
-#[cfg(not(loom))]
-mod tests;
-
// Re-export for convenience
#[doc(no_inline)]
pub use std::time::Duration;
diff --git a/vendor/tokio/src/time/sleep.rs b/vendor/tokio/src/time/sleep.rs
new file mode 100644
index 000000000..6c9b33793
--- /dev/null
+++ b/vendor/tokio/src/time/sleep.rs
@@ -0,0 +1,452 @@
+use crate::runtime::time::TimerEntry;
+use crate::time::{error::Error, Duration, Instant};
+use crate::util::trace;
+
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::panic::Location;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+/// Waits until `deadline` is reached.
+///
+/// No work is performed while awaiting on the sleep future to complete. `Sleep`
+/// operates at millisecond granularity and should not be used for tasks that
+/// require high-resolution timers.
+///
+/// To run something regularly on a schedule, see [`interval`].
+///
+/// # Cancellation
+///
+/// Canceling a sleep instance is done by dropping the returned future. No additional
+/// cleanup work is required.
+///
+/// # Examples
+///
+/// Wait 100ms and print "100 ms have elapsed".
+///
+/// ```
+/// use tokio::time::{sleep_until, Instant, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// sleep_until(Instant::now() + Duration::from_millis(100)).await;
+/// println!("100 ms have elapsed");
+/// }
+/// ```
+///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime.
+/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+/// And this is because wrapping the function on an async makes it lazy,
+/// and so gets executed inside the runtime successfully without
+/// panicking.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
+/// [`interval`]: crate::time::interval()
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+// Alias for old name in 0.x
+#[cfg_attr(docsrs, doc(alias = "delay_until"))]
+#[track_caller]
+pub fn sleep_until(deadline: Instant) -> Sleep {
+ return Sleep::new_timeout(deadline, trace::caller_location());
+}
+
+/// Waits until `duration` has elapsed.
+///
+/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
+/// analog to `std::thread::sleep`.
+///
+/// No work is performed while awaiting on the sleep future to complete. `Sleep`
+/// operates at millisecond granularity and should not be used for tasks that
+/// require high-resolution timers. The implementation is platform specific,
+/// and some platforms (specifically Windows) will provide timers with a
+/// larger resolution than 1 ms.
+///
+/// To run something regularly on a schedule, see [`interval`].
+///
+/// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years).
+///
+/// # Cancellation
+///
+/// Canceling a sleep instance is done by dropping the returned future. No additional
+/// cleanup work is required.
+///
+/// # Examples
+///
+/// Wait 100ms and print "100 ms have elapsed".
+///
+/// ```
+/// use tokio::time::{sleep, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// sleep(Duration::from_millis(100)).await;
+/// println!("100 ms have elapsed");
+/// }
+/// ```
+///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime.
+/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+/// And this is because wrapping the function on an async makes it lazy,
+/// and so gets executed inside the runtime successfully without
+/// panicking.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
+/// [`interval`]: crate::time::interval()
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+// Alias for old name in 0.x
+#[cfg_attr(docsrs, doc(alias = "delay_for"))]
+#[cfg_attr(docsrs, doc(alias = "wait"))]
+#[track_caller]
+pub fn sleep(duration: Duration) -> Sleep {
+ let location = trace::caller_location();
+
+ match Instant::now().checked_add(duration) {
+ Some(deadline) => Sleep::new_timeout(deadline, location),
+ None => Sleep::new_timeout(Instant::far_future(), location),
+ }
+}
+
+pin_project! {
+ /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
+ ///
+ /// This type does not implement the `Unpin` trait, which means that if you
+ /// use it with [`select!`] or by calling `poll`, you have to pin it first.
+ /// If you use it with `.await`, this does not apply.
+ ///
+ /// # Examples
+ ///
+ /// Wait 100ms and print "100 ms have elapsed".
+ ///
+ /// ```
+ /// use tokio::time::{sleep, Duration};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// sleep(Duration::from_millis(100)).await;
+ /// println!("100 ms have elapsed");
+ /// }
+ /// ```
+ ///
+ /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
+ /// necessary when the same `Sleep` is selected on multiple times.
+ /// ```no_run
+ /// use tokio::time::{self, Duration, Instant};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let sleep = time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// loop {
+ /// tokio::select! {
+ /// () = &mut sleep => {
+ /// println!("timer elapsed");
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
+ /// },
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
+ /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ ///
+ /// struct HasSleep {
+ /// sleep: Pin<Box<Sleep>>,
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.sleep.as_mut().poll(cx)
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with pin projection. This method avoids the `Box`, but
+ /// the `HasSleep` struct will not be `Unpin` as a consequence.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ /// use pin_project_lite::pin_project;
+ ///
+ /// pin_project! {
+ /// struct HasSleep {
+ /// #[pin]
+ /// sleep: Sleep,
+ /// }
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.project().sleep.poll(cx)
+ /// }
+ /// }
+ /// ```
+ ///
+ /// [`select!`]: ../macro.select.html
+ /// [`tokio::pin!`]: ../macro.pin.html
+ // Alias for old name in 0.2
+ #[cfg_attr(docsrs, doc(alias = "Delay"))]
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Sleep {
+ inner: Inner,
+
+ // The link between the `Sleep` instance and the timer that drives it.
+ #[pin]
+ entry: TimerEntry,
+ }
+}
+
+cfg_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ ctx: trace::AsyncOpTracingCtx,
+ }
+}
+
+cfg_not_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ }
+}
+
+impl Sleep {
+ #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
+ #[track_caller]
+ pub(crate) fn new_timeout(
+ deadline: Instant,
+ location: Option<&'static Location<'static>>,
+ ) -> Sleep {
+ use crate::runtime::scheduler;
+
+ let handle = scheduler::Handle::current();
+ let entry = TimerEntry::new(&handle, deadline);
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = {
+ let clock = handle.driver().clock();
+ let handle = &handle.driver().time();
+ let time_source = handle.time_source();
+ let deadline_tick = time_source.deadline_to_tick(deadline);
+ let duration = deadline_tick.saturating_sub(time_source.now(clock));
+
+ let location = location.expect("should have location if tracing");
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Sleep",
+ kind = "timer",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ let async_op_span = resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ duration = duration,
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout")
+ });
+
+ let async_op_poll_span =
+ async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
+
+ let ctx = trace::AsyncOpTracingCtx {
+ async_op_span,
+ async_op_poll_span,
+ resource_span,
+ };
+
+ Inner { ctx }
+ };
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = Inner {};
+
+ Sleep { inner, entry }
+ }
+
+ pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
+ Self::new_timeout(Instant::far_future(), location)
+ }
+
+ /// Returns the instant at which the future will complete.
+ pub fn deadline(&self) -> Instant {
+ self.entry.deadline()
+ }
+
+ /// Returns `true` if `Sleep` has elapsed.
+ ///
+ /// A `Sleep` instance is elapsed when the requested duration has elapsed.
+ pub fn is_elapsed(&self) -> bool {
+ self.entry.is_elapsed()
+ }
+
+ /// Resets the `Sleep` instance to a new deadline.
+ ///
+ /// Calling this function allows changing the instant at which the `Sleep`
+ /// future completes without having to create new associated state.
+ ///
+ /// This function can be called both before and after the future has
+ /// completed.
+ ///
+ /// To call this method, you will usually combine the call with
+ /// [`Pin::as_mut`], which lets you call the method without consuming the
+ /// `Sleep` itself.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant};
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let sleep = tokio::time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
+ /// # }
+ /// ```
+ ///
+ /// See also the top-level examples.
+ ///
+ /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
+ pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
+ self.reset_inner(deadline)
+ }
+
+ /// Resets the `Sleep` instance to a new deadline without reregistering it
+ /// to be woken up.
+ ///
+ /// Calling this function allows changing the instant at which the `Sleep`
+ /// future completes without having to create new associated state and
+ /// without having it registered. This is required in e.g. the
+ /// [crate::time::Interval] where we want to reset the internal [Sleep]
+ /// without having it wake up the last task that polled it.
+ pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) {
+ let mut me = self.project();
+ me.entry.as_mut().reset(deadline, false);
+ }
+
+ fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
+ let mut me = self.project();
+ me.entry.as_mut().reset(deadline, true);
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ {
+ let _resource_enter = me.inner.ctx.resource_span.enter();
+ me.inner.ctx.async_op_span =
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
+ let _async_op_enter = me.inner.ctx.async_op_span.enter();
+
+ me.inner.ctx.async_op_poll_span =
+ tracing::trace_span!("runtime.resource.async_op.poll");
+
+ let duration = {
+ let clock = me.entry.clock();
+ let time_source = me.entry.driver().time_source();
+ let now = time_source.now(clock);
+ let deadline_tick = time_source.deadline_to_tick(deadline);
+ deadline_tick.saturating_sub(now)
+ };
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ duration = duration,
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+ }
+ }
+
+ fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ let me = self.project();
+
+ ready!(crate::trace::trace_leaf(cx));
+
+ // Keep track of task budget
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let coop = ready!(trace_poll_op!(
+ "poll_elapsed",
+ crate::runtime::coop::poll_proceed(cx),
+ ));
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let coop = ready!(crate::runtime::coop::poll_proceed(cx));
+
+ let result = me.entry.poll_elapsed(cx).map(move |r| {
+ coop.made_progress();
+ r
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ return trace_poll_op!("poll_elapsed", result);
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ return result;
+ }
+}
+
+impl Future for Sleep {
+ type Output = ();
+
+ // `poll_elapsed` can return an error in two cases:
+ //
+ // - AtCapacity: this is a pathological case where far too many
+ // sleep instances have been scheduled.
+ // - Shutdown: No timer has been setup, which is a mis-use error.
+ //
+ // Both cases are extremely rare, and pretty accurately fit into
+ // "logic errors", so we just panic in this case. A user couldn't
+ // really do much better if we passed the error onwards.
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _res_span = self.inner.ctx.resource_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_span = self.inner.ctx.async_op_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered();
+ match ready!(self.as_mut().poll_elapsed(cx)) {
+ Ok(()) => Poll::Ready(()),
+ Err(e) => panic!("timer error: {}", e),
+ }
+ }
+}
diff --git a/vendor/tokio/src/time/tests/mod.rs b/vendor/tokio/src/time/tests/mod.rs
deleted file mode 100644
index 35e1060ac..000000000
--- a/vendor/tokio/src/time/tests/mod.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-mod test_sleep;
-
-use crate::time::{self, Instant};
-use std::time::Duration;
-
-fn assert_send<T: Send>() {}
-fn assert_sync<T: Sync>() {}
-
-#[test]
-fn registration_is_send_and_sync() {
- use crate::time::Sleep;
-
- assert_send::<Sleep>();
- assert_sync::<Sleep>();
-}
-
-#[test]
-#[should_panic]
-fn sleep_is_eager() {
- let when = Instant::now() + Duration::from_millis(100);
- let _ = time::sleep_until(when);
-}
diff --git a/vendor/tokio/src/time/tests/test_sleep.rs b/vendor/tokio/src/time/tests/test_sleep.rs
deleted file mode 100644
index 77ca07e31..000000000
--- a/vendor/tokio/src/time/tests/test_sleep.rs
+++ /dev/null
@@ -1,443 +0,0 @@
-//use crate::time::driver::{Driver, Entry, Handle};
-
-/*
-macro_rules! poll {
- ($e:expr) => {
- $e.enter(|cx, e| e.poll_elapsed(cx))
- };
-}
-
-#[test]
-fn frozen_utility_returns_correct_advanced_duration() {
- let clock = Clock::new();
- clock.pause();
- let start = clock.now();
-
- clock.advance(ms(10));
- assert_eq!(clock.now() - start, ms(10));
-}
-
-#[test]
-fn immediate_sleep() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- let when = clock.now();
- let mut e = task::spawn(sleep_until(&handle, when));
-
- assert_ready_ok!(poll!(e));
-
- assert_ok!(driver.park_timeout(Duration::from_millis(1000)));
-
- // The time has not advanced. The `turn` completed immediately.
- assert_eq!(clock.now() - start, ms(1000));
-}
-
-#[test]
-fn delayed_sleep_level_0() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- for &i in &[1, 10, 60] {
- // Create a `Sleep` that elapses in the future
- let mut e = task::spawn(sleep_until(&handle, start + ms(i)));
-
- // The sleep instance has not elapsed.
- assert_pending!(poll!(e));
-
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(i));
-
- assert_ready_ok!(poll!(e));
- }
-}
-
-#[test]
-fn sub_ms_delayed_sleep() {
- let (mut driver, clock, handle) = setup();
-
- for _ in 0..5 {
- let deadline = clock.now() + ms(1) + Duration::new(0, 1);
-
- let mut e = task::spawn(sleep_until(&handle, deadline));
-
- assert_pending!(poll!(e));
-
- assert_ok!(driver.park());
- assert_ready_ok!(poll!(e));
-
- assert!(clock.now() >= deadline);
-
- clock.advance(Duration::new(0, 1));
- }
-}
-
-#[test]
-fn delayed_sleep_wrapping_level_0() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- assert_ok!(driver.park_timeout(ms(5)));
- assert_eq!(clock.now() - start, ms(5));
-
- let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(60)));
-
- assert_pending!(poll!(e));
-
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(64));
- assert_pending!(poll!(e));
-
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(65));
-
- assert_ready_ok!(poll!(e));
-}
-
-#[test]
-fn timer_wrapping_with_higher_levels() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- // Set sleep to hit level 1
- let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(64)));
- assert_pending!(poll!(e1));
-
- // Turn a bit
- assert_ok!(driver.park_timeout(ms(5)));
-
- // Set timeout such that it will hit level 0, but wrap
- let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(60)));
- assert_pending!(poll!(e2));
-
- // This should result in s1 firing
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(64));
-
- assert_ready_ok!(poll!(e1));
- assert_pending!(poll!(e2));
-
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(65));
-
- assert_ready_ok!(poll!(e1));
-}
-
-#[test]
-fn sleep_with_deadline_in_past() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- // Create `Sleep` that elapsed immediately.
- let mut e = task::spawn(sleep_until(&handle, clock.now() - ms(100)));
-
- // Even though the `Sleep` expires in the past, it is not ready yet
- // because the timer must observe it.
- assert_ready_ok!(poll!(e));
-
- // Turn the timer, it runs for the elapsed time
- assert_ok!(driver.park_timeout(ms(1000)));
-
- // The time has not advanced. The `turn` completed immediately.
- assert_eq!(clock.now() - start, ms(1000));
-}
-
-#[test]
-fn delayed_sleep_level_1() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- // Create a `Sleep` that elapses in the future
- let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234)));
-
- // The sleep has not elapsed.
- assert_pending!(poll!(e));
-
- // Turn the timer, this will wake up to cascade the timer down.
- assert_ok!(driver.park_timeout(ms(1000)));
- assert_eq!(clock.now() - start, ms(192));
-
- // The sleep has not elapsed.
- assert_pending!(poll!(e));
-
- // Turn the timer again
- assert_ok!(driver.park_timeout(ms(1000)));
- assert_eq!(clock.now() - start, ms(234));
-
- // The sleep has elapsed.
- assert_ready_ok!(poll!(e));
-
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- // Create a `Sleep` that elapses in the future
- let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234)));
-
- // The sleep has not elapsed.
- assert_pending!(poll!(e));
-
- // Turn the timer with a smaller timeout than the cascade.
- assert_ok!(driver.park_timeout(ms(100)));
- assert_eq!(clock.now() - start, ms(100));
-
- assert_pending!(poll!(e));
-
- // Turn the timer, this will wake up to cascade the timer down.
- assert_ok!(driver.park_timeout(ms(1000)));
- assert_eq!(clock.now() - start, ms(192));
-
- // The sleep has not elapsed.
- assert_pending!(poll!(e));
-
- // Turn the timer again
- assert_ok!(driver.park_timeout(ms(1000)));
- assert_eq!(clock.now() - start, ms(234));
-
- // The sleep has elapsed.
- assert_ready_ok!(poll!(e));
-}
-
-#[test]
-fn concurrently_set_two_timers_second_one_shorter() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(500)));
- let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(200)));
-
- // The sleep has not elapsed
- assert_pending!(poll!(e1));
- assert_pending!(poll!(e2));
-
- // Sleep until a cascade
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(192));
-
- // Sleep until the second timer.
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(200));
-
- // The shorter sleep fires
- assert_ready_ok!(poll!(e2));
- assert_pending!(poll!(e1));
-
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(448));
-
- assert_pending!(poll!(e1));
-
- // Turn again, this time the time will advance to the second sleep
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(500));
-
- assert_ready_ok!(poll!(e1));
-}
-
-#[test]
-fn short_sleep() {
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- // Create a `Sleep` that elapses in the future
- let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1)));
-
- // The sleep has not elapsed.
- assert_pending!(poll!(e));
-
- // Turn the timer, but not enough time will go by.
- assert_ok!(driver.park());
-
- // The sleep has elapsed.
- assert_ready_ok!(poll!(e));
-
- // The time has advanced to the point of the sleep elapsing.
- assert_eq!(clock.now() - start, ms(1));
-}
-
-#[test]
-fn sorta_long_sleep_until() {
- const MIN_5: u64 = 5 * 60 * 1000;
-
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- // Create a `Sleep` that elapses in the future
- let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MIN_5)));
-
- // The sleep has not elapsed.
- assert_pending!(poll!(e));
-
- let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
-
- for &elapsed in cascades {
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(elapsed));
-
- assert_pending!(poll!(e));
- }
-
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(MIN_5));
-
- // The sleep has elapsed.
- assert_ready_ok!(poll!(e));
-}
-
-#[test]
-fn very_long_sleep() {
- const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
-
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- // Create a `Sleep` that elapses in the future
- let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MO_5)));
-
- // The sleep has not elapsed.
- assert_pending!(poll!(e));
-
- let cascades = &[
- 12_884_901_888,
- 12_952_010_752,
- 12_959_875_072,
- 12_959_997_952,
- ];
-
- for &elapsed in cascades {
- assert_ok!(driver.park());
- assert_eq!(clock.now() - start, ms(elapsed));
-
- assert_pending!(poll!(e));
- }
-
- // Turn the timer, but not enough time will go by.
- assert_ok!(driver.park());
-
- // The time has advanced to the point of the sleep elapsing.
- assert_eq!(clock.now() - start, ms(MO_5));
-
- // The sleep has elapsed.
- assert_ready_ok!(poll!(e));
-}
-
-#[test]
-fn unpark_is_delayed() {
- // A special park that will take much longer than the requested duration
- struct MockPark(Clock);
-
- struct MockUnpark;
-
- impl Park for MockPark {
- type Unpark = MockUnpark;
- type Error = ();
-
- fn unpark(&self) -> Self::Unpark {
- MockUnpark
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- panic!("parking forever");
- }
-
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- assert_eq!(duration, ms(0));
- self.0.advance(ms(436));
- Ok(())
- }
-
- fn shutdown(&mut self) {}
- }
-
- impl Unpark for MockUnpark {
- fn unpark(&self) {}
- }
-
- let clock = Clock::new();
- clock.pause();
- let start = clock.now();
- let mut driver = Driver::new(MockPark(clock.clone()), clock.clone());
- let handle = driver.handle();
-
- let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(100)));
- let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(101)));
- let mut e3 = task::spawn(sleep_until(&handle, clock.now() + ms(200)));
-
- assert_pending!(poll!(e1));
- assert_pending!(poll!(e2));
- assert_pending!(poll!(e3));
-
- assert_ok!(driver.park());
-
- assert_eq!(clock.now() - start, ms(500));
-
- assert_ready_ok!(poll!(e1));
- assert_ready_ok!(poll!(e2));
- assert_ready_ok!(poll!(e3));
-}
-
-#[test]
-fn set_timeout_at_deadline_greater_than_max_timer() {
- const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
- const YR_5: u64 = 5 * YR_1;
-
- let (mut driver, clock, handle) = setup();
- let start = clock.now();
-
- for _ in 0..5 {
- assert_ok!(driver.park_timeout(ms(YR_1)));
- }
-
- let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1)));
- assert_pending!(poll!(e));
-
- assert_ok!(driver.park_timeout(ms(1000)));
- assert_eq!(clock.now() - start, ms(YR_5) + ms(1));
-
- assert_ready_ok!(poll!(e));
-}
-
-fn setup() -> (Driver<MockPark>, Clock, Handle) {
- let clock = Clock::new();
- clock.pause();
- let driver = Driver::new(MockPark(clock.clone()), clock.clone());
- let handle = driver.handle();
-
- (driver, clock, handle)
-}
-
-fn sleep_until(handle: &Handle, when: Instant) -> Arc<Entry> {
- Entry::new(&handle, when, ms(0))
-}
-
-struct MockPark(Clock);
-
-struct MockUnpark;
-
-impl Park for MockPark {
- type Unpark = MockUnpark;
- type Error = ();
-
- fn unpark(&self) -> Self::Unpark {
- MockUnpark
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- panic!("parking forever");
- }
-
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.0.advance(duration);
- Ok(())
- }
-
- fn shutdown(&mut self) {}
-}
-
-impl Unpark for MockUnpark {
- fn unpark(&self) {}
-}
-
-fn ms(n: u64) -> Duration {
- Duration::from_millis(n)
-}
-*/
diff --git a/vendor/tokio/src/time/timeout.rs b/vendor/tokio/src/time/timeout.rs
index 61964ad24..52ab9891c 100644
--- a/vendor/tokio/src/time/timeout.rs
+++ b/vendor/tokio/src/time/timeout.rs
@@ -4,20 +4,39 @@
//!
//! [`Timeout`]: struct@Timeout
-use crate::time::{error::Elapsed, sleep_until, Duration, Instant, Sleep};
+use crate::{
+ runtime::coop,
+ time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
+ util::trace,
+};
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
-/// Require a `Future` to complete before the specified duration has elapsed.
+/// Requires a `Future` to complete before the specified duration has elapsed.
///
/// If the future completes before the duration has elapsed, then the completed
/// value is returned. Otherwise, an error is returned and the future is
/// canceled.
///
-/// # Cancelation
+/// Note that the timeout is checked before polling the future, so if the future
+/// does not yield during execution then it is possible for the future to complete
+/// and exceed the timeout _without_ returning an error.
+///
+/// This function returns a future whose return type is [`Result`]`<T,`[`Elapsed`]`>`, where `T` is the
+/// return type of the provided future.
+///
+/// If the provided future completes immediately, then the future returned from
+/// this function is guaranteed to complete immediately with an [`Ok`] variant
+/// no matter the provided duration.
+///
+/// [`Ok`]: std::result::Result::Ok
+/// [`Result`]: std::result::Result
+/// [`Elapsed`]: crate::time::error::Elapsed
+///
+/// # Cancellation
///
/// Cancelling a timeout is done by dropping the future. No additional cleanup
/// or other work is required.
@@ -45,24 +64,56 @@ use std::task::{self, Poll};
/// }
/// # }
/// ```
-pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
+///
+/// # Panics
+///
+/// This function panics if there is no current timer set.
+///
+/// It can be triggered when [`Builder::enable_time`] or
+/// [`Builder::enable_all`] are not included in the builder.
+///
+/// It can also panic whenever a timer is created outside of a
+/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
+/// since the function is executed outside of the runtime.
+/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
+/// And this is because wrapping the function on an async makes it lazy,
+/// and so gets executed inside the runtime successfully without
+/// panicking.
+///
+/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+#[track_caller]
+pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F>
where
- T: Future,
+ F: Future,
{
+ let location = trace::caller_location();
+
let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
- Some(deadline) => Sleep::new_timeout(deadline),
- None => Sleep::far_future(),
+ Some(deadline) => Sleep::new_timeout(deadline, location),
+ None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
}
-/// Require a `Future` to complete before the specified instant in time.
+/// Requires a `Future` to complete before the specified instant in time.
///
/// If the future completes before the instant is reached, then the completed
/// value is returned. Otherwise, an error is returned.
///
-/// # Cancelation
+/// This function returns a future whose return type is [`Result`]`<T,`[`Elapsed`]`>`, where `T` is the
+/// return type of the provided future.
+///
+/// If the provided future completes immediately, then the future returned from
+/// this function is guaranteed to complete immediately with an [`Ok`] variant
+/// no matter the provided deadline.
+///
+/// [`Ok`]: std::result::Result::Ok
+/// [`Result`]: std::result::Result
+/// [`Elapsed`]: crate::time::error::Elapsed
+///
+/// # Cancellation
///
/// Cancelling a timeout is done by dropping the future. No additional cleanup
/// or other work is required.
@@ -91,9 +142,9 @@ where
/// }
/// # }
/// ```
-pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T>
+pub fn timeout_at<F>(deadline: Instant, future: F) -> Timeout<F>
where
- T: Future,
+ F: Future,
{
let delay = sleep_until(deadline);
@@ -145,15 +196,33 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();
+ let had_budget_before = coop::has_budget_remaining();
+
// First, try polling the future
if let Poll::Ready(v) = me.value.poll(cx) {
return Poll::Ready(Ok(v));
}
- // Now check the timer
- match me.delay.poll(cx) {
- Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
- Poll::Pending => Poll::Pending,
+ let has_budget_now = coop::has_budget_remaining();
+
+ let delay = me.delay;
+
+ let poll_delay = || -> Poll<Self::Output> {
+ match delay.poll(cx) {
+ Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
+ Poll::Pending => Poll::Pending,
+ }
+ };
+
+ if let (true, false) = (had_budget_before, has_budget_now) {
+ // if it is the underlying future that exhausted the budget, we poll
+ // the `delay` with an unconstrained one. This prevents pathological
+ // cases where the underlying future always exhausts the budget and
+ // we never get a chance to evaluate whether the timeout was hit or
+ // not.
+ coop::with_unconstrained(poll_delay)
+ } else {
+ poll_delay()
}
}
}