author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-17 12:20:39 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-17 12:20:39 +0000
commit     1376c5a617be5c25655d0d7cb63e3beaa5a6e026 (patch)
tree       3bb8d61aee02bc7a15eab3f36e3b921afc2075d0 /vendor/futures-util/src/lock
parent     Releasing progress-linux version 1.69.0+dfsg1-1~progress7.99u1. (diff)
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/futures-util/src/lock')
-rw-r--r--  vendor/futures-util/src/lock/bilock.rs   49
-rw-r--r--  vendor/futures-util/src/lock/mod.rs      22
-rw-r--r--  vendor/futures-util/src/lock/mutex.rs   155

3 files changed, 195 insertions(+), 31 deletions(-)
diff --git a/vendor/futures-util/src/lock/bilock.rs b/vendor/futures-util/src/lock/bilock.rs
index 2f51ae7c9..7ddc66ad2 100644
--- a/vendor/futures-util/src/lock/bilock.rs
+++ b/vendor/futures-util/src/lock/bilock.rs
@@ -3,11 +3,11 @@
 use alloc::boxed::Box;
 use alloc::sync::Arc;
 use core::cell::UnsafeCell;
-use core::fmt;
 use core::ops::{Deref, DerefMut};
 use core::pin::Pin;
-use core::sync::atomic::AtomicUsize;
+use core::sync::atomic::AtomicPtr;
 use core::sync::atomic::Ordering::SeqCst;
+use core::{fmt, ptr};
 #[cfg(feature = "bilock")]
 use futures_core::future::Future;
 use futures_core::task::{Context, Poll, Waker};
@@ -41,7 +41,7 @@ pub struct BiLock<T> {
 
 #[derive(Debug)]
 struct Inner<T> {
-    state: AtomicUsize,
+    state: AtomicPtr<Waker>,
     value: Option<UnsafeCell<T>>,
 }
 
@@ -61,7 +61,10 @@ impl<T> BiLock<T> {
     /// Similarly, reuniting the lock and extracting the inner value is only
     /// possible when `T` is `Unpin`.
     pub fn new(t: T) -> (Self, Self) {
-        let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) });
+        let arc = Arc::new(Inner {
+            state: AtomicPtr::new(ptr::null_mut()),
+            value: Some(UnsafeCell::new(t)),
+        });
 
         (Self { arc: arc.clone() }, Self { arc })
     }
@@ -87,7 +90,8 @@ impl<T> BiLock<T> {
     pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
         let mut waker = None;
         loop {
-            match self.arc.state.swap(1, SeqCst) {
+            let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
+            match n as usize {
                 // Woohoo, we grabbed the lock!
                 0 => return Poll::Ready(BiLockGuard { bilock: self }),
 
@@ -96,8 +100,8 @@ impl<T> BiLock<T> {
 
                 // A task was previously blocked on this lock, likely our task,
                 // so we need to update that task.
-                n => unsafe {
-                    let mut prev = Box::from_raw(n as *mut Waker);
+                _ => unsafe {
+                    let mut prev = Box::from_raw(n);
                     *prev = cx.waker().clone();
                     waker = Some(prev);
                 },
@@ -105,9 +109,9 @@ impl<T> BiLock<T> {
 
             // type ascription for safety's sake!
             let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
-            let me = Box::into_raw(me) as usize;
+            let me = Box::into_raw(me);
 
-            match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) {
+            match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
                 // The lock is still locked, but we've now parked ourselves, so
                 // just report that we're scheduled to receive a notification.
                 Ok(_) => return Poll::Pending,
@@ -115,8 +119,8 @@ impl<T> BiLock<T> {
                 // Oops, looks like the lock was unlocked after our swap above
                 // and before the compare_exchange. Deallocate what we just
                 // allocated and go through the loop again.
-                Err(0) => unsafe {
-                    waker = Some(Box::from_raw(me as *mut Waker));
+                Err(n) if n.is_null() => unsafe {
+                    waker = Some(Box::from_raw(me));
                 },
 
                 // The top of this loop set the previous state to 1, so if we
@@ -125,7 +129,7 @@ impl<T> BiLock<T> {
                 // but we're trying to acquire the lock and there's only one
                 // other reference of the lock, so it should be impossible for
                 // that task to ever block itself.
-                Err(n) => panic!("invalid state: {}", n),
+                Err(n) => panic!("invalid state: {}", n as usize),
             }
         }
     }
@@ -164,7 +168,8 @@ impl<T> BiLock<T> {
     }
 
     fn unlock(&self) {
-        match self.arc.state.swap(0, SeqCst) {
+        let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
+        match n as usize {
             // we've locked the lock, shouldn't be possible for us to see an
             // unlocked lock.
             0 => panic!("invalid unlocked state"),
@@ -174,8 +179,8 @@ impl<T> BiLock<T> {
 
             // Another task has parked themselves on this lock, let's wake them
             // up as its now their turn.
-            n => unsafe {
-                Box::from_raw(n as *mut Waker).wake();
+            _ => unsafe {
+                Box::from_raw(n).wake();
             },
         }
     }
@@ -189,7 +194,7 @@ impl<T: Unpin> Inner<T> {
 
 impl<T> Drop for Inner<T> {
     fn drop(&mut self) {
-        assert_eq!(self.state.load(SeqCst), 0);
+        assert!(self.state.load(SeqCst).is_null());
     }
 }
 
@@ -224,6 +229,9 @@ pub struct BiLockGuard<'a, T> {
     bilock: &'a BiLock<T>,
 }
 
+// We allow parallel access to T via Deref, so Sync bound is also needed here.
+unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {}
+
 impl<T> Deref for BiLockGuard<'_, T> {
     type Target = T;
     fn deref(&self) -> &T {
@@ -274,3 +282,12 @@ impl<'a, T> Future for BiLockAcquire<'a, T> {
         self.bilock.poll_lock(cx)
     }
 }
+
+// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible.
+#[allow(clippy::useless_transmute)]
+#[inline]
+fn invalid_ptr<T>(addr: usize) -> *mut T {
+    // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that
+    // pointer).
+    unsafe { core::mem::transmute(addr) }
+}
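The change above moves BiLock's state word from AtomicUsize to AtomicPtr<Waker> for strict-provenance compatibility: null now means unlocked, the sentinel invalid_ptr(1) means locked with no waiter, and any other value is the Box<Waker> of a parked task. For orientation, here is a minimal usage sketch of the BiLock API itself; it assumes futures-util is built with the unstable "bilock" cargo feature (which gates lock() and reunite()) and that futures-executor is available for block_on:

    use futures_executor::block_on;
    use futures_util::lock::BiLock;

    fn main() {
        // Split one value into two owning halves; each half can lock independently.
        let (a, b) = BiLock::new(0u32);
        block_on(async {
            *a.lock().await += 1; // the guard drops at the end of the statement
            *b.lock().await += 1;
        });
        // With both halves in hand, reunite() extracts the inner value.
        assert_eq!(a.reunite(b).unwrap(), 2);
    }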
diff --git a/vendor/futures-util/src/lock/mod.rs b/vendor/futures-util/src/lock/mod.rs
index cf374c016..0be72717c 100644
--- a/vendor/futures-util/src/lock/mod.rs
+++ b/vendor/futures-util/src/lock/mod.rs
@@ -4,11 +4,18 @@
 //! library is activated, and it is activated by default.
 
 #[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "std")]
-mod mutex;
+#[cfg(any(feature = "sink", feature = "io"))]
+#[cfg(not(feature = "bilock"))]
+pub(crate) use self::bilock::BiLock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
 #[cfg(not(futures_no_atomic_cas))]
 #[cfg(feature = "std")]
-pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
+pub use self::mutex::{
+    MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture,
+};
 
 #[cfg(not(futures_no_atomic_cas))]
 #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
@@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
 #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
 mod bilock;
 #[cfg(not(futures_no_atomic_cas))]
-#[cfg(any(feature = "sink", feature = "io"))]
-#[cfg(not(feature = "bilock"))]
-pub(crate) use self::bilock::BiLock;
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "bilock")]
-#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
-pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
+#[cfg(feature = "std")]
+mod mutex;
diff --git a/vendor/futures-util/src/lock/mutex.rs b/vendor/futures-util/src/lock/mutex.rs
index 85dcb1537..335ad1427 100644
--- a/vendor/futures-util/src/lock/mutex.rs
+++ b/vendor/futures-util/src/lock/mutex.rs
@@ -1,14 +1,16 @@
-use futures_core::future::{FusedFuture, Future};
-use futures_core::task::{Context, Poll, Waker};
-use slab::Slab;
 use std::cell::UnsafeCell;
 use std::marker::PhantomData;
 use std::ops::{Deref, DerefMut};
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex as StdMutex;
+use std::sync::{Arc, Mutex as StdMutex};
 use std::{fmt, mem};
 
+use slab::Slab;
+
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll, Waker};
+
 /// A futures-aware mutex.
 ///
 /// # Fairness
@@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> {
         }
     }
 
+    /// Attempt to acquire the lock immediately.
+    ///
+    /// If the lock is currently held, this will return `None`.
+    pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
+        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
+        if (old_state & IS_LOCKED) == 0 {
+            Some(OwnedMutexGuard { mutex: self.clone() })
+        } else {
+            None
+        }
+    }
+
     /// Acquire the lock asynchronously.
     ///
     /// This method returns a future that will resolve once the lock has been
@@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> {
         MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
     }
 
+    /// Acquire the lock asynchronously.
+    ///
+    /// This method returns a future that will resolve once the lock has been
+    /// successfully acquired.
+    pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
+        OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
+    }
+
     /// Returns a mutable reference to the underlying data.
     ///
     /// Since this call borrows the `Mutex` mutably, no actual locking needs to
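try_lock_owned and lock_owned mirror the existing try_lock and lock, but take the mutex by Arc rather than by reference, so the resulting future and guard carry no lifetime parameter. A small sketch of the intended call pattern (futures_executor::block_on stands in for whatever executor you use):

    use std::sync::Arc;

    use futures_executor::block_on;
    use futures_util::lock::Mutex;

    fn main() {
        let mutex = Arc::new(Mutex::new(Vec::<u32>::new()));

        // try_lock_owned takes `self: &Arc<Self>` and clones the Arc on success.
        if let Some(mut guard) = mutex.try_lock_owned() {
            guard.push(1);
        } // the guard drops here, releasing the lock

        // lock_owned consumes an Arc and resolves to an OwnedMutexGuard<T>.
        let mut guard = block_on(mutex.clone().lock_owned());
        guard.push(2);
        assert_eq!(*guard, vec![1, 2]);
    }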
@@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> {
 }
 
 // Sentinel for when no slot in the `Slab` has been dedicated to this object.
-const WAIT_KEY_NONE: usize = usize::max_value();
+const WAIT_KEY_NONE: usize = usize::MAX;
+
+/// A future which resolves when the target mutex has been successfully acquired, owned version.
+pub struct OwnedMutexLockFuture<T: ?Sized> {
+    // `None` indicates that the mutex was successfully acquired.
+    mutex: Option<Arc<Mutex<T>>>,
+    wait_key: usize,
+}
+
+impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("OwnedMutexLockFuture")
+            .field("was_acquired", &self.mutex.is_none())
+            .field("mutex", &self.mutex)
+            .field(
+                "wait_key",
+                &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
+            )
+            .finish()
+    }
+}
+
+impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
+    fn is_terminated(&self) -> bool {
+        self.mutex.is_none()
+    }
+}
+
+impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
+    type Output = OwnedMutexGuard<T>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
+
+        if let Some(lock) = mutex.try_lock_owned() {
+            mutex.remove_waker(this.wait_key, false);
+            this.mutex = None;
+            return Poll::Ready(lock);
+        }
+
+        {
+            let mut waiters = mutex.waiters.lock().unwrap();
+            if this.wait_key == WAIT_KEY_NONE {
+                this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
+                if waiters.len() == 1 {
+                    mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
+                }
+            } else {
+                waiters[this.wait_key].register(cx.waker());
+            }
+        }
+
+        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
+        // attempting to acquire the lock again.
+        if let Some(lock) = mutex.try_lock_owned() {
+            mutex.remove_waker(this.wait_key, false);
+            this.mutex = None;
+            return Poll::Ready(lock);
+        }
+
+        Poll::Pending
+    }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
+    fn drop(&mut self) {
+        if let Some(mutex) = self.mutex.as_ref() {
+            // This future was dropped before it acquired the mutex.
+            //
+            // Remove ourselves from the map, waking up another waiter if we
+            // had been awoken to acquire the lock.
+            mutex.remove_waker(self.wait_key, true);
+        }
+    }
+}
+
+/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
+/// When this structure is dropped (falls out of scope), the lock will be
+/// unlocked.
+pub struct OwnedMutexGuard<T: ?Sized> {
+    mutex: Arc<Mutex<T>>,
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("OwnedMutexGuard")
+            .field("value", &&**self)
+            .field("mutex", &self.mutex)
+            .finish()
+    }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
+    fn drop(&mut self) {
+        self.mutex.unlock()
+    }
+}
+
+impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        unsafe { &*self.mutex.value.get() }
+    }
+}
+
+impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        unsafe { &mut *self.mutex.value.get() }
+    }
+}
 
 /// A future which resolves when the target mutex has been successfully acquired.
 pub struct MutexLockFuture<'a, T: ?Sized> {
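Note the double try_lock_owned in poll: the future registers its waker and only then tries again, so an unlock that lands between the first attempt and the waker registration cannot be lost. The payoff of OwnedMutexGuard itself is that it owns its Arc<Mutex<T>> and is therefore 'static, so it can move into a spawned thread or task outright. A sketch (plain std::thread used here to keep it executor-neutral):

    use std::sync::Arc;
    use std::thread;

    use futures_executor::block_on;
    use futures_util::lock::Mutex;

    fn main() {
        let mutex = Arc::new(Mutex::new(String::from("hello")));
        let mut guard = block_on(mutex.clone().lock_owned());

        // The guard itself moves into the thread; no borrow of `mutex` crosses over.
        let handle = thread::spawn(move || {
            guard.push_str(", world");
            // the lock is released when `guard` drops here
        });
        handle.join().unwrap();

        assert_eq!(*block_on(mutex.lock()), "hello, world");
    }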
@@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
 // It's safe to switch which thread the acquire is being attempted on so long as
 // `T` can be accessed on that thread.
 unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
+
 // doesn't have any interesting `&self` methods (only Debug)
 unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
 
+// It's safe to switch which thread the acquire is being attempted on so long as
+// `T` can be accessed on that thread.
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
+
+// doesn't have any interesting `&self` methods (only Debug)
+unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
+
 // Safe to send since we don't track any thread-specific details-- the inner
 // lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
 unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
 unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
+
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
+unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
+
 unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
 unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
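The T: Sync bounds on these guard impls (like the new Sync impl for BiLockGuard earlier in this diff) matter because a shared &guard hands out &T through Deref, so several threads may read T at once while the lock is held. A sketch of exactly that sharing, sound only because of the impls above:

    use std::sync::Arc;
    use std::thread;

    use futures_executor::block_on;
    use futures_util::lock::Mutex;

    fn main() {
        let mutex = Arc::new(Mutex::new(vec![1, 2, 3]));
        let guard = block_on(mutex.clone().lock_owned());

        // Two threads read through the same guard concurrently; this is the
        // "parallel access to T via Deref" that forces the T: Sync bound.
        thread::scope(|s| {
            s.spawn(|| assert_eq!(guard.len(), 3));
            s.spawn(|| assert_eq!(guard[0], 1));
        });
    }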