diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio/src/task | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/task')
-rw-r--r-- | third_party/rust/tokio/src/task/blocking.rs | 199 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/builder.rs | 115 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/join_set.rs | 248 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/local.rs | 698 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/mod.rs | 317 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/spawn.rs | 149 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/task_local.rs | 302 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/unconstrained.rs | 45 | ||||
-rw-r--r-- | third_party/rust/tokio/src/task/yield_now.rs | 58 |
9 files changed, 2131 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/task/blocking.rs b/third_party/rust/tokio/src/task/blocking.rs new file mode 100644 index 0000000000..5fe358f3e5 --- /dev/null +++ b/third_party/rust/tokio/src/task/blocking.rs @@ -0,0 +1,199 @@ +use crate::task::JoinHandle; + +cfg_rt_multi_thread! { + /// Runs the provided blocking function on the current thread without + /// blocking the executor. + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is problematic, as it may prevent the executor + /// from driving other tasks forward. Calling this function informs the + /// executor that the currently executing task is about to block the thread, + /// so the executor is able to hand off any other tasks it has to a new + /// worker thread before that happens. See the [CPU-bound tasks and blocking + /// code][blocking] section for more information. + /// + /// Be aware that although this function avoids starving other independently + /// spawned tasks, any other code running concurrently in the same task will + /// be suspended during the call to `block_in_place`. This can happen e.g. + /// when using the [`join!`] macro. To avoid this issue, use + /// [`spawn_blocking`] instead of `block_in_place`. + /// + /// Note that this function cannot be used within a [`current_thread`] runtime + /// because in this case there are no other worker threads to hand off tasks + /// to. On the other hand, calling the function outside a runtime is + /// allowed. In this case, `block_in_place` just calls the provided closure + /// normally. + /// + /// Code running behind `block_in_place` cannot be cancelled. When you shut + /// down the executor, it will wait indefinitely for all blocking operations + /// to finish. You can use [`shutdown_timeout`] to stop waiting for them + /// after a certain timeout. Be aware that this will still not cancel the + /// tasks — they are simply allowed to keep running after the method + /// returns. + /// + /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code + /// [`spawn_blocking`]: fn@crate::task::spawn_blocking + /// [`join!`]: macro@join + /// [`thread::spawn`]: fn@std::thread::spawn + /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// + /// # Examples + /// + /// ``` + /// use tokio::task; + /// + /// # async fn docs() { + /// task::block_in_place(move || { + /// // do some compute-heavy work or call synchronous code + /// }); + /// # } + /// ``` + /// + /// Code running inside `block_in_place` may use `block_on` to reenter the + /// async context. + /// + /// ``` + /// use tokio::task; + /// use tokio::runtime::Handle; + /// + /// # async fn docs() { + /// task::block_in_place(move || { + /// Handle::current().block_on(async move { + /// // do something async + /// }); + /// }); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if called from a [`current_thread`] runtime. + /// + /// [`current_thread`]: fn@crate::runtime::Builder::new_current_thread + pub fn block_in_place<F, R>(f: F) -> R + where + F: FnOnce() -> R, + { + crate::runtime::thread_pool::block_in_place(f) + } +} + +cfg_rt! { + /// Runs the provided closure on a thread where blocking is acceptable. + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is problematic, as it may prevent the executor from + /// driving other futures forward. This function runs the provided closure on a + /// thread dedicated to blocking operations. See the [CPU-bound tasks and + /// blocking code][blocking] section for more information. + /// + /// Tokio will spawn more blocking threads when they are requested through this + /// function until the upper limit configured on the [`Builder`] is reached. + /// After reaching the upper limit, the tasks are put in a queue. + /// The thread limit is very large by default, because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you + /// should keep this large upper limit in mind. When running many CPU-bound + /// computations, a semaphore or some other synchronization primitive should be + /// used to limit the number of computation executed in parallel. Specialized + /// CPU-bound executors, such as [rayon], may also be a good fit. + /// + /// This function is intended for non-async operations that eventually finish on + /// their own. If you want to spawn an ordinary thread, you should use + /// [`thread::spawn`] instead. + /// + /// Closures spawned using `spawn_blocking` cannot be cancelled. When you shut + /// down the executor, it will wait indefinitely for all blocking operations to + /// finish. You can use [`shutdown_timeout`] to stop waiting for them after a + /// certain timeout. Be aware that this will still not cancel the tasks — they + /// are simply allowed to keep running after the method returns. + /// + /// Note that if you are using the single threaded runtime, this function will + /// still spawn additional threads for blocking operations. The basic + /// scheduler's single thread is only used for asynchronous code. + /// + /// # Related APIs and patterns for bridging asynchronous and blocking code + /// + /// In simple cases, it is sufficient to have the closure accept input + /// parameters at creation time and return a single value (or struct/tuple, etc.). + /// + /// For more complex situations in which it is desirable to stream data to or from + /// the synchronous context, the [`mpsc channel`] has `blocking_send` and + /// `blocking_recv` methods for use in non-async code such as the thread created + /// by `spawn_blocking`. + /// + /// Another option is [`SyncIoBridge`] for cases where the synchronous context + /// is operating on byte streams. For example, you might use an asynchronous + /// HTTP client such as [hyper] to fetch data, but perform complex parsing + /// of the payload body using a library written for synchronous I/O. + /// + /// Finally, see also [Bridging with sync code][bridgesync] for discussions + /// around the opposite case of using Tokio as part of a larger synchronous + /// codebase. + /// + /// [`Builder`]: struct@crate::runtime::Builder + /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code + /// [rayon]: https://docs.rs/rayon + /// [`mpsc channel`]: crate::sync::mpsc + /// [`SyncIoBridge`]: https://docs.rs/tokio-util/0.6/tokio_util/io/struct.SyncIoBridge.html + /// [hyper]: https://docs.rs/hyper + /// [`thread::spawn`]: fn@std::thread::spawn + /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// [bridgesync]: https://tokio.rs/tokio/topics/bridging + /// + /// # Examples + /// + /// Pass an input value and receive result of computation: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ + /// // Initial input + /// let mut v = "Hello, ".to_string(); + /// let res = task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// v.push_str("world"); + /// // Pass ownership of the value back to the asynchronous context + /// v + /// }).await?; + /// + /// // `res` is the value returned from the thread + /// assert_eq!(res.as_str(), "Hello, world"); + /// # Ok(()) + /// # } + /// ``` + /// + /// Use a channel: + /// + /// ``` + /// use tokio::task; + /// use tokio::sync::mpsc; + /// + /// # async fn docs() { + /// let (tx, mut rx) = mpsc::channel(2); + /// let start = 5; + /// let worker = task::spawn_blocking(move || { + /// for x in 0..10 { + /// // Stand in for complex computation + /// tx.blocking_send(start + x).unwrap(); + /// } + /// }); + /// + /// let mut acc = 0; + /// while let Some(v) = rx.recv().await { + /// acc += v; + /// } + /// assert_eq!(acc, 95); + /// worker.await.unwrap(); + /// # } + /// ``` + #[track_caller] + pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + crate::runtime::spawn_blocking(f) + } +} diff --git a/third_party/rust/tokio/src/task/builder.rs b/third_party/rust/tokio/src/task/builder.rs new file mode 100644 index 0000000000..2086302fb9 --- /dev/null +++ b/third_party/rust/tokio/src/task/builder.rs @@ -0,0 +1,115 @@ +#![allow(unreachable_pub)] +use crate::{runtime::context, task::JoinHandle}; +use std::future::Future; + +/// Factory which is used to configure the properties of a new task. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// Methods can be chained in order to configure it. +/// +/// Currently, there is only one configuration option: +/// +/// - [`name`], which specifies an associated name for +/// the task +/// +/// There are three types of task that can be spawned from a Builder: +/// - [`spawn_local`] for executing futures on the current thread +/// - [`spawn`] for executing [`Send`] futures on the runtime +/// - [`spawn_blocking`] for executing blocking code in the +/// blocking thread pool. +/// +/// ## Example +/// +/// ```no_run +/// use tokio::net::{TcpListener, TcpStream}; +/// +/// use std::io; +/// +/// async fn process(socket: TcpStream) { +/// // ... +/// # drop(socket); +/// } +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; +/// +/// loop { +/// let (socket, _) = listener.accept().await?; +/// +/// tokio::task::Builder::new() +/// .name("tcp connection handler") +/// .spawn(async move { +/// // Process each socket concurrently. +/// process(socket).await +/// }); +/// } +/// } +/// ``` +/// [unstable]: crate#unstable-features +/// [`name`]: Builder::name +/// [`spawn_local`]: Builder::spawn_local +/// [`spawn`]: Builder::spawn +/// [`spawn_blocking`]: Builder::spawn_blocking +#[derive(Default, Debug)] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +pub struct Builder<'a> { + name: Option<&'a str>, +} + +impl<'a> Builder<'a> { + /// Creates a new task builder. + pub fn new() -> Self { + Self::default() + } + + /// Assigns a name to the task which will be spawned. + pub fn name(&self, name: &'a str) -> Self { + Self { name: Some(name) } + } + + /// Spawns a task on the executor. + /// + /// See [`task::spawn`](crate::task::spawn) for + /// more details. + #[track_caller] + pub fn spawn<Fut>(self, future: Fut) -> JoinHandle<Fut::Output> + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + super::spawn::spawn_inner(future, self.name) + } + + /// Spawns a task on the current thread. + /// + /// See [`task::spawn_local`](crate::task::spawn_local) + /// for more details. + #[track_caller] + pub fn spawn_local<Fut>(self, future: Fut) -> JoinHandle<Fut::Output> + where + Fut: Future + 'static, + Fut::Output: 'static, + { + super::local::spawn_local_inner(future, self.name) + } + + /// Spawns blocking code on the blocking threadpool. + /// + /// See [`task::spawn_blocking`](crate::task::spawn_blocking) + /// for more details. + #[track_caller] + pub fn spawn_blocking<Function, Output>(self, function: Function) -> JoinHandle<Output> + where + Function: FnOnce() -> Output + Send + 'static, + Output: Send + 'static, + { + use crate::runtime::Mandatory; + let (join_handle, _was_spawned) = + context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name); + join_handle + } +} diff --git a/third_party/rust/tokio/src/task/join_set.rs b/third_party/rust/tokio/src/task/join_set.rs new file mode 100644 index 0000000000..8e8f74f66d --- /dev/null +++ b/third_party/rust/tokio/src/task/join_set.rs @@ -0,0 +1,248 @@ +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::runtime::Handle; +use crate::task::{JoinError, JoinHandle, LocalSet}; +use crate::util::IdleNotifiedSet; + +/// A collection of tasks spawned on a Tokio runtime. +/// +/// A `JoinSet` can be used to await the completion of some or all of the tasks +/// in the set. The set is not ordered, and the tasks will be returned in the +/// order they complete. +/// +/// All of the tasks must have the same return type `T`. +/// +/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// # Examples +/// +/// Spawn multiple tasks and wait for them. +/// +/// ``` +/// use tokio::task::JoinSet; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut set = JoinSet::new(); +/// +/// for i in 0..10 { +/// set.spawn(async move { i }); +/// } +/// +/// let mut seen = [false; 10]; +/// while let Some(res) = set.join_one().await.unwrap() { +/// seen[res] = true; +/// } +/// +/// for i in 0..10 { +/// assert!(seen[i]); +/// } +/// } +/// ``` +/// +/// [unstable]: crate#unstable-features +pub struct JoinSet<T> { + inner: IdleNotifiedSet<JoinHandle<T>>, +} + +impl<T> JoinSet<T> { + /// Create a new `JoinSet`. + pub fn new() -> Self { + Self { + inner: IdleNotifiedSet::new(), + } + } + + /// Returns the number of tasks currently in the `JoinSet`. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns whether the `JoinSet` is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +impl<T: 'static> JoinSet<T> { + /// Spawn the provided task on the `JoinSet`. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + pub fn spawn<F>(&mut self, task: F) + where + F: Future<Output = T>, + F: Send + 'static, + T: Send, + { + self.insert(crate::spawn(task)); + } + + /// Spawn the provided task on the provided runtime and store it in this `JoinSet`. + pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) + where + F: Future<Output = T>, + F: Send + 'static, + T: Send, + { + self.insert(handle.spawn(task)); + } + + /// Spawn the provided task on the current [`LocalSet`] and store it in this `JoinSet`. + /// + /// # Panics + /// + /// This method panics if it is called outside of a `LocalSet`. + /// + /// [`LocalSet`]: crate::task::LocalSet + pub fn spawn_local<F>(&mut self, task: F) + where + F: Future<Output = T>, + F: 'static, + { + self.insert(crate::task::spawn_local(task)); + } + + /// Spawn the provided task on the provided [`LocalSet`] and store it in this `JoinSet`. + /// + /// [`LocalSet`]: crate::task::LocalSet + pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) + where + F: Future<Output = T>, + F: 'static, + { + self.insert(local_set.spawn_local(task)); + } + + fn insert(&mut self, jh: JoinHandle<T>) { + let mut entry = self.inner.insert_idle(jh); + + // Set the waker that is notified when the task completes. + entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker())); + } + + /// Waits until one of the tasks in the set completes and returns its output. + /// + /// Returns `None` if the set is empty. + /// + /// # Cancel Safety + /// + /// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!` + /// statement and some other branch completes first, it is guaranteed that no tasks were + /// removed from this `JoinSet`. + pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> { + crate::future::poll_fn(|cx| self.poll_join_one(cx)).await + } + + /// Aborts all tasks and waits for them to finish shutting down. + /// + /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in + /// a loop until it returns `Ok(None)`. + /// + /// This method ignores any panics in the tasks shutting down. When this call returns, the + /// `JoinSet` will be empty. + /// + /// [`abort_all`]: fn@Self::abort_all + /// [`join_one`]: fn@Self::join_one + pub async fn shutdown(&mut self) { + self.abort_all(); + while self.join_one().await.transpose().is_some() {} + } + + /// Aborts all tasks on this `JoinSet`. + /// + /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete + /// cancellation, you should call `join_one` in a loop until the `JoinSet` is empty. + pub fn abort_all(&mut self) { + self.inner.for_each(|jh| jh.abort()); + } + + /// Removes all tasks from this `JoinSet` without aborting them. + /// + /// The tasks removed by this call will continue to run in the background even if the `JoinSet` + /// is dropped. + pub fn detach_all(&mut self) { + self.inner.drain(drop); + } + + /// Polls for one of the tasks in the set to complete. + /// + /// If this returns `Poll::Ready(Ok(Some(_)))` or `Poll::Ready(Err(_))`, then the task that + /// completed is removed from the set. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled + /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to + /// `poll_join_one`, only the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. + /// + /// # Returns + /// + /// This function returns: + /// + /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is + /// available right now. + /// * `Poll::Ready(Ok(Some(value)))` if one of the tasks in this `JoinSet` has completed. The + /// `value` is the return value of one of the tasks that completed. + /// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been + /// aborted. + /// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty. + /// + /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. + /// This can happen if the [coop budget] is reached. + /// + /// [coop budget]: crate::task#cooperative-scheduling + fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<T>, JoinError>> { + // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to + // the `notified` list if the waker is notified in the `poll` call below. + let mut entry = match self.inner.pop_notified(cx.waker()) { + Some(entry) => entry, + None => { + if self.is_empty() { + return Poll::Ready(Ok(None)); + } else { + // The waker was set by `pop_notified`. + return Poll::Pending; + } + } + }; + + let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); + + if let Poll::Ready(res) = res { + entry.remove(); + Poll::Ready(Some(res).transpose()) + } else { + // A JoinHandle generally won't emit a wakeup without being ready unless + // the coop limit has been reached. We yield to the executor in this + // case. + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +impl<T> Drop for JoinSet<T> { + fn drop(&mut self) { + self.inner.drain(|join_handle| join_handle.abort()); + } +} + +impl<T> fmt::Debug for JoinSet<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JoinSet").field("len", &self.len()).finish() + } +} + +impl<T> Default for JoinSet<T> { + fn default() -> Self { + Self::new() + } +} diff --git a/third_party/rust/tokio/src/task/local.rs b/third_party/rust/tokio/src/task/local.rs new file mode 100644 index 0000000000..2dbd970604 --- /dev/null +++ b/third_party/rust/tokio/src/task/local.rs @@ -0,0 +1,698 @@ +//! Runs `!Send` futures on the current thread. +use crate::loom::sync::{Arc, Mutex}; +use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task}; +use crate::sync::AtomicWaker; +use crate::util::VecDequeCell; + +use std::cell::Cell; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::Poll; + +use pin_project_lite::pin_project; + +cfg_rt! { + /// A set of tasks which are executed on the same thread. + /// + /// In some cases, it is necessary to run one or more futures that do not + /// implement [`Send`] and thus are unsafe to send between threads. In these + /// cases, a [local task set] may be used to schedule one or more `!Send` + /// futures to run together on the same thread. + /// + /// For example, the following code will not compile: + /// + /// ```rust,compile_fail + /// use std::rc::Rc; + /// + /// #[tokio::main] + /// async fn main() { + /// // `Rc` does not implement `Send`, and thus may not be sent between + /// // threads safely. + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// let unsend_data = unsend_data.clone(); + /// // Because the `async` block here moves `unsend_data`, the future is `!Send`. + /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this + /// // will not compile. + /// tokio::spawn(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// } + /// ``` + /// + /// # Use with `run_until` + /// + /// To spawn `!Send` futures, we can use a local task set to schedule them + /// on the thread calling [`Runtime::block_on`]. When running inside of the + /// local task set, we can use [`task::spawn_local`], which can spawn + /// `!Send` futures. For example: + /// + /// ```rust + /// use std::rc::Rc; + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// // Construct a local task set that can run `!Send` futures. + /// let local = task::LocalSet::new(); + /// + /// // Run the local task set. + /// local.run_until(async move { + /// let unsend_data = unsend_data.clone(); + /// // `spawn_local` ensures that the future is spawned on the local + /// // task set. + /// task::spawn_local(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// }).await; + /// } + /// ``` + /// **Note:** The `run_until` method can only be used in `#[tokio::main]`, + /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It + /// cannot be used inside a task spawned with `tokio::spawn`. + /// + /// ## Awaiting a `LocalSet` + /// + /// Additionally, a `LocalSet` itself implements `Future`, completing when + /// *all* tasks spawned on the `LocalSet` complete. This can be used to run + /// several futures on a `LocalSet` and drive the whole set until they + /// complete. For example, + /// + /// ```rust + /// use tokio::{task, time}; + /// use std::rc::Rc; + /// + /// #[tokio::main] + /// async fn main() { + /// let unsend_data = Rc::new("world"); + /// let local = task::LocalSet::new(); + /// + /// let unsend_data2 = unsend_data.clone(); + /// local.spawn_local(async move { + /// // ... + /// println!("hello {}", unsend_data2) + /// }); + /// + /// local.spawn_local(async move { + /// time::sleep(time::Duration::from_millis(100)).await; + /// println!("goodbye {}", unsend_data) + /// }); + /// + /// // ... + /// + /// local.await; + /// } + /// ``` + /// **Note:** Awaiting a `LocalSet` can only be done inside + /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to + /// [`Runtime::block_on`]. It cannot be used inside a task spawned with + /// `tokio::spawn`. + /// + /// ## Use inside `tokio::spawn` + /// + /// The two methods mentioned above cannot be used inside `tokio::spawn`, so + /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do + /// something else. The solution is to create the `LocalSet` somewhere else, + /// and communicate with it using an [`mpsc`] channel. + /// + /// The following example puts the `LocalSet` inside a new thread. + /// ``` + /// use tokio::runtime::Builder; + /// use tokio::sync::{mpsc, oneshot}; + /// use tokio::task::LocalSet; + /// + /// // This struct describes the task you want to spawn. Here we include + /// // some simple examples. The oneshot channel allows sending a response + /// // to the spawner. + /// #[derive(Debug)] + /// enum Task { + /// PrintNumber(u32), + /// AddOne(u32, oneshot::Sender<u32>), + /// } + /// + /// #[derive(Clone)] + /// struct LocalSpawner { + /// send: mpsc::UnboundedSender<Task>, + /// } + /// + /// impl LocalSpawner { + /// pub fn new() -> Self { + /// let (send, mut recv) = mpsc::unbounded_channel(); + /// + /// let rt = Builder::new_current_thread() + /// .enable_all() + /// .build() + /// .unwrap(); + /// + /// std::thread::spawn(move || { + /// let local = LocalSet::new(); + /// + /// local.spawn_local(async move { + /// while let Some(new_task) = recv.recv().await { + /// tokio::task::spawn_local(run_task(new_task)); + /// } + /// // If the while loop returns, then all the LocalSpawner + /// // objects have have been dropped. + /// }); + /// + /// // This will return once all senders are dropped and all + /// // spawned tasks have returned. + /// rt.block_on(local); + /// }); + /// + /// Self { + /// send, + /// } + /// } + /// + /// pub fn spawn(&self, task: Task) { + /// self.send.send(task).expect("Thread with LocalSet has shut down."); + /// } + /// } + /// + /// // This task may do !Send stuff. We use printing a number as an example, + /// // but it could be anything. + /// // + /// // The Task struct is an enum to support spawning many different kinds + /// // of operations. + /// async fn run_task(task: Task) { + /// match task { + /// Task::PrintNumber(n) => { + /// println!("{}", n); + /// }, + /// Task::AddOne(n, response) => { + /// // We ignore failures to send the response. + /// let _ = response.send(n + 1); + /// }, + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let spawner = LocalSpawner::new(); + /// + /// let (send, response) = oneshot::channel(); + /// spawner.spawn(Task::AddOne(10, send)); + /// let eleven = response.await.unwrap(); + /// assert_eq!(eleven, 11); + /// } + /// ``` + /// + /// [`Send`]: trait@std::marker::Send + /// [local task set]: struct@LocalSet + /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on + /// [`task::spawn_local`]: fn@spawn_local + /// [`mpsc`]: mod@crate::sync::mpsc + pub struct LocalSet { + /// Current scheduler tick. + tick: Cell<u8>, + + /// State available from thread-local. + context: Context, + + /// This type should not be Send. + _not_send: PhantomData<*const ()>, + } +} + +/// State available from the thread-local. +struct Context { + /// Collection of all active tasks spawned onto this executor. + owned: LocalOwnedTasks<Arc<Shared>>, + + /// Local run queue sender and receiver. + queue: VecDequeCell<task::Notified<Arc<Shared>>>, + + /// State shared between threads. + shared: Arc<Shared>, +} + +/// LocalSet state shared between threads. +struct Shared { + /// Remote run queue sender. + queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>, + + /// Wake the `LocalSet` task. + waker: AtomicWaker, +} + +pin_project! { + #[derive(Debug)] + struct RunUntil<'a, F> { + local_set: &'a LocalSet, + #[pin] + future: F, + } +} + +scoped_thread_local!(static CURRENT: Context); + +cfg_rt! { + /// Spawns a `!Send` future on the local task set. + /// + /// The spawned future will be run on the same thread that called `spawn_local.` + /// This may only be called from the context of a local task set. + /// + /// # Panics + /// + /// - This function panics if called outside of a local task set. + /// + /// # Examples + /// + /// ```rust + /// use std::rc::Rc; + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// let local = task::LocalSet::new(); + /// + /// // Run the local task set. + /// local.run_until(async move { + /// let unsend_data = unsend_data.clone(); + /// task::spawn_local(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// }).await; + /// } + /// ``` + #[track_caller] + pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output> + where + F: Future + 'static, + F::Output: 'static, + { + spawn_local_inner(future, None) + } + + + #[track_caller] + pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output> + where F: Future + 'static, + F::Output: 'static + { + let future = crate::util::trace::task(future, "local", name); + CURRENT.with(|maybe_cx| { + let cx = maybe_cx + .expect("`spawn_local` called from outside of a `task::LocalSet`"); + + let (handle, notified) = cx.owned.bind(future, cx.shared.clone()); + + if let Some(notified) = notified { + cx.shared.schedule(notified); + } + + handle + }) + } +} + +/// Initial queue capacity. +const INITIAL_CAPACITY: usize = 64; + +/// Max number of tasks to poll per tick. +const MAX_TASKS_PER_TICK: usize = 61; + +/// How often it check the remote queue first. +const REMOTE_FIRST_INTERVAL: u8 = 31; + +impl LocalSet { + /// Returns a new local task set. + pub fn new() -> LocalSet { + LocalSet { + tick: Cell::new(0), + context: Context { + owned: LocalOwnedTasks::new(), + queue: VecDequeCell::with_capacity(INITIAL_CAPACITY), + shared: Arc::new(Shared { + queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), + waker: AtomicWaker::new(), + }), + }, + _not_send: PhantomData, + } + } + + /// Spawns a `!Send` task onto the local task set. + /// + /// This task is guaranteed to be run on the current thread. + /// + /// Unlike the free function [`spawn_local`], this method may be used to + /// spawn local tasks when the task set is _not_ running. For example: + /// ```rust + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// let local = task::LocalSet::new(); + /// + /// // Spawn a future on the local set. This future will be run when + /// // we call `run_until` to drive the task set. + /// local.spawn_local(async { + /// // ... + /// }); + /// + /// // Run the local task set. + /// local.run_until(async move { + /// // ... + /// }).await; + /// + /// // When `run` finishes, we can spawn _more_ futures, which will + /// // run in subsequent calls to `run_until`. + /// local.spawn_local(async { + /// // ... + /// }); + /// + /// local.run_until(async move { + /// // ... + /// }).await; + /// } + /// ``` + /// [`spawn_local`]: fn@spawn_local + #[track_caller] + pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + 'static, + F::Output: 'static, + { + let future = crate::util::trace::task(future, "local", None); + + let (handle, notified) = self.context.owned.bind(future, self.context.shared.clone()); + + if let Some(notified) = notified { + self.context.shared.schedule(notified); + } + + self.context.shared.waker.wake(); + handle + } + + /// Runs a future to completion on the provided runtime, driving any local + /// futures spawned on this task set on the current thread. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. The future + /// may also call [`spawn_local`] to spawn_local additional local futures on the + /// current thread. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + /// + /// # Notes + /// + /// Since this function internally calls [`Runtime::block_on`], and drives + /// futures in the local task set inside that call to `block_on`, the local + /// futures may not use [in-place blocking]. If a blocking call needs to be + /// issued from a local task, the [`spawn_blocking`] API may be used instead. + /// + /// For example, this will panic: + /// ```should_panic + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// let rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// local.block_on(&rt, async { + /// let join = task::spawn_local(async { + /// let blocking_result = task::block_in_place(|| { + /// // ... + /// }); + /// // ... + /// }); + /// join.await.unwrap(); + /// }) + /// ``` + /// This, however, will not panic: + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// let rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// local.block_on(&rt, async { + /// let join = task::spawn_local(async { + /// let blocking_result = task::spawn_blocking(|| { + /// // ... + /// }).await; + /// // ... + /// }); + /// join.await.unwrap(); + /// }) + /// ``` + /// + /// [`spawn_local`]: fn@spawn_local + /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on + /// [in-place blocking]: fn@crate::task::block_in_place + /// [`spawn_blocking`]: fn@crate::task::spawn_blocking + #[cfg(feature = "rt")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] + pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output + where + F: Future, + { + rt.block_on(self.run_until(future)) + } + + /// Runs a future to completion on the local set, returning its output. + /// + /// This returns a future that runs the given future with a local set, + /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures. + /// Any local futures spawned on the local set will be driven in the + /// background until the future passed to `run_until` completes. When the future + /// passed to `run` finishes, any local futures which have not completed + /// will remain on the local set, and will be driven on subsequent calls to + /// `run_until` or when [awaiting the local set] itself. + /// + /// # Examples + /// + /// ```rust + /// use tokio::task; + /// + /// #[tokio::main] + /// async fn main() { + /// task::LocalSet::new().run_until(async { + /// task::spawn_local(async move { + /// // ... + /// }).await.unwrap(); + /// // ... + /// }).await; + /// } + /// ``` + /// + /// [`spawn_local`]: fn@spawn_local + /// [awaiting the local set]: #awaiting-a-localset + pub async fn run_until<F>(&self, future: F) -> F::Output + where + F: Future, + { + let run_until = RunUntil { + future, + local_set: self, + }; + run_until.await + } + + /// Ticks the scheduler, returning whether the local future needs to be + /// notified again. + fn tick(&self) -> bool { + for _ in 0..MAX_TASKS_PER_TICK { + match self.next_task() { + // Run the task + // + // Safety: As spawned tasks are `!Send`, `run_unchecked` must be + // used. We are responsible for maintaining the invariant that + // `run_unchecked` is only called on threads that spawned the + // task initially. Because `LocalSet` itself is `!Send`, and + // `spawn_local` spawns into the `LocalSet` on the current + // thread, the invariant is maintained. + Some(task) => crate::coop::budget(|| task.run()), + // We have fully drained the queue of notified tasks, so the + // local future doesn't need to be notified again — it can wait + // until something else wakes a task in the local set. + None => return false, + } + } + + true + } + + fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> { + let tick = self.tick.get(); + self.tick.set(tick.wrapping_add(1)); + + let task = if tick % REMOTE_FIRST_INTERVAL == 0 { + self.context + .shared + .queue + .lock() + .as_mut() + .and_then(|queue| queue.pop_front()) + .or_else(|| self.context.queue.pop_front()) + } else { + self.context.queue.pop_front().or_else(|| { + self.context + .shared + .queue + .lock() + .as_mut() + .and_then(|queue| queue.pop_front()) + }) + }; + + task.map(|task| self.context.owned.assert_owner(task)) + } + + fn with<T>(&self, f: impl FnOnce() -> T) -> T { + CURRENT.set(&self.context, f) + } +} + +impl fmt::Debug for LocalSet { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("LocalSet").finish() + } +} + +impl Future for LocalSet { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + // Register the waker before starting to work + self.context.shared.waker.register_by_ref(cx.waker()); + + if self.with(|| self.tick()) { + // If `tick` returns true, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + Poll::Pending + } else if self.context.owned.is_empty() { + // If the scheduler has no remaining futures, we're done! + Poll::Ready(()) + } else { + // There are still futures in the local set, but we've polled all the + // futures in the run queue. Therefore, we can just return Pending + // since the remaining futures will be woken from somewhere else. + Poll::Pending + } + } +} + +impl Default for LocalSet { + fn default() -> LocalSet { + LocalSet::new() + } +} + +impl Drop for LocalSet { + fn drop(&mut self) { + self.with(|| { + // Shut down all tasks in the LocalOwnedTasks and close it to + // prevent new tasks from ever being added. + self.context.owned.close_and_shutdown_all(); + + // We already called shutdown on all tasks above, so there is no + // need to call shutdown. + for task in self.context.queue.take() { + drop(task); + } + + // Take the queue from the Shared object to prevent pushing + // notifications to it in the future. + let queue = self.context.shared.queue.lock().take().unwrap(); + for task in queue { + drop(task); + } + + assert!(self.context.owned.is_empty()); + }); + } +} + +// === impl LocalFuture === + +impl<T: Future> Future for RunUntil<'_, T> { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + + me.local_set.with(|| { + me.local_set + .context + .shared + .waker + .register_by_ref(cx.waker()); + + let _no_blocking = crate::runtime::enter::disallow_blocking(); + let f = me.future; + + if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { + return Poll::Ready(output); + } + + if me.local_set.tick() { + // If `tick` returns `true`, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + } + + Poll::Pending + }) + } +} + +impl Shared { + /// Schedule the provided task on the scheduler. + fn schedule(&self, task: task::Notified<Arc<Self>>) { + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if cx.shared.ptr_eq(self) => { + cx.queue.push_back(task); + } + _ => { + // First check whether the queue is still there (if not, the + // LocalSet is dropped). Then push to it if so, and if not, + // do nothing. + let mut lock = self.queue.lock(); + + if let Some(queue) = lock.as_mut() { + queue.push_back(task); + drop(lock); + self.waker.wake(); + } + } + }); + } + + fn ptr_eq(&self, other: &Shared) -> bool { + std::ptr::eq(self, other) + } +} + +impl task::Schedule for Arc<Shared> { + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + assert!(cx.shared.ptr_eq(self)); + cx.owned.remove(task) + }) + } + + fn schedule(&self, task: task::Notified<Self>) { + Shared::schedule(self, task); + } +} diff --git a/third_party/rust/tokio/src/task/mod.rs b/third_party/rust/tokio/src/task/mod.rs new file mode 100644 index 0000000000..d532155a1f --- /dev/null +++ b/third_party/rust/tokio/src/task/mod.rs @@ -0,0 +1,317 @@ +//! Asynchronous green-threads. +//! +//! ## What are Tasks? +//! +//! A _task_ is a light weight, non-blocking unit of execution. A task is similar +//! to an OS thread, but rather than being managed by the OS scheduler, they are +//! managed by the [Tokio runtime][rt]. Another name for this general pattern is +//! [green threads]. If you are familiar with [Go's goroutines], [Kotlin's +//! coroutines], or [Erlang's processes], you can think of Tokio's tasks as +//! something similar. +//! +//! Key points about tasks include: +//! +//! * Tasks are **light weight**. Because tasks are scheduled by the Tokio +//! runtime rather than the operating system, creating new tasks or switching +//! between tasks does not require a context switch and has fairly low +//! overhead. Creating, running, and destroying large numbers of tasks is +//! quite cheap, especially compared to OS threads. +//! +//! * Tasks are scheduled **cooperatively**. Most operating systems implement +//! _preemptive multitasking_. This is a scheduling technique where the +//! operating system allows each thread to run for a period of time, and then +//! _preempts_ it, temporarily pausing that thread and switching to another. +//! Tasks, on the other hand, implement _cooperative multitasking_. In +//! cooperative multitasking, a task is allowed to run until it _yields_, +//! indicating to the Tokio runtime's scheduler that it cannot currently +//! continue executing. When a task yields, the Tokio runtime switches to +//! executing the next task. +//! +//! * Tasks are **non-blocking**. Typically, when an OS thread performs I/O or +//! must synchronize with another thread, it _blocks_, allowing the OS to +//! schedule another thread. When a task cannot continue executing, it must +//! yield instead, allowing the Tokio runtime to schedule another task. Tasks +//! should generally not perform system calls or other operations that could +//! block a thread, as this would prevent other tasks running on the same +//! thread from executing as well. Instead, this module provides APIs for +//! running blocking operations in an asynchronous context. +//! +//! [rt]: crate::runtime +//! [green threads]: https://en.wikipedia.org/wiki/Green_threads +//! [Go's goroutines]: https://tour.golang.org/concurrency/1 +//! [Kotlin's coroutines]: https://kotlinlang.org/docs/reference/coroutines-overview.html +//! [Erlang's processes]: http://erlang.org/doc/getting_started/conc_prog.html#processes +//! +//! ## Working with Tasks +//! +//! This module provides the following APIs for working with tasks: +//! +//! ### Spawning +//! +//! Perhaps the most important function in this module is [`task::spawn`]. This +//! function can be thought of as an async equivalent to the standard library's +//! [`thread::spawn`][`std::thread::spawn`]. It takes an `async` block or other +//! [future], and creates a new task to run that work concurrently: +//! +//! ``` +//! use tokio::task; +//! +//! # async fn doc() { +//! task::spawn(async { +//! // perform some work here... +//! }); +//! # } +//! ``` +//! +//! Like [`std::thread::spawn`], `task::spawn` returns a [`JoinHandle`] struct. +//! A `JoinHandle` is itself a future which may be used to await the output of +//! the spawned task. For example: +//! +//! ``` +//! use tokio::task; +//! +//! # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let join = task::spawn(async { +//! // ... +//! "hello world!" +//! }); +//! +//! // ... +//! +//! // Await the result of the spawned task. +//! let result = join.await?; +//! assert_eq!(result, "hello world!"); +//! # Ok(()) +//! # } +//! ``` +//! +//! Again, like `std::thread`'s [`JoinHandle` type][thread_join], if the spawned +//! task panics, awaiting its `JoinHandle` will return a [`JoinError`]. For +//! example: +//! +//! ``` +//! use tokio::task; +//! +//! # #[tokio::main] async fn main() { +//! let join = task::spawn(async { +//! panic!("something bad happened!") +//! }); +//! +//! // The returned result indicates that the task failed. +//! assert!(join.await.is_err()); +//! # } +//! ``` +//! +//! `spawn`, `JoinHandle`, and `JoinError` are present when the "rt" +//! feature flag is enabled. +//! +//! [`task::spawn`]: crate::task::spawn() +//! [future]: std::future::Future +//! [`std::thread::spawn`]: std::thread::spawn +//! [`JoinHandle`]: crate::task::JoinHandle +//! [thread_join]: std::thread::JoinHandle +//! [`JoinError`]: crate::task::JoinError +//! +//! ### Blocking and Yielding +//! +//! As we discussed above, code running in asynchronous tasks should not perform +//! operations that can block. A blocking operation performed in a task running +//! on a thread that is also running other tasks would block the entire thread, +//! preventing other tasks from running. +//! +//! Instead, Tokio provides two APIs for running blocking operations in an +//! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`]. +//! +//! Be aware that if you call a non-async method from async code, that non-async +//! method is still inside the asynchronous context, so you should also avoid +//! blocking operations there. This includes destructors of objects destroyed in +//! async code. +//! +//! #### spawn_blocking +//! +//! The `task::spawn_blocking` function is similar to the `task::spawn` function +//! discussed in the previous section, but rather than spawning an +//! _non-blocking_ future on the Tokio runtime, it instead spawns a +//! _blocking_ function on a dedicated thread pool for blocking tasks. For +//! example: +//! +//! ``` +//! use tokio::task; +//! +//! # async fn docs() { +//! task::spawn_blocking(|| { +//! // do some compute-heavy work or call synchronous code +//! }); +//! # } +//! ``` +//! +//! Just like `task::spawn`, `task::spawn_blocking` returns a `JoinHandle` +//! which we can use to await the result of the blocking operation: +//! +//! ```rust +//! # use tokio::task; +//! # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ +//! let join = task::spawn_blocking(|| { +//! // do some compute-heavy work or call synchronous code +//! "blocking completed" +//! }); +//! +//! let result = join.await?; +//! assert_eq!(result, "blocking completed"); +//! # Ok(()) +//! # } +//! ``` +//! +//! #### block_in_place +//! +//! When using the [multi-threaded runtime][rt-multi-thread], the [`task::block_in_place`] +//! function is also available. Like `task::spawn_blocking`, this function +//! allows running a blocking operation from an asynchronous context. Unlike +//! `spawn_blocking`, however, `block_in_place` works by transitioning the +//! _current_ worker thread to a blocking thread, moving other tasks running on +//! that thread to another worker thread. This can improve performance by avoiding +//! context switches. +//! +//! For example: +//! +//! ``` +//! use tokio::task; +//! +//! # async fn docs() { +//! let result = task::block_in_place(|| { +//! // do some compute-heavy work or call synchronous code +//! "blocking completed" +//! }); +//! +//! assert_eq!(result, "blocking completed"); +//! # } +//! ``` +//! +//! #### yield_now +//! +//! In addition, this module provides a [`task::yield_now`] async function +//! that is analogous to the standard library's [`thread::yield_now`]. Calling +//! and `await`ing this function will cause the current task to yield to the +//! Tokio runtime's scheduler, allowing other tasks to be +//! scheduled. Eventually, the yielding task will be polled again, allowing it +//! to execute. For example: +//! +//! ```rust +//! use tokio::task; +//! +//! # #[tokio::main] async fn main() { +//! async { +//! task::spawn(async { +//! // ... +//! println!("spawned task done!") +//! }); +//! +//! // Yield, allowing the newly-spawned task to execute first. +//! task::yield_now().await; +//! println!("main task done!"); +//! } +//! # .await; +//! # } +//! ``` +//! +//! ### Cooperative scheduling +//! +//! A single call to [`poll`] on a top-level task may potentially do a lot of +//! work before it returns `Poll::Pending`. If a task runs for a long period of +//! time without yielding back to the executor, it can starve other tasks +//! waiting on that executor to execute them, or drive underlying resources. +//! Since Rust does not have a runtime, it is difficult to forcibly preempt a +//! long-running task. Instead, this module provides an opt-in mechanism for +//! futures to collaborate with the executor to avoid starvation. +//! +//! Consider a future like this one: +//! +//! ``` +//! # use tokio_stream::{Stream, StreamExt}; +//! async fn drop_all<I: Stream + Unpin>(mut input: I) { +//! while let Some(_) = input.next().await {} +//! } +//! ``` +//! +//! It may look harmless, but consider what happens under heavy load if the +//! input stream is _always_ ready. If we spawn `drop_all`, the task will never +//! yield, and will starve other tasks and resources on the same executor. +//! +//! To account for this, Tokio has explicit yield points in a number of library +//! functions, which force tasks to return to the executor periodically. +//! +//! +//! #### unconstrained +//! +//! If necessary, [`task::unconstrained`] lets you opt out a future of Tokio's cooperative +//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to +//! Tokio. For example: +//! +//! ``` +//! # #[tokio::main] +//! # async fn main() { +//! use tokio::{task, sync::mpsc}; +//! +//! let fut = async { +//! let (tx, mut rx) = mpsc::unbounded_channel(); +//! +//! for i in 0..1000 { +//! let _ = tx.send(()); +//! // This will always be ready. If coop was in effect, this code would be forced to yield +//! // periodically. However, if left unconstrained, then this code will never yield. +//! rx.recv().await; +//! } +//! }; +//! +//! task::unconstrained(fut).await; +//! # } +//! ``` +//! +//! [`task::spawn_blocking`]: crate::task::spawn_blocking +//! [`task::block_in_place`]: crate::task::block_in_place +//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler +//! [`task::yield_now`]: crate::task::yield_now() +//! [`thread::yield_now`]: std::thread::yield_now +//! [`task::unconstrained`]: crate::task::unconstrained() +//! [`poll`]: method@std::future::Future::poll + +cfg_rt! { + pub use crate::runtime::task::{JoinError, JoinHandle}; + + mod blocking; + pub use blocking::spawn_blocking; + + mod spawn; + pub use spawn::spawn; + + cfg_rt_multi_thread! { + pub use blocking::block_in_place; + } + + mod yield_now; + pub use yield_now::yield_now; + + mod local; + pub use local::{spawn_local, LocalSet}; + + mod task_local; + pub use task_local::LocalKey; + + mod unconstrained; + pub use unconstrained::{unconstrained, Unconstrained}; + + cfg_unstable! { + mod join_set; + pub use join_set::JoinSet; + } + + cfg_trace! { + mod builder; + pub use builder::Builder; + } + + /// Task-related futures. + pub mod futures { + pub use super::task_local::TaskLocalFuture; + } +} diff --git a/third_party/rust/tokio/src/task/spawn.rs b/third_party/rust/tokio/src/task/spawn.rs new file mode 100644 index 0000000000..a9d736674c --- /dev/null +++ b/third_party/rust/tokio/src/task/spawn.rs @@ -0,0 +1,149 @@ +use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR}; + +use std::future::Future; + +cfg_rt! { + /// Spawns a new asynchronous task, returning a + /// [`JoinHandle`](super::JoinHandle) for it. + /// + /// Spawning a task enables the task to execute concurrently to other tasks. The + /// spawned task may execute on the current thread, or it may be sent to a + /// different thread to be executed. The specifics depend on the current + /// [`Runtime`](crate::runtime::Runtime) configuration. + /// + /// There is no guarantee that a spawned task will execute to completion. + /// When a runtime is shutdown, all outstanding tasks are dropped, + /// regardless of the lifecycle of that task. + /// + /// This function must be called from the context of a Tokio runtime. Tasks running on + /// the Tokio runtime are always inside its context, but you can also enter the context + /// using the [`Runtime::enter`](crate::runtime::Runtime::enter()) method. + /// + /// # Examples + /// + /// In this example, a server is started and `spawn` is used to start a new task + /// that processes each received connection. + /// + /// ```no_run + /// use tokio::net::{TcpListener, TcpStream}; + /// + /// use std::io; + /// + /// async fn process(socket: TcpStream) { + /// // ... + /// # drop(socket); + /// } + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// loop { + /// let (socket, _) = listener.accept().await?; + /// + /// tokio::spawn(async move { + /// // Process each socket concurrently. + /// process(socket).await + /// }); + /// } + /// } + /// ``` + /// + /// # Panics + /// + /// Panics if called from **outside** of the Tokio runtime. + /// + /// # Using `!Send` values from a task + /// + /// The task supplied to `spawn` must implement `Send`. However, it is + /// possible to **use** `!Send` values from the task as long as they only + /// exist between calls to `.await`. + /// + /// For example, this will work: + /// + /// ``` + /// use tokio::task; + /// + /// use std::rc::Rc; + /// + /// fn use_rc(rc: Rc<()>) { + /// // Do stuff w/ rc + /// # drop(rc); + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// tokio::spawn(async { + /// // Force the `Rc` to stay in a scope with no `.await` + /// { + /// let rc = Rc::new(()); + /// use_rc(rc.clone()); + /// } + /// + /// task::yield_now().await; + /// }).await.unwrap(); + /// } + /// ``` + /// + /// This will **not** work: + /// + /// ```compile_fail + /// use tokio::task; + /// + /// use std::rc::Rc; + /// + /// fn use_rc(rc: Rc<()>) { + /// // Do stuff w/ rc + /// # drop(rc); + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// tokio::spawn(async { + /// let rc = Rc::new(()); + /// + /// task::yield_now().await; + /// + /// use_rc(rc.clone()); + /// }).await.unwrap(); + /// } + /// ``` + /// + /// Holding on to a `!Send` value across calls to `.await` will result in + /// an unfriendly compile error message similar to: + /// + /// ```text + /// `[... some type ...]` cannot be sent between threads safely + /// ``` + /// + /// or: + /// + /// ```text + /// error[E0391]: cycle detected when processing `main` + /// ``` + #[track_caller] + pub fn spawn<T>(future: T) -> JoinHandle<T::Output> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + // preventing stack overflows on debug mode, by quickly sending the + // task to the heap. + if cfg!(debug_assertions) && std::mem::size_of::<T>() > 2048 { + spawn_inner(Box::pin(future), None) + } else { + spawn_inner(future, None) + } + } + + #[track_caller] + pub(super) fn spawn_inner<T>(future: T, name: Option<&str>) -> JoinHandle<T::Output> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let spawn_handle = crate::runtime::context::spawn_handle().expect(CONTEXT_MISSING_ERROR); + let task = crate::util::trace::task(future, "task", name); + spawn_handle.spawn(task) + } +} diff --git a/third_party/rust/tokio/src/task/task_local.rs b/third_party/rust/tokio/src/task/task_local.rs new file mode 100644 index 0000000000..949bbca3ee --- /dev/null +++ b/third_party/rust/tokio/src/task/task_local.rs @@ -0,0 +1,302 @@ +use pin_project_lite::pin_project; +use std::cell::RefCell; +use std::error::Error; +use std::future::Future; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt, thread}; + +/// Declares a new task-local key of type [`tokio::task::LocalKey`]. +/// +/// # Syntax +/// +/// The macro wraps any number of static declarations and makes them local to the current task. +/// Publicity and attributes for each static is preserved. For example: +/// +/// # Examples +/// +/// ``` +/// # use tokio::task_local; +/// task_local! { +/// pub static ONE: u32; +/// +/// #[allow(unused)] +/// static TWO: f32; +/// } +/// # fn main() {} +/// ``` +/// +/// See [LocalKey documentation][`tokio::task::LocalKey`] for more +/// information. +/// +/// [`tokio::task::LocalKey`]: struct@crate::task::LocalKey +#[macro_export] +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +macro_rules! task_local { + // empty (base case for the recursion) + () => {}; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => { + $crate::__task_local_inner!($(#[$attr])* $vis $name, $t); + $crate::task_local!($($rest)*); + }; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => { + $crate::__task_local_inner!($(#[$attr])* $vis $name, $t); + } +} + +#[doc(hidden)] +#[macro_export] +macro_rules! __task_local_inner { + ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => { + $vis static $name: $crate::task::LocalKey<$t> = { + std::thread_local! { + static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None); + } + + $crate::task::LocalKey { inner: __KEY } + }; + }; +} + +/// A key for task-local data. +/// +/// This type is generated by the `task_local!` macro. +/// +/// Unlike [`std::thread::LocalKey`], `tokio::task::LocalKey` will +/// _not_ lazily initialize the value on first access. Instead, the +/// value is first initialized when the future containing +/// the task-local is first polled by a futures executor, like Tokio. +/// +/// # Examples +/// +/// ``` +/// # async fn dox() { +/// tokio::task_local! { +/// static NUMBER: u32; +/// } +/// +/// NUMBER.scope(1, async move { +/// assert_eq!(NUMBER.get(), 1); +/// }).await; +/// +/// NUMBER.scope(2, async move { +/// assert_eq!(NUMBER.get(), 2); +/// +/// NUMBER.scope(3, async move { +/// assert_eq!(NUMBER.get(), 3); +/// }).await; +/// }).await; +/// # } +/// ``` +/// [`std::thread::LocalKey`]: struct@std::thread::LocalKey +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +pub struct LocalKey<T: 'static> { + #[doc(hidden)] + pub inner: thread::LocalKey<RefCell<Option<T>>>, +} + +impl<T: 'static> LocalKey<T> { + /// Sets a value `T` as the task-local value for the future `F`. + /// + /// On completion of `scope`, the task-local will be dropped. + /// + /// ### Examples + /// + /// ``` + /// # async fn dox() { + /// tokio::task_local! { + /// static NUMBER: u32; + /// } + /// + /// NUMBER.scope(1, async move { + /// println!("task local value: {}", NUMBER.get()); + /// }).await; + /// # } + /// ``` + pub fn scope<F>(&'static self, value: T, f: F) -> TaskLocalFuture<T, F> + where + F: Future, + { + TaskLocalFuture { + local: self, + slot: Some(value), + future: f, + _pinned: PhantomPinned, + } + } + + /// Sets a value `T` as the task-local value for the closure `F`. + /// + /// On completion of `scope`, the task-local will be dropped. + /// + /// ### Examples + /// + /// ``` + /// # async fn dox() { + /// tokio::task_local! { + /// static NUMBER: u32; + /// } + /// + /// NUMBER.sync_scope(1, || { + /// println!("task local value: {}", NUMBER.get()); + /// }); + /// # } + /// ``` + pub fn sync_scope<F, R>(&'static self, value: T, f: F) -> R + where + F: FnOnce() -> R, + { + let scope = TaskLocalFuture { + local: self, + slot: Some(value), + future: (), + _pinned: PhantomPinned, + }; + crate::pin!(scope); + scope.with_task(|_| f()) + } + + /// Accesses the current task-local and runs the provided closure. + /// + /// # Panics + /// + /// This function will panic if not called within the context + /// of a future containing a task-local with the corresponding key. + pub fn with<F, R>(&'static self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + self.try_with(f).expect( + "cannot access a Task Local Storage value \ + without setting it via `LocalKey::set`", + ) + } + + /// Accesses the current task-local and runs the provided closure. + /// + /// If the task-local with the associated key is not present, this + /// method will return an `AccessError`. For a panicking variant, + /// see `with`. + pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError> + where + F: FnOnce(&T) -> R, + { + self.inner.with(|v| { + if let Some(val) = v.borrow().as_ref() { + Ok(f(val)) + } else { + Err(AccessError { _private: () }) + } + }) + } +} + +impl<T: Copy + 'static> LocalKey<T> { + /// Returns a copy of the task-local value + /// if the task-local value implements `Copy`. + pub fn get(&'static self) -> T { + self.with(|v| *v) + } +} + +impl<T: 'static> fmt::Debug for LocalKey<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("LocalKey { .. }") + } +} + +pin_project! { + /// A future that sets a value `T` of a task local for the future `F` during + /// its execution. + /// + /// The value of the task-local must be `'static` and will be dropped on the + /// completion of the future. + /// + /// Created by the function [`LocalKey::scope`](self::LocalKey::scope). + /// + /// ### Examples + /// + /// ``` + /// # async fn dox() { + /// tokio::task_local! { + /// static NUMBER: u32; + /// } + /// + /// NUMBER.scope(1, async move { + /// println!("task local value: {}", NUMBER.get()); + /// }).await; + /// # } + /// ``` + pub struct TaskLocalFuture<T, F> + where + T: 'static + { + local: &'static LocalKey<T>, + slot: Option<T>, + #[pin] + future: F, + #[pin] + _pinned: PhantomPinned, + } +} + +impl<T: 'static, F> TaskLocalFuture<T, F> { + fn with_task<F2: FnOnce(Pin<&mut F>) -> R, R>(self: Pin<&mut Self>, f: F2) -> R { + struct Guard<'a, T: 'static> { + local: &'static LocalKey<T>, + slot: &'a mut Option<T>, + prev: Option<T>, + } + + impl<T> Drop for Guard<'_, T> { + fn drop(&mut self) { + let value = self.local.inner.with(|c| c.replace(self.prev.take())); + *self.slot = value; + } + } + + let project = self.project(); + let val = project.slot.take(); + + let prev = project.local.inner.with(|c| c.replace(val)); + + let _guard = Guard { + prev, + slot: project.slot, + local: *project.local, + }; + + f(project.future) + } +} + +impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.with_task(|f| f.poll(cx)) + } +} + +/// An error returned by [`LocalKey::try_with`](method@LocalKey::try_with). +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct AccessError { + _private: (), +} + +impl fmt::Debug for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AccessError").finish() + } +} + +impl fmt::Display for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt("task-local value not set", f) + } +} + +impl Error for AccessError {} diff --git a/third_party/rust/tokio/src/task/unconstrained.rs b/third_party/rust/tokio/src/task/unconstrained.rs new file mode 100644 index 0000000000..31c732bfc9 --- /dev/null +++ b/third_party/rust/tokio/src/task/unconstrained.rs @@ -0,0 +1,45 @@ +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Future for the [`unconstrained`](unconstrained) method. + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] + #[must_use = "Unconstrained does nothing unless polled"] + pub struct Unconstrained<F> { + #[pin] + inner: F, + } +} + +impl<F> Future for Unconstrained<F> +where + F: Future, +{ + type Output = <F as Future>::Output; + + cfg_coop! { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = self.project().inner; + crate::coop::with_unconstrained(|| inner.poll(cx)) + } + } + + cfg_not_coop! { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = self.project().inner; + inner.poll(cx) + } + } +} + +/// Turn off cooperative scheduling for a future. The future will never be forced to yield by +/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields +/// otherwise. +/// +/// See also the usage example in the [task module](index.html#unconstrained). +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +pub fn unconstrained<F>(inner: F) -> Unconstrained<F> { + Unconstrained { inner } +} diff --git a/third_party/rust/tokio/src/task/yield_now.rs b/third_party/rust/tokio/src/task/yield_now.rs new file mode 100644 index 0000000000..148e3dc0c8 --- /dev/null +++ b/third_party/rust/tokio/src/task/yield_now.rs @@ -0,0 +1,58 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Yields execution back to the Tokio runtime. +/// +/// A task yields by awaiting on `yield_now()`, and may resume when that future +/// completes (with no output.) The current task will be re-added as a pending +/// task at the _back_ of the pending queue. Any other pending tasks will be +/// scheduled. No other waking is required for the task to continue. +/// +/// See also the usage example in the [task module](index.html#yield_now). +/// +/// ## Non-guarantees +/// +/// This function may not yield all the way up to the executor if there are any +/// special combinators above it in the call stack. For example, if a +/// [`tokio::select!`] has another branch complete during the same poll as the +/// `yield_now()`, then the yield is not propagated all the way up to the +/// runtime. +/// +/// It is generally not guaranteed that the runtime behaves like you expect it +/// to when deciding which task to schedule next after a call to `yield_now()`. +/// In particular, the runtime may choose to poll the task that just ran +/// `yield_now()` again immediately without polling any other tasks first. For +/// example, the runtime will not drive the IO driver between every poll of a +/// task, and this could result in the runtime polling the current task again +/// immediately even if there is another task that could make progress if that +/// other task is waiting for a notification from the IO driver. +/// +/// In general, changes to the order in which the runtime polls tasks is not +/// considered a breaking change, and your program should be correct no matter +/// which order the runtime polls your tasks in. +/// +/// [`tokio::select!`]: macro@crate::select +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +pub async fn yield_now() { + /// Yield implementation + struct YieldNow { + yielded: bool, + } + + impl Future for YieldNow { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.yielded { + return Poll::Ready(()); + } + + self.yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + YieldNow { yielded: false }.await +} |