diff options
Diffstat (limited to 'vendor/tokio/src/sync/mutex.rs')
-rw-r--r-- | vendor/tokio/src/sync/mutex.rs | 725 |
1 files changed, 725 insertions, 0 deletions
diff --git a/vendor/tokio/src/sync/mutex.rs b/vendor/tokio/src/sync/mutex.rs new file mode 100644 index 000000000..8ae824770 --- /dev/null +++ b/vendor/tokio/src/sync/mutex.rs @@ -0,0 +1,725 @@ +#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] + +use crate::sync::batch_semaphore as semaphore; + +use std::cell::UnsafeCell; +use std::error::Error; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use std::{fmt, marker, mem}; + +/// An asynchronous `Mutex`-like type. +/// +/// This type acts similarly to [`std::sync::Mutex`], with two major +/// differences: [`lock`] is an async method so does not block, and the lock +/// guard is designed to be held across `.await` points. +/// +/// # Which kind of mutex should you use? +/// +/// Contrary to popular belief, it is ok and often preferred to use the ordinary +/// [`Mutex`][std] from the standard library in asynchronous code. +/// +/// The feature that the async mutex offers over the blocking mutex is the +/// ability to keep it locked across an `.await` point. This makes the async +/// mutex more expensive than the blocking mutex, so the blocking mutex should +/// be preferred in the cases where it can be used. The primary use case for the +/// async mutex is to provide shared mutable access to IO resources such as a +/// database connection. If the value behind the mutex is just data, it's +/// usually appropriate to use a blocking mutex such as the one in the standard +/// library or [`parking_lot`]. +/// +/// Note that, although the compiler will not prevent the std `Mutex` from holding +/// its guard across `.await` points in situations where the task is not movable +/// between threads, this virtually never leads to correct concurrent code in +/// practice as it can easily lead to deadlocks. +/// +/// A common pattern is to wrap the `Arc<Mutex<...>>` in a struct that provides +/// non-async methods for performing operations on the data within, and only +/// lock the mutex inside these methods. The [mini-redis] example provides an +/// illustration of this pattern. +/// +/// Additionally, when you _do_ want shared access to an IO resource, it is +/// often better to spawn a task to manage the IO resource, and to use message +/// passing to communicate with that task. +/// +/// [std]: std::sync::Mutex +/// [`parking_lot`]: https://docs.rs/parking_lot +/// [mini-redis]: https://github.com/tokio-rs/mini-redis/blob/master/src/db.rs +/// +/// # Examples: +/// +/// ```rust,no_run +/// use tokio::sync::Mutex; +/// use std::sync::Arc; +/// +/// #[tokio::main] +/// async fn main() { +/// let data1 = Arc::new(Mutex::new(0)); +/// let data2 = Arc::clone(&data1); +/// +/// tokio::spawn(async move { +/// let mut lock = data2.lock().await; +/// *lock += 1; +/// }); +/// +/// let mut lock = data1.lock().await; +/// *lock += 1; +/// } +/// ``` +/// +/// +/// ```rust,no_run +/// use tokio::sync::Mutex; +/// use std::sync::Arc; +/// +/// #[tokio::main] +/// async fn main() { +/// let count = Arc::new(Mutex::new(0)); +/// +/// for i in 0..5 { +/// let my_count = Arc::clone(&count); +/// tokio::spawn(async move { +/// for j in 0..10 { +/// let mut lock = my_count.lock().await; +/// *lock += 1; +/// println!("{} {} {}", i, j, lock); +/// } +/// }); +/// } +/// +/// loop { +/// if *count.lock().await >= 50 { +/// break; +/// } +/// } +/// println!("Count hit 50."); +/// } +/// ``` +/// There are a few things of note here to pay attention to in this example. +/// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across +/// threads. +/// 2. Each spawned task obtains a lock and releases it on every iteration. +/// 3. Mutation of the data protected by the Mutex is done by de-referencing +/// the obtained lock as seen on lines 12 and 19. +/// +/// Tokio's Mutex works in a simple FIFO (first in, first out) style where all +/// calls to [`lock`] complete in the order they were performed. In that way the +/// Mutex is "fair" and predictable in how it distributes the locks to inner +/// data. Locks are released and reacquired after every iteration, so basically, +/// each thread goes to the back of the line after it increments the value once. +/// Note that there's some unpredictability to the timing between when the +/// threads are started, but once they are going they alternate predictably. +/// Finally, since there is only a single valid lock at any given time, there is +/// no possibility of a race condition when mutating the inner value. +/// +/// Note that in contrast to [`std::sync::Mutex`], this implementation does not +/// poison the mutex when a thread holding the [`MutexGuard`] panics. In such a +/// case, the mutex will be unlocked. If the panic is caught, this might leave +/// the data protected by the mutex in an inconsistent state. +/// +/// [`Mutex`]: struct@Mutex +/// [`MutexGuard`]: struct@MutexGuard +/// [`Arc`]: struct@std::sync::Arc +/// [`std::sync::Mutex`]: struct@std::sync::Mutex +/// [`Send`]: trait@std::marker::Send +/// [`lock`]: method@Mutex::lock +pub struct Mutex<T: ?Sized> { + s: semaphore::Semaphore, + c: UnsafeCell<T>, +} + +/// A handle to a held `Mutex`. The guard can be held across any `.await` point +/// as it is [`Send`]. +/// +/// As long as you have this guard, you have exclusive access to the underlying +/// `T`. The guard internally borrows the `Mutex`, so the mutex will not be +/// dropped while a guard exists. +/// +/// The lock is automatically released whenever the guard is dropped, at which +/// point `lock` will succeed yet again. +pub struct MutexGuard<'a, T: ?Sized> { + lock: &'a Mutex<T>, +} + +/// An owned handle to a held `Mutex`. +/// +/// This guard is only available from a `Mutex` that is wrapped in an [`Arc`]. It +/// is identical to `MutexGuard`, except that rather than borrowing the `Mutex`, +/// it clones the `Arc`, incrementing the reference count. This means that +/// unlike `MutexGuard`, it will have the `'static` lifetime. +/// +/// As long as you have this guard, you have exclusive access to the underlying +/// `T`. The guard internally keeps a reference-counted pointer to the original +/// `Mutex`, so even if the lock goes away, the guard remains valid. +/// +/// The lock is automatically released whenever the guard is dropped, at which +/// point `lock` will succeed yet again. +/// +/// [`Arc`]: std::sync::Arc +pub struct OwnedMutexGuard<T: ?Sized> { + lock: Arc<Mutex<T>>, +} + +/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`]. +/// +/// This can be used to hold a subfield of the protected data. +/// +/// [`MutexGuard::map`]: method@MutexGuard::map +#[must_use = "if unused the Mutex will immediately unlock"] +pub struct MappedMutexGuard<'a, T: ?Sized> { + s: &'a semaphore::Semaphore, + data: *mut T, + // Needed to tell the borrow checker that we are holding a `&mut T` + marker: marker::PhantomData<&'a mut T>, +} + +// As long as T: Send, it's fine to send and share Mutex<T> between threads. +// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can +// access T through Mutex<T>. +unsafe impl<T> Send for Mutex<T> where T: ?Sized + Send {} +unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {} +unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {} +unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {} +unsafe impl<'a, T> Sync for MappedMutexGuard<'a, T> where T: ?Sized + Sync + 'a {} +unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a {} + +/// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and +/// [`RwLock::try_write`] functions. +/// +/// `Mutex::try_lock` operation will only fail if the mutex is already locked. +/// +/// `RwLock::try_read` operation will only fail if the lock is currently held +/// by an exclusive writer. +/// +/// `RwLock::try_write` operation will if lock is held by any reader or by an +/// exclusive writer. +/// +/// [`Mutex::try_lock`]: Mutex::try_lock +/// [`RwLock::try_read`]: fn@super::RwLock::try_read +/// [`RwLock::try_write`]: fn@super::RwLock::try_write +#[derive(Debug)] +pub struct TryLockError(pub(super) ()); + +impl fmt::Display for TryLockError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "operation would block") + } +} + +impl Error for TryLockError {} + +#[test] +#[cfg(not(loom))] +fn bounds() { + fn check_send<T: Send>() {} + fn check_unpin<T: Unpin>() {} + // This has to take a value, since the async fn's return type is unnameable. + fn check_send_sync_val<T: Send + Sync>(_t: T) {} + fn check_send_sync<T: Send + Sync>() {} + fn check_static<T: 'static>() {} + fn check_static_val<T: 'static>(_t: T) {} + + check_send::<MutexGuard<'_, u32>>(); + check_send::<OwnedMutexGuard<u32>>(); + check_unpin::<Mutex<u32>>(); + check_send_sync::<Mutex<u32>>(); + check_static::<OwnedMutexGuard<u32>>(); + + let mutex = Mutex::new(1); + check_send_sync_val(mutex.lock()); + let arc_mutex = Arc::new(Mutex::new(1)); + check_send_sync_val(arc_mutex.clone().lock_owned()); + check_static_val(arc_mutex.lock_owned()); +} + +impl<T: ?Sized> Mutex<T> { + /// Creates a new lock in an unlocked state ready for use. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// + /// let lock = Mutex::new(5); + /// ``` + pub fn new(t: T) -> Self + where + T: Sized, + { + Self { + c: UnsafeCell::new(t), + s: semaphore::Semaphore::new(1), + } + } + + /// Creates a new lock in an unlocked state ready for use. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// + /// static LOCK: Mutex<i32> = Mutex::const_new(5); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new(t: T) -> Self + where + T: Sized, + { + Self { + c: UnsafeCell::new(t), + s: semaphore::Semaphore::const_new(1), + } + } + + /// Locks this mutex, causing the current task to yield until the lock has + /// been acquired. When the lock has been acquired, function returns a + /// [`MutexGuard`]. + /// + /// # Cancel safety + /// + /// This method uses a queue to fairly distribute locks in the order they + /// were requested. Cancelling a call to `lock` makes you lose your place in + /// the queue. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// + /// #[tokio::main] + /// async fn main() { + /// let mutex = Mutex::new(1); + /// + /// let mut n = mutex.lock().await; + /// *n = 2; + /// } + /// ``` + pub async fn lock(&self) -> MutexGuard<'_, T> { + self.acquire().await; + MutexGuard { lock: self } + } + + /// Locks this mutex, causing the current task to yield until the lock has + /// been acquired. When the lock has been acquired, this returns an + /// [`OwnedMutexGuard`]. + /// + /// This method is identical to [`Mutex::lock`], except that the returned + /// guard references the `Mutex` with an [`Arc`] rather than by borrowing + /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this + /// method, and the guard will live for the `'static` lifetime, as it keeps + /// the `Mutex` alive by holding an `Arc`. + /// + /// # Cancel safety + /// + /// This method uses a queue to fairly distribute locks in the order they + /// were requested. Cancelling a call to `lock_owned` makes you lose your + /// place in the queue. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// use std::sync::Arc; + /// + /// #[tokio::main] + /// async fn main() { + /// let mutex = Arc::new(Mutex::new(1)); + /// + /// let mut n = mutex.clone().lock_owned().await; + /// *n = 2; + /// } + /// ``` + /// + /// [`Arc`]: std::sync::Arc + pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> { + self.acquire().await; + OwnedMutexGuard { lock: self } + } + + async fn acquire(&self) { + self.s.acquire(1).await.unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and + // we own it exclusively, which means that this can never happen. + unreachable!() + }); + } + + /// Attempts to acquire the lock, and returns [`TryLockError`] if the + /// lock is currently held somewhere else. + /// + /// [`TryLockError`]: TryLockError + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// # async fn dox() -> Result<(), tokio::sync::TryLockError> { + /// + /// let mutex = Mutex::new(1); + /// + /// let n = mutex.try_lock()?; + /// assert_eq!(*n, 1); + /// # Ok(()) + /// # } + /// ``` + pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> { + match self.s.try_acquire(1) { + Ok(_) => Ok(MutexGuard { lock: self }), + Err(_) => Err(TryLockError(())), + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `Mutex` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// + /// fn main() { + /// let mut mutex = Mutex::new(1); + /// + /// let n = mutex.get_mut(); + /// *n = 2; + /// } + /// ``` + pub fn get_mut(&mut self) -> &mut T { + unsafe { + // Safety: This is https://github.com/rust-lang/rust/pull/76936 + &mut *self.c.get() + } + } + + /// Attempts to acquire the lock, and returns [`TryLockError`] if the lock + /// is currently held somewhere else. + /// + /// This method is identical to [`Mutex::try_lock`], except that the + /// returned guard references the `Mutex` with an [`Arc`] rather than by + /// borrowing it. Therefore, the `Mutex` must be wrapped in an `Arc` to call + /// this method, and the guard will live for the `'static` lifetime, as it + /// keeps the `Mutex` alive by holding an `Arc`. + /// + /// [`TryLockError`]: TryLockError + /// [`Arc`]: std::sync::Arc + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// use std::sync::Arc; + /// # async fn dox() -> Result<(), tokio::sync::TryLockError> { + /// + /// let mutex = Arc::new(Mutex::new(1)); + /// + /// let n = mutex.clone().try_lock_owned()?; + /// assert_eq!(*n, 1); + /// # Ok(()) + /// # } + pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> { + match self.s.try_acquire(1) { + Ok(_) => Ok(OwnedMutexGuard { lock: self }), + Err(_) => Err(TryLockError(())), + } + } + + /// Consumes the mutex, returning the underlying data. + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// + /// #[tokio::main] + /// async fn main() { + /// let mutex = Mutex::new(1); + /// + /// let n = mutex.into_inner(); + /// assert_eq!(n, 1); + /// } + /// ``` + pub fn into_inner(self) -> T + where + T: Sized, + { + self.c.into_inner() + } +} + +impl<T> From<T> for Mutex<T> { + fn from(s: T) -> Self { + Self::new(s) + } +} + +impl<T> Default for Mutex<T> +where + T: Default, +{ + fn default() -> Self { + Self::new(T::default()) + } +} + +impl<T> std::fmt::Debug for Mutex<T> +where + T: std::fmt::Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("Mutex"); + match self.try_lock() { + Ok(inner) => d.field("data", &*inner), + Err(_) => d.field("data", &format_args!("<locked>")), + }; + d.finish() + } +} + +// === impl MutexGuard === + +impl<'a, T: ?Sized> MutexGuard<'a, T> { + /// Makes a new [`MappedMutexGuard`] for a component of the locked data. + /// + /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MutexGuard::map(...)`. A method + /// would interfere with methods of the same name on the contents of the locked data. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Mutex, MutexGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let foo = Mutex::new(Foo(1)); + /// + /// { + /// let mut mapped = MutexGuard::map(foo.lock().await, |f| &mut f.0); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *foo.lock().await); + /// # } + /// ``` + /// + /// [`MutexGuard`]: struct@MutexGuard + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = &this.lock.s; + mem::forget(this); + MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The + /// original guard is returned if the closure returns `None`. + /// + /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MutexGuard::try_map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Mutex, MutexGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let foo = Mutex::new(Foo(1)); + /// + /// { + /// let mut mapped = MutexGuard::try_map(foo.lock().await, |f| Some(&mut f.0)) + /// .expect("should not fail"); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *foo.lock().await); + /// # } + /// ``` + /// + /// [`MutexGuard`]: struct@MutexGuard + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = &this.lock.s; + mem::forget(this); + Ok(MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + +impl<T: ?Sized> Drop for MutexGuard<'_, T> { + fn drop(&mut self) { + self.lock.s.release(1) + } +} + +impl<T: ?Sized> Deref for MutexGuard<'_, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.lock.c.get() } + } +} + +impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.lock.c.get() } + } +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +// === impl OwnedMutexGuard === + +impl<T: ?Sized> Drop for OwnedMutexGuard<T> { + fn drop(&mut self) { + self.lock.s.release(1) + } +} + +impl<T: ?Sized> Deref for OwnedMutexGuard<T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.lock.c.get() } + } +} + +impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.lock.c.get() } + } +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +// === impl MappedMutexGuard === + +impl<'a, T: ?Sized> MappedMutexGuard<'a, T> { + /// Makes a new [`MappedMutexGuard`] for a component of the locked data. + /// + /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MappedMutexGuard::map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = this.s; + mem::forget(this); + MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The + /// original guard is returned if the closure returns `None`. + /// + /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MappedMutexGuard::try_map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = this.s; + mem::forget(this); + Ok(MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + +impl<'a, T: ?Sized> Drop for MappedMutexGuard<'a, T> { + fn drop(&mut self) { + self.s.release(1) + } +} + +impl<'a, T: ?Sized> Deref for MappedMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.data } + } +} + +impl<'a, T: ?Sized> DerefMut for MappedMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.data } + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for MappedMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} |