diff options
Diffstat (limited to 'vendor/crossbeam-channel/src')
-rw-r--r-- | vendor/crossbeam-channel/src/channel.rs | 30 | ||||
-rw-r--r-- | vendor/crossbeam-channel/src/flavors/at.rs | 7 | ||||
-rw-r--r-- | vendor/crossbeam-channel/src/flavors/list.rs | 11 | ||||
-rw-r--r-- | vendor/crossbeam-channel/src/flavors/tick.rs | 5 | ||||
-rw-r--r-- | vendor/crossbeam-channel/src/select.rs | 10 | ||||
-rw-r--r-- | vendor/crossbeam-channel/src/utils.rs | 8 |
6 files changed, 44 insertions, 27 deletions
diff --git a/vendor/crossbeam-channel/src/channel.rs b/vendor/crossbeam-channel/src/channel.rs index 800fe6352..bd241156f 100644 --- a/vendor/crossbeam-channel/src/channel.rs +++ b/vendor/crossbeam-channel/src/channel.rs @@ -14,7 +14,6 @@ use crate::err::{ }; use crate::flavors; use crate::select::{Operation, SelectHandle, Token}; -use crate::utils; /// Creates a channel of unbounded capacity. /// @@ -160,7 +159,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { /// let ms = |ms| Duration::from_millis(ms); /// /// // Returns `true` if `a` and `b` are very close `Instant`s. -/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; +/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a; /// /// let start = Instant::now(); /// let r = after(ms(100)); @@ -172,8 +171,11 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { /// assert!(eq(Instant::now(), start + ms(500))); /// ``` pub fn after(duration: Duration) -> Receiver<Instant> { - Receiver { - flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))), + match Instant::now().checked_add(duration) { + Some(deadline) => Receiver { + flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))), + }, + None => never(), } } @@ -320,8 +322,14 @@ pub fn never<T>() -> Receiver<T> { /// assert!(eq(Instant::now(), start + ms(700))); /// ``` pub fn tick(duration: Duration) -> Receiver<Instant> { - Receiver { - flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))), + match Instant::now().checked_add(duration) { + Some(delivery_time) => Receiver { + flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new( + delivery_time, + duration, + ))), + }, + None => never(), } } @@ -474,7 +482,10 @@ impl<T> Sender<T> { /// ); /// ``` pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { - self.send_deadline(msg, utils::convert_timeout_to_deadline(timeout)) + match Instant::now().checked_add(timeout) { + Some(deadline) => self.send_deadline(msg, deadline), + None => self.send(msg).map_err(SendTimeoutError::from), + } } /// Waits for a message to be sent into the channel, but only until a given deadline. @@ -864,7 +875,10 @@ impl<T> Receiver<T> { /// ); /// ``` pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { - self.recv_deadline(utils::convert_timeout_to_deadline(timeout)) + match Instant::now().checked_add(timeout) { + Some(deadline) => self.recv_deadline(deadline), + None => self.recv().map_err(RecvTimeoutError::from), + } } /// Waits for a message to be received from the channel, but only before a given deadline. diff --git a/vendor/crossbeam-channel/src/flavors/at.rs b/vendor/crossbeam-channel/src/flavors/at.rs index ca5ee60f5..515c4e33b 100644 --- a/vendor/crossbeam-channel/src/flavors/at.rs +++ b/vendor/crossbeam-channel/src/flavors/at.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Instant; use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; @@ -32,11 +32,6 @@ impl Channel { received: AtomicBool::new(false), } } - /// Creates a channel that delivers a message after a certain duration of time. - #[inline] - pub(crate) fn new_timeout(dur: Duration) -> Self { - Self::new_deadline(utils::convert_timeout_to_deadline(dur)) - } /// Attempts to receive a message without blocking. #[inline] diff --git a/vendor/crossbeam-channel/src/flavors/list.rs b/vendor/crossbeam-channel/src/flavors/list.rs index 6090b8d47..230edd8d2 100644 --- a/vendor/crossbeam-channel/src/flavors/list.rs +++ b/vendor/crossbeam-channel/src/flavors/list.rs @@ -584,6 +584,17 @@ impl<T> Channel<T> { let mut head = self.head.index.load(Ordering::Acquire); let mut block = self.head.block.load(Ordering::Acquire); + // If we're going to be dropping messages we need to synchronize with initialization + if head >> SHIFT != tail >> SHIFT { + // The block can be null here only if a sender is in the process of initializing the + // channel while another sender managed to send a message by inserting it into the + // semi-initialized channel and advanced the tail. + // In that case, just wait until it gets initialized. + while block.is_null() { + backoff.snooze(); + block = self.head.block.load(Ordering::Acquire); + } + } unsafe { // Drop all messages between head and tail and deallocate the heap-allocated blocks. while head >> SHIFT != tail >> SHIFT { diff --git a/vendor/crossbeam-channel/src/flavors/tick.rs b/vendor/crossbeam-channel/src/flavors/tick.rs index 4201b6eb0..d38f6a594 100644 --- a/vendor/crossbeam-channel/src/flavors/tick.rs +++ b/vendor/crossbeam-channel/src/flavors/tick.rs @@ -10,7 +10,6 @@ use crossbeam_utils::atomic::AtomicCell; use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; use crate::select::{Operation, SelectHandle, Token}; -use crate::utils; /// Result of a receive operation. pub(crate) type TickToken = Option<Instant>; @@ -27,9 +26,9 @@ pub(crate) struct Channel { impl Channel { /// Creates a channel that delivers messages periodically. #[inline] - pub(crate) fn new(dur: Duration) -> Self { + pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self { Channel { - delivery_time: AtomicCell::new(utils::convert_timeout_to_deadline(dur)), + delivery_time: AtomicCell::new(delivery_time), duration: dur, } } diff --git a/vendor/crossbeam-channel/src/select.rs b/vendor/crossbeam-channel/src/select.rs index 57d67a3a1..3eb0b97c8 100644 --- a/vendor/crossbeam-channel/src/select.rs +++ b/vendor/crossbeam-channel/src/select.rs @@ -487,7 +487,10 @@ pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { - select_deadline(handles, utils::convert_timeout_to_deadline(timeout)) + match Instant::now().checked_add(timeout) { + Some(deadline) => select_deadline(handles, deadline), + None => Ok(select(handles)), + } } /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. @@ -1045,7 +1048,10 @@ impl<'a> Select<'a> { /// } /// ``` pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> { - self.ready_deadline(utils::convert_timeout_to_deadline(timeout)) + match Instant::now().checked_add(timeout) { + Some(deadline) => self.ready_deadline(deadline), + None => Ok(self.ready()), + } } /// Blocks until a given deadline, or until one of the operations becomes ready. diff --git a/vendor/crossbeam-channel/src/utils.rs b/vendor/crossbeam-channel/src/utils.rs index 9f14c8e65..f623f2708 100644 --- a/vendor/crossbeam-channel/src/utils.rs +++ b/vendor/crossbeam-channel/src/utils.rs @@ -56,11 +56,3 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) { } } } - -// https://github.com/crossbeam-rs/crossbeam/issues/795 -pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant { - match Instant::now().checked_add(timeout) { - Some(deadline) => deadline, - None => Instant::now() + Duration::from_secs(86400 * 365 * 30), - } -} |