//! Multi-producer multi-consumer channels. // This module is not currently exposed publicly, but is used // as the implementation for the channels in `sync::mpsc`. The // implementation comes from the crossbeam-channel crate: // // Copyright (c) 2019 The Crossbeam Project Developers // // Permission is hereby granted, free of charge, to any // person obtaining a copy of this software and associated // documentation files (the "Software"), to deal in the // Software without restriction, including without // limitation the rights to use, copy, modify, merge, // publish, distribute, sublicense, and/or sell copies of // the Software, and to permit persons to whom the Software // is furnished to do so, subject to the following // conditions: // // The above copyright notice and this permission notice // shall be included in all copies or substantial portions // of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. mod array; mod context; mod counter; mod error; mod list; mod select; mod utils; mod waker; mod zero; use crate::fmt; use crate::panic::{RefUnwindSafe, UnwindSafe}; use crate::time::{Duration, Instant}; pub use error::*; /// Creates a channel of unbounded capacity. /// /// This channel has a growable buffer that can hold any number of messages at a time. pub fn channel() -> (Sender, Receiver) { let (s, r) = counter::new(list::Channel::new()); let s = Sender { flavor: SenderFlavor::List(s) }; let r = Receiver { flavor: ReceiverFlavor::List(r) }; (s, r) } /// Creates a channel of bounded capacity. /// /// This channel has a buffer that can hold at most `cap` messages at a time. /// /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and /// receive operations must appear at the same time in order to pair up and pass the message over. pub fn sync_channel(cap: usize) -> (Sender, Receiver) { if cap == 0 { let (s, r) = counter::new(zero::Channel::new()); let s = Sender { flavor: SenderFlavor::Zero(s) }; let r = Receiver { flavor: ReceiverFlavor::Zero(r) }; (s, r) } else { let (s, r) = counter::new(array::Channel::with_capacity(cap)); let s = Sender { flavor: SenderFlavor::Array(s) }; let r = Receiver { flavor: ReceiverFlavor::Array(r) }; (s, r) } } /// The sending side of a channel. pub struct Sender { flavor: SenderFlavor, } /// Sender flavors. enum SenderFlavor { /// Bounded channel based on a preallocated array. Array(counter::Sender>), /// Unbounded channel implemented as a linked list. List(counter::Sender>), /// Zero-capacity channel. Zero(counter::Sender>), } unsafe impl Send for Sender {} unsafe impl Sync for Sender {} impl UnwindSafe for Sender {} impl RefUnwindSafe for Sender {} impl Sender { /// Attempts to send a message into the channel without blocking. /// /// This method will either send a message into the channel immediately or return an error if /// the channel is full or disconnected. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will send the message only if there /// happens to be a receive operation on the other side of the channel at the same time. pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { match &self.flavor { SenderFlavor::Array(chan) => chan.try_send(msg), SenderFlavor::List(chan) => chan.try_send(msg), SenderFlavor::Zero(chan) => chan.try_send(msg), } } /// Blocks the current thread until a message is sent or the channel is disconnected. /// /// If the channel is full and not disconnected, this call will block until the send operation /// can proceed. If the channel becomes disconnected, this call will wake up and return an /// error. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. pub fn send(&self, msg: T) -> Result<(), SendError> { match &self.flavor { SenderFlavor::Array(chan) => chan.send(msg, None), SenderFlavor::List(chan) => chan.send(msg, None), SenderFlavor::Zero(chan) => chan.send(msg, None), } .map_err(|err| match err { SendTimeoutError::Disconnected(msg) => SendError(msg), SendTimeoutError::Timeout(_) => unreachable!(), }) } } // The methods below are not used by `sync::mpsc`, but // are useful and we'll likely want to expose them // eventually #[allow(unused)] impl Sender { /// Waits for a message to be sent into the channel, but only for a limited time. /// /// If the channel is full and not disconnected, this call will block until the send operation /// can proceed or the operation times out. If the channel becomes disconnected, this call will /// wake up and return an error. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError> { match Instant::now().checked_add(timeout) { Some(deadline) => self.send_deadline(msg, deadline), // So far in the future that it's practically the same as waiting indefinitely. None => self.send(msg).map_err(SendTimeoutError::from), } } /// Waits for a message to be sent into the channel, but only until a given deadline. /// /// If the channel is full and not disconnected, this call will block until the send operation /// can proceed or the operation times out. If the channel becomes disconnected, this call will /// wake up and return an error. The returned error contains the original message. /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError> { match &self.flavor { SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), } } /// Returns `true` if the channel is empty. /// /// Note: Zero-capacity channels are always empty. pub fn is_empty(&self) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.is_empty(), SenderFlavor::List(chan) => chan.is_empty(), SenderFlavor::Zero(chan) => chan.is_empty(), } } /// Returns `true` if the channel is full. /// /// Note: Zero-capacity channels are always full. pub fn is_full(&self) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.is_full(), SenderFlavor::List(chan) => chan.is_full(), SenderFlavor::Zero(chan) => chan.is_full(), } } /// Returns the number of messages in the channel. pub fn len(&self) -> usize { match &self.flavor { SenderFlavor::Array(chan) => chan.len(), SenderFlavor::List(chan) => chan.len(), SenderFlavor::Zero(chan) => chan.len(), } } /// If the channel is bounded, returns its capacity. pub fn capacity(&self) -> Option { match &self.flavor { SenderFlavor::Array(chan) => chan.capacity(), SenderFlavor::List(chan) => chan.capacity(), SenderFlavor::Zero(chan) => chan.capacity(), } } /// Returns `true` if senders belong to the same channel. pub fn same_channel(&self, other: &Sender) -> bool { match (&self.flavor, &other.flavor) { (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, _ => false, } } } impl Drop for Sender { fn drop(&mut self) { unsafe { match &self.flavor { SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } } } } impl Clone for Sender { fn clone(&self) -> Self { let flavor = match &self.flavor { SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()), SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()), SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()), }; Sender { flavor } } } impl fmt::Debug for Sender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Sender { .. }") } } /// The receiving side of a channel. pub struct Receiver { flavor: ReceiverFlavor, } /// Receiver flavors. enum ReceiverFlavor { /// Bounded channel based on a preallocated array. Array(counter::Receiver>), /// Unbounded channel implemented as a linked list. List(counter::Receiver>), /// Zero-capacity channel. Zero(counter::Receiver>), } unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl UnwindSafe for Receiver {} impl RefUnwindSafe for Receiver {} impl Receiver { /// Attempts to receive a message from the channel without blocking. /// /// This method will either receive a message from the channel immediately or return an error /// if the channel is empty. /// /// If called on a zero-capacity channel, this method will receive a message only if there /// happens to be a send operation on the other side of the channel at the same time. pub fn try_recv(&self) -> Result { match &self.flavor { ReceiverFlavor::Array(chan) => chan.try_recv(), ReceiverFlavor::List(chan) => chan.try_recv(), ReceiverFlavor::Zero(chan) => chan.try_recv(), } } /// Blocks the current thread until a message is received or the channel is empty and /// disconnected. /// /// If the channel is empty and not disconnected, this call will block until the receive /// operation can proceed. If the channel is empty and becomes disconnected, this call will /// wake up and return an error. /// /// If called on a zero-capacity channel, this method will wait for a send operation to appear /// on the other side of the channel. pub fn recv(&self) -> Result { match &self.flavor { ReceiverFlavor::Array(chan) => chan.recv(None), ReceiverFlavor::List(chan) => chan.recv(None), ReceiverFlavor::Zero(chan) => chan.recv(None), } .map_err(|_| RecvError) } /// Waits for a message to be received from the channel, but only for a limited time. /// /// If the channel is empty and not disconnected, this call will block until the receive /// operation can proceed or the operation times out. If the channel is empty and becomes /// disconnected, this call will wake up and return an error. /// /// If called on a zero-capacity channel, this method will wait for a send operation to appear /// on the other side of the channel. pub fn recv_timeout(&self, timeout: Duration) -> Result { match Instant::now().checked_add(timeout) { Some(deadline) => self.recv_deadline(deadline), // So far in the future that it's practically the same as waiting indefinitely. None => self.recv().map_err(RecvTimeoutError::from), } } /// Waits for a message to be received from the channel, but only for a limited time. /// /// If the channel is empty and not disconnected, this call will block until the receive /// operation can proceed or the operation times out. If the channel is empty and becomes /// disconnected, this call will wake up and return an error. /// /// If called on a zero-capacity channel, this method will wait for a send operation to appear /// on the other side of the channel. pub fn recv_deadline(&self, deadline: Instant) -> Result { match &self.flavor { ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), } } } // The methods below are not used by `sync::mpsc`, but // are useful and we'll likely want to expose them // eventually #[allow(unused)] impl Receiver { /// Returns `true` if the channel is empty. /// /// Note: Zero-capacity channels are always empty. pub fn is_empty(&self) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.is_empty(), ReceiverFlavor::List(chan) => chan.is_empty(), ReceiverFlavor::Zero(chan) => chan.is_empty(), } } /// Returns `true` if the channel is full. /// /// Note: Zero-capacity channels are always full. pub fn is_full(&self) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.is_full(), ReceiverFlavor::List(chan) => chan.is_full(), ReceiverFlavor::Zero(chan) => chan.is_full(), } } /// Returns the number of messages in the channel. pub fn len(&self) -> usize { match &self.flavor { ReceiverFlavor::Array(chan) => chan.len(), ReceiverFlavor::List(chan) => chan.len(), ReceiverFlavor::Zero(chan) => chan.len(), } } /// If the channel is bounded, returns its capacity. pub fn capacity(&self) -> Option { match &self.flavor { ReceiverFlavor::Array(chan) => chan.capacity(), ReceiverFlavor::List(chan) => chan.capacity(), ReceiverFlavor::Zero(chan) => chan.capacity(), } } /// Returns `true` if receivers belong to the same channel. pub fn same_channel(&self, other: &Receiver) -> bool { match (&self.flavor, &other.flavor) { (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, _ => false, } } } impl Drop for Receiver { fn drop(&mut self) { unsafe { match &self.flavor { ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } } } } impl Clone for Receiver { fn clone(&self) -> Self { let flavor = match &self.flavor { ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()), ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()), ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()), }; Receiver { flavor } } } impl fmt::Debug for Receiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Receiver { .. }") } }