//! Channel that delivers messages periodically. //! //! Messages cannot be sent into this kind of channel; they are materialized on demand. use std::thread; use std::time::{Duration, Instant}; use crossbeam_utils::atomic::AtomicCell; use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; use crate::select::{Operation, SelectHandle, Token}; use crate::utils; /// Result of a receive operation. pub(crate) type TickToken = Option; /// Channel that delivers messages periodically. pub(crate) struct Channel { /// The instant at which the next message will be delivered. delivery_time: AtomicCell, /// The time interval in which messages get delivered. duration: Duration, } impl Channel { /// Creates a channel that delivers messages periodically. #[inline] pub(crate) fn new(dur: Duration) -> Self { Channel { delivery_time: AtomicCell::new(utils::convert_timeout_to_deadline(dur)), duration: dur, } } /// Attempts to receive a message without blocking. #[inline] pub(crate) fn try_recv(&self) -> Result { loop { let now = Instant::now(); let delivery_time = self.delivery_time.load(); if now < delivery_time { return Err(TryRecvError::Empty); } if self .delivery_time .compare_exchange(delivery_time, now + self.duration) .is_ok() { return Ok(delivery_time); } } } /// Receives a message from the channel. #[inline] pub(crate) fn recv(&self, deadline: Option) -> Result { loop { let delivery_time = self.delivery_time.load(); let now = Instant::now(); if let Some(d) = deadline { if d < delivery_time { if now < d { thread::sleep(d - now); } return Err(RecvTimeoutError::Timeout); } } if self .delivery_time .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) .is_ok() { if now < delivery_time { thread::sleep(delivery_time - now); } return Ok(delivery_time); } } } /// Reads a message from the channel. #[inline] pub(crate) unsafe fn read(&self, token: &mut Token) -> Result { token.tick.ok_or(()) } /// Returns `true` if the channel is empty. #[inline] pub(crate) fn is_empty(&self) -> bool { Instant::now() < self.delivery_time.load() } /// Returns `true` if the channel is full. #[inline] pub(crate) fn is_full(&self) -> bool { !self.is_empty() } /// Returns the number of messages in the channel. #[inline] pub(crate) fn len(&self) -> usize { if self.is_empty() { 0 } else { 1 } } /// Returns the capacity of the channel. #[inline] pub(crate) fn capacity(&self) -> Option { Some(1) } } impl SelectHandle for Channel { #[inline] fn try_select(&self, token: &mut Token) -> bool { match self.try_recv() { Ok(msg) => { token.tick = Some(msg); true } Err(TryRecvError::Disconnected) => { token.tick = None; true } Err(TryRecvError::Empty) => false, } } #[inline] fn deadline(&self) -> Option { Some(self.delivery_time.load()) } #[inline] fn register(&self, _oper: Operation, _cx: &Context) -> bool { self.is_ready() } #[inline] fn unregister(&self, _oper: Operation) {} #[inline] fn accept(&self, token: &mut Token, _cx: &Context) -> bool { self.try_select(token) } #[inline] fn is_ready(&self) -> bool { !self.is_empty() } #[inline] fn watch(&self, _oper: Operation, _cx: &Context) -> bool { self.is_ready() } #[inline] fn unwatch(&self, _oper: Operation) {} }