diff options
Diffstat (limited to 'third_party/rust/tokio/src/time/wheel')
-rw-r--r-- | third_party/rust/tokio/src/time/wheel/level.rs | 255 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/wheel/mod.rs | 314 | ||||
-rw-r--r-- | third_party/rust/tokio/src/time/wheel/stack.rs | 26 |
3 files changed, 595 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/time/wheel/level.rs b/third_party/rust/tokio/src/time/wheel/level.rs new file mode 100644 index 0000000000..49f9bfb9cf --- /dev/null +++ b/third_party/rust/tokio/src/time/wheel/level.rs @@ -0,0 +1,255 @@ +use crate::time::wheel::Stack; + +use std::fmt; + +/// Wheel for a single level in the timer. This wheel contains 64 slots. +pub(crate) struct Level<T> { + level: usize, + + /// Bit field tracking which slots currently contain entries. + /// + /// Using a bit field to track slots that contain entries allows avoiding a + /// scan to find entries. This field is updated when entries are added or + /// removed from a slot. + /// + /// The least-significant bit represents slot zero. + occupied: u64, + + /// Slots + slot: [T; LEVEL_MULT], +} + +/// Indicates when a slot must be processed next. +#[derive(Debug)] +pub(crate) struct Expiration { + /// The level containing the slot. + pub(crate) level: usize, + + /// The slot index. + pub(crate) slot: usize, + + /// The instant at which the slot needs to be processed. + pub(crate) deadline: u64, +} + +/// Level multiplier. +/// +/// Being a power of 2 is very important. +const LEVEL_MULT: usize = 64; + +impl<T: Stack> Level<T> { + pub(crate) fn new(level: usize) -> Level<T> { + // Rust's derived implementations for arrays require that the value + // contained by the array be `Copy`. So, here we have to manually + // initialize every single slot. + macro_rules! s { + () => { + T::default() + }; + }; + + Level { + level, + occupied: 0, + slot: [ + // It does not look like the necessary traits are + // derived for [T; 64]. + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + ], + } + } + + /// Finds the slot that needs to be processed next and returns the slot and + /// `Instant` at which this slot must be processed. + pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> { + // Use the `occupied` bit field to get the index of the next slot that + // needs to be processed. + let slot = match self.next_occupied_slot(now) { + Some(slot) => slot, + None => return None, + }; + + // From the slot index, calculate the `Instant` at which it needs to be + // processed. This value *must* be in the future with respect to `now`. + + let level_range = level_range(self.level); + let slot_range = slot_range(self.level); + + // TODO: This can probably be simplified w/ power of 2 math + let level_start = now - (now % level_range); + let deadline = level_start + slot as u64 * slot_range; + + debug_assert!( + deadline >= now, + "deadline={}; now={}; level={}; slot={}; occupied={:b}", + deadline, + now, + self.level, + slot, + self.occupied + ); + + Some(Expiration { + level: self.level, + slot, + deadline, + }) + } + + fn next_occupied_slot(&self, now: u64) -> Option<usize> { + if self.occupied == 0 { + return None; + } + + // Get the slot for now using Maths + let now_slot = (now / slot_range(self.level)) as usize; + let occupied = self.occupied.rotate_right(now_slot as u32); + let zeros = occupied.trailing_zeros() as usize; + let slot = (zeros + now_slot) % 64; + + Some(slot) + } + + pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) { + let slot = slot_for(when, self.level); + + self.slot[slot].push(item, store); + self.occupied |= occupied_bit(slot); + } + + pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) { + let slot = slot_for(when, self.level); + + self.slot[slot].remove(item, store); + + if self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + // Unset the bit + self.occupied ^= occupied_bit(slot); + } + } + + pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option<T::Owned> { + let ret = self.slot[slot].pop(store); + + if ret.is_some() && self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + self.occupied ^= occupied_bit(slot); + } + + ret + } +} + +impl<T> fmt::Debug for Level<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Level") + .field("occupied", &self.occupied) + .finish() + } +} + +fn occupied_bit(slot: usize) -> u64 { + 1 << slot +} + +fn slot_range(level: usize) -> u64 { + LEVEL_MULT.pow(level as u32) as u64 +} + +fn level_range(level: usize) -> u64 { + LEVEL_MULT as u64 * slot_range(level) +} + +/// Convert a duration (milliseconds) and a level to a slot position +fn slot_for(duration: u64, level: usize) -> usize { + ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize +} + +/* +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_slot_for() { + for pos in 1..64 { + assert_eq!(pos as usize, slot_for(pos, 0)); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!(pos as usize, slot_for(a as u64, level)); + } + } + } +} +*/ diff --git a/third_party/rust/tokio/src/time/wheel/mod.rs b/third_party/rust/tokio/src/time/wheel/mod.rs new file mode 100644 index 0000000000..a2ef27fc6c --- /dev/null +++ b/third_party/rust/tokio/src/time/wheel/mod.rs @@ -0,0 +1,314 @@ +mod level; +pub(crate) use self::level::Expiration; +use self::level::Level; + +mod stack; +pub(crate) use self::stack::Stack; + +use std::borrow::Borrow; +use std::usize; + +/// Timing wheel implementation. +/// +/// This type provides the hashed timing wheel implementation that backs `Timer` +/// and `DelayQueue`. +/// +/// The structure is generic over `T: Stack`. This allows handling timeout data +/// being stored on the heap or in a slab. In order to support the latter case, +/// the slab must be passed into each function allowing the implementation to +/// lookup timer entries. +/// +/// See `Timer` documentation for some implementation notes. +#[derive(Debug)] +pub(crate) struct Wheel<T> { + /// The number of milliseconds elapsed since the wheel started. + elapsed: u64, + + /// Timer wheel. + /// + /// Levels: + /// + /// * 1 ms slots / 64 ms range + /// * 64 ms slots / ~ 4 sec range + /// * ~ 4 sec slots / ~ 4 min range + /// * ~ 4 min slots / ~ 4 hr range + /// * ~ 4 hr slots / ~ 12 day range + /// * ~ 12 day slots / ~ 2 yr range + levels: Vec<Level<T>>, +} + +/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots +/// each, the timer is able to track time up to 2 years into the future with a +/// precision of 1 millisecond. +const NUM_LEVELS: usize = 6; + +/// The maximum duration of a delay +const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; + +#[derive(Debug)] +pub(crate) enum InsertError { + Elapsed, + Invalid, +} + +/// Poll expirations from the wheel +#[derive(Debug, Default)] +pub(crate) struct Poll { + now: u64, + expiration: Option<Expiration>, +} + +impl<T> Wheel<T> +where + T: Stack, +{ + /// Create a new timing wheel + pub(crate) fn new() -> Wheel<T> { + let levels = (0..NUM_LEVELS).map(Level::new).collect(); + + Wheel { elapsed: 0, levels } + } + + /// Return the number of milliseconds that have elapsed since the timing + /// wheel's creation. + pub(crate) fn elapsed(&self) -> u64 { + self.elapsed + } + + /// Insert an entry into the timing wheel. + /// + /// # Arguments + /// + /// * `when`: is the instant at which the entry should be fired. It is + /// represented as the number of milliseconds since the creation + /// of the timing wheel. + /// + /// * `item`: The item to insert into the wheel. + /// + /// * `store`: The slab or `()` when using heap storage. + /// + /// # Return + /// + /// Returns `Ok` when the item is successfully inserted, `Err` otherwise. + /// + /// `Err(Elapsed)` indicates that `when` represents an instant that has + /// already passed. In this case, the caller should fire the timeout + /// immediately. + /// + /// `Err(Invalid)` indicates an invalid `when` argument as been supplied. + pub(crate) fn insert( + &mut self, + when: u64, + item: T::Owned, + store: &mut T::Store, + ) -> Result<(), (T::Owned, InsertError)> { + if when <= self.elapsed { + return Err((item, InsertError::Elapsed)); + } else if when - self.elapsed > MAX_DURATION { + return Err((item, InsertError::Invalid)); + } + + // Get the level at which the entry should be stored + let level = self.level_for(when); + + self.levels[level].add_entry(when, item, store); + + debug_assert!({ + self.levels[level] + .next_expiration(self.elapsed) + .map(|e| e.deadline >= self.elapsed) + .unwrap_or(true) + }); + + Ok(()) + } + + /// Remove `item` from thee timing wheel. + pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) { + let when = T::when(item, store); + let level = self.level_for(when); + + self.levels[level].remove_entry(when, item, store); + } + + /// Instant at which to poll + pub(crate) fn poll_at(&self) -> Option<u64> { + self.next_expiration().map(|expiration| expiration.deadline) + } + + pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> { + loop { + if poll.expiration.is_none() { + poll.expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > poll.now { + None + } else { + Some(expiration) + } + }); + } + + match poll.expiration { + Some(ref expiration) => { + if let Some(item) = self.poll_expiration(expiration, store) { + return Some(item); + } + + self.set_elapsed(expiration.deadline); + } + None => { + self.set_elapsed(poll.now); + return None; + } + } + + poll.expiration = None; + } + } + + /// Returns the instant at which the next timeout expires. + fn next_expiration(&self) -> Option<Expiration> { + // Check all levels + for level in 0..NUM_LEVELS { + if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { + // There cannot be any expirations at a higher level that happen + // before this one. + debug_assert!(self.no_expirations_before(level + 1, expiration.deadline)); + + return Some(expiration); + } + } + + None + } + + /// Used for debug assertions + fn no_expirations_before(&self, start_level: usize, before: u64) -> bool { + let mut res = true; + + for l2 in start_level..NUM_LEVELS { + if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { + if e2.deadline < before { + res = false; + } + } + } + + res + } + + pub(crate) fn poll_expiration( + &mut self, + expiration: &Expiration, + store: &mut T::Store, + ) -> Option<T::Owned> { + while let Some(item) = self.pop_entry(expiration, store) { + if expiration.level == 0 { + debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline); + + return Some(item); + } else { + let when = T::when(item.borrow(), store); + + let next_level = expiration.level - 1; + + self.levels[next_level].add_entry(when, item, store); + } + } + + None + } + + fn set_elapsed(&mut self, when: u64) { + assert!( + self.elapsed <= when, + "elapsed={:?}; when={:?}", + self.elapsed, + when + ); + + if when > self.elapsed { + self.elapsed = when; + } + } + + fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> { + self.levels[expiration.level].pop_entry_slot(expiration.slot, store) + } + + fn level_for(&self, when: u64) -> usize { + level_for(self.elapsed, when) + } +} + +fn level_for(elapsed: u64, when: u64) -> usize { + let masked = elapsed ^ when; + + assert!(masked != 0, "elapsed={}; when={}", elapsed, when); + + let leading_zeros = masked.leading_zeros() as usize; + let significant = 63 - leading_zeros; + significant / 6 +} + +impl Poll { + pub(crate) fn new(now: u64) -> Poll { + Poll { + now, + expiration: None, + } + } +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_level_for() { + for pos in 1..64 { + assert_eq!( + 0, + level_for(0, pos), + "level_for({}) -- binary = {:b}", + pos, + pos + ); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + + if pos > level { + let a = a - 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + + if pos < 64 { + let a = a + 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + } + } + } +} diff --git a/third_party/rust/tokio/src/time/wheel/stack.rs b/third_party/rust/tokio/src/time/wheel/stack.rs new file mode 100644 index 0000000000..6e55c38ccd --- /dev/null +++ b/third_party/rust/tokio/src/time/wheel/stack.rs @@ -0,0 +1,26 @@ +use std::borrow::Borrow; + +/// Abstracts the stack operations needed to track timeouts. +pub(crate) trait Stack: Default { + /// Type of the item stored in the stack + type Owned: Borrow<Self::Borrowed>; + + /// Borrowed item + type Borrowed; + + /// Item storage, this allows a slab to be used instead of just the heap + type Store; + + /// Returns `true` if the stack is empty + fn is_empty(&self) -> bool; + + /// Push an item onto the stack + fn push(&mut self, item: Self::Owned, store: &mut Self::Store); + + /// Pop an item from the stack + fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned>; + + fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store); + + fn when(item: &Self::Borrowed, store: &Self::Store) -> u64; +} |