summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/lock
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/lock')
-rw-r--r--vendor/futures-util/src/lock/bilock.rs49
-rw-r--r--vendor/futures-util/src/lock/mod.rs22
-rw-r--r--vendor/futures-util/src/lock/mutex.rs155
3 files changed, 195 insertions, 31 deletions
diff --git a/vendor/futures-util/src/lock/bilock.rs b/vendor/futures-util/src/lock/bilock.rs
index 2f51ae7c9..7ddc66ad2 100644
--- a/vendor/futures-util/src/lock/bilock.rs
+++ b/vendor/futures-util/src/lock/bilock.rs
@@ -3,11 +3,11 @@
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::cell::UnsafeCell;
-use core::fmt;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
-use core::sync::atomic::AtomicUsize;
+use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering::SeqCst;
+use core::{fmt, ptr};
#[cfg(feature = "bilock")]
use futures_core::future::Future;
use futures_core::task::{Context, Poll, Waker};
@@ -41,7 +41,7 @@ pub struct BiLock<T> {
#[derive(Debug)]
struct Inner<T> {
- state: AtomicUsize,
+ state: AtomicPtr<Waker>,
value: Option<UnsafeCell<T>>,
}
@@ -61,7 +61,10 @@ impl<T> BiLock<T> {
/// Similarly, reuniting the lock and extracting the inner value is only
/// possible when `T` is `Unpin`.
pub fn new(t: T) -> (Self, Self) {
- let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) });
+ let arc = Arc::new(Inner {
+ state: AtomicPtr::new(ptr::null_mut()),
+ value: Some(UnsafeCell::new(t)),
+ });
(Self { arc: arc.clone() }, Self { arc })
}
@@ -87,7 +90,8 @@ impl<T> BiLock<T> {
pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
let mut waker = None;
loop {
- match self.arc.state.swap(1, SeqCst) {
+ let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
+ match n as usize {
// Woohoo, we grabbed the lock!
0 => return Poll::Ready(BiLockGuard { bilock: self }),
@@ -96,8 +100,8 @@ impl<T> BiLock<T> {
// A task was previously blocked on this lock, likely our task,
// so we need to update that task.
- n => unsafe {
- let mut prev = Box::from_raw(n as *mut Waker);
+ _ => unsafe {
+ let mut prev = Box::from_raw(n);
*prev = cx.waker().clone();
waker = Some(prev);
},
@@ -105,9 +109,9 @@ impl<T> BiLock<T> {
// type ascription for safety's sake!
let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
- let me = Box::into_raw(me) as usize;
+ let me = Box::into_raw(me);
- match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) {
+ match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
// The lock is still locked, but we've now parked ourselves, so
// just report that we're scheduled to receive a notification.
Ok(_) => return Poll::Pending,
@@ -115,8 +119,8 @@ impl<T> BiLock<T> {
// Oops, looks like the lock was unlocked after our swap above
// and before the compare_exchange. Deallocate what we just
// allocated and go through the loop again.
- Err(0) => unsafe {
- waker = Some(Box::from_raw(me as *mut Waker));
+ Err(n) if n.is_null() => unsafe {
+ waker = Some(Box::from_raw(me));
},
// The top of this loop set the previous state to 1, so if we
@@ -125,7 +129,7 @@ impl<T> BiLock<T> {
// but we're trying to acquire the lock and there's only one
// other reference of the lock, so it should be impossible for
// that task to ever block itself.
- Err(n) => panic!("invalid state: {}", n),
+ Err(n) => panic!("invalid state: {}", n as usize),
}
}
}
@@ -164,7 +168,8 @@ impl<T> BiLock<T> {
}
fn unlock(&self) {
- match self.arc.state.swap(0, SeqCst) {
+ let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
+ match n as usize {
// we've locked the lock, shouldn't be possible for us to see an
// unlocked lock.
0 => panic!("invalid unlocked state"),
@@ -174,8 +179,8 @@ impl<T> BiLock<T> {
// Another task has parked themselves on this lock, let's wake them
// up as its now their turn.
- n => unsafe {
- Box::from_raw(n as *mut Waker).wake();
+ _ => unsafe {
+ Box::from_raw(n).wake();
},
}
}
@@ -189,7 +194,7 @@ impl<T: Unpin> Inner<T> {
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
- assert_eq!(self.state.load(SeqCst), 0);
+ assert!(self.state.load(SeqCst).is_null());
}
}
@@ -224,6 +229,9 @@ pub struct BiLockGuard<'a, T> {
bilock: &'a BiLock<T>,
}
+// We allow parallel access to T via Deref, so Sync bound is also needed here.
+unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {}
+
impl<T> Deref for BiLockGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
@@ -274,3 +282,12 @@ impl<'a, T> Future for BiLockAcquire<'a, T> {
self.bilock.poll_lock(cx)
}
}
+
+// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible.
+#[allow(clippy::useless_transmute)]
+#[inline]
+fn invalid_ptr<T>(addr: usize) -> *mut T {
+ // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that
+ // pointer).
+ unsafe { core::mem::transmute(addr) }
+}
diff --git a/vendor/futures-util/src/lock/mod.rs b/vendor/futures-util/src/lock/mod.rs
index cf374c016..0be72717c 100644
--- a/vendor/futures-util/src/lock/mod.rs
+++ b/vendor/futures-util/src/lock/mod.rs
@@ -4,11 +4,18 @@
//! library is activated, and it is activated by default.
#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "std")]
-mod mutex;
+#[cfg(any(feature = "sink", feature = "io"))]
+#[cfg(not(feature = "bilock"))]
+pub(crate) use self::bilock::BiLock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "std")]
-pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
+pub use self::mutex::{
+ MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture,
+};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
@@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
mod bilock;
#[cfg(not(futures_no_atomic_cas))]
-#[cfg(any(feature = "sink", feature = "io"))]
-#[cfg(not(feature = "bilock"))]
-pub(crate) use self::bilock::BiLock;
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "bilock")]
-#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
-pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
+#[cfg(feature = "std")]
+mod mutex;
diff --git a/vendor/futures-util/src/lock/mutex.rs b/vendor/futures-util/src/lock/mutex.rs
index 85dcb1537..335ad1427 100644
--- a/vendor/futures-util/src/lock/mutex.rs
+++ b/vendor/futures-util/src/lock/mutex.rs
@@ -1,14 +1,16 @@
-use futures_core::future::{FusedFuture, Future};
-use futures_core::task::{Context, Poll, Waker};
-use slab::Slab;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex as StdMutex;
+use std::sync::{Arc, Mutex as StdMutex};
use std::{fmt, mem};
+use slab::Slab;
+
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll, Waker};
+
/// A futures-aware mutex.
///
/// # Fairness
@@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> {
}
}
+ /// Attempt to acquire the lock immediately.
+ ///
+ /// If the lock is currently held, this will return `None`.
+ pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
+ let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
+ if (old_state & IS_LOCKED) == 0 {
+ Some(OwnedMutexGuard { mutex: self.clone() })
+ } else {
+ None
+ }
+ }
+
/// Acquire the lock asynchronously.
///
/// This method returns a future that will resolve once the lock has been
@@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> {
MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
}
+ /// Acquire the lock asynchronously.
+ ///
+ /// This method returns a future that will resolve once the lock has been
+ /// successfully acquired.
+ pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
+ OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
+ }
+
/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `Mutex` mutably, no actual locking needs to
@@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> {
}
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
-const WAIT_KEY_NONE: usize = usize::max_value();
+const WAIT_KEY_NONE: usize = usize::MAX;
+
+/// A future which resolves when the target mutex has been successfully acquired, owned version.
+pub struct OwnedMutexLockFuture<T: ?Sized> {
+ // `None` indicates that the mutex was successfully acquired.
+ mutex: Option<Arc<Mutex<T>>>,
+ wait_key: usize,
+}
+
+impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("OwnedMutexLockFuture")
+ .field("was_acquired", &self.mutex.is_none())
+ .field("mutex", &self.mutex)
+ .field(
+ "wait_key",
+ &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
+ )
+ .finish()
+ }
+}
+
+impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
+ fn is_terminated(&self) -> bool {
+ self.mutex.is_none()
+ }
+}
+
+impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
+ type Output = OwnedMutexGuard<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.get_mut();
+
+ let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
+
+ if let Some(lock) = mutex.try_lock_owned() {
+ mutex.remove_waker(this.wait_key, false);
+ this.mutex = None;
+ return Poll::Ready(lock);
+ }
+
+ {
+ let mut waiters = mutex.waiters.lock().unwrap();
+ if this.wait_key == WAIT_KEY_NONE {
+ this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
+ if waiters.len() == 1 {
+ mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
+ }
+ } else {
+ waiters[this.wait_key].register(cx.waker());
+ }
+ }
+
+ // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
+ // attempting to acquire the lock again.
+ if let Some(lock) = mutex.try_lock_owned() {
+ mutex.remove_waker(this.wait_key, false);
+ this.mutex = None;
+ return Poll::Ready(lock);
+ }
+
+ Poll::Pending
+ }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
+ fn drop(&mut self) {
+ if let Some(mutex) = self.mutex.as_ref() {
+ // This future was dropped before it acquired the mutex.
+ //
+ // Remove ourselves from the map, waking up another waiter if we
+ // had been awoken to acquire the lock.
+ mutex.remove_waker(self.wait_key, true);
+ }
+ }
+}
+
+/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
+/// When this structure is dropped (falls out of scope), the lock will be
+/// unlocked.
+pub struct OwnedMutexGuard<T: ?Sized> {
+ mutex: Arc<Mutex<T>>,
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("OwnedMutexGuard")
+ .field("value", &&**self)
+ .field("mutex", &self.mutex)
+ .finish()
+ }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
+ fn drop(&mut self) {
+ self.mutex.unlock()
+ }
+}
+
+impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ unsafe { &*self.mutex.value.get() }
+ }
+}
+
+impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.mutex.value.get() }
+ }
+}
/// A future which resolves when the target mutex has been successfully acquired.
pub struct MutexLockFuture<'a, T: ?Sized> {
@@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
+
// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
+// It's safe to switch which thread the acquire is being attempted on so long as
+// `T` can be accessed on that thread.
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
+
+// doesn't have any interesting `&self` methods (only Debug)
+unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
+
// Safe to send since we don't track any thread-specific details-- the inner
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
+
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
+unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
+
unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}