summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/task
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/task
parentInitial commit. (diff)
downloadfirefox-upstream.tar.xz
firefox-upstream.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/task')
-rw-r--r--third_party/rust/tokio/src/task/blocking.rs71
-rw-r--r--third_party/rust/tokio/src/task/local.rs584
-rw-r--r--third_party/rust/tokio/src/task/mod.rs242
-rw-r--r--third_party/rust/tokio/src/task/spawn.rs134
-rw-r--r--third_party/rust/tokio/src/task/task_local.rs240
-rw-r--r--third_party/rust/tokio/src/task/yield_now.rs38
6 files changed, 1309 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/task/blocking.rs b/third_party/rust/tokio/src/task/blocking.rs
new file mode 100644
index 0000000000..0069b10ada
--- /dev/null
+++ b/third_party/rust/tokio/src/task/blocking.rs
@@ -0,0 +1,71 @@
+use crate::task::JoinHandle;
+
+cfg_rt_threaded! {
+ /// Runs the provided blocking function without blocking the executor.
+ ///
+ /// In general, issuing a blocking call or performing a lot of compute in a
+ /// future without yielding is not okay, as it may prevent the executor from
+ /// driving other futures forward. If you run a closure through this method,
+ /// the current executor thread will relegate all its executor duties to another
+ /// (possibly new) thread, and only then poll the task. Note that this requires
+ /// additional synchronization.
+ ///
+ /// # Note
+ ///
+ /// This function can only be called from a spawned task when working with
+ /// the [threaded scheduler](https://docs.rs/tokio/0.2.10/tokio/runtime/index.html#threaded-scheduler).
+ /// Consider using [tokio::task::spawn_blocking](https://docs.rs/tokio/0.2.10/tokio/task/fn.spawn_blocking.html).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn docs() {
+ /// task::block_in_place(move || {
+ /// // do some compute-heavy work or call synchronous code
+ /// });
+ /// # }
+ /// ```
+ #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))]
+ pub fn block_in_place<F, R>(f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ use crate::runtime::{enter, thread_pool};
+
+ enter::exit(|| thread_pool::block_in_place(f))
+ }
+}
+
+cfg_blocking! {
+ /// Runs the provided closure on a thread where blocking is acceptable.
+ ///
+ /// In general, issuing a blocking call or performing a lot of compute in a future without
+ /// yielding is not okay, as it may prevent the executor from driving other futures forward.
+ /// A closure that is run through this method will instead be run on a dedicated thread pool for
+ /// such blocking tasks without holding up the main futures executor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+ /// let res = task::spawn_blocking(move || {
+ /// // do some compute-heavy work or call synchronous code
+ /// "done computing"
+ /// }).await?;
+ ///
+ /// assert_eq!(res, "done computing");
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ crate::runtime::spawn_blocking(f)
+ }
+}
diff --git a/third_party/rust/tokio/src/task/local.rs b/third_party/rust/tokio/src/task/local.rs
new file mode 100644
index 0000000000..fce467079f
--- /dev/null
+++ b/third_party/rust/tokio/src/task/local.rs
@@ -0,0 +1,584 @@
+//! Runs `!Send` futures on the current thread.
+use crate::runtime::task::{self, JoinHandle, Task};
+use crate::sync::AtomicWaker;
+use crate::util::linked_list::LinkedList;
+
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::Poll;
+
+use pin_project_lite::pin_project;
+
+cfg_rt_util! {
+ /// A set of tasks which are executed on the same thread.
+ ///
+ /// In some cases, it is necessary to run one or more futures that do not
+ /// implement [`Send`] and thus are unsafe to send between threads. In these
+ /// cases, a [local task set] may be used to schedule one or more `!Send`
+ /// futures to run together on the same thread.
+ ///
+ /// For example, the following code will not compile:
+ ///
+ /// ```rust,compile_fail
+ /// use std::rc::Rc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// // `Rc` does not implement `Send`, and thus may not be sent between
+ /// // threads safely.
+ /// let unsend_data = Rc::new("my unsend data...");
+ ///
+ /// let unsend_data = unsend_data.clone();
+ /// // Because the `async` block here moves `unsend_data`, the future is `!Send`.
+ /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
+ /// // will not compile.
+ /// tokio::spawn(async move {
+ /// println!("{}", unsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ /// In order to spawn `!Send` futures, we can use a local task set to
+ /// schedule them on the thread calling [`Runtime::block_on`]. When running
+ /// inside of the local task set, we can use [`task::spawn_local`], which can
+ /// spawn `!Send` futures. For example:
+ ///
+ /// ```rust
+ /// use std::rc::Rc;
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let unsend_data = Rc::new("my unsend data...");
+ ///
+ /// // Construct a local task set that can run `!Send` futures.
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// let unsend_data = unsend_data.clone();
+ /// // `spawn_local` ensures that the future is spawned on the local
+ /// // task set.
+ /// task::spawn_local(async move {
+ /// println!("{}", unsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }).await;
+ /// }
+ /// ```
+ ///
+ /// ## Awaiting a `LocalSet`
+ ///
+ /// Additionally, a `LocalSet` itself implements `Future`, completing when
+ /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
+ /// several futures on a `LocalSet` and drive the whole set until they
+ /// complete. For example,
+ ///
+ /// ```rust
+ /// use tokio::{task, time};
+ /// use std::rc::Rc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let unsend_data = Rc::new("world");
+ /// let local = task::LocalSet::new();
+ ///
+ /// let unsend_data2 = unsend_data.clone();
+ /// local.spawn_local(async move {
+ /// // ...
+ /// println!("hello {}", unsend_data2)
+ /// });
+ ///
+ /// local.spawn_local(async move {
+ /// time::delay_for(time::Duration::from_millis(100)).await;
+ /// println!("goodbye {}", unsend_data)
+ /// });
+ ///
+ /// // ...
+ ///
+ /// local.await;
+ /// }
+ /// ```
+ ///
+ /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
+ /// [local task set]: struct@LocalSet
+ /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on
+ /// [`task::spawn_local`]: fn@spawn_local
+ pub struct LocalSet {
+ /// Current scheduler tick
+ tick: Cell<u8>,
+
+ /// State available from thread-local
+ context: Context,
+
+ /// This type should not be Send.
+ _not_send: PhantomData<*const ()>,
+ }
+}
+
+/// State available from the thread-local
+struct Context {
+ /// Owned task set and local run queue
+ tasks: RefCell<Tasks>,
+
+ /// State shared between threads.
+ shared: Arc<Shared>,
+}
+
+struct Tasks {
+ /// Collection of all active tasks spawned onto this executor.
+ owned: LinkedList<Task<Arc<Shared>>>,
+
+ /// Local run queue sender and receiver.
+ queue: VecDeque<task::Notified<Arc<Shared>>>,
+}
+
+/// LocalSet state shared between threads.
+struct Shared {
+ /// Remote run queue sender
+ queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
+
+ /// Wake the `LocalSet` task
+ waker: AtomicWaker,
+}
+
+pin_project! {
+ #[derive(Debug)]
+ struct RunUntil<'a, F> {
+ local_set: &'a LocalSet,
+ #[pin]
+ future: F,
+ }
+}
+
+scoped_thread_local!(static CURRENT: Context);
+
+cfg_rt_util! {
+ /// Spawns a `!Send` future on the local task set.
+ ///
+ /// The spawned future will be run on the same thread that called `spawn_local.`
+ /// This may only be called from the context of a local task set.
+ ///
+ /// # Panics
+ ///
+ /// - This function panics if called outside of a local task set.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::rc::Rc;
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let unsend_data = Rc::new("my unsend data...");
+ ///
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// let unsend_data = unsend_data.clone();
+ /// task::spawn_local(async move {
+ /// println!("{}", unsend_data);
+ /// // ...
+ /// }).await.unwrap();
+ /// }).await;
+ /// }
+ /// ```
+ pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx
+ .expect("`spawn_local` called from outside of a `task::LocalSet`");
+
+ // Safety: Tasks are only polled and dropped from the thread that
+ // spawns them.
+ let (task, handle) = unsafe { task::joinable_local(future) };
+ cx.tasks.borrow_mut().queue.push_back(task);
+ handle
+ })
+ }
+}
+
+/// Initial queue capacity
+const INITIAL_CAPACITY: usize = 64;
+
+/// Max number of tasks to poll per tick.
+const MAX_TASKS_PER_TICK: usize = 61;
+
+/// How often it check the remote queue first
+const REMOTE_FIRST_INTERVAL: u8 = 31;
+
+impl LocalSet {
+ /// Returns a new local task set.
+ pub fn new() -> LocalSet {
+ LocalSet {
+ tick: Cell::new(0),
+ context: Context {
+ tasks: RefCell::new(Tasks {
+ owned: LinkedList::new(),
+ queue: VecDeque::with_capacity(INITIAL_CAPACITY),
+ }),
+ shared: Arc::new(Shared {
+ queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+ waker: AtomicWaker::new(),
+ }),
+ },
+ _not_send: PhantomData,
+ }
+ }
+
+ /// Spawns a `!Send` task onto the local task set.
+ ///
+ /// This task is guaranteed to be run on the current thread.
+ ///
+ /// Unlike the free function [`spawn_local`], this method may be used to
+ /// spawn local tasks when the task set is _not_ running. For example:
+ /// ```rust
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let local = task::LocalSet::new();
+ ///
+ /// // Spawn a future on the local set. This future will be run when
+ /// // we call `run_until` to drive the task set.
+ /// local.spawn_local(async {
+ /// // ...
+ /// });
+ ///
+ /// // Run the local task set.
+ /// local.run_until(async move {
+ /// // ...
+ /// }).await;
+ ///
+ /// // When `run` finishes, we can spawn _more_ futures, which will
+ /// // run in subsequent calls to `run_until`.
+ /// local.spawn_local(async {
+ /// // ...
+ /// });
+ ///
+ /// local.run_until(async move {
+ /// // ...
+ /// }).await;
+ /// }
+ /// ```
+ /// [`spawn_local`]: fn@spawn_local
+ pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + 'static,
+ F::Output: 'static,
+ {
+ let (task, handle) = unsafe { task::joinable_local(future) };
+ self.context.tasks.borrow_mut().queue.push_back(task);
+ handle
+ }
+
+ /// Runs a future to completion on the provided runtime, driving any local
+ /// futures spawned on this task set on the current thread.
+ ///
+ /// This runs the given future on the runtime, blocking until it is
+ /// complete, and yielding its resolved result. Any tasks or timers which
+ /// the future spawns internally will be executed on the runtime. The future
+ /// may also call [`spawn_local`] to spawn_local additional local futures on the
+ /// current thread.
+ ///
+ /// This method should not be called from an asynchronous context.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the executor is at capacity, if the provided
+ /// future panics, or if called within an asynchronous execution context.
+ ///
+ /// # Notes
+ ///
+ /// Since this function internally calls [`Runtime::block_on`], and drives
+ /// futures in the local task set inside that call to `block_on`, the local
+ /// futures may not use [in-place blocking]. If a blocking call needs to be
+ /// issued from a local task, the [`spawn_blocking`] API may be used instead.
+ ///
+ /// For example, this will panic:
+ /// ```should_panic
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// let mut rt = Runtime::new().unwrap();
+ /// let local = task::LocalSet::new();
+ /// local.block_on(&mut rt, async {
+ /// let join = task::spawn_local(async {
+ /// let blocking_result = task::block_in_place(|| {
+ /// // ...
+ /// });
+ /// // ...
+ /// });
+ /// join.await.unwrap();
+ /// })
+ /// ```
+ /// This, however, will not panic:
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// let mut rt = Runtime::new().unwrap();
+ /// let local = task::LocalSet::new();
+ /// local.block_on(&mut rt, async {
+ /// let join = task::spawn_local(async {
+ /// let blocking_result = task::spawn_blocking(|| {
+ /// // ...
+ /// }).await;
+ /// // ...
+ /// });
+ /// join.await.unwrap();
+ /// })
+ /// ```
+ ///
+ /// [`spawn_local`]: fn@spawn_local
+ /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on
+ /// [in-place blocking]: ../blocking/fn.in_place.html
+ /// [`spawn_blocking`]: ../blocking/fn.spawn_blocking.html
+ pub fn block_on<F>(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ rt.block_on(self.run_until(future))
+ }
+
+ /// Run a future to completion on the local set, returning its output.
+ ///
+ /// This returns a future that runs the given future with a local set,
+ /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
+ /// Any local futures spawned on the local set will be driven in the
+ /// background until the future passed to `run_until` completes. When the future
+ /// passed to `run` finishes, any local futures which have not completed
+ /// will remain on the local set, and will be driven on subsequent calls to
+ /// `run_until` or when [awaiting the local set] itself.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use tokio::task;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// task::LocalSet::new().run_until(async {
+ /// task::spawn_local(async move {
+ /// // ...
+ /// }).await.unwrap();
+ /// // ...
+ /// }).await;
+ /// }
+ /// ```
+ ///
+ /// [`spawn_local`]: fn@spawn_local
+ /// [awaiting the local set]: #awaiting-a-localset
+ pub async fn run_until<F>(&self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ let run_until = RunUntil {
+ future,
+ local_set: self,
+ };
+ run_until.await
+ }
+
+ /// Tick the scheduler, returning whether the local future needs to be
+ /// notified again.
+ fn tick(&self) -> bool {
+ for _ in 0..MAX_TASKS_PER_TICK {
+ match self.next_task() {
+ // Run the task
+ //
+ // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
+ // used. We are responsible for maintaining the invariant that
+ // `run_unchecked` is only called on threads that spawned the
+ // task initially. Because `LocalSet` itself is `!Send`, and
+ // `spawn_local` spawns into the `LocalSet` on the current
+ // thread, the invariant is maintained.
+ Some(task) => crate::coop::budget(|| task.run()),
+ // We have fully drained the queue of notified tasks, so the
+ // local future doesn't need to be notified again — it can wait
+ // until something else wakes a task in the local set.
+ None => return false,
+ }
+ }
+
+ true
+ }
+
+ fn next_task(&self) -> Option<task::Notified<Arc<Shared>>> {
+ let tick = self.tick.get();
+ self.tick.set(tick.wrapping_add(1));
+
+ if tick % REMOTE_FIRST_INTERVAL == 0 {
+ self.context
+ .shared
+ .queue
+ .lock()
+ .unwrap()
+ .pop_front()
+ .or_else(|| self.context.tasks.borrow_mut().queue.pop_front())
+ } else {
+ self.context
+ .tasks
+ .borrow_mut()
+ .queue
+ .pop_front()
+ .or_else(|| self.context.shared.queue.lock().unwrap().pop_front())
+ }
+ }
+
+ fn with<T>(&self, f: impl FnOnce() -> T) -> T {
+ CURRENT.set(&self.context, f)
+ }
+}
+
+impl fmt::Debug for LocalSet {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("LocalSet").finish()
+ }
+}
+
+impl Future for LocalSet {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ // Register the waker before starting to work
+ self.context.shared.waker.register_by_ref(cx.waker());
+
+ if self.with(|| self.tick()) {
+ // If `tick` returns true, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ } else if self.context.tasks.borrow().owned.is_empty() {
+ // If the scheduler has no remaining futures, we're done!
+ Poll::Ready(())
+ } else {
+ // There are still futures in the local set, but we've polled all the
+ // futures in the run queue. Therefore, we can just return Pending
+ // since the remaining futures will be woken from somewhere else.
+ Poll::Pending
+ }
+ }
+}
+
+impl Default for LocalSet {
+ fn default() -> LocalSet {
+ LocalSet::new()
+ }
+}
+
+impl Drop for LocalSet {
+ fn drop(&mut self) {
+ self.with(|| {
+ // Loop required here to ensure borrow is dropped between iterations
+ #[allow(clippy::while_let_loop)]
+ loop {
+ let task = match self.context.tasks.borrow_mut().owned.pop_back() {
+ Some(task) => task,
+ None => break,
+ };
+
+ // Safety: same as `run_unchecked`.
+ task.shutdown();
+ }
+
+ for task in self.context.tasks.borrow_mut().queue.drain(..) {
+ task.shutdown();
+ }
+
+ for task in self.context.shared.queue.lock().unwrap().drain(..) {
+ task.shutdown();
+ }
+
+ assert!(self.context.tasks.borrow().owned.is_empty());
+ });
+ }
+}
+
+// === impl LocalFuture ===
+
+impl<T: Future> Future for RunUntil<'_, T> {
+ type Output = T::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+
+ me.local_set.with(|| {
+ me.local_set
+ .context
+ .shared
+ .waker
+ .register_by_ref(cx.waker());
+
+ if let Poll::Ready(output) = me.future.poll(cx) {
+ return Poll::Ready(output);
+ }
+
+ if me.local_set.tick() {
+ // If `tick` returns `true`, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ })
+ }
+}
+
+impl Shared {
+ /// Schedule the provided task on the scheduler.
+ fn schedule(&self, task: task::Notified<Arc<Self>>) {
+ CURRENT.with(|maybe_cx| match maybe_cx {
+ Some(cx) if cx.shared.ptr_eq(self) => {
+ cx.tasks.borrow_mut().queue.push_back(task);
+ }
+ _ => {
+ self.queue.lock().unwrap().push_back(task);
+ self.waker.wake();
+ }
+ });
+ }
+
+ fn ptr_eq(&self, other: &Shared) -> bool {
+ self as *const _ == other as *const _
+ }
+}
+
+impl task::Schedule for Arc<Shared> {
+ fn bind(task: Task<Self>) -> Arc<Shared> {
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+ cx.tasks.borrow_mut().owned.push_front(task);
+ cx.shared.clone()
+ })
+ }
+
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ use std::ptr::NonNull;
+
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+
+ assert!(cx.shared.ptr_eq(self));
+
+ let ptr = NonNull::from(task.header());
+ // safety: task must be contained by list. It is inserted into the
+ // list in `bind`.
+ unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
+ })
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ Shared::schedule(self, task);
+ }
+}
diff --git a/third_party/rust/tokio/src/task/mod.rs b/third_party/rust/tokio/src/task/mod.rs
new file mode 100644
index 0000000000..5c89393a5e
--- /dev/null
+++ b/third_party/rust/tokio/src/task/mod.rs
@@ -0,0 +1,242 @@
+//! Asynchronous green-threads.
+//!
+//! ## What are Tasks?
+//!
+//! A _task_ is a light weight, non-blocking unit of execution. A task is similar
+//! to an OS thread, but rather than being managed by the OS scheduler, they are
+//! managed by the [Tokio runtime][rt]. Another name for this general pattern is
+//! [green threads]. If you are familiar with [Go's goroutines], [Kotlin's
+//! coroutines], or [Erlang's processes], you can think of Tokio's tasks as
+//! something similar.
+//!
+//! Key points about tasks include:
+//!
+//! * Tasks are **light weight**. Because tasks are scheduled by the Tokio
+//! runtime rather than the operating system, creating new tasks or switching
+//! between tasks does not require a context switch and has fairly low
+//! overhead. Creating, running, and destroying large numbers of tasks is
+//! quite cheap, especially compared to OS threads.
+//!
+//! * Tasks are scheduled **cooperatively**. Most operating systems implement
+//! _preemptive multitasking_. This is a scheduling technique where the
+//! operating system allows each thread to run for a period of time, and then
+//! _preempts_ it, temporarily pausing that thread and switching to another.
+//! Tasks, on the other hand, implement _cooperative multitasking_. In
+//! cooperative multitasking, a task is allowed to run until it _yields_,
+//! indicating to the Tokio runtime's scheduler that it cannot currently
+//! continue executing. When a task yields, the Tokio runtime switches to
+//! executing the next task.
+//!
+//! * Tasks are **non-blocking**. Typically, when an OS thread performs I/O or
+//! must synchronize with another thread, it _blocks_, allowing the OS to
+//! schedule another thread. When a task cannot continue executing, it must
+//! yield instead, allowing the Tokio runtime to schedule another task. Tasks
+//! should generally not perform system calls or other operations that could
+//! block a thread, as this would prevent other tasks running on the same
+//! thread from executing as well. Instead, this module provides APIs for
+//! running blocking operations in an asynchronous context.
+//!
+//! [rt]: crate::runtime
+//! [green threads]: https://en.wikipedia.org/wiki/Green_threads
+//! [Go's goroutines]: https://tour.golang.org/concurrency/1
+//! [Kotlin's coroutines]: https://kotlinlang.org/docs/reference/coroutines-overview.html
+//! [Erlang's processes]: http://erlang.org/doc/getting_started/conc_prog.html#processes
+//!
+//! ## Working with Tasks
+//!
+//! This module provides the following APIs for working with tasks:
+//!
+//! ### Spawning
+//!
+//! Perhaps the most important function in this module is [`task::spawn`]. This
+//! function can be thought of as an async equivalent to the standard library's
+//! [`thread::spawn`][`std::thread::spawn`]. It takes an `async` block or other
+//! [future], and creates a new task to run that work concurrently:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn doc() {
+//! task::spawn(async {
+//! // perform some work here...
+//! });
+//! # }
+//! ```
+//!
+//! Like [`std::thread::spawn`], `task::spawn` returns a [`JoinHandle`] struct.
+//! A `JoinHandle` is itself a future which may be used to await the output of
+//! the spawned task. For example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let join = task::spawn(async {
+//! // ...
+//! "hello world!"
+//! });
+//!
+//! // ...
+//!
+//! // Await the result of the spawned task.
+//! let result = join.await?;
+//! assert_eq!(result, "hello world!");
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! Again, like `std::thread`'s [`JoinHandle` type][thread_join], if the spawned
+//! task panics, awaiting its `JoinHandle` will return a [`JoinError`]`. For
+//! example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() {
+//! let join = task::spawn(async {
+//! panic!("something bad happened!")
+//! });
+//!
+//! // The returned result indicates that the task failed.
+//! assert!(join.await.is_err());
+//! # }
+//! ```
+//!
+//! `spawn`, `JoinHandle`, and `JoinError` are present when the "rt-core"
+//! feature flag is enabled.
+//!
+//! [`task::spawn`]: crate::task::spawn()
+//! [future]: std::future::Future
+//! [`std::thread::spawn`]: std::thread::spawn
+//! [`JoinHandle`]: crate::task::JoinHandle
+//! [thread_join]: std::thread::JoinHandle
+//! [`JoinError`]: crate::task::JoinError
+//!
+//! ### Blocking and Yielding
+//!
+//! As we discussed above, code running in asynchronous tasks should not perform
+//! operations that can block. A blocking operation performed in a task running
+//! on a thread that is also running other tasks would block the entire thread,
+//! preventing other tasks from running.
+//!
+//! Instead, Tokio provides two APIs for running blocking operations in an
+//! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`].
+//!
+//! #### spawn_blocking
+//!
+//! The `task::spawn_blocking` function is similar to the `task::spawn` function
+//! discussed in the previous section, but rather than spawning an
+//! _non-blocking_ future on the Tokio runtime, it instead spawns a
+//! _blocking_ function on a dedicated thread pool for blocking tasks. For
+//! example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn docs() {
+//! task::spawn_blocking(|| {
+//! // do some compute-heavy work or call synchronous code
+//! });
+//! # }
+//! ```
+//!
+//! Just like `task::spawn`, `task::spawn_blocking` returns a `JoinHandle`
+//! which we can use to await the result of the blocking operation:
+//!
+//! ```rust
+//! # use tokio::task;
+//! # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+//! let join = task::spawn_blocking(|| {
+//! // do some compute-heavy work or call synchronous code
+//! "blocking completed"
+//! });
+//!
+//! let result = join.await?;
+//! assert_eq!(result, "blocking completed");
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! #### block_in_place
+//!
+//! When using the [threaded runtime][rt-threaded], the [`task::block_in_place`]
+//! function is also available. Like `task::spawn_blocking`, this function
+//! allows running a blocking operation from an asynchronous context. Unlike
+//! `spawn_blocking`, however, `block_in_place` works by transitioning the
+//! _current_ worker thread to a blocking thread, moving other tasks running on
+//! that thread to another worker thread. This can improve performance by avoiding
+//! context switches.
+//!
+//! For example:
+//!
+//! ```
+//! use tokio::task;
+//!
+//! # async fn docs() {
+//! let result = task::block_in_place(|| {
+//! // do some compute-heavy work or call synchronous code
+//! "blocking completed"
+//! });
+//!
+//! assert_eq!(result, "blocking completed");
+//! # }
+//! ```
+//!
+//! #### yield_now
+//!
+//! In addition, this module provides a [`task::yield_now`] async function
+//! that is analogous to the standard library's [`thread::yield_now`]. Calling
+//! and `await`ing this function will cause the current task to yield to the
+//! Tokio runtime's scheduler, allowing other tasks to be
+//! scheduled. Eventually, the yielding task will be polled again, allowing it
+//! to execute. For example:
+//!
+//! ```rust
+//! use tokio::task;
+//!
+//! # #[tokio::main] async fn main() {
+//! async {
+//! task::spawn(async {
+//! // ...
+//! println!("spawned task done!")
+//! });
+//!
+//! // Yield, allowing the newly-spawned task to execute first.
+//! task::yield_now().await;
+//! println!("main task done!");
+//! }
+//! # .await;
+//! # }
+//! ```
+//!
+//! [`task::spawn_blocking`]: crate::task::spawn_blocking
+//! [`task::block_in_place`]: crate::task::block_in_place
+//! [rt-threaded]: ../runtime/index.html#threaded-scheduler
+//! [`task::yield_now`]: crate::task::yield_now()
+//! [`thread::yield_now`]: std::thread::yield_now
+cfg_blocking! {
+ mod blocking;
+ pub use blocking::spawn_blocking;
+
+ cfg_rt_threaded! {
+ pub use blocking::block_in_place;
+ }
+}
+
+cfg_rt_core! {
+ pub use crate::runtime::task::{JoinError, JoinHandle};
+
+ mod spawn;
+ pub use spawn::spawn;
+
+ mod yield_now;
+ pub use yield_now::yield_now;
+}
+
+cfg_rt_util! {
+ mod local;
+ pub use local::{spawn_local, LocalSet};
+
+ mod task_local;
+ pub use task_local::LocalKey;
+}
diff --git a/third_party/rust/tokio/src/task/spawn.rs b/third_party/rust/tokio/src/task/spawn.rs
new file mode 100644
index 0000000000..fa5ff13b01
--- /dev/null
+++ b/third_party/rust/tokio/src/task/spawn.rs
@@ -0,0 +1,134 @@
+use crate::runtime;
+use crate::task::JoinHandle;
+
+use std::future::Future;
+
+doc_rt_core! {
+ /// Spawns a new asynchronous task, returning a
+ /// [`JoinHandle`](super::JoinHandle) for it.
+ ///
+ /// Spawning a task enables the task to execute concurrently to other tasks. The
+ /// spawned task may execute on the current thread, or it may be sent to a
+ /// different thread to be executed. The specifics depend on the current
+ /// [`Runtime`](crate::runtime::Runtime) configuration.
+ ///
+ /// There is no guarantee that a spawned task will execute to completion.
+ /// When a runtime is shutdown, all outstanding tasks are dropped,
+ /// regardless of the lifecycle of that task.
+ ///
+ /// This function must be called from the context of a Tokio runtime. Tasks running on
+ /// the Tokio runtime are always inside its context, but you can also enter the context
+ /// using the [`Handle::enter`](crate::runtime::Handle::enter()) method.
+ ///
+ /// # Examples
+ ///
+ /// In this example, a server is started and `spawn` is used to start a new task
+ /// that processes each received connection.
+ ///
+ /// ```no_run
+ /// use tokio::net::{TcpListener, TcpStream};
+ ///
+ /// use std::io;
+ ///
+ /// async fn process(socket: TcpStream) {
+ /// // ...
+ /// # drop(socket);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// let (socket, _) = listener.accept().await?;
+ ///
+ /// tokio::spawn(async move {
+ /// // Process each socket concurrently.
+ /// process(socket).await
+ /// });
+ /// }
+ /// }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// Panics if called from **outside** of the Tokio runtime.
+ ///
+ /// # Using `!Send` values from a task
+ ///
+ /// The task supplied to `spawn` must implement `Send`. However, it is
+ /// possible to **use** `!Send` values from the task as long as they only
+ /// exist between calls to `.await`.
+ ///
+ /// For example, this will work:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// use std::rc::Rc;
+ ///
+ /// fn use_rc(rc: Rc<()>) {
+ /// // Do stuff w/ rc
+ /// # drop(rc);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// tokio::spawn(async {
+ /// // Force the `Rc` to stay in a scope with no `.await`
+ /// {
+ /// let rc = Rc::new(());
+ /// use_rc(rc.clone());
+ /// }
+ ///
+ /// task::yield_now().await;
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// This will **not** work:
+ ///
+ /// ```compile_fail
+ /// use tokio::task;
+ ///
+ /// use std::rc::Rc;
+ ///
+ /// fn use_rc(rc: Rc<()>) {
+ /// // Do stuff w/ rc
+ /// # drop(rc);
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// tokio::spawn(async {
+ /// let rc = Rc::new(());
+ ///
+ /// task::yield_now().await;
+ ///
+ /// use_rc(rc.clone());
+ /// }).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// Holding on to a `!Send` value across calls to `.await` will result in
+ /// an unfriendly compile error message similar to:
+ ///
+ /// ```text
+ /// `[... some type ...]` cannot be sent between threads safely
+ /// ```
+ ///
+ /// or:
+ ///
+ /// ```text
+ /// error[E0391]: cycle detected when processing `main`
+ /// ```
+ pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
+ where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ let spawn_handle = runtime::context::spawn_handle()
+ .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
+ spawn_handle.spawn(task)
+ }
+}
diff --git a/third_party/rust/tokio/src/task/task_local.rs b/third_party/rust/tokio/src/task/task_local.rs
new file mode 100644
index 0000000000..f3341b6a7e
--- /dev/null
+++ b/third_party/rust/tokio/src/task/task_local.rs
@@ -0,0 +1,240 @@
+use pin_project_lite::pin_project;
+use std::cell::RefCell;
+use std::error::Error;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{fmt, thread};
+
+/// Declares a new task-local key of type [`tokio::task::LocalKey`].
+///
+/// # Syntax
+///
+/// The macro wraps any number of static declarations and makes them local to the current task.
+/// Publicity and attributes for each static is preserved. For example:
+///
+/// # Examples
+///
+/// ```
+/// # use tokio::task_local;
+/// task_local! {
+/// pub static ONE: u32;
+///
+/// #[allow(unused)]
+/// static TWO: f32;
+/// }
+/// # fn main() {}
+/// ```
+///
+/// See [LocalKey documentation][`tokio::task::LocalKey`] for more
+/// information.
+///
+/// [`tokio::task::LocalKey`]: ../tokio/task/struct.LocalKey.html
+#[macro_export]
+macro_rules! task_local {
+ // empty (base case for the recursion)
+ () => {};
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ $crate::task_local!($($rest)*);
+ };
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ }
+}
+
+#[doc(hidden)]
+#[macro_export]
+macro_rules! __task_local_inner {
+ ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
+ static $name: $crate::task::LocalKey<$t> = {
+ std::thread_local! {
+ static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None);
+ }
+
+ $crate::task::LocalKey { inner: __KEY }
+ };
+ };
+}
+
+/// A key for task-local data.
+///
+/// This type is generated by the `task_local!` macro.
+///
+/// Unlike [`std::thread::LocalKey`], `tokio::task::LocalKey` will
+/// _not_ lazily initialize the value on first access. Instead, the
+/// value is first initialized when the future containing
+/// the task-local is first polled by a futures executor, like Tokio.
+///
+/// # Examples
+///
+/// ```
+/// # async fn dox() {
+/// tokio::task_local! {
+/// static NUMBER: u32;
+/// }
+///
+/// NUMBER.scope(1, async move {
+/// assert_eq!(NUMBER.get(), 1);
+/// }).await;
+///
+/// NUMBER.scope(2, async move {
+/// assert_eq!(NUMBER.get(), 2);
+///
+/// NUMBER.scope(3, async move {
+/// assert_eq!(NUMBER.get(), 3);
+/// }).await;
+/// }).await;
+/// # }
+/// ```
+/// [`std::thread::LocalKey`]: https://doc.rust-lang.org/std/thread/struct.LocalKey.html
+pub struct LocalKey<T: 'static> {
+ #[doc(hidden)]
+ pub inner: thread::LocalKey<RefCell<Option<T>>>,
+}
+
+impl<T: 'static> LocalKey<T> {
+ /// Sets a value `T` as the task-local value for the future `F`.
+ ///
+ /// On completion of `scope`, the task-local will be dropped.
+ ///
+ /// ### Examples
+ ///
+ /// ```
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static NUMBER: u32;
+ /// }
+ ///
+ /// NUMBER.scope(1, async move {
+ /// println!("task local value: {}", NUMBER.get());
+ /// }).await;
+ /// # }
+ /// ```
+ pub async fn scope<F>(&'static self, value: T, f: F) -> F::Output
+ where
+ F: Future,
+ {
+ TaskLocalFuture {
+ local: &self,
+ slot: Some(value),
+ future: f,
+ }
+ .await
+ }
+
+ /// Accesses the current task-local and runs the provided closure.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if not called within the context
+ /// of a future containing a task-local with the corresponding key.
+ pub fn with<F, R>(&'static self, f: F) -> R
+ where
+ F: FnOnce(&T) -> R,
+ {
+ self.try_with(f).expect(
+ "cannot access a Task Local Storage value \
+ without setting it via `LocalKey::set`",
+ )
+ }
+
+ /// Accesses the current task-local and runs the provided closure.
+ ///
+ /// If the task-local with the accociated key is not present, this
+ /// method will return an `AccessError`. For a panicking variant,
+ /// see `with`.
+ pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError>
+ where
+ F: FnOnce(&T) -> R,
+ {
+ self.inner.with(|v| {
+ if let Some(val) = v.borrow().as_ref() {
+ Ok(f(val))
+ } else {
+ Err(AccessError { _private: () })
+ }
+ })
+ }
+}
+
+impl<T: Copy + 'static> LocalKey<T> {
+ /// Returns a copy of the task-local value
+ /// if the task-local value implements `Copy`.
+ pub fn get(&'static self) -> T {
+ self.with(|v| *v)
+ }
+}
+
+impl<T: 'static> fmt::Debug for LocalKey<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("LocalKey { .. }")
+ }
+}
+
+pin_project! {
+ struct TaskLocalFuture<T: StaticLifetime, F> {
+ local: &'static LocalKey<T>,
+ slot: Option<T>,
+ #[pin]
+ future: F,
+ }
+}
+
+impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ struct Guard<'a, T: 'static> {
+ local: &'static LocalKey<T>,
+ slot: &'a mut Option<T>,
+ prev: Option<T>,
+ }
+
+ impl<T> Drop for Guard<'_, T> {
+ fn drop(&mut self) {
+ let value = self.local.inner.with(|c| c.replace(self.prev.take()));
+ *self.slot = value;
+ }
+ }
+
+ let mut project = self.project();
+ let val = project.slot.take();
+
+ let prev = project.local.inner.with(|c| c.replace(val));
+
+ let _guard = Guard {
+ prev,
+ slot: &mut project.slot,
+ local: *project.local,
+ };
+
+ project.future.poll(cx)
+ }
+}
+
+// Required to make `pin_project` happy.
+trait StaticLifetime: 'static {}
+impl<T: 'static> StaticLifetime for T {}
+
+/// An error returned by [`LocalKey::try_with`](method@LocalKey::try_with).
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct AccessError {
+ _private: (),
+}
+
+impl fmt::Debug for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AccessError").finish()
+ }
+}
+
+impl fmt::Display for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt("task-local value not set", f)
+ }
+}
+
+impl Error for AccessError {}
diff --git a/third_party/rust/tokio/src/task/yield_now.rs b/third_party/rust/tokio/src/task/yield_now.rs
new file mode 100644
index 0000000000..e0e20841c9
--- /dev/null
+++ b/third_party/rust/tokio/src/task/yield_now.rs
@@ -0,0 +1,38 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+doc_rt_core! {
+ /// Yields execution back to the Tokio runtime.
+ ///
+ /// A task yields by awaiting on `yield_now()`, and may resume when that
+ /// future completes (with no output.) The current task will be re-added as
+ /// a pending task at the _back_ of the pending queue. Any other pending
+ /// tasks will be scheduled. No other waking is required for the task to
+ /// continue.
+ ///
+ /// See also the usage example in the [task module](index.html#yield_now).
+ #[must_use = "yield_now does nothing unless polled/`await`-ed"]
+ pub async fn yield_now() {
+ /// Yield implementation
+ struct YieldNow {
+ yielded: bool,
+ }
+
+ impl Future for YieldNow {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ if self.yielded {
+ return Poll::Ready(());
+ }
+
+ self.yielded = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+
+ YieldNow { yielded: false }.await
+ }
+}