diff options
Diffstat (limited to 'third_party/rust/crossbeam-utils-0.7.2/src')
13 files changed, 3312 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/atomic/atomic_cell.rs b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/atomic_cell.rs new file mode 100644 index 0000000000..cf0658aad4 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/atomic_cell.rs @@ -0,0 +1,890 @@ +use core::cell::UnsafeCell; +use core::fmt; +use core::mem; +use core::ptr; +use core::sync::atomic::{self, AtomicBool, Ordering}; + +#[cfg(feature = "std")] +use std::panic::{RefUnwindSafe, UnwindSafe}; + +use super::seq_lock::SeqLock; + +/// A thread-safe mutable memory location. +/// +/// This type is equivalent to [`Cell`], except it can also be shared among multiple threads. +/// +/// Operations on `AtomicCell`s use atomic instructions whenever possible, and synchronize using +/// global locks otherwise. You can call [`AtomicCell::<T>::is_lock_free()`] to check whether +/// atomic instructions or locks will be used. +/// +/// Atomic loads use the [`Acquire`] ordering and atomic stores use the [`Release`] ordering. +/// +/// [`Cell`]: https://doc.rust-lang.org/std/cell/struct.Cell.html +/// [`AtomicCell::<T>::is_lock_free()`]: struct.AtomicCell.html#method.is_lock_free +/// [`Acquire`]: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html#variant.Acquire +/// [`Release`]: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html#variant.Release +#[repr(transparent)] +pub struct AtomicCell<T: ?Sized> { + /// The inner value. + /// + /// If this value can be transmuted into a primitive atomic type, it will be treated as such. + /// Otherwise, all potentially concurrent operations on this data will be protected by a global + /// lock. + value: UnsafeCell<T>, +} + +unsafe impl<T: Send> Send for AtomicCell<T> {} +unsafe impl<T: Send> Sync for AtomicCell<T> {} + +#[cfg(feature = "std")] +impl<T> UnwindSafe for AtomicCell<T> {} +#[cfg(feature = "std")] +impl<T> RefUnwindSafe for AtomicCell<T> {} + +impl<T> AtomicCell<T> { + /// Creates a new atomic cell initialized with `val`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// ``` + #[cfg(not(has_min_const_fn))] + pub fn new(val: T) -> AtomicCell<T> { + AtomicCell { + value: UnsafeCell::new(val), + } + } + + /// Creates a new atomic cell initialized with `val`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// ``` + #[cfg(has_min_const_fn)] + pub const fn new(val: T) -> AtomicCell<T> { + AtomicCell { + value: UnsafeCell::new(val), + } + } + + /// Unwraps the atomic cell and returns its inner value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let mut a = AtomicCell::new(7); + /// let v = a.into_inner(); + /// + /// assert_eq!(v, 7); + /// ``` + pub fn into_inner(self) -> T { + self.value.into_inner() + } + + /// Returns `true` if operations on values of this type are lock-free. + /// + /// If the compiler or the platform doesn't support the necessary atomic instructions, + /// `AtomicCell<T>` will use global locks for every potentially concurrent atomic operation. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// // This type is internally represented as `AtomicUsize` so we can just use atomic + /// // operations provided by it. + /// assert_eq!(AtomicCell::<usize>::is_lock_free(), true); + /// + /// // A wrapper struct around `isize`. + /// struct Foo { + /// bar: isize, + /// } + /// // `AtomicCell<Foo>` will be internally represented as `AtomicIsize`. + /// assert_eq!(AtomicCell::<Foo>::is_lock_free(), true); + /// + /// // Operations on zero-sized types are always lock-free. + /// assert_eq!(AtomicCell::<()>::is_lock_free(), true); + /// + /// // Very large types cannot be represented as any of the standard atomic types, so atomic + /// // operations on them will have to use global locks for synchronization. + /// assert_eq!(AtomicCell::<[u8; 1000]>::is_lock_free(), false); + /// ``` + pub fn is_lock_free() -> bool { + atomic_is_lock_free::<T>() + } + + /// Stores `val` into the atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// a.store(8); + /// assert_eq!(a.load(), 8); + /// ``` + pub fn store(&self, val: T) { + if mem::needs_drop::<T>() { + drop(self.swap(val)); + } else { + unsafe { + atomic_store(self.value.get(), val); + } + } + } + + /// Stores `val` into the atomic cell and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// assert_eq!(a.swap(8), 7); + /// assert_eq!(a.load(), 8); + /// ``` + pub fn swap(&self, val: T) -> T { + unsafe { atomic_swap(self.value.get(), val) } + } +} + +impl<T: ?Sized> AtomicCell<T> { + /// Returns a raw pointer to the underlying data in this atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let mut a = AtomicCell::new(5); + /// + /// let ptr = a.as_ptr(); + /// ``` + #[inline] + pub fn as_ptr(&self) -> *mut T { + self.value.get() + } + + /// Returns a mutable reference to the inner value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let mut a = AtomicCell::new(7); + /// *a.get_mut() += 1; + /// + /// assert_eq!(a.load(), 8); + /// ``` + #[doc(hidden)] + #[deprecated(note = "this method is unsound and will be removed in the next release")] + pub fn get_mut(&mut self) -> &mut T { + unsafe { &mut *self.value.get() } + } +} + +impl<T: Default> AtomicCell<T> { + /// Takes the value of the atomic cell, leaving `Default::default()` in its place. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(5); + /// let five = a.take(); + /// + /// assert_eq!(five, 5); + /// assert_eq!(a.into_inner(), 0); + /// ``` + pub fn take(&self) -> T { + self.swap(Default::default()) + } +} + +impl<T: Copy> AtomicCell<T> { + /// Loads a value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// ``` + pub fn load(&self) -> T { + unsafe { atomic_load(self.value.get()) } + } +} + +impl<T: Copy + Eq> AtomicCell<T> { + /// If the current value equals `current`, stores `new` into the atomic cell. + /// + /// The return value is always the previous value. If it is equal to `current`, then the value + /// was updated. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(1); + /// + /// assert_eq!(a.compare_and_swap(2, 3), 1); + /// assert_eq!(a.load(), 1); + /// + /// assert_eq!(a.compare_and_swap(1, 2), 1); + /// assert_eq!(a.load(), 2); + /// ``` + pub fn compare_and_swap(&self, current: T, new: T) -> T { + match self.compare_exchange(current, new) { + Ok(v) => v, + Err(v) => v, + } + } + + /// If the current value equals `current`, stores `new` into the atomic cell. + /// + /// The return value is a result indicating whether the new value was written and containing + /// the previous value. On success this value is guaranteed to be equal to `current`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(1); + /// + /// assert_eq!(a.compare_exchange(2, 3), Err(1)); + /// assert_eq!(a.load(), 1); + /// + /// assert_eq!(a.compare_exchange(1, 2), Ok(1)); + /// assert_eq!(a.load(), 2); + /// ``` + pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> { + unsafe { atomic_compare_exchange_weak(self.value.get(), current, new) } + } +} + +macro_rules! impl_arithmetic { + ($t:ty, $example:tt) => { + impl AtomicCell<$t> { + /// Increments the current value by `val` and returns the previous value. + /// + /// The addition wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_add(3), 7); + /// assert_eq!(a.load(), 10); + /// ``` + #[inline] + pub fn fetch_add(&self, val: $t) -> $t { + if can_transmute::<$t, atomic::AtomicUsize>() { + let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) }; + a.fetch_add(val as usize, Ordering::AcqRel) as $t + } else { + let _guard = lock(self.value.get() as usize).write(); + let value = unsafe { &mut *(self.value.get()) }; + let old = *value; + *value = value.wrapping_add(val); + old + } + } + + /// Decrements the current value by `val` and returns the previous value. + /// + /// The subtraction wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_sub(3), 7); + /// assert_eq!(a.load(), 4); + /// ``` + #[inline] + pub fn fetch_sub(&self, val: $t) -> $t { + if can_transmute::<$t, atomic::AtomicUsize>() { + let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) }; + a.fetch_sub(val as usize, Ordering::AcqRel) as $t + } else { + let _guard = lock(self.value.get() as usize).write(); + let value = unsafe { &mut *(self.value.get()) }; + let old = *value; + *value = value.wrapping_sub(val); + old + } + } + + /// Applies bitwise "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_and(3), 7); + /// assert_eq!(a.load(), 3); + /// ``` + #[inline] + pub fn fetch_and(&self, val: $t) -> $t { + if can_transmute::<$t, atomic::AtomicUsize>() { + let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) }; + a.fetch_and(val as usize, Ordering::AcqRel) as $t + } else { + let _guard = lock(self.value.get() as usize).write(); + let value = unsafe { &mut *(self.value.get()) }; + let old = *value; + *value &= val; + old + } + } + + /// Applies bitwise "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_or(16), 7); + /// assert_eq!(a.load(), 23); + /// ``` + #[inline] + pub fn fetch_or(&self, val: $t) -> $t { + if can_transmute::<$t, atomic::AtomicUsize>() { + let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) }; + a.fetch_or(val as usize, Ordering::AcqRel) as $t + } else { + let _guard = lock(self.value.get() as usize).write(); + let value = unsafe { &mut *(self.value.get()) }; + let old = *value; + *value |= val; + old + } + } + + /// Applies bitwise "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_xor(2), 7); + /// assert_eq!(a.load(), 5); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: $t) -> $t { + if can_transmute::<$t, atomic::AtomicUsize>() { + let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) }; + a.fetch_xor(val as usize, Ordering::AcqRel) as $t + } else { + let _guard = lock(self.value.get() as usize).write(); + let value = unsafe { &mut *(self.value.get()) }; + let old = *value; + *value ^= val; + old + } + } + } + }; + ($t:ty, $atomic:ty, $example:tt) => { + impl AtomicCell<$t> { + /// Increments the current value by `val` and returns the previous value. + /// + /// The addition wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_add(3), 7); + /// assert_eq!(a.load(), 10); + /// ``` + #[inline] + pub fn fetch_add(&self, val: $t) -> $t { + let a = unsafe { &*(self.value.get() as *const $atomic) }; + a.fetch_add(val, Ordering::AcqRel) + } + + /// Decrements the current value by `val` and returns the previous value. + /// + /// The subtraction wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_sub(3), 7); + /// assert_eq!(a.load(), 4); + /// ``` + #[inline] + pub fn fetch_sub(&self, val: $t) -> $t { + let a = unsafe { &*(self.value.get() as *const $atomic) }; + a.fetch_sub(val, Ordering::AcqRel) + } + + /// Applies bitwise "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_and(3), 7); + /// assert_eq!(a.load(), 3); + /// ``` + #[inline] + pub fn fetch_and(&self, val: $t) -> $t { + let a = unsafe { &*(self.value.get() as *const $atomic) }; + a.fetch_and(val, Ordering::AcqRel) + } + + /// Applies bitwise "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_or(16), 7); + /// assert_eq!(a.load(), 23); + /// ``` + #[inline] + pub fn fetch_or(&self, val: $t) -> $t { + let a = unsafe { &*(self.value.get() as *const $atomic) }; + a.fetch_or(val, Ordering::AcqRel) + } + + /// Applies bitwise "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_xor(2), 7); + /// assert_eq!(a.load(), 5); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: $t) -> $t { + let a = unsafe { &*(self.value.get() as *const $atomic) }; + a.fetch_xor(val, Ordering::AcqRel) + } + } + }; +} + +#[cfg(has_atomic_u8)] +impl_arithmetic!(u8, atomic::AtomicU8, "let a = AtomicCell::new(7u8);"); +#[cfg(has_atomic_u8)] +impl_arithmetic!(i8, atomic::AtomicI8, "let a = AtomicCell::new(7i8);"); +#[cfg(has_atomic_u16)] +impl_arithmetic!(u16, atomic::AtomicU16, "let a = AtomicCell::new(7u16);"); +#[cfg(has_atomic_u16)] +impl_arithmetic!(i16, atomic::AtomicI16, "let a = AtomicCell::new(7i16);"); +#[cfg(has_atomic_u32)] +impl_arithmetic!(u32, atomic::AtomicU32, "let a = AtomicCell::new(7u32);"); +#[cfg(has_atomic_u32)] +impl_arithmetic!(i32, atomic::AtomicI32, "let a = AtomicCell::new(7i32);"); +#[cfg(has_atomic_u64)] +impl_arithmetic!(u64, atomic::AtomicU64, "let a = AtomicCell::new(7u64);"); +#[cfg(has_atomic_u64)] +impl_arithmetic!(i64, atomic::AtomicI64, "let a = AtomicCell::new(7i64);"); +#[cfg(has_atomic_u128)] +impl_arithmetic!(u128, atomic::AtomicU128, "let a = AtomicCell::new(7u128);"); +#[cfg(has_atomic_u128)] +impl_arithmetic!(i128, atomic::AtomicI128, "let a = AtomicCell::new(7i128);"); + +impl_arithmetic!( + usize, + atomic::AtomicUsize, + "let a = AtomicCell::new(7usize);" +); +impl_arithmetic!( + isize, + atomic::AtomicIsize, + "let a = AtomicCell::new(7isize);" +); + +impl AtomicCell<bool> { + /// Applies logical "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_and(true), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_and(false), true); + /// assert_eq!(a.load(), false); + /// ``` + #[inline] + pub fn fetch_and(&self, val: bool) -> bool { + let a = unsafe { &*(self.value.get() as *const AtomicBool) }; + a.fetch_and(val, Ordering::AcqRel) + } + + /// Applies logical "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(false); + /// + /// assert_eq!(a.fetch_or(false), false); + /// assert_eq!(a.load(), false); + /// + /// assert_eq!(a.fetch_or(true), false); + /// assert_eq!(a.load(), true); + /// ``` + #[inline] + pub fn fetch_or(&self, val: bool) -> bool { + let a = unsafe { &*(self.value.get() as *const AtomicBool) }; + a.fetch_or(val, Ordering::AcqRel) + } + + /// Applies logical "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_xor(false), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_xor(true), true); + /// assert_eq!(a.load(), false); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: bool) -> bool { + let a = unsafe { &*(self.value.get() as *const AtomicBool) }; + a.fetch_xor(val, Ordering::AcqRel) + } +} + +impl<T: Default> Default for AtomicCell<T> { + fn default() -> AtomicCell<T> { + AtomicCell::new(T::default()) + } +} + +impl<T: Copy + fmt::Debug> fmt::Debug for AtomicCell<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("AtomicCell") + .field("value", &self.load()) + .finish() + } +} + +/// Returns `true` if values of type `A` can be transmuted into values of type `B`. +fn can_transmute<A, B>() -> bool { + // Sizes must be equal, but alignment of `A` must be greater or equal than that of `B`. + mem::size_of::<A>() == mem::size_of::<B>() && mem::align_of::<A>() >= mem::align_of::<B>() +} + +/// Returns a reference to the global lock associated with the `AtomicCell` at address `addr`. +/// +/// This function is used to protect atomic data which doesn't fit into any of the primitive atomic +/// types in `std::sync::atomic`. Operations on such atomics must therefore use a global lock. +/// +/// However, there is not only one global lock but an array of many locks, and one of them is +/// picked based on the given address. Having many locks reduces contention and improves +/// scalability. +#[inline] +#[must_use] +fn lock(addr: usize) -> &'static SeqLock { + // The number of locks is a prime number because we want to make sure `addr % LEN` gets + // dispersed across all locks. + // + // Note that addresses are always aligned to some power of 2, depending on type `T` in + // `AtomicCell<T>`. If `LEN` was an even number, then `addr % LEN` would be an even number, + // too, which means only half of the locks would get utilized! + // + // It is also possible for addresses to accidentally get aligned to a number that is not a + // power of 2. Consider this example: + // + // ``` + // #[repr(C)] + // struct Foo { + // a: AtomicCell<u8>, + // b: u8, + // c: u8, + // } + // ``` + // + // Now, if we have a slice of type `&[Foo]`, it is possible that field `a` in all items gets + // stored at addresses that are multiples of 3. It'd be too bad if `LEN` was divisible by 3. + // In order to protect from such cases, we simply choose a large prime number for `LEN`. + const LEN: usize = 97; + + const L: SeqLock = SeqLock::INIT; + + static LOCKS: [SeqLock; LEN] = [ + L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, + L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, + L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, + L, L, L, L, L, L, L, + ]; + + // If the modulus is a constant number, the compiler will use crazy math to transform this into + // a sequence of cheap arithmetic operations rather than using the slow modulo instruction. + &LOCKS[addr % LEN] +} + +/// An atomic `()`. +/// +/// All operations are noops. +struct AtomicUnit; + +impl AtomicUnit { + #[inline] + fn load(&self, _order: Ordering) {} + + #[inline] + fn store(&self, _val: (), _order: Ordering) {} + + #[inline] + fn swap(&self, _val: (), _order: Ordering) {} + + #[inline] + fn compare_exchange_weak( + &self, + _current: (), + _new: (), + _success: Ordering, + _failure: Ordering, + ) -> Result<(), ()> { + Ok(()) + } +} + +macro_rules! atomic { + // If values of type `$t` can be transmuted into values of the primitive atomic type `$atomic`, + // declares variable `$a` of type `$atomic` and executes `$atomic_op`, breaking out of the loop. + (@check, $t:ty, $atomic:ty, $a:ident, $atomic_op:expr) => { + if can_transmute::<$t, $atomic>() { + let $a: &$atomic; + break $atomic_op; + } + }; + + // If values of type `$t` can be transmuted into values of a primitive atomic type, declares + // variable `$a` of that type and executes `$atomic_op`. Otherwise, just executes + // `$fallback_op`. + ($t:ty, $a:ident, $atomic_op:expr, $fallback_op:expr) => { + loop { + atomic!(@check, $t, AtomicUnit, $a, $atomic_op); + atomic!(@check, $t, atomic::AtomicUsize, $a, $atomic_op); + + #[cfg(has_atomic_u8)] + atomic!(@check, $t, atomic::AtomicU8, $a, $atomic_op); + #[cfg(has_atomic_u16)] + atomic!(@check, $t, atomic::AtomicU16, $a, $atomic_op); + #[cfg(has_atomic_u32)] + atomic!(@check, $t, atomic::AtomicU32, $a, $atomic_op); + #[cfg(has_atomic_u64)] + atomic!(@check, $t, atomic::AtomicU64, $a, $atomic_op); + + break $fallback_op; + } + }; +} + +/// Returns `true` if operations on `AtomicCell<T>` are lock-free. +fn atomic_is_lock_free<T>() -> bool { + atomic! { T, _a, true, false } +} + +/// Atomically reads data from `src`. +/// +/// This operation uses the `Acquire` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_load<T>(src: *mut T) -> T +where + T: Copy, +{ + atomic! { + T, a, + { + a = &*(src as *const _ as *const _); + mem::transmute_copy(&a.load(Ordering::Acquire)) + }, + { + let lock = lock(src as usize); + + // Try doing an optimistic read first. + if let Some(stamp) = lock.optimistic_read() { + // We need a volatile read here because other threads might concurrently modify the + // value. In theory, data races are *always* UB, even if we use volatile reads and + // discard the data when a data race is detected. The proper solution would be to + // do atomic reads and atomic writes, but we can't atomically read and write all + // kinds of data since `AtomicU8` is not available on stable Rust yet. + let val = ptr::read_volatile(src); + + if lock.validate_read(stamp) { + return val; + } + } + + // Grab a regular write lock so that writers don't starve this load. + let guard = lock.write(); + let val = ptr::read(src); + // The value hasn't been changed. Drop the guard without incrementing the stamp. + guard.abort(); + val + } + } +} + +/// Atomically writes `val` to `dst`. +/// +/// This operation uses the `Release` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_store<T>(dst: *mut T, val: T) { + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + a.store(mem::transmute_copy(&val), Ordering::Release); + mem::forget(val); + }, + { + let _guard = lock(dst as usize).write(); + ptr::write(dst, val); + } + } +} + +/// Atomically swaps data at `dst` with `val`. +/// +/// This operation uses the `AcqRel` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_swap<T>(dst: *mut T, val: T) -> T { + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + let res = mem::transmute_copy(&a.swap(mem::transmute_copy(&val), Ordering::AcqRel)); + mem::forget(val); + res + }, + { + let _guard = lock(dst as usize).write(); + ptr::replace(dst, val) + } + } +} + +/// Atomically compares data at `dst` to `current` and, if equal byte-for-byte, exchanges data at +/// `dst` with `new`. +/// +/// Returns the old value on success, or the current value at `dst` on failure. +/// +/// This operation uses the `AcqRel` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_compare_exchange_weak<T>(dst: *mut T, mut current: T, new: T) -> Result<T, T> +where + T: Copy + Eq, +{ + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + let mut current_raw = mem::transmute_copy(¤t); + let new_raw = mem::transmute_copy(&new); + + loop { + match a.compare_exchange_weak( + current_raw, + new_raw, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break Ok(current), + Err(previous_raw) => { + let previous = mem::transmute_copy(&previous_raw); + + if !T::eq(&previous, ¤t) { + break Err(previous); + } + + // The compare-exchange operation has failed and didn't store `new`. The + // failure is either spurious, or `previous` was semantically equal to + // `current` but not byte-equal. Let's retry with `previous` as the new + // `current`. + current = previous; + current_raw = previous_raw; + } + } + } + }, + { + let guard = lock(dst as usize).write(); + + if T::eq(&*dst, ¤t) { + Ok(ptr::replace(dst, new)) + } else { + let val = ptr::read(dst); + // The value hasn't been changed. Drop the guard without incrementing the stamp. + guard.abort(); + Err(val) + } + } + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/atomic/consume.rs b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/consume.rs new file mode 100644 index 0000000000..9be5464fb3 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/consume.rs @@ -0,0 +1,82 @@ +#[cfg(any(target_arch = "arm", target_arch = "aarch64"))] +use core::sync::atomic::compiler_fence; +use core::sync::atomic::Ordering; + +/// Trait which allows reading from primitive atomic types with "consume" ordering. +pub trait AtomicConsume { + /// Type returned by `load_consume`. + type Val; + + /// Loads a value from the atomic using a "consume" memory ordering. + /// + /// This is similar to the "acquire" ordering, except that an ordering is + /// only guaranteed with operations that "depend on" the result of the load. + /// However consume loads are usually much faster than acquire loads on + /// architectures with a weak memory model since they don't require memory + /// fence instructions. + /// + /// The exact definition of "depend on" is a bit vague, but it works as you + /// would expect in practice since a lot of software, especially the Linux + /// kernel, rely on this behavior. + /// + /// This is currently only implemented on ARM and AArch64, where a fence + /// can be avoided. On other architectures this will fall back to a simple + /// `load(Ordering::Acquire)`. + fn load_consume(&self) -> Self::Val; +} + +#[cfg(any(target_arch = "arm", target_arch = "aarch64"))] +macro_rules! impl_consume { + () => { + #[inline] + fn load_consume(&self) -> Self::Val { + let result = self.load(Ordering::Relaxed); + compiler_fence(Ordering::Acquire); + result + } + }; +} + +#[cfg(not(any(target_arch = "arm", target_arch = "aarch64")))] +macro_rules! impl_consume { + () => { + #[inline] + fn load_consume(&self) -> Self::Val { + self.load(Ordering::Acquire) + } + }; +} + +macro_rules! impl_atomic { + ($atomic:ident, $val:ty) => { + impl AtomicConsume for ::core::sync::atomic::$atomic { + type Val = $val; + impl_consume!(); + } + }; +} + +impl_atomic!(AtomicBool, bool); +impl_atomic!(AtomicUsize, usize); +impl_atomic!(AtomicIsize, isize); +#[cfg(all(feature = "nightly", target_has_atomic = "8"))] +impl_atomic!(AtomicU8, u8); +#[cfg(all(feature = "nightly", target_has_atomic = "8"))] +impl_atomic!(AtomicI8, i8); +#[cfg(all(feature = "nightly", target_has_atomic = "16"))] +impl_atomic!(AtomicU16, u16); +#[cfg(all(feature = "nightly", target_has_atomic = "16"))] +impl_atomic!(AtomicI16, i16); +#[cfg(all(feature = "nightly", target_has_atomic = "32"))] +impl_atomic!(AtomicU32, u32); +#[cfg(all(feature = "nightly", target_has_atomic = "32"))] +impl_atomic!(AtomicI32, i32); +#[cfg(all(feature = "nightly", target_has_atomic = "64"))] +impl_atomic!(AtomicU64, u64); +#[cfg(all(feature = "nightly", target_has_atomic = "64"))] +impl_atomic!(AtomicI64, i64); + +impl<T> AtomicConsume for ::core::sync::atomic::AtomicPtr<T> { + type Val = *mut T; + impl_consume!(); +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/atomic/mod.rs b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/mod.rs new file mode 100644 index 0000000000..074b0ca53f --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/mod.rs @@ -0,0 +1,25 @@ +//! Atomic types. + +cfg_if! { + // Use "wide" sequence lock if the pointer width <= 32 for preventing its counter against wrap + // around. + // + // We are ignoring too wide architectures (pointer width >= 256), since such a system will not + // appear in a conceivable future. + // + // In narrow architectures (pointer width <= 16), the counter is still <= 32-bit and may be + // vulnerable to wrap around. But it's mostly okay, since in such a primitive hardware, the + // counter will not be increased that fast. + if #[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))] { + mod seq_lock; + } else { + #[path = "seq_lock_wide.rs"] + mod seq_lock; + } +} + +mod atomic_cell; +mod consume; + +pub use self::atomic_cell::AtomicCell; +pub use self::consume::AtomicConsume; diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/atomic/seq_lock.rs b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/seq_lock.rs new file mode 100644 index 0000000000..533a036b5d --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/seq_lock.rs @@ -0,0 +1,88 @@ +use core::sync::atomic::{self, AtomicUsize, Ordering}; + +use Backoff; + +/// A simple stamped lock. +pub struct SeqLock { + /// The current state of the lock. + /// + /// All bits except the least significant one hold the current stamp. When locked, the state + /// equals 1 and doesn't contain a valid stamp. + state: AtomicUsize, +} + +impl SeqLock { + pub const INIT: Self = Self { + state: AtomicUsize::new(0), + }; + + /// If not locked, returns the current stamp. + /// + /// This method should be called before optimistic reads. + #[inline] + pub fn optimistic_read(&self) -> Option<usize> { + let state = self.state.load(Ordering::Acquire); + if state == 1 { + None + } else { + Some(state) + } + } + + /// Returns `true` if the current stamp is equal to `stamp`. + /// + /// This method should be called after optimistic reads to check whether they are valid. The + /// argument `stamp` should correspond to the one returned by method `optimistic_read`. + #[inline] + pub fn validate_read(&self, stamp: usize) -> bool { + atomic::fence(Ordering::Acquire); + self.state.load(Ordering::Relaxed) == stamp + } + + /// Grabs the lock for writing. + #[inline] + pub fn write(&'static self) -> SeqLockWriteGuard { + let backoff = Backoff::new(); + loop { + let previous = self.state.swap(1, Ordering::Acquire); + + if previous != 1 { + atomic::fence(Ordering::Release); + + return SeqLockWriteGuard { + lock: self, + state: previous, + }; + } + + backoff.snooze(); + } + } +} + +/// An RAII guard that releases the lock and increments the stamp when dropped. +pub struct SeqLockWriteGuard { + /// The parent lock. + lock: &'static SeqLock, + + /// The stamp before locking. + state: usize, +} + +impl SeqLockWriteGuard { + /// Releases the lock without incrementing the stamp. + #[inline] + pub fn abort(self) { + self.lock.state.store(self.state, Ordering::Release); + } +} + +impl Drop for SeqLockWriteGuard { + #[inline] + fn drop(&mut self) { + // Release the lock and increment the stamp. + self.lock + .state + .store(self.state.wrapping_add(2), Ordering::Release); + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/atomic/seq_lock_wide.rs b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/seq_lock_wide.rs new file mode 100644 index 0000000000..857c074f59 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/atomic/seq_lock_wide.rs @@ -0,0 +1,134 @@ +use core::sync::atomic::{self, AtomicUsize, Ordering}; + +use Backoff; + +/// A simple stamped lock. +/// +/// The state is represented as two `AtomicUsize`: `state_hi` for high bits and `state_lo` for low +/// bits. +pub struct SeqLock { + /// The high bits of the current state of the lock. + state_hi: AtomicUsize, + + /// The low bits of the current state of the lock. + /// + /// All bits except the least significant one hold the current stamp. When locked, the state_lo + /// equals 1 and doesn't contain a valid stamp. + state_lo: AtomicUsize, +} + +impl SeqLock { + pub const INIT: Self = Self { + state_hi: AtomicUsize::new(0), + state_lo: AtomicUsize::new(0), + }; + + /// If not locked, returns the current stamp. + /// + /// This method should be called before optimistic reads. + #[inline] + pub fn optimistic_read(&self) -> Option<(usize, usize)> { + // The acquire loads from `state_hi` and `state_lo` synchronize with the release stores in + // `SeqLockWriteGuard::drop`. + // + // As a consequence, we can make sure that (1) all writes within the era of `state_hi - 1` + // happens before now; and therefore, (2) if `state_lo` is even, all writes within the + // critical section of (`state_hi`, `state_lo`) happens before now. + let state_hi = self.state_hi.load(Ordering::Acquire); + let state_lo = self.state_lo.load(Ordering::Acquire); + if state_lo == 1 { + None + } else { + Some((state_hi, state_lo)) + } + } + + /// Returns `true` if the current stamp is equal to `stamp`. + /// + /// This method should be called after optimistic reads to check whether they are valid. The + /// argument `stamp` should correspond to the one returned by method `optimistic_read`. + #[inline] + pub fn validate_read(&self, stamp: (usize, usize)) -> bool { + // Thanks to the fence, if we're noticing any modification to the data at the critical + // section of `(a, b)`, then the critical section's write of 1 to state_lo should be + // visible. + atomic::fence(Ordering::Acquire); + + // So if `state_lo` coincides with `stamp.1`, then either (1) we're noticing no modification + // to the data after the critical section of `(stamp.0, stamp.1)`, or (2) `state_lo` wrapped + // around. + // + // If (2) is the case, the acquire ordering ensures we see the new value of `state_hi`. + let state_lo = self.state_lo.load(Ordering::Acquire); + + // If (2) is the case and `state_hi` coincides with `stamp.0`, then `state_hi` also wrapped + // around, which we give up to correctly validate the read. + let state_hi = self.state_hi.load(Ordering::Relaxed); + + // Except for the case that both `state_hi` and `state_lo` wrapped around, the following + // condition implies that we're noticing no modification to the data after the critical + // section of `(stamp.0, stamp.1)`. + (state_hi, state_lo) == stamp + } + + /// Grabs the lock for writing. + #[inline] + pub fn write(&'static self) -> SeqLockWriteGuard { + let backoff = Backoff::new(); + loop { + let previous = self.state_lo.swap(1, Ordering::Acquire); + + if previous != 1 { + // To synchronize with the acquire fence in `validate_read` via any modification to + // the data at the critical section of `(state_hi, previous)`. + atomic::fence(Ordering::Release); + + return SeqLockWriteGuard { + lock: self, + state_lo: previous, + }; + } + + backoff.snooze(); + } + } +} + +/// An RAII guard that releases the lock and increments the stamp when dropped. +pub struct SeqLockWriteGuard { + /// The parent lock. + lock: &'static SeqLock, + + /// The stamp before locking. + state_lo: usize, +} + +impl SeqLockWriteGuard { + /// Releases the lock without incrementing the stamp. + #[inline] + pub fn abort(self) { + self.lock.state_lo.store(self.state_lo, Ordering::Release); + } +} + +impl Drop for SeqLockWriteGuard { + #[inline] + fn drop(&mut self) { + let state_lo = self.state_lo.wrapping_add(2); + + // Increase the high bits if the low bits wrap around. + // + // Release ordering for synchronizing with `optimistic_read`. + if state_lo == 0 { + let state_hi = self.lock.state_hi.load(Ordering::Relaxed); + self.lock + .state_hi + .store(state_hi.wrapping_add(1), Ordering::Release); + } + + // Release the lock and increment the stamp. + // + // Release ordering for synchronizing with `optimistic_read`. + self.lock.state_lo.store(state_lo, Ordering::Release); + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/backoff.rs b/third_party/rust/crossbeam-utils-0.7.2/src/backoff.rs new file mode 100644 index 0000000000..446755bbc7 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/backoff.rs @@ -0,0 +1,292 @@ +use core::cell::Cell; +use core::fmt; +use core::sync::atomic; + +const SPIN_LIMIT: u32 = 6; +const YIELD_LIMIT: u32 = 10; + +/// Performs exponential backoff in spin loops. +/// +/// Backing off in spin loops reduces contention and improves overall performance. +/// +/// This primitive can execute *YIELD* and *PAUSE* instructions, yield the current thread to the OS +/// scheduler, and tell when is a good time to block the thread using a different synchronization +/// mechanism. Each step of the back off procedure takes roughly twice as long as the previous +/// step. +/// +/// # Examples +/// +/// Backing off in a lock-free loop: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicUsize; +/// use std::sync::atomic::Ordering::SeqCst; +/// +/// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { +/// let backoff = Backoff::new(); +/// loop { +/// let val = a.load(SeqCst); +/// if a.compare_and_swap(val, val.wrapping_mul(b), SeqCst) == val { +/// return val; +/// } +/// backoff.spin(); +/// } +/// } +/// ``` +/// +/// Waiting for an [`AtomicBool`] to become `true`: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicBool; +/// use std::sync::atomic::Ordering::SeqCst; +/// +/// fn spin_wait(ready: &AtomicBool) { +/// let backoff = Backoff::new(); +/// while !ready.load(SeqCst) { +/// backoff.snooze(); +/// } +/// } +/// ``` +/// +/// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait. +/// Note that whoever sets the atomic variable to `true` must notify the parked thread by calling +/// [`unpark()`]: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicBool; +/// use std::sync::atomic::Ordering::SeqCst; +/// use std::thread; +/// +/// fn blocking_wait(ready: &AtomicBool) { +/// let backoff = Backoff::new(); +/// while !ready.load(SeqCst) { +/// if backoff.is_completed() { +/// thread::park(); +/// } else { +/// backoff.snooze(); +/// } +/// } +/// } +/// ``` +/// +/// [`is_completed`]: struct.Backoff.html#method.is_completed +/// [`std::thread::park()`]: https://doc.rust-lang.org/std/thread/fn.park.html +/// [`Condvar`]: https://doc.rust-lang.org/std/sync/struct.Condvar.html +/// [`AtomicBool`]: https://doc.rust-lang.org/std/sync/atomic/struct.AtomicBool.html +/// [`unpark()`]: https://doc.rust-lang.org/std/thread/struct.Thread.html#method.unpark +pub struct Backoff { + step: Cell<u32>, +} + +impl Backoff { + /// Creates a new `Backoff`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// + /// let backoff = Backoff::new(); + /// ``` + #[inline] + pub fn new() -> Self { + Backoff { step: Cell::new(0) } + } + + /// Resets the `Backoff`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// + /// let backoff = Backoff::new(); + /// backoff.reset(); + /// ``` + #[inline] + pub fn reset(&self) { + self.step.set(0); + } + + /// Backs off in a lock-free loop. + /// + /// This method should be used when we need to retry an operation because another thread made + /// progress. + /// + /// The processor may yield using the *YIELD* or *PAUSE* instruction. + /// + /// # Examples + /// + /// Backing off in a lock-free loop: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::atomic::AtomicUsize; + /// use std::sync::atomic::Ordering::SeqCst; + /// + /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { + /// let backoff = Backoff::new(); + /// loop { + /// let val = a.load(SeqCst); + /// if a.compare_and_swap(val, val.wrapping_mul(b), SeqCst) == val { + /// return val; + /// } + /// backoff.spin(); + /// } + /// } + /// + /// let a = AtomicUsize::new(7); + /// assert_eq!(fetch_mul(&a, 8), 7); + /// assert_eq!(a.load(SeqCst), 56); + /// ``` + #[inline] + pub fn spin(&self) { + for _ in 0..1 << self.step.get().min(SPIN_LIMIT) { + atomic::spin_loop_hint(); + } + + if self.step.get() <= SPIN_LIMIT { + self.step.set(self.step.get() + 1); + } + } + + /// Backs off in a blocking loop. + /// + /// This method should be used when we need to wait for another thread to make progress. + /// + /// The processor may yield using the *YIELD* or *PAUSE* instruction and the current thread + /// may yield by giving up a timeslice to the OS scheduler. + /// + /// In `#[no_std]` environments, this method is equivalent to [`spin`]. + /// + /// If possible, use [`is_completed`] to check when it is advised to stop using backoff and + /// block the current thread using a different synchronization mechanism instead. + /// + /// [`spin`]: struct.Backoff.html#method.spin + /// [`is_completed`]: struct.Backoff.html#method.is_completed + /// + /// # Examples + /// + /// Waiting for an [`AtomicBool`] to become `true`: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::Arc; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::SeqCst; + /// use std::thread; + /// use std::time::Duration; + /// + /// fn spin_wait(ready: &AtomicBool) { + /// let backoff = Backoff::new(); + /// while !ready.load(SeqCst) { + /// backoff.snooze(); + /// } + /// } + /// + /// let ready = Arc::new(AtomicBool::new(false)); + /// let ready2 = ready.clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(100)); + /// ready2.store(true, SeqCst); + /// }); + /// + /// assert_eq!(ready.load(SeqCst), false); + /// spin_wait(&ready); + /// assert_eq!(ready.load(SeqCst), true); + /// ``` + /// + /// [`AtomicBool`]: https://doc.rust-lang.org/std/sync/atomic/struct.AtomicBool.html + #[inline] + pub fn snooze(&self) { + if self.step.get() <= SPIN_LIMIT { + for _ in 0..1 << self.step.get() { + atomic::spin_loop_hint(); + } + } else { + #[cfg(not(feature = "std"))] + for _ in 0..1 << self.step.get() { + atomic::spin_loop_hint(); + } + + #[cfg(feature = "std")] + ::std::thread::yield_now(); + } + + if self.step.get() <= YIELD_LIMIT { + self.step.set(self.step.get() + 1); + } + } + + /// Returns `true` if exponential backoff has completed and blocking the thread is advised. + /// + /// # Examples + /// + /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::Arc; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::SeqCst; + /// use std::thread; + /// use std::time::Duration; + /// + /// fn blocking_wait(ready: &AtomicBool) { + /// let backoff = Backoff::new(); + /// while !ready.load(SeqCst) { + /// if backoff.is_completed() { + /// thread::park(); + /// } else { + /// backoff.snooze(); + /// } + /// } + /// } + /// + /// let ready = Arc::new(AtomicBool::new(false)); + /// let ready2 = ready.clone(); + /// let waiter = thread::current(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(100)); + /// ready2.store(true, SeqCst); + /// waiter.unpark(); + /// }); + /// + /// assert_eq!(ready.load(SeqCst), false); + /// blocking_wait(&ready); + /// assert_eq!(ready.load(SeqCst), true); + /// ``` + /// + /// [`AtomicBool`]: https://doc.rust-lang.org/std/sync/atomic/struct.AtomicBool.html + #[inline] + pub fn is_completed(&self) -> bool { + self.step.get() > YIELD_LIMIT + } + + #[inline] + #[doc(hidden)] + #[deprecated(note = "use `is_completed` instead")] + pub fn is_complete(&self) -> bool { + self.is_completed() + } +} + +impl fmt::Debug for Backoff { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Backoff") + .field("step", &self.step) + .field("is_completed", &self.is_completed()) + .finish() + } +} + +impl Default for Backoff { + fn default() -> Backoff { + Backoff::new() + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/cache_padded.rs b/third_party/rust/crossbeam-utils-0.7.2/src/cache_padded.rs new file mode 100644 index 0000000000..bb864a4191 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/cache_padded.rs @@ -0,0 +1,131 @@ +use core::fmt; +use core::ops::{Deref, DerefMut}; + +/// Pads and aligns a value to the length of a cache line. +/// +/// In concurrent programming, sometimes it is desirable to make sure commonly accessed pieces of +/// data are not placed into the same cache line. Updating an atomic value invalides the whole +/// cache line it belongs to, which makes the next access to the same cache line slower for other +/// CPU cores. Use `CachePadded` to ensure updating one piece of data doesn't invalidate other +/// cached data. +/// +/// # Size and alignment +/// +/// Cache lines are assumed to be N bytes long, depending on the architecture: +/// +/// * On x86-64, N = 128. +/// * On all others, N = 64. +/// +/// Note that N is just a reasonable guess and is not guaranteed to match the actual cache line +/// length of the machine the program is running on. On modern Intel architectures, spatial +/// prefetcher is pulling pairs of 64-byte cache lines at a time, so we pessimistically assume that +/// cache lines are 128 bytes long. +/// +/// The size of `CachePadded<T>` is the smallest multiple of N bytes large enough to accommodate +/// a value of type `T`. +/// +/// The alignment of `CachePadded<T>` is the maximum of N bytes and the alignment of `T`. +/// +/// # Examples +/// +/// Alignment and padding: +/// +/// ``` +/// use crossbeam_utils::CachePadded; +/// +/// let array = [CachePadded::new(1i8), CachePadded::new(2i8)]; +/// let addr1 = &*array[0] as *const i8 as usize; +/// let addr2 = &*array[1] as *const i8 as usize; +/// +/// assert!(addr2 - addr1 >= 64); +/// assert_eq!(addr1 % 64, 0); +/// assert_eq!(addr2 % 64, 0); +/// ``` +/// +/// When building a concurrent queue with a head and a tail index, it is wise to place them in +/// different cache lines so that concurrent threads pushing and popping elements don't invalidate +/// each other's cache lines: +/// +/// ``` +/// use crossbeam_utils::CachePadded; +/// use std::sync::atomic::AtomicUsize; +/// +/// struct Queue<T> { +/// head: CachePadded<AtomicUsize>, +/// tail: CachePadded<AtomicUsize>, +/// buffer: *mut T, +/// } +/// ``` +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +#[cfg_attr(target_arch = "x86_64", repr(align(128)))] +#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] +pub struct CachePadded<T> { + value: T, +} + +unsafe impl<T: Send> Send for CachePadded<T> {} +unsafe impl<T: Sync> Sync for CachePadded<T> {} + +impl<T> CachePadded<T> { + /// Pads and aligns a value to the length of a cache line. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::CachePadded; + /// + /// let padded_value = CachePadded::new(1); + /// ``` + pub fn new(t: T) -> CachePadded<T> { + CachePadded::<T> { value: t } + } + + /// Returns the inner value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::CachePadded; + /// + /// let padded_value = CachePadded::new(7); + /// let value = padded_value.into_inner(); + /// assert_eq!(value, 7); + /// ``` + pub fn into_inner(self) -> T { + self.value + } +} + +impl<T> Deref for CachePadded<T> { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl<T> DerefMut for CachePadded<T> { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + +impl<T: fmt::Debug> fmt::Debug for CachePadded<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("CachePadded") + .field("value", &self.value) + .finish() + } +} + +impl<T> From<T> for CachePadded<T> { + fn from(t: T) -> Self { + CachePadded::new(t) + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/lib.rs b/third_party/rust/crossbeam-utils-0.7.2/src/lib.rs new file mode 100644 index 0000000000..06f23beb47 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/lib.rs @@ -0,0 +1,64 @@ +//! Miscellaneous tools for concurrent programming. +//! +//! ## Atomics +//! +//! * [`AtomicCell`], a thread-safe mutable memory location. +//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering. +//! +//! ## Thread synchronization +//! +//! * [`Parker`], a thread parking primitive. +//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +//! * [`WaitGroup`], for synchronizing the beginning or end of some computation. +//! +//! ## Utilities +//! +//! * [`Backoff`], for exponential backoff in spin loops. +//! * [`CachePadded`], for padding and aligning a value to the length of a cache line. +//! * [`scope`], for spawning threads that borrow local variables from the stack. +//! +//! [`AtomicCell`]: atomic/struct.AtomicCell.html +//! [`AtomicConsume`]: atomic/trait.AtomicConsume.html +//! [`Parker`]: sync/struct.Parker.html +//! [`ShardedLock`]: sync/struct.ShardedLock.html +//! [`WaitGroup`]: sync/struct.WaitGroup.html +//! [`Backoff`]: struct.Backoff.html +//! [`CachePadded`]: struct.CachePadded.html +//! [`scope`]: thread/fn.scope.html + +#![warn(missing_docs)] +#![warn(missing_debug_implementations)] +#![cfg_attr(not(feature = "std"), no_std)] +#![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))] + +#[macro_use] +extern crate cfg_if; +#[cfg(feature = "std")] +extern crate core; + +cfg_if! { + if #[cfg(feature = "alloc")] { + extern crate alloc; + } else if #[cfg(feature = "std")] { + extern crate std as alloc; + } +} + +#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] +pub mod atomic; + +mod cache_padded; +pub use cache_padded::CachePadded; + +mod backoff; +pub use backoff::Backoff; + +cfg_if! { + if #[cfg(feature = "std")] { + #[macro_use] + extern crate lazy_static; + + pub mod sync; + pub mod thread; + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/sync/mod.rs b/third_party/rust/crossbeam-utils-0.7.2/src/sync/mod.rs new file mode 100644 index 0000000000..3634963725 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/sync/mod.rs @@ -0,0 +1,17 @@ +//! Thread synchronization primitives. +//! +//! * [`Parker`], a thread parking primitive. +//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +//! * [`WaitGroup`], for synchronizing the beginning or end of some computation. +//! +//! [`Parker`]: struct.Parker.html +//! [`ShardedLock`]: struct.ShardedLock.html +//! [`WaitGroup`]: struct.WaitGroup.html + +mod parker; +mod sharded_lock; +mod wait_group; + +pub use self::parker::{Parker, Unparker}; +pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; +pub use self::wait_group::WaitGroup; diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/sync/parker.rs b/third_party/rust/crossbeam-utils-0.7.2/src/sync/parker.rs new file mode 100644 index 0000000000..051afe512d --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/sync/parker.rs @@ -0,0 +1,315 @@ +use std::fmt; +use std::marker::PhantomData; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; + +/// A thread parking primitive. +/// +/// Conceptually, each `Parker` has an associated token which is initially not present: +/// +/// * The [`park`] method blocks the current thread unless or until the token is available, at +/// which point it automatically consumes the token. It may also return *spuriously*, without +/// consuming the token. +/// +/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum +/// time. +/// +/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the +/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call +/// returning immediately. +/// +/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using +/// [`park`] and [`unpark`]. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_utils::sync::Parker; +/// +/// let mut p = Parker::new(); +/// let u = p.unparker().clone(); +/// +/// // Make the token available. +/// u.unpark(); +/// // Wakes up immediately and consumes the token. +/// p.park(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// u.unpark(); +/// }); +/// +/// // Wakes up when `u.unpark()` provides the token, but may also wake up +/// // spuriously before that without consuming the token. +/// p.park(); +/// ``` +/// +/// [`park`]: struct.Parker.html#method.park +/// [`park_timeout`]: struct.Parker.html#method.park_timeout +/// [`unpark`]: struct.Unparker.html#method.unpark +pub struct Parker { + unparker: Unparker, + _marker: PhantomData<*const ()>, +} + +unsafe impl Send for Parker {} + +impl Parker { + /// Creates a new `Parker`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// ``` + /// + pub fn new() -> Parker { + Parker { + unparker: Unparker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }), + }, + _marker: PhantomData, + } + } + + /// Blocks the current thread until the token is made available. + /// + /// A call to `park` may wake up spuriously without consuming the token, and callers should be + /// prepared for this possibility. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let mut p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + pub fn park(&self) { + self.unparker.inner.park(None); + } + + /// Blocks the current thread until the token is made available, but only for a limited time. + /// + /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers + /// should be prepared for this possibility. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let mut p = Parker::new(); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_timeout(Duration::from_millis(500)); + /// ``` + pub fn park_timeout(&self, timeout: Duration) { + self.unparker.inner.park(Some(timeout)); + } + + /// Returns a reference to an associated [`Unparker`]. + /// + /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let mut p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + /// + /// [`park`]: struct.Parker.html#method.park + /// [`park_timeout`]: struct.Parker.html#method.park_timeout + /// + /// [`Unparker`]: struct.Unparker.html + pub fn unparker(&self) -> &Unparker { + &self.unparker + } +} + +impl fmt::Debug for Parker { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("Parker { .. }") + } +} + +/// Unparks a thread parked by the associated [`Parker`]. +/// +/// [`Parker`]: struct.Parker.html +pub struct Unparker { + inner: Arc<Inner>, +} + +unsafe impl Send for Unparker {} +unsafe impl Sync for Unparker {} + +impl Unparker { + /// Atomically makes the token available if it is not already. + /// + /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is + /// any. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let mut p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// u.unpark(); + /// }); + /// + /// // Wakes up when `u.unpark()` provides the token, but may also wake up + /// // spuriously before that without consuming the token. + /// p.park(); + /// ``` + /// + /// [`park`]: struct.Parker.html#method.park + /// [`park_timeout`]: struct.Parker.html#method.park_timeout + pub fn unpark(&self) { + self.inner.unpark() + } +} + +impl fmt::Debug for Unparker { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("Unparker { .. }") + } +} + +impl Clone for Unparker { + fn clone(&self) -> Unparker { + Unparker { + inner: self.inner.clone(), + } + } +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +struct Inner { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Inner { + fn park(&self, timeout: Option<Duration>) { + // If we were previously notified then we consume this notification and return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // If the timeout is zero, then there is no need to actually block. + if let Some(ref dur) = timeout { + if *dur == Duration::from_millis(0) { + return; + } + } + + // Otherwise we need to coordinate going to sleep. + let mut m = self.lock.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + // Consume this notification to avoid spurious wakeups in the next park. + Err(NOTIFIED) => { + // We must read `state` here, even though we know it will be `NOTIFIED`. This is + // because `unpark` may have been called again since we read `NOTIFIED` in the + // `compare_exchange` above. We must perform an acquire operation that synchronizes + // with that `unpark` to observe any writes it made before the call to `unpark`. To + // do that we must read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } + Err(n) => panic!("inconsistent park_timeout state: {}", n), + } + + match timeout { + None => { + loop { + // Block the current thread on the conditional variable. + m = self.cvar.wait(m).unwrap(); + + match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { + Ok(_) => return, // got a notification + Err(_) => {} // spurious wakeup, go back to sleep + } + } + } + Some(timeout) => { + // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a + // notification we just want to unconditionally set `state` back to `EMPTY`, either + // consuming a notification or un-flagging ourselves as parked. + let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap(); + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification + PARKED => {} // no notification + n => panic!("inconsistent park_timeout state: {}", n), + } + } + } + } + + pub fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before this call, we must + // perform a release operation that `park` can synchronize with. To do that we must write + // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather + // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to `PARKED` (or last + // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. + // If we were to notify during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this + // stage so we can acquire `lock` to wait until it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes + // it doesn't get woken only to have to wait for us to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one(); + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/sync/sharded_lock.rs b/third_party/rust/crossbeam-utils-0.7.2/src/sync/sharded_lock.rs new file mode 100644 index 0000000000..bd269d1584 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/sync/sharded_lock.rs @@ -0,0 +1,608 @@ +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult}; +use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::thread::{self, ThreadId}; + +use CachePadded; + +/// The number of shards per sharded lock. Must be a power of two. +const NUM_SHARDS: usize = 8; + +/// A shard containing a single reader-writer lock. +struct Shard { + /// The inner reader-writer lock. + lock: RwLock<()>, + + /// The write-guard keeping this shard locked. + /// + /// Write operations will lock each shard and store the guard here. These guards get dropped at + /// the same time the big guard is dropped. + write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>, +} + +/// A sharded reader-writer lock. +/// +/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations +/// are slower. +/// +/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a +/// single cache line. Read operations will pick one of the shards depending on the current thread +/// and lock it. Write operations need to lock all shards in succession. +/// +/// By splitting the lock into shards, concurrent read operations will in most cases choose +/// different shards and thus update different cache lines, which is good for scalability. However, +/// write operations need to do more work and are therefore slower than usual. +/// +/// The priority policy of the lock is dependent on the underlying operating system's +/// implementation, and this type does not guarantee that any particular policy will be used. +/// +/// # Poisoning +/// +/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be +/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any +/// read operation, the lock will not be poisoned. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::ShardedLock; +/// +/// let lock = ShardedLock::new(5); +/// +/// // Any number of read locks can be held at once. +/// { +/// let r1 = lock.read().unwrap(); +/// let r2 = lock.read().unwrap(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // Read locks are dropped at this point. +/// +/// // However, only one write lock may be held. +/// { +/// let mut w = lock.write().unwrap(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // Write lock is dropped here. +/// ``` +/// +/// [`RwLock`]: https://doc.rust-lang.org/std/sync/struct.RwLock.html +pub struct ShardedLock<T: ?Sized> { + /// A list of locks protecting the internal data. + shards: Box<[CachePadded<Shard>]>, + + /// The internal data. + value: UnsafeCell<T>, +} + +unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {} +unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {} + +impl<T: ?Sized> UnwindSafe for ShardedLock<T> {} +impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {} + +impl<T> ShardedLock<T> { + /// Creates a new sharded reader-writer lock. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(5); + /// ``` + pub fn new(value: T) -> ShardedLock<T> { + ShardedLock { + shards: (0..NUM_SHARDS) + .map(|_| { + CachePadded::new(Shard { + lock: RwLock::new(()), + write_guard: UnsafeCell::new(None), + }) + }) + .collect::<Vec<_>>() + .into_boxed_slice(), + value: UnsafeCell::new(value), + } + } + + /// Consumes this lock, returning the underlying data. + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(String::new()); + /// { + /// let mut s = lock.write().unwrap(); + /// *s = "modified".to_owned(); + /// } + /// assert_eq!(lock.into_inner().unwrap(), "modified"); + /// ``` + pub fn into_inner(self) -> LockResult<T> { + let is_poisoned = self.is_poisoned(); + let inner = self.value.into_inner(); + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } +} + +impl<T: ?Sized> ShardedLock<T> { + /// Returns `true` if the lock is poisoned. + /// + /// If another thread can still access the lock, it may become poisoned at any time. A `false` + /// result should not be trusted without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(0)); + /// let c_lock = lock.clone(); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the lock gets poisoned + /// }).join(); + /// assert_eq!(lock.is_poisoned(), true); + /// ``` + pub fn is_poisoned(&self) -> bool { + self.shards[0].lock.is_poisoned() + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the lock mutably, no actual locking needs to take place. + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let mut lock = ShardedLock::new(0); + /// *lock.get_mut().unwrap() = 10; + /// assert_eq!(*lock.read().unwrap(), 10); + /// ``` + pub fn get_mut(&mut self) -> LockResult<&mut T> { + let is_poisoned = self.is_poisoned(); + let inner = unsafe { &mut *self.value.get() }; + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } + + /// Attempts to acquire this lock with shared read access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the shared access when it is dropped. This method does not + /// provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// match lock.try_read() { + /// Ok(n) => assert_eq!(*n, 1), + /// Err(_) => unreachable!(), + /// }; + /// ``` + pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.try_read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(TryLockError::Poisoned(err)) => { + let guard = ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } + Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock), + } + } + + /// Locks with shared read access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the shared access when dropped. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// thread::spawn(move || { + /// let r = c_lock.read(); + /// assert!(r.is_ok()); + /// }).join().unwrap(); + /// ``` + pub fn read(&self) -> LockResult<ShardedLockReadGuard<T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(err) => Err(PoisonError::new(ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + })), + } + } + + /// Attempts to acquire this lock with exclusive write access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the exclusive access when it is dropped. This method does + /// not provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// assert!(lock.try_write().is_err()); + /// ``` + pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<T>> { + let mut poisoned = false; + let mut blocked = None; + + // Write-lock each shard in succession. + for (i, shard) in self.shards.iter().enumerate() { + let guard = match shard.lock.try_write() { + Ok(guard) => guard, + Err(TryLockError::Poisoned(err)) => { + poisoned = true; + err.into_inner() + } + Err(TryLockError::WouldBlock) => { + blocked = Some(i); + break; + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if let Some(i) = blocked { + // Unlock the shards in reverse order of locking. + for shard in self.shards[0..i].iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + Err(TryLockError::WouldBlock) + } else if poisoned { + let guard = ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } + + /// Locks with exclusive write access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the exclusive access when dropped. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let mut n = lock.write().unwrap(); + /// *n = 2; + /// + /// assert!(lock.try_read().is_err()); + /// ``` + pub fn write(&self) -> LockResult<ShardedLockWriteGuard<T>> { + let mut poisoned = false; + + // Write-lock each shard in succession. + for shard in self.shards.iter() { + let guard = match shard.lock.write() { + Ok(guard) => guard, + Err(err) => { + poisoned = true; + err.into_inner() + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'_, ()> = guard; + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if poisoned { + Err(PoisonError::new(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + })) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.try_read() { + Ok(guard) => f + .debug_struct("ShardedLock") + .field("data", &&*guard) + .finish(), + Err(TryLockError::Poisoned(err)) => f + .debug_struct("ShardedLock") + .field("data", &&**err.get_ref()) + .finish(), + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("<locked>") + } + } + f.debug_struct("ShardedLock") + .field("data", &LockedPlaceholder) + .finish() + } + } + } +} + +impl<T: Default> Default for ShardedLock<T> { + fn default() -> ShardedLock<T> { + ShardedLock::new(Default::default()) + } +} + +impl<T> From<T> for ShardedLock<T> { + fn from(t: T) -> Self { + ShardedLock::new(t) + } +} + +/// A guard used to release the shared read access of a [`ShardedLock`] when dropped. +/// +/// [`ShardedLock`]: struct.ShardedLock.html +pub struct ShardedLockReadGuard<'a, T: ?Sized + 'a> { + lock: &'a ShardedLock<T>, + _guard: RwLockReadGuard<'a, ()>, + _marker: PhantomData<RwLockReadGuard<'a, T>>, +} + +unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockReadGuard<'a, T> {} + +impl<'a, T: ?Sized> Deref for ShardedLockReadGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ShardedLockReadGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + (**self).fmt(f) + } +} + +/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped. +/// +/// [`ShardedLock`]: struct.ShardedLock.html +pub struct ShardedLockWriteGuard<'a, T: ?Sized + 'a> { + lock: &'a ShardedLock<T>, + _marker: PhantomData<RwLockWriteGuard<'a, T>>, +} + +unsafe impl<'a, T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'a, T> {} + +impl<'a, T: ?Sized> Drop for ShardedLockWriteGuard<'a, T> { + fn drop(&mut self) { + // Unlock the shards in reverse order of locking. + for shard in self.lock.shards.iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + } +} + +impl<'a, T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ShardedLockWriteGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + (**self).fmt(f) + } +} + +impl<'a, T: ?Sized> Deref for ShardedLockWriteGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<'a, T: ?Sized> DerefMut for ShardedLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.value.get() } + } +} + +/// Returns a `usize` that identifies the current thread. +/// +/// Each thread is associated with an 'index'. While there are no particular guarantees, indices +/// usually tend to be consecutive numbers between 0 and the number of running threads. +/// +/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is +/// tearing down. +#[inline] +fn current_index() -> Option<usize> { + REGISTRATION.try_with(|reg| reg.index).ok() +} + +/// The global registry keeping track of registered threads and indices. +struct ThreadIndices { + /// Mapping from `ThreadId` to thread index. + mapping: HashMap<ThreadId, usize>, + + /// A list of free indices. + free_list: Vec<usize>, + + /// The next index to allocate if the free list is empty. + next_index: usize, +} + +lazy_static! { + static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices { + mapping: HashMap::new(), + free_list: Vec::new(), + next_index: 0, + }); +} + +/// A registration of a thread with an index. +/// +/// When dropped, unregisters the thread and frees the reserved index. +struct Registration { + index: usize, + thread_id: ThreadId, +} + +impl Drop for Registration { + fn drop(&mut self) { + let mut indices = THREAD_INDICES.lock().unwrap(); + indices.mapping.remove(&self.thread_id); + indices.free_list.push(self.index); + } +} + +thread_local! { + static REGISTRATION: Registration = { + let thread_id = thread::current().id(); + let mut indices = THREAD_INDICES.lock().unwrap(); + + let index = match indices.free_list.pop() { + Some(i) => i, + None => { + let i = indices.next_index; + indices.next_index += 1; + i + } + }; + indices.mapping.insert(thread_id, index); + + Registration { + index, + thread_id, + } + }; +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/sync/wait_group.rs b/third_party/rust/crossbeam-utils-0.7.2/src/sync/wait_group.rs new file mode 100644 index 0000000000..0527b31593 --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/sync/wait_group.rs @@ -0,0 +1,137 @@ +use std::fmt; +use std::sync::{Arc, Condvar, Mutex}; + +/// Enables threads to synchronize the beginning or end of some computation. +/// +/// # Wait groups vs barriers +/// +/// `WaitGroup` is very similar to [`Barrier`], but there are a few differences: +/// +/// * `Barrier` needs to know the number of threads at construction, while `WaitGroup` is cloned to +/// register more threads. +/// +/// * A `Barrier` can be reused even after all threads have synchronized, while a `WaitGroup` +/// synchronizes threads only once. +/// +/// * All threads wait for others to reach the `Barrier`. With `WaitGroup`, each thread can choose +/// to either wait for other threads or to continue without blocking. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::WaitGroup; +/// use std::thread; +/// +/// // Create a new wait group. +/// let wg = WaitGroup::new(); +/// +/// for _ in 0..4 { +/// // Create another reference to the wait group. +/// let wg = wg.clone(); +/// +/// thread::spawn(move || { +/// // Do some work. +/// +/// // Drop the reference to the wait group. +/// drop(wg); +/// }); +/// } +/// +/// // Block until all threads have finished their work. +/// wg.wait(); +/// ``` +/// +/// [`Barrier`]: https://doc.rust-lang.org/std/sync/struct.Barrier.html +pub struct WaitGroup { + inner: Arc<Inner>, +} + +/// Inner state of a `WaitGroup`. +struct Inner { + cvar: Condvar, + count: Mutex<usize>, +} + +impl WaitGroup { + /// Creates a new wait group and returns the single reference to it. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// + /// let wg = WaitGroup::new(); + /// ``` + pub fn new() -> WaitGroup { + WaitGroup { + inner: Arc::new(Inner { + cvar: Condvar::new(), + count: Mutex::new(1), + }), + } + } + + /// Drops this reference and waits until all other references are dropped. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// use std::thread; + /// + /// let wg = WaitGroup::new(); + /// + /// thread::spawn({ + /// let wg = wg.clone(); + /// move || { + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// } + /// }); + /// + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// ``` + pub fn wait(self) { + if *self.inner.count.lock().unwrap() == 1 { + return; + } + + let inner = self.inner.clone(); + drop(self); + + let mut count = inner.count.lock().unwrap(); + while *count > 0 { + count = inner.cvar.wait(count).unwrap(); + } + } +} + +impl Drop for WaitGroup { + fn drop(&mut self) { + let mut count = self.inner.count.lock().unwrap(); + *count -= 1; + + if *count == 0 { + self.inner.cvar.notify_all(); + } + } +} + +impl Clone for WaitGroup { + fn clone(&self) -> WaitGroup { + let mut count = self.inner.count.lock().unwrap(); + *count += 1; + + WaitGroup { + inner: self.inner.clone(), + } + } +} + +impl fmt::Debug for WaitGroup { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let count: &usize = &*self.inner.count.lock().unwrap(); + f.debug_struct("WaitGroup").field("count", count).finish() + } +} diff --git a/third_party/rust/crossbeam-utils-0.7.2/src/thread.rs b/third_party/rust/crossbeam-utils-0.7.2/src/thread.rs new file mode 100644 index 0000000000..a88c0f101c --- /dev/null +++ b/third_party/rust/crossbeam-utils-0.7.2/src/thread.rs @@ -0,0 +1,529 @@ +//! Threads that can borrow variables from the stack. +//! +//! Create a scope when spawned threads need to access variables on the stack: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! thread::scope(|s| { +//! for person in &people { +//! s.spawn(move |_| { +//! println!("Hello, {}!", person); +//! }); +//! } +//! }).unwrap(); +//! ``` +//! +//! # Why scoped threads? +//! +//! Suppose we wanted to re-write the previous example using plain threads: +//! +//! ```ignore +//! use std::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! let mut threads = Vec::new(); +//! +//! for person in &people { +//! threads.push(thread::spawn(move |_| { +//! println!("Hello, {}!", person); +//! })); +//! } +//! +//! for thread in threads { +//! thread.join().unwrap(); +//! } +//! ``` +//! +//! This doesn't work because the borrow checker complains about `people` not living long enough: +//! +//! ```text +//! error[E0597]: `people` does not live long enough +//! --> src/main.rs:12:20 +//! | +//! 12 | for person in &people { +//! | ^^^^^^ borrowed value does not live long enough +//! ... +//! 21 | } +//! | - borrowed value only lives until here +//! | +//! = note: borrowed value must be valid for the static lifetime... +//! ``` +//! +//! The problem here is that spawned threads are not allowed to borrow variables on stack because +//! the compiler cannot prove they will be joined before `people` is destroyed. +//! +//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined +//! before the scope ends. +//! +//! # How scoped threads work +//! +//! If a variable is borrowed by a thread, the thread must complete before the variable is +//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the +//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete. +//! +//! A scope creates a clear boundary between variables outside the scope and threads inside the +//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends. +//! This way we guarantee to the borrow checker that scoped threads only live within the scope and +//! can safely access variables outside it. +//! +//! # Nesting scoped threads +//! +//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little +//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such +//! cannot be borrowed by scoped threads: +//! +//! ```ignore +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! s.spawn(|_| { +//! // Not going to compile because we're trying to borrow `s`, +//! // which lives *inside* the scope! :( +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }); +//! ``` +//! +//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an +//! argument, which can be used for spawning nested threads: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! // Note the `|s|` here. +//! s.spawn(|s| { +//! // Yay, this works because we're using a fresh argument `s`! :) +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }); +//! ``` +//! +//! [`std::thread::spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html + +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::mem; +use std::panic; +use std::sync::{Arc, Mutex}; +use std::thread; + +use sync::WaitGroup; + +type SharedVec<T> = Arc<Mutex<Vec<T>>>; +type SharedOption<T> = Arc<Mutex<Option<T>>>; + +/// Creates a new scope for spawning threads. +/// +/// All child threads that haven't been manually joined will be automatically joined just before +/// this function invocation ends. If all joined threads have successfully completed, `Ok` is +/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is +/// returned containing errors from panicked threads. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// let var = vec![1, 2, 3]; +/// +/// thread::scope(|s| { +/// s.spawn(|_| { +/// println!("A child thread borrowing `var`: {:?}", var); +/// }); +/// }).unwrap(); +/// ``` +pub fn scope<'env, F, R>(f: F) -> thread::Result<R> +where + F: FnOnce(&Scope<'env>) -> R, +{ + let wg = WaitGroup::new(); + let scope = Scope::<'env> { + handles: SharedVec::default(), + wait_group: wg.clone(), + _marker: PhantomData, + }; + + // Execute the scoped function, but catch any panics. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope))); + + // Wait until all nested scopes are dropped. + drop(scope.wait_group); + wg.wait(); + + // Join all remaining spawned threads. + let panics: Vec<_> = { + let mut handles = scope.handles.lock().unwrap(); + + // Filter handles that haven't been joined, join them, and collect errors. + let panics = handles + .drain(..) + .filter_map(|handle| handle.lock().unwrap().take()) + .filter_map(|handle| handle.join().err()) + .collect(); + + panics + }; + + // If `f` has panicked, resume unwinding. + // If any of the child threads have panicked, return the panic errors. + // Otherwise, everything is OK and return the result of `f`. + match result { + Err(err) => panic::resume_unwind(err), + Ok(res) => { + if panics.is_empty() { + Ok(res) + } else { + Err(Box::new(panics)) + } + } + } +} + +/// A scope for spawning threads. +pub struct Scope<'env> { + /// The list of the thread join handles. + handles: SharedVec<SharedOption<thread::JoinHandle<()>>>, + + /// Used to wait until all subscopes all dropped. + wait_group: WaitGroup, + + /// Borrows data with invariant lifetime `'env`. + _marker: PhantomData<&'env mut &'env ()>, +} + +unsafe impl<'env> Sync for Scope<'env> {} + +impl<'env> Scope<'env> { + /// Spawns a scoped thread. + /// + /// This method is similar to the [`spawn`] function in Rust's standard library. The difference + /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits, + /// allowing it to reference variables outside the scope. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned handle can be used to manually join the thread before the scope exits. + /// + /// [`spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + self.builder().spawn(f).unwrap() + } + + /// Creates a builder that can configure a thread before spawning. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// use std::thread::current; + /// + /// thread::scope(|s| { + /// s.builder() + /// .spawn(|_| println!("A child thread is running")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> { + ScopedThreadBuilder { + scope: self, + builder: thread::Builder::new(), + } + } +} + +impl<'env> fmt::Debug for Scope<'env> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("Scope { .. }") + } +} + +/// Configures the properties of a new thread. +/// +/// The two configurable properties are: +/// +/// - [`name`]: Specifies an [associated name for the thread][naming-threads]. +/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size]. +/// +/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the +/// thread handle with the given configuration. +/// +/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return +/// value. You may want to use this builder when you want to recover from a failure to launch a +/// thread. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// thread::scope(|s| { +/// s.builder() +/// .spawn(|_| println!("Running a child thread")) +/// .unwrap(); +/// }).unwrap(); +/// ``` +/// +/// [`name`]: struct.ScopedThreadBuilder.html#method.name +/// [`stack_size`]: struct.ScopedThreadBuilder.html#method.stack_size +/// [`spawn`]: struct.ScopedThreadBuilder.html#method.spawn +/// [`Scope::spawn`]: struct.Scope.html#method.spawn +/// [`io::Result`]: https://doc.rust-lang.org/std/io/type.Result.html +/// [naming-threads]: https://doc.rust-lang.org/std/thread/index.html#naming-threads +/// [stack-size]: https://doc.rust-lang.org/std/thread/index.html#stack-size +#[derive(Debug)] +pub struct ScopedThreadBuilder<'scope, 'env: 'scope> { + scope: &'scope Scope<'env>, + builder: thread::Builder, +} + +impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> { + /// Sets the name for the new thread. + /// + /// The name must not contain null bytes. For more information about named threads, see + /// [here][naming-threads]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// use std::thread::current; + /// + /// thread::scope(|s| { + /// s.builder() + /// .name("my thread".to_string()) + /// .spawn(|_| assert_eq!(current().name(), Some("my thread"))) + /// .unwrap(); + /// }).unwrap(); + /// ``` + /// + /// [naming-threads]: https://doc.rust-lang.org/std/thread/index.html#naming-threads + pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.name(name); + self + } + + /// Sets the size of the stack for the new thread. + /// + /// The stack size is measured in bytes. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// s.builder() + /// .stack_size(32 * 1024) + /// .spawn(|_| println!("Running a child thread")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.stack_size(size); + self + } + + /// Spawns a scoped thread with this configuration. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned handle can be used to manually join the thread before the scope exits. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.builder() + /// .spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }) + /// .unwrap(); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + // The result of `f` will be stored here. + let result = SharedOption::default(); + + // Spawn the thread and grab its join handle and thread handle. + let (handle, thread) = { + let result = Arc::clone(&result); + + // A clone of the scope that will be moved into the new thread. + let scope = Scope::<'env> { + handles: Arc::clone(&self.scope.handles), + wait_group: self.scope.wait_group.clone(), + _marker: PhantomData, + }; + + // Spawn the thread. + let handle = { + let closure = move || { + // Make sure the scope is inside the closure with the proper `'env` lifetime. + let scope: Scope<'env> = scope; + + // Run the closure. + let res = f(&scope); + + // Store the result if the closure didn't panic. + *result.lock().unwrap() = Some(res); + }; + + // Change the type of `closure` from `FnOnce() -> T` to `FnMut() -> T`. + let mut closure = Some(closure); + let closure = move || closure.take().unwrap()(); + + // Allocate `clsoure` on the heap and erase the `'env` bound. + let closure: Box<dyn FnMut() + Send + 'env> = Box::new(closure); + let closure: Box<dyn FnMut() + Send + 'static> = unsafe { mem::transmute(closure) }; + + // Finally, spawn the closure. + let mut closure = closure; + self.builder.spawn(move || closure())? + }; + + let thread = handle.thread().clone(); + let handle = Arc::new(Mutex::new(Some(handle))); + (handle, thread) + }; + + // Add the handle to the shared list of join handles. + self.scope.handles.lock().unwrap().push(Arc::clone(&handle)); + + Ok(ScopedJoinHandle { + handle, + result, + thread, + _marker: PhantomData, + }) + } +} + +unsafe impl<'scope, T> Send for ScopedJoinHandle<'scope, T> {} +unsafe impl<'scope, T> Sync for ScopedJoinHandle<'scope, T> {} + +/// A handle that can be used to join its scoped thread. +pub struct ScopedJoinHandle<'scope, T> { + /// A join handle to the spawned thread. + handle: SharedOption<thread::JoinHandle<()>>, + + /// Holds the result of the inner closure. + result: SharedOption<T>, + + /// A handle to the the spawned thread. + thread: thread::Thread, + + /// Borrows the parent scope with lifetime `'scope`. + _marker: PhantomData<&'scope ()>, +} + +impl<'scope, T> ScopedJoinHandle<'scope, T> { + /// Waits for the thread to finish and returns its result. + /// + /// If the child thread panics, an error is returned. + /// + /// # Panics + /// + /// This function may panic on some platforms if a thread attempts to join itself or otherwise + /// may create a deadlock with joining threads. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)")); + /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :(")); + /// + /// // Join the first thread and verify that it succeeded. + /// let res = handle1.join(); + /// assert!(res.is_ok()); + /// + /// // Join the second thread and verify that it panicked. + /// let res = handle2.join(); + /// assert!(res.is_err()); + /// }).unwrap(); + /// ``` + pub fn join(self) -> thread::Result<T> { + // Take out the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap().take().unwrap(); + + // Join the thread and then take the result out of its inner closure. + handle + .join() + .map(|()| self.result.lock().unwrap().take().unwrap()) + } + + /// Returns a handle to the underlying thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| println!("A child thread is running")); + /// println!("The child thread ID: {:?}", handle.thread().id()); + /// }).unwrap(); + /// ``` + pub fn thread(&self) -> &thread::Thread { + &self.thread + } +} + +impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("ScopedJoinHandle { .. }") + } +} |