//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by //! all consumers. //! //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`] //! values. [`Sender`] handles are clone-able, allowing concurrent send and //! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as //! long as `T` is also `Send` or `Sync` respectively. //! //! When a value is sent, **all** [`Receiver`] handles are notified and will //! receive the value. The value is stored once inside the channel and cloned on //! demand for each receiver. Once all receivers have received a clone of the //! value, the value is released from the channel. //! //! A channel is created by calling [`channel`], specifying the maximum number //! of messages the channel can retain at any given time. //! //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The //! returned [`Receiver`] will receive values sent **after** the call to //! `subscribe`. //! //! ## Lagging //! //! As sent messages must be retained until **all** [`Receiver`] handles receive //! a clone, broadcast channels are suspectible to the "slow receiver" problem. //! In this case, all but one receiver are able to receive values at the rate //! they are sent. Because one receiver is stalled, the channel starts to fill //! up. //! //! This broadcast channel implementation handles this case by setting a hard //! upper bound on the number of values the channel may retain at any given //! time. This upper bound is passed to the [`channel`] function as an argument. //! //! If a value is sent when the channel is at capacity, the oldest value //! currently held by the channel is released. This frees up space for the new //! value. Any receiver that has not yet seen the released value will return //! [`RecvError::Lagged`] the next time [`recv`] is called. //! //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is //! updated to the oldest value contained by the channel. The next call to //! [`recv`] will return this value. //! //! This behavior enables a receiver to detect when it has lagged so far behind //! that data has been dropped. The caller may decide how to respond to this: //! either by aborting its task or by tolerating lost messages and resuming //! consumption of the channel. //! //! ## Closing //! //! When **all** [`Sender`] handles have been dropped, no new values may be //! sent. At this point, the channel is "closed". Once a receiver has received //! all values retained by the channel, the next call to [`recv`] will return //! with [`RecvError::Closed`]. //! //! [`Sender`]: crate::sync::broadcast::Sender //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe //! [`Receiver`]: crate::sync::broadcast::Receiver //! [`channel`]: crate::sync::broadcast::channel //! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged //! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed //! [`recv`]: crate::sync::broadcast::Receiver::recv //! //! # Examples //! //! Basic usage //! //! ``` //! use tokio::sync::broadcast; //! //! #[tokio::main] //! async fn main() { //! let (tx, mut rx1) = broadcast::channel(16); //! let mut rx2 = tx.subscribe(); //! //! tokio::spawn(async move { //! assert_eq!(rx1.recv().await.unwrap(), 10); //! assert_eq!(rx1.recv().await.unwrap(), 20); //! }); //! //! tokio::spawn(async move { //! assert_eq!(rx2.recv().await.unwrap(), 10); //! assert_eq!(rx2.recv().await.unwrap(), 20); //! }); //! //! tx.send(10).unwrap(); //! tx.send(20).unwrap(); //! } //! ``` //! //! Handling lag //! //! ``` //! use tokio::sync::broadcast; //! //! #[tokio::main] //! async fn main() { //! let (tx, mut rx) = broadcast::channel(2); //! //! tx.send(10).unwrap(); //! tx.send(20).unwrap(); //! tx.send(30).unwrap(); //! //! // The receiver lagged behind //! assert!(rx.recv().await.is_err()); //! //! // At this point, we can abort or continue with lost messages //! //! assert_eq!(20, rx.recv().await.unwrap()); //! assert_eq!(30, rx.recv().await.unwrap()); //! } use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize}; use crate::loom::sync::{Arc, Condvar, Mutex}; use std::fmt; use std::mem; use std::ptr; use std::sync::atomic::Ordering::SeqCst; use std::task::{Context, Poll, Waker}; use std::usize; /// Sending-half of the [`broadcast`] channel. /// /// May be used from many threads. Messages can be sent with /// [`send`][Sender::send]. /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` /// /// [`broadcast`]: crate::sync::broadcast pub struct Sender { shared: Arc>, } /// Receiving-half of the [`broadcast`] channel. /// /// Must not be used concurrently. Messages may be retrieved using /// [`recv`][Receiver::recv]. /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` /// /// [`broadcast`]: crate::sync::broadcast pub struct Receiver { /// State shared with all receivers and senders. shared: Arc>, /// Next position to read from next: u64, /// Waiter state wait: Arc, } /// Error returned by [`Sender::send`][Sender::send]. /// /// A **send** operation can only fail if there are no active receivers, /// implying that the message could never be received. The error contains the /// message being sent as a payload so it can be recovered. #[derive(Debug)] pub struct SendError(pub T); /// An error returned from the [`recv`] function on a [`Receiver`]. /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver #[derive(Debug, PartialEq)] pub enum RecvError { /// There are no more active senders implying no further messages will ever /// be sent. Closed, /// The receiver lagged too far behind. Attempting to receive again will /// return the oldest message still retained by the channel. /// /// Includes the number of skipped messages. Lagged(u64), } /// An error returned from the [`try_recv`] function on a [`Receiver`]. /// /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver #[derive(Debug, PartialEq)] pub enum TryRecvError { /// The channel is currently empty. There are still active /// [`Sender`][Sender] handles, so data may yet become available. Empty, /// There are no more active senders implying no further messages will ever /// be sent. Closed, /// The receiver lagged too far behind and has been forcibly disconnected. /// Attempting to receive again will return the oldest message still /// retained by the channel. /// /// Includes the number of skipped messages. Lagged(u64), } /// Data shared between senders and receivers struct Shared { /// slots in the channel buffer: Box<[Slot]>, /// Mask a position -> index mask: usize, /// Tail of the queue tail: Mutex, /// Notifies a sender that the slot is unlocked condvar: Condvar, /// Stack of pending waiters wait_stack: AtomicPtr, /// Number of outstanding Sender handles num_tx: AtomicUsize, } /// Next position to write a value struct Tail { /// Next position to write to pos: u64, /// Number of active receivers rx_cnt: usize, } /// Slot in the buffer struct Slot { /// Remaining number of receivers that are expected to see this value. /// /// When this goes to zero, the value is released. rem: AtomicUsize, /// Used to lock the `write` field. lock: AtomicUsize, /// The value being broadcast /// /// Synchronized by `state` write: Write, } /// A write in the buffer struct Write { /// Uniquely identifies this write pos: UnsafeCell, /// The written value val: UnsafeCell>, } /// Tracks a waiting receiver #[derive(Debug)] struct WaitNode { /// `true` if queued queued: AtomicBool, /// Task to wake when a permit is made available. waker: AtomicWaker, /// Next pointer in the stack of waiting senders. next: UnsafeCell<*const WaitNode>, } struct RecvGuard<'a, T> { slot: &'a Slot, tail: &'a Mutex, condvar: &'a Condvar, } /// Max number of receivers. Reserve space to lock. const MAX_RECEIVERS: usize = usize::MAX >> 1; /// Create a bounded, multi-producer, multi-consumer channel where each sent /// value is broadcasted to all active receivers. /// /// All data sent on [`Sender`] will become available on every active /// [`Receiver`] in the same order as it was sent. /// /// The `Sender` can be cloned to `send` to the same channel from multiple /// points in the process or it can be used concurrently from an `Arc`. New /// `Receiver` handles are created by calling [`Sender::subscribe`]. /// /// If all [`Receiver`] handles are dropped, the `send` method will return a /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`] /// method will return a [`RecvError`]. /// /// [`Sender`]: crate::sync::broadcast::Sender /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`SendError`]: crate::sync::broadcast::SendError /// [`RecvError`]: crate::sync::broadcast::RecvError /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` pub fn channel(mut capacity: usize) -> (Sender, Receiver) { assert!(capacity > 0, "capacity is empty"); assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); // Round to a power of two capacity = capacity.next_power_of_two(); let mut buffer = Vec::with_capacity(capacity); for i in 0..capacity { buffer.push(Slot { rem: AtomicUsize::new(0), lock: AtomicUsize::new(0), write: Write { pos: UnsafeCell::new((i as u64).wrapping_sub(capacity as u64)), val: UnsafeCell::new(None), }, }); } let shared = Arc::new(Shared { buffer: buffer.into_boxed_slice(), mask: capacity - 1, tail: Mutex::new(Tail { pos: 0, rx_cnt: 1 }), condvar: Condvar::new(), wait_stack: AtomicPtr::new(ptr::null_mut()), num_tx: AtomicUsize::new(1), }); let rx = Receiver { shared: shared.clone(), next: 0, wait: Arc::new(WaitNode { queued: AtomicBool::new(false), waker: AtomicWaker::new(), next: UnsafeCell::new(ptr::null()), }), }; let tx = Sender { shared }; (tx, rx) } unsafe impl Send for Sender {} unsafe impl Sync for Sender {} unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl Sender { /// Attempts to send a value to all active [`Receiver`] handles, returning /// it back if it could not be sent. /// /// A successful send occurs when there is at least one active [`Receiver`] /// handle. An unsuccessful send would be one where all associated /// [`Receiver`] handles have already been dropped. /// /// # Return /// /// On success, the number of subscribed [`Receiver`] handles is returned. /// This does not mean that this number of receivers will see the message as /// a receiver may drop before receiving the message. /// /// # Note /// /// A return value of `Ok` **does not** mean that the sent value will be /// observed by all or any of the active [`Receiver`] handles. [`Receiver`] /// handles may be dropped before receiving the sent message. /// /// A return value of `Err` **does not** mean that future calls to `send` /// will fail. New [`Receiver`] handles may be created by calling /// [`subscribe`]. /// /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` pub fn send(&self, value: T) -> Result> { self.send2(Some(value)) .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap())) } /// Creates a new [`Receiver`] handle that will receive values sent **after** /// this call to `subscribe`. /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, _rx) = broadcast::channel(16); /// /// // Will not be seen /// tx.send(10).unwrap(); /// /// let mut rx = tx.subscribe(); /// /// tx.send(20).unwrap(); /// /// let value = rx.recv().await.unwrap(); /// assert_eq!(20, value); /// } /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); let mut tail = shared.tail.lock().unwrap(); if tail.rx_cnt == MAX_RECEIVERS { panic!("max receivers"); } tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); let next = tail.pos; drop(tail); Receiver { shared, next, wait: Arc::new(WaitNode { queued: AtomicBool::new(false), waker: AtomicWaker::new(), next: UnsafeCell::new(ptr::null()), }), } } /// Returns the number of active receivers /// /// An active receiver is a [`Receiver`] handle returned from [`channel`] or /// [`subscribe`]. These are the handles that will receive values sent on /// this [`Sender`]. /// /// # Note /// /// It is not guaranteed that a sent message will reach this number of /// receivers. Active receivers may never call [`recv`] again before /// dropping. /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`Sender`]: crate::sync::broadcast::Sender /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe /// [`channel`]: crate::sync::broadcast::channel /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, _rx1) = broadcast::channel(16); /// /// assert_eq!(1, tx.receiver_count()); /// /// let mut _rx2 = tx.subscribe(); /// /// assert_eq!(2, tx.receiver_count()); /// /// tx.send(10).unwrap(); /// } /// ``` pub fn receiver_count(&self) -> usize { let tail = self.shared.tail.lock().unwrap(); tail.rx_cnt } fn send2(&self, value: Option) -> Result>> { let mut tail = self.shared.tail.lock().unwrap(); if tail.rx_cnt == 0 { return Err(SendError(value)); } // Position to write into let pos = tail.pos; let rem = tail.rx_cnt; let idx = (pos & self.shared.mask as u64) as usize; // Update the tail position tail.pos = tail.pos.wrapping_add(1); // Get the slot let slot = &self.shared.buffer[idx]; // Acquire the write lock let mut prev = slot.lock.fetch_or(1, SeqCst); while prev & !1 != 0 { // Concurrent readers, we must go to sleep tail = self.shared.condvar.wait(tail).unwrap(); prev = slot.lock.load(SeqCst); if prev & 1 == 0 { // The writer lock bit was cleared while this thread was // sleeping. This can only happen if a newer write happened on // this slot by another thread. Bail early as an optimization, // there is nothing left to do. return Ok(rem); } } if tail.pos.wrapping_sub(pos) > self.shared.buffer.len() as u64 { // There is a newer pending write to the same slot. return Ok(rem); } // Slot lock acquired slot.write.pos.with_mut(|ptr| unsafe { *ptr = pos }); slot.write.val.with_mut(|ptr| unsafe { *ptr = value }); // Set remaining receivers slot.rem.store(rem, SeqCst); // Release the slot lock slot.lock.store(0, SeqCst); // Release the mutex. This must happen after the slot lock is released, // otherwise the writer lock bit could be cleared while another thread // is in the critical section. drop(tail); // Notify waiting receivers self.notify_rx(); Ok(rem) } fn notify_rx(&self) { let mut curr = self.shared.wait_stack.swap(ptr::null_mut(), SeqCst) as *const WaitNode; while !curr.is_null() { let waiter = unsafe { Arc::from_raw(curr) }; // Update `curr` before toggling `queued` and waking curr = waiter.next.with(|ptr| unsafe { *ptr }); // Unset queued waiter.queued.store(false, SeqCst); // Wake waiter.waker.wake(); } } } impl Clone for Sender { fn clone(&self) -> Sender { let shared = self.shared.clone(); shared.num_tx.fetch_add(1, SeqCst); Sender { shared } } } impl Drop for Sender { fn drop(&mut self) { if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) { let _ = self.send2(None); } } } impl Receiver { /// Locks the next value if there is one. /// /// The caller is responsible for unlocking fn recv_ref(&mut self, spin: bool) -> Result, TryRecvError> { let idx = (self.next & self.shared.mask as u64) as usize; // The slot holding the next value to read let slot = &self.shared.buffer[idx]; // Lock the slot if !slot.try_rx_lock() { if spin { while !slot.try_rx_lock() { spin_loop_hint(); } } else { return Err(TryRecvError::Empty); } } let guard = RecvGuard { slot, tail: &self.shared.tail, condvar: &self.shared.condvar, }; if guard.pos() != self.next { let pos = guard.pos(); guard.drop_no_rem_dec(); if pos.wrapping_add(self.shared.buffer.len() as u64) == self.next { return Err(TryRecvError::Empty); } else { let tail = self.shared.tail.lock().unwrap(); // `tail.pos` points to the slot the **next** send writes to. // Because a receiver is lagging, this slot also holds the // oldest value. To make the positions match, we subtract the // capacity. let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64); let missed = next.wrapping_sub(self.next); self.next = next; return Err(TryRecvError::Lagged(missed)); } } self.next = self.next.wrapping_add(1); Ok(guard) } } impl Receiver where T: Clone, { /// Attempts to return a pending value on this receiver without awaiting. /// /// This is useful for a flavor of "optimistic check" before deciding to /// await on a receiver. /// /// Compared with [`recv`], this function has three failure cases instead of one /// (one for closed, one for an empty buffer, one for a lagging receiver). /// /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have /// dropped, indicating that no further values can be sent on the channel. /// /// If the [`Receiver`] handle falls behind, once the channel is full, newly /// sent values will overwrite old values. At this point, a call to [`recv`] /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s /// internal cursor is updated to point to the oldest value still held by /// the channel. A subsequent call to [`try_recv`] will return this value /// **unless** it has been since overwritten. If there are no values to /// receive, `Err(TryRecvError::Empty)` is returned. /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = broadcast::channel(16); /// /// assert!(rx.try_recv().is_err()); /// /// tx.send(10).unwrap(); /// /// let value = rx.try_recv().unwrap(); /// assert_eq!(10, value); /// } /// ``` pub fn try_recv(&mut self) -> Result { let guard = self.recv_ref(false)?; guard.clone_value().ok_or(TryRecvError::Closed) } #[doc(hidden)] // TODO: document pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { if let Some(value) = ok_empty(self.try_recv())? { return Poll::Ready(Ok(value)); } self.register_waker(cx.waker()); if let Some(value) = ok_empty(self.try_recv())? { Poll::Ready(Ok(value)) } else { Poll::Pending } } /// Receives the next value for this receiver. /// /// Each [`Receiver`] handle will receive a clone of all values sent /// **after** it has subscribed. /// /// `Err(RecvError::Closed)` is returned when all `Sender` halves have /// dropped, indicating that no further values can be sent on the channel. /// /// If the [`Receiver`] handle falls behind, once the channel is full, newly /// sent values will overwrite old values. At this point, a call to [`recv`] /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s /// internal cursor is updated to point to the oldest value still held by /// the channel. A subsequent call to [`recv`] will return this value /// **unless** it has been since overwritten. /// /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`recv`]: crate::sync::broadcast::Receiver::recv /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` /// /// Handling lag /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = broadcast::channel(2); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// tx.send(30).unwrap(); /// /// // The receiver lagged behind /// assert!(rx.recv().await.is_err()); /// /// // At this point, we can abort or continue with lost messages /// /// assert_eq!(20, rx.recv().await.unwrap()); /// assert_eq!(30, rx.recv().await.unwrap()); /// } pub async fn recv(&mut self) -> Result { use crate::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } fn register_waker(&self, cx: &Waker) { self.wait.waker.register_by_ref(cx); if !self.wait.queued.load(SeqCst) { // Set `queued` before queuing. self.wait.queued.store(true, SeqCst); let mut curr = self.shared.wait_stack.load(SeqCst); // The ref count is decremented in `notify_rx` when all nodes are // removed from the waiter stack. let node = Arc::into_raw(self.wait.clone()) as *mut _; loop { // Safety: `queued == false` means the caller has exclusive // access to `self.wait.next`. self.wait.next.with_mut(|ptr| unsafe { *ptr = curr }); let res = self .shared .wait_stack .compare_exchange(curr, node, SeqCst, SeqCst); match res { Ok(_) => return, Err(actual) => curr = actual, } } } } } #[cfg(feature = "stream")] impl crate::stream::Stream for Receiver where T: Clone, { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { self.poll_recv(cx).map(|v| match v { Ok(v) => Some(Ok(v)), lag @ Err(RecvError::Lagged(_)) => Some(lag), Err(RecvError::Closed) => None, }) } } impl Drop for Receiver { fn drop(&mut self) { let mut tail = self.shared.tail.lock().unwrap(); tail.rx_cnt -= 1; let until = tail.pos; drop(tail); while self.next != until { match self.recv_ref(true) { // Ignore the value Ok(_) => {} // The channel is closed Err(TryRecvError::Closed) => break, // Ignore lagging, we will catch up Err(TryRecvError::Lagged(..)) => {} // Can't be empty Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"), } } } } impl Drop for Shared { fn drop(&mut self) { // Clear the wait stack let mut curr = self.wait_stack.with_mut(|ptr| *ptr as *const WaitNode); while !curr.is_null() { let waiter = unsafe { Arc::from_raw(curr) }; curr = waiter.next.with(|ptr| unsafe { *ptr }); } } } impl fmt::Debug for Sender { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "broadcast::Sender") } } impl fmt::Debug for Receiver { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "broadcast::Receiver") } } impl Slot { /// Tries to lock the slot for a receiver. If `false`, then a sender holds the /// lock and the calling task will be notified once the sender has released /// the lock. fn try_rx_lock(&self) -> bool { let mut curr = self.lock.load(SeqCst); loop { if curr & 1 == 1 { // Locked by sender return false; } // Only increment (by 2) if the LSB "lock" bit is not set. let res = self.lock.compare_exchange(curr, curr + 2, SeqCst, SeqCst); match res { Ok(_) => return true, Err(actual) => curr = actual, } } } fn rx_unlock(&self, tail: &Mutex, condvar: &Condvar, rem_dec: bool) { if rem_dec { // Decrement the remaining counter if 1 == self.rem.fetch_sub(1, SeqCst) { // Last receiver, drop the value self.write.val.with_mut(|ptr| unsafe { *ptr = None }); } } if 1 == self.lock.fetch_sub(2, SeqCst) - 2 { // First acquire the lock to make sure our sender is waiting on the // condition variable, otherwise the notification could be lost. mem::drop(tail.lock().unwrap()); // Wake up senders condvar.notify_all(); } } } impl<'a, T> RecvGuard<'a, T> { fn pos(&self) -> u64 { self.slot.write.pos.with(|ptr| unsafe { *ptr }) } fn clone_value(&self) -> Option where T: Clone, { self.slot.write.val.with(|ptr| unsafe { (*ptr).clone() }) } fn drop_no_rem_dec(self) { self.slot.rx_unlock(self.tail, self.condvar, false); mem::forget(self); } } impl<'a, T> Drop for RecvGuard<'a, T> { fn drop(&mut self) { self.slot.rx_unlock(self.tail, self.condvar, true) } } fn ok_empty(res: Result) -> Result, RecvError> { match res { Ok(value) => Ok(Some(value)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Lagged(n)) => Err(RecvError::Lagged(n)), Err(TryRecvError::Closed) => Err(RecvError::Closed), } } impl fmt::Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { RecvError::Closed => write!(f, "channel closed"), RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), } } } impl std::error::Error for RecvError {} impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { TryRecvError::Empty => write!(f, "channel empty"), TryRecvError::Closed => write!(f, "channel closed"), TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), } } } impl std::error::Error for TryRecvError {}