diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-timer/src/timer/entry.rs | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-timer/src/timer/entry.rs')
-rw-r--r-- | third_party/rust/tokio-timer/src/timer/entry.rs | 394 |
1 files changed, 394 insertions, 0 deletions
diff --git a/third_party/rust/tokio-timer/src/timer/entry.rs b/third_party/rust/tokio-timer/src/timer/entry.rs new file mode 100644 index 0000000000..40979afaec --- /dev/null +++ b/third_party/rust/tokio-timer/src/timer/entry.rs @@ -0,0 +1,394 @@ +use atomic::AtomicU64; +use timer::{HandlePriv, Inner}; +use Error; + +use crossbeam_utils::CachePadded; +use futures::task::AtomicTask; +use futures::Poll; + +use std::cell::UnsafeCell; +use std::ptr; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; +use std::u64; + +/// Internal state shared between a `Delay` instance and the timer. +/// +/// This struct is used as a node in two intrusive data structures: +/// +/// * An atomic stack used to signal to the timer thread that the entry state +/// has changed. The timer thread will observe the entry on this stack and +/// perform any actions as necessary. +/// +/// * A doubly linked list used **only** by the timer thread. Each slot in the +/// timer wheel is a head pointer to the list of entries that must be +/// processed during that timer tick. +#[derive(Debug)] +pub(crate) struct Entry { + /// Only accessed from `Registration`. + time: CachePadded<UnsafeCell<Time>>, + + /// Timer internals. Using a weak pointer allows the timer to shutdown + /// without all `Delay` instances having completed. + /// + /// When `None`, the entry has not yet been linked with a timer instance. + inner: Option<Weak<Inner>>, + + /// Tracks the entry state. This value contains the following information: + /// + /// * The deadline at which the entry must be "fired". + /// * A flag indicating if the entry has already been fired. + /// * Whether or not the entry transitioned to the error state. + /// + /// When an `Entry` is created, `state` is initialized to the instant at + /// which the entry must be fired. When a timer is reset to a different + /// instant, this value is changed. + state: AtomicU64, + + /// Task to notify once the deadline is reached. + task: AtomicTask, + + /// True when the entry is queued in the "process" stack. This value + /// is set before pushing the value and unset after popping the value. + /// + /// TODO: This could possibly be rolled up into `state`. + pub(super) queued: AtomicBool, + + /// Next entry in the "process" linked list. + /// + /// Access to this field is coordinated by the `queued` flag. + /// + /// Represents a strong Arc ref. + pub(super) next_atomic: UnsafeCell<*mut Entry>, + + /// When the entry expires, relative to the `start` of the timer + /// (Inner::start). This is only used by the timer. + /// + /// A `Delay` instance can be reset to a different deadline by the thread + /// that owns the `Delay` instance. In this case, the timer thread will not + /// immediately know that this has happened. The timer thread must know the + /// last deadline that it saw as it uses this value to locate the entry in + /// its wheel. + /// + /// Once the timer thread observes that the instant has changed, it updates + /// the wheel and sets this value. The idea is that this value eventually + /// converges to the value of `state` as the timer thread makes updates. + when: UnsafeCell<Option<u64>>, + + /// Next entry in the State's linked list. + /// + /// This is only accessed by the timer + pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>, + + /// Previous entry in the State's linked list. + /// + /// This is only accessed by the timer and is used to unlink a canceled + /// entry. + /// + /// This is a weak reference. + pub(super) prev_stack: UnsafeCell<*const Entry>, +} + +/// Stores the info for `Delay`. +#[derive(Debug)] +pub(crate) struct Time { + pub(crate) deadline: Instant, + pub(crate) duration: Duration, +} + +/// Flag indicating a timer entry has elapsed +const ELAPSED: u64 = 1 << 63; + +/// Flag indicating a timer entry has reached an error state +const ERROR: u64 = u64::MAX; + +// ===== impl Entry ===== + +impl Entry { + pub fn new(deadline: Instant, duration: Duration) -> Entry { + Entry { + time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })), + inner: None, + task: AtomicTask::new(), + state: AtomicU64::new(0), + queued: AtomicBool::new(false), + next_atomic: UnsafeCell::new(ptr::null_mut()), + when: UnsafeCell::new(None), + next_stack: UnsafeCell::new(None), + prev_stack: UnsafeCell::new(ptr::null_mut()), + } + } + + /// Only called by `Registration` + pub fn time_ref(&self) -> &Time { + unsafe { &*self.time.get() } + } + + /// Only called by `Registration` + pub fn time_mut(&self) -> &mut Time { + unsafe { &mut *self.time.get() } + } + + /// Returns `true` if the `Entry` is currently associated with a timer + /// instance. + pub fn is_registered(&self) -> bool { + self.inner.is_some() + } + + /// Only called by `Registration` + pub fn register(me: &mut Arc<Self>) { + let handle = match HandlePriv::try_current() { + Ok(handle) => handle, + Err(_) => { + // Could not associate the entry with a timer, transition the + // state to error + Arc::get_mut(me).unwrap().transition_to_error(); + + return; + } + }; + + Entry::register_with(me, handle) + } + + /// Only called by `Registration` + pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) { + assert!(!me.is_registered(), "only register an entry once"); + + let deadline = me.time_ref().deadline; + + let inner = match handle.inner() { + Some(inner) => inner, + None => { + // Could not associate the entry with a timer, transition the + // state to error + Arc::get_mut(me).unwrap().transition_to_error(); + + return; + } + }; + + // Increment the number of active timeouts + if inner.increment().is_err() { + Arc::get_mut(me).unwrap().transition_to_error(); + + return; + } + + // Associate the entry with the timer + Arc::get_mut(me).unwrap().inner = Some(handle.into_inner()); + + let when = inner.normalize_deadline(deadline); + + // Relaxed OK: At this point, there are no other threads that have + // access to this entry. + if when <= inner.elapsed() { + me.state.store(ELAPSED, Relaxed); + return; + } else { + me.state.store(when, Relaxed); + } + + if inner.queue(me).is_err() { + // The timer has shutdown, transition the entry to the error state. + me.error(); + } + } + + fn transition_to_error(&mut self) { + self.inner = Some(Weak::new()); + self.state = AtomicU64::new(ERROR); + } + + /// The current entry state as known by the timer. This is not the value of + /// `state`, but lets the timer know how to converge its state to `state`. + pub fn when_internal(&self) -> Option<u64> { + unsafe { (*self.when.get()) } + } + + pub fn set_when_internal(&self, when: Option<u64>) { + unsafe { + (*self.when.get()) = when; + } + } + + /// Called by `Timer` to load the current value of `state` for processing + pub fn load_state(&self) -> Option<u64> { + let state = self.state.load(SeqCst); + + if is_elapsed(state) { + None + } else { + Some(state) + } + } + + pub fn is_elapsed(&self) -> bool { + let state = self.state.load(SeqCst); + is_elapsed(state) + } + + pub fn fire(&self, when: u64) { + let mut curr = self.state.load(SeqCst); + + loop { + if is_elapsed(curr) || curr > when { + return; + } + + let next = ELAPSED | curr; + let actual = self.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + self.task.notify(); + } + + pub fn error(&self) { + // Only transition to the error state if not currently elapsed + let mut curr = self.state.load(SeqCst); + + loop { + if is_elapsed(curr) { + return; + } + + let next = ERROR; + + let actual = self.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + self.task.notify(); + } + + pub fn cancel(entry: &Arc<Entry>) { + let state = entry.state.fetch_or(ELAPSED, SeqCst); + + if is_elapsed(state) { + // Nothing more to do + return; + } + + // If registered with a timer instance, try to upgrade the Arc. + let inner = match entry.upgrade_inner() { + Some(inner) => inner, + None => return, + }; + + let _ = inner.queue(entry); + } + + pub fn poll_elapsed(&self) -> Poll<(), Error> { + use futures::Async::NotReady; + + let mut curr = self.state.load(SeqCst); + + if is_elapsed(curr) { + if curr == ERROR { + return Err(Error::shutdown()); + } else { + return Ok(().into()); + } + } + + self.task.register(); + + curr = self.state.load(SeqCst).into(); + + if is_elapsed(curr) { + if curr == ERROR { + return Err(Error::shutdown()); + } else { + return Ok(().into()); + } + } + + Ok(NotReady) + } + + /// Only called by `Registration` + pub fn reset(entry: &mut Arc<Entry>) { + if !entry.is_registered() { + return; + } + + let inner = match entry.upgrade_inner() { + Some(inner) => inner, + None => return, + }; + + let deadline = entry.time_ref().deadline; + let when = inner.normalize_deadline(deadline); + let elapsed = inner.elapsed(); + + let mut curr = entry.state.load(SeqCst); + let mut notify; + + loop { + // In these two cases, there is no work to do when resetting the + // timer. If the `Entry` is in an error state, then it cannot be + // used anymore. If resetting the entry to the current value, then + // the reset is a noop. + if curr == ERROR || curr == when { + return; + } + + let next; + + if when <= elapsed { + next = ELAPSED; + notify = !is_elapsed(curr); + } else { + next = when; + notify = true; + } + + let actual = entry.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + if notify { + let _ = inner.queue(entry); + } + } + + fn upgrade_inner(&self) -> Option<Arc<Inner>> { + self.inner.as_ref().and_then(|inner| inner.upgrade()) + } +} + +fn is_elapsed(state: u64) -> bool { + state & ELAPSED == ELAPSED +} + +impl Drop for Entry { + fn drop(&mut self) { + let inner = match self.upgrade_inner() { + Some(inner) => inner, + None => return, + }; + + inner.decrement(); + } +} + +unsafe impl Send for Entry {} +unsafe impl Sync for Entry {} |