diff options
Diffstat (limited to 'third_party/rust/tokio-0.1.11/src/runtime')
7 files changed, 1292 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/src/runtime/builder.rs b/third_party/rust/tokio-0.1.11/src/runtime/builder.rs new file mode 100644 index 0000000000..43eb5ddee1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/builder.rs @@ -0,0 +1,261 @@ +use runtime::{Inner, Runtime}; + +use reactor::Reactor; + +use std::io; + +use tokio_reactor; +use tokio_threadpool::Builder as ThreadPoolBuilder; +use tokio_threadpool::park::DefaultPark; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; + +/// Builds Tokio Runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// # extern crate tokio; +/// # extern crate tokio_threadpool; +/// # use tokio::runtime::Builder; +/// +/// # pub fn main() { +/// // create and configure ThreadPool +/// let mut threadpool_builder = tokio_threadpool::Builder::new(); +/// threadpool_builder +/// .name_prefix("my-runtime-worker-") +/// .pool_size(4); +/// +/// // build Runtime +/// let runtime = Builder::new() +/// .threadpool_builder(threadpool_builder) +/// .build(); +/// // ... call runtime.run(...) +/// # let _ = runtime; +/// # } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// Thread pool specific builder + threadpool_builder: ThreadPoolBuilder, + + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + let mut threadpool_builder = ThreadPoolBuilder::new(); + threadpool_builder.name_prefix("tokio-runtime-worker-"); + + Builder { + threadpool_builder, + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Set builder to set up the thread pool instance. + #[deprecated( + since="0.1.9", + note="use the `core_threads`, `blocking_threads`, `name_prefix`, \ + and `stack_size` functions on `runtime::Builder`, instead")] + #[doc(hidden)] + pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self { + self.threadpool_builder = val; + self + } + + /// Set the maximum number of worker threads for the `Runtime`'s thread pool. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .core_threads(4) + /// .build() + /// .unwrap(); + /// # } + /// ``` + pub fn core_threads(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.pool_size(val); + self + } + + /// Set the maximum number of concurrent blocking sections in the `Runtime`'s + /// thread pool. + /// + /// When the maximum concurrent `blocking` calls is reached, any further + /// calls to `blocking` will return `NotReady` and the task is notified once + /// previously in-flight calls to `blocking` return. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is 100. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .blocking_threads(200) + /// .build(); + /// # } + /// ``` + pub fn blocking_threads(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.max_blocking(val); + self + } + + /// Set name prefix of threads spawned by the `Runtime`'s thread pool. + /// + /// Thread name prefix is used for generating thread names. For example, if + /// prefix is `my-pool-`, then threads in the pool will get names like + /// `my-pool-1` etc. + /// + /// The default prefix is "tokio-runtime-worker-". + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .name_prefix("my-pool-") + /// .build(); + /// # } + /// ``` + pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { + self.threadpool_builder.name_prefix(val); + self + } + + /// Set the stack size (in bytes) for worker threads. + /// + /// The actual stack size may be greater than this value if the platform + /// specifies minimal stack size. + /// + /// The default stack size for spawned threads is 2 MiB, though this + /// particular stack size is subject to change in the future. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .stack_size(32 * 1024) + /// .build(); + /// # } + /// ``` + pub fn stack_size(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.stack_size(val); + self + } + + /// Create the configured `Runtime`. + /// + /// The returned `ThreadPool` instance is ready to spawn tasks. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::runtime::Builder; + /// # pub fn main() { + /// let runtime = Builder::new().build().unwrap(); + /// // ... call runtime.run(...) + /// # let _ = runtime; + /// # } + /// ``` + pub fn build(&mut self) -> io::Result<Runtime> { + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + + // Get a handle to the clock for the runtime. + let clock1 = self.clock.clone(); + let clock2 = clock1.clone(); + + let timers = Arc::new(Mutex::new(HashMap::<_, timer::Handle>::new())); + let t1 = timers.clone(); + + // Spawn a reactor on a background thread. + let reactor = Reactor::new()?.background()?; + + // Get a handle to the reactor. + let reactor_handle = reactor.handle().clone(); + + let pool = self.threadpool_builder + .around_worker(move |w, enter| { + let timer_handle = t1.lock().unwrap() + .get(w.id()).unwrap() + .clone(); + + tokio_reactor::with_default(&reactor_handle, enter, |enter| { + clock::with_default(&clock1, enter, |enter| { + timer::with_default(&timer_handle, enter, |_| { + w.run(); + }); + }) + }); + }) + .custom_park(move |worker_id| { + // Create a new timer + let timer = Timer::new_with_now(DefaultPark::new(), clock2.clone()); + + timers.lock().unwrap() + .insert(worker_id.clone(), timer.handle()); + + timer + }) + .build(); + + Ok(Runtime { + inner: Some(Inner { + reactor, + pool, + }), + }) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs new file mode 100644 index 0000000000..72960fadf2 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs @@ -0,0 +1,88 @@ +use executor::current_thread::CurrentThread; +use runtime::current_thread::Runtime; + +use tokio_reactor::Reactor; +use tokio_timer::clock::Clock; +use tokio_timer::timer::Timer; + +use std::io; + +/// Builds a Single-threaded runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// extern crate tokio; +/// extern crate tokio_timer; +/// +/// use tokio::runtime::current_thread::Builder; +/// use tokio_timer::clock::Clock; +/// +/// # pub fn main() { +/// // build Runtime +/// let runtime = Builder::new() +/// .clock(Clock::new()) +/// .build(); +/// // ... call runtime.run(...) +/// # let _ = runtime; +/// # } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + Builder { + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Create the configured `Runtime`. + pub fn build(&mut self) -> io::Result<Runtime> { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new_with_now(reactor, self.clock.clone()); + let timer_handle = timer.handle(); + + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor to generate some new stimuli for the + // futures to continue in their life. + let executor = CurrentThread::new_with_park(timer); + + let runtime = Runtime::new2( + reactor_handle, + timer_handle, + self.clock.clone(), + executor); + + Ok(runtime) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs new file mode 100644 index 0000000000..dca41711e8 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs @@ -0,0 +1,92 @@ +//! A runtime implementation that runs everything on the current thread. +//! +//! [`current_thread::Runtime`][rt] is similar to the primary +//! [`Runtime`][concurrent-rt] except that it runs all components on the current +//! thread instead of using a thread pool. This means that it is able to spawn +//! futures that do not implement `Send`. +//! +//! Same as the default [`Runtime`][concurrent-rt], the +//! [`current_thread::Runtime`][rt] includes: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself +//! and cannot be safely moved to other threads. +//! +//! # Spawning from other threads +//! +//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot +//! safely be moved to other threads, it provides a `Handle` that can be sent +//! to other threads and allows to spawn new tasks from there. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! use std::thread; +//! +//! # fn main() { +//! let mut runtime = Runtime::new().unwrap(); +//! let handle = runtime.handle(); +//! +//! thread::spawn(move || { +//! handle.spawn(future::ok(())); +//! }).join().unwrap(); +//! +//! # /* +//! runtime.run().unwrap(); +//! # */ +//! # } +//! ``` +//! +//! # Examples +//! +//! Creating a new `Runtime` and running a future `f` until its completion and +//! returning its result. +//! +//! ``` +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! +//! let mut runtime = Runtime::new().unwrap(); +//! +//! // Use the runtime... +//! // runtime.block_on(f); // where f is a future +//! ``` +//! +//! [rt]: struct.Runtime.html +//! [concurrent-rt]: ../struct.Runtime.html +//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html +//! [reactor]: ../../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../../timer/index.html + +mod builder; +mod runtime; + +pub use self::builder::Builder; +pub use self::runtime::{Runtime, Handle}; +pub use tokio_current_thread::spawn; +pub use tokio_current_thread::TaskExecutor; + +use futures::Future; + +/// Run the provided future to completion using a runtime running on the current thread. +/// +/// This first creates a new [`Runtime`], and calls [`Runtime::block_on`] with the provided future, +/// which blocks the current thread until the provided future completes. It then calls +/// [`Runtime::run`] to wait for any other spawned futures to resolve. +pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> +where + F: Future, +{ + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + let v = r.block_on(future)?; + r.run().expect("failed to resolve remaining futures"); + Ok(v) +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs new file mode 100644 index 0000000000..262cb1e72d --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs @@ -0,0 +1,234 @@ +use tokio_current_thread::{self as current_thread, CurrentThread}; +use tokio_current_thread::Handle as ExecutorHandle; +use runtime::current_thread::Builder; + +use tokio_reactor::{self, Reactor}; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; +use tokio_executor; + +use futures::{future, Future}; + +use std::fmt; +use std::error::Error; +use std::io; + +/// Single-threaded runtime provides a way to start reactor +/// and executor on the current thread. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +#[derive(Debug)] +pub struct Runtime { + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>, +} + +/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance +#[derive(Debug, Clone)] +pub struct Handle(ExecutorHandle); + +impl Handle { + /// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError> + where F: Future<Item = (), Error = ()> + Send + 'static { + self.0.spawn(future) + } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), tokio_executor::SpawnError> { + self.0.status() + } +} + +impl<T> future::Executor<T> for Handle +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = self.status() { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = self.spawn(future); + Ok(()) + } +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + inner: current_thread::RunError, +} + +impl fmt::Display for RunError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.inner) + } +} + +impl Error for RunError { + fn description(&self) -> &str { + self.inner.description() + } + fn cause(&self) -> Option<&Error> { + self.inner.cause() + } +} + +impl Runtime { + /// Returns a new runtime initialized with default configuration values. + pub fn new() -> io::Result<Runtime> { + Builder::new().build() + } + + pub(super) fn new2( + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>) -> Runtime + { + Runtime { + reactor_handle, + timer_handle, + clock, + executor, + } + } + + /// Get a new handle to spawn futures on the single-threaded Tokio runtime + /// + /// Different to the runtime itself, the handle can be sent to different + /// threads. + pub fn handle(&self) -> Handle { + Handle(self.executor.handle().clone()) + } + + /// Spawn a future onto the single-threaded Tokio runtime. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::current_thread::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("running on the runtime"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + 'static, + { + self.executor.spawn(future); + self + } + + /// Runs the provided future, blocking the current thread until the future + /// completes. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. Once the function returns, any uncompleted futures + /// remain pending in the `Runtime` instance. These futures will not run + /// until `block_on` or `run` is called again. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution by calling `block_on` or `run`. + pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error> + where F: Future + { + self.enter(|executor| { + // Run the provided future + let ret = executor.block_on(f); + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + }) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.enter(|executor| executor.run()) + .map_err(|e| RunError { + inner: e, + }) + } + + fn enter<F, R>(&mut self, f: F) -> R + where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R + { + let Runtime { + ref reactor_handle, + ref timer_handle, + ref clock, + ref mut executor, + .. + } = *self; + + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + + // This will set the default handle and timer to use inside the closure + // and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + clock::with_default(clock, enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the + // current single-threaded executor when used. This is a trick, + // because we need two mutable references to the executor (one + // to run the provided future, another to install as the default + // one). We use the fake one here as the default one. + let mut default_executor = current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + f(&mut executor) + }) + }) + }) + }) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/mod.rs b/third_party/rust/tokio-0.1.11/src/runtime/mod.rs new file mode 100644 index 0000000000..9ff0cc4c2f --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/mod.rs @@ -0,0 +1,496 @@ +//! A batteries included runtime for applications using Tokio. +//! +//! Applications using Tokio require some runtime support in order to work: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! While it is possible to setup each component manually, this involves a bunch +//! of boilerplate. +//! +//! [`Runtime`] bundles all of these various runtime components into a single +//! handle that can be started and shutdown together, eliminating the necessary +//! boilerplate to run a Tokio application. +//! +//! Most applications wont need to use [`Runtime`] directly. Instead, they will +//! use the [`run`] function, which uses [`Runtime`] under the hood. +//! +//! Creating a [`Runtime`] does the following: +//! +//! * Spawn a background thread running a [`Reactor`] instance. +//! * Start a [`ThreadPool`] for executing futures. +//! * Run an instance of [`Timer`] **per** thread pool worker thread. +//! +//! The thread pool uses a work-stealing strategy and is configured to start a +//! worker thread for each CPU core available on the system. This tends to be +//! the ideal setup for Tokio applications. +//! +//! A timer per thread pool worker thread is used to minimize the amount of +//! synchronization that is required for working with the timer. +//! +//! # Usage +//! +//! Most applications will use the [`run`] function. This takes a future to +//! "seed" the application, blocking the thread until the runtime becomes +//! [idle]. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! tokio::run(server); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! In this function, the `run` function blocks until the runtime becomes idle. +//! See [`shutdown_on_idle`][idle] for more shutdown details. +//! +//! From within the context of the runtime, additional tasks are spawned using +//! the [`tokio::spawn`] function. Futures spawned using this function will be +//! executed on the same thread pool used by the [`Runtime`]. +//! +//! A [`Runtime`] instance can also be used directly. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::runtime::Runtime; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! // Create the runtime +//! let mut rt = Runtime::new().unwrap(); +//! +//! // Spawn the server task +//! rt.spawn(server); +//! +//! // Wait until the runtime becomes idle and shut it down. +//! rt.shutdown_on_idle() +//! .wait().unwrap(); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! [reactor]: ../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../timer/index.html +//! [`Runtime`]: struct.Runtime.html +//! [`Reactor`]: ../reactor/struct.Reactor.html +//! [`ThreadPool`]: ../executor/thread_pool/struct.ThreadPool.html +//! [`run`]: fn.run.html +//! [idle]: struct.Runtime.html#method.shutdown_on_idle +//! [`tokio::spawn`]: ../executor/fn.spawn.html +//! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html + +mod builder; +pub mod current_thread; +mod shutdown; +mod task_executor; + +pub use self::builder::Builder; +pub use self::shutdown::Shutdown; +pub use self::task_executor::TaskExecutor; + +use reactor::{Background, Handle}; + +use std::io; + +use tokio_executor::enter; +use tokio_threadpool as threadpool; + +use futures; +use futures::future::Future; + +/// Handle to the Tokio runtime. +/// +/// The Tokio runtime includes a reactor as well as an executor for running +/// tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use [`tokio::run`], which uses a `Runtime` internally. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct.Builder.html +/// [`tokio::run`]: fn.run.html +#[derive(Debug)] +pub struct Runtime { + inner: Option<Inner>, +} + +#[derive(Debug)] +struct Inner { + /// Reactor running on a background thread. + reactor: Background, + + /// Task execution pool. + pool: threadpool::ThreadPool, +} + +// ===== impl Runtime ===== + +/// Start the Tokio runtime using the supplied future to bootstrap execution. +/// +/// This function is used to bootstrap the execution of a Tokio application. It +/// does the following: +/// +/// * Start the Tokio runtime using a default configuration. +/// * Spawn the given future onto the thread pool. +/// * Block the current thread until the runtime shuts down. +/// +/// Note that the function will not return immediately once `future` has +/// completed. Instead it waits for the entire runtime to become idle. +/// +/// See the [module level][mod] documentation for more details. +/// +/// # Examples +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +/// +/// [mod]: ../index.html +pub fn run<F>(future: F) +where F: Future<Item = (), Error = ()> + Send + 'static, +{ + let mut runtime = Runtime::new().unwrap(); + runtime.spawn(future); + enter().expect("nested tokio::run") + .block_on(runtime.shutdown_on_idle()) + .unwrap(); +} + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a reactor, thread pool, and timer being initialized. The + /// thread pool will not spawn any worker threads until it needs to, i.e. + /// tasks are scheduled to run. + /// + /// Most users will not need to call this function directly, instead they + /// will use [`tokio::run`](fn.run.html). + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn new() -> io::Result<Self> { + Builder::new().build() + } + + #[deprecated(since = "0.1.5", note = "use `reactor` instead")] + #[doc(hidden)] + pub fn handle(&self) -> &Handle { + self.reactor() + } + + /// Return a reference to the reactor handle for this runtime instance. + /// + /// The returned handle reference can be cloned in order to get an owned + /// value of the handle. This handle can be used to initialize I/O resources + /// (like TCP or UDP sockets) that will not be used on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let reactor_handle = rt.reactor().clone(); + /// + /// // use `reactor_handle` + /// ``` + pub fn reactor(&self) -> &Handle { + self.inner().reactor.handle() + } + + /// Return a handle to the runtime's executor. + /// + /// The returned handle can be used to spawn tasks that run on this runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let executor_handle = rt.executor(); + /// + /// // use `executor_handle` + /// ``` + pub fn executor(&self) -> TaskExecutor { + let inner = self.inner().pool.sender().clone(); + TaskExecutor { inner } + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner_mut().pool.sender().spawn(future).unwrap(); + self + } + + /// Run a future to completion on the Tokio runtime. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + rx.wait().unwrap() + } + + /// Run a future to completion on the Tokio runtime, then wait for all + /// background futures to complete too. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, waiting for background futures to complete, and yielding + /// its resolved result. Any tasks or timers which the future spawns + /// internally will be executed on the runtime and waited for completion. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let res = self.block_on(future); + self.shutdown_on_idle().wait().unwrap(); + res + } + + /// Signals the runtime to shutdown once it becomes idle. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function can be used to perform a graceful shutdown of the runtime. + /// + /// The runtime enters an idle state once **all** of the following occur. + /// + /// * The thread pool has no tasks to execute, i.e., all tasks that were + /// spawned have completed. + /// * The reactor is not managing any I/O resources. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_on_idle() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + + let inner = Box::new({ + let pool = inner.pool; + let reactor = inner.reactor; + + pool.shutdown_on_idle().and_then(|_| { + reactor.shutdown_on_idle() + }) + }); + + Shutdown { inner } + } + + /// Signals the runtime to shutdown immediately. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function will forcibly shutdown the runtime, causing any + /// in-progress work to become canceled. The shutdown steps are: + /// + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. + /// + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + Shutdown::shutdown_now(inner) + } + + fn inner(&self) -> &Inner { + self.inner.as_ref().unwrap() + } + + fn inner_mut(&mut self) -> &mut Inner { + self.inner.as_mut().unwrap() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let shutdown = Shutdown::shutdown_now(inner); + let _ = shutdown.wait(); + } + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs b/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs new file mode 100644 index 0000000000..1aca557277 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs @@ -0,0 +1,46 @@ +use runtime::Inner; + +use std::fmt; + +use futures::{Future, Poll}; + +/// A future that resolves when the Tokio `Runtime` is shut down. +pub struct Shutdown { + pub(super) inner: Box<Future<Item = (), Error = ()> + Send>, +} + +impl Shutdown { + pub(super) fn shutdown_now(inner: Inner) -> Self { + let inner = Box::new({ + let pool = inner.pool; + let reactor = inner.reactor; + + pool.shutdown_now().and_then(|_| { + reactor.shutdown_now() + .then(|_| { + Ok(()) + }) + }) + }); + + Shutdown { inner } + } +} + +impl Future for Shutdown { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + try_ready!(self.inner.poll()); + Ok(().into()) + } +} + +impl fmt::Debug for Shutdown { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Shutdown") + .field("inner", &"Box<Future<Item = (), Error = ()>>") + .finish() + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs b/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs new file mode 100644 index 0000000000..e213201ab0 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs @@ -0,0 +1,75 @@ + +use tokio_threadpool::Sender; + +use futures::future::{self, Future}; + +/// Executes futures on the runtime +/// +/// All futures spawned using this executor will be submitted to the associated +/// Runtime's executor. This executor is usually a thread pool. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + pub(super) inner: Sender, +} + +impl TaskExecutor { + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// let executor = rt.executor(); + /// + /// // Spawn a future onto the runtime + /// executor.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&self, future: F) + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner.spawn(future).unwrap(); + } +} + +impl<T> future::Executor<T> for TaskExecutor +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + self.inner.execute(future) + } +} + +impl ::executor::Executor for TaskExecutor { + fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) + -> Result<(), ::executor::SpawnError> + { + self.inner.spawn(future) + } +} |