diff options
Diffstat (limited to 'third_party/rust/crossbeam-channel/src/flavors/at.rs')
-rw-r--r-- | third_party/rust/crossbeam-channel/src/flavors/at.rs | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/src/flavors/at.rs b/third_party/rust/crossbeam-channel/src/flavors/at.rs new file mode 100644 index 0000000000..ca5ee60f52 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/at.rs @@ -0,0 +1,202 @@ +//! Channel that delivers a message at a certain moment in time. +//! +//! Messages cannot be sent into this kind of channel; they are materialized on demand. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::{Duration, Instant}; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, TryRecvError}; +use crate::select::{Operation, SelectHandle, Token}; +use crate::utils; + +/// Result of a receive operation. +pub(crate) type AtToken = Option<Instant>; + +/// Channel that delivers a message at a certain moment in time +pub(crate) struct Channel { + /// The instant at which the message will be delivered. + delivery_time: Instant, + + /// `true` if the message has been received. + received: AtomicBool, +} + +impl Channel { + /// Creates a channel that delivers a message at a certain instant in time. + #[inline] + pub(crate) fn new_deadline(when: Instant) -> Self { + Channel { + delivery_time: when, + received: AtomicBool::new(false), + } + } + /// Creates a channel that delivers a message after a certain duration of time. + #[inline] + pub(crate) fn new_timeout(dur: Duration) -> Self { + Self::new_deadline(utils::convert_timeout_to_deadline(dur)) + } + + /// Attempts to receive a message without blocking. + #[inline] + pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + // The message has already been received. + return Err(TryRecvError::Empty); + } + + if Instant::now() < self.delivery_time { + // The message was not delivered yet. + return Err(TryRecvError::Empty); + } + + // Try receiving the message if it is still available. + if !self.received.swap(true, Ordering::SeqCst) { + // Success! Return delivery time as the message. + Ok(self.delivery_time) + } else { + // The message was already received. + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + #[inline] + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + // The message has already been received. + utils::sleep_until(deadline); + return Err(RecvTimeoutError::Timeout); + } + + // Wait until the message is received or the deadline is reached. + loop { + let now = Instant::now(); + + let deadline = match deadline { + // Check if we can receive the next message. + _ if now >= self.delivery_time => break, + // Check if the timeout deadline has been reached. + Some(d) if now >= d => return Err(RecvTimeoutError::Timeout), + + // Sleep until one of the above happens + Some(d) if d < self.delivery_time => d, + _ => self.delivery_time, + }; + + thread::sleep(deadline - now); + } + + // Try receiving the message if it is still available. + if !self.received.swap(true, Ordering::SeqCst) { + // Success! Return the message, which is the instant at which it was delivered. + Ok(self.delivery_time) + } else { + // The message was already received. Block forever. + utils::sleep_until(None); + unreachable!() + } + } + + /// Reads a message from the channel. + #[inline] + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { + token.at.ok_or(()) + } + + /// Returns `true` if the channel is empty. + #[inline] + pub(crate) fn is_empty(&self) -> bool { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + return true; + } + + // If the delivery time hasn't been reached yet, the channel is empty. + if Instant::now() < self.delivery_time { + return true; + } + + // The delivery time has been reached. The channel is empty only if the message has already + // been received. + self.received.load(Ordering::SeqCst) + } + + /// Returns `true` if the channel is full. + #[inline] + pub(crate) fn is_full(&self) -> bool { + !self.is_empty() + } + + /// Returns the number of messages in the channel. + #[inline] + pub(crate) fn len(&self) -> usize { + if self.is_empty() { + 0 + } else { + 1 + } + } + + /// Returns the capacity of the channel. + #[inline] + pub(crate) fn capacity(&self) -> Option<usize> { + Some(1) + } +} + +impl SelectHandle for Channel { + #[inline] + fn try_select(&self, token: &mut Token) -> bool { + match self.try_recv() { + Ok(msg) => { + token.at = Some(msg); + true + } + Err(TryRecvError::Disconnected) => { + token.at = None; + true + } + Err(TryRecvError::Empty) => false, + } + } + + #[inline] + fn deadline(&self) -> Option<Instant> { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + None + } else { + Some(self.delivery_time) + } + } + + #[inline] + fn register(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unregister(&self, _oper: Operation) {} + + #[inline] + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + #[inline] + fn is_ready(&self) -> bool { + !self.is_empty() + } + + #[inline] + fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unwatch(&self, _oper: Operation) {} +} |