diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
commit | 698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch) | |
tree | 173a775858bd501c378080a10dca74132f05bc50 /vendor/tokio/src/sync/mpsc/chan.rs | |
parent | Initial commit. (diff) | |
download | rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip |
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/sync/mpsc/chan.rs')
-rw-r--r-- | vendor/tokio/src/sync/mpsc/chan.rs | 357 |
1 files changed, 357 insertions, 0 deletions
diff --git a/vendor/tokio/src/sync/mpsc/chan.rs b/vendor/tokio/src/sync/mpsc/chan.rs new file mode 100644 index 000000000..554d02284 --- /dev/null +++ b/vendor/tokio/src/sync/mpsc/chan.rs @@ -0,0 +1,357 @@ +use crate::loom::cell::UnsafeCell; +use crate::loom::future::AtomicWaker; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Arc; +use crate::sync::mpsc::list; +use crate::sync::notify::Notify; + +use std::fmt; +use std::process; +use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; + +/// Channel sender +pub(crate) struct Tx<T, S> { + inner: Arc<Chan<T, S>>, +} + +impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Tx").field("inner", &self.inner).finish() + } +} + +/// Channel receiver +pub(crate) struct Rx<T, S: Semaphore> { + inner: Arc<Chan<T, S>>, +} + +impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Rx").field("inner", &self.inner).finish() + } +} + +pub(crate) trait Semaphore { + fn is_idle(&self) -> bool; + + fn add_permit(&self); + + fn close(&self); + + fn is_closed(&self) -> bool; +} + +struct Chan<T, S> { + /// Notifies all tasks listening for the receiver being dropped + notify_rx_closed: Notify, + + /// Handle to the push half of the lock-free list. + tx: list::Tx<T>, + + /// Coordinates access to channel's capacity. + semaphore: S, + + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: AtomicWaker, + + /// Tracks the number of outstanding sender handles. + /// + /// When this drops to zero, the send half of the channel is closed. + tx_count: AtomicUsize, + + /// Only accessed by `Rx` handle. + rx_fields: UnsafeCell<RxFields<T>>, +} + +impl<T, S> fmt::Debug for Chan<T, S> +where + S: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Chan") + .field("tx", &self.tx) + .field("semaphore", &self.semaphore) + .field("rx_waker", &self.rx_waker) + .field("tx_count", &self.tx_count) + .field("rx_fields", &"...") + .finish() + } +} + +/// Fields only accessed by `Rx` handle. +struct RxFields<T> { + /// Channel receiver. This field is only accessed by the `Receiver` type. + list: list::Rx<T>, + + /// `true` if `Rx::close` is called. + rx_closed: bool, +} + +impl<T> fmt::Debug for RxFields<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("RxFields") + .field("list", &self.list) + .field("rx_closed", &self.rx_closed) + .finish() + } +} + +unsafe impl<T: Send, S: Send> Send for Chan<T, S> {} +unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {} + +pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) { + let (tx, rx) = list::channel(); + + let chan = Arc::new(Chan { + notify_rx_closed: Notify::new(), + tx, + semaphore, + rx_waker: AtomicWaker::new(), + tx_count: AtomicUsize::new(1), + rx_fields: UnsafeCell::new(RxFields { + list: rx, + rx_closed: false, + }), + }); + + (Tx::new(chan.clone()), Rx::new(chan)) +} + +// ===== impl Tx ===== + +impl<T, S> Tx<T, S> { + fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> { + Tx { inner: chan } + } + + pub(super) fn semaphore(&self) -> &S { + &self.inner.semaphore + } + + /// Send a message and notify the receiver. + pub(crate) fn send(&self, value: T) { + self.inner.send(value); + } + + /// Wake the receive half + pub(crate) fn wake_rx(&self) { + self.inner.rx_waker.wake(); + } + + /// Returns `true` if senders belong to the same channel. + pub(crate) fn same_channel(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } +} + +impl<T, S: Semaphore> Tx<T, S> { + pub(crate) fn is_closed(&self) -> bool { + self.inner.semaphore.is_closed() + } + + pub(crate) async fn closed(&self) { + // In order to avoid a race condition, we first request a notification, + // **then** check whether the semaphore is closed. If the semaphore is + // closed the notification request is dropped. + let notified = self.inner.notify_rx_closed.notified(); + + if self.inner.semaphore.is_closed() { + return; + } + notified.await; + } +} + +impl<T, S> Clone for Tx<T, S> { + fn clone(&self) -> Tx<T, S> { + // Using a Relaxed ordering here is sufficient as the caller holds a + // strong ref to `self`, preventing a concurrent decrement to zero. + self.inner.tx_count.fetch_add(1, Relaxed); + + Tx { + inner: self.inner.clone(), + } + } +} + +impl<T, S> Drop for Tx<T, S> { + fn drop(&mut self) { + if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { + return; + } + + // Close the list, which sends a `Close` message + self.inner.tx.close(); + + // Notify the receiver + self.wake_rx(); + } +} + +// ===== impl Rx ===== + +impl<T, S: Semaphore> Rx<T, S> { + fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> { + Rx { inner: chan } + } + + pub(crate) fn close(&mut self) { + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + if rx_fields.rx_closed { + return; + } + + rx_fields.rx_closed = true; + }); + + self.inner.semaphore.close(); + self.inner.notify_rx_closed.notify_waiters(); + } + + /// Receive the next value + pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + use super::block::Read::*; + + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + macro_rules! try_recv { + () => { + match rx_fields.list.pop(&self.inner.tx) { + Some(Value(value)) => { + self.inner.semaphore.add_permit(); + coop.made_progress(); + return Ready(Some(value)); + } + Some(Closed) => { + // TODO: This check may not be required as it most + // likely can only return `true` at this point. A + // channel is closed when all tx handles are + // dropped. Dropping a tx handle releases memory, + // which ensures that if dropping the tx handle is + // visible, then all messages sent are also visible. + assert!(self.inner.semaphore.is_idle()); + coop.made_progress(); + return Ready(None); + } + None => {} // fall through + } + }; + } + + try_recv!(); + + self.inner.rx_waker.register_by_ref(cx.waker()); + + // It is possible that a value was pushed between attempting to read + // and registering the task, so we have to check the channel a + // second time here. + try_recv!(); + + if rx_fields.rx_closed && self.inner.semaphore.is_idle() { + coop.made_progress(); + Ready(None) + } else { + Pending + } + }) + } +} + +impl<T, S: Semaphore> Drop for Rx<T, S> { + fn drop(&mut self) { + use super::block::Read::Value; + + self.close(); + + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { + self.inner.semaphore.add_permit(); + } + }) + } +} + +// ===== impl Chan ===== + +impl<T, S> Chan<T, S> { + fn send(&self, value: T) { + // Push the value + self.tx.push(value); + + // Notify the rx task + self.rx_waker.wake(); + } +} + +impl<T, S> Drop for Chan<T, S> { + fn drop(&mut self) { + use super::block::Read::Value; + + // Safety: the only owner of the rx fields is Chan, and eing + // inside its own Drop means we're the last ones to touch it. + self.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {} + unsafe { rx_fields.list.free_blocks() }; + }); + } +} + +// ===== impl Semaphore for (::Semaphore, capacity) ===== + +impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) { + fn add_permit(&self) { + self.0.release(1) + } + + fn is_idle(&self) -> bool { + self.0.available_permits() == self.1 + } + + fn close(&self) { + self.0.close(); + } + + fn is_closed(&self) -> bool { + self.0.is_closed() + } +} + +// ===== impl Semaphore for AtomicUsize ===== + +use std::sync::atomic::Ordering::{Acquire, Release}; +use std::usize; + +impl Semaphore for AtomicUsize { + fn add_permit(&self) { + let prev = self.fetch_sub(2, Release); + + if prev >> 1 == 0 { + // Something went wrong + process::abort(); + } + } + + fn is_idle(&self) -> bool { + self.load(Acquire) >> 1 == 0 + } + + fn close(&self) { + self.fetch_or(1, Release); + } + + fn is_closed(&self) -> bool { + self.load(Acquire) & 1 == 1 + } +} |