Diffstat (limited to 'vendor/tokio/src/loom/std')
-rw-r--r--  vendor/tokio/src/loom/std/atomic_ptr.rs                   34
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u16.rs                     4
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u32.rs                    12
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u64.rs                    72
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs           76
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u64_native.rs              4
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs   12
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs   57
-rw-r--r--  vendor/tokio/src/loom/std/atomic_u8.rs                     34
-rw-r--r--  vendor/tokio/src/loom/std/atomic_usize.rs                   4
-rw-r--r--  vendor/tokio/src/loom/std/barrier.rs                      217
-rw-r--r--  vendor/tokio/src/loom/std/mod.rs                           53
-rw-r--r--  vendor/tokio/src/loom/std/mutex.rs                          8
-rw-r--r--  vendor/tokio/src/loom/std/parking_lot.rs                  118
14 files changed, 536 insertions, 169 deletions
diff --git a/vendor/tokio/src/loom/std/atomic_ptr.rs b/vendor/tokio/src/loom/std/atomic_ptr.rs
deleted file mode 100644
index 236645f03..000000000
--- a/vendor/tokio/src/loom/std/atomic_ptr.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use std::fmt;
-use std::ops::{Deref, DerefMut};
-
-/// `AtomicPtr` providing an additional `load_unsync` function.
-pub(crate) struct AtomicPtr<T> {
- inner: std::sync::atomic::AtomicPtr<T>,
-}
-
-impl<T> AtomicPtr<T> {
- pub(crate) fn new(ptr: *mut T) -> AtomicPtr<T> {
- let inner = std::sync::atomic::AtomicPtr::new(ptr);
- AtomicPtr { inner }
- }
-}
-
-impl<T> Deref for AtomicPtr<T> {
- type Target = std::sync::atomic::AtomicPtr<T>;
-
- fn deref(&self) -> &Self::Target {
- &self.inner
- }
-}
-
-impl<T> DerefMut for AtomicPtr<T> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.inner
- }
-}
-
-impl<T> fmt::Debug for AtomicPtr<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.deref().fmt(fmt)
- }
-}
diff --git a/vendor/tokio/src/loom/std/atomic_u16.rs b/vendor/tokio/src/loom/std/atomic_u16.rs
index c1c531208..c9e105c19 100644
--- a/vendor/tokio/src/loom/std/atomic_u16.rs
+++ b/vendor/tokio/src/loom/std/atomic_u16.rs
@@ -2,7 +2,7 @@ use std::cell::UnsafeCell;
use std::fmt;
use std::ops::Deref;
-/// `AtomicU16` providing an additional `load_unsync` function.
+/// `AtomicU16` providing an additional `unsync_load` function.
pub(crate) struct AtomicU16 {
inner: UnsafeCell<std::sync::atomic::AtomicU16>,
}
@@ -23,7 +23,7 @@ impl AtomicU16 {
/// All mutations must have happened before the unsynchronized load.
/// Additionally, there must be no concurrent mutations.
pub(crate) unsafe fn unsync_load(&self) -> u16 {
- *(*self.inner.get()).get_mut()
+ core::ptr::read(self.inner.get() as *const u16)
}
}
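
Editor's note: the `unsync_load` rewrite here (and the identical one in atomic_usize.rs below) replaces `*(*self.inner.get()).get_mut()` with a raw `core::ptr::read`. The likely motivation is Rust's aliasing discipline (as checked by Miri): the old expression materializes a `&mut` to the atomic behind a shared `&self`, while a raw-pointer read makes no uniqueness claim. A minimal standalone sketch of the pattern; the `Wrapped` type is illustrative, not a tokio export:

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicU16;

struct Wrapped {
    inner: UnsafeCell<AtomicU16>,
}

impl Wrapped {
    /// # Safety
    ///
    /// All mutations must have happened before this call, and no
    /// concurrent mutations may be in flight.
    unsafe fn unsync_load(&self) -> u16 {
        // `AtomicU16` is documented to have the same in-memory
        // representation as `u16`, so reading the bytes directly is a
        // plain, fence-free load. Unlike `get_mut()`, it never creates
        // a `&mut` behind the shared `&self`.
        core::ptr::read(self.inner.get() as *const u16)
    }
}

fn main() {
    let w = Wrapped {
        inner: UnsafeCell::new(AtomicU16::new(7)),
    };
    // Single-threaded, exclusive access: the safety contract holds.
    assert_eq!(unsafe { w.unsync_load() }, 7);
}
```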
diff --git a/vendor/tokio/src/loom/std/atomic_u32.rs b/vendor/tokio/src/loom/std/atomic_u32.rs
index 61f95fb30..ee0d2d380 100644
--- a/vendor/tokio/src/loom/std/atomic_u32.rs
+++ b/vendor/tokio/src/loom/std/atomic_u32.rs
@@ -2,7 +2,7 @@ use std::cell::UnsafeCell;
use std::fmt;
use std::ops::Deref;
-/// `AtomicU32` providing an additional `load_unsync` function.
+/// `AtomicU32` providing an additional `unsync_load` function.
pub(crate) struct AtomicU32 {
inner: UnsafeCell<std::sync::atomic::AtomicU32>,
}
@@ -15,6 +15,16 @@ impl AtomicU32 {
let inner = UnsafeCell::new(std::sync::atomic::AtomicU32::new(val));
AtomicU32 { inner }
}
+
+ /// Performs an unsynchronized load.
+ ///
+ /// # Safety
+ ///
+ /// All mutations must have happened before the unsynchronized load.
+ /// Additionally, there must be no concurrent mutations.
+ pub(crate) unsafe fn unsync_load(&self) -> u32 {
+ core::ptr::read(self.inner.get() as *const u32)
+ }
}
impl Deref for AtomicU32 {
diff --git a/vendor/tokio/src/loom/std/atomic_u64.rs b/vendor/tokio/src/loom/std/atomic_u64.rs
index 7eb457a24..ce391be3e 100644
--- a/vendor/tokio/src/loom/std/atomic_u64.rs
+++ b/vendor/tokio/src/loom/std/atomic_u64.rs
@@ -2,74 +2,18 @@
//! re-export of `AtomicU64`. On 32 bit platforms, this is implemented using a
//! `Mutex`.
-pub(crate) use self::imp::AtomicU64;
-
// `AtomicU64` can only be used on targets where `target_has_atomic` is 64 or greater.
// Once `cfg_target_has_atomic` feature is stable, we can replace it with
// `#[cfg(target_has_atomic = "64")]`.
// Refs: https://github.com/rust-lang/rust/tree/master/src/librustc_target
-#[cfg(not(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc")))]
-mod imp {
- pub(crate) use std::sync::atomic::AtomicU64;
+cfg_has_atomic_u64! {
+ #[path = "atomic_u64_native.rs"]
+ mod imp;
}
-#[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))]
-mod imp {
- use crate::loom::sync::Mutex;
- use std::sync::atomic::Ordering;
-
- #[derive(Debug)]
- pub(crate) struct AtomicU64 {
- inner: Mutex<u64>,
- }
-
- impl AtomicU64 {
- pub(crate) fn new(val: u64) -> Self {
- Self {
- inner: Mutex::new(val),
- }
- }
-
- pub(crate) fn load(&self, _: Ordering) -> u64 {
- *self.inner.lock()
- }
-
- pub(crate) fn store(&self, val: u64, _: Ordering) {
- *self.inner.lock() = val;
- }
-
- pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 {
- let mut lock = self.inner.lock();
- let prev = *lock;
- *lock = prev | val;
- prev
- }
-
- pub(crate) fn compare_exchange(
- &self,
- current: u64,
- new: u64,
- _success: Ordering,
- _failure: Ordering,
- ) -> Result<u64, u64> {
- let mut lock = self.inner.lock();
-
- if *lock == current {
- *lock = new;
- Ok(current)
- } else {
- Err(*lock)
- }
- }
-
- pub(crate) fn compare_exchange_weak(
- &self,
- current: u64,
- new: u64,
- success: Ordering,
- failure: Ordering,
- ) -> Result<u64, u64> {
- self.compare_exchange(current, new, success, failure)
- }
- }
+cfg_not_has_atomic_u64! {
+ #[path = "atomic_u64_as_mutex.rs"]
+ mod imp;
}
+
+pub(crate) use imp::{AtomicU64, StaticAtomicU64};
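
Editor's note: the `#[path]` attribute lets the single name `mod imp` resolve to different source files depending on which cfg-gated macro arm survives. A compilable reduction of the dispatch, using the now-stable `target_has_atomic` predicate; the macros in this diff still gate on a cfg set derived from the target architecture, per the comment above, and the inline modules stand in for the `#[path]`-redirected files:

```rust
macro_rules! cfg_has_atomic_u64 {
    ($($item:item)*) => { $( #[cfg(target_has_atomic = "64")] $item )* };
}
macro_rules! cfg_not_has_atomic_u64 {
    ($($item:item)*) => { $( #[cfg(not(target_has_atomic = "64"))] $item )* };
}

cfg_has_atomic_u64! {
    // Stand-in for `#[path = "atomic_u64_native.rs"] mod imp;`
    mod imp {
        pub use std::sync::atomic::AtomicU64;
    }
}

cfg_not_has_atomic_u64! {
    // Stand-in for `#[path = "atomic_u64_as_mutex.rs"] mod imp;`
    mod imp {
        #[derive(Debug, Default)]
        pub struct AtomicU64 {
            inner: std::sync::Mutex<u64>,
        }
    }
}

pub use imp::AtomicU64;

fn main() {
    // Exactly one `mod imp` survives cfg evaluation, so the name is
    // unambiguous on every target.
    println!("{}", std::any::type_name::<AtomicU64>());
}
```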
diff --git a/vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs b/vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs
new file mode 100644
index 000000000..9b3b6fac6
--- /dev/null
+++ b/vendor/tokio/src/loom/std/atomic_u64_as_mutex.rs
@@ -0,0 +1,76 @@
+use crate::loom::sync::Mutex;
+use std::sync::atomic::Ordering;
+
+cfg_has_const_mutex_new! {
+ #[path = "atomic_u64_static_const_new.rs"]
+ mod static_macro;
+}
+
+cfg_not_has_const_mutex_new! {
+ #[path = "atomic_u64_static_once_cell.rs"]
+ mod static_macro;
+}
+
+pub(crate) use static_macro::StaticAtomicU64;
+
+#[derive(Debug)]
+pub(crate) struct AtomicU64 {
+ inner: Mutex<u64>,
+}
+
+impl AtomicU64 {
+ pub(crate) fn load(&self, _: Ordering) -> u64 {
+ *self.inner.lock()
+ }
+
+ pub(crate) fn store(&self, val: u64, _: Ordering) {
+ *self.inner.lock() = val;
+ }
+
+ pub(crate) fn fetch_add(&self, val: u64, _: Ordering) -> u64 {
+ let mut lock = self.inner.lock();
+ let prev = *lock;
+ *lock = prev + val;
+ prev
+ }
+
+ pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 {
+ let mut lock = self.inner.lock();
+ let prev = *lock;
+ *lock = prev | val;
+ prev
+ }
+
+ pub(crate) fn compare_exchange(
+ &self,
+ current: u64,
+ new: u64,
+ _success: Ordering,
+ _failure: Ordering,
+ ) -> Result<u64, u64> {
+ let mut lock = self.inner.lock();
+
+ if *lock == current {
+ *lock = new;
+ Ok(current)
+ } else {
+ Err(*lock)
+ }
+ }
+
+ pub(crate) fn compare_exchange_weak(
+ &self,
+ current: u64,
+ new: u64,
+ success: Ordering,
+ failure: Ordering,
+ ) -> Result<u64, u64> {
+ self.compare_exchange(current, new, success, failure)
+ }
+}
+
+impl Default for AtomicU64 {
+ fn default() -> AtomicU64 {
+ AtomicU64::new(u64::default())
+ }
+}
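
Editor's note: the mutex fallback preserves the std contract that read-modify-write operations return the previous value; the `Ordering` arguments are accepted but ignored, since the lock already serializes every access. A quick contract check, assuming the `AtomicU64` above together with the `new` constructor contributed by the selected `static_macro` module:

```rust
use std::sync::atomic::Ordering;

fn exercise(a: &AtomicU64) {
    a.store(1, Ordering::SeqCst);
    // RMW ops return the previous value, exactly as in std.
    assert_eq!(a.fetch_add(2, Ordering::SeqCst), 1); // now 3
    assert_eq!(a.fetch_or(0b100, Ordering::SeqCst), 3); // now 7
    assert_eq!(
        a.compare_exchange(7, 9, Ordering::SeqCst, Ordering::SeqCst),
        Ok(7)
    );
    assert_eq!(a.load(Ordering::SeqCst), 9);
}
```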
diff --git a/vendor/tokio/src/loom/std/atomic_u64_native.rs b/vendor/tokio/src/loom/std/atomic_u64_native.rs
new file mode 100644
index 000000000..08adb2862
--- /dev/null
+++ b/vendor/tokio/src/loom/std/atomic_u64_native.rs
@@ -0,0 +1,4 @@
+pub(crate) use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Alias `AtomicU64` to `StaticAtomicU64`
+pub(crate) type StaticAtomicU64 = AtomicU64;
diff --git a/vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs b/vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs
new file mode 100644
index 000000000..a4215342b
--- /dev/null
+++ b/vendor/tokio/src/loom/std/atomic_u64_static_const_new.rs
@@ -0,0 +1,12 @@
+use super::AtomicU64;
+use crate::loom::sync::Mutex;
+
+pub(crate) type StaticAtomicU64 = AtomicU64;
+
+impl AtomicU64 {
+ pub(crate) const fn new(val: u64) -> Self {
+ Self {
+ inner: Mutex::const_new(val),
+ }
+ }
+}
diff --git a/vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs b/vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs
new file mode 100644
index 000000000..40c6172a5
--- /dev/null
+++ b/vendor/tokio/src/loom/std/atomic_u64_static_once_cell.rs
@@ -0,0 +1,57 @@
+use super::AtomicU64;
+use crate::loom::sync::{atomic::Ordering, Mutex};
+use crate::util::once_cell::OnceCell;
+
+pub(crate) struct StaticAtomicU64 {
+ init: u64,
+ cell: OnceCell<Mutex<u64>>,
+}
+
+impl AtomicU64 {
+ pub(crate) fn new(val: u64) -> Self {
+ Self {
+ inner: Mutex::new(val),
+ }
+ }
+}
+
+impl StaticAtomicU64 {
+ pub(crate) const fn new(val: u64) -> StaticAtomicU64 {
+ StaticAtomicU64 {
+ init: val,
+ cell: OnceCell::new(),
+ }
+ }
+
+ pub(crate) fn load(&self, order: Ordering) -> u64 {
+ *self.inner().lock()
+ }
+
+ pub(crate) fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
+ let mut lock = self.inner().lock();
+ let prev = *lock;
+ *lock = prev + val;
+ prev
+ }
+
+ pub(crate) fn compare_exchange_weak(
+ &self,
+ current: u64,
+ new: u64,
+ _success: Ordering,
+ _failure: Ordering,
+ ) -> Result<u64, u64> {
+ let mut lock = self.inner().lock();
+
+ if *lock == current {
+ *lock = new;
+ Ok(current)
+ } else {
+ Err(*lock)
+ }
+ }
+
+ fn inner(&self) -> &Mutex<u64> {
+ self.cell.get(|| Mutex::new(self.init))
+ }
+}
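
Editor's note: `StaticAtomicU64` exists because, on targets lacking both 64-bit atomics and a const `Mutex::new`, a `static` cannot hold the mutex directly; the seed value is stored as plain data and the mutex is created on first use through tokio's internal `OnceCell`. The same shape expressed with `std::sync::OnceLock`, used here purely as a stand-in for the private `OnceCell`:

```rust
use std::sync::{Mutex, OnceLock};

struct StaticAtomicU64 {
    init: u64,
    cell: OnceLock<Mutex<u64>>,
}

impl StaticAtomicU64 {
    const fn new(val: u64) -> Self {
        // Both fields are const-constructible, so this can sit in a `static`.
        StaticAtomicU64 { init: val, cell: OnceLock::new() }
    }

    fn inner(&self) -> &Mutex<u64> {
        // The first caller creates the mutex from the seed value;
        // every later caller gets the same instance.
        self.cell.get_or_init(|| Mutex::new(self.init))
    }
}

static COUNTER: StaticAtomicU64 = StaticAtomicU64::new(40);

fn main() {
    *COUNTER.inner().lock().unwrap() += 2;
    assert_eq!(*COUNTER.inner().lock().unwrap(), 42);
}
```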
diff --git a/vendor/tokio/src/loom/std/atomic_u8.rs b/vendor/tokio/src/loom/std/atomic_u8.rs
deleted file mode 100644
index 408aea338..000000000
--- a/vendor/tokio/src/loom/std/atomic_u8.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use std::cell::UnsafeCell;
-use std::fmt;
-use std::ops::Deref;
-
-/// `AtomicU8` providing an additional `load_unsync` function.
-pub(crate) struct AtomicU8 {
- inner: UnsafeCell<std::sync::atomic::AtomicU8>,
-}
-
-unsafe impl Send for AtomicU8 {}
-unsafe impl Sync for AtomicU8 {}
-
-impl AtomicU8 {
- pub(crate) const fn new(val: u8) -> AtomicU8 {
- let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val));
- AtomicU8 { inner }
- }
-}
-
-impl Deref for AtomicU8 {
- type Target = std::sync::atomic::AtomicU8;
-
- fn deref(&self) -> &Self::Target {
- // safety: it is always safe to access `&self` fns on the inner value as
- // we never perform unsafe mutations.
- unsafe { &*self.inner.get() }
- }
-}
-
-impl fmt::Debug for AtomicU8 {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.deref().fmt(fmt)
- }
-}
diff --git a/vendor/tokio/src/loom/std/atomic_usize.rs b/vendor/tokio/src/loom/std/atomic_usize.rs
index 0d5f36e43..c5503a2c1 100644
--- a/vendor/tokio/src/loom/std/atomic_usize.rs
+++ b/vendor/tokio/src/loom/std/atomic_usize.rs
@@ -2,7 +2,7 @@ use std::cell::UnsafeCell;
use std::fmt;
use std::ops;
-/// `AtomicUsize` providing an additional `load_unsync` function.
+/// `AtomicUsize` providing an additional `unsync_load` function.
pub(crate) struct AtomicUsize {
inner: UnsafeCell<std::sync::atomic::AtomicUsize>,
}
@@ -23,7 +23,7 @@ impl AtomicUsize {
/// All mutations must have happened before the unsynchronized load.
/// Additionally, there must be no concurrent mutations.
pub(crate) unsafe fn unsync_load(&self) -> usize {
- *(*self.inner.get()).get_mut()
+ core::ptr::read(self.inner.get() as *const usize)
}
pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R {
diff --git a/vendor/tokio/src/loom/std/barrier.rs b/vendor/tokio/src/loom/std/barrier.rs
new file mode 100644
index 000000000..a3f0ca0ab
--- /dev/null
+++ b/vendor/tokio/src/loom/std/barrier.rs
@@ -0,0 +1,217 @@
+//! A `Barrier` that provides `wait_timeout`.
+//!
+//! This implementation mirrors that of the Rust standard library.
+
+use crate::loom::sync::{Condvar, Mutex};
+use std::fmt;
+use std::time::{Duration, Instant};
+
+/// A barrier enables multiple threads to synchronize the beginning
+/// of some computation.
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::{Arc, Barrier};
+/// use std::thread;
+///
+/// let mut handles = Vec::with_capacity(10);
+/// let barrier = Arc::new(Barrier::new(10));
+/// for _ in 0..10 {
+/// let c = Arc::clone(&barrier);
+/// // The same messages will be printed together.
+/// // You will NOT see any interleaving.
+/// handles.push(thread::spawn(move|| {
+/// println!("before wait");
+/// c.wait();
+/// println!("after wait");
+/// }));
+/// }
+/// // Wait for other threads to finish.
+/// for handle in handles {
+/// handle.join().unwrap();
+/// }
+/// ```
+pub(crate) struct Barrier {
+ lock: Mutex<BarrierState>,
+ cvar: Condvar,
+ num_threads: usize,
+}
+
+// The inner state of a double barrier
+struct BarrierState {
+ count: usize,
+ generation_id: usize,
+}
+
+/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads
+/// in the [`Barrier`] have rendezvoused.
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::Barrier;
+///
+/// let barrier = Barrier::new(1);
+/// let barrier_wait_result = barrier.wait();
+/// ```
+pub(crate) struct BarrierWaitResult(bool);
+
+impl fmt::Debug for Barrier {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Barrier").finish_non_exhaustive()
+ }
+}
+
+impl Barrier {
+ /// Creates a new barrier that can block a given number of threads.
+ ///
+ /// A barrier will block `n`-1 threads which call [`wait()`] and then wake
+ /// up all threads at once when the `n`th thread calls [`wait()`].
+ ///
+ /// [`wait()`]: Barrier::wait
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Barrier;
+ ///
+ /// let barrier = Barrier::new(10);
+ /// ```
+ #[must_use]
+ pub(crate) fn new(n: usize) -> Barrier {
+ Barrier {
+ lock: Mutex::new(BarrierState {
+ count: 0,
+ generation_id: 0,
+ }),
+ cvar: Condvar::new(),
+ num_threads: n,
+ }
+ }
+
+ /// Blocks the current thread until all threads have rendezvoused here.
+ ///
+ /// Barriers are re-usable after all threads have rendezvoused once, and can
+ /// be used continuously.
+ ///
+ /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that
+ /// returns `true` from [`BarrierWaitResult::is_leader()`] when returning
+ /// from this function, and all other threads will receive a result that
+ /// will return `false` from [`BarrierWaitResult::is_leader()`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::{Arc, Barrier};
+ /// use std::thread;
+ ///
+ /// let mut handles = Vec::with_capacity(10);
+ /// let barrier = Arc::new(Barrier::new(10));
+ /// for _ in 0..10 {
+ /// let c = Arc::clone(&barrier);
+ /// // The same messages will be printed together.
+ /// // You will NOT see any interleaving.
+ /// handles.push(thread::spawn(move|| {
+ /// println!("before wait");
+ /// c.wait();
+ /// println!("after wait");
+ /// }));
+ /// }
+ /// // Wait for other threads to finish.
+ /// for handle in handles {
+ /// handle.join().unwrap();
+ /// }
+ /// ```
+ pub(crate) fn wait(&self) -> BarrierWaitResult {
+ let mut lock = self.lock.lock();
+ let local_gen = lock.generation_id;
+ lock.count += 1;
+ if lock.count < self.num_threads {
+ // We need a while loop to guard against spurious wakeups.
+ // https://en.wikipedia.org/wiki/Spurious_wakeup
+ while local_gen == lock.generation_id {
+ lock = self.cvar.wait(lock).unwrap();
+ }
+ BarrierWaitResult(false)
+ } else {
+ lock.count = 0;
+ lock.generation_id = lock.generation_id.wrapping_add(1);
+ self.cvar.notify_all();
+ BarrierWaitResult(true)
+ }
+ }
+
+ /// Blocks the current thread until all threads have rendezvoused here for
+ /// at most `timeout` duration.
+ pub(crate) fn wait_timeout(&self, timeout: Duration) -> Option<BarrierWaitResult> {
+ // This implementation mirrors `wait`, but with each blocking operation
+ // replaced by a timeout-amenable alternative.
+
+ let deadline = Instant::now() + timeout;
+
+ // Acquire `self.lock` with at most `timeout` duration.
+ let mut lock = loop {
+ if let Some(guard) = self.lock.try_lock() {
+ break guard;
+ } else if Instant::now() > deadline {
+ return None;
+ } else {
+ std::thread::yield_now();
+ }
+ };
+
+ // Shrink the `timeout` to account for the time taken to acquire `lock`.
+ let timeout = deadline.saturating_duration_since(Instant::now());
+
+ let local_gen = lock.generation_id;
+ lock.count += 1;
+ if lock.count < self.num_threads {
+ // We need a while loop to guard against spurious wakeups.
+ // https://en.wikipedia.org/wiki/Spurious_wakeup
+ while local_gen == lock.generation_id {
+ let (guard, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap();
+ lock = guard;
+ if timeout_result.timed_out() {
+ return None;
+ }
+ }
+ Some(BarrierWaitResult(false))
+ } else {
+ lock.count = 0;
+ lock.generation_id = lock.generation_id.wrapping_add(1);
+ self.cvar.notify_all();
+ Some(BarrierWaitResult(true))
+ }
+ }
+}
+
+impl fmt::Debug for BarrierWaitResult {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BarrierWaitResult")
+ .field("is_leader", &self.is_leader())
+ .finish()
+ }
+}
+
+impl BarrierWaitResult {
+ /// Returns `true` if this thread is the "leader thread" for the call to
+ /// [`Barrier::wait()`].
+ ///
+ /// Only one thread will have `true` returned from their result, all other
+ /// threads will have `false` returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Barrier;
+ ///
+ /// let barrier = Barrier::new(1);
+ /// let barrier_wait_result = barrier.wait();
+ /// println!("{:?}", barrier_wait_result.is_leader());
+ /// ```
+ #[must_use]
+ pub(crate) fn is_leader(&self) -> bool {
+ self.0
+ }
+}
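
Editor's note: `wait_timeout` degrades both blocking points of `wait`: lock acquisition becomes a `try_lock` spin with `yield_now`, and the condvar wait becomes `wait_timeout`, so a caller can bound how long it waits for stragglers. A hypothetical usage, with the `Barrier` defined above:

```rust
use std::time::Duration;

fn demo() {
    // Two participants expected, but only this thread ever shows up.
    let barrier = Barrier::new(2);

    // `wait()` would block forever here; `wait_timeout` gives up after
    // roughly 50ms and reports the timeout as `None`.
    assert!(barrier.wait_timeout(Duration::from_millis(50)).is_none());
}
```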
diff --git a/vendor/tokio/src/loom/std/mod.rs b/vendor/tokio/src/loom/std/mod.rs
index b29cbeeb8..0a732791f 100644
--- a/vendor/tokio/src/loom/std/mod.rs
+++ b/vendor/tokio/src/loom/std/mod.rs
@@ -1,11 +1,10 @@
#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]
-mod atomic_ptr;
mod atomic_u16;
mod atomic_u32;
mod atomic_u64;
-mod atomic_u8;
mod atomic_usize;
+mod barrier;
mod mutex;
#[cfg(feature = "parking_lot")]
mod parking_lot;
@@ -25,6 +24,10 @@ pub(crate) mod future {
pub(crate) use crate::sync::AtomicWaker;
}
+pub(crate) mod hint {
+ pub(crate) use std::hint::spin_loop;
+}
+
pub(crate) mod rand {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
@@ -67,24 +70,41 @@ pub(crate) mod sync {
pub(crate) use crate::loom::std::mutex::Mutex;
pub(crate) mod atomic {
- pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
pub(crate) use crate::loom::std::atomic_u16::AtomicU16;
pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
- pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
- pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
+ pub(crate) use crate::loom::std::atomic_u64::{AtomicU64, StaticAtomicU64};
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
- pub(crate) use std::sync::atomic::{fence, AtomicBool, Ordering};
- // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead.
- #[allow(deprecated)]
- pub(crate) use std::sync::atomic::spin_loop_hint;
+ pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering};
}
+
+ pub(crate) use super::barrier::Barrier;
}
pub(crate) mod sys {
#[cfg(feature = "rt-multi-thread")]
pub(crate) fn num_cpus() -> usize {
- usize::max(1, num_cpus::get())
+ const ENV_WORKER_THREADS: &str = "TOKIO_WORKER_THREADS";
+
+ match std::env::var(ENV_WORKER_THREADS) {
+ Ok(s) => {
+ let n = s.parse().unwrap_or_else(|e| {
+ panic!(
+ "\"{}\" must be usize, error: {}, value: {}",
+ ENV_WORKER_THREADS, e, s
+ )
+ });
+ assert!(n > 0, "\"{}\" cannot be set to 0", ENV_WORKER_THREADS);
+ n
+ }
+ Err(std::env::VarError::NotPresent) => usize::max(1, num_cpus::get()),
+ Err(std::env::VarError::NotUnicode(e)) => {
+ panic!(
+ "\"{}\" must be valid unicode, error: {:?}",
+ ENV_WORKER_THREADS, e
+ )
+ }
+ }
}
#[cfg(not(feature = "rt-multi-thread"))]
@@ -93,4 +113,15 @@ pub(crate) mod sys {
}
}
-pub(crate) use std::thread;
+pub(crate) mod thread {
+ #[inline]
+ pub(crate) fn yield_now() {
+ std::hint::spin_loop();
+ }
+
+ #[allow(unused_imports)]
+ pub(crate) use std::thread::{
+ current, panicking, park, park_timeout, sleep, spawn, AccessError, Builder, JoinHandle,
+ LocalKey, Result, Thread, ThreadId,
+ };
+}
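
Editor's note: the new `num_cpus` consults the `TOKIO_WORKER_THREADS` environment variable before falling back to the detected core count, and a bad value fails loudly at startup rather than silently misconfiguring the runtime. A hypothetical demonstration of the override:

```rust
fn main() {
    // Typically set from the shell, e.g.
    //   TOKIO_WORKER_THREADS=4 ./my-server
    // but shown in-process here. It must be set before the runtime
    // first calls num_cpus().
    std::env::set_var("TOKIO_WORKER_THREADS", "4");

    // From this point the runtime builds 4 workers regardless of the
    // machine's core count. A value like "abc" or "0" panics at startup
    // with the messages in the diff, and an unset variable falls back
    // to usize::max(1, num_cpus::get()).
}
```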
diff --git a/vendor/tokio/src/loom/std/mutex.rs b/vendor/tokio/src/loom/std/mutex.rs
index bf14d6242..076f78611 100644
--- a/vendor/tokio/src/loom/std/mutex.rs
+++ b/vendor/tokio/src/loom/std/mutex.rs
@@ -1,7 +1,7 @@
use std::sync::{self, MutexGuard, TryLockError};
/// Adapter for `std::Mutex` that removes the poisoning aspects
-// from its api
+/// from its api.
#[derive(Debug)]
pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);
@@ -13,6 +13,12 @@ impl<T> Mutex<T> {
}
#[inline]
+ #[cfg(not(tokio_no_const_mutex_new))]
+ pub(crate) const fn const_new(t: T) -> Mutex<T> {
+ Mutex(sync::Mutex::new(t))
+ }
+
+ #[inline]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
match self.0.lock() {
Ok(guard) => guard,
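
Editor's note: the hunk above is cut off mid-`match`, but the poison-stripping behavior the doc comment describes looks like the following sketch; the `Err` arm is a reconstruction, not quoted from the diff:

```rust
use std::sync::{self, MutexGuard};

pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);

impl<T: ?Sized> Mutex<T> {
    pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
        match self.0.lock() {
            Ok(guard) => guard,
            // A poisoned std mutex still carries its guard inside the
            // error; extracting it discards the poison flag, which is
            // the whole point of this adapter.
            Err(p_err) => p_err.into_inner(),
        }
    }
}
```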
diff --git a/vendor/tokio/src/loom/std/parking_lot.rs b/vendor/tokio/src/loom/std/parking_lot.rs
index 8448bed53..e3af258d1 100644
--- a/vendor/tokio/src/loom/std/parking_lot.rs
+++ b/vendor/tokio/src/loom/std/parking_lot.rs
@@ -3,83 +3,143 @@
//!
//! This can be extended to additional types/methods as required.
+use std::fmt;
+use std::marker::PhantomData;
+use std::ops::{Deref, DerefMut};
use std::sync::LockResult;
use std::time::Duration;
+// All types in this file are marked with PhantomData to ensure that
+// parking_lot's send_guard feature does not leak through and affect when Tokio
+// types are Send.
+//
+// See <https://github.com/tokio-rs/tokio/pull/4359> for more info.
+
// Types that do not need wrapping
-pub(crate) use parking_lot::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
+pub(crate) use parking_lot::WaitTimeoutResult;
+
+#[derive(Debug)]
+pub(crate) struct Mutex<T: ?Sized>(PhantomData<std::sync::Mutex<T>>, parking_lot::Mutex<T>);
+
+#[derive(Debug)]
+pub(crate) struct RwLock<T>(PhantomData<std::sync::RwLock<T>>, parking_lot::RwLock<T>);
+
+#[derive(Debug)]
+pub(crate) struct Condvar(PhantomData<std::sync::Condvar>, parking_lot::Condvar);
-/// Adapter for `parking_lot::Mutex` to the `std::sync::Mutex` interface.
#[derive(Debug)]
-pub(crate) struct Mutex<T: ?Sized>(parking_lot::Mutex<T>);
+pub(crate) struct MutexGuard<'a, T: ?Sized>(
+ PhantomData<std::sync::MutexGuard<'a, T>>,
+ parking_lot::MutexGuard<'a, T>,
+);
#[derive(Debug)]
-pub(crate) struct RwLock<T>(parking_lot::RwLock<T>);
+pub(crate) struct RwLockReadGuard<'a, T: ?Sized>(
+ PhantomData<std::sync::RwLockReadGuard<'a, T>>,
+ parking_lot::RwLockReadGuard<'a, T>,
+);
-/// Adapter for `parking_lot::Condvar` to the `std::sync::Condvar` interface.
#[derive(Debug)]
-pub(crate) struct Condvar(parking_lot::Condvar);
+pub(crate) struct RwLockWriteGuard<'a, T: ?Sized>(
+ PhantomData<std::sync::RwLockWriteGuard<'a, T>>,
+ parking_lot::RwLockWriteGuard<'a, T>,
+);
impl<T> Mutex<T> {
#[inline]
pub(crate) fn new(t: T) -> Mutex<T> {
- Mutex(parking_lot::Mutex::new(t))
+ Mutex(PhantomData, parking_lot::Mutex::new(t))
}
#[inline]
- #[cfg(all(feature = "parking_lot", not(all(loom, test)),))]
+ #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "parking_lot",))))]
pub(crate) const fn const_new(t: T) -> Mutex<T> {
- Mutex(parking_lot::const_mutex(t))
+ Mutex(PhantomData, parking_lot::const_mutex(t))
}
#[inline]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
- self.0.lock()
+ MutexGuard(PhantomData, self.1.lock())
}
#[inline]
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
- self.0.try_lock()
+ self.1
+ .try_lock()
+ .map(|guard| MutexGuard(PhantomData, guard))
}
#[inline]
pub(crate) fn get_mut(&mut self) -> &mut T {
- self.0.get_mut()
+ self.1.get_mut()
}
// Note: Additional methods `is_poisoned` and `into_inner` can be
// provided here as needed.
}
+impl<'a, T: ?Sized> Deref for MutexGuard<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ self.1.deref()
+ }
+}
+
+impl<'a, T: ?Sized> DerefMut for MutexGuard<'a, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ self.1.deref_mut()
+ }
+}
+
impl<T> RwLock<T> {
pub(crate) fn new(t: T) -> RwLock<T> {
- RwLock(parking_lot::RwLock::new(t))
+ RwLock(PhantomData, parking_lot::RwLock::new(t))
}
pub(crate) fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
- Ok(self.0.read())
+ Ok(RwLockReadGuard(PhantomData, self.1.read()))
}
pub(crate) fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> {
- Ok(self.0.write())
+ Ok(RwLockWriteGuard(PhantomData, self.1.write()))
+ }
+}
+
+impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ self.1.deref()
+ }
+}
+
+impl<'a, T: ?Sized> Deref for RwLockWriteGuard<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ self.1.deref()
+ }
+}
+
+impl<'a, T: ?Sized> DerefMut for RwLockWriteGuard<'a, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ self.1.deref_mut()
}
}
impl Condvar {
#[inline]
pub(crate) fn new() -> Condvar {
- Condvar(parking_lot::Condvar::new())
+ Condvar(PhantomData, parking_lot::Condvar::new())
}
#[inline]
pub(crate) fn notify_one(&self) {
- self.0.notify_one();
+ self.1.notify_one();
}
#[inline]
pub(crate) fn notify_all(&self) {
- self.0.notify_all();
+ self.1.notify_all();
}
#[inline]
@@ -87,7 +147,7 @@ impl Condvar {
&self,
mut guard: MutexGuard<'a, T>,
) -> LockResult<MutexGuard<'a, T>> {
- self.0.wait(&mut guard);
+ self.1.wait(&mut guard.1);
Ok(guard)
}
@@ -97,10 +157,28 @@ impl Condvar {
mut guard: MutexGuard<'a, T>,
timeout: Duration,
) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
- let wtr = self.0.wait_for(&mut guard, timeout);
+ let wtr = self.1.wait_for(&mut guard.1, timeout);
Ok((guard, wtr))
}
// Note: Additional methods `wait_timeout_ms`, `wait_timeout_until`,
// `wait_until` can be provided here as needed.
}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&self.1, f)
+ }
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&self.1, f)
+ }
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&self.1, f)
+ }
+}