diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/crossbeam-channel/src | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/crossbeam-channel/src')
16 files changed, 7683 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/src/channel.rs b/third_party/rust/crossbeam-channel/src/channel.rs new file mode 100644 index 0000000000..ebcd652018 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/channel.rs @@ -0,0 +1,1510 @@ +//! The channel interface. + +use std::fmt; +use std::iter::FusedIterator; +use std::mem; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use crate::context::Context; +use crate::counter; +use crate::err::{ + RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError, +}; +use crate::flavors; +use crate::select::{Operation, SelectHandle, Token}; + +/// Creates a channel of unbounded capacity. +/// +/// This channel has a growable buffer that can hold any number of messages at a time. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use crossbeam_channel::unbounded; +/// +/// let (s, r) = unbounded(); +/// +/// // Computes the n-th Fibonacci number. +/// fn fib(n: i32) -> i32 { +/// if n <= 1 { +/// n +/// } else { +/// fib(n - 1) + fib(n - 2) +/// } +/// } +/// +/// // Spawn an asynchronous computation. +/// thread::spawn(move || s.send(fib(20)).unwrap()); +/// +/// // Print the result of the computation. +/// println!("{}", r.recv().unwrap()); +/// ``` +pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { + let (s, r) = counter::new(flavors::list::Channel::new()); + let s = Sender { + flavor: SenderFlavor::List(s), + }; + let r = Receiver { + flavor: ReceiverFlavor::List(r), + }; + (s, r) +} + +/// Creates a channel of bounded capacity. +/// +/// This channel has a buffer that can hold at most `cap` messages at a time. +/// +/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and +/// receive operations must appear at the same time in order to pair up and pass the message over. +/// +/// # Examples +/// +/// A channel of capacity 1: +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::bounded; +/// +/// let (s, r) = bounded(1); +/// +/// // This call returns immediately because there is enough space in the channel. +/// s.send(1).unwrap(); +/// +/// thread::spawn(move || { +/// // This call blocks the current thread because the channel is full. +/// // It will be able to complete only after the first message is received. +/// s.send(2).unwrap(); +/// }); +/// +/// thread::sleep(Duration::from_secs(1)); +/// assert_eq!(r.recv(), Ok(1)); +/// assert_eq!(r.recv(), Ok(2)); +/// ``` +/// +/// A zero-capacity channel: +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::bounded; +/// +/// let (s, r) = bounded(0); +/// +/// thread::spawn(move || { +/// // This call blocks the current thread until a receive operation appears +/// // on the other side of the channel. +/// s.send(1).unwrap(); +/// }); +/// +/// thread::sleep(Duration::from_secs(1)); +/// assert_eq!(r.recv(), Ok(1)); +/// ``` +pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { + if cap == 0 { + let (s, r) = counter::new(flavors::zero::Channel::new()); + let s = Sender { + flavor: SenderFlavor::Zero(s), + }; + let r = Receiver { + flavor: ReceiverFlavor::Zero(r), + }; + (s, r) + } else { + let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap)); + let s = Sender { + flavor: SenderFlavor::Array(s), + }; + let r = Receiver { + flavor: ReceiverFlavor::Array(r), + }; + (s, r) + } +} + +/// Creates a receiver that delivers a message after a certain duration of time. +/// +/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will +/// be sent into the channel after `duration` elapses. The message is the instant at which it is +/// sent. +/// +/// # Examples +/// +/// Using an `after` channel for timeouts: +/// +/// ``` +/// use std::time::Duration; +/// use crossbeam_channel::{after, select, unbounded}; +/// +/// let (s, r) = unbounded::<i32>(); +/// let timeout = Duration::from_millis(100); +/// +/// select! { +/// recv(r) -> msg => println!("received {:?}", msg), +/// recv(after(timeout)) -> _ => println!("timed out"), +/// } +/// ``` +/// +/// When the message gets sent: +/// +/// ``` +/// use std::thread; +/// use std::time::{Duration, Instant}; +/// use crossbeam_channel::after; +/// +/// // Converts a number of milliseconds into a `Duration`. +/// let ms = |ms| Duration::from_millis(ms); +/// +/// // Returns `true` if `a` and `b` are very close `Instant`s. +/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; +/// +/// let start = Instant::now(); +/// let r = after(ms(100)); +/// +/// thread::sleep(ms(500)); +/// +/// // This message was sent 100 ms from the start and received 500 ms from the start. +/// assert!(eq(r.recv().unwrap(), start + ms(100))); +/// assert!(eq(Instant::now(), start + ms(500))); +/// ``` +pub fn after(duration: Duration) -> Receiver<Instant> { + Receiver { + flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))), + } +} + +/// Creates a receiver that delivers a message at a certain instant in time. +/// +/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will +/// be sent into the channel at the moment in time `when`. The message is the instant at which it +/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered +/// instantly to the receiver. +/// +/// # Examples +/// +/// Using an `at` channel for timeouts: +/// +/// ``` +/// use std::time::{Instant, Duration}; +/// use crossbeam_channel::{at, select, unbounded}; +/// +/// let (s, r) = unbounded::<i32>(); +/// let deadline = Instant::now() + Duration::from_millis(500); +/// +/// select! { +/// recv(r) -> msg => println!("received {:?}", msg), +/// recv(at(deadline)) -> _ => println!("timed out"), +/// } +/// ``` +/// +/// When the message gets sent: +/// +/// ``` +/// use std::time::{Duration, Instant}; +/// use crossbeam_channel::at; +/// +/// // Converts a number of milliseconds into a `Duration`. +/// let ms = |ms| Duration::from_millis(ms); +/// +/// let start = Instant::now(); +/// let end = start + ms(100); +/// +/// let r = at(end); +/// +/// // This message was sent 100 ms from the start +/// assert_eq!(r.recv().unwrap(), end); +/// assert!(Instant::now() > start + ms(100)); +/// ``` +pub fn at(when: Instant) -> Receiver<Instant> { + Receiver { + flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))), + } +} + +/// Creates a receiver that never delivers messages. +/// +/// The channel is bounded with capacity of 0 and never gets disconnected. +/// +/// # Examples +/// +/// Using a `never` channel to optionally add a timeout to [`select!`]: +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::{after, select, never, unbounded}; +/// +/// let (s, r) = unbounded(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_secs(1)); +/// s.send(1).unwrap(); +/// }); +/// +/// // Suppose this duration can be a `Some` or a `None`. +/// let duration = Some(Duration::from_millis(100)); +/// +/// // Create a channel that times out after the specified duration. +/// let timeout = duration +/// .map(|d| after(d)) +/// .unwrap_or(never()); +/// +/// select! { +/// recv(r) -> msg => assert_eq!(msg, Ok(1)), +/// recv(timeout) -> _ => println!("timed out"), +/// } +/// ``` +/// +/// [`select!`]: macro.select.html +pub fn never<T>() -> Receiver<T> { + Receiver { + flavor: ReceiverFlavor::Never(flavors::never::Channel::new()), + } +} + +/// Creates a receiver that delivers messages periodically. +/// +/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be +/// sent into the channel in intervals of `duration`. Each message is the instant at which it is +/// sent. +/// +/// # Examples +/// +/// Using a `tick` channel to periodically print elapsed time: +/// +/// ``` +/// use std::time::{Duration, Instant}; +/// use crossbeam_channel::tick; +/// +/// let start = Instant::now(); +/// let ticker = tick(Duration::from_millis(100)); +/// +/// for _ in 0..5 { +/// ticker.recv().unwrap(); +/// println!("elapsed: {:?}", start.elapsed()); +/// } +/// ``` +/// +/// When messages get sent: +/// +/// ``` +/// use std::thread; +/// use std::time::{Duration, Instant}; +/// use crossbeam_channel::tick; +/// +/// // Converts a number of milliseconds into a `Duration`. +/// let ms = |ms| Duration::from_millis(ms); +/// +/// // Returns `true` if `a` and `b` are very close `Instant`s. +/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; +/// +/// let start = Instant::now(); +/// let r = tick(ms(100)); +/// +/// // This message was sent 100 ms from the start and received 100 ms from the start. +/// assert!(eq(r.recv().unwrap(), start + ms(100))); +/// assert!(eq(Instant::now(), start + ms(100))); +/// +/// thread::sleep(ms(500)); +/// +/// // This message was sent 200 ms from the start and received 600 ms from the start. +/// assert!(eq(r.recv().unwrap(), start + ms(200))); +/// assert!(eq(Instant::now(), start + ms(600))); +/// +/// // This message was sent 700 ms from the start and received 700 ms from the start. +/// assert!(eq(r.recv().unwrap(), start + ms(700))); +/// assert!(eq(Instant::now(), start + ms(700))); +/// ``` +pub fn tick(duration: Duration) -> Receiver<Instant> { + Receiver { + flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))), + } +} + +/// The sending side of a channel. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use crossbeam_channel::unbounded; +/// +/// let (s1, r) = unbounded(); +/// let s2 = s1.clone(); +/// +/// thread::spawn(move || s1.send(1).unwrap()); +/// thread::spawn(move || s2.send(2).unwrap()); +/// +/// let msg1 = r.recv().unwrap(); +/// let msg2 = r.recv().unwrap(); +/// +/// assert_eq!(msg1 + msg2, 3); +/// ``` +pub struct Sender<T> { + flavor: SenderFlavor<T>, +} + +/// Sender flavors. +enum SenderFlavor<T> { + /// Bounded channel based on a preallocated array. + Array(counter::Sender<flavors::array::Channel<T>>), + + /// Unbounded channel implemented as a linked list. + List(counter::Sender<flavors::list::Channel<T>>), + + /// Zero-capacity channel. + Zero(counter::Sender<flavors::zero::Channel<T>>), +} + +unsafe impl<T: Send> Send for Sender<T> {} +unsafe impl<T: Send> Sync for Sender<T> {} + +impl<T> UnwindSafe for Sender<T> {} +impl<T> RefUnwindSafe for Sender<T> {} + +impl<T> Sender<T> { + /// Attempts to send a message into the channel without blocking. + /// + /// This method will either send a message into the channel immediately or return an error if + /// the channel is full or disconnected. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will send the message only if there + /// happens to be a receive operation on the other side of the channel at the same time. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{bounded, TrySendError}; + /// + /// let (s, r) = bounded(1); + /// + /// assert_eq!(s.try_send(1), Ok(())); + /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); + /// + /// drop(r); + /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3))); + /// ``` + pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.try_send(msg), + SenderFlavor::List(chan) => chan.try_send(msg), + SenderFlavor::Zero(chan) => chan.try_send(msg), + } + } + + /// Blocks the current thread until a message is sent or the channel is disconnected. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed. If the channel becomes disconnected, this call will wake up and return an + /// error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{bounded, SendError}; + /// + /// let (s, r) = bounded(1); + /// assert_eq!(s.send(1), Ok(())); + /// + /// thread::spawn(move || { + /// assert_eq!(r.recv(), Ok(1)); + /// thread::sleep(Duration::from_secs(1)); + /// drop(r); + /// }); + /// + /// assert_eq!(s.send(2), Ok(())); + /// assert_eq!(s.send(3), Err(SendError(3))); + /// ``` + pub fn send(&self, msg: T) -> Result<(), SendError<T>> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.send(msg, None), + SenderFlavor::List(chan) => chan.send(msg, None), + SenderFlavor::Zero(chan) => chan.send(msg, None), + } + .map_err(|err| match err { + SendTimeoutError::Disconnected(msg) => SendError(msg), + SendTimeoutError::Timeout(_) => unreachable!(), + }) + } + + /// Waits for a message to be sent into the channel, but only for a limited time. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed or the operation times out. If the channel becomes disconnected, this call will + /// wake up and return an error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{bounded, SendTimeoutError}; + /// + /// let (s, r) = bounded(0); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// assert_eq!(r.recv(), Ok(2)); + /// drop(r); + /// }); + /// + /// assert_eq!( + /// s.send_timeout(1, Duration::from_millis(500)), + /// Err(SendTimeoutError::Timeout(1)), + /// ); + /// assert_eq!( + /// s.send_timeout(2, Duration::from_secs(1)), + /// Ok(()), + /// ); + /// assert_eq!( + /// s.send_timeout(3, Duration::from_millis(500)), + /// Err(SendTimeoutError::Disconnected(3)), + /// ); + /// ``` + pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { + self.send_deadline(msg, Instant::now() + timeout) + } + + /// Waits for a message to be sent into the channel, but only until a given deadline. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed or the operation times out. If the channel becomes disconnected, this call will + /// wake up and return an error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use crossbeam_channel::{bounded, SendTimeoutError}; + /// + /// let (s, r) = bounded(0); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// assert_eq!(r.recv(), Ok(2)); + /// drop(r); + /// }); + /// + /// let now = Instant::now(); + /// + /// assert_eq!( + /// s.send_deadline(1, now + Duration::from_millis(500)), + /// Err(SendTimeoutError::Timeout(1)), + /// ); + /// assert_eq!( + /// s.send_deadline(2, now + Duration::from_millis(1500)), + /// Ok(()), + /// ); + /// assert_eq!( + /// s.send_deadline(3, now + Duration::from_millis(2000)), + /// Err(SendTimeoutError::Disconnected(3)), + /// ); + /// ``` + pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), + SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), + SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), + } + } + + /// Returns `true` if the channel is empty. + /// + /// Note: Zero-capacity channels are always empty. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// assert!(s.is_empty()); + /// + /// s.send(0).unwrap(); + /// assert!(!s.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.is_empty(), + SenderFlavor::List(chan) => chan.is_empty(), + SenderFlavor::Zero(chan) => chan.is_empty(), + } + } + + /// Returns `true` if the channel is full. + /// + /// Note: Zero-capacity channels are always full. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::bounded; + /// + /// let (s, r) = bounded(1); + /// + /// assert!(!s.is_full()); + /// s.send(0).unwrap(); + /// assert!(s.is_full()); + /// ``` + pub fn is_full(&self) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.is_full(), + SenderFlavor::List(chan) => chan.is_full(), + SenderFlavor::Zero(chan) => chan.is_full(), + } + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(s.len(), 0); + /// + /// s.send(1).unwrap(); + /// s.send(2).unwrap(); + /// assert_eq!(s.len(), 2); + /// ``` + pub fn len(&self) -> usize { + match &self.flavor { + SenderFlavor::Array(chan) => chan.len(), + SenderFlavor::List(chan) => chan.len(), + SenderFlavor::Zero(chan) => chan.len(), + } + } + + /// If the channel is bounded, returns its capacity. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{bounded, unbounded}; + /// + /// let (s, _) = unbounded::<i32>(); + /// assert_eq!(s.capacity(), None); + /// + /// let (s, _) = bounded::<i32>(5); + /// assert_eq!(s.capacity(), Some(5)); + /// + /// let (s, _) = bounded::<i32>(0); + /// assert_eq!(s.capacity(), Some(0)); + /// ``` + pub fn capacity(&self) -> Option<usize> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.capacity(), + SenderFlavor::List(chan) => chan.capacity(), + SenderFlavor::Zero(chan) => chan.capacity(), + } + } + + /// Returns `true` if senders belong to the same channel. + /// + /// # Examples + /// + /// ```rust + /// use crossbeam_channel::unbounded; + /// + /// let (s, _) = unbounded::<usize>(); + /// + /// let s2 = s.clone(); + /// assert!(s.same_channel(&s2)); + /// + /// let (s3, _) = unbounded(); + /// assert!(!s.same_channel(&s3)); + /// ``` + pub fn same_channel(&self, other: &Sender<T>) -> bool { + match (&self.flavor, &other.flavor) { + (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, + (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, + (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, + _ => false, + } + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + unsafe { + match &self.flavor { + SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::List(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), + } + } + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + let flavor = match &self.flavor { + SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()), + SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()), + SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()), + }; + + Sender { flavor } + } +} + +impl<T> fmt::Debug for Sender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Sender { .. }") + } +} + +/// The receiving side of a channel. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::unbounded; +/// +/// let (s, r) = unbounded(); +/// +/// thread::spawn(move || { +/// let _ = s.send(1); +/// thread::sleep(Duration::from_secs(1)); +/// let _ = s.send(2); +/// }); +/// +/// assert_eq!(r.recv(), Ok(1)); // Received immediately. +/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second. +/// ``` +pub struct Receiver<T> { + flavor: ReceiverFlavor<T>, +} + +/// Receiver flavors. +enum ReceiverFlavor<T> { + /// Bounded channel based on a preallocated array. + Array(counter::Receiver<flavors::array::Channel<T>>), + + /// Unbounded channel implemented as a linked list. + List(counter::Receiver<flavors::list::Channel<T>>), + + /// Zero-capacity channel. + Zero(counter::Receiver<flavors::zero::Channel<T>>), + + /// The after flavor. + At(Arc<flavors::at::Channel>), + + /// The tick flavor. + Tick(Arc<flavors::tick::Channel>), + + /// The never flavor. + Never(flavors::never::Channel<T>), +} + +unsafe impl<T: Send> Send for Receiver<T> {} +unsafe impl<T: Send> Sync for Receiver<T> {} + +impl<T> UnwindSafe for Receiver<T> {} +impl<T> RefUnwindSafe for Receiver<T> {} + +impl<T> Receiver<T> { + /// Attempts to receive a message from the channel without blocking. + /// + /// This method will either receive a message from the channel immediately or return an error + /// if the channel is empty. + /// + /// If called on a zero-capacity channel, this method will receive a message only if there + /// happens to be a send operation on the other side of the channel at the same time. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{unbounded, TryRecvError}; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + /// + /// s.send(5).unwrap(); + /// drop(s); + /// + /// assert_eq!(r.try_recv(), Ok(5)); + /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); + /// ``` + pub fn try_recv(&self) -> Result<T, TryRecvError> { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.try_recv(), + ReceiverFlavor::List(chan) => chan.try_recv(), + ReceiverFlavor::Zero(chan) => chan.try_recv(), + ReceiverFlavor::At(chan) => { + let msg = chan.try_recv(); + unsafe { + mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>( + &msg, + ) + } + } + ReceiverFlavor::Tick(chan) => { + let msg = chan.try_recv(); + unsafe { + mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>( + &msg, + ) + } + } + ReceiverFlavor::Never(chan) => chan.try_recv(), + } + } + + /// Blocks the current thread until a message is received or the channel is empty and + /// disconnected. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed. If the channel is empty and becomes disconnected, this call will + /// wake up and return an error. + /// + /// If called on a zero-capacity channel, this method will wait for a send operation to appear + /// on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{unbounded, RecvError}; + /// + /// let (s, r) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s.send(5).unwrap(); + /// drop(s); + /// }); + /// + /// assert_eq!(r.recv(), Ok(5)); + /// assert_eq!(r.recv(), Err(RecvError)); + /// ``` + pub fn recv(&self) -> Result<T, RecvError> { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.recv(None), + ReceiverFlavor::List(chan) => chan.recv(None), + ReceiverFlavor::Zero(chan) => chan.recv(None), + ReceiverFlavor::At(chan) => { + let msg = chan.recv(None); + unsafe { + mem::transmute_copy::< + Result<Instant, RecvTimeoutError>, + Result<T, RecvTimeoutError>, + >(&msg) + } + } + ReceiverFlavor::Tick(chan) => { + let msg = chan.recv(None); + unsafe { + mem::transmute_copy::< + Result<Instant, RecvTimeoutError>, + Result<T, RecvTimeoutError>, + >(&msg) + } + } + ReceiverFlavor::Never(chan) => chan.recv(None), + } + .map_err(|_| RecvError) + } + + /// Waits for a message to be received from the channel, but only for a limited time. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed or the operation times out. If the channel is empty and becomes + /// disconnected, this call will wake up and return an error. + /// + /// If called on a zero-capacity channel, this method will wait for a send operation to appear + /// on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{unbounded, RecvTimeoutError}; + /// + /// let (s, r) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s.send(5).unwrap(); + /// drop(s); + /// }); + /// + /// assert_eq!( + /// r.recv_timeout(Duration::from_millis(500)), + /// Err(RecvTimeoutError::Timeout), + /// ); + /// assert_eq!( + /// r.recv_timeout(Duration::from_secs(1)), + /// Ok(5), + /// ); + /// assert_eq!( + /// r.recv_timeout(Duration::from_secs(1)), + /// Err(RecvTimeoutError::Disconnected), + /// ); + /// ``` + pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { + self.recv_deadline(Instant::now() + timeout) + } + + /// Waits for a message to be received from the channel, but only before a given deadline. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed or the operation times out. If the channel is empty and becomes + /// disconnected, this call will wake up and return an error. + /// + /// If called on a zero-capacity channel, this method will wait for a send operation to appear + /// on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::{Instant, Duration}; + /// use crossbeam_channel::{unbounded, RecvTimeoutError}; + /// + /// let (s, r) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s.send(5).unwrap(); + /// drop(s); + /// }); + /// + /// let now = Instant::now(); + /// + /// assert_eq!( + /// r.recv_deadline(now + Duration::from_millis(500)), + /// Err(RecvTimeoutError::Timeout), + /// ); + /// assert_eq!( + /// r.recv_deadline(now + Duration::from_millis(1500)), + /// Ok(5), + /// ); + /// assert_eq!( + /// r.recv_deadline(now + Duration::from_secs(5)), + /// Err(RecvTimeoutError::Disconnected), + /// ); + /// ``` + pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), + ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), + ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), + ReceiverFlavor::At(chan) => { + let msg = chan.recv(Some(deadline)); + unsafe { + mem::transmute_copy::< + Result<Instant, RecvTimeoutError>, + Result<T, RecvTimeoutError>, + >(&msg) + } + } + ReceiverFlavor::Tick(chan) => { + let msg = chan.recv(Some(deadline)); + unsafe { + mem::transmute_copy::< + Result<Instant, RecvTimeoutError>, + Result<T, RecvTimeoutError>, + >(&msg) + } + } + ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)), + } + } + + /// Returns `true` if the channel is empty. + /// + /// Note: Zero-capacity channels are always empty. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// + /// assert!(r.is_empty()); + /// s.send(0).unwrap(); + /// assert!(!r.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.is_empty(), + ReceiverFlavor::List(chan) => chan.is_empty(), + ReceiverFlavor::Zero(chan) => chan.is_empty(), + ReceiverFlavor::At(chan) => chan.is_empty(), + ReceiverFlavor::Tick(chan) => chan.is_empty(), + ReceiverFlavor::Never(chan) => chan.is_empty(), + } + } + + /// Returns `true` if the channel is full. + /// + /// Note: Zero-capacity channels are always full. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::bounded; + /// + /// let (s, r) = bounded(1); + /// + /// assert!(!r.is_full()); + /// s.send(0).unwrap(); + /// assert!(r.is_full()); + /// ``` + pub fn is_full(&self) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.is_full(), + ReceiverFlavor::List(chan) => chan.is_full(), + ReceiverFlavor::Zero(chan) => chan.is_full(), + ReceiverFlavor::At(chan) => chan.is_full(), + ReceiverFlavor::Tick(chan) => chan.is_full(), + ReceiverFlavor::Never(chan) => chan.is_full(), + } + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(r.len(), 0); + /// + /// s.send(1).unwrap(); + /// s.send(2).unwrap(); + /// assert_eq!(r.len(), 2); + /// ``` + pub fn len(&self) -> usize { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.len(), + ReceiverFlavor::List(chan) => chan.len(), + ReceiverFlavor::Zero(chan) => chan.len(), + ReceiverFlavor::At(chan) => chan.len(), + ReceiverFlavor::Tick(chan) => chan.len(), + ReceiverFlavor::Never(chan) => chan.len(), + } + } + + /// If the channel is bounded, returns its capacity. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{bounded, unbounded}; + /// + /// let (_, r) = unbounded::<i32>(); + /// assert_eq!(r.capacity(), None); + /// + /// let (_, r) = bounded::<i32>(5); + /// assert_eq!(r.capacity(), Some(5)); + /// + /// let (_, r) = bounded::<i32>(0); + /// assert_eq!(r.capacity(), Some(0)); + /// ``` + pub fn capacity(&self) -> Option<usize> { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.capacity(), + ReceiverFlavor::List(chan) => chan.capacity(), + ReceiverFlavor::Zero(chan) => chan.capacity(), + ReceiverFlavor::At(chan) => chan.capacity(), + ReceiverFlavor::Tick(chan) => chan.capacity(), + ReceiverFlavor::Never(chan) => chan.capacity(), + } + } + + /// A blocking iterator over messages in the channel. + /// + /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if + /// the channel becomes empty and disconnected, it returns [`None`] without blocking. + /// + /// [`next`]: Iterator::next + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// + /// thread::spawn(move || { + /// s.send(1).unwrap(); + /// s.send(2).unwrap(); + /// s.send(3).unwrap(); + /// drop(s); // Disconnect the channel. + /// }); + /// + /// // Collect all messages from the channel. + /// // Note that the call to `collect` blocks until the sender is dropped. + /// let v: Vec<_> = r.iter().collect(); + /// + /// assert_eq!(v, [1, 2, 3]); + /// ``` + pub fn iter(&self) -> Iter<'_, T> { + Iter { receiver: self } + } + + /// A non-blocking iterator over messages in the channel. + /// + /// Each call to [`next`] returns a message if there is one ready to be received. The iterator + /// never blocks waiting for the next message. + /// + /// [`next`]: Iterator::next + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded::<i32>(); + /// + /// thread::spawn(move || { + /// s.send(1).unwrap(); + /// thread::sleep(Duration::from_secs(1)); + /// s.send(2).unwrap(); + /// thread::sleep(Duration::from_secs(2)); + /// s.send(3).unwrap(); + /// }); + /// + /// thread::sleep(Duration::from_secs(2)); + /// + /// // Collect all messages from the channel without blocking. + /// // The third message hasn't been sent yet so we'll collect only the first two. + /// let v: Vec<_> = r.try_iter().collect(); + /// + /// assert_eq!(v, [1, 2]); + /// ``` + pub fn try_iter(&self) -> TryIter<'_, T> { + TryIter { receiver: self } + } + + /// Returns `true` if receivers belong to the same channel. + /// + /// # Examples + /// + /// ```rust + /// use crossbeam_channel::unbounded; + /// + /// let (_, r) = unbounded::<usize>(); + /// + /// let r2 = r.clone(); + /// assert!(r.same_channel(&r2)); + /// + /// let (_, r3) = unbounded(); + /// assert!(!r.same_channel(&r3)); + /// ``` + pub fn same_channel(&self, other: &Receiver<T>) -> bool { + match (&self.flavor, &other.flavor) { + (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, + (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, + (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, + (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b), + (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b), + (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true, + _ => false, + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + unsafe { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::At(_) => {} + ReceiverFlavor::Tick(_) => {} + ReceiverFlavor::Never(_) => {} + } + } + } +} + +impl<T> Clone for Receiver<T> { + fn clone(&self) -> Self { + let flavor = match &self.flavor { + ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()), + ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()), + ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()), + ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()), + ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()), + ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()), + }; + + Receiver { flavor } + } +} + +impl<T> fmt::Debug for Receiver<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Receiver { .. }") + } +} + +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<T> IntoIterator for Receiver<T> { + type Item = T; + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> Self::IntoIter { + IntoIter { receiver: self } + } +} + +/// A blocking iterator over messages in a channel. +/// +/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the +/// channel becomes empty and disconnected, it returns [`None`] without blocking. +/// +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use crossbeam_channel::unbounded; +/// +/// let (s, r) = unbounded(); +/// +/// thread::spawn(move || { +/// s.send(1).unwrap(); +/// s.send(2).unwrap(); +/// s.send(3).unwrap(); +/// drop(s); // Disconnect the channel. +/// }); +/// +/// // Collect all messages from the channel. +/// // Note that the call to `collect` blocks until the sender is dropped. +/// let v: Vec<_> = r.iter().collect(); +/// +/// assert_eq!(v, [1, 2, 3]); +/// ``` +pub struct Iter<'a, T> { + receiver: &'a Receiver<T>, +} + +impl<T> FusedIterator for Iter<'_, T> {} + +impl<T> Iterator for Iter<'_, T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.receiver.recv().ok() + } +} + +impl<T> fmt::Debug for Iter<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Iter { .. }") + } +} + +/// A non-blocking iterator over messages in a channel. +/// +/// Each call to [`next`] returns a message if there is one ready to be received. The iterator +/// never blocks waiting for the next message. +/// +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::unbounded; +/// +/// let (s, r) = unbounded::<i32>(); +/// +/// thread::spawn(move || { +/// s.send(1).unwrap(); +/// thread::sleep(Duration::from_secs(1)); +/// s.send(2).unwrap(); +/// thread::sleep(Duration::from_secs(2)); +/// s.send(3).unwrap(); +/// }); +/// +/// thread::sleep(Duration::from_secs(2)); +/// +/// // Collect all messages from the channel without blocking. +/// // The third message hasn't been sent yet so we'll collect only the first two. +/// let v: Vec<_> = r.try_iter().collect(); +/// +/// assert_eq!(v, [1, 2]); +/// ``` +pub struct TryIter<'a, T> { + receiver: &'a Receiver<T>, +} + +impl<T> Iterator for TryIter<'_, T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.receiver.try_recv().ok() + } +} + +impl<T> fmt::Debug for TryIter<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("TryIter { .. }") + } +} + +/// A blocking iterator over messages in a channel. +/// +/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the +/// channel becomes empty and disconnected, it returns [`None`] without blocking. +/// +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use crossbeam_channel::unbounded; +/// +/// let (s, r) = unbounded(); +/// +/// thread::spawn(move || { +/// s.send(1).unwrap(); +/// s.send(2).unwrap(); +/// s.send(3).unwrap(); +/// drop(s); // Disconnect the channel. +/// }); +/// +/// // Collect all messages from the channel. +/// // Note that the call to `collect` blocks until the sender is dropped. +/// let v: Vec<_> = r.into_iter().collect(); +/// +/// assert_eq!(v, [1, 2, 3]); +/// ``` +pub struct IntoIter<T> { + receiver: Receiver<T>, +} + +impl<T> FusedIterator for IntoIter<T> {} + +impl<T> Iterator for IntoIter<T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.receiver.recv().ok() + } +} + +impl<T> fmt::Debug for IntoIter<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("IntoIter { .. }") + } +} + +impl<T> SelectHandle for Sender<T> { + fn try_select(&self, token: &mut Token) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().try_select(token), + SenderFlavor::List(chan) => chan.sender().try_select(token), + SenderFlavor::Zero(chan) => chan.sender().try_select(token), + } + } + + fn deadline(&self) -> Option<Instant> { + None + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().register(oper, cx), + SenderFlavor::List(chan) => chan.sender().register(oper, cx), + SenderFlavor::Zero(chan) => chan.sender().register(oper, cx), + } + } + + fn unregister(&self, oper: Operation) { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().unregister(oper), + SenderFlavor::List(chan) => chan.sender().unregister(oper), + SenderFlavor::Zero(chan) => chan.sender().unregister(oper), + } + } + + fn accept(&self, token: &mut Token, cx: &Context) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().accept(token, cx), + SenderFlavor::List(chan) => chan.sender().accept(token, cx), + SenderFlavor::Zero(chan) => chan.sender().accept(token, cx), + } + } + + fn is_ready(&self) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().is_ready(), + SenderFlavor::List(chan) => chan.sender().is_ready(), + SenderFlavor::Zero(chan) => chan.sender().is_ready(), + } + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().watch(oper, cx), + SenderFlavor::List(chan) => chan.sender().watch(oper, cx), + SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx), + } + } + + fn unwatch(&self, oper: Operation) { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().unwatch(oper), + SenderFlavor::List(chan) => chan.sender().unwatch(oper), + SenderFlavor::Zero(chan) => chan.sender().unwatch(oper), + } + } +} + +impl<T> SelectHandle for Receiver<T> { + fn try_select(&self, token: &mut Token) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().try_select(token), + ReceiverFlavor::List(chan) => chan.receiver().try_select(token), + ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token), + ReceiverFlavor::At(chan) => chan.try_select(token), + ReceiverFlavor::Tick(chan) => chan.try_select(token), + ReceiverFlavor::Never(chan) => chan.try_select(token), + } + } + + fn deadline(&self) -> Option<Instant> { + match &self.flavor { + ReceiverFlavor::Array(_) => None, + ReceiverFlavor::List(_) => None, + ReceiverFlavor::Zero(_) => None, + ReceiverFlavor::At(chan) => chan.deadline(), + ReceiverFlavor::Tick(chan) => chan.deadline(), + ReceiverFlavor::Never(chan) => chan.deadline(), + } + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx), + ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx), + ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx), + ReceiverFlavor::At(chan) => chan.register(oper, cx), + ReceiverFlavor::Tick(chan) => chan.register(oper, cx), + ReceiverFlavor::Never(chan) => chan.register(oper, cx), + } + } + + fn unregister(&self, oper: Operation) { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper), + ReceiverFlavor::List(chan) => chan.receiver().unregister(oper), + ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper), + ReceiverFlavor::At(chan) => chan.unregister(oper), + ReceiverFlavor::Tick(chan) => chan.unregister(oper), + ReceiverFlavor::Never(chan) => chan.unregister(oper), + } + } + + fn accept(&self, token: &mut Token, cx: &Context) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx), + ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx), + ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx), + ReceiverFlavor::At(chan) => chan.accept(token, cx), + ReceiverFlavor::Tick(chan) => chan.accept(token, cx), + ReceiverFlavor::Never(chan) => chan.accept(token, cx), + } + } + + fn is_ready(&self) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().is_ready(), + ReceiverFlavor::List(chan) => chan.receiver().is_ready(), + ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(), + ReceiverFlavor::At(chan) => chan.is_ready(), + ReceiverFlavor::Tick(chan) => chan.is_ready(), + ReceiverFlavor::Never(chan) => chan.is_ready(), + } + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx), + ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx), + ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx), + ReceiverFlavor::At(chan) => chan.watch(oper, cx), + ReceiverFlavor::Tick(chan) => chan.watch(oper, cx), + ReceiverFlavor::Never(chan) => chan.watch(oper, cx), + } + } + + fn unwatch(&self, oper: Operation) { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper), + ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper), + ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper), + ReceiverFlavor::At(chan) => chan.unwatch(oper), + ReceiverFlavor::Tick(chan) => chan.unwatch(oper), + ReceiverFlavor::Never(chan) => chan.unwatch(oper), + } + } +} + +/// Writes a message into the channel. +pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> { + match &s.flavor { + SenderFlavor::Array(chan) => chan.write(token, msg), + SenderFlavor::List(chan) => chan.write(token, msg), + SenderFlavor::Zero(chan) => chan.write(token, msg), + } +} + +/// Reads a message from the channel. +pub unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> { + match &r.flavor { + ReceiverFlavor::Array(chan) => chan.read(token), + ReceiverFlavor::List(chan) => chan.read(token), + ReceiverFlavor::Zero(chan) => chan.read(token), + ReceiverFlavor::At(chan) => { + mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token)) + } + ReceiverFlavor::Tick(chan) => { + mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token)) + } + ReceiverFlavor::Never(chan) => chan.read(token), + } +} diff --git a/third_party/rust/crossbeam-channel/src/context.rs b/third_party/rust/crossbeam-channel/src/context.rs new file mode 100644 index 0000000000..e2e8480f00 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/context.rs @@ -0,0 +1,191 @@ +//! Thread-local context used in select. + +use std::cell::Cell; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread::{self, Thread, ThreadId}; +use std::time::Instant; + +use crossbeam_utils::Backoff; + +use crate::select::Selected; + +/// Thread-local context used in select. +#[derive(Debug, Clone)] +pub struct Context { + inner: Arc<Inner>, +} + +/// Inner representation of `Context`. +#[derive(Debug)] +struct Inner { + /// Selected operation. + select: AtomicUsize, + + /// A slot into which another thread may store a pointer to its `Packet`. + packet: AtomicUsize, + + /// Thread handle. + thread: Thread, + + /// Thread id. + thread_id: ThreadId, +} + +impl Context { + /// Creates a new context for the duration of the closure. + #[inline] + pub fn with<F, R>(f: F) -> R + where + F: FnOnce(&Context) -> R, + { + thread_local! { + /// Cached thread-local context. + static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new())); + } + + let mut f = Some(f); + let mut f = move |cx: &Context| -> R { + let f = f.take().unwrap(); + f(cx) + }; + + CONTEXT + .try_with(|cell| match cell.take() { + None => f(&Context::new()), + Some(cx) => { + cx.reset(); + let res = f(&cx); + cell.set(Some(cx)); + res + } + }) + .unwrap_or_else(|_| f(&Context::new())) + } + + /// Creates a new `Context`. + #[cold] + fn new() -> Context { + Context { + inner: Arc::new(Inner { + select: AtomicUsize::new(Selected::Waiting.into()), + packet: AtomicUsize::new(0), + thread: thread::current(), + thread_id: thread::current().id(), + }), + } + } + + /// Resets `select` and `packet`. + #[inline] + fn reset(&self) { + self.inner + .select + .store(Selected::Waiting.into(), Ordering::Release); + self.inner.packet.store(0, Ordering::Release); + } + + /// Attempts to select an operation. + /// + /// On failure, the previously selected operation is returned. + #[inline] + pub fn try_select(&self, select: Selected) -> Result<(), Selected> { + self.inner + .select + .compare_exchange( + Selected::Waiting.into(), + select.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) + .map(|_| ()) + .map_err(|e| e.into()) + } + + /// Returns the selected operation. + #[inline] + pub fn selected(&self) -> Selected { + Selected::from(self.inner.select.load(Ordering::Acquire)) + } + + /// Stores a packet. + /// + /// This method must be called after `try_select` succeeds and there is a packet to provide. + #[inline] + pub fn store_packet(&self, packet: usize) { + if packet != 0 { + self.inner.packet.store(packet, Ordering::Release); + } + } + + /// Waits until a packet is provided and returns it. + #[inline] + pub fn wait_packet(&self) -> usize { + let backoff = Backoff::new(); + loop { + let packet = self.inner.packet.load(Ordering::Acquire); + if packet != 0 { + return packet; + } + backoff.snooze(); + } + } + + /// Waits until an operation is selected and returns it. + /// + /// If the deadline is reached, `Selected::Aborted` will be selected. + #[inline] + pub fn wait_until(&self, deadline: Option<Instant>) -> Selected { + // Spin for a short time, waiting until an operation is selected. + let backoff = Backoff::new(); + loop { + let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); + if sel != Selected::Waiting { + return sel; + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + loop { + // Check whether an operation has been selected. + let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); + if sel != Selected::Waiting { + return sel; + } + + // If there's a deadline, park the current thread until the deadline is reached. + if let Some(end) = deadline { + let now = Instant::now(); + + if now < end { + thread::park_timeout(end - now); + } else { + // The deadline has been reached. Try aborting select. + return match self.try_select(Selected::Aborted) { + Ok(()) => Selected::Aborted, + Err(s) => s, + }; + } + } else { + thread::park(); + } + } + } + + /// Unparks the thread this context belongs to. + #[inline] + pub fn unpark(&self) { + self.inner.thread.unpark(); + } + + /// Returns the id of the thread this context belongs to. + #[inline] + pub fn thread_id(&self) -> ThreadId { + self.inner.thread_id + } +} diff --git a/third_party/rust/crossbeam-channel/src/counter.rs b/third_party/rust/crossbeam-channel/src/counter.rs new file mode 100644 index 0000000000..2eaf067820 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/counter.rs @@ -0,0 +1,144 @@ +//! Reference counter for channels. + +use std::isize; +use std::ops; +use std::process; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +/// Reference counter internals. +struct Counter<C> { + /// The number of senders associated with the channel. + senders: AtomicUsize, + + /// The number of receivers associated with the channel. + receivers: AtomicUsize, + + /// Set to `true` if the last sender or the last receiver reference deallocates the channel. + destroy: AtomicBool, + + /// The internal channel. + chan: C, +} + +/// Wraps a channel into the reference counter. +pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { + let counter = Box::into_raw(Box::new(Counter { + senders: AtomicUsize::new(1), + receivers: AtomicUsize::new(1), + destroy: AtomicBool::new(false), + chan, + })); + let s = Sender { counter }; + let r = Receiver { counter }; + (s, r) +} + +/// The sending side. +pub struct Sender<C> { + counter: *mut Counter<C>, +} + +impl<C> Sender<C> { + /// Returns the internal `Counter`. + fn counter(&self) -> &Counter<C> { + unsafe { &*self.counter } + } + + /// Acquires another sender reference. + pub fn acquire(&self) -> Sender<C> { + let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); + + // Cloning senders and calling `mem::forget` on the clones could potentially overflow the + // counter. It's very difficult to recover sensibly from such degenerate scenarios so we + // just abort when the count becomes very large. + if count > isize::MAX as usize { + process::abort(); + } + + Sender { + counter: self.counter, + } + } + + /// Releases the sender reference. + /// + /// Function `disconnect` will be called if this is the last sender reference. + pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { + if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { + disconnect(&self.counter().chan); + + if self.counter().destroy.swap(true, Ordering::AcqRel) { + drop(Box::from_raw(self.counter)); + } + } + } +} + +impl<C> ops::Deref for Sender<C> { + type Target = C; + + fn deref(&self) -> &C { + &self.counter().chan + } +} + +impl<C> PartialEq for Sender<C> { + fn eq(&self, other: &Sender<C>) -> bool { + self.counter == other.counter + } +} + +/// The receiving side. +pub struct Receiver<C> { + counter: *mut Counter<C>, +} + +impl<C> Receiver<C> { + /// Returns the internal `Counter`. + fn counter(&self) -> &Counter<C> { + unsafe { &*self.counter } + } + + /// Acquires another receiver reference. + pub fn acquire(&self) -> Receiver<C> { + let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); + + // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the + // counter. It's very difficult to recover sensibly from such degenerate scenarios so we + // just abort when the count becomes very large. + if count > isize::MAX as usize { + process::abort(); + } + + Receiver { + counter: self.counter, + } + } + + /// Releases the receiver reference. + /// + /// Function `disconnect` will be called if this is the last receiver reference. + pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { + if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { + disconnect(&self.counter().chan); + + if self.counter().destroy.swap(true, Ordering::AcqRel) { + drop(Box::from_raw(self.counter)); + } + } + } +} + +impl<C> ops::Deref for Receiver<C> { + type Target = C; + + fn deref(&self) -> &C { + &self.counter().chan + } +} + +impl<C> PartialEq for Receiver<C> { + fn eq(&self, other: &Receiver<C>) -> bool { + self.counter == other.counter + } +} diff --git a/third_party/rust/crossbeam-channel/src/err.rs b/third_party/rust/crossbeam-channel/src/err.rs new file mode 100644 index 0000000000..578acc69ac --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/err.rs @@ -0,0 +1,382 @@ +use std::error; +use std::fmt; + +/// An error returned from the [`send`] method. +/// +/// The message could not be sent because the channel is disconnected. +/// +/// The error contains the message so it can be recovered. +/// +/// [`send`]: super::Sender::send +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError<T>(pub T); + +/// An error returned from the [`try_send`] method. +/// +/// The error contains the message being sent so it can be recovered. +/// +/// [`try_send`]: super::Sender::try_send +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum TrySendError<T> { + /// The message could not be sent because the channel is full. + /// + /// If this is a zero-capacity channel, then the error indicates that there was no receiver + /// available to receive the message at the time. + Full(T), + + /// The message could not be sent because the channel is disconnected. + Disconnected(T), +} + +/// An error returned from the [`send_timeout`] method. +/// +/// The error contains the message being sent so it can be recovered. +/// +/// [`send_timeout`]: super::Sender::send_timeout +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum SendTimeoutError<T> { + /// The message could not be sent because the channel is full and the operation timed out. + /// + /// If this is a zero-capacity channel, then the error indicates that there was no receiver + /// available to receive the message and the operation timed out. + Timeout(T), + + /// The message could not be sent because the channel is disconnected. + Disconnected(T), +} + +/// An error returned from the [`recv`] method. +/// +/// A message could not be received because the channel is empty and disconnected. +/// +/// [`recv`]: super::Receiver::recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct RecvError; + +/// An error returned from the [`try_recv`] method. +/// +/// [`try_recv`]: super::Receiver::try_recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum TryRecvError { + /// A message could not be received because the channel is empty. + /// + /// If this is a zero-capacity channel, then the error indicates that there was no sender + /// available to send a message at the time. + Empty, + + /// The message could not be received because the channel is empty and disconnected. + Disconnected, +} + +/// An error returned from the [`recv_timeout`] method. +/// +/// [`recv_timeout`]: super::Receiver::recv_timeout +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum RecvTimeoutError { + /// A message could not be received because the channel is empty and the operation timed out. + /// + /// If this is a zero-capacity channel, then the error indicates that there was no sender + /// available to send a message and the operation timed out. + Timeout, + + /// The message could not be received because the channel is empty and disconnected. + Disconnected, +} + +/// An error returned from the [`try_select`] method. +/// +/// Failed because none of the channel operations were ready. +/// +/// [`try_select`]: super::Select::try_select +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct TrySelectError; + +/// An error returned from the [`select_timeout`] method. +/// +/// Failed because none of the channel operations became ready before the timeout. +/// +/// [`select_timeout`]: super::Select::select_timeout +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct SelectTimeoutError; + +/// An error returned from the [`try_ready`] method. +/// +/// Failed because none of the channel operations were ready. +/// +/// [`try_ready`]: super::Select::try_ready +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct TryReadyError; + +/// An error returned from the [`ready_timeout`] method. +/// +/// Failed because none of the channel operations became ready before the timeout. +/// +/// [`ready_timeout`]: super::Select::ready_timeout +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct ReadyTimeoutError; + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "SendError(..)".fmt(f) + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "sending on a disconnected channel".fmt(f) + } +} + +impl<T: Send> error::Error for SendError<T> {} + +impl<T> SendError<T> { + /// Unwraps the message. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// drop(r); + /// + /// if let Err(err) = s.send("foo") { + /// assert_eq!(err.into_inner(), "foo"); + /// } + /// ``` + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "Full(..)".fmt(f), + TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), + } + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "sending on a full channel".fmt(f), + TrySendError::Disconnected(..) => "sending on a disconnected channel".fmt(f), + } + } +} + +impl<T: Send> error::Error for TrySendError<T> {} + +impl<T> From<SendError<T>> for TrySendError<T> { + fn from(err: SendError<T>) -> TrySendError<T> { + match err { + SendError(t) => TrySendError::Disconnected(t), + } + } +} + +impl<T> TrySendError<T> { + /// Unwraps the message. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::bounded; + /// + /// let (s, r) = bounded(0); + /// + /// if let Err(err) = s.try_send("foo") { + /// assert_eq!(err.into_inner(), "foo"); + /// } + /// ``` + pub fn into_inner(self) -> T { + match self { + TrySendError::Full(v) => v, + TrySendError::Disconnected(v) => v, + } + } + + /// Returns `true` if the send operation failed because the channel is full. + pub fn is_full(&self) -> bool { + match self { + TrySendError::Full(_) => true, + _ => false, + } + } + + /// Returns `true` if the send operation failed because the channel is disconnected. + pub fn is_disconnected(&self) -> bool { + match self { + TrySendError::Disconnected(_) => true, + _ => false, + } + } +} + +impl<T> fmt::Debug for SendTimeoutError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "SendTimeoutError(..)".fmt(f) + } +} + +impl<T> fmt::Display for SendTimeoutError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SendTimeoutError::Timeout(..) => "timed out waiting on send operation".fmt(f), + SendTimeoutError::Disconnected(..) => "sending on a disconnected channel".fmt(f), + } + } +} + +impl<T: Send> error::Error for SendTimeoutError<T> {} + +impl<T> From<SendError<T>> for SendTimeoutError<T> { + fn from(err: SendError<T>) -> SendTimeoutError<T> { + match err { + SendError(e) => SendTimeoutError::Disconnected(e), + } + } +} + +impl<T> SendTimeoutError<T> { + /// Unwraps the message. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use crossbeam_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// + /// if let Err(err) = s.send_timeout("foo", Duration::from_secs(1)) { + /// assert_eq!(err.into_inner(), "foo"); + /// } + /// ``` + pub fn into_inner(self) -> T { + match self { + SendTimeoutError::Timeout(v) => v, + SendTimeoutError::Disconnected(v) => v, + } + } + + /// Returns `true` if the send operation timed out. + pub fn is_timeout(&self) -> bool { + match self { + SendTimeoutError::Timeout(_) => true, + _ => false, + } + } + + /// Returns `true` if the send operation failed because the channel is disconnected. + pub fn is_disconnected(&self) -> bool { + match self { + SendTimeoutError::Disconnected(_) => true, + _ => false, + } + } +} + +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "receiving on an empty and disconnected channel".fmt(f) + } +} + +impl error::Error for RecvError {} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryRecvError::Empty => "receiving on an empty channel".fmt(f), + TryRecvError::Disconnected => "receiving on an empty and disconnected channel".fmt(f), + } + } +} + +impl error::Error for TryRecvError {} + +impl From<RecvError> for TryRecvError { + fn from(err: RecvError) -> TryRecvError { + match err { + RecvError => TryRecvError::Disconnected, + } + } +} + +impl TryRecvError { + /// Returns `true` if the receive operation failed because the channel is empty. + #[allow(clippy::trivially_copy_pass_by_ref)] + pub fn is_empty(&self) -> bool { + match self { + TryRecvError::Empty => true, + _ => false, + } + } + + /// Returns `true` if the receive operation failed because the channel is disconnected. + #[allow(clippy::trivially_copy_pass_by_ref)] + pub fn is_disconnected(&self) -> bool { + match self { + TryRecvError::Disconnected => true, + _ => false, + } + } +} + +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + RecvTimeoutError::Timeout => "timed out waiting on receive operation".fmt(f), + RecvTimeoutError::Disconnected => "channel is empty and disconnected".fmt(f), + } + } +} + +impl error::Error for RecvTimeoutError {} + +impl From<RecvError> for RecvTimeoutError { + fn from(err: RecvError) -> RecvTimeoutError { + match err { + RecvError => RecvTimeoutError::Disconnected, + } + } +} + +impl RecvTimeoutError { + /// Returns `true` if the receive operation timed out. + #[allow(clippy::trivially_copy_pass_by_ref)] + pub fn is_timeout(&self) -> bool { + match self { + RecvTimeoutError::Timeout => true, + _ => false, + } + } + + /// Returns `true` if the receive operation failed because the channel is disconnected. + #[allow(clippy::trivially_copy_pass_by_ref)] + pub fn is_disconnected(&self) -> bool { + match self { + RecvTimeoutError::Disconnected => true, + _ => false, + } + } +} + +impl fmt::Display for TrySelectError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "all operations in select would block".fmt(f) + } +} + +impl error::Error for TrySelectError {} + +impl fmt::Display for SelectTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "timed out waiting on select".fmt(f) + } +} + +impl error::Error for SelectTimeoutError {} diff --git a/third_party/rust/crossbeam-channel/src/flavors/array.rs b/third_party/rust/crossbeam-channel/src/flavors/array.rs new file mode 100644 index 0000000000..323a200c25 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/array.rs @@ -0,0 +1,641 @@ +//! Bounded channel based on a preallocated array. +//! +//! This flavor has a fixed, positive capacity. +//! +//! The implementation is based on Dmitry Vyukov's bounded MPMC queue. +//! +//! Source: +//! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue +//! - https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub +//! +//! Copyright & License: +//! - Copyright (c) 2010-2011 Dmitry Vyukov +//! - Simplified BSD License and Apache License, Version 2.0 +//! - http://www.1024cores.net/home/code-license + +use std::cell::UnsafeCell; +use std::marker::PhantomData; +use std::mem::{self, MaybeUninit}; +use std::ptr; +use std::sync::atomic::{self, AtomicUsize, Ordering}; +use std::time::Instant; + +use crossbeam_utils::{Backoff, CachePadded}; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; +use crate::select::{Operation, SelectHandle, Selected, Token}; +use crate::waker::SyncWaker; + +/// A slot in a channel. +struct Slot<T> { + /// The current stamp. + stamp: AtomicUsize, + + /// The message in this slot. + msg: UnsafeCell<MaybeUninit<T>>, +} + +/// The token type for the array flavor. +#[derive(Debug)] +pub struct ArrayToken { + /// Slot to read from or write to. + slot: *const u8, + + /// Stamp to store into the slot after reading or writing. + stamp: usize, +} + +impl Default for ArrayToken { + #[inline] + fn default() -> Self { + ArrayToken { + slot: ptr::null(), + stamp: 0, + } + } +} + +/// Bounded channel based on a preallocated array. +pub struct Channel<T> { + /// The head of the channel. + /// + /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but + /// packed into a single `usize`. The lower bits represent the index, while the upper bits + /// represent the lap. The mark bit in the head is always zero. + /// + /// Messages are popped from the head of the channel. + head: CachePadded<AtomicUsize>, + + /// The tail of the channel. + /// + /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but + /// packed into a single `usize`. The lower bits represent the index, while the upper bits + /// represent the lap. The mark bit indicates that the channel is disconnected. + /// + /// Messages are pushed into the tail of the channel. + tail: CachePadded<AtomicUsize>, + + /// The buffer holding slots. + buffer: *mut Slot<T>, + + /// The channel capacity. + cap: usize, + + /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. + one_lap: usize, + + /// If this bit is set in the tail, that means the channel is disconnected. + mark_bit: usize, + + /// Senders waiting while the channel is full. + senders: SyncWaker, + + /// Receivers waiting while the channel is empty and not disconnected. + receivers: SyncWaker, + + /// Indicates that dropping a `Channel<T>` may drop values of type `T`. + _marker: PhantomData<T>, +} + +impl<T> Channel<T> { + /// Creates a bounded channel of capacity `cap`. + pub fn with_capacity(cap: usize) -> Self { + assert!(cap > 0, "capacity must be positive"); + + // Compute constants `mark_bit` and `one_lap`. + let mark_bit = (cap + 1).next_power_of_two(); + let one_lap = mark_bit * 2; + + // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. + let head = 0; + // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. + let tail = 0; + + // Allocate a buffer of `cap` slots initialized + // with stamps. + let buffer = { + let mut boxed: Box<[Slot<T>]> = (0..cap) + .map(|i| { + // Set the stamp to `{ lap: 0, mark: 0, index: i }`. + Slot { + stamp: AtomicUsize::new(i), + msg: UnsafeCell::new(MaybeUninit::uninit()), + } + }) + .collect(); + let ptr = boxed.as_mut_ptr(); + mem::forget(boxed); + ptr + }; + + Channel { + buffer, + cap, + one_lap, + mark_bit, + head: CachePadded::new(AtomicUsize::new(head)), + tail: CachePadded::new(AtomicUsize::new(tail)), + senders: SyncWaker::new(), + receivers: SyncWaker::new(), + _marker: PhantomData, + } + } + + /// Returns a receiver handle to the channel. + pub fn receiver(&self) -> Receiver<'_, T> { + Receiver(self) + } + + /// Returns a sender handle to the channel. + pub fn sender(&self) -> Sender<'_, T> { + Sender(self) + } + + /// Attempts to reserve a slot for sending a message. + fn start_send(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut tail = self.tail.load(Ordering::Relaxed); + + loop { + // Check if the channel is disconnected. + if tail & self.mark_bit != 0 { + token.array.slot = ptr::null(); + token.array.stamp = 0; + return true; + } + + // Deconstruct the tail. + let index = tail & (self.mark_bit - 1); + let lap = tail & !(self.one_lap - 1); + + // Inspect the corresponding slot. + let slot = unsafe { &*self.buffer.add(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the tail and the stamp match, we may attempt to push. + if tail == stamp { + let new_tail = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Try moving the tail. + match self.tail.compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Prepare the token for the follow-up call to `write`. + token.array.slot = slot as *const Slot<T> as *const u8; + token.array.stamp = tail + 1; + return true; + } + Err(t) => { + tail = t; + backoff.spin(); + } + } + } else if stamp.wrapping_add(self.one_lap) == tail + 1 { + atomic::fence(Ordering::SeqCst); + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail { + // ...then the channel is full. + return false; + } + + backoff.spin(); + tail = self.tail.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.snooze(); + tail = self.tail.load(Ordering::Relaxed); + } + } + } + + /// Writes a message into the channel. + pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + // If there is no slot, the channel is disconnected. + if token.array.slot.is_null() { + return Err(msg); + } + + let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); + + // Write the message into the slot and update the stamp. + slot.msg.get().write(MaybeUninit::new(msg)); + slot.stamp.store(token.array.stamp, Ordering::Release); + + // Wake a sleeping receiver. + self.receivers.notify(); + Ok(()) + } + + /// Attempts to reserve a slot for receiving a message. + fn start_recv(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut head = self.head.load(Ordering::Relaxed); + + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); + + // Inspect the corresponding slot. + let slot = unsafe { &*self.buffer.add(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the the stamp is ahead of the head by 1, we may attempt to pop. + if head + 1 == stamp { + let new = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Try moving the head. + match self.head.compare_exchange_weak( + head, + new, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Prepare the token for the follow-up call to `read`. + token.array.slot = slot as *const Slot<T> as *const u8; + token.array.stamp = head.wrapping_add(self.one_lap); + return true; + } + Err(h) => { + head = h; + backoff.spin(); + } + } + } else if stamp == head { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.load(Ordering::Relaxed); + + // If the tail equals the head, that means the channel is empty. + if (tail & !self.mark_bit) == head { + // If the channel is disconnected... + if tail & self.mark_bit != 0 { + // ...then receive an error. + token.array.slot = ptr::null(); + token.array.stamp = 0; + return true; + } else { + // Otherwise, the receive operation is not ready. + return false; + } + } + + backoff.spin(); + head = self.head.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.snooze(); + head = self.head.load(Ordering::Relaxed); + } + } + } + + /// Reads a message from the channel. + pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + if token.array.slot.is_null() { + // The channel is disconnected. + return Err(()); + } + + let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); + + // Read the message from the slot and update the stamp. + let msg = slot.msg.get().read().assume_init(); + slot.stamp.store(token.array.stamp, Ordering::Release); + + // Wake a sleeping sender. + self.senders.notify(); + Ok(msg) + } + + /// Attempts to send a message into the channel. + pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + let token = &mut Token::default(); + if self.start_send(token) { + unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } + } else { + Err(TrySendError::Full(msg)) + } + } + + /// Sends a message into the channel. + pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + let token = &mut Token::default(); + loop { + // Try sending a message several times. + let backoff = Backoff::new(); + loop { + if self.start_send(token) { + let res = unsafe { self.write(token, msg) }; + return res.map_err(SendTimeoutError::Disconnected); + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + if let Some(d) = deadline { + if Instant::now() >= d { + return Err(SendTimeoutError::Timeout(msg)); + } + } + + Context::with(|cx| { + // Prepare for blocking until a receiver wakes us up. + let oper = Operation::hook(token); + self.senders.register(oper, cx); + + // Has the channel become ready just now? + if !self.is_full() || self.is_disconnected() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.senders.unregister(oper).unwrap(); + } + Selected::Operation(_) => {} + } + }); + } + } + + /// Attempts to receive a message without blocking. + pub fn try_recv(&self) -> Result<T, TryRecvError> { + let token = &mut Token::default(); + + if self.start_recv(token) { + unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } + } else { + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + let token = &mut Token::default(); + loop { + // Try receiving a message several times. + let backoff = Backoff::new(); + loop { + if self.start_recv(token) { + let res = unsafe { self.read(token) }; + return res.map_err(|_| RecvTimeoutError::Disconnected); + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + if let Some(d) = deadline { + if Instant::now() >= d { + return Err(RecvTimeoutError::Timeout); + } + } + + Context::with(|cx| { + // Prepare for blocking until a sender wakes us up. + let oper = Operation::hook(token); + self.receivers.register(oper, cx); + + // Has the channel become ready just now? + if !self.is_empty() || self.is_disconnected() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.receivers.unregister(oper).unwrap(); + // If the channel was disconnected, we still have to check for remaining + // messages. + } + Selected::Operation(_) => {} + } + }); + } + } + + /// Returns the current number of messages inside the channel. + pub fn len(&self) -> usize { + loop { + // Load the tail, then load the head. + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // If the tail didn't change, we've got consistent values to work with. + if self.tail.load(Ordering::SeqCst) == tail { + let hix = head & (self.mark_bit - 1); + let tix = tail & (self.mark_bit - 1); + + return if hix < tix { + tix - hix + } else if hix > tix { + self.cap - hix + tix + } else if (tail & !self.mark_bit) == head { + 0 + } else { + self.cap + }; + } + } + } + + /// Returns the capacity of the channel. + pub fn capacity(&self) -> Option<usize> { + Some(self.cap) + } + + /// Disconnects the channel and wakes up all blocked senders and receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub fn disconnect(&self) -> bool { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + + if tail & self.mark_bit == 0 { + self.senders.disconnect(); + self.receivers.disconnect(); + true + } else { + false + } + } + + /// Returns `true` if the channel is disconnected. + pub fn is_disconnected(&self) -> bool { + self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 + } + + /// Returns `true` if the channel is empty. + pub fn is_empty(&self) -> bool { + let head = self.head.load(Ordering::SeqCst); + let tail = self.tail.load(Ordering::SeqCst); + + // Is the tail equal to the head? + // + // Note: If the head changes just before we load the tail, that means there was a moment + // when the channel was not empty, so it is safe to just return `false`. + (tail & !self.mark_bit) == head + } + + /// Returns `true` if the channel is full. + pub fn is_full(&self) -> bool { + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // Is the head lagging one lap behind tail? + // + // Note: If the tail changes just before we load the head, that means there was a moment + // when the channel was not full, so it is safe to just return `false`. + head.wrapping_add(self.one_lap) == tail & !self.mark_bit + } +} + +impl<T> Drop for Channel<T> { + fn drop(&mut self) { + // Get the index of the head. + let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); + + // Loop over all slots that hold a message and drop them. + for i in 0..self.len() { + // Compute the index of the next slot holding a message. + let index = if hix + i < self.cap { + hix + i + } else { + hix + i - self.cap + }; + + unsafe { + let p = { + let slot = &mut *self.buffer.add(index); + let msg = &mut *slot.msg.get(); + msg.as_mut_ptr() + }; + p.drop_in_place(); + } + } + + // Finally, deallocate the buffer, but don't run any destructors. + unsafe { + // Create a slice from the buffer to make + // a fat pointer. Then, use Box::from_raw + // to deallocate it. + let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; + Box::from_raw(ptr); + } + } +} + +/// Receiver handle to a channel. +pub struct Receiver<'a, T>(&'a Channel<T>); + +/// Sender handle to a channel. +pub struct Sender<'a, T>(&'a Channel<T>); + +impl<T> SelectHandle for Receiver<'_, T> { + fn try_select(&self, token: &mut Token) -> bool { + self.0.start_recv(token) + } + + fn deadline(&self) -> Option<Instant> { + None + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + self.0.receivers.register(oper, cx); + self.is_ready() + } + + fn unregister(&self, oper: Operation) { + self.0.receivers.unregister(oper); + } + + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + fn is_ready(&self) -> bool { + !self.0.is_empty() || self.0.is_disconnected() + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + self.0.receivers.watch(oper, cx); + self.is_ready() + } + + fn unwatch(&self, oper: Operation) { + self.0.receivers.unwatch(oper); + } +} + +impl<T> SelectHandle for Sender<'_, T> { + fn try_select(&self, token: &mut Token) -> bool { + self.0.start_send(token) + } + + fn deadline(&self) -> Option<Instant> { + None + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + self.0.senders.register(oper, cx); + self.is_ready() + } + + fn unregister(&self, oper: Operation) { + self.0.senders.unregister(oper); + } + + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + fn is_ready(&self) -> bool { + !self.0.is_full() || self.0.is_disconnected() + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + self.0.senders.watch(oper, cx); + self.is_ready() + } + + fn unwatch(&self, oper: Operation) { + self.0.senders.unwatch(oper); + } +} diff --git a/third_party/rust/crossbeam-channel/src/flavors/at.rs b/third_party/rust/crossbeam-channel/src/flavors/at.rs new file mode 100644 index 0000000000..a2b1b578ec --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/at.rs @@ -0,0 +1,202 @@ +//! Channel that delivers a message at a certain moment in time. +//! +//! Messages cannot be sent into this kind of channel; they are materialized on demand. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::{Duration, Instant}; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, TryRecvError}; +use crate::select::{Operation, SelectHandle, Token}; +use crate::utils; + +/// Result of a receive operation. +pub type AtToken = Option<Instant>; + +/// Channel that delivers a message at a certain moment in time +pub struct Channel { + /// The instant at which the message will be delivered. + delivery_time: Instant, + + /// `true` if the message has been received. + received: AtomicBool, +} + +impl Channel { + /// Creates a channel that delivers a message at a certain instant in time. + #[inline] + pub fn new_deadline(when: Instant) -> Self { + Channel { + delivery_time: when, + received: AtomicBool::new(false), + } + } + /// Creates a channel that delivers a message after a certain duration of time. + #[inline] + pub fn new_timeout(dur: Duration) -> Self { + Self::new_deadline(Instant::now() + dur) + } + + /// Attempts to receive a message without blocking. + #[inline] + pub fn try_recv(&self) -> Result<Instant, TryRecvError> { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + // The message has already been received. + return Err(TryRecvError::Empty); + } + + if Instant::now() < self.delivery_time { + // The message was not delivered yet. + return Err(TryRecvError::Empty); + } + + // Try receiving the message if it is still available. + if !self.received.swap(true, Ordering::SeqCst) { + // Success! Return delivery time as the message. + Ok(self.delivery_time) + } else { + // The message was already received. + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + #[inline] + pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + // The message has already been received. + utils::sleep_until(deadline); + return Err(RecvTimeoutError::Timeout); + } + + // Wait until the message is received or the deadline is reached. + loop { + let now = Instant::now(); + + let deadline = match deadline { + // Check if we can receive the next message. + _ if now >= self.delivery_time => break, + // Check if the timeout deadline has been reached. + Some(d) if now >= d => return Err(RecvTimeoutError::Timeout), + + // Sleep until one of the above happens + Some(d) if d < self.delivery_time => d, + _ => self.delivery_time, + }; + + thread::sleep(deadline - now); + } + + // Try receiving the message if it is still available. + if !self.received.swap(true, Ordering::SeqCst) { + // Success! Return the message, which is the instant at which it was delivered. + Ok(self.delivery_time) + } else { + // The message was already received. Block forever. + utils::sleep_until(None); + unreachable!() + } + } + + /// Reads a message from the channel. + #[inline] + pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { + token.at.ok_or(()) + } + + /// Returns `true` if the channel is empty. + #[inline] + pub fn is_empty(&self) -> bool { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + return true; + } + + // If the delivery time hasn't been reached yet, the channel is empty. + if Instant::now() < self.delivery_time { + return true; + } + + // The delivery time has been reached. The channel is empty only if the message has already + // been received. + self.received.load(Ordering::SeqCst) + } + + /// Returns `true` if the channel is full. + #[inline] + pub fn is_full(&self) -> bool { + !self.is_empty() + } + + /// Returns the number of messages in the channel. + #[inline] + pub fn len(&self) -> usize { + if self.is_empty() { + 0 + } else { + 1 + } + } + + /// Returns the capacity of the channel. + #[inline] + pub fn capacity(&self) -> Option<usize> { + Some(1) + } +} + +impl SelectHandle for Channel { + #[inline] + fn try_select(&self, token: &mut Token) -> bool { + match self.try_recv() { + Ok(msg) => { + token.at = Some(msg); + true + } + Err(TryRecvError::Disconnected) => { + token.at = None; + true + } + Err(TryRecvError::Empty) => false, + } + } + + #[inline] + fn deadline(&self) -> Option<Instant> { + // We use relaxed ordering because this is just an optional optimistic check. + if self.received.load(Ordering::Relaxed) { + None + } else { + Some(self.delivery_time) + } + } + + #[inline] + fn register(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unregister(&self, _oper: Operation) {} + + #[inline] + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + #[inline] + fn is_ready(&self) -> bool { + !self.is_empty() + } + + #[inline] + fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unwatch(&self, _oper: Operation) {} +} diff --git a/third_party/rust/crossbeam-channel/src/flavors/list.rs b/third_party/rust/crossbeam-channel/src/flavors/list.rs new file mode 100644 index 0000000000..532e8b6ad4 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/list.rs @@ -0,0 +1,669 @@ +//! Unbounded channel implemented as a linked list. + +use std::cell::UnsafeCell; +use std::marker::PhantomData; +use std::mem::MaybeUninit; +use std::ptr; +use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; +use std::time::Instant; + +use crossbeam_utils::{Backoff, CachePadded}; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; +use crate::select::{Operation, SelectHandle, Selected, Token}; +use crate::waker::SyncWaker; + +// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the +// following changes by @kleimkuhler: +// +// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100 +// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101 + +// Bits indicating the state of a slot: +// * If a message has been written into the slot, `WRITE` is set. +// * If a message has been read from the slot, `READ` is set. +// * If the block is being destroyed, `DESTROY` is set. +const WRITE: usize = 1; +const READ: usize = 2; +const DESTROY: usize = 4; + +// Each block covers one "lap" of indices. +const LAP: usize = 32; +// The maximum number of messages a block can hold. +const BLOCK_CAP: usize = LAP - 1; +// How many lower bits are reserved for metadata. +const SHIFT: usize = 1; +// Has two different purposes: +// * If set in head, indicates that the block is not the last one. +// * If set in tail, indicates that the channel is disconnected. +const MARK_BIT: usize = 1; + +/// A slot in a block. +struct Slot<T> { + /// The message. + msg: UnsafeCell<MaybeUninit<T>>, + + /// The state of the slot. + state: AtomicUsize, +} + +impl<T> Slot<T> { + /// Waits until a message is written into the slot. + fn wait_write(&self) { + let backoff = Backoff::new(); + while self.state.load(Ordering::Acquire) & WRITE == 0 { + backoff.snooze(); + } + } +} + +/// A block in a linked list. +/// +/// Each block in the list can hold up to `BLOCK_CAP` messages. +struct Block<T> { + /// The next block in the linked list. + next: AtomicPtr<Block<T>>, + + /// Slots for messages. + slots: [Slot<T>; BLOCK_CAP], +} + +impl<T> Block<T> { + /// Creates an empty block. + fn new() -> Block<T> { + // SAFETY: This is safe because: + // [1] `Block::next` (AtomicPtr) may be safely zero initialized. + // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. + // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it + // holds a MaybeUninit. + // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. + unsafe { MaybeUninit::zeroed().assume_init() } + } + + /// Waits until the next pointer is set. + fn wait_next(&self) -> *mut Block<T> { + let backoff = Backoff::new(); + loop { + let next = self.next.load(Ordering::Acquire); + if !next.is_null() { + return next; + } + backoff.snooze(); + } + } + + /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. + unsafe fn destroy(this: *mut Block<T>, start: usize) { + // It is not necessary to set the `DESTROY` bit in the last slot because that slot has + // begun destruction of the block. + for i in start..BLOCK_CAP - 1 { + let slot = (*this).slots.get_unchecked(i); + + // Mark the `DESTROY` bit if a thread is still using the slot. + if slot.state.load(Ordering::Acquire) & READ == 0 + && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 + { + // If a thread is still using the slot, it will continue destruction of the block. + return; + } + } + + // No thread is using the block, now it is safe to destroy it. + drop(Box::from_raw(this)); + } +} + +/// A position in a channel. +#[derive(Debug)] +struct Position<T> { + /// The index in the channel. + index: AtomicUsize, + + /// The block in the linked list. + block: AtomicPtr<Block<T>>, +} + +/// The token type for the list flavor. +#[derive(Debug)] +pub struct ListToken { + /// The block of slots. + block: *const u8, + + /// The offset into the block. + offset: usize, +} + +impl Default for ListToken { + #[inline] + fn default() -> Self { + ListToken { + block: ptr::null(), + offset: 0, + } + } +} + +/// Unbounded channel implemented as a linked list. +/// +/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are +/// represented as numbers of type `usize` and wrap on overflow. +/// +/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and +/// improve cache efficiency. +pub struct Channel<T> { + /// The head of the channel. + head: CachePadded<Position<T>>, + + /// The tail of the channel. + tail: CachePadded<Position<T>>, + + /// Receivers waiting while the channel is empty and not disconnected. + receivers: SyncWaker, + + /// Indicates that dropping a `Channel<T>` may drop messages of type `T`. + _marker: PhantomData<T>, +} + +impl<T> Channel<T> { + /// Creates a new unbounded channel. + pub fn new() -> Self { + Channel { + head: CachePadded::new(Position { + block: AtomicPtr::new(ptr::null_mut()), + index: AtomicUsize::new(0), + }), + tail: CachePadded::new(Position { + block: AtomicPtr::new(ptr::null_mut()), + index: AtomicUsize::new(0), + }), + receivers: SyncWaker::new(), + _marker: PhantomData, + } + } + + /// Returns a receiver handle to the channel. + pub fn receiver(&self) -> Receiver<'_, T> { + Receiver(self) + } + + /// Returns a sender handle to the channel. + pub fn sender(&self) -> Sender<'_, T> { + Sender(self) + } + + /// Attempts to reserve a slot for sending a message. + fn start_send(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + let mut block = self.tail.block.load(Ordering::Acquire); + let mut next_block = None; + + loop { + // Check if the channel is disconnected. + if tail & MARK_BIT != 0 { + token.list.block = ptr::null(); + return true; + } + + // Calculate the offset of the index into the block. + let offset = (tail >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.snooze(); + tail = self.tail.index.load(Ordering::Acquire); + block = self.tail.block.load(Ordering::Acquire); + continue; + } + + // If we're going to have to install the next block, allocate it in advance in order to + // make the wait for other threads as short as possible. + if offset + 1 == BLOCK_CAP && next_block.is_none() { + next_block = Some(Box::new(Block::<T>::new())); + } + + // If this is the first message to be sent into the channel, we need to allocate the + // first block and install it. + if block.is_null() { + let new = Box::into_raw(Box::new(Block::<T>::new())); + + if self + .tail + .block + .compare_and_swap(block, new, Ordering::Release) + == block + { + self.head.block.store(new, Ordering::Release); + block = new; + } else { + next_block = unsafe { Some(Box::from_raw(new)) }; + tail = self.tail.index.load(Ordering::Acquire); + block = self.tail.block.load(Ordering::Acquire); + continue; + } + } + + let new_tail = tail + (1 << SHIFT); + + // Try advancing the tail forward. + match self.tail.index.compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Acquire, + ) { + Ok(_) => unsafe { + // If we've reached the end of the block, install the next one. + if offset + 1 == BLOCK_CAP { + let next_block = Box::into_raw(next_block.unwrap()); + self.tail.block.store(next_block, Ordering::Release); + self.tail.index.fetch_add(1 << SHIFT, Ordering::Release); + (*block).next.store(next_block, Ordering::Release); + } + + token.list.block = block as *const u8; + token.list.offset = offset; + return true; + }, + Err(t) => { + tail = t; + block = self.tail.block.load(Ordering::Acquire); + backoff.spin(); + } + } + } + } + + /// Writes a message into the channel. + pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + // If there is no slot, the channel is disconnected. + if token.list.block.is_null() { + return Err(msg); + } + + // Write the message into the slot. + let block = token.list.block as *mut Block<T>; + let offset = token.list.offset; + let slot = (*block).slots.get_unchecked(offset); + slot.msg.get().write(MaybeUninit::new(msg)); + slot.state.fetch_or(WRITE, Ordering::Release); + + // Wake a sleeping receiver. + self.receivers.notify(); + Ok(()) + } + + /// Attempts to reserve a slot for receiving a message. + fn start_recv(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut head = self.head.index.load(Ordering::Acquire); + let mut block = self.head.block.load(Ordering::Acquire); + + loop { + // Calculate the offset of the index into the block. + let offset = (head >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.snooze(); + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + continue; + } + + let mut new_head = head + (1 << SHIFT); + + if new_head & MARK_BIT == 0 { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::Relaxed); + + // If the tail equals the head, that means the channel is empty. + if head >> SHIFT == tail >> SHIFT { + // If the channel is disconnected... + if tail & MARK_BIT != 0 { + // ...then receive an error. + token.list.block = ptr::null(); + return true; + } else { + // Otherwise, the receive operation is not ready. + return false; + } + } + + // If head and tail are not in the same block, set `MARK_BIT` in head. + if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { + new_head |= MARK_BIT; + } + } + + // The block can be null here only if the first message is being sent into the channel. + // In that case, just wait until it gets initialized. + if block.is_null() { + backoff.snooze(); + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + continue; + } + + // Try moving the head index forward. + match self.head.index.compare_exchange_weak( + head, + new_head, + Ordering::SeqCst, + Ordering::Acquire, + ) { + Ok(_) => unsafe { + // If we've reached the end of the block, move to the next one. + if offset + 1 == BLOCK_CAP { + let next = (*block).wait_next(); + let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); + if !(*next).next.load(Ordering::Relaxed).is_null() { + next_index |= MARK_BIT; + } + + self.head.block.store(next, Ordering::Release); + self.head.index.store(next_index, Ordering::Release); + } + + token.list.block = block as *const u8; + token.list.offset = offset; + return true; + }, + Err(h) => { + head = h; + block = self.head.block.load(Ordering::Acquire); + backoff.spin(); + } + } + } + } + + /// Reads a message from the channel. + pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + if token.list.block.is_null() { + // The channel is disconnected. + return Err(()); + } + + // Read the message. + let block = token.list.block as *mut Block<T>; + let offset = token.list.offset; + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let msg = slot.msg.get().read().assume_init(); + + // Destroy the block if we've reached the end, or if another thread wanted to destroy but + // couldn't because we were busy reading from the slot. + if offset + 1 == BLOCK_CAP { + Block::destroy(block, 0); + } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { + Block::destroy(block, offset + 1); + } + + Ok(msg) + } + + /// Attempts to send a message into the channel. + pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + self.send(msg, None).map_err(|err| match err { + SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), + SendTimeoutError::Timeout(_) => unreachable!(), + }) + } + + /// Sends a message into the channel. + pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + let token = &mut Token::default(); + assert!(self.start_send(token)); + unsafe { + self.write(token, msg) + .map_err(SendTimeoutError::Disconnected) + } + } + + /// Attempts to receive a message without blocking. + pub fn try_recv(&self) -> Result<T, TryRecvError> { + let token = &mut Token::default(); + + if self.start_recv(token) { + unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } + } else { + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + let token = &mut Token::default(); + loop { + // Try receiving a message several times. + let backoff = Backoff::new(); + loop { + if self.start_recv(token) { + unsafe { + return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); + } + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + if let Some(d) = deadline { + if Instant::now() >= d { + return Err(RecvTimeoutError::Timeout); + } + } + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + let oper = Operation::hook(token); + self.receivers.register(oper, cx); + + // Has the channel become ready just now? + if !self.is_empty() || self.is_disconnected() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.receivers.unregister(oper).unwrap(); + // If the channel was disconnected, we still have to check for remaining + // messages. + } + Selected::Operation(_) => {} + } + }); + } + } + + /// Returns the current number of messages inside the channel. + pub fn len(&self) -> usize { + loop { + // Load the tail index, then load the head index. + let mut tail = self.tail.index.load(Ordering::SeqCst); + let mut head = self.head.index.load(Ordering::SeqCst); + + // If the tail index didn't change, we've got consistent indices to work with. + if self.tail.index.load(Ordering::SeqCst) == tail { + // Erase the lower bits. + tail &= !((1 << SHIFT) - 1); + head &= !((1 << SHIFT) - 1); + + // Fix up indices if they fall onto block ends. + if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { + tail = tail.wrapping_add(1 << SHIFT); + } + if (head >> SHIFT) & (LAP - 1) == LAP - 1 { + head = head.wrapping_add(1 << SHIFT); + } + + // Rotate indices so that head falls into the first block. + let lap = (head >> SHIFT) / LAP; + tail = tail.wrapping_sub((lap * LAP) << SHIFT); + head = head.wrapping_sub((lap * LAP) << SHIFT); + + // Remove the lower bits. + tail >>= SHIFT; + head >>= SHIFT; + + // Return the difference minus the number of blocks between tail and head. + return tail - head - tail / LAP; + } + } + } + + /// Returns the capacity of the channel. + pub fn capacity(&self) -> Option<usize> { + None + } + + /// Disconnects the channel and wakes up all blocked receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub fn disconnect(&self) -> bool { + let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); + + if tail & MARK_BIT == 0 { + self.receivers.disconnect(); + true + } else { + false + } + } + + /// Returns `true` if the channel is disconnected. + pub fn is_disconnected(&self) -> bool { + self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 + } + + /// Returns `true` if the channel is empty. + pub fn is_empty(&self) -> bool { + let head = self.head.index.load(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::SeqCst); + head >> SHIFT == tail >> SHIFT + } + + /// Returns `true` if the channel is full. + pub fn is_full(&self) -> bool { + false + } +} + +impl<T> Drop for Channel<T> { + fn drop(&mut self) { + let mut head = self.head.index.load(Ordering::Relaxed); + let mut tail = self.tail.index.load(Ordering::Relaxed); + let mut block = self.head.block.load(Ordering::Relaxed); + + // Erase the lower bits. + head &= !((1 << SHIFT) - 1); + tail &= !((1 << SHIFT) - 1); + + unsafe { + // Drop all messages between head and tail and deallocate the heap-allocated blocks. + while head != tail { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the message in the slot. + let slot = (*block).slots.get_unchecked(offset); + let p = &mut *slot.msg.get(); + p.as_mut_ptr().drop_in_place(); + } else { + // Deallocate the block and move to the next one. + let next = (*block).next.load(Ordering::Relaxed); + drop(Box::from_raw(block)); + block = next; + } + + head = head.wrapping_add(1 << SHIFT); + } + + // Deallocate the last remaining block. + if !block.is_null() { + drop(Box::from_raw(block)); + } + } + } +} + +/// Receiver handle to a channel. +pub struct Receiver<'a, T>(&'a Channel<T>); + +/// Sender handle to a channel. +pub struct Sender<'a, T>(&'a Channel<T>); + +impl<T> SelectHandle for Receiver<'_, T> { + fn try_select(&self, token: &mut Token) -> bool { + self.0.start_recv(token) + } + + fn deadline(&self) -> Option<Instant> { + None + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + self.0.receivers.register(oper, cx); + self.is_ready() + } + + fn unregister(&self, oper: Operation) { + self.0.receivers.unregister(oper); + } + + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + fn is_ready(&self) -> bool { + !self.0.is_empty() || self.0.is_disconnected() + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + self.0.receivers.watch(oper, cx); + self.is_ready() + } + + fn unwatch(&self, oper: Operation) { + self.0.receivers.unwatch(oper); + } +} + +impl<T> SelectHandle for Sender<'_, T> { + fn try_select(&self, token: &mut Token) -> bool { + self.0.start_send(token) + } + + fn deadline(&self) -> Option<Instant> { + None + } + + fn register(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + fn unregister(&self, _oper: Operation) {} + + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + fn is_ready(&self) -> bool { + true + } + + fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + fn unwatch(&self, _oper: Operation) {} +} diff --git a/third_party/rust/crossbeam-channel/src/flavors/mod.rs b/third_party/rust/crossbeam-channel/src/flavors/mod.rs new file mode 100644 index 0000000000..299e78f696 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/mod.rs @@ -0,0 +1,17 @@ +//! Channel flavors. +//! +//! There are six flavors: +//! +//! 1. `at` - Channel that delivers a message after a certain amount of time. +//! 2. `array` - Bounded channel based on a preallocated array. +//! 3. `list` - Unbounded channel implemented as a linked list. +//! 4. `never` - Channel that never delivers messages. +//! 5. `tick` - Channel that delivers messages periodically. +//! 6. `zero` - Zero-capacity channel. + +pub mod array; +pub mod at; +pub mod list; +pub mod never; +pub mod tick; +pub mod zero; diff --git a/third_party/rust/crossbeam-channel/src/flavors/never.rs b/third_party/rust/crossbeam-channel/src/flavors/never.rs new file mode 100644 index 0000000000..e49d2147c4 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/never.rs @@ -0,0 +1,110 @@ +//! Channel that never delivers messages. +//! +//! Messages cannot be sent into this kind of channel. + +use std::marker::PhantomData; +use std::time::Instant; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, TryRecvError}; +use crate::select::{Operation, SelectHandle, Token}; +use crate::utils; + +/// This flavor doesn't need a token. +pub type NeverToken = (); + +/// Channel that never delivers messages. +pub struct Channel<T> { + _marker: PhantomData<T>, +} + +impl<T> Channel<T> { + /// Creates a channel that never delivers messages. + #[inline] + pub fn new() -> Self { + Channel { + _marker: PhantomData, + } + } + + /// Attempts to receive a message without blocking. + #[inline] + pub fn try_recv(&self) -> Result<T, TryRecvError> { + Err(TryRecvError::Empty) + } + + /// Receives a message from the channel. + #[inline] + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + utils::sleep_until(deadline); + Err(RecvTimeoutError::Timeout) + } + + /// Reads a message from the channel. + #[inline] + pub unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> { + Err(()) + } + + /// Returns `true` if the channel is empty. + #[inline] + pub fn is_empty(&self) -> bool { + true + } + + /// Returns `true` if the channel is full. + #[inline] + pub fn is_full(&self) -> bool { + true + } + + /// Returns the number of messages in the channel. + #[inline] + pub fn len(&self) -> usize { + 0 + } + + /// Returns the capacity of the channel. + #[inline] + pub fn capacity(&self) -> Option<usize> { + Some(0) + } +} + +impl<T> SelectHandle for Channel<T> { + #[inline] + fn try_select(&self, _token: &mut Token) -> bool { + false + } + + #[inline] + fn deadline(&self) -> Option<Instant> { + None + } + + #[inline] + fn register(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unregister(&self, _oper: Operation) {} + + #[inline] + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + #[inline] + fn is_ready(&self) -> bool { + false + } + + #[inline] + fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unwatch(&self, _oper: Operation) {} +} diff --git a/third_party/rust/crossbeam-channel/src/flavors/tick.rs b/third_party/rust/crossbeam-channel/src/flavors/tick.rs new file mode 100644 index 0000000000..e8e7020ca8 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/tick.rs @@ -0,0 +1,167 @@ +//! Channel that delivers messages periodically. +//! +//! Messages cannot be sent into this kind of channel; they are materialized on demand. + +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_utils::atomic::AtomicCell; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, TryRecvError}; +use crate::select::{Operation, SelectHandle, Token}; + +/// Result of a receive operation. +pub type TickToken = Option<Instant>; + +/// Channel that delivers messages periodically. +pub struct Channel { + /// The instant at which the next message will be delivered. + delivery_time: AtomicCell<Instant>, + + /// The time interval in which messages get delivered. + duration: Duration, +} + +impl Channel { + /// Creates a channel that delivers messages periodically. + #[inline] + pub fn new(dur: Duration) -> Self { + Channel { + delivery_time: AtomicCell::new(Instant::now() + dur), + duration: dur, + } + } + + /// Attempts to receive a message without blocking. + #[inline] + pub fn try_recv(&self) -> Result<Instant, TryRecvError> { + loop { + let now = Instant::now(); + let delivery_time = self.delivery_time.load(); + + if now < delivery_time { + return Err(TryRecvError::Empty); + } + + if self + .delivery_time + .compare_exchange(delivery_time, now + self.duration) + .is_ok() + { + return Ok(delivery_time); + } + } + } + + /// Receives a message from the channel. + #[inline] + pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { + loop { + let delivery_time = self.delivery_time.load(); + let now = Instant::now(); + + if let Some(d) = deadline { + if d < delivery_time { + if now < d { + thread::sleep(d - now); + } + return Err(RecvTimeoutError::Timeout); + } + } + + if self + .delivery_time + .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) + .is_ok() + { + if now < delivery_time { + thread::sleep(delivery_time - now); + } + return Ok(delivery_time); + } + } + } + + /// Reads a message from the channel. + #[inline] + pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { + token.tick.ok_or(()) + } + + /// Returns `true` if the channel is empty. + #[inline] + pub fn is_empty(&self) -> bool { + Instant::now() < self.delivery_time.load() + } + + /// Returns `true` if the channel is full. + #[inline] + pub fn is_full(&self) -> bool { + !self.is_empty() + } + + /// Returns the number of messages in the channel. + #[inline] + pub fn len(&self) -> usize { + if self.is_empty() { + 0 + } else { + 1 + } + } + + /// Returns the capacity of the channel. + #[inline] + pub fn capacity(&self) -> Option<usize> { + Some(1) + } +} + +impl SelectHandle for Channel { + #[inline] + fn try_select(&self, token: &mut Token) -> bool { + match self.try_recv() { + Ok(msg) => { + token.tick = Some(msg); + true + } + Err(TryRecvError::Disconnected) => { + token.tick = None; + true + } + Err(TryRecvError::Empty) => false, + } + } + + #[inline] + fn deadline(&self) -> Option<Instant> { + Some(self.delivery_time.load()) + } + + #[inline] + fn register(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unregister(&self, _oper: Operation) {} + + #[inline] + fn accept(&self, token: &mut Token, _cx: &Context) -> bool { + self.try_select(token) + } + + #[inline] + fn is_ready(&self) -> bool { + !self.is_empty() + } + + #[inline] + fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + self.is_ready() + } + + #[inline] + fn unwatch(&self, _oper: Operation) {} +} diff --git a/third_party/rust/crossbeam-channel/src/flavors/zero.rs b/third_party/rust/crossbeam-channel/src/flavors/zero.rs new file mode 100644 index 0000000000..be647b55c8 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/flavors/zero.rs @@ -0,0 +1,466 @@ +//! Zero-capacity channel. +//! +//! This kind of channel is also known as *rendezvous* channel. + +use std::cell::UnsafeCell; +use std::marker::PhantomData; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Instant; + +use crossbeam_utils::Backoff; + +use crate::context::Context; +use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; +use crate::select::{Operation, SelectHandle, Selected, Token}; +use crate::utils::Spinlock; +use crate::waker::Waker; + +/// A pointer to a packet. +pub type ZeroToken = usize; + +/// A slot for passing one message from a sender to a receiver. +struct Packet<T> { + /// Equals `true` if the packet is allocated on the stack. + on_stack: bool, + + /// Equals `true` once the packet is ready for reading or writing. + ready: AtomicBool, + + /// The message. + msg: UnsafeCell<Option<T>>, +} + +impl<T> Packet<T> { + /// Creates an empty packet on the stack. + fn empty_on_stack() -> Packet<T> { + Packet { + on_stack: true, + ready: AtomicBool::new(false), + msg: UnsafeCell::new(None), + } + } + + /// Creates an empty packet on the heap. + fn empty_on_heap() -> Box<Packet<T>> { + Box::new(Packet { + on_stack: false, + ready: AtomicBool::new(false), + msg: UnsafeCell::new(None), + }) + } + + /// Creates a packet on the stack, containing a message. + fn message_on_stack(msg: T) -> Packet<T> { + Packet { + on_stack: true, + ready: AtomicBool::new(false), + msg: UnsafeCell::new(Some(msg)), + } + } + + /// Waits until the packet becomes ready for reading or writing. + fn wait_ready(&self) { + let backoff = Backoff::new(); + while !self.ready.load(Ordering::Acquire) { + backoff.snooze(); + } + } +} + +/// Inner representation of a zero-capacity channel. +struct Inner { + /// Senders waiting to pair up with a receive operation. + senders: Waker, + + /// Receivers waiting to pair up with a send operation. + receivers: Waker, + + /// Equals `true` when the channel is disconnected. + is_disconnected: bool, +} + +/// Zero-capacity channel. +pub struct Channel<T> { + /// Inner representation of the channel. + inner: Spinlock<Inner>, + + /// Indicates that dropping a `Channel<T>` may drop values of type `T`. + _marker: PhantomData<T>, +} + +impl<T> Channel<T> { + /// Constructs a new zero-capacity channel. + pub fn new() -> Self { + Channel { + inner: Spinlock::new(Inner { + senders: Waker::new(), + receivers: Waker::new(), + is_disconnected: false, + }), + _marker: PhantomData, + } + } + + /// Returns a receiver handle to the channel. + pub fn receiver(&self) -> Receiver<'_, T> { + Receiver(self) + } + + /// Returns a sender handle to the channel. + pub fn sender(&self) -> Sender<'_, T> { + Sender(self) + } + + /// Attempts to reserve a slot for sending a message. + fn start_send(&self, token: &mut Token) -> bool { + let mut inner = self.inner.lock(); + + // If there's a waiting receiver, pair up with it. + if let Some(operation) = inner.receivers.try_select() { + token.zero = operation.packet; + true + } else if inner.is_disconnected { + token.zero = 0; + true + } else { + false + } + } + + /// Writes a message into the packet. + pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + // If there is no packet, the channel is disconnected. + if token.zero == 0 { + return Err(msg); + } + + let packet = &*(token.zero as *const Packet<T>); + packet.msg.get().write(Some(msg)); + packet.ready.store(true, Ordering::Release); + Ok(()) + } + + /// Attempts to pair up with a sender. + fn start_recv(&self, token: &mut Token) -> bool { + let mut inner = self.inner.lock(); + + // If there's a waiting sender, pair up with it. + if let Some(operation) = inner.senders.try_select() { + token.zero = operation.packet; + true + } else if inner.is_disconnected { + token.zero = 0; + true + } else { + false + } + } + + /// Reads a message from the packet. + pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + // If there is no packet, the channel is disconnected. + if token.zero == 0 { + return Err(()); + } + + let packet = &*(token.zero as *const Packet<T>); + + if packet.on_stack { + // The message has been in the packet from the beginning, so there is no need to wait + // for it. However, after reading the message, we need to set `ready` to `true` in + // order to signal that the packet can be destroyed. + let msg = packet.msg.get().replace(None).unwrap(); + packet.ready.store(true, Ordering::Release); + Ok(msg) + } else { + // Wait until the message becomes available, then read it and destroy the + // heap-allocated packet. + packet.wait_ready(); + let msg = packet.msg.get().replace(None).unwrap(); + drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>)); + Ok(msg) + } + } + + /// Attempts to send a message into the channel. + pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + let token = &mut Token::default(); + let mut inner = self.inner.lock(); + + // If there's a waiting receiver, pair up with it. + if let Some(operation) = inner.receivers.try_select() { + token.zero = operation.packet; + drop(inner); + unsafe { + self.write(token, msg).ok().unwrap(); + } + Ok(()) + } else if inner.is_disconnected { + Err(TrySendError::Disconnected(msg)) + } else { + Err(TrySendError::Full(msg)) + } + } + + /// Sends a message into the channel. + pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + let token = &mut Token::default(); + let mut inner = self.inner.lock(); + + // If there's a waiting receiver, pair up with it. + if let Some(operation) = inner.receivers.try_select() { + token.zero = operation.packet; + drop(inner); + unsafe { + self.write(token, msg).ok().unwrap(); + } + return Ok(()); + } + + if inner.is_disconnected { + return Err(SendTimeoutError::Disconnected(msg)); + } + + Context::with(|cx| { + // Prepare for blocking until a receiver wakes us up. + let oper = Operation::hook(token); + let packet = Packet::<T>::message_on_stack(msg); + inner + .senders + .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + inner.receivers.notify(); + drop(inner); + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted => { + self.inner.lock().senders.unregister(oper).unwrap(); + let msg = unsafe { packet.msg.get().replace(None).unwrap() }; + Err(SendTimeoutError::Timeout(msg)) + } + Selected::Disconnected => { + self.inner.lock().senders.unregister(oper).unwrap(); + let msg = unsafe { packet.msg.get().replace(None).unwrap() }; + Err(SendTimeoutError::Disconnected(msg)) + } + Selected::Operation(_) => { + // Wait until the message is read, then drop the packet. + packet.wait_ready(); + Ok(()) + } + } + }) + } + + /// Attempts to receive a message without blocking. + pub fn try_recv(&self) -> Result<T, TryRecvError> { + let token = &mut Token::default(); + let mut inner = self.inner.lock(); + + // If there's a waiting sender, pair up with it. + if let Some(operation) = inner.senders.try_select() { + token.zero = operation.packet; + drop(inner); + unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } + } else if inner.is_disconnected { + Err(TryRecvError::Disconnected) + } else { + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + let token = &mut Token::default(); + let mut inner = self.inner.lock(); + + // If there's a waiting sender, pair up with it. + if let Some(operation) = inner.senders.try_select() { + token.zero = operation.packet; + drop(inner); + unsafe { + return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); + } + } + + if inner.is_disconnected { + return Err(RecvTimeoutError::Disconnected); + } + + Context::with(|cx| { + // Prepare for blocking until a sender wakes us up. + let oper = Operation::hook(token); + let packet = Packet::<T>::empty_on_stack(); + inner + .receivers + .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + inner.senders.notify(); + drop(inner); + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted => { + self.inner.lock().receivers.unregister(oper).unwrap(); + Err(RecvTimeoutError::Timeout) + } + Selected::Disconnected => { + self.inner.lock().receivers.unregister(oper).unwrap(); + Err(RecvTimeoutError::Disconnected) + } + Selected::Operation(_) => { + // Wait until the message is provided, then read it. + packet.wait_ready(); + unsafe { Ok(packet.msg.get().replace(None).unwrap()) } + } + } + }) + } + + /// Disconnects the channel and wakes up all blocked senders and receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub fn disconnect(&self) -> bool { + let mut inner = self.inner.lock(); + + if !inner.is_disconnected { + inner.is_disconnected = true; + inner.senders.disconnect(); + inner.receivers.disconnect(); + true + } else { + false + } + } + + /// Returns the current number of messages inside the channel. + pub fn len(&self) -> usize { + 0 + } + + /// Returns the capacity of the channel. + pub fn capacity(&self) -> Option<usize> { + Some(0) + } + + /// Returns `true` if the channel is empty. + pub fn is_empty(&self) -> bool { + true + } + + /// Returns `true` if the channel is full. + pub fn is_full(&self) -> bool { + true + } +} + +/// Receiver handle to a channel. +pub struct Receiver<'a, T>(&'a Channel<T>); + +/// Sender handle to a channel. +pub struct Sender<'a, T>(&'a Channel<T>); + +impl<T> SelectHandle for Receiver<'_, T> { + fn try_select(&self, token: &mut Token) -> bool { + self.0.start_recv(token) + } + + fn deadline(&self) -> Option<Instant> { + None + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + let packet = Box::into_raw(Packet::<T>::empty_on_heap()); + + let mut inner = self.0.inner.lock(); + inner + .receivers + .register_with_packet(oper, packet as usize, cx); + inner.senders.notify(); + inner.senders.can_select() || inner.is_disconnected + } + + fn unregister(&self, oper: Operation) { + if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) { + unsafe { + drop(Box::from_raw(operation.packet as *mut Packet<T>)); + } + } + } + + fn accept(&self, token: &mut Token, cx: &Context) -> bool { + token.zero = cx.wait_packet(); + true + } + + fn is_ready(&self) -> bool { + let inner = self.0.inner.lock(); + inner.senders.can_select() || inner.is_disconnected + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + let mut inner = self.0.inner.lock(); + inner.receivers.watch(oper, cx); + inner.senders.can_select() || inner.is_disconnected + } + + fn unwatch(&self, oper: Operation) { + let mut inner = self.0.inner.lock(); + inner.receivers.unwatch(oper); + } +} + +impl<T> SelectHandle for Sender<'_, T> { + fn try_select(&self, token: &mut Token) -> bool { + self.0.start_send(token) + } + + fn deadline(&self) -> Option<Instant> { + None + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + let packet = Box::into_raw(Packet::<T>::empty_on_heap()); + + let mut inner = self.0.inner.lock(); + inner + .senders + .register_with_packet(oper, packet as usize, cx); + inner.receivers.notify(); + inner.receivers.can_select() || inner.is_disconnected + } + + fn unregister(&self, oper: Operation) { + if let Some(operation) = self.0.inner.lock().senders.unregister(oper) { + unsafe { + drop(Box::from_raw(operation.packet as *mut Packet<T>)); + } + } + } + + fn accept(&self, token: &mut Token, cx: &Context) -> bool { + token.zero = cx.wait_packet(); + true + } + + fn is_ready(&self) -> bool { + let inner = self.0.inner.lock(); + inner.receivers.can_select() || inner.is_disconnected + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + let mut inner = self.0.inner.lock(); + inner.senders.watch(oper, cx); + inner.receivers.can_select() || inner.is_disconnected + } + + fn unwatch(&self, oper: Operation) { + let mut inner = self.0.inner.lock(); + inner.senders.unwatch(oper); + } +} diff --git a/third_party/rust/crossbeam-channel/src/lib.rs b/third_party/rust/crossbeam-channel/src/lib.rs new file mode 100644 index 0000000000..e08ac08f90 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/lib.rs @@ -0,0 +1,368 @@ +//! Multi-producer multi-consumer channels for message passing. +//! +//! This crate is an alternative to [`std::sync::mpsc`] with more features and better performance. +//! +//! # Hello, world! +//! +//! ``` +//! use crossbeam_channel::unbounded; +//! +//! // Create a channel of unbounded capacity. +//! let (s, r) = unbounded(); +//! +//! // Send a message into the channel. +//! s.send("Hello, world!").unwrap(); +//! +//! // Receive the message from the channel. +//! assert_eq!(r.recv(), Ok("Hello, world!")); +//! ``` +//! +//! # Channel types +//! +//! Channels can be created using two functions: +//! +//! * [`bounded`] creates a channel of bounded capacity, i.e. there is a limit to how many messages +//! it can hold at a time. +//! +//! * [`unbounded`] creates a channel of unbounded capacity, i.e. it can hold any number of +//! messages at a time. +//! +//! Both functions return a [`Sender`] and a [`Receiver`], which represent the two opposite sides +//! of a channel. +//! +//! Creating a bounded channel: +//! +//! ``` +//! use crossbeam_channel::bounded; +//! +//! // Create a channel that can hold at most 5 messages at a time. +//! let (s, r) = bounded(5); +//! +//! // Can send only 5 messages without blocking. +//! for i in 0..5 { +//! s.send(i).unwrap(); +//! } +//! +//! // Another call to `send` would block because the channel is full. +//! // s.send(5).unwrap(); +//! ``` +//! +//! Creating an unbounded channel: +//! +//! ``` +//! use crossbeam_channel::unbounded; +//! +//! // Create an unbounded channel. +//! let (s, r) = unbounded(); +//! +//! // Can send any number of messages into the channel without blocking. +//! for i in 0..1000 { +//! s.send(i).unwrap(); +//! } +//! ``` +//! +//! A special case is zero-capacity channel, which cannot hold any messages. Instead, send and +//! receive operations must appear at the same time in order to pair up and pass the message over: +//! +//! ``` +//! use std::thread; +//! use crossbeam_channel::bounded; +//! +//! // Create a zero-capacity channel. +//! let (s, r) = bounded(0); +//! +//! // Sending blocks until a receive operation appears on the other side. +//! thread::spawn(move || s.send("Hi!").unwrap()); +//! +//! // Receiving blocks until a send operation appears on the other side. +//! assert_eq!(r.recv(), Ok("Hi!")); +//! ``` +//! +//! # Sharing channels +//! +//! Senders and receivers can be cloned and sent to other threads: +//! +//! ``` +//! use std::thread; +//! use crossbeam_channel::bounded; +//! +//! let (s1, r1) = bounded(0); +//! let (s2, r2) = (s1.clone(), r1.clone()); +//! +//! // Spawn a thread that receives a message and then sends one. +//! thread::spawn(move || { +//! r2.recv().unwrap(); +//! s2.send(2).unwrap(); +//! }); +//! +//! // Send a message and then receive one. +//! s1.send(1).unwrap(); +//! r1.recv().unwrap(); +//! ``` +//! +//! Note that cloning only creates a new handle to the same sending or receiving side. It does not +//! create a separate stream of messages in any way: +//! +//! ``` +//! use crossbeam_channel::unbounded; +//! +//! let (s1, r1) = unbounded(); +//! let (s2, r2) = (s1.clone(), r1.clone()); +//! let (s3, r3) = (s2.clone(), r2.clone()); +//! +//! s1.send(10).unwrap(); +//! s2.send(20).unwrap(); +//! s3.send(30).unwrap(); +//! +//! assert_eq!(r3.recv(), Ok(10)); +//! assert_eq!(r1.recv(), Ok(20)); +//! assert_eq!(r2.recv(), Ok(30)); +//! ``` +//! +//! It's also possible to share senders and receivers by reference: +//! +//! ``` +//! use crossbeam_channel::bounded; +//! use crossbeam_utils::thread::scope; +//! +//! let (s, r) = bounded(0); +//! +//! scope(|scope| { +//! // Spawn a thread that receives a message and then sends one. +//! scope.spawn(|_| { +//! r.recv().unwrap(); +//! s.send(2).unwrap(); +//! }); +//! +//! // Send a message and then receive one. +//! s.send(1).unwrap(); +//! r.recv().unwrap(); +//! }).unwrap(); +//! ``` +//! +//! # Disconnection +//! +//! When all senders or all receivers associated with a channel get dropped, the channel becomes +//! disconnected. No more messages can be sent, but any remaining messages can still be received. +//! Send and receive operations on a disconnected channel never block. +//! +//! ``` +//! use crossbeam_channel::{unbounded, RecvError}; +//! +//! let (s, r) = unbounded(); +//! s.send(1).unwrap(); +//! s.send(2).unwrap(); +//! s.send(3).unwrap(); +//! +//! // The only sender is dropped, disconnecting the channel. +//! drop(s); +//! +//! // The remaining messages can be received. +//! assert_eq!(r.recv(), Ok(1)); +//! assert_eq!(r.recv(), Ok(2)); +//! assert_eq!(r.recv(), Ok(3)); +//! +//! // There are no more messages in the channel. +//! assert!(r.is_empty()); +//! +//! // Note that calling `r.recv()` does not block. +//! // Instead, `Err(RecvError)` is returned immediately. +//! assert_eq!(r.recv(), Err(RecvError)); +//! ``` +//! +//! # Blocking operations +//! +//! Send and receive operations come in three flavors: +//! +//! * Non-blocking (returns immediately with success or failure). +//! * Blocking (waits until the operation succeeds or the channel becomes disconnected). +//! * Blocking with a timeout (blocks only for a certain duration of time). +//! +//! A simple example showing the difference between non-blocking and blocking operations: +//! +//! ``` +//! use crossbeam_channel::{bounded, RecvError, TryRecvError}; +//! +//! let (s, r) = bounded(1); +//! +//! // Send a message into the channel. +//! s.send("foo").unwrap(); +//! +//! // This call would block because the channel is full. +//! // s.send("bar").unwrap(); +//! +//! // Receive the message. +//! assert_eq!(r.recv(), Ok("foo")); +//! +//! // This call would block because the channel is empty. +//! // r.recv(); +//! +//! // Try receiving a message without blocking. +//! assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +//! +//! // Disconnect the channel. +//! drop(s); +//! +//! // This call doesn't block because the channel is now disconnected. +//! assert_eq!(r.recv(), Err(RecvError)); +//! ``` +//! +//! # Iteration +//! +//! Receivers can be used as iterators. For example, method [`iter`] creates an iterator that +//! receives messages until the channel becomes empty and disconnected. Note that iteration may +//! block waiting for next message to arrive. +//! +//! ``` +//! use std::thread; +//! use crossbeam_channel::unbounded; +//! +//! let (s, r) = unbounded(); +//! +//! thread::spawn(move || { +//! s.send(1).unwrap(); +//! s.send(2).unwrap(); +//! s.send(3).unwrap(); +//! drop(s); // Disconnect the channel. +//! }); +//! +//! // Collect all messages from the channel. +//! // Note that the call to `collect` blocks until the sender is dropped. +//! let v: Vec<_> = r.iter().collect(); +//! +//! assert_eq!(v, [1, 2, 3]); +//! ``` +//! +//! A non-blocking iterator can be created using [`try_iter`], which receives all available +//! messages without blocking: +//! +//! ``` +//! use crossbeam_channel::unbounded; +//! +//! let (s, r) = unbounded(); +//! s.send(1).unwrap(); +//! s.send(2).unwrap(); +//! s.send(3).unwrap(); +//! // No need to drop the sender. +//! +//! // Receive all messages currently in the channel. +//! let v: Vec<_> = r.try_iter().collect(); +//! +//! assert_eq!(v, [1, 2, 3]); +//! ``` +//! +//! # Selection +//! +//! The [`select!`] macro allows you to define a set of channel operations, wait until any one of +//! them becomes ready, and finally execute it. If multiple operations are ready at the same time, +//! a random one among them is selected. +//! +//! It is also possible to define a `default` case that gets executed if none of the operations are +//! ready, either right away or for a certain duration of time. +//! +//! An operation is considered to be ready if it doesn't have to block. Note that it is ready even +//! when it will simply return an error because the channel is disconnected. +//! +//! An example of receiving a message from two channels: +//! +//! ``` +//! use std::thread; +//! use std::time::Duration; +//! use crossbeam_channel::{select, unbounded}; +//! +//! let (s1, r1) = unbounded(); +//! let (s2, r2) = unbounded(); +//! +//! thread::spawn(move || s1.send(10).unwrap()); +//! thread::spawn(move || s2.send(20).unwrap()); +//! +//! // At most one of these two receive operations will be executed. +//! select! { +//! recv(r1) -> msg => assert_eq!(msg, Ok(10)), +//! recv(r2) -> msg => assert_eq!(msg, Ok(20)), +//! default(Duration::from_secs(1)) => println!("timed out"), +//! } +//! ``` +//! +//! If you need to select over a dynamically created list of channel operations, use [`Select`] +//! instead. The [`select!`] macro is just a convenience wrapper around [`Select`]. +//! +//! # Extra channels +//! +//! Three functions can create special kinds of channels, all of which return just a [`Receiver`] +//! handle: +//! +//! * [`after`] creates a channel that delivers a single message after a certain duration of time. +//! * [`tick`] creates a channel that delivers messages periodically. +//! * [`never`] creates a channel that never delivers messages. +//! +//! These channels are very efficient because messages get lazily generated on receive operations. +//! +//! An example that prints elapsed time every 50 milliseconds for the duration of 1 second: +//! +//! ``` +//! use std::time::{Duration, Instant}; +//! use crossbeam_channel::{after, select, tick}; +//! +//! let start = Instant::now(); +//! let ticker = tick(Duration::from_millis(50)); +//! let timeout = after(Duration::from_secs(1)); +//! +//! loop { +//! select! { +//! recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()), +//! recv(timeout) -> _ => break, +//! } +//! } +//! ``` +//! +//! [`send`]: Sender::send +//! [`recv`]: Receiver::recv +//! [`iter`]: Receiver::iter +//! [`try_iter`]: Receiver::try_iter + +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms), + allow(dead_code, unused_assignments, unused_variables) + ) +))] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![cfg_attr(not(feature = "std"), no_std)] +// matches! requires Rust 1.42 +#![allow(clippy::match_like_matches_macro)] + +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(feature = "std")] { + mod channel; + mod context; + mod counter; + mod err; + mod flavors; + mod select; + mod select_macro; + mod utils; + mod waker; + + /// Crate internals used by the `select!` macro. + #[doc(hidden)] + pub mod internal { + pub use crate::select::SelectHandle; + pub use crate::select::{select, select_timeout, try_select}; + } + + pub use crate::channel::{after, at, never, tick}; + pub use crate::channel::{bounded, unbounded}; + pub use crate::channel::{IntoIter, Iter, TryIter}; + pub use crate::channel::{Receiver, Sender}; + + pub use crate::select::{Select, SelectedOperation}; + + pub use crate::err::{ReadyTimeoutError, SelectTimeoutError, TryReadyError, TrySelectError}; + pub use crate::err::{RecvError, RecvTimeoutError, TryRecvError}; + pub use crate::err::{SendError, SendTimeoutError, TrySendError}; + } +} diff --git a/third_party/rust/crossbeam-channel/src/select.rs b/third_party/rust/crossbeam-channel/src/select.rs new file mode 100644 index 0000000000..1488f80d5f --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/select.rs @@ -0,0 +1,1251 @@ +//! Interface to the select mechanism. + +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::time::{Duration, Instant}; + +use crossbeam_utils::Backoff; + +use crate::channel::{self, Receiver, Sender}; +use crate::context::Context; +use crate::err::{ReadyTimeoutError, TryReadyError}; +use crate::err::{RecvError, SendError}; +use crate::err::{SelectTimeoutError, TrySelectError}; +use crate::flavors; +use crate::utils; + +/// Temporary data that gets initialized during select or a blocking operation, and is consumed by +/// `read` or `write`. +/// +/// Each field contains data associated with a specific channel flavor. +#[derive(Debug, Default)] +pub struct Token { + pub at: flavors::at::AtToken, + pub array: flavors::array::ArrayToken, + pub list: flavors::list::ListToken, + pub never: flavors::never::NeverToken, + pub tick: flavors::tick::TickToken, + pub zero: flavors::zero::ZeroToken, +} + +/// Identifier associated with an operation by a specific thread on a specific channel. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Operation(usize); + +impl Operation { + /// Creates an operation identifier from a mutable reference. + /// + /// This function essentially just turns the address of the reference into a number. The + /// reference should point to a variable that is specific to the thread and the operation, + /// and is alive for the entire duration of select or blocking operation. + #[inline] + pub fn hook<T>(r: &mut T) -> Operation { + let val = r as *mut T as usize; + // Make sure that the pointer address doesn't equal the numerical representation of + // `Selected::{Waiting, Aborted, Disconnected}`. + assert!(val > 2); + Operation(val) + } +} + +/// Current state of a select or a blocking operation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Selected { + /// Still waiting for an operation. + Waiting, + + /// The attempt to block the current thread has been aborted. + Aborted, + + /// An operation became ready because a channel is disconnected. + Disconnected, + + /// An operation became ready because a message can be sent or received. + Operation(Operation), +} + +impl From<usize> for Selected { + #[inline] + fn from(val: usize) -> Selected { + match val { + 0 => Selected::Waiting, + 1 => Selected::Aborted, + 2 => Selected::Disconnected, + oper => Selected::Operation(Operation(oper)), + } + } +} + +impl Into<usize> for Selected { + #[inline] + fn into(self) -> usize { + match self { + Selected::Waiting => 0, + Selected::Aborted => 1, + Selected::Disconnected => 2, + Selected::Operation(Operation(val)) => val, + } + } +} + +/// A receiver or a sender that can participate in select. +/// +/// This is a handle that assists select in executing an operation, registration, deciding on the +/// appropriate deadline for blocking, etc. +pub trait SelectHandle { + /// Attempts to select an operation and returns `true` on success. + fn try_select(&self, token: &mut Token) -> bool; + + /// Returns a deadline for an operation, if there is one. + fn deadline(&self) -> Option<Instant>; + + /// Registers an operation for execution and returns `true` if it is now ready. + fn register(&self, oper: Operation, cx: &Context) -> bool; + + /// Unregisters an operation for execution. + fn unregister(&self, oper: Operation); + + /// Attempts to select an operation the thread got woken up for and returns `true` on success. + fn accept(&self, token: &mut Token, cx: &Context) -> bool; + + /// Returns `true` if an operation can be executed without blocking. + fn is_ready(&self) -> bool; + + /// Registers an operation for readiness notification and returns `true` if it is now ready. + fn watch(&self, oper: Operation, cx: &Context) -> bool; + + /// Unregisters an operation for readiness notification. + fn unwatch(&self, oper: Operation); +} + +impl<T: SelectHandle> SelectHandle for &T { + fn try_select(&self, token: &mut Token) -> bool { + (**self).try_select(token) + } + + fn deadline(&self) -> Option<Instant> { + (**self).deadline() + } + + fn register(&self, oper: Operation, cx: &Context) -> bool { + (**self).register(oper, cx) + } + + fn unregister(&self, oper: Operation) { + (**self).unregister(oper); + } + + fn accept(&self, token: &mut Token, cx: &Context) -> bool { + (**self).accept(token, cx) + } + + fn is_ready(&self) -> bool { + (**self).is_ready() + } + + fn watch(&self, oper: Operation, cx: &Context) -> bool { + (**self).watch(oper, cx) + } + + fn unwatch(&self, oper: Operation) { + (**self).unwatch(oper) + } +} + +/// Determines when a select operation should time out. +#[derive(Clone, Copy, Eq, PartialEq)] +enum Timeout { + /// No blocking. + Now, + + /// Block forever. + Never, + + /// Time out after the time instant. + At(Instant), +} + +/// Runs until one of the operations is selected, potentially blocking the current thread. +/// +/// Successful receive operations will have to be followed up by `channel::read()` and successful +/// send operations by `channel::write()`. +fn run_select( + handles: &mut [(&dyn SelectHandle, usize, *const u8)], + timeout: Timeout, +) -> Option<(Token, usize, *const u8)> { + if handles.is_empty() { + // Wait until the timeout and return. + match timeout { + Timeout::Now => return None, + Timeout::Never => { + utils::sleep_until(None); + unreachable!(); + } + Timeout::At(when) => { + utils::sleep_until(Some(when)); + return None; + } + } + } + + // Shuffle the operations for fairness. + utils::shuffle(handles); + + // Create a token, which serves as a temporary variable that gets initialized in this function + // and is later used by a call to `channel::read()` or `channel::write()` that completes the + // selected operation. + let mut token = Token::default(); + + // Try selecting one of the operations without blocking. + for &(handle, i, ptr) in handles.iter() { + if handle.try_select(&mut token) { + return Some((token, i, ptr)); + } + } + + loop { + // Prepare for blocking. + let res = Context::with(|cx| { + let mut sel = Selected::Waiting; + let mut registered_count = 0; + let mut index_ready = None; + + if let Timeout::Now = timeout { + cx.try_select(Selected::Aborted).unwrap(); + } + + // Register all operations. + for (handle, i, _) in handles.iter_mut() { + registered_count += 1; + + // If registration returns `false`, that means the operation has just become ready. + if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) { + // Try aborting select. + sel = match cx.try_select(Selected::Aborted) { + Ok(()) => { + index_ready = Some(*i); + Selected::Aborted + } + Err(s) => s, + }; + break; + } + + // If another thread has already selected one of the operations, stop registration. + sel = cx.selected(); + if sel != Selected::Waiting { + break; + } + } + + if sel == Selected::Waiting { + // Check with each operation for how long we're allowed to block, and compute the + // earliest deadline. + let mut deadline: Option<Instant> = match timeout { + Timeout::Now => return None, + Timeout::Never => None, + Timeout::At(when) => Some(when), + }; + for &(handle, _, _) in handles.iter() { + if let Some(x) = handle.deadline() { + deadline = deadline.map(|y| x.min(y)).or(Some(x)); + } + } + + // Block the current thread. + sel = cx.wait_until(deadline); + } + + // Unregister all registered operations. + for (handle, _, _) in handles.iter_mut().take(registered_count) { + handle.unregister(Operation::hook::<&dyn SelectHandle>(handle)); + } + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted => { + // If an operation became ready during registration, try selecting it. + if let Some(index_ready) = index_ready { + for &(handle, i, ptr) in handles.iter() { + if i == index_ready && handle.try_select(&mut token) { + return Some((i, ptr)); + } + } + } + } + Selected::Disconnected => {} + Selected::Operation(_) => { + // Find the selected operation. + for (handle, i, ptr) in handles.iter_mut() { + // Is this the selected operation? + if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle)) + { + // Try selecting this operation. + if handle.accept(&mut token, cx) { + return Some((*i, *ptr)); + } + } + } + } + } + + None + }); + + // Return if an operation was selected. + if let Some((i, ptr)) = res { + return Some((token, i, ptr)); + } + + // Try selecting one of the operations without blocking. + for &(handle, i, ptr) in handles.iter() { + if handle.try_select(&mut token) { + return Some((token, i, ptr)); + } + } + + match timeout { + Timeout::Now => return None, + Timeout::Never => {} + Timeout::At(when) => { + if Instant::now() >= when { + return None; + } + } + } + } +} + +/// Runs until one of the operations becomes ready, potentially blocking the current thread. +fn run_ready( + handles: &mut [(&dyn SelectHandle, usize, *const u8)], + timeout: Timeout, +) -> Option<usize> { + if handles.is_empty() { + // Wait until the timeout and return. + match timeout { + Timeout::Now => return None, + Timeout::Never => { + utils::sleep_until(None); + unreachable!(); + } + Timeout::At(when) => { + utils::sleep_until(Some(when)); + return None; + } + } + } + + // Shuffle the operations for fairness. + utils::shuffle(handles); + + loop { + let backoff = Backoff::new(); + loop { + // Check operations for readiness. + for &(handle, i, _) in handles.iter() { + if handle.is_ready() { + return Some(i); + } + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + // Check for timeout. + match timeout { + Timeout::Now => return None, + Timeout::Never => {} + Timeout::At(when) => { + if Instant::now() >= when { + return None; + } + } + } + + // Prepare for blocking. + let res = Context::with(|cx| { + let mut sel = Selected::Waiting; + let mut registered_count = 0; + + // Begin watching all operations. + for (handle, _, _) in handles.iter_mut() { + registered_count += 1; + let oper = Operation::hook::<&dyn SelectHandle>(handle); + + // If registration returns `false`, that means the operation has just become ready. + if handle.watch(oper, cx) { + sel = match cx.try_select(Selected::Operation(oper)) { + Ok(()) => Selected::Operation(oper), + Err(s) => s, + }; + break; + } + + // If another thread has already chosen one of the operations, stop registration. + sel = cx.selected(); + if sel != Selected::Waiting { + break; + } + } + + if sel == Selected::Waiting { + // Check with each operation for how long we're allowed to block, and compute the + // earliest deadline. + let mut deadline: Option<Instant> = match timeout { + Timeout::Now => unreachable!(), + Timeout::Never => None, + Timeout::At(when) => Some(when), + }; + for &(handle, _, _) in handles.iter() { + if let Some(x) = handle.deadline() { + deadline = deadline.map(|y| x.min(y)).or(Some(x)); + } + } + + // Block the current thread. + sel = cx.wait_until(deadline); + } + + // Unwatch all operations. + for (handle, _, _) in handles.iter_mut().take(registered_count) { + handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle)); + } + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted => {} + Selected::Disconnected => {} + Selected::Operation(_) => { + for (handle, i, _) in handles.iter_mut() { + let oper = Operation::hook::<&dyn SelectHandle>(handle); + if sel == Selected::Operation(oper) { + return Some(*i); + } + } + } + } + + None + }); + + // Return if an operation became ready. + if res.is_some() { + return res; + } + } +} + +/// Attempts to select one of the operations without blocking. +#[inline] +pub fn try_select<'a>( + handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], +) -> Result<SelectedOperation<'a>, TrySelectError> { + match run_select(handles, Timeout::Now) { + None => Err(TrySelectError), + Some((token, index, ptr)) => Ok(SelectedOperation { + token, + index, + ptr, + _marker: PhantomData, + }), + } +} + +/// Blocks until one of the operations becomes ready and selects it. +#[inline] +pub fn select<'a>( + handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], +) -> SelectedOperation<'a> { + if handles.is_empty() { + panic!("no operations have been added to `Select`"); + } + + let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap(); + SelectedOperation { + token, + index, + ptr, + _marker: PhantomData, + } +} + +/// Blocks for a limited time until one of the operations becomes ready and selects it. +#[inline] +pub fn select_timeout<'a>( + handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + timeout: Duration, +) -> Result<SelectedOperation<'a>, SelectTimeoutError> { + select_deadline(handles, Instant::now() + timeout) +} + +/// Blocks until a given deadline, or until one of the operations becomes ready and selects it. +#[inline] +pub fn select_deadline<'a>( + handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + deadline: Instant, +) -> Result<SelectedOperation<'a>, SelectTimeoutError> { + match run_select(handles, Timeout::At(deadline)) { + None => Err(SelectTimeoutError), + Some((token, index, ptr)) => Ok(SelectedOperation { + token, + index, + ptr, + _marker: PhantomData, + }), + } +} + +/// Selects from a set of channel operations. +/// +/// `Select` allows you to define a set of channel operations, wait until any one of them becomes +/// ready, and finally execute it. If multiple operations are ready at the same time, a random one +/// among them is selected. +/// +/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even +/// when it will simply return an error because the channel is disconnected. +/// +/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a +/// dynamically created list of channel operations. +/// +/// Once a list of operations has been built with `Select`, there are two different ways of +/// proceeding: +/// +/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful, +/// the returned selected operation has already begun and **must** be completed. If we don't +/// complete it, a panic will occur. +/// +/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If +/// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's +/// possible for another thread to make the operation not ready just before we try executing it, +/// so it's wise to use a retry loop. However, note that these methods might return with success +/// spuriously, so it's a good idea to always double check if the operation is really ready. +/// +/// # Examples +/// +/// Use [`select`] to receive a message from a list of receivers: +/// +/// ``` +/// use crossbeam_channel::{Receiver, RecvError, Select}; +/// +/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> { +/// // Build a list of operations. +/// let mut sel = Select::new(); +/// for r in rs { +/// sel.recv(r); +/// } +/// +/// // Complete the selected operation. +/// let oper = sel.select(); +/// let index = oper.index(); +/// oper.recv(&rs[index]) +/// } +/// ``` +/// +/// Use [`ready`] to receive a message from a list of receivers: +/// +/// ``` +/// use crossbeam_channel::{Receiver, RecvError, Select}; +/// +/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> { +/// // Build a list of operations. +/// let mut sel = Select::new(); +/// for r in rs { +/// sel.recv(r); +/// } +/// +/// loop { +/// // Wait until a receive operation becomes ready and try executing it. +/// let index = sel.ready(); +/// let res = rs[index].try_recv(); +/// +/// // If the operation turns out not to be ready, retry. +/// if let Err(e) = res { +/// if e.is_empty() { +/// continue; +/// } +/// } +/// +/// // Success! +/// return res.map_err(|_| RecvError); +/// } +/// } +/// ``` +/// +/// [`try_select`]: Select::try_select +/// [`select`]: Select::select +/// [`select_timeout`]: Select::select_timeout +/// [`try_ready`]: Select::try_ready +/// [`ready`]: Select::ready +/// [`ready_timeout`]: Select::ready_timeout +pub struct Select<'a> { + /// A list of senders and receivers participating in selection. + handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>, + + /// The next index to assign to an operation. + next_index: usize, +} + +unsafe impl Send for Select<'_> {} +unsafe impl Sync for Select<'_> {} + +impl<'a> Select<'a> { + /// Creates an empty list of channel operations for selection. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::Select; + /// + /// let mut sel = Select::new(); + /// + /// // The list of operations is empty, which means no operation can be selected. + /// assert!(sel.try_select().is_err()); + /// ``` + pub fn new() -> Select<'a> { + Select { + handles: Vec::with_capacity(4), + next_index: 0, + } + } + + /// Adds a send operation. + /// + /// Returns the index of the added operation. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s, r) = unbounded::<i32>(); + /// + /// let mut sel = Select::new(); + /// let index = sel.send(&s); + /// ``` + pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize { + let i = self.next_index; + let ptr = s as *const Sender<_> as *const u8; + self.handles.push((s, i, ptr)); + self.next_index += 1; + i + } + + /// Adds a receive operation. + /// + /// Returns the index of the added operation. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s, r) = unbounded::<i32>(); + /// + /// let mut sel = Select::new(); + /// let index = sel.recv(&r); + /// ``` + pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize { + let i = self.next_index; + let ptr = r as *const Receiver<_> as *const u8; + self.handles.push((r, i, ptr)); + self.next_index += 1; + i + } + + /// Removes a previously added operation. + /// + /// This is useful when an operation is selected because the channel got disconnected and we + /// want to try again to select a different operation instead. + /// + /// If new operations are added after removing some, the indices of removed operations will not + /// be reused. + /// + /// # Panics + /// + /// An attempt to remove a non-existing or already removed operation will panic. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded::<i32>(); + /// let (_, r2) = unbounded::<i32>(); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // Both operations are initially ready, so a random one will be executed. + /// let oper = sel.select(); + /// assert_eq!(oper.index(), oper2); + /// assert!(oper.recv(&r2).is_err()); + /// sel.remove(oper2); + /// + /// s1.send(10).unwrap(); + /// + /// let oper = sel.select(); + /// assert_eq!(oper.index(), oper1); + /// assert_eq!(oper.recv(&r1), Ok(10)); + /// ``` + pub fn remove(&mut self, index: usize) { + assert!( + index < self.next_index, + "index out of bounds; {} >= {}", + index, + self.next_index, + ); + + let i = self + .handles + .iter() + .enumerate() + .find(|(_, (_, i, _))| *i == index) + .expect("no operation with this index") + .0; + + self.handles.swap_remove(i); + } + + /// Attempts to select one of the operations without blocking. + /// + /// If an operation is ready, it is selected and returned. If multiple operations are ready at + /// the same time, a random one among them is selected. If none of the operations are ready, an + /// error is returned. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// The selected operation must be completed with [`SelectedOperation::send`] + /// or [`SelectedOperation::recv`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// s1.send(10).unwrap(); + /// s2.send(20).unwrap(); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // Both operations are initially ready, so a random one will be executed. + /// let oper = sel.try_select(); + /// match oper { + /// Err(_) => panic!("both operations should be ready"), + /// Ok(oper) => match oper.index() { + /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), + /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), + /// _ => unreachable!(), + /// } + /// } + /// ``` + pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> { + try_select(&mut self.handles) + } + + /// Blocks until one of the operations becomes ready and selects it. + /// + /// Once an operation becomes ready, it is selected and returned. If multiple operations are + /// ready at the same time, a random one among them is selected. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// The selected operation must be completed with [`SelectedOperation::send`] + /// or [`SelectedOperation::recv`]. + /// + /// # Panics + /// + /// Panics if no operations have been added to `Select`. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s1.send(10).unwrap(); + /// }); + /// thread::spawn(move || s2.send(20).unwrap()); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // The second operation will be selected because it becomes ready first. + /// let oper = sel.select(); + /// match oper.index() { + /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), + /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), + /// _ => unreachable!(), + /// } + /// ``` + pub fn select(&mut self) -> SelectedOperation<'a> { + select(&mut self.handles) + } + + /// Blocks for a limited time until one of the operations becomes ready and selects it. + /// + /// If an operation becomes ready, it is selected and returned. If multiple operations are + /// ready at the same time, a random one among them is selected. If none of the operations + /// become ready for the specified duration, an error is returned. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// The selected operation must be completed with [`SelectedOperation::send`] + /// or [`SelectedOperation::recv`]. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s1.send(10).unwrap(); + /// }); + /// thread::spawn(move || s2.send(20).unwrap()); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // The second operation will be selected because it becomes ready first. + /// let oper = sel.select_timeout(Duration::from_millis(500)); + /// match oper { + /// Err(_) => panic!("should not have timed out"), + /// Ok(oper) => match oper.index() { + /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), + /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), + /// _ => unreachable!(), + /// } + /// } + /// ``` + pub fn select_timeout( + &mut self, + timeout: Duration, + ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { + select_timeout(&mut self.handles, timeout) + } + + /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. + /// + /// If an operation becomes ready, it is selected and returned. If multiple operations are + /// ready at the same time, a random one among them is selected. If none of the operations + /// become ready before the given deadline, an error is returned. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// The selected operation must be completed with [`SelectedOperation::send`] + /// or [`SelectedOperation::recv`]. + /// + /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send + /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::{Instant, Duration}; + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s1.send(10).unwrap(); + /// }); + /// thread::spawn(move || s2.send(20).unwrap()); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // The second operation will be selected because it becomes ready first. + /// let oper = sel.select_deadline(deadline); + /// match oper { + /// Err(_) => panic!("should not have timed out"), + /// Ok(oper) => match oper.index() { + /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), + /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), + /// _ => unreachable!(), + /// } + /// } + /// ``` + pub fn select_deadline( + &mut self, + deadline: Instant, + ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { + select_deadline(&mut self.handles, deadline) + } + + /// Attempts to find a ready operation without blocking. + /// + /// If an operation is ready, its index is returned. If multiple operations are ready at the + /// same time, a random one among them is chosen. If none of the operations are ready, an error + /// is returned. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// Note that this method might return with success spuriously, so it's a good idea to always + /// double check if the operation is really ready. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// s1.send(10).unwrap(); + /// s2.send(20).unwrap(); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // Both operations are initially ready, so a random one will be chosen. + /// match sel.try_ready() { + /// Err(_) => panic!("both operations should be ready"), + /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), + /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), + /// Ok(_) => unreachable!(), + /// } + /// ``` + pub fn try_ready(&mut self) -> Result<usize, TryReadyError> { + match run_ready(&mut self.handles, Timeout::Now) { + None => Err(TryReadyError), + Some(index) => Ok(index), + } + } + + /// Blocks until one of the operations becomes ready. + /// + /// Once an operation becomes ready, its index is returned. If multiple operations are ready at + /// the same time, a random one among them is chosen. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// Note that this method might return with success spuriously, so it's a good idea to always + /// double check if the operation is really ready. + /// + /// # Panics + /// + /// Panics if no operations have been added to `Select`. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s1.send(10).unwrap(); + /// }); + /// thread::spawn(move || s2.send(20).unwrap()); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // The second operation will be selected because it becomes ready first. + /// match sel.ready() { + /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), + /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), + /// _ => unreachable!(), + /// } + /// ``` + pub fn ready(&mut self) -> usize { + if self.handles.is_empty() { + panic!("no operations have been added to `Select`"); + } + + run_ready(&mut self.handles, Timeout::Never).unwrap() + } + + /// Blocks for a limited time until one of the operations becomes ready. + /// + /// If an operation becomes ready, its index is returned. If multiple operations are ready at + /// the same time, a random one among them is chosen. If none of the operations become ready + /// for the specified duration, an error is returned. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// Note that this method might return with success spuriously, so it's a good idea to double + /// check if the operation is really ready. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s1.send(10).unwrap(); + /// }); + /// thread::spawn(move || s2.send(20).unwrap()); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // The second operation will be selected because it becomes ready first. + /// match sel.ready_timeout(Duration::from_millis(500)) { + /// Err(_) => panic!("should not have timed out"), + /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), + /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), + /// Ok(_) => unreachable!(), + /// } + /// ``` + pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> { + self.ready_deadline(Instant::now() + timeout) + } + + /// Blocks until a given deadline, or until one of the operations becomes ready. + /// + /// If an operation becomes ready, its index is returned. If multiple operations are ready at + /// the same time, a random one among them is chosen. If none of the operations become ready + /// before the deadline, an error is returned. + /// + /// An operation is considered to be ready if it doesn't have to block. Note that it is ready + /// even when it will simply return an error because the channel is disconnected. + /// + /// Note that this method might return with success spuriously, so it's a good idea to double + /// check if the operation is really ready. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use crossbeam_channel::{unbounded, Select}; + /// + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// let (s1, r1) = unbounded(); + /// let (s2, r2) = unbounded(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// s1.send(10).unwrap(); + /// }); + /// thread::spawn(move || s2.send(20).unwrap()); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r1); + /// let oper2 = sel.recv(&r2); + /// + /// // The second operation will be selected because it becomes ready first. + /// match sel.ready_deadline(deadline) { + /// Err(_) => panic!("should not have timed out"), + /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), + /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), + /// Ok(_) => unreachable!(), + /// } + /// ``` + pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> { + match run_ready(&mut self.handles, Timeout::At(deadline)) { + None => Err(ReadyTimeoutError), + Some(index) => Ok(index), + } + } +} + +impl<'a> Clone for Select<'a> { + fn clone(&self) -> Select<'a> { + Select { + handles: self.handles.clone(), + next_index: self.next_index, + } + } +} + +impl<'a> Default for Select<'a> { + fn default() -> Select<'a> { + Select::new() + } +} + +impl fmt::Debug for Select<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Select { .. }") + } +} + +/// A selected operation that needs to be completed. +/// +/// To complete the operation, call [`send`] or [`recv`]. +/// +/// # Panics +/// +/// Forgetting to complete the operation is an error and might lead to deadlocks. If a +/// `SelectedOperation` is dropped without completion, a panic occurs. +/// +/// [`send`]: SelectedOperation::send +/// [`recv`]: SelectedOperation::recv +#[must_use] +pub struct SelectedOperation<'a> { + /// Token needed to complete the operation. + token: Token, + + /// The index of the selected operation. + index: usize, + + /// The address of the selected `Sender` or `Receiver`. + ptr: *const u8, + + /// Indicates that `Sender`s and `Receiver`s are borrowed. + _marker: PhantomData<&'a ()>, +} + +impl SelectedOperation<'_> { + /// Returns the index of the selected operation. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{bounded, Select}; + /// + /// let (s1, r1) = bounded::<()>(0); + /// let (s2, r2) = bounded::<()>(0); + /// let (s3, r3) = bounded::<()>(1); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.send(&s1); + /// let oper2 = sel.recv(&r2); + /// let oper3 = sel.send(&s3); + /// + /// // Only the last operation is ready. + /// let oper = sel.select(); + /// assert_eq!(oper.index(), 2); + /// assert_eq!(oper.index(), oper3); + /// + /// // Complete the operation. + /// oper.send(&s3, ()).unwrap(); + /// ``` + pub fn index(&self) -> usize { + self.index + } + + /// Completes the send operation. + /// + /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`] + /// when the operation was added. + /// + /// # Panics + /// + /// Panics if an incorrect [`Sender`] reference is passed. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{bounded, Select, SendError}; + /// + /// let (s, r) = bounded::<i32>(0); + /// drop(r); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.send(&s); + /// + /// let oper = sel.select(); + /// assert_eq!(oper.index(), oper1); + /// assert_eq!(oper.send(&s, 10), Err(SendError(10))); + /// ``` + pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> { + assert!( + s as *const Sender<T> as *const u8 == self.ptr, + "passed a sender that wasn't selected", + ); + let res = unsafe { channel::write(s, &mut self.token, msg) }; + mem::forget(self); + res.map_err(SendError) + } + + /// Completes the receive operation. + /// + /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`] + /// when the operation was added. + /// + /// # Panics + /// + /// Panics if an incorrect [`Receiver`] reference is passed. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::{bounded, Select, RecvError}; + /// + /// let (s, r) = bounded::<i32>(0); + /// drop(s); + /// + /// let mut sel = Select::new(); + /// let oper1 = sel.recv(&r); + /// + /// let oper = sel.select(); + /// assert_eq!(oper.index(), oper1); + /// assert_eq!(oper.recv(&r), Err(RecvError)); + /// ``` + pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> { + assert!( + r as *const Receiver<T> as *const u8 == self.ptr, + "passed a receiver that wasn't selected", + ); + let res = unsafe { channel::read(r, &mut self.token) }; + mem::forget(self); + res.map_err(|_| RecvError) + } +} + +impl fmt::Debug for SelectedOperation<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("SelectedOperation { .. }") + } +} + +impl Drop for SelectedOperation<'_> { + fn drop(&mut self) { + panic!("dropped `SelectedOperation` without completing the operation"); + } +} diff --git a/third_party/rust/crossbeam-channel/src/select_macro.rs b/third_party/rust/crossbeam-channel/src/select_macro.rs new file mode 100644 index 0000000000..f8b247ea43 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/select_macro.rs @@ -0,0 +1,1166 @@ +//! The `select!` macro. + +/// A helper macro for `select!` to hide the long list of macro patterns from the documentation. +/// +/// The macro consists of two stages: +/// 1. Parsing +/// 2. Code generation +/// +/// The parsing stage consists of these subparts: +/// 1. `@list`: Turns a list of tokens into a list of cases. +/// 2. `@list_errorN`: Diagnoses the syntax error. +/// 3. `@case`: Parses a single case and verifies its argument list. +/// +/// The codegen stage consists of these subparts: +/// 1. `@init`: Attempts to optimize `select!` away and initializes the list of handles. +/// 1. `@count`: Counts the listed cases. +/// 3. `@add`: Adds send/receive operations to the list of handles and starts selection. +/// 4. `@complete`: Completes the selected send/receive operation. +/// +/// If the parsing stage encounters a syntax error or the codegen stage ends up with too many +/// cases to process, the macro fails with a compile-time error. +#[doc(hidden)] +#[macro_export] +macro_rules! crossbeam_channel_internal { + // The list is empty. Now check the arguments of each processed case. + (@list + () + ($($head:tt)*) + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($head)*) + () + () + ) + }; + // If necessary, insert an empty argument list after `default`. + (@list + (default => $($tail:tt)*) + ($($head:tt)*) + ) => { + $crate::crossbeam_channel_internal!( + @list + (default() => $($tail)*) + ($($head)*) + ) + }; + // But print an error if `default` is followed by a `->`. + (@list + (default -> $($tail:tt)*) + ($($head:tt)*) + ) => { + compile_error!( + "expected `=>` after `default` case, found `->`" + ) + }; + // Print an error if there's an `->` after the argument list in the default case. + (@list + (default $args:tt -> $($tail:tt)*) + ($($head:tt)*) + ) => { + compile_error!( + "expected `=>` after `default` case, found `->`" + ) + }; + // Print an error if there is a missing result in a recv case. + (@list + (recv($($args:tt)*) => $($tail:tt)*) + ($($head:tt)*) + ) => { + compile_error!( + "expected `->` after `recv` case, found `=>`" + ) + }; + // Print an error if there is a missing result in a send case. + (@list + (send($($args:tt)*) => $($tail:tt)*) + ($($head:tt)*) + ) => { + compile_error!( + "expected `->` after `send` operation, found `=>`" + ) + }; + // Make sure the arrow and the result are not repeated. + (@list + ($case:ident $args:tt -> $res:tt -> $($tail:tt)*) + ($($head:tt)*) + ) => { + compile_error!("expected `=>`, found `->`") + }; + // Print an error if there is a semicolon after the block. + (@list + ($case:ident $args:tt $(-> $res:pat)* => $body:block; $($tail:tt)*) + ($($head:tt)*) + ) => { + compile_error!( + "did you mean to put a comma instead of the semicolon after `}`?" + ) + }; + // The first case is separated by a comma. + (@list + ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr, $($tail:tt)*) + ($($head:tt)*) + ) => { + $crate::crossbeam_channel_internal!( + @list + ($($tail)*) + ($($head)* $case ($($args)*) $(-> $res)* => { $body },) + ) + }; + // Don't require a comma after the case if it has a proper block. + (@list + ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:block $($tail:tt)*) + ($($head:tt)*) + ) => { + $crate::crossbeam_channel_internal!( + @list + ($($tail)*) + ($($head)* $case ($($args)*) $(-> $res)* => { $body },) + ) + }; + // Only one case remains. + (@list + ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr) + ($($head:tt)*) + ) => { + $crate::crossbeam_channel_internal!( + @list + () + ($($head)* $case ($($args)*) $(-> $res)* => { $body },) + ) + }; + // Accept a trailing comma at the end of the list. + (@list + ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr,) + ($($head:tt)*) + ) => { + $crate::crossbeam_channel_internal!( + @list + () + ($($head)* $case ($($args)*) $(-> $res)* => { $body },) + ) + }; + // Diagnose and print an error. + (@list + ($($tail:tt)*) + ($($head:tt)*) + ) => { + $crate::crossbeam_channel_internal!(@list_error1 $($tail)*) + }; + // Stage 1: check the case type. + (@list_error1 recv $($tail:tt)*) => { + $crate::crossbeam_channel_internal!(@list_error2 recv $($tail)*) + }; + (@list_error1 send $($tail:tt)*) => { + $crate::crossbeam_channel_internal!(@list_error2 send $($tail)*) + }; + (@list_error1 default $($tail:tt)*) => { + $crate::crossbeam_channel_internal!(@list_error2 default $($tail)*) + }; + (@list_error1 $t:tt $($tail:tt)*) => { + compile_error!( + concat!( + "expected one of `recv`, `send`, or `default`, found `", + stringify!($t), + "`", + ) + ) + }; + (@list_error1 $($tail:tt)*) => { + $crate::crossbeam_channel_internal!(@list_error2 $($tail)*); + }; + // Stage 2: check the argument list. + (@list_error2 $case:ident) => { + compile_error!( + concat!( + "missing argument list after `", + stringify!($case), + "`", + ) + ) + }; + (@list_error2 $case:ident => $($tail:tt)*) => { + compile_error!( + concat!( + "missing argument list after `", + stringify!($case), + "`", + ) + ) + }; + (@list_error2 $($tail:tt)*) => { + $crate::crossbeam_channel_internal!(@list_error3 $($tail)*) + }; + // Stage 3: check the `=>` and what comes after it. + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)*) => { + compile_error!( + concat!( + "missing `=>` after `", + stringify!($case), + "` case", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* =>) => { + compile_error!( + "expected expression after `=>`" + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => $body:expr; $($tail:tt)*) => { + compile_error!( + concat!( + "did you mean to put a comma instead of the semicolon after `", + stringify!($body), + "`?", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => recv($($a:tt)*) $($tail:tt)*) => { + compile_error!( + "expected an expression after `=>`" + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => send($($a:tt)*) $($tail:tt)*) => { + compile_error!( + "expected an expression after `=>`" + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => default($($a:tt)*) $($tail:tt)*) => { + compile_error!( + "expected an expression after `=>`" + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => $f:ident($($a:tt)*) $($tail:tt)*) => { + compile_error!( + concat!( + "did you mean to put a comma after `", + stringify!($f), + "(", + stringify!($($a)*), + ")`?", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => $f:ident!($($a:tt)*) $($tail:tt)*) => { + compile_error!( + concat!( + "did you mean to put a comma after `", + stringify!($f), + "!(", + stringify!($($a)*), + ")`?", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => $f:ident![$($a:tt)*] $($tail:tt)*) => { + compile_error!( + concat!( + "did you mean to put a comma after `", + stringify!($f), + "![", + stringify!($($a)*), + "]`?", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => $f:ident!{$($a:tt)*} $($tail:tt)*) => { + compile_error!( + concat!( + "did you mean to put a comma after `", + stringify!($f), + "!{", + stringify!($($a)*), + "}`?", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) $(-> $r:pat)* => $body:tt $($tail:tt)*) => { + compile_error!( + concat!( + "did you mean to put a comma after `", + stringify!($body), + "`?", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) -> => $($tail:tt)*) => { + compile_error!("missing pattern after `->`") + }; + (@list_error3 $case:ident($($args:tt)*) $t:tt $(-> $r:pat)* => $($tail:tt)*) => { + compile_error!( + concat!( + "expected `->`, found `", + stringify!($t), + "`", + ) + ) + }; + (@list_error3 $case:ident($($args:tt)*) -> $t:tt $($tail:tt)*) => { + compile_error!( + concat!( + "expected a pattern, found `", + stringify!($t), + "`", + ) + ) + }; + (@list_error3 recv($($args:tt)*) $t:tt $($tail:tt)*) => { + compile_error!( + concat!( + "expected `->`, found `", + stringify!($t), + "`", + ) + ) + }; + (@list_error3 send($($args:tt)*) $t:tt $($tail:tt)*) => { + compile_error!( + concat!( + "expected `->`, found `", + stringify!($t), + "`", + ) + ) + }; + (@list_error3 recv $args:tt $($tail:tt)*) => { + compile_error!( + concat!( + "expected an argument list after `recv`, found `", + stringify!($args), + "`", + ) + ) + }; + (@list_error3 send $args:tt $($tail:tt)*) => { + compile_error!( + concat!( + "expected an argument list after `send`, found `", + stringify!($args), + "`", + ) + ) + }; + (@list_error3 default $args:tt $($tail:tt)*) => { + compile_error!( + concat!( + "expected an argument list or `=>` after `default`, found `", + stringify!($args), + "`", + ) + ) + }; + (@list_error3 $($tail:tt)*) => { + $crate::crossbeam_channel_internal!(@list_error4 $($tail)*) + }; + // Stage 4: fail with a generic error message. + (@list_error4 $($tail:tt)*) => { + compile_error!("invalid syntax") + }; + + // Success! All cases were parsed. + (@case + () + $cases:tt + $default:tt + ) => { + $crate::crossbeam_channel_internal!( + @init + $cases + $default + ) + }; + + // Check the format of a recv case. + (@case + (recv($r:expr) -> $res:pat => $body:tt, $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($tail)*) + ($($cases)* recv($r) -> $res => $body,) + $default + ) + }; + // Allow trailing comma... + (@case + (recv($r:expr,) -> $res:pat => $body:tt, $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($tail)*) + ($($cases)* recv($r) -> $res => $body,) + $default + ) + }; + // Print an error if the argument list is invalid. + (@case + (recv($($args:tt)*) -> $res:pat => $body:tt, $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + compile_error!( + concat!( + "invalid argument list in `recv(", + stringify!($($args)*), + ")`", + ) + ) + }; + // Print an error if there is no argument list. + (@case + (recv $t:tt $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + compile_error!( + concat!( + "expected an argument list after `recv`, found `", + stringify!($t), + "`", + ) + ) + }; + + // Check the format of a send case. + (@case + (send($s:expr, $m:expr) -> $res:pat => $body:tt, $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($tail)*) + ($($cases)* send($s, $m) -> $res => $body,) + $default + ) + }; + // Allow trailing comma... + (@case + (send($s:expr, $m:expr,) -> $res:pat => $body:tt, $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($tail)*) + ($($cases)* send($s, $m) -> $res => $body,) + $default + ) + }; + // Print an error if the argument list is invalid. + (@case + (send($($args:tt)*) -> $res:pat => $body:tt, $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + compile_error!( + concat!( + "invalid argument list in `send(", + stringify!($($args)*), + ")`", + ) + ) + }; + // Print an error if there is no argument list. + (@case + (send $t:tt $($tail:tt)*) + ($($cases:tt)*) + $default:tt + ) => { + compile_error!( + concat!( + "expected an argument list after `send`, found `", + stringify!($t), + "`", + ) + ) + }; + + // Check the format of a default case. + (@case + (default() => $body:tt, $($tail:tt)*) + $cases:tt + () + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($tail)*) + $cases + (default() => $body,) + ) + }; + // Check the format of a default case with timeout. + (@case + (default($timeout:expr) => $body:tt, $($tail:tt)*) + $cases:tt + () + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($tail)*) + $cases + (default($timeout) => $body,) + ) + }; + // Allow trailing comma... + (@case + (default($timeout:expr,) => $body:tt, $($tail:tt)*) + $cases:tt + () + ) => { + $crate::crossbeam_channel_internal!( + @case + ($($tail)*) + $cases + (default($timeout) => $body,) + ) + }; + // Check for duplicate default cases... + (@case + (default $($tail:tt)*) + $cases:tt + ($($def:tt)+) + ) => { + compile_error!( + "there can be only one `default` case in a `select!` block" + ) + }; + // Print an error if the argument list is invalid. + (@case + (default($($args:tt)*) => $body:tt, $($tail:tt)*) + $cases:tt + $default:tt + ) => { + compile_error!( + concat!( + "invalid argument list in `default(", + stringify!($($args)*), + ")`", + ) + ) + }; + // Print an error if there is an unexpected token after `default`. + (@case + (default $t:tt $($tail:tt)*) + $cases:tt + $default:tt + ) => { + compile_error!( + concat!( + "expected an argument list or `=>` after `default`, found `", + stringify!($t), + "`", + ) + ) + }; + + // The case was not consumed, therefore it must be invalid. + (@case + ($case:ident $($tail:tt)*) + $cases:tt + $default:tt + ) => { + compile_error!( + concat!( + "expected one of `recv`, `send`, or `default`, found `", + stringify!($case), + "`", + ) + ) + }; + + // Optimize `select!` into `try_recv()`. + (@init + (recv($r:expr) -> $res:pat => $recv_body:tt,) + (default() => $default_body:tt,) + ) => {{ + match $r { + ref _r => { + let _r: &$crate::Receiver<_> = _r; + match _r.try_recv() { + ::std::result::Result::Err($crate::TryRecvError::Empty) => { + $default_body + } + _res => { + let _res = _res.map_err(|_| $crate::RecvError); + let $res = _res; + $recv_body + } + } + } + } + }}; + // Optimize `select!` into `recv()`. + (@init + (recv($r:expr) -> $res:pat => $body:tt,) + () + ) => {{ + match $r { + ref _r => { + let _r: &$crate::Receiver<_> = _r; + let _res = _r.recv(); + let $res = _res; + $body + } + } + }}; + // Optimize `select!` into `recv_timeout()`. + (@init + (recv($r:expr) -> $res:pat => $recv_body:tt,) + (default($timeout:expr) => $default_body:tt,) + ) => {{ + match $r { + ref _r => { + let _r: &$crate::Receiver<_> = _r; + match _r.recv_timeout($timeout) { + ::std::result::Result::Err($crate::RecvTimeoutError::Timeout) => { + $default_body + } + _res => { + let _res = _res.map_err(|_| $crate::RecvError); + let $res = _res; + $recv_body + } + } + } + } + }}; + + // // Optimize the non-blocking case with two receive operations. + // (@init + // (recv($r1:expr) -> $res1:pat => $recv_body1:tt,) + // (recv($r2:expr) -> $res2:pat => $recv_body2:tt,) + // (default() => $default_body:tt,) + // ) => {{ + // match $r1 { + // ref _r1 => { + // let _r1: &$crate::Receiver<_> = _r1; + // + // match $r2 { + // ref _r2 => { + // let _r2: &$crate::Receiver<_> = _r2; + // + // // TODO(stjepang): Implement this optimization. + // } + // } + // } + // } + // }}; + // // Optimize the blocking case with two receive operations. + // (@init + // (recv($r1:expr) -> $res1:pat => $body1:tt,) + // (recv($r2:expr) -> $res2:pat => $body2:tt,) + // () + // ) => {{ + // match $r1 { + // ref _r1 => { + // let _r1: &$crate::Receiver<_> = _r1; + // + // match $r2 { + // ref _r2 => { + // let _r2: &$crate::Receiver<_> = _r2; + // + // // TODO(stjepang): Implement this optimization. + // } + // } + // } + // } + // }}; + // // Optimize the case with two receive operations and a timeout. + // (@init + // (recv($r1:expr) -> $res1:pat => $recv_body1:tt,) + // (recv($r2:expr) -> $res2:pat => $recv_body2:tt,) + // (default($timeout:expr) => $default_body:tt,) + // ) => {{ + // match $r1 { + // ref _r1 => { + // let _r1: &$crate::Receiver<_> = _r1; + // + // match $r2 { + // ref _r2 => { + // let _r2: &$crate::Receiver<_> = _r2; + // + // // TODO(stjepang): Implement this optimization. + // } + // } + // } + // } + // }}; + + // // Optimize `select!` into `try_send()`. + // (@init + // (send($s:expr, $m:expr) -> $res:pat => $send_body:tt,) + // (default() => $default_body:tt,) + // ) => {{ + // match $s { + // ref _s => { + // let _s: &$crate::Sender<_> = _s; + // // TODO(stjepang): Implement this optimization. + // } + // } + // }}; + // // Optimize `select!` into `send()`. + // (@init + // (send($s:expr, $m:expr) -> $res:pat => $body:tt,) + // () + // ) => {{ + // match $s { + // ref _s => { + // let _s: &$crate::Sender<_> = _s; + // // TODO(stjepang): Implement this optimization. + // } + // } + // }}; + // // Optimize `select!` into `send_timeout()`. + // (@init + // (send($s:expr, $m:expr) -> $res:pat => $body:tt,) + // (default($timeout:expr) => $body:tt,) + // ) => {{ + // match $s { + // ref _s => { + // let _s: &$crate::Sender<_> = _s; + // // TODO(stjepang): Implement this optimization. + // } + // } + // }}; + + // Create the list of handles and add operations to it. + (@init + ($($cases:tt)*) + $default:tt + ) => {{ + const _LEN: usize = $crate::crossbeam_channel_internal!(@count ($($cases)*)); + let _handle: &$crate::internal::SelectHandle = &$crate::never::<()>(); + + #[allow(unused_mut)] + let mut _sel = [(_handle, 0, ::std::ptr::null()); _LEN]; + + $crate::crossbeam_channel_internal!( + @add + _sel + ($($cases)*) + $default + ( + (0usize _oper0) + (1usize _oper1) + (2usize _oper2) + (3usize _oper3) + (4usize _oper4) + (5usize _oper5) + (6usize _oper6) + (7usize _oper7) + (8usize _oper8) + (9usize _oper9) + (10usize _oper10) + (11usize _oper11) + (12usize _oper12) + (13usize _oper13) + (14usize _oper14) + (15usize _oper15) + (16usize _oper16) + (17usize _oper17) + (18usize _oper18) + (19usize _oper19) + (20usize _oper20) + (21usize _oper21) + (22usize _oper22) + (23usize _oper23) + (24usize _oper24) + (25usize _oper25) + (26usize _oper26) + (27usize _oper27) + (28usize _oper28) + (29usize _oper29) + (30usize _oper30) + (31usize _oper31) + ) + () + ) + }}; + + // Count the listed cases. + (@count ()) => { + 0 + }; + (@count ($oper:ident $args:tt -> $res:pat => $body:tt, $($cases:tt)*)) => { + 1 + $crate::crossbeam_channel_internal!(@count ($($cases)*)) + }; + + // Run blocking selection. + (@add + $sel:ident + () + () + $labels:tt + $cases:tt + ) => {{ + let _oper: $crate::SelectedOperation<'_> = { + let _oper = $crate::internal::select(&mut $sel); + + // Erase the lifetime so that `sel` can be dropped early even without NLL. + unsafe { ::std::mem::transmute(_oper) } + }; + + $crate::crossbeam_channel_internal! { + @complete + $sel + _oper + $cases + } + }}; + // Run non-blocking selection. + (@add + $sel:ident + () + (default() => $body:tt,) + $labels:tt + $cases:tt + ) => {{ + let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = { + let _oper = $crate::internal::try_select(&mut $sel); + + // Erase the lifetime so that `sel` can be dropped early even without NLL. + unsafe { ::std::mem::transmute(_oper) } + }; + + match _oper { + None => { + { $sel }; + $body + } + Some(_oper) => { + $crate::crossbeam_channel_internal! { + @complete + $sel + _oper + $cases + } + } + } + }}; + // Run selection with a timeout. + (@add + $sel:ident + () + (default($timeout:expr) => $body:tt,) + $labels:tt + $cases:tt + ) => {{ + let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = { + let _oper = $crate::internal::select_timeout(&mut $sel, $timeout); + + // Erase the lifetime so that `sel` can be dropped early even without NLL. + unsafe { ::std::mem::transmute(_oper) } + }; + + match _oper { + ::std::option::Option::None => { + { $sel }; + $body + } + ::std::option::Option::Some(_oper) => { + $crate::crossbeam_channel_internal! { + @complete + $sel + _oper + $cases + } + } + } + }}; + // Have we used up all labels? + (@add + $sel:ident + $input:tt + $default:tt + () + $cases:tt + ) => { + compile_error!("too many operations in a `select!` block") + }; + // Add a receive operation to `sel`. + (@add + $sel:ident + (recv($r:expr) -> $res:pat => $body:tt, $($tail:tt)*) + $default:tt + (($i:tt $var:ident) $($labels:tt)*) + ($($cases:tt)*) + ) => {{ + match $r { + ref _r => { + let $var: &$crate::Receiver<_> = unsafe { + let _r: &$crate::Receiver<_> = _r; + + // Erase the lifetime so that `sel` can be dropped early even without NLL. + unsafe fn unbind<'a, T>(x: &T) -> &'a T { + ::std::mem::transmute(x) + } + unbind(_r) + }; + $sel[$i] = ($var, $i, $var as *const $crate::Receiver<_> as *const u8); + + $crate::crossbeam_channel_internal!( + @add + $sel + ($($tail)*) + $default + ($($labels)*) + ($($cases)* [$i] recv($var) -> $res => $body,) + ) + } + } + }}; + // Add a send operation to `sel`. + (@add + $sel:ident + (send($s:expr, $m:expr) -> $res:pat => $body:tt, $($tail:tt)*) + $default:tt + (($i:tt $var:ident) $($labels:tt)*) + ($($cases:tt)*) + ) => {{ + match $s { + ref _s => { + let $var: &$crate::Sender<_> = unsafe { + let _s: &$crate::Sender<_> = _s; + + // Erase the lifetime so that `sel` can be dropped early even without NLL. + unsafe fn unbind<'a, T>(x: &T) -> &'a T { + ::std::mem::transmute(x) + } + unbind(_s) + }; + $sel[$i] = ($var, $i, $var as *const $crate::Sender<_> as *const u8); + + $crate::crossbeam_channel_internal!( + @add + $sel + ($($tail)*) + $default + ($($labels)*) + ($($cases)* [$i] send($var, $m) -> $res => $body,) + ) + } + } + }}; + + // Complete a receive operation. + (@complete + $sel:ident + $oper:ident + ([$i:tt] recv($r:ident) -> $res:pat => $body:tt, $($tail:tt)*) + ) => {{ + if $oper.index() == $i { + let _res = $oper.recv($r); + { $sel }; + + let $res = _res; + $body + } else { + $crate::crossbeam_channel_internal! { + @complete + $sel + $oper + ($($tail)*) + } + } + }}; + // Complete a send operation. + (@complete + $sel:ident + $oper:ident + ([$i:tt] send($s:ident, $m:expr) -> $res:pat => $body:tt, $($tail:tt)*) + ) => {{ + if $oper.index() == $i { + let _res = $oper.send($s, $m); + { $sel }; + + let $res = _res; + $body + } else { + $crate::crossbeam_channel_internal! { + @complete + $sel + $oper + ($($tail)*) + } + } + }}; + // Panic if we don't identify the selected case, but this should never happen. + (@complete + $sel:ident + $oper:ident + () + ) => {{ + unreachable!( + "internal error in crossbeam-channel: invalid case" + ) + }}; + + // Catches a bug within this macro (should not happen). + (@$($tokens:tt)*) => { + compile_error!( + concat!( + "internal error in crossbeam-channel: ", + stringify!(@$($tokens)*), + ) + ) + }; + + // The entry points. + () => { + compile_error!("empty `select!` block") + }; + ($($case:ident $(($($args:tt)*))* => $body:expr $(,)*)*) => { + $crate::crossbeam_channel_internal!( + @list + ($($case $(($($args)*))* => { $body },)*) + () + ) + }; + ($($tokens:tt)*) => { + $crate::crossbeam_channel_internal!( + @list + ($($tokens)*) + () + ) + }; +} + +/// Selects from a set of channel operations. +/// +/// This macro allows you to define a set of channel operations, wait until any one of them becomes +/// ready, and finally execute it. If multiple operations are ready at the same time, a random one +/// among them is selected. +/// +/// It is also possible to define a `default` case that gets executed if none of the operations are +/// ready, either right away or for a certain duration of time. +/// +/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even +/// when it will simply return an error because the channel is disconnected. +/// +/// The `select` macro is a convenience wrapper around [`Select`]. However, it cannot select over a +/// dynamically created list of channel operations. +/// +/// [`Select`]: super::Select +/// +/// # Examples +/// +/// Block until a send or a receive operation is selected: +/// +/// ``` +/// use crossbeam_channel::{select, unbounded}; +/// +/// let (s1, r1) = unbounded(); +/// let (s2, r2) = unbounded(); +/// s1.send(10).unwrap(); +/// +/// // Since both operations are initially ready, a random one will be executed. +/// select! { +/// recv(r1) -> msg => assert_eq!(msg, Ok(10)), +/// send(s2, 20) -> res => { +/// assert_eq!(res, Ok(())); +/// assert_eq!(r2.recv(), Ok(20)); +/// } +/// } +/// ``` +/// +/// Select from a set of operations without blocking: +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::{select, unbounded}; +/// +/// let (s1, r1) = unbounded(); +/// let (s2, r2) = unbounded(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_secs(1)); +/// s1.send(10).unwrap(); +/// }); +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// s2.send(20).unwrap(); +/// }); +/// +/// // None of the operations are initially ready. +/// select! { +/// recv(r1) -> msg => panic!(), +/// recv(r2) -> msg => panic!(), +/// default => println!("not ready"), +/// } +/// ``` +/// +/// Select over a set of operations with a timeout: +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::{select, unbounded}; +/// +/// let (s1, r1) = unbounded(); +/// let (s2, r2) = unbounded(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_secs(1)); +/// s1.send(10).unwrap(); +/// }); +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// s2.send(20).unwrap(); +/// }); +/// +/// // None of the two operations will become ready within 100 milliseconds. +/// select! { +/// recv(r1) -> msg => panic!(), +/// recv(r2) -> msg => panic!(), +/// default(Duration::from_millis(100)) => println!("timed out"), +/// } +/// ``` +/// +/// Optionally add a receive operation to `select!` using [`never`]: +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_channel::{select, never, unbounded}; +/// +/// let (s1, r1) = unbounded(); +/// let (s2, r2) = unbounded(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_secs(1)); +/// s1.send(10).unwrap(); +/// }); +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// s2.send(20).unwrap(); +/// }); +/// +/// // This receiver can be a `Some` or a `None`. +/// let r2 = Some(&r2); +/// +/// // None of the two operations will become ready within 100 milliseconds. +/// select! { +/// recv(r1) -> msg => panic!(), +/// recv(r2.unwrap_or(&never())) -> msg => assert_eq!(msg, Ok(20)), +/// } +/// ``` +/// +/// To optionally add a timeout to `select!`, see the [example] for [`never`]. +/// +/// [`never`]: super::never +/// [example]: super::never#examples +#[macro_export] +macro_rules! select { + ($($tokens:tt)*) => { + $crate::crossbeam_channel_internal!( + $($tokens)* + ) + }; +} diff --git a/third_party/rust/crossbeam-channel/src/utils.rs b/third_party/rust/crossbeam-channel/src/utils.rs new file mode 100644 index 0000000000..3fe171b2e4 --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/utils.rs @@ -0,0 +1,112 @@ +//! Miscellaneous utilities. + +use std::cell::{Cell, UnsafeCell}; +use std::num::Wrapping; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_utils::Backoff; + +/// Randomly shuffles a slice. +pub fn shuffle<T>(v: &mut [T]) { + let len = v.len(); + if len <= 1 { + return; + } + + thread_local! { + static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647)); + } + + let _ = RNG.try_with(|rng| { + for i in 1..len { + // This is the 32-bit variant of Xorshift. + // + // Source: https://en.wikipedia.org/wiki/Xorshift + let mut x = rng.get(); + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + rng.set(x); + + let x = x.0; + let n = i + 1; + + // This is a fast alternative to `let j = x % n`. + // + // Author: Daniel Lemire + // Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + let j = ((x as u64).wrapping_mul(n as u64) >> 32) as u32 as usize; + + v.swap(i, j); + } + }); +} + +/// Sleeps until the deadline, or forever if the deadline isn't specified. +pub fn sleep_until(deadline: Option<Instant>) { + loop { + match deadline { + None => thread::sleep(Duration::from_secs(1000)), + Some(d) => { + let now = Instant::now(); + if now >= d { + break; + } + thread::sleep(d - now); + } + } + } +} + +/// A simple spinlock. +pub struct Spinlock<T> { + flag: AtomicBool, + value: UnsafeCell<T>, +} + +impl<T> Spinlock<T> { + /// Returns a new spinlock initialized with `value`. + pub fn new(value: T) -> Spinlock<T> { + Spinlock { + flag: AtomicBool::new(false), + value: UnsafeCell::new(value), + } + } + + /// Locks the spinlock. + pub fn lock(&self) -> SpinlockGuard<'_, T> { + let backoff = Backoff::new(); + while self.flag.swap(true, Ordering::Acquire) { + backoff.snooze(); + } + SpinlockGuard { parent: self } + } +} + +/// A guard holding a spinlock locked. +pub struct SpinlockGuard<'a, T> { + parent: &'a Spinlock<T>, +} + +impl<T> Drop for SpinlockGuard<'_, T> { + fn drop(&mut self) { + self.parent.flag.store(false, Ordering::Release); + } +} + +impl<T> Deref for SpinlockGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.parent.value.get() } + } +} + +impl<T> DerefMut for SpinlockGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.parent.value.get() } + } +} diff --git a/third_party/rust/crossbeam-channel/src/waker.rs b/third_party/rust/crossbeam-channel/src/waker.rs new file mode 100644 index 0000000000..3d0af2616c --- /dev/null +++ b/third_party/rust/crossbeam-channel/src/waker.rs @@ -0,0 +1,287 @@ +//! Waking mechanism for threads blocked on channel operations. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread::{self, ThreadId}; + +use crate::context::Context; +use crate::select::{Operation, Selected}; +use crate::utils::Spinlock; + +/// Represents a thread blocked on a specific channel operation. +pub struct Entry { + /// The operation. + pub oper: Operation, + + /// Optional packet. + pub packet: usize, + + /// Context associated with the thread owning this operation. + pub cx: Context, +} + +/// A queue of threads blocked on channel operations. +/// +/// This data structure is used by threads to register blocking operations and get woken up once +/// an operation becomes ready. +pub struct Waker { + /// A list of select operations. + selectors: Vec<Entry>, + + /// A list of operations waiting to be ready. + observers: Vec<Entry>, +} + +impl Waker { + /// Creates a new `Waker`. + #[inline] + pub fn new() -> Self { + Waker { + selectors: Vec::new(), + observers: Vec::new(), + } + } + + /// Registers a select operation. + #[inline] + pub fn register(&mut self, oper: Operation, cx: &Context) { + self.register_with_packet(oper, 0, cx); + } + + /// Registers a select operation and a packet. + #[inline] + pub fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) { + self.selectors.push(Entry { + oper, + packet, + cx: cx.clone(), + }); + } + + /// Unregisters a select operation. + #[inline] + pub fn unregister(&mut self, oper: Operation) -> Option<Entry> { + if let Some((i, _)) = self + .selectors + .iter() + .enumerate() + .find(|&(_, entry)| entry.oper == oper) + { + let entry = self.selectors.remove(i); + Some(entry) + } else { + None + } + } + + /// Attempts to find another thread's entry, select the operation, and wake it up. + #[inline] + pub fn try_select(&mut self) -> Option<Entry> { + let mut entry = None; + + if !self.selectors.is_empty() { + let thread_id = current_thread_id(); + + for i in 0..self.selectors.len() { + // Does the entry belong to a different thread? + if self.selectors[i].cx.thread_id() != thread_id { + // Try selecting this operation. + let sel = Selected::Operation(self.selectors[i].oper); + let res = self.selectors[i].cx.try_select(sel); + + if res.is_ok() { + // Provide the packet. + self.selectors[i].cx.store_packet(self.selectors[i].packet); + // Wake the thread up. + self.selectors[i].cx.unpark(); + + // Remove the entry from the queue to keep it clean and improve + // performance. + entry = Some(self.selectors.remove(i)); + break; + } + } + } + } + + entry + } + + /// Returns `true` if there is an entry which can be selected by the current thread. + #[inline] + pub fn can_select(&self) -> bool { + if self.selectors.is_empty() { + false + } else { + let thread_id = current_thread_id(); + + self.selectors.iter().any(|entry| { + entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting + }) + } + } + + /// Registers an operation waiting to be ready. + #[inline] + pub fn watch(&mut self, oper: Operation, cx: &Context) { + self.observers.push(Entry { + oper, + packet: 0, + cx: cx.clone(), + }); + } + + /// Unregisters an operation waiting to be ready. + #[inline] + pub fn unwatch(&mut self, oper: Operation) { + self.observers.retain(|e| e.oper != oper); + } + + /// Notifies all operations waiting to be ready. + #[inline] + pub fn notify(&mut self) { + for entry in self.observers.drain(..) { + if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { + entry.cx.unpark(); + } + } + } + + /// Notifies all registered operations that the channel is disconnected. + #[inline] + pub fn disconnect(&mut self) { + for entry in self.selectors.iter() { + if entry.cx.try_select(Selected::Disconnected).is_ok() { + // Wake the thread up. + // + // Here we don't remove the entry from the queue. Registered threads must + // unregister from the waker by themselves. They might also want to recover the + // packet value and destroy it, if necessary. + entry.cx.unpark(); + } + } + + self.notify(); + } +} + +impl Drop for Waker { + #[inline] + fn drop(&mut self) { + debug_assert_eq!(self.selectors.len(), 0); + debug_assert_eq!(self.observers.len(), 0); + } +} + +/// A waker that can be shared among threads without locking. +/// +/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. +pub struct SyncWaker { + /// The inner `Waker`. + inner: Spinlock<Waker>, + + /// `true` if the waker is empty. + is_empty: AtomicBool, +} + +impl SyncWaker { + /// Creates a new `SyncWaker`. + #[inline] + pub fn new() -> Self { + SyncWaker { + inner: Spinlock::new(Waker::new()), + is_empty: AtomicBool::new(true), + } + } + + /// Registers the current thread with an operation. + #[inline] + pub fn register(&self, oper: Operation, cx: &Context) { + let mut inner = self.inner.lock(); + inner.register(oper, cx); + self.is_empty.store( + inner.selectors.is_empty() && inner.observers.is_empty(), + Ordering::SeqCst, + ); + } + + /// Unregisters an operation previously registered by the current thread. + #[inline] + pub fn unregister(&self, oper: Operation) -> Option<Entry> { + let mut inner = self.inner.lock(); + let entry = inner.unregister(oper); + self.is_empty.store( + inner.selectors.is_empty() && inner.observers.is_empty(), + Ordering::SeqCst, + ); + entry + } + + /// Attempts to find one thread (not the current one), select its operation, and wake it up. + #[inline] + pub fn notify(&self) { + if !self.is_empty.load(Ordering::SeqCst) { + let mut inner = self.inner.lock(); + if !self.is_empty.load(Ordering::SeqCst) { + inner.try_select(); + inner.notify(); + self.is_empty.store( + inner.selectors.is_empty() && inner.observers.is_empty(), + Ordering::SeqCst, + ); + } + } + } + + /// Registers an operation waiting to be ready. + #[inline] + pub fn watch(&self, oper: Operation, cx: &Context) { + let mut inner = self.inner.lock(); + inner.watch(oper, cx); + self.is_empty.store( + inner.selectors.is_empty() && inner.observers.is_empty(), + Ordering::SeqCst, + ); + } + + /// Unregisters an operation waiting to be ready. + #[inline] + pub fn unwatch(&self, oper: Operation) { + let mut inner = self.inner.lock(); + inner.unwatch(oper); + self.is_empty.store( + inner.selectors.is_empty() && inner.observers.is_empty(), + Ordering::SeqCst, + ); + } + + /// Notifies all threads that the channel is disconnected. + #[inline] + pub fn disconnect(&self) { + let mut inner = self.inner.lock(); + inner.disconnect(); + self.is_empty.store( + inner.selectors.is_empty() && inner.observers.is_empty(), + Ordering::SeqCst, + ); + } +} + +impl Drop for SyncWaker { + #[inline] + fn drop(&mut self) { + debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true); + } +} + +/// Returns the id of the current thread. +#[inline] +fn current_thread_id() -> ThreadId { + thread_local! { + /// Cached thread-local id. + static THREAD_ID: ThreadId = thread::current().id(); + } + + THREAD_ID + .try_with(|id| *id) + .unwrap_or_else(|_| thread::current().id()) +} |