From 36d22d82aa202bb199967e9512281e9a53db42c9 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 7 Apr 2024 21:33:14 +0200
Subject: Adding upstream version 115.7.0esr.

Signed-off-by: Daniel Baumann
---
 third_party/rust/tokio/src/sync/mpsc/chan.rs | 405 +++++++++++++++++++++++++++
 1 file changed, 405 insertions(+)
 create mode 100644 third_party/rust/tokio/src/sync/mpsc/chan.rs

diff --git a/third_party/rust/tokio/src/sync/mpsc/chan.rs b/third_party/rust/tokio/src/sync/mpsc/chan.rs
new file mode 100644
index 0000000000..c3007de89c
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/mpsc/chan.rs
@@ -0,0 +1,405 @@
+use crate::loom::cell::UnsafeCell;
+use crate::loom::future::AtomicWaker;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Arc;
+use crate::park::thread::CachedParkThread;
+use crate::park::Park;
+use crate::sync::mpsc::error::TryRecvError;
+use crate::sync::mpsc::list;
+use crate::sync::notify::Notify;
+
+use std::fmt;
+use std::process;
+use std::sync::atomic::Ordering::{AcqRel, Relaxed};
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
+
+/// Channel sender.
+pub(crate) struct Tx<T, S> {
+    inner: Arc<Chan<T, S>>,
+}
+
+impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("Tx").field("inner", &self.inner).finish()
+    }
+}
+
+/// Channel receiver.
+pub(crate) struct Rx<T, S> {
+    inner: Arc<Chan<T, S>>,
+}
+
+impl<T, S: fmt::Debug> fmt::Debug for Rx<T, S> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("Rx").field("inner", &self.inner).finish()
+    }
+}
+
+pub(crate) trait Semaphore {
+    fn is_idle(&self) -> bool;
+
+    fn add_permit(&self);
+
+    fn close(&self);
+
+    fn is_closed(&self) -> bool;
+}
+
+struct Chan<T, S> {
+    /// Notifies all tasks listening for the receiver being dropped.
+    notify_rx_closed: Notify,
+
+    /// Handle to the push half of the lock-free list.
+    tx: list::Tx<T>,
+
+    /// Coordinates access to the channel's capacity.
+    semaphore: S,
+
+    /// Receiver waker. Notified when a value is pushed into the channel.
+    rx_waker: AtomicWaker,
+
+    /// Tracks the number of outstanding sender handles.
+    ///
+    /// When this drops to zero, the send half of the channel is closed.
+    tx_count: AtomicUsize,
+
+    /// Only accessed by `Rx` handle.
+    rx_fields: UnsafeCell<RxFields<T>>,
+}
+
+impl<T, S> fmt::Debug for Chan<T, S>
+where
+    S: fmt::Debug,
+{
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("Chan")
+            .field("tx", &self.tx)
+            .field("semaphore", &self.semaphore)
+            .field("rx_waker", &self.rx_waker)
+            .field("tx_count", &self.tx_count)
+            .field("rx_fields", &"...")
+            .finish()
+    }
+}
+
+/// Fields only accessed by `Rx` handle.
+struct RxFields<T> {
+    /// Channel receiver. This field is only accessed by the `Receiver` type.
+    list: list::Rx<T>,
+
+    /// `true` if `Rx::close` is called.
+    rx_closed: bool,
+}
+
+impl<T> fmt::Debug for RxFields<T> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("RxFields")
+            .field("list", &self.list)
+            .field("rx_closed", &self.rx_closed)
+            .finish()
+    }
+}
+
+unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
+unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
+
+pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
+    let (tx, rx) = list::channel();
+
+    let chan = Arc::new(Chan {
+        notify_rx_closed: Notify::new(),
+        tx,
+        semaphore,
+        rx_waker: AtomicWaker::new(),
+        tx_count: AtomicUsize::new(1),
+        rx_fields: UnsafeCell::new(RxFields {
+            list: rx,
+            rx_closed: false,
+        }),
+    });
+
+    (Tx::new(chan.clone()), Rx::new(chan))
+}
+
+// ===== impl Tx =====
+
+impl<T, S> Tx<T, S> {
+    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
+        Tx { inner: chan }
+    }
+
+    pub(super) fn semaphore(&self) -> &S {
+        &self.inner.semaphore
+    }
+
+    /// Send a message and notify the receiver.
+    pub(crate) fn send(&self, value: T) {
+        self.inner.send(value);
+    }
+
+    /// Wake the receive half
+    pub(crate) fn wake_rx(&self) {
+        self.inner.rx_waker.wake();
+    }
+
+    /// Returns `true` if senders belong to the same channel.
+    pub(crate) fn same_channel(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.inner, &other.inner)
+    }
+}
+
+impl<T, S: Semaphore> Tx<T, S> {
+    pub(crate) fn is_closed(&self) -> bool {
+        self.inner.semaphore.is_closed()
+    }
+
+    pub(crate) async fn closed(&self) {
+        // In order to avoid a race condition, we first request a notification,
+        // **then** check whether the semaphore is closed. If the semaphore is
+        // closed the notification request is dropped.
+        let notified = self.inner.notify_rx_closed.notified();
+
+        if self.inner.semaphore.is_closed() {
+            return;
+        }
+        notified.await;
+    }
+}
+
+impl<T, S> Clone for Tx<T, S> {
+    fn clone(&self) -> Tx<T, S> {
+        // Using a Relaxed ordering here is sufficient as the caller holds a
+        // strong ref to `self`, preventing a concurrent decrement to zero.
+        self.inner.tx_count.fetch_add(1, Relaxed);
+
+        Tx {
+            inner: self.inner.clone(),
+        }
+    }
+}
+
+impl<T, S> Drop for Tx<T, S> {
+    fn drop(&mut self) {
+        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
+            return;
+        }
+
+        // Close the list, which sends a `Close` message
+        self.inner.tx.close();
+
+        // Notify the receiver
+        self.wake_rx();
+    }
+}
+
+// ===== impl Rx =====
+
+impl<T, S: Semaphore> Rx<T, S> {
+    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
+        Rx { inner: chan }
+    }
+
+    pub(crate) fn close(&mut self) {
+        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+            let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+            if rx_fields.rx_closed {
+                return;
+            }
+
+            rx_fields.rx_closed = true;
+        });
+
+        self.inner.semaphore.close();
+        self.inner.notify_rx_closed.notify_waiters();
+    }
+
+    /// Receive the next value
+    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        use super::block::Read::*;
+
+        // Keep track of task budget
+        let coop = ready!(crate::coop::poll_proceed(cx));
+
+        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+            let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+            macro_rules! try_recv {
+                () => {
+                    match rx_fields.list.pop(&self.inner.tx) {
+                        Some(Value(value)) => {
+                            self.inner.semaphore.add_permit();
+                            coop.made_progress();
+                            return Ready(Some(value));
+                        }
+                        Some(Closed) => {
+                            // TODO: This check may not be required as it most
+                            // likely can only return `true` at this point. A
+                            // channel is closed when all tx handles are
+                            // dropped. Dropping a tx handle releases memory,
+                            // which ensures that if dropping the tx handle is
+                            // visible, then all messages sent are also visible.
+                            assert!(self.inner.semaphore.is_idle());
+                            coop.made_progress();
+                            return Ready(None);
+                        }
+                        None => {} // fall through
+                    }
+                };
+            }
+
+            try_recv!();
+
+            self.inner.rx_waker.register_by_ref(cx.waker());
+
+            // It is possible that a value was pushed between attempting to read
+            // and registering the task, so we have to check the channel a
+            // second time here.
+            try_recv!();
+
+            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
+                coop.made_progress();
+                Ready(None)
+            } else {
+                Pending
+            }
+        })
+    }
+
+    /// Try to receive the next value.
+    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
+        use super::list::TryPopResult;
+
+        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+            let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+            macro_rules! try_recv {
+                () => {
+                    match rx_fields.list.try_pop(&self.inner.tx) {
+                        TryPopResult::Ok(value) => {
+                            self.inner.semaphore.add_permit();
+                            return Ok(value);
+                        }
+                        TryPopResult::Closed => return Err(TryRecvError::Disconnected),
+                        TryPopResult::Empty => return Err(TryRecvError::Empty),
+                        TryPopResult::Busy => {} // fall through
+                    }
+                };
+            }
+
+            try_recv!();
+
+            // If a previous `poll_recv` call has set a waker, we wake it here.
+            // This allows us to put our own CachedParkThread waker in the
+            // AtomicWaker slot instead.
+            //
+            // This is not a spurious wakeup to `poll_recv` since we just got a
+            // Busy from `try_pop`, which only happens if there are messages in
+            // the queue.
+            self.inner.rx_waker.wake();
+
+            // Park the thread until the problematic send has completed.
+            let mut park = CachedParkThread::new();
+            let waker = park.unpark().into_waker();
+            loop {
+                self.inner.rx_waker.register_by_ref(&waker);
+                // It is possible that the problematic send has now completed,
+                // so we have to check for messages again.
+                try_recv!();
+                park.park().expect("park failed");
+            }
+        })
+    }
+}
+
+impl<T, S: Semaphore> Drop for Rx<T, S> {
+    fn drop(&mut self) {
+        use super::block::Read::Value;
+
+        self.close();
+
+        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+            let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
+                self.inner.semaphore.add_permit();
+            }
+        })
+    }
+}
+
+// ===== impl Chan =====
+
+impl<T, S> Chan<T, S> {
+    fn send(&self, value: T) {
+        // Push the value
+        self.tx.push(value);
+
+        // Notify the rx task
+        self.rx_waker.wake();
+    }
+}
+
+impl<T, S> Drop for Chan<T, S> {
+    fn drop(&mut self) {
+        use super::block::Read::Value;
+
+        // Safety: the only owner of the rx fields is Chan, and being
+        // inside its own Drop means we're the last ones to touch it.
+        self.rx_fields.with_mut(|rx_fields_ptr| {
+            let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
+            unsafe { rx_fields.list.free_blocks() };
+        });
+    }
+}
+
+// ===== impl Semaphore for (::Semaphore, capacity) =====
+
+impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
+    fn add_permit(&self) {
+        self.0.release(1)
+    }
+
+    fn is_idle(&self) -> bool {
+        self.0.available_permits() == self.1
+    }
+
+    fn close(&self) {
+        self.0.close();
+    }
+
+    fn is_closed(&self) -> bool {
+        self.0.is_closed()
+    }
+}
+
+// ===== impl Semaphore for AtomicUsize =====
+
+use std::sync::atomic::Ordering::{Acquire, Release};
+use std::usize;
+
+impl Semaphore for AtomicUsize {
+    fn add_permit(&self) {
+        let prev = self.fetch_sub(2, Release);
+
+        if prev >> 1 == 0 {
+            // Something went wrong
+            process::abort();
+        }
+    }
+
+    fn is_idle(&self) -> bool {
+        self.load(Acquire) >> 1 == 0
+    }
+
+    fn close(&self) {
+        self.fetch_or(1, Release);
+    }
+
+    fn is_closed(&self) -> bool {
+        self.load(Acquire) & 1 == 1
+    }
+}
--
cgit v1.2.3
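
Tx::closed() above depends on an ordering guarantee of Notify: a Notified
future observes notify_waiters() calls made after the future's creation, so
creating the future *before* checking the closed flag leaves no window in
which a notification can be lost. A minimal sketch of the same
create-then-check pattern against tokio's public tokio::sync::Notify API
(the `closed` AtomicBool is a hypothetical stand-in for the semaphore's
closed bit; assumes the tokio `sync`, `rt`, and `macros` features):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use tokio::sync::Notify;

    #[tokio::main]
    async fn main() {
        let closed = Arc::new(AtomicBool::new(false));
        let notify = Arc::new(Notify::new());

        let waiter = {
            let closed = Arc::clone(&closed);
            let notify = Arc::clone(&notify);
            tokio::spawn(async move {
                // Create the future *before* checking the flag. A Notified
                // future sees notify_waiters() calls made after its
                // creation, so a notification landing between the check
                // and the await cannot be missed.
                let notified = notify.notified();
                if closed.load(Ordering::Acquire) {
                    return; // already closed, no need to wait
                }
                notified.await;
            })
        };

        // The "receiver dropped" side: set the flag, then wake waiters,
        // mirroring Rx::close() above.
        closed.store(true, Ordering::Release);
        notify.notify_waiters();

        waiter.await.expect("waiter task panicked");
    }

Checking the flag first and only then calling notified() would reintroduce
the race the comment in closed() warns about.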
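
The (batch_semaphore::Semaphore, usize) pairing is what backs the public
bounded channel: the semaphore holds one permit per free slot, recv()
returns a permit via add_permit, and Rx::close closes the semaphore while
still letting buffered values drain. A short sketch of those semantics
through tokio's public mpsc API (the capacity of 2 and the message values
are arbitrary):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // Bounded channel: capacity 2 means two permits.
        let (tx, mut rx) = mpsc::channel::<u32>(2);

        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap(); // a third send would wait for a permit

        // Receiving returns a permit to the senders (add_permit above).
        assert_eq!(rx.recv().await, Some(1));

        // Like Rx::close above: senders observe the closed semaphore...
        rx.close();
        assert!(tx.is_closed());

        // ...but values already in the list can still be drained; only
        // then does recv report the channel as finished.
        assert_eq!(rx.recv().await, Some(2));
        assert_eq!(rx.recv().await, None);
    }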
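
The AtomicUsize impl at the bottom backs the unbounded channel and packs two
things into one word: bit 0 is the closed flag set by close(), and the
remaining bits count in-flight messages, which is why add_permit subtracts 2
and the idle test shifts right by one. A standalone sketch of just this
encoding (the helper names are illustrative, not tokio API, and the
sender-side increment by 2 lives elsewhere in tokio; it is inlined here as
an assumption):

    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering::{Acquire, Release};

    fn is_idle(s: &AtomicUsize) -> bool {
        s.load(Acquire) >> 1 == 0 // no in-flight messages
    }

    fn is_closed(s: &AtomicUsize) -> bool {
        s.load(Acquire) & 1 == 1 // bit 0 is the closed flag
    }

    fn main() {
        let state = AtomicUsize::new(0);

        // Two sends: each in-flight message is worth 2, keeping bit 0 free.
        state.fetch_add(2, Release);
        state.fetch_add(2, Release);
        assert!(!is_idle(&state));

        // add_permit (a message was received): give back one message's
        // worth. A previous count of zero would mean more permits were
        // returned than messages sent, which the real code treats as
        // fatal (process::abort).
        let prev = state.fetch_sub(2, Release);
        assert!(prev >> 1 != 0, "permit returned with no message in flight");

        // close: set bit 0 without disturbing the message count.
        state.fetch_or(1, Release);
        assert!(is_closed(&state));
        assert!(!is_idle(&state)); // one message still queued after close
    }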