use crate::loom::sync::atomic::AtomicU64;
use crate::sync::AtomicWaker;
use crate::time::driver::{Handle, Inner};
use crate::time::{Duration, Error, Instant};

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::{self, Poll};
use std::u64;

/// Internal state shared between a `Delay` instance and the timer.
///
/// This struct is used as a node in two intrusive data structures:
///
/// * An atomic stack used to signal to the timer thread that the entry state
///   has changed. The timer thread will observe the entry on this stack and
///   perform any actions as necessary.
///
/// * A doubly linked list used **only** by the timer thread. Each slot in the
///   timer wheel is a head pointer to the list of entries that must be
///   processed during that timer tick.
#[derive(Debug)]
pub(crate) struct Entry {
    /// Deadline and duration of the delay.
    ///
    /// Only accessed from `Registration`.
    time: CachePadded<UnsafeCell<Time>>,

    /// Timer internals. Using a weak pointer allows the timer to shutdown
    /// without all `Delay` instances having completed.
    ///
    /// When the entry could not be registered with a timer instance, this is
    /// an empty `Weak` (created with `Weak::new()`) that never upgrades.
    inner: Weak<Inner>,

    /// Tracks the entry state. This value contains the following information:
    ///
    /// * The deadline at which the entry must be "fired".
    /// * A flag indicating if the entry has already been fired.
    /// * Whether or not the entry transitioned to the error state.
    ///
    /// When an `Entry` is created, `state` is initialized to the instant at
    /// which the entry must be fired. When a timer is reset to a different
    /// instant, this value is changed.
    state: AtomicU64,

    /// Task to notify once the deadline is reached.
    waker: AtomicWaker,

    /// True when the entry is queued in the "process" stack. This value
    /// is set before pushing the value and unset after popping the value.
    ///
    /// TODO: This could possibly be rolled up into `state`.
    pub(super) queued: AtomicBool,

    /// Next entry in the "process" linked list.
    ///
    /// Access to this field is coordinated by the `queued` flag.
    ///
    /// Represents a strong Arc ref.
    pub(super) next_atomic: UnsafeCell<*mut Entry>,

    /// When the entry expires, relative to the `start` of the timer
    /// (Inner::start). This is only used by the timer.
    ///
    /// A `Delay` instance can be reset to a different deadline by the thread
    /// that owns the `Delay` instance. In this case, the timer thread will not
    /// immediately know that this has happened. The timer thread must know the
    /// last deadline that it saw as it uses this value to locate the entry in
    /// its wheel.
    ///
    /// Once the timer thread observes that the instant has changed, it updates
    /// the wheel and sets this value. The idea is that this value eventually
    /// converges to the value of `state` as the timer thread makes updates.
    when: UnsafeCell<Option<u64>>,

    /// Next entry in the State's linked list.
    ///
    /// This is only accessed by the timer.
    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,

    /// Previous entry in the State's linked list.
    ///
    /// This is only accessed by the timer and is used to unlink a canceled
    /// entry.
    ///
    /// This is a weak reference.
    pub(super) prev_stack: UnsafeCell<*const Entry>,
}

/// Stores the info for `Delay`.
#[derive(Debug)] pub(crate) struct Time { pub(crate) deadline: Instant, pub(crate) duration: Duration, } /// Flag indicating a timer entry has elapsed const ELAPSED: u64 = 1 << 63; /// Flag indicating a timer entry has reached an error state const ERROR: u64 = u64::MAX; // ===== impl Entry ===== impl Entry { pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc { let inner = handle.inner().unwrap(); let entry: Entry; // Increment the number of active timeouts if inner.increment().is_err() { entry = Entry::new2(deadline, duration, Weak::new(), ERROR) } else { let when = inner.normalize_deadline(deadline); let state = if when <= inner.elapsed() { ELAPSED } else { when }; entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state); } let entry = Arc::new(entry); if inner.queue(&entry).is_err() { entry.error(); } entry } /// Only called by `Registration` pub(crate) fn time_ref(&self) -> &Time { unsafe { &*self.time.0.get() } } /// Only called by `Registration` #[allow(clippy::mut_from_ref)] // https://github.com/rust-lang/rust-clippy/issues/4281 pub(crate) unsafe fn time_mut(&self) -> &mut Time { &mut *self.time.0.get() } /// The current entry state as known by the timer. This is not the value of /// `state`, but lets the timer know how to converge its state to `state`. 
pub(crate) fn when_internal(&self) -> Option { unsafe { *self.when.get() } } pub(crate) fn set_when_internal(&self, when: Option) { unsafe { *self.when.get() = when; } } /// Called by `Timer` to load the current value of `state` for processing pub(crate) fn load_state(&self) -> Option { let state = self.state.load(SeqCst); if is_elapsed(state) { None } else { Some(state) } } pub(crate) fn is_elapsed(&self) -> bool { let state = self.state.load(SeqCst); is_elapsed(state) } pub(crate) fn fire(&self, when: u64) { let mut curr = self.state.load(SeqCst); loop { if is_elapsed(curr) || curr > when { return; } let next = ELAPSED | curr; let actual = self.state.compare_and_swap(curr, next, SeqCst); if curr == actual { break; } curr = actual; } self.waker.wake(); } pub(crate) fn error(&self) { // Only transition to the error state if not currently elapsed let mut curr = self.state.load(SeqCst); loop { if is_elapsed(curr) { return; } let next = ERROR; let actual = self.state.compare_and_swap(curr, next, SeqCst); if curr == actual { break; } curr = actual; } self.waker.wake(); } pub(crate) fn cancel(entry: &Arc) { let state = entry.state.fetch_or(ELAPSED, SeqCst); if is_elapsed(state) { // Nothing more to do return; } // If registered with a timer instance, try to upgrade the Arc. 
let inner = match entry.upgrade_inner() { Some(inner) => inner, None => return, }; let _ = inner.queue(entry); } pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { let mut curr = self.state.load(SeqCst); if is_elapsed(curr) { return Poll::Ready(if curr == ERROR { Err(Error::shutdown()) } else { Ok(()) }); } self.waker.register_by_ref(cx.waker()); curr = self.state.load(SeqCst); if is_elapsed(curr) { return Poll::Ready(if curr == ERROR { Err(Error::shutdown()) } else { Ok(()) }); } Poll::Pending } /// Only called by `Registration` pub(crate) fn reset(entry: &mut Arc) { let inner = match entry.upgrade_inner() { Some(inner) => inner, None => return, }; let deadline = entry.time_ref().deadline; let when = inner.normalize_deadline(deadline); let elapsed = inner.elapsed(); let mut curr = entry.state.load(SeqCst); let mut notify; loop { // In these two cases, there is no work to do when resetting the // timer. If the `Entry` is in an error state, then it cannot be // used anymore. If resetting the entry to the current value, then // the reset is a noop. 
if curr == ERROR || curr == when { return; } let next; if when <= elapsed { next = ELAPSED; notify = !is_elapsed(curr); } else { next = when; notify = true; } let actual = entry.state.compare_and_swap(curr, next, SeqCst); if curr == actual { break; } curr = actual; } if notify { let _ = inner.queue(entry); } } fn new2(deadline: Instant, duration: Duration, inner: Weak, state: u64) -> Self { Self { time: CachePadded(UnsafeCell::new(Time { deadline, duration })), inner, waker: AtomicWaker::new(), state: AtomicU64::new(state), queued: AtomicBool::new(false), next_atomic: UnsafeCell::new(ptr::null_mut()), when: UnsafeCell::new(None), next_stack: UnsafeCell::new(None), prev_stack: UnsafeCell::new(ptr::null_mut()), } } fn upgrade_inner(&self) -> Option> { self.inner.upgrade() } } fn is_elapsed(state: u64) -> bool { state & ELAPSED == ELAPSED } impl Drop for Entry { fn drop(&mut self) { let inner = match self.upgrade_inner() { Some(inner) => inner, None => return, }; inner.decrement(); } } unsafe impl Send for Entry {} unsafe impl Sync for Entry {} #[cfg_attr(target_arch = "x86_64", repr(align(128)))] #[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] #[derive(Debug)] struct CachePadded(T);