diff options
Diffstat (limited to 'third_party/rust/tokio/src/task')
-rw-r--r-- | third_party/rust/tokio/src/task/blocking.rs | 71 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/local.rs | 584 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/mod.rs | 242 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/spawn.rs | 134 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/task_local.rs | 240 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/yield_now.rs | 38 |
6 files changed, 1309 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/task/blocking.rs b/third_party/rust/tokio/src/task/blocking.rs new file mode 100644 index 0000000000..0069b10ada --- /dev/null +++ b/third_party/rust/tokio/src/task/blocking.rs @@ -0,0 +1,71 @@ +use crate::task::JoinHandle; + +cfg_rt_threaded! { + /// Runs the provided blocking function without blocking the executor. + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is not okay, as it may prevent the executor from + /// driving other futures forward. If you run a closure through this method, + /// the current executor thread will relegate all its executor duties to another + /// (possibly new) thread, and only then poll the task. Note that this requires + /// additional synchronization. + /// + /// # Note + /// + /// This function can only be called from a spawned task when working with + /// the [threaded scheduler](https://docs.rs/tokio/0.2.10/tokio/runtime/index.html#threaded-scheduler). + /// Consider using [tokio::task::spawn_blocking](https://docs.rs/tokio/0.2.10/tokio/task/fn.spawn_blocking.html). + /// + /// # Examples + /// + /// ``` + /// use tokio::task; + /// + /// # async fn docs() { + /// task::block_in_place(move || { + /// // do some compute-heavy work or call synchronous code + /// }); + /// # } + /// ``` + #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] + pub fn block_in_place<F, R>(f: F) -> R + where + F: FnOnce() -> R, + { + use crate::runtime::{enter, thread_pool}; + + enter::exit(|| thread_pool::block_in_place(f)) + } +} + +cfg_blocking! { + /// Runs the provided closure on a thread where blocking is acceptable. + /// + /// In general, issuing a blocking call or performing a lot of compute in a future without + /// yielding is not okay, as it may prevent the executor from driving other futures forward. + /// A closure that is run through this method will instead be run on a dedicated thread pool for + /// such blocking tasks without holding up the main futures executor. + /// + /// # Examples + /// + /// ``` + /// use tokio::task; + /// + /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ + /// let res = task::spawn_blocking(move || { + /// // do some compute-heavy work or call synchronous code + /// "done computing" + /// }).await?; + /// + /// assert_eq!(res, "done computing"); + /// # Ok(()) + /// # } + /// ``` + pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + crate::runtime::spawn_blocking(f) + } +} diff --git a/third_party/rust/tokio/src/task/local.rs b/third_party/rust/tokio/src/task/local.rs new file mode 100644 index 0000000000..fce467079f --- /dev/null +++ b/third_party/rust/tokio/src/task/local.rs @@ -0,0 +1,584 @@ +//! Runs `!Send` futures on the current thread. +use crate::runtime::task::{self, JoinHandle, Task}; +use crate::sync::AtomicWaker; +use crate::util::linked_list::LinkedList; + +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::Poll; + +use pin_project_lite::pin_project; + +cfg_rt_util! { + /// A set of tasks which are executed on the same thread. + /// + /// In some cases, it is necessary to run one or more futures that do not + /// implement [`Send`] and thus are unsafe to send between threads. In these + /// cases, a [local task set] may be used to schedule one or more `!Send` + /// futures to run together on the same thread. + /// + /// For example, the following code will not compile: + /// + /// ```rust,compile_fail + /// use std::rc::Rc; + /// + /// #[tokio::main] + /// async fn main() { + /// // `Rc` does not implement `Send`, and thus may not be sent between + /// // threads safely. + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// let unsend_data = unsend_data.clone(); + /// // Because the `async` block here moves `unsend_data`, the future is `!Send`. + /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this + /// // will not compile. + /// tokio::spawn(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// } + /// ``` + /// In order to spawn `!Send` futures, we can use a local task set to + /// schedule them on the thread calling [`Runtime::block_on`]. When running + /// inside of the local task set, we can use [`task::spawn_local`], which can + /// spawn `!Send` futures. For example: + /// + /// ```rust + /// use std::rc::Rc; + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// // Construct a local task set that can run `!Send` futures. + /// let local = task::LocalSet::new(); + /// + /// // Run the local task set. + /// local.run_until(async move { + /// let unsend_data = unsend_data.clone(); + /// // `spawn_local` ensures that the future is spawned on the local + /// // task set. + /// task::spawn_local(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// }).await; + /// } + /// ``` + /// + /// ## Awaiting a `LocalSet` + /// + /// Additionally, a `LocalSet` itself implements `Future`, completing when + /// *all* tasks spawned on the `LocalSet` complete. This can be used to run + /// several futures on a `LocalSet` and drive the whole set until they + /// complete. For example, + /// + /// ```rust + /// use tokio::{task, time}; + /// use std::rc::Rc; + /// + /// #[tokio::main] + /// async fn main() { + /// let unsend_data = Rc::new("world"); + /// let local = task::LocalSet::new(); + /// + /// let unsend_data2 = unsend_data.clone(); + /// local.spawn_local(async move { + /// // ... + /// println!("hello {}", unsend_data2) + /// }); + /// + /// local.spawn_local(async move { + /// time::delay_for(time::Duration::from_millis(100)).await; + /// println!("goodbye {}", unsend_data) + /// }); + /// + /// // ... + /// + /// local.await; + /// } + /// ``` + /// + /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html + /// [local task set]: struct@LocalSet + /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on + /// [`task::spawn_local`]: fn@spawn_local + pub struct LocalSet { + /// Current scheduler tick + tick: Cell<u8>, + + /// State available from thread-local + context: Context, + + /// This type should not be Send. + _not_send: PhantomData<*const ()>, + } +} + +/// State available from the thread-local +struct Context { + /// Owned task set and local run queue + tasks: RefCell<Tasks>, + + /// State shared between threads. + shared: Arc<Shared>, +} + +struct Tasks { + /// Collection of all active tasks spawned onto this executor. + owned: LinkedList<Task<Arc<Shared>>>, + + /// Local run queue sender and receiver. + queue: VecDeque<task::Notified<Arc<Shared>>>, +} + +/// LocalSet state shared between threads. +struct Shared { + /// Remote run queue sender + queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>, + + /// Wake the `LocalSet` task + waker: AtomicWaker, +} + +pin_project! { + #[derive(Debug)] + struct RunUntil<'a, F> { + local_set: &'a LocalSet, + #[pin] + future: F, + } +} + +scoped_thread_local!(static CURRENT: Context); + +cfg_rt_util! { + /// Spawns a `!Send` future on the local task set. + /// + /// The spawned future will be run on the same thread that called `spawn_local.` + /// This may only be called from the context of a local task set. + /// + /// # Panics + /// + /// - This function panics if called outside of a local task set. + /// + /// # Examples + /// + /// ```rust + /// use std::rc::Rc; + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// let local = task::LocalSet::new(); + /// + /// // Run the local task set. + /// local.run_until(async move { + /// let unsend_data = unsend_data.clone(); + /// task::spawn_local(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// }).await; + /// } + /// ``` + pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output> + where + F: Future + 'static, + F::Output: 'static, + { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx + .expect("`spawn_local` called from outside of a `task::LocalSet`"); + + // Safety: Tasks are only polled and dropped from the thread that + // spawns them. + let (task, handle) = unsafe { task::joinable_local(future) }; + cx.tasks.borrow_mut().queue.push_back(task); + handle + }) + } +} + +/// Initial queue capacity +const INITIAL_CAPACITY: usize = 64; + +/// Max number of tasks to poll per tick. +const MAX_TASKS_PER_TICK: usize = 61; + +/// How often it check the remote queue first +const REMOTE_FIRST_INTERVAL: u8 = 31; + +impl LocalSet { + /// Returns a new local task set. + pub fn new() -> LocalSet { + LocalSet { + tick: Cell::new(0), + context: Context { + tasks: RefCell::new(Tasks { + owned: LinkedList::new(), + queue: VecDeque::with_capacity(INITIAL_CAPACITY), + }), + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + waker: AtomicWaker::new(), + }), + }, + _not_send: PhantomData, + } + } + + /// Spawns a `!Send` task onto the local task set. + /// + /// This task is guaranteed to be run on the current thread. + /// + /// Unlike the free function [`spawn_local`], this method may be used to + /// spawn local tasks when the task set is _not_ running. For example: + /// ```rust + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// let local = task::LocalSet::new(); + /// + /// // Spawn a future on the local set. This future will be run when + /// // we call `run_until` to drive the task set. + /// local.spawn_local(async { + /// // ... + /// }); + /// + /// // Run the local task set. + /// local.run_until(async move { + /// // ... + /// }).await; + /// + /// // When `run` finishes, we can spawn _more_ futures, which will + /// // run in subsequent calls to `run_until`. + /// local.spawn_local(async { + /// // ... + /// }); + /// + /// local.run_until(async move { + /// // ... + /// }).await; + /// } + /// ``` + /// [`spawn_local`]: fn@spawn_local + pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + 'static, + F::Output: 'static, + { + let (task, handle) = unsafe { task::joinable_local(future) }; + self.context.tasks.borrow_mut().queue.push_back(task); + handle + } + + /// Runs a future to completion on the provided runtime, driving any local + /// futures spawned on this task set on the current thread. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. The future + /// may also call [`spawn_local`] to spawn_local additional local futures on the + /// current thread. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + /// + /// # Notes + /// + /// Since this function internally calls [`Runtime::block_on`], and drives + /// futures in the local task set inside that call to `block_on`, the local + /// futures may not use [in-place blocking]. If a blocking call needs to be + /// issued from a local task, the [`spawn_blocking`] API may be used instead. + /// + /// For example, this will panic: + /// ```should_panic + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// let mut rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// local.block_on(&mut rt, async { + /// let join = task::spawn_local(async { + /// let blocking_result = task::block_in_place(|| { + /// // ... + /// }); + /// // ... + /// }); + /// join.await.unwrap(); + /// }) + /// ``` + /// This, however, will not panic: + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// let mut rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// local.block_on(&mut rt, async { + /// let join = task::spawn_local(async { + /// let blocking_result = task::spawn_blocking(|| { + /// // ... + /// }).await; + /// // ... + /// }); + /// join.await.unwrap(); + /// }) + /// ``` + /// + /// [`spawn_local`]: fn@spawn_local + /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on + /// [in-place blocking]: ../blocking/fn.in_place.html + /// [`spawn_blocking`]: ../blocking/fn.spawn_blocking.html + pub fn block_on<F>(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output + where + F: Future, + { + rt.block_on(self.run_until(future)) + } + + /// Run a future to completion on the local set, returning its output. + /// + /// This returns a future that runs the given future with a local set, + /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures. + /// Any local futures spawned on the local set will be driven in the + /// background until the future passed to `run_until` completes. When the future + /// passed to `run` finishes, any local futures which have not completed + /// will remain on the local set, and will be driven on subsequent calls to + /// `run_until` or when [awaiting the local set] itself. + /// + /// # Examples + /// + /// ```rust + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// task::LocalSet::new().run_until(async { + /// task::spawn_local(async move { + /// // ... + /// }).await.unwrap(); + /// // ... + /// }).await; + /// } + /// ``` + /// + /// [`spawn_local`]: fn@spawn_local + /// [awaiting the local set]: #awaiting-a-localset + pub async fn run_until<F>(&self, future: F) -> F::Output + where + F: Future, + { + let run_until = RunUntil { + future, + local_set: self, + }; + run_until.await + } + + /// Tick the scheduler, returning whether the local future needs to be + /// notified again. + fn tick(&self) -> bool { + for _ in 0..MAX_TASKS_PER_TICK { + match self.next_task() { + // Run the task + // + // Safety: As spawned tasks are `!Send`, `run_unchecked` must be + // used. We are responsible for maintaining the invariant that + // `run_unchecked` is only called on threads that spawned the + // task initially. Because `LocalSet` itself is `!Send`, and + // `spawn_local` spawns into the `LocalSet` on the current + // thread, the invariant is maintained. + Some(task) => crate::coop::budget(|| task.run()), + // We have fully drained the queue of notified tasks, so the + // local future doesn't need to be notified again — it can wait + // until something else wakes a task in the local set. + None => return false, + } + } + + true + } + + fn next_task(&self) -> Option<task::Notified<Arc<Shared>>> { + let tick = self.tick.get(); + self.tick.set(tick.wrapping_add(1)); + + if tick % REMOTE_FIRST_INTERVAL == 0 { + self.context + .shared + .queue + .lock() + .unwrap() + .pop_front() + .or_else(|| self.context.tasks.borrow_mut().queue.pop_front()) + } else { + self.context + .tasks + .borrow_mut() + .queue + .pop_front() + .or_else(|| self.context.shared.queue.lock().unwrap().pop_front()) + } + } + + fn with<T>(&self, f: impl FnOnce() -> T) -> T { + CURRENT.set(&self.context, f) + } +} + +impl fmt::Debug for LocalSet { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("LocalSet").finish() + } +} + +impl Future for LocalSet { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + // Register the waker before starting to work + self.context.shared.waker.register_by_ref(cx.waker()); + + if self.with(|| self.tick()) { + // If `tick` returns true, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + Poll::Pending + } else if self.context.tasks.borrow().owned.is_empty() { + // If the scheduler has no remaining futures, we're done! + Poll::Ready(()) + } else { + // There are still futures in the local set, but we've polled all the + // futures in the run queue. Therefore, we can just return Pending + // since the remaining futures will be woken from somewhere else. + Poll::Pending + } + } +} + +impl Default for LocalSet { + fn default() -> LocalSet { + LocalSet::new() + } +} + +impl Drop for LocalSet { + fn drop(&mut self) { + self.with(|| { + // Loop required here to ensure borrow is dropped between iterations + #[allow(clippy::while_let_loop)] + loop { + let task = match self.context.tasks.borrow_mut().owned.pop_back() { + Some(task) => task, + None => break, + }; + + // Safety: same as `run_unchecked`. + task.shutdown(); + } + + for task in self.context.tasks.borrow_mut().queue.drain(..) { + task.shutdown(); + } + + for task in self.context.shared.queue.lock().unwrap().drain(..) { + task.shutdown(); + } + + assert!(self.context.tasks.borrow().owned.is_empty()); + }); + } +} + +// === impl LocalFuture === + +impl<T: Future> Future for RunUntil<'_, T> { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + + me.local_set.with(|| { + me.local_set + .context + .shared + .waker + .register_by_ref(cx.waker()); + + if let Poll::Ready(output) = me.future.poll(cx) { + return Poll::Ready(output); + } + + if me.local_set.tick() { + // If `tick` returns `true`, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + } + + Poll::Pending + }) + } +} + +impl Shared { + /// Schedule the provided task on the scheduler. + fn schedule(&self, task: task::Notified<Arc<Self>>) { + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if cx.shared.ptr_eq(self) => { + cx.tasks.borrow_mut().queue.push_back(task); + } + _ => { + self.queue.lock().unwrap().push_back(task); + self.waker.wake(); + } + }); + } + + fn ptr_eq(&self, other: &Shared) -> bool { + self as *const _ == other as *const _ + } +} + +impl task::Schedule for Arc<Shared> { + fn bind(task: Task<Self>) -> Arc<Shared> { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + cx.tasks.borrow_mut().owned.push_front(task); + cx.shared.clone() + }) + } + + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + use std::ptr::NonNull; + + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + + assert!(cx.shared.ptr_eq(self)); + + let ptr = NonNull::from(task.header()); + // safety: task must be contained by list. It is inserted into the + // list in `bind`. + unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } + }) + } + + fn schedule(&self, task: task::Notified<Self>) { + Shared::schedule(self, task); + } +} diff --git a/third_party/rust/tokio/src/task/mod.rs b/third_party/rust/tokio/src/task/mod.rs new file mode 100644 index 0000000000..5c89393a5e --- /dev/null +++ b/third_party/rust/tokio/src/task/mod.rs @@ -0,0 +1,242 @@ +//! Asynchronous green-threads. +//! +//! ## What are Tasks? +//! +//! A _task_ is a light weight, non-blocking unit of execution. A task is similar +//! to an OS thread, but rather than being managed by the OS scheduler, they are +//! managed by the [Tokio runtime][rt]. Another name for this general pattern is +//! [green threads]. If you are familiar with [Go's goroutines], [Kotlin's +//! coroutines], or [Erlang's processes], you can think of Tokio's tasks as +//! something similar. +//! +//! Key points about tasks include: +//! +//! * Tasks are **light weight**. Because tasks are scheduled by the Tokio +//! runtime rather than the operating system, creating new tasks or switching +//! between tasks does not require a context switch and has fairly low +//! overhead. Creating, running, and destroying large numbers of tasks is +//! quite cheap, especially compared to OS threads. +//! +//! * Tasks are scheduled **cooperatively**. Most operating systems implement +//! _preemptive multitasking_. This is a scheduling technique where the +//! operating system allows each thread to run for a period of time, and then +//! _preempts_ it, temporarily pausing that thread and switching to another. +//! Tasks, on the other hand, implement _cooperative multitasking_. In +//! cooperative multitasking, a task is allowed to run until it _yields_, +//! indicating to the Tokio runtime's scheduler that it cannot currently +//! continue executing. When a task yields, the Tokio runtime switches to +//! executing the next task. +//! +//! * Tasks are **non-blocking**. Typically, when an OS thread performs I/O or +//! must synchronize with another thread, it _blocks_, allowing the OS to +//! schedule another thread. When a task cannot continue executing, it must +//! yield instead, allowing the Tokio runtime to schedule another task. Tasks +//! should generally not perform system calls or other operations that could +//! block a thread, as this would prevent other tasks running on the same +//! thread from executing as well. Instead, this module provides APIs for +//! running blocking operations in an asynchronous context. +//! +//! [rt]: crate::runtime +//! [green threads]: https://en.wikipedia.org/wiki/Green_threads +//! [Go's goroutines]: https://tour.golang.org/concurrency/1 +//! [Kotlin's coroutines]: https://kotlinlang.org/docs/reference/coroutines-overview.html +//! [Erlang's processes]: http://erlang.org/doc/getting_started/conc_prog.html#processes +//! +//! ## Working with Tasks +//! +//! This module provides the following APIs for working with tasks: +//! +//! ### Spawning +//! +//! Perhaps the most important function in this module is [`task::spawn`]. This +//! function can be thought of as an async equivalent to the standard library's +//! [`thread::spawn`][`std::thread::spawn`]. It takes an `async` block or other +//! [future], and creates a new task to run that work concurrently: +//! +//! ``` +//! use tokio::task; +//! +//! # async fn doc() { +//! task::spawn(async { +//! // perform some work here... +//! }); +//! # } +//! ``` +//! +//! Like [`std::thread::spawn`], `task::spawn` returns a [`JoinHandle`] struct. +//! A `JoinHandle` is itself a future which may be used to await the output of +//! the spawned task. For example: +//! +//! ``` +//! use tokio::task; +//! +//! # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let join = task::spawn(async { +//! // ... +//! "hello world!" +//! }); +//! +//! // ... +//! +//! // Await the result of the spawned task. +//! let result = join.await?; +//! assert_eq!(result, "hello world!"); +//! # Ok(()) +//! # } +//! ``` +//! +//! Again, like `std::thread`'s [`JoinHandle` type][thread_join], if the spawned +//! task panics, awaiting its `JoinHandle` will return a [`JoinError`]`. For +//! example: +//! +//! ``` +//! use tokio::task; +//! +//! # #[tokio::main] async fn main() { +//! let join = task::spawn(async { +//! panic!("something bad happened!") +//! }); +//! +//! // The returned result indicates that the task failed. +//! assert!(join.await.is_err()); +//! # } +//! ``` +//! +//! `spawn`, `JoinHandle`, and `JoinError` are present when the "rt-core" +//! feature flag is enabled. +//! +//! [`task::spawn`]: crate::task::spawn() +//! [future]: std::future::Future +//! [`std::thread::spawn`]: std::thread::spawn +//! [`JoinHandle`]: crate::task::JoinHandle +//! [thread_join]: std::thread::JoinHandle +//! [`JoinError`]: crate::task::JoinError +//! +//! ### Blocking and Yielding +//! +//! As we discussed above, code running in asynchronous tasks should not perform +//! operations that can block. A blocking operation performed in a task running +//! on a thread that is also running other tasks would block the entire thread, +//! preventing other tasks from running. +//! +//! Instead, Tokio provides two APIs for running blocking operations in an +//! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`]. +//! +//! #### spawn_blocking +//! +//! The `task::spawn_blocking` function is similar to the `task::spawn` function +//! discussed in the previous section, but rather than spawning an +//! _non-blocking_ future on the Tokio runtime, it instead spawns a +//! _blocking_ function on a dedicated thread pool for blocking tasks. For +//! example: +//! +//! ``` +//! use tokio::task; +//! +//! # async fn docs() { +//! task::spawn_blocking(|| { +//! // do some compute-heavy work or call synchronous code +//! }); +//! # } +//! ``` +//! +//! Just like `task::spawn`, `task::spawn_blocking` returns a `JoinHandle` +//! which we can use to await the result of the blocking operation: +//! +//! ```rust +//! # use tokio::task; +//! # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ +//! let join = task::spawn_blocking(|| { +//! // do some compute-heavy work or call synchronous code +//! "blocking completed" +//! }); +//! +//! let result = join.await?; +//! assert_eq!(result, "blocking completed"); +//! # Ok(()) +//! # } +//! ``` +//! +//! #### block_in_place +//! +//! When using the [threaded runtime][rt-threaded], the [`task::block_in_place`] +//! function is also available. Like `task::spawn_blocking`, this function +//! allows running a blocking operation from an asynchronous context. Unlike +//! `spawn_blocking`, however, `block_in_place` works by transitioning the +//! _current_ worker thread to a blocking thread, moving other tasks running on +//! that thread to another worker thread. This can improve performance by avoiding +//! context switches. +//! +//! For example: +//! +//! ``` +//! use tokio::task; +//! +//! # async fn docs() { +//! let result = task::block_in_place(|| { +//! // do some compute-heavy work or call synchronous code +//! "blocking completed" +//! }); +//! +//! assert_eq!(result, "blocking completed"); +//! # } +//! ``` +//! +//! #### yield_now +//! +//! In addition, this module provides a [`task::yield_now`] async function +//! that is analogous to the standard library's [`thread::yield_now`]. Calling +//! and `await`ing this function will cause the current task to yield to the +//! Tokio runtime's scheduler, allowing other tasks to be +//! scheduled. Eventually, the yielding task will be polled again, allowing it +//! to execute. For example: +//! +//! ```rust +//! use tokio::task; +//! +//! # #[tokio::main] async fn main() { +//! async { +//! task::spawn(async { +//! // ... +//! println!("spawned task done!") +//! }); +//! +//! // Yield, allowing the newly-spawned task to execute first. +//! task::yield_now().await; +//! println!("main task done!"); +//! } +//! # .await; +//! # } +//! ``` +//! +//! [`task::spawn_blocking`]: crate::task::spawn_blocking +//! [`task::block_in_place`]: crate::task::block_in_place +//! [rt-threaded]: ../runtime/index.html#threaded-scheduler +//! [`task::yield_now`]: crate::task::yield_now() +//! [`thread::yield_now`]: std::thread::yield_now +cfg_blocking! { + mod blocking; + pub use blocking::spawn_blocking; + + cfg_rt_threaded! { + pub use blocking::block_in_place; + } +} + +cfg_rt_core! { + pub use crate::runtime::task::{JoinError, JoinHandle}; + + mod spawn; + pub use spawn::spawn; + + mod yield_now; + pub use yield_now::yield_now; +} + +cfg_rt_util! { + mod local; + pub use local::{spawn_local, LocalSet}; + + mod task_local; + pub use task_local::LocalKey; +} diff --git a/third_party/rust/tokio/src/task/spawn.rs b/third_party/rust/tokio/src/task/spawn.rs new file mode 100644 index 0000000000..fa5ff13b01 --- /dev/null +++ b/third_party/rust/tokio/src/task/spawn.rs @@ -0,0 +1,134 @@ +use crate::runtime; +use crate::task::JoinHandle; + +use std::future::Future; + +doc_rt_core! { + /// Spawns a new asynchronous task, returning a + /// [`JoinHandle`](super::JoinHandle) for it. + /// + /// Spawning a task enables the task to execute concurrently to other tasks. The + /// spawned task may execute on the current thread, or it may be sent to a + /// different thread to be executed. The specifics depend on the current + /// [`Runtime`](crate::runtime::Runtime) configuration. + /// + /// There is no guarantee that a spawned task will execute to completion. + /// When a runtime is shutdown, all outstanding tasks are dropped, + /// regardless of the lifecycle of that task. + /// + /// This function must be called from the context of a Tokio runtime. Tasks running on + /// the Tokio runtime are always inside its context, but you can also enter the context + /// using the [`Handle::enter`](crate::runtime::Handle::enter()) method. + /// + /// # Examples + /// + /// In this example, a server is started and `spawn` is used to start a new task + /// that processes each received connection. + /// + /// ```no_run + /// use tokio::net::{TcpListener, TcpStream}; + /// + /// use std::io; + /// + /// async fn process(socket: TcpStream) { + /// // ... + /// # drop(socket); + /// } + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// loop { + /// let (socket, _) = listener.accept().await?; + /// + /// tokio::spawn(async move { + /// // Process each socket concurrently. + /// process(socket).await + /// }); + /// } + /// } + /// ``` + /// + /// # Panics + /// + /// Panics if called from **outside** of the Tokio runtime. + /// + /// # Using `!Send` values from a task + /// + /// The task supplied to `spawn` must implement `Send`. However, it is + /// possible to **use** `!Send` values from the task as long as they only + /// exist between calls to `.await`. + /// + /// For example, this will work: + /// + /// ``` + /// use tokio::task; + /// + /// use std::rc::Rc; + /// + /// fn use_rc(rc: Rc<()>) { + /// // Do stuff w/ rc + /// # drop(rc); + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// tokio::spawn(async { + /// // Force the `Rc` to stay in a scope with no `.await` + /// { + /// let rc = Rc::new(()); + /// use_rc(rc.clone()); + /// } + /// + /// task::yield_now().await; + /// }).await.unwrap(); + /// } + /// ``` + /// + /// This will **not** work: + /// + /// ```compile_fail + /// use tokio::task; + /// + /// use std::rc::Rc; + /// + /// fn use_rc(rc: Rc<()>) { + /// // Do stuff w/ rc + /// # drop(rc); + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// tokio::spawn(async { + /// let rc = Rc::new(()); + /// + /// task::yield_now().await; + /// + /// use_rc(rc.clone()); + /// }).await.unwrap(); + /// } + /// ``` + /// + /// Holding on to a `!Send` value across calls to `.await` will result in + /// an unfriendly compile error message similar to: + /// + /// ```text + /// `[... some type ...]` cannot be sent between threads safely + /// ``` + /// + /// or: + /// + /// ```text + /// error[E0391]: cycle detected when processing `main` + /// ``` + pub fn spawn<T>(task: T) -> JoinHandle<T::Output> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let spawn_handle = runtime::context::spawn_handle() + .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`"); + spawn_handle.spawn(task) + } +} diff --git a/third_party/rust/tokio/src/task/task_local.rs b/third_party/rust/tokio/src/task/task_local.rs new file mode 100644 index 0000000000..f3341b6a7e --- /dev/null +++ b/third_party/rust/tokio/src/task/task_local.rs @@ -0,0 +1,240 @@ +use pin_project_lite::pin_project; +use std::cell::RefCell; +use std::error::Error; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt, thread}; + +/// Declares a new task-local key of type [`tokio::task::LocalKey`]. +/// +/// # Syntax +/// +/// The macro wraps any number of static declarations and makes them local to the current task. +/// Publicity and attributes for each static is preserved. For example: +/// +/// # Examples +/// +/// ``` +/// # use tokio::task_local; +/// task_local! { +/// pub static ONE: u32; +/// +/// #[allow(unused)] +/// static TWO: f32; +/// } +/// # fn main() {} +/// ``` +/// +/// See [LocalKey documentation][`tokio::task::LocalKey`] for more +/// information. +/// +/// [`tokio::task::LocalKey`]: ../tokio/task/struct.LocalKey.html +#[macro_export] +macro_rules! task_local { + // empty (base case for the recursion) + () => {}; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => { + $crate::__task_local_inner!($(#[$attr])* $vis $name, $t); + $crate::task_local!($($rest)*); + }; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => { + $crate::__task_local_inner!($(#[$attr])* $vis $name, $t); + } +} + +#[doc(hidden)] +#[macro_export] +macro_rules! __task_local_inner { + ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => { + static $name: $crate::task::LocalKey<$t> = { + std::thread_local! { + static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None); + } + + $crate::task::LocalKey { inner: __KEY } + }; + }; +} + +/// A key for task-local data. +/// +/// This type is generated by the `task_local!` macro. +/// +/// Unlike [`std::thread::LocalKey`], `tokio::task::LocalKey` will +/// _not_ lazily initialize the value on first access. Instead, the +/// value is first initialized when the future containing +/// the task-local is first polled by a futures executor, like Tokio. +/// +/// # Examples +/// +/// ``` +/// # async fn dox() { +/// tokio::task_local! { +/// static NUMBER: u32; +/// } +/// +/// NUMBER.scope(1, async move { +/// assert_eq!(NUMBER.get(), 1); +/// }).await; +/// +/// NUMBER.scope(2, async move { +/// assert_eq!(NUMBER.get(), 2); +/// +/// NUMBER.scope(3, async move { +/// assert_eq!(NUMBER.get(), 3); +/// }).await; +/// }).await; +/// # } +/// ``` +/// [`std::thread::LocalKey`]: https://doc.rust-lang.org/std/thread/struct.LocalKey.html +pub struct LocalKey<T: 'static> { + #[doc(hidden)] + pub inner: thread::LocalKey<RefCell<Option<T>>>, +} + +impl<T: 'static> LocalKey<T> { + /// Sets a value `T` as the task-local value for the future `F`. + /// + /// On completion of `scope`, the task-local will be dropped. + /// + /// ### Examples + /// + /// ``` + /// # async fn dox() { + /// tokio::task_local! { + /// static NUMBER: u32; + /// } + /// + /// NUMBER.scope(1, async move { + /// println!("task local value: {}", NUMBER.get()); + /// }).await; + /// # } + /// ``` + pub async fn scope<F>(&'static self, value: T, f: F) -> F::Output + where + F: Future, + { + TaskLocalFuture { + local: &self, + slot: Some(value), + future: f, + } + .await + } + + /// Accesses the current task-local and runs the provided closure. + /// + /// # Panics + /// + /// This function will panic if not called within the context + /// of a future containing a task-local with the corresponding key. + pub fn with<F, R>(&'static self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + self.try_with(f).expect( + "cannot access a Task Local Storage value \ + without setting it via `LocalKey::set`", + ) + } + + /// Accesses the current task-local and runs the provided closure. + /// + /// If the task-local with the accociated key is not present, this + /// method will return an `AccessError`. For a panicking variant, + /// see `with`. + pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError> + where + F: FnOnce(&T) -> R, + { + self.inner.with(|v| { + if let Some(val) = v.borrow().as_ref() { + Ok(f(val)) + } else { + Err(AccessError { _private: () }) + } + }) + } +} + +impl<T: Copy + 'static> LocalKey<T> { + /// Returns a copy of the task-local value + /// if the task-local value implements `Copy`. + pub fn get(&'static self) -> T { + self.with(|v| *v) + } +} + +impl<T: 'static> fmt::Debug for LocalKey<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("LocalKey { .. }") + } +} + +pin_project! { + struct TaskLocalFuture<T: StaticLifetime, F> { + local: &'static LocalKey<T>, + slot: Option<T>, + #[pin] + future: F, + } +} + +impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + struct Guard<'a, T: 'static> { + local: &'static LocalKey<T>, + slot: &'a mut Option<T>, + prev: Option<T>, + } + + impl<T> Drop for Guard<'_, T> { + fn drop(&mut self) { + let value = self.local.inner.with(|c| c.replace(self.prev.take())); + *self.slot = value; + } + } + + let mut project = self.project(); + let val = project.slot.take(); + + let prev = project.local.inner.with(|c| c.replace(val)); + + let _guard = Guard { + prev, + slot: &mut project.slot, + local: *project.local, + }; + + project.future.poll(cx) + } +} + +// Required to make `pin_project` happy. +trait StaticLifetime: 'static {} +impl<T: 'static> StaticLifetime for T {} + +/// An error returned by [`LocalKey::try_with`](method@LocalKey::try_with). +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct AccessError { + _private: (), +} + +impl fmt::Debug for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AccessError").finish() + } +} + +impl fmt::Display for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt("task-local value not set", f) + } +} + +impl Error for AccessError {} diff --git a/third_party/rust/tokio/src/task/yield_now.rs b/third_party/rust/tokio/src/task/yield_now.rs new file mode 100644 index 0000000000..e0e20841c9 --- /dev/null +++ b/third_party/rust/tokio/src/task/yield_now.rs @@ -0,0 +1,38 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +doc_rt_core! { + /// Yields execution back to the Tokio runtime. + /// + /// A task yields by awaiting on `yield_now()`, and may resume when that + /// future completes (with no output.) The current task will be re-added as + /// a pending task at the _back_ of the pending queue. Any other pending + /// tasks will be scheduled. No other waking is required for the task to + /// continue. + /// + /// See also the usage example in the [task module](index.html#yield_now). + #[must_use = "yield_now does nothing unless polled/`await`-ed"] + pub async fn yield_now() { + /// Yield implementation + struct YieldNow { + yielded: bool, + } + + impl Future for YieldNow { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.yielded { + return Poll::Ready(()); + } + + self.yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + YieldNow { yielded: false }.await + } +} |