diff options
Diffstat (limited to 'third_party/rust/tokio/src/runtime/mod.rs')
-rw-r--r-- | third_party/rust/tokio/src/runtime/mod.rs | 494 |
1 files changed, 494 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/mod.rs b/third_party/rust/tokio/src/runtime/mod.rs new file mode 100644 index 0000000000..36b2b442ee --- /dev/null +++ b/third_party/rust/tokio/src/runtime/mod.rs @@ -0,0 +1,494 @@ +//! The Tokio runtime. +//! +//! Unlike other Rust programs, asynchronous applications require +//! runtime support. In particular, the following runtime services are +//! necessary: +//! +//! * An **I/O event loop**, called the driver, which drives I/O resources and +//! dispatches I/O events to tasks that depend on them. +//! * A **scheduler** to execute [tasks] that use these I/O resources. +//! * A **timer** for scheduling work to run after a set period of time. +//! +//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing +//! them to be started, shut down, and configured together. However, most +//! applications won't need to use [`Runtime`] directly. Instead, they can +//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under +//! the hood. +//! +//! # Usage +//! +//! Most applications will use the [`tokio::main`] attribute macro. +//! +//! ```no_run +//! use tokio::net::TcpListener; +//! use tokio::prelude::*; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! } +//! ``` +//! +//! From within the context of the runtime, additional tasks are spawned using +//! the [`tokio::spawn`] function. Futures spawned using this function will be +//! executed on the same thread pool used by the [`Runtime`]. +//! +//! A [`Runtime`] instance can also be used directly. +//! +//! ```no_run +//! use tokio::net::TcpListener; +//! use tokio::prelude::*; +//! use tokio::runtime::Runtime; +//! +//! fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // Create the runtime +//! let mut rt = Runtime::new()?; +//! +//! // Spawn the root task +//! rt.block_on(async { +//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! }) +//! } +//! ``` +//! +//! ## Runtime Configurations +//! +//! Tokio provides multiple task scheduling strategies, suitable for different +//! applications. The [runtime builder] or `#[tokio::main]` attribute may be +//! used to select which scheduler to use. +//! +//! #### Basic Scheduler +//! +//! The basic scheduler provides a _single-threaded_ future executor. All tasks +//! will be created and executed on the current thread. The basic scheduler +//! requires the `rt-core` feature flag, and can be selected using the +//! [`Builder::basic_scheduler`] method: +//! ``` +//! use tokio::runtime; +//! +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let basic_rt = runtime::Builder::new() +//! .basic_scheduler() +//! .build()?; +//! # Ok(()) } +//! ``` +//! +//! If the `rt-core` feature is enabled and `rt-threaded` is not, +//! [`Runtime::new`] will return a basic scheduler runtime by default. +//! +//! #### Threaded Scheduler +//! +//! The threaded scheduler executes futures on a _thread pool_, using a +//! work-stealing strategy. By default, it will start a worker thread for each +//! CPU core available on the system. This tends to be the ideal configurations +//! for most applications. The threaded scheduler requires the `rt-threaded` feature +//! flag, and can be selected using the [`Builder::threaded_scheduler`] method: +//! ``` +//! use tokio::runtime; +//! +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let threaded_rt = runtime::Builder::new() +//! .threaded_scheduler() +//! .build()?; +//! # Ok(()) } +//! ``` +//! +//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a +//! threaded scheduler runtime by default. +//! +//! Most applications should use the threaded scheduler, except in some niche +//! use-cases, such as when running only a single thread is required. +//! +//! #### Resource drivers +//! +//! When configuring a runtime by hand, no resource drivers are enabled by +//! default. In this case, attempting to use networking types or time types will +//! fail. In order to enable these types, the resource drivers must be enabled. +//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a +//! shorthand, [`Builder::enable_all`] enables both resource drivers. +//! +//! ## Lifetime of spawned threads +//! +//! The runtime may spawn threads depending on its configuration and usage. The +//! threaded scheduler spawns threads to schedule tasks and calls to +//! `spawn_blocking` spawn threads to run blocking operations. +//! +//! While the `Runtime` is active, threads may shutdown after periods of being +//! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown. +//! Any tasks that have not yet completed will be dropped. +//! +//! [tasks]: crate::task +//! [`Runtime`]: Runtime +//! [`tokio::spawn`]: crate::spawn +//! [`tokio::main`]: ../attr.main.html +//! [runtime builder]: crate::runtime::Builder +//! [`Runtime::new`]: crate::runtime::Runtime::new +//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler +//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler +//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io +//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time +//! [`Builder::enable_all`]: crate::runtime::Builder::enable_all + +// At the top due to macros +#[cfg(test)] +#[macro_use] +mod tests; + +pub(crate) mod context; + +cfg_rt_core! { + mod basic_scheduler; + use basic_scheduler::BasicScheduler; + + pub(crate) mod task; +} + +mod blocking; +use blocking::BlockingPool; + +cfg_blocking_impl! { + #[allow(unused_imports)] + pub(crate) use blocking::{spawn_blocking, try_spawn_blocking}; +} + +mod builder; +pub use self::builder::Builder; + +pub(crate) mod enter; +use self::enter::enter; + +mod handle; +pub use self::handle::{Handle, TryCurrentError}; + +mod io; + +cfg_rt_threaded! { + mod park; + use park::Parker; +} + +mod shell; +use self::shell::Shell; + +mod spawner; +use self::spawner::Spawner; + +mod time; + +cfg_rt_threaded! { + mod queue; + + pub(crate) mod thread_pool; + use self::thread_pool::ThreadPool; +} + +cfg_rt_core! { + use crate::task::JoinHandle; +} + +use std::future::Future; +use std::time::Duration; + +/// The Tokio runtime. +/// +/// The runtime provides an I/O [driver], task scheduler, [timer], and blocking +/// pool, necessary for running asynchronous tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use the `#[tokio::main]` annotation on their entry point instead. +/// +/// See [module level][mod] documentation for more details. +/// +/// # Shutdown +/// +/// Shutting down the runtime is done by dropping the value. The current thread +/// will block until the shut down operation has completed. +/// +/// * Drain any scheduled work queues. +/// * Drop any futures that have not yet completed. +/// * Drop the reactor. +/// +/// Once the reactor has dropped, any outstanding I/O resources bound to +/// that reactor will no longer function. Calling any method on them will +/// result in an error. +/// +/// [driver]: crate::io::driver +/// [timer]: crate::time +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct@Builder +/// [`tokio::run`]: fn@run +#[derive(Debug)] +pub struct Runtime { + /// Task executor + kind: Kind, + + /// Handle to runtime, also contains driver handles + handle: Handle, + + /// Blocking pool handle, used to signal shutdown + blocking_pool: BlockingPool, +} + +/// The runtime executor is either a thread-pool or a current-thread executor. +#[derive(Debug)] +enum Kind { + /// Not able to execute concurrent tasks. This variant is mostly used to get + /// access to the driver handles. + Shell(Shell), + + /// Execute all tasks on the current-thread. + #[cfg(feature = "rt-core")] + Basic(BasicScheduler<time::Driver>), + + /// Execute tasks across multiple threads. + #[cfg(feature = "rt-threaded")] + ThreadPool(ThreadPool), +} + +/// After thread starts / before thread stops +type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a scheduler, I/O driver, and time driver being + /// initialized. The type of scheduler used depends on what feature flags + /// are enabled: if the `rt-threaded` feature is enabled, the [threaded + /// scheduler] is used, while if only the `rt-core` feature is enabled, the + /// [basic scheduler] is used instead. + /// + /// If the threaded scheduler is selected, it will not spawn + /// any worker threads until it needs to, i.e. tasks are scheduled to run. + /// + /// Most applications will not need to call this function directly. Instead, + /// they will use the [`#[tokio::main]` attribute][main]. When more complex + /// configuration is necessary, the [runtime builder] may be used. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// ``` + /// + /// [mod]: index.html + /// [main]: ../../tokio_macros/attr.main.html + /// [threaded scheduler]: index.html#threaded-scheduler + /// [basic scheduler]: index.html#basic-scheduler + /// [runtime builder]: crate::runtime::Builder + pub fn new() -> io::Result<Runtime> { + #[cfg(feature = "rt-threaded")] + let ret = Builder::new().threaded_scheduler().enable_all().build(); + + #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))] + let ret = Builder::new().basic_scheduler().enable_all().build(); + + #[cfg(not(feature = "rt-core"))] + let ret = Builder::new().enable_all().build(); + + ret + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + #[cfg(feature = "rt-core")] + pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match &self.kind { + Kind::Shell(_) => panic!("task execution disabled"), + #[cfg(feature = "rt-threaded")] + Kind::ThreadPool(exec) => exec.spawn(future), + Kind::Basic(exec) => exec.spawn(future), + } + } + + /// Run a future to completion on the Tokio runtime. This is the runtime's + /// entry point. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on<F: Future>(&mut self, future: F) -> F::Output { + let kind = &mut self.kind; + + self.handle.enter(|| match kind { + Kind::Shell(exec) => exec.block_on(future), + #[cfg(feature = "rt-core")] + Kind::Basic(exec) => exec.block_on(future), + #[cfg(feature = "rt-threaded")] + Kind::ThreadPool(exec) => exec.block_on(future), + }) + } + + /// Enter the runtime context. + pub fn enter<F, R>(&self, f: F) -> R + where + F: FnOnce() -> R, + { + self.handle.enter(f) + } + + /// Return a handle to the runtime's spawner. + /// + /// The returned handle can be used to spawn tasks that run on this runtime, and can + /// be cloned to allow moving the `Handle` to other threads. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let handle = rt.handle(); + /// + /// handle.spawn(async { println!("hello"); }); + /// ``` + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Shutdown the runtime, waiting for at most `duration` for all spawned + /// task to shutdown. + /// + /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to + /// shutdown in a timely fashion. However, dropping a `Runtime` will wait + /// indefinitely for all tasks to terminate, and there are cases where a long + /// blocking task has been spawned which can block dropping `Runtime`. + /// + /// In this case, calling `shutdown_timeout` with an explicit wait timeout + /// can work. The `shutdown_timeout` will signal all tasks to shutdown and + /// will wait for at most `duration` for all spawned tasks to terminate. If + /// `timeout` elapses before all tasks are dropped, the function returns and + /// outstanding tasks are potentially leaked. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// use std::thread; + /// use std::time::Duration; + /// + /// fn main() { + /// let mut runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// task::spawn_blocking(move || { + /// thread::sleep(Duration::from_secs(10_000)); + /// }); + /// }); + /// + /// runtime.shutdown_timeout(Duration::from_millis(100)); + /// } + /// ``` + pub fn shutdown_timeout(self, duration: Duration) { + let Runtime { + mut blocking_pool, .. + } = self; + blocking_pool.shutdown(Some(duration)); + } +} |