diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio/src/time/driver/wheel | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/time/driver/wheel')
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/level.rs | 276 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/mod.rs | 359 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/driver/wheel/stack.rs | 112 |
3 files changed, 747 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/driver/wheel/level.rs b/third_party/rust/tokio/src/time/driver/wheel/level.rs new file mode 100644 index 0000000000..878754177b --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/level.rs @@ -0,0 +1,276 @@ +use crate::time::driver::TimerHandle; + +use crate::time::driver::{EntryList, TimerShared}; + +use std::{fmt, ptr::NonNull}; + +/// Wheel for a single level in the timer. This wheel contains 64 slots. +pub(crate) struct Level { + level: usize, + + /// Bit field tracking which slots currently contain entries. + /// + /// Using a bit field to track slots that contain entries allows avoiding a + /// scan to find entries. This field is updated when entries are added or + /// removed from a slot. + /// + /// The least-significant bit represents slot zero. + occupied: u64, + + /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell. + slot: [EntryList; LEVEL_MULT], +} + +/// Indicates when a slot must be processed next. +#[derive(Debug)] +pub(crate) struct Expiration { + /// The level containing the slot. + pub(crate) level: usize, + + /// The slot index. + pub(crate) slot: usize, + + /// The instant at which the slot needs to be processed. + pub(crate) deadline: u64, +} + +/// Level multiplier. +/// +/// Being a power of 2 is very important. +const LEVEL_MULT: usize = 64; + +impl Level { + pub(crate) fn new(level: usize) -> Level { + // A value has to be Copy in order to use syntax like: + // let stack = Stack::default(); + // ... + // slots: [stack; 64], + // + // Alternatively, since Stack is Default one can + // use syntax like: + // let slots: [Stack; 64] = Default::default(); + // + // However, that is only supported for arrays of size + // 32 or fewer. So in our case we have to explicitly + // invoke the constructor for each array element. + let ctor = EntryList::default; + + Level { + level, + occupied: 0, + slot: [ + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ], + } + } + + /// Finds the slot that needs to be processed next and returns the slot and + /// `Instant` at which this slot must be processed. + pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> { + // Use the `occupied` bit field to get the index of the next slot that + // needs to be processed. + let slot = match self.next_occupied_slot(now) { + Some(slot) => slot, + None => return None, + }; + + // From the slot index, calculate the `Instant` at which it needs to be + // processed. This value *must* be in the future with respect to `now`. + + let level_range = level_range(self.level); + let slot_range = slot_range(self.level); + + // Compute the start date of the current level by masking the low bits + // of `now` (`level_range` is a power of 2). + let level_start = now & !(level_range - 1); + let mut deadline = level_start + slot as u64 * slot_range; + + if deadline <= now { + // A timer is in a slot "prior" to the current time. This can occur + // because we do not have an infinite hierarchy of timer levels, and + // eventually a timer scheduled for a very distant time might end up + // being placed in a slot that is beyond the end of all of the + // arrays. + // + // To deal with this, we first limit timers to being scheduled no + // more than MAX_DURATION ticks in the future; that is, they're at + // most one rotation of the top level away. Then, we force timers + // that logically would go into the top+1 level, to instead go into + // the top level's slots. + // + // What this means is that the top level's slots act as a + // pseudo-ring buffer, and we rotate around them indefinitely. If we + // compute a deadline before now, and it's the top level, it + // therefore means we're actually looking at a slot in the future. + debug_assert_eq!(self.level, super::NUM_LEVELS - 1); + + deadline += level_range; + } + + debug_assert!( + deadline >= now, + "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}", + deadline, + now, + self.level, + level_range, + slot_range, + slot, + self.occupied + ); + + Some(Expiration { + level: self.level, + slot, + deadline, + }) + } + + fn next_occupied_slot(&self, now: u64) -> Option<usize> { + if self.occupied == 0 { + return None; + } + + // Get the slot for now using Maths + let now_slot = (now / slot_range(self.level)) as usize; + let occupied = self.occupied.rotate_right(now_slot as u32); + let zeros = occupied.trailing_zeros() as usize; + let slot = (zeros + now_slot) % 64; + + Some(slot) + } + + pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) { + let slot = slot_for(item.cached_when(), self.level); + + self.slot[slot].push_front(item); + + self.occupied |= occupied_bit(slot); + } + + pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) { + let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level); + + unsafe { self.slot[slot].remove(item) }; + if self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + // Unset the bit + self.occupied ^= occupied_bit(slot); + } + } + + pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList { + self.occupied &= !occupied_bit(slot); + + std::mem::take(&mut self.slot[slot]) + } +} + +impl fmt::Debug for Level { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Level") + .field("occupied", &self.occupied) + .finish() + } +} + +fn occupied_bit(slot: usize) -> u64 { + 1 << slot +} + +fn slot_range(level: usize) -> u64 { + LEVEL_MULT.pow(level as u32) as u64 +} + +fn level_range(level: usize) -> u64 { + LEVEL_MULT as u64 * slot_range(level) +} + +/// Converts a duration (milliseconds) and a level to a slot position. +fn slot_for(duration: u64, level: usize) -> usize { + ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_slot_for() { + for pos in 0..64 { + assert_eq!(pos as usize, slot_for(pos, 0)); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!(pos as usize, slot_for(a as u64, level)); + } + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/wheel/mod.rs b/third_party/rust/tokio/src/time/driver/wheel/mod.rs new file mode 100644 index 0000000000..f088f2cfd6 --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/mod.rs @@ -0,0 +1,359 @@ +use crate::time::driver::{TimerHandle, TimerShared}; +use crate::time::error::InsertError; + +mod level; +pub(crate) use self::level::Expiration; +use self::level::Level; + +use std::ptr::NonNull; + +use super::EntryList; + +/// Timing wheel implementation. +/// +/// This type provides the hashed timing wheel implementation that backs `Timer` +/// and `DelayQueue`. +/// +/// The structure is generic over `T: Stack`. This allows handling timeout data +/// being stored on the heap or in a slab. In order to support the latter case, +/// the slab must be passed into each function allowing the implementation to +/// lookup timer entries. +/// +/// See `Timer` documentation for some implementation notes. +#[derive(Debug)] +pub(crate) struct Wheel { + /// The number of milliseconds elapsed since the wheel started. + elapsed: u64, + + /// Timer wheel. + /// + /// Levels: + /// + /// * 1 ms slots / 64 ms range + /// * 64 ms slots / ~ 4 sec range + /// * ~ 4 sec slots / ~ 4 min range + /// * ~ 4 min slots / ~ 4 hr range + /// * ~ 4 hr slots / ~ 12 day range + /// * ~ 12 day slots / ~ 2 yr range + levels: Vec<Level>, + + /// Entries queued for firing + pending: EntryList, +} + +/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots +/// each, the timer is able to track time up to 2 years into the future with a +/// precision of 1 millisecond. +const NUM_LEVELS: usize = 6; + +/// The maximum duration of a `Sleep`. +pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; + +impl Wheel { + /// Creates a new timing wheel. + pub(crate) fn new() -> Wheel { + let levels = (0..NUM_LEVELS).map(Level::new).collect(); + + Wheel { + elapsed: 0, + levels, + pending: EntryList::new(), + } + } + + /// Returns the number of milliseconds that have elapsed since the timing + /// wheel's creation. + pub(crate) fn elapsed(&self) -> u64 { + self.elapsed + } + + /// Inserts an entry into the timing wheel. + /// + /// # Arguments + /// + /// * `item`: The item to insert into the wheel. + /// + /// # Return + /// + /// Returns `Ok` when the item is successfully inserted, `Err` otherwise. + /// + /// `Err(Elapsed)` indicates that `when` represents an instant that has + /// already passed. In this case, the caller should fire the timeout + /// immediately. + /// + /// `Err(Invalid)` indicates an invalid `when` argument as been supplied. + /// + /// # Safety + /// + /// This function registers item into an intrusive linked list. The caller + /// must ensure that `item` is pinned and will not be dropped without first + /// being deregistered. + pub(crate) unsafe fn insert( + &mut self, + item: TimerHandle, + ) -> Result<u64, (TimerHandle, InsertError)> { + let when = item.sync_when(); + + if when <= self.elapsed { + return Err((item, InsertError::Elapsed)); + } + + // Get the level at which the entry should be stored + let level = self.level_for(when); + + unsafe { + self.levels[level].add_entry(item); + } + + debug_assert!({ + self.levels[level] + .next_expiration(self.elapsed) + .map(|e| e.deadline >= self.elapsed) + .unwrap_or(true) + }); + + Ok(when) + } + + /// Removes `item` from the timing wheel. + pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) { + unsafe { + let when = item.as_ref().cached_when(); + if when == u64::MAX { + self.pending.remove(item); + } else { + debug_assert!( + self.elapsed <= when, + "elapsed={}; when={}", + self.elapsed, + when + ); + + let level = self.level_for(when); + + self.levels[level].remove_entry(item); + } + } + } + + /// Instant at which to poll. + pub(crate) fn poll_at(&self) -> Option<u64> { + self.next_expiration().map(|expiration| expiration.deadline) + } + + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> { + loop { + if let Some(handle) = self.pending.pop_back() { + return Some(handle); + } + + // under what circumstances is poll.expiration Some vs. None? + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); + + match expiration { + Some(ref expiration) if expiration.deadline > now => return None, + Some(ref expiration) => { + self.process_expiration(expiration); + + self.set_elapsed(expiration.deadline); + } + None => { + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); + break; + } + } + } + + self.pending.pop_back() + } + + /// Returns the instant at which the next timeout expires. + fn next_expiration(&self) -> Option<Expiration> { + if !self.pending.is_empty() { + // Expire immediately as we have things pending firing + return Some(Expiration { + level: 0, + slot: 0, + deadline: self.elapsed, + }); + } + + // Check all levels + for level in 0..NUM_LEVELS { + if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { + // There cannot be any expirations at a higher level that happen + // before this one. + debug_assert!(self.no_expirations_before(level + 1, expiration.deadline)); + + return Some(expiration); + } + } + + None + } + + /// Returns the tick at which this timer wheel next needs to perform some + /// processing, or None if there are no timers registered. + pub(super) fn next_expiration_time(&self) -> Option<u64> { + self.next_expiration().map(|ex| ex.deadline) + } + + /// Used for debug assertions + fn no_expirations_before(&self, start_level: usize, before: u64) -> bool { + let mut res = true; + + for l2 in start_level..NUM_LEVELS { + if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { + if e2.deadline < before { + res = false; + } + } + } + + res + } + + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// queue it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). + pub(crate) fn process_expiration(&mut self, expiration: &Expiration) { + // Note that we need to take _all_ of the entries off the list before + // processing any of them. This is important because it's possible that + // those entries might need to be reinserted into the same slot. + // + // This happens only on the highest level, when an entry is inserted + // more than MAX_DURATION into the future. When this happens, we wrap + // around, and process some entries a multiple of MAX_DURATION before + // they actually need to be dropped down a level. We then reinsert them + // back into the same position; we must make sure we don't then process + // those entries again or we'll end up in an infinite loop. + let mut entries = self.take_entries(expiration); + + while let Some(item) = entries.pop_back() { + if expiration.level == 0 { + debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline); + } + + // Try to expire the entry; this is cheap (doesn't synchronize) if + // the timer is not expired, and updates cached_when. + match unsafe { item.mark_pending(expiration.deadline) } { + Ok(()) => { + // Item was expired + self.pending.push_front(item); + } + Err(expiration_tick) => { + let level = level_for(expiration.deadline, expiration_tick); + unsafe { + self.levels[level].add_entry(item); + } + } + } + } + } + + fn set_elapsed(&mut self, when: u64) { + assert!( + self.elapsed <= when, + "elapsed={:?}; when={:?}", + self.elapsed, + when + ); + + if when > self.elapsed { + self.elapsed = when; + } + } + + /// Obtains the list of entries that need processing for the given expiration. + /// + fn take_entries(&mut self, expiration: &Expiration) -> EntryList { + self.levels[expiration.level].take_slot(expiration.slot) + } + + fn level_for(&self, when: u64) -> usize { + level_for(self.elapsed, when) + } +} + +fn level_for(elapsed: u64, when: u64) -> usize { + const SLOT_MASK: u64 = (1 << 6) - 1; + + // Mask in the trailing bits ignored by the level calculation in order to cap + // the possible leading zeros + let mut masked = elapsed ^ when | SLOT_MASK; + + if masked >= MAX_DURATION { + // Fudge the timer into the top level + masked = MAX_DURATION - 1; + } + + let leading_zeros = masked.leading_zeros() as usize; + let significant = 63 - leading_zeros; + + significant / 6 +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_level_for() { + for pos in 0..64 { + assert_eq!( + 0, + level_for(0, pos), + "level_for({}) -- binary = {:b}", + pos, + pos + ); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + + if pos > level { + let a = a - 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + + if pos < 64 { + let a = a + 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + } + } + } +} diff --git a/third_party/rust/tokio/src/time/driver/wheel/stack.rs b/third_party/rust/tokio/src/time/driver/wheel/stack.rs new file mode 100644 index 0000000000..80651c309e --- /dev/null +++ b/third_party/rust/tokio/src/time/driver/wheel/stack.rs @@ -0,0 +1,112 @@ +use super::{Item, OwnedItem}; +use crate::time::driver::Entry; + +use std::ptr; + +/// A doubly linked stack. +#[derive(Debug)] +pub(crate) struct Stack { + head: Option<OwnedItem>, +} + +impl Default for Stack { + fn default() -> Stack { + Stack { head: None } + } +} + +impl Stack { + pub(crate) fn is_empty(&self) -> bool { + self.head.is_none() + } + + pub(crate) fn push(&mut self, entry: OwnedItem) { + // Get a pointer to the entry to for the prev link + let ptr: *const Entry = &*entry as *const _; + + // Remove the old head entry + let old = self.head.take(); + + unsafe { + // Ensure the entry is not already in a stack. + debug_assert!((*entry.next_stack.get()).is_none()); + debug_assert!((*entry.prev_stack.get()).is_null()); + + if let Some(ref entry) = old.as_ref() { + debug_assert!({ + // The head is not already set to the entry + ptr != &***entry as *const _ + }); + + // Set the previous link on the old head + *entry.prev_stack.get() = ptr; + } + + // Set this entry's next pointer + *entry.next_stack.get() = old; + } + + // Update the head pointer + self.head = Some(entry); + } + + /// Pops an item from the stack. + pub(crate) fn pop(&mut self) -> Option<OwnedItem> { + let entry = self.head.take(); + + unsafe { + if let Some(entry) = entry.as_ref() { + self.head = (*entry.next_stack.get()).take(); + + if let Some(entry) = self.head.as_ref() { + *entry.prev_stack.get() = ptr::null(); + } + + *entry.prev_stack.get() = ptr::null(); + } + } + + entry + } + + pub(crate) fn remove(&mut self, entry: &Item) { + unsafe { + // Ensure that the entry is in fact contained by the stack + debug_assert!({ + // This walks the full linked list even if an entry is found. + let mut next = self.head.as_ref(); + let mut contains = false; + + while let Some(n) = next { + if entry as *const _ == &**n as *const _ { + debug_assert!(!contains); + contains = true; + } + + next = (*n.next_stack.get()).as_ref(); + } + + contains + }); + + // Unlink `entry` from the next node + let next = (*entry.next_stack.get()).take(); + + if let Some(next) = next.as_ref() { + (*next.prev_stack.get()) = *entry.prev_stack.get(); + } + + // Unlink `entry` from the prev node + + if let Some(prev) = (*entry.prev_stack.get()).as_ref() { + *prev.next_stack.get() = next; + } else { + // It is the head + self.head = next; + } + + // Unset the prev pointer + *entry.prev_stack.get() = ptr::null(); + } + } +} |