author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
commit     26a029d407be480d791972afb5975cf62c9360a6 (patch)
tree       f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/tokio/src/loom
parent     Initial commit. (diff)
download   firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
           firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1. (tag: upstream/124.0.1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/loom')
-rw-r--r--  third_party/rust/tokio/src/loom/mocked.rs                            52
-rw-r--r--  third_party/rust/tokio/src/loom/mod.rs                               14
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_u16.rs                    44
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_u32.rs                    44
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_u64.rs                    19
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_u64_as_mutex.rs           76
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_u64_native.rs              4
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_u64_static_const_new.rs   12
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_u64_static_once_cell.rs   57
-rw-r--r--  third_party/rust/tokio/src/loom/std/atomic_usize.rs                  56
-rw-r--r--  third_party/rust/tokio/src/loom/std/barrier.rs                      217
-rw-r--r--  third_party/rust/tokio/src/loom/std/mod.rs                          127
-rw-r--r--  third_party/rust/tokio/src/loom/std/mutex.rs                         37
-rw-r--r--  third_party/rust/tokio/src/loom/std/parking_lot.rs                  184
-rw-r--r--  third_party/rust/tokio/src/loom/std/unsafe_cell.rs                   16
15 files changed, 959 insertions, 0 deletions
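
The diff below vendors tokio's `loom` shim layer: all of tokio's internal synchronization goes through `crate::loom`, which `loom/mod.rs` resolves to thin `std::sync` wrappers in normal builds and to the `loom` model checker's mocked types when tests are built with the `loom` cfg. As a minimal sketch of how the rest of tokio consumes this layer — `bump` is a hypothetical helper, but the non-poisoning `lock()` it relies on is the adapter defined in `mutex.rs` below:

    use crate::loom::sync::atomic::{AtomicUsize, Ordering};
    use crate::loom::sync::Mutex; // std-, parking_lot-, or loom-backed, depending on cfg

    // Hypothetical internal helper: the same source compiles against every
    // backend, so the code path that ships is the one loom can model-check.
    fn bump(counter: &Mutex<u64>, len: &AtomicUsize) {
        *counter.lock() += 1; // no `.unwrap()`: the shim strips mutex poisoning
        len.fetch_add(1, Ordering::Release);
    }
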
diff --git a/third_party/rust/tokio/src/loom/mocked.rs b/third_party/rust/tokio/src/loom/mocked.rs
new file mode 100644
index 0000000000..56dc1a0634
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/mocked.rs
@@ -0,0 +1,52 @@
+pub(crate) use loom::*;
+
+pub(crate) mod sync {
+
+    pub(crate) use loom::sync::MutexGuard;
+
+    #[derive(Debug)]
+    pub(crate) struct Mutex<T>(loom::sync::Mutex<T>);
+
+    #[allow(dead_code)]
+    impl<T> Mutex<T> {
+        #[inline]
+        pub(crate) fn new(t: T) -> Mutex<T> {
+            Mutex(loom::sync::Mutex::new(t))
+        }
+
+        #[inline]
+        pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
+            self.0.lock().unwrap()
+        }
+
+        #[inline]
+        pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
+            self.0.try_lock().ok()
+        }
+    }
+    pub(crate) use loom::sync::*;
+
+    pub(crate) mod atomic {
+        pub(crate) use loom::sync::atomic::*;
+
+        // TODO: implement a loom version
+        pub(crate) type StaticAtomicU64 = std::sync::atomic::AtomicU64;
+    }
+}
+
+pub(crate) mod rand {
+    pub(crate) fn seed() -> u64 {
+        1
+    }
+}
+
+pub(crate) mod sys {
+    pub(crate) fn num_cpus() -> usize {
+        2
+    }
+}
+
+pub(crate) mod thread {
+    pub use loom::lazy_static::AccessError;
+    pub use loom::thread::*;
+}
diff --git a/third_party/rust/tokio/src/loom/mod.rs b/third_party/rust/tokio/src/loom/mod.rs
new file mode 100644
index 0000000000..5957b5377d
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/mod.rs
@@ -0,0 +1,14 @@
+//! This module abstracts over `loom` and `std::sync` depending on whether we
+//! are running tests or not.
+
+#![allow(unused)]
+
+#[cfg(not(all(test, loom)))]
+mod std;
+#[cfg(not(all(test, loom)))]
+pub(crate) use self::std::*;
+
+#[cfg(all(test, loom))]
+mod mocked;
+#[cfg(all(test, loom))]
+pub(crate) use self::mocked::*;
diff --git a/third_party/rust/tokio/src/loom/std/atomic_u16.rs b/third_party/rust/tokio/src/loom/std/atomic_u16.rs
new file mode 100644
index 0000000000..c9e105c193
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_u16.rs
@@ -0,0 +1,44 @@
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicU16` providing an additional `unsync_load` function.
+pub(crate) struct AtomicU16 {
+    inner: UnsafeCell<std::sync::atomic::AtomicU16>,
+}
+
+unsafe impl Send for AtomicU16 {}
+unsafe impl Sync for AtomicU16 {}
+
+impl AtomicU16 {
+    pub(crate) const fn new(val: u16) -> AtomicU16 {
+        let inner = UnsafeCell::new(std::sync::atomic::AtomicU16::new(val));
+        AtomicU16 { inner }
+    }
+
+    /// Performs an unsynchronized load.
+    ///
+    /// # Safety
+    ///
+    /// All mutations must have happened before the unsynchronized load.
+    /// Additionally, there must be no concurrent mutations.
+    pub(crate) unsafe fn unsync_load(&self) -> u16 {
+        core::ptr::read(self.inner.get() as *const u16)
+    }
+}
+
+impl Deref for AtomicU16 {
+    type Target = std::sync::atomic::AtomicU16;
+
+    fn deref(&self) -> &Self::Target {
+        // safety: it is always safe to access `&self` fns on the inner value as
+        // we never perform unsafe mutations.
+        unsafe { &*self.inner.get() }
+    }
+}
+
+impl fmt::Debug for AtomicU16 {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(fmt)
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/atomic_u32.rs b/third_party/rust/tokio/src/loom/std/atomic_u32.rs
new file mode 100644
index 0000000000..ee0d2d3805
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_u32.rs
@@ -0,0 +1,44 @@
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicU32` providing an additional `unsync_load` function.
+pub(crate) struct AtomicU32 {
+    inner: UnsafeCell<std::sync::atomic::AtomicU32>,
+}
+
+unsafe impl Send for AtomicU32 {}
+unsafe impl Sync for AtomicU32 {}
+
+impl AtomicU32 {
+    pub(crate) const fn new(val: u32) -> AtomicU32 {
+        let inner = UnsafeCell::new(std::sync::atomic::AtomicU32::new(val));
+        AtomicU32 { inner }
+    }
+
+    /// Performs an unsynchronized load.
+    ///
+    /// # Safety
+    ///
+    /// All mutations must have happened before the unsynchronized load.
+    /// Additionally, there must be no concurrent mutations.
+    pub(crate) unsafe fn unsync_load(&self) -> u32 {
+        core::ptr::read(self.inner.get() as *const u32)
+    }
+}
+
+impl Deref for AtomicU32 {
+    type Target = std::sync::atomic::AtomicU32;
+
+    fn deref(&self) -> &Self::Target {
+        // safety: it is always safe to access `&self` fns on the inner value as
+        // we never perform unsafe mutations.
+        unsafe { &*self.inner.get() }
+    }
+}
+
+impl fmt::Debug for AtomicU32 {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(fmt)
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/atomic_u64.rs b/third_party/rust/tokio/src/loom/std/atomic_u64.rs
new file mode 100644
index 0000000000..ce391be3e1
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_u64.rs
@@ -0,0 +1,19 @@
+//! Implementation of an atomic u64 cell. On 64 bit platforms, this is a
+//! re-export of `AtomicU64`. On 32 bit platforms, this is implemented using a
+//! `Mutex`.
+
+// `AtomicU64` can only be used on targets with `target_has_atomic` is 64 or greater.
+// Once `cfg_target_has_atomic` feature is stable, we can replace it with
+// `#[cfg(target_has_atomic = "64")]`.
+// Refs: https://github.com/rust-lang/rust/tree/master/src/librustc_target
+cfg_has_atomic_u64! {
+    #[path = "atomic_u64_native.rs"]
+    mod imp;
+}
+
+cfg_not_has_atomic_u64! {
+    #[path = "atomic_u64_as_mutex.rs"]
+    mod imp;
+}
+
+pub(crate) use imp::{AtomicU64, StaticAtomicU64};
diff --git a/third_party/rust/tokio/src/loom/std/atomic_u64_as_mutex.rs b/third_party/rust/tokio/src/loom/std/atomic_u64_as_mutex.rs
new file mode 100644
index 0000000000..9b3b6fac68
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_u64_as_mutex.rs
@@ -0,0 +1,76 @@
+use crate::loom::sync::Mutex;
+use std::sync::atomic::Ordering;
+
+cfg_has_const_mutex_new! {
+    #[path = "atomic_u64_static_const_new.rs"]
+    mod static_macro;
+}
+
+cfg_not_has_const_mutex_new! {
+    #[path = "atomic_u64_static_once_cell.rs"]
+    mod static_macro;
+}
+
+pub(crate) use static_macro::StaticAtomicU64;
+
+#[derive(Debug)]
+pub(crate) struct AtomicU64 {
+    inner: Mutex<u64>,
+}
+
+impl AtomicU64 {
+    pub(crate) fn load(&self, _: Ordering) -> u64 {
+        *self.inner.lock()
+    }
+
+    pub(crate) fn store(&self, val: u64, _: Ordering) {
+        *self.inner.lock() = val;
+    }
+
+    pub(crate) fn fetch_add(&self, val: u64, _: Ordering) -> u64 {
+        let mut lock = self.inner.lock();
+        let prev = *lock;
+        *lock = prev + val;
+        prev
+    }
+
+    pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 {
+        let mut lock = self.inner.lock();
+        let prev = *lock;
+        *lock = prev | val;
+        prev
+    }
+
+    pub(crate) fn compare_exchange(
+        &self,
+        current: u64,
+        new: u64,
+        _success: Ordering,
+        _failure: Ordering,
+    ) -> Result<u64, u64> {
+        let mut lock = self.inner.lock();
+
+        if *lock == current {
+            *lock = new;
+            Ok(current)
+        } else {
+            Err(*lock)
+        }
+    }
+
+    pub(crate) fn compare_exchange_weak(
+        &self,
+        current: u64,
+        new: u64,
+        success: Ordering,
+        failure: Ordering,
+    ) -> Result<u64, u64> {
+        self.compare_exchange(current, new, success, failure)
+    }
+}
+
+impl Default for AtomicU64 {
+    fn default() -> AtomicU64 {
+        AtomicU64::new(u64::default())
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/atomic_u64_native.rs b/third_party/rust/tokio/src/loom/std/atomic_u64_native.rs
new file mode 100644
index 0000000000..08adb28629
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_u64_native.rs
@@ -0,0 +1,4 @@
+pub(crate) use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Alias `AtomicU64` to `StaticAtomicU64`
+pub(crate) type StaticAtomicU64 = AtomicU64;
diff --git a/third_party/rust/tokio/src/loom/std/atomic_u64_static_const_new.rs b/third_party/rust/tokio/src/loom/std/atomic_u64_static_const_new.rs
new file mode 100644
index 0000000000..a4215342b6
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_u64_static_const_new.rs
@@ -0,0 +1,12 @@
+use super::AtomicU64;
+use crate::loom::sync::Mutex;
+
+pub(crate) type StaticAtomicU64 = AtomicU64;
+
+impl AtomicU64 {
+    pub(crate) const fn new(val: u64) -> Self {
+        Self {
+            inner: Mutex::const_new(val),
+        }
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/atomic_u64_static_once_cell.rs b/third_party/rust/tokio/src/loom/std/atomic_u64_static_once_cell.rs
new file mode 100644
index 0000000000..40c6172a52
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_u64_static_once_cell.rs
@@ -0,0 +1,57 @@
+use super::AtomicU64;
+use crate::loom::sync::{atomic::Ordering, Mutex};
+use crate::util::once_cell::OnceCell;
+
+pub(crate) struct StaticAtomicU64 {
+    init: u64,
+    cell: OnceCell<Mutex<u64>>,
+}
+
+impl AtomicU64 {
+    pub(crate) fn new(val: u64) -> Self {
+        Self {
+            inner: Mutex::new(val),
+        }
+    }
+}
+
+impl StaticAtomicU64 {
+    pub(crate) const fn new(val: u64) -> StaticAtomicU64 {
+        StaticAtomicU64 {
+            init: val,
+            cell: OnceCell::new(),
+        }
+    }
+
+    pub(crate) fn load(&self, order: Ordering) -> u64 {
+        *self.inner().lock()
+    }
+
+    pub(crate) fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
+        let mut lock = self.inner().lock();
+        let prev = *lock;
+        *lock = prev + val;
+        prev
+    }
+
+    pub(crate) fn compare_exchange_weak(
+        &self,
+        current: u64,
+        new: u64,
+        _success: Ordering,
+        _failure: Ordering,
+    ) -> Result<u64, u64> {
+        let mut lock = self.inner().lock();
+
+        if *lock == current {
+            *lock = new;
+            Ok(current)
+        } else {
+            Err(*lock)
+        }
+    }
+
+    fn inner(&self) -> &Mutex<u64> {
+        self.cell.get(|| Mutex::new(self.init))
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/atomic_usize.rs b/third_party/rust/tokio/src/loom/std/atomic_usize.rs
new file mode 100644
index 0000000000..c5503a2c12
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/atomic_usize.rs
@@ -0,0 +1,56 @@
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ops;
+
+/// `AtomicUsize` providing an additional `unsync_load` function.
+pub(crate) struct AtomicUsize {
+    inner: UnsafeCell<std::sync::atomic::AtomicUsize>,
+}
+
+unsafe impl Send for AtomicUsize {}
+unsafe impl Sync for AtomicUsize {}
+
+impl AtomicUsize {
+    pub(crate) const fn new(val: usize) -> AtomicUsize {
+        let inner = UnsafeCell::new(std::sync::atomic::AtomicUsize::new(val));
+        AtomicUsize { inner }
+    }
+
+    /// Performs an unsynchronized load.
+    ///
+    /// # Safety
+    ///
+    /// All mutations must have happened before the unsynchronized load.
+    /// Additionally, there must be no concurrent mutations.
+    pub(crate) unsafe fn unsync_load(&self) -> usize {
+        core::ptr::read(self.inner.get() as *const usize)
+    }
+
+    pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R {
+        // safety: we have mutable access
+        f(unsafe { (*self.inner.get()).get_mut() })
+    }
+}
+
+impl ops::Deref for AtomicUsize {
+    type Target = std::sync::atomic::AtomicUsize;
+
+    fn deref(&self) -> &Self::Target {
+        // safety: it is always safe to access `&self` fns on the inner value as
+        // we never perform unsafe mutations.
+        unsafe { &*self.inner.get() }
+    }
+}
+
+impl ops::DerefMut for AtomicUsize {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        // safety: we hold `&mut self`
+        unsafe { &mut *self.inner.get() }
+    }
+}
+
+impl fmt::Debug for AtomicUsize {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        (**self).fmt(fmt)
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/barrier.rs b/third_party/rust/tokio/src/loom/std/barrier.rs
new file mode 100644
index 0000000000..a3f0ca0ab6
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/barrier.rs
@@ -0,0 +1,217 @@
+//! A `Barrier` that provides `wait_timeout`.
+//!
+//! This implementation mirrors that of the Rust standard library.
+
+use crate::loom::sync::{Condvar, Mutex};
+use std::fmt;
+use std::time::{Duration, Instant};
+
+/// A barrier enables multiple threads to synchronize the beginning
+/// of some computation.
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::{Arc, Barrier};
+/// use std::thread;
+///
+/// let mut handles = Vec::with_capacity(10);
+/// let barrier = Arc::new(Barrier::new(10));
+/// for _ in 0..10 {
+///     let c = Arc::clone(&barrier);
+///     // The same messages will be printed together.
+///     // You will NOT see any interleaving.
+///     handles.push(thread::spawn(move|| {
+///         println!("before wait");
+///         c.wait();
+///         println!("after wait");
+///     }));
+/// }
+/// // Wait for other threads to finish.
+/// for handle in handles {
+///     handle.join().unwrap();
+/// }
+/// ```
+pub(crate) struct Barrier {
+    lock: Mutex<BarrierState>,
+    cvar: Condvar,
+    num_threads: usize,
+}
+
+// The inner state of a double barrier
+struct BarrierState {
+    count: usize,
+    generation_id: usize,
+}
+
+/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads
+/// in the [`Barrier`] have rendezvoused.
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::Barrier;
+///
+/// let barrier = Barrier::new(1);
+/// let barrier_wait_result = barrier.wait();
+/// ```
+pub(crate) struct BarrierWaitResult(bool);
+
+impl fmt::Debug for Barrier {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Barrier").finish_non_exhaustive()
+    }
+}
+
+impl Barrier {
+    /// Creates a new barrier that can block a given number of threads.
+    ///
+    /// A barrier will block `n`-1 threads which call [`wait()`] and then wake
+    /// up all threads at once when the `n`th thread calls [`wait()`].
+    ///
+    /// [`wait()`]: Barrier::wait
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Barrier;
+    ///
+    /// let barrier = Barrier::new(10);
+    /// ```
+    #[must_use]
+    pub(crate) fn new(n: usize) -> Barrier {
+        Barrier {
+            lock: Mutex::new(BarrierState {
+                count: 0,
+                generation_id: 0,
+            }),
+            cvar: Condvar::new(),
+            num_threads: n,
+        }
+    }
+
+    /// Blocks the current thread until all threads have rendezvoused here.
+    ///
+    /// Barriers are re-usable after all threads have rendezvoused once, and can
+    /// be used continuously.
+    ///
+    /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that
+    /// returns `true` from [`BarrierWaitResult::is_leader()`] when returning
+    /// from this function, and all other threads will receive a result that
+    /// will return `false` from [`BarrierWaitResult::is_leader()`].
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::{Arc, Barrier};
+    /// use std::thread;
+    ///
+    /// let mut handles = Vec::with_capacity(10);
+    /// let barrier = Arc::new(Barrier::new(10));
+    /// for _ in 0..10 {
+    ///     let c = Arc::clone(&barrier);
+    ///     // The same messages will be printed together.
+    ///     // You will NOT see any interleaving.
+    ///     handles.push(thread::spawn(move|| {
+    ///         println!("before wait");
+    ///         c.wait();
+    ///         println!("after wait");
+    ///     }));
+    /// }
+    /// // Wait for other threads to finish.
+    /// for handle in handles {
+    ///     handle.join().unwrap();
+    /// }
+    /// ```
+    pub(crate) fn wait(&self) -> BarrierWaitResult {
+        let mut lock = self.lock.lock();
+        let local_gen = lock.generation_id;
+        lock.count += 1;
+        if lock.count < self.num_threads {
+            // We need a while loop to guard against spurious wakeups.
+            // https://en.wikipedia.org/wiki/Spurious_wakeup
+            while local_gen == lock.generation_id {
+                lock = self.cvar.wait(lock).unwrap();
+            }
+            BarrierWaitResult(false)
+        } else {
+            lock.count = 0;
+            lock.generation_id = lock.generation_id.wrapping_add(1);
+            self.cvar.notify_all();
+            BarrierWaitResult(true)
+        }
+    }
+
+    /// Blocks the current thread until all threads have rendezvoused here for
+    /// at most `timeout` duration.
+    pub(crate) fn wait_timeout(&self, timeout: Duration) -> Option<BarrierWaitResult> {
+        // This implementation mirrors `wait`, but with each blocking operation
+        // replaced by a timeout-amenable alternative.
+
+        let deadline = Instant::now() + timeout;
+
+        // Acquire `self.lock` with at most `timeout` duration.
+        let mut lock = loop {
+            if let Some(guard) = self.lock.try_lock() {
+                break guard;
+            } else if Instant::now() > deadline {
+                return None;
+            } else {
+                std::thread::yield_now();
+            }
+        };
+
+        // Shrink the `timeout` to account for the time taken to acquire `lock`.
+        let timeout = deadline.saturating_duration_since(Instant::now());
+
+        let local_gen = lock.generation_id;
+        lock.count += 1;
+        if lock.count < self.num_threads {
+            // We need a while loop to guard against spurious wakeups.
+            // https://en.wikipedia.org/wiki/Spurious_wakeup
+            while local_gen == lock.generation_id {
+                let (guard, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap();
+                lock = guard;
+                if timeout_result.timed_out() {
+                    return None;
+                }
+            }
+            Some(BarrierWaitResult(false))
+        } else {
+            lock.count = 0;
+            lock.generation_id = lock.generation_id.wrapping_add(1);
+            self.cvar.notify_all();
+            Some(BarrierWaitResult(true))
+        }
+    }
+}
+
+impl fmt::Debug for BarrierWaitResult {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("BarrierWaitResult")
+            .field("is_leader", &self.is_leader())
+            .finish()
+    }
+}
+
+impl BarrierWaitResult {
+    /// Returns `true` if this thread is the "leader thread" for the call to
+    /// [`Barrier::wait()`].
+    ///
+    /// Only one thread will have `true` returned from their result, all other
+    /// threads will have `false` returned.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Barrier;
+    ///
+    /// let barrier = Barrier::new(1);
+    /// let barrier_wait_result = barrier.wait();
+    /// println!("{:?}", barrier_wait_result.is_leader());
+    /// ```
+    #[must_use]
+    pub(crate) fn is_leader(&self) -> bool {
+        self.0
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/mod.rs b/third_party/rust/tokio/src/loom/std/mod.rs
new file mode 100644
index 0000000000..0a732791f7
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/mod.rs
@@ -0,0 +1,127 @@
+#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]
+
+mod atomic_u16;
+mod atomic_u32;
+mod atomic_u64;
+mod atomic_usize;
+mod barrier;
+mod mutex;
+#[cfg(feature = "parking_lot")]
+mod parking_lot;
+mod unsafe_cell;
+
+pub(crate) mod cell {
+    pub(crate) use super::unsafe_cell::UnsafeCell;
+}
+
+#[cfg(any(
+    feature = "net",
+    feature = "process",
+    feature = "signal",
+    feature = "sync",
+))]
+pub(crate) mod future {
+    pub(crate) use crate::sync::AtomicWaker;
+}
+
+pub(crate) mod hint {
+    pub(crate) use std::hint::spin_loop;
+}
+
+pub(crate) mod rand {
+    use std::collections::hash_map::RandomState;
+    use std::hash::{BuildHasher, Hash, Hasher};
+    use std::sync::atomic::AtomicU32;
+    use std::sync::atomic::Ordering::Relaxed;
+
+    static COUNTER: AtomicU32 = AtomicU32::new(1);
+
+    pub(crate) fn seed() -> u64 {
+        let rand_state = RandomState::new();
+
+        let mut hasher = rand_state.build_hasher();
+
+        // Hash some unique-ish data to generate some new state
+        COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
+
+        // Get the seed
+        hasher.finish()
+    }
+}
+
+pub(crate) mod sync {
+    pub(crate) use std::sync::{Arc, Weak};
+
+    // Below, make sure all the feature-influenced types are exported for
+    // internal use. Note however that some are not _currently_ named by
+    // consuming code.
+
+    #[cfg(feature = "parking_lot")]
+    #[allow(unused_imports)]
+    pub(crate) use crate::loom::std::parking_lot::{
+        Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult,
+    };
+
+    #[cfg(not(feature = "parking_lot"))]
+    #[allow(unused_imports)]
+    pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult};
+
+    #[cfg(not(feature = "parking_lot"))]
+    pub(crate) use crate::loom::std::mutex::Mutex;
+
+    pub(crate) mod atomic {
+        pub(crate) use crate::loom::std::atomic_u16::AtomicU16;
+        pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
+        pub(crate) use crate::loom::std::atomic_u64::{AtomicU64, StaticAtomicU64};
+        pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
+
+        pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering};
+    }
+
+    pub(crate) use super::barrier::Barrier;
+}
+
+pub(crate) mod sys {
+    #[cfg(feature = "rt-multi-thread")]
+    pub(crate) fn num_cpus() -> usize {
+        const ENV_WORKER_THREADS: &str = "TOKIO_WORKER_THREADS";
+
+        match std::env::var(ENV_WORKER_THREADS) {
+            Ok(s) => {
+                let n = s.parse().unwrap_or_else(|e| {
+                    panic!(
+                        "\"{}\" must be usize, error: {}, value: {}",
+                        ENV_WORKER_THREADS, e, s
+                    )
+                });
+                assert!(n > 0, "\"{}\" cannot be set to 0", ENV_WORKER_THREADS);
+                n
+            }
+            Err(std::env::VarError::NotPresent) => usize::max(1, num_cpus::get()),
+            Err(std::env::VarError::NotUnicode(e)) => {
+                panic!(
+                    "\"{}\" must be valid unicode, error: {:?}",
+                    ENV_WORKER_THREADS, e
+                )
+            }
+        }
+    }
+
+    #[cfg(not(feature = "rt-multi-thread"))]
+    pub(crate) fn num_cpus() -> usize {
+        1
+    }
+}
+
+pub(crate) mod thread {
+    #[inline]
+    pub(crate) fn yield_now() {
+        std::hint::spin_loop();
+    }
+
+    #[allow(unused_imports)]
+    pub(crate) use std::thread::{
+        current, panicking, park, park_timeout, sleep, spawn, AccessError, Builder, JoinHandle,
+        LocalKey, Result, Thread, ThreadId,
+    };
+}
diff --git a/third_party/rust/tokio/src/loom/std/mutex.rs b/third_party/rust/tokio/src/loom/std/mutex.rs
new file mode 100644
index 0000000000..076f786110
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/mutex.rs
@@ -0,0 +1,37 @@
+use std::sync::{self, MutexGuard, TryLockError};
+
+/// Adapter for `std::Mutex` that removes the poisoning aspects
+/// from its api.
+#[derive(Debug)]
+pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);
+
+#[allow(dead_code)]
+impl<T> Mutex<T> {
+    #[inline]
+    pub(crate) fn new(t: T) -> Mutex<T> {
+        Mutex(sync::Mutex::new(t))
+    }
+
+    #[inline]
+    #[cfg(not(tokio_no_const_mutex_new))]
+    pub(crate) const fn const_new(t: T) -> Mutex<T> {
+        Mutex(sync::Mutex::new(t))
+    }
+
+    #[inline]
+    pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
+        match self.0.lock() {
+            Ok(guard) => guard,
+            Err(p_err) => p_err.into_inner(),
+        }
+    }
+
+    #[inline]
+    pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
+        match self.0.try_lock() {
+            Ok(guard) => Some(guard),
+            Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
+            Err(TryLockError::WouldBlock) => None,
+        }
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/parking_lot.rs b/third_party/rust/tokio/src/loom/std/parking_lot.rs
new file mode 100644
index 0000000000..e3af258d11
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/parking_lot.rs
@@ -0,0 +1,184 @@
+//! A minimal adaption of the `parking_lot` synchronization primitives to the
+//! equivalent `std::sync` types.
+//!
+//! This can be extended to additional types/methods as required.
+
+use std::fmt;
+use std::marker::PhantomData;
+use std::ops::{Deref, DerefMut};
+use std::sync::LockResult;
+use std::time::Duration;
+
+// All types in this file are marked with PhantomData to ensure that
+// parking_lot's send_guard feature does not leak through and affect when Tokio
+// types are Send.
+//
+// See <https://github.com/tokio-rs/tokio/pull/4359> for more info.
+
+// Types that do not need wrapping
+pub(crate) use parking_lot::WaitTimeoutResult;
+
+#[derive(Debug)]
+pub(crate) struct Mutex<T: ?Sized>(PhantomData<std::sync::Mutex<T>>, parking_lot::Mutex<T>);
+
+#[derive(Debug)]
+pub(crate) struct RwLock<T>(PhantomData<std::sync::RwLock<T>>, parking_lot::RwLock<T>);
+
+#[derive(Debug)]
+pub(crate) struct Condvar(PhantomData<std::sync::Condvar>, parking_lot::Condvar);
+
+#[derive(Debug)]
+pub(crate) struct MutexGuard<'a, T: ?Sized>(
+    PhantomData<std::sync::MutexGuard<'a, T>>,
+    parking_lot::MutexGuard<'a, T>,
+);
+
+#[derive(Debug)]
+pub(crate) struct RwLockReadGuard<'a, T: ?Sized>(
+    PhantomData<std::sync::RwLockReadGuard<'a, T>>,
+    parking_lot::RwLockReadGuard<'a, T>,
+);
+
+#[derive(Debug)]
+pub(crate) struct RwLockWriteGuard<'a, T: ?Sized>(
+    PhantomData<std::sync::RwLockWriteGuard<'a, T>>,
+    parking_lot::RwLockWriteGuard<'a, T>,
+);
+
+impl<T> Mutex<T> {
+    #[inline]
+    pub(crate) fn new(t: T) -> Mutex<T> {
+        Mutex(PhantomData, parking_lot::Mutex::new(t))
+    }
+
+    #[inline]
+    #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
+    #[cfg_attr(docsrs, doc(cfg(all(feature = "parking_lot",))))]
+    pub(crate) const fn const_new(t: T) -> Mutex<T> {
+        Mutex(PhantomData, parking_lot::const_mutex(t))
+    }
+
+    #[inline]
+    pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
+        MutexGuard(PhantomData, self.1.lock())
+    }
+
+    #[inline]
+    pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
+        self.1
+            .try_lock()
+            .map(|guard| MutexGuard(PhantomData, guard))
+    }
+
+    #[inline]
+    pub(crate) fn get_mut(&mut self) -> &mut T {
+        self.1.get_mut()
+    }
+
+    // Note: Additional methods `is_poisoned` and `into_inner`, can be
+    // provided here as needed.
+}
+
+impl<'a, T: ?Sized> Deref for MutexGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        self.1.deref()
+    }
+}
+
+impl<'a, T: ?Sized> DerefMut for MutexGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        self.1.deref_mut()
+    }
+}
+
+impl<T> RwLock<T> {
+    pub(crate) fn new(t: T) -> RwLock<T> {
+        RwLock(PhantomData, parking_lot::RwLock::new(t))
+    }
+
+    pub(crate) fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
+        Ok(RwLockReadGuard(PhantomData, self.1.read()))
+    }
+
+    pub(crate) fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> {
+        Ok(RwLockWriteGuard(PhantomData, self.1.write()))
+    }
+}
+
+impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        self.1.deref()
+    }
+}
+
+impl<'a, T: ?Sized> Deref for RwLockWriteGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        self.1.deref()
+    }
+}
+
+impl<'a, T: ?Sized> DerefMut for RwLockWriteGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        self.1.deref_mut()
+    }
+}
+
+impl Condvar {
+    #[inline]
+    pub(crate) fn new() -> Condvar {
+        Condvar(PhantomData, parking_lot::Condvar::new())
+    }
+
+    #[inline]
+    pub(crate) fn notify_one(&self) {
+        self.1.notify_one();
+    }
+
+    #[inline]
+    pub(crate) fn notify_all(&self) {
+        self.1.notify_all();
+    }
+
+    #[inline]
+    pub(crate) fn wait<'a, T>(
+        &self,
+        mut guard: MutexGuard<'a, T>,
+    ) -> LockResult<MutexGuard<'a, T>> {
+        self.1.wait(&mut guard.1);
+        Ok(guard)
+    }
+
+    #[inline]
+    pub(crate) fn wait_timeout<'a, T>(
+        &self,
+        mut guard: MutexGuard<'a, T>,
+        timeout: Duration,
+    ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
+        let wtr = self.1.wait_for(&mut guard.1, timeout);
+        Ok((guard, wtr))
+    }
+
+    // Note: Additional methods `wait_timeout_ms`, `wait_timeout_until`,
+    // `wait_until` can be provided here as needed.
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'a, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt::Display::fmt(&self.1, f)
+    }
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'a, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt::Display::fmt(&self.1, f)
+    }
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'a, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt::Display::fmt(&self.1, f)
+    }
+}
diff --git a/third_party/rust/tokio/src/loom/std/unsafe_cell.rs b/third_party/rust/tokio/src/loom/std/unsafe_cell.rs
new file mode 100644
index 0000000000..66c1d7943e
--- /dev/null
+++ b/third_party/rust/tokio/src/loom/std/unsafe_cell.rs
@@ -0,0 +1,16 @@
+#[derive(Debug)]
+pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
+
+impl<T> UnsafeCell<T> {
+    pub(crate) const fn new(data: T) -> UnsafeCell<T> {
+        UnsafeCell(std::cell::UnsafeCell::new(data))
+    }
+
+    pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
+        f(self.0.get())
+    }
+
+    pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
+        f(self.0.get())
+    }
+}
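
For context on why the indirection exists at all: when tokio's tests are built with RUSTFLAGS="--cfg loom", the `mocked.rs` backend above swaps in the `loom` crate, which runs a test body under every feasible thread interleaving instead of just one. A minimal, hypothetical loom test (not part of this commit) sketching that mechanism:

    // Run with: RUSTFLAGS="--cfg loom" cargo test --release
    #[cfg(loom)]
    #[test]
    fn loom_counter() {
        loom::model(|| {
            use loom::sync::atomic::{AtomicUsize, Ordering};
            use loom::sync::Arc;

            let a = Arc::new(AtomicUsize::new(0));
            let b = a.clone();
            let t = loom::thread::spawn(move || {
                b.fetch_add(1, Ordering::SeqCst);
            });
            a.fetch_add(1, Ordering::SeqCst);
            t.join().unwrap();
            // loom fails the test if ANY explored interleaving violates this.
            assert_eq!(2, a.load(Ordering::SeqCst));
        });
    }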