summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-utils/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/crossbeam-utils/src/sync')
-rw-r--r--third_party/rust/crossbeam-utils/src/sync/mod.rs17
-rw-r--r--third_party/rust/crossbeam-utils/src/sync/once_lock.rs103
-rw-r--r--third_party/rust/crossbeam-utils/src/sync/parker.rs413
-rw-r--r--third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs634
-rw-r--r--third_party/rust/crossbeam-utils/src/sync/wait_group.rs145
5 files changed, 1312 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/src/sync/mod.rs b/third_party/rust/crossbeam-utils/src/sync/mod.rs
new file mode 100644
index 0000000000..f9eec71fb3
--- /dev/null
+++ b/third_party/rust/crossbeam-utils/src/sync/mod.rs
@@ -0,0 +1,17 @@
+//! Thread synchronization primitives.
+//!
+//! * [`Parker`], a thread parking primitive.
+//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads.
+//! * [`WaitGroup`], for synchronizing the beginning or end of some computation.
+
+#[cfg(not(crossbeam_loom))]
+mod once_lock;
+mod parker;
+#[cfg(not(crossbeam_loom))]
+mod sharded_lock;
+mod wait_group;
+
+pub use self::parker::{Parker, Unparker};
+#[cfg(not(crossbeam_loom))]
+pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
+pub use self::wait_group::WaitGroup;
diff --git a/third_party/rust/crossbeam-utils/src/sync/once_lock.rs b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs
new file mode 100644
index 0000000000..c1fefc96cc
--- /dev/null
+++ b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs
@@ -0,0 +1,103 @@
+// Based on unstable std::sync::OnceLock.
+//
+// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs
+
+use core::cell::UnsafeCell;
+use core::mem::MaybeUninit;
+use core::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Once;
+
+pub(crate) struct OnceLock<T> {
+ once: Once,
+ // Once::is_completed requires Rust 1.43, so use this to track of whether they have been initialized.
+ is_initialized: AtomicBool,
+ value: UnsafeCell<MaybeUninit<T>>,
+ // Unlike std::sync::OnceLock, we don't need PhantomData here because
+ // we don't use #[may_dangle].
+}
+
+unsafe impl<T: Sync + Send> Sync for OnceLock<T> {}
+unsafe impl<T: Send> Send for OnceLock<T> {}
+
+impl<T> OnceLock<T> {
+ /// Creates a new empty cell.
+ #[must_use]
+ pub(crate) const fn new() -> Self {
+ Self {
+ once: Once::new(),
+ is_initialized: AtomicBool::new(false),
+ value: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ }
+
+ /// Gets the contents of the cell, initializing it with `f` if the cell
+ /// was empty.
+ ///
+ /// Many threads may call `get_or_init` concurrently with different
+ /// initializing functions, but it is guaranteed that only one function
+ /// will be executed.
+ ///
+ /// # Panics
+ ///
+ /// If `f` panics, the panic is propagated to the caller, and the cell
+ /// remains uninitialized.
+ ///
+ /// It is an error to reentrantly initialize the cell from `f`. The
+ /// exact outcome is unspecified. Current implementation deadlocks, but
+ /// this may be changed to a panic in the future.
+ pub(crate) fn get_or_init<F>(&self, f: F) -> &T
+ where
+ F: FnOnce() -> T,
+ {
+ // Fast path check
+ if self.is_initialized() {
+ // SAFETY: The inner value has been initialized
+ return unsafe { self.get_unchecked() };
+ }
+ self.initialize(f);
+
+ debug_assert!(self.is_initialized());
+
+ // SAFETY: The inner value has been initialized
+ unsafe { self.get_unchecked() }
+ }
+
+ #[inline]
+ fn is_initialized(&self) -> bool {
+ self.is_initialized.load(Ordering::Acquire)
+ }
+
+ #[cold]
+ fn initialize<F>(&self, f: F)
+ where
+ F: FnOnce() -> T,
+ {
+ let slot = self.value.get().cast::<T>();
+ let is_initialized = &self.is_initialized;
+
+ self.once.call_once(|| {
+ let value = f();
+ unsafe {
+ slot.write(value);
+ }
+ is_initialized.store(true, Ordering::Release);
+ });
+ }
+
+ /// # Safety
+ ///
+ /// The value must be initialized
+ unsafe fn get_unchecked(&self) -> &T {
+ debug_assert!(self.is_initialized());
+ &*self.value.get().cast::<T>()
+ }
+}
+
+impl<T> Drop for OnceLock<T> {
+ fn drop(&mut self) {
+ if self.is_initialized() {
+ // SAFETY: The inner value has been initialized
+ unsafe { self.value.get().cast::<T>().drop_in_place() };
+ }
+ }
+}
diff --git a/third_party/rust/crossbeam-utils/src/sync/parker.rs b/third_party/rust/crossbeam-utils/src/sync/parker.rs
new file mode 100644
index 0000000000..e791c44852
--- /dev/null
+++ b/third_party/rust/crossbeam-utils/src/sync/parker.rs
@@ -0,0 +1,413 @@
+use crate::primitive::sync::atomic::AtomicUsize;
+use crate::primitive::sync::{Arc, Condvar, Mutex};
+use core::sync::atomic::Ordering::SeqCst;
+use std::fmt;
+use std::marker::PhantomData;
+use std::time::{Duration, Instant};
+
+/// A thread parking primitive.
+///
+/// Conceptually, each `Parker` has an associated token which is initially not present:
+///
+/// * The [`park`] method blocks the current thread unless or until the token is available, at
+/// which point it automatically consumes the token.
+///
+/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
+/// a specified maximum time.
+///
+/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
+/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
+/// returning immediately.
+///
+/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
+/// [`park`] and [`unpark`].
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_utils::sync::Parker;
+///
+/// let p = Parker::new();
+/// let u = p.unparker().clone();
+///
+/// // Make the token available.
+/// u.unpark();
+/// // Wakes up immediately and consumes the token.
+/// p.park();
+///
+/// thread::spawn(move || {
+/// thread::sleep(Duration::from_millis(500));
+/// u.unpark();
+/// });
+///
+/// // Wakes up when `u.unpark()` provides the token.
+/// p.park();
+/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+/// ```
+///
+/// [`park`]: Parker::park
+/// [`park_timeout`]: Parker::park_timeout
+/// [`park_deadline`]: Parker::park_deadline
+/// [`unpark`]: Unparker::unpark
+pub struct Parker {
+ unparker: Unparker,
+ _marker: PhantomData<*const ()>,
+}
+
+unsafe impl Send for Parker {}
+
+impl Default for Parker {
+ fn default() -> Self {
+ Self {
+ unparker: Unparker {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ lock: Mutex::new(()),
+ cvar: Condvar::new(),
+ }),
+ },
+ _marker: PhantomData,
+ }
+ }
+}
+
+impl Parker {
+ /// Creates a new `Parker`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// ```
+ ///
+ pub fn new() -> Parker {
+ Self::default()
+ }
+
+ /// Blocks the current thread until the token is made available.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// // Make the token available.
+ /// u.unpark();
+ ///
+ /// // Wakes up immediately and consumes the token.
+ /// p.park();
+ /// ```
+ pub fn park(&self) {
+ self.unparker.inner.park(None);
+ }
+
+ /// Blocks the current thread until the token is made available, but only for a limited time.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::time::Duration;
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ ///
+ /// // Waits for the token to become available, but will not wait longer than 500 ms.
+ /// p.park_timeout(Duration::from_millis(500));
+ /// ```
+ pub fn park_timeout(&self, timeout: Duration) {
+ self.park_deadline(Instant::now() + timeout)
+ }
+
+ /// Blocks the current thread until the token is made available, or until a certain deadline.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::time::{Duration, Instant};
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let deadline = Instant::now() + Duration::from_millis(500);
+ ///
+ /// // Waits for the token to become available, but will not wait longer than 500 ms.
+ /// p.park_deadline(deadline);
+ /// ```
+ pub fn park_deadline(&self, deadline: Instant) {
+ self.unparker.inner.park(Some(deadline))
+ }
+
+ /// Returns a reference to an associated [`Unparker`].
+ ///
+ /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// // Make the token available.
+ /// u.unpark();
+ /// // Wakes up immediately and consumes the token.
+ /// p.park();
+ /// ```
+ ///
+ /// [`park`]: Parker::park
+ /// [`park_timeout`]: Parker::park_timeout
+ pub fn unparker(&self) -> &Unparker {
+ &self.unparker
+ }
+
+ /// Converts a `Parker` into a raw pointer.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let raw = Parker::into_raw(p);
+ /// # let _ = unsafe { Parker::from_raw(raw) };
+ /// ```
+ pub fn into_raw(this: Parker) -> *const () {
+ Unparker::into_raw(this.unparker)
+ }
+
+ /// Converts a raw pointer into a `Parker`.
+ ///
+ /// # Safety
+ ///
+ /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let raw = Parker::into_raw(p);
+ /// let p = unsafe { Parker::from_raw(raw) };
+ /// ```
+ pub unsafe fn from_raw(ptr: *const ()) -> Parker {
+ Parker {
+ unparker: Unparker::from_raw(ptr),
+ _marker: PhantomData,
+ }
+ }
+}
+
+impl fmt::Debug for Parker {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Parker { .. }")
+ }
+}
+
+/// Unparks a thread parked by the associated [`Parker`].
+pub struct Unparker {
+ inner: Arc<Inner>,
+}
+
+unsafe impl Send for Unparker {}
+unsafe impl Sync for Unparker {}
+
+impl Unparker {
+ /// Atomically makes the token available if it is not already.
+ ///
+ /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
+ /// any.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(500));
+ /// u.unpark();
+ /// });
+ ///
+ /// // Wakes up when `u.unpark()` provides the token.
+ /// p.park();
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+ /// ```
+ ///
+ /// [`park`]: Parker::park
+ /// [`park_timeout`]: Parker::park_timeout
+ pub fn unpark(&self) {
+ self.inner.unpark()
+ }
+
+ /// Converts an `Unparker` into a raw pointer.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::{Parker, Unparker};
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ /// let raw = Unparker::into_raw(u);
+ /// # let _ = unsafe { Unparker::from_raw(raw) };
+ /// ```
+ pub fn into_raw(this: Unparker) -> *const () {
+ Arc::into_raw(this.inner).cast::<()>()
+ }
+
+ /// Converts a raw pointer into an `Unparker`.
+ ///
+ /// # Safety
+ ///
+ /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::{Parker, Unparker};
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// let raw = Unparker::into_raw(u);
+ /// let u = unsafe { Unparker::from_raw(raw) };
+ /// ```
+ pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
+ Unparker {
+ inner: Arc::from_raw(ptr.cast::<Inner>()),
+ }
+ }
+}
+
+impl fmt::Debug for Unparker {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Unparker { .. }")
+ }
+}
+
+impl Clone for Unparker {
+ fn clone(&self) -> Unparker {
+ Unparker {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+struct Inner {
+ state: AtomicUsize,
+ lock: Mutex<()>,
+ cvar: Condvar,
+}
+
+impl Inner {
+ fn park(&self, deadline: Option<Instant>) {
+ // If we were previously notified then we consume this notification and return quickly.
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ return;
+ }
+
+ // If the timeout is zero, then there is no need to actually block.
+ if let Some(deadline) = deadline {
+ if deadline <= Instant::now() {
+ return;
+ }
+ }
+
+ // Otherwise we need to coordinate going to sleep.
+ let mut m = self.lock.lock().unwrap();
+
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ // Consume this notification to avoid spurious wakeups in the next park.
+ Err(NOTIFIED) => {
+ // We must read `state` here, even though we know it will be `NOTIFIED`. This is
+ // because `unpark` may have been called again since we read `NOTIFIED` in the
+ // `compare_exchange` above. We must perform an acquire operation that synchronizes
+ // with that `unpark` to observe any writes it made before the call to `unpark`. To
+ // do that we must read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ }
+ Err(n) => panic!("inconsistent park_timeout state: {}", n),
+ }
+
+ loop {
+ // Block the current thread on the conditional variable.
+ m = match deadline {
+ None => self.cvar.wait(m).unwrap(),
+ Some(deadline) => {
+ let now = Instant::now();
+ if now < deadline {
+ // We could check for a timeout here, in the return value of wait_timeout,
+ // but in the case that a timeout and an unpark arrive simultaneously, we
+ // prefer to report the former.
+ self.cvar.wait_timeout(m, deadline - now).unwrap().0
+ } else {
+ // We've timed out; swap out the state back to empty on our way out
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED | PARKED => return,
+ n => panic!("inconsistent park_timeout state: {}", n),
+ };
+ }
+ }
+ };
+
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ // got a notification
+ return;
+ }
+
+ // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
+ // in the branch above, when we discover the deadline is in the past
+ }
+ }
+
+ pub(crate) fn unpark(&self) {
+ // To ensure the unparked thread will observe any writes we made before this call, we must
+ // perform a release operation that `park` can synchronize with. To do that we must write
+ // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
+ // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
+ match self.state.swap(NOTIFIED, SeqCst) {
+ EMPTY => return, // no one was waiting
+ NOTIFIED => return, // already unparked
+ PARKED => {} // gotta go wake someone up
+ _ => panic!("inconsistent state in unpark"),
+ }
+
+ // There is a period between when the parked thread sets `state` to `PARKED` (or last
+ // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
+ // If we were to notify during this period it would be ignored and then when the parked
+ // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
+ // stage so we can acquire `lock` to wait until it is ready to receive the notification.
+ //
+ // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
+ // it doesn't get woken only to have to wait for us to release `lock`.
+ drop(self.lock.lock().unwrap());
+ self.cvar.notify_one();
+ }
+}
diff --git a/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs
new file mode 100644
index 0000000000..b43c55ea42
--- /dev/null
+++ b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs
@@ -0,0 +1,634 @@
+use std::cell::UnsafeCell;
+use std::collections::HashMap;
+use std::fmt;
+use std::marker::PhantomData;
+use std::mem;
+use std::ops::{Deref, DerefMut};
+use std::panic::{RefUnwindSafe, UnwindSafe};
+use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
+use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
+use std::thread::{self, ThreadId};
+
+use crate::sync::once_lock::OnceLock;
+use crate::CachePadded;
+
+/// The number of shards per sharded lock. Must be a power of two.
+const NUM_SHARDS: usize = 8;
+
+/// A shard containing a single reader-writer lock.
+struct Shard {
+ /// The inner reader-writer lock.
+ lock: RwLock<()>,
+
+ /// The write-guard keeping this shard locked.
+ ///
+ /// Write operations will lock each shard and store the guard here. These guards get dropped at
+ /// the same time the big guard is dropped.
+ write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,
+}
+
+/// A sharded reader-writer lock.
+///
+/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
+/// are slower.
+///
+/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
+/// single cache line. Read operations will pick one of the shards depending on the current thread
+/// and lock it. Write operations need to lock all shards in succession.
+///
+/// By splitting the lock into shards, concurrent read operations will in most cases choose
+/// different shards and thus update different cache lines, which is good for scalability. However,
+/// write operations need to do more work and are therefore slower than usual.
+///
+/// The priority policy of the lock is dependent on the underlying operating system's
+/// implementation, and this type does not guarantee that any particular policy will be used.
+///
+/// # Poisoning
+///
+/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
+/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
+/// read operation, the lock will not be poisoned.
+///
+/// # Examples
+///
+/// ```
+/// use crossbeam_utils::sync::ShardedLock;
+///
+/// let lock = ShardedLock::new(5);
+///
+/// // Any number of read locks can be held at once.
+/// {
+/// let r1 = lock.read().unwrap();
+/// let r2 = lock.read().unwrap();
+/// assert_eq!(*r1, 5);
+/// assert_eq!(*r2, 5);
+/// } // Read locks are dropped at this point.
+///
+/// // However, only one write lock may be held.
+/// {
+/// let mut w = lock.write().unwrap();
+/// *w += 1;
+/// assert_eq!(*w, 6);
+/// } // Write lock is dropped here.
+/// ```
+///
+/// [`RwLock`]: std::sync::RwLock
+pub struct ShardedLock<T: ?Sized> {
+ /// A list of locks protecting the internal data.
+ shards: Box<[CachePadded<Shard>]>,
+
+ /// The internal data.
+ value: UnsafeCell<T>,
+}
+
+unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
+unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}
+
+impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
+impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
+
+impl<T> ShardedLock<T> {
+ /// Creates a new sharded reader-writer lock.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ ///
+ /// let lock = ShardedLock::new(5);
+ /// ```
+ pub fn new(value: T) -> ShardedLock<T> {
+ ShardedLock {
+ shards: (0..NUM_SHARDS)
+ .map(|_| {
+ CachePadded::new(Shard {
+ lock: RwLock::new(()),
+ write_guard: UnsafeCell::new(None),
+ })
+ })
+ .collect::<Box<[_]>>(),
+ value: UnsafeCell::new(value),
+ }
+ }
+
+ /// Consumes this lock, returning the underlying data.
+ ///
+ /// # Errors
+ ///
+ /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
+ /// operation panics.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ ///
+ /// let lock = ShardedLock::new(String::new());
+ /// {
+ /// let mut s = lock.write().unwrap();
+ /// *s = "modified".to_owned();
+ /// }
+ /// assert_eq!(lock.into_inner().unwrap(), "modified");
+ /// ```
+ pub fn into_inner(self) -> LockResult<T> {
+ let is_poisoned = self.is_poisoned();
+ let inner = self.value.into_inner();
+
+ if is_poisoned {
+ Err(PoisonError::new(inner))
+ } else {
+ Ok(inner)
+ }
+ }
+}
+
+impl<T: ?Sized> ShardedLock<T> {
+ /// Returns `true` if the lock is poisoned.
+ ///
+ /// If another thread can still access the lock, it may become poisoned at any time. A `false`
+ /// result should not be trusted without additional synchronization.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ /// use std::sync::Arc;
+ /// use std::thread;
+ ///
+ /// let lock = Arc::new(ShardedLock::new(0));
+ /// let c_lock = lock.clone();
+ ///
+ /// let _ = thread::spawn(move || {
+ /// let _lock = c_lock.write().unwrap();
+ /// panic!(); // the lock gets poisoned
+ /// }).join();
+ /// assert_eq!(lock.is_poisoned(), true);
+ /// ```
+ pub fn is_poisoned(&self) -> bool {
+ self.shards[0].lock.is_poisoned()
+ }
+
+ /// Returns a mutable reference to the underlying data.
+ ///
+ /// Since this call borrows the lock mutably, no actual locking needs to take place.
+ ///
+ /// # Errors
+ ///
+ /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
+ /// operation panics.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ ///
+ /// let mut lock = ShardedLock::new(0);
+ /// *lock.get_mut().unwrap() = 10;
+ /// assert_eq!(*lock.read().unwrap(), 10);
+ /// ```
+ pub fn get_mut(&mut self) -> LockResult<&mut T> {
+ let is_poisoned = self.is_poisoned();
+ let inner = unsafe { &mut *self.value.get() };
+
+ if is_poisoned {
+ Err(PoisonError::new(inner))
+ } else {
+ Ok(inner)
+ }
+ }
+
+ /// Attempts to acquire this lock with shared read access.
+ ///
+ /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
+ /// is returned which will release the shared access when it is dropped. This method does not
+ /// provide any guarantees with respect to the ordering of whether contentious readers or
+ /// writers will acquire the lock first.
+ ///
+ /// # Errors
+ ///
+ /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
+ /// operation panics.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ ///
+ /// let lock = ShardedLock::new(1);
+ ///
+ /// match lock.try_read() {
+ /// Ok(n) => assert_eq!(*n, 1),
+ /// Err(_) => unreachable!(),
+ /// };
+ /// ```
+ pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> {
+ // Take the current thread index and map it to a shard index. Thread indices will tend to
+ // distribute shards among threads equally, thus reducing contention due to read-locking.
+ let current_index = current_index().unwrap_or(0);
+ let shard_index = current_index & (self.shards.len() - 1);
+
+ match self.shards[shard_index].lock.try_read() {
+ Ok(guard) => Ok(ShardedLockReadGuard {
+ lock: self,
+ _guard: guard,
+ _marker: PhantomData,
+ }),
+ Err(TryLockError::Poisoned(err)) => {
+ let guard = ShardedLockReadGuard {
+ lock: self,
+ _guard: err.into_inner(),
+ _marker: PhantomData,
+ };
+ Err(TryLockError::Poisoned(PoisonError::new(guard)))
+ }
+ Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
+ }
+ }
+
+ /// Locks with shared read access, blocking the current thread until it can be acquired.
+ ///
+ /// The calling thread will be blocked until there are no more writers which hold the lock.
+ /// There may be other readers currently inside the lock when this method returns. This method
+ /// does not provide any guarantees with respect to the ordering of whether contentious readers
+ /// or writers will acquire the lock first.
+ ///
+ /// Returns a guard which will release the shared access when dropped.
+ ///
+ /// # Errors
+ ///
+ /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
+ /// operation panics.
+ ///
+ /// # Panics
+ ///
+ /// This method might panic when called if the lock is already held by the current thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ /// use std::sync::Arc;
+ /// use std::thread;
+ ///
+ /// let lock = Arc::new(ShardedLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let n = lock.read().unwrap();
+ /// assert_eq!(*n, 1);
+ ///
+ /// thread::spawn(move || {
+ /// let r = c_lock.read();
+ /// assert!(r.is_ok());
+ /// }).join().unwrap();
+ /// ```
+ pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> {
+ // Take the current thread index and map it to a shard index. Thread indices will tend to
+ // distribute shards among threads equally, thus reducing contention due to read-locking.
+ let current_index = current_index().unwrap_or(0);
+ let shard_index = current_index & (self.shards.len() - 1);
+
+ match self.shards[shard_index].lock.read() {
+ Ok(guard) => Ok(ShardedLockReadGuard {
+ lock: self,
+ _guard: guard,
+ _marker: PhantomData,
+ }),
+ Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
+ lock: self,
+ _guard: err.into_inner(),
+ _marker: PhantomData,
+ })),
+ }
+ }
+
+ /// Attempts to acquire this lock with exclusive write access.
+ ///
+ /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
+ /// is returned which will release the exclusive access when it is dropped. This method does
+ /// not provide any guarantees with respect to the ordering of whether contentious readers or
+ /// writers will acquire the lock first.
+ ///
+ /// # Errors
+ ///
+ /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
+ /// operation panics.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ ///
+ /// let lock = ShardedLock::new(1);
+ ///
+ /// let n = lock.read().unwrap();
+ /// assert_eq!(*n, 1);
+ ///
+ /// assert!(lock.try_write().is_err());
+ /// ```
+ pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> {
+ let mut poisoned = false;
+ let mut blocked = None;
+
+ // Write-lock each shard in succession.
+ for (i, shard) in self.shards.iter().enumerate() {
+ let guard = match shard.lock.try_write() {
+ Ok(guard) => guard,
+ Err(TryLockError::Poisoned(err)) => {
+ poisoned = true;
+ err.into_inner()
+ }
+ Err(TryLockError::WouldBlock) => {
+ blocked = Some(i);
+ break;
+ }
+ };
+
+ // Store the guard into the shard.
+ unsafe {
+ let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
+ let dest: *mut _ = shard.write_guard.get();
+ *dest = Some(guard);
+ }
+ }
+
+ if let Some(i) = blocked {
+ // Unlock the shards in reverse order of locking.
+ for shard in self.shards[0..i].iter().rev() {
+ unsafe {
+ let dest: *mut _ = shard.write_guard.get();
+ let guard = mem::replace(&mut *dest, None);
+ drop(guard);
+ }
+ }
+ Err(TryLockError::WouldBlock)
+ } else if poisoned {
+ let guard = ShardedLockWriteGuard {
+ lock: self,
+ _marker: PhantomData,
+ };
+ Err(TryLockError::Poisoned(PoisonError::new(guard)))
+ } else {
+ Ok(ShardedLockWriteGuard {
+ lock: self,
+ _marker: PhantomData,
+ })
+ }
+ }
+
+ /// Locks with exclusive write access, blocking the current thread until it can be acquired.
+ ///
+ /// The calling thread will be blocked until there are no more writers which hold the lock.
+ /// There may be other readers currently inside the lock when this method returns. This method
+ /// does not provide any guarantees with respect to the ordering of whether contentious readers
+ /// or writers will acquire the lock first.
+ ///
+ /// Returns a guard which will release the exclusive access when dropped.
+ ///
+ /// # Errors
+ ///
+ /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
+ /// operation panics.
+ ///
+ /// # Panics
+ ///
+ /// This method might panic when called if the lock is already held by the current thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::ShardedLock;
+ ///
+ /// let lock = ShardedLock::new(1);
+ ///
+ /// let mut n = lock.write().unwrap();
+ /// *n = 2;
+ ///
+ /// assert!(lock.try_read().is_err());
+ /// ```
+ pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> {
+ let mut poisoned = false;
+
+ // Write-lock each shard in succession.
+ for shard in self.shards.iter() {
+ let guard = match shard.lock.write() {
+ Ok(guard) => guard,
+ Err(err) => {
+ poisoned = true;
+ err.into_inner()
+ }
+ };
+
+ // Store the guard into the shard.
+ unsafe {
+ let guard: RwLockWriteGuard<'_, ()> = guard;
+ let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
+ let dest: *mut _ = shard.write_guard.get();
+ *dest = Some(guard);
+ }
+ }
+
+ if poisoned {
+ Err(PoisonError::new(ShardedLockWriteGuard {
+ lock: self,
+ _marker: PhantomData,
+ }))
+ } else {
+ Ok(ShardedLockWriteGuard {
+ lock: self,
+ _marker: PhantomData,
+ })
+ }
+ }
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.try_read() {
+ Ok(guard) => f
+ .debug_struct("ShardedLock")
+ .field("data", &&*guard)
+ .finish(),
+ Err(TryLockError::Poisoned(err)) => f
+ .debug_struct("ShardedLock")
+ .field("data", &&**err.get_ref())
+ .finish(),
+ Err(TryLockError::WouldBlock) => {
+ struct LockedPlaceholder;
+ impl fmt::Debug for LockedPlaceholder {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("<locked>")
+ }
+ }
+ f.debug_struct("ShardedLock")
+ .field("data", &LockedPlaceholder)
+ .finish()
+ }
+ }
+ }
+}
+
+impl<T: Default> Default for ShardedLock<T> {
+ fn default() -> ShardedLock<T> {
+ ShardedLock::new(Default::default())
+ }
+}
+
+impl<T> From<T> for ShardedLock<T> {
+ fn from(t: T) -> Self {
+ ShardedLock::new(t)
+ }
+}
+
+/// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
+pub struct ShardedLockReadGuard<'a, T: ?Sized> {
+ lock: &'a ShardedLock<T>,
+ _guard: RwLockReadGuard<'a, ()>,
+ _marker: PhantomData<RwLockReadGuard<'a, T>>,
+}
+
+unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {}
+
+impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &*self.lock.value.get() }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ShardedLockReadGuard")
+ .field("lock", &self.lock)
+ .finish()
+ }
+}
+
+impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ (**self).fmt(f)
+ }
+}
+
+/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
+pub struct ShardedLockWriteGuard<'a, T: ?Sized> {
+ lock: &'a ShardedLock<T>,
+ _marker: PhantomData<RwLockWriteGuard<'a, T>>,
+}
+
+unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {}
+
+impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
+ fn drop(&mut self) {
+ // Unlock the shards in reverse order of locking.
+ for shard in self.lock.shards.iter().rev() {
+ unsafe {
+ let dest: *mut _ = shard.write_guard.get();
+ let guard = mem::replace(&mut *dest, None);
+ drop(guard);
+ }
+ }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ShardedLockWriteGuard")
+ .field("lock", &self.lock)
+ .finish()
+ }
+}
+
+impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ (**self).fmt(f)
+ }
+}
+
+impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &*self.lock.value.get() }
+ }
+}
+
+impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.lock.value.get() }
+ }
+}
+
+/// Returns a `usize` that identifies the current thread.
+///
+/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
+/// usually tend to be consecutive numbers between 0 and the number of running threads.
+///
+/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
+/// tearing down.
+#[inline]
+fn current_index() -> Option<usize> {
+ REGISTRATION.try_with(|reg| reg.index).ok()
+}
+
+/// The global registry keeping track of registered threads and indices.
+struct ThreadIndices {
+ /// Mapping from `ThreadId` to thread index.
+ mapping: HashMap<ThreadId, usize>,
+
+ /// A list of free indices.
+ free_list: Vec<usize>,
+
+ /// The next index to allocate if the free list is empty.
+ next_index: usize,
+}
+
+fn thread_indices() -> &'static Mutex<ThreadIndices> {
+ static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new();
+ fn init() -> Mutex<ThreadIndices> {
+ Mutex::new(ThreadIndices {
+ mapping: HashMap::new(),
+ free_list: Vec::new(),
+ next_index: 0,
+ })
+ }
+ THREAD_INDICES.get_or_init(init)
+}
+
+/// A registration of a thread with an index.
+///
+/// When dropped, unregisters the thread and frees the reserved index.
+struct Registration {
+ index: usize,
+ thread_id: ThreadId,
+}
+
+impl Drop for Registration {
+ fn drop(&mut self) {
+ let mut indices = thread_indices().lock().unwrap();
+ indices.mapping.remove(&self.thread_id);
+ indices.free_list.push(self.index);
+ }
+}
+
+thread_local! {
+ static REGISTRATION: Registration = {
+ let thread_id = thread::current().id();
+ let mut indices = thread_indices().lock().unwrap();
+
+ let index = match indices.free_list.pop() {
+ Some(i) => i,
+ None => {
+ let i = indices.next_index;
+ indices.next_index += 1;
+ i
+ }
+ };
+ indices.mapping.insert(thread_id, index);
+
+ Registration {
+ index,
+ thread_id,
+ }
+ };
+}
diff --git a/third_party/rust/crossbeam-utils/src/sync/wait_group.rs b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs
new file mode 100644
index 0000000000..19d6074157
--- /dev/null
+++ b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs
@@ -0,0 +1,145 @@
+use crate::primitive::sync::{Arc, Condvar, Mutex};
+use std::fmt;
+
+/// Enables threads to synchronize the beginning or end of some computation.
+///
+/// # Wait groups vs barriers
+///
+/// `WaitGroup` is very similar to [`Barrier`], but there are a few differences:
+///
+/// * [`Barrier`] needs to know the number of threads at construction, while `WaitGroup` is cloned to
+/// register more threads.
+///
+/// * A [`Barrier`] can be reused even after all threads have synchronized, while a `WaitGroup`
+/// synchronizes threads only once.
+///
+/// * All threads wait for others to reach the [`Barrier`]. With `WaitGroup`, each thread can choose
+/// to either wait for other threads or to continue without blocking.
+///
+/// # Examples
+///
+/// ```
+/// use crossbeam_utils::sync::WaitGroup;
+/// use std::thread;
+///
+/// // Create a new wait group.
+/// let wg = WaitGroup::new();
+///
+/// for _ in 0..4 {
+/// // Create another reference to the wait group.
+/// let wg = wg.clone();
+///
+/// thread::spawn(move || {
+/// // Do some work.
+///
+/// // Drop the reference to the wait group.
+/// drop(wg);
+/// });
+/// }
+///
+/// // Block until all threads have finished their work.
+/// wg.wait();
+/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+/// ```
+///
+/// [`Barrier`]: std::sync::Barrier
+pub struct WaitGroup {
+ inner: Arc<Inner>,
+}
+
+/// Inner state of a `WaitGroup`.
+struct Inner {
+ cvar: Condvar,
+ count: Mutex<usize>,
+}
+
+impl Default for WaitGroup {
+ fn default() -> Self {
+ Self {
+ inner: Arc::new(Inner {
+ cvar: Condvar::new(),
+ count: Mutex::new(1),
+ }),
+ }
+ }
+}
+
+impl WaitGroup {
+ /// Creates a new wait group and returns the single reference to it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::WaitGroup;
+ ///
+ /// let wg = WaitGroup::new();
+ /// ```
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Drops this reference and waits until all other references are dropped.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::WaitGroup;
+ /// use std::thread;
+ ///
+ /// let wg = WaitGroup::new();
+ ///
+ /// thread::spawn({
+ /// let wg = wg.clone();
+ /// move || {
+ /// // Block until both threads have reached `wait()`.
+ /// wg.wait();
+ /// }
+ /// });
+ ///
+ /// // Block until both threads have reached `wait()`.
+ /// wg.wait();
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+ /// ```
+ pub fn wait(self) {
+ if *self.inner.count.lock().unwrap() == 1 {
+ return;
+ }
+
+ let inner = self.inner.clone();
+ drop(self);
+
+ let mut count = inner.count.lock().unwrap();
+ while *count > 0 {
+ count = inner.cvar.wait(count).unwrap();
+ }
+ }
+}
+
+impl Drop for WaitGroup {
+ fn drop(&mut self) {
+ let mut count = self.inner.count.lock().unwrap();
+ *count -= 1;
+
+ if *count == 0 {
+ self.inner.cvar.notify_all();
+ }
+ }
+}
+
+impl Clone for WaitGroup {
+ fn clone(&self) -> WaitGroup {
+ let mut count = self.inner.count.lock().unwrap();
+ *count += 1;
+
+ WaitGroup {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+impl fmt::Debug for WaitGroup {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let count: &usize = &*self.inner.count.lock().unwrap();
+ f.debug_struct("WaitGroup").field("count", count).finish()
+ }
+}