summaryrefslogtreecommitdiffstats
path: root/vendor/crossbeam-channel/src
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/crossbeam-channel/src')
-rw-r--r--vendor/crossbeam-channel/src/channel.rs30
-rw-r--r--vendor/crossbeam-channel/src/flavors/at.rs7
-rw-r--r--vendor/crossbeam-channel/src/flavors/list.rs11
-rw-r--r--vendor/crossbeam-channel/src/flavors/tick.rs5
-rw-r--r--vendor/crossbeam-channel/src/select.rs10
-rw-r--r--vendor/crossbeam-channel/src/utils.rs8
6 files changed, 44 insertions, 27 deletions
diff --git a/vendor/crossbeam-channel/src/channel.rs b/vendor/crossbeam-channel/src/channel.rs
index 800fe6352..bd241156f 100644
--- a/vendor/crossbeam-channel/src/channel.rs
+++ b/vendor/crossbeam-channel/src/channel.rs
@@ -14,7 +14,6 @@ use crate::err::{
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
-use crate::utils;
/// Creates a channel of unbounded capacity.
///
@@ -160,7 +159,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
/// let ms = |ms| Duration::from_millis(ms);
///
/// // Returns `true` if `a` and `b` are very close `Instant`s.
-/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
+/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
///
/// let start = Instant::now();
/// let r = after(ms(100));
@@ -172,8 +171,11 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
/// assert!(eq(Instant::now(), start + ms(500)));
/// ```
pub fn after(duration: Duration) -> Receiver<Instant> {
- Receiver {
- flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
+ match Instant::now().checked_add(duration) {
+ Some(deadline) => Receiver {
+ flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
+ },
+ None => never(),
}
}
@@ -320,8 +322,14 @@ pub fn never<T>() -> Receiver<T> {
/// assert!(eq(Instant::now(), start + ms(700)));
/// ```
pub fn tick(duration: Duration) -> Receiver<Instant> {
- Receiver {
- flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
+ match Instant::now().checked_add(duration) {
+ Some(delivery_time) => Receiver {
+ flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
+ delivery_time,
+ duration,
+ ))),
+ },
+ None => never(),
}
}
@@ -474,7 +482,10 @@ impl<T> Sender<T> {
/// );
/// ```
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
- self.send_deadline(msg, utils::convert_timeout_to_deadline(timeout))
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.send_deadline(msg, deadline),
+ None => self.send(msg).map_err(SendTimeoutError::from),
+ }
}
/// Waits for a message to be sent into the channel, but only until a given deadline.
@@ -864,7 +875,10 @@ impl<T> Receiver<T> {
/// );
/// ```
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
- self.recv_deadline(utils::convert_timeout_to_deadline(timeout))
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.recv_deadline(deadline),
+ None => self.recv().map_err(RecvTimeoutError::from),
+ }
}
/// Waits for a message to be received from the channel, but only before a given deadline.
diff --git a/vendor/crossbeam-channel/src/flavors/at.rs b/vendor/crossbeam-channel/src/flavors/at.rs
index ca5ee60f5..515c4e33b 100644
--- a/vendor/crossbeam-channel/src/flavors/at.rs
+++ b/vendor/crossbeam-channel/src/flavors/at.rs
@@ -4,7 +4,7 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
-use std::time::{Duration, Instant};
+use std::time::Instant;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
@@ -32,11 +32,6 @@ impl Channel {
received: AtomicBool::new(false),
}
}
- /// Creates a channel that delivers a message after a certain duration of time.
- #[inline]
- pub(crate) fn new_timeout(dur: Duration) -> Self {
- Self::new_deadline(utils::convert_timeout_to_deadline(dur))
- }
/// Attempts to receive a message without blocking.
#[inline]
diff --git a/vendor/crossbeam-channel/src/flavors/list.rs b/vendor/crossbeam-channel/src/flavors/list.rs
index 6090b8d47..230edd8d2 100644
--- a/vendor/crossbeam-channel/src/flavors/list.rs
+++ b/vendor/crossbeam-channel/src/flavors/list.rs
@@ -584,6 +584,17 @@ impl<T> Channel<T> {
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);
+ // If we're going to be dropping messages we need to synchronize with initialization
+ if head >> SHIFT != tail >> SHIFT {
+ // The block can be null here only if a sender is in the process of initializing the
+ // channel while another sender managed to send a message by inserting it into the
+ // semi-initialized channel and advanced the tail.
+ // In that case, just wait until it gets initialized.
+ while block.is_null() {
+ backoff.snooze();
+ block = self.head.block.load(Ordering::Acquire);
+ }
+ }
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head >> SHIFT != tail >> SHIFT {
diff --git a/vendor/crossbeam-channel/src/flavors/tick.rs b/vendor/crossbeam-channel/src/flavors/tick.rs
index 4201b6eb0..d38f6a594 100644
--- a/vendor/crossbeam-channel/src/flavors/tick.rs
+++ b/vendor/crossbeam-channel/src/flavors/tick.rs
@@ -10,7 +10,6 @@ use crossbeam_utils::atomic::AtomicCell;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
-use crate::utils;
/// Result of a receive operation.
pub(crate) type TickToken = Option<Instant>;
@@ -27,9 +26,9 @@ pub(crate) struct Channel {
impl Channel {
/// Creates a channel that delivers messages periodically.
#[inline]
- pub(crate) fn new(dur: Duration) -> Self {
+ pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self {
Channel {
- delivery_time: AtomicCell::new(utils::convert_timeout_to_deadline(dur)),
+ delivery_time: AtomicCell::new(delivery_time),
duration: dur,
}
}
diff --git a/vendor/crossbeam-channel/src/select.rs b/vendor/crossbeam-channel/src/select.rs
index 57d67a3a1..3eb0b97c8 100644
--- a/vendor/crossbeam-channel/src/select.rs
+++ b/vendor/crossbeam-channel/src/select.rs
@@ -487,7 +487,10 @@ pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
- select_deadline(handles, utils::convert_timeout_to_deadline(timeout))
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => select_deadline(handles, deadline),
+ None => Ok(select(handles)),
+ }
}
/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
@@ -1045,7 +1048,10 @@ impl<'a> Select<'a> {
/// }
/// ```
pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
- self.ready_deadline(utils::convert_timeout_to_deadline(timeout))
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.ready_deadline(deadline),
+ None => Ok(self.ready()),
+ }
}
/// Blocks until a given deadline, or until one of the operations becomes ready.
diff --git a/vendor/crossbeam-channel/src/utils.rs b/vendor/crossbeam-channel/src/utils.rs
index 9f14c8e65..f623f2708 100644
--- a/vendor/crossbeam-channel/src/utils.rs
+++ b/vendor/crossbeam-channel/src/utils.rs
@@ -56,11 +56,3 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) {
}
}
}
-
-// https://github.com/crossbeam-rs/crossbeam/issues/795
-pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant {
- match Instant::now().checked_add(timeout) {
- Some(deadline) => deadline,
- None => Instant::now() + Duration::from_secs(86400 * 365 * 30),
- }
-}