diff options
Diffstat (limited to 'third_party/rust/crossbeam-channel/src/flavors/tick.rs')
-rw-r--r-- | third_party/rust/crossbeam-channel/src/flavors/tick.rs | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/src/flavors/tick.rs b/third_party/rust/crossbeam-channel/src/flavors/tick.rs new file mode 100644 index 0000000000..e8e7020ca8 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/tick.rs @@ -0,0 +1,167 @@ +//! Channel that delivers messages periodically. +//! +//! Messages cannot be sent into this kind of channel; they are materialized on demand. + +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_utils::atomic::AtomicCell; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, TryRecvError}; +use crate::select::{Operation, SelectHandle, Token}; + +/// Result of a receive operation. +pub type TickToken = Option<Instant>; + +/// Channel that delivers messages periodically. +pub struct Channel { + /// The instant at which the next message will be delivered. + delivery_time: AtomicCell<Instant>, + + /// The time interval in which messages get delivered. + duration: Duration, +} + +impl Channel { + /// Creates a channel that delivers messages periodically. + #[inline] + pub fn new(dur: Duration) -> Self { + Channel { + delivery_time: AtomicCell::new(Instant::now() + dur), + duration: dur, + } + } + + /// Attempts to receive a message without blocking. + #[inline] + pub fn try_recv(&self) -> Result<Instant, TryRecvError> { + loop { + let now = Instant::now(); + let delivery_time = self.delivery_time.load(); + + if now < delivery_time { + return Err(TryRecvError::Empty); + } + + if self + .delivery_time + .compare_exchange(delivery_time, now + self.duration) + .is_ok() + { + return Ok(delivery_time); + } + } + } + + /// Receives a message from the channel. + #[inline] + pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { + loop { + let delivery_time = self.delivery_time.load(); + let now = Instant::now(); + + if let Some(d) = deadline { + if d < delivery_time { + if now < d { + thread::sleep(d - now); + } + return Err(RecvTimeoutError::Timeout); + } + } + + if self + .delivery_time + .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) + .is_ok() + { + if now < delivery_time { + thread::sleep(delivery_time - now); + } + return Ok(delivery_time); + } + } + } + + /// Reads a message from the channel. + #[inline] + pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { + token.tick.ok_or(()) + } + + /// Returns `true` if the channel is empty. + #[inline] + pub fn is_empty(&self) -> bool { + Instant::now() < self.delivery_time.load() + } + + /// Returns `true` if the channel is full. + #[inline] + pub fn is_full(&self) -> bool { + !self.is_empty() + } + + /// Returns the number of messages in the channel. + #[inline] + pub fn len(&self) -> usize { + if self.is_empty() { + 0 + } else { + 1 + } + } + + /// Returns the capacity of the channel. + #[inline] + pub fn capacity(&self) -> Option<usize> { + Some(1) + } +} + +impl SelectHandle for Channel { + #[inline] + fn try_select(&self, token: &mut Token) -> bool { + match self.try_recv() { + Ok(msg) => { + token.tick = Some(msg); + true + } + Err(TryRecvError::Disconnected) => { + token.tick = None; + true + } + Err(TryRecvError::Empty) => false, + } + } + + #[inline] + fn deadline(&self) -> Option<Instant> { + Some(self.delivery_time.load()) + } + + #[inline] + fn register(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unregister(&self, _oper: Operation) {} + + #[inline] + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + #[inline] + fn is_ready(&self) -> bool { + !self.is_empty() + } + + #[inline] + fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unwatch(&self, _oper: Operation) {} +} |