summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-util/src/sync/cancellation_token.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-util/src/sync/cancellation_token.rs')
-rw-r--r--third_party/rust/tokio-util/src/sync/cancellation_token.rs224
1 files changed, 224 insertions, 0 deletions
diff --git a/third_party/rust/tokio-util/src/sync/cancellation_token.rs b/third_party/rust/tokio-util/src/sync/cancellation_token.rs
new file mode 100644
index 0000000000..2a6ef392bd
--- /dev/null
+++ b/third_party/rust/tokio-util/src/sync/cancellation_token.rs
@@ -0,0 +1,224 @@
+//! An asynchronously awaitable `CancellationToken`.
+//! The token allows to signal a cancellation request to one or more tasks.
+pub(crate) mod guard;
+mod tree_node;
+
+use crate::loom::sync::Arc;
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+use guard::DropGuard;
+use pin_project_lite::pin_project;
+
+/// A token which can be used to signal a cancellation request to one or more
+/// tasks.
+///
+/// Tasks can call [`CancellationToken::cancelled()`] in order to
+/// obtain a Future which will be resolved when cancellation is requested.
+///
+/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
+///
+/// # Examples
+///
+/// ```no_run
+/// use tokio::select;
+/// use tokio_util::sync::CancellationToken;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let token = CancellationToken::new();
+/// let cloned_token = token.clone();
+///
+/// let join_handle = tokio::spawn(async move {
+/// // Wait for either cancellation or a very long time
+/// select! {
+/// _ = cloned_token.cancelled() => {
+/// // The token was cancelled
+/// 5
+/// }
+/// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
+/// 99
+/// }
+/// }
+/// });
+///
+/// tokio::spawn(async move {
+/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+/// token.cancel();
+/// });
+///
+/// assert_eq!(5, join_handle.await.unwrap());
+/// }
+/// ```
+pub struct CancellationToken {
+ inner: Arc<tree_node::TreeNode>,
+}
+
+pin_project! {
+ /// A Future that is resolved once the corresponding [`CancellationToken`]
+ /// is cancelled.
+ #[must_use = "futures do nothing unless polled"]
+ pub struct WaitForCancellationFuture<'a> {
+ cancellation_token: &'a CancellationToken,
+ #[pin]
+ future: tokio::sync::futures::Notified<'a>,
+ }
+}
+
+// ===== impl CancellationToken =====
+
+impl core::fmt::Debug for CancellationToken {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ f.debug_struct("CancellationToken")
+ .field("is_cancelled", &self.is_cancelled())
+ .finish()
+ }
+}
+
+impl Clone for CancellationToken {
+ fn clone(&self) -> Self {
+ tree_node::increase_handle_refcount(&self.inner);
+ CancellationToken {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+impl Drop for CancellationToken {
+ fn drop(&mut self) {
+ tree_node::decrease_handle_refcount(&self.inner);
+ }
+}
+
+impl Default for CancellationToken {
+ fn default() -> CancellationToken {
+ CancellationToken::new()
+ }
+}
+
+impl CancellationToken {
+ /// Creates a new CancellationToken in the non-cancelled state.
+ pub fn new() -> CancellationToken {
+ CancellationToken {
+ inner: Arc::new(tree_node::TreeNode::new()),
+ }
+ }
+
+ /// Creates a `CancellationToken` which will get cancelled whenever the
+ /// current token gets cancelled.
+ ///
+ /// If the current token is already cancelled, the child token will get
+ /// returned in cancelled state.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::select;
+ /// use tokio_util::sync::CancellationToken;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let token = CancellationToken::new();
+ /// let child_token = token.child_token();
+ ///
+ /// let join_handle = tokio::spawn(async move {
+ /// // Wait for either cancellation or a very long time
+ /// select! {
+ /// _ = child_token.cancelled() => {
+ /// // The token was cancelled
+ /// 5
+ /// }
+ /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
+ /// 99
+ /// }
+ /// }
+ /// });
+ ///
+ /// tokio::spawn(async move {
+ /// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+ /// token.cancel();
+ /// });
+ ///
+ /// assert_eq!(5, join_handle.await.unwrap());
+ /// }
+ /// ```
+ pub fn child_token(&self) -> CancellationToken {
+ CancellationToken {
+ inner: tree_node::child_node(&self.inner),
+ }
+ }
+
+ /// Cancel the [`CancellationToken`] and all child tokens which had been
+ /// derived from it.
+ ///
+ /// This will wake up all tasks which are waiting for cancellation.
+ ///
+ /// Be aware that cancellation is not an atomic operation. It is possible
+ /// for another thread running in parallel with a call to `cancel` to first
+ /// receive `true` from `is_cancelled` on one child node, and then receive
+ /// `false` from `is_cancelled` on another child node. However, once the
+ /// call to `cancel` returns, all child nodes have been fully cancelled.
+ pub fn cancel(&self) {
+ tree_node::cancel(&self.inner);
+ }
+
+ /// Returns `true` if the `CancellationToken` is cancelled.
+ pub fn is_cancelled(&self) -> bool {
+ tree_node::is_cancelled(&self.inner)
+ }
+
+ /// Returns a `Future` that gets fulfilled when cancellation is requested.
+ ///
+ /// The future will complete immediately if the token is already cancelled
+ /// when this method is called.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe.
+ pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
+ WaitForCancellationFuture {
+ cancellation_token: self,
+ future: self.inner.notified(),
+ }
+ }
+
+ /// Creates a `DropGuard` for this token.
+ ///
+ /// Returned guard will cancel this token (and all its children) on drop
+ /// unless disarmed.
+ pub fn drop_guard(self) -> DropGuard {
+ DropGuard { inner: Some(self) }
+ }
+}
+
+// ===== impl WaitForCancellationFuture =====
+
+impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ f.debug_struct("WaitForCancellationFuture").finish()
+ }
+}
+
+impl<'a> Future for WaitForCancellationFuture<'a> {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ let mut this = self.project();
+ loop {
+ if this.cancellation_token.is_cancelled() {
+ return Poll::Ready(());
+ }
+
+ // No wakeups can be lost here because there is always a call to
+ // `is_cancelled` between the creation of the future and the call to
+ // `poll`, and the code that sets the cancelled flag does so before
+ // waking the `Notified`.
+ if this.future.as_mut().poll(cx).is_pending() {
+ return Poll::Pending;
+ }
+
+ this.future.set(this.cancellation_token.inner.notified());
+ }
+ }
+}