Diffstat (limited to 'third_party/rust/tokio/src/task')
-rw-r--r--  third_party/rust/tokio/src/task/blocking.rs        211
-rw-r--r--  third_party/rust/tokio/src/task/builder.rs         201
-rw-r--r--  third_party/rust/tokio/src/task/consume_budget.rs   46
-rw-r--r--  third_party/rust/tokio/src/task/join_set.rs        584
-rw-r--r--  third_party/rust/tokio/src/task/local.rs          1175
-rw-r--r--  third_party/rust/tokio/src/task/mod.rs             333
-rw-r--r--  third_party/rust/tokio/src/task/spawn.rs           206
-rw-r--r--  third_party/rust/tokio/src/task/task_local.rs      451
-rw-r--r--  third_party/rust/tokio/src/task/unconstrained.rs    45
-rw-r--r--  third_party/rust/tokio/src/task/yield_now.rs        64
10 files changed, 3316 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/task/blocking.rs b/third_party/rust/tokio/src/task/blocking.rs
new file mode 100644
index 0000000000..9bd15ebd5d
--- /dev/null
+++ b/third_party/rust/tokio/src/task/blocking.rs
@@ -0,0 +1,211 @@
+use crate::task::JoinHandle;
+
+cfg_rt_multi_thread! {
+ /// Runs the provided blocking function on the current thread without
+ /// blocking the executor.
+ ///
+ /// In general, issuing a blocking call or performing a lot of compute in a
+ /// future without yielding is problematic, as it may prevent the executor
+ /// from driving other tasks forward. Calling this function informs the
+ /// executor that the currently executing task is about to block the thread,
+ /// so the executor is able to hand off any other tasks it has to a new
+ /// worker thread before that happens. See the [CPU-bound tasks and blocking
+ /// code][blocking] section for more information.
+ ///
+ /// Be aware that although this function avoids starving other independently
+ /// spawned tasks, any other code running concurrently in the same task will
+ /// be suspended during the call to `block_in_place`. This can happen e.g.
+ /// when using the [`join!`] macro. To avoid this issue, use
+ /// [`spawn_blocking`] instead of `block_in_place`.
+ ///
+ /// Note that this function cannot be used within a [`current_thread`] runtime
+ /// because in this case there are no other worker threads to hand off tasks
+ /// to. On the other hand, calling the function outside a runtime is
+ /// allowed. In this case, `block_in_place` just calls the provided closure
+ /// normally.
+ ///
+ /// Code running behind `block_in_place` cannot be cancelled. When you shut
+ /// down the executor, it will wait indefinitely for all blocking operations
+ /// to finish. You can use [`shutdown_timeout`] to stop waiting for them
+ /// after a certain timeout. Be aware that this will still not cancel the
+ /// tasks — they are simply allowed to keep running after the method
+ /// returns.
+ ///
+ /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
+ /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
+ /// [`join!`]: macro@join
+ /// [`thread::spawn`]: fn@std::thread::spawn
+ /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn docs() {
+ /// task::block_in_place(move || {
+ /// // do some compute-heavy work or call synchronous code
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// Code running inside `block_in_place` may use `block_on` to reenter the
+ /// async context.
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use tokio::runtime::Handle;
+ ///
+ /// # async fn docs() {
+ /// task::block_in_place(move || {
+ /// Handle::current().block_on(async move {
+ /// // do something async
+ /// });
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called from a [`current_thread`] runtime.
+ ///
+ /// [`current_thread`]: fn@crate::runtime::Builder::new_current_thread
+ #[track_caller]
+ pub fn block_in_place<F, R>(f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ crate::runtime::scheduler::multi_thread::block_in_place(f)
+ }
+}
+
+cfg_rt! {
+ /// Runs the provided closure on a thread where blocking is acceptable.
+ ///
+ /// In general, issuing a blocking call or performing a lot of compute in a
+ /// future without yielding is problematic, as it may prevent the executor from
+ /// driving other futures forward. This function runs the provided closure on a
+ /// thread dedicated to blocking operations. See the [CPU-bound tasks and
+ /// blocking code][blocking] section for more information.
+ ///
+ /// Tokio will spawn more blocking threads when they are requested through this
+ /// function until the upper limit configured on the [`Builder`] is reached.
+ /// After reaching the upper limit, the tasks are put in a queue.
+ /// The thread limit is very large by default, because `spawn_blocking` is often
+ /// used for various kinds of IO operations that cannot be performed
+ /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you
+ /// should keep this large upper limit in mind. When running many CPU-bound
+ /// computations, a semaphore or some other synchronization primitive should be
+ /// used to limit the number of computations executed in parallel. Specialized
+ /// CPU-bound executors, such as [rayon], may also be a good fit.
+ ///
+ /// This function is intended for non-async operations that eventually finish on
+ /// their own. If you want to spawn an ordinary thread, you should use
+ /// [`thread::spawn`] instead.
+ ///
+ /// Closures spawned using `spawn_blocking` cannot be cancelled abruptly; there
+ /// is no standard low level API to cause a thread to stop running. However,
+ /// a useful pattern is to pass some form of "cancellation token" into
+ /// the thread. This could be an [`AtomicBool`] that the task checks periodically.
+ /// Another approach is to have the thread primarily read or write from a channel,
+ /// and to exit when the channel closes; assuming the other side of the channel is dropped
+ /// when cancellation occurs, this will cause the blocking task thread to exit
+ /// soon after as well.
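+ ///
+ /// A minimal sketch of that cancellation-token pattern (the `stop` flag
+ /// and chunked loop here are illustrative, not part of this API):
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use std::sync::atomic::{AtomicBool, Ordering};
+ /// use tokio::task;
+ ///
+ /// # async fn docs() {
+ /// let stop = Arc::new(AtomicBool::new(false));
+ /// let stop2 = stop.clone();
+ /// let worker = task::spawn_blocking(move || {
+ ///     while !stop2.load(Ordering::Relaxed) {
+ ///         // Do a bounded chunk of blocking work, then re-check the flag.
+ ///     }
+ /// });
+ ///
+ /// // Request cancellation and wait for the closure to observe it.
+ /// stop.store(true, Ordering::Relaxed);
+ /// worker.await.unwrap();
+ /// # }
+ /// ```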
+ ///
+ /// When you shut down the executor, it will wait indefinitely for all blocking operations to
+ /// finish. You can use [`shutdown_timeout`] to stop waiting for them after a
+ /// certain timeout. Be aware that this will still not cancel the tasks — they
+ /// are simply allowed to keep running after the method returns. It is possible
+ /// for a blocking task to be cancelled if it has not yet started running, but this
+ /// is not guaranteed.
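+ ///
+ /// A minimal sketch of bounding shutdown with [`shutdown_timeout`] (the
+ /// one-second limit is arbitrary):
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use std::time::Duration;
+ ///
+ /// # fn docs() {
+ /// let rt = Runtime::new().unwrap();
+ /// rt.spawn_blocking(|| {
+ ///     // Potentially long-running blocking work.
+ /// });
+ ///
+ /// // Stop waiting for outstanding blocking tasks after one second. The
+ /// // tasks are not cancelled; they keep running in the background.
+ /// rt.shutdown_timeout(Duration::from_secs(1));
+ /// # }
+ /// ```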
+ ///
+ /// Note that if you are using the single threaded runtime, this function will
+ /// still spawn additional threads for blocking operations. The current-thread
+ /// scheduler's single thread is only used for asynchronous code.
+ ///
+ /// # Related APIs and patterns for bridging asynchronous and blocking code
+ ///
+ /// In simple cases, it is sufficient to have the closure accept input
+ /// parameters at creation time and return a single value (or struct/tuple, etc.).
+ ///
+ /// For more complex situations in which it is desirable to stream data to or from
+ /// the synchronous context, the [`mpsc channel`] has `blocking_send` and
+ /// `blocking_recv` methods for use in non-async code such as the thread created
+ /// by `spawn_blocking`.
+ ///
+ /// Another option is [`SyncIoBridge`] for cases where the synchronous context
+ /// is operating on byte streams. For example, you might use an asynchronous
+ /// HTTP client such as [hyper] to fetch data, but perform complex parsing
+ /// of the payload body using a library written for synchronous I/O.
+ ///
+ /// Finally, see also [Bridging with sync code][bridgesync] for discussions
+ /// around the opposite case of using Tokio as part of a larger synchronous
+ /// codebase.
+ ///
+ /// [`Builder`]: struct@crate::runtime::Builder
+ /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
+ /// [rayon]: https://docs.rs/rayon
+ /// [`mpsc channel`]: crate::sync::mpsc
+ /// [`SyncIoBridge`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.SyncIoBridge.html
+ /// [hyper]: https://docs.rs/hyper
+ /// [`thread::spawn`]: fn@std::thread::spawn
+ /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
+ /// [bridgesync]: https://tokio.rs/tokio/topics/bridging
+ /// [`AtomicBool`]: struct@std::sync::atomic::AtomicBool
+ ///
+ /// # Examples
+ ///
+ /// Pass an input value and receive result of computation:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+ /// // Initial input
+ /// let mut v = "Hello, ".to_string();
+ /// let res = task::spawn_blocking(move || {
+ /// // Stand-in for compute-heavy work or using synchronous APIs
+ /// v.push_str("world");
+ /// // Pass ownership of the value back to the asynchronous context
+ /// v
+ /// }).await?;
+ ///
+ /// // `res` is the value returned from the thread
+ /// assert_eq!(res.as_str(), "Hello, world");
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// Use a channel:
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use tokio::sync::mpsc;
+ ///
+ /// # async fn docs() {
+ /// let (tx, mut rx) = mpsc::channel(2);
+ /// let start = 5;
+ /// let worker = task::spawn_blocking(move || {
+ /// for x in 0..10 {
+ /// // Stand in for complex computation
+ /// tx.blocking_send(start + x).unwrap();
+ /// }
+ /// });
+ ///
+ /// let mut acc = 0;
+ /// while let Some(v) = rx.recv().await {
+ /// acc += v;
+ /// }
+ /// assert_eq!(acc, 95);
+ /// worker.await.unwrap();
+ /// # }
+ /// ```
+ #[track_caller]
+ pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ crate::runtime::spawn_blocking(f)
+ }
+}
diff --git a/third_party/rust/tokio/src/task/builder.rs b/third_party/rust/tokio/src/task/builder.rs
new file mode 100644
index 0000000000..306cc39d1e
--- /dev/null
+++ b/third_party/rust/tokio/src/task/builder.rs
@@ -0,0 +1,201 @@
+#![allow(unreachable_pub)]
+use crate::{
+ runtime::Handle,
+ task::{JoinHandle, LocalSet},
+};
+use std::{future::Future, io};
+
+/// Factory which is used to configure the properties of a new task.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// Methods can be chained in order to configure it.
+///
+/// Currently, there is only one configuration option:
+///
+/// - [`name`], which specifies an associated name for
+/// the task
+///
+/// There are three types of task that can be spawned from a Builder:
+/// - [`spawn_local`] for executing futures on the current thread
+/// - [`spawn`] for executing [`Send`] futures on the runtime
+/// - [`spawn_blocking`] for executing blocking code in the
+/// blocking thread pool.
+///
+/// ## Example
+///
+/// ```no_run
+/// use tokio::net::{TcpListener, TcpStream};
+///
+/// use std::io;
+///
+/// async fn process(socket: TcpStream) {
+/// // ...
+/// # drop(socket);
+/// }
+///
+/// #[tokio::main]
+/// async fn main() -> io::Result<()> {
+/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
+///
+/// loop {
+/// let (socket, _) = listener.accept().await?;
+///
+/// tokio::task::Builder::new()
+/// .name("tcp connection handler")
+/// .spawn(async move {
+/// // Process each socket concurrently.
+/// process(socket).await
+/// })?;
+/// }
+/// }
+/// ```
+/// [unstable]: crate#unstable-features
+/// [`name`]: Builder::name
+/// [`spawn_local`]: Builder::spawn_local
+/// [`spawn`]: Builder::spawn
+/// [`spawn_blocking`]: Builder::spawn_blocking
+#[derive(Default, Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
+pub struct Builder<'a> {
+ name: Option<&'a str>,
+}
+
+impl<'a> Builder<'a> {
+ /// Creates a new task builder.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Assigns a name to the task which will be spawned.
+ pub fn name(&self, name: &'a str) -> Self {
+ Self { name: Some(name) }
+ }
+
+ /// Spawns a task with this builder's settings on the current runtime.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called outside of a Tokio runtime.
+ ///
+ /// See [`task::spawn`](crate::task::spawn()) for
+ /// more details.
+ #[track_caller]
+ pub fn spawn<Fut>(self, future: Fut) -> io::Result<JoinHandle<Fut::Output>>
+ where
+ Fut: Future + Send + 'static,
+ Fut::Output: Send + 'static,
+ {
+ Ok(super::spawn::spawn_inner(future, self.name))
+ }
+
+ /// Spawn a task with this builder's settings on the provided [runtime
+ /// handle].
+ ///
+ /// See [`Handle::spawn`] for more details.
+ ///
+ /// [runtime handle]: crate::runtime::Handle
+ /// [`Handle::spawn`]: crate::runtime::Handle::spawn
+ #[track_caller]
+ pub fn spawn_on<Fut>(self, future: Fut, handle: &Handle) -> io::Result<JoinHandle<Fut::Output>>
+ where
+ Fut: Future + Send + 'static,
+ Fut::Output: Send + 'static,
+ {
+ Ok(handle.spawn_named(future, self.name))
+ }
+
+ /// Spawns a `!Send` task on the current [`LocalSet`] with this builder's
+ /// settings.
+ ///
+ /// The spawned future will be run on the same thread that called `spawn_local`.
+ /// This may only be called from the context of a [local task set][`LocalSet`].
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called outside of a [local task set][`LocalSet`].
+ ///
+ /// See [`task::spawn_local`] for more details.
+ ///
+ /// [`task::spawn_local`]: crate::task::spawn_local
+ /// [`LocalSet`]: crate::task::LocalSet
+ #[track_caller]
+ pub fn spawn_local<Fut>(self, future: Fut) -> io::Result<JoinHandle<Fut::Output>>
+ where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+ {
+ Ok(super::local::spawn_local_inner(future, self.name))
+ }
+
+ /// Spawns a `!Send` task on the provided [`LocalSet`] with this builder's
+ /// settings.
+ ///
+ /// See [`LocalSet::spawn_local`] for more details.
+ ///
+ /// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local
+ /// [`LocalSet`]: crate::task::LocalSet
+ #[track_caller]
+ pub fn spawn_local_on<Fut>(
+ self,
+ future: Fut,
+ local_set: &LocalSet,
+ ) -> io::Result<JoinHandle<Fut::Output>>
+ where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+ {
+ Ok(local_set.spawn_named(future, self.name))
+ }
+
+ /// Spawns blocking code on the blocking threadpool.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called outside of a Tokio runtime.
+ ///
+ /// See [`task::spawn_blocking`](crate::task::spawn_blocking)
+ /// for more details.
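+ ///
+ /// A minimal sketch (the task name here is illustrative):
+ ///
+ /// ```no_run
+ /// #[tokio::main]
+ /// async fn main() -> std::io::Result<()> {
+ ///     let handle = tokio::task::Builder::new()
+ ///         .name("blocking task")
+ ///         .spawn_blocking(|| {
+ ///             // Compute-heavy or synchronous work goes here.
+ ///             2 + 2
+ ///         })?;
+ ///
+ ///     assert_eq!(handle.await.unwrap(), 4);
+ ///     Ok(())
+ /// }
+ /// ```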
+ #[track_caller]
+ pub fn spawn_blocking<Function, Output>(
+ self,
+ function: Function,
+ ) -> io::Result<JoinHandle<Output>>
+ where
+ Function: FnOnce() -> Output + Send + 'static,
+ Output: Send + 'static,
+ {
+ let handle = Handle::current();
+ self.spawn_blocking_on(function, &handle)
+ }
+
+ /// Spawns blocking code on the provided [runtime handle]'s blocking threadpool.
+ ///
+ /// See [`Handle::spawn_blocking`] for more details.
+ ///
+ /// [runtime handle]: crate::runtime::Handle
+ /// [`Handle::spawn_blocking`]: crate::runtime::Handle::spawn_blocking
+ #[track_caller]
+ pub fn spawn_blocking_on<Function, Output>(
+ self,
+ function: Function,
+ handle: &Handle,
+ ) -> io::Result<JoinHandle<Output>>
+ where
+ Function: FnOnce() -> Output + Send + 'static,
+ Output: Send + 'static,
+ {
+ use crate::runtime::Mandatory;
+ let (join_handle, spawn_result) = handle.inner.blocking_spawner().spawn_blocking_inner(
+ function,
+ Mandatory::NonMandatory,
+ self.name,
+ handle,
+ );
+
+ spawn_result?;
+ Ok(join_handle)
+ }
+}
diff --git a/third_party/rust/tokio/src/task/consume_budget.rs b/third_party/rust/tokio/src/task/consume_budget.rs
new file mode 100644
index 0000000000..e7432ffe7d
--- /dev/null
+++ b/third_party/rust/tokio/src/task/consume_budget.rs
@@ -0,0 +1,46 @@
+use std::task::Poll;
+
+/// Consumes a unit of budget and returns the execution back to the Tokio
+/// runtime *if* the task's coop budget was exhausted.
+///
+/// The task will only yield if its entire coop budget has been exhausted.
+/// This function can be used in order to insert optional yield points into long
+/// computations that do not use Tokio resources like sockets or semaphores,
+/// without redundantly yielding to the runtime each time.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// # Examples
+///
+/// Make sure that a function which returns a sum of (potentially lots of)
+/// iterated values is cooperative.
+///
+/// ```
+/// async fn sum_iterator(input: &mut impl std::iter::Iterator<Item=i64>) -> i64 {
+/// let mut sum: i64 = 0;
+/// while let Some(i) = input.next() {
+/// sum += i;
+/// tokio::task::consume_budget().await
+/// }
+/// sum
+/// }
+/// ```
+/// [unstable]: crate#unstable-features
+#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))]
+pub async fn consume_budget() {
+ let mut status = Poll::Pending;
+
+ crate::future::poll_fn(move |cx| {
+ ready!(crate::trace::trace_leaf(cx));
+ if status.is_ready() {
+ return status;
+ }
+ status = crate::runtime::coop::poll_proceed(cx).map(|restore| {
+ restore.made_progress();
+ });
+ status
+ })
+ .await
+}
diff --git a/third_party/rust/tokio/src/task/join_set.rs b/third_party/rust/tokio/src/task/join_set.rs
new file mode 100644
index 0000000000..4eb15a24d5
--- /dev/null
+++ b/third_party/rust/tokio/src/task/join_set.rs
@@ -0,0 +1,584 @@
+//! A collection of tasks spawned on a Tokio runtime.
+//!
+//! This module provides the [`JoinSet`] type, a collection which stores a set
+//! of spawned tasks and allows asynchronously awaiting the output of those
+//! tasks as they complete. See the documentation for the [`JoinSet`] type for
+//! details.
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use crate::runtime::Handle;
+#[cfg(tokio_unstable)]
+use crate::task::Id;
+use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet};
+use crate::util::IdleNotifiedSet;
+
+/// A collection of tasks spawned on a Tokio runtime.
+///
+/// A `JoinSet` can be used to await the completion of some or all of the tasks
+/// in the set. The set is not ordered, and the tasks will be returned in the
+/// order they complete.
+///
+/// All of the tasks must have the same return type `T`.
+///
+/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
+///
+/// # Examples
+///
+/// Spawn multiple tasks and wait for them.
+///
+/// ```
+/// use tokio::task::JoinSet;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut set = JoinSet::new();
+///
+/// for i in 0..10 {
+/// set.spawn(async move { i });
+/// }
+///
+/// let mut seen = [false; 10];
+/// while let Some(res) = set.join_next().await {
+/// let idx = res.unwrap();
+/// seen[idx] = true;
+/// }
+///
+/// for i in 0..10 {
+/// assert!(seen[i]);
+/// }
+/// }
+/// ```
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub struct JoinSet<T> {
+ inner: IdleNotifiedSet<JoinHandle<T>>,
+}
+
+/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather
+/// than on the current default runtime.
+///
+/// [`task::Builder`]: crate::task::Builder
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
+#[must_use = "builders do nothing unless used to spawn a task"]
+pub struct Builder<'a, T> {
+ joinset: &'a mut JoinSet<T>,
+ builder: super::Builder<'a>,
+}
+
+impl<T> JoinSet<T> {
+ /// Create a new `JoinSet`.
+ pub fn new() -> Self {
+ Self {
+ inner: IdleNotifiedSet::new(),
+ }
+ }
+
+ /// Returns the number of tasks currently in the `JoinSet`.
+ pub fn len(&self) -> usize {
+ self.inner.len()
+ }
+
+ /// Returns whether the `JoinSet` is empty.
+ pub fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
+}
+
+impl<T: 'static> JoinSet<T> {
+ /// Returns a [`Builder`] that can be used to configure a task prior to
+ /// spawning it on this `JoinSet`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::task::JoinSet;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> std::io::Result<()> {
+ /// let mut set = JoinSet::new();
+ ///
+ /// // Use the builder to configure a task's name before spawning it.
+ /// set.build_task()
+ /// .name("my_task")
+ /// .spawn(async { /* ... */ })?;
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
+ pub fn build_task(&mut self) -> Builder<'_, T> {
+ Builder {
+ builder: super::Builder::new(),
+ joinset: self,
+ }
+ }
+
+ /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
+ /// that can be used to remotely cancel the task.
+ ///
+ /// The provided future will start running in the background immediately
+ /// when this method is called, even if you don't await anything on this
+ /// `JoinSet`.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called outside of a Tokio runtime.
+ ///
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn<F>(&mut self, task: F) -> AbortHandle
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ T: Send,
+ {
+ self.insert(crate::spawn(task))
+ }
+
+ /// Spawn the provided task on the provided runtime and store it in this
+ /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
+ /// cancel the task.
+ ///
+ /// The provided future will start running in the background immediately
+ /// when this method is called, even if you don't await anything on this
+ /// `JoinSet`.
+ ///
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ T: Send,
+ {
+ self.insert(handle.spawn(task))
+ }
+
+ /// Spawn the provided task on the current [`LocalSet`] and store it in this
+ /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
+ /// cancel the task.
+ ///
+ /// The provided future will start running in the background immediately
+ /// when this method is called, even if you don't await anything on this
+ /// `JoinSet`.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if it is called outside of a `LocalSet`.
+ ///
+ /// [`LocalSet`]: crate::task::LocalSet
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
+ where
+ F: Future<Output = T>,
+ F: 'static,
+ {
+ self.insert(crate::task::spawn_local(task))
+ }
+
+ /// Spawn the provided task on the provided [`LocalSet`] and store it in
+ /// this `JoinSet`, returning an [`AbortHandle`] that can be used to
+ /// remotely cancel the task.
+ ///
+ /// Unlike the [`spawn_local`] method, this method may be used to spawn local
+ /// tasks on a `LocalSet` that is _not_ currently running. The provided
+ /// future will start running whenever the `LocalSet` is next started.
+ ///
+ /// [`LocalSet`]: crate::task::LocalSet
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ /// [`spawn_local`]: Self::spawn_local
+ #[track_caller]
+ pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
+ where
+ F: Future<Output = T>,
+ F: 'static,
+ {
+ self.insert(local_set.spawn_local(task))
+ }
+
+ /// Spawn the blocking code on the blocking threadpool and store
+ /// it in this `JoinSet`, returning an [`AbortHandle`] that can be
+ /// used to remotely cancel the task.
+ ///
+ /// # Examples
+ ///
+ /// Spawn multiple blocking tasks and wait for them.
+ ///
+ /// ```
+ /// use tokio::task::JoinSet;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut set = JoinSet::new();
+ ///
+ /// for i in 0..10 {
+ /// set.spawn_blocking(move || { i });
+ /// }
+ ///
+ /// let mut seen = [false; 10];
+ /// while let Some(res) = set.join_next().await {
+ /// let idx = res.unwrap();
+ /// seen[idx] = true;
+ /// }
+ ///
+ /// for i in 0..10 {
+ /// assert!(seen[i]);
+ /// }
+ /// }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called outside of a Tokio runtime.
+ ///
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
+ where
+ F: FnOnce() -> T,
+ F: Send + 'static,
+ T: Send,
+ {
+ self.insert(crate::runtime::spawn_blocking(f))
+ }
+
+ /// Spawn the blocking code on the blocking threadpool of the
+ /// provided runtime and store it in this `JoinSet`, returning an
+ /// [`AbortHandle`] that can be used to remotely cancel the task.
+ ///
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
+ where
+ F: FnOnce() -> T,
+ F: Send + 'static,
+ T: Send,
+ {
+ self.insert(handle.spawn_blocking(f))
+ }
+
+ fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle {
+ let abort = jh.abort_handle();
+ let mut entry = self.inner.insert_idle(jh);
+
+ // Set the waker that is notified when the task completes.
+ entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
+ abort
+ }
+
+ /// Waits until one of the tasks in the set completes and returns its output.
+ ///
+ /// Returns `None` if the set is empty.
+ ///
+ /// # Cancel Safety
+ ///
+ /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
+ /// statement and some other branch completes first, it is guaranteed that no tasks were
+ /// removed from this `JoinSet`.
+ pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
+ crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
+ }
+
+ /// Waits until one of the tasks in the set completes and returns its
+ /// output, along with the [task ID] of the completed task.
+ ///
+ /// Returns `None` if the set is empty.
+ ///
+ /// When this method returns an error, then the id of the task that failed can be accessed
+ /// using the [`JoinError::id`] method.
+ ///
+ /// # Cancel Safety
+ ///
+ /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
+ /// statement and some other branch completes first, it is guaranteed that no tasks were
+ /// removed from this `JoinSet`.
+ ///
+ /// [task ID]: crate::task::Id
+ /// [`JoinError::id`]: fn@crate::task::JoinError::id
+ #[cfg(tokio_unstable)]
+ #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
+ pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
+ crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
+ }
+
+ /// Aborts all tasks and waits for them to finish shutting down.
+ ///
+ /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
+ /// a loop until it returns `None`.
+ ///
+ /// This method ignores any panics in the tasks shutting down. When this call returns, the
+ /// `JoinSet` will be empty.
+ ///
+ /// [`abort_all`]: fn@Self::abort_all
+ /// [`join_next`]: fn@Self::join_next
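+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch:
+ ///
+ /// ```
+ /// use tokio::task::JoinSet;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut set = JoinSet::new();
+ /// set.spawn(async { /* ... */ });
+ ///
+ /// // Abort every task, then drain the set.
+ /// set.shutdown().await;
+ /// assert!(set.is_empty());
+ /// # }
+ /// ```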
+ pub async fn shutdown(&mut self) {
+ self.abort_all();
+ while self.join_next().await.is_some() {}
+ }
+
+ /// Aborts all tasks on this `JoinSet`.
+ ///
+ /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
+ /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty.
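+ ///
+ /// A minimal sketch of the abort-then-drain pattern:
+ ///
+ /// ```
+ /// use tokio::task::JoinSet;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut set = JoinSet::new();
+ /// set.spawn(async { std::future::pending::<()>().await });
+ ///
+ /// set.abort_all();
+ /// // The aborted task stays in the set until it is joined.
+ /// while let Some(res) = set.join_next().await {
+ ///     assert!(res.unwrap_err().is_cancelled());
+ /// }
+ /// # }
+ /// ```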
+ pub fn abort_all(&mut self) {
+ self.inner.for_each(|jh| jh.abort());
+ }
+
+ /// Removes all tasks from this `JoinSet` without aborting them.
+ ///
+ /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
+ /// is dropped.
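+ ///
+ /// A minimal sketch:
+ ///
+ /// ```
+ /// use tokio::task::JoinSet;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut set = JoinSet::new();
+ /// set.spawn(async { /* keeps running after the detach */ });
+ ///
+ /// set.detach_all();
+ /// assert!(set.is_empty());
+ /// # }
+ /// ```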
+ pub fn detach_all(&mut self) {
+ self.inner.drain(drop);
+ }
+
+ /// Polls for one of the tasks in the set to complete.
+ ///
+ /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
+ /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
+ /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup.
+ ///
+ /// # Returns
+ ///
+ /// This function returns:
+ ///
+ /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
+ /// available right now.
+ /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
+ /// The `value` is the return value of one of the tasks that completed.
+ /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
+ /// aborted. The `err` is the `JoinError` from the panicked/aborted task.
+ /// * `Poll::Ready(None)` if the `JoinSet` is empty.
+ ///
+ /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
+ /// This can happen if the [coop budget] is reached.
+ ///
+ /// [coop budget]: crate::task#cooperative-scheduling
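+ ///
+ /// A minimal sketch of driving this method by hand; it wraps the poll in
+ /// `std::future::poll_fn`, which assumes a reasonably recent toolchain:
+ ///
+ /// ```
+ /// use std::future::poll_fn;
+ /// use tokio::task::JoinSet;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut set = JoinSet::new();
+ /// set.spawn(async { 7 });
+ ///
+ /// // Await a future that polls `poll_join_next` directly.
+ /// let out = poll_fn(|cx| set.poll_join_next(cx)).await;
+ /// assert_eq!(out.unwrap().unwrap(), 7);
+ /// # }
+ /// ```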
+ pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
+ // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
+ // the `notified` list if the waker is notified in the `poll` call below.
+ let mut entry = match self.inner.pop_notified(cx.waker()) {
+ Some(entry) => entry,
+ None => {
+ if self.is_empty() {
+ return Poll::Ready(None);
+ } else {
+ // The waker was set by `pop_notified`.
+ return Poll::Pending;
+ }
+ }
+ };
+
+ let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
+
+ if let Poll::Ready(res) = res {
+ let _entry = entry.remove();
+ Poll::Ready(Some(res))
+ } else {
+ // A JoinHandle generally won't emit a wakeup without being ready unless
+ // the coop limit has been reached. We yield to the executor in this
+ // case.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+
+ /// Polls for one of the tasks in the set to complete.
+ ///
+ /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
+ /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
+ /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup.
+ ///
+ /// # Returns
+ ///
+ /// This function returns:
+ ///
+ /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
+ /// available right now.
+ /// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
+ /// The `value` is the return value of one of the tasks that completed, and
+ /// `id` is the [task ID] of that task.
+ /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
+ /// aborted. The `err` is the `JoinError` from the panicked/aborted task.
+ /// * `Poll::Ready(None)` if the `JoinSet` is empty.
+ ///
+ /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
+ /// This can happen if the [coop budget] is reached.
+ ///
+ /// [coop budget]: crate::task#cooperative-scheduling
+ /// [task ID]: crate::task::Id
+ #[cfg(tokio_unstable)]
+ #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
+ pub fn poll_join_next_with_id(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<(Id, T), JoinError>>> {
+ // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
+ // the `notified` list if the waker is notified in the `poll` call below.
+ let mut entry = match self.inner.pop_notified(cx.waker()) {
+ Some(entry) => entry,
+ None => {
+ if self.is_empty() {
+ return Poll::Ready(None);
+ } else {
+ // The waker was set by `pop_notified`.
+ return Poll::Pending;
+ }
+ }
+ };
+
+ let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
+
+ if let Poll::Ready(res) = res {
+ let entry = entry.remove();
+ // If the task succeeded, add the task ID to the output. Otherwise, the
+ // `JoinError` will already have the task's ID.
+ Poll::Ready(Some(res.map(|output| (entry.id(), output))))
+ } else {
+ // A JoinHandle generally won't emit a wakeup without being ready unless
+ // the coop limit has been reached. We yield to the executor in this
+ // case.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+}
+
+impl<T> Drop for JoinSet<T> {
+ fn drop(&mut self) {
+ self.inner.drain(|join_handle| join_handle.abort());
+ }
+}
+
+impl<T> fmt::Debug for JoinSet<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("JoinSet").field("len", &self.len()).finish()
+ }
+}
+
+impl<T> Default for JoinSet<T> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// === impl Builder ===
+
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
+impl<'a, T: 'static> Builder<'a, T> {
+ /// Assigns a name to the task which will be spawned.
+ pub fn name(self, name: &'a str) -> Self {
+ let builder = self.builder.name(name);
+ Self { builder, ..self }
+ }
+
+ /// Spawn the provided task with this builder's settings and store it in the
+ /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely
+ /// cancel the task.
+ ///
+ /// # Returns
+ ///
+ /// An [`AbortHandle`] that can be used to remotely cancel the task.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called outside of a Tokio runtime.
+ ///
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ T: Send,
+ {
+ Ok(self.joinset.insert(self.builder.spawn(future)?))
+ }
+
+ /// Spawn the provided task on the provided [runtime handle] with this
+ /// builder's settings, and store it in the [`JoinSet`].
+ ///
+ /// # Returns
+ ///
+ /// An [`AbortHandle`] that can be used to remotely cancel the task.
+ ///
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ /// [runtime handle]: crate::runtime::Handle
+ #[track_caller]
+ pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ T: Send,
+ {
+ Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?))
+ }
+
+ /// Spawn the provided task on the current [`LocalSet`] with this builder's
+ /// settings, and store it in the [`JoinSet`].
+ ///
+ /// # Returns
+ ///
+ /// An [`AbortHandle`] that can be used to remotely cancel the task.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if it is called outside of a `LocalSet`.
+ ///
+ /// [`LocalSet`]: crate::task::LocalSet
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
+ where
+ F: Future<Output = T>,
+ F: 'static,
+ {
+ Ok(self.joinset.insert(self.builder.spawn_local(future)?))
+ }
+
+ /// Spawn the provided task on the provided [`LocalSet`] with this builder's
+ /// settings, and store it in the [`JoinSet`].
+ ///
+ /// # Returns
+ ///
+ /// An [`AbortHandle`] that can be used to remotely cancel the task.
+ ///
+ /// [`LocalSet`]: crate::task::LocalSet
+ /// [`AbortHandle`]: crate::task::AbortHandle
+ #[track_caller]
+ pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
+ where
+ F: Future<Output = T>,
+ F: 'static,
+ {
+ Ok(self
+ .joinset
+ .insert(self.builder.spawn_local_on(future, local_set)?))
+ }
+}
+
+// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is
+// `Debug`.
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
+impl<'a, T> fmt::Debug for Builder<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("join_set::Builder")
+ .field("joinset", &self.joinset)
+ .field("builder", &self.builder)
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/task/local.rs b/third_party/rust/tokio/src/task/local.rs
new file mode 100644
index 0000000000..734b95587b
--- /dev/null
+++ b/third_party/rust/tokio/src/task/local.rs
@@ -0,0 +1,1175 @@
+//! Runs `!Send` futures on the current thread.
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::{Arc, Mutex};
+use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
+use crate::runtime::{context, ThreadId};
+use crate::sync::AtomicWaker;
+use crate::util::RcCell;
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Poll;
+
+use pin_project_lite::pin_project;
+
+cfg_rt! {
+ /// A set of tasks which are executed on the same thread.
+ ///
+ /// In some cases, it is necessary to run one or more futures that do not
+ /// implement [`Send`] and thus are unsafe to send between threads. In these
+ /// cases, a [local task set] may be used to schedule one or more `!Send`
+ /// futures to run together on the same thread.
+ ///
+ /// For example, the following code will not compile:
+ ///
+ /// ```rust,compile_fail
+ /// use std::rc::Rc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// // `Rc` does not implement `Send`, and thus may not be sent between
+ /// // threads safely.
+ /// let nonsend_data = Rc::new("my nonsend data...");
+ ///
+ /// let nonsend_data = nonsend_data.clone();
+ /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
+ /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
+ /// // will not compile.
+ /// tokio::spawn(async move {
+ /// println!("{}", nonsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// # Use with `run_until`
+ ///
+ /// To spawn `!Send` futures, we can use a local task set to schedule them
+ /// on the thread calling [`Runtime::block_on`]. When running inside of the
+ /// local task set, we can use [`task::spawn_local`], which can spawn
+ /// `!Send` futures. For example:
+ ///
+ /// ```rust
+ /// use std::rc::Rc;
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let nonsend_data = Rc::new("my nonsend data...");
+ ///
+ /// // Construct a local task set that can run `!Send` futures.
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// let nonsend_data = nonsend_data.clone();
+ /// // `spawn_local` ensures that the future is spawned on the local
+ /// // task set.
+ /// task::spawn_local(async move {
+ /// println!("{}", nonsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }).await;
+ /// }
+ /// ```
+ /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
+ /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
+ /// cannot be used inside a task spawned with `tokio::spawn`.
+ ///
+ /// ## Awaiting a `LocalSet`
+ ///
+ /// Additionally, a `LocalSet` itself implements `Future`, completing when
+ /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
+ /// several futures on a `LocalSet` and drive the whole set until they
+ /// complete. For example,
+ ///
+ /// ```rust
+ /// use tokio::{task, time};
+ /// use std::rc::Rc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let nonsend_data = Rc::new("world");
+ /// let local = task::LocalSet::new();
+ ///
+ /// let nonsend_data2 = nonsend_data.clone();
+ /// local.spawn_local(async move {
+ /// // ...
+ /// println!("hello {}", nonsend_data2)
+ /// });
+ ///
+ /// local.spawn_local(async move {
+ /// time::sleep(time::Duration::from_millis(100)).await;
+ /// println!("goodbye {}", nonsend_data)
+ /// });
+ ///
+ /// // ...
+ ///
+ /// local.await;
+ /// }
+ /// ```
+ /// **Note:** Awaiting a `LocalSet` can only be done inside
+ /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
+ /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
+ /// `tokio::spawn`.
+ ///
+ /// ## Use inside `tokio::spawn`
+ ///
+ /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
+ /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
+ /// something else. The solution is to create the `LocalSet` somewhere else,
+ /// and communicate with it using an [`mpsc`] channel.
+ ///
+ /// The following example puts the `LocalSet` inside a new thread.
+ /// ```
+ /// use tokio::runtime::Builder;
+ /// use tokio::sync::{mpsc, oneshot};
+ /// use tokio::task::LocalSet;
+ ///
+ /// // This struct describes the task you want to spawn. Here we include
+ /// // some simple examples. The oneshot channel allows sending a response
+ /// // to the spawner.
+ /// #[derive(Debug)]
+ /// enum Task {
+ /// PrintNumber(u32),
+ /// AddOne(u32, oneshot::Sender<u32>),
+ /// }
+ ///
+ /// #[derive(Clone)]
+ /// struct LocalSpawner {
+ /// send: mpsc::UnboundedSender<Task>,
+ /// }
+ ///
+ /// impl LocalSpawner {
+ /// pub fn new() -> Self {
+ /// let (send, mut recv) = mpsc::unbounded_channel();
+ ///
+ /// let rt = Builder::new_current_thread()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// std::thread::spawn(move || {
+ /// let local = LocalSet::new();
+ ///
+ /// local.spawn_local(async move {
+ /// while let Some(new_task) = recv.recv().await {
+ /// tokio::task::spawn_local(run_task(new_task));
+ /// }
+ /// // If the while loop returns, then all the LocalSpawner
+ /// // objects have been dropped.
+ /// });
+ ///
+ /// // This will return once all senders are dropped and all
+ /// // spawned tasks have returned.
+ /// rt.block_on(local);
+ /// });
+ ///
+ /// Self {
+ /// send,
+ /// }
+ /// }
+ ///
+ /// pub fn spawn(&self, task: Task) {
+ /// self.send.send(task).expect("Thread with LocalSet has shut down.");
+ /// }
+ /// }
+ ///
+ /// // This task may do !Send stuff. We use printing a number as an example,
+ /// // but it could be anything.
+ /// //
+ /// // The Task struct is an enum to support spawning many different kinds
+ /// // of operations.
+ /// async fn run_task(task: Task) {
+ /// match task {
+ /// Task::PrintNumber(n) => {
+ /// println!("{}", n);
+ /// },
+ /// Task::AddOne(n, response) => {
+ /// // We ignore failures to send the response.
+ /// let _ = response.send(n + 1);
+ /// },
+ /// }
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let spawner = LocalSpawner::new();
+ ///
+ /// let (send, response) = oneshot::channel();
+ /// spawner.spawn(Task::AddOne(10, send));
+ /// let eleven = response.await.unwrap();
+ /// assert_eq!(eleven, 11);
+ /// }
+ /// ```
+ ///
+ /// [`Send`]: trait@std::marker::Send
+ /// [local task set]: struct@LocalSet
+ /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
+ /// [`task::spawn_local`]: fn@spawn_local
+ /// [`mpsc`]: mod@crate::sync::mpsc
+ pub struct LocalSet {
+ /// Current scheduler tick.
+ tick: Cell<u8>,
+
+ /// State available from thread-local.
+ context: Rc<Context>,
+
+ /// This type should not be Send.
+ _not_send: PhantomData<*const ()>,
+ }
+}
+
+/// State available from the thread-local.
+struct Context {
+ /// State shared between threads.
+ shared: Arc<Shared>,
+
+ /// True if a task panicked without being handled and the local set is
+ /// configured to shutdown on unhandled panic.
+ unhandled_panic: Cell<bool>,
+}
+
+/// LocalSet state shared between threads.
+struct Shared {
+ /// # Safety
+ ///
+ /// This field must *only* be accessed from the thread that owns the
+ /// `LocalSet` (i.e., `Thread::current().id() == owner`).
+ local_state: LocalState,
+
+ /// Remote run queue sender.
+ queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
+
+ /// Wake the `LocalSet` task.
+ waker: AtomicWaker,
+
+ /// How to respond to unhandled task panics.
+ #[cfg(tokio_unstable)]
+ pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
+}
+
+/// Tracks the `LocalSet` state that must only be accessed from the thread that
+/// created the `LocalSet`.
+struct LocalState {
+ /// The `ThreadId` of the thread that owns the `LocalSet`.
+ owner: ThreadId,
+
+ /// Local run queue sender and receiver.
+ local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,
+
+ /// Collection of all active tasks spawned onto this executor.
+ owned: LocalOwnedTasks<Arc<Shared>>,
+}
+
+pin_project! {
+ #[derive(Debug)]
+ struct RunUntil<'a, F> {
+ local_set: &'a LocalSet,
+ #[pin]
+ future: F,
+ }
+}
+
+tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
+ ctx: RcCell::new(),
+} });
+
+struct LocalData {
+ ctx: RcCell<Context>,
+}
+
+cfg_rt! {
+ /// Spawns a `!Send` future on the current [`LocalSet`].
+ ///
+ /// The spawned future will run on the same thread that called `spawn_local`.
+ ///
+ /// The provided future will start running in the background immediately
+ /// when `spawn_local` is called, even if you don't await the returned
+ /// `JoinHandle`.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called outside of a [`LocalSet`].
+ ///
+ /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
+ /// resulting new task will _not_ be inside the `LocalSet`, so you must use
+ /// `spawn_local` if you want to stay within the `LocalSet`.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::rc::Rc;
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let nonsend_data = Rc::new("my nonsend data...");
+ ///
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// let nonsend_data = nonsend_data.clone();
+ /// task::spawn_local(async move {
+ /// println!("{}", nonsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }).await;
+ /// }
+ /// ```
+ ///
+ /// [`LocalSet`]: struct@crate::task::LocalSet
+ /// [`tokio::spawn`]: fn@crate::task::spawn
+ #[track_caller]
+ pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ spawn_local_inner(future, None)
+ }
+
+
+ #[track_caller]
+ pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
+ where F: Future + 'static,
+ F::Output: 'static
+ {
+ match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
+ None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
+ Some(cx) => cx.spawn(future, name)
+ }
+ }
+}
+
+/// Initial queue capacity.
+const INITIAL_CAPACITY: usize = 64;
+
+/// Max number of tasks to poll per tick.
+const MAX_TASKS_PER_TICK: usize = 61;
+
+/// How often to check the remote queue first.
+const REMOTE_FIRST_INTERVAL: u8 = 31;
+
+/// Context guard for LocalSet
+pub struct LocalEnterGuard(Option<Rc<Context>>);
+
+impl Drop for LocalEnterGuard {
+ fn drop(&mut self) {
+ CURRENT.with(|LocalData { ctx, .. }| {
+ ctx.set(self.0.take());
+ })
+ }
+}
+
+impl fmt::Debug for LocalEnterGuard {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("LocalEnterGuard").finish()
+ }
+}
+
+impl LocalSet {
+ /// Returns a new local task set.
+ pub fn new() -> LocalSet {
+ let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
+
+ LocalSet {
+ tick: Cell::new(0),
+ context: Rc::new(Context {
+ shared: Arc::new(Shared {
+ local_state: LocalState {
+ owner,
+ owned: LocalOwnedTasks::new(),
+ local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+ },
+ queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
+ waker: AtomicWaker::new(),
+ #[cfg(tokio_unstable)]
+ unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
+ }),
+ unhandled_panic: Cell::new(false),
+ }),
+ _not_send: PhantomData,
+ }
+ }
+
+ /// Enters the context of this `LocalSet`.
+ ///
+ /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
+ /// context you are inside.
+ ///
+ /// [`spawn_local`]: fn@crate::task::spawn_local
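+ ///
+ /// A minimal sketch (assuming the guard is held for as long as
+ /// `spawn_local` is used this way):
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ ///     let local = task::LocalSet::new();
+ ///
+ ///     let guard = local.enter();
+ ///     // While the guard is alive, `spawn_local` targets `local`.
+ ///     task::spawn_local(async { /* ... */ });
+ ///     drop(guard);
+ ///
+ ///     // Drive the spawned task to completion.
+ ///     local.await;
+ /// }
+ /// ```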
+ pub fn enter(&self) -> LocalEnterGuard {
+ CURRENT.with(|LocalData { ctx, .. }| {
+ let old = ctx.replace(Some(self.context.clone()));
+ LocalEnterGuard(old)
+ })
+ }
+
+ /// Spawns a `!Send` task onto the local task set.
+ ///
+ /// This task is guaranteed to be run on the current thread.
+ ///
+ /// Unlike the free function [`spawn_local`], this method may be used to
+ /// spawn local tasks when the `LocalSet` is _not_ running. The provided
+ /// future will start running once the `LocalSet` is next started, even if
+ /// you don't await the returned `JoinHandle`.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Spawn a future on the local set. This future will be run when
+ /// // we call `run_until` to drive the task set.
+ /// local.spawn_local(async {
+ /// // ...
+ /// });
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// // ...
+ /// }).await;
+ ///
+ /// // When `run_until` finishes, we can spawn _more_ futures, which will
+ /// // run in subsequent calls to `run_until`.
+ /// local.spawn_local(async {
+ /// // ...
+ /// });
+ ///
+ /// local.run_until(async move {
+ /// // ...
+ /// }).await;
+ /// }
+ /// ```
+ /// [`spawn_local`]: fn@spawn_local
+ #[track_caller]
+ pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ self.spawn_named(future, None)
+ }
+
+ /// Runs a future to completion on the provided runtime, driving any local
+ /// futures spawned on this task set on the current thread.
+ ///
+ /// This runs the given future on the runtime, blocking until it is
+ /// complete, and yielding its resolved result. Any tasks or timers which
+ /// the future spawns internally will be executed on the runtime. The future
+ /// may also call [`spawn_local`] to spawn additional local futures on the
+ /// current thread.
+ ///
+ /// This method should not be called from an asynchronous context.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the executor is at capacity, if the provided
+ /// future panics, or if called within an asynchronous execution context.
+ ///
+ /// # Notes
+ ///
+ /// Since this function internally calls [`Runtime::block_on`], and drives
+ /// futures in the local task set inside that call to `block_on`, the local
+ /// futures may not use [in-place blocking]. If a blocking call needs to be
+ /// issued from a local task, the [`spawn_blocking`] API may be used instead.
+ ///
+ /// For example, this will panic:
+ /// ```should_panic
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// let rt = Runtime::new().unwrap();
+ /// let local = task::LocalSet::new();
+ /// local.block_on(&rt, async {
+ /// let join = task::spawn_local(async {
+ /// let blocking_result = task::block_in_place(|| {
+ /// // ...
+ /// });
+ /// // ...
+ /// });
+ /// join.await.unwrap();
+ /// })
+ /// ```
+ /// This, however, will not panic:
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// let rt = Runtime::new().unwrap();
+ /// let local = task::LocalSet::new();
+ /// local.block_on(&rt, async {
+ /// let join = task::spawn_local(async {
+ /// let blocking_result = task::spawn_blocking(|| {
+ /// // ...
+ /// }).await;
+ /// // ...
+ /// });
+ /// join.await.unwrap();
+ /// })
+ /// ```
+ ///
+ /// [`spawn_local`]: fn@spawn_local
+ /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
+ /// [in-place blocking]: fn@crate::task::block_in_place
+ /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
+ #[track_caller]
+ #[cfg(feature = "rt")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+ pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ rt.block_on(self.run_until(future))
+ }
+
+ /// Runs a future to completion on the local set, returning its output.
+ ///
+ /// This returns a future that runs the given future with a local set,
+ /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
+ /// Any local futures spawned on the local set will be driven in the
+ /// background until the future passed to `run_until` completes. When the future
+ /// passed to `run_until` finishes, any local futures which have not completed
+ /// will remain on the local set, and will be driven on subsequent calls to
+ /// `run_until` or when [awaiting the local set] itself.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// task::LocalSet::new().run_until(async {
+ /// task::spawn_local(async move {
+ /// // ...
+ /// }).await.unwrap();
+ /// // ...
+ /// }).await;
+ /// }
+ /// ```
+ ///
+ /// [`spawn_local`]: fn@spawn_local
+ /// [awaiting the local set]: #awaiting-a-localset
+ pub async fn run_until<F>(&self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ let run_until = RunUntil {
+ future,
+ local_set: self,
+ };
+ run_until.await
+ }
+
+ pub(in crate::task) fn spawn_named<F>(
+ &self,
+ future: F,
+ name: Option<&str>,
+ ) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ let handle = self.context.spawn(future, name);
+
+ // Because a task was spawned from *outside* the `LocalSet`, wake the
+ // `LocalSet` future to execute the new task, if it hasn't been woken.
+ //
+ // Spawning via the free fn `spawn` does not require this, as it can
+ // only be called from *within* a future executing on the `LocalSet` —
+ // in that case, the `LocalSet` must already be awake.
+ self.context.shared.waker.wake();
+ handle
+ }
+
+ /// Ticks the scheduler, returning whether the local future needs to be
+ /// notified again.
+ fn tick(&self) -> bool {
+ for _ in 0..MAX_TASKS_PER_TICK {
+ // Make sure we didn't hit an unhandled panic
+ if self.context.unhandled_panic.get() {
+ panic!("a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
+ }
+
+ match self.next_task() {
+ // Run the task
+ //
+ // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
+ // used. We are responsible for maintaining the invariant that
+ // `run_unchecked` is only called on threads that spawned the
+ // task initially. Because `LocalSet` itself is `!Send`, and
+ // `spawn_local` spawns into the `LocalSet` on the current
+ // thread, the invariant is maintained.
+ Some(task) => crate::runtime::coop::budget(|| task.run()),
+ // We have fully drained the queue of notified tasks, so the
+ // local future doesn't need to be notified again — it can wait
+ // until something else wakes a task in the local set.
+ None => return false,
+ }
+ }
+
+ true
+ }
+
+ fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
+ let tick = self.tick.get();
+ self.tick.set(tick.wrapping_add(1));
+
+ let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
+ self.context
+ .shared
+ .queue
+ .lock()
+ .as_mut()
+ .and_then(|queue| queue.pop_front())
+ .or_else(|| self.pop_local())
+ } else {
+ self.pop_local().or_else(|| {
+ self.context
+ .shared
+ .queue
+ .lock()
+ .as_mut()
+ .and_then(|queue| queue.pop_front())
+ })
+ };
+
+ task.map(|task| unsafe {
+ // Safety: because the `LocalSet` itself is `!Send`, we know we are
+ // on the same thread if we have access to the `LocalSet`, and can
+ // therefore access the local run queue.
+ self.context.shared.local_state.assert_owner(task)
+ })
+ }
+
+ fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
+ unsafe {
+ // Safety: because the `LocalSet` itself is `!Send`, we know we are
+ // on the same thread if we have access to the `LocalSet`, and can
+ // therefore access the local run queue.
+ self.context.shared.local_state.task_pop_front()
+ }
+ }
+
+ fn with<T>(&self, f: impl FnOnce() -> T) -> T {
+ CURRENT.with(|LocalData { ctx, .. }| {
+ struct Reset<'a> {
+ ctx_ref: &'a RcCell<Context>,
+ val: Option<Rc<Context>>,
+ }
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ self.ctx_ref.set(self.val.take());
+ }
+ }
+ let old = ctx.replace(Some(self.context.clone()));
+
+ let _reset = Reset {
+ ctx_ref: ctx,
+ val: old,
+ };
+
+ f()
+ })
+ }
+
+ /// This method is like `with`, but it just calls `f` without setting the thread-local if that
+ /// fails.
+ fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
+ let mut f = Some(f);
+
+ let res = CURRENT.try_with(|LocalData { ctx, .. }| {
+ struct Reset<'a> {
+ ctx_ref: &'a RcCell<Context>,
+ val: Option<Rc<Context>>,
+ }
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ self.ctx_ref.replace(self.val.take());
+ }
+ }
+ let old = ctx.replace(Some(self.context.clone()));
+
+ let _reset = Reset {
+ ctx_ref: ctx,
+ val: old,
+ };
+
+ (f.take().unwrap())()
+ });
+
+ match res {
+ Ok(res) => res,
+ Err(_access_error) => (f.take().unwrap())(),
+ }
+ }
+}
+
+cfg_unstable! {
+ impl LocalSet {
+ /// Configure how the `LocalSet` responds to an unhandled panic on a
+ /// spawned task.
+ ///
+ /// By default, an unhandled panic (i.e. a panic not caught by
+ /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
+        /// execution. The panic's error value is forwarded to the task's
+ /// [`JoinHandle`] and all other spawned tasks continue running.
+ ///
+ /// The `unhandled_panic` option enables configuring this behavior.
+ ///
+ /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
+ /// spawned tasks have no impact on the `LocalSet`'s execution.
+ /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
+        ///   shut down immediately when a spawned task panics, even if that
+ /// task's `JoinHandle` has not been dropped. All other spawned tasks
+ /// will immediately terminate and further calls to
+ /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called after the `LocalSet` has started
+ /// running.
+ ///
+ /// # Unstable
+ ///
+ /// This option is currently unstable and its implementation is
+ /// incomplete. The API may change or be removed in the future. See
+ /// tokio-rs/tokio#4516 for more details.
+ ///
+ /// # Examples
+ ///
+ /// The following demonstrates a `LocalSet` configured to shutdown on
+ /// panic. The first spawned task panics and results in the `LocalSet`
+ /// shutting down. The second spawned task never has a chance to
+ /// execute. The call to `run_until` will panic due to the runtime being
+        /// forcibly shut down.
+ ///
+ /// ```should_panic
+ /// use tokio::runtime::UnhandledPanic;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// tokio::task::LocalSet::new()
+ /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
+ /// .run_until(async {
+ /// tokio::task::spawn_local(async { panic!("boom"); });
+ /// tokio::task::spawn_local(async {
+ /// // This task never completes
+ /// });
+ ///
+ /// // Do some work, but `run_until` will panic before it completes
+ /// # loop { tokio::task::yield_now().await; }
+ /// })
+ /// .await;
+ /// # }
+ /// ```
+ ///
+ /// [`JoinHandle`]: struct@crate::task::JoinHandle
+ pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
+ // TODO: This should be set as a builder
+ Rc::get_mut(&mut self.context)
+ .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
+ .expect("Unhandled Panic behavior modified after starting LocalSet")
+ .unhandled_panic = behavior;
+ self
+ }
+ }
+}
+
+impl fmt::Debug for LocalSet {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("LocalSet").finish()
+ }
+}
+
+impl Future for LocalSet {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ // Register the waker before starting to work
+ self.context.shared.waker.register_by_ref(cx.waker());
+
+ if self.with(|| self.tick()) {
+ // If `tick` returns true, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+
+ // Safety: called from the thread that owns `LocalSet`. Because
+ // `LocalSet` is `!Send`, this is safe.
+ } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
+ // If the scheduler has no remaining futures, we're done!
+ Poll::Ready(())
+ } else {
+ // There are still futures in the local set, but we've polled all the
+ // futures in the run queue. Therefore, we can just return Pending
+ // since the remaining futures will be woken from somewhere else.
+ Poll::Pending
+ }
+ }
+}
+
+impl Default for LocalSet {
+ fn default() -> LocalSet {
+ LocalSet::new()
+ }
+}
+
+impl Drop for LocalSet {
+ fn drop(&mut self) {
+ self.with_if_possible(|| {
+ // Shut down all tasks in the LocalOwnedTasks and close it to
+ // prevent new tasks from ever being added.
+ unsafe {
+ // Safety: called from the thread that owns `LocalSet`
+ self.context.shared.local_state.close_and_shutdown_all();
+ }
+
+            // We already called shutdown on all tasks above, so there is no
+            // need to call shutdown again; the tasks drained below can simply
+            // be dropped.
+
+ // Safety: note that this *intentionally* bypasses the unsafe
+ // `Shared::local_queue()` method. This is in order to avoid the
+ // debug assertion that we are on the thread that owns the
+ // `LocalSet`, because on some systems (e.g. at least some macOS
+ // versions), attempting to get the current thread ID can panic due
+ // to the thread's local data that stores the thread ID being
+ // dropped *before* the `LocalSet`.
+ //
+ // Despite avoiding the assertion here, it is safe for us to access
+ // the local queue in `Drop`, because the `LocalSet` itself is
+ // `!Send`, so we can reasonably guarantee that it will not be
+ // `Drop`ped from another thread.
+ let local_queue = unsafe {
+ // Safety: called from the thread that owns `LocalSet`
+ self.context.shared.local_state.take_local_queue()
+ };
+ for task in local_queue {
+ drop(task);
+ }
+
+ // Take the queue from the Shared object to prevent pushing
+ // notifications to it in the future.
+ let queue = self.context.shared.queue.lock().take().unwrap();
+ for task in queue {
+ drop(task);
+ }
+
+ // Safety: called from the thread that owns `LocalSet`
+ assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
+ });
+ }
+}
+
+// === impl Context ===
+
+impl Context {
+ #[track_caller]
+ fn spawn<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ let id = crate::runtime::task::Id::next();
+ let future = crate::util::trace::task(future, "local", name, id.as_u64());
+
+ // Safety: called from the thread that owns the `LocalSet`
+ let (handle, notified) = {
+ self.shared.local_state.assert_called_from_owner_thread();
+ self.shared
+ .local_state
+ .owned
+ .bind(future, self.shared.clone(), id)
+ };
+
+ if let Some(notified) = notified {
+ self.shared.schedule(notified);
+ }
+
+ handle
+ }
+}
+
+// === impl LocalFuture ===
+
+impl<T: Future> Future for RunUntil<'_, T> {
+ type Output = T::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+
+ me.local_set.with(|| {
+ me.local_set
+ .context
+ .shared
+ .waker
+ .register_by_ref(cx.waker());
+
+ let _no_blocking = crate::runtime::context::disallow_block_in_place();
+ let f = me.future;
+
+ if let Poll::Ready(output) = f.poll(cx) {
+ return Poll::Ready(output);
+ }
+
+ if me.local_set.tick() {
+ // If `tick` returns `true`, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ })
+ }
+}
+
+impl Shared {
+ /// Schedule the provided task on the scheduler.
+ fn schedule(&self, task: task::Notified<Arc<Self>>) {
+ CURRENT.with(|localdata| {
+ match localdata.ctx.get() {
+ Some(cx) if cx.shared.ptr_eq(self) => unsafe {
+ // Safety: if the current `LocalSet` context points to this
+ // `LocalSet`, then we are on the thread that owns it.
+ cx.shared.local_state.task_push_back(task);
+ },
+
+ // We are on the thread that owns the `LocalSet`, so we can
+ // wake to the local queue.
+ _ if context::thread_id().ok() == Some(self.local_state.owner) => {
+ unsafe {
+ // Safety: we just checked that the thread ID matches
+ // the localset's owner, so this is safe.
+ self.local_state.task_push_back(task);
+ }
+ // We still have to wake the `LocalSet`, because it isn't
+ // currently being polled.
+ self.waker.wake();
+ }
+
+ // We are *not* on the thread that owns the `LocalSet`, so we
+ // have to wake to the remote queue.
+ _ => {
+                    // First, check whether the queue is still there (if not,
+                    // the `LocalSet` has been dropped). If it is, push the
+                    // task to it; otherwise, do nothing.
+ let mut lock = self.queue.lock();
+
+ if let Some(queue) = lock.as_mut() {
+ queue.push_back(task);
+ drop(lock);
+ self.waker.wake();
+ }
+ }
+ }
+ });
+ }
+
+ fn ptr_eq(&self, other: &Shared) -> bool {
+ std::ptr::eq(self, other)
+ }
+}
+
+// This is safe because (and only because) we *pinky pwomise* to never touch the
+// local run queue except from the thread that owns the `LocalSet`.
+unsafe impl Sync for Shared {}
+
+impl task::Schedule for Arc<Shared> {
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+        // Safety: this is always called from the thread that owns the `LocalSet`.
+ unsafe { self.local_state.task_remove(task) }
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ Shared::schedule(self, task);
+ }
+
+ cfg_unstable! {
+ fn unhandled_panic(&self) {
+ use crate::runtime::UnhandledPanic;
+
+ match self.unhandled_panic {
+ UnhandledPanic::Ignore => {
+ // Do nothing
+ }
+ UnhandledPanic::ShutdownRuntime => {
+ // This hook is only called from within the runtime, so
+ // `CURRENT` should match with `&self`, i.e. there is no
+ // opportunity for a nested scheduler to be called.
+ CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
+ Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
+ cx.unhandled_panic.set(true);
+ // Safety: this is always called from the thread that owns `LocalSet`
+ unsafe { cx.shared.local_state.close_and_shutdown_all(); }
+ }
+ _ => unreachable!("runtime core not set in CURRENT thread-local"),
+ })
+ }
+ }
+ }
+ }
+}
+
+impl LocalState {
+ unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
+ // The caller ensures it is called from the same thread that owns
+ // the LocalSet.
+ self.assert_called_from_owner_thread();
+
+ self.local_queue.with_mut(|ptr| (*ptr).pop_front())
+ }
+
+ unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
+ // The caller ensures it is called from the same thread that owns
+ // the LocalSet.
+ self.assert_called_from_owner_thread();
+
+ self.local_queue.with_mut(|ptr| (*ptr).push_back(task))
+ }
+
+ unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
+ // The caller ensures it is called from the same thread that owns
+ // the LocalSet.
+ self.assert_called_from_owner_thread();
+
+ self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
+ }
+
+ unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
+ // The caller ensures it is called from the same thread that owns
+ // the LocalSet.
+ self.assert_called_from_owner_thread();
+
+ self.owned.remove(task)
+ }
+
+ /// Returns true if the `LocalSet` does not have any spawned tasks
+ unsafe fn owned_is_empty(&self) -> bool {
+ // The caller ensures it is called from the same thread that owns
+ // the LocalSet.
+ self.assert_called_from_owner_thread();
+
+ self.owned.is_empty()
+ }
+
+ unsafe fn assert_owner(
+ &self,
+ task: task::Notified<Arc<Shared>>,
+ ) -> task::LocalNotified<Arc<Shared>> {
+ // The caller ensures it is called from the same thread that owns
+ // the LocalSet.
+ self.assert_called_from_owner_thread();
+
+ self.owned.assert_owner(task)
+ }
+
+ unsafe fn close_and_shutdown_all(&self) {
+ // The caller ensures it is called from the same thread that owns
+ // the LocalSet.
+ self.assert_called_from_owner_thread();
+
+ self.owned.close_and_shutdown_all()
+ }
+
+ #[track_caller]
+ fn assert_called_from_owner_thread(&self) {
+ // FreeBSD has some weirdness around thread-local destruction.
+ // TODO: remove this hack when thread id is cleaned up
+ #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
+ debug_assert!(
+ // if we couldn't get the thread ID because we're dropping the local
+ // data, skip the assertion --- the `Drop` impl is not going to be
+ // called from another thread, because `LocalSet` is `!Send`
+ context::thread_id()
+ .map(|id| id == self.owner)
+ .unwrap_or(true),
+ "`LocalSet`'s local run queue must not be accessed by another thread!"
+ );
+ }
+}
+
+// This is `Send` because it is stored in `Shared`. It is up to the caller to
+// ensure they are on the same thread that owns the `LocalSet`.
+unsafe impl Send for LocalState {}
+
+#[cfg(all(test, not(loom)))]
+mod tests {
+ use super::*;
+
+ // Does a `LocalSet` running on a current-thread runtime...basically work?
+ //
+ // This duplicates a test in `tests/task_local_set.rs`, but because this is
+    // a lib test, it will run under Miri, so this is necessary to catch stacked
+ // borrows violations in the `LocalSet` implementation.
+ #[test]
+ fn local_current_thread_scheduler() {
+ let f = async {
+ LocalSet::new()
+ .run_until(async {
+ spawn_local(async {}).await.unwrap();
+ })
+ .await;
+ };
+ crate::runtime::Builder::new_current_thread()
+ .build()
+ .expect("rt")
+ .block_on(f)
+ }
+
+ // Tests that when a task on a `LocalSet` is woken by an io driver on the
+    // same thread, the task is woken to the `LocalSet`'s local queue rather than
+ // its remote queue.
+ //
+ // This test has to be defined in the `local.rs` file as a lib test, rather
+ // than in `tests/`, because it makes assertions about the local set's
+ // internal state.
+ #[test]
+ fn wakes_to_local_queue() {
+ use super::*;
+ use crate::sync::Notify;
+ let rt = crate::runtime::Builder::new_current_thread()
+ .build()
+ .expect("rt");
+ rt.block_on(async {
+ let local = LocalSet::new();
+ let notify = Arc::new(Notify::new());
+ let task = local.spawn_local({
+ let notify = notify.clone();
+ async move {
+ notify.notified().await;
+ }
+ });
+ let mut run_until = Box::pin(local.run_until(async move {
+ task.await.unwrap();
+ }));
+
+            // Poll the `run_until` future once.
+ crate::future::poll_fn(|cx| {
+ let _ = run_until.as_mut().poll(cx);
+ Poll::Ready(())
+ })
+ .await;
+
+ notify.notify_one();
+ let task = unsafe { local.context.shared.local_state.task_pop_front() };
+ // TODO(eliza): it would be nice to be able to assert that this is
+ // the local task.
+ assert!(
+ task.is_some(),
+ "task should have been notified to the LocalSet's local queue"
+ );
+ })
+ }
+}
diff --git a/third_party/rust/tokio/src/task/mod.rs b/third_party/rust/tokio/src/task/mod.rs
new file mode 100644
index 0000000000..9b75370185
--- /dev/null
+++ b/third_party/rust/tokio/src/task/mod.rs
@@ -0,0 +1,333 @@
+//! Asynchronous green-threads.
+//!
+//! ## What are Tasks?
+//!
+//! A _task_ is a lightweight, non-blocking unit of execution. A task is similar
+//! to an OS thread, but rather than being managed by the OS scheduler, it is
+//! managed by the [Tokio runtime][rt]. Another name for this general pattern is
+//! [green threads]. If you are familiar with [Go's goroutines], [Kotlin's
+//! coroutines], or [Erlang's processes], you can think of Tokio's tasks as
+//! something similar.
+//!
+//! Key points about tasks include:
+//!
+//! * Tasks are **lightweight**. Because tasks are scheduled by the Tokio
+//! runtime rather than the operating system, creating new tasks or switching
+//! between tasks does not require a context switch and has fairly low
+//! overhead. Creating, running, and destroying large numbers of tasks is
+//! quite cheap, especially compared to OS threads.
+//!
+//! * Tasks are scheduled **cooperatively**. Most operating systems implement
+//! _preemptive multitasking_. This is a scheduling technique where the
+//! operating system allows each thread to run for a period of time, and then
+//! _preempts_ it, temporarily pausing that thread and switching to another.
+//! Tasks, on the other hand, implement _cooperative multitasking_. In
+//! cooperative multitasking, a task is allowed to run until it _yields_,
+//! indicating to the Tokio runtime's scheduler that it cannot currently
+//! continue executing. When a task yields, the Tokio runtime switches to
+//! executing the next task.
+//!
+//! * Tasks are **non-blocking**. Typically, when an OS thread performs I/O or
+//! must synchronize with another thread, it _blocks_, allowing the OS to
+//! schedule another thread. When a task cannot continue executing, it must
+//! yield instead, allowing the Tokio runtime to schedule another task. Tasks
+//! should generally not perform system calls or other operations that could
+//! block a thread, as this would prevent other tasks running on the same
+//! thread from executing as well. Instead, this module provides APIs for
+//! running blocking operations in an asynchronous context.
+//!
+//! [rt]: crate::runtime
+//! [green threads]: https://en.wikipedia.org/wiki/Green_threads
+//! [Go's goroutines]: https://tour.golang.org/concurrency/1
+//! [Kotlin's coroutines]: https://kotlinlang.org/docs/reference/coroutines-overview.html
+//! [Erlang's processes]: http://erlang.org/doc/getting_started/conc_prog.html#processes
+//!
+//! ## Working with Tasks
+//!
+//! This module provides the following APIs for working with tasks:
+//!
+//! ### Spawning
+//!
+//! Perhaps the most important function in this module is [`task::spawn`]. This
+//! function can be thought of as an async equivalent to the standard library's
+//! [`thread::spawn`][`std::thread::spawn`]. It takes an `async` block or other
+//! [future], and creates a new task to run that work concurrently:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn doc() {
+//! task::spawn(async {
+//! // perform some work here...
+//! });
+//! # }
+//! ```
+//!
+//! Like [`std::thread::spawn`], `task::spawn` returns a [`JoinHandle`] struct.
+//! A `JoinHandle` is itself a future which may be used to await the output of
+//! the spawned task. For example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let join = task::spawn(async {
+//! // ...
+//! "hello world!"
+//! });
+//!
+//! // ...
+//!
+//! // Await the result of the spawned task.
+//! let result = join.await?;
+//! assert_eq!(result, "hello world!");
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! Again, like `std::thread`'s [`JoinHandle` type][thread_join], if the spawned
+//! task panics, awaiting its `JoinHandle` will return a [`JoinError`]. For
+//! example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() {
+//! let join = task::spawn(async {
+//! panic!("something bad happened!")
+//! });
+//!
+//! // The returned result indicates that the task failed.
+//! assert!(join.await.is_err());
+//! # }
+//! ```
+//!
+//! `spawn`, `JoinHandle`, and `JoinError` are present when the "rt"
+//! feature flag is enabled.
+//!
+//! [`task::spawn`]: crate::task::spawn()
+//! [future]: std::future::Future
+//! [`std::thread::spawn`]: std::thread::spawn
+//! [`JoinHandle`]: crate::task::JoinHandle
+//! [thread_join]: std::thread::JoinHandle
+//! [`JoinError`]: crate::task::JoinError
+//!
+//! ### Blocking and Yielding
+//!
+//! As we discussed above, code running in asynchronous tasks should not perform
+//! operations that can block. A blocking operation performed in a task running
+//! on a thread that is also running other tasks would block the entire thread,
+//! preventing other tasks from running.
+//!
+//! Instead, Tokio provides two APIs for running blocking operations in an
+//! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`].
+//!
+//! Be aware that if you call a non-async method from async code, that non-async
+//! method is still inside the asynchronous context, so you should also avoid
+//! blocking operations there. This includes destructors of objects destroyed in
+//! async code.
+//!
+//! #### spawn_blocking
+//!
+//! The `task::spawn_blocking` function is similar to the `task::spawn` function
+//! discussed in the previous section, but rather than spawning a
+//! _non-blocking_ future on the Tokio runtime, it instead spawns a
+//! _blocking_ function on a dedicated thread pool for blocking tasks. For
+//! example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn docs() {
+//! task::spawn_blocking(|| {
+//! // do some compute-heavy work or call synchronous code
+//! });
+//! # }
+//! ```
+//!
+//! Just like `task::spawn`, `task::spawn_blocking` returns a `JoinHandle`
+//! which we can use to await the result of the blocking operation:
+//!
+//! ```rust
+//! # use tokio::task;
+//! # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+//! let join = task::spawn_blocking(|| {
+//! // do some compute-heavy work or call synchronous code
+//! "blocking completed"
+//! });
+//!
+//! let result = join.await?;
+//! assert_eq!(result, "blocking completed");
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! #### block_in_place
+//!
+//! When using the [multi-threaded runtime][rt-multi-thread], the [`task::block_in_place`]
+//! function is also available. Like `task::spawn_blocking`, this function
+//! allows running a blocking operation from an asynchronous context. Unlike
+//! `spawn_blocking`, however, `block_in_place` works by transitioning the
+//! _current_ worker thread to a blocking thread, moving other tasks running on
+//! that thread to another worker thread. This can improve performance by avoiding
+//! context switches.
+//!
+//! For example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn docs() {
+//! let result = task::block_in_place(|| {
+//! // do some compute-heavy work or call synchronous code
+//! "blocking completed"
+//! });
+//!
+//! assert_eq!(result, "blocking completed");
+//! # }
+//! ```
+//!
+//! #### yield_now
+//!
+//! In addition, this module provides a [`task::yield_now`] async function
+//! that is analogous to the standard library's [`thread::yield_now`]. Calling
+//! and `await`ing this function will cause the current task to yield to the
+//! Tokio runtime's scheduler, allowing other tasks to be
+//! scheduled. Eventually, the yielding task will be polled again, allowing it
+//! to execute. For example:
+//!
+//! ```rust
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() {
+//! async {
+//! task::spawn(async {
+//! // ...
+//! println!("spawned task done!")
+//! });
+//!
+//! // Yield, allowing the newly-spawned task to execute first.
+//! task::yield_now().await;
+//! println!("main task done!");
+//! }
+//! # .await;
+//! # }
+//! ```
+//!
+//! ### Cooperative scheduling
+//!
+//! A single call to [`poll`] on a top-level task may potentially do a lot of
+//! work before it returns `Poll::Pending`. If a task runs for a long period of
+//! time without yielding back to the executor, it can starve other tasks
+//! waiting on that executor to execute them, or drive underlying resources.
+//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
+//! long-running task. Instead, this module provides an opt-in mechanism for
+//! futures to collaborate with the executor to avoid starvation.
+//!
+//! Consider a future like this one:
+//!
+//! ```
+//! # use tokio_stream::{Stream, StreamExt};
+//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
+//! while let Some(_) = input.next().await {}
+//! }
+//! ```
+//!
+//! It may look harmless, but consider what happens under heavy load if the
+//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
+//! yield, and will starve other tasks and resources on the same executor.
+//!
+//! To account for this, Tokio has explicit yield points in a number of library
+//! functions, which force tasks to return to the executor periodically.
+//!
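+//! One way to add such a yield point by hand is to `await` on
+//! [`task::yield_now`] inside the loop. A minimal sketch of the pattern:
+//!
+//! ```
+//! # use tokio_stream::{Stream, StreamExt};
+//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
+//!     while let Some(_) = input.next().await {
+//!         // Explicitly return control to the scheduler, so other tasks
+//!         // can run even if `input` is always ready.
+//!         tokio::task::yield_now().await;
+//!     }
+//! }
+//! ```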
+//!
+//! #### unconstrained
+//!
+//! If necessary, [`task::unconstrained`] lets you opt a future out of Tokio's cooperative
+//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to
+//! Tokio. For example:
+//!
+//! ```
+//! # #[tokio::main]
+//! # async fn main() {
+//! use tokio::{task, sync::mpsc};
+//!
+//! let fut = async {
+//! let (tx, mut rx) = mpsc::unbounded_channel();
+//!
+//! for i in 0..1000 {
+//! let _ = tx.send(());
+//! // This will always be ready. If coop was in effect, this code would be forced to yield
+//! // periodically. However, if left unconstrained, then this code will never yield.
+//! rx.recv().await;
+//! }
+//! };
+//!
+//! task::unconstrained(fut).await;
+//! # }
+//! ```
+//!
+//! [`task::spawn_blocking`]: crate::task::spawn_blocking
+//! [`task::block_in_place`]: crate::task::block_in_place
+//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
+//! [`task::yield_now`]: crate::task::yield_now()
+//! [`thread::yield_now`]: std::thread::yield_now
+//! [`task::unconstrained`]: crate::task::unconstrained()
+//! [`poll`]: method@std::future::Future::poll
+
+cfg_rt! {
+ pub use crate::runtime::task::{JoinError, JoinHandle};
+
+ cfg_not_wasi! {
+ mod blocking;
+ pub use blocking::spawn_blocking;
+ }
+
+ mod spawn;
+ pub use spawn::spawn;
+
+ cfg_rt_multi_thread! {
+ pub use blocking::block_in_place;
+ }
+
+ mod yield_now;
+ pub use yield_now::yield_now;
+
+ cfg_unstable! {
+ mod consume_budget;
+ pub use consume_budget::consume_budget;
+ }
+
+ mod local;
+ pub use local::{spawn_local, LocalSet, LocalEnterGuard};
+
+ mod task_local;
+ pub use task_local::LocalKey;
+
+ mod unconstrained;
+ pub use unconstrained::{unconstrained, Unconstrained};
+
+ #[doc(inline)]
+ pub use join_set::JoinSet;
+ pub use crate::runtime::task::AbortHandle;
+
+ // Uses #[cfg(...)] instead of macro since the macro adds docsrs annotations.
+ #[cfg(not(tokio_unstable))]
+ mod join_set;
+ #[cfg(tokio_unstable)]
+ pub mod join_set;
+
+ cfg_unstable! {
+ pub use crate::runtime::task::{Id, id, try_id};
+ }
+
+ cfg_trace! {
+ mod builder;
+ pub use builder::Builder;
+ }
+
+ /// Task-related futures.
+ pub mod futures {
+ pub use super::task_local::TaskLocalFuture;
+ }
+}
diff --git a/third_party/rust/tokio/src/task/spawn.rs b/third_party/rust/tokio/src/task/spawn.rs
new file mode 100644
index 0000000000..23a46d7456
--- /dev/null
+++ b/third_party/rust/tokio/src/task/spawn.rs
@@ -0,0 +1,206 @@
+use crate::task::JoinHandle;
+
+use std::future::Future;
+
+cfg_rt! {
+ /// Spawns a new asynchronous task, returning a
+ /// [`JoinHandle`](super::JoinHandle) for it.
+ ///
+ /// The provided future will start running in the background immediately
+ /// when `spawn` is called, even if you don't await the returned
+ /// `JoinHandle`.
+ ///
+ /// Spawning a task enables the task to execute concurrently to other tasks. The
+ /// spawned task may execute on the current thread, or it may be sent to a
+ /// different thread to be executed. The specifics depend on the current
+ /// [`Runtime`](crate::runtime::Runtime) configuration.
+ ///
+ /// It is guaranteed that spawn will not synchronously poll the task being spawned.
+ /// This means that calling spawn while holding a lock does not pose a risk of
+ /// deadlocking with the spawned task.
+ ///
+ /// There is no guarantee that a spawned task will execute to completion.
+    /// When a runtime is shut down, all outstanding tasks are dropped,
+ /// regardless of the lifecycle of that task.
+ ///
+ /// This function must be called from the context of a Tokio runtime. Tasks running on
+ /// the Tokio runtime are always inside its context, but you can also enter the context
+ /// using the [`Runtime::enter`](crate::runtime::Runtime::enter()) method.
+ ///
+ /// # Examples
+ ///
+ /// In this example, a server is started and `spawn` is used to start a new task
+ /// that processes each received connection.
+ ///
+ /// ```no_run
+ /// use tokio::net::{TcpListener, TcpStream};
+ ///
+ /// use std::io;
+ ///
+ /// async fn process(socket: TcpStream) {
+ /// // ...
+ /// # drop(socket);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// let (socket, _) = listener.accept().await?;
+ ///
+ /// tokio::spawn(async move {
+ /// // Process each socket concurrently.
+ /// process(socket).await
+ /// });
+ /// }
+ /// }
+ /// ```
+ ///
+ /// To run multiple tasks in parallel and receive their results, join
+ /// handles can be stored in a vector.
+ /// ```
+ /// # #[tokio::main(flavor = "current_thread")] async fn main() {
+ /// async fn my_background_op(id: i32) -> String {
+ /// let s = format!("Starting background task {}.", id);
+ /// println!("{}", s);
+ /// s
+ /// }
+ ///
+ /// let ops = vec![1, 2, 3];
+ /// let mut tasks = Vec::with_capacity(ops.len());
+ /// for op in ops {
+ /// // This call will make them start running in the background
+ /// // immediately.
+ /// tasks.push(tokio::spawn(my_background_op(op)));
+ /// }
+ ///
+ /// let mut outputs = Vec::with_capacity(tasks.len());
+ /// for task in tasks {
+ /// outputs.push(task.await.unwrap());
+ /// }
+ /// println!("{:?}", outputs);
+ /// # }
+ /// ```
+    /// This example pushes each task's output to `outputs` in the order the
+    /// tasks were started in. If you do not care about the ordering of the
+    /// outputs, then
+ /// you can also use a [`JoinSet`].
+ ///
+ /// [`JoinSet`]: struct@crate::task::JoinSet
+ ///
+ /// # Panics
+ ///
+ /// Panics if called from **outside** of the Tokio runtime.
+ ///
+ /// # Using `!Send` values from a task
+ ///
+ /// The task supplied to `spawn` must implement `Send`. However, it is
+ /// possible to **use** `!Send` values from the task as long as they only
+ /// exist between calls to `.await`.
+ ///
+ /// For example, this will work:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// use std::rc::Rc;
+ ///
+ /// fn use_rc(rc: Rc<()>) {
+ /// // Do stuff w/ rc
+ /// # drop(rc);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// tokio::spawn(async {
+ /// // Force the `Rc` to stay in a scope with no `.await`
+ /// {
+ /// let rc = Rc::new(());
+ /// use_rc(rc.clone());
+ /// }
+ ///
+ /// task::yield_now().await;
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// This will **not** work:
+ ///
+ /// ```compile_fail
+ /// use tokio::task;
+ ///
+ /// use std::rc::Rc;
+ ///
+ /// fn use_rc(rc: Rc<()>) {
+ /// // Do stuff w/ rc
+ /// # drop(rc);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// tokio::spawn(async {
+ /// let rc = Rc::new(());
+ ///
+ /// task::yield_now().await;
+ ///
+ /// use_rc(rc.clone());
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// Holding on to a `!Send` value across calls to `.await` will result in
+ /// an unfriendly compile error message similar to:
+ ///
+ /// ```text
+ /// `[... some type ...]` cannot be sent between threads safely
+ /// ```
+ ///
+ /// or:
+ ///
+ /// ```text
+ /// error[E0391]: cycle detected when processing `main`
+ /// ```
+ #[track_caller]
+ pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
+ where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+        // Prevent stack overflows in debug mode by eagerly moving large
+        // futures to the heap.
+ if cfg!(debug_assertions) && std::mem::size_of::<T>() > 2048 {
+ spawn_inner(Box::pin(future), None)
+ } else {
+ spawn_inner(future, None)
+ }
+ }
+
+ #[track_caller]
+ pub(super) fn spawn_inner<T>(future: T, name: Option<&str>) -> JoinHandle<T::Output>
+ where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ use crate::runtime::{context, task};
+
+ #[cfg(all(
+ tokio_unstable,
+ tokio_taskdump,
+ feature = "rt",
+ target_os = "linux",
+ any(
+ target_arch = "aarch64",
+ target_arch = "x86",
+ target_arch = "x86_64"
+ )
+ ))]
+ let future = task::trace::Trace::root(future);
+ let id = task::Id::next();
+ let task = crate::util::trace::task(future, "task", name, id.as_u64());
+
+ match context::with_current(|handle| handle.spawn(task, id)) {
+ Ok(join_handle) => join_handle,
+ Err(e) => panic!("{}", e),
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/task/task_local.rs b/third_party/rust/tokio/src/task/task_local.rs
new file mode 100644
index 0000000000..eeadfbd3ef
--- /dev/null
+++ b/third_party/rust/tokio/src/task/task_local.rs
@@ -0,0 +1,451 @@
+use pin_project_lite::pin_project;
+use std::cell::RefCell;
+use std::error::Error;
+use std::future::Future;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{fmt, mem, thread};
+
+/// Declares a new task-local key of type [`tokio::task::LocalKey`].
+///
+/// # Syntax
+///
+/// The macro wraps any number of static declarations and makes them local to the current task.
+/// Publicity and attributes for each static are preserved.
+///
+/// # Examples
+///
+/// ```
+/// # use tokio::task_local;
+/// task_local! {
+/// pub static ONE: u32;
+///
+/// #[allow(unused)]
+/// static TWO: f32;
+/// }
+/// # fn main() {}
+/// ```
+///
+/// See [LocalKey documentation][`tokio::task::LocalKey`] for more
+/// information.
+///
+/// [`tokio::task::LocalKey`]: struct@crate::task::LocalKey
+#[macro_export]
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+macro_rules! task_local {
+ // empty (base case for the recursion)
+ () => {};
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ $crate::task_local!($($rest)*);
+ };
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ }
+}
+
+#[doc(hidden)]
+#[cfg(not(tokio_no_const_thread_local))]
+#[macro_export]
+macro_rules! __task_local_inner {
+ ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
+ $(#[$attr])*
+ $vis static $name: $crate::task::LocalKey<$t> = {
+ std::thread_local! {
+ static __KEY: std::cell::RefCell<Option<$t>> = const { std::cell::RefCell::new(None) };
+ }
+
+ $crate::task::LocalKey { inner: __KEY }
+ };
+ };
+}
+
+#[doc(hidden)]
+#[cfg(tokio_no_const_thread_local)]
+#[macro_export]
+macro_rules! __task_local_inner {
+ ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
+ $(#[$attr])*
+ $vis static $name: $crate::task::LocalKey<$t> = {
+ std::thread_local! {
+ static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None);
+ }
+
+ $crate::task::LocalKey { inner: __KEY }
+ };
+ };
+}
+
+/// A key for task-local data.
+///
+/// This type is generated by the [`task_local!`] macro.
+///
+/// Unlike [`std::thread::LocalKey`], `tokio::task::LocalKey` will
+/// _not_ lazily initialize the value on first access. Instead, the
+/// value is first initialized when the future containing
+/// the task-local is first polled by a futures executor, like Tokio.
+///
+/// # Examples
+///
+/// ```
+/// # async fn dox() {
+/// tokio::task_local! {
+/// static NUMBER: u32;
+/// }
+///
+/// NUMBER.scope(1, async move {
+/// assert_eq!(NUMBER.get(), 1);
+/// }).await;
+///
+/// NUMBER.scope(2, async move {
+/// assert_eq!(NUMBER.get(), 2);
+///
+/// NUMBER.scope(3, async move {
+/// assert_eq!(NUMBER.get(), 3);
+/// }).await;
+/// }).await;
+/// # }
+/// ```
+///
+/// [`std::thread::LocalKey`]: struct@std::thread::LocalKey
+/// [`task_local!`]: ../macro.task_local.html
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub struct LocalKey<T: 'static> {
+ #[doc(hidden)]
+ pub inner: thread::LocalKey<RefCell<Option<T>>>,
+}
+
+impl<T: 'static> LocalKey<T> {
+ /// Sets a value `T` as the task-local value for the future `F`.
+ ///
+ /// On completion of `scope`, the task-local will be dropped.
+ ///
+ /// ### Panics
+ ///
+ /// If you poll the returned future inside a call to [`with`] or
+ /// [`try_with`] on the same `LocalKey`, then the call to `poll` will panic.
+ ///
+ /// ### Examples
+ ///
+ /// ```
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static NUMBER: u32;
+ /// }
+ ///
+ /// NUMBER.scope(1, async move {
+ /// println!("task local value: {}", NUMBER.get());
+ /// }).await;
+ /// # }
+ /// ```
+ ///
+ /// [`with`]: fn@Self::with
+ /// [`try_with`]: fn@Self::try_with
+ pub fn scope<F>(&'static self, value: T, f: F) -> TaskLocalFuture<T, F>
+ where
+ F: Future,
+ {
+ TaskLocalFuture {
+ local: self,
+ slot: Some(value),
+ future: Some(f),
+ _pinned: PhantomPinned,
+ }
+ }
+
+ /// Sets a value `T` as the task-local value for the closure `F`.
+ ///
+ /// On completion of `sync_scope`, the task-local will be dropped.
+ ///
+ /// ### Panics
+ ///
+ /// This method panics if called inside a call to [`with`] or [`try_with`]
+ /// on the same `LocalKey`.
+ ///
+ /// ### Examples
+ ///
+ /// ```
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static NUMBER: u32;
+ /// }
+ ///
+ /// NUMBER.sync_scope(1, || {
+ /// println!("task local value: {}", NUMBER.get());
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// [`with`]: fn@Self::with
+ /// [`try_with`]: fn@Self::try_with
+ #[track_caller]
+ pub fn sync_scope<F, R>(&'static self, value: T, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ let mut value = Some(value);
+ match self.scope_inner(&mut value, f) {
+ Ok(res) => res,
+ Err(err) => err.panic(),
+ }
+ }
+
+ fn scope_inner<F, R>(&'static self, slot: &mut Option<T>, f: F) -> Result<R, ScopeInnerErr>
+ where
+ F: FnOnce() -> R,
+ {
+ struct Guard<'a, T: 'static> {
+ local: &'static LocalKey<T>,
+ slot: &'a mut Option<T>,
+ }
+
+ impl<'a, T: 'static> Drop for Guard<'a, T> {
+ fn drop(&mut self) {
+ // This should not panic.
+ //
+ // We know that the RefCell was not borrowed before the call to
+ // `scope_inner`, so the only way for this to panic is if the
+ // closure has created but not destroyed a RefCell guard.
+ // However, we never give user-code access to the guards, so
+ // there's no way for user-code to forget to destroy a guard.
+ //
+ // The call to `with` also should not panic, since the
+ // thread-local wasn't destroyed when we first called
+ // `scope_inner`, and it shouldn't have gotten destroyed since
+ // then.
+ self.local.inner.with(|inner| {
+ let mut ref_mut = inner.borrow_mut();
+ mem::swap(self.slot, &mut *ref_mut);
+ });
+ }
+ }
+
+ self.inner.try_with(|inner| {
+ inner
+ .try_borrow_mut()
+ .map(|mut ref_mut| mem::swap(slot, &mut *ref_mut))
+ })??;
+
+ let guard = Guard { local: self, slot };
+
+ let res = f();
+
+ drop(guard);
+
+ Ok(res)
+ }
+
+ /// Accesses the current task-local and runs the provided closure.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if the task local doesn't have a value set.
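+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch:
+    ///
+    /// ```
+    /// # async fn dox() {
+    /// tokio::task_local! {
+    ///     static NUMBER: u32;
+    /// }
+    ///
+    /// NUMBER.scope(1, async move {
+    ///     NUMBER.with(|number| {
+    ///         assert_eq!(*number, 1);
+    ///     });
+    /// }).await;
+    /// # }
+    /// ```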
+ #[track_caller]
+ pub fn with<F, R>(&'static self, f: F) -> R
+ where
+ F: FnOnce(&T) -> R,
+ {
+ match self.try_with(f) {
+ Ok(res) => res,
+ Err(_) => panic!("cannot access a task-local storage value without setting it first"),
+ }
+ }
+
+ /// Accesses the current task-local and runs the provided closure.
+ ///
+ /// If the task-local with the associated key is not present, this
+ /// method will return an `AccessError`. For a panicking variant,
+ /// see `with`.
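+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch: outside of a `scope`, no value is set, so `try_with`
+    /// returns an error instead of panicking.
+    ///
+    /// ```
+    /// tokio::task_local! {
+    ///     static NUMBER: u32;
+    /// }
+    ///
+    /// assert!(NUMBER.try_with(|number| *number).is_err());
+    /// ```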
+ pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError>
+ where
+ F: FnOnce(&T) -> R,
+ {
+ // If called after the thread-local storing the task-local is destroyed,
+ // then we are outside of a closure where the task-local is set.
+ //
+ // Therefore, it is correct to return an AccessError if `try_with`
+ // returns an error.
+ let try_with_res = self.inner.try_with(|v| {
+ // This call to `borrow` cannot panic because no user-defined code
+ // runs while a `borrow_mut` call is active.
+ v.borrow().as_ref().map(f)
+ });
+
+ match try_with_res {
+ Ok(Some(res)) => Ok(res),
+ Ok(None) | Err(_) => Err(AccessError { _private: () }),
+ }
+ }
+}
+
+impl<T: Copy + 'static> LocalKey<T> {
+ /// Returns a copy of the task-local value
+ /// if the task-local value implements `Copy`.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if the task local doesn't have a value set.
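+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch:
+    ///
+    /// ```
+    /// # async fn dox() {
+    /// tokio::task_local! {
+    ///     static NUMBER: u32;
+    /// }
+    ///
+    /// NUMBER.scope(1, async move {
+    ///     assert_eq!(NUMBER.get(), 1);
+    /// }).await;
+    /// # }
+    /// ```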
+ #[track_caller]
+ pub fn get(&'static self) -> T {
+ self.with(|v| *v)
+ }
+}
+
+impl<T: 'static> fmt::Debug for LocalKey<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("LocalKey { .. }")
+ }
+}
+
+pin_project! {
+ /// A future that sets a value `T` of a task local for the future `F` during
+ /// its execution.
+ ///
+ /// The value of the task-local must be `'static` and will be dropped on the
+ /// completion of the future.
+ ///
+ /// Created by the function [`LocalKey::scope`](self::LocalKey::scope).
+ ///
+ /// ### Examples
+ ///
+ /// ```
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static NUMBER: u32;
+ /// }
+ ///
+ /// NUMBER.scope(1, async move {
+ /// println!("task local value: {}", NUMBER.get());
+ /// }).await;
+ /// # }
+ /// ```
+ pub struct TaskLocalFuture<T, F>
+ where
+ T: 'static,
+ {
+ local: &'static LocalKey<T>,
+ slot: Option<T>,
+ #[pin]
+ future: Option<F>,
+ #[pin]
+ _pinned: PhantomPinned,
+ }
+
+ impl<T: 'static, F> PinnedDrop for TaskLocalFuture<T, F> {
+ fn drop(this: Pin<&mut Self>) {
+ let this = this.project();
+ if mem::needs_drop::<F>() && this.future.is_some() {
+ // Drop the future while the task-local is set, if possible. Otherwise
+ // the future is dropped normally when the `Option<F>` field drops.
+ let mut future = this.future;
+ let _ = this.local.scope_inner(this.slot, || {
+ future.set(None);
+ });
+ }
+ }
+ }
+}
+
+impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
+ type Output = F::Output;
+
+ #[track_caller]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.project();
+ let mut future_opt = this.future;
+
+ let res = this
+ .local
+ .scope_inner(this.slot, || match future_opt.as_mut().as_pin_mut() {
+ Some(fut) => {
+ let res = fut.poll(cx);
+ if res.is_ready() {
+ future_opt.set(None);
+ }
+ Some(res)
+ }
+ None => None,
+ });
+
+ match res {
+ Ok(Some(res)) => res,
+ Ok(None) => panic!("`TaskLocalFuture` polled after completion"),
+ Err(err) => err.panic(),
+ }
+ }
+}
+
+impl<T: 'static, F> fmt::Debug for TaskLocalFuture<T, F>
+where
+ T: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        /// Formats the `Option` without the `Some` wrapper.
+ struct TransparentOption<'a, T> {
+ value: &'a Option<T>,
+ }
+ impl<'a, T: fmt::Debug> fmt::Debug for TransparentOption<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.value.as_ref() {
+ Some(value) => value.fmt(f),
+ // Hitting the None branch should not be possible.
+ None => f.pad("<missing>"),
+ }
+ }
+ }
+
+ f.debug_struct("TaskLocalFuture")
+ .field("value", &TransparentOption { value: &self.slot })
+ .finish()
+ }
+}
+
+/// An error returned by [`LocalKey::try_with`](method@LocalKey::try_with).
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct AccessError {
+ _private: (),
+}
+
+impl fmt::Debug for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AccessError").finish()
+ }
+}
+
+impl fmt::Display for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt("task-local value not set", f)
+ }
+}
+
+impl Error for AccessError {}
+
+enum ScopeInnerErr {
+ BorrowError,
+ AccessError,
+}
+
+impl ScopeInnerErr {
+ #[track_caller]
+ fn panic(&self) -> ! {
+ match self {
+ Self::BorrowError => panic!("cannot enter a task-local scope while the task-local storage is borrowed"),
+ Self::AccessError => panic!("cannot enter a task-local scope during or after destruction of the underlying thread-local"),
+ }
+ }
+}
+
+impl From<std::cell::BorrowMutError> for ScopeInnerErr {
+ fn from(_: std::cell::BorrowMutError) -> Self {
+ Self::BorrowError
+ }
+}
+
+impl From<std::thread::AccessError> for ScopeInnerErr {
+ fn from(_: std::thread::AccessError) -> Self {
+ Self::AccessError
+ }
+}
diff --git a/third_party/rust/tokio/src/task/unconstrained.rs b/third_party/rust/tokio/src/task/unconstrained.rs
new file mode 100644
index 0000000000..40384c8709
--- /dev/null
+++ b/third_party/rust/tokio/src/task/unconstrained.rs
@@ -0,0 +1,45 @@
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// Future for the [`unconstrained`](unconstrained) method.
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+ #[must_use = "Unconstrained does nothing unless polled"]
+ pub struct Unconstrained<F> {
+ #[pin]
+ inner: F,
+ }
+}
+
+impl<F> Future for Unconstrained<F>
+where
+ F: Future,
+{
+ type Output = <F as Future>::Output;
+
+ cfg_coop! {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = self.project().inner;
+ crate::runtime::coop::with_unconstrained(|| inner.poll(cx))
+ }
+ }
+
+ cfg_not_coop! {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = self.project().inner;
+ inner.poll(cx)
+ }
+ }
+}
+
+/// Turn off cooperative scheduling for a future. The future will never be forced to yield by
+/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields
+/// otherwise.
+///
+/// See also the usage example in the [task module](index.html#unconstrained).
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
+ Unconstrained { inner }
+}
diff --git a/third_party/rust/tokio/src/task/yield_now.rs b/third_party/rust/tokio/src/task/yield_now.rs
new file mode 100644
index 0000000000..428d124c3b
--- /dev/null
+++ b/third_party/rust/tokio/src/task/yield_now.rs
@@ -0,0 +1,64 @@
+use crate::runtime::context;
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Yields execution back to the Tokio runtime.
+///
+/// A task yields by awaiting on `yield_now()`, and may resume when that future
+/// completes (with no output). The current task will be re-added as a pending
+/// task at the _back_ of the pending queue. Any other pending tasks will be
+/// scheduled. No other waking is required for the task to continue.
+///
+/// See also the usage example in the [task module](index.html#yield_now).
+///
+/// ## Non-guarantees
+///
+/// This function may not yield all the way up to the executor if there are any
+/// special combinators above it in the call stack. For example, if a
+/// [`tokio::select!`] has another branch complete during the same poll as the
+/// `yield_now()`, then the yield is not propagated all the way up to the
+/// runtime.
+///
+/// It is generally not guaranteed that the runtime behaves like you expect it
+/// to when deciding which task to schedule next after a call to `yield_now()`.
+/// In particular, the runtime may choose to poll the task that just ran
+/// `yield_now()` again immediately without polling any other tasks first. For
+/// example, the runtime will not drive the IO driver between every poll of a
+/// task, and this could result in the runtime polling the current task again
+/// immediately even if there is another task that could make progress if that
+/// other task is waiting for a notification from the IO driver.
+///
+/// In general, changes to the order in which the runtime polls tasks are not
+/// considered a breaking change, and your program should be correct no matter
+/// which order the runtime polls your tasks in.
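+///
+/// # Examples
+///
+/// A minimal sketch:
+///
+/// ```
+/// # #[tokio::main(flavor = "current_thread")]
+/// # async fn main() {
+/// tokio::task::spawn(async {
+///     println!("spawned task");
+/// });
+///
+/// // Yield, giving the spawned task a chance to run before continuing.
+/// tokio::task::yield_now().await;
+/// # }
+/// ```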
+///
+/// [`tokio::select!`]: macro@crate::select
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub async fn yield_now() {
+ /// Yield implementation
+ struct YieldNow {
+ yielded: bool,
+ }
+
+ impl Future for YieldNow {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ ready!(crate::trace::trace_leaf(cx));
+
+ if self.yielded {
+ return Poll::Ready(());
+ }
+
+ self.yielded = true;
+
+ context::defer(cx.waker());
+
+ Poll::Pending
+ }
+ }
+
+ YieldNow { yielded: false }.await
+}