use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; use crate::runtime::park::CachedParkThread; use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::{bounded, list, unbounded}; use crate::sync::notify::Notify; use std::fmt; use std::process; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; /// Channel sender. pub(crate) struct Tx { inner: Arc>, } impl fmt::Debug for Tx { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Tx").field("inner", &self.inner).finish() } } /// Channel receiver. pub(crate) struct Rx { inner: Arc>, } impl fmt::Debug for Rx { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Rx").field("inner", &self.inner).finish() } } pub(crate) trait Semaphore { fn is_idle(&self) -> bool; fn add_permit(&self); fn close(&self); fn is_closed(&self) -> bool; } pub(super) struct Chan { /// Notifies all tasks listening for the receiver being dropped. notify_rx_closed: Notify, /// Handle to the push half of the lock-free list. tx: list::Tx, /// Coordinates access to channel's capacity. semaphore: S, /// Receiver waker. Notified when a value is pushed into the channel. rx_waker: AtomicWaker, /// Tracks the number of outstanding sender handles. /// /// When this drops to zero, the send half of the channel is closed. tx_count: AtomicUsize, /// Only accessed by `Rx` handle. rx_fields: UnsafeCell>, } impl fmt::Debug for Chan where S: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Chan") .field("tx", &self.tx) .field("semaphore", &self.semaphore) .field("rx_waker", &self.rx_waker) .field("tx_count", &self.tx_count) .field("rx_fields", &"...") .finish() } } /// Fields only accessed by `Rx` handle. struct RxFields { /// Channel receiver. This field is only accessed by the `Receiver` type. list: list::Rx, /// `true` if `Rx::close` is called. rx_closed: bool, } impl fmt::Debug for RxFields { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("RxFields") .field("list", &self.list) .field("rx_closed", &self.rx_closed) .finish() } } unsafe impl Send for Chan {} unsafe impl Sync for Chan {} pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { notify_rx_closed: Notify::new(), tx, semaphore, rx_waker: AtomicWaker::new(), tx_count: AtomicUsize::new(1), rx_fields: UnsafeCell::new(RxFields { list: rx, rx_closed: false, }), }); (Tx::new(chan.clone()), Rx::new(chan)) } // ===== impl Tx ===== impl Tx { fn new(chan: Arc>) -> Tx { Tx { inner: chan } } pub(super) fn downgrade(&self) -> Arc> { self.inner.clone() } // Returns the upgraded channel or None if the upgrade failed. pub(super) fn upgrade(chan: Arc>) -> Option { let mut tx_count = chan.tx_count.load(Acquire); loop { if tx_count == 0 { // channel is closed return None; } match chan .tx_count .compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire) { Ok(_) => return Some(Tx { inner: chan }), Err(prev_count) => tx_count = prev_count, } } } pub(super) fn semaphore(&self) -> &S { &self.inner.semaphore } /// Send a message and notify the receiver. pub(crate) fn send(&self, value: T) { self.inner.send(value); } /// Wake the receive half pub(crate) fn wake_rx(&self) { self.inner.rx_waker.wake(); } /// Returns `true` if senders belong to the same channel. pub(crate) fn same_channel(&self, other: &Self) -> bool { Arc::ptr_eq(&self.inner, &other.inner) } } impl Tx { pub(crate) fn is_closed(&self) -> bool { self.inner.semaphore.is_closed() } pub(crate) async fn closed(&self) { // In order to avoid a race condition, we first request a notification, // **then** check whether the semaphore is closed. If the semaphore is // closed the notification request is dropped. let notified = self.inner.notify_rx_closed.notified(); if self.inner.semaphore.is_closed() { return; } notified.await; } } impl Clone for Tx { fn clone(&self) -> Tx { // Using a Relaxed ordering here is sufficient as the caller holds a // strong ref to `self`, preventing a concurrent decrement to zero. self.inner.tx_count.fetch_add(1, Relaxed); Tx { inner: self.inner.clone(), } } } impl Drop for Tx { fn drop(&mut self) { if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; } // Close the list, which sends a `Close` message self.inner.tx.close(); // Notify the receiver self.wake_rx(); } } // ===== impl Rx ===== impl Rx { fn new(chan: Arc>) -> Rx { Rx { inner: chan } } pub(crate) fn close(&mut self) { self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; if rx_fields.rx_closed { return; } rx_fields.rx_closed = true; }); self.inner.semaphore.close(); self.inner.notify_rx_closed.notify_waiters(); } /// Receive the next value pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read::*; ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; macro_rules! try_recv { () => { match rx_fields.list.pop(&self.inner.tx) { Some(Value(value)) => { self.inner.semaphore.add_permit(); coop.made_progress(); return Ready(Some(value)); } Some(Closed) => { // TODO: This check may not be required as it most // likely can only return `true` at this point. A // channel is closed when all tx handles are // dropped. Dropping a tx handle releases memory, // which ensures that if dropping the tx handle is // visible, then all messages sent are also visible. assert!(self.inner.semaphore.is_idle()); coop.made_progress(); return Ready(None); } None => {} // fall through } }; } try_recv!(); self.inner.rx_waker.register_by_ref(cx.waker()); // It is possible that a value was pushed between attempting to read // and registering the task, so we have to check the channel a // second time here. try_recv!(); if rx_fields.rx_closed && self.inner.semaphore.is_idle() { coop.made_progress(); Ready(None) } else { Pending } }) } /// Try to receive the next value. pub(crate) fn try_recv(&mut self) -> Result { use super::list::TryPopResult; self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; macro_rules! try_recv { () => { match rx_fields.list.try_pop(&self.inner.tx) { TryPopResult::Ok(value) => { self.inner.semaphore.add_permit(); return Ok(value); } TryPopResult::Closed => return Err(TryRecvError::Disconnected), TryPopResult::Empty => return Err(TryRecvError::Empty), TryPopResult::Busy => {} // fall through } }; } try_recv!(); // If a previous `poll_recv` call has set a waker, we wake it here. // This allows us to put our own CachedParkThread waker in the // AtomicWaker slot instead. // // This is not a spurious wakeup to `poll_recv` since we just got a // Busy from `try_pop`, which only happens if there are messages in // the queue. self.inner.rx_waker.wake(); // Park the thread until the problematic send has completed. let mut park = CachedParkThread::new(); let waker = park.waker().unwrap(); loop { self.inner.rx_waker.register_by_ref(&waker); // It is possible that the problematic send has now completed, // so we have to check for messages again. try_recv!(); park.park(); } }) } } impl Drop for Rx { fn drop(&mut self) { use super::block::Read::Value; self.close(); self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { self.inner.semaphore.add_permit(); } }) } } // ===== impl Chan ===== impl Chan { fn send(&self, value: T) { // Push the value self.tx.push(value); // Notify the rx task self.rx_waker.wake(); } } impl Drop for Chan { fn drop(&mut self) { use super::block::Read::Value; // Safety: the only owner of the rx fields is Chan, and being // inside its own Drop means we're the last ones to touch it. self.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {} unsafe { rx_fields.list.free_blocks() }; }); } } // ===== impl Semaphore for (::Semaphore, capacity) ===== impl Semaphore for bounded::Semaphore { fn add_permit(&self) { self.semaphore.release(1) } fn is_idle(&self) -> bool { self.semaphore.available_permits() == self.bound } fn close(&self) { self.semaphore.close(); } fn is_closed(&self) -> bool { self.semaphore.is_closed() } } // ===== impl Semaphore for AtomicUsize ===== impl Semaphore for unbounded::Semaphore { fn add_permit(&self) { let prev = self.0.fetch_sub(2, Release); if prev >> 1 == 0 { // Something went wrong process::abort(); } } fn is_idle(&self) -> bool { self.0.load(Acquire) >> 1 == 0 } fn close(&self) { self.0.fetch_or(1, Release); } fn is_closed(&self) -> bool { self.0.load(Acquire) & 1 == 1 } }