diff options
Diffstat (limited to 'third_party/rust/mio-extras/src')
-rw-r--r-- | third_party/rust/mio-extras/src/channel.rs | 431 | ||||
-rw-r--r-- | third_party/rust/mio-extras/src/lib.rs | 33 | ||||
-rw-r--r-- | third_party/rust/mio-extras/src/timer.rs | 751 |
3 files changed, 1215 insertions, 0 deletions
diff --git a/third_party/rust/mio-extras/src/channel.rs b/third_party/rust/mio-extras/src/channel.rs new file mode 100644 index 0000000000..9ebc73bb7f --- /dev/null +++ b/third_party/rust/mio-extras/src/channel.rs @@ -0,0 +1,431 @@ +//! Thread safe communication channel implementing `Evented` +use lazycell::{AtomicLazyCell, LazyCell}; +use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use std::any::Any; +use std::error; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{mpsc, Arc}; +use std::{fmt, io}; + +/// Creates a new asynchronous channel, where the `Receiver` can be registered +/// with `Poll`. +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::channel(); + + let tx = Sender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +/// Creates a new synchronous, bounded channel where the `Receiver` can be +/// registered with `Poll`. +pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::sync_channel(bound); + + let tx = SyncSender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +fn ctl_pair() -> (SenderCtl, ReceiverCtl) { + let inner = Arc::new(Inner { + pending: AtomicUsize::new(0), + senders: AtomicUsize::new(1), + set_readiness: AtomicLazyCell::new(), + }); + + let tx = SenderCtl { + inner: Arc::clone(&inner), + }; + + let rx = ReceiverCtl { + registration: LazyCell::new(), + inner, + }; + + (tx, rx) +} + +/// Tracks messages sent on a channel in order to update readiness. +struct SenderCtl { + inner: Arc<Inner>, +} + +/// Tracks messages received on a channel in order to track readiness. +struct ReceiverCtl { + registration: LazyCell<Registration>, + inner: Arc<Inner>, +} + +/// The sending half of a channel. +pub struct Sender<T> { + tx: mpsc::Sender<T>, + ctl: SenderCtl, +} + +/// The sending half of a synchronous channel. +pub struct SyncSender<T> { + tx: mpsc::SyncSender<T>, + ctl: SenderCtl, +} + +/// The receiving half of a channel. +pub struct Receiver<T> { + rx: mpsc::Receiver<T>, + ctl: ReceiverCtl, +} + +/// An error returned from the `Sender::send` or `SyncSender::send` function. +pub enum SendError<T> { + /// An IO error. + Io(io::Error), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +/// An error returned from the `SyncSender::try_send` function. +pub enum TrySendError<T> { + /// An IO error. + Io(io::Error), + + /// Data could not be sent because it would require the callee to block. + Full(T), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +struct Inner { + // The number of outstanding messages for the receiver to read + pending: AtomicUsize, + // The number of sender handles + senders: AtomicUsize, + // The set readiness handle + set_readiness: AtomicLazyCell<SetReadiness>, +} + +impl<T> Sender<T> { + /// Attempts to send a value on this channel, returning it back if it could not be sent. + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.tx.send(t).map_err(SendError::from).and_then(|_| { + try!(self.ctl.inc()); + Ok(()) + }) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl<T> SyncSender<T> { + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.tx.send(t).map_err(From::from).and_then(|_| { + try!(self.ctl.inc()); + Ok(()) + }) + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from `send` by returning immediately if the channel's + /// buffer is full or no receiver is waiting to acquire some data. + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + self.tx.try_send(t).map_err(From::from).and_then(|_| { + try!(self.ctl.inc()); + Ok(()) + }) + } +} + +impl<T> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + SyncSender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl<T> Receiver<T> { + /// Attempts to return a pending value on this receiver without blocking. + pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> { + self.rx.try_recv().and_then(|res| { + let _ = self.ctl.dec(); + Ok(res) + }) + } +} + +impl<T> Evented for Receiver<T> { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.ctl.deregister(poll) + } +} + +/* + * + * ===== SenderCtl / ReceiverCtl ===== + * + */ + +impl SenderCtl { + /// Call to track that a message has been sent + fn inc(&self) -> io::Result<()> { + let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire); + + if 0 == cnt { + // Toggle readiness to readable + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + try!(set_readiness.set_readiness(Ready::readable())); + } + } + + Ok(()) + } +} + +impl Clone for SenderCtl { + fn clone(&self) -> SenderCtl { + self.inner.senders.fetch_add(1, Ordering::Relaxed); + SenderCtl { + inner: Arc::clone(&self.inner), + } + } +} + +impl Drop for SenderCtl { + fn drop(&mut self) { + if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 { + let _ = self.inc(); + } + } +} + +impl ReceiverCtl { + fn dec(&self) -> io::Result<()> { + let first = self.inner.pending.load(Ordering::Acquire); + + if first == 1 { + // Unset readiness + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + try!(set_readiness.set_readiness(Ready::empty())); + } + } + + // Decrement + let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel); + + if first == 1 && second > 1 { + // There are still pending messages. Since readiness was + // previously unset, it must be reset here + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + try!(set_readiness.set_readiness(Ready::readable())); + } + } + + Ok(()) + } +} + +impl Evented for ReceiverCtl { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + if self.registration.borrow().is_some() { + return Err(io::Error::new( + io::ErrorKind::Other, + "receiver already registered", + )); + } + + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, token, interest, opts)?; + + if self.inner.pending.load(Ordering::Relaxed) > 0 { + // TODO: Don't drop readiness + let _ = set_readiness.set_readiness(Ready::readable()); + } + + self.registration + .fill(registration) + .expect("unexpected state encountered"); + self.inner + .set_readiness + .fill(set_readiness) + .expect("unexpected state encountered"); + + Ok(()) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.reregister(registration, token, interest, opts), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.deregister(registration), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } +} + +/* + * + * ===== Error conversions ===== + * + */ + +impl<T> From<mpsc::SendError<T>> for SendError<T> { + fn from(src: mpsc::SendError<T>) -> SendError<T> { + SendError::Disconnected(src.0) + } +} + +impl<T> From<io::Error> for SendError<T> { + fn from(src: io::Error) -> SendError<T> { + SendError::Io(src) + } +} + +impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> { + fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> { + match src { + mpsc::TrySendError::Full(v) => TrySendError::Full(v), + mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v), + } + } +} + +impl<T> From<mpsc::SendError<T>> for TrySendError<T> { + fn from(src: mpsc::SendError<T>) -> TrySendError<T> { + TrySendError::Disconnected(src.0) + } +} + +impl<T> From<io::Error> for TrySendError<T> { + fn from(src: io::Error) -> TrySendError<T> { + TrySendError::Io(src) + } +} + +/* + * + * ===== Implement Error, Debug and Display for Errors ===== + * + */ + +impl<T: Any> error::Error for SendError<T> { + fn description(&self) -> &str { + match *self { + SendError::Io(ref io_err) => io_err.description(), + SendError::Disconnected(..) => "Disconnected", + } + } +} + +impl<T: Any> error::Error for TrySendError<T> { + fn description(&self) -> &str { + match *self { + TrySendError::Io(ref io_err) => io_err.description(), + TrySendError::Full(..) => "Full", + TrySendError::Disconnected(..) => "Disconnected", + } + } +} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +#[inline] +fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + SendError::Io(ref io_err) => write!(f, "{}", io_err), + SendError::Disconnected(..) => write!(f, "Disconnected"), + } +} + +#[inline] +fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + TrySendError::Io(ref io_err) => write!(f, "{}", io_err), + TrySendError::Full(..) => write!(f, "Full"), + TrySendError::Disconnected(..) => write!(f, "Disconnected"), + } +} diff --git a/third_party/rust/mio-extras/src/lib.rs b/third_party/rust/mio-extras/src/lib.rs new file mode 100644 index 0000000000..69a000556c --- /dev/null +++ b/third_party/rust/mio-extras/src/lib.rs @@ -0,0 +1,33 @@ +//! Extra components for use with Mio. +#![deny(missing_docs)] +extern crate lazycell; +extern crate mio; +extern crate slab; + +#[macro_use] +extern crate log; + +pub mod channel; +pub mod timer; + +// Conversion utilities +mod convert { + use std::time::Duration; + + const NANOS_PER_MILLI: u32 = 1_000_000; + const MILLIS_PER_SEC: u64 = 1_000; + + /// Convert a `Duration` to milliseconds, rounding up and saturating at + /// `u64::MAX`. + /// + /// The saturating is fine because `u64::MAX` milliseconds are still many + /// million years. + pub fn millis(duration: Duration) -> u64 { + // Round up. + let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI; + duration + .as_secs() + .saturating_mul(MILLIS_PER_SEC) + .saturating_add(u64::from(millis)) + } +} diff --git a/third_party/rust/mio-extras/src/timer.rs b/third_party/rust/mio-extras/src/timer.rs new file mode 100644 index 0000000000..adf8f2774a --- /dev/null +++ b/third_party/rust/mio-extras/src/timer.rs @@ -0,0 +1,751 @@ +//! Timer optimized for I/O related operations +use convert; +use lazycell::LazyCell; +use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use slab::Slab; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use std::{cmp, fmt, io, iter, thread, u64, usize}; + +/// A timer. +/// +/// Typical usage goes like this: +/// +/// * register the timer with a `mio::Poll`. +/// * set a timeout, by calling `Timer::set_timeout`. Here you provide some +/// state to be associated with this timeout. +/// * poll the `Poll`, to learn when a timeout has occurred. +/// * retrieve state associated with the timeout by calling `Timer::poll`. +/// +/// You can omit use of the `Poll` altogether, if you like, and just poll the +/// `Timer` directly. +pub struct Timer<T> { + // Size of each tick in milliseconds + tick_ms: u64, + // Slab of timeout entries + entries: Slab<Entry<T>>, + // Timeout wheel. Each tick, the timer will look at the next slot for + // timeouts that match the current tick. + wheel: Vec<WheelEntry>, + // Tick 0's time instant + start: Instant, + // The current tick + tick: Tick, + // The next entry to possibly timeout + next: Token, + // Masks the target tick to get the slot + mask: u64, + // Set on registration with Poll + inner: LazyCell<Inner>, +} + +/// Used to create a `Timer`. +pub struct Builder { + // Approximate duration of each tick + tick: Duration, + // Number of slots in the timer wheel + num_slots: usize, + // Max number of timeouts that can be in flight at a given time. + capacity: usize, +} + +/// A timeout, as returned by `Timer::set_timeout`. +/// +/// Use this as the argument to `Timer::cancel_timeout`, to cancel this timeout. +#[derive(Clone, Debug)] +pub struct Timeout { + // Reference into the timer entry slab + token: Token, + // Tick that it should match up with + tick: u64, +} + +struct Inner { + registration: Registration, + set_readiness: SetReadiness, + wakeup_state: WakeupState, + wakeup_thread: thread::JoinHandle<()>, +} + +impl Drop for Inner { + fn drop(&mut self) { + // 1. Set wakeup state to TERMINATE_THREAD + self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release); + // 2. Wake him up + self.wakeup_thread.thread().unpark(); + } +} + +#[derive(Copy, Clone, Debug)] +struct WheelEntry { + next_tick: Tick, + head: Token, +} + +// Doubly linked list of timer entries. Allows for efficient insertion / +// removal of timeouts. +struct Entry<T> { + state: T, + links: EntryLinks, +} + +#[derive(Copy, Clone)] +struct EntryLinks { + tick: Tick, + prev: Token, + next: Token, +} + +type Tick = u64; + +const TICK_MAX: Tick = u64::MAX; + +// Manages communication with wakeup thread +type WakeupState = Arc<AtomicUsize>; + +const TERMINATE_THREAD: usize = 0; +const EMPTY: Token = Token(usize::MAX); + +impl Builder { + /// Set the tick duration. Default is 100ms. + pub fn tick_duration(mut self, duration: Duration) -> Builder { + self.tick = duration; + self + } + + /// Set the number of slots. Default is 256. + pub fn num_slots(mut self, num_slots: usize) -> Builder { + self.num_slots = num_slots; + self + } + + /// Set the capacity. Default is 65536. + pub fn capacity(mut self, capacity: usize) -> Builder { + self.capacity = capacity; + self + } + + /// Build a `Timer` with the parameters set on this `Builder`. + pub fn build<T>(self) -> Timer<T> { + Timer::new( + convert::millis(self.tick), + self.num_slots, + self.capacity, + Instant::now(), + ) + } +} + +impl Default for Builder { + fn default() -> Builder { + Builder { + tick: Duration::from_millis(100), + num_slots: 1 << 8, + capacity: 1 << 16, + } + } +} + +impl<T> Timer<T> { + fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> { + let num_slots = num_slots.next_power_of_two(); + let capacity = capacity.next_power_of_two(); + let mask = (num_slots as u64) - 1; + let wheel = iter::repeat(WheelEntry { + next_tick: TICK_MAX, + head: EMPTY, + }).take(num_slots) + .collect(); + + Timer { + tick_ms, + entries: Slab::with_capacity(capacity), + wheel, + start, + tick: 0, + next: EMPTY, + mask, + inner: LazyCell::new(), + } + } + + /// Set a timeout. + /// + /// When the timeout occurs, the given state becomes available via `poll`. + pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout { + let delay_from_start = self.start.elapsed() + delay_from_now; + self.set_timeout_at(delay_from_start, state) + } + + fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout { + let mut tick = duration_to_tick(delay_from_start, self.tick_ms); + trace!( + "setting timeout; delay={:?}; tick={:?}; current-tick={:?}", + delay_from_start, + tick, + self.tick + ); + + // Always target at least 1 tick in the future + if tick <= self.tick { + tick = self.tick + 1; + } + + self.insert(tick, state) + } + + fn insert(&mut self, tick: Tick, state: T) -> Timeout { + // Get the slot for the requested tick + let slot = (tick & self.mask) as usize; + let curr = self.wheel[slot]; + + // Insert the new entry + let entry = Entry::new(state, tick, curr.head); + let token = Token(self.entries.insert(entry)); + + if curr.head != EMPTY { + // If there was a previous entry, set its prev pointer to the new + // entry + self.entries[curr.head.into()].links.prev = token; + } + + // Update the head slot + self.wheel[slot] = WheelEntry { + next_tick: cmp::min(tick, curr.next_tick), + head: token, + }; + + self.schedule_readiness(tick); + + trace!("inserted timout; slot={}; token={:?}", slot, token); + + // Return the new timeout + Timeout { token, tick } + } + + /// Cancel a timeout. + /// + /// If the timeout has not yet occurred, the return value holds the + /// associated state. + pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> { + let links = match self.entries.get(timeout.token.into()) { + Some(e) => e.links, + None => return None, + }; + + // Sanity check + if links.tick != timeout.tick { + return None; + } + + self.unlink(&links, timeout.token); + Some(self.entries.remove(timeout.token.into()).state) + } + + /// Poll for an expired timer. + /// + /// The return value holds the state associated with the first expired + /// timer, if any. + pub fn poll(&mut self) -> Option<T> { + let target_tick = current_tick(self.start, self.tick_ms); + self.poll_to(target_tick) + } + + fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> { + trace!( + "tick_to; target_tick={}; current_tick={}", + target_tick, + self.tick + ); + + if target_tick < self.tick { + target_tick = self.tick; + } + + while self.tick <= target_tick { + let curr = self.next; + + trace!("ticking; curr={:?}", curr); + + if curr == EMPTY { + self.tick += 1; + + let slot = self.slot_for(self.tick); + self.next = self.wheel[slot].head; + + // Handle the case when a slot has a single timeout which gets + // canceled before the timeout expires. In this case, the + // slot's head is EMPTY but there is a value for next_tick. Not + // resetting next_tick here causes the timer to get stuck in a + // loop. + if self.next == EMPTY { + self.wheel[slot].next_tick = TICK_MAX; + } + } else { + let slot = self.slot_for(self.tick); + + if curr == self.wheel[slot].head { + self.wheel[slot].next_tick = TICK_MAX; + } + + let links = self.entries[curr.into()].links; + + if links.tick <= self.tick { + trace!("triggering; token={:?}", curr); + + // Unlink will also advance self.next + self.unlink(&links, curr); + + // Remove and return the token + return Some(self.entries.remove(curr.into()).state); + } else { + let next_tick = self.wheel[slot].next_tick; + self.wheel[slot].next_tick = cmp::min(next_tick, links.tick); + self.next = links.next; + } + } + } + + // No more timeouts to poll + if let Some(inner) = self.inner.borrow() { + trace!("unsetting readiness"); + let _ = inner.set_readiness.set_readiness(Ready::empty()); + + if let Some(tick) = self.next_tick() { + self.schedule_readiness(tick); + } + } + + None + } + + fn unlink(&mut self, links: &EntryLinks, token: Token) { + trace!( + "unlinking timeout; slot={}; token={:?}", + self.slot_for(links.tick), + token + ); + + if links.prev == EMPTY { + let slot = self.slot_for(links.tick); + self.wheel[slot].head = links.next; + } else { + self.entries[links.prev.into()].links.next = links.next; + } + + if links.next != EMPTY { + self.entries[links.next.into()].links.prev = links.prev; + + if token == self.next { + self.next = links.next; + } + } else if token == self.next { + self.next = EMPTY; + } + } + + fn schedule_readiness(&self, tick: Tick) { + if let Some(inner) = self.inner.borrow() { + // Coordinate setting readiness w/ the wakeup thread + let mut curr = inner.wakeup_state.load(Ordering::Acquire); + + loop { + if curr as Tick <= tick { + // Nothing to do, wakeup is already scheduled + return; + } + + // Attempt to move the wakeup time forward + trace!("advancing the wakeup time; target={}; curr={}", tick, curr); + let actual = + inner + .wakeup_state + .compare_and_swap(curr, tick as usize, Ordering::Release); + + if actual == curr { + // Signal to the wakeup thread that the wakeup time has + // been changed. + trace!("unparking wakeup thread"); + inner.wakeup_thread.thread().unpark(); + return; + } + + curr = actual; + } + } + } + + // Next tick containing a timeout + fn next_tick(&self) -> Option<Tick> { + if self.next != EMPTY { + let slot = self.slot_for(self.entries[self.next.into()].links.tick); + + if self.wheel[slot].next_tick == self.tick { + // There is data ready right now + return Some(self.tick); + } + } + + self.wheel.iter().map(|e| e.next_tick).min() + } + + fn slot_for(&self, tick: Tick) -> usize { + (self.mask & tick) as usize + } +} + +impl<T> Default for Timer<T> { + fn default() -> Timer<T> { + Builder::default().build() + } +} + +impl<T> Evented for Timer<T> { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + if self.inner.borrow().is_some() { + return Err(io::Error::new( + io::ErrorKind::Other, + "timer already registered", + )); + } + + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, token, interest, opts)?; + let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX)); + let thread_handle = spawn_wakeup_thread( + Arc::clone(&wakeup_state), + set_readiness.clone(), + self.start, + self.tick_ms, + ); + + self.inner + .fill(Inner { + registration, + set_readiness, + wakeup_state, + wakeup_thread: thread_handle, + }) + .expect("timer already registered"); + + if let Some(next_tick) = self.next_tick() { + self.schedule_readiness(next_tick); + } + + Ok(()) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + match self.inner.borrow() { + Some(inner) => poll.reregister(&inner.registration, token, interest, opts), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.inner.borrow() { + Some(inner) => poll.deregister(&inner.registration), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Inner") + .field("registration", &self.registration) + .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed)) + .finish() + } +} + +fn spawn_wakeup_thread( + state: WakeupState, + set_readiness: SetReadiness, + start: Instant, + tick_ms: u64, +) -> thread::JoinHandle<()> { + thread::spawn(move || { + let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick; + + loop { + if sleep_until_tick == TERMINATE_THREAD as Tick { + return; + } + + let now_tick = current_tick(start, tick_ms); + + trace!( + "wakeup thread: sleep_until_tick={:?}; now_tick={:?}", + sleep_until_tick, + now_tick + ); + + if now_tick < sleep_until_tick { + // Calling park_timeout with u64::MAX leads to undefined + // behavior in pthread, causing the park to return immediately + // and causing the thread to tightly spin. Instead of u64::MAX + // on large values, simply use a blocking park. + match tick_ms.checked_mul(sleep_until_tick - now_tick) { + Some(sleep_duration) => { + trace!( + "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}", + tick_ms, + now_tick, + sleep_until_tick, + sleep_duration + ); + thread::park_timeout(Duration::from_millis(sleep_duration)); + } + None => { + trace!( + "sleeping; tick_ms={}; now_tick={}; blocking sleep", + tick_ms, + now_tick + ); + thread::park(); + } + } + sleep_until_tick = state.load(Ordering::Acquire) as Tick; + } else { + let actual = + state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel) + as Tick; + + if actual == sleep_until_tick { + trace!("setting readiness from wakeup thread"); + let _ = set_readiness.set_readiness(Ready::readable()); + sleep_until_tick = usize::MAX as Tick; + } else { + sleep_until_tick = actual as Tick; + } + } + } + }) +} + +fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick { + // Calculate tick rounding up to the closest one + let elapsed_ms = convert::millis(elapsed); + elapsed_ms.saturating_add(tick_ms / 2) / tick_ms +} + +fn current_tick(start: Instant, tick_ms: u64) -> Tick { + duration_to_tick(start.elapsed(), tick_ms) +} + +impl<T> Entry<T> { + fn new(state: T, tick: u64, next: Token) -> Entry<T> { + Entry { + state, + links: EntryLinks { + tick, + prev: EMPTY, + next, + }, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::time::{Duration, Instant}; + + #[test] + pub fn test_timeout_next_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + + tick = ms_to_tick(&t, 50); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 150); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_clearing_timeout() { + let mut t = timer(); + let mut tick; + + let to = t.set_timeout_at(Duration::from_millis(100), "a"); + assert_eq!("a", t.cancel_timeout(&to).unwrap()); + + tick = ms_to_tick(&t, 100); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_multiple_timeouts_same_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + t.set_timeout_at(Duration::from_millis(100), "b"); + + let mut rcv = vec![]; + + tick = ms_to_tick(&t, 100); + rcv.push(t.poll_to(tick).unwrap()); + rcv.push(t.poll_to(tick).unwrap()); + + assert_eq!(None, t.poll_to(tick)); + + rcv.sort(); + assert!(rcv == ["a", "b"], "actual={:?}", rcv); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_multiple_timeouts_diff_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(110), "a"); + t.set_timeout_at(Duration::from_millis(220), "b"); + t.set_timeout_at(Duration::from_millis(230), "c"); + t.set_timeout_at(Duration::from_millis(440), "d"); + t.set_timeout_at(Duration::from_millis(560), "e"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 300); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 400); + assert_eq!(Some("d"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 500); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 600); + assert_eq!(Some("e"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + } + + #[test] + pub fn test_catching_up() { + let mut t = timer(); + + t.set_timeout_at(Duration::from_millis(110), "a"); + t.set_timeout_at(Duration::from_millis(220), "b"); + t.set_timeout_at(Duration::from_millis(230), "c"); + t.set_timeout_at(Duration::from_millis(440), "d"); + + let tick = ms_to_tick(&t, 600); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(Some("d"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + } + + #[test] + pub fn test_timeout_hash_collision() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(1, count(&t)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + assert_eq!(1, count(&t)); + + tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(0, count(&t)); + } + + #[test] + pub fn test_clearing_timeout_between_triggers() { + let mut t = timer(); + let mut tick; + + let a = t.set_timeout_at(Duration::from_millis(100), "a"); + let _ = t.set_timeout_at(Duration::from_millis(100), "b"); + let _ = t.set_timeout_at(Duration::from_millis(200), "c"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(2, count(&t)); + + t.cancel_timeout(&a); + assert_eq!(1, count(&t)); + + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(0, count(&t)); + } + + const TICK: u64 = 100; + const SLOTS: usize = 16; + const CAPACITY: usize = 32; + + fn count<T>(timer: &Timer<T>) -> usize { + timer.entries.len() + } + + fn timer() -> Timer<&'static str> { + Timer::new(TICK, SLOTS, CAPACITY, Instant::now()) + } + + fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 { + ms / timer.tick_ms + } + +} |