summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/task
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/task')
-rw-r--r--third_party/rust/tokio/src/task/blocking.rs199
-rw-r--r--third_party/rust/tokio/src/task/builder.rs115
-rw-r--r--third_party/rust/tokio/src/task/join_set.rs248
-rw-r--r--third_party/rust/tokio/src/task/local.rs698
-rw-r--r--third_party/rust/tokio/src/task/mod.rs317
-rw-r--r--third_party/rust/tokio/src/task/spawn.rs149
-rw-r--r--third_party/rust/tokio/src/task/task_local.rs302
-rw-r--r--third_party/rust/tokio/src/task/unconstrained.rs45
-rw-r--r--third_party/rust/tokio/src/task/yield_now.rs58
9 files changed, 2131 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/task/blocking.rs b/third_party/rust/tokio/src/task/blocking.rs
new file mode 100644
index 0000000000..5fe358f3e5
--- /dev/null
+++ b/third_party/rust/tokio/src/task/blocking.rs
@@ -0,0 +1,199 @@
+use crate::task::JoinHandle;
+
+cfg_rt_multi_thread! {
+ /// Runs the provided blocking function on the current thread without
+ /// blocking the executor.
+ ///
+ /// In general, issuing a blocking call or performing a lot of compute in a
+ /// future without yielding is problematic, as it may prevent the executor
+ /// from driving other tasks forward. Calling this function informs the
+ /// executor that the currently executing task is about to block the thread,
+ /// so the executor is able to hand off any other tasks it has to a new
+ /// worker thread before that happens. See the [CPU-bound tasks and blocking
+ /// code][blocking] section for more information.
+ ///
+ /// Be aware that although this function avoids starving other independently
+ /// spawned tasks, any other code running concurrently in the same task will
+ /// be suspended during the call to `block_in_place`. This can happen e.g.
+ /// when using the [`join!`] macro. To avoid this issue, use
+ /// [`spawn_blocking`] instead of `block_in_place`.
+ ///
+ /// Note that this function cannot be used within a [`current_thread`] runtime
+ /// because in this case there are no other worker threads to hand off tasks
+ /// to. On the other hand, calling the function outside a runtime is
+ /// allowed. In this case, `block_in_place` just calls the provided closure
+ /// normally.
+ ///
+ /// Code running behind `block_in_place` cannot be cancelled. When you shut
+ /// down the executor, it will wait indefinitely for all blocking operations
+ /// to finish. You can use [`shutdown_timeout`] to stop waiting for them
+ /// after a certain timeout. Be aware that this will still not cancel the
+ /// tasks — they are simply allowed to keep running after the method
+ /// returns.
+ ///
+ /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
+ /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
+ /// [`join!`]: macro@join
+ /// [`thread::spawn`]: fn@std::thread::spawn
+ /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn docs() {
+ /// task::block_in_place(move || {
+ /// // do some compute-heavy work or call synchronous code
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// Code running inside `block_in_place` may use `block_on` to reenter the
+ /// async context.
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use tokio::runtime::Handle;
+ ///
+ /// # async fn docs() {
+ /// task::block_in_place(move || {
+ /// Handle::current().block_on(async move {
+ /// // do something async
+ /// });
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called from a [`current_thread`] runtime.
+ ///
+ /// [`current_thread`]: fn@crate::runtime::Builder::new_current_thread
+ pub fn block_in_place<F, R>(f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ crate::runtime::thread_pool::block_in_place(f)
+ }
+}
+
+cfg_rt! {
+ /// Runs the provided closure on a thread where blocking is acceptable.
+ ///
+ /// In general, issuing a blocking call or performing a lot of compute in a
+ /// future without yielding is problematic, as it may prevent the executor from
+ /// driving other futures forward. This function runs the provided closure on a
+ /// thread dedicated to blocking operations. See the [CPU-bound tasks and
+ /// blocking code][blocking] section for more information.
+ ///
+ /// Tokio will spawn more blocking threads when they are requested through this
+ /// function until the upper limit configured on the [`Builder`] is reached.
+ /// After reaching the upper limit, the tasks are put in a queue.
+ /// The thread limit is very large by default, because `spawn_blocking` is often
+ /// used for various kinds of IO operations that cannot be performed
+ /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you
+ /// should keep this large upper limit in mind. When running many CPU-bound
+ /// computations, a semaphore or some other synchronization primitive should be
+ /// used to limit the number of computation executed in parallel. Specialized
+ /// CPU-bound executors, such as [rayon], may also be a good fit.
+ ///
+ /// This function is intended for non-async operations that eventually finish on
+ /// their own. If you want to spawn an ordinary thread, you should use
+ /// [`thread::spawn`] instead.
+ ///
+ /// Closures spawned using `spawn_blocking` cannot be cancelled. When you shut
+ /// down the executor, it will wait indefinitely for all blocking operations to
+ /// finish. You can use [`shutdown_timeout`] to stop waiting for them after a
+ /// certain timeout. Be aware that this will still not cancel the tasks — they
+ /// are simply allowed to keep running after the method returns.
+ ///
+ /// Note that if you are using the single threaded runtime, this function will
+ /// still spawn additional threads for blocking operations. The basic
+ /// scheduler's single thread is only used for asynchronous code.
+ ///
+ /// # Related APIs and patterns for bridging asynchronous and blocking code
+ ///
+ /// In simple cases, it is sufficient to have the closure accept input
+ /// parameters at creation time and return a single value (or struct/tuple, etc.).
+ ///
+ /// For more complex situations in which it is desirable to stream data to or from
+ /// the synchronous context, the [`mpsc channel`] has `blocking_send` and
+ /// `blocking_recv` methods for use in non-async code such as the thread created
+ /// by `spawn_blocking`.
+ ///
+ /// Another option is [`SyncIoBridge`] for cases where the synchronous context
+ /// is operating on byte streams. For example, you might use an asynchronous
+ /// HTTP client such as [hyper] to fetch data, but perform complex parsing
+ /// of the payload body using a library written for synchronous I/O.
+ ///
+ /// Finally, see also [Bridging with sync code][bridgesync] for discussions
+ /// around the opposite case of using Tokio as part of a larger synchronous
+ /// codebase.
+ ///
+ /// [`Builder`]: struct@crate::runtime::Builder
+ /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
+ /// [rayon]: https://docs.rs/rayon
+ /// [`mpsc channel`]: crate::sync::mpsc
+ /// [`SyncIoBridge`]: https://docs.rs/tokio-util/0.6/tokio_util/io/struct.SyncIoBridge.html
+ /// [hyper]: https://docs.rs/hyper
+ /// [`thread::spawn`]: fn@std::thread::spawn
+ /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
+ /// [bridgesync]: https://tokio.rs/tokio/topics/bridging
+ ///
+ /// # Examples
+ ///
+ /// Pass an input value and receive result of computation:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+ /// // Initial input
+ /// let mut v = "Hello, ".to_string();
+ /// let res = task::spawn_blocking(move || {
+ /// // Stand-in for compute-heavy work or using synchronous APIs
+ /// v.push_str("world");
+ /// // Pass ownership of the value back to the asynchronous context
+ /// v
+ /// }).await?;
+ ///
+ /// // `res` is the value returned from the thread
+ /// assert_eq!(res.as_str(), "Hello, world");
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// Use a channel:
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use tokio::sync::mpsc;
+ ///
+ /// # async fn docs() {
+ /// let (tx, mut rx) = mpsc::channel(2);
+ /// let start = 5;
+ /// let worker = task::spawn_blocking(move || {
+ /// for x in 0..10 {
+ /// // Stand in for complex computation
+ /// tx.blocking_send(start + x).unwrap();
+ /// }
+ /// });
+ ///
+ /// let mut acc = 0;
+ /// while let Some(v) = rx.recv().await {
+ /// acc += v;
+ /// }
+ /// assert_eq!(acc, 95);
+ /// worker.await.unwrap();
+ /// # }
+ /// ```
+ #[track_caller]
+ pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ crate::runtime::spawn_blocking(f)
+ }
+}
diff --git a/third_party/rust/tokio/src/task/builder.rs b/third_party/rust/tokio/src/task/builder.rs
new file mode 100644
index 0000000000..2086302fb9
--- /dev/null
+++ b/third_party/rust/tokio/src/task/builder.rs
@@ -0,0 +1,115 @@
+#![allow(unreachable_pub)]
+use crate::{runtime::context, task::JoinHandle};
+use std::future::Future;
+
+/// Factory which is used to configure the properties of a new task.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// Methods can be chained in order to configure it.
+///
+/// Currently, there is only one configuration option:
+///
+/// - [`name`], which specifies an associated name for
+/// the task
+///
+/// There are three types of task that can be spawned from a Builder:
+/// - [`spawn_local`] for executing futures on the current thread
+/// - [`spawn`] for executing [`Send`] futures on the runtime
+/// - [`spawn_blocking`] for executing blocking code in the
+/// blocking thread pool.
+///
+/// ## Example
+///
+/// ```no_run
+/// use tokio::net::{TcpListener, TcpStream};
+///
+/// use std::io;
+///
+/// async fn process(socket: TcpStream) {
+/// // ...
+/// # drop(socket);
+/// }
+///
+/// #[tokio::main]
+/// async fn main() -> io::Result<()> {
+/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
+///
+/// loop {
+/// let (socket, _) = listener.accept().await?;
+///
+/// tokio::task::Builder::new()
+/// .name("tcp connection handler")
+/// .spawn(async move {
+/// // Process each socket concurrently.
+/// process(socket).await
+/// });
+/// }
+/// }
+/// ```
+/// [unstable]: crate#unstable-features
+/// [`name`]: Builder::name
+/// [`spawn_local`]: Builder::spawn_local
+/// [`spawn`]: Builder::spawn
+/// [`spawn_blocking`]: Builder::spawn_blocking
+#[derive(Default, Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
+pub struct Builder<'a> {
+ name: Option<&'a str>,
+}
+
+impl<'a> Builder<'a> {
+ /// Creates a new task builder.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Assigns a name to the task which will be spawned.
+ pub fn name(&self, name: &'a str) -> Self {
+ Self { name: Some(name) }
+ }
+
+ /// Spawns a task on the executor.
+ ///
+ /// See [`task::spawn`](crate::task::spawn) for
+ /// more details.
+ #[track_caller]
+ pub fn spawn<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
+ where
+ Fut: Future + Send + 'static,
+ Fut::Output: Send + 'static,
+ {
+ super::spawn::spawn_inner(future, self.name)
+ }
+
+ /// Spawns a task on the current thread.
+ ///
+ /// See [`task::spawn_local`](crate::task::spawn_local)
+ /// for more details.
+ #[track_caller]
+ pub fn spawn_local<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
+ where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+ {
+ super::local::spawn_local_inner(future, self.name)
+ }
+
+ /// Spawns blocking code on the blocking threadpool.
+ ///
+ /// See [`task::spawn_blocking`](crate::task::spawn_blocking)
+ /// for more details.
+ #[track_caller]
+ pub fn spawn_blocking<Function, Output>(self, function: Function) -> JoinHandle<Output>
+ where
+ Function: FnOnce() -> Output + Send + 'static,
+ Output: Send + 'static,
+ {
+ use crate::runtime::Mandatory;
+ let (join_handle, _was_spawned) =
+ context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name);
+ join_handle
+ }
+}
diff --git a/third_party/rust/tokio/src/task/join_set.rs b/third_party/rust/tokio/src/task/join_set.rs
new file mode 100644
index 0000000000..8e8f74f66d
--- /dev/null
+++ b/third_party/rust/tokio/src/task/join_set.rs
@@ -0,0 +1,248 @@
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use crate::runtime::Handle;
+use crate::task::{JoinError, JoinHandle, LocalSet};
+use crate::util::IdleNotifiedSet;
+
+/// A collection of tasks spawned on a Tokio runtime.
+///
+/// A `JoinSet` can be used to await the completion of some or all of the tasks
+/// in the set. The set is not ordered, and the tasks will be returned in the
+/// order they complete.
+///
+/// All of the tasks must have the same return type `T`.
+///
+/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// # Examples
+///
+/// Spawn multiple tasks and wait for them.
+///
+/// ```
+/// use tokio::task::JoinSet;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut set = JoinSet::new();
+///
+/// for i in 0..10 {
+/// set.spawn(async move { i });
+/// }
+///
+/// let mut seen = [false; 10];
+/// while let Some(res) = set.join_one().await.unwrap() {
+/// seen[res] = true;
+/// }
+///
+/// for i in 0..10 {
+/// assert!(seen[i]);
+/// }
+/// }
+/// ```
+///
+/// [unstable]: crate#unstable-features
+pub struct JoinSet<T> {
+ inner: IdleNotifiedSet<JoinHandle<T>>,
+}
+
+impl<T> JoinSet<T> {
+ /// Create a new `JoinSet`.
+ pub fn new() -> Self {
+ Self {
+ inner: IdleNotifiedSet::new(),
+ }
+ }
+
+ /// Returns the number of tasks currently in the `JoinSet`.
+ pub fn len(&self) -> usize {
+ self.inner.len()
+ }
+
+ /// Returns whether the `JoinSet` is empty.
+ pub fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
+}
+
+impl<T: 'static> JoinSet<T> {
+ /// Spawn the provided task on the `JoinSet`.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called outside of a Tokio runtime.
+ pub fn spawn<F>(&mut self, task: F)
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ T: Send,
+ {
+ self.insert(crate::spawn(task));
+ }
+
+ /// Spawn the provided task on the provided runtime and store it in this `JoinSet`.
+ pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ T: Send,
+ {
+ self.insert(handle.spawn(task));
+ }
+
+ /// Spawn the provided task on the current [`LocalSet`] and store it in this `JoinSet`.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if it is called outside of a `LocalSet`.
+ ///
+ /// [`LocalSet`]: crate::task::LocalSet
+ pub fn spawn_local<F>(&mut self, task: F)
+ where
+ F: Future<Output = T>,
+ F: 'static,
+ {
+ self.insert(crate::task::spawn_local(task));
+ }
+
+ /// Spawn the provided task on the provided [`LocalSet`] and store it in this `JoinSet`.
+ ///
+ /// [`LocalSet`]: crate::task::LocalSet
+ pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet)
+ where
+ F: Future<Output = T>,
+ F: 'static,
+ {
+ self.insert(local_set.spawn_local(task));
+ }
+
+ fn insert(&mut self, jh: JoinHandle<T>) {
+ let mut entry = self.inner.insert_idle(jh);
+
+ // Set the waker that is notified when the task completes.
+ entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
+ }
+
+ /// Waits until one of the tasks in the set completes and returns its output.
+ ///
+ /// Returns `None` if the set is empty.
+ ///
+ /// # Cancel Safety
+ ///
+ /// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!`
+ /// statement and some other branch completes first, it is guaranteed that no tasks were
+ /// removed from this `JoinSet`.
+ pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> {
+ crate::future::poll_fn(|cx| self.poll_join_one(cx)).await
+ }
+
+ /// Aborts all tasks and waits for them to finish shutting down.
+ ///
+ /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in
+ /// a loop until it returns `Ok(None)`.
+ ///
+ /// This method ignores any panics in the tasks shutting down. When this call returns, the
+ /// `JoinSet` will be empty.
+ ///
+ /// [`abort_all`]: fn@Self::abort_all
+ /// [`join_one`]: fn@Self::join_one
+ pub async fn shutdown(&mut self) {
+ self.abort_all();
+ while self.join_one().await.transpose().is_some() {}
+ }
+
+ /// Aborts all tasks on this `JoinSet`.
+ ///
+ /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
+ /// cancellation, you should call `join_one` in a loop until the `JoinSet` is empty.
+ pub fn abort_all(&mut self) {
+ self.inner.for_each(|jh| jh.abort());
+ }
+
+ /// Removes all tasks from this `JoinSet` without aborting them.
+ ///
+ /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
+ /// is dropped.
+ pub fn detach_all(&mut self) {
+ self.inner.drain(drop);
+ }
+
+ /// Polls for one of the tasks in the set to complete.
+ ///
+ /// If this returns `Poll::Ready(Ok(Some(_)))` or `Poll::Ready(Err(_))`, then the task that
+ /// completed is removed from the set.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
+ /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
+ /// `poll_join_one`, only the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup.
+ ///
+ /// # Returns
+ ///
+ /// This function returns:
+ ///
+ /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
+ /// available right now.
+ /// * `Poll::Ready(Ok(Some(value)))` if one of the tasks in this `JoinSet` has completed. The
+ /// `value` is the return value of one of the tasks that completed.
+ /// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been
+ /// aborted.
+ /// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty.
+ ///
+ /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
+ /// This can happen if the [coop budget] is reached.
+ ///
+ /// [coop budget]: crate::task#cooperative-scheduling
+ fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<T>, JoinError>> {
+ // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
+ // the `notified` list if the waker is notified in the `poll` call below.
+ let mut entry = match self.inner.pop_notified(cx.waker()) {
+ Some(entry) => entry,
+ None => {
+ if self.is_empty() {
+ return Poll::Ready(Ok(None));
+ } else {
+ // The waker was set by `pop_notified`.
+ return Poll::Pending;
+ }
+ }
+ };
+
+ let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
+
+ if let Poll::Ready(res) = res {
+ entry.remove();
+ Poll::Ready(Some(res).transpose())
+ } else {
+ // A JoinHandle generally won't emit a wakeup without being ready unless
+ // the coop limit has been reached. We yield to the executor in this
+ // case.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+}
+
+impl<T> Drop for JoinSet<T> {
+ fn drop(&mut self) {
+ self.inner.drain(|join_handle| join_handle.abort());
+ }
+}
+
+impl<T> fmt::Debug for JoinSet<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("JoinSet").field("len", &self.len()).finish()
+ }
+}
+
+impl<T> Default for JoinSet<T> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/third_party/rust/tokio/src/task/local.rs b/third_party/rust/tokio/src/task/local.rs
new file mode 100644
index 0000000000..2dbd970604
--- /dev/null
+++ b/third_party/rust/tokio/src/task/local.rs
@@ -0,0 +1,698 @@
+//! Runs `!Send` futures on the current thread.
+use crate::loom::sync::{Arc, Mutex};
+use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
+use crate::sync::AtomicWaker;
+use crate::util::VecDequeCell;
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::Poll;
+
+use pin_project_lite::pin_project;
+
+cfg_rt! {
+ /// A set of tasks which are executed on the same thread.
+ ///
+ /// In some cases, it is necessary to run one or more futures that do not
+ /// implement [`Send`] and thus are unsafe to send between threads. In these
+ /// cases, a [local task set] may be used to schedule one or more `!Send`
+ /// futures to run together on the same thread.
+ ///
+ /// For example, the following code will not compile:
+ ///
+ /// ```rust,compile_fail
+ /// use std::rc::Rc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// // `Rc` does not implement `Send`, and thus may not be sent between
+ /// // threads safely.
+ /// let unsend_data = Rc::new("my unsend data...");
+ ///
+ /// let unsend_data = unsend_data.clone();
+ /// // Because the `async` block here moves `unsend_data`, the future is `!Send`.
+ /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
+ /// // will not compile.
+ /// tokio::spawn(async move {
+ /// println!("{}", unsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// # Use with `run_until`
+ ///
+ /// To spawn `!Send` futures, we can use a local task set to schedule them
+ /// on the thread calling [`Runtime::block_on`]. When running inside of the
+ /// local task set, we can use [`task::spawn_local`], which can spawn
+ /// `!Send` futures. For example:
+ ///
+ /// ```rust
+ /// use std::rc::Rc;
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let unsend_data = Rc::new("my unsend data...");
+ ///
+ /// // Construct a local task set that can run `!Send` futures.
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// let unsend_data = unsend_data.clone();
+ /// // `spawn_local` ensures that the future is spawned on the local
+ /// // task set.
+ /// task::spawn_local(async move {
+ /// println!("{}", unsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }).await;
+ /// }
+ /// ```
+ /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
+ /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
+ /// cannot be used inside a task spawned with `tokio::spawn`.
+ ///
+ /// ## Awaiting a `LocalSet`
+ ///
+ /// Additionally, a `LocalSet` itself implements `Future`, completing when
+ /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
+ /// several futures on a `LocalSet` and drive the whole set until they
+ /// complete. For example,
+ ///
+ /// ```rust
+ /// use tokio::{task, time};
+ /// use std::rc::Rc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let unsend_data = Rc::new("world");
+ /// let local = task::LocalSet::new();
+ ///
+ /// let unsend_data2 = unsend_data.clone();
+ /// local.spawn_local(async move {
+ /// // ...
+ /// println!("hello {}", unsend_data2)
+ /// });
+ ///
+ /// local.spawn_local(async move {
+ /// time::sleep(time::Duration::from_millis(100)).await;
+ /// println!("goodbye {}", unsend_data)
+ /// });
+ ///
+ /// // ...
+ ///
+ /// local.await;
+ /// }
+ /// ```
+ /// **Note:** Awaiting a `LocalSet` can only be done inside
+ /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
+ /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
+ /// `tokio::spawn`.
+ ///
+ /// ## Use inside `tokio::spawn`
+ ///
+ /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
+ /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
+ /// something else. The solution is to create the `LocalSet` somewhere else,
+ /// and communicate with it using an [`mpsc`] channel.
+ ///
+ /// The following example puts the `LocalSet` inside a new thread.
+ /// ```
+ /// use tokio::runtime::Builder;
+ /// use tokio::sync::{mpsc, oneshot};
+ /// use tokio::task::LocalSet;
+ ///
+ /// // This struct describes the task you want to spawn. Here we include
+ /// // some simple examples. The oneshot channel allows sending a response
+ /// // to the spawner.
+ /// #[derive(Debug)]
+ /// enum Task {
+ /// PrintNumber(u32),
+ /// AddOne(u32, oneshot::Sender<u32>),
+ /// }
+ ///
+ /// #[derive(Clone)]
+ /// struct LocalSpawner {
+ /// send: mpsc::UnboundedSender<Task>,
+ /// }
+ ///
+ /// impl LocalSpawner {
+ /// pub fn new() -> Self {
+ /// let (send, mut recv) = mpsc::unbounded_channel();
+ ///
+ /// let rt = Builder::new_current_thread()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// std::thread::spawn(move || {
+ /// let local = LocalSet::new();
+ ///
+ /// local.spawn_local(async move {
+ /// while let Some(new_task) = recv.recv().await {
+ /// tokio::task::spawn_local(run_task(new_task));
+ /// }
+ /// // If the while loop returns, then all the LocalSpawner
+ /// // objects have have been dropped.
+ /// });
+ ///
+ /// // This will return once all senders are dropped and all
+ /// // spawned tasks have returned.
+ /// rt.block_on(local);
+ /// });
+ ///
+ /// Self {
+ /// send,
+ /// }
+ /// }
+ ///
+ /// pub fn spawn(&self, task: Task) {
+ /// self.send.send(task).expect("Thread with LocalSet has shut down.");
+ /// }
+ /// }
+ ///
+ /// // This task may do !Send stuff. We use printing a number as an example,
+ /// // but it could be anything.
+ /// //
+ /// // The Task struct is an enum to support spawning many different kinds
+ /// // of operations.
+ /// async fn run_task(task: Task) {
+ /// match task {
+ /// Task::PrintNumber(n) => {
+ /// println!("{}", n);
+ /// },
+ /// Task::AddOne(n, response) => {
+ /// // We ignore failures to send the response.
+ /// let _ = response.send(n + 1);
+ /// },
+ /// }
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let spawner = LocalSpawner::new();
+ ///
+ /// let (send, response) = oneshot::channel();
+ /// spawner.spawn(Task::AddOne(10, send));
+ /// let eleven = response.await.unwrap();
+ /// assert_eq!(eleven, 11);
+ /// }
+ /// ```
+ ///
+ /// [`Send`]: trait@std::marker::Send
+ /// [local task set]: struct@LocalSet
+ /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
+ /// [`task::spawn_local`]: fn@spawn_local
+ /// [`mpsc`]: mod@crate::sync::mpsc
+ pub struct LocalSet {
+ /// Current scheduler tick.
+ tick: Cell<u8>,
+
+ /// State available from thread-local.
+ context: Context,
+
+ /// This type should not be Send.
+ _not_send: PhantomData<*const ()>,
+ }
+}
+
+/// State available from the thread-local.
+struct Context {
+ /// Collection of all active tasks spawned onto this executor.
+ owned: LocalOwnedTasks<Arc<Shared>>,
+
+ /// Local run queue sender and receiver.
+ queue: VecDequeCell<task::Notified<Arc<Shared>>>,
+
+ /// State shared between threads.
+ shared: Arc<Shared>,
+}
+
+/// LocalSet state shared between threads.
+struct Shared {
+ /// Remote run queue sender.
+ queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
+
+ /// Wake the `LocalSet` task.
+ waker: AtomicWaker,
+}
+
+pin_project! {
+ #[derive(Debug)]
+ struct RunUntil<'a, F> {
+ local_set: &'a LocalSet,
+ #[pin]
+ future: F,
+ }
+}
+
+scoped_thread_local!(static CURRENT: Context);
+
+cfg_rt! {
+ /// Spawns a `!Send` future on the local task set.
+ ///
+ /// The spawned future will be run on the same thread that called `spawn_local.`
+ /// This may only be called from the context of a local task set.
+ ///
+ /// # Panics
+ ///
+ /// - This function panics if called outside of a local task set.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::rc::Rc;
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let unsend_data = Rc::new("my unsend data...");
+ ///
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// let unsend_data = unsend_data.clone();
+ /// task::spawn_local(async move {
+ /// println!("{}", unsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }).await;
+ /// }
+ /// ```
+ #[track_caller]
+ pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ spawn_local_inner(future, None)
+ }
+
+
+ #[track_caller]
+ pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
+ where F: Future + 'static,
+ F::Output: 'static
+ {
+ let future = crate::util::trace::task(future, "local", name);
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx
+ .expect("`spawn_local` called from outside of a `task::LocalSet`");
+
+ let (handle, notified) = cx.owned.bind(future, cx.shared.clone());
+
+ if let Some(notified) = notified {
+ cx.shared.schedule(notified);
+ }
+
+ handle
+ })
+ }
+}
+
+/// Initial queue capacity.
+const INITIAL_CAPACITY: usize = 64;
+
+/// Max number of tasks to poll per tick.
+const MAX_TASKS_PER_TICK: usize = 61;
+
+/// How often it check the remote queue first.
+const REMOTE_FIRST_INTERVAL: u8 = 31;
+
+impl LocalSet {
+ /// Returns a new local task set.
+ pub fn new() -> LocalSet {
+ LocalSet {
+ tick: Cell::new(0),
+ context: Context {
+ owned: LocalOwnedTasks::new(),
+ queue: VecDequeCell::with_capacity(INITIAL_CAPACITY),
+ shared: Arc::new(Shared {
+ queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
+ waker: AtomicWaker::new(),
+ }),
+ },
+ _not_send: PhantomData,
+ }
+ }
+
+ /// Spawns a `!Send` task onto the local task set.
+ ///
+ /// This task is guaranteed to be run on the current thread.
+ ///
+ /// Unlike the free function [`spawn_local`], this method may be used to
+ /// spawn local tasks when the task set is _not_ running. For example:
+ /// ```rust
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Spawn a future on the local set. This future will be run when
+ /// // we call `run_until` to drive the task set.
+ /// local.spawn_local(async {
+ /// // ...
+ /// });
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// // ...
+ /// }).await;
+ ///
+ /// // When `run` finishes, we can spawn _more_ futures, which will
+ /// // run in subsequent calls to `run_until`.
+ /// local.spawn_local(async {
+ /// // ...
+ /// });
+ ///
+ /// local.run_until(async move {
+ /// // ...
+ /// }).await;
+ /// }
+ /// ```
+ /// [`spawn_local`]: fn@spawn_local
+ #[track_caller]
+ pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ let future = crate::util::trace::task(future, "local", None);
+
+ let (handle, notified) = self.context.owned.bind(future, self.context.shared.clone());
+
+ if let Some(notified) = notified {
+ self.context.shared.schedule(notified);
+ }
+
+ self.context.shared.waker.wake();
+ handle
+ }
+
+ /// Runs a future to completion on the provided runtime, driving any local
+ /// futures spawned on this task set on the current thread.
+ ///
+ /// This runs the given future on the runtime, blocking until it is
+ /// complete, and yielding its resolved result. Any tasks or timers which
+ /// the future spawns internally will be executed on the runtime. The future
+ /// may also call [`spawn_local`] to spawn_local additional local futures on the
+ /// current thread.
+ ///
+ /// This method should not be called from an asynchronous context.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the executor is at capacity, if the provided
+ /// future panics, or if called within an asynchronous execution context.
+ ///
+ /// # Notes
+ ///
+ /// Since this function internally calls [`Runtime::block_on`], and drives
+ /// futures in the local task set inside that call to `block_on`, the local
+ /// futures may not use [in-place blocking]. If a blocking call needs to be
+ /// issued from a local task, the [`spawn_blocking`] API may be used instead.
+ ///
+ /// For example, this will panic:
+ /// ```should_panic
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// let rt = Runtime::new().unwrap();
+ /// let local = task::LocalSet::new();
+ /// local.block_on(&rt, async {
+ /// let join = task::spawn_local(async {
+ /// let blocking_result = task::block_in_place(|| {
+ /// // ...
+ /// });
+ /// // ...
+ /// });
+ /// join.await.unwrap();
+ /// })
+ /// ```
+ /// This, however, will not panic:
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// let rt = Runtime::new().unwrap();
+ /// let local = task::LocalSet::new();
+ /// local.block_on(&rt, async {
+ /// let join = task::spawn_local(async {
+ /// let blocking_result = task::spawn_blocking(|| {
+ /// // ...
+ /// }).await;
+ /// // ...
+ /// });
+ /// join.await.unwrap();
+ /// })
+ /// ```
+ ///
+ /// [`spawn_local`]: fn@spawn_local
+ /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
+ /// [in-place blocking]: fn@crate::task::block_in_place
+ /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
+ #[cfg(feature = "rt")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+ pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ rt.block_on(self.run_until(future))
+ }
+
+ /// Runs a future to completion on the local set, returning its output.
+ ///
+ /// This returns a future that runs the given future with a local set,
+ /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
+ /// Any local futures spawned on the local set will be driven in the
+ /// background until the future passed to `run_until` completes. When the future
+ /// passed to `run` finishes, any local futures which have not completed
+ /// will remain on the local set, and will be driven on subsequent calls to
+ /// `run_until` or when [awaiting the local set] itself.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// task::LocalSet::new().run_until(async {
+ /// task::spawn_local(async move {
+ /// // ...
+ /// }).await.unwrap();
+ /// // ...
+ /// }).await;
+ /// }
+ /// ```
+ ///
+ /// [`spawn_local`]: fn@spawn_local
+ /// [awaiting the local set]: #awaiting-a-localset
+ pub async fn run_until<F>(&self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ let run_until = RunUntil {
+ future,
+ local_set: self,
+ };
+ run_until.await
+ }
+
+ /// Ticks the scheduler, returning whether the local future needs to be
+ /// notified again.
+ fn tick(&self) -> bool {
+ for _ in 0..MAX_TASKS_PER_TICK {
+ match self.next_task() {
+ // Run the task
+ //
+ // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
+ // used. We are responsible for maintaining the invariant that
+ // `run_unchecked` is only called on threads that spawned the
+ // task initially. Because `LocalSet` itself is `!Send`, and
+ // `spawn_local` spawns into the `LocalSet` on the current
+ // thread, the invariant is maintained.
+ Some(task) => crate::coop::budget(|| task.run()),
+ // We have fully drained the queue of notified tasks, so the
+ // local future doesn't need to be notified again — it can wait
+ // until something else wakes a task in the local set.
+ None => return false,
+ }
+ }
+
+ true
+ }
+
+ fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
+ let tick = self.tick.get();
+ self.tick.set(tick.wrapping_add(1));
+
+ let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
+ self.context
+ .shared
+ .queue
+ .lock()
+ .as_mut()
+ .and_then(|queue| queue.pop_front())
+ .or_else(|| self.context.queue.pop_front())
+ } else {
+ self.context.queue.pop_front().or_else(|| {
+ self.context
+ .shared
+ .queue
+ .lock()
+ .as_mut()
+ .and_then(|queue| queue.pop_front())
+ })
+ };
+
+ task.map(|task| self.context.owned.assert_owner(task))
+ }
+
+ fn with<T>(&self, f: impl FnOnce() -> T) -> T {
+ CURRENT.set(&self.context, f)
+ }
+}
+
+impl fmt::Debug for LocalSet {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("LocalSet").finish()
+ }
+}
+
+impl Future for LocalSet {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ // Register the waker before starting to work
+ self.context.shared.waker.register_by_ref(cx.waker());
+
+ if self.with(|| self.tick()) {
+ // If `tick` returns true, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ } else if self.context.owned.is_empty() {
+ // If the scheduler has no remaining futures, we're done!
+ Poll::Ready(())
+ } else {
+ // There are still futures in the local set, but we've polled all the
+ // futures in the run queue. Therefore, we can just return Pending
+ // since the remaining futures will be woken from somewhere else.
+ Poll::Pending
+ }
+ }
+}
+
+impl Default for LocalSet {
+ fn default() -> LocalSet {
+ LocalSet::new()
+ }
+}
+
+impl Drop for LocalSet {
+ fn drop(&mut self) {
+ self.with(|| {
+ // Shut down all tasks in the LocalOwnedTasks and close it to
+ // prevent new tasks from ever being added.
+ self.context.owned.close_and_shutdown_all();
+
+ // We already called shutdown on all tasks above, so there is no
+ // need to call shutdown.
+ for task in self.context.queue.take() {
+ drop(task);
+ }
+
+ // Take the queue from the Shared object to prevent pushing
+ // notifications to it in the future.
+ let queue = self.context.shared.queue.lock().take().unwrap();
+ for task in queue {
+ drop(task);
+ }
+
+ assert!(self.context.owned.is_empty());
+ });
+ }
+}
+
+// === impl LocalFuture ===
+
+impl<T: Future> Future for RunUntil<'_, T> {
+ type Output = T::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+
+ me.local_set.with(|| {
+ me.local_set
+ .context
+ .shared
+ .waker
+ .register_by_ref(cx.waker());
+
+ let _no_blocking = crate::runtime::enter::disallow_blocking();
+ let f = me.future;
+
+ if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
+ return Poll::Ready(output);
+ }
+
+ if me.local_set.tick() {
+ // If `tick` returns `true`, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ })
+ }
+}
+
+impl Shared {
+ /// Schedule the provided task on the scheduler.
+ fn schedule(&self, task: task::Notified<Arc<Self>>) {
+ CURRENT.with(|maybe_cx| match maybe_cx {
+ Some(cx) if cx.shared.ptr_eq(self) => {
+ cx.queue.push_back(task);
+ }
+ _ => {
+ // First check whether the queue is still there (if not, the
+ // LocalSet is dropped). Then push to it if so, and if not,
+ // do nothing.
+ let mut lock = self.queue.lock();
+
+ if let Some(queue) = lock.as_mut() {
+ queue.push_back(task);
+ drop(lock);
+ self.waker.wake();
+ }
+ }
+ });
+ }
+
+ fn ptr_eq(&self, other: &Shared) -> bool {
+ std::ptr::eq(self, other)
+ }
+}
+
+impl task::Schedule for Arc<Shared> {
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+ assert!(cx.shared.ptr_eq(self));
+ cx.owned.remove(task)
+ })
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ Shared::schedule(self, task);
+ }
+}
diff --git a/third_party/rust/tokio/src/task/mod.rs b/third_party/rust/tokio/src/task/mod.rs
new file mode 100644
index 0000000000..d532155a1f
--- /dev/null
+++ b/third_party/rust/tokio/src/task/mod.rs
@@ -0,0 +1,317 @@
+//! Asynchronous green-threads.
+//!
+//! ## What are Tasks?
+//!
+//! A _task_ is a light weight, non-blocking unit of execution. A task is similar
+//! to an OS thread, but rather than being managed by the OS scheduler, they are
+//! managed by the [Tokio runtime][rt]. Another name for this general pattern is
+//! [green threads]. If you are familiar with [Go's goroutines], [Kotlin's
+//! coroutines], or [Erlang's processes], you can think of Tokio's tasks as
+//! something similar.
+//!
+//! Key points about tasks include:
+//!
+//! * Tasks are **light weight**. Because tasks are scheduled by the Tokio
+//! runtime rather than the operating system, creating new tasks or switching
+//! between tasks does not require a context switch and has fairly low
+//! overhead. Creating, running, and destroying large numbers of tasks is
+//! quite cheap, especially compared to OS threads.
+//!
+//! * Tasks are scheduled **cooperatively**. Most operating systems implement
+//! _preemptive multitasking_. This is a scheduling technique where the
+//! operating system allows each thread to run for a period of time, and then
+//! _preempts_ it, temporarily pausing that thread and switching to another.
+//! Tasks, on the other hand, implement _cooperative multitasking_. In
+//! cooperative multitasking, a task is allowed to run until it _yields_,
+//! indicating to the Tokio runtime's scheduler that it cannot currently
+//! continue executing. When a task yields, the Tokio runtime switches to
+//! executing the next task.
+//!
+//! * Tasks are **non-blocking**. Typically, when an OS thread performs I/O or
+//! must synchronize with another thread, it _blocks_, allowing the OS to
+//! schedule another thread. When a task cannot continue executing, it must
+//! yield instead, allowing the Tokio runtime to schedule another task. Tasks
+//! should generally not perform system calls or other operations that could
+//! block a thread, as this would prevent other tasks running on the same
+//! thread from executing as well. Instead, this module provides APIs for
+//! running blocking operations in an asynchronous context.
+//!
+//! [rt]: crate::runtime
+//! [green threads]: https://en.wikipedia.org/wiki/Green_threads
+//! [Go's goroutines]: https://tour.golang.org/concurrency/1
+//! [Kotlin's coroutines]: https://kotlinlang.org/docs/reference/coroutines-overview.html
+//! [Erlang's processes]: http://erlang.org/doc/getting_started/conc_prog.html#processes
+//!
+//! ## Working with Tasks
+//!
+//! This module provides the following APIs for working with tasks:
+//!
+//! ### Spawning
+//!
+//! Perhaps the most important function in this module is [`task::spawn`]. This
+//! function can be thought of as an async equivalent to the standard library's
+//! [`thread::spawn`][`std::thread::spawn`]. It takes an `async` block or other
+//! [future], and creates a new task to run that work concurrently:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn doc() {
+//! task::spawn(async {
+//! // perform some work here...
+//! });
+//! # }
+//! ```
+//!
+//! Like [`std::thread::spawn`], `task::spawn` returns a [`JoinHandle`] struct.
+//! A `JoinHandle` is itself a future which may be used to await the output of
+//! the spawned task. For example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let join = task::spawn(async {
+//! // ...
+//! "hello world!"
+//! });
+//!
+//! // ...
+//!
+//! // Await the result of the spawned task.
+//! let result = join.await?;
+//! assert_eq!(result, "hello world!");
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! Again, like `std::thread`'s [`JoinHandle` type][thread_join], if the spawned
+//! task panics, awaiting its `JoinHandle` will return a [`JoinError`]. For
+//! example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() {
+//! let join = task::spawn(async {
+//! panic!("something bad happened!")
+//! });
+//!
+//! // The returned result indicates that the task failed.
+//! assert!(join.await.is_err());
+//! # }
+//! ```
+//!
+//! `spawn`, `JoinHandle`, and `JoinError` are present when the "rt"
+//! feature flag is enabled.
+//!
+//! [`task::spawn`]: crate::task::spawn()
+//! [future]: std::future::Future
+//! [`std::thread::spawn`]: std::thread::spawn
+//! [`JoinHandle`]: crate::task::JoinHandle
+//! [thread_join]: std::thread::JoinHandle
+//! [`JoinError`]: crate::task::JoinError
+//!
+//! ### Blocking and Yielding
+//!
+//! As we discussed above, code running in asynchronous tasks should not perform
+//! operations that can block. A blocking operation performed in a task running
+//! on a thread that is also running other tasks would block the entire thread,
+//! preventing other tasks from running.
+//!
+//! Instead, Tokio provides two APIs for running blocking operations in an
+//! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`].
+//!
+//! Be aware that if you call a non-async method from async code, that non-async
+//! method is still inside the asynchronous context, so you should also avoid
+//! blocking operations there. This includes destructors of objects destroyed in
+//! async code.
+//!
+//! #### spawn_blocking
+//!
+//! The `task::spawn_blocking` function is similar to the `task::spawn` function
+//! discussed in the previous section, but rather than spawning an
+//! _non-blocking_ future on the Tokio runtime, it instead spawns a
+//! _blocking_ function on a dedicated thread pool for blocking tasks. For
+//! example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn docs() {
+//! task::spawn_blocking(|| {
+//! // do some compute-heavy work or call synchronous code
+//! });
+//! # }
+//! ```
+//!
+//! Just like `task::spawn`, `task::spawn_blocking` returns a `JoinHandle`
+//! which we can use to await the result of the blocking operation:
+//!
+//! ```rust
+//! # use tokio::task;
+//! # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+//! let join = task::spawn_blocking(|| {
+//! // do some compute-heavy work or call synchronous code
+//! "blocking completed"
+//! });
+//!
+//! let result = join.await?;
+//! assert_eq!(result, "blocking completed");
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! #### block_in_place
+//!
+//! When using the [multi-threaded runtime][rt-multi-thread], the [`task::block_in_place`]
+//! function is also available. Like `task::spawn_blocking`, this function
+//! allows running a blocking operation from an asynchronous context. Unlike
+//! `spawn_blocking`, however, `block_in_place` works by transitioning the
+//! _current_ worker thread to a blocking thread, moving other tasks running on
+//! that thread to another worker thread. This can improve performance by avoiding
+//! context switches.
+//!
+//! For example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn docs() {
+//! let result = task::block_in_place(|| {
+//! // do some compute-heavy work or call synchronous code
+//! "blocking completed"
+//! });
+//!
+//! assert_eq!(result, "blocking completed");
+//! # }
+//! ```
+//!
+//! #### yield_now
+//!
+//! In addition, this module provides a [`task::yield_now`] async function
+//! that is analogous to the standard library's [`thread::yield_now`]. Calling
+//! and `await`ing this function will cause the current task to yield to the
+//! Tokio runtime's scheduler, allowing other tasks to be
+//! scheduled. Eventually, the yielding task will be polled again, allowing it
+//! to execute. For example:
+//!
+//! ```rust
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() {
+//! async {
+//! task::spawn(async {
+//! // ...
+//! println!("spawned task done!")
+//! });
+//!
+//! // Yield, allowing the newly-spawned task to execute first.
+//! task::yield_now().await;
+//! println!("main task done!");
+//! }
+//! # .await;
+//! # }
+//! ```
+//!
+//! ### Cooperative scheduling
+//!
+//! A single call to [`poll`] on a top-level task may potentially do a lot of
+//! work before it returns `Poll::Pending`. If a task runs for a long period of
+//! time without yielding back to the executor, it can starve other tasks
+//! waiting on that executor to execute them, or drive underlying resources.
+//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
+//! long-running task. Instead, this module provides an opt-in mechanism for
+//! futures to collaborate with the executor to avoid starvation.
+//!
+//! Consider a future like this one:
+//!
+//! ```
+//! # use tokio_stream::{Stream, StreamExt};
+//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
+//! while let Some(_) = input.next().await {}
+//! }
+//! ```
+//!
+//! It may look harmless, but consider what happens under heavy load if the
+//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
+//! yield, and will starve other tasks and resources on the same executor.
+//!
+//! To account for this, Tokio has explicit yield points in a number of library
+//! functions, which force tasks to return to the executor periodically.
+//!
+//!
+//! #### unconstrained
+//!
+//! If necessary, [`task::unconstrained`] lets you opt out a future of Tokio's cooperative
+//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to
+//! Tokio. For example:
+//!
+//! ```
+//! # #[tokio::main]
+//! # async fn main() {
+//! use tokio::{task, sync::mpsc};
+//!
+//! let fut = async {
+//! let (tx, mut rx) = mpsc::unbounded_channel();
+//!
+//! for i in 0..1000 {
+//! let _ = tx.send(());
+//! // This will always be ready. If coop was in effect, this code would be forced to yield
+//! // periodically. However, if left unconstrained, then this code will never yield.
+//! rx.recv().await;
+//! }
+//! };
+//!
+//! task::unconstrained(fut).await;
+//! # }
+//! ```
+//!
+//! [`task::spawn_blocking`]: crate::task::spawn_blocking
+//! [`task::block_in_place`]: crate::task::block_in_place
+//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
+//! [`task::yield_now`]: crate::task::yield_now()
+//! [`thread::yield_now`]: std::thread::yield_now
+//! [`task::unconstrained`]: crate::task::unconstrained()
+//! [`poll`]: method@std::future::Future::poll
+
+cfg_rt! {
+ pub use crate::runtime::task::{JoinError, JoinHandle};
+
+ mod blocking;
+ pub use blocking::spawn_blocking;
+
+ mod spawn;
+ pub use spawn::spawn;
+
+ cfg_rt_multi_thread! {
+ pub use blocking::block_in_place;
+ }
+
+ mod yield_now;
+ pub use yield_now::yield_now;
+
+ mod local;
+ pub use local::{spawn_local, LocalSet};
+
+ mod task_local;
+ pub use task_local::LocalKey;
+
+ mod unconstrained;
+ pub use unconstrained::{unconstrained, Unconstrained};
+
+ cfg_unstable! {
+ mod join_set;
+ pub use join_set::JoinSet;
+ }
+
+ cfg_trace! {
+ mod builder;
+ pub use builder::Builder;
+ }
+
+ /// Task-related futures.
+ pub mod futures {
+ pub use super::task_local::TaskLocalFuture;
+ }
+}
diff --git a/third_party/rust/tokio/src/task/spawn.rs b/third_party/rust/tokio/src/task/spawn.rs
new file mode 100644
index 0000000000..a9d736674c
--- /dev/null
+++ b/third_party/rust/tokio/src/task/spawn.rs
@@ -0,0 +1,149 @@
+use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR};
+
+use std::future::Future;
+
+cfg_rt! {
+ /// Spawns a new asynchronous task, returning a
+ /// [`JoinHandle`](super::JoinHandle) for it.
+ ///
+ /// Spawning a task enables the task to execute concurrently to other tasks. The
+ /// spawned task may execute on the current thread, or it may be sent to a
+ /// different thread to be executed. The specifics depend on the current
+ /// [`Runtime`](crate::runtime::Runtime) configuration.
+ ///
+ /// There is no guarantee that a spawned task will execute to completion.
+ /// When a runtime is shutdown, all outstanding tasks are dropped,
+ /// regardless of the lifecycle of that task.
+ ///
+ /// This function must be called from the context of a Tokio runtime. Tasks running on
+ /// the Tokio runtime are always inside its context, but you can also enter the context
+ /// using the [`Runtime::enter`](crate::runtime::Runtime::enter()) method.
+ ///
+ /// # Examples
+ ///
+ /// In this example, a server is started and `spawn` is used to start a new task
+ /// that processes each received connection.
+ ///
+ /// ```no_run
+ /// use tokio::net::{TcpListener, TcpStream};
+ ///
+ /// use std::io;
+ ///
+ /// async fn process(socket: TcpStream) {
+ /// // ...
+ /// # drop(socket);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// let (socket, _) = listener.accept().await?;
+ ///
+ /// tokio::spawn(async move {
+ /// // Process each socket concurrently.
+ /// process(socket).await
+ /// });
+ /// }
+ /// }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// Panics if called from **outside** of the Tokio runtime.
+ ///
+ /// # Using `!Send` values from a task
+ ///
+ /// The task supplied to `spawn` must implement `Send`. However, it is
+ /// possible to **use** `!Send` values from the task as long as they only
+ /// exist between calls to `.await`.
+ ///
+ /// For example, this will work:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// use std::rc::Rc;
+ ///
+ /// fn use_rc(rc: Rc<()>) {
+ /// // Do stuff w/ rc
+ /// # drop(rc);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// tokio::spawn(async {
+ /// // Force the `Rc` to stay in a scope with no `.await`
+ /// {
+ /// let rc = Rc::new(());
+ /// use_rc(rc.clone());
+ /// }
+ ///
+ /// task::yield_now().await;
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// This will **not** work:
+ ///
+ /// ```compile_fail
+ /// use tokio::task;
+ ///
+ /// use std::rc::Rc;
+ ///
+ /// fn use_rc(rc: Rc<()>) {
+ /// // Do stuff w/ rc
+ /// # drop(rc);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// tokio::spawn(async {
+ /// let rc = Rc::new(());
+ ///
+ /// task::yield_now().await;
+ ///
+ /// use_rc(rc.clone());
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// Holding on to a `!Send` value across calls to `.await` will result in
+ /// an unfriendly compile error message similar to:
+ ///
+ /// ```text
+ /// `[... some type ...]` cannot be sent between threads safely
+ /// ```
+ ///
+ /// or:
+ ///
+ /// ```text
+ /// error[E0391]: cycle detected when processing `main`
+ /// ```
+ #[track_caller]
+ pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
+ where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ // preventing stack overflows on debug mode, by quickly sending the
+ // task to the heap.
+ if cfg!(debug_assertions) && std::mem::size_of::<T>() > 2048 {
+ spawn_inner(Box::pin(future), None)
+ } else {
+ spawn_inner(future, None)
+ }
+ }
+
+ #[track_caller]
+ pub(super) fn spawn_inner<T>(future: T, name: Option<&str>) -> JoinHandle<T::Output>
+ where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ let spawn_handle = crate::runtime::context::spawn_handle().expect(CONTEXT_MISSING_ERROR);
+ let task = crate::util::trace::task(future, "task", name);
+ spawn_handle.spawn(task)
+ }
+}
diff --git a/third_party/rust/tokio/src/task/task_local.rs b/third_party/rust/tokio/src/task/task_local.rs
new file mode 100644
index 0000000000..949bbca3ee
--- /dev/null
+++ b/third_party/rust/tokio/src/task/task_local.rs
@@ -0,0 +1,302 @@
+use pin_project_lite::pin_project;
+use std::cell::RefCell;
+use std::error::Error;
+use std::future::Future;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{fmt, thread};
+
+/// Declares a new task-local key of type [`tokio::task::LocalKey`].
+///
+/// # Syntax
+///
+/// The macro wraps any number of static declarations and makes them local to the current task.
+/// Publicity and attributes for each static is preserved. For example:
+///
+/// # Examples
+///
+/// ```
+/// # use tokio::task_local;
+/// task_local! {
+/// pub static ONE: u32;
+///
+/// #[allow(unused)]
+/// static TWO: f32;
+/// }
+/// # fn main() {}
+/// ```
+///
+/// See [LocalKey documentation][`tokio::task::LocalKey`] for more
+/// information.
+///
+/// [`tokio::task::LocalKey`]: struct@crate::task::LocalKey
+#[macro_export]
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+macro_rules! task_local {
+ // empty (base case for the recursion)
+ () => {};
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ $crate::task_local!($($rest)*);
+ };
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ }
+}
+
+#[doc(hidden)]
+#[macro_export]
+macro_rules! __task_local_inner {
+ ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
+ $vis static $name: $crate::task::LocalKey<$t> = {
+ std::thread_local! {
+ static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None);
+ }
+
+ $crate::task::LocalKey { inner: __KEY }
+ };
+ };
+}
+
+/// A key for task-local data.
+///
+/// This type is generated by the `task_local!` macro.
+///
+/// Unlike [`std::thread::LocalKey`], `tokio::task::LocalKey` will
+/// _not_ lazily initialize the value on first access. Instead, the
+/// value is first initialized when the future containing
+/// the task-local is first polled by a futures executor, like Tokio.
+///
+/// # Examples
+///
+/// ```
+/// # async fn dox() {
+/// tokio::task_local! {
+/// static NUMBER: u32;
+/// }
+///
+/// NUMBER.scope(1, async move {
+/// assert_eq!(NUMBER.get(), 1);
+/// }).await;
+///
+/// NUMBER.scope(2, async move {
+/// assert_eq!(NUMBER.get(), 2);
+///
+/// NUMBER.scope(3, async move {
+/// assert_eq!(NUMBER.get(), 3);
+/// }).await;
+/// }).await;
+/// # }
+/// ```
+/// [`std::thread::LocalKey`]: struct@std::thread::LocalKey
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub struct LocalKey<T: 'static> {
+ #[doc(hidden)]
+ pub inner: thread::LocalKey<RefCell<Option<T>>>,
+}
+
+impl<T: 'static> LocalKey<T> {
+ /// Sets a value `T` as the task-local value for the future `F`.
+ ///
+ /// On completion of `scope`, the task-local will be dropped.
+ ///
+ /// ### Examples
+ ///
+ /// ```
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static NUMBER: u32;
+ /// }
+ ///
+ /// NUMBER.scope(1, async move {
+ /// println!("task local value: {}", NUMBER.get());
+ /// }).await;
+ /// # }
+ /// ```
+ pub fn scope<F>(&'static self, value: T, f: F) -> TaskLocalFuture<T, F>
+ where
+ F: Future,
+ {
+ TaskLocalFuture {
+ local: self,
+ slot: Some(value),
+ future: f,
+ _pinned: PhantomPinned,
+ }
+ }
+
+ /// Sets a value `T` as the task-local value for the closure `F`.
+ ///
+ /// On completion of `scope`, the task-local will be dropped.
+ ///
+ /// ### Examples
+ ///
+ /// ```
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static NUMBER: u32;
+ /// }
+ ///
+ /// NUMBER.sync_scope(1, || {
+ /// println!("task local value: {}", NUMBER.get());
+ /// });
+ /// # }
+ /// ```
+ pub fn sync_scope<F, R>(&'static self, value: T, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ let scope = TaskLocalFuture {
+ local: self,
+ slot: Some(value),
+ future: (),
+ _pinned: PhantomPinned,
+ };
+ crate::pin!(scope);
+ scope.with_task(|_| f())
+ }
+
+ /// Accesses the current task-local and runs the provided closure.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if not called within the context
+ /// of a future containing a task-local with the corresponding key.
+ pub fn with<F, R>(&'static self, f: F) -> R
+ where
+ F: FnOnce(&T) -> R,
+ {
+ self.try_with(f).expect(
+ "cannot access a Task Local Storage value \
+ without setting it via `LocalKey::set`",
+ )
+ }
+
+ /// Accesses the current task-local and runs the provided closure.
+ ///
+ /// If the task-local with the associated key is not present, this
+ /// method will return an `AccessError`. For a panicking variant,
+ /// see `with`.
+ pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError>
+ where
+ F: FnOnce(&T) -> R,
+ {
+ self.inner.with(|v| {
+ if let Some(val) = v.borrow().as_ref() {
+ Ok(f(val))
+ } else {
+ Err(AccessError { _private: () })
+ }
+ })
+ }
+}
+
+impl<T: Copy + 'static> LocalKey<T> {
+ /// Returns a copy of the task-local value
+ /// if the task-local value implements `Copy`.
+ pub fn get(&'static self) -> T {
+ self.with(|v| *v)
+ }
+}
+
+impl<T: 'static> fmt::Debug for LocalKey<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("LocalKey { .. }")
+ }
+}
+
+pin_project! {
+ /// A future that sets a value `T` of a task local for the future `F` during
+ /// its execution.
+ ///
+ /// The value of the task-local must be `'static` and will be dropped on the
+ /// completion of the future.
+ ///
+ /// Created by the function [`LocalKey::scope`](self::LocalKey::scope).
+ ///
+ /// ### Examples
+ ///
+ /// ```
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static NUMBER: u32;
+ /// }
+ ///
+ /// NUMBER.scope(1, async move {
+ /// println!("task local value: {}", NUMBER.get());
+ /// }).await;
+ /// # }
+ /// ```
+ pub struct TaskLocalFuture<T, F>
+ where
+ T: 'static
+ {
+ local: &'static LocalKey<T>,
+ slot: Option<T>,
+ #[pin]
+ future: F,
+ #[pin]
+ _pinned: PhantomPinned,
+ }
+}
+
+impl<T: 'static, F> TaskLocalFuture<T, F> {
+ fn with_task<F2: FnOnce(Pin<&mut F>) -> R, R>(self: Pin<&mut Self>, f: F2) -> R {
+ struct Guard<'a, T: 'static> {
+ local: &'static LocalKey<T>,
+ slot: &'a mut Option<T>,
+ prev: Option<T>,
+ }
+
+ impl<T> Drop for Guard<'_, T> {
+ fn drop(&mut self) {
+ let value = self.local.inner.with(|c| c.replace(self.prev.take()));
+ *self.slot = value;
+ }
+ }
+
+ let project = self.project();
+ let val = project.slot.take();
+
+ let prev = project.local.inner.with(|c| c.replace(val));
+
+ let _guard = Guard {
+ prev,
+ slot: project.slot,
+ local: *project.local,
+ };
+
+ f(project.future)
+ }
+}
+
+impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.with_task(|f| f.poll(cx))
+ }
+}
+
+/// An error returned by [`LocalKey::try_with`](method@LocalKey::try_with).
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct AccessError {
+ _private: (),
+}
+
+impl fmt::Debug for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AccessError").finish()
+ }
+}
+
+impl fmt::Display for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt("task-local value not set", f)
+ }
+}
+
+impl Error for AccessError {}
diff --git a/third_party/rust/tokio/src/task/unconstrained.rs b/third_party/rust/tokio/src/task/unconstrained.rs
new file mode 100644
index 0000000000..31c732bfc9
--- /dev/null
+++ b/third_party/rust/tokio/src/task/unconstrained.rs
@@ -0,0 +1,45 @@
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// Future for the [`unconstrained`](unconstrained) method.
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+ #[must_use = "Unconstrained does nothing unless polled"]
+ pub struct Unconstrained<F> {
+ #[pin]
+ inner: F,
+ }
+}
+
+impl<F> Future for Unconstrained<F>
+where
+ F: Future,
+{
+ type Output = <F as Future>::Output;
+
+ cfg_coop! {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = self.project().inner;
+ crate::coop::with_unconstrained(|| inner.poll(cx))
+ }
+ }
+
+ cfg_not_coop! {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = self.project().inner;
+ inner.poll(cx)
+ }
+ }
+}
+
+/// Turn off cooperative scheduling for a future. The future will never be forced to yield by
+/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields
+/// otherwise.
+///
+/// See also the usage example in the [task module](index.html#unconstrained).
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
+ Unconstrained { inner }
+}
diff --git a/third_party/rust/tokio/src/task/yield_now.rs b/third_party/rust/tokio/src/task/yield_now.rs
new file mode 100644
index 0000000000..148e3dc0c8
--- /dev/null
+++ b/third_party/rust/tokio/src/task/yield_now.rs
@@ -0,0 +1,58 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Yields execution back to the Tokio runtime.
+///
+/// A task yields by awaiting on `yield_now()`, and may resume when that future
+/// completes (with no output.) The current task will be re-added as a pending
+/// task at the _back_ of the pending queue. Any other pending tasks will be
+/// scheduled. No other waking is required for the task to continue.
+///
+/// See also the usage example in the [task module](index.html#yield_now).
+///
+/// ## Non-guarantees
+///
+/// This function may not yield all the way up to the executor if there are any
+/// special combinators above it in the call stack. For example, if a
+/// [`tokio::select!`] has another branch complete during the same poll as the
+/// `yield_now()`, then the yield is not propagated all the way up to the
+/// runtime.
+///
+/// It is generally not guaranteed that the runtime behaves like you expect it
+/// to when deciding which task to schedule next after a call to `yield_now()`.
+/// In particular, the runtime may choose to poll the task that just ran
+/// `yield_now()` again immediately without polling any other tasks first. For
+/// example, the runtime will not drive the IO driver between every poll of a
+/// task, and this could result in the runtime polling the current task again
+/// immediately even if there is another task that could make progress if that
+/// other task is waiting for a notification from the IO driver.
+///
+/// In general, changes to the order in which the runtime polls tasks is not
+/// considered a breaking change, and your program should be correct no matter
+/// which order the runtime polls your tasks in.
+///
+/// [`tokio::select!`]: macro@crate::select
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub async fn yield_now() {
+ /// Yield implementation
+ struct YieldNow {
+ yielded: bool,
+ }
+
+ impl Future for YieldNow {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ if self.yielded {
+ return Poll::Ready(());
+ }
+
+ self.yielded = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+
+ YieldNow { yielded: false }.await
+}