summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/semaphore.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/sync/semaphore.rs')
-rw-r--r--third_party/rust/tokio/src/sync/semaphore.rs644
1 files changed, 644 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/semaphore.rs b/third_party/rust/tokio/src/sync/semaphore.rs
new file mode 100644
index 0000000000..860f46f399
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/semaphore.rs
@@ -0,0 +1,644 @@
+use super::batch_semaphore as ll; // low level implementation
+use super::{AcquireError, TryAcquireError};
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+use crate::util::trace;
+use std::sync::Arc;
+
+/// Counting semaphore performing asynchronous permit acquisition.
+///
+/// A semaphore maintains a set of permits. Permits are used to synchronize
+/// access to a shared resource. A semaphore differs from a mutex in that it
+/// can allow more than one concurrent caller to access the shared resource at a
+/// time.
+///
+/// When `acquire` is called and the semaphore has remaining permits, the
+/// function immediately returns a permit. However, if no remaining permits are
+/// available, `acquire` (asynchronously) waits until an outstanding permit is
+/// dropped. At this point, the freed permit is assigned to the caller.
+///
+/// This `Semaphore` is fair, which means that permits are given out in the order
+/// they were requested. This fairness is also applied when `acquire_many` gets
+/// involved, so if a call to `acquire_many` at the front of the queue requests
+/// more permits than currently available, this can prevent a call to `acquire`
+/// from completing, even if the semaphore has enough permits complete the call
+/// to `acquire`.
+///
+/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
+/// utility.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```
+/// use tokio::sync::{Semaphore, TryAcquireError};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let semaphore = Semaphore::new(3);
+///
+/// let a_permit = semaphore.acquire().await.unwrap();
+/// let two_permits = semaphore.acquire_many(2).await.unwrap();
+///
+/// assert_eq!(semaphore.available_permits(), 0);
+///
+/// let permit_attempt = semaphore.try_acquire();
+/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
+/// }
+/// ```
+///
+/// Use [`Semaphore::acquire_owned`] to move permits across tasks:
+///
+/// ```
+/// use std::sync::Arc;
+/// use tokio::sync::Semaphore;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let semaphore = Arc::new(Semaphore::new(3));
+/// let mut join_handles = Vec::new();
+///
+/// for _ in 0..5 {
+/// let permit = semaphore.clone().acquire_owned().await.unwrap();
+/// join_handles.push(tokio::spawn(async move {
+/// // perform task...
+/// // explicitly own `permit` in the task
+/// drop(permit);
+/// }));
+/// }
+///
+/// for handle in join_handles {
+/// handle.await.unwrap();
+/// }
+/// }
+/// ```
+///
+/// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
+/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
+#[derive(Debug)]
+pub struct Semaphore {
+ /// The low level semaphore
+ ll_sem: ll::Semaphore,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
+}
+
+/// A permit from the semaphore.
+///
+/// This type is created by the [`acquire`] method.
+///
+/// [`acquire`]: crate::sync::Semaphore::acquire()
+#[must_use]
+#[derive(Debug)]
+pub struct SemaphorePermit<'a> {
+ sem: &'a Semaphore,
+ permits: u32,
+}
+
+/// An owned permit from the semaphore.
+///
+/// This type is created by the [`acquire_owned`] method.
+///
+/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
+#[must_use]
+#[derive(Debug)]
+pub struct OwnedSemaphorePermit {
+ sem: Arc<Semaphore>,
+ permits: u32,
+}
+
+#[test]
+#[cfg(not(loom))]
+fn bounds() {
+ fn check_unpin<T: Unpin>() {}
+ // This has to take a value, since the async fn's return type is unnameable.
+ fn check_send_sync_val<T: Send + Sync>(_t: T) {}
+ fn check_send_sync<T: Send + Sync>() {}
+ check_unpin::<Semaphore>();
+ check_unpin::<SemaphorePermit<'_>>();
+ check_send_sync::<Semaphore>();
+
+ let semaphore = Semaphore::new(0);
+ check_send_sync_val(semaphore.acquire());
+}
+
+impl Semaphore {
+ /// Creates a new semaphore with the initial number of permits.
+ #[track_caller]
+ pub fn new(permits: usize) -> Self {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = std::panic::Location::caller();
+
+ tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Semaphore",
+ kind = "Sync",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ inherits_child_attrs = true,
+ )
+ };
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let ll_sem = ll::Semaphore::new(permits);
+
+ Self {
+ ll_sem,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ }
+ }
+
+ /// Creates a new semaphore with the initial number of permits.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Semaphore;
+ ///
+ /// static SEM: Semaphore = Semaphore::const_new(10);
+ /// ```
+ ///
+ #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
+ pub const fn const_new(permits: usize) -> Self {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ return Self {
+ ll_sem: ll::Semaphore::const_new(permits),
+ resource_span: tracing::Span::none(),
+ };
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ return Self {
+ ll_sem: ll::Semaphore::const_new(permits),
+ };
+ }
+
+ /// Returns the current number of available permits.
+ pub fn available_permits(&self) -> usize {
+ self.ll_sem.available_permits()
+ }
+
+ /// Adds `n` new permits to the semaphore.
+ ///
+ /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
+ pub fn add_permits(&self, n: usize) {
+ self.ll_sem.release(n);
+ }
+
+ /// Acquires a permit from the semaphore.
+ ///
+ /// If the semaphore has been closed, this returns an [`AcquireError`].
+ /// Otherwise, this returns a [`SemaphorePermit`] representing the
+ /// acquired permit.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute permits in the order they
+ /// were requested. Cancelling a call to `acquire` makes you lose your place
+ /// in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Semaphore::new(2);
+ ///
+ /// let permit_1 = semaphore.acquire().await.unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.acquire().await.unwrap();
+ /// assert_eq!(semaphore.available_permits(), 0);
+ ///
+ /// drop(permit_1);
+ /// assert_eq!(semaphore.available_permits(), 1);
+ /// }
+ /// ```
+ ///
+ /// [`AcquireError`]: crate::sync::AcquireError
+ /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
+ pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = trace::async_op(
+ || self.ll_sem.acquire(1),
+ self.resource_span.clone(),
+ "Semaphore::acquire",
+ "poll",
+ true,
+ );
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = self.ll_sem.acquire(1);
+
+ inner.await?;
+ Ok(SemaphorePermit {
+ sem: self,
+ permits: 1,
+ })
+ }
+
+ /// Acquires `n` permits from the semaphore.
+ ///
+ /// If the semaphore has been closed, this returns an [`AcquireError`].
+ /// Otherwise, this returns a [`SemaphorePermit`] representing the
+ /// acquired permits.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute permits in the order they
+ /// were requested. Cancelling a call to `acquire_many` makes you lose your
+ /// place in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Semaphore::new(5);
+ ///
+ /// let permit = semaphore.acquire_many(3).await.unwrap();
+ /// assert_eq!(semaphore.available_permits(), 2);
+ /// }
+ /// ```
+ ///
+ /// [`AcquireError`]: crate::sync::AcquireError
+ /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
+ pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ trace::async_op(
+ || self.ll_sem.acquire(n),
+ self.resource_span.clone(),
+ "Semaphore::acquire_many",
+ "poll",
+ true,
+ )
+ .await?;
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ self.ll_sem.acquire(n).await?;
+
+ Ok(SemaphorePermit {
+ sem: self,
+ permits: n,
+ })
+ }
+
+ /// Tries to acquire a permit from the semaphore.
+ ///
+ /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
+ /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
+ /// this returns a [`SemaphorePermit`] representing the acquired permits.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Semaphore::new(2);
+ ///
+ /// let permit_1 = semaphore.try_acquire().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.try_acquire().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 0);
+ ///
+ /// let permit_3 = semaphore.try_acquire();
+ /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
+ ///
+ /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
+ /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
+ /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
+ pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
+ match self.ll_sem.try_acquire(1) {
+ Ok(_) => Ok(SemaphorePermit {
+ sem: self,
+ permits: 1,
+ }),
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Tries to acquire `n` permits from the semaphore.
+ ///
+ /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
+ /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
+ /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Semaphore::new(4);
+ ///
+ /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.try_acquire_many(2);
+ /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
+ ///
+ /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
+ /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
+ /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
+ pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
+ match self.ll_sem.try_acquire(n) {
+ Ok(_) => Ok(SemaphorePermit {
+ sem: self,
+ permits: n,
+ }),
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Acquires a permit from the semaphore.
+ ///
+ /// The semaphore must be wrapped in an [`Arc`] to call this method.
+ /// If the semaphore has been closed, this returns an [`AcquireError`].
+ /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
+ /// acquired permit.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute permits in the order they
+ /// were requested. Cancelling a call to `acquire_owned` makes you lose your
+ /// place in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(3));
+ /// let mut join_handles = Vec::new();
+ ///
+ /// for _ in 0..5 {
+ /// let permit = semaphore.clone().acquire_owned().await.unwrap();
+ /// join_handles.push(tokio::spawn(async move {
+ /// // perform task...
+ /// // explicitly own `permit` in the task
+ /// drop(permit);
+ /// }));
+ /// }
+ ///
+ /// for handle in join_handles {
+ /// handle.await.unwrap();
+ /// }
+ /// }
+ /// ```
+ ///
+ /// [`Arc`]: std::sync::Arc
+ /// [`AcquireError`]: crate::sync::AcquireError
+ /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
+ pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = trace::async_op(
+ || self.ll_sem.acquire(1),
+ self.resource_span.clone(),
+ "Semaphore::acquire_owned",
+ "poll",
+ true,
+ );
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = self.ll_sem.acquire(1);
+
+ inner.await?;
+ Ok(OwnedSemaphorePermit {
+ sem: self,
+ permits: 1,
+ })
+ }
+
+ /// Acquires `n` permits from the semaphore.
+ ///
+ /// The semaphore must be wrapped in an [`Arc`] to call this method.
+ /// If the semaphore has been closed, this returns an [`AcquireError`].
+ /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
+ /// acquired permit.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method uses a queue to fairly distribute permits in the order they
+ /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
+ /// your place in the queue.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(10));
+ /// let mut join_handles = Vec::new();
+ ///
+ /// for _ in 0..5 {
+ /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
+ /// join_handles.push(tokio::spawn(async move {
+ /// // perform task...
+ /// // explicitly own `permit` in the task
+ /// drop(permit);
+ /// }));
+ /// }
+ ///
+ /// for handle in join_handles {
+ /// handle.await.unwrap();
+ /// }
+ /// }
+ /// ```
+ ///
+ /// [`Arc`]: std::sync::Arc
+ /// [`AcquireError`]: crate::sync::AcquireError
+ /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
+ pub async fn acquire_many_owned(
+ self: Arc<Self>,
+ n: u32,
+ ) -> Result<OwnedSemaphorePermit, AcquireError> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = trace::async_op(
+ || self.ll_sem.acquire(n),
+ self.resource_span.clone(),
+ "Semaphore::acquire_many_owned",
+ "poll",
+ true,
+ );
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = self.ll_sem.acquire(n);
+
+ inner.await?;
+ Ok(OwnedSemaphorePermit {
+ sem: self,
+ permits: n,
+ })
+ }
+
+ /// Tries to acquire a permit from the semaphore.
+ ///
+ /// The semaphore must be wrapped in an [`Arc`] to call this method. If
+ /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
+ /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
+ /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
+ /// acquired permit.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(2));
+ ///
+ /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 0);
+ ///
+ /// let permit_3 = semaphore.try_acquire_owned();
+ /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
+ ///
+ /// [`Arc`]: std::sync::Arc
+ /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
+ /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
+ /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
+ pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
+ match self.ll_sem.try_acquire(1) {
+ Ok(_) => Ok(OwnedSemaphorePermit {
+ sem: self,
+ permits: 1,
+ }),
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Tries to acquire `n` permits from the semaphore.
+ ///
+ /// The semaphore must be wrapped in an [`Arc`] to call this method. If
+ /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
+ /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
+ /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
+ /// acquired permit.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(4));
+ ///
+ /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.try_acquire_many_owned(2);
+ /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
+ ///
+ /// [`Arc`]: std::sync::Arc
+ /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
+ /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
+ /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
+ pub fn try_acquire_many_owned(
+ self: Arc<Self>,
+ n: u32,
+ ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
+ match self.ll_sem.try_acquire(n) {
+ Ok(_) => Ok(OwnedSemaphorePermit {
+ sem: self,
+ permits: n,
+ }),
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Closes the semaphore.
+ ///
+ /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Semaphore;
+ /// use std::sync::Arc;
+ /// use tokio::sync::TryAcquireError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(1));
+ /// let semaphore2 = semaphore.clone();
+ ///
+ /// tokio::spawn(async move {
+ /// let permit = semaphore.acquire_many(2).await;
+ /// assert!(permit.is_err());
+ /// println!("waiter received error");
+ /// });
+ ///
+ /// println!("closing semaphore");
+ /// semaphore2.close();
+ ///
+ /// // Cannot obtain more permits
+ /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
+ /// }
+ /// ```
+ pub fn close(&self) {
+ self.ll_sem.close();
+ }
+
+ /// Returns true if the semaphore is closed
+ pub fn is_closed(&self) -> bool {
+ self.ll_sem.is_closed()
+ }
+}
+
+impl<'a> SemaphorePermit<'a> {
+ /// Forgets the permit **without** releasing it back to the semaphore.
+ /// This can be used to reduce the amount of permits available from a
+ /// semaphore.
+ pub fn forget(mut self) {
+ self.permits = 0;
+ }
+}
+
+impl OwnedSemaphorePermit {
+ /// Forgets the permit **without** releasing it back to the semaphore.
+ /// This can be used to reduce the amount of permits available from a
+ /// semaphore.
+ pub fn forget(mut self) {
+ self.permits = 0;
+ }
+}
+
+impl<'a> Drop for SemaphorePermit<'_> {
+ fn drop(&mut self) {
+ self.sem.add_permits(self.permits as usize);
+ }
+}
+
+impl Drop for OwnedSemaphorePermit {
+ fn drop(&mut self) {
+ self.sem.add_permits(self.permits as usize);
+ }
+}