diff options
Diffstat (limited to 'third_party/rust/tokio/src/runtime/mod.rs')
-rw-r--r-- | third_party/rust/tokio/src/runtime/mod.rs | 623 |
1 files changed, 623 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/mod.rs b/third_party/rust/tokio/src/runtime/mod.rs new file mode 100644 index 0000000000..7c381b0bbd --- /dev/null +++ b/third_party/rust/tokio/src/runtime/mod.rs @@ -0,0 +1,623 @@ +//! The Tokio runtime. +//! +//! Unlike other Rust programs, asynchronous applications require runtime +//! support. In particular, the following runtime services are necessary: +//! +//! * An **I/O event loop**, called the driver, which drives I/O resources and +//! dispatches I/O events to tasks that depend on them. +//! * A **scheduler** to execute [tasks] that use these I/O resources. +//! * A **timer** for scheduling work to run after a set period of time. +//! +//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing +//! them to be started, shut down, and configured together. However, often it is +//! not required to configure a [`Runtime`] manually, and a user may just use the +//! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood. +//! +//! # Usage +//! +//! When no fine tuning is required, the [`tokio::main`] attribute macro can be +//! used. +//! +//! ```no_run +//! use tokio::net::TcpListener; +//! use tokio::io::{AsyncReadExt, AsyncWriteExt}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! } +//! ``` +//! +//! From within the context of the runtime, additional tasks are spawned using +//! the [`tokio::spawn`] function. Futures spawned using this function will be +//! executed on the same thread pool used by the [`Runtime`]. +//! +//! A [`Runtime`] instance can also be used directly. +//! +//! ```no_run +//! use tokio::net::TcpListener; +//! use tokio::io::{AsyncReadExt, AsyncWriteExt}; +//! use tokio::runtime::Runtime; +//! +//! fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // Create the runtime +//! let rt = Runtime::new()?; +//! +//! // Spawn the root task +//! rt.block_on(async { +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! }) +//! } +//! ``` +//! +//! ## Runtime Configurations +//! +//! Tokio provides multiple task scheduling strategies, suitable for different +//! applications. The [runtime builder] or `#[tokio::main]` attribute may be +//! used to select which scheduler to use. +//! +//! #### Multi-Thread Scheduler +//! +//! The multi-thread scheduler executes futures on a _thread pool_, using a +//! work-stealing strategy. By default, it will start a worker thread for each +//! CPU core available on the system. This tends to be the ideal configuration +//! for most applications. The multi-thread scheduler requires the `rt-multi-thread` +//! feature flag, and is selected by default: +//! ``` +//! use tokio::runtime; +//! +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let threaded_rt = runtime::Runtime::new()?; +//! # Ok(()) } +//! ``` +//! +//! Most applications should use the multi-thread scheduler, except in some +//! niche use-cases, such as when running only a single thread is required. +//! +//! #### Current-Thread Scheduler +//! +//! The current-thread scheduler provides a _single-threaded_ future executor. +//! All tasks will be created and executed on the current thread. This requires +//! the `rt` feature flag. +//! ``` +//! use tokio::runtime; +//! +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let basic_rt = runtime::Builder::new_current_thread() +//! .build()?; +//! # Ok(()) } +//! ``` +//! +//! #### Resource drivers +//! +//! When configuring a runtime by hand, no resource drivers are enabled by +//! default. In this case, attempting to use networking types or time types will +//! fail. In order to enable these types, the resource drivers must be enabled. +//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a +//! shorthand, [`Builder::enable_all`] enables both resource drivers. +//! +//! ## Lifetime of spawned threads +//! +//! The runtime may spawn threads depending on its configuration and usage. The +//! multi-thread scheduler spawns threads to schedule tasks and for `spawn_blocking` +//! calls. +//! +//! While the `Runtime` is active, threads may shutdown after periods of being +//! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown. +//! Any tasks that have not yet completed will be dropped. +//! +//! [tasks]: crate::task +//! [`Runtime`]: Runtime +//! [`tokio::spawn`]: crate::spawn +//! [`tokio::main`]: ../attr.main.html +//! [runtime builder]: crate::runtime::Builder +//! [`Runtime::new`]: crate::runtime::Runtime::new +//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler +//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler +//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io +//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time +//! [`Builder::enable_all`]: crate::runtime::Builder::enable_all + +// At the top due to macros +#[cfg(test)] +#[cfg(not(target_arch = "wasm32"))] +#[macro_use] +mod tests; + +pub(crate) mod enter; + +pub(crate) mod task; + +cfg_metrics! { + mod metrics; + pub use metrics::RuntimeMetrics; + + pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; +} + +cfg_not_metrics! { + pub(crate) mod metrics; + pub(crate) use metrics::{SchedulerMetrics, WorkerMetrics, MetricsBatch}; +} + +cfg_rt! { + mod basic_scheduler; + use basic_scheduler::BasicScheduler; + + mod blocking; + use blocking::BlockingPool; + pub(crate) use blocking::spawn_blocking; + + cfg_trace! { + pub(crate) use blocking::Mandatory; + } + + cfg_fs! { + pub(crate) use blocking::spawn_mandatory_blocking; + } + + mod builder; + pub use self::builder::Builder; + + pub(crate) mod context; + pub(crate) mod driver; + + use self::enter::enter; + + mod handle; + pub use handle::{EnterGuard, Handle, TryCurrentError}; + + mod spawner; + use self::spawner::Spawner; +} + +cfg_rt_multi_thread! { + mod park; + use park::Parker; +} + +cfg_rt_multi_thread! { + mod queue; + + pub(crate) mod thread_pool; + use self::thread_pool::ThreadPool; +} + +cfg_rt! { + use crate::task::JoinHandle; + + use std::future::Future; + use std::time::Duration; + + /// The Tokio runtime. + /// + /// The runtime provides an I/O driver, task scheduler, [timer], and + /// blocking pool, necessary for running asynchronous tasks. + /// + /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. + /// However, most users will use the `#[tokio::main]` annotation on their + /// entry point instead. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Shutdown + /// + /// Shutting down the runtime is done by dropping the value. The current + /// thread will block until the shut down operation has completed. + /// + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. + /// + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. + /// + /// # Sharing + /// + /// The Tokio runtime implements `Sync` and `Send` to allow you to wrap it + /// in a `Arc`. Most fn take `&self` to allow you to call them concurrently + /// across multiple threads. + /// + /// Calls to `shutdown` and `shutdown_timeout` require exclusive ownership of + /// the runtime type and this can be achieved via `Arc::try_unwrap` when only + /// one strong count reference is left over. + /// + /// [timer]: crate::time + /// [mod]: index.html + /// [`new`]: method@Self::new + /// [`Builder`]: struct@Builder + #[derive(Debug)] + pub struct Runtime { + /// Task executor + kind: Kind, + + /// Handle to runtime, also contains driver handles + handle: Handle, + + /// Blocking pool handle, used to signal shutdown + blocking_pool: BlockingPool, + } + + /// The runtime executor is either a thread-pool or a current-thread executor. + #[derive(Debug)] + enum Kind { + /// Execute all tasks on the current-thread. + CurrentThread(BasicScheduler), + + /// Execute tasks across multiple threads. + #[cfg(feature = "rt-multi-thread")] + ThreadPool(ThreadPool), + } + + /// After thread starts / before thread stops + type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; + + impl Runtime { + /// Creates a new runtime instance with default configuration values. + /// + /// This results in the multi threaded scheduler, I/O driver, and time driver being + /// initialized. + /// + /// Most applications will not need to call this function directly. Instead, + /// they will use the [`#[tokio::main]` attribute][main]. When a more complex + /// configuration is necessary, the [runtime builder] may be used. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// ``` + /// + /// [mod]: index.html + /// [main]: ../attr.main.html + /// [threaded scheduler]: index.html#threaded-scheduler + /// [basic scheduler]: index.html#basic-scheduler + /// [runtime builder]: crate::runtime::Builder + #[cfg(feature = "rt-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + pub fn new() -> std::io::Result<Runtime> { + Builder::new_multi_thread().enable_all().build() + } + + /// Returns a handle to the runtime's spawner. + /// + /// The returned handle can be used to spawn tasks that run on this runtime, and can + /// be cloned to allow moving the `Handle` to other threads. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let handle = rt.handle(); + /// + /// // Use the handle... + /// ``` + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Spawns a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + #[track_caller] + pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle.spawn(future) + } + + /// Runs the provided function on an executor dedicated to blocking operations. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a blocking function onto the runtime + /// rt.spawn_blocking(|| { + /// println!("now running on a worker thread"); + /// }); + /// # } + #[track_caller] + pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Runs a future to completion on the Tokio runtime. This is the + /// runtime's entry point. + /// + /// This runs the given future on the current thread, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers + /// which the future spawns internally will be executed on the runtime. + /// + /// # Multi thread scheduler + /// + /// When the multi thread scheduler is used this will allow futures + /// to run within the io driver and timer context of the overall runtime. + /// + /// # Current thread scheduler + /// + /// When the current thread scheduler is enabled `block_on` + /// can be called concurrently from multiple threads. The first call + /// will take ownership of the io and timer drivers. This means + /// other threads which do not own the drivers will hook into that one. + /// When the first `block_on` completes, other threads will be able to + /// "steal" the driver to allow continued execution of their futures. + /// + /// # Panics + /// + /// This function panics if the provided future panics, or if called within an + /// asynchronous execution context. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Execute the future, blocking the current thread until completion + /// rt.block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// [handle]: fn@Handle::block_on + #[track_caller] + pub fn block_on<F: Future>(&self, future: F) -> F::Output { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let future = crate::util::trace::task(future, "block_on", None); + + let _enter = self.enter(); + + match &self.kind { + Kind::CurrentThread(exec) => exec.block_on(future), + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(exec) => exec.block_on(future), + } + } + + /// Enters the runtime context. + /// + /// This allows you to construct types that must have an executor + /// available on creation such as [`Sleep`] or [`TcpStream`]. It will + /// also allow you to call methods such as [`tokio::spawn`]. + /// + /// [`Sleep`]: struct@crate::time::Sleep + /// [`TcpStream`]: struct@crate::net::TcpStream + /// [`tokio::spawn`]: fn@crate::spawn + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// fn function_that_spawns(msg: String) { + /// // Had we not used `rt.enter` below, this would panic. + /// tokio::spawn(async move { + /// println!("{}", msg); + /// }); + /// } + /// + /// fn main() { + /// let rt = Runtime::new().unwrap(); + /// + /// let s = "Hello World!".to_string(); + /// + /// // By entering the context, we tie `tokio::spawn` to this executor. + /// let _guard = rt.enter(); + /// function_that_spawns(s); + /// } + /// ``` + pub fn enter(&self) -> EnterGuard<'_> { + self.handle.enter() + } + + /// Shuts down the runtime, waiting for at most `duration` for all spawned + /// task to shutdown. + /// + /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to + /// shutdown in a timely fashion. However, dropping a `Runtime` will wait + /// indefinitely for all tasks to terminate, and there are cases where a long + /// blocking task has been spawned, which can block dropping `Runtime`. + /// + /// In this case, calling `shutdown_timeout` with an explicit wait timeout + /// can work. The `shutdown_timeout` will signal all tasks to shutdown and + /// will wait for at most `duration` for all spawned tasks to terminate. If + /// `timeout` elapses before all tasks are dropped, the function returns and + /// outstanding tasks are potentially leaked. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// use std::thread; + /// use std::time::Duration; + /// + /// fn main() { + /// let runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// task::spawn_blocking(move || { + /// thread::sleep(Duration::from_secs(10_000)); + /// }); + /// }); + /// + /// runtime.shutdown_timeout(Duration::from_millis(100)); + /// } + /// ``` + pub fn shutdown_timeout(mut self, duration: Duration) { + // Wakeup and shutdown all the worker threads + self.handle.clone().shutdown(); + self.blocking_pool.shutdown(Some(duration)); + } + + /// Shuts down the runtime, without waiting for any spawned tasks to shutdown. + /// + /// This can be useful if you want to drop a runtime from within another runtime. + /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks + /// to complete, which would normally not be permitted within an asynchronous context. + /// By calling `shutdown_background()`, you can drop the runtime from such a context. + /// + /// Note however, that because we do not wait for any blocking tasks to complete, this + /// may result in a resource leak (in that any blocking tasks are still running until they + /// return. + /// + /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// fn main() { + /// let runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// let inner_runtime = Runtime::new().unwrap(); + /// // ... + /// inner_runtime.shutdown_background(); + /// }); + /// } + /// ``` + pub fn shutdown_background(self) { + self.shutdown_timeout(Duration::from_nanos(0)) + } + } + + #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let + impl Drop for Runtime { + fn drop(&mut self) { + match &mut self.kind { + Kind::CurrentThread(basic) => { + // This ensures that tasks spawned on the basic runtime are dropped inside the + // runtime's context. + match self::context::try_enter(self.handle.clone()) { + Some(guard) => basic.set_context_guard(guard), + None => { + // The context thread-local has already been destroyed. + // + // We don't set the guard in this case. Calls to tokio::spawn in task + // destructors would fail regardless if this happens. + }, + } + }, + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(_) => { + // The threaded scheduler drops its tasks on its worker threads, which is + // already in the runtime's context. + }, + } + } + } + + cfg_metrics! { + impl Runtime { + /// TODO + pub fn metrics(&self) -> RuntimeMetrics { + self.handle.metrics() + } + } + } +} |