summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/rwlock.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/sync/rwlock.rs')
-rw-r--r--third_party/rust/tokio/src/sync/rwlock.rs1078
1 files changed, 1078 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/rwlock.rs b/third_party/rust/tokio/src/sync/rwlock.rs
new file mode 100644
index 0000000000..b856cfc856
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/rwlock.rs
@@ -0,0 +1,1078 @@
+use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
+use crate::sync::mutex::TryLockError;
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+use crate::util::trace;
+use std::cell::UnsafeCell;
+use std::marker;
+use std::marker::PhantomData;
+use std::mem::ManuallyDrop;
+use std::sync::Arc;
+
+pub(crate) mod owned_read_guard;
+pub(crate) mod owned_write_guard;
+pub(crate) mod owned_write_guard_mapped;
+pub(crate) mod read_guard;
+pub(crate) mod write_guard;
+pub(crate) mod write_guard_mapped;
+pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
+pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
+pub(crate) use owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
+pub(crate) use read_guard::RwLockReadGuard;
+pub(crate) use write_guard::RwLockWriteGuard;
+pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
+
+#[cfg(not(loom))]
+const MAX_READS: u32 = std::u32::MAX >> 3;
+
+#[cfg(loom)]
+const MAX_READS: u32 = 10;
+
+/// An asynchronous reader-writer lock.
+///
+/// This type of lock allows a number of readers or at most one writer at any
+/// point in time. The write portion of this lock typically allows modification
+/// of the underlying data (exclusive access) and the read portion of this lock
+/// typically allows for read-only access (shared access).
+///
+/// In comparison, a [`Mutex`] does not distinguish between readers or writers
+/// that acquire the lock, therefore causing any tasks waiting for the lock to
+/// become available to yield. An `RwLock` will allow any number of readers to
+/// acquire the lock as long as a writer is not holding the lock.
+///
+/// The priority policy of Tokio's read-write lock is _fair_ (or
+/// [_write-preferring_]), in order to ensure that readers cannot starve
+/// writers. Fairness is ensured using a first-in, first-out queue for the tasks
+/// awaiting the lock; if a task that wishes to acquire the write lock is at the
+/// head of the queue, read locks will not be given out until the write lock has
+/// been released. This is in contrast to the Rust standard library's
+/// `std::sync::RwLock`, where the priority policy is dependent on the
+/// operating system's implementation.
+///
+/// The type parameter `T` represents the data that this lock protects. It is
+/// required that `T` satisfies [`Send`] to be shared across threads. The RAII guards
+/// returned from the locking methods implement [`Deref`](trait@std::ops::Deref)
+/// (and [`DerefMut`](trait@std::ops::DerefMut)
+/// for the `write` methods) to allow access to the content of the lock.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::RwLock;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let lock = RwLock::new(5);
+///
+/// // many reader locks can be held at once
+/// {
+/// let r1 = lock.read().await;
+/// let r2 = lock.read().await;
+/// assert_eq!(*r1, 5);
+/// assert_eq!(*r2, 5);
+/// } // read locks are dropped at this point
+///
+/// // only one write lock may be held, however
+/// {
+/// let mut w = lock.write().await;
+/// *w += 1;
+/// assert_eq!(*w, 6);
+/// } // write lock is dropped here
+/// }
+/// ```
+///
+/// [`Mutex`]: struct@super::Mutex
+/// [`RwLock`]: struct@RwLock
+/// [`RwLockReadGuard`]: struct@RwLockReadGuard
+/// [`RwLockWriteGuard`]: struct@RwLockWriteGuard
+/// [`Send`]: trait@std::marker::Send
+/// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
+#[derive(Debug)]
+pub struct RwLock<T: ?Sized> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
+
+ // maximum number of concurrent readers
+ mr: u32,
+
+ //semaphore to coordinate read and write access to T
+ s: Semaphore,
+
+ //inner data T
+ c: UnsafeCell<T>,
+}
+
+#[test]
+#[cfg(not(loom))]
+fn bounds() {
+ fn check_send<T: Send>() {}
+ fn check_sync<T: Sync>() {}
+ fn check_unpin<T: Unpin>() {}
+ // This has to take a value, since the async fn's return type is unnameable.
+ fn check_send_sync_val<T: Send + Sync>(_t: T) {}
+
+ check_send::<RwLock<u32>>();
+ check_sync::<RwLock<u32>>();
+ check_unpin::<RwLock<u32>>();
+
+ check_send::<RwLockReadGuard<'_, u32>>();
+ check_sync::<RwLockReadGuard<'_, u32>>();
+ check_unpin::<RwLockReadGuard<'_, u32>>();
+
+ check_send::<OwnedRwLockReadGuard<u32, i32>>();
+ check_sync::<OwnedRwLockReadGuard<u32, i32>>();
+ check_unpin::<OwnedRwLockReadGuard<u32, i32>>();
+
+ check_send::<RwLockWriteGuard<'_, u32>>();
+ check_sync::<RwLockWriteGuard<'_, u32>>();
+ check_unpin::<RwLockWriteGuard<'_, u32>>();
+
+ check_send::<RwLockMappedWriteGuard<'_, u32>>();
+ check_sync::<RwLockMappedWriteGuard<'_, u32>>();
+ check_unpin::<RwLockMappedWriteGuard<'_, u32>>();
+
+ check_send::<OwnedRwLockWriteGuard<u32>>();
+ check_sync::<OwnedRwLockWriteGuard<u32>>();
+ check_unpin::<OwnedRwLockWriteGuard<u32>>();
+
+ check_send::<OwnedRwLockMappedWriteGuard<u32, i32>>();
+ check_sync::<OwnedRwLockMappedWriteGuard<u32, i32>>();
+ check_unpin::<OwnedRwLockMappedWriteGuard<u32, i32>>();
+
+ let rwlock = Arc::new(RwLock::new(0));
+ check_send_sync_val(rwlock.read());
+ check_send_sync_val(Arc::clone(&rwlock).read_owned());
+ check_send_sync_val(rwlock.write());
+ check_send_sync_val(Arc::clone(&rwlock).write_owned());
+}
+
+// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
+// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
+// RwLock<T>.
+unsafe impl<T> Send for RwLock<T> where T: ?Sized + Send {}
+unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
+// NB: These impls need to be explicit since we're storing a raw pointer.
+// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
+// `T` is `Send`.
+unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
+unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
+// T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
+// the RwLock, unlike RwLockReadGuard.
+unsafe impl<T, U> Send for OwnedRwLockReadGuard<T, U>
+where
+ T: ?Sized + Send + Sync,
+ U: ?Sized + Sync,
+{
+}
+unsafe impl<T, U> Sync for OwnedRwLockReadGuard<T, U>
+where
+ T: ?Sized + Send + Sync,
+ U: ?Sized + Send + Sync,
+{
+}
+unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
+unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
+unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
+unsafe impl<T, U> Sync for OwnedRwLockMappedWriteGuard<T, U>
+where
+ T: ?Sized + Send + Sync,
+ U: ?Sized + Send + Sync,
+{
+}
+// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
+// `T` is `Send` - but since this is also provides mutable access, we need to
+// make sure that `T` is `Send` since its value can be sent across thread
+// boundaries.
+unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
+unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
+unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
+unsafe impl<T, U> Send for OwnedRwLockMappedWriteGuard<T, U>
+where
+ T: ?Sized + Send + Sync,
+ U: ?Sized + Send + Sync,
+{
+}
+
+impl<T: ?Sized> RwLock<T> {
+ /// Creates a new instance of an `RwLock<T>` which is unlocked.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// let lock = RwLock::new(5);
+ /// ```
+ #[track_caller]
+ pub fn new(value: T) -> RwLock<T>
+ where
+ T: Sized,
+ {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = std::panic::Location::caller();
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "RwLock",
+ kind = "Sync",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ max_readers = MAX_READS,
+ );
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ write_locked = false,
+ );
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ current_readers = 0,
+ );
+ });
+
+ resource_span
+ };
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let s = resource_span.in_scope(|| Semaphore::new(MAX_READS as usize));
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let s = Semaphore::new(MAX_READS as usize);
+
+ RwLock {
+ mr: MAX_READS,
+ c: UnsafeCell::new(value),
+ s,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ }
+ }
+
+ /// Creates a new instance of an `RwLock<T>` which is unlocked
+ /// and allows a maximum of `max_reads` concurrent readers.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// let lock = RwLock::with_max_readers(5, 1024);
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// Panics if `max_reads` is more than `u32::MAX >> 3`.
+ #[track_caller]
+ pub fn with_max_readers(value: T, max_reads: u32) -> RwLock<T>
+ where
+ T: Sized,
+ {
+ assert!(
+ max_reads <= MAX_READS,
+ "a RwLock may not be created with more than {} readers",
+ MAX_READS
+ );
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = std::panic::Location::caller();
+
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "RwLock",
+ kind = "Sync",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ max_readers = max_reads,
+ );
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ write_locked = false,
+ );
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ current_readers = 0,
+ );
+ });
+
+ resource_span
+ };
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let s = resource_span.in_scope(|| Semaphore::new(max_reads as usize));
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let s = Semaphore::new(max_reads as usize);
+
+ RwLock {
+ mr: max_reads,
+ c: UnsafeCell::new(value),
+ s,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ }
+ }
+
+ /// Creates a new instance of an `RwLock<T>` which is unlocked.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// static LOCK: RwLock<i32> = RwLock::const_new(5);
+ /// ```
+ #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
+ pub const fn const_new(value: T) -> RwLock<T>
+ where
+ T: Sized,
+ {
+ RwLock {
+ mr: MAX_READS,
+ c: UnsafeCell::new(value),
+ s: Semaphore::const_new(MAX_READS as usize),
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span::none(),
+ }
+ }
+
+ /// Creates a new instance of an `RwLock<T>` which is unlocked
+ /// and allows a maximum of `max_reads` concurrent readers.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// static LOCK: RwLock<i32> = RwLock::const_with_max_readers(5, 1024);
+ /// ```
+ #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
+ pub const fn const_with_max_readers(value: T, mut max_reads: u32) -> RwLock<T>
+ where
+ T: Sized,
+ {
+ max_reads &= MAX_READS;
+ RwLock {
+ mr: max_reads,
+ c: UnsafeCell::new(value),
+ s: Semaphore::const_new(max_reads as usize),
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span::none(),
+ }
+ }
+
+ /// Locks this `RwLock` with shared read access, causing the current task
+ /// to yield until the lock has been acquired.
+ ///
+ /// The calling task will yield until there are no writers which hold the
+ /// lock. There may be other readers inside the lock when the task resumes.
+ ///
+ /// Note that under the priority policy of [`RwLock`], read locks are not
+ /// granted until prior write locks, to prevent starvation. Therefore
+ /// deadlock may occur if a read lock is held by the current task, a write
+ /// lock attempt is made, and then a subsequent read lock attempt is made
+ /// by the current task.
+ ///
+ /// Returns an RAII guard which will drop this read access of the `RwLock`
+ /// when dropped.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute locks in the order they
+ /// were requested. Cancelling a call to `read` makes you lose your place in
+ /// the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = Arc::new(RwLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let n = lock.read().await;
+ /// assert_eq!(*n, 1);
+ ///
+ /// tokio::spawn(async move {
+ /// // While main has an active read lock, we acquire one too.
+ /// let r = c_lock.read().await;
+ /// assert_eq!(*r, 1);
+ /// }).await.expect("The spawned task has panicked");
+ ///
+ /// // Drop the guard after the spawned task finishes.
+ /// drop(n);
+ /// }
+ /// ```
+ pub async fn read(&self) -> RwLockReadGuard<'_, T> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = trace::async_op(
+ || self.s.acquire(1),
+ self.resource_span.clone(),
+ "RwLock::read",
+ "poll",
+ false,
+ );
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = self.s.acquire(1);
+
+ inner.await.unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ current_readers = 1,
+ current_readers.op = "add",
+ )
+ });
+
+ RwLockReadGuard {
+ s: &self.s,
+ data: self.c.get(),
+ marker: marker::PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: self.resource_span.clone(),
+ }
+ }
+
+ /// Blockingly locks this `RwLock` with shared read access.
+ ///
+ /// This method is intended for use cases where you
+ /// need to use this rwlock in asynchronous code as well as in synchronous code.
+ ///
+ /// Returns an RAII guard which will drop the read access of this `RwLock` when dropped.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution context.
+ ///
+ /// - If you find yourself in an asynchronous execution context and needing
+ /// to call some (synchronous) function which performs one of these
+ /// `blocking_` operations, then consider wrapping that call inside
+ /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
+ /// (or [`block_in_place()`][crate::task::block_in_place]).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let rwlock = Arc::new(RwLock::new(1));
+ /// let mut write_lock = rwlock.write().await;
+ ///
+ /// let blocking_task = tokio::task::spawn_blocking({
+ /// let rwlock = Arc::clone(&rwlock);
+ /// move || {
+ /// // This shall block until the `write_lock` is released.
+ /// let read_lock = rwlock.blocking_read();
+ /// assert_eq!(*read_lock, 0);
+ /// }
+ /// });
+ ///
+ /// *write_lock -= 1;
+ /// drop(write_lock); // release the lock.
+ ///
+ /// // Await the completion of the blocking task.
+ /// blocking_task.await.unwrap();
+ ///
+ /// // Assert uncontended.
+ /// assert!(rwlock.try_write().is_ok());
+ /// }
+ /// ```
+ #[cfg(feature = "sync")]
+ pub fn blocking_read(&self) -> RwLockReadGuard<'_, T> {
+ crate::future::block_on(self.read())
+ }
+
+ /// Locks this `RwLock` with shared read access, causing the current task
+ /// to yield until the lock has been acquired.
+ ///
+ /// The calling task will yield until there are no writers which hold the
+ /// lock. There may be other readers inside the lock when the task resumes.
+ ///
+ /// This method is identical to [`RwLock::read`], except that the returned
+ /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
+ /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
+ /// method, and the guard will live for the `'static` lifetime, as it keeps
+ /// the `RwLock` alive by holding an `Arc`.
+ ///
+ /// Note that under the priority policy of [`RwLock`], read locks are not
+ /// granted until prior write locks, to prevent starvation. Therefore
+ /// deadlock may occur if a read lock is held by the current task, a write
+ /// lock attempt is made, and then a subsequent read lock attempt is made
+ /// by the current task.
+ ///
+ /// Returns an RAII guard which will drop this read access of the `RwLock`
+ /// when dropped.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute locks in the order they
+ /// were requested. Cancelling a call to `read_owned` makes you lose your
+ /// place in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = Arc::new(RwLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let n = lock.read_owned().await;
+ /// assert_eq!(*n, 1);
+ ///
+ /// tokio::spawn(async move {
+ /// // While main has an active read lock, we acquire one too.
+ /// let r = c_lock.read_owned().await;
+ /// assert_eq!(*r, 1);
+ /// }).await.expect("The spawned task has panicked");
+ ///
+ /// // Drop the guard after the spawned task finishes.
+ /// drop(n);
+ ///}
+ /// ```
+ pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = trace::async_op(
+ || self.s.acquire(1),
+ self.resource_span.clone(),
+ "RwLock::read_owned",
+ "poll",
+ false,
+ );
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = self.s.acquire(1);
+
+ inner.await.unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ current_readers = 1,
+ current_readers.op = "add",
+ )
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+
+ OwnedRwLockReadGuard {
+ data: self.c.get(),
+ lock: ManuallyDrop::new(self),
+ _p: PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ }
+ }
+
+ /// Attempts to acquire this `RwLock` with shared read access.
+ ///
+ /// If the access couldn't be acquired immediately, returns [`TryLockError`].
+ /// Otherwise, an RAII guard is returned which will release read access
+ /// when dropped.
+ ///
+ /// [`TryLockError`]: TryLockError
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = Arc::new(RwLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let v = lock.try_read().unwrap();
+ /// assert_eq!(*v, 1);
+ ///
+ /// tokio::spawn(async move {
+ /// // While main has an active read lock, we acquire one too.
+ /// let n = c_lock.read().await;
+ /// assert_eq!(*n, 1);
+ /// }).await.expect("The spawned task has panicked");
+ ///
+ /// // Drop the guard when spawned task finishes.
+ /// drop(v);
+ /// }
+ /// ```
+ pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError> {
+ match self.s.try_acquire(1) {
+ Ok(permit) => permit,
+ Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
+ Err(TryAcquireError::Closed) => unreachable!(),
+ }
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ current_readers = 1,
+ current_readers.op = "add",
+ )
+ });
+
+ Ok(RwLockReadGuard {
+ s: &self.s,
+ data: self.c.get(),
+ marker: marker::PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: self.resource_span.clone(),
+ })
+ }
+
+ /// Attempts to acquire this `RwLock` with shared read access.
+ ///
+ /// If the access couldn't be acquired immediately, returns [`TryLockError`].
+ /// Otherwise, an RAII guard is returned which will release read access
+ /// when dropped.
+ ///
+ /// This method is identical to [`RwLock::try_read`], except that the
+ /// returned guard references the `RwLock` with an [`Arc`] rather than by
+ /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
+ /// call this method, and the guard will live for the `'static` lifetime,
+ /// as it keeps the `RwLock` alive by holding an `Arc`.
+ ///
+ /// [`TryLockError`]: TryLockError
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = Arc::new(RwLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let v = lock.try_read_owned().unwrap();
+ /// assert_eq!(*v, 1);
+ ///
+ /// tokio::spawn(async move {
+ /// // While main has an active read lock, we acquire one too.
+ /// let n = c_lock.read_owned().await;
+ /// assert_eq!(*n, 1);
+ /// }).await.expect("The spawned task has panicked");
+ ///
+ /// // Drop the guard when spawned task finishes.
+ /// drop(v);
+ /// }
+ /// ```
+ pub fn try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError> {
+ match self.s.try_acquire(1) {
+ Ok(permit) => permit,
+ Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
+ Err(TryAcquireError::Closed) => unreachable!(),
+ }
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ current_readers = 1,
+ current_readers.op = "add",
+ )
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+
+ Ok(OwnedRwLockReadGuard {
+ data: self.c.get(),
+ lock: ManuallyDrop::new(self),
+ _p: PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ })
+ }
+
+ /// Locks this `RwLock` with exclusive write access, causing the current
+ /// task to yield until the lock has been acquired.
+ ///
+ /// The calling task will yield while other writers or readers currently
+ /// have access to the lock.
+ ///
+ /// Returns an RAII guard which will drop the write access of this `RwLock`
+ /// when dropped.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute locks in the order they
+ /// were requested. Cancelling a call to `write` makes you lose your place
+ /// in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = RwLock::new(1);
+ ///
+ /// let mut n = lock.write().await;
+ /// *n = 2;
+ ///}
+ /// ```
+ pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = trace::async_op(
+ || self.s.acquire(self.mr),
+ self.resource_span.clone(),
+ "RwLock::write",
+ "poll",
+ false,
+ );
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = self.s.acquire(self.mr);
+
+ inner.await.unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ write_locked = true,
+ write_locked.op = "override",
+ )
+ });
+
+ RwLockWriteGuard {
+ permits_acquired: self.mr,
+ s: &self.s,
+ data: self.c.get(),
+ marker: marker::PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: self.resource_span.clone(),
+ }
+ }
+
+ /// Blockingly locks this `RwLock` with exclusive write access.
+ ///
+ /// This method is intended for use cases where you
+ /// need to use this rwlock in asynchronous code as well as in synchronous code.
+ ///
+ /// Returns an RAII guard which will drop the write access of this `RwLock` when dropped.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution context.
+ ///
+ /// - If you find yourself in an asynchronous execution context and needing
+ /// to call some (synchronous) function which performs one of these
+ /// `blocking_` operations, then consider wrapping that call inside
+ /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
+ /// (or [`block_in_place()`][crate::task::block_in_place]).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::{sync::RwLock};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let rwlock = Arc::new(RwLock::new(1));
+ /// let read_lock = rwlock.read().await;
+ ///
+ /// let blocking_task = tokio::task::spawn_blocking({
+ /// let rwlock = Arc::clone(&rwlock);
+ /// move || {
+ /// // This shall block until the `read_lock` is released.
+ /// let mut write_lock = rwlock.blocking_write();
+ /// *write_lock = 2;
+ /// }
+ /// });
+ ///
+ /// assert_eq!(*read_lock, 1);
+ /// // Release the last outstanding read lock.
+ /// drop(read_lock);
+ ///
+ /// // Await the completion of the blocking task.
+ /// blocking_task.await.unwrap();
+ ///
+ /// // Assert uncontended.
+ /// let read_lock = rwlock.try_read().unwrap();
+ /// assert_eq!(*read_lock, 2);
+ /// }
+ /// ```
+ #[cfg(feature = "sync")]
+ pub fn blocking_write(&self) -> RwLockWriteGuard<'_, T> {
+ crate::future::block_on(self.write())
+ }
+
+ /// Locks this `RwLock` with exclusive write access, causing the current
+ /// task to yield until the lock has been acquired.
+ ///
+ /// The calling task will yield while other writers or readers currently
+ /// have access to the lock.
+ ///
+ /// This method is identical to [`RwLock::write`], except that the returned
+ /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
+ /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
+ /// method, and the guard will live for the `'static` lifetime, as it keeps
+ /// the `RwLock` alive by holding an `Arc`.
+ ///
+ /// Returns an RAII guard which will drop the write access of this `RwLock`
+ /// when dropped.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute locks in the order they
+ /// were requested. Cancelling a call to `write_owned` makes you lose your
+ /// place in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = Arc::new(RwLock::new(1));
+ ///
+ /// let mut n = lock.write_owned().await;
+ /// *n = 2;
+ ///}
+ /// ```
+ pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = trace::async_op(
+ || self.s.acquire(self.mr),
+ self.resource_span.clone(),
+ "RwLock::write_owned",
+ "poll",
+ false,
+ );
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = self.s.acquire(self.mr);
+
+ inner.await.unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ write_locked = true,
+ write_locked.op = "override",
+ )
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+
+ OwnedRwLockWriteGuard {
+ permits_acquired: self.mr,
+ data: self.c.get(),
+ lock: ManuallyDrop::new(self),
+ _p: PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ }
+ }
+
+ /// Attempts to acquire this `RwLock` with exclusive write access.
+ ///
+ /// If the access couldn't be acquired immediately, returns [`TryLockError`].
+ /// Otherwise, an RAII guard is returned which will release write access
+ /// when dropped.
+ ///
+ /// [`TryLockError`]: TryLockError
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let rw = RwLock::new(1);
+ ///
+ /// let v = rw.read().await;
+ /// assert_eq!(*v, 1);
+ ///
+ /// assert!(rw.try_write().is_err());
+ /// }
+ /// ```
+ pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
+ match self.s.try_acquire(self.mr) {
+ Ok(permit) => permit,
+ Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
+ Err(TryAcquireError::Closed) => unreachable!(),
+ }
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ write_locked = true,
+ write_locked.op = "override",
+ )
+ });
+
+ Ok(RwLockWriteGuard {
+ permits_acquired: self.mr,
+ s: &self.s,
+ data: self.c.get(),
+ marker: marker::PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: self.resource_span.clone(),
+ })
+ }
+
+ /// Attempts to acquire this `RwLock` with exclusive write access.
+ ///
+ /// If the access couldn't be acquired immediately, returns [`TryLockError`].
+ /// Otherwise, an RAII guard is returned which will release write access
+ /// when dropped.
+ ///
+ /// This method is identical to [`RwLock::try_write`], except that the
+ /// returned guard references the `RwLock` with an [`Arc`] rather than by
+ /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
+ /// call this method, and the guard will live for the `'static` lifetime,
+ /// as it keeps the `RwLock` alive by holding an `Arc`.
+ ///
+ /// [`TryLockError`]: TryLockError
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let rw = Arc::new(RwLock::new(1));
+ ///
+ /// let v = Arc::clone(&rw).read_owned().await;
+ /// assert_eq!(*v, 1);
+ ///
+ /// assert!(rw.try_write_owned().is_err());
+ /// }
+ /// ```
+ pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
+ match self.s.try_acquire(self.mr) {
+ Ok(permit) => permit,
+ Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
+ Err(TryAcquireError::Closed) => unreachable!(),
+ }
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ write_locked = true,
+ write_locked.op = "override",
+ )
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+
+ Ok(OwnedRwLockWriteGuard {
+ permits_acquired: self.mr,
+ data: self.c.get(),
+ lock: ManuallyDrop::new(self),
+ _p: PhantomData,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ })
+ }
+
+ /// Returns a mutable reference to the underlying data.
+ ///
+ /// Since this call borrows the `RwLock` mutably, no actual locking needs to
+ /// take place -- the mutable borrow statically guarantees no locks exist.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// fn main() {
+ /// let mut lock = RwLock::new(1);
+ ///
+ /// let n = lock.get_mut();
+ /// *n = 2;
+ /// }
+ /// ```
+ pub fn get_mut(&mut self) -> &mut T {
+ unsafe {
+ // Safety: This is https://github.com/rust-lang/rust/pull/76936
+ &mut *self.c.get()
+ }
+ }
+
+ /// Consumes the lock, returning the underlying data.
+ pub fn into_inner(self) -> T
+ where
+ T: Sized,
+ {
+ self.c.into_inner()
+ }
+}
+
+impl<T> From<T> for RwLock<T> {
+ fn from(s: T) -> Self {
+ Self::new(s)
+ }
+}
+
+impl<T: ?Sized> Default for RwLock<T>
+where
+ T: Default,
+{
+ fn default() -> Self {
+ Self::new(T::default())
+ }
+}