summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/time/driver/wheel
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio/src/time/driver/wheel
parentInitial commit. (diff)
downloadfirefox-esr-upstream.tar.xz
firefox-esr-upstream.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/time/driver/wheel')
-rw-r--r--third_party/rust/tokio/src/time/driver/wheel/level.rs276
-rw-r--r--third_party/rust/tokio/src/time/driver/wheel/mod.rs359
-rw-r--r--third_party/rust/tokio/src/time/driver/wheel/stack.rs112
3 files changed, 747 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/driver/wheel/level.rs b/third_party/rust/tokio/src/time/driver/wheel/level.rs
new file mode 100644
index 0000000000..878754177b
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/level.rs
@@ -0,0 +1,276 @@
+use crate::time::driver::TimerHandle;
+
+use crate::time::driver::{EntryList, TimerShared};
+
+use std::{fmt, ptr::NonNull};
+
+/// Wheel for a single level in the timer. This wheel contains 64 slots.
+pub(crate) struct Level {
+ level: usize,
+
+ /// Bit field tracking which slots currently contain entries.
+ ///
+ /// Using a bit field to track slots that contain entries allows avoiding a
+ /// scan to find entries. This field is updated when entries are added or
+ /// removed from a slot.
+ ///
+ /// The least-significant bit represents slot zero.
+ occupied: u64,
+
+ /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell.
+ slot: [EntryList; LEVEL_MULT],
+}
+
+/// Indicates when a slot must be processed next.
+#[derive(Debug)]
+pub(crate) struct Expiration {
+ /// The level containing the slot.
+ pub(crate) level: usize,
+
+ /// The slot index.
+ pub(crate) slot: usize,
+
+ /// The instant at which the slot needs to be processed.
+ pub(crate) deadline: u64,
+}
+
+/// Level multiplier.
+///
+/// Being a power of 2 is very important.
+const LEVEL_MULT: usize = 64;
+
+impl Level {
+ pub(crate) fn new(level: usize) -> Level {
+ // A value has to be Copy in order to use syntax like:
+ // let stack = Stack::default();
+ // ...
+ // slots: [stack; 64],
+ //
+ // Alternatively, since Stack is Default one can
+ // use syntax like:
+ // let slots: [Stack; 64] = Default::default();
+ //
+ // However, that is only supported for arrays of size
+ // 32 or fewer. So in our case we have to explicitly
+ // invoke the constructor for each array element.
+ let ctor = EntryList::default;
+
+ Level {
+ level,
+ occupied: 0,
+ slot: [
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ],
+ }
+ }
+
+ /// Finds the slot that needs to be processed next and returns the slot and
+ /// `Instant` at which this slot must be processed.
+ pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
+ // Use the `occupied` bit field to get the index of the next slot that
+ // needs to be processed.
+ let slot = match self.next_occupied_slot(now) {
+ Some(slot) => slot,
+ None => return None,
+ };
+
+ // From the slot index, calculate the `Instant` at which it needs to be
+ // processed. This value *must* be in the future with respect to `now`.
+
+ let level_range = level_range(self.level);
+ let slot_range = slot_range(self.level);
+
+ // Compute the start date of the current level by masking the low bits
+ // of `now` (`level_range` is a power of 2).
+ let level_start = now & !(level_range - 1);
+ let mut deadline = level_start + slot as u64 * slot_range;
+
+ if deadline <= now {
+ // A timer is in a slot "prior" to the current time. This can occur
+ // because we do not have an infinite hierarchy of timer levels, and
+ // eventually a timer scheduled for a very distant time might end up
+ // being placed in a slot that is beyond the end of all of the
+ // arrays.
+ //
+ // To deal with this, we first limit timers to being scheduled no
+ // more than MAX_DURATION ticks in the future; that is, they're at
+ // most one rotation of the top level away. Then, we force timers
+ // that logically would go into the top+1 level, to instead go into
+ // the top level's slots.
+ //
+ // What this means is that the top level's slots act as a
+ // pseudo-ring buffer, and we rotate around them indefinitely. If we
+ // compute a deadline before now, and it's the top level, it
+ // therefore means we're actually looking at a slot in the future.
+ debug_assert_eq!(self.level, super::NUM_LEVELS - 1);
+
+ deadline += level_range;
+ }
+
+ debug_assert!(
+ deadline >= now,
+ "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}",
+ deadline,
+ now,
+ self.level,
+ level_range,
+ slot_range,
+ slot,
+ self.occupied
+ );
+
+ Some(Expiration {
+ level: self.level,
+ slot,
+ deadline,
+ })
+ }
+
+ fn next_occupied_slot(&self, now: u64) -> Option<usize> {
+ if self.occupied == 0 {
+ return None;
+ }
+
+ // Get the slot for now using Maths
+ let now_slot = (now / slot_range(self.level)) as usize;
+ let occupied = self.occupied.rotate_right(now_slot as u32);
+ let zeros = occupied.trailing_zeros() as usize;
+ let slot = (zeros + now_slot) % 64;
+
+ Some(slot)
+ }
+
+ pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
+ let slot = slot_for(item.cached_when(), self.level);
+
+ self.slot[slot].push_front(item);
+
+ self.occupied |= occupied_bit(slot);
+ }
+
+ pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) {
+ let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level);
+
+ unsafe { self.slot[slot].remove(item) };
+ if self.slot[slot].is_empty() {
+ // The bit is currently set
+ debug_assert!(self.occupied & occupied_bit(slot) != 0);
+
+ // Unset the bit
+ self.occupied ^= occupied_bit(slot);
+ }
+ }
+
+ pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList {
+ self.occupied &= !occupied_bit(slot);
+
+ std::mem::take(&mut self.slot[slot])
+ }
+}
+
+impl fmt::Debug for Level {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Level")
+ .field("occupied", &self.occupied)
+ .finish()
+ }
+}
+
+fn occupied_bit(slot: usize) -> u64 {
+ 1 << slot
+}
+
+fn slot_range(level: usize) -> u64 {
+ LEVEL_MULT.pow(level as u32) as u64
+}
+
+fn level_range(level: usize) -> u64 {
+ LEVEL_MULT as u64 * slot_range(level)
+}
+
+/// Converts a duration (milliseconds) and a level to a slot position.
+fn slot_for(duration: u64, level: usize) -> usize {
+ ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_slot_for() {
+ for pos in 0..64 {
+ assert_eq!(pos as usize, slot_for(pos, 0));
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(pos as usize, slot_for(a as u64, level));
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/wheel/mod.rs b/third_party/rust/tokio/src/time/driver/wheel/mod.rs
new file mode 100644
index 0000000000..f088f2cfd6
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/mod.rs
@@ -0,0 +1,359 @@
+use crate::time::driver::{TimerHandle, TimerShared};
+use crate::time::error::InsertError;
+
+mod level;
+pub(crate) use self::level::Expiration;
+use self::level::Level;
+
+use std::ptr::NonNull;
+
+use super::EntryList;
+
+/// Timing wheel implementation.
+///
+/// This type provides the hashed timing wheel implementation that backs `Timer`
+/// and `DelayQueue`.
+///
+/// The structure is generic over `T: Stack`. This allows handling timeout data
+/// being stored on the heap or in a slab. In order to support the latter case,
+/// the slab must be passed into each function allowing the implementation to
+/// lookup timer entries.
+///
+/// See `Timer` documentation for some implementation notes.
+#[derive(Debug)]
+pub(crate) struct Wheel {
+ /// The number of milliseconds elapsed since the wheel started.
+ elapsed: u64,
+
+ /// Timer wheel.
+ ///
+ /// Levels:
+ ///
+ /// * 1 ms slots / 64 ms range
+ /// * 64 ms slots / ~ 4 sec range
+ /// * ~ 4 sec slots / ~ 4 min range
+ /// * ~ 4 min slots / ~ 4 hr range
+ /// * ~ 4 hr slots / ~ 12 day range
+ /// * ~ 12 day slots / ~ 2 yr range
+ levels: Vec<Level>,
+
+ /// Entries queued for firing
+ pending: EntryList,
+}
+
+/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
+/// each, the timer is able to track time up to 2 years into the future with a
+/// precision of 1 millisecond.
+const NUM_LEVELS: usize = 6;
+
+/// The maximum duration of a `Sleep`.
+pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
+
+impl Wheel {
+ /// Creates a new timing wheel.
+ pub(crate) fn new() -> Wheel {
+ let levels = (0..NUM_LEVELS).map(Level::new).collect();
+
+ Wheel {
+ elapsed: 0,
+ levels,
+ pending: EntryList::new(),
+ }
+ }
+
+ /// Returns the number of milliseconds that have elapsed since the timing
+ /// wheel's creation.
+ pub(crate) fn elapsed(&self) -> u64 {
+ self.elapsed
+ }
+
+ /// Inserts an entry into the timing wheel.
+ ///
+ /// # Arguments
+ ///
+ /// * `item`: The item to insert into the wheel.
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
+ ///
+ /// `Err(Elapsed)` indicates that `when` represents an instant that has
+ /// already passed. In this case, the caller should fire the timeout
+ /// immediately.
+ ///
+ /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
+ ///
+ /// # Safety
+ ///
+ /// This function registers item into an intrusive linked list. The caller
+ /// must ensure that `item` is pinned and will not be dropped without first
+ /// being deregistered.
+ pub(crate) unsafe fn insert(
+ &mut self,
+ item: TimerHandle,
+ ) -> Result<u64, (TimerHandle, InsertError)> {
+ let when = item.sync_when();
+
+ if when <= self.elapsed {
+ return Err((item, InsertError::Elapsed));
+ }
+
+ // Get the level at which the entry should be stored
+ let level = self.level_for(when);
+
+ unsafe {
+ self.levels[level].add_entry(item);
+ }
+
+ debug_assert!({
+ self.levels[level]
+ .next_expiration(self.elapsed)
+ .map(|e| e.deadline >= self.elapsed)
+ .unwrap_or(true)
+ });
+
+ Ok(when)
+ }
+
+ /// Removes `item` from the timing wheel.
+ pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
+ unsafe {
+ let when = item.as_ref().cached_when();
+ if when == u64::MAX {
+ self.pending.remove(item);
+ } else {
+ debug_assert!(
+ self.elapsed <= when,
+ "elapsed={}; when={}",
+ self.elapsed,
+ when
+ );
+
+ let level = self.level_for(when);
+
+ self.levels[level].remove_entry(item);
+ }
+ }
+ }
+
+ /// Instant at which to poll.
+ pub(crate) fn poll_at(&self) -> Option<u64> {
+ self.next_expiration().map(|expiration| expiration.deadline)
+ }
+
+ /// Advances the timer up to the instant represented by `now`.
+ pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
+ loop {
+ if let Some(handle) = self.pending.pop_back() {
+ return Some(handle);
+ }
+
+ // under what circumstances is poll.expiration Some vs. None?
+ let expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
+
+ match expiration {
+ Some(ref expiration) if expiration.deadline > now => return None,
+ Some(ref expiration) => {
+ self.process_expiration(expiration);
+
+ self.set_elapsed(expiration.deadline);
+ }
+ None => {
+ // in this case the poll did not indicate an expiration
+ // _and_ we were not able to find a next expiration in
+ // the current list of timers. advance to the poll's
+ // current time and do nothing else.
+ self.set_elapsed(now);
+ break;
+ }
+ }
+ }
+
+ self.pending.pop_back()
+ }
+
+ /// Returns the instant at which the next timeout expires.
+ fn next_expiration(&self) -> Option<Expiration> {
+ if !self.pending.is_empty() {
+ // Expire immediately as we have things pending firing
+ return Some(Expiration {
+ level: 0,
+ slot: 0,
+ deadline: self.elapsed,
+ });
+ }
+
+ // Check all levels
+ for level in 0..NUM_LEVELS {
+ if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
+ // There cannot be any expirations at a higher level that happen
+ // before this one.
+ debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
+
+ return Some(expiration);
+ }
+ }
+
+ None
+ }
+
+ /// Returns the tick at which this timer wheel next needs to perform some
+ /// processing, or None if there are no timers registered.
+ pub(super) fn next_expiration_time(&self) -> Option<u64> {
+ self.next_expiration().map(|ex| ex.deadline)
+ }
+
+ /// Used for debug assertions
+ fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
+ let mut res = true;
+
+ for l2 in start_level..NUM_LEVELS {
+ if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
+ if e2.deadline < before {
+ res = false;
+ }
+ }
+ }
+
+ res
+ }
+
+ /// iteratively find entries that are between the wheel's current
+ /// time and the expiration time. for each in that population either
+ /// queue it for notification (in the case of the last level) or tier
+ /// it down to the next level (in all other cases).
+ pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
+ // Note that we need to take _all_ of the entries off the list before
+ // processing any of them. This is important because it's possible that
+ // those entries might need to be reinserted into the same slot.
+ //
+ // This happens only on the highest level, when an entry is inserted
+ // more than MAX_DURATION into the future. When this happens, we wrap
+ // around, and process some entries a multiple of MAX_DURATION before
+ // they actually need to be dropped down a level. We then reinsert them
+ // back into the same position; we must make sure we don't then process
+ // those entries again or we'll end up in an infinite loop.
+ let mut entries = self.take_entries(expiration);
+
+ while let Some(item) = entries.pop_back() {
+ if expiration.level == 0 {
+ debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline);
+ }
+
+ // Try to expire the entry; this is cheap (doesn't synchronize) if
+ // the timer is not expired, and updates cached_when.
+ match unsafe { item.mark_pending(expiration.deadline) } {
+ Ok(()) => {
+ // Item was expired
+ self.pending.push_front(item);
+ }
+ Err(expiration_tick) => {
+ let level = level_for(expiration.deadline, expiration_tick);
+ unsafe {
+ self.levels[level].add_entry(item);
+ }
+ }
+ }
+ }
+ }
+
+ fn set_elapsed(&mut self, when: u64) {
+ assert!(
+ self.elapsed <= when,
+ "elapsed={:?}; when={:?}",
+ self.elapsed,
+ when
+ );
+
+ if when > self.elapsed {
+ self.elapsed = when;
+ }
+ }
+
+ /// Obtains the list of entries that need processing for the given expiration.
+ ///
+ fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
+ self.levels[expiration.level].take_slot(expiration.slot)
+ }
+
+ fn level_for(&self, when: u64) -> usize {
+ level_for(self.elapsed, when)
+ }
+}
+
+fn level_for(elapsed: u64, when: u64) -> usize {
+ const SLOT_MASK: u64 = (1 << 6) - 1;
+
+ // Mask in the trailing bits ignored by the level calculation in order to cap
+ // the possible leading zeros
+ let mut masked = elapsed ^ when | SLOT_MASK;
+
+ if masked >= MAX_DURATION {
+ // Fudge the timer into the top level
+ masked = MAX_DURATION - 1;
+ }
+
+ let leading_zeros = masked.leading_zeros() as usize;
+ let significant = 63 - leading_zeros;
+
+ significant / 6
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_level_for() {
+ for pos in 0..64 {
+ assert_eq!(
+ 0,
+ level_for(0, pos),
+ "level_for({}) -- binary = {:b}",
+ pos,
+ pos
+ );
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+
+ if pos > level {
+ let a = a - 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+
+ if pos < 64 {
+ let a = a + 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/time/driver/wheel/stack.rs b/third_party/rust/tokio/src/time/driver/wheel/stack.rs
new file mode 100644
index 0000000000..80651c309e
--- /dev/null
+++ b/third_party/rust/tokio/src/time/driver/wheel/stack.rs
@@ -0,0 +1,112 @@
+use super::{Item, OwnedItem};
+use crate::time::driver::Entry;
+
+use std::ptr;
+
+/// A doubly linked stack.
+#[derive(Debug)]
+pub(crate) struct Stack {
+ head: Option<OwnedItem>,
+}
+
+impl Default for Stack {
+ fn default() -> Stack {
+ Stack { head: None }
+ }
+}
+
+impl Stack {
+ pub(crate) fn is_empty(&self) -> bool {
+ self.head.is_none()
+ }
+
+ pub(crate) fn push(&mut self, entry: OwnedItem) {
+ // Get a pointer to the entry to for the prev link
+ let ptr: *const Entry = &*entry as *const _;
+
+ // Remove the old head entry
+ let old = self.head.take();
+
+ unsafe {
+ // Ensure the entry is not already in a stack.
+ debug_assert!((*entry.next_stack.get()).is_none());
+ debug_assert!((*entry.prev_stack.get()).is_null());
+
+ if let Some(ref entry) = old.as_ref() {
+ debug_assert!({
+ // The head is not already set to the entry
+ ptr != &***entry as *const _
+ });
+
+ // Set the previous link on the old head
+ *entry.prev_stack.get() = ptr;
+ }
+
+ // Set this entry's next pointer
+ *entry.next_stack.get() = old;
+ }
+
+ // Update the head pointer
+ self.head = Some(entry);
+ }
+
+ /// Pops an item from the stack.
+ pub(crate) fn pop(&mut self) -> Option<OwnedItem> {
+ let entry = self.head.take();
+
+ unsafe {
+ if let Some(entry) = entry.as_ref() {
+ self.head = (*entry.next_stack.get()).take();
+
+ if let Some(entry) = self.head.as_ref() {
+ *entry.prev_stack.get() = ptr::null();
+ }
+
+ *entry.prev_stack.get() = ptr::null();
+ }
+ }
+
+ entry
+ }
+
+ pub(crate) fn remove(&mut self, entry: &Item) {
+ unsafe {
+ // Ensure that the entry is in fact contained by the stack
+ debug_assert!({
+ // This walks the full linked list even if an entry is found.
+ let mut next = self.head.as_ref();
+ let mut contains = false;
+
+ while let Some(n) = next {
+ if entry as *const _ == &**n as *const _ {
+ debug_assert!(!contains);
+ contains = true;
+ }
+
+ next = (*n.next_stack.get()).as_ref();
+ }
+
+ contains
+ });
+
+ // Unlink `entry` from the next node
+ let next = (*entry.next_stack.get()).take();
+
+ if let Some(next) = next.as_ref() {
+ (*next.prev_stack.get()) = *entry.prev_stack.get();
+ }
+
+ // Unlink `entry` from the prev node
+
+ if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
+ *prev.next_stack.get() = next;
+ } else {
+ // It is the head
+ self.head = next;
+ }
+
+ // Unset the prev pointer
+ *entry.prev_stack.get() = ptr::null();
+ }
+ }
+}