diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/crossbeam-utils/src | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/crossbeam-utils/src')
-rw-r--r-- | third_party/rust/crossbeam-utils/src/atomic/atomic_cell.rs | 1121 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/atomic/consume.rs | 92 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/atomic/mod.rs | 37 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/atomic/seq_lock.rs | 112 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/atomic/seq_lock_wide.rs | 155 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/backoff.rs | 296 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/cache_padded.rs | 191 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/lib.rs | 104 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/mod.rs | 17 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/once_lock.rs | 103 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/parker.rs | 413 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs | 634 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/wait_group.rs | 145 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/thread.rs | 587 |
14 files changed, 4007 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/src/atomic/atomic_cell.rs b/third_party/rust/crossbeam-utils/src/atomic/atomic_cell.rs new file mode 100644 index 0000000000..7941c5c87c --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/atomic_cell.rs @@ -0,0 +1,1121 @@ +// Necessary for implementing atomic methods for `AtomicUnit` +#![allow(clippy::unit_arg)] + +use crate::primitive::sync::atomic::{self, AtomicBool}; +use core::cell::UnsafeCell; +use core::cmp; +use core::fmt; +use core::mem::{self, ManuallyDrop, MaybeUninit}; +use core::sync::atomic::Ordering; + +use core::ptr; + +#[cfg(feature = "std")] +use std::panic::{RefUnwindSafe, UnwindSafe}; + +use super::seq_lock::SeqLock; + +/// A thread-safe mutable memory location. +/// +/// This type is equivalent to [`Cell`], except it can also be shared among multiple threads. +/// +/// Operations on `AtomicCell`s use atomic instructions whenever possible, and synchronize using +/// global locks otherwise. You can call [`AtomicCell::<T>::is_lock_free()`] to check whether +/// atomic instructions or locks will be used. +/// +/// Atomic loads use the [`Acquire`] ordering and atomic stores use the [`Release`] ordering. +/// +/// [`Cell`]: std::cell::Cell +/// [`AtomicCell::<T>::is_lock_free()`]: AtomicCell::is_lock_free +/// [`Acquire`]: std::sync::atomic::Ordering::Acquire +/// [`Release`]: std::sync::atomic::Ordering::Release +#[repr(transparent)] +pub struct AtomicCell<T> { + /// The inner value. + /// + /// If this value can be transmuted into a primitive atomic type, it will be treated as such. + /// Otherwise, all potentially concurrent operations on this data will be protected by a global + /// lock. + /// + /// Using MaybeUninit to prevent code outside the cell from observing partially initialized state: + /// <https://github.com/crossbeam-rs/crossbeam/issues/833> + /// + /// Note: + /// - we'll never store uninitialized `T` due to our API only using initialized `T`. + /// - this `MaybeUninit` does *not* fix <https://github.com/crossbeam-rs/crossbeam/issues/315>. + value: UnsafeCell<MaybeUninit<T>>, +} + +unsafe impl<T: Send> Send for AtomicCell<T> {} +unsafe impl<T: Send> Sync for AtomicCell<T> {} + +#[cfg(feature = "std")] +impl<T> UnwindSafe for AtomicCell<T> {} +#[cfg(feature = "std")] +impl<T> RefUnwindSafe for AtomicCell<T> {} + +impl<T> AtomicCell<T> { + /// Creates a new atomic cell initialized with `val`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// ``` + pub const fn new(val: T) -> AtomicCell<T> { + AtomicCell { + value: UnsafeCell::new(MaybeUninit::new(val)), + } + } + + /// Consumes the atomic and returns the contained value. + /// + /// This is safe because passing `self` by value guarantees that no other threads are + /// concurrently accessing the atomic data. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// let v = a.into_inner(); + /// + /// assert_eq!(v, 7); + /// ``` + pub fn into_inner(self) -> T { + let this = ManuallyDrop::new(self); + // SAFETY: + // - passing `self` by value guarantees that no other threads are concurrently + // accessing the atomic data + // - the raw pointer passed in is valid because we got it from an owned value. + // - `ManuallyDrop` prevents double dropping `T` + unsafe { this.as_ptr().read() } + } + + /// Returns `true` if operations on values of this type are lock-free. + /// + /// If the compiler or the platform doesn't support the necessary atomic instructions, + /// `AtomicCell<T>` will use global locks for every potentially concurrent atomic operation. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// // This type is internally represented as `AtomicUsize` so we can just use atomic + /// // operations provided by it. + /// assert_eq!(AtomicCell::<usize>::is_lock_free(), true); + /// + /// // A wrapper struct around `isize`. + /// struct Foo { + /// bar: isize, + /// } + /// // `AtomicCell<Foo>` will be internally represented as `AtomicIsize`. + /// assert_eq!(AtomicCell::<Foo>::is_lock_free(), true); + /// + /// // Operations on zero-sized types are always lock-free. + /// assert_eq!(AtomicCell::<()>::is_lock_free(), true); + /// + /// // Very large types cannot be represented as any of the standard atomic types, so atomic + /// // operations on them will have to use global locks for synchronization. + /// assert_eq!(AtomicCell::<[u8; 1000]>::is_lock_free(), false); + /// ``` + pub const fn is_lock_free() -> bool { + atomic_is_lock_free::<T>() + } + + /// Stores `val` into the atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// a.store(8); + /// assert_eq!(a.load(), 8); + /// ``` + pub fn store(&self, val: T) { + if mem::needs_drop::<T>() { + drop(self.swap(val)); + } else { + unsafe { + atomic_store(self.as_ptr(), val); + } + } + } + + /// Stores `val` into the atomic cell and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// assert_eq!(a.swap(8), 7); + /// assert_eq!(a.load(), 8); + /// ``` + pub fn swap(&self, val: T) -> T { + unsafe { atomic_swap(self.as_ptr(), val) } + } + + /// Returns a raw pointer to the underlying data in this atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(5); + /// + /// let ptr = a.as_ptr(); + /// ``` + #[inline] + pub fn as_ptr(&self) -> *mut T { + self.value.get().cast::<T>() + } +} + +impl<T: Default> AtomicCell<T> { + /// Takes the value of the atomic cell, leaving `Default::default()` in its place. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(5); + /// let five = a.take(); + /// + /// assert_eq!(five, 5); + /// assert_eq!(a.into_inner(), 0); + /// ``` + pub fn take(&self) -> T { + self.swap(Default::default()) + } +} + +impl<T: Copy> AtomicCell<T> { + /// Loads a value from the atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// ``` + pub fn load(&self) -> T { + unsafe { atomic_load(self.as_ptr()) } + } +} + +impl<T: Copy + Eq> AtomicCell<T> { + /// If the current value equals `current`, stores `new` into the atomic cell. + /// + /// The return value is always the previous value. If it is equal to `current`, then the value + /// was updated. + /// + /// # Examples + /// + /// ``` + /// # #![allow(deprecated)] + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(1); + /// + /// assert_eq!(a.compare_and_swap(2, 3), 1); + /// assert_eq!(a.load(), 1); + /// + /// assert_eq!(a.compare_and_swap(1, 2), 1); + /// assert_eq!(a.load(), 2); + /// ``` + // TODO: remove in the next major version. + #[deprecated(note = "Use `compare_exchange` instead")] + pub fn compare_and_swap(&self, current: T, new: T) -> T { + match self.compare_exchange(current, new) { + Ok(v) => v, + Err(v) => v, + } + } + + /// If the current value equals `current`, stores `new` into the atomic cell. + /// + /// The return value is a result indicating whether the new value was written and containing + /// the previous value. On success this value is guaranteed to be equal to `current`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(1); + /// + /// assert_eq!(a.compare_exchange(2, 3), Err(1)); + /// assert_eq!(a.load(), 1); + /// + /// assert_eq!(a.compare_exchange(1, 2), Ok(1)); + /// assert_eq!(a.load(), 2); + /// ``` + pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> { + unsafe { atomic_compare_exchange_weak(self.as_ptr(), current, new) } + } + + /// Fetches the value, and applies a function to it that returns an optional + /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else + /// `Err(previous_value)`. + /// + /// Note: This may call the function multiple times if the value has been changed from other threads in + /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied + /// only once to the stored value. + /// + /// # Examples + /// + /// ```rust + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// assert_eq!(a.fetch_update(|_| None), Err(7)); + /// assert_eq!(a.fetch_update(|a| Some(a + 1)), Ok(7)); + /// assert_eq!(a.fetch_update(|a| Some(a + 1)), Ok(8)); + /// assert_eq!(a.load(), 9); + /// ``` + #[inline] + pub fn fetch_update<F>(&self, mut f: F) -> Result<T, T> + where + F: FnMut(T) -> Option<T>, + { + let mut prev = self.load(); + while let Some(next) = f(prev) { + match self.compare_exchange(prev, next) { + x @ Ok(_) => return x, + Err(next_prev) => prev = next_prev, + } + } + Err(prev) + } +} + +// `MaybeUninit` prevents `T` from being dropped, so we need to implement `Drop` +// for `AtomicCell` to avoid leaks of non-`Copy` types. +impl<T> Drop for AtomicCell<T> { + fn drop(&mut self) { + if mem::needs_drop::<T>() { + // SAFETY: + // - the mutable reference guarantees that no other threads are concurrently accessing the atomic data + // - the raw pointer passed in is valid because we got it from a reference + // - `MaybeUninit` prevents double dropping `T` + unsafe { + self.as_ptr().drop_in_place(); + } + } + } +} + +macro_rules! impl_arithmetic { + ($t:ty, fallback, $example:tt) => { + impl AtomicCell<$t> { + /// Increments the current value by `val` and returns the previous value. + /// + /// The addition wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_add(3), 7); + /// assert_eq!(a.load(), 10); + /// ``` + #[inline] + pub fn fetch_add(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_add(val); + old + } + + /// Decrements the current value by `val` and returns the previous value. + /// + /// The subtraction wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_sub(3), 7); + /// assert_eq!(a.load(), 4); + /// ``` + #[inline] + pub fn fetch_sub(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_sub(val); + old + } + + /// Applies bitwise "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_and(3), 7); + /// assert_eq!(a.load(), 3); + /// ``` + #[inline] + pub fn fetch_and(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value &= val; + old + } + + /// Applies bitwise "nand" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_nand(3), 7); + /// assert_eq!(a.load(), !(7 & 3)); + /// ``` + #[inline] + pub fn fetch_nand(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = !(old & val); + old + } + + /// Applies bitwise "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_or(16), 7); + /// assert_eq!(a.load(), 23); + /// ``` + #[inline] + pub fn fetch_or(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value |= val; + old + } + + /// Applies bitwise "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_xor(2), 7); + /// assert_eq!(a.load(), 5); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value ^= val; + old + } + + /// Compares and sets the maximum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_max(2), 7); + /// assert_eq!(a.load(), 7); + /// ``` + #[inline] + pub fn fetch_max(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::max(old, val); + old + } + + /// Compares and sets the minimum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_min(2), 7); + /// assert_eq!(a.load(), 2); + /// ``` + #[inline] + pub fn fetch_min(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::min(old, val); + old + } + } + }; + ($t:ty, $atomic:ty, $example:tt) => { + impl AtomicCell<$t> { + /// Increments the current value by `val` and returns the previous value. + /// + /// The addition wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_add(3), 7); + /// assert_eq!(a.load(), 10); + /// ``` + #[inline] + pub fn fetch_add(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_add(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_add(val); + old + } + } + + /// Decrements the current value by `val` and returns the previous value. + /// + /// The subtraction wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_sub(3), 7); + /// assert_eq!(a.load(), 4); + /// ``` + #[inline] + pub fn fetch_sub(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_sub(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_sub(val); + old + } + } + + /// Applies bitwise "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_and(3), 7); + /// assert_eq!(a.load(), 3); + /// ``` + #[inline] + pub fn fetch_and(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_and(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value &= val; + old + } + } + + /// Applies bitwise "nand" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_nand(3), 7); + /// assert_eq!(a.load(), !(7 & 3)); + /// ``` + #[inline] + pub fn fetch_nand(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_nand(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = !(old & val); + old + } + } + + /// Applies bitwise "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_or(16), 7); + /// assert_eq!(a.load(), 23); + /// ``` + #[inline] + pub fn fetch_or(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_or(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value |= val; + old + } + } + + /// Applies bitwise "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_xor(2), 7); + /// assert_eq!(a.load(), 5); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_xor(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value ^= val; + old + } + } + + /// Compares and sets the maximum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_max(9), 7); + /// assert_eq!(a.load(), 9); + /// ``` + #[inline] + pub fn fetch_max(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + // TODO: Atomic*::fetch_max requires Rust 1.45. + self.fetch_update(|old| Some(cmp::max(old, val))).unwrap() + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::max(old, val); + old + } + } + + /// Compares and sets the minimum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_min(2), 7); + /// assert_eq!(a.load(), 2); + /// ``` + #[inline] + pub fn fetch_min(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + // TODO: Atomic*::fetch_min requires Rust 1.45. + self.fetch_update(|old| Some(cmp::min(old, val))).unwrap() + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::min(old, val); + old + } + } + } + }; +} + +impl_arithmetic!(u8, atomic::AtomicU8, "let a = AtomicCell::new(7u8);"); +impl_arithmetic!(i8, atomic::AtomicI8, "let a = AtomicCell::new(7i8);"); +impl_arithmetic!(u16, atomic::AtomicU16, "let a = AtomicCell::new(7u16);"); +impl_arithmetic!(i16, atomic::AtomicI16, "let a = AtomicCell::new(7i16);"); +impl_arithmetic!(u32, atomic::AtomicU32, "let a = AtomicCell::new(7u32);"); +impl_arithmetic!(i32, atomic::AtomicI32, "let a = AtomicCell::new(7i32);"); +#[cfg(not(crossbeam_no_atomic_64))] +impl_arithmetic!(u64, atomic::AtomicU64, "let a = AtomicCell::new(7u64);"); +#[cfg(not(crossbeam_no_atomic_64))] +impl_arithmetic!(i64, atomic::AtomicI64, "let a = AtomicCell::new(7i64);"); +#[cfg(crossbeam_no_atomic_64)] +impl_arithmetic!(u64, fallback, "let a = AtomicCell::new(7u64);"); +#[cfg(crossbeam_no_atomic_64)] +impl_arithmetic!(i64, fallback, "let a = AtomicCell::new(7i64);"); +// TODO: AtomicU128 is unstable +// impl_arithmetic!(u128, atomic::AtomicU128, "let a = AtomicCell::new(7u128);"); +// impl_arithmetic!(i128, atomic::AtomicI128, "let a = AtomicCell::new(7i128);"); +impl_arithmetic!(u128, fallback, "let a = AtomicCell::new(7u128);"); +impl_arithmetic!(i128, fallback, "let a = AtomicCell::new(7i128);"); + +impl_arithmetic!( + usize, + atomic::AtomicUsize, + "let a = AtomicCell::new(7usize);" +); +impl_arithmetic!( + isize, + atomic::AtomicIsize, + "let a = AtomicCell::new(7isize);" +); + +impl AtomicCell<bool> { + /// Applies logical "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_and(true), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_and(false), true); + /// assert_eq!(a.load(), false); + /// ``` + #[inline] + pub fn fetch_and(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_and(val, Ordering::AcqRel) + } + + /// Applies logical "nand" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_nand(false), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_nand(true), true); + /// assert_eq!(a.load(), false); + /// + /// assert_eq!(a.fetch_nand(false), false); + /// assert_eq!(a.load(), true); + /// ``` + #[inline] + pub fn fetch_nand(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_nand(val, Ordering::AcqRel) + } + + /// Applies logical "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(false); + /// + /// assert_eq!(a.fetch_or(false), false); + /// assert_eq!(a.load(), false); + /// + /// assert_eq!(a.fetch_or(true), false); + /// assert_eq!(a.load(), true); + /// ``` + #[inline] + pub fn fetch_or(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_or(val, Ordering::AcqRel) + } + + /// Applies logical "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_xor(false), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_xor(true), true); + /// assert_eq!(a.load(), false); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_xor(val, Ordering::AcqRel) + } +} + +impl<T: Default> Default for AtomicCell<T> { + fn default() -> AtomicCell<T> { + AtomicCell::new(T::default()) + } +} + +impl<T> From<T> for AtomicCell<T> { + #[inline] + fn from(val: T) -> AtomicCell<T> { + AtomicCell::new(val) + } +} + +impl<T: Copy + fmt::Debug> fmt::Debug for AtomicCell<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AtomicCell") + .field("value", &self.load()) + .finish() + } +} + +/// Returns `true` if values of type `A` can be transmuted into values of type `B`. +const fn can_transmute<A, B>() -> bool { + // Sizes must be equal, but alignment of `A` must be greater or equal than that of `B`. + (mem::size_of::<A>() == mem::size_of::<B>()) & (mem::align_of::<A>() >= mem::align_of::<B>()) +} + +/// Returns a reference to the global lock associated with the `AtomicCell` at address `addr`. +/// +/// This function is used to protect atomic data which doesn't fit into any of the primitive atomic +/// types in `std::sync::atomic`. Operations on such atomics must therefore use a global lock. +/// +/// However, there is not only one global lock but an array of many locks, and one of them is +/// picked based on the given address. Having many locks reduces contention and improves +/// scalability. +#[inline] +#[must_use] +fn lock(addr: usize) -> &'static SeqLock { + // The number of locks is a prime number because we want to make sure `addr % LEN` gets + // dispersed across all locks. + // + // Note that addresses are always aligned to some power of 2, depending on type `T` in + // `AtomicCell<T>`. If `LEN` was an even number, then `addr % LEN` would be an even number, + // too, which means only half of the locks would get utilized! + // + // It is also possible for addresses to accidentally get aligned to a number that is not a + // power of 2. Consider this example: + // + // ``` + // #[repr(C)] + // struct Foo { + // a: AtomicCell<u8>, + // b: u8, + // c: u8, + // } + // ``` + // + // Now, if we have a slice of type `&[Foo]`, it is possible that field `a` in all items gets + // stored at addresses that are multiples of 3. It'd be too bad if `LEN` was divisible by 3. + // In order to protect from such cases, we simply choose a large prime number for `LEN`. + const LEN: usize = 97; + #[allow(clippy::declare_interior_mutable_const)] + const L: SeqLock = SeqLock::new(); + static LOCKS: [SeqLock; LEN] = [L; LEN]; + + // If the modulus is a constant number, the compiler will use crazy math to transform this into + // a sequence of cheap arithmetic operations rather than using the slow modulo instruction. + &LOCKS[addr % LEN] +} + +/// An atomic `()`. +/// +/// All operations are noops. +struct AtomicUnit; + +impl AtomicUnit { + #[inline] + fn load(&self, _order: Ordering) {} + + #[inline] + fn store(&self, _val: (), _order: Ordering) {} + + #[inline] + fn swap(&self, _val: (), _order: Ordering) {} + + #[inline] + fn compare_exchange_weak( + &self, + _current: (), + _new: (), + _success: Ordering, + _failure: Ordering, + ) -> Result<(), ()> { + Ok(()) + } +} + +macro_rules! atomic { + // If values of type `$t` can be transmuted into values of the primitive atomic type `$atomic`, + // declares variable `$a` of type `$atomic` and executes `$atomic_op`, breaking out of the loop. + (@check, $t:ty, $atomic:ty, $a:ident, $atomic_op:expr) => { + if can_transmute::<$t, $atomic>() { + let $a: &$atomic; + break $atomic_op; + } + }; + + // If values of type `$t` can be transmuted into values of a primitive atomic type, declares + // variable `$a` of that type and executes `$atomic_op`. Otherwise, just executes + // `$fallback_op`. + ($t:ty, $a:ident, $atomic_op:expr, $fallback_op:expr) => { + loop { + atomic!(@check, $t, AtomicUnit, $a, $atomic_op); + + atomic!(@check, $t, atomic::AtomicU8, $a, $atomic_op); + atomic!(@check, $t, atomic::AtomicU16, $a, $atomic_op); + atomic!(@check, $t, atomic::AtomicU32, $a, $atomic_op); + #[cfg(not(crossbeam_no_atomic_64))] + atomic!(@check, $t, atomic::AtomicU64, $a, $atomic_op); + // TODO: AtomicU128 is unstable + // atomic!(@check, $t, atomic::AtomicU128, $a, $atomic_op); + + break $fallback_op; + } + }; +} + +/// Returns `true` if operations on `AtomicCell<T>` are lock-free. +const fn atomic_is_lock_free<T>() -> bool { + // HACK(taiki-e): This is equivalent to `atomic! { T, _a, true, false }`, but can be used in const fn even in our MSRV (Rust 1.38). + let is_lock_free = can_transmute::<T, AtomicUnit>() + | can_transmute::<T, atomic::AtomicU8>() + | can_transmute::<T, atomic::AtomicU16>() + | can_transmute::<T, atomic::AtomicU32>(); + #[cfg(not(crossbeam_no_atomic_64))] + let is_lock_free = is_lock_free | can_transmute::<T, atomic::AtomicU64>(); + // TODO: AtomicU128 is unstable + // let is_lock_free = is_lock_free | can_transmute::<T, atomic::AtomicU128>(); + is_lock_free +} + +/// Atomically reads data from `src`. +/// +/// This operation uses the `Acquire` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_load<T>(src: *mut T) -> T +where + T: Copy, +{ + atomic! { + T, a, + { + a = &*(src as *const _ as *const _); + mem::transmute_copy(&a.load(Ordering::Acquire)) + }, + { + let lock = lock(src as usize); + + // Try doing an optimistic read first. + if let Some(stamp) = lock.optimistic_read() { + // We need a volatile read here because other threads might concurrently modify the + // value. In theory, data races are *always* UB, even if we use volatile reads and + // discard the data when a data race is detected. The proper solution would be to + // do atomic reads and atomic writes, but we can't atomically read and write all + // kinds of data since `AtomicU8` is not available on stable Rust yet. + // Load as `MaybeUninit` because we may load a value that is not valid as `T`. + let val = ptr::read_volatile(src.cast::<MaybeUninit<T>>()); + + if lock.validate_read(stamp) { + return val.assume_init(); + } + } + + // Grab a regular write lock so that writers don't starve this load. + let guard = lock.write(); + let val = ptr::read(src); + // The value hasn't been changed. Drop the guard without incrementing the stamp. + guard.abort(); + val + } + } +} + +/// Atomically writes `val` to `dst`. +/// +/// This operation uses the `Release` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_store<T>(dst: *mut T, val: T) { + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + a.store(mem::transmute_copy(&val), Ordering::Release); + mem::forget(val); + }, + { + let _guard = lock(dst as usize).write(); + ptr::write(dst, val); + } + } +} + +/// Atomically swaps data at `dst` with `val`. +/// +/// This operation uses the `AcqRel` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_swap<T>(dst: *mut T, val: T) -> T { + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + let res = mem::transmute_copy(&a.swap(mem::transmute_copy(&val), Ordering::AcqRel)); + mem::forget(val); + res + }, + { + let _guard = lock(dst as usize).write(); + ptr::replace(dst, val) + } + } +} + +/// Atomically compares data at `dst` to `current` and, if equal byte-for-byte, exchanges data at +/// `dst` with `new`. +/// +/// Returns the old value on success, or the current value at `dst` on failure. +/// +/// This operation uses the `AcqRel` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +#[allow(clippy::let_unit_value)] +unsafe fn atomic_compare_exchange_weak<T>(dst: *mut T, mut current: T, new: T) -> Result<T, T> +where + T: Copy + Eq, +{ + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + let mut current_raw = mem::transmute_copy(¤t); + let new_raw = mem::transmute_copy(&new); + + loop { + match a.compare_exchange_weak( + current_raw, + new_raw, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break Ok(current), + Err(previous_raw) => { + let previous = mem::transmute_copy(&previous_raw); + + if !T::eq(&previous, ¤t) { + break Err(previous); + } + + // The compare-exchange operation has failed and didn't store `new`. The + // failure is either spurious, or `previous` was semantically equal to + // `current` but not byte-equal. Let's retry with `previous` as the new + // `current`. + current = previous; + current_raw = previous_raw; + } + } + } + }, + { + let guard = lock(dst as usize).write(); + + if T::eq(&*dst, ¤t) { + Ok(ptr::replace(dst, new)) + } else { + let val = ptr::read(dst); + // The value hasn't been changed. Drop the guard without incrementing the stamp. + guard.abort(); + Err(val) + } + } + } +} diff --git a/third_party/rust/crossbeam-utils/src/atomic/consume.rs b/third_party/rust/crossbeam-utils/src/atomic/consume.rs new file mode 100644 index 0000000000..277b370a55 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/consume.rs @@ -0,0 +1,92 @@ +#[cfg(any(target_arch = "arm", target_arch = "aarch64"))] +use crate::primitive::sync::atomic::compiler_fence; +#[cfg(not(crossbeam_no_atomic))] +use core::sync::atomic::Ordering; + +/// Trait which allows reading from primitive atomic types with "consume" ordering. +pub trait AtomicConsume { + /// Type returned by `load_consume`. + type Val; + + /// Loads a value from the atomic using a "consume" memory ordering. + /// + /// This is similar to the "acquire" ordering, except that an ordering is + /// only guaranteed with operations that "depend on" the result of the load. + /// However consume loads are usually much faster than acquire loads on + /// architectures with a weak memory model since they don't require memory + /// fence instructions. + /// + /// The exact definition of "depend on" is a bit vague, but it works as you + /// would expect in practice since a lot of software, especially the Linux + /// kernel, rely on this behavior. + /// + /// This is currently only implemented on ARM and AArch64, where a fence + /// can be avoided. On other architectures this will fall back to a simple + /// `load(Ordering::Acquire)`. + fn load_consume(&self) -> Self::Val; +} + +#[cfg(not(crossbeam_no_atomic))] +#[cfg(any(target_arch = "arm", target_arch = "aarch64"))] +macro_rules! impl_consume { + () => { + #[inline] + fn load_consume(&self) -> Self::Val { + let result = self.load(Ordering::Relaxed); + compiler_fence(Ordering::Acquire); + result + } + }; +} + +#[cfg(not(crossbeam_no_atomic))] +#[cfg(not(any(target_arch = "arm", target_arch = "aarch64")))] +macro_rules! impl_consume { + () => { + #[inline] + fn load_consume(&self) -> Self::Val { + self.load(Ordering::Acquire) + } + }; +} + +macro_rules! impl_atomic { + ($atomic:ident, $val:ty) => { + #[cfg(not(crossbeam_no_atomic))] + impl AtomicConsume for core::sync::atomic::$atomic { + type Val = $val; + impl_consume!(); + } + #[cfg(crossbeam_loom)] + impl AtomicConsume for loom::sync::atomic::$atomic { + type Val = $val; + impl_consume!(); + } + }; +} + +impl_atomic!(AtomicBool, bool); +impl_atomic!(AtomicUsize, usize); +impl_atomic!(AtomicIsize, isize); +impl_atomic!(AtomicU8, u8); +impl_atomic!(AtomicI8, i8); +impl_atomic!(AtomicU16, u16); +impl_atomic!(AtomicI16, i16); +impl_atomic!(AtomicU32, u32); +impl_atomic!(AtomicI32, i32); +#[cfg(not(crossbeam_no_atomic_64))] +impl_atomic!(AtomicU64, u64); +#[cfg(not(crossbeam_no_atomic_64))] +impl_atomic!(AtomicI64, i64); + +#[cfg(not(crossbeam_no_atomic))] +impl<T> AtomicConsume for core::sync::atomic::AtomicPtr<T> { + type Val = *mut T; + impl_consume!(); +} + +#[cfg(crossbeam_loom)] +impl<T> AtomicConsume for loom::sync::atomic::AtomicPtr<T> { + type Val = *mut T; + impl_consume!(); +} diff --git a/third_party/rust/crossbeam-utils/src/atomic/mod.rs b/third_party/rust/crossbeam-utils/src/atomic/mod.rs new file mode 100644 index 0000000000..38967859f9 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/mod.rs @@ -0,0 +1,37 @@ +//! Atomic types. +//! +//! * [`AtomicCell`], a thread-safe mutable memory location. +//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering. + +#[cfg(not(crossbeam_no_atomic_cas))] +#[cfg(not(crossbeam_loom))] +cfg_if::cfg_if! { + // Use "wide" sequence lock if the pointer width <= 32 for preventing its counter against wrap + // around. + // + // We are ignoring too wide architectures (pointer width >= 256), since such a system will not + // appear in a conceivable future. + // + // In narrow architectures (pointer width <= 16), the counter is still <= 32-bit and may be + // vulnerable to wrap around. But it's mostly okay, since in such a primitive hardware, the + // counter will not be increased that fast. + if #[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))] { + mod seq_lock; + } else { + #[path = "seq_lock_wide.rs"] + mod seq_lock; + } +} + +#[cfg(not(crossbeam_no_atomic_cas))] +// We cannot provide AtomicCell under cfg(crossbeam_loom) because loom's atomic +// types have a different in-memory representation than the underlying type. +// TODO: The latest loom supports fences, so fallback using seqlock may be available. +#[cfg(not(crossbeam_loom))] +mod atomic_cell; +mod consume; + +#[cfg(not(crossbeam_no_atomic_cas))] +#[cfg(not(crossbeam_loom))] +pub use self::atomic_cell::AtomicCell; +pub use self::consume::AtomicConsume; diff --git a/third_party/rust/crossbeam-utils/src/atomic/seq_lock.rs b/third_party/rust/crossbeam-utils/src/atomic/seq_lock.rs new file mode 100644 index 0000000000..ff8defd26d --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/seq_lock.rs @@ -0,0 +1,112 @@ +use core::mem; +use core::sync::atomic::{self, AtomicUsize, Ordering}; + +use crate::Backoff; + +/// A simple stamped lock. +pub(crate) struct SeqLock { + /// The current state of the lock. + /// + /// All bits except the least significant one hold the current stamp. When locked, the state + /// equals 1 and doesn't contain a valid stamp. + state: AtomicUsize, +} + +impl SeqLock { + pub(crate) const fn new() -> Self { + Self { + state: AtomicUsize::new(0), + } + } + + /// If not locked, returns the current stamp. + /// + /// This method should be called before optimistic reads. + #[inline] + pub(crate) fn optimistic_read(&self) -> Option<usize> { + let state = self.state.load(Ordering::Acquire); + if state == 1 { + None + } else { + Some(state) + } + } + + /// Returns `true` if the current stamp is equal to `stamp`. + /// + /// This method should be called after optimistic reads to check whether they are valid. The + /// argument `stamp` should correspond to the one returned by method `optimistic_read`. + #[inline] + pub(crate) fn validate_read(&self, stamp: usize) -> bool { + atomic::fence(Ordering::Acquire); + self.state.load(Ordering::Relaxed) == stamp + } + + /// Grabs the lock for writing. + #[inline] + pub(crate) fn write(&'static self) -> SeqLockWriteGuard { + let backoff = Backoff::new(); + loop { + let previous = self.state.swap(1, Ordering::Acquire); + + if previous != 1 { + atomic::fence(Ordering::Release); + + return SeqLockWriteGuard { + lock: self, + state: previous, + }; + } + + backoff.snooze(); + } + } +} + +/// An RAII guard that releases the lock and increments the stamp when dropped. +pub(crate) struct SeqLockWriteGuard { + /// The parent lock. + lock: &'static SeqLock, + + /// The stamp before locking. + state: usize, +} + +impl SeqLockWriteGuard { + /// Releases the lock without incrementing the stamp. + #[inline] + pub(crate) fn abort(self) { + self.lock.state.store(self.state, Ordering::Release); + + // We specifically don't want to call drop(), since that's + // what increments the stamp. + mem::forget(self); + } +} + +impl Drop for SeqLockWriteGuard { + #[inline] + fn drop(&mut self) { + // Release the lock and increment the stamp. + self.lock + .state + .store(self.state.wrapping_add(2), Ordering::Release); + } +} + +#[cfg(test)] +mod tests { + use super::SeqLock; + + #[test] + fn test_abort() { + static LK: SeqLock = SeqLock::new(); + let before = LK.optimistic_read().unwrap(); + { + let guard = LK.write(); + guard.abort(); + } + let after = LK.optimistic_read().unwrap(); + assert_eq!(before, after, "aborted write does not update the stamp"); + } +} diff --git a/third_party/rust/crossbeam-utils/src/atomic/seq_lock_wide.rs b/third_party/rust/crossbeam-utils/src/atomic/seq_lock_wide.rs new file mode 100644 index 0000000000..ef5d94a454 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/seq_lock_wide.rs @@ -0,0 +1,155 @@ +use core::mem; +use core::sync::atomic::{self, AtomicUsize, Ordering}; + +use crate::Backoff; + +/// A simple stamped lock. +/// +/// The state is represented as two `AtomicUsize`: `state_hi` for high bits and `state_lo` for low +/// bits. +pub(crate) struct SeqLock { + /// The high bits of the current state of the lock. + state_hi: AtomicUsize, + + /// The low bits of the current state of the lock. + /// + /// All bits except the least significant one hold the current stamp. When locked, the state_lo + /// equals 1 and doesn't contain a valid stamp. + state_lo: AtomicUsize, +} + +impl SeqLock { + pub(crate) const fn new() -> Self { + Self { + state_hi: AtomicUsize::new(0), + state_lo: AtomicUsize::new(0), + } + } + + /// If not locked, returns the current stamp. + /// + /// This method should be called before optimistic reads. + #[inline] + pub(crate) fn optimistic_read(&self) -> Option<(usize, usize)> { + // The acquire loads from `state_hi` and `state_lo` synchronize with the release stores in + // `SeqLockWriteGuard::drop`. + // + // As a consequence, we can make sure that (1) all writes within the era of `state_hi - 1` + // happens before now; and therefore, (2) if `state_lo` is even, all writes within the + // critical section of (`state_hi`, `state_lo`) happens before now. + let state_hi = self.state_hi.load(Ordering::Acquire); + let state_lo = self.state_lo.load(Ordering::Acquire); + if state_lo == 1 { + None + } else { + Some((state_hi, state_lo)) + } + } + + /// Returns `true` if the current stamp is equal to `stamp`. + /// + /// This method should be called after optimistic reads to check whether they are valid. The + /// argument `stamp` should correspond to the one returned by method `optimistic_read`. + #[inline] + pub(crate) fn validate_read(&self, stamp: (usize, usize)) -> bool { + // Thanks to the fence, if we're noticing any modification to the data at the critical + // section of `(a, b)`, then the critical section's write of 1 to state_lo should be + // visible. + atomic::fence(Ordering::Acquire); + + // So if `state_lo` coincides with `stamp.1`, then either (1) we're noticing no modification + // to the data after the critical section of `(stamp.0, stamp.1)`, or (2) `state_lo` wrapped + // around. + // + // If (2) is the case, the acquire ordering ensures we see the new value of `state_hi`. + let state_lo = self.state_lo.load(Ordering::Acquire); + + // If (2) is the case and `state_hi` coincides with `stamp.0`, then `state_hi` also wrapped + // around, which we give up to correctly validate the read. + let state_hi = self.state_hi.load(Ordering::Relaxed); + + // Except for the case that both `state_hi` and `state_lo` wrapped around, the following + // condition implies that we're noticing no modification to the data after the critical + // section of `(stamp.0, stamp.1)`. + (state_hi, state_lo) == stamp + } + + /// Grabs the lock for writing. + #[inline] + pub(crate) fn write(&'static self) -> SeqLockWriteGuard { + let backoff = Backoff::new(); + loop { + let previous = self.state_lo.swap(1, Ordering::Acquire); + + if previous != 1 { + // To synchronize with the acquire fence in `validate_read` via any modification to + // the data at the critical section of `(state_hi, previous)`. + atomic::fence(Ordering::Release); + + return SeqLockWriteGuard { + lock: self, + state_lo: previous, + }; + } + + backoff.snooze(); + } + } +} + +/// An RAII guard that releases the lock and increments the stamp when dropped. +pub(crate) struct SeqLockWriteGuard { + /// The parent lock. + lock: &'static SeqLock, + + /// The stamp before locking. + state_lo: usize, +} + +impl SeqLockWriteGuard { + /// Releases the lock without incrementing the stamp. + #[inline] + pub(crate) fn abort(self) { + self.lock.state_lo.store(self.state_lo, Ordering::Release); + mem::forget(self); + } +} + +impl Drop for SeqLockWriteGuard { + #[inline] + fn drop(&mut self) { + let state_lo = self.state_lo.wrapping_add(2); + + // Increase the high bits if the low bits wrap around. + // + // Release ordering for synchronizing with `optimistic_read`. + if state_lo == 0 { + let state_hi = self.lock.state_hi.load(Ordering::Relaxed); + self.lock + .state_hi + .store(state_hi.wrapping_add(1), Ordering::Release); + } + + // Release the lock and increment the stamp. + // + // Release ordering for synchronizing with `optimistic_read`. + self.lock.state_lo.store(state_lo, Ordering::Release); + } +} + +#[cfg(test)] +mod tests { + use super::SeqLock; + + #[test] + fn test_abort() { + static LK: SeqLock = SeqLock::new(); + let before = LK.optimistic_read().unwrap(); + { + let guard = LK.write(); + guard.abort(); + } + let after = LK.optimistic_read().unwrap(); + assert_eq!(before, after, "aborted write does not update the stamp"); + } +} diff --git a/third_party/rust/crossbeam-utils/src/backoff.rs b/third_party/rust/crossbeam-utils/src/backoff.rs new file mode 100644 index 0000000000..9e256aaf2e --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/backoff.rs @@ -0,0 +1,296 @@ +use crate::primitive::sync::atomic; +use core::cell::Cell; +use core::fmt; + +const SPIN_LIMIT: u32 = 6; +const YIELD_LIMIT: u32 = 10; + +/// Performs exponential backoff in spin loops. +/// +/// Backing off in spin loops reduces contention and improves overall performance. +/// +/// This primitive can execute *YIELD* and *PAUSE* instructions, yield the current thread to the OS +/// scheduler, and tell when is a good time to block the thread using a different synchronization +/// mechanism. Each step of the back off procedure takes roughly twice as long as the previous +/// step. +/// +/// # Examples +/// +/// Backing off in a lock-free loop: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicUsize; +/// use std::sync::atomic::Ordering::SeqCst; +/// +/// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { +/// let backoff = Backoff::new(); +/// loop { +/// let val = a.load(SeqCst); +/// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { +/// return val; +/// } +/// backoff.spin(); +/// } +/// } +/// ``` +/// +/// Waiting for an [`AtomicBool`] to become `true`: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicBool; +/// use std::sync::atomic::Ordering::SeqCst; +/// +/// fn spin_wait(ready: &AtomicBool) { +/// let backoff = Backoff::new(); +/// while !ready.load(SeqCst) { +/// backoff.snooze(); +/// } +/// } +/// ``` +/// +/// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait. +/// Note that whoever sets the atomic variable to `true` must notify the parked thread by calling +/// [`unpark()`]: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicBool; +/// use std::sync::atomic::Ordering::SeqCst; +/// use std::thread; +/// +/// fn blocking_wait(ready: &AtomicBool) { +/// let backoff = Backoff::new(); +/// while !ready.load(SeqCst) { +/// if backoff.is_completed() { +/// thread::park(); +/// } else { +/// backoff.snooze(); +/// } +/// } +/// } +/// ``` +/// +/// [`is_completed`]: Backoff::is_completed +/// [`std::thread::park()`]: std::thread::park +/// [`Condvar`]: std::sync::Condvar +/// [`AtomicBool`]: std::sync::atomic::AtomicBool +/// [`unpark()`]: std::thread::Thread::unpark +pub struct Backoff { + step: Cell<u32>, +} + +impl Backoff { + /// Creates a new `Backoff`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// + /// let backoff = Backoff::new(); + /// ``` + #[inline] + pub fn new() -> Self { + Backoff { step: Cell::new(0) } + } + + /// Resets the `Backoff`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// + /// let backoff = Backoff::new(); + /// backoff.reset(); + /// ``` + #[inline] + pub fn reset(&self) { + self.step.set(0); + } + + /// Backs off in a lock-free loop. + /// + /// This method should be used when we need to retry an operation because another thread made + /// progress. + /// + /// The processor may yield using the *YIELD* or *PAUSE* instruction. + /// + /// # Examples + /// + /// Backing off in a lock-free loop: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::atomic::AtomicUsize; + /// use std::sync::atomic::Ordering::SeqCst; + /// + /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { + /// let backoff = Backoff::new(); + /// loop { + /// let val = a.load(SeqCst); + /// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { + /// return val; + /// } + /// backoff.spin(); + /// } + /// } + /// + /// let a = AtomicUsize::new(7); + /// assert_eq!(fetch_mul(&a, 8), 7); + /// assert_eq!(a.load(SeqCst), 56); + /// ``` + #[inline] + pub fn spin(&self) { + for _ in 0..1 << self.step.get().min(SPIN_LIMIT) { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + + if self.step.get() <= SPIN_LIMIT { + self.step.set(self.step.get() + 1); + } + } + + /// Backs off in a blocking loop. + /// + /// This method should be used when we need to wait for another thread to make progress. + /// + /// The processor may yield using the *YIELD* or *PAUSE* instruction and the current thread + /// may yield by giving up a timeslice to the OS scheduler. + /// + /// In `#[no_std]` environments, this method is equivalent to [`spin`]. + /// + /// If possible, use [`is_completed`] to check when it is advised to stop using backoff and + /// block the current thread using a different synchronization mechanism instead. + /// + /// [`spin`]: Backoff::spin + /// [`is_completed`]: Backoff::is_completed + /// + /// # Examples + /// + /// Waiting for an [`AtomicBool`] to become `true`: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::Arc; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::SeqCst; + /// use std::thread; + /// use std::time::Duration; + /// + /// fn spin_wait(ready: &AtomicBool) { + /// let backoff = Backoff::new(); + /// while !ready.load(SeqCst) { + /// backoff.snooze(); + /// } + /// } + /// + /// let ready = Arc::new(AtomicBool::new(false)); + /// let ready2 = ready.clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(100)); + /// ready2.store(true, SeqCst); + /// }); + /// + /// assert_eq!(ready.load(SeqCst), false); + /// spin_wait(&ready); + /// assert_eq!(ready.load(SeqCst), true); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`AtomicBool`]: std::sync::atomic::AtomicBool + #[inline] + pub fn snooze(&self) { + if self.step.get() <= SPIN_LIMIT { + for _ in 0..1 << self.step.get() { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + } else { + #[cfg(not(feature = "std"))] + for _ in 0..1 << self.step.get() { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + + #[cfg(feature = "std")] + ::std::thread::yield_now(); + } + + if self.step.get() <= YIELD_LIMIT { + self.step.set(self.step.get() + 1); + } + } + + /// Returns `true` if exponential backoff has completed and blocking the thread is advised. + /// + /// # Examples + /// + /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::Arc; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::SeqCst; + /// use std::thread; + /// use std::time::Duration; + /// + /// fn blocking_wait(ready: &AtomicBool) { + /// let backoff = Backoff::new(); + /// while !ready.load(SeqCst) { + /// if backoff.is_completed() { + /// thread::park(); + /// } else { + /// backoff.snooze(); + /// } + /// } + /// } + /// + /// let ready = Arc::new(AtomicBool::new(false)); + /// let ready2 = ready.clone(); + /// let waiter = thread::current(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(100)); + /// ready2.store(true, SeqCst); + /// waiter.unpark(); + /// }); + /// + /// assert_eq!(ready.load(SeqCst), false); + /// blocking_wait(&ready); + /// assert_eq!(ready.load(SeqCst), true); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`AtomicBool`]: std::sync::atomic::AtomicBool + #[inline] + pub fn is_completed(&self) -> bool { + self.step.get() > YIELD_LIMIT + } +} + +impl fmt::Debug for Backoff { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Backoff") + .field("step", &self.step) + .field("is_completed", &self.is_completed()) + .finish() + } +} + +impl Default for Backoff { + fn default() -> Backoff { + Backoff::new() + } +} diff --git a/third_party/rust/crossbeam-utils/src/cache_padded.rs b/third_party/rust/crossbeam-utils/src/cache_padded.rs new file mode 100644 index 0000000000..b5d5d33c96 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/cache_padded.rs @@ -0,0 +1,191 @@ +use core::fmt; +use core::ops::{Deref, DerefMut}; + +/// Pads and aligns a value to the length of a cache line. +/// +/// In concurrent programming, sometimes it is desirable to make sure commonly accessed pieces of +/// data are not placed into the same cache line. Updating an atomic value invalidates the whole +/// cache line it belongs to, which makes the next access to the same cache line slower for other +/// CPU cores. Use `CachePadded` to ensure updating one piece of data doesn't invalidate other +/// cached data. +/// +/// # Size and alignment +/// +/// Cache lines are assumed to be N bytes long, depending on the architecture: +/// +/// * On x86-64, aarch64, and powerpc64, N = 128. +/// * On arm, mips, mips64, and riscv64, N = 32. +/// * On s390x, N = 256. +/// * On all others, N = 64. +/// +/// Note that N is just a reasonable guess and is not guaranteed to match the actual cache line +/// length of the machine the program is running on. On modern Intel architectures, spatial +/// prefetcher is pulling pairs of 64-byte cache lines at a time, so we pessimistically assume that +/// cache lines are 128 bytes long. +/// +/// The size of `CachePadded<T>` is the smallest multiple of N bytes large enough to accommodate +/// a value of type `T`. +/// +/// The alignment of `CachePadded<T>` is the maximum of N bytes and the alignment of `T`. +/// +/// # Examples +/// +/// Alignment and padding: +/// +/// ``` +/// use crossbeam_utils::CachePadded; +/// +/// let array = [CachePadded::new(1i8), CachePadded::new(2i8)]; +/// let addr1 = &*array[0] as *const i8 as usize; +/// let addr2 = &*array[1] as *const i8 as usize; +/// +/// assert!(addr2 - addr1 >= 32); +/// assert_eq!(addr1 % 32, 0); +/// assert_eq!(addr2 % 32, 0); +/// ``` +/// +/// When building a concurrent queue with a head and a tail index, it is wise to place them in +/// different cache lines so that concurrent threads pushing and popping elements don't invalidate +/// each other's cache lines: +/// +/// ``` +/// use crossbeam_utils::CachePadded; +/// use std::sync::atomic::AtomicUsize; +/// +/// struct Queue<T> { +/// head: CachePadded<AtomicUsize>, +/// tail: CachePadded<AtomicUsize>, +/// buffer: *mut T, +/// } +/// ``` +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), + repr(align(64)) +)] +pub struct CachePadded<T> { + value: T, +} + +unsafe impl<T: Send> Send for CachePadded<T> {} +unsafe impl<T: Sync> Sync for CachePadded<T> {} + +impl<T> CachePadded<T> { + /// Pads and aligns a value to the length of a cache line. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::CachePadded; + /// + /// let padded_value = CachePadded::new(1); + /// ``` + pub const fn new(t: T) -> CachePadded<T> { + CachePadded::<T> { value: t } + } + + /// Returns the inner value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::CachePadded; + /// + /// let padded_value = CachePadded::new(7); + /// let value = padded_value.into_inner(); + /// assert_eq!(value, 7); + /// ``` + pub fn into_inner(self) -> T { + self.value + } +} + +impl<T> Deref for CachePadded<T> { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl<T> DerefMut for CachePadded<T> { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + +impl<T: fmt::Debug> fmt::Debug for CachePadded<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CachePadded") + .field("value", &self.value) + .finish() + } +} + +impl<T> From<T> for CachePadded<T> { + fn from(t: T) -> Self { + CachePadded::new(t) + } +} diff --git a/third_party/rust/crossbeam-utils/src/lib.rs b/third_party/rust/crossbeam-utils/src/lib.rs new file mode 100644 index 0000000000..191c5a17d1 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/lib.rs @@ -0,0 +1,104 @@ +//! Miscellaneous tools for concurrent programming. +//! +//! ## Atomics +//! +//! * [`AtomicCell`], a thread-safe mutable memory location. +//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering. +//! +//! ## Thread synchronization +//! +//! * [`Parker`], a thread parking primitive. +//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +//! * [`WaitGroup`], for synchronizing the beginning or end of some computation. +//! +//! ## Utilities +//! +//! * [`Backoff`], for exponential backoff in spin loops. +//! * [`CachePadded`], for padding and aligning a value to the length of a cache line. +//! * [`scope`], for spawning threads that borrow local variables from the stack. +//! +//! [`AtomicCell`]: atomic::AtomicCell +//! [`AtomicConsume`]: atomic::AtomicConsume +//! [`Parker`]: sync::Parker +//! [`ShardedLock`]: sync::ShardedLock +//! [`WaitGroup`]: sync::WaitGroup +//! [`scope`]: thread::scope + +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms), + allow(dead_code, unused_assignments, unused_variables) + ) +))] +#![warn( + missing_docs, + missing_debug_implementations, + rust_2018_idioms, + unreachable_pub +)] +#![cfg_attr(not(feature = "std"), no_std)] + +#[cfg(crossbeam_loom)] +#[allow(unused_imports)] +mod primitive { + pub(crate) mod sync { + pub(crate) mod atomic { + pub(crate) use loom::sync::atomic::spin_loop_hint; + pub(crate) use loom::sync::atomic::{ + AtomicBool, AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicU16, + AtomicU32, AtomicU64, AtomicU8, AtomicUsize, + }; + + // FIXME: loom does not support compiler_fence at the moment. + // https://github.com/tokio-rs/loom/issues/117 + // we use fence as a stand-in for compiler_fence for the time being. + // this may miss some races since fence is stronger than compiler_fence, + // but it's the best we can do for the time being. + pub(crate) use loom::sync::atomic::fence as compiler_fence; + } + pub(crate) use loom::sync::{Arc, Condvar, Mutex}; + } +} +#[cfg(not(crossbeam_loom))] +#[allow(unused_imports)] +mod primitive { + pub(crate) mod sync { + pub(crate) mod atomic { + pub(crate) use core::sync::atomic::compiler_fence; + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + pub(crate) use core::sync::atomic::spin_loop_hint; + #[cfg(not(crossbeam_no_atomic))] + pub(crate) use core::sync::atomic::{ + AtomicBool, AtomicI16, AtomicI32, AtomicI8, AtomicIsize, AtomicU16, AtomicU32, + AtomicU8, AtomicUsize, + }; + #[cfg(not(crossbeam_no_atomic_64))] + pub(crate) use core::sync::atomic::{AtomicI64, AtomicU64}; + } + + #[cfg(feature = "std")] + pub(crate) use std::sync::{Arc, Condvar, Mutex}; + } +} + +pub mod atomic; + +mod cache_padded; +pub use crate::cache_padded::CachePadded; + +mod backoff; +pub use crate::backoff::Backoff; + +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(feature = "std")] { + pub mod sync; + + #[cfg(not(crossbeam_loom))] + pub mod thread; + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/mod.rs b/third_party/rust/crossbeam-utils/src/sync/mod.rs new file mode 100644 index 0000000000..f9eec71fb3 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/mod.rs @@ -0,0 +1,17 @@ +//! Thread synchronization primitives. +//! +//! * [`Parker`], a thread parking primitive. +//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +//! * [`WaitGroup`], for synchronizing the beginning or end of some computation. + +#[cfg(not(crossbeam_loom))] +mod once_lock; +mod parker; +#[cfg(not(crossbeam_loom))] +mod sharded_lock; +mod wait_group; + +pub use self::parker::{Parker, Unparker}; +#[cfg(not(crossbeam_loom))] +pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; +pub use self::wait_group::WaitGroup; diff --git a/third_party/rust/crossbeam-utils/src/sync/once_lock.rs b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs new file mode 100644 index 0000000000..c1fefc96cc --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs @@ -0,0 +1,103 @@ +// Based on unstable std::sync::OnceLock. +// +// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs + +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Once; + +pub(crate) struct OnceLock<T> { + once: Once, + // Once::is_completed requires Rust 1.43, so use this to track of whether they have been initialized. + is_initialized: AtomicBool, + value: UnsafeCell<MaybeUninit<T>>, + // Unlike std::sync::OnceLock, we don't need PhantomData here because + // we don't use #[may_dangle]. +} + +unsafe impl<T: Sync + Send> Sync for OnceLock<T> {} +unsafe impl<T: Send> Send for OnceLock<T> {} + +impl<T> OnceLock<T> { + /// Creates a new empty cell. + #[must_use] + pub(crate) const fn new() -> Self { + Self { + once: Once::new(), + is_initialized: AtomicBool::new(false), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Gets the contents of the cell, initializing it with `f` if the cell + /// was empty. + /// + /// Many threads may call `get_or_init` concurrently with different + /// initializing functions, but it is guaranteed that only one function + /// will be executed. + /// + /// # Panics + /// + /// If `f` panics, the panic is propagated to the caller, and the cell + /// remains uninitialized. + /// + /// It is an error to reentrantly initialize the cell from `f`. The + /// exact outcome is unspecified. Current implementation deadlocks, but + /// this may be changed to a panic in the future. + pub(crate) fn get_or_init<F>(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + // Fast path check + if self.is_initialized() { + // SAFETY: The inner value has been initialized + return unsafe { self.get_unchecked() }; + } + self.initialize(f); + + debug_assert!(self.is_initialized()); + + // SAFETY: The inner value has been initialized + unsafe { self.get_unchecked() } + } + + #[inline] + fn is_initialized(&self) -> bool { + self.is_initialized.load(Ordering::Acquire) + } + + #[cold] + fn initialize<F>(&self, f: F) + where + F: FnOnce() -> T, + { + let slot = self.value.get().cast::<T>(); + let is_initialized = &self.is_initialized; + + self.once.call_once(|| { + let value = f(); + unsafe { + slot.write(value); + } + is_initialized.store(true, Ordering::Release); + }); + } + + /// # Safety + /// + /// The value must be initialized + unsafe fn get_unchecked(&self) -> &T { + debug_assert!(self.is_initialized()); + &*self.value.get().cast::<T>() + } +} + +impl<T> Drop for OnceLock<T> { + fn drop(&mut self) { + if self.is_initialized() { + // SAFETY: The inner value has been initialized + unsafe { self.value.get().cast::<T>().drop_in_place() }; + } + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/parker.rs b/third_party/rust/crossbeam-utils/src/sync/parker.rs new file mode 100644 index 0000000000..e791c44852 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/parker.rs @@ -0,0 +1,413 @@ +use crate::primitive::sync::atomic::AtomicUsize; +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use core::sync::atomic::Ordering::SeqCst; +use std::fmt; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; + +/// A thread parking primitive. +/// +/// Conceptually, each `Parker` has an associated token which is initially not present: +/// +/// * The [`park`] method blocks the current thread unless or until the token is available, at +/// which point it automatically consumes the token. +/// +/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for +/// a specified maximum time. +/// +/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the +/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call +/// returning immediately. +/// +/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using +/// [`park`] and [`unpark`]. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_utils::sync::Parker; +/// +/// let p = Parker::new(); +/// let u = p.unparker().clone(); +/// +/// // Make the token available. +/// u.unpark(); +/// // Wakes up immediately and consumes the token. +/// p.park(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// u.unpark(); +/// }); +/// +/// // Wakes up when `u.unpark()` provides the token. +/// p.park(); +/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +/// ``` +/// +/// [`park`]: Parker::park +/// [`park_timeout`]: Parker::park_timeout +/// [`park_deadline`]: Parker::park_deadline +/// [`unpark`]: Unparker::unpark +pub struct Parker { + unparker: Unparker, + _marker: PhantomData<*const ()>, +} + +unsafe impl Send for Parker {} + +impl Default for Parker { + fn default() -> Self { + Self { + unparker: Unparker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }), + }, + _marker: PhantomData, + } + } +} + +impl Parker { + /// Creates a new `Parker`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// ``` + /// + pub fn new() -> Parker { + Self::default() + } + + /// Blocks the current thread until the token is made available. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + pub fn park(&self) { + self.unparker.inner.park(None); + } + + /// Blocks the current thread until the token is made available, but only for a limited time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_timeout(Duration::from_millis(500)); + /// ``` + pub fn park_timeout(&self, timeout: Duration) { + self.park_deadline(Instant::now() + timeout) + } + + /// Blocks the current thread until the token is made available, or until a certain deadline. + /// + /// # Examples + /// + /// ``` + /// use std::time::{Duration, Instant}; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_deadline(deadline); + /// ``` + pub fn park_deadline(&self, deadline: Instant) { + self.unparker.inner.park(Some(deadline)) + } + + /// Returns a reference to an associated [`Unparker`]. + /// + /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unparker(&self) -> &Unparker { + &self.unparker + } + + /// Converts a `Parker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// # let _ = unsafe { Parker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Parker) -> *const () { + Unparker::into_raw(this.unparker) + } + + /// Converts a raw pointer into a `Parker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Parker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// let p = unsafe { Parker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Parker { + Parker { + unparker: Unparker::from_raw(ptr), + _marker: PhantomData, + } + } +} + +impl fmt::Debug for Parker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Parker { .. }") + } +} + +/// Unparks a thread parked by the associated [`Parker`]. +pub struct Unparker { + inner: Arc<Inner>, +} + +unsafe impl Send for Unparker {} +unsafe impl Sync for Unparker {} + +impl Unparker { + /// Atomically makes the token available if it is not already. + /// + /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is + /// any. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// u.unpark(); + /// }); + /// + /// // Wakes up when `u.unpark()` provides the token. + /// p.park(); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unpark(&self) { + self.inner.unpark() + } + + /// Converts an `Unparker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// let raw = Unparker::into_raw(u); + /// # let _ = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Unparker) -> *const () { + Arc::into_raw(this.inner).cast::<()>() + } + + /// Converts a raw pointer into an `Unparker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Unparker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// let raw = Unparker::into_raw(u); + /// let u = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Unparker { + Unparker { + inner: Arc::from_raw(ptr.cast::<Inner>()), + } + } +} + +impl fmt::Debug for Unparker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Unparker { .. }") + } +} + +impl Clone for Unparker { + fn clone(&self) -> Unparker { + Unparker { + inner: self.inner.clone(), + } + } +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +struct Inner { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Inner { + fn park(&self, deadline: Option<Instant>) { + // If we were previously notified then we consume this notification and return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // If the timeout is zero, then there is no need to actually block. + if let Some(deadline) = deadline { + if deadline <= Instant::now() { + return; + } + } + + // Otherwise we need to coordinate going to sleep. + let mut m = self.lock.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + // Consume this notification to avoid spurious wakeups in the next park. + Err(NOTIFIED) => { + // We must read `state` here, even though we know it will be `NOTIFIED`. This is + // because `unpark` may have been called again since we read `NOTIFIED` in the + // `compare_exchange` above. We must perform an acquire operation that synchronizes + // with that `unpark` to observe any writes it made before the call to `unpark`. To + // do that we must read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } + Err(n) => panic!("inconsistent park_timeout state: {}", n), + } + + loop { + // Block the current thread on the conditional variable. + m = match deadline { + None => self.cvar.wait(m).unwrap(), + Some(deadline) => { + let now = Instant::now(); + if now < deadline { + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. + self.cvar.wait_timeout(m, deadline - now).unwrap().0 + } else { + // We've timed out; swap out the state back to empty on our way out + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED | PARKED => return, + n => panic!("inconsistent park_timeout state: {}", n), + }; + } + } + }; + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; + } + + // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught + // in the branch above, when we discover the deadline is in the past + } + } + + pub(crate) fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before this call, we must + // perform a release operation that `park` can synchronize with. To do that we must write + // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather + // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to `PARKED` (or last + // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. + // If we were to notify during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this + // stage so we can acquire `lock` to wait until it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes + // it doesn't get woken only to have to wait for us to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one(); + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs new file mode 100644 index 0000000000..b43c55ea42 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs @@ -0,0 +1,634 @@ +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult}; +use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::thread::{self, ThreadId}; + +use crate::sync::once_lock::OnceLock; +use crate::CachePadded; + +/// The number of shards per sharded lock. Must be a power of two. +const NUM_SHARDS: usize = 8; + +/// A shard containing a single reader-writer lock. +struct Shard { + /// The inner reader-writer lock. + lock: RwLock<()>, + + /// The write-guard keeping this shard locked. + /// + /// Write operations will lock each shard and store the guard here. These guards get dropped at + /// the same time the big guard is dropped. + write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>, +} + +/// A sharded reader-writer lock. +/// +/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations +/// are slower. +/// +/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a +/// single cache line. Read operations will pick one of the shards depending on the current thread +/// and lock it. Write operations need to lock all shards in succession. +/// +/// By splitting the lock into shards, concurrent read operations will in most cases choose +/// different shards and thus update different cache lines, which is good for scalability. However, +/// write operations need to do more work and are therefore slower than usual. +/// +/// The priority policy of the lock is dependent on the underlying operating system's +/// implementation, and this type does not guarantee that any particular policy will be used. +/// +/// # Poisoning +/// +/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be +/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any +/// read operation, the lock will not be poisoned. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::ShardedLock; +/// +/// let lock = ShardedLock::new(5); +/// +/// // Any number of read locks can be held at once. +/// { +/// let r1 = lock.read().unwrap(); +/// let r2 = lock.read().unwrap(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // Read locks are dropped at this point. +/// +/// // However, only one write lock may be held. +/// { +/// let mut w = lock.write().unwrap(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // Write lock is dropped here. +/// ``` +/// +/// [`RwLock`]: std::sync::RwLock +pub struct ShardedLock<T: ?Sized> { + /// A list of locks protecting the internal data. + shards: Box<[CachePadded<Shard>]>, + + /// The internal data. + value: UnsafeCell<T>, +} + +unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {} +unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {} + +impl<T: ?Sized> UnwindSafe for ShardedLock<T> {} +impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {} + +impl<T> ShardedLock<T> { + /// Creates a new sharded reader-writer lock. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(5); + /// ``` + pub fn new(value: T) -> ShardedLock<T> { + ShardedLock { + shards: (0..NUM_SHARDS) + .map(|_| { + CachePadded::new(Shard { + lock: RwLock::new(()), + write_guard: UnsafeCell::new(None), + }) + }) + .collect::<Box<[_]>>(), + value: UnsafeCell::new(value), + } + } + + /// Consumes this lock, returning the underlying data. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(String::new()); + /// { + /// let mut s = lock.write().unwrap(); + /// *s = "modified".to_owned(); + /// } + /// assert_eq!(lock.into_inner().unwrap(), "modified"); + /// ``` + pub fn into_inner(self) -> LockResult<T> { + let is_poisoned = self.is_poisoned(); + let inner = self.value.into_inner(); + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } +} + +impl<T: ?Sized> ShardedLock<T> { + /// Returns `true` if the lock is poisoned. + /// + /// If another thread can still access the lock, it may become poisoned at any time. A `false` + /// result should not be trusted without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(0)); + /// let c_lock = lock.clone(); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the lock gets poisoned + /// }).join(); + /// assert_eq!(lock.is_poisoned(), true); + /// ``` + pub fn is_poisoned(&self) -> bool { + self.shards[0].lock.is_poisoned() + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the lock mutably, no actual locking needs to take place. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let mut lock = ShardedLock::new(0); + /// *lock.get_mut().unwrap() = 10; + /// assert_eq!(*lock.read().unwrap(), 10); + /// ``` + pub fn get_mut(&mut self) -> LockResult<&mut T> { + let is_poisoned = self.is_poisoned(); + let inner = unsafe { &mut *self.value.get() }; + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } + + /// Attempts to acquire this lock with shared read access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the shared access when it is dropped. This method does not + /// provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// match lock.try_read() { + /// Ok(n) => assert_eq!(*n, 1), + /// Err(_) => unreachable!(), + /// }; + /// ``` + pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.try_read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(TryLockError::Poisoned(err)) => { + let guard = ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } + Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock), + } + } + + /// Locks with shared read access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the shared access when dropped. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Panics + /// + /// This method might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// thread::spawn(move || { + /// let r = c_lock.read(); + /// assert!(r.is_ok()); + /// }).join().unwrap(); + /// ``` + pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(err) => Err(PoisonError::new(ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + })), + } + } + + /// Attempts to acquire this lock with exclusive write access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the exclusive access when it is dropped. This method does + /// not provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// assert!(lock.try_write().is_err()); + /// ``` + pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> { + let mut poisoned = false; + let mut blocked = None; + + // Write-lock each shard in succession. + for (i, shard) in self.shards.iter().enumerate() { + let guard = match shard.lock.try_write() { + Ok(guard) => guard, + Err(TryLockError::Poisoned(err)) => { + poisoned = true; + err.into_inner() + } + Err(TryLockError::WouldBlock) => { + blocked = Some(i); + break; + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if let Some(i) = blocked { + // Unlock the shards in reverse order of locking. + for shard in self.shards[0..i].iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + Err(TryLockError::WouldBlock) + } else if poisoned { + let guard = ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } + + /// Locks with exclusive write access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the exclusive access when dropped. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Panics + /// + /// This method might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let mut n = lock.write().unwrap(); + /// *n = 2; + /// + /// assert!(lock.try_read().is_err()); + /// ``` + pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> { + let mut poisoned = false; + + // Write-lock each shard in succession. + for shard in self.shards.iter() { + let guard = match shard.lock.write() { + Ok(guard) => guard, + Err(err) => { + poisoned = true; + err.into_inner() + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'_, ()> = guard; + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if poisoned { + Err(PoisonError::new(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + })) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.try_read() { + Ok(guard) => f + .debug_struct("ShardedLock") + .field("data", &&*guard) + .finish(), + Err(TryLockError::Poisoned(err)) => f + .debug_struct("ShardedLock") + .field("data", &&**err.get_ref()) + .finish(), + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("<locked>") + } + } + f.debug_struct("ShardedLock") + .field("data", &LockedPlaceholder) + .finish() + } + } + } +} + +impl<T: Default> Default for ShardedLock<T> { + fn default() -> ShardedLock<T> { + ShardedLock::new(Default::default()) + } +} + +impl<T> From<T> for ShardedLock<T> { + fn from(t: T) -> Self { + ShardedLock::new(t) + } +} + +/// A guard used to release the shared read access of a [`ShardedLock`] when dropped. +pub struct ShardedLockReadGuard<'a, T: ?Sized> { + lock: &'a ShardedLock<T>, + _guard: RwLockReadGuard<'a, ()>, + _marker: PhantomData<RwLockReadGuard<'a, T>>, +} + +unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {} + +impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShardedLockReadGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped. +pub struct ShardedLockWriteGuard<'a, T: ?Sized> { + lock: &'a ShardedLock<T>, + _marker: PhantomData<RwLockWriteGuard<'a, T>>, +} + +unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {} + +impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> { + fn drop(&mut self) { + // Unlock the shards in reverse order of locking. + for shard in self.lock.shards.iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + } +} + +impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShardedLockWriteGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.value.get() } + } +} + +/// Returns a `usize` that identifies the current thread. +/// +/// Each thread is associated with an 'index'. While there are no particular guarantees, indices +/// usually tend to be consecutive numbers between 0 and the number of running threads. +/// +/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is +/// tearing down. +#[inline] +fn current_index() -> Option<usize> { + REGISTRATION.try_with(|reg| reg.index).ok() +} + +/// The global registry keeping track of registered threads and indices. +struct ThreadIndices { + /// Mapping from `ThreadId` to thread index. + mapping: HashMap<ThreadId, usize>, + + /// A list of free indices. + free_list: Vec<usize>, + + /// The next index to allocate if the free list is empty. + next_index: usize, +} + +fn thread_indices() -> &'static Mutex<ThreadIndices> { + static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new(); + fn init() -> Mutex<ThreadIndices> { + Mutex::new(ThreadIndices { + mapping: HashMap::new(), + free_list: Vec::new(), + next_index: 0, + }) + } + THREAD_INDICES.get_or_init(init) +} + +/// A registration of a thread with an index. +/// +/// When dropped, unregisters the thread and frees the reserved index. +struct Registration { + index: usize, + thread_id: ThreadId, +} + +impl Drop for Registration { + fn drop(&mut self) { + let mut indices = thread_indices().lock().unwrap(); + indices.mapping.remove(&self.thread_id); + indices.free_list.push(self.index); + } +} + +thread_local! { + static REGISTRATION: Registration = { + let thread_id = thread::current().id(); + let mut indices = thread_indices().lock().unwrap(); + + let index = match indices.free_list.pop() { + Some(i) => i, + None => { + let i = indices.next_index; + indices.next_index += 1; + i + } + }; + indices.mapping.insert(thread_id, index); + + Registration { + index, + thread_id, + } + }; +} diff --git a/third_party/rust/crossbeam-utils/src/sync/wait_group.rs b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs new file mode 100644 index 0000000000..19d6074157 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs @@ -0,0 +1,145 @@ +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use std::fmt; + +/// Enables threads to synchronize the beginning or end of some computation. +/// +/// # Wait groups vs barriers +/// +/// `WaitGroup` is very similar to [`Barrier`], but there are a few differences: +/// +/// * [`Barrier`] needs to know the number of threads at construction, while `WaitGroup` is cloned to +/// register more threads. +/// +/// * A [`Barrier`] can be reused even after all threads have synchronized, while a `WaitGroup` +/// synchronizes threads only once. +/// +/// * All threads wait for others to reach the [`Barrier`]. With `WaitGroup`, each thread can choose +/// to either wait for other threads or to continue without blocking. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::WaitGroup; +/// use std::thread; +/// +/// // Create a new wait group. +/// let wg = WaitGroup::new(); +/// +/// for _ in 0..4 { +/// // Create another reference to the wait group. +/// let wg = wg.clone(); +/// +/// thread::spawn(move || { +/// // Do some work. +/// +/// // Drop the reference to the wait group. +/// drop(wg); +/// }); +/// } +/// +/// // Block until all threads have finished their work. +/// wg.wait(); +/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +/// ``` +/// +/// [`Barrier`]: std::sync::Barrier +pub struct WaitGroup { + inner: Arc<Inner>, +} + +/// Inner state of a `WaitGroup`. +struct Inner { + cvar: Condvar, + count: Mutex<usize>, +} + +impl Default for WaitGroup { + fn default() -> Self { + Self { + inner: Arc::new(Inner { + cvar: Condvar::new(), + count: Mutex::new(1), + }), + } + } +} + +impl WaitGroup { + /// Creates a new wait group and returns the single reference to it. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// + /// let wg = WaitGroup::new(); + /// ``` + pub fn new() -> Self { + Self::default() + } + + /// Drops this reference and waits until all other references are dropped. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// use std::thread; + /// + /// let wg = WaitGroup::new(); + /// + /// thread::spawn({ + /// let wg = wg.clone(); + /// move || { + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// } + /// }); + /// + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + pub fn wait(self) { + if *self.inner.count.lock().unwrap() == 1 { + return; + } + + let inner = self.inner.clone(); + drop(self); + + let mut count = inner.count.lock().unwrap(); + while *count > 0 { + count = inner.cvar.wait(count).unwrap(); + } + } +} + +impl Drop for WaitGroup { + fn drop(&mut self) { + let mut count = self.inner.count.lock().unwrap(); + *count -= 1; + + if *count == 0 { + self.inner.cvar.notify_all(); + } + } +} + +impl Clone for WaitGroup { + fn clone(&self) -> WaitGroup { + let mut count = self.inner.count.lock().unwrap(); + *count += 1; + + WaitGroup { + inner: self.inner.clone(), + } + } +} + +impl fmt::Debug for WaitGroup { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let count: &usize = &*self.inner.count.lock().unwrap(); + f.debug_struct("WaitGroup").field("count", count).finish() + } +} diff --git a/third_party/rust/crossbeam-utils/src/thread.rs b/third_party/rust/crossbeam-utils/src/thread.rs new file mode 100644 index 0000000000..f1086d9ec3 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/thread.rs @@ -0,0 +1,587 @@ +//! Threads that can borrow variables from the stack. +//! +//! Create a scope when spawned threads need to access variables on the stack: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! thread::scope(|s| { +//! for person in &people { +//! s.spawn(move |_| { +//! println!("Hello, {}!", person); +//! }); +//! } +//! }).unwrap(); +//! ``` +//! +//! # Why scoped threads? +//! +//! Suppose we wanted to re-write the previous example using plain threads: +//! +//! ```compile_fail,E0597 +//! use std::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! let mut threads = Vec::new(); +//! +//! for person in &people { +//! threads.push(thread::spawn(move || { +//! println!("Hello, {}!", person); +//! })); +//! } +//! +//! for thread in threads { +//! thread.join().unwrap(); +//! } +//! ``` +//! +//! This doesn't work because the borrow checker complains about `people` not living long enough: +//! +//! ```text +//! error[E0597]: `people` does not live long enough +//! --> src/main.rs:12:20 +//! | +//! 12 | for person in &people { +//! | ^^^^^^ borrowed value does not live long enough +//! ... +//! 21 | } +//! | - borrowed value only lives until here +//! | +//! = note: borrowed value must be valid for the static lifetime... +//! ``` +//! +//! The problem here is that spawned threads are not allowed to borrow variables on stack because +//! the compiler cannot prove they will be joined before `people` is destroyed. +//! +//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined +//! before the scope ends. +//! +//! # How scoped threads work +//! +//! If a variable is borrowed by a thread, the thread must complete before the variable is +//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the +//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete. +//! +//! A scope creates a clear boundary between variables outside the scope and threads inside the +//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends. +//! This way we guarantee to the borrow checker that scoped threads only live within the scope and +//! can safely access variables outside it. +//! +//! # Nesting scoped threads +//! +//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little +//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such +//! cannot be borrowed by scoped threads: +//! +//! ```compile_fail,E0373,E0521 +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! s.spawn(|_| { +//! // Not going to compile because we're trying to borrow `s`, +//! // which lives *inside* the scope! :( +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }); +//! ``` +//! +//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an +//! argument, which can be used for spawning nested threads: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! // Note the `|s|` here. +//! s.spawn(|s| { +//! // Yay, this works because we're using a fresh argument `s`! :) +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }).unwrap(); +//! ``` + +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::mem; +use std::panic; +use std::sync::{Arc, Mutex}; +use std::thread; + +use crate::sync::WaitGroup; +use cfg_if::cfg_if; + +type SharedVec<T> = Arc<Mutex<Vec<T>>>; +type SharedOption<T> = Arc<Mutex<Option<T>>>; + +/// Creates a new scope for spawning threads. +/// +/// All child threads that haven't been manually joined will be automatically joined just before +/// this function invocation ends. If all joined threads have successfully completed, `Ok` is +/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is +/// returned containing errors from panicked threads. Note that if panics are implemented by +/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// let var = vec![1, 2, 3]; +/// +/// thread::scope(|s| { +/// s.spawn(|_| { +/// println!("A child thread borrowing `var`: {:?}", var); +/// }); +/// }).unwrap(); +/// ``` +pub fn scope<'env, F, R>(f: F) -> thread::Result<R> +where + F: FnOnce(&Scope<'env>) -> R, +{ + let wg = WaitGroup::new(); + let scope = Scope::<'env> { + handles: SharedVec::default(), + wait_group: wg.clone(), + _marker: PhantomData, + }; + + // Execute the scoped function, but catch any panics. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope))); + + // Wait until all nested scopes are dropped. + drop(scope.wait_group); + wg.wait(); + + // Join all remaining spawned threads. + let panics: Vec<_> = scope + .handles + .lock() + .unwrap() + // Filter handles that haven't been joined, join them, and collect errors. + .drain(..) + .filter_map(|handle| handle.lock().unwrap().take()) + .filter_map(|handle| handle.join().err()) + .collect(); + + // If `f` has panicked, resume unwinding. + // If any of the child threads have panicked, return the panic errors. + // Otherwise, everything is OK and return the result of `f`. + match result { + Err(err) => panic::resume_unwind(err), + Ok(res) => { + if panics.is_empty() { + Ok(res) + } else { + Err(Box::new(panics)) + } + } + } +} + +/// A scope for spawning threads. +pub struct Scope<'env> { + /// The list of the thread join handles. + handles: SharedVec<SharedOption<thread::JoinHandle<()>>>, + + /// Used to wait until all subscopes all dropped. + wait_group: WaitGroup, + + /// Borrows data with invariant lifetime `'env`. + _marker: PhantomData<&'env mut &'env ()>, +} + +unsafe impl Sync for Scope<'_> {} + +impl<'env> Scope<'env> { + /// Spawns a scoped thread. + /// + /// This method is similar to the [`spawn`] function in Rust's standard library. The difference + /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits, + /// allowing it to reference variables outside the scope. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned [handle](ScopedJoinHandle) can be used to manually + /// [join](ScopedJoinHandle::join) the thread before the scope exits. + /// + /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the + /// stack size or the name of the thread, use this API instead. + /// + /// [`spawn`]: std::thread::spawn + /// + /// # Panics + /// + /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`] + /// to recover from such errors. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + self.builder() + .spawn(f) + .expect("failed to spawn scoped thread") + } + + /// Creates a builder that can configure a thread before spawning. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// s.builder() + /// .spawn(|_| println!("A child thread is running")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> { + ScopedThreadBuilder { + scope: self, + builder: thread::Builder::new(), + } + } +} + +impl fmt::Debug for Scope<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Scope { .. }") + } +} + +/// Configures the properties of a new thread. +/// +/// The two configurable properties are: +/// +/// - [`name`]: Specifies an [associated name for the thread][naming-threads]. +/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size]. +/// +/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the +/// thread handle with the given configuration. +/// +/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return +/// value. You may want to use this builder when you want to recover from a failure to launch a +/// thread. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// thread::scope(|s| { +/// s.builder() +/// .spawn(|_| println!("Running a child thread")) +/// .unwrap(); +/// }).unwrap(); +/// ``` +/// +/// [`name`]: ScopedThreadBuilder::name +/// [`stack_size`]: ScopedThreadBuilder::stack_size +/// [`spawn`]: ScopedThreadBuilder::spawn +/// [`io::Result`]: std::io::Result +/// [naming-threads]: std::thread#naming-threads +/// [stack-size]: std::thread#stack-size +#[derive(Debug)] +pub struct ScopedThreadBuilder<'scope, 'env> { + scope: &'scope Scope<'env>, + builder: thread::Builder, +} + +impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> { + /// Sets the name for the new thread. + /// + /// The name must not contain null bytes (`\0`). + /// + /// For more information about named threads, see [here][naming-threads]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// use std::thread::current; + /// + /// thread::scope(|s| { + /// s.builder() + /// .name("my thread".to_string()) + /// .spawn(|_| assert_eq!(current().name(), Some("my thread"))) + /// .unwrap(); + /// }).unwrap(); + /// ``` + /// + /// [naming-threads]: std::thread#naming-threads + pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.name(name); + self + } + + /// Sets the size of the stack for the new thread. + /// + /// The stack size is measured in bytes. + /// + /// For more information about the stack size for threads, see [here][stack-size]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// s.builder() + /// .stack_size(32 * 1024) + /// .spawn(|_| println!("Running a child thread")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + /// + /// [stack-size]: std::thread#stack-size + pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.stack_size(size); + self + } + + /// Spawns a scoped thread with this configuration. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned handle can be used to manually join the thread before the scope exits. + /// + /// # Errors + /// + /// Unlike the [`Scope::spawn`] method, this method yields an + /// [`io::Result`] to capture any failure to create the thread at + /// the OS level. + /// + /// [`io::Result`]: std::io::Result + /// + /// # Panics + /// + /// Panics if a thread name was set and it contained null bytes. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.builder() + /// .spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }) + /// .unwrap(); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + // The result of `f` will be stored here. + let result = SharedOption::default(); + + // Spawn the thread and grab its join handle and thread handle. + let (handle, thread) = { + let result = Arc::clone(&result); + + // A clone of the scope that will be moved into the new thread. + let scope = Scope::<'env> { + handles: Arc::clone(&self.scope.handles), + wait_group: self.scope.wait_group.clone(), + _marker: PhantomData, + }; + + // Spawn the thread. + let handle = { + let closure = move || { + // Make sure the scope is inside the closure with the proper `'env` lifetime. + let scope: Scope<'env> = scope; + + // Run the closure. + let res = f(&scope); + + // Store the result if the closure didn't panic. + *result.lock().unwrap() = Some(res); + }; + + // Allocate `closure` on the heap and erase the `'env` bound. + let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure); + let closure: Box<dyn FnOnce() + Send + 'static> = + unsafe { mem::transmute(closure) }; + + // Finally, spawn the closure. + self.builder.spawn(closure)? + }; + + let thread = handle.thread().clone(); + let handle = Arc::new(Mutex::new(Some(handle))); + (handle, thread) + }; + + // Add the handle to the shared list of join handles. + self.scope.handles.lock().unwrap().push(Arc::clone(&handle)); + + Ok(ScopedJoinHandle { + handle, + result, + thread, + _marker: PhantomData, + }) + } +} + +unsafe impl<T> Send for ScopedJoinHandle<'_, T> {} +unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {} + +/// A handle that can be used to join its scoped thread. +/// +/// This struct is created by the [`Scope::spawn`] method and the +/// [`ScopedThreadBuilder::spawn`] method. +pub struct ScopedJoinHandle<'scope, T> { + /// A join handle to the spawned thread. + handle: SharedOption<thread::JoinHandle<()>>, + + /// Holds the result of the inner closure. + result: SharedOption<T>, + + /// A handle to the the spawned thread. + thread: thread::Thread, + + /// Borrows the parent scope with lifetime `'scope`. + _marker: PhantomData<&'scope ()>, +} + +impl<T> ScopedJoinHandle<'_, T> { + /// Waits for the thread to finish and returns its result. + /// + /// If the child thread panics, an error is returned. Note that if panics are implemented by + /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. + /// + /// # Panics + /// + /// This function may panic on some platforms if a thread attempts to join itself or otherwise + /// may create a deadlock with joining threads. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)")); + /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :(")); + /// + /// // Join the first thread and verify that it succeeded. + /// let res = handle1.join(); + /// assert!(res.is_ok()); + /// + /// // Join the second thread and verify that it panicked. + /// let res = handle2.join(); + /// assert!(res.is_err()); + /// }).unwrap(); + /// ``` + pub fn join(self) -> thread::Result<T> { + // Take out the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap().take().unwrap(); + + // Join the thread and then take the result out of its inner closure. + handle + .join() + .map(|()| self.result.lock().unwrap().take().unwrap()) + } + + /// Returns a handle to the underlying thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| println!("A child thread is running")); + /// println!("The child thread ID: {:?}", handle.thread().id()); + /// }).unwrap(); + /// ``` + pub fn thread(&self) -> &thread::Thread { + &self.thread + } +} + +cfg_if! { + if #[cfg(unix)] { + use std::os::unix::thread::{JoinHandleExt, RawPthread}; + + impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> { + fn as_pthread_t(&self) -> RawPthread { + // Borrow the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap(); + handle.as_ref().unwrap().as_pthread_t() + } + fn into_pthread_t(self) -> RawPthread { + self.as_pthread_t() + } + } + } else if #[cfg(windows)] { + use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle}; + + impl<T> AsRawHandle for ScopedJoinHandle<'_, T> { + fn as_raw_handle(&self) -> RawHandle { + // Borrow the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap(); + handle.as_ref().unwrap().as_raw_handle() + } + } + + impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> { + fn into_raw_handle(self) -> RawHandle { + self.as_raw_handle() + } + } + } +} + +impl<T> fmt::Debug for ScopedJoinHandle<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("ScopedJoinHandle { .. }") + } +} |