//! An asynchronously awaitable `CancellationToken`. //! The token allows to signal a cancellation request to one or more tasks. pub(crate) mod guard; mod tree_node; use crate::loom::sync::Arc; use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; use guard::DropGuard; use pin_project_lite::pin_project; /// A token which can be used to signal a cancellation request to one or more /// tasks. /// /// Tasks can call [`CancellationToken::cancelled()`] in order to /// obtain a Future which will be resolved when cancellation is requested. /// /// Cancellation can be requested through the [`CancellationToken::cancel`] method. /// /// # Examples /// /// ```no_run /// use tokio::select; /// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { /// let token = CancellationToken::new(); /// let cloned_token = token.clone(); /// /// let join_handle = tokio::spawn(async move { /// // Wait for either cancellation or a very long time /// select! { /// _ = cloned_token.cancelled() => { /// // The token was cancelled /// 5 /// } /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { /// 99 /// } /// } /// }); /// /// tokio::spawn(async move { /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; /// token.cancel(); /// }); /// /// assert_eq!(5, join_handle.await.unwrap()); /// } /// ``` pub struct CancellationToken { inner: Arc, } pin_project! { /// A Future that is resolved once the corresponding [`CancellationToken`] /// is cancelled. #[must_use = "futures do nothing unless polled"] pub struct WaitForCancellationFuture<'a> { cancellation_token: &'a CancellationToken, #[pin] future: tokio::sync::futures::Notified<'a>, } } // ===== impl CancellationToken ===== impl core::fmt::Debug for CancellationToken { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("CancellationToken") .field("is_cancelled", &self.is_cancelled()) .finish() } } impl Clone for CancellationToken { fn clone(&self) -> Self { tree_node::increase_handle_refcount(&self.inner); CancellationToken { inner: self.inner.clone(), } } } impl Drop for CancellationToken { fn drop(&mut self) { tree_node::decrease_handle_refcount(&self.inner); } } impl Default for CancellationToken { fn default() -> CancellationToken { CancellationToken::new() } } impl CancellationToken { /// Creates a new CancellationToken in the non-cancelled state. pub fn new() -> CancellationToken { CancellationToken { inner: Arc::new(tree_node::TreeNode::new()), } } /// Creates a `CancellationToken` which will get cancelled whenever the /// current token gets cancelled. /// /// If the current token is already cancelled, the child token will get /// returned in cancelled state. /// /// # Examples /// /// ```no_run /// use tokio::select; /// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { /// let token = CancellationToken::new(); /// let child_token = token.child_token(); /// /// let join_handle = tokio::spawn(async move { /// // Wait for either cancellation or a very long time /// select! { /// _ = child_token.cancelled() => { /// // The token was cancelled /// 5 /// } /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { /// 99 /// } /// } /// }); /// /// tokio::spawn(async move { /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; /// token.cancel(); /// }); /// /// assert_eq!(5, join_handle.await.unwrap()); /// } /// ``` pub fn child_token(&self) -> CancellationToken { CancellationToken { inner: tree_node::child_node(&self.inner), } } /// Cancel the [`CancellationToken`] and all child tokens which had been /// derived from it. /// /// This will wake up all tasks which are waiting for cancellation. /// /// Be aware that cancellation is not an atomic operation. It is possible /// for another thread running in parallel with a call to `cancel` to first /// receive `true` from `is_cancelled` on one child node, and then receive /// `false` from `is_cancelled` on another child node. However, once the /// call to `cancel` returns, all child nodes have been fully cancelled. pub fn cancel(&self) { tree_node::cancel(&self.inner); } /// Returns `true` if the `CancellationToken` is cancelled. pub fn is_cancelled(&self) -> bool { tree_node::is_cancelled(&self.inner) } /// Returns a `Future` that gets fulfilled when cancellation is requested. /// /// The future will complete immediately if the token is already cancelled /// when this method is called. /// /// # Cancel safety /// /// This method is cancel safe. pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { WaitForCancellationFuture { cancellation_token: self, future: self.inner.notified(), } } /// Creates a `DropGuard` for this token. /// /// Returned guard will cancel this token (and all its children) on drop /// unless disarmed. pub fn drop_guard(self) -> DropGuard { DropGuard { inner: Some(self) } } } // ===== impl WaitForCancellationFuture ===== impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("WaitForCancellationFuture").finish() } } impl<'a> Future for WaitForCancellationFuture<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let mut this = self.project(); loop { if this.cancellation_token.is_cancelled() { return Poll::Ready(()); } // No wakeups can be lost here because there is always a call to // `is_cancelled` between the creation of the future and the call to // `poll`, and the code that sets the cancelled flag does so before // waking the `Notified`. if this.future.as_mut().poll(cx).is_pending() { return Poll::Pending; } this.future.set(this.cancellation_token.inner.notified()); } } }