diff options
Diffstat (limited to 'third_party/rust/crossbeam-utils/src/sync')
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/mod.rs | 17 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/once_lock.rs | 103 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/parker.rs | 413 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs | 634 | ||||
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/wait_group.rs | 145 |
5 files changed, 1312 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/src/sync/mod.rs b/third_party/rust/crossbeam-utils/src/sync/mod.rs new file mode 100644 index 0000000000..f9eec71fb3 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/mod.rs @@ -0,0 +1,17 @@ +//! Thread synchronization primitives. +//! +//! * [`Parker`], a thread parking primitive. +//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +//! * [`WaitGroup`], for synchronizing the beginning or end of some computation. + +#[cfg(not(crossbeam_loom))] +mod once_lock; +mod parker; +#[cfg(not(crossbeam_loom))] +mod sharded_lock; +mod wait_group; + +pub use self::parker::{Parker, Unparker}; +#[cfg(not(crossbeam_loom))] +pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; +pub use self::wait_group::WaitGroup; diff --git a/third_party/rust/crossbeam-utils/src/sync/once_lock.rs b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs new file mode 100644 index 0000000000..c1fefc96cc --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs @@ -0,0 +1,103 @@ +// Based on unstable std::sync::OnceLock. +// +// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs + +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Once; + +pub(crate) struct OnceLock<T> { + once: Once, + // Once::is_completed requires Rust 1.43, so use this to track of whether they have been initialized. + is_initialized: AtomicBool, + value: UnsafeCell<MaybeUninit<T>>, + // Unlike std::sync::OnceLock, we don't need PhantomData here because + // we don't use #[may_dangle]. +} + +unsafe impl<T: Sync + Send> Sync for OnceLock<T> {} +unsafe impl<T: Send> Send for OnceLock<T> {} + +impl<T> OnceLock<T> { + /// Creates a new empty cell. + #[must_use] + pub(crate) const fn new() -> Self { + Self { + once: Once::new(), + is_initialized: AtomicBool::new(false), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Gets the contents of the cell, initializing it with `f` if the cell + /// was empty. + /// + /// Many threads may call `get_or_init` concurrently with different + /// initializing functions, but it is guaranteed that only one function + /// will be executed. + /// + /// # Panics + /// + /// If `f` panics, the panic is propagated to the caller, and the cell + /// remains uninitialized. + /// + /// It is an error to reentrantly initialize the cell from `f`. The + /// exact outcome is unspecified. Current implementation deadlocks, but + /// this may be changed to a panic in the future. + pub(crate) fn get_or_init<F>(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + // Fast path check + if self.is_initialized() { + // SAFETY: The inner value has been initialized + return unsafe { self.get_unchecked() }; + } + self.initialize(f); + + debug_assert!(self.is_initialized()); + + // SAFETY: The inner value has been initialized + unsafe { self.get_unchecked() } + } + + #[inline] + fn is_initialized(&self) -> bool { + self.is_initialized.load(Ordering::Acquire) + } + + #[cold] + fn initialize<F>(&self, f: F) + where + F: FnOnce() -> T, + { + let slot = self.value.get().cast::<T>(); + let is_initialized = &self.is_initialized; + + self.once.call_once(|| { + let value = f(); + unsafe { + slot.write(value); + } + is_initialized.store(true, Ordering::Release); + }); + } + + /// # Safety + /// + /// The value must be initialized + unsafe fn get_unchecked(&self) -> &T { + debug_assert!(self.is_initialized()); + &*self.value.get().cast::<T>() + } +} + +impl<T> Drop for OnceLock<T> { + fn drop(&mut self) { + if self.is_initialized() { + // SAFETY: The inner value has been initialized + unsafe { self.value.get().cast::<T>().drop_in_place() }; + } + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/parker.rs b/third_party/rust/crossbeam-utils/src/sync/parker.rs new file mode 100644 index 0000000000..e791c44852 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/parker.rs @@ -0,0 +1,413 @@ +use crate::primitive::sync::atomic::AtomicUsize; +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use core::sync::atomic::Ordering::SeqCst; +use std::fmt; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; + +/// A thread parking primitive. +/// +/// Conceptually, each `Parker` has an associated token which is initially not present: +/// +/// * The [`park`] method blocks the current thread unless or until the token is available, at +/// which point it automatically consumes the token. +/// +/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for +/// a specified maximum time. +/// +/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the +/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call +/// returning immediately. +/// +/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using +/// [`park`] and [`unpark`]. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_utils::sync::Parker; +/// +/// let p = Parker::new(); +/// let u = p.unparker().clone(); +/// +/// // Make the token available. +/// u.unpark(); +/// // Wakes up immediately and consumes the token. +/// p.park(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// u.unpark(); +/// }); +/// +/// // Wakes up when `u.unpark()` provides the token. +/// p.park(); +/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +/// ``` +/// +/// [`park`]: Parker::park +/// [`park_timeout`]: Parker::park_timeout +/// [`park_deadline`]: Parker::park_deadline +/// [`unpark`]: Unparker::unpark +pub struct Parker { + unparker: Unparker, + _marker: PhantomData<*const ()>, +} + +unsafe impl Send for Parker {} + +impl Default for Parker { + fn default() -> Self { + Self { + unparker: Unparker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }), + }, + _marker: PhantomData, + } + } +} + +impl Parker { + /// Creates a new `Parker`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// ``` + /// + pub fn new() -> Parker { + Self::default() + } + + /// Blocks the current thread until the token is made available. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + pub fn park(&self) { + self.unparker.inner.park(None); + } + + /// Blocks the current thread until the token is made available, but only for a limited time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_timeout(Duration::from_millis(500)); + /// ``` + pub fn park_timeout(&self, timeout: Duration) { + self.park_deadline(Instant::now() + timeout) + } + + /// Blocks the current thread until the token is made available, or until a certain deadline. + /// + /// # Examples + /// + /// ``` + /// use std::time::{Duration, Instant}; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_deadline(deadline); + /// ``` + pub fn park_deadline(&self, deadline: Instant) { + self.unparker.inner.park(Some(deadline)) + } + + /// Returns a reference to an associated [`Unparker`]. + /// + /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unparker(&self) -> &Unparker { + &self.unparker + } + + /// Converts a `Parker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// # let _ = unsafe { Parker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Parker) -> *const () { + Unparker::into_raw(this.unparker) + } + + /// Converts a raw pointer into a `Parker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Parker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// let p = unsafe { Parker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Parker { + Parker { + unparker: Unparker::from_raw(ptr), + _marker: PhantomData, + } + } +} + +impl fmt::Debug for Parker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Parker { .. }") + } +} + +/// Unparks a thread parked by the associated [`Parker`]. +pub struct Unparker { + inner: Arc<Inner>, +} + +unsafe impl Send for Unparker {} +unsafe impl Sync for Unparker {} + +impl Unparker { + /// Atomically makes the token available if it is not already. + /// + /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is + /// any. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// u.unpark(); + /// }); + /// + /// // Wakes up when `u.unpark()` provides the token. + /// p.park(); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unpark(&self) { + self.inner.unpark() + } + + /// Converts an `Unparker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// let raw = Unparker::into_raw(u); + /// # let _ = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Unparker) -> *const () { + Arc::into_raw(this.inner).cast::<()>() + } + + /// Converts a raw pointer into an `Unparker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Unparker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// let raw = Unparker::into_raw(u); + /// let u = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Unparker { + Unparker { + inner: Arc::from_raw(ptr.cast::<Inner>()), + } + } +} + +impl fmt::Debug for Unparker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Unparker { .. }") + } +} + +impl Clone for Unparker { + fn clone(&self) -> Unparker { + Unparker { + inner: self.inner.clone(), + } + } +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +struct Inner { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Inner { + fn park(&self, deadline: Option<Instant>) { + // If we were previously notified then we consume this notification and return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // If the timeout is zero, then there is no need to actually block. + if let Some(deadline) = deadline { + if deadline <= Instant::now() { + return; + } + } + + // Otherwise we need to coordinate going to sleep. + let mut m = self.lock.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + // Consume this notification to avoid spurious wakeups in the next park. + Err(NOTIFIED) => { + // We must read `state` here, even though we know it will be `NOTIFIED`. This is + // because `unpark` may have been called again since we read `NOTIFIED` in the + // `compare_exchange` above. We must perform an acquire operation that synchronizes + // with that `unpark` to observe any writes it made before the call to `unpark`. To + // do that we must read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } + Err(n) => panic!("inconsistent park_timeout state: {}", n), + } + + loop { + // Block the current thread on the conditional variable. + m = match deadline { + None => self.cvar.wait(m).unwrap(), + Some(deadline) => { + let now = Instant::now(); + if now < deadline { + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. + self.cvar.wait_timeout(m, deadline - now).unwrap().0 + } else { + // We've timed out; swap out the state back to empty on our way out + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED | PARKED => return, + n => panic!("inconsistent park_timeout state: {}", n), + }; + } + } + }; + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; + } + + // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught + // in the branch above, when we discover the deadline is in the past + } + } + + pub(crate) fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before this call, we must + // perform a release operation that `park` can synchronize with. To do that we must write + // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather + // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to `PARKED` (or last + // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. + // If we were to notify during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this + // stage so we can acquire `lock` to wait until it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes + // it doesn't get woken only to have to wait for us to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one(); + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs new file mode 100644 index 0000000000..b43c55ea42 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs @@ -0,0 +1,634 @@ +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult}; +use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::thread::{self, ThreadId}; + +use crate::sync::once_lock::OnceLock; +use crate::CachePadded; + +/// The number of shards per sharded lock. Must be a power of two. +const NUM_SHARDS: usize = 8; + +/// A shard containing a single reader-writer lock. +struct Shard { + /// The inner reader-writer lock. + lock: RwLock<()>, + + /// The write-guard keeping this shard locked. + /// + /// Write operations will lock each shard and store the guard here. These guards get dropped at + /// the same time the big guard is dropped. + write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>, +} + +/// A sharded reader-writer lock. +/// +/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations +/// are slower. +/// +/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a +/// single cache line. Read operations will pick one of the shards depending on the current thread +/// and lock it. Write operations need to lock all shards in succession. +/// +/// By splitting the lock into shards, concurrent read operations will in most cases choose +/// different shards and thus update different cache lines, which is good for scalability. However, +/// write operations need to do more work and are therefore slower than usual. +/// +/// The priority policy of the lock is dependent on the underlying operating system's +/// implementation, and this type does not guarantee that any particular policy will be used. +/// +/// # Poisoning +/// +/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be +/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any +/// read operation, the lock will not be poisoned. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::ShardedLock; +/// +/// let lock = ShardedLock::new(5); +/// +/// // Any number of read locks can be held at once. +/// { +/// let r1 = lock.read().unwrap(); +/// let r2 = lock.read().unwrap(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // Read locks are dropped at this point. +/// +/// // However, only one write lock may be held. +/// { +/// let mut w = lock.write().unwrap(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // Write lock is dropped here. +/// ``` +/// +/// [`RwLock`]: std::sync::RwLock +pub struct ShardedLock<T: ?Sized> { + /// A list of locks protecting the internal data. + shards: Box<[CachePadded<Shard>]>, + + /// The internal data. + value: UnsafeCell<T>, +} + +unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {} +unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {} + +impl<T: ?Sized> UnwindSafe for ShardedLock<T> {} +impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {} + +impl<T> ShardedLock<T> { + /// Creates a new sharded reader-writer lock. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(5); + /// ``` + pub fn new(value: T) -> ShardedLock<T> { + ShardedLock { + shards: (0..NUM_SHARDS) + .map(|_| { + CachePadded::new(Shard { + lock: RwLock::new(()), + write_guard: UnsafeCell::new(None), + }) + }) + .collect::<Box<[_]>>(), + value: UnsafeCell::new(value), + } + } + + /// Consumes this lock, returning the underlying data. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(String::new()); + /// { + /// let mut s = lock.write().unwrap(); + /// *s = "modified".to_owned(); + /// } + /// assert_eq!(lock.into_inner().unwrap(), "modified"); + /// ``` + pub fn into_inner(self) -> LockResult<T> { + let is_poisoned = self.is_poisoned(); + let inner = self.value.into_inner(); + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } +} + +impl<T: ?Sized> ShardedLock<T> { + /// Returns `true` if the lock is poisoned. + /// + /// If another thread can still access the lock, it may become poisoned at any time. A `false` + /// result should not be trusted without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(0)); + /// let c_lock = lock.clone(); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the lock gets poisoned + /// }).join(); + /// assert_eq!(lock.is_poisoned(), true); + /// ``` + pub fn is_poisoned(&self) -> bool { + self.shards[0].lock.is_poisoned() + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the lock mutably, no actual locking needs to take place. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let mut lock = ShardedLock::new(0); + /// *lock.get_mut().unwrap() = 10; + /// assert_eq!(*lock.read().unwrap(), 10); + /// ``` + pub fn get_mut(&mut self) -> LockResult<&mut T> { + let is_poisoned = self.is_poisoned(); + let inner = unsafe { &mut *self.value.get() }; + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } + + /// Attempts to acquire this lock with shared read access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the shared access when it is dropped. This method does not + /// provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// match lock.try_read() { + /// Ok(n) => assert_eq!(*n, 1), + /// Err(_) => unreachable!(), + /// }; + /// ``` + pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.try_read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(TryLockError::Poisoned(err)) => { + let guard = ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } + Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock), + } + } + + /// Locks with shared read access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the shared access when dropped. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Panics + /// + /// This method might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// thread::spawn(move || { + /// let r = c_lock.read(); + /// assert!(r.is_ok()); + /// }).join().unwrap(); + /// ``` + pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(err) => Err(PoisonError::new(ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + })), + } + } + + /// Attempts to acquire this lock with exclusive write access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the exclusive access when it is dropped. This method does + /// not provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// assert!(lock.try_write().is_err()); + /// ``` + pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> { + let mut poisoned = false; + let mut blocked = None; + + // Write-lock each shard in succession. + for (i, shard) in self.shards.iter().enumerate() { + let guard = match shard.lock.try_write() { + Ok(guard) => guard, + Err(TryLockError::Poisoned(err)) => { + poisoned = true; + err.into_inner() + } + Err(TryLockError::WouldBlock) => { + blocked = Some(i); + break; + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if let Some(i) = blocked { + // Unlock the shards in reverse order of locking. + for shard in self.shards[0..i].iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + Err(TryLockError::WouldBlock) + } else if poisoned { + let guard = ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } + + /// Locks with exclusive write access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the exclusive access when dropped. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Panics + /// + /// This method might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let mut n = lock.write().unwrap(); + /// *n = 2; + /// + /// assert!(lock.try_read().is_err()); + /// ``` + pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> { + let mut poisoned = false; + + // Write-lock each shard in succession. + for shard in self.shards.iter() { + let guard = match shard.lock.write() { + Ok(guard) => guard, + Err(err) => { + poisoned = true; + err.into_inner() + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'_, ()> = guard; + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if poisoned { + Err(PoisonError::new(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + })) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.try_read() { + Ok(guard) => f + .debug_struct("ShardedLock") + .field("data", &&*guard) + .finish(), + Err(TryLockError::Poisoned(err)) => f + .debug_struct("ShardedLock") + .field("data", &&**err.get_ref()) + .finish(), + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("<locked>") + } + } + f.debug_struct("ShardedLock") + .field("data", &LockedPlaceholder) + .finish() + } + } + } +} + +impl<T: Default> Default for ShardedLock<T> { + fn default() -> ShardedLock<T> { + ShardedLock::new(Default::default()) + } +} + +impl<T> From<T> for ShardedLock<T> { + fn from(t: T) -> Self { + ShardedLock::new(t) + } +} + +/// A guard used to release the shared read access of a [`ShardedLock`] when dropped. +pub struct ShardedLockReadGuard<'a, T: ?Sized> { + lock: &'a ShardedLock<T>, + _guard: RwLockReadGuard<'a, ()>, + _marker: PhantomData<RwLockReadGuard<'a, T>>, +} + +unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {} + +impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShardedLockReadGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped. +pub struct ShardedLockWriteGuard<'a, T: ?Sized> { + lock: &'a ShardedLock<T>, + _marker: PhantomData<RwLockWriteGuard<'a, T>>, +} + +unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {} + +impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> { + fn drop(&mut self) { + // Unlock the shards in reverse order of locking. + for shard in self.lock.shards.iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + } +} + +impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShardedLockWriteGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.value.get() } + } +} + +/// Returns a `usize` that identifies the current thread. +/// +/// Each thread is associated with an 'index'. While there are no particular guarantees, indices +/// usually tend to be consecutive numbers between 0 and the number of running threads. +/// +/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is +/// tearing down. +#[inline] +fn current_index() -> Option<usize> { + REGISTRATION.try_with(|reg| reg.index).ok() +} + +/// The global registry keeping track of registered threads and indices. +struct ThreadIndices { + /// Mapping from `ThreadId` to thread index. + mapping: HashMap<ThreadId, usize>, + + /// A list of free indices. + free_list: Vec<usize>, + + /// The next index to allocate if the free list is empty. + next_index: usize, +} + +fn thread_indices() -> &'static Mutex<ThreadIndices> { + static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new(); + fn init() -> Mutex<ThreadIndices> { + Mutex::new(ThreadIndices { + mapping: HashMap::new(), + free_list: Vec::new(), + next_index: 0, + }) + } + THREAD_INDICES.get_or_init(init) +} + +/// A registration of a thread with an index. +/// +/// When dropped, unregisters the thread and frees the reserved index. +struct Registration { + index: usize, + thread_id: ThreadId, +} + +impl Drop for Registration { + fn drop(&mut self) { + let mut indices = thread_indices().lock().unwrap(); + indices.mapping.remove(&self.thread_id); + indices.free_list.push(self.index); + } +} + +thread_local! { + static REGISTRATION: Registration = { + let thread_id = thread::current().id(); + let mut indices = thread_indices().lock().unwrap(); + + let index = match indices.free_list.pop() { + Some(i) => i, + None => { + let i = indices.next_index; + indices.next_index += 1; + i + } + }; + indices.mapping.insert(thread_id, index); + + Registration { + index, + thread_id, + } + }; +} diff --git a/third_party/rust/crossbeam-utils/src/sync/wait_group.rs b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs new file mode 100644 index 0000000000..19d6074157 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs @@ -0,0 +1,145 @@ +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use std::fmt; + +/// Enables threads to synchronize the beginning or end of some computation. +/// +/// # Wait groups vs barriers +/// +/// `WaitGroup` is very similar to [`Barrier`], but there are a few differences: +/// +/// * [`Barrier`] needs to know the number of threads at construction, while `WaitGroup` is cloned to +/// register more threads. +/// +/// * A [`Barrier`] can be reused even after all threads have synchronized, while a `WaitGroup` +/// synchronizes threads only once. +/// +/// * All threads wait for others to reach the [`Barrier`]. With `WaitGroup`, each thread can choose +/// to either wait for other threads or to continue without blocking. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::WaitGroup; +/// use std::thread; +/// +/// // Create a new wait group. +/// let wg = WaitGroup::new(); +/// +/// for _ in 0..4 { +/// // Create another reference to the wait group. +/// let wg = wg.clone(); +/// +/// thread::spawn(move || { +/// // Do some work. +/// +/// // Drop the reference to the wait group. +/// drop(wg); +/// }); +/// } +/// +/// // Block until all threads have finished their work. +/// wg.wait(); +/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +/// ``` +/// +/// [`Barrier`]: std::sync::Barrier +pub struct WaitGroup { + inner: Arc<Inner>, +} + +/// Inner state of a `WaitGroup`. +struct Inner { + cvar: Condvar, + count: Mutex<usize>, +} + +impl Default for WaitGroup { + fn default() -> Self { + Self { + inner: Arc::new(Inner { + cvar: Condvar::new(), + count: Mutex::new(1), + }), + } + } +} + +impl WaitGroup { + /// Creates a new wait group and returns the single reference to it. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// + /// let wg = WaitGroup::new(); + /// ``` + pub fn new() -> Self { + Self::default() + } + + /// Drops this reference and waits until all other references are dropped. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// use std::thread; + /// + /// let wg = WaitGroup::new(); + /// + /// thread::spawn({ + /// let wg = wg.clone(); + /// move || { + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// } + /// }); + /// + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + pub fn wait(self) { + if *self.inner.count.lock().unwrap() == 1 { + return; + } + + let inner = self.inner.clone(); + drop(self); + + let mut count = inner.count.lock().unwrap(); + while *count > 0 { + count = inner.cvar.wait(count).unwrap(); + } + } +} + +impl Drop for WaitGroup { + fn drop(&mut self) { + let mut count = self.inner.count.lock().unwrap(); + *count -= 1; + + if *count == 0 { + self.inner.cvar.notify_all(); + } + } +} + +impl Clone for WaitGroup { + fn clone(&self) -> WaitGroup { + let mut count = self.inner.count.lock().unwrap(); + *count += 1; + + WaitGroup { + inner: self.inner.clone(), + } + } +} + +impl fmt::Debug for WaitGroup { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let count: &usize = &*self.inner.count.lock().unwrap(); + f.debug_struct("WaitGroup").field("count", count).finish() + } +} |