diff options
Diffstat (limited to 'third_party/rust/tokio/src/time/driver')
-rw-r--r-- | third_party/rust/tokio/src/time/driver/entry.rs | 633 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/handle.rs | 94 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/mod.rs | 528 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/sleep.rs | 438 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/tests/mod.rs | 301 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/level.rs | 276 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/mod.rs | 359 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/stack.rs | 112 |
8 files changed, 2741 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/driver/entry.rs b/third_party/rust/tokio/src/time/driver/entry.rs new file mode 100644 index 0000000000..f0ea898e12 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/entry.rs @@ -0,0 +1,633 @@ +//! Timer state structures. +//! +//! This module contains the heart of the intrusive timer implementation, and as +//! such the structures inside are full of tricky concurrency and unsafe code. +//! +//! # Ground rules +//! +//! The heart of the timer implementation here is the [`TimerShared`] structure, +//! shared between the [`TimerEntry`] and the driver. Generally, we permit access +//! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or +//! 2) a held driver lock. +//! +//! It follows from this that any changes made while holding BOTH 1 and 2 will +//! be reliably visible, regardless of ordering. This is because of the acq/rel +//! fences on the driver lock ensuring ordering with 2, and rust mutable +//! reference rules for 1 (a mutable reference to an object can't be passed +//! between threads without an acq/rel barrier, and same-thread we have local +//! happens-before ordering). +//! +//! # State field +//! +//! Each timer has a state field associated with it. This field contains either +//! the current scheduled time, or a special flag value indicating its state. +//! This state can either indicate that the timer is on the 'pending' queue (and +//! thus will be fired with an `Ok(())` result soon) or that it has already been +//! fired/deregistered. +//! +//! This single state field allows for code that is firing the timer to +//! synchronize with any racing `reset` calls reliably. +//! +//! # Cached vs true timeouts +//! +//! To allow for the use case of a timeout that is periodically reset before +//! expiration to be as lightweight as possible, we support optimistically +//! lock-free timer resets, in the case where a timer is rescheduled to a later +//! point than it was originally scheduled for. +//! +//! This is accomplished by lazily rescheduling timers. That is, we update the +//! state field field with the true expiration of the timer from the holder of +//! the [`TimerEntry`]. When the driver services timers (ie, whenever it's +//! walking lists of timers), it checks this "true when" value, and reschedules +//! based on it. +//! +//! We do, however, also need to track what the expiration time was when we +//! originally registered the timer; this is used to locate the right linked +//! list when the timer is being cancelled. This is referred to as the "cached +//! when" internally. +//! +//! There is of course a race condition between timer reset and timer +//! expiration. If the driver fails to observe the updated expiration time, it +//! could trigger expiration of the timer too early. However, because +//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and +//! refuse to mark the timer as pending. +//! +//! [mark_pending]: TimerHandle::mark_pending + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::AtomicU64; +use crate::loom::sync::atomic::Ordering; + +use crate::sync::AtomicWaker; +use crate::time::Instant; +use crate::util::linked_list; + +use super::Handle; + +use std::cell::UnsafeCell as StdUnsafeCell; +use std::task::{Context, Poll, Waker}; +use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; + +type TimerResult = Result<(), crate::time::error::Error>; + +const STATE_DEREGISTERED: u64 = u64::MAX; +const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; +const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; + +/// This structure holds the current shared state of the timer - its scheduled +/// time (if registered), or otherwise the result of the timer completing, as +/// well as the registered waker. +/// +/// Generally, the StateCell is only permitted to be accessed from two contexts: +/// Either a thread holding the corresponding &mut TimerEntry, or a thread +/// holding the timer driver lock. The write actions on the StateCell amount to +/// passing "ownership" of the StateCell between these contexts; moving a timer +/// from the TimerEntry to the driver requires _both_ holding the &mut +/// TimerEntry and the driver lock, while moving it back (firing the timer) +/// requires only the driver lock. +pub(super) struct StateCell { + /// Holds either the scheduled expiration time for this timer, or (if the + /// timer has been fired and is unregistered), `u64::MAX`. + state: AtomicU64, + /// If the timer is fired (an Acquire order read on state shows + /// `u64::MAX`), holds the result that should be returned from + /// polling the timer. Otherwise, the contents are unspecified and reading + /// without holding the driver lock is undefined behavior. + result: UnsafeCell<TimerResult>, + /// The currently-registered waker + waker: CachePadded<AtomicWaker>, +} + +impl Default for StateCell { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for StateCell { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "StateCell({:?})", self.read_state()) + } +} + +impl StateCell { + fn new() -> Self { + Self { + state: AtomicU64::new(STATE_DEREGISTERED), + result: UnsafeCell::new(Ok(())), + waker: CachePadded(AtomicWaker::new()), + } + } + + fn is_pending(&self) -> bool { + self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE + } + + /// Returns the current expiration time, or None if not currently scheduled. + fn when(&self) -> Option<u64> { + let cur_state = self.state.load(Ordering::Relaxed); + + if cur_state == u64::MAX { + None + } else { + Some(cur_state) + } + } + + /// If the timer is completed, returns the result of the timer. Otherwise, + /// returns None and registers the waker. + fn poll(&self, waker: &Waker) -> Poll<TimerResult> { + // We must register first. This ensures that either `fire` will + // observe the new waker, or we will observe a racing fire to have set + // the state, or both. + self.waker.0.register_by_ref(waker); + + self.read_state() + } + + fn read_state(&self) -> Poll<TimerResult> { + let cur_state = self.state.load(Ordering::Acquire); + + if cur_state == STATE_DEREGISTERED { + // SAFETY: The driver has fired this timer; this involves writing + // the result, and then writing (with release ordering) the state + // field. + Poll::Ready(unsafe { self.result.with(|p| *p) }) + } else { + Poll::Pending + } + } + + /// Marks this timer as being moved to the pending list, if its scheduled + /// time is not after `not_after`. + /// + /// If the timer is scheduled for a time after not_after, returns an Err + /// containing the current scheduled time. + /// + /// SAFETY: Must hold the driver lock. + unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { + // Quick initial debug check to see if the timer is already fired. Since + // firing the timer can only happen with the driver lock held, we know + // we shouldn't be able to "miss" a transition to a fired state, even + // with relaxed ordering. + let mut cur_state = self.state.load(Ordering::Relaxed); + + loop { + debug_assert!(cur_state < STATE_MIN_VALUE); + + if cur_state > not_after { + break Err(cur_state); + } + + match self.state.compare_exchange( + cur_state, + STATE_PENDING_FIRE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + break Ok(()); + } + Err(actual_state) => { + cur_state = actual_state; + } + } + } + } + + /// Fires the timer, setting the result to the provided result. + /// + /// Returns: + /// * `Some(waker) - if fired and a waker needs to be invoked once the + /// driver lock is released + /// * `None` - if fired and a waker does not need to be invoked, or if + /// already fired + /// + /// SAFETY: The driver lock must be held. + unsafe fn fire(&self, result: TimerResult) -> Option<Waker> { + // Quick initial check to see if the timer is already fired. Since + // firing the timer can only happen with the driver lock held, we know + // we shouldn't be able to "miss" a transition to a fired state, even + // with relaxed ordering. + let cur_state = self.state.load(Ordering::Relaxed); + if cur_state == STATE_DEREGISTERED { + return None; + } + + // SAFETY: We assume the driver lock is held and the timer is not + // fired, so only the driver is accessing this field. + // + // We perform a release-ordered store to state below, to ensure this + // write is visible before the state update is visible. + unsafe { self.result.with_mut(|p| *p = result) }; + + self.state.store(STATE_DEREGISTERED, Ordering::Release); + + self.waker.0.take_waker() + } + + /// Marks the timer as registered (poll will return None) and sets the + /// expiration time. + /// + /// While this function is memory-safe, it should only be called from a + /// context holding both `&mut TimerEntry` and the driver lock. + fn set_expiration(&self, timestamp: u64) { + debug_assert!(timestamp < STATE_MIN_VALUE); + + // We can use relaxed ordering because we hold the driver lock and will + // fence when we release the lock. + self.state.store(timestamp, Ordering::Relaxed); + } + + /// Attempts to adjust the timer to a new timestamp. + /// + /// If the timer has already been fired, is pending firing, or the new + /// timestamp is earlier than the old timestamp, (or occasionally + /// spuriously) returns Err without changing the timer's state. In this + /// case, the timer must be deregistered and re-registered. + fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> { + let mut prior = self.state.load(Ordering::Relaxed); + loop { + if new_timestamp < prior || prior >= STATE_MIN_VALUE { + return Err(()); + } + + match self.state.compare_exchange_weak( + prior, + new_timestamp, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + return Ok(()); + } + Err(true_prior) => { + prior = true_prior; + } + } + } + } + + /// Returns true if the state of this timer indicates that the timer might + /// be registered with the driver. This check is performed with relaxed + /// ordering, but is conservative - if it returns false, the timer is + /// definitely _not_ registered. + pub(super) fn might_be_registered(&self) -> bool { + self.state.load(Ordering::Relaxed) != u64::MAX + } +} + +/// A timer entry. +/// +/// This is the handle to a timer that is controlled by the requester of the +/// timer. As this participates in intrusive data structures, it must be pinned +/// before polling. +#[derive(Debug)] +pub(super) struct TimerEntry { + /// Arc reference to the driver. We can only free the driver after + /// deregistering everything from their respective timer wheels. + driver: Handle, + /// Shared inner structure; this is part of an intrusive linked list, and + /// therefore other references can exist to it while mutable references to + /// Entry exist. + /// + /// This is manipulated only under the inner mutex. TODO: Can we use loom + /// cells for this? + inner: StdUnsafeCell<TimerShared>, + /// Initial deadline for the timer. This is used to register on the first + /// poll, as we can't register prior to being pinned. + initial_deadline: Option<Instant>, + /// Ensure the type is !Unpin + _m: std::marker::PhantomPinned, +} + +unsafe impl Send for TimerEntry {} +unsafe impl Sync for TimerEntry {} + +/// An TimerHandle is the (non-enforced) "unique" pointer from the driver to the +/// timer entry. Generally, at most one TimerHandle exists for a timer at a time +/// (enforced by the timer state machine). +/// +/// SAFETY: An TimerHandle is essentially a raw pointer, and the usual caveats +/// of pointer safety apply. In particular, TimerHandle does not itself enforce +/// that the timer does still exist; however, normally an TimerHandle is created +/// immediately before registering the timer, and is consumed when firing the +/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce +/// memory safety, all operations are unsafe. +#[derive(Debug)] +pub(crate) struct TimerHandle { + inner: NonNull<TimerShared>, +} + +pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>; + +/// The shared state structure of a timer. This structure is shared between the +/// frontend (`Entry`) and driver backend. +/// +/// Note that this structure is located inside the `TimerEntry` structure. +#[derive(Debug)] +#[repr(C)] // required by `link_list::Link` impl +pub(crate) struct TimerShared { + /// Data manipulated by the driver thread itself, only. + driver_state: CachePadded<TimerSharedPadded>, + + /// Current state. This records whether the timer entry is currently under + /// the ownership of the driver, and if not, its current state (not + /// complete, fired, error, etc). + state: StateCell, + + _p: PhantomPinned, +} + +impl TimerShared { + pub(super) fn new() -> Self { + Self { + state: StateCell::default(), + driver_state: CachePadded(TimerSharedPadded::new()), + _p: PhantomPinned, + } + } + + /// Gets the cached time-of-expiration value. + pub(super) fn cached_when(&self) -> u64 { + // Cached-when is only accessed under the driver lock, so we can use relaxed + self.driver_state.0.cached_when.load(Ordering::Relaxed) + } + + /// Gets the true time-of-expiration value, and copies it into the cached + /// time-of-expiration value. + /// + /// SAFETY: Must be called with the driver lock held, and when this entry is + /// not in any timer wheel lists. + pub(super) unsafe fn sync_when(&self) -> u64 { + let true_when = self.true_when(); + + self.driver_state + .0 + .cached_when + .store(true_when, Ordering::Relaxed); + + true_when + } + + /// Sets the cached time-of-expiration value. + /// + /// SAFETY: Must be called with the driver lock held, and when this entry is + /// not in any timer wheel lists. + unsafe fn set_cached_when(&self, when: u64) { + self.driver_state + .0 + .cached_when + .store(when, Ordering::Relaxed); + } + + /// Returns the true time-of-expiration value, with relaxed memory ordering. + pub(super) fn true_when(&self) -> u64 { + self.state.when().expect("Timer already fired") + } + + /// Sets the true time-of-expiration value, even if it is less than the + /// current expiration or the timer is deregistered. + /// + /// SAFETY: Must only be called with the driver lock held and the entry not + /// in the timer wheel. + pub(super) unsafe fn set_expiration(&self, t: u64) { + self.state.set_expiration(t); + self.driver_state.0.cached_when.store(t, Ordering::Relaxed); + } + + /// Sets the true time-of-expiration only if it is after the current. + pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> { + self.state.extend_expiration(t) + } + + /// Returns a TimerHandle for this timer. + pub(super) fn handle(&self) -> TimerHandle { + TimerHandle { + inner: NonNull::from(self), + } + } + + /// Returns true if the state of this timer indicates that the timer might + /// be registered with the driver. This check is performed with relaxed + /// ordering, but is conservative - if it returns false, the timer is + /// definitely _not_ registered. + pub(super) fn might_be_registered(&self) -> bool { + self.state.might_be_registered() + } +} + +/// Additional shared state between the driver and the timer which is cache +/// padded. This contains the information that the driver thread accesses most +/// frequently to minimize contention. In particular, we move it away from the +/// waker, as the waker is updated on every poll. +#[repr(C)] // required by `link_list::Link` impl +struct TimerSharedPadded { + /// A link within the doubly-linked list of timers on a particular level and + /// slot. Valid only if state is equal to Registered. + /// + /// Only accessed under the entry lock. + pointers: linked_list::Pointers<TimerShared>, + + /// The expiration time for which this entry is currently registered. + /// Generally owned by the driver, but is accessed by the entry when not + /// registered. + cached_when: AtomicU64, + + /// The true expiration time. Set by the timer future, read by the driver. + true_when: AtomicU64, +} + +impl std::fmt::Debug for TimerSharedPadded { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TimerSharedPadded") + .field("when", &self.true_when.load(Ordering::Relaxed)) + .field("cached_when", &self.cached_when.load(Ordering::Relaxed)) + .finish() + } +} + +impl TimerSharedPadded { + fn new() -> Self { + Self { + cached_when: AtomicU64::new(0), + true_when: AtomicU64::new(0), + pointers: linked_list::Pointers::new(), + } + } +} + +unsafe impl Send for TimerShared {} +unsafe impl Sync for TimerShared {} + +unsafe impl linked_list::Link for TimerShared { + type Handle = TimerHandle; + + type Target = TimerShared; + + fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> { + handle.inner + } + + unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle { + TimerHandle { inner: ptr } + } + + unsafe fn pointers( + target: NonNull<Self::Target>, + ) -> NonNull<linked_list::Pointers<Self::Target>> { + target.cast() + } +} + +// ===== impl Entry ===== + +impl TimerEntry { + pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self { + let driver = handle.clone(); + + Self { + driver, + inner: StdUnsafeCell::new(TimerShared::new()), + initial_deadline: Some(deadline), + _m: std::marker::PhantomPinned, + } + } + + fn inner(&self) -> &TimerShared { + unsafe { &*self.inner.get() } + } + + pub(crate) fn is_elapsed(&self) -> bool { + !self.inner().state.might_be_registered() && self.initial_deadline.is_none() + } + + /// Cancels and deregisters the timer. This operation is irreversible. + pub(crate) fn cancel(self: Pin<&mut Self>) { + // We need to perform an acq/rel fence with the driver thread, and the + // simplest way to do so is to grab the driver lock. + // + // Why is this necessary? We're about to release this timer's memory for + // some other non-timer use. However, we've been doing a bunch of + // relaxed (or even non-atomic) writes from the driver thread, and we'll + // be doing more from _this thread_ (as this memory is interpreted as + // something else). + // + // It is critical to ensure that, from the point of view of the driver, + // those future non-timer writes happen-after the timer is fully fired, + // and from the purpose of this thread, the driver's writes all + // happen-before we drop the timer. This in turn requires us to perform + // an acquire-release barrier in _both_ directions between the driver + // and dropping thread. + // + // The lock acquisition in clear_entry serves this purpose. All of the + // driver manipulations happen with the lock held, so we can just take + // the lock and be sure that this drop happens-after everything the + // driver did so far and happens-before everything the driver does in + // the future. While we have the lock held, we also go ahead and + // deregister the entry if necessary. + unsafe { self.driver.clear_entry(NonNull::from(self.inner())) }; + } + + pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) { + unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None; + + let tick = self.driver.time_source().deadline_to_tick(new_time); + + if self.inner().extend_expiration(tick).is_ok() { + return; + } + + unsafe { + self.driver.reregister(tick, self.inner().into()); + } + } + + pub(crate) fn poll_elapsed( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), super::Error>> { + if self.driver.is_shutdown() { + panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR); + } + + if let Some(deadline) = self.initial_deadline { + self.as_mut().reset(deadline); + } + + let this = unsafe { self.get_unchecked_mut() }; + + this.inner().state.poll(cx.waker()) + } +} + +impl TimerHandle { + pub(super) unsafe fn cached_when(&self) -> u64 { + unsafe { self.inner.as_ref().cached_when() } + } + + pub(super) unsafe fn sync_when(&self) -> u64 { + unsafe { self.inner.as_ref().sync_when() } + } + + pub(super) unsafe fn is_pending(&self) -> bool { + unsafe { self.inner.as_ref().state.is_pending() } + } + + /// Forcibly sets the true and cached expiration times to the given tick. + /// + /// SAFETY: The caller must ensure that the handle remains valid, the driver + /// lock is held, and that the timer is not in any wheel linked lists. + pub(super) unsafe fn set_expiration(&self, tick: u64) { + self.inner.as_ref().set_expiration(tick); + } + + /// Attempts to mark this entry as pending. If the expiration time is after + /// `not_after`, however, returns an Err with the current expiration time. + /// + /// If an `Err` is returned, the `cached_when` value will be updated to this + /// new expiration time. + /// + /// SAFETY: The caller must ensure that the handle remains valid, the driver + /// lock is held, and that the timer is not in any wheel linked lists. + /// After returning Ok, the entry must be added to the pending list. + pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { + match self.inner.as_ref().state.mark_pending(not_after) { + Ok(()) => { + // mark this as being on the pending queue in cached_when + self.inner.as_ref().set_cached_when(u64::MAX); + Ok(()) + } + Err(tick) => { + self.inner.as_ref().set_cached_when(tick); + Err(tick) + } + } + } + + /// Attempts to transition to a terminal state. If the state is already a + /// terminal state, does nothing. + /// + /// Because the entry might be dropped after the state is moved to a + /// terminal state, this function consumes the handle to ensure we don't + /// access the entry afterwards. + /// + /// Returns the last-registered waker, if any. + /// + /// SAFETY: The driver lock must be held while invoking this function, and + /// the entry must not be in any wheel linked lists. + pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> { + self.inner.as_ref().state.fire(completed_state) + } +} + +impl Drop for TimerEntry { + fn drop(&mut self) { + unsafe { Pin::new_unchecked(self) }.as_mut().cancel() + } +} + +#[cfg_attr(target_arch = "x86_64", repr(align(128)))] +#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] +#[derive(Debug, Default)] +struct CachePadded<T>(T); diff --git a/third_party/rust/tokio/src/time/driver/handle.rs b/third_party/rust/tokio/src/time/driver/handle.rs new file mode 100644 index 0000000000..b61c0476e1 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/handle.rs @@ -0,0 +1,94 @@ +use crate::loom::sync::Arc; +use crate::time::driver::ClockTime; +use std::fmt; + +/// Handle to time driver instance. +#[derive(Clone)] +pub(crate) struct Handle { + time_source: ClockTime, + inner: Arc<super::Inner>, +} + +impl Handle { + /// Creates a new timer `Handle` from a shared `Inner` timer state. + pub(super) fn new(inner: Arc<super::Inner>) -> Self { + let time_source = inner.state.lock().time_source.clone(); + Handle { time_source, inner } + } + + /// Returns the time source associated with this handle. + pub(super) fn time_source(&self) -> &ClockTime { + &self.time_source + } + + /// Access the driver's inner structure. + pub(super) fn get(&self) -> &super::Inner { + &*self.inner + } + + /// Checks whether the driver has been shutdown. + pub(super) fn is_shutdown(&self) -> bool { + self.inner.is_shutdown() + } +} + +cfg_rt! { + impl Handle { + /// Tries to get a handle to the current timer. + /// + /// # Panics + /// + /// This function panics if there is no current timer set. + /// + /// It can be triggered when [`Builder::enable_time`] or + /// [`Builder::enable_all`] are not included in the builder. + /// + /// It can also panic whenever a timer is created outside of a + /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, + /// since the function is executed outside of the runtime. + /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. + /// And this is because wrapping the function on an async makes it lazy, + /// and so gets executed inside the runtime successfully without + /// panicking. + /// + /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time + /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all + pub(crate) fn current() -> Self { + crate::runtime::context::time_handle() + .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + } + } +} + +cfg_not_rt! { + impl Handle { + /// Tries to get a handle to the current timer. + /// + /// # Panics + /// + /// This function panics if there is no current timer set. + /// + /// It can be triggered when [`Builder::enable_time`] or + /// [`Builder::enable_all`] are not included in the builder. + /// + /// It can also panic whenever a timer is created outside of a + /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, + /// since the function is executed outside of the runtime. + /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. + /// And this is because wrapping the function on an async makes it lazy, + /// and so gets executed inside the runtime successfully without + /// panicking. + /// + /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time + /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all + pub(crate) fn current() -> Self { + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) + } + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} diff --git a/third_party/rust/tokio/src/time/driver/mod.rs b/third_party/rust/tokio/src/time/driver/mod.rs new file mode 100644 index 0000000000..9971877479 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/mod.rs @@ -0,0 +1,528 @@ +// Currently, rust warns when an unsafe fn contains an unsafe {} block. However, +// in the future, this will change to the reverse. For now, suppress this +// warning and generally stick with being explicit about unsafety. +#![allow(unused_unsafe)] +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +//! Time driver. + +mod entry; +pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared}; + +mod handle; +pub(crate) use self::handle::Handle; + +mod wheel; + +pub(super) mod sleep; + +use crate::loom::sync::atomic::{AtomicBool, Ordering}; +use crate::loom::sync::{Arc, Mutex}; +use crate::park::{Park, Unpark}; +use crate::time::error::Error; +use crate::time::{Clock, Duration, Instant}; + +use std::convert::TryInto; +use std::fmt; +use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; + +/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. +/// +/// A `Driver` instance tracks the state necessary for managing time and +/// notifying the [`Sleep`][sleep] instances once their deadlines are reached. +/// +/// It is expected that a single instance manages many individual [`Sleep`][sleep] +/// instances. The `Driver` implementation is thread-safe and, as such, is able +/// to handle callers from across threads. +/// +/// After creating the `Driver` instance, the caller must repeatedly call `park` +/// or `park_timeout`. The time driver will perform no work unless `park` or +/// `park_timeout` is called repeatedly. +/// +/// The driver has a resolution of one millisecond. Any unit of time that falls +/// between milliseconds are rounded up to the next millisecond. +/// +/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not +/// elapsed will be notified with an error. At this point, calling `poll` on the +/// [`Sleep`][sleep] instance will result in panic. +/// +/// # Implementation +/// +/// The time driver is based on the [paper by Varghese and Lauck][paper]. +/// +/// A hashed timing wheel is a vector of slots, where each slot handles a time +/// slice. As time progresses, the timer walks over the slot for the current +/// instant, and processes each entry for that slot. When the timer reaches the +/// end of the wheel, it starts again at the beginning. +/// +/// The implementation maintains six wheels arranged in a set of levels. As the +/// levels go up, the slots of the associated wheel represent larger intervals +/// of time. At each level, the wheel has 64 slots. Each slot covers a range of +/// time equal to the wheel at the lower level. At level zero, each slot +/// represents one millisecond of time. +/// +/// The wheels are: +/// +/// * Level 0: 64 x 1 millisecond slots. +/// * Level 1: 64 x 64 millisecond slots. +/// * Level 2: 64 x ~4 second slots. +/// * Level 3: 64 x ~4 minute slots. +/// * Level 4: 64 x ~4 hour slots. +/// * Level 5: 64 x ~12 day slots. +/// +/// When the timer processes entries at level zero, it will notify all the +/// `Sleep` instances as their deadlines have been reached. For all higher +/// levels, all entries will be redistributed across the wheel at the next level +/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will +/// either be canceled (dropped) or their associated entries will reach level +/// zero and be notified. +/// +/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf +/// [sleep]: crate::time::Sleep +/// [timeout]: crate::time::Timeout +/// [interval]: crate::time::Interval +#[derive(Debug)] +pub(crate) struct Driver<P: Park + 'static> { + /// Timing backend in use. + time_source: ClockTime, + + /// Shared state. + handle: Handle, + + /// Parker to delegate to. + park: P, + + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. + #[cfg(feature = "test-util")] + did_wake: Arc<AtomicBool>, +} + +/// A structure which handles conversion from Instants to u64 timestamps. +#[derive(Debug, Clone)] +pub(self) struct ClockTime { + clock: super::clock::Clock, + start_time: Instant, +} + +impl ClockTime { + pub(self) fn new(clock: Clock) -> Self { + Self { + start_time: clock.now(), + clock, + } + } + + pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 { + // Round up to the end of a ms + self.instant_to_tick(t + Duration::from_nanos(999_999)) + } + + pub(self) fn instant_to_tick(&self, t: Instant) -> u64 { + // round up + let dur: Duration = t + .checked_duration_since(self.start_time) + .unwrap_or_else(|| Duration::from_secs(0)); + let ms = dur.as_millis(); + + ms.try_into().unwrap_or(u64::MAX) + } + + pub(self) fn tick_to_duration(&self, t: u64) -> Duration { + Duration::from_millis(t) + } + + pub(self) fn now(&self) -> u64 { + self.instant_to_tick(self.clock.now()) + } +} + +/// Timer state shared between `Driver`, `Handle`, and `Registration`. +struct Inner { + // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex + pub(super) state: Mutex<InnerState>, + + /// True if the driver is being shutdown. + pub(super) is_shutdown: AtomicBool, +} + +/// Time state shared which must be protected by a `Mutex` +struct InnerState { + /// Timing backend in use. + time_source: ClockTime, + + /// The last published timer `elapsed` value. + elapsed: u64, + + /// The earliest time at which we promise to wake up without unparking. + next_wake: Option<NonZeroU64>, + + /// Timer wheel. + wheel: wheel::Wheel, + + /// Unparker that can be used to wake the time driver. + unpark: Box<dyn Unpark>, +} + +// ===== impl Driver ===== + +impl<P> Driver<P> +where + P: Park + 'static, +{ + /// Creates a new `Driver` instance that uses `park` to block the current + /// thread and `time_source` to get the current time and convert to ticks. + /// + /// Specifying the source of time is useful when testing. + pub(crate) fn new(park: P, clock: Clock) -> Driver<P> { + let time_source = ClockTime::new(clock); + + let inner = Inner::new(time_source.clone(), Box::new(park.unpark())); + + Driver { + time_source, + handle: Handle::new(Arc::new(inner)), + park, + #[cfg(feature = "test-util")] + did_wake: Arc::new(AtomicBool::new(false)), + } + } + + /// Returns a handle to the timer. + /// + /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances + /// can either be created directly or the `Handle` instance can be passed to + /// `with_default`, setting the timer as the default timer for the execution + /// context. + pub(crate) fn handle(&self) -> Handle { + self.handle.clone() + } + + fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> { + let mut lock = self.handle.get().state.lock(); + + assert!(!self.handle.is_shutdown()); + + let next_wake = lock.wheel.next_expiration_time(); + lock.next_wake = + next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + + drop(lock); + + match next_wake { + Some(when) => { + let now = self.time_source.now(); + // Note that we effectively round up to 1ms here - this avoids + // very short-duration microsecond-resolution sleeps that the OS + // might treat as zero-length. + let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now)); + + if duration > Duration::from_millis(0) { + if let Some(limit) = limit { + duration = std::cmp::min(limit, duration); + } + + self.park_timeout(duration)?; + } else { + self.park.park_timeout(Duration::from_secs(0))?; + } + } + None => { + if let Some(duration) = limit { + self.park_timeout(duration)?; + } else { + self.park.park()?; + } + } + } + + // Process pending timers after waking up + self.handle.process(); + + Ok(()) + } + + cfg_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + let clock = &self.time_source.clock; + + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + + // If the time driver was woken, then the park completed + // before the "duration" elapsed (usually caused by a + // yield in `Runtime::block_on`). In this case, we don't + // advance the clock. + if !self.did_wake() { + // Simulate advancing time + clock.advance(duration); + } + } else { + self.park.park_timeout(duration)?; + } + + Ok(()) + } + + fn did_wake(&self) -> bool { + self.did_wake.swap(false, Ordering::SeqCst) + } + } + + cfg_not_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + self.park.park_timeout(duration) + } + } +} + +impl Handle { + /// Runs timer related logic, and returns the next wakeup time + pub(self) fn process(&self) { + let now = self.time_source().now(); + + self.process_at_time(now) + } + + pub(self) fn process_at_time(&self, mut now: u64) { + let mut waker_list: [Option<Waker>; 32] = Default::default(); + let mut waker_idx = 0; + + let mut lock = self.get().lock(); + + if now < lock.elapsed { + // Time went backwards! This normally shouldn't happen as the Rust language + // guarantees that an Instant is monotonic, but can happen when running + // Linux in a VM on a Windows host due to std incorrectly trusting the + // hardware clock to be monotonic. + // + // See <https://github.com/tokio-rs/tokio/issues/3619> for more information. + now = lock.elapsed; + } + + while let Some(entry) = lock.wheel.poll(now) { + debug_assert!(unsafe { entry.is_pending() }); + + // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. + if let Some(waker) = unsafe { entry.fire(Ok(())) } { + waker_list[waker_idx] = Some(waker); + + waker_idx += 1; + + if waker_idx == waker_list.len() { + // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. + drop(lock); + + for waker in waker_list.iter_mut() { + waker.take().unwrap().wake(); + } + + waker_idx = 0; + + lock = self.get().lock(); + } + } + } + + // Update the elapsed cache + lock.elapsed = lock.wheel.elapsed(); + lock.next_wake = lock + .wheel + .poll_at() + .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + + drop(lock); + + for waker in waker_list[0..waker_idx].iter_mut() { + waker.take().unwrap().wake(); + } + } + + /// Removes a registered timer from the driver. + /// + /// The timer will be moved to the cancelled state. Wakers will _not_ be + /// invoked. If the timer is already completed, this function is a no-op. + /// + /// This function always acquires the driver lock, even if the entry does + /// not appear to be registered. + /// + /// SAFETY: The timer must not be registered with some other driver, and + /// `add_entry` must not be called concurrently. + pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) { + unsafe { + let mut lock = self.get().lock(); + + if entry.as_ref().might_be_registered() { + lock.wheel.remove(entry); + } + + entry.as_ref().handle().fire(Ok(())); + } + } + + /// Removes and re-adds an entry to the driver. + /// + /// SAFETY: The timer must be either unregistered, or registered with this + /// driver. No other threads are allowed to concurrently manipulate the + /// timer at all (the current thread should hold an exclusive reference to + /// the `TimerEntry`) + pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) { + let waker = unsafe { + let mut lock = self.get().lock(); + + // We may have raced with a firing/deregistration, so check before + // deregistering. + if unsafe { entry.as_ref().might_be_registered() } { + lock.wheel.remove(entry); + } + + // Now that we have exclusive control of this entry, mint a handle to reinsert it. + let entry = entry.as_ref().handle(); + + if self.is_shutdown() { + unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } + } else { + entry.set_expiration(new_tick); + + // Note: We don't have to worry about racing with some other resetting + // thread, because add_entry and reregister require exclusive control of + // the timer entry. + match unsafe { lock.wheel.insert(entry) } { + Ok(when) => { + if lock + .next_wake + .map(|next_wake| when < next_wake.get()) + .unwrap_or(true) + { + lock.unpark.unpark(); + } + + None + } + Err((entry, super::error::InsertError::Elapsed)) => unsafe { + entry.fire(Ok(())) + }, + } + } + + // Must release lock before invoking waker to avoid the risk of deadlock. + }; + + // The timer was fired synchronously as a result of the reregistration. + // Wake the waker; this is needed because we might reset _after_ a poll, + // and otherwise the task won't be awoken to poll again. + if let Some(waker) = waker { + waker.wake(); + } + } +} + +impl<P> Park for Driver<P> +where + P: Park + 'static, +{ + type Unpark = TimerUnpark<P>; + type Error = P::Error; + + fn unpark(&self) -> Self::Unpark { + TimerUnpark::new(self) + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park_internal(None) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park_internal(Some(duration)) + } + + fn shutdown(&mut self) { + if self.handle.is_shutdown() { + return; + } + + self.handle.get().is_shutdown.store(true, Ordering::SeqCst); + + // Advance time forward to the end of time. + + self.handle.process_at_time(u64::MAX); + + self.park.shutdown(); + } +} + +impl<P> Drop for Driver<P> +where + P: Park + 'static, +{ + fn drop(&mut self) { + self.shutdown(); + } +} + +pub(crate) struct TimerUnpark<P: Park + 'static> { + inner: P::Unpark, + + #[cfg(feature = "test-util")] + did_wake: Arc<AtomicBool>, +} + +impl<P: Park + 'static> TimerUnpark<P> { + fn new(driver: &Driver<P>) -> TimerUnpark<P> { + TimerUnpark { + inner: driver.park.unpark(), + + #[cfg(feature = "test-util")] + did_wake: driver.did_wake.clone(), + } + } +} + +impl<P: Park + 'static> Unpark for TimerUnpark<P> { + fn unpark(&self) { + #[cfg(feature = "test-util")] + self.did_wake.store(true, Ordering::SeqCst); + + self.inner.unpark(); + } +} + +// ===== impl Inner ===== + +impl Inner { + pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self { + Inner { + state: Mutex::new(InnerState { + time_source, + elapsed: 0, + next_wake: None, + unpark, + wheel: wheel::Wheel::new(), + }), + is_shutdown: AtomicBool::new(false), + } + } + + /// Locks the driver's inner structure + pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { + self.state.lock() + } + + // Check whether the driver has been shutdown + pub(super) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::SeqCst) + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Inner").finish() + } +} + +#[cfg(test)] +mod tests; diff --git a/third_party/rust/tokio/src/time/driver/sleep.rs b/third_party/rust/tokio/src/time/driver/sleep.rs new file mode 100644 index 0000000000..7f27ef201f --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/sleep.rs @@ -0,0 +1,438 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::time::driver::ClockTime; +use crate::time::driver::{Handle, TimerEntry}; +use crate::time::{error::Error, Duration, Instant}; +use crate::util::trace; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::panic::Location; +use std::pin::Pin; +use std::task::{self, Poll}; + +/// Waits until `deadline` is reached. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. +/// +/// To run something regularly on a schedule, see [`interval`]. +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep_until, Instant, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep_until(Instant::now() + Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Sleep`]: struct@crate::time::Sleep +/// [`interval`]: crate::time::interval() +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +// Alias for old name in 0.x +#[cfg_attr(docsrs, doc(alias = "delay_until"))] +#[track_caller] +pub fn sleep_until(deadline: Instant) -> Sleep { + return Sleep::new_timeout(deadline, trace::caller_location()); +} + +/// Waits until `duration` has elapsed. +/// +/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous +/// analog to `std::thread::sleep`. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. +/// +/// To run something regularly on a schedule, see [`interval`]. +/// +/// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years). +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep(Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// # Panics +/// +/// This function panics if there is no current timer set. +/// +/// It can be triggered when [`Builder::enable_time`] or +/// [`Builder::enable_all`] are not included in the builder. +/// +/// It can also panic whenever a timer is created outside of a +/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic, +/// since the function is executed outside of the runtime. +/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic. +/// And this is because wrapping the function on an async makes it lazy, +/// and so gets executed inside the runtime successfully without +/// panicking. +/// +/// [`Sleep`]: struct@crate::time::Sleep +/// [`interval`]: crate::time::interval() +/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time +/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all +// Alias for old name in 0.x +#[cfg_attr(docsrs, doc(alias = "delay_for"))] +#[cfg_attr(docsrs, doc(alias = "wait"))] +#[track_caller] +pub fn sleep(duration: Duration) -> Sleep { + let location = trace::caller_location(); + + match Instant::now().checked_add(duration) { + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::new_timeout(Instant::far_future(), location), + } +} + +pin_project! { + /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). + /// + /// This type does not implement the `Unpin` trait, which means that if you + /// use it with [`select!`] or by calling `poll`, you have to pin it first. + /// If you use it with `.await`, this does not apply. + /// + /// # Examples + /// + /// Wait 100ms and print "100 ms have elapsed". + /// + /// ``` + /// use tokio::time::{sleep, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// sleep(Duration::from_millis(100)).await; + /// println!("100 ms have elapsed"); + /// } + /// ``` + /// + /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is + /// necessary when the same `Sleep` is selected on multiple times. + /// ```no_run + /// use tokio::time::{self, Duration, Instant}; + /// + /// #[tokio::main] + /// async fn main() { + /// let sleep = time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// loop { + /// tokio::select! { + /// () = &mut sleep => { + /// println!("timer elapsed"); + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50)); + /// }, + /// } + /// } + /// } + /// ``` + /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the + /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// + /// struct HasSleep { + /// sleep: Pin<Box<Sleep>>, + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.sleep.as_mut().poll(cx) + /// } + /// } + /// ``` + /// Use in a struct with pin projection. This method avoids the `Box`, but + /// the `HasSleep` struct will not be `Unpin` as a consequence. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// use pin_project_lite::pin_project; + /// + /// pin_project! { + /// struct HasSleep { + /// #[pin] + /// sleep: Sleep, + /// } + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.project().sleep.poll(cx) + /// } + /// } + /// ``` + /// + /// [`select!`]: ../macro.select.html + /// [`tokio::pin!`]: ../macro.pin.html + // Alias for old name in 0.2 + #[cfg_attr(docsrs, doc(alias = "Delay"))] + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Sleep { + inner: Inner, + + // The link between the `Sleep` instance and the timer that drives it. + #[pin] + entry: TimerEntry, + } +} + +cfg_trace! { + #[derive(Debug)] + struct Inner { + deadline: Instant, + ctx: trace::AsyncOpTracingCtx, + time_source: ClockTime, + } +} + +cfg_not_trace! { + #[derive(Debug)] + struct Inner { + deadline: Instant, + } +} + +impl Sleep { + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + pub(crate) fn new_timeout( + deadline: Instant, + location: Option<&'static Location<'static>>, + ) -> Sleep { + let handle = Handle::current(); + let entry = TimerEntry::new(&handle, deadline); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = { + let time_source = handle.time_source().clone(); + let deadline_tick = time_source.deadline_to_tick(deadline); + let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0); + + let location = location.expect("should have location if tracing"); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sleep", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + let async_op_span = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span, + }; + + Inner { + deadline, + ctx, + time_source, + } + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = Inner { deadline }; + + Sleep { inner, entry } + } + + pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep { + Self::new_timeout(Instant::far_future(), location) + } + + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.inner.deadline + } + + /// Returns `true` if `Sleep` has elapsed. + /// + /// A `Sleep` instance is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.entry.is_elapsed() + } + + /// Resets the `Sleep` instance to a new deadline. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state. + /// + /// This function can be called both before and after the future has + /// completed. + /// + /// To call this method, you will usually combine the call with + /// [`Pin::as_mut`], which lets you call the method without consuming the + /// `Sleep` itself. + /// + /// # Example + /// + /// ``` + /// use tokio::time::{Duration, Instant}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let sleep = tokio::time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20)); + /// # } + /// ``` + /// + /// See also the top-level examples. + /// + /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.reset_inner(deadline) + } + + fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { + let me = self.project(); + me.entry.reset(deadline); + (*me.inner).deadline = deadline; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + { + let _resource_enter = me.inner.ctx.resource_span.enter(); + me.inner.ctx.async_op_span = + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); + let _async_op_enter = me.inner.ctx.async_op_span.enter(); + + me.inner.ctx.async_op_poll_span = + tracing::trace_span!("runtime.resource.async_op.poll"); + + let duration = { + let now = me.inner.time_source.now(); + let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); + deadline_tick.checked_sub(now).unwrap_or(0) + }; + + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + } + } + + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + let me = self.project(); + + // Keep track of task budget + #[cfg(all(tokio_unstable, feature = "tracing"))] + let coop = ready!(trace_poll_op!( + "poll_elapsed", + crate::coop::poll_proceed(cx), + )); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let coop = ready!(crate::coop::poll_proceed(cx)); + + let result = me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace_poll_op!("poll_elapsed", result); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return result; + } +} + +impl Future for Sleep { + type Output = (); + + // `poll_elapsed` can return an error in two cases: + // + // - AtCapacity: this is a pathological case where far too many + // sleep instances have been scheduled. + // - Shutdown: No timer has been setup, which is a mis-use error. + // + // Both cases are extremely rare, and pretty accurately fit into + // "logic errors", so we just panic in this case. A user couldn't + // really do much better if we passed the error onwards. + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _res_span = self.inner.ctx.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.inner.ctx.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered(); + match ready!(self.as_mut().poll_elapsed(cx)) { + Ok(()) => Poll::Ready(()), + Err(e) => panic!("timer error: {}", e), + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/tests/mod.rs b/third_party/rust/tokio/src/time/driver/tests/mod.rs new file mode 100644 index 0000000000..3ac8c75643 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/tests/mod.rs @@ -0,0 +1,301 @@ +use std::{task::Context, time::Duration}; + +#[cfg(not(loom))] +use futures::task::noop_waker_ref; + +use crate::loom::sync::Arc; +use crate::loom::thread; +use crate::{ + loom::sync::atomic::{AtomicBool, Ordering}, + park::Unpark, +}; + +use super::{Handle, TimerEntry}; + +struct MockUnpark {} +impl Unpark for MockUnpark { + fn unpark(&self) {} +} +impl MockUnpark { + fn mock() -> Box<dyn Unpark> { + Box::new(Self {}) + } +} + +fn block_on<T>(f: impl std::future::Future<Output = T>) -> T { + #[cfg(loom)] + return loom::future::block_on(f); + + #[cfg(not(loom))] + { + let rt = crate::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(f) + } +} + +fn model(f: impl Fn() + Send + Sync + 'static) { + #[cfg(loom)] + loom::model(f); + + #[cfg(not(loom))] + f(); +} + +#[test] +fn single_timer() { + model(|| { + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + }); + + thread::yield_now(); + + // This may or may not return Some (depending on how it races with the + // thread). If it does return None, however, the timer should complete + // synchronously. + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn drop_timer() { + model(|| { + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + }); + + thread::yield_now(); + + // advance 2s in the future. + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn change_waker() { + model(|| { + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + }); + + thread::yield_now(); + + // advance 2s + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn reset_future() { + model(|| { + let finished_early = Arc::new(AtomicBool::new(false)); + + let clock = crate::time::clock::Clock::new(true, false); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let handle_ = handle.clone(); + let finished_early_ = finished_early.clone(); + let start = clock.now(); + + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + + entry.as_mut().reset(start + Duration::from_secs(2)); + + // shouldn't complete before 2s + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + + finished_early_.store(true, Ordering::Relaxed); + }); + + thread::yield_now(); + + // This may or may not return a wakeup time. + handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500))); + + assert!(!finished_early.load(Ordering::Relaxed)); + + handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500))); + + jh.join().unwrap(); + + assert!(finished_early.load(Ordering::Relaxed)); + }) +} + +#[cfg(not(loom))] +fn normal_or_miri<T>(normal: T, miri: T) -> T { + if cfg!(miri) { + miri + } else { + normal + } +} + +#[test] +#[cfg(not(loom))] +fn poll_process_levels() { + let clock = crate::time::clock::Clock::new(true, false); + clock.pause(); + + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source, MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let mut entries = vec![]; + + for i in 0..normal_or_miri(1024, 64) { + let mut entry = Box::pin(TimerEntry::new( + &handle, + clock.now() + Duration::from_millis(i), + )); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(noop_waker_ref())); + + entries.push(entry); + } + + for t in 1..normal_or_miri(1024, 64) { + handle.process_at_time(t as u64); + for (deadline, future) in entries.iter_mut().enumerate() { + let mut context = Context::from_waker(noop_waker_ref()); + if deadline <= t { + assert!(future.as_mut().poll_elapsed(&mut context).is_ready()); + } else { + assert!(future.as_mut().poll_elapsed(&mut context).is_pending()); + } + } + } +} + +#[test] +#[cfg(not(loom))] +fn poll_process_levels_targeted() { + let mut context = Context::from_waker(noop_waker_ref()); + + let clock = crate::time::clock::Clock::new(true, false); + clock.pause(); + + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source, MockUnpark::mock()); + let handle = Handle::new(Arc::new(inner)); + + let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193)); + pin!(e1); + + handle.process_at_time(62); + assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); + handle.process_at_time(192); + handle.process_at_time(192); +} + +/* +#[test] +fn balanced_incr_and_decr() { + const OPS: usize = 5; + + fn incr(inner: Arc<Inner>) { + for _ in 0..OPS { + inner.increment().expect("increment should not have failed"); + thread::yield_now(); + } + } + + fn decr(inner: Arc<Inner>) { + let mut ops_performed = 0; + while ops_performed < OPS { + if inner.num(Ordering::Relaxed) > 0 { + ops_performed += 1; + inner.decrement(); + } + thread::yield_now(); + } + } + + loom::model(|| { + let unpark = Box::new(MockUnpark); + let instant = Instant::now(); + + let inner = Arc::new(Inner::new(instant, unpark)); + + let incr_inner = inner.clone(); + let decr_inner = inner.clone(); + + let incr_hndle = thread::spawn(move || incr(incr_inner)); + let decr_hndle = thread::spawn(move || decr(decr_inner)); + + incr_hndle.join().expect("should never fail"); + decr_hndle.join().expect("should never fail"); + + assert_eq!(inner.num(Ordering::SeqCst), 0); + }) +} +*/ diff --git a/third_party/rust/tokio/src/time/driver/wheel/level.rs b/third_party/rust/tokio/src/time/driver/wheel/level.rs new file mode 100644 index 0000000000..878754177b --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/level.rs @@ -0,0 +1,276 @@ +use crate::time::driver::TimerHandle; + +use crate::time::driver::{EntryList, TimerShared}; + +use std::{fmt, ptr::NonNull}; + +/// Wheel for a single level in the timer. This wheel contains 64 slots. +pub(crate) struct Level { + level: usize, + + /// Bit field tracking which slots currently contain entries. + /// + /// Using a bit field to track slots that contain entries allows avoiding a + /// scan to find entries. This field is updated when entries are added or + /// removed from a slot. + /// + /// The least-significant bit represents slot zero. + occupied: u64, + + /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell. + slot: [EntryList; LEVEL_MULT], +} + +/// Indicates when a slot must be processed next. +#[derive(Debug)] +pub(crate) struct Expiration { + /// The level containing the slot. + pub(crate) level: usize, + + /// The slot index. + pub(crate) slot: usize, + + /// The instant at which the slot needs to be processed. + pub(crate) deadline: u64, +} + +/// Level multiplier. +/// +/// Being a power of 2 is very important. +const LEVEL_MULT: usize = 64; + +impl Level { + pub(crate) fn new(level: usize) -> Level { + // A value has to be Copy in order to use syntax like: + // let stack = Stack::default(); + // ... + // slots: [stack; 64], + // + // Alternatively, since Stack is Default one can + // use syntax like: + // let slots: [Stack; 64] = Default::default(); + // + // However, that is only supported for arrays of size + // 32 or fewer. So in our case we have to explicitly + // invoke the constructor for each array element. + let ctor = EntryList::default; + + Level { + level, + occupied: 0, + slot: [ + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ], + } + } + + /// Finds the slot that needs to be processed next and returns the slot and + /// `Instant` at which this slot must be processed. + pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> { + // Use the `occupied` bit field to get the index of the next slot that + // needs to be processed. + let slot = match self.next_occupied_slot(now) { + Some(slot) => slot, + None => return None, + }; + + // From the slot index, calculate the `Instant` at which it needs to be + // processed. This value *must* be in the future with respect to `now`. + + let level_range = level_range(self.level); + let slot_range = slot_range(self.level); + + // Compute the start date of the current level by masking the low bits + // of `now` (`level_range` is a power of 2). + let level_start = now & !(level_range - 1); + let mut deadline = level_start + slot as u64 * slot_range; + + if deadline <= now { + // A timer is in a slot "prior" to the current time. This can occur + // because we do not have an infinite hierarchy of timer levels, and + // eventually a timer scheduled for a very distant time might end up + // being placed in a slot that is beyond the end of all of the + // arrays. + // + // To deal with this, we first limit timers to being scheduled no + // more than MAX_DURATION ticks in the future; that is, they're at + // most one rotation of the top level away. Then, we force timers + // that logically would go into the top+1 level, to instead go into + // the top level's slots. + // + // What this means is that the top level's slots act as a + // pseudo-ring buffer, and we rotate around them indefinitely. If we + // compute a deadline before now, and it's the top level, it + // therefore means we're actually looking at a slot in the future. + debug_assert_eq!(self.level, super::NUM_LEVELS - 1); + + deadline += level_range; + } + + debug_assert!( + deadline >= now, + "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}", + deadline, + now, + self.level, + level_range, + slot_range, + slot, + self.occupied + ); + + Some(Expiration { + level: self.level, + slot, + deadline, + }) + } + + fn next_occupied_slot(&self, now: u64) -> Option<usize> { + if self.occupied == 0 { + return None; + } + + // Get the slot for now using Maths + let now_slot = (now / slot_range(self.level)) as usize; + let occupied = self.occupied.rotate_right(now_slot as u32); + let zeros = occupied.trailing_zeros() as usize; + let slot = (zeros + now_slot) % 64; + + Some(slot) + } + + pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) { + let slot = slot_for(item.cached_when(), self.level); + + self.slot[slot].push_front(item); + + self.occupied |= occupied_bit(slot); + } + + pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) { + let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level); + + unsafe { self.slot[slot].remove(item) }; + if self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + // Unset the bit + self.occupied ^= occupied_bit(slot); + } + } + + pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList { + self.occupied &= !occupied_bit(slot); + + std::mem::take(&mut self.slot[slot]) + } +} + +impl fmt::Debug for Level { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Level") + .field("occupied", &self.occupied) + .finish() + } +} + +fn occupied_bit(slot: usize) -> u64 { + 1 << slot +} + +fn slot_range(level: usize) -> u64 { + LEVEL_MULT.pow(level as u32) as u64 +} + +fn level_range(level: usize) -> u64 { + LEVEL_MULT as u64 * slot_range(level) +} + +/// Converts a duration (milliseconds) and a level to a slot position. +fn slot_for(duration: u64, level: usize) -> usize { + ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_slot_for() { + for pos in 0..64 { + assert_eq!(pos as usize, slot_for(pos, 0)); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!(pos as usize, slot_for(a as u64, level)); + } + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/wheel/mod.rs b/third_party/rust/tokio/src/time/driver/wheel/mod.rs new file mode 100644 index 0000000000..f088f2cfd6 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/mod.rs @@ -0,0 +1,359 @@ +use crate::time::driver::{TimerHandle, TimerShared}; +use crate::time::error::InsertError; + +mod level; +pub(crate) use self::level::Expiration; +use self::level::Level; + +use std::ptr::NonNull; + +use super::EntryList; + +/// Timing wheel implementation. +/// +/// This type provides the hashed timing wheel implementation that backs `Timer` +/// and `DelayQueue`. +/// +/// The structure is generic over `T: Stack`. This allows handling timeout data +/// being stored on the heap or in a slab. In order to support the latter case, +/// the slab must be passed into each function allowing the implementation to +/// lookup timer entries. +/// +/// See `Timer` documentation for some implementation notes. +#[derive(Debug)] +pub(crate) struct Wheel { + /// The number of milliseconds elapsed since the wheel started. + elapsed: u64, + + /// Timer wheel. + /// + /// Levels: + /// + /// * 1 ms slots / 64 ms range + /// * 64 ms slots / ~ 4 sec range + /// * ~ 4 sec slots / ~ 4 min range + /// * ~ 4 min slots / ~ 4 hr range + /// * ~ 4 hr slots / ~ 12 day range + /// * ~ 12 day slots / ~ 2 yr range + levels: Vec<Level>, + + /// Entries queued for firing + pending: EntryList, +} + +/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots +/// each, the timer is able to track time up to 2 years into the future with a +/// precision of 1 millisecond. +const NUM_LEVELS: usize = 6; + +/// The maximum duration of a `Sleep`. +pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; + +impl Wheel { + /// Creates a new timing wheel. + pub(crate) fn new() -> Wheel { + let levels = (0..NUM_LEVELS).map(Level::new).collect(); + + Wheel { + elapsed: 0, + levels, + pending: EntryList::new(), + } + } + + /// Returns the number of milliseconds that have elapsed since the timing + /// wheel's creation. + pub(crate) fn elapsed(&self) -> u64 { + self.elapsed + } + + /// Inserts an entry into the timing wheel. + /// + /// # Arguments + /// + /// * `item`: The item to insert into the wheel. + /// + /// # Return + /// + /// Returns `Ok` when the item is successfully inserted, `Err` otherwise. + /// + /// `Err(Elapsed)` indicates that `when` represents an instant that has + /// already passed. In this case, the caller should fire the timeout + /// immediately. + /// + /// `Err(Invalid)` indicates an invalid `when` argument as been supplied. + /// + /// # Safety + /// + /// This function registers item into an intrusive linked list. The caller + /// must ensure that `item` is pinned and will not be dropped without first + /// being deregistered. + pub(crate) unsafe fn insert( + &mut self, + item: TimerHandle, + ) -> Result<u64, (TimerHandle, InsertError)> { + let when = item.sync_when(); + + if when <= self.elapsed { + return Err((item, InsertError::Elapsed)); + } + + // Get the level at which the entry should be stored + let level = self.level_for(when); + + unsafe { + self.levels[level].add_entry(item); + } + + debug_assert!({ + self.levels[level] + .next_expiration(self.elapsed) + .map(|e| e.deadline >= self.elapsed) + .unwrap_or(true) + }); + + Ok(when) + } + + /// Removes `item` from the timing wheel. + pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) { + unsafe { + let when = item.as_ref().cached_when(); + if when == u64::MAX { + self.pending.remove(item); + } else { + debug_assert!( + self.elapsed <= when, + "elapsed={}; when={}", + self.elapsed, + when + ); + + let level = self.level_for(when); + + self.levels[level].remove_entry(item); + } + } + } + + /// Instant at which to poll. + pub(crate) fn poll_at(&self) -> Option<u64> { + self.next_expiration().map(|expiration| expiration.deadline) + } + + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> { + loop { + if let Some(handle) = self.pending.pop_back() { + return Some(handle); + } + + // under what circumstances is poll.expiration Some vs. None? + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); + + match expiration { + Some(ref expiration) if expiration.deadline > now => return None, + Some(ref expiration) => { + self.process_expiration(expiration); + + self.set_elapsed(expiration.deadline); + } + None => { + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); + break; + } + } + } + + self.pending.pop_back() + } + + /// Returns the instant at which the next timeout expires. + fn next_expiration(&self) -> Option<Expiration> { + if !self.pending.is_empty() { + // Expire immediately as we have things pending firing + return Some(Expiration { + level: 0, + slot: 0, + deadline: self.elapsed, + }); + } + + // Check all levels + for level in 0..NUM_LEVELS { + if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { + // There cannot be any expirations at a higher level that happen + // before this one. + debug_assert!(self.no_expirations_before(level + 1, expiration.deadline)); + + return Some(expiration); + } + } + + None + } + + /// Returns the tick at which this timer wheel next needs to perform some + /// processing, or None if there are no timers registered. + pub(super) fn next_expiration_time(&self) -> Option<u64> { + self.next_expiration().map(|ex| ex.deadline) + } + + /// Used for debug assertions + fn no_expirations_before(&self, start_level: usize, before: u64) -> bool { + let mut res = true; + + for l2 in start_level..NUM_LEVELS { + if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { + if e2.deadline < before { + res = false; + } + } + } + + res + } + + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// queue it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). + pub(crate) fn process_expiration(&mut self, expiration: &Expiration) { + // Note that we need to take _all_ of the entries off the list before + // processing any of them. This is important because it's possible that + // those entries might need to be reinserted into the same slot. + // + // This happens only on the highest level, when an entry is inserted + // more than MAX_DURATION into the future. When this happens, we wrap + // around, and process some entries a multiple of MAX_DURATION before + // they actually need to be dropped down a level. We then reinsert them + // back into the same position; we must make sure we don't then process + // those entries again or we'll end up in an infinite loop. + let mut entries = self.take_entries(expiration); + + while let Some(item) = entries.pop_back() { + if expiration.level == 0 { + debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline); + } + + // Try to expire the entry; this is cheap (doesn't synchronize) if + // the timer is not expired, and updates cached_when. + match unsafe { item.mark_pending(expiration.deadline) } { + Ok(()) => { + // Item was expired + self.pending.push_front(item); + } + Err(expiration_tick) => { + let level = level_for(expiration.deadline, expiration_tick); + unsafe { + self.levels[level].add_entry(item); + } + } + } + } + } + + fn set_elapsed(&mut self, when: u64) { + assert!( + self.elapsed <= when, + "elapsed={:?}; when={:?}", + self.elapsed, + when + ); + + if when > self.elapsed { + self.elapsed = when; + } + } + + /// Obtains the list of entries that need processing for the given expiration. + /// + fn take_entries(&mut self, expiration: &Expiration) -> EntryList { + self.levels[expiration.level].take_slot(expiration.slot) + } + + fn level_for(&self, when: u64) -> usize { + level_for(self.elapsed, when) + } +} + +fn level_for(elapsed: u64, when: u64) -> usize { + const SLOT_MASK: u64 = (1 << 6) - 1; + + // Mask in the trailing bits ignored by the level calculation in order to cap + // the possible leading zeros + let mut masked = elapsed ^ when | SLOT_MASK; + + if masked >= MAX_DURATION { + // Fudge the timer into the top level + masked = MAX_DURATION - 1; + } + + let leading_zeros = masked.leading_zeros() as usize; + let significant = 63 - leading_zeros; + + significant / 6 +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_level_for() { + for pos in 0..64 { + assert_eq!( + 0, + level_for(0, pos), + "level_for({}) -- binary = {:b}", + pos, + pos + ); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + + if pos > level { + let a = a - 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + + if pos < 64 { + let a = a + 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + } + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/wheel/stack.rs b/third_party/rust/tokio/src/time/driver/wheel/stack.rs new file mode 100644 index 0000000000..80651c309e --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/stack.rs @@ -0,0 +1,112 @@ +use super::{Item, OwnedItem}; +use crate::time::driver::Entry; + +use std::ptr; + +/// A doubly linked stack. +#[derive(Debug)] +pub(crate) struct Stack { + head: Option<OwnedItem>, +} + +impl Default for Stack { + fn default() -> Stack { + Stack { head: None } + } +} + +impl Stack { + pub(crate) fn is_empty(&self) -> bool { + self.head.is_none() + } + + pub(crate) fn push(&mut self, entry: OwnedItem) { + // Get a pointer to the entry to for the prev link + let ptr: *const Entry = &*entry as *const _; + + // Remove the old head entry + let old = self.head.take(); + + unsafe { + // Ensure the entry is not already in a stack. + debug_assert!((*entry.next_stack.get()).is_none()); + debug_assert!((*entry.prev_stack.get()).is_null()); + + if let Some(ref entry) = old.as_ref() { + debug_assert!({ + // The head is not already set to the entry + ptr != &***entry as *const _ + }); + + // Set the previous link on the old head + *entry.prev_stack.get() = ptr; + } + + // Set this entry's next pointer + *entry.next_stack.get() = old; + } + + // Update the head pointer + self.head = Some(entry); + } + + /// Pops an item from the stack. + pub(crate) fn pop(&mut self) -> Option<OwnedItem> { + let entry = self.head.take(); + + unsafe { + if let Some(entry) = entry.as_ref() { + self.head = (*entry.next_stack.get()).take(); + + if let Some(entry) = self.head.as_ref() { + *entry.prev_stack.get() = ptr::null(); + } + + *entry.prev_stack.get() = ptr::null(); + } + } + + entry + } + + pub(crate) fn remove(&mut self, entry: &Item) { + unsafe { + // Ensure that the entry is in fact contained by the stack + debug_assert!({ + // This walks the full linked list even if an entry is found. + let mut next = self.head.as_ref(); + let mut contains = false; + + while let Some(n) = next { + if entry as *const _ == &**n as *const _ { + debug_assert!(!contains); + contains = true; + } + + next = (*n.next_stack.get()).as_ref(); + } + + contains + }); + + // Unlink `entry` from the next node + let next = (*entry.next_stack.get()).take(); + + if let Some(next) = next.as_ref() { + (*next.prev_stack.get()) = *entry.prev_stack.get(); + } + + // Unlink `entry` from the prev node + + if let Some(prev) = (*entry.prev_stack.get()).as_ref() { + *prev.next_stack.get() = next; + } else { + // It is the head + self.head = next; + } + + // Unset the prev pointer + *entry.prev_stack.get() = ptr::null(); + } + } +} |