use atomic::AtomicU64; use timer::{HandlePriv, Inner}; use Error; use crossbeam_utils::CachePadded; use futures::task::AtomicTask; use futures::Poll; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use std::u64; /// Internal state shared between a `Delay` instance and the timer. /// /// This struct is used as a node in two intrusive data structures: /// /// * An atomic stack used to signal to the timer thread that the entry state /// has changed. The timer thread will observe the entry on this stack and /// perform any actions as necessary. /// /// * A doubly linked list used **only** by the timer thread. Each slot in the /// timer wheel is a head pointer to the list of entries that must be /// processed during that timer tick. #[derive(Debug)] pub(crate) struct Entry { /// Only accessed from `Registration`. time: CachePadded>, /// Timer internals. Using a weak pointer allows the timer to shutdown /// without all `Delay` instances having completed. /// /// When `None`, the entry has not yet been linked with a timer instance. inner: Option>, /// Tracks the entry state. This value contains the following information: /// /// * The deadline at which the entry must be "fired". /// * A flag indicating if the entry has already been fired. /// * Whether or not the entry transitioned to the error state. /// /// When an `Entry` is created, `state` is initialized to the instant at /// which the entry must be fired. When a timer is reset to a different /// instant, this value is changed. state: AtomicU64, /// Task to notify once the deadline is reached. task: AtomicTask, /// True when the entry is queued in the "process" stack. This value /// is set before pushing the value and unset after popping the value. /// /// TODO: This could possibly be rolled up into `state`. pub(super) queued: AtomicBool, /// Next entry in the "process" linked list. /// /// Access to this field is coordinated by the `queued` flag. /// /// Represents a strong Arc ref. pub(super) next_atomic: UnsafeCell<*mut Entry>, /// When the entry expires, relative to the `start` of the timer /// (Inner::start). This is only used by the timer. /// /// A `Delay` instance can be reset to a different deadline by the thread /// that owns the `Delay` instance. In this case, the timer thread will not /// immediately know that this has happened. The timer thread must know the /// last deadline that it saw as it uses this value to locate the entry in /// its wheel. /// /// Once the timer thread observes that the instant has changed, it updates /// the wheel and sets this value. The idea is that this value eventually /// converges to the value of `state` as the timer thread makes updates. when: UnsafeCell>, /// Next entry in the State's linked list. /// /// This is only accessed by the timer pub(super) next_stack: UnsafeCell>>, /// Previous entry in the State's linked list. /// /// This is only accessed by the timer and is used to unlink a canceled /// entry. /// /// This is a weak reference. pub(super) prev_stack: UnsafeCell<*const Entry>, } /// Stores the info for `Delay`. #[derive(Debug)] pub(crate) struct Time { pub(crate) deadline: Instant, pub(crate) duration: Duration, } /// Flag indicating a timer entry has elapsed const ELAPSED: u64 = 1 << 63; /// Flag indicating a timer entry has reached an error state const ERROR: u64 = u64::MAX; // ===== impl Entry ===== impl Entry { pub fn new(deadline: Instant, duration: Duration) -> Entry { Entry { time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })), inner: None, task: AtomicTask::new(), state: AtomicU64::new(0), queued: AtomicBool::new(false), next_atomic: UnsafeCell::new(ptr::null_mut()), when: UnsafeCell::new(None), next_stack: UnsafeCell::new(None), prev_stack: UnsafeCell::new(ptr::null_mut()), } } /// Only called by `Registration` pub fn time_ref(&self) -> &Time { unsafe { &*self.time.get() } } /// Only called by `Registration` pub fn time_mut(&self) -> &mut Time { unsafe { &mut *self.time.get() } } /// Returns `true` if the `Entry` is currently associated with a timer /// instance. pub fn is_registered(&self) -> bool { self.inner.is_some() } /// Only called by `Registration` pub fn register(me: &mut Arc) { let handle = match HandlePriv::try_current() { Ok(handle) => handle, Err(_) => { // Could not associate the entry with a timer, transition the // state to error Arc::get_mut(me).unwrap().transition_to_error(); return; } }; Entry::register_with(me, handle) } /// Only called by `Registration` pub fn register_with(me: &mut Arc, handle: HandlePriv) { assert!(!me.is_registered(), "only register an entry once"); let deadline = me.time_ref().deadline; let inner = match handle.inner() { Some(inner) => inner, None => { // Could not associate the entry with a timer, transition the // state to error Arc::get_mut(me).unwrap().transition_to_error(); return; } }; // Increment the number of active timeouts if inner.increment().is_err() { Arc::get_mut(me).unwrap().transition_to_error(); return; } // Associate the entry with the timer Arc::get_mut(me).unwrap().inner = Some(handle.into_inner()); let when = inner.normalize_deadline(deadline); // Relaxed OK: At this point, there are no other threads that have // access to this entry. if when <= inner.elapsed() { me.state.store(ELAPSED, Relaxed); return; } else { me.state.store(when, Relaxed); } if inner.queue(me).is_err() { // The timer has shutdown, transition the entry to the error state. me.error(); } } fn transition_to_error(&mut self) { self.inner = Some(Weak::new()); self.state = AtomicU64::new(ERROR); } /// The current entry state as known by the timer. This is not the value of /// `state`, but lets the timer know how to converge its state to `state`. pub fn when_internal(&self) -> Option { unsafe { (*self.when.get()) } } pub fn set_when_internal(&self, when: Option) { unsafe { (*self.when.get()) = when; } } /// Called by `Timer` to load the current value of `state` for processing pub fn load_state(&self) -> Option { let state = self.state.load(SeqCst); if is_elapsed(state) { None } else { Some(state) } } pub fn is_elapsed(&self) -> bool { let state = self.state.load(SeqCst); is_elapsed(state) } pub fn fire(&self, when: u64) { let mut curr = self.state.load(SeqCst); loop { if is_elapsed(curr) || curr > when { return; } let next = ELAPSED | curr; let actual = self.state.compare_and_swap(curr, next, SeqCst); if curr == actual { break; } curr = actual; } self.task.notify(); } pub fn error(&self) { // Only transition to the error state if not currently elapsed let mut curr = self.state.load(SeqCst); loop { if is_elapsed(curr) { return; } let next = ERROR; let actual = self.state.compare_and_swap(curr, next, SeqCst); if curr == actual { break; } curr = actual; } self.task.notify(); } pub fn cancel(entry: &Arc) { let state = entry.state.fetch_or(ELAPSED, SeqCst); if is_elapsed(state) { // Nothing more to do return; } // If registered with a timer instance, try to upgrade the Arc. let inner = match entry.upgrade_inner() { Some(inner) => inner, None => return, }; let _ = inner.queue(entry); } pub fn poll_elapsed(&self) -> Poll<(), Error> { use futures::Async::NotReady; let mut curr = self.state.load(SeqCst); if is_elapsed(curr) { if curr == ERROR { return Err(Error::shutdown()); } else { return Ok(().into()); } } self.task.register(); curr = self.state.load(SeqCst).into(); if is_elapsed(curr) { if curr == ERROR { return Err(Error::shutdown()); } else { return Ok(().into()); } } Ok(NotReady) } /// Only called by `Registration` pub fn reset(entry: &mut Arc) { if !entry.is_registered() { return; } let inner = match entry.upgrade_inner() { Some(inner) => inner, None => return, }; let deadline = entry.time_ref().deadline; let when = inner.normalize_deadline(deadline); let elapsed = inner.elapsed(); let mut curr = entry.state.load(SeqCst); let mut notify; loop { // In these two cases, there is no work to do when resetting the // timer. If the `Entry` is in an error state, then it cannot be // used anymore. If resetting the entry to the current value, then // the reset is a noop. if curr == ERROR || curr == when { return; } let next; if when <= elapsed { next = ELAPSED; notify = !is_elapsed(curr); } else { next = when; notify = true; } let actual = entry.state.compare_and_swap(curr, next, SeqCst); if curr == actual { break; } curr = actual; } if notify { let _ = inner.queue(entry); } } fn upgrade_inner(&self) -> Option> { self.inner.as_ref().and_then(|inner| inner.upgrade()) } } fn is_elapsed(state: u64) -> bool { state & ELAPSED == ELAPSED } impl Drop for Entry { fn drop(&mut self) { let inner = match self.upgrade_inner() { Some(inner) => inner, None => return, }; inner.decrement(); } } unsafe impl Send for Entry {} unsafe impl Sync for Entry {}