summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-channel/src/channel.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/crossbeam-channel/src/channel.rs
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/crossbeam-channel/src/channel.rs')
-rw-r--r--third_party/rust/crossbeam-channel/src/channel.rs1511
1 files changed, 1511 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/src/channel.rs b/third_party/rust/crossbeam-channel/src/channel.rs
new file mode 100644
index 0000000000..800fe63527
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/channel.rs
@@ -0,0 +1,1511 @@
+//! The channel interface.
+
+use std::fmt;
+use std::iter::FusedIterator;
+use std::mem;
+use std::panic::{RefUnwindSafe, UnwindSafe};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use crate::context::Context;
+use crate::counter;
+use crate::err::{
+ RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
+};
+use crate::flavors;
+use crate::select::{Operation, SelectHandle, Token};
+use crate::utils;
+
+/// Creates a channel of unbounded capacity.
+///
+/// This channel has a growable buffer that can hold any number of messages at a time.
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use crossbeam_channel::unbounded;
+///
+/// let (s, r) = unbounded();
+///
+/// // Computes the n-th Fibonacci number.
+/// fn fib(n: i32) -> i32 {
+/// if n <= 1 {
+/// n
+/// } else {
+/// fib(n - 1) + fib(n - 2)
+/// }
+/// }
+///
+/// // Spawn an asynchronous computation.
+/// thread::spawn(move || s.send(fib(20)).unwrap());
+///
+/// // Print the result of the computation.
+/// println!("{}", r.recv().unwrap());
+/// ```
+pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
+ let (s, r) = counter::new(flavors::list::Channel::new());
+ let s = Sender {
+ flavor: SenderFlavor::List(s),
+ };
+ let r = Receiver {
+ flavor: ReceiverFlavor::List(r),
+ };
+ (s, r)
+}
+
+/// Creates a channel of bounded capacity.
+///
+/// This channel has a buffer that can hold at most `cap` messages at a time.
+///
+/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
+/// receive operations must appear at the same time in order to pair up and pass the message over.
+///
+/// # Examples
+///
+/// A channel of capacity 1:
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_channel::bounded;
+///
+/// let (s, r) = bounded(1);
+///
+/// // This call returns immediately because there is enough space in the channel.
+/// s.send(1).unwrap();
+///
+/// thread::spawn(move || {
+/// // This call blocks the current thread because the channel is full.
+/// // It will be able to complete only after the first message is received.
+/// s.send(2).unwrap();
+/// });
+///
+/// thread::sleep(Duration::from_secs(1));
+/// assert_eq!(r.recv(), Ok(1));
+/// assert_eq!(r.recv(), Ok(2));
+/// ```
+///
+/// A zero-capacity channel:
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_channel::bounded;
+///
+/// let (s, r) = bounded(0);
+///
+/// thread::spawn(move || {
+/// // This call blocks the current thread until a receive operation appears
+/// // on the other side of the channel.
+/// s.send(1).unwrap();
+/// });
+///
+/// thread::sleep(Duration::from_secs(1));
+/// assert_eq!(r.recv(), Ok(1));
+/// ```
+pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
+ if cap == 0 {
+ let (s, r) = counter::new(flavors::zero::Channel::new());
+ let s = Sender {
+ flavor: SenderFlavor::Zero(s),
+ };
+ let r = Receiver {
+ flavor: ReceiverFlavor::Zero(r),
+ };
+ (s, r)
+ } else {
+ let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
+ let s = Sender {
+ flavor: SenderFlavor::Array(s),
+ };
+ let r = Receiver {
+ flavor: ReceiverFlavor::Array(r),
+ };
+ (s, r)
+ }
+}
+
+/// Creates a receiver that delivers a message after a certain duration of time.
+///
+/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
+/// be sent into the channel after `duration` elapses. The message is the instant at which it is
+/// sent.
+///
+/// # Examples
+///
+/// Using an `after` channel for timeouts:
+///
+/// ```
+/// use std::time::Duration;
+/// use crossbeam_channel::{after, select, unbounded};
+///
+/// let (s, r) = unbounded::<i32>();
+/// let timeout = Duration::from_millis(100);
+///
+/// select! {
+/// recv(r) -> msg => println!("received {:?}", msg),
+/// recv(after(timeout)) -> _ => println!("timed out"),
+/// }
+/// ```
+///
+/// When the message gets sent:
+///
+/// ```
+/// use std::thread;
+/// use std::time::{Duration, Instant};
+/// use crossbeam_channel::after;
+///
+/// // Converts a number of milliseconds into a `Duration`.
+/// let ms = |ms| Duration::from_millis(ms);
+///
+/// // Returns `true` if `a` and `b` are very close `Instant`s.
+/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
+///
+/// let start = Instant::now();
+/// let r = after(ms(100));
+///
+/// thread::sleep(ms(500));
+///
+/// // This message was sent 100 ms from the start and received 500 ms from the start.
+/// assert!(eq(r.recv().unwrap(), start + ms(100)));
+/// assert!(eq(Instant::now(), start + ms(500)));
+/// ```
+pub fn after(duration: Duration) -> Receiver<Instant> {
+ Receiver {
+ flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
+ }
+}
+
+/// Creates a receiver that delivers a message at a certain instant in time.
+///
+/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
+/// be sent into the channel at the moment in time `when`. The message is the instant at which it
+/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
+/// instantly to the receiver.
+///
+/// # Examples
+///
+/// Using an `at` channel for timeouts:
+///
+/// ```
+/// use std::time::{Instant, Duration};
+/// use crossbeam_channel::{at, select, unbounded};
+///
+/// let (s, r) = unbounded::<i32>();
+/// let deadline = Instant::now() + Duration::from_millis(500);
+///
+/// select! {
+/// recv(r) -> msg => println!("received {:?}", msg),
+/// recv(at(deadline)) -> _ => println!("timed out"),
+/// }
+/// ```
+///
+/// When the message gets sent:
+///
+/// ```
+/// use std::time::{Duration, Instant};
+/// use crossbeam_channel::at;
+///
+/// // Converts a number of milliseconds into a `Duration`.
+/// let ms = |ms| Duration::from_millis(ms);
+///
+/// let start = Instant::now();
+/// let end = start + ms(100);
+///
+/// let r = at(end);
+///
+/// // This message was sent 100 ms from the start
+/// assert_eq!(r.recv().unwrap(), end);
+/// assert!(Instant::now() > start + ms(100));
+/// ```
+pub fn at(when: Instant) -> Receiver<Instant> {
+ Receiver {
+ flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
+ }
+}
+
+/// Creates a receiver that never delivers messages.
+///
+/// The channel is bounded with capacity of 0 and never gets disconnected.
+///
+/// # Examples
+///
+/// Using a `never` channel to optionally add a timeout to [`select!`]:
+///
+/// [`select!`]: crate::select!
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_channel::{after, select, never, unbounded};
+///
+/// let (s, r) = unbounded();
+///
+/// thread::spawn(move || {
+/// thread::sleep(Duration::from_secs(1));
+/// s.send(1).unwrap();
+/// });
+///
+/// // Suppose this duration can be a `Some` or a `None`.
+/// let duration = Some(Duration::from_millis(100));
+///
+/// // Create a channel that times out after the specified duration.
+/// let timeout = duration
+/// .map(|d| after(d))
+/// .unwrap_or(never());
+///
+/// select! {
+/// recv(r) -> msg => assert_eq!(msg, Ok(1)),
+/// recv(timeout) -> _ => println!("timed out"),
+/// }
+/// ```
+pub fn never<T>() -> Receiver<T> {
+ Receiver {
+ flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
+ }
+}
+
+/// Creates a receiver that delivers messages periodically.
+///
+/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
+/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
+/// sent.
+///
+/// # Examples
+///
+/// Using a `tick` channel to periodically print elapsed time:
+///
+/// ```
+/// use std::time::{Duration, Instant};
+/// use crossbeam_channel::tick;
+///
+/// let start = Instant::now();
+/// let ticker = tick(Duration::from_millis(100));
+///
+/// for _ in 0..5 {
+/// ticker.recv().unwrap();
+/// println!("elapsed: {:?}", start.elapsed());
+/// }
+/// ```
+///
+/// When messages get sent:
+///
+/// ```
+/// use std::thread;
+/// use std::time::{Duration, Instant};
+/// use crossbeam_channel::tick;
+///
+/// // Converts a number of milliseconds into a `Duration`.
+/// let ms = |ms| Duration::from_millis(ms);
+///
+/// // Returns `true` if `a` and `b` are very close `Instant`s.
+/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
+///
+/// let start = Instant::now();
+/// let r = tick(ms(100));
+///
+/// // This message was sent 100 ms from the start and received 100 ms from the start.
+/// assert!(eq(r.recv().unwrap(), start + ms(100)));
+/// assert!(eq(Instant::now(), start + ms(100)));
+///
+/// thread::sleep(ms(500));
+///
+/// // This message was sent 200 ms from the start and received 600 ms from the start.
+/// assert!(eq(r.recv().unwrap(), start + ms(200)));
+/// assert!(eq(Instant::now(), start + ms(600)));
+///
+/// // This message was sent 700 ms from the start and received 700 ms from the start.
+/// assert!(eq(r.recv().unwrap(), start + ms(700)));
+/// assert!(eq(Instant::now(), start + ms(700)));
+/// ```
+pub fn tick(duration: Duration) -> Receiver<Instant> {
+ Receiver {
+ flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
+ }
+}
+
+/// The sending side of a channel.
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use crossbeam_channel::unbounded;
+///
+/// let (s1, r) = unbounded();
+/// let s2 = s1.clone();
+///
+/// thread::spawn(move || s1.send(1).unwrap());
+/// thread::spawn(move || s2.send(2).unwrap());
+///
+/// let msg1 = r.recv().unwrap();
+/// let msg2 = r.recv().unwrap();
+///
+/// assert_eq!(msg1 + msg2, 3);
+/// ```
+pub struct Sender<T> {
+ flavor: SenderFlavor<T>,
+}
+
+/// Sender flavors.
+enum SenderFlavor<T> {
+ /// Bounded channel based on a preallocated array.
+ Array(counter::Sender<flavors::array::Channel<T>>),
+
+ /// Unbounded channel implemented as a linked list.
+ List(counter::Sender<flavors::list::Channel<T>>),
+
+ /// Zero-capacity channel.
+ Zero(counter::Sender<flavors::zero::Channel<T>>),
+}
+
+unsafe impl<T: Send> Send for Sender<T> {}
+unsafe impl<T: Send> Sync for Sender<T> {}
+
+impl<T> UnwindSafe for Sender<T> {}
+impl<T> RefUnwindSafe for Sender<T> {}
+
+impl<T> Sender<T> {
+ /// Attempts to send a message into the channel without blocking.
+ ///
+ /// This method will either send a message into the channel immediately or return an error if
+ /// the channel is full or disconnected. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will send the message only if there
+ /// happens to be a receive operation on the other side of the channel at the same time.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{bounded, TrySendError};
+ ///
+ /// let (s, r) = bounded(1);
+ ///
+ /// assert_eq!(s.try_send(1), Ok(()));
+ /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
+ ///
+ /// drop(r);
+ /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
+ /// ```
+ pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.try_send(msg),
+ SenderFlavor::List(chan) => chan.try_send(msg),
+ SenderFlavor::Zero(chan) => chan.try_send(msg),
+ }
+ }
+
+ /// Blocks the current thread until a message is sent or the channel is disconnected.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed. If the channel becomes disconnected, this call will wake up and return an
+ /// error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{bounded, SendError};
+ ///
+ /// let (s, r) = bounded(1);
+ /// assert_eq!(s.send(1), Ok(()));
+ ///
+ /// thread::spawn(move || {
+ /// assert_eq!(r.recv(), Ok(1));
+ /// thread::sleep(Duration::from_secs(1));
+ /// drop(r);
+ /// });
+ ///
+ /// assert_eq!(s.send(2), Ok(()));
+ /// assert_eq!(s.send(3), Err(SendError(3)));
+ /// ```
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.send(msg, None),
+ SenderFlavor::List(chan) => chan.send(msg, None),
+ SenderFlavor::Zero(chan) => chan.send(msg, None),
+ }
+ .map_err(|err| match err {
+ SendTimeoutError::Disconnected(msg) => SendError(msg),
+ SendTimeoutError::Timeout(_) => unreachable!(),
+ })
+ }
+
+ /// Waits for a message to be sent into the channel, but only for a limited time.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+ /// wake up and return an error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{bounded, SendTimeoutError};
+ ///
+ /// let (s, r) = bounded(0);
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// assert_eq!(r.recv(), Ok(2));
+ /// drop(r);
+ /// });
+ ///
+ /// assert_eq!(
+ /// s.send_timeout(1, Duration::from_millis(500)),
+ /// Err(SendTimeoutError::Timeout(1)),
+ /// );
+ /// assert_eq!(
+ /// s.send_timeout(2, Duration::from_secs(1)),
+ /// Ok(()),
+ /// );
+ /// assert_eq!(
+ /// s.send_timeout(3, Duration::from_millis(500)),
+ /// Err(SendTimeoutError::Disconnected(3)),
+ /// );
+ /// ```
+ pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
+ self.send_deadline(msg, utils::convert_timeout_to_deadline(timeout))
+ }
+
+ /// Waits for a message to be sent into the channel, but only until a given deadline.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+ /// wake up and return an error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use crossbeam_channel::{bounded, SendTimeoutError};
+ ///
+ /// let (s, r) = bounded(0);
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// assert_eq!(r.recv(), Ok(2));
+ /// drop(r);
+ /// });
+ ///
+ /// let now = Instant::now();
+ ///
+ /// assert_eq!(
+ /// s.send_deadline(1, now + Duration::from_millis(500)),
+ /// Err(SendTimeoutError::Timeout(1)),
+ /// );
+ /// assert_eq!(
+ /// s.send_deadline(2, now + Duration::from_millis(1500)),
+ /// Ok(()),
+ /// );
+ /// assert_eq!(
+ /// s.send_deadline(3, now + Duration::from_millis(2000)),
+ /// Err(SendTimeoutError::Disconnected(3)),
+ /// );
+ /// ```
+ pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
+ SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
+ SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
+ }
+ }
+
+ /// Returns `true` if the channel is empty.
+ ///
+ /// Note: Zero-capacity channels are always empty.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (s, r) = unbounded();
+ /// assert!(s.is_empty());
+ ///
+ /// s.send(0).unwrap();
+ /// assert!(!s.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.is_empty(),
+ SenderFlavor::List(chan) => chan.is_empty(),
+ SenderFlavor::Zero(chan) => chan.is_empty(),
+ }
+ }
+
+ /// Returns `true` if the channel is full.
+ ///
+ /// Note: Zero-capacity channels are always full.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::bounded;
+ ///
+ /// let (s, r) = bounded(1);
+ ///
+ /// assert!(!s.is_full());
+ /// s.send(0).unwrap();
+ /// assert!(s.is_full());
+ /// ```
+ pub fn is_full(&self) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.is_full(),
+ SenderFlavor::List(chan) => chan.is_full(),
+ SenderFlavor::Zero(chan) => chan.is_full(),
+ }
+ }
+
+ /// Returns the number of messages in the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (s, r) = unbounded();
+ /// assert_eq!(s.len(), 0);
+ ///
+ /// s.send(1).unwrap();
+ /// s.send(2).unwrap();
+ /// assert_eq!(s.len(), 2);
+ /// ```
+ pub fn len(&self) -> usize {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.len(),
+ SenderFlavor::List(chan) => chan.len(),
+ SenderFlavor::Zero(chan) => chan.len(),
+ }
+ }
+
+ /// If the channel is bounded, returns its capacity.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{bounded, unbounded};
+ ///
+ /// let (s, _) = unbounded::<i32>();
+ /// assert_eq!(s.capacity(), None);
+ ///
+ /// let (s, _) = bounded::<i32>(5);
+ /// assert_eq!(s.capacity(), Some(5));
+ ///
+ /// let (s, _) = bounded::<i32>(0);
+ /// assert_eq!(s.capacity(), Some(0));
+ /// ```
+ pub fn capacity(&self) -> Option<usize> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.capacity(),
+ SenderFlavor::List(chan) => chan.capacity(),
+ SenderFlavor::Zero(chan) => chan.capacity(),
+ }
+ }
+
+ /// Returns `true` if senders belong to the same channel.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (s, _) = unbounded::<usize>();
+ ///
+ /// let s2 = s.clone();
+ /// assert!(s.same_channel(&s2));
+ ///
+ /// let (s3, _) = unbounded();
+ /// assert!(!s.same_channel(&s3));
+ /// ```
+ pub fn same_channel(&self, other: &Sender<T>) -> bool {
+ match (&self.flavor, &other.flavor) {
+ (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
+ (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
+ (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
+ _ => false,
+ }
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ unsafe {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
+ SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+ }
+ }
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ let flavor = match &self.flavor {
+ SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
+ SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
+ SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
+ };
+
+ Sender { flavor }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Sender { .. }")
+ }
+}
+
+/// The receiving side of a channel.
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_channel::unbounded;
+///
+/// let (s, r) = unbounded();
+///
+/// thread::spawn(move || {
+/// let _ = s.send(1);
+/// thread::sleep(Duration::from_secs(1));
+/// let _ = s.send(2);
+/// });
+///
+/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
+/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
+/// ```
+pub struct Receiver<T> {
+ flavor: ReceiverFlavor<T>,
+}
+
+/// Receiver flavors.
+enum ReceiverFlavor<T> {
+ /// Bounded channel based on a preallocated array.
+ Array(counter::Receiver<flavors::array::Channel<T>>),
+
+ /// Unbounded channel implemented as a linked list.
+ List(counter::Receiver<flavors::list::Channel<T>>),
+
+ /// Zero-capacity channel.
+ Zero(counter::Receiver<flavors::zero::Channel<T>>),
+
+ /// The after flavor.
+ At(Arc<flavors::at::Channel>),
+
+ /// The tick flavor.
+ Tick(Arc<flavors::tick::Channel>),
+
+ /// The never flavor.
+ Never(flavors::never::Channel<T>),
+}
+
+unsafe impl<T: Send> Send for Receiver<T> {}
+unsafe impl<T: Send> Sync for Receiver<T> {}
+
+impl<T> UnwindSafe for Receiver<T> {}
+impl<T> RefUnwindSafe for Receiver<T> {}
+
+impl<T> Receiver<T> {
+ /// Attempts to receive a message from the channel without blocking.
+ ///
+ /// This method will either receive a message from the channel immediately or return an error
+ /// if the channel is empty.
+ ///
+ /// If called on a zero-capacity channel, this method will receive a message only if there
+ /// happens to be a send operation on the other side of the channel at the same time.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{unbounded, TryRecvError};
+ ///
+ /// let (s, r) = unbounded();
+ /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
+ ///
+ /// s.send(5).unwrap();
+ /// drop(s);
+ ///
+ /// assert_eq!(r.try_recv(), Ok(5));
+ /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
+ /// ```
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.try_recv(),
+ ReceiverFlavor::List(chan) => chan.try_recv(),
+ ReceiverFlavor::Zero(chan) => chan.try_recv(),
+ ReceiverFlavor::At(chan) => {
+ let msg = chan.try_recv();
+ unsafe {
+ mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
+ &msg,
+ )
+ }
+ }
+ ReceiverFlavor::Tick(chan) => {
+ let msg = chan.try_recv();
+ unsafe {
+ mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
+ &msg,
+ )
+ }
+ }
+ ReceiverFlavor::Never(chan) => chan.try_recv(),
+ }
+ }
+
+ /// Blocks the current thread until a message is received or the channel is empty and
+ /// disconnected.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed. If the channel is empty and becomes disconnected, this call will
+ /// wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{unbounded, RecvError};
+ ///
+ /// let (s, r) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s.send(5).unwrap();
+ /// drop(s);
+ /// });
+ ///
+ /// assert_eq!(r.recv(), Ok(5));
+ /// assert_eq!(r.recv(), Err(RecvError));
+ /// ```
+ pub fn recv(&self) -> Result<T, RecvError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.recv(None),
+ ReceiverFlavor::List(chan) => chan.recv(None),
+ ReceiverFlavor::Zero(chan) => chan.recv(None),
+ ReceiverFlavor::At(chan) => {
+ let msg = chan.recv(None);
+ unsafe {
+ mem::transmute_copy::<
+ Result<Instant, RecvTimeoutError>,
+ Result<T, RecvTimeoutError>,
+ >(&msg)
+ }
+ }
+ ReceiverFlavor::Tick(chan) => {
+ let msg = chan.recv(None);
+ unsafe {
+ mem::transmute_copy::<
+ Result<Instant, RecvTimeoutError>,
+ Result<T, RecvTimeoutError>,
+ >(&msg)
+ }
+ }
+ ReceiverFlavor::Never(chan) => chan.recv(None),
+ }
+ .map_err(|_| RecvError)
+ }
+
+ /// Waits for a message to be received from the channel, but only for a limited time.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed or the operation times out. If the channel is empty and becomes
+ /// disconnected, this call will wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{unbounded, RecvTimeoutError};
+ ///
+ /// let (s, r) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s.send(5).unwrap();
+ /// drop(s);
+ /// });
+ ///
+ /// assert_eq!(
+ /// r.recv_timeout(Duration::from_millis(500)),
+ /// Err(RecvTimeoutError::Timeout),
+ /// );
+ /// assert_eq!(
+ /// r.recv_timeout(Duration::from_secs(1)),
+ /// Ok(5),
+ /// );
+ /// assert_eq!(
+ /// r.recv_timeout(Duration::from_secs(1)),
+ /// Err(RecvTimeoutError::Disconnected),
+ /// );
+ /// ```
+ pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
+ self.recv_deadline(utils::convert_timeout_to_deadline(timeout))
+ }
+
+ /// Waits for a message to be received from the channel, but only before a given deadline.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed or the operation times out. If the channel is empty and becomes
+ /// disconnected, this call will wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::{Instant, Duration};
+ /// use crossbeam_channel::{unbounded, RecvTimeoutError};
+ ///
+ /// let (s, r) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s.send(5).unwrap();
+ /// drop(s);
+ /// });
+ ///
+ /// let now = Instant::now();
+ ///
+ /// assert_eq!(
+ /// r.recv_deadline(now + Duration::from_millis(500)),
+ /// Err(RecvTimeoutError::Timeout),
+ /// );
+ /// assert_eq!(
+ /// r.recv_deadline(now + Duration::from_millis(1500)),
+ /// Ok(5),
+ /// );
+ /// assert_eq!(
+ /// r.recv_deadline(now + Duration::from_secs(5)),
+ /// Err(RecvTimeoutError::Disconnected),
+ /// );
+ /// ```
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
+ ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
+ ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
+ ReceiverFlavor::At(chan) => {
+ let msg = chan.recv(Some(deadline));
+ unsafe {
+ mem::transmute_copy::<
+ Result<Instant, RecvTimeoutError>,
+ Result<T, RecvTimeoutError>,
+ >(&msg)
+ }
+ }
+ ReceiverFlavor::Tick(chan) => {
+ let msg = chan.recv(Some(deadline));
+ unsafe {
+ mem::transmute_copy::<
+ Result<Instant, RecvTimeoutError>,
+ Result<T, RecvTimeoutError>,
+ >(&msg)
+ }
+ }
+ ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
+ }
+ }
+
+ /// Returns `true` if the channel is empty.
+ ///
+ /// Note: Zero-capacity channels are always empty.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (s, r) = unbounded();
+ ///
+ /// assert!(r.is_empty());
+ /// s.send(0).unwrap();
+ /// assert!(!r.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.is_empty(),
+ ReceiverFlavor::List(chan) => chan.is_empty(),
+ ReceiverFlavor::Zero(chan) => chan.is_empty(),
+ ReceiverFlavor::At(chan) => chan.is_empty(),
+ ReceiverFlavor::Tick(chan) => chan.is_empty(),
+ ReceiverFlavor::Never(chan) => chan.is_empty(),
+ }
+ }
+
+ /// Returns `true` if the channel is full.
+ ///
+ /// Note: Zero-capacity channels are always full.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::bounded;
+ ///
+ /// let (s, r) = bounded(1);
+ ///
+ /// assert!(!r.is_full());
+ /// s.send(0).unwrap();
+ /// assert!(r.is_full());
+ /// ```
+ pub fn is_full(&self) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.is_full(),
+ ReceiverFlavor::List(chan) => chan.is_full(),
+ ReceiverFlavor::Zero(chan) => chan.is_full(),
+ ReceiverFlavor::At(chan) => chan.is_full(),
+ ReceiverFlavor::Tick(chan) => chan.is_full(),
+ ReceiverFlavor::Never(chan) => chan.is_full(),
+ }
+ }
+
+ /// Returns the number of messages in the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (s, r) = unbounded();
+ /// assert_eq!(r.len(), 0);
+ ///
+ /// s.send(1).unwrap();
+ /// s.send(2).unwrap();
+ /// assert_eq!(r.len(), 2);
+ /// ```
+ pub fn len(&self) -> usize {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.len(),
+ ReceiverFlavor::List(chan) => chan.len(),
+ ReceiverFlavor::Zero(chan) => chan.len(),
+ ReceiverFlavor::At(chan) => chan.len(),
+ ReceiverFlavor::Tick(chan) => chan.len(),
+ ReceiverFlavor::Never(chan) => chan.len(),
+ }
+ }
+
+ /// If the channel is bounded, returns its capacity.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{bounded, unbounded};
+ ///
+ /// let (_, r) = unbounded::<i32>();
+ /// assert_eq!(r.capacity(), None);
+ ///
+ /// let (_, r) = bounded::<i32>(5);
+ /// assert_eq!(r.capacity(), Some(5));
+ ///
+ /// let (_, r) = bounded::<i32>(0);
+ /// assert_eq!(r.capacity(), Some(0));
+ /// ```
+ pub fn capacity(&self) -> Option<usize> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.capacity(),
+ ReceiverFlavor::List(chan) => chan.capacity(),
+ ReceiverFlavor::Zero(chan) => chan.capacity(),
+ ReceiverFlavor::At(chan) => chan.capacity(),
+ ReceiverFlavor::Tick(chan) => chan.capacity(),
+ ReceiverFlavor::Never(chan) => chan.capacity(),
+ }
+ }
+
+ /// A blocking iterator over messages in the channel.
+ ///
+ /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
+ /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
+ ///
+ /// [`next`]: Iterator::next
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (s, r) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// s.send(1).unwrap();
+ /// s.send(2).unwrap();
+ /// s.send(3).unwrap();
+ /// drop(s); // Disconnect the channel.
+ /// });
+ ///
+ /// // Collect all messages from the channel.
+ /// // Note that the call to `collect` blocks until the sender is dropped.
+ /// let v: Vec<_> = r.iter().collect();
+ ///
+ /// assert_eq!(v, [1, 2, 3]);
+ /// ```
+ pub fn iter(&self) -> Iter<'_, T> {
+ Iter { receiver: self }
+ }
+
+ /// A non-blocking iterator over messages in the channel.
+ ///
+ /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
+ /// never blocks waiting for the next message.
+ ///
+ /// [`next`]: Iterator::next
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (s, r) = unbounded::<i32>();
+ ///
+ /// thread::spawn(move || {
+ /// s.send(1).unwrap();
+ /// thread::sleep(Duration::from_secs(1));
+ /// s.send(2).unwrap();
+ /// thread::sleep(Duration::from_secs(2));
+ /// s.send(3).unwrap();
+ /// });
+ ///
+ /// thread::sleep(Duration::from_secs(2));
+ ///
+ /// // Collect all messages from the channel without blocking.
+ /// // The third message hasn't been sent yet so we'll collect only the first two.
+ /// let v: Vec<_> = r.try_iter().collect();
+ ///
+ /// assert_eq!(v, [1, 2]);
+ /// ```
+ pub fn try_iter(&self) -> TryIter<'_, T> {
+ TryIter { receiver: self }
+ }
+
+ /// Returns `true` if receivers belong to the same channel.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use crossbeam_channel::unbounded;
+ ///
+ /// let (_, r) = unbounded::<usize>();
+ ///
+ /// let r2 = r.clone();
+ /// assert!(r.same_channel(&r2));
+ ///
+ /// let (_, r3) = unbounded();
+ /// assert!(!r.same_channel(&r3));
+ /// ```
+ pub fn same_channel(&self, other: &Receiver<T>) -> bool {
+ match (&self.flavor, &other.flavor) {
+ (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
+ (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
+ (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
+ (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
+ (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
+ (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
+ _ => false,
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ unsafe {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
+ ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+ ReceiverFlavor::At(_) => {}
+ ReceiverFlavor::Tick(_) => {}
+ ReceiverFlavor::Never(_) => {}
+ }
+ }
+ }
+}
+
+impl<T> Clone for Receiver<T> {
+ fn clone(&self) -> Self {
+ let flavor = match &self.flavor {
+ ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
+ ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
+ ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
+ ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
+ ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
+ ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
+ };
+
+ Receiver { flavor }
+ }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Receiver { .. }")
+ }
+}
+
+impl<'a, T> IntoIterator for &'a Receiver<T> {
+ type Item = T;
+ type IntoIter = Iter<'a, T>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter()
+ }
+}
+
+impl<T> IntoIterator for Receiver<T> {
+ type Item = T;
+ type IntoIter = IntoIter<T>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter { receiver: self }
+ }
+}
+
+/// A blocking iterator over messages in a channel.
+///
+/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
+/// channel becomes empty and disconnected, it returns [`None`] without blocking.
+///
+/// [`next`]: Iterator::next
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use crossbeam_channel::unbounded;
+///
+/// let (s, r) = unbounded();
+///
+/// thread::spawn(move || {
+/// s.send(1).unwrap();
+/// s.send(2).unwrap();
+/// s.send(3).unwrap();
+/// drop(s); // Disconnect the channel.
+/// });
+///
+/// // Collect all messages from the channel.
+/// // Note that the call to `collect` blocks until the sender is dropped.
+/// let v: Vec<_> = r.iter().collect();
+///
+/// assert_eq!(v, [1, 2, 3]);
+/// ```
+pub struct Iter<'a, T> {
+ receiver: &'a Receiver<T>,
+}
+
+impl<T> FusedIterator for Iter<'_, T> {}
+
+impl<T> Iterator for Iter<'_, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.receiver.recv().ok()
+ }
+}
+
+impl<T> fmt::Debug for Iter<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Iter { .. }")
+ }
+}
+
+/// A non-blocking iterator over messages in a channel.
+///
+/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
+/// never blocks waiting for the next message.
+///
+/// [`next`]: Iterator::next
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_channel::unbounded;
+///
+/// let (s, r) = unbounded::<i32>();
+///
+/// thread::spawn(move || {
+/// s.send(1).unwrap();
+/// thread::sleep(Duration::from_secs(1));
+/// s.send(2).unwrap();
+/// thread::sleep(Duration::from_secs(2));
+/// s.send(3).unwrap();
+/// });
+///
+/// thread::sleep(Duration::from_secs(2));
+///
+/// // Collect all messages from the channel without blocking.
+/// // The third message hasn't been sent yet so we'll collect only the first two.
+/// let v: Vec<_> = r.try_iter().collect();
+///
+/// assert_eq!(v, [1, 2]);
+/// ```
+pub struct TryIter<'a, T> {
+ receiver: &'a Receiver<T>,
+}
+
+impl<T> Iterator for TryIter<'_, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.receiver.try_recv().ok()
+ }
+}
+
+impl<T> fmt::Debug for TryIter<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("TryIter { .. }")
+ }
+}
+
+/// A blocking iterator over messages in a channel.
+///
+/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
+/// channel becomes empty and disconnected, it returns [`None`] without blocking.
+///
+/// [`next`]: Iterator::next
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use crossbeam_channel::unbounded;
+///
+/// let (s, r) = unbounded();
+///
+/// thread::spawn(move || {
+/// s.send(1).unwrap();
+/// s.send(2).unwrap();
+/// s.send(3).unwrap();
+/// drop(s); // Disconnect the channel.
+/// });
+///
+/// // Collect all messages from the channel.
+/// // Note that the call to `collect` blocks until the sender is dropped.
+/// let v: Vec<_> = r.into_iter().collect();
+///
+/// assert_eq!(v, [1, 2, 3]);
+/// ```
+pub struct IntoIter<T> {
+ receiver: Receiver<T>,
+}
+
+impl<T> FusedIterator for IntoIter<T> {}
+
+impl<T> Iterator for IntoIter<T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.receiver.recv().ok()
+ }
+}
+
+impl<T> fmt::Debug for IntoIter<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("IntoIter { .. }")
+ }
+}
+
+impl<T> SelectHandle for Sender<T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.sender().try_select(token),
+ SenderFlavor::List(chan) => chan.sender().try_select(token),
+ SenderFlavor::Zero(chan) => chan.sender().try_select(token),
+ }
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
+ SenderFlavor::List(chan) => chan.sender().register(oper, cx),
+ SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
+ }
+ }
+
+ fn unregister(&self, oper: Operation) {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.sender().unregister(oper),
+ SenderFlavor::List(chan) => chan.sender().unregister(oper),
+ SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
+ }
+ }
+
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
+ SenderFlavor::List(chan) => chan.sender().accept(token, cx),
+ SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
+ }
+ }
+
+ fn is_ready(&self) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.sender().is_ready(),
+ SenderFlavor::List(chan) => chan.sender().is_ready(),
+ SenderFlavor::Zero(chan) => chan.sender().is_ready(),
+ }
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
+ SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
+ SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
+ }
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
+ SenderFlavor::List(chan) => chan.sender().unwatch(oper),
+ SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
+ }
+ }
+}
+
+impl<T> SelectHandle for Receiver<T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
+ ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
+ ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
+ ReceiverFlavor::At(chan) => chan.try_select(token),
+ ReceiverFlavor::Tick(chan) => chan.try_select(token),
+ ReceiverFlavor::Never(chan) => chan.try_select(token),
+ }
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ match &self.flavor {
+ ReceiverFlavor::Array(_) => None,
+ ReceiverFlavor::List(_) => None,
+ ReceiverFlavor::Zero(_) => None,
+ ReceiverFlavor::At(chan) => chan.deadline(),
+ ReceiverFlavor::Tick(chan) => chan.deadline(),
+ ReceiverFlavor::Never(chan) => chan.deadline(),
+ }
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
+ ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
+ ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
+ ReceiverFlavor::At(chan) => chan.register(oper, cx),
+ ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
+ ReceiverFlavor::Never(chan) => chan.register(oper, cx),
+ }
+ }
+
+ fn unregister(&self, oper: Operation) {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
+ ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
+ ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
+ ReceiverFlavor::At(chan) => chan.unregister(oper),
+ ReceiverFlavor::Tick(chan) => chan.unregister(oper),
+ ReceiverFlavor::Never(chan) => chan.unregister(oper),
+ }
+ }
+
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
+ ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
+ ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
+ ReceiverFlavor::At(chan) => chan.accept(token, cx),
+ ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
+ ReceiverFlavor::Never(chan) => chan.accept(token, cx),
+ }
+ }
+
+ fn is_ready(&self) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
+ ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
+ ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
+ ReceiverFlavor::At(chan) => chan.is_ready(),
+ ReceiverFlavor::Tick(chan) => chan.is_ready(),
+ ReceiverFlavor::Never(chan) => chan.is_ready(),
+ }
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
+ ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
+ ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
+ ReceiverFlavor::At(chan) => chan.watch(oper, cx),
+ ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
+ ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
+ }
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
+ ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
+ ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
+ ReceiverFlavor::At(chan) => chan.unwatch(oper),
+ ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
+ ReceiverFlavor::Never(chan) => chan.unwatch(oper),
+ }
+ }
+}
+
+/// Writes a message into the channel.
+pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
+ match &s.flavor {
+ SenderFlavor::Array(chan) => chan.write(token, msg),
+ SenderFlavor::List(chan) => chan.write(token, msg),
+ SenderFlavor::Zero(chan) => chan.write(token, msg),
+ }
+}
+
+/// Reads a message from the channel.
+pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
+ match &r.flavor {
+ ReceiverFlavor::Array(chan) => chan.read(token),
+ ReceiverFlavor::List(chan) => chan.read(token),
+ ReceiverFlavor::Zero(chan) => chan.read(token),
+ ReceiverFlavor::At(chan) => {
+ mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
+ }
+ ReceiverFlavor::Tick(chan) => {
+ mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
+ }
+ ReceiverFlavor::Never(chan) => chan.read(token),
+ }
+}