summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/semaphore.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/sync/semaphore.rs')
-rw-r--r--third_party/rust/tokio/src/sync/semaphore.rs105
1 files changed, 105 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/semaphore.rs b/third_party/rust/tokio/src/sync/semaphore.rs
new file mode 100644
index 0000000000..4cce7e8f5b
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/semaphore.rs
@@ -0,0 +1,105 @@
+use super::batch_semaphore as ll; // low level implementation
+use crate::coop::CoopFutureExt;
+
+/// Counting semaphore performing asynchronous permit aquisition.
+///
+/// A semaphore maintains a set of permits. Permits are used to synchronize
+/// access to a shared resource. A semaphore differs from a mutex in that it
+/// can allow more than one concurrent caller to access the shared resource at a
+/// time.
+///
+/// When `acquire` is called and the semaphore has remaining permits, the
+/// function immediately returns a permit. However, if no remaining permits are
+/// available, `acquire` (asynchronously) waits until an outstanding permit is
+/// dropped. At this point, the freed permit is assigned to the caller.
+#[derive(Debug)]
+pub struct Semaphore {
+ /// The low level semaphore
+ ll_sem: ll::Semaphore,
+}
+
+/// A permit from the semaphore
+#[must_use]
+#[derive(Debug)]
+pub struct SemaphorePermit<'a> {
+ sem: &'a Semaphore,
+ permits: u16,
+}
+
+/// Error returned from the [`Semaphore::try_acquire`] function.
+///
+/// A `try_acquire` operation can only fail if the semaphore has no available
+/// permits.
+///
+/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
+#[derive(Debug)]
+pub struct TryAcquireError(());
+
+#[test]
+#[cfg(not(loom))]
+fn bounds() {
+ fn check_unpin<T: Unpin>() {}
+ // This has to take a value, since the async fn's return type is unnameable.
+ fn check_send_sync_val<T: Send + Sync>(_t: T) {}
+ fn check_send_sync<T: Send + Sync>() {}
+ check_unpin::<Semaphore>();
+ check_unpin::<SemaphorePermit<'_>>();
+ check_send_sync::<Semaphore>();
+
+ let semaphore = Semaphore::new(0);
+ check_send_sync_val(semaphore.acquire());
+}
+
+impl Semaphore {
+ /// Creates a new semaphore with the initial number of permits
+ pub fn new(permits: usize) -> Self {
+ Self {
+ ll_sem: ll::Semaphore::new(permits),
+ }
+ }
+
+ /// Returns the current number of available permits
+ pub fn available_permits(&self) -> usize {
+ self.ll_sem.available_permits()
+ }
+
+ /// Adds `n` new permits to the semaphore.
+ pub fn add_permits(&self, n: usize) {
+ self.ll_sem.release(n);
+ }
+
+ /// Acquires permit from the semaphore
+ pub async fn acquire(&self) -> SemaphorePermit<'_> {
+ self.ll_sem.acquire(1).cooperate().await.unwrap();
+ SemaphorePermit {
+ sem: &self,
+ permits: 1,
+ }
+ }
+
+ /// Tries to acquire a permit form the semaphore
+ pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
+ match self.ll_sem.try_acquire(1) {
+ Ok(_) => Ok(SemaphorePermit {
+ sem: self,
+ permits: 1,
+ }),
+ Err(_) => Err(TryAcquireError(())),
+ }
+ }
+}
+
+impl<'a> SemaphorePermit<'a> {
+ /// Forgets the permit **without** releasing it back to the semaphore.
+ /// This can be used to reduce the amount of permits available from a
+ /// semaphore.
+ pub fn forget(mut self) {
+ self.permits = 0;
+ }
+}
+
+impl<'a> Drop for SemaphorePermit<'_> {
+ fn drop(&mut self) {
+ self.sem.add_permits(self.permits as usize);
+ }
+}