//! The channel interface.

use std::fmt;
use std::iter::FusedIterator;
use std::mem;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::context::Context;
use crate::counter;
use crate::err::{
    RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};

/// Creates a channel of unbounded capacity.
///
/// This channel has a growable buffer that can hold any number of messages at a time.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// // Computes the n-th Fibonacci number.
/// fn fib(n: i32) -> i32 {
///     if n <= 1 {
///         n
///     } else {
///         fib(n - 1) + fib(n - 2)
///     }
/// }
///
/// // Spawn an asynchronous computation.
/// thread::spawn(move || s.send(fib(20)).unwrap());
///
/// // Print the result of the computation.
/// println!("{}", r.recv().unwrap());
/// ```
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let (s, r) = counter::new(flavors::list::Channel::new());
    let s = Sender {
        flavor: SenderFlavor::List(s),
    };
    let r = Receiver {
        flavor: ReceiverFlavor::List(r),
    };
    (s, r)
}

/// Creates a channel of bounded capacity.
///
/// This channel has a buffer that can hold at most `cap` messages at a time.
///
/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
/// receive operations must appear at the same time in order to pair up and pass the message over.
///
/// # Examples
///
/// A channel of capacity 1:
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::bounded;
///
/// let (s, r) = bounded(1);
///
/// // This call returns immediately because there is enough space in the channel.
/// s.send(1).unwrap();
///
/// thread::spawn(move || {
///     // This call blocks the current thread because the channel is full.
///     // It will be able to complete only after the first message is received.
///     s.send(2).unwrap();
/// });
///
/// thread::sleep(Duration::from_secs(1));
/// assert_eq!(r.recv(), Ok(1));
/// assert_eq!(r.recv(), Ok(2));
/// ```
///
/// A zero-capacity channel:
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::bounded;
///
/// let (s, r) = bounded(0);
///
/// thread::spawn(move || {
///     // This call blocks the current thread until a receive operation appears
///     // on the other side of the channel.
///     s.send(1).unwrap();
/// });
///
/// thread::sleep(Duration::from_secs(1));
/// assert_eq!(r.recv(), Ok(1));
/// ```
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
    if cap == 0 {
        // Capacity zero selects the rendezvous ("zero") flavor.
        let (s, r) = counter::new(flavors::zero::Channel::new());
        let s = Sender {
            flavor: SenderFlavor::Zero(s),
        };
        let r = Receiver {
            flavor: ReceiverFlavor::Zero(r),
        };
        (s, r)
    } else {
        // Any positive capacity selects the array-backed flavor.
        let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
        let s = Sender {
            flavor: SenderFlavor::Array(s),
        };
        let r = Receiver {
            flavor: ReceiverFlavor::Array(r),
        };
        (s, r)
    }
}

/// Creates a receiver that delivers a message after a certain duration of time.
///
/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
/// be sent into the channel after `duration` elapses. The message is the instant at which it is
/// sent.
///
/// # Examples
///
/// Using an `after` channel for timeouts:
///
/// ```
/// use std::time::Duration;
/// use crossbeam_channel::{after, select, unbounded};
///
/// let (s, r) = unbounded::<i32>();
/// let timeout = Duration::from_millis(100);
///
/// select! {
///     recv(r) -> msg => println!("received {:?}", msg),
///     recv(after(timeout)) -> _ => println!("timed out"),
/// }
/// ```
///
/// When the message gets sent:
///
/// ```
/// use std::thread;
/// use std::time::{Duration, Instant};
/// use crossbeam_channel::after;
///
/// // Converts a number of milliseconds into a `Duration`.
/// let ms = |ms| Duration::from_millis(ms);
///
/// // Returns `true` if `a` and `b` are very close `Instant`s.
/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
///
/// let start = Instant::now();
/// let r = after(ms(100));
///
/// thread::sleep(ms(500));
///
/// // This message was sent 100 ms from the start and received 500 ms from the start.
/// assert!(eq(r.recv().unwrap(), start + ms(100)));
/// assert!(eq(Instant::now(), start + ms(500)));
/// ```
pub fn after(duration: Duration) -> Receiver<Instant> {
    match Instant::now().checked_add(duration) {
        Some(deadline) => Receiver {
            flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
        },
        // The deadline is not representable as an `Instant`, so fall back to a receiver
        // that never delivers.
        None => never(),
    }
}

/// Creates a receiver that delivers a message at a certain instant in time.
///
/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
/// be sent into the channel at the moment in time `when`. The message is the instant at which it
/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
/// instantly to the receiver.
///
/// # Examples
///
/// Using an `at` channel for timeouts:
///
/// ```
/// use std::time::{Instant, Duration};
/// use crossbeam_channel::{at, select, unbounded};
///
/// let (s, r) = unbounded::<i32>();
/// let deadline = Instant::now() + Duration::from_millis(500);
///
/// select! {
///     recv(r) -> msg => println!("received {:?}", msg),
///     recv(at(deadline)) -> _ => println!("timed out"),
/// }
/// ```
///
/// When the message gets sent:
///
/// ```
/// use std::time::{Duration, Instant};
/// use crossbeam_channel::at;
///
/// // Converts a number of milliseconds into a `Duration`.
/// let ms = |ms| Duration::from_millis(ms);
///
/// let start = Instant::now();
/// let end = start + ms(100);
///
/// let r = at(end);
///
/// // This message was sent 100 ms from the start
/// assert_eq!(r.recv().unwrap(), end);
/// assert!(Instant::now() > start + ms(100));
/// ```
pub fn at(when: Instant) -> Receiver<Instant> {
    Receiver {
        flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
    }
}

/// Creates a receiver that never delivers messages.
///
/// The channel is bounded with capacity of 0 and never gets disconnected.
///
/// # Examples
///
/// Using a `never` channel to optionally add a timeout to [`select!`]:
///
/// [`select!`]: crate::select!
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::{after, select, never, unbounded};
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_secs(1));
///     s.send(1).unwrap();
/// });
///
/// // Suppose this duration can be a `Some` or a `None`.
/// let duration = Some(Duration::from_millis(100));
///
/// // Create a channel that times out after the specified duration.
/// let timeout = duration
///     .map(|d| after(d))
///     .unwrap_or(never());
///
/// select! {
///     recv(r) -> msg => assert_eq!(msg, Ok(1)),
///     recv(timeout) -> _ => println!("timed out"),
/// }
/// ```
pub fn never<T>() -> Receiver<T> {
    Receiver {
        flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
    }
}

/// Creates a receiver that delivers messages periodically.
///
/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
/// sent.
///
/// # Examples
///
/// Using a `tick` channel to periodically print elapsed time:
///
/// ```
/// use std::time::{Duration, Instant};
/// use crossbeam_channel::tick;
///
/// let start = Instant::now();
/// let ticker = tick(Duration::from_millis(100));
///
/// for _ in 0..5 {
///     ticker.recv().unwrap();
///     println!("elapsed: {:?}", start.elapsed());
/// }
/// ```
///
/// When messages get sent:
///
/// ```
/// use std::thread;
/// use std::time::{Duration, Instant};
/// use crossbeam_channel::tick;
///
/// // Converts a number of milliseconds into a `Duration`.
/// let ms = |ms| Duration::from_millis(ms);
///
/// // Returns `true` if `a` and `b` are very close `Instant`s.
/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
///
/// let start = Instant::now();
/// let r = tick(ms(100));
///
/// // This message was sent 100 ms from the start and received 100 ms from the start.
/// assert!(eq(r.recv().unwrap(), start + ms(100)));
/// assert!(eq(Instant::now(), start + ms(100)));
///
/// thread::sleep(ms(500));
///
/// // This message was sent 200 ms from the start and received 600 ms from the start.
/// assert!(eq(r.recv().unwrap(), start + ms(200)));
/// assert!(eq(Instant::now(), start + ms(600)));
///
/// // This message was sent 700 ms from the start and received 700 ms from the start.
/// assert!(eq(r.recv().unwrap(), start + ms(700)));
/// assert!(eq(Instant::now(), start + ms(700)));
/// ```
pub fn tick(duration: Duration) -> Receiver<Instant> {
    match Instant::now().checked_add(duration) {
        Some(delivery_time) => Receiver {
            flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
                delivery_time,
                duration,
            ))),
        },
        // The first delivery time is not representable as an `Instant`, so fall back to a
        // receiver that never delivers.
        None => never(),
    }
}

/// The sending side of a channel.
/// /// # Examples /// /// ``` /// use std::thread; /// use crossbeam_channel::unbounded; /// /// let (s1, r) = unbounded(); /// let s2 = s1.clone(); /// /// thread::spawn(move || s1.send(1).unwrap()); /// thread::spawn(move || s2.send(2).unwrap()); /// /// let msg1 = r.recv().unwrap(); /// let msg2 = r.recv().unwrap(); /// /// assert_eq!(msg1 + msg2, 3); /// ``` pub struct Sender { flavor: SenderFlavor, } /// Sender flavors. enum SenderFlavor { /// Bounded channel based on a preallocated array. Array(counter::Sender>), /// Unbounded channel implemented as a linked list. List(counter::Sender>), /// Zero-capacity channel. Zero(counter::Sender>), } unsafe impl Send for Sender {} unsafe impl Sync for Sender {} impl UnwindSafe for Sender {} impl RefUnwindSafe for Sender {} impl Sender { /// Attempts to send a message into the channel without blocking. /// /// This method will either send a message into the channel immediately or return an error if /// the channel is full or disconnected. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will send the message only if there /// happens to be a receive operation on the other side of the channel at the same time. /// /// # Examples /// /// ``` /// use crossbeam_channel::{bounded, TrySendError}; /// /// let (s, r) = bounded(1); /// /// assert_eq!(s.try_send(1), Ok(())); /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); /// /// drop(r); /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3))); /// ``` pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { match &self.flavor { SenderFlavor::Array(chan) => chan.try_send(msg), SenderFlavor::List(chan) => chan.try_send(msg), SenderFlavor::Zero(chan) => chan.try_send(msg), } } /// Blocks the current thread until a message is sent or the channel is disconnected. /// /// If the channel is full and not disconnected, this call will block until the send operation /// can proceed. 
If the channel becomes disconnected, this call will wake up and return an /// error. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{bounded, SendError}; /// /// let (s, r) = bounded(1); /// assert_eq!(s.send(1), Ok(())); /// /// thread::spawn(move || { /// assert_eq!(r.recv(), Ok(1)); /// thread::sleep(Duration::from_secs(1)); /// drop(r); /// }); /// /// assert_eq!(s.send(2), Ok(())); /// assert_eq!(s.send(3), Err(SendError(3))); /// ``` pub fn send(&self, msg: T) -> Result<(), SendError> { match &self.flavor { SenderFlavor::Array(chan) => chan.send(msg, None), SenderFlavor::List(chan) => chan.send(msg, None), SenderFlavor::Zero(chan) => chan.send(msg, None), } .map_err(|err| match err { SendTimeoutError::Disconnected(msg) => SendError(msg), SendTimeoutError::Timeout(_) => unreachable!(), }) } /// Waits for a message to be sent into the channel, but only for a limited time. /// /// If the channel is full and not disconnected, this call will block until the send operation /// can proceed or the operation times out. If the channel becomes disconnected, this call will /// wake up and return an error. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. 
/// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{bounded, SendTimeoutError}; /// /// let (s, r) = bounded(0); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// assert_eq!(r.recv(), Ok(2)); /// drop(r); /// }); /// /// assert_eq!( /// s.send_timeout(1, Duration::from_millis(500)), /// Err(SendTimeoutError::Timeout(1)), /// ); /// assert_eq!( /// s.send_timeout(2, Duration::from_secs(1)), /// Ok(()), /// ); /// assert_eq!( /// s.send_timeout(3, Duration::from_millis(500)), /// Err(SendTimeoutError::Disconnected(3)), /// ); /// ``` pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError> { match Instant::now().checked_add(timeout) { Some(deadline) => self.send_deadline(msg, deadline), None => self.send(msg).map_err(SendTimeoutError::from), } } /// Waits for a message to be sent into the channel, but only until a given deadline. /// /// If the channel is full and not disconnected, this call will block until the send operation /// can proceed or the operation times out. If the channel becomes disconnected, this call will /// wake up and return an error. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. 
///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use std::time::{Duration, Instant};
    /// use crossbeam_channel::{bounded, SendTimeoutError};
    ///
    /// let (s, r) = bounded(0);
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_secs(1));
    ///     assert_eq!(r.recv(), Ok(2));
    ///     drop(r);
    /// });
    ///
    /// let now = Instant::now();
    ///
    /// assert_eq!(
    ///     s.send_deadline(1, now + Duration::from_millis(500)),
    ///     Err(SendTimeoutError::Timeout(1)),
    /// );
    /// assert_eq!(
    ///     s.send_deadline(2, now + Duration::from_millis(1500)),
    ///     Ok(()),
    /// );
    /// assert_eq!(
    ///     s.send_deadline(3, now + Duration::from_millis(2000)),
    ///     Err(SendTimeoutError::Disconnected(3)),
    /// );
    /// ```
    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
        }
    }

    /// Returns `true` if the channel is empty.
    ///
    /// Note: Zero-capacity channels are always empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::unbounded;
    ///
    /// let (s, r) = unbounded();
    /// assert!(s.is_empty());
    ///
    /// s.send(0).unwrap();
    /// assert!(!s.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.is_empty(),
            SenderFlavor::List(chan) => chan.is_empty(),
            SenderFlavor::Zero(chan) => chan.is_empty(),
        }
    }

    /// Returns `true` if the channel is full.
    ///
    /// Note: Zero-capacity channels are always full.
///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::bounded;
    ///
    /// let (s, r) = bounded(1);
    ///
    /// assert!(!s.is_full());
    /// s.send(0).unwrap();
    /// assert!(s.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.is_full(),
            SenderFlavor::List(chan) => chan.is_full(),
            SenderFlavor::Zero(chan) => chan.is_full(),
        }
    }

    /// Returns the number of messages in the channel.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::unbounded;
    ///
    /// let (s, r) = unbounded();
    /// assert_eq!(s.len(), 0);
    ///
    /// s.send(1).unwrap();
    /// s.send(2).unwrap();
    /// assert_eq!(s.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.len(),
            SenderFlavor::List(chan) => chan.len(),
            SenderFlavor::Zero(chan) => chan.len(),
        }
    }

    /// If the channel is bounded, returns its capacity.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::{bounded, unbounded};
    ///
    /// let (s, _) = unbounded::<i32>();
    /// assert_eq!(s.capacity(), None);
    ///
    /// let (s, _) = bounded::<i32>(5);
    /// assert_eq!(s.capacity(), Some(5));
    ///
    /// let (s, _) = bounded::<i32>(0);
    /// assert_eq!(s.capacity(), Some(0));
    /// ```
    pub fn capacity(&self) -> Option<usize> {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.capacity(),
            SenderFlavor::List(chan) => chan.capacity(),
            SenderFlavor::Zero(chan) => chan.capacity(),
        }
    }

    /// Returns `true` if senders belong to the same channel.
/// /// # Examples /// /// ```rust /// use crossbeam_channel::unbounded; /// /// let (s, _) = unbounded::(); /// /// let s2 = s.clone(); /// assert!(s.same_channel(&s2)); /// /// let (s3, _) = unbounded(); /// assert!(!s.same_channel(&s3)); /// ``` pub fn same_channel(&self, other: &Sender) -> bool { match (&self.flavor, &other.flavor) { (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, _ => false, } } } impl Drop for Sender { fn drop(&mut self) { unsafe { match &self.flavor { SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } } } } impl Clone for Sender { fn clone(&self) -> Self { let flavor = match &self.flavor { SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()), SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()), SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()), }; Sender { flavor } } } impl fmt::Debug for Sender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Sender { .. }") } } /// The receiving side of a channel. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::unbounded; /// /// let (s, r) = unbounded(); /// /// thread::spawn(move || { /// let _ = s.send(1); /// thread::sleep(Duration::from_secs(1)); /// let _ = s.send(2); /// }); /// /// assert_eq!(r.recv(), Ok(1)); // Received immediately. /// assert_eq!(r.recv(), Ok(2)); // Received after 1 second. /// ``` pub struct Receiver { flavor: ReceiverFlavor, } /// Receiver flavors. enum ReceiverFlavor { /// Bounded channel based on a preallocated array. Array(counter::Receiver>), /// Unbounded channel implemented as a linked list. List(counter::Receiver>), /// Zero-capacity channel. 
Zero(counter::Receiver>), /// The after flavor. At(Arc), /// The tick flavor. Tick(Arc), /// The never flavor. Never(flavors::never::Channel), } unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl UnwindSafe for Receiver {} impl RefUnwindSafe for Receiver {} impl Receiver { /// Attempts to receive a message from the channel without blocking. /// /// This method will either receive a message from the channel immediately or return an error /// if the channel is empty. /// /// If called on a zero-capacity channel, this method will receive a message only if there /// happens to be a send operation on the other side of the channel at the same time. /// /// # Examples /// /// ``` /// use crossbeam_channel::{unbounded, TryRecvError}; /// /// let (s, r) = unbounded(); /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); /// /// s.send(5).unwrap(); /// drop(s); /// /// assert_eq!(r.try_recv(), Ok(5)); /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); /// ``` pub fn try_recv(&self) -> Result { match &self.flavor { ReceiverFlavor::Array(chan) => chan.try_recv(), ReceiverFlavor::List(chan) => chan.try_recv(), ReceiverFlavor::Zero(chan) => chan.try_recv(), ReceiverFlavor::At(chan) => { let msg = chan.try_recv(); unsafe { mem::transmute_copy::, Result>( &msg, ) } } ReceiverFlavor::Tick(chan) => { let msg = chan.try_recv(); unsafe { mem::transmute_copy::, Result>( &msg, ) } } ReceiverFlavor::Never(chan) => chan.try_recv(), } } /// Blocks the current thread until a message is received or the channel is empty and /// disconnected. /// /// If the channel is empty and not disconnected, this call will block until the receive /// operation can proceed. If the channel is empty and becomes disconnected, this call will /// wake up and return an error. /// /// If called on a zero-capacity channel, this method will wait for a send operation to appear /// on the other side of the channel. 
/// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{unbounded, RecvError}; /// /// let (s, r) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s.send(5).unwrap(); /// drop(s); /// }); /// /// assert_eq!(r.recv(), Ok(5)); /// assert_eq!(r.recv(), Err(RecvError)); /// ``` pub fn recv(&self) -> Result { match &self.flavor { ReceiverFlavor::Array(chan) => chan.recv(None), ReceiverFlavor::List(chan) => chan.recv(None), ReceiverFlavor::Zero(chan) => chan.recv(None), ReceiverFlavor::At(chan) => { let msg = chan.recv(None); unsafe { mem::transmute_copy::< Result, Result, >(&msg) } } ReceiverFlavor::Tick(chan) => { let msg = chan.recv(None); unsafe { mem::transmute_copy::< Result, Result, >(&msg) } } ReceiverFlavor::Never(chan) => chan.recv(None), } .map_err(|_| RecvError) } /// Waits for a message to be received from the channel, but only for a limited time. /// /// If the channel is empty and not disconnected, this call will block until the receive /// operation can proceed or the operation times out. If the channel is empty and becomes /// disconnected, this call will wake up and return an error. /// /// If called on a zero-capacity channel, this method will wait for a send operation to appear /// on the other side of the channel. 
/// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{unbounded, RecvTimeoutError}; /// /// let (s, r) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s.send(5).unwrap(); /// drop(s); /// }); /// /// assert_eq!( /// r.recv_timeout(Duration::from_millis(500)), /// Err(RecvTimeoutError::Timeout), /// ); /// assert_eq!( /// r.recv_timeout(Duration::from_secs(1)), /// Ok(5), /// ); /// assert_eq!( /// r.recv_timeout(Duration::from_secs(1)), /// Err(RecvTimeoutError::Disconnected), /// ); /// ``` pub fn recv_timeout(&self, timeout: Duration) -> Result { match Instant::now().checked_add(timeout) { Some(deadline) => self.recv_deadline(deadline), None => self.recv().map_err(RecvTimeoutError::from), } } /// Waits for a message to be received from the channel, but only before a given deadline. /// /// If the channel is empty and not disconnected, this call will block until the receive /// operation can proceed or the operation times out. If the channel is empty and becomes /// disconnected, this call will wake up and return an error. /// /// If called on a zero-capacity channel, this method will wait for a send operation to appear /// on the other side of the channel. 
///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use std::time::{Instant, Duration};
    /// use crossbeam_channel::{unbounded, RecvTimeoutError};
    ///
    /// let (s, r) = unbounded();
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_secs(1));
    ///     s.send(5).unwrap();
    ///     drop(s);
    /// });
    ///
    /// let now = Instant::now();
    ///
    /// assert_eq!(
    ///     r.recv_deadline(now + Duration::from_millis(500)),
    ///     Err(RecvTimeoutError::Timeout),
    /// );
    /// assert_eq!(
    ///     r.recv_deadline(now + Duration::from_millis(1500)),
    ///     Ok(5),
    /// );
    /// assert_eq!(
    ///     r.recv_deadline(now + Duration::from_secs(5)),
    ///     Err(RecvTimeoutError::Disconnected),
    /// );
    /// ```
    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
        match &self.flavor {
            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
            ReceiverFlavor::At(chan) => {
                let msg = chan.recv(Some(deadline));
                // SAFETY (review note): the constructors in this file that build the `At`
                // flavor (`after`, `at`) return `Receiver<Instant>`, so `T` is expected to
                // be `Instant` here — confirm no other code path constructs this flavor.
                unsafe {
                    mem::transmute_copy::<
                        Result<Instant, RecvTimeoutError>,
                        Result<T, RecvTimeoutError>,
                    >(&msg)
                }
            }
            ReceiverFlavor::Tick(chan) => {
                let msg = chan.recv(Some(deadline));
                // SAFETY (review note): the `Tick` flavor is built by `tick`, which returns
                // `Receiver<Instant>`, so `T` is expected to be `Instant` here — confirm.
                unsafe {
                    mem::transmute_copy::<
                        Result<Instant, RecvTimeoutError>,
                        Result<T, RecvTimeoutError>,
                    >(&msg)
                }
            }
            ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
        }
    }

    /// Returns `true` if the channel is empty.
    ///
    /// Note: Zero-capacity channels are always empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::unbounded;
    ///
    /// let (s, r) = unbounded();
    ///
    /// assert!(r.is_empty());
    /// s.send(0).unwrap();
    /// assert!(!r.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(chan) => chan.is_empty(),
            ReceiverFlavor::List(chan) => chan.is_empty(),
            ReceiverFlavor::Zero(chan) => chan.is_empty(),
            ReceiverFlavor::At(chan) => chan.is_empty(),
            ReceiverFlavor::Tick(chan) => chan.is_empty(),
            ReceiverFlavor::Never(chan) => chan.is_empty(),
        }
    }

    /// Returns `true` if the channel is full.
    ///
    /// Note: Zero-capacity channels are always full.
///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::bounded;
    ///
    /// let (s, r) = bounded(1);
    ///
    /// assert!(!r.is_full());
    /// s.send(0).unwrap();
    /// assert!(r.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(chan) => chan.is_full(),
            ReceiverFlavor::List(chan) => chan.is_full(),
            ReceiverFlavor::Zero(chan) => chan.is_full(),
            ReceiverFlavor::At(chan) => chan.is_full(),
            ReceiverFlavor::Tick(chan) => chan.is_full(),
            ReceiverFlavor::Never(chan) => chan.is_full(),
        }
    }

    /// Returns the number of messages in the channel.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::unbounded;
    ///
    /// let (s, r) = unbounded();
    /// assert_eq!(r.len(), 0);
    ///
    /// s.send(1).unwrap();
    /// s.send(2).unwrap();
    /// assert_eq!(r.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        match &self.flavor {
            ReceiverFlavor::Array(chan) => chan.len(),
            ReceiverFlavor::List(chan) => chan.len(),
            ReceiverFlavor::Zero(chan) => chan.len(),
            ReceiverFlavor::At(chan) => chan.len(),
            ReceiverFlavor::Tick(chan) => chan.len(),
            ReceiverFlavor::Never(chan) => chan.len(),
        }
    }

    /// If the channel is bounded, returns its capacity.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_channel::{bounded, unbounded};
    ///
    /// let (_, r) = unbounded::<i32>();
    /// assert_eq!(r.capacity(), None);
    ///
    /// let (_, r) = bounded::<i32>(5);
    /// assert_eq!(r.capacity(), Some(5));
    ///
    /// let (_, r) = bounded::<i32>(0);
    /// assert_eq!(r.capacity(), Some(0));
    /// ```
    pub fn capacity(&self) -> Option<usize> {
        match &self.flavor {
            ReceiverFlavor::Array(chan) => chan.capacity(),
            ReceiverFlavor::List(chan) => chan.capacity(),
            ReceiverFlavor::Zero(chan) => chan.capacity(),
            ReceiverFlavor::At(chan) => chan.capacity(),
            ReceiverFlavor::Tick(chan) => chan.capacity(),
            ReceiverFlavor::Never(chan) => chan.capacity(),
        }
    }

    /// A blocking iterator over messages in the channel.
    ///
    /// Each call to [`next`] blocks waiting for the next message and then returns it.
However, if /// the channel becomes empty and disconnected, it returns [`None`] without blocking. /// /// [`next`]: Iterator::next /// /// # Examples /// /// ``` /// use std::thread; /// use crossbeam_channel::unbounded; /// /// let (s, r) = unbounded(); /// /// thread::spawn(move || { /// s.send(1).unwrap(); /// s.send(2).unwrap(); /// s.send(3).unwrap(); /// drop(s); // Disconnect the channel. /// }); /// /// // Collect all messages from the channel. /// // Note that the call to `collect` blocks until the sender is dropped. /// let v: Vec<_> = r.iter().collect(); /// /// assert_eq!(v, [1, 2, 3]); /// ``` pub fn iter(&self) -> Iter<'_, T> { Iter { receiver: self } } /// A non-blocking iterator over messages in the channel. /// /// Each call to [`next`] returns a message if there is one ready to be received. The iterator /// never blocks waiting for the next message. /// /// [`next`]: Iterator::next /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::unbounded; /// /// let (s, r) = unbounded::(); /// /// thread::spawn(move || { /// s.send(1).unwrap(); /// thread::sleep(Duration::from_secs(1)); /// s.send(2).unwrap(); /// thread::sleep(Duration::from_secs(2)); /// s.send(3).unwrap(); /// }); /// /// thread::sleep(Duration::from_secs(2)); /// /// // Collect all messages from the channel without blocking. /// // The third message hasn't been sent yet so we'll collect only the first two. /// let v: Vec<_> = r.try_iter().collect(); /// /// assert_eq!(v, [1, 2]); /// ``` pub fn try_iter(&self) -> TryIter<'_, T> { TryIter { receiver: self } } /// Returns `true` if receivers belong to the same channel. 
///
    /// # Examples
    ///
    /// ```rust
    /// use crossbeam_channel::unbounded;
    ///
    /// let (_, r) = unbounded::<usize>();
    ///
    /// let r2 = r.clone();
    /// assert!(r.same_channel(&r2));
    ///
    /// let (_, r3) = unbounded();
    /// assert!(!r.same_channel(&r3));
    /// ```
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
        match (&self.flavor, &other.flavor) {
            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
            // Timer flavors are shared through `Arc`, so identity is pointer equality.
            (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
            (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
            // Any two `never` receivers are treated as the same channel.
            (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
            _ => false,
        }
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // NOTE(review): `release` comes from the `counter` handles defined outside this
        // file and is called inside `unsafe`; each dropped receiver calls it once, passing
        // the closure that disconnects the channel — confirm the exact contract there.
        unsafe {
            match &self.flavor {
                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
                // The timer and `never` flavors have nothing to release here.
                ReceiverFlavor::At(_) => {}
                ReceiverFlavor::Tick(_) => {}
                ReceiverFlavor::Never(_) => {}
            }
        }
    }
}

impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        let flavor = match &self.flavor {
            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
            ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
            ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
            ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
        };

        Receiver { flavor }
    }
}

impl<T> fmt::Debug for Receiver<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Receiver { .. }")
    }
}

impl<'a, T> IntoIterator for &'a Receiver<T> {
    type Item = T;
    type IntoIter = Iter<'a, T>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl<T> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { receiver: self }
    }
}

/// A blocking iterator over messages in a channel.
///
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     s.send(2).unwrap();
///     s.send(3).unwrap();
///     drop(s); // Disconnect the channel.
/// });
///
/// // Collect all messages from the channel.
/// // Note that the call to `collect` blocks until the sender is dropped.
/// let v: Vec<_> = r.iter().collect();
///
/// assert_eq!(v, [1, 2, 3]);
/// ```
pub struct Iter<'a, T> {
    receiver: &'a Receiver<T>,
}

impl<T> FusedIterator for Iter<'_, T> {}

impl<T> Iterator for Iter<'_, T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.receiver.recv().ok()
    }
}

impl<T> fmt::Debug for Iter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Iter { .. }")
    }
}

/// A non-blocking iterator over messages in a channel.
///
/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
/// never blocks waiting for the next message.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded::<i32>();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     thread::sleep(Duration::from_secs(1));
///     s.send(2).unwrap();
///     thread::sleep(Duration::from_secs(2));
///     s.send(3).unwrap();
/// });
///
/// thread::sleep(Duration::from_secs(2));
///
/// // Collect all messages from the channel without blocking.
/// // The third message hasn't been sent yet so we'll collect only the first two.
/// let v: Vec<_> = r.try_iter().collect();
///
/// assert_eq!(v, [1, 2]);
/// ```
pub struct TryIter<'a, T> {
    receiver: &'a Receiver<T>,
}

impl<T> Iterator for TryIter<'_, T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        // Any `try_recv` error is reported as `None`. Unlike `Iter`, this type does not
        // implement `FusedIterator`.
        self.receiver.try_recv().ok()
    }
}

impl<T> fmt::Debug for TryIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("TryIter { .. }")
    }
}

/// A blocking iterator over messages in a channel.
///
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     s.send(2).unwrap();
///     s.send(3).unwrap();
///     drop(s); // Disconnect the channel.
/// });
///
/// // Collect all messages from the channel.
/// // Note that the call to `collect` blocks until the sender is dropped.
/// let v: Vec<_> = r.into_iter().collect(); /// /// assert_eq!(v, [1, 2, 3]); /// ``` pub struct IntoIter { receiver: Receiver, } impl FusedIterator for IntoIter {} impl Iterator for IntoIter { type Item = T; fn next(&mut self) -> Option { self.receiver.recv().ok() } } impl fmt::Debug for IntoIter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("IntoIter { .. }") } } impl SelectHandle for Sender { fn try_select(&self, token: &mut Token) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().try_select(token), SenderFlavor::List(chan) => chan.sender().try_select(token), SenderFlavor::Zero(chan) => chan.sender().try_select(token), } } fn deadline(&self) -> Option { None } fn register(&self, oper: Operation, cx: &Context) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().register(oper, cx), SenderFlavor::List(chan) => chan.sender().register(oper, cx), SenderFlavor::Zero(chan) => chan.sender().register(oper, cx), } } fn unregister(&self, oper: Operation) { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().unregister(oper), SenderFlavor::List(chan) => chan.sender().unregister(oper), SenderFlavor::Zero(chan) => chan.sender().unregister(oper), } } fn accept(&self, token: &mut Token, cx: &Context) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().accept(token, cx), SenderFlavor::List(chan) => chan.sender().accept(token, cx), SenderFlavor::Zero(chan) => chan.sender().accept(token, cx), } } fn is_ready(&self) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().is_ready(), SenderFlavor::List(chan) => chan.sender().is_ready(), SenderFlavor::Zero(chan) => chan.sender().is_ready(), } } fn watch(&self, oper: Operation, cx: &Context) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().watch(oper, cx), SenderFlavor::List(chan) => chan.sender().watch(oper, cx), SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx), } } fn unwatch(&self, 
oper: Operation) { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().unwatch(oper), SenderFlavor::List(chan) => chan.sender().unwatch(oper), SenderFlavor::Zero(chan) => chan.sender().unwatch(oper), } } } impl SelectHandle for Receiver { fn try_select(&self, token: &mut Token) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().try_select(token), ReceiverFlavor::List(chan) => chan.receiver().try_select(token), ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token), ReceiverFlavor::At(chan) => chan.try_select(token), ReceiverFlavor::Tick(chan) => chan.try_select(token), ReceiverFlavor::Never(chan) => chan.try_select(token), } } fn deadline(&self) -> Option { match &self.flavor { ReceiverFlavor::Array(_) => None, ReceiverFlavor::List(_) => None, ReceiverFlavor::Zero(_) => None, ReceiverFlavor::At(chan) => chan.deadline(), ReceiverFlavor::Tick(chan) => chan.deadline(), ReceiverFlavor::Never(chan) => chan.deadline(), } } fn register(&self, oper: Operation, cx: &Context) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx), ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx), ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx), ReceiverFlavor::At(chan) => chan.register(oper, cx), ReceiverFlavor::Tick(chan) => chan.register(oper, cx), ReceiverFlavor::Never(chan) => chan.register(oper, cx), } } fn unregister(&self, oper: Operation) { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper), ReceiverFlavor::List(chan) => chan.receiver().unregister(oper), ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper), ReceiverFlavor::At(chan) => chan.unregister(oper), ReceiverFlavor::Tick(chan) => chan.unregister(oper), ReceiverFlavor::Never(chan) => chan.unregister(oper), } } fn accept(&self, token: &mut Token, cx: &Context) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx), 
ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx), ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx), ReceiverFlavor::At(chan) => chan.accept(token, cx), ReceiverFlavor::Tick(chan) => chan.accept(token, cx), ReceiverFlavor::Never(chan) => chan.accept(token, cx), } } fn is_ready(&self) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().is_ready(), ReceiverFlavor::List(chan) => chan.receiver().is_ready(), ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(), ReceiverFlavor::At(chan) => chan.is_ready(), ReceiverFlavor::Tick(chan) => chan.is_ready(), ReceiverFlavor::Never(chan) => chan.is_ready(), } } fn watch(&self, oper: Operation, cx: &Context) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx), ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx), ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx), ReceiverFlavor::At(chan) => chan.watch(oper, cx), ReceiverFlavor::Tick(chan) => chan.watch(oper, cx), ReceiverFlavor::Never(chan) => chan.watch(oper, cx), } } fn unwatch(&self, oper: Operation) { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper), ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper), ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper), ReceiverFlavor::At(chan) => chan.unwatch(oper), ReceiverFlavor::Tick(chan) => chan.unwatch(oper), ReceiverFlavor::Never(chan) => chan.unwatch(oper), } } } /// Writes a message into the channel. pub(crate) unsafe fn write(s: &Sender, token: &mut Token, msg: T) -> Result<(), T> { match &s.flavor { SenderFlavor::Array(chan) => chan.write(token, msg), SenderFlavor::List(chan) => chan.write(token, msg), SenderFlavor::Zero(chan) => chan.write(token, msg), } } /// Reads a message from the channel. 
pub(crate) unsafe fn read(r: &Receiver, token: &mut Token) -> Result { match &r.flavor { ReceiverFlavor::Array(chan) => chan.read(token), ReceiverFlavor::List(chan) => chan.read(token), ReceiverFlavor::Zero(chan) => chan.read(token), ReceiverFlavor::At(chan) => { mem::transmute_copy::, Result>(&chan.read(token)) } ReceiverFlavor::Tick(chan) => { mem::transmute_copy::, Result>(&chan.read(token)) } ReceiverFlavor::Never(chan) => chan.read(token), } }