diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/mio-extras/src/timer.rs | 751 |
1 files changed, 751 insertions, 0 deletions
diff --git a/third_party/rust/mio-extras/src/timer.rs b/third_party/rust/mio-extras/src/timer.rs new file mode 100644 index 0000000000..adf8f2774a --- /dev/null +++ b/third_party/rust/mio-extras/src/timer.rs @@ -0,0 +1,751 @@ +//! Timer optimized for I/O related operations +use convert; +use lazycell::LazyCell; +use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use slab::Slab; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use std::{cmp, fmt, io, iter, thread, u64, usize}; + +/// A timer. +/// +/// Typical usage goes like this: +/// +/// * register the timer with a `mio::Poll`. +/// * set a timeout, by calling `Timer::set_timeout`. Here you provide some +/// state to be associated with this timeout. +/// * poll the `Poll`, to learn when a timeout has occurred. +/// * retrieve state associated with the timeout by calling `Timer::poll`. +/// +/// You can omit use of the `Poll` altogether, if you like, and just poll the +/// `Timer` directly. +pub struct Timer<T> { + // Size of each tick in milliseconds + tick_ms: u64, + // Slab of timeout entries + entries: Slab<Entry<T>>, + // Timeout wheel. Each tick, the timer will look at the next slot for + // timeouts that match the current tick. + wheel: Vec<WheelEntry>, + // Tick 0's time instant + start: Instant, + // The current tick + tick: Tick, + // The next entry to possibly timeout + next: Token, + // Masks the target tick to get the slot + mask: u64, + // Set on registration with Poll + inner: LazyCell<Inner>, +} + +/// Used to create a `Timer`. +pub struct Builder { + // Approximate duration of each tick + tick: Duration, + // Number of slots in the timer wheel + num_slots: usize, + // Max number of timeouts that can be in flight at a given time. + capacity: usize, +} + +/// A timeout, as returned by `Timer::set_timeout`. +/// +/// Use this as the argument to `Timer::cancel_timeout`, to cancel this timeout. +#[derive(Clone, Debug)] +pub struct Timeout { + // Reference into the timer entry slab + token: Token, + // Tick that it should match up with + tick: u64, +} + +struct Inner { + registration: Registration, + set_readiness: SetReadiness, + wakeup_state: WakeupState, + wakeup_thread: thread::JoinHandle<()>, +} + +impl Drop for Inner { + fn drop(&mut self) { + // 1. Set wakeup state to TERMINATE_THREAD + self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release); + // 2. Wake him up + self.wakeup_thread.thread().unpark(); + } +} + +#[derive(Copy, Clone, Debug)] +struct WheelEntry { + next_tick: Tick, + head: Token, +} + +// Doubly linked list of timer entries. Allows for efficient insertion / +// removal of timeouts. +struct Entry<T> { + state: T, + links: EntryLinks, +} + +#[derive(Copy, Clone)] +struct EntryLinks { + tick: Tick, + prev: Token, + next: Token, +} + +type Tick = u64; + +const TICK_MAX: Tick = u64::MAX; + +// Manages communication with wakeup thread +type WakeupState = Arc<AtomicUsize>; + +const TERMINATE_THREAD: usize = 0; +const EMPTY: Token = Token(usize::MAX); + +impl Builder { + /// Set the tick duration. Default is 100ms. + pub fn tick_duration(mut self, duration: Duration) -> Builder { + self.tick = duration; + self + } + + /// Set the number of slots. Default is 256. + pub fn num_slots(mut self, num_slots: usize) -> Builder { + self.num_slots = num_slots; + self + } + + /// Set the capacity. Default is 65536. + pub fn capacity(mut self, capacity: usize) -> Builder { + self.capacity = capacity; + self + } + + /// Build a `Timer` with the parameters set on this `Builder`. + pub fn build<T>(self) -> Timer<T> { + Timer::new( + convert::millis(self.tick), + self.num_slots, + self.capacity, + Instant::now(), + ) + } +} + +impl Default for Builder { + fn default() -> Builder { + Builder { + tick: Duration::from_millis(100), + num_slots: 1 << 8, + capacity: 1 << 16, + } + } +} + +impl<T> Timer<T> { + fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> { + let num_slots = num_slots.next_power_of_two(); + let capacity = capacity.next_power_of_two(); + let mask = (num_slots as u64) - 1; + let wheel = iter::repeat(WheelEntry { + next_tick: TICK_MAX, + head: EMPTY, + }).take(num_slots) + .collect(); + + Timer { + tick_ms, + entries: Slab::with_capacity(capacity), + wheel, + start, + tick: 0, + next: EMPTY, + mask, + inner: LazyCell::new(), + } + } + + /// Set a timeout. + /// + /// When the timeout occurs, the given state becomes available via `poll`. + pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout { + let delay_from_start = self.start.elapsed() + delay_from_now; + self.set_timeout_at(delay_from_start, state) + } + + fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout { + let mut tick = duration_to_tick(delay_from_start, self.tick_ms); + trace!( + "setting timeout; delay={:?}; tick={:?}; current-tick={:?}", + delay_from_start, + tick, + self.tick + ); + + // Always target at least 1 tick in the future + if tick <= self.tick { + tick = self.tick + 1; + } + + self.insert(tick, state) + } + + fn insert(&mut self, tick: Tick, state: T) -> Timeout { + // Get the slot for the requested tick + let slot = (tick & self.mask) as usize; + let curr = self.wheel[slot]; + + // Insert the new entry + let entry = Entry::new(state, tick, curr.head); + let token = Token(self.entries.insert(entry)); + + if curr.head != EMPTY { + // If there was a previous entry, set its prev pointer to the new + // entry + self.entries[curr.head.into()].links.prev = token; + } + + // Update the head slot + self.wheel[slot] = WheelEntry { + next_tick: cmp::min(tick, curr.next_tick), + head: token, + }; + + self.schedule_readiness(tick); + + trace!("inserted timout; slot={}; token={:?}", slot, token); + + // Return the new timeout + Timeout { token, tick } + } + + /// Cancel a timeout. + /// + /// If the timeout has not yet occurred, the return value holds the + /// associated state. + pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> { + let links = match self.entries.get(timeout.token.into()) { + Some(e) => e.links, + None => return None, + }; + + // Sanity check + if links.tick != timeout.tick { + return None; + } + + self.unlink(&links, timeout.token); + Some(self.entries.remove(timeout.token.into()).state) + } + + /// Poll for an expired timer. + /// + /// The return value holds the state associated with the first expired + /// timer, if any. + pub fn poll(&mut self) -> Option<T> { + let target_tick = current_tick(self.start, self.tick_ms); + self.poll_to(target_tick) + } + + fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> { + trace!( + "tick_to; target_tick={}; current_tick={}", + target_tick, + self.tick + ); + + if target_tick < self.tick { + target_tick = self.tick; + } + + while self.tick <= target_tick { + let curr = self.next; + + trace!("ticking; curr={:?}", curr); + + if curr == EMPTY { + self.tick += 1; + + let slot = self.slot_for(self.tick); + self.next = self.wheel[slot].head; + + // Handle the case when a slot has a single timeout which gets + // canceled before the timeout expires. In this case, the + // slot's head is EMPTY but there is a value for next_tick. Not + // resetting next_tick here causes the timer to get stuck in a + // loop. + if self.next == EMPTY { + self.wheel[slot].next_tick = TICK_MAX; + } + } else { + let slot = self.slot_for(self.tick); + + if curr == self.wheel[slot].head { + self.wheel[slot].next_tick = TICK_MAX; + } + + let links = self.entries[curr.into()].links; + + if links.tick <= self.tick { + trace!("triggering; token={:?}", curr); + + // Unlink will also advance self.next + self.unlink(&links, curr); + + // Remove and return the token + return Some(self.entries.remove(curr.into()).state); + } else { + let next_tick = self.wheel[slot].next_tick; + self.wheel[slot].next_tick = cmp::min(next_tick, links.tick); + self.next = links.next; + } + } + } + + // No more timeouts to poll + if let Some(inner) = self.inner.borrow() { + trace!("unsetting readiness"); + let _ = inner.set_readiness.set_readiness(Ready::empty()); + + if let Some(tick) = self.next_tick() { + self.schedule_readiness(tick); + } + } + + None + } + + fn unlink(&mut self, links: &EntryLinks, token: Token) { + trace!( + "unlinking timeout; slot={}; token={:?}", + self.slot_for(links.tick), + token + ); + + if links.prev == EMPTY { + let slot = self.slot_for(links.tick); + self.wheel[slot].head = links.next; + } else { + self.entries[links.prev.into()].links.next = links.next; + } + + if links.next != EMPTY { + self.entries[links.next.into()].links.prev = links.prev; + + if token == self.next { + self.next = links.next; + } + } else if token == self.next { + self.next = EMPTY; + } + } + + fn schedule_readiness(&self, tick: Tick) { + if let Some(inner) = self.inner.borrow() { + // Coordinate setting readiness w/ the wakeup thread + let mut curr = inner.wakeup_state.load(Ordering::Acquire); + + loop { + if curr as Tick <= tick { + // Nothing to do, wakeup is already scheduled + return; + } + + // Attempt to move the wakeup time forward + trace!("advancing the wakeup time; target={}; curr={}", tick, curr); + let actual = + inner + .wakeup_state + .compare_and_swap(curr, tick as usize, Ordering::Release); + + if actual == curr { + // Signal to the wakeup thread that the wakeup time has + // been changed. + trace!("unparking wakeup thread"); + inner.wakeup_thread.thread().unpark(); + return; + } + + curr = actual; + } + } + } + + // Next tick containing a timeout + fn next_tick(&self) -> Option<Tick> { + if self.next != EMPTY { + let slot = self.slot_for(self.entries[self.next.into()].links.tick); + + if self.wheel[slot].next_tick == self.tick { + // There is data ready right now + return Some(self.tick); + } + } + + self.wheel.iter().map(|e| e.next_tick).min() + } + + fn slot_for(&self, tick: Tick) -> usize { + (self.mask & tick) as usize + } +} + +impl<T> Default for Timer<T> { + fn default() -> Timer<T> { + Builder::default().build() + } +} + +impl<T> Evented for Timer<T> { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + if self.inner.borrow().is_some() { + return Err(io::Error::new( + io::ErrorKind::Other, + "timer already registered", + )); + } + + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, token, interest, opts)?; + let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX)); + let thread_handle = spawn_wakeup_thread( + Arc::clone(&wakeup_state), + set_readiness.clone(), + self.start, + self.tick_ms, + ); + + self.inner + .fill(Inner { + registration, + set_readiness, + wakeup_state, + wakeup_thread: thread_handle, + }) + .expect("timer already registered"); + + if let Some(next_tick) = self.next_tick() { + self.schedule_readiness(next_tick); + } + + Ok(()) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + match self.inner.borrow() { + Some(inner) => poll.reregister(&inner.registration, token, interest, opts), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.inner.borrow() { + Some(inner) => poll.deregister(&inner.registration), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Inner") + .field("registration", &self.registration) + .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed)) + .finish() + } +} + +fn spawn_wakeup_thread( + state: WakeupState, + set_readiness: SetReadiness, + start: Instant, + tick_ms: u64, +) -> thread::JoinHandle<()> { + thread::spawn(move || { + let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick; + + loop { + if sleep_until_tick == TERMINATE_THREAD as Tick { + return; + } + + let now_tick = current_tick(start, tick_ms); + + trace!( + "wakeup thread: sleep_until_tick={:?}; now_tick={:?}", + sleep_until_tick, + now_tick + ); + + if now_tick < sleep_until_tick { + // Calling park_timeout with u64::MAX leads to undefined + // behavior in pthread, causing the park to return immediately + // and causing the thread to tightly spin. Instead of u64::MAX + // on large values, simply use a blocking park. + match tick_ms.checked_mul(sleep_until_tick - now_tick) { + Some(sleep_duration) => { + trace!( + "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}", + tick_ms, + now_tick, + sleep_until_tick, + sleep_duration + ); + thread::park_timeout(Duration::from_millis(sleep_duration)); + } + None => { + trace!( + "sleeping; tick_ms={}; now_tick={}; blocking sleep", + tick_ms, + now_tick + ); + thread::park(); + } + } + sleep_until_tick = state.load(Ordering::Acquire) as Tick; + } else { + let actual = + state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel) + as Tick; + + if actual == sleep_until_tick { + trace!("setting readiness from wakeup thread"); + let _ = set_readiness.set_readiness(Ready::readable()); + sleep_until_tick = usize::MAX as Tick; + } else { + sleep_until_tick = actual as Tick; + } + } + } + }) +} + +fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick { + // Calculate tick rounding up to the closest one + let elapsed_ms = convert::millis(elapsed); + elapsed_ms.saturating_add(tick_ms / 2) / tick_ms +} + +fn current_tick(start: Instant, tick_ms: u64) -> Tick { + duration_to_tick(start.elapsed(), tick_ms) +} + +impl<T> Entry<T> { + fn new(state: T, tick: u64, next: Token) -> Entry<T> { + Entry { + state, + links: EntryLinks { + tick, + prev: EMPTY, + next, + }, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::time::{Duration, Instant}; + + #[test] + pub fn test_timeout_next_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + + tick = ms_to_tick(&t, 50); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 150); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_clearing_timeout() { + let mut t = timer(); + let mut tick; + + let to = t.set_timeout_at(Duration::from_millis(100), "a"); + assert_eq!("a", t.cancel_timeout(&to).unwrap()); + + tick = ms_to_tick(&t, 100); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_multiple_timeouts_same_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + t.set_timeout_at(Duration::from_millis(100), "b"); + + let mut rcv = vec![]; + + tick = ms_to_tick(&t, 100); + rcv.push(t.poll_to(tick).unwrap()); + rcv.push(t.poll_to(tick).unwrap()); + + assert_eq!(None, t.poll_to(tick)); + + rcv.sort(); + assert!(rcv == ["a", "b"], "actual={:?}", rcv); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_multiple_timeouts_diff_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(110), "a"); + t.set_timeout_at(Duration::from_millis(220), "b"); + t.set_timeout_at(Duration::from_millis(230), "c"); + t.set_timeout_at(Duration::from_millis(440), "d"); + t.set_timeout_at(Duration::from_millis(560), "e"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 300); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 400); + assert_eq!(Some("d"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 500); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 600); + assert_eq!(Some("e"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + } + + #[test] + pub fn test_catching_up() { + let mut t = timer(); + + t.set_timeout_at(Duration::from_millis(110), "a"); + t.set_timeout_at(Duration::from_millis(220), "b"); + t.set_timeout_at(Duration::from_millis(230), "c"); + t.set_timeout_at(Duration::from_millis(440), "d"); + + let tick = ms_to_tick(&t, 600); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(Some("d"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + } + + #[test] + pub fn test_timeout_hash_collision() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(1, count(&t)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + assert_eq!(1, count(&t)); + + tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(0, count(&t)); + } + + #[test] + pub fn test_clearing_timeout_between_triggers() { + let mut t = timer(); + let mut tick; + + let a = t.set_timeout_at(Duration::from_millis(100), "a"); + let _ = t.set_timeout_at(Duration::from_millis(100), "b"); + let _ = t.set_timeout_at(Duration::from_millis(200), "c"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(2, count(&t)); + + t.cancel_timeout(&a); + assert_eq!(1, count(&t)); + + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(0, count(&t)); + } + + const TICK: u64 = 100; + const SLOTS: usize = 16; + const CAPACITY: usize = 32; + + fn count<T>(timer: &Timer<T>) -> usize { + timer.entries.len() + } + + fn timer() -> Timer<&'static str> { + Timer::new(TICK, SLOTS, CAPACITY, Instant::now()) + } + + fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 { + ms / timer.tick_ms + } + +} |