diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/loom/std | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/loom/std')
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_ptr.rs | 34 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u16.rs | 4 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u32.rs | 12 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u64.rs | 72 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs | 76 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u64_native.rs | 4 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs | 12 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs | 57 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_u8.rs | 34 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/atomic_usize.rs | 4 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/barrier.rs | 217 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/mod.rs | 53 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/mutex.rs | 8 | ||||
-rw-r--r-- | vendor/tokio/src/loom/std/parking_lot.rs | 118 |
14 files changed, 536 insertions, 169 deletions
diff --git a/vendor/tokio/src/loom/std/atomic_ptr.rs b/vendor/tokio/src/loom/std/atomic_ptr.rs deleted file mode 100644 index 236645f03..000000000 --- a/vendor/tokio/src/loom/std/atomic_ptr.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::fmt; -use std::ops::{Deref, DerefMut}; - -/// `AtomicPtr` providing an additional `load_unsync` function. -pub(crate) struct AtomicPtr<T> { - inner: std::sync::atomic::AtomicPtr<T>, -} - -impl<T> AtomicPtr<T> { - pub(crate) fn new(ptr: *mut T) -> AtomicPtr<T> { - let inner = std::sync::atomic::AtomicPtr::new(ptr); - AtomicPtr { inner } - } -} - -impl<T> Deref for AtomicPtr<T> { - type Target = std::sync::atomic::AtomicPtr<T>; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl<T> DerefMut for AtomicPtr<T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -impl<T> fmt::Debug for AtomicPtr<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - self.deref().fmt(fmt) - } -} diff --git a/vendor/tokio/src/loom/std/atomic_u16.rs b/vendor/tokio/src/loom/std/atomic_u16.rs index c1c531208..c9e105c19 100644 --- a/vendor/tokio/src/loom/std/atomic_u16.rs +++ b/vendor/tokio/src/loom/std/atomic_u16.rs @@ -2,7 +2,7 @@ use std::cell::UnsafeCell; use std::fmt; use std::ops::Deref; -/// `AtomicU16` providing an additional `load_unsync` function. +/// `AtomicU16` providing an additional `unsync_load` function. pub(crate) struct AtomicU16 { inner: UnsafeCell<std::sync::atomic::AtomicU16>, } @@ -23,7 +23,7 @@ impl AtomicU16 { /// All mutations must have happened before the unsynchronized load. /// Additionally, there must be no concurrent mutations. pub(crate) unsafe fn unsync_load(&self) -> u16 { - *(*self.inner.get()).get_mut() + core::ptr::read(self.inner.get() as *const u16) } } diff --git a/vendor/tokio/src/loom/std/atomic_u32.rs b/vendor/tokio/src/loom/std/atomic_u32.rs index 61f95fb30..ee0d2d380 100644 --- a/vendor/tokio/src/loom/std/atomic_u32.rs +++ b/vendor/tokio/src/loom/std/atomic_u32.rs @@ -2,7 +2,7 @@ use std::cell::UnsafeCell; use std::fmt; use std::ops::Deref; -/// `AtomicU32` providing an additional `load_unsync` function. +/// `AtomicU32` providing an additional `unsync_load` function. pub(crate) struct AtomicU32 { inner: UnsafeCell<std::sync::atomic::AtomicU32>, } @@ -15,6 +15,16 @@ impl AtomicU32 { let inner = UnsafeCell::new(std::sync::atomic::AtomicU32::new(val)); AtomicU32 { inner } } + + /// Performs an unsynchronized load. + /// + /// # Safety + /// + /// All mutations must have happened before the unsynchronized load. + /// Additionally, there must be no concurrent mutations. + pub(crate) unsafe fn unsync_load(&self) -> u32 { + core::ptr::read(self.inner.get() as *const u32) + } } impl Deref for AtomicU32 { diff --git a/vendor/tokio/src/loom/std/atomic_u64.rs b/vendor/tokio/src/loom/std/atomic_u64.rs index 7eb457a24..ce391be3e 100644 --- a/vendor/tokio/src/loom/std/atomic_u64.rs +++ b/vendor/tokio/src/loom/std/atomic_u64.rs @@ -2,74 +2,18 @@ //! re-export of `AtomicU64`. On 32 bit platforms, this is implemented using a //! `Mutex`. -pub(crate) use self::imp::AtomicU64; - // `AtomicU64` can only be used on targets with `target_has_atomic` is 64 or greater. // Once `cfg_target_has_atomic` feature is stable, we can replace it with // `#[cfg(target_has_atomic = "64")]`. // Refs: https://github.com/rust-lang/rust/tree/master/src/librustc_target -#[cfg(not(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc")))] -mod imp { - pub(crate) use std::sync::atomic::AtomicU64; +cfg_has_atomic_u64! { + #[path = "atomic_u64_native.rs"] + mod imp; } -#[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))] -mod imp { - use crate::loom::sync::Mutex; - use std::sync::atomic::Ordering; - - #[derive(Debug)] - pub(crate) struct AtomicU64 { - inner: Mutex<u64>, - } - - impl AtomicU64 { - pub(crate) fn new(val: u64) -> Self { - Self { - inner: Mutex::new(val), - } - } - - pub(crate) fn load(&self, _: Ordering) -> u64 { - *self.inner.lock() - } - - pub(crate) fn store(&self, val: u64, _: Ordering) { - *self.inner.lock() = val; - } - - pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 { - let mut lock = self.inner.lock(); - let prev = *lock; - *lock = prev | val; - prev - } - - pub(crate) fn compare_exchange( - &self, - current: u64, - new: u64, - _success: Ordering, - _failure: Ordering, - ) -> Result<u64, u64> { - let mut lock = self.inner.lock(); - - if *lock == current { - *lock = new; - Ok(current) - } else { - Err(*lock) - } - } - - pub(crate) fn compare_exchange_weak( - &self, - current: u64, - new: u64, - success: Ordering, - failure: Ordering, - ) -> Result<u64, u64> { - self.compare_exchange(current, new, success, failure) - } - } +cfg_not_has_atomic_u64! { + #[path = "atomic_u64_as_mutex.rs"] + mod imp; } + +pub(crate) use imp::{AtomicU64, StaticAtomicU64}; diff --git a/vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs b/vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs new file mode 100644 index 000000000..9b3b6fac6 --- /dev/null +++ b/vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs @@ -0,0 +1,76 @@ +use crate::loom::sync::Mutex; +use std::sync::atomic::Ordering; + +cfg_has_const_mutex_new! { + #[path = "atomic_u64_static_const_new.rs"] + mod static_macro; +} + +cfg_not_has_const_mutex_new! { + #[path = "atomic_u64_static_once_cell.rs"] + mod static_macro; +} + +pub(crate) use static_macro::StaticAtomicU64; + +#[derive(Debug)] +pub(crate) struct AtomicU64 { + inner: Mutex<u64>, +} + +impl AtomicU64 { + pub(crate) fn load(&self, _: Ordering) -> u64 { + *self.inner.lock() + } + + pub(crate) fn store(&self, val: u64, _: Ordering) { + *self.inner.lock() = val; + } + + pub(crate) fn fetch_add(&self, val: u64, _: Ordering) -> u64 { + let mut lock = self.inner.lock(); + let prev = *lock; + *lock = prev + val; + prev + } + + pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 { + let mut lock = self.inner.lock(); + let prev = *lock; + *lock = prev | val; + prev + } + + pub(crate) fn compare_exchange( + &self, + current: u64, + new: u64, + _success: Ordering, + _failure: Ordering, + ) -> Result<u64, u64> { + let mut lock = self.inner.lock(); + + if *lock == current { + *lock = new; + Ok(current) + } else { + Err(*lock) + } + } + + pub(crate) fn compare_exchange_weak( + &self, + current: u64, + new: u64, + success: Ordering, + failure: Ordering, + ) -> Result<u64, u64> { + self.compare_exchange(current, new, success, failure) + } +} + +impl Default for AtomicU64 { + fn default() -> AtomicU64 { + AtomicU64::new(u64::default()) + } +} diff --git a/vendor/tokio/src/loom/std/atomic_u64_native.rs b/vendor/tokio/src/loom/std/atomic_u64_native.rs new file mode 100644 index 000000000..08adb2862 --- /dev/null +++ b/vendor/tokio/src/loom/std/atomic_u64_native.rs @@ -0,0 +1,4 @@ +pub(crate) use std::sync::atomic::{AtomicU64, Ordering}; + +/// Alias `AtomicU64` to `StaticAtomicU64` +pub(crate) type StaticAtomicU64 = AtomicU64; diff --git a/vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs b/vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs new file mode 100644 index 000000000..a4215342b --- /dev/null +++ b/vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs @@ -0,0 +1,12 @@ +use super::AtomicU64; +use crate::loom::sync::Mutex; + +pub(crate) type StaticAtomicU64 = AtomicU64; + +impl AtomicU64 { + pub(crate) const fn new(val: u64) -> Self { + Self { + inner: Mutex::const_new(val), + } + } +} diff --git a/vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs b/vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs new file mode 100644 index 000000000..40c6172a5 --- /dev/null +++ b/vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs @@ -0,0 +1,57 @@ +use super::AtomicU64; +use crate::loom::sync::{atomic::Ordering, Mutex}; +use crate::util::once_cell::OnceCell; + +pub(crate) struct StaticAtomicU64 { + init: u64, + cell: OnceCell<Mutex<u64>>, +} + +impl AtomicU64 { + pub(crate) fn new(val: u64) -> Self { + Self { + inner: Mutex::new(val), + } + } +} + +impl StaticAtomicU64 { + pub(crate) const fn new(val: u64) -> StaticAtomicU64 { + StaticAtomicU64 { + init: val, + cell: OnceCell::new(), + } + } + + pub(crate) fn load(&self, order: Ordering) -> u64 { + *self.inner().lock() + } + + pub(crate) fn fetch_add(&self, val: u64, order: Ordering) -> u64 { + let mut lock = self.inner().lock(); + let prev = *lock; + *lock = prev + val; + prev + } + + pub(crate) fn compare_exchange_weak( + &self, + current: u64, + new: u64, + _success: Ordering, + _failure: Ordering, + ) -> Result<u64, u64> { + let mut lock = self.inner().lock(); + + if *lock == current { + *lock = new; + Ok(current) + } else { + Err(*lock) + } + } + + fn inner(&self) -> &Mutex<u64> { + self.cell.get(|| Mutex::new(self.init)) + } +} diff --git a/vendor/tokio/src/loom/std/atomic_u8.rs b/vendor/tokio/src/loom/std/atomic_u8.rs deleted file mode 100644 index 408aea338..000000000 --- a/vendor/tokio/src/loom/std/atomic_u8.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::cell::UnsafeCell; -use std::fmt; -use std::ops::Deref; - -/// `AtomicU8` providing an additional `load_unsync` function. -pub(crate) struct AtomicU8 { - inner: UnsafeCell<std::sync::atomic::AtomicU8>, -} - -unsafe impl Send for AtomicU8 {} -unsafe impl Sync for AtomicU8 {} - -impl AtomicU8 { - pub(crate) const fn new(val: u8) -> AtomicU8 { - let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val)); - AtomicU8 { inner } - } -} - -impl Deref for AtomicU8 { - type Target = std::sync::atomic::AtomicU8; - - fn deref(&self) -> &Self::Target { - // safety: it is always safe to access `&self` fns on the inner value as - // we never perform unsafe mutations. - unsafe { &*self.inner.get() } - } -} - -impl fmt::Debug for AtomicU8 { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - self.deref().fmt(fmt) - } -} diff --git a/vendor/tokio/src/loom/std/atomic_usize.rs b/vendor/tokio/src/loom/std/atomic_usize.rs index 0d5f36e43..c5503a2c1 100644 --- a/vendor/tokio/src/loom/std/atomic_usize.rs +++ b/vendor/tokio/src/loom/std/atomic_usize.rs @@ -2,7 +2,7 @@ use std::cell::UnsafeCell; use std::fmt; use std::ops; -/// `AtomicUsize` providing an additional `load_unsync` function. +/// `AtomicUsize` providing an additional `unsync_load` function. pub(crate) struct AtomicUsize { inner: UnsafeCell<std::sync::atomic::AtomicUsize>, } @@ -23,7 +23,7 @@ impl AtomicUsize { /// All mutations must have happened before the unsynchronized load. /// Additionally, there must be no concurrent mutations. pub(crate) unsafe fn unsync_load(&self) -> usize { - *(*self.inner.get()).get_mut() + core::ptr::read(self.inner.get() as *const usize) } pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R { diff --git a/vendor/tokio/src/loom/std/barrier.rs b/vendor/tokio/src/loom/std/barrier.rs new file mode 100644 index 000000000..a3f0ca0ab --- /dev/null +++ b/vendor/tokio/src/loom/std/barrier.rs @@ -0,0 +1,217 @@ +//! A `Barrier` that provides `wait_timeout`. +//! +//! This implementation mirrors that of the Rust standard library. + +use crate::loom::sync::{Condvar, Mutex}; +use std::fmt; +use std::time::{Duration, Instant}; + +/// A barrier enables multiple threads to synchronize the beginning +/// of some computation. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Barrier}; +/// use std::thread; +/// +/// let mut handles = Vec::with_capacity(10); +/// let barrier = Arc::new(Barrier::new(10)); +/// for _ in 0..10 { +/// let c = Arc::clone(&barrier); +/// // The same messages will be printed together. +/// // You will NOT see any interleaving. +/// handles.push(thread::spawn(move|| { +/// println!("before wait"); +/// c.wait(); +/// println!("after wait"); +/// })); +/// } +/// // Wait for other threads to finish. +/// for handle in handles { +/// handle.join().unwrap(); +/// } +/// ``` +pub(crate) struct Barrier { + lock: Mutex<BarrierState>, + cvar: Condvar, + num_threads: usize, +} + +// The inner state of a double barrier +struct BarrierState { + count: usize, + generation_id: usize, +} + +/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads +/// in the [`Barrier`] have rendezvoused. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Barrier; +/// +/// let barrier = Barrier::new(1); +/// let barrier_wait_result = barrier.wait(); +/// ``` +pub(crate) struct BarrierWaitResult(bool); + +impl fmt::Debug for Barrier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Barrier").finish_non_exhaustive() + } +} + +impl Barrier { + /// Creates a new barrier that can block a given number of threads. + /// + /// A barrier will block `n`-1 threads which call [`wait()`] and then wake + /// up all threads at once when the `n`th thread calls [`wait()`]. + /// + /// [`wait()`]: Barrier::wait + /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(10); + /// ``` + #[must_use] + pub(crate) fn new(n: usize) -> Barrier { + Barrier { + lock: Mutex::new(BarrierState { + count: 0, + generation_id: 0, + }), + cvar: Condvar::new(), + num_threads: n, + } + } + + /// Blocks the current thread until all threads have rendezvoused here. + /// + /// Barriers are re-usable after all threads have rendezvoused once, and can + /// be used continuously. + /// + /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that + /// returns `true` from [`BarrierWaitResult::is_leader()`] when returning + /// from this function, and all other threads will receive a result that + /// will return `false` from [`BarrierWaitResult::is_leader()`]. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Barrier}; + /// use std::thread; + /// + /// let mut handles = Vec::with_capacity(10); + /// let barrier = Arc::new(Barrier::new(10)); + /// for _ in 0..10 { + /// let c = Arc::clone(&barrier); + /// // The same messages will be printed together. + /// // You will NOT see any interleaving. + /// handles.push(thread::spawn(move|| { + /// println!("before wait"); + /// c.wait(); + /// println!("after wait"); + /// })); + /// } + /// // Wait for other threads to finish. + /// for handle in handles { + /// handle.join().unwrap(); + /// } + /// ``` + pub(crate) fn wait(&self) -> BarrierWaitResult { + let mut lock = self.lock.lock(); + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_threads { + // We need a while loop to guard against spurious wakeups. + // https://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id { + lock = self.cvar.wait(lock).unwrap(); + } + BarrierWaitResult(false) + } else { + lock.count = 0; + lock.generation_id = lock.generation_id.wrapping_add(1); + self.cvar.notify_all(); + BarrierWaitResult(true) + } + } + + /// Blocks the current thread until all threads have rendezvoused here for + /// at most `timeout` duration. + pub(crate) fn wait_timeout(&self, timeout: Duration) -> Option<BarrierWaitResult> { + // This implementation mirrors `wait`, but with each blocking operation + // replaced by a timeout-amenable alternative. + + let deadline = Instant::now() + timeout; + + // Acquire `self.lock` with at most `timeout` duration. + let mut lock = loop { + if let Some(guard) = self.lock.try_lock() { + break guard; + } else if Instant::now() > deadline { + return None; + } else { + std::thread::yield_now(); + } + }; + + // Shrink the `timeout` to account for the time taken to acquire `lock`. + let timeout = deadline.saturating_duration_since(Instant::now()); + + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_threads { + // We need a while loop to guard against spurious wakeups. + // https://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id { + let (guard, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap(); + lock = guard; + if timeout_result.timed_out() { + return None; + } + } + Some(BarrierWaitResult(false)) + } else { + lock.count = 0; + lock.generation_id = lock.generation_id.wrapping_add(1); + self.cvar.notify_all(); + Some(BarrierWaitResult(true)) + } + } +} + +impl fmt::Debug for BarrierWaitResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BarrierWaitResult") + .field("is_leader", &self.is_leader()) + .finish() + } +} + +impl BarrierWaitResult { + /// Returns `true` if this thread is the "leader thread" for the call to + /// [`Barrier::wait()`]. + /// + /// Only one thread will have `true` returned from their result, all other + /// threads will have `false` returned. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(1); + /// let barrier_wait_result = barrier.wait(); + /// println!("{:?}", barrier_wait_result.is_leader()); + /// ``` + #[must_use] + pub(crate) fn is_leader(&self) -> bool { + self.0 + } +} diff --git a/vendor/tokio/src/loom/std/mod.rs b/vendor/tokio/src/loom/std/mod.rs index b29cbeeb8..0a732791f 100644 --- a/vendor/tokio/src/loom/std/mod.rs +++ b/vendor/tokio/src/loom/std/mod.rs @@ -1,11 +1,10 @@ #![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))] -mod atomic_ptr; mod atomic_u16; mod atomic_u32; mod atomic_u64; -mod atomic_u8; mod atomic_usize; +mod barrier; mod mutex; #[cfg(feature = "parking_lot")] mod parking_lot; @@ -25,6 +24,10 @@ pub(crate) mod future { pub(crate) use crate::sync::AtomicWaker; } +pub(crate) mod hint { + pub(crate) use std::hint::spin_loop; +} + pub(crate) mod rand { use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hash, Hasher}; @@ -67,24 +70,41 @@ pub(crate) mod sync { pub(crate) use crate::loom::std::mutex::Mutex; pub(crate) mod atomic { - pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr; pub(crate) use crate::loom::std::atomic_u16::AtomicU16; pub(crate) use crate::loom::std::atomic_u32::AtomicU32; - pub(crate) use crate::loom::std::atomic_u64::AtomicU64; - pub(crate) use crate::loom::std::atomic_u8::AtomicU8; + pub(crate) use crate::loom::std::atomic_u64::{AtomicU64, StaticAtomicU64}; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; - pub(crate) use std::sync::atomic::{fence, AtomicBool, Ordering}; - // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead. - #[allow(deprecated)] - pub(crate) use std::sync::atomic::spin_loop_hint; + pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering}; } + + pub(crate) use super::barrier::Barrier; } pub(crate) mod sys { #[cfg(feature = "rt-multi-thread")] pub(crate) fn num_cpus() -> usize { - usize::max(1, num_cpus::get()) + const ENV_WORKER_THREADS: &str = "TOKIO_WORKER_THREADS"; + + match std::env::var(ENV_WORKER_THREADS) { + Ok(s) => { + let n = s.parse().unwrap_or_else(|e| { + panic!( + "\"{}\" must be usize, error: {}, value: {}", + ENV_WORKER_THREADS, e, s + ) + }); + assert!(n > 0, "\"{}\" cannot be set to 0", ENV_WORKER_THREADS); + n + } + Err(std::env::VarError::NotPresent) => usize::max(1, num_cpus::get()), + Err(std::env::VarError::NotUnicode(e)) => { + panic!( + "\"{}\" must be valid unicode, error: {:?}", + ENV_WORKER_THREADS, e + ) + } + } } #[cfg(not(feature = "rt-multi-thread"))] @@ -93,4 +113,15 @@ pub(crate) mod sys { } } -pub(crate) use std::thread; +pub(crate) mod thread { + #[inline] + pub(crate) fn yield_now() { + std::hint::spin_loop(); + } + + #[allow(unused_imports)] + pub(crate) use std::thread::{ + current, panicking, park, park_timeout, sleep, spawn, AccessError, Builder, JoinHandle, + LocalKey, Result, Thread, ThreadId, + }; +} diff --git a/vendor/tokio/src/loom/std/mutex.rs b/vendor/tokio/src/loom/std/mutex.rs index bf14d6242..076f78611 100644 --- a/vendor/tokio/src/loom/std/mutex.rs +++ b/vendor/tokio/src/loom/std/mutex.rs @@ -1,7 +1,7 @@ use std::sync::{self, MutexGuard, TryLockError}; /// Adapter for `std::Mutex` that removes the poisoning aspects -// from its api +/// from its api. #[derive(Debug)] pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>); @@ -13,6 +13,12 @@ impl<T> Mutex<T> { } #[inline] + #[cfg(not(tokio_no_const_mutex_new))] + pub(crate) const fn const_new(t: T) -> Mutex<T> { + Mutex(sync::Mutex::new(t)) + } + + #[inline] pub(crate) fn lock(&self) -> MutexGuard<'_, T> { match self.0.lock() { Ok(guard) => guard, diff --git a/vendor/tokio/src/loom/std/parking_lot.rs b/vendor/tokio/src/loom/std/parking_lot.rs index 8448bed53..e3af258d1 100644 --- a/vendor/tokio/src/loom/std/parking_lot.rs +++ b/vendor/tokio/src/loom/std/parking_lot.rs @@ -3,83 +3,143 @@ //! //! This can be extended to additional types/methods as required. +use std::fmt; +use std::marker::PhantomData; +use std::ops::{Deref, DerefMut}; use std::sync::LockResult; use std::time::Duration; +// All types in this file are marked with PhantomData to ensure that +// parking_lot's send_guard feature does not leak through and affect when Tokio +// types are Send. +// +// See <https://github.com/tokio-rs/tokio/pull/4359> for more info. + // Types that do not need wrapping -pub(crate) use parking_lot::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult}; +pub(crate) use parking_lot::WaitTimeoutResult; + +#[derive(Debug)] +pub(crate) struct Mutex<T: ?Sized>(PhantomData<std::sync::Mutex<T>>, parking_lot::Mutex<T>); + +#[derive(Debug)] +pub(crate) struct RwLock<T>(PhantomData<std::sync::RwLock<T>>, parking_lot::RwLock<T>); + +#[derive(Debug)] +pub(crate) struct Condvar(PhantomData<std::sync::Condvar>, parking_lot::Condvar); -/// Adapter for `parking_lot::Mutex` to the `std::sync::Mutex` interface. #[derive(Debug)] -pub(crate) struct Mutex<T: ?Sized>(parking_lot::Mutex<T>); +pub(crate) struct MutexGuard<'a, T: ?Sized>( + PhantomData<std::sync::MutexGuard<'a, T>>, + parking_lot::MutexGuard<'a, T>, +); #[derive(Debug)] -pub(crate) struct RwLock<T>(parking_lot::RwLock<T>); +pub(crate) struct RwLockReadGuard<'a, T: ?Sized>( + PhantomData<std::sync::RwLockReadGuard<'a, T>>, + parking_lot::RwLockReadGuard<'a, T>, +); -/// Adapter for `parking_lot::Condvar` to the `std::sync::Condvar` interface. #[derive(Debug)] -pub(crate) struct Condvar(parking_lot::Condvar); +pub(crate) struct RwLockWriteGuard<'a, T: ?Sized>( + PhantomData<std::sync::RwLockWriteGuard<'a, T>>, + parking_lot::RwLockWriteGuard<'a, T>, +); impl<T> Mutex<T> { #[inline] pub(crate) fn new(t: T) -> Mutex<T> { - Mutex(parking_lot::Mutex::new(t)) + Mutex(PhantomData, parking_lot::Mutex::new(t)) } #[inline] - #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg_attr(docsrs, doc(cfg(all(feature = "parking_lot",))))] pub(crate) const fn const_new(t: T) -> Mutex<T> { - Mutex(parking_lot::const_mutex(t)) + Mutex(PhantomData, parking_lot::const_mutex(t)) } #[inline] pub(crate) fn lock(&self) -> MutexGuard<'_, T> { - self.0.lock() + MutexGuard(PhantomData, self.1.lock()) } #[inline] pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> { - self.0.try_lock() + self.1 + .try_lock() + .map(|guard| MutexGuard(PhantomData, guard)) } #[inline] pub(crate) fn get_mut(&mut self) -> &mut T { - self.0.get_mut() + self.1.get_mut() } // Note: Additional methods `is_poisoned` and `into_inner`, can be // provided here as needed. } +impl<'a, T: ?Sized> Deref for MutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.1.deref() + } +} + +impl<'a, T: ?Sized> DerefMut for MutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.1.deref_mut() + } +} + impl<T> RwLock<T> { pub(crate) fn new(t: T) -> RwLock<T> { - RwLock(parking_lot::RwLock::new(t)) + RwLock(PhantomData, parking_lot::RwLock::new(t)) } pub(crate) fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> { - Ok(self.0.read()) + Ok(RwLockReadGuard(PhantomData, self.1.read())) } pub(crate) fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> { - Ok(self.0.write()) + Ok(RwLockWriteGuard(PhantomData, self.1.write())) + } +} + +impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.1.deref() + } +} + +impl<'a, T: ?Sized> Deref for RwLockWriteGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.1.deref() + } +} + +impl<'a, T: ?Sized> DerefMut for RwLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.1.deref_mut() } } impl Condvar { #[inline] pub(crate) fn new() -> Condvar { - Condvar(parking_lot::Condvar::new()) + Condvar(PhantomData, parking_lot::Condvar::new()) } #[inline] pub(crate) fn notify_one(&self) { - self.0.notify_one(); + self.1.notify_one(); } #[inline] pub(crate) fn notify_all(&self) { - self.0.notify_all(); + self.1.notify_all(); } #[inline] @@ -87,7 +147,7 @@ impl Condvar { &self, mut guard: MutexGuard<'a, T>, ) -> LockResult<MutexGuard<'a, T>> { - self.0.wait(&mut guard); + self.1.wait(&mut guard.1); Ok(guard) } @@ -97,10 +157,28 @@ impl Condvar { mut guard: MutexGuard<'a, T>, timeout: Duration, ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> { - let wtr = self.0.wait_for(&mut guard, timeout); + let wtr = self.1.wait_for(&mut guard.1, timeout); Ok((guard, wtr)) } // Note: Additional methods `wait_timeout_ms`, `wait_timeout_until`, // `wait_until` can be provided here as needed. } + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.1, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.1, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.1, f) + } +} |