summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs')
-rw-r--r--third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs398
1 files changed, 398 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs
new file mode 100644
index 0000000000..b688a46745
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs
@@ -0,0 +1,398 @@
+mod builder;
+mod shutdown;
+mod task_executor;
+
+#[cfg(feature = "async-await-preview")]
+mod async_await;
+
+pub use self::builder::Builder;
+pub use self::shutdown::Shutdown;
+pub use self::task_executor::TaskExecutor;
+
+use reactor::{Handle, Reactor};
+
+use std::io;
+use std::sync::Mutex;
+
+use tokio_executor::enter;
+use tokio_threadpool as threadpool;
+
+use futures;
+use futures::future::Future;
+
+/// Handle to the Tokio runtime.
+///
+/// The Tokio runtime includes a reactor as well as an executor for running
+/// tasks.
+///
+/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
+/// most users will use [`tokio::run`], which uses a `Runtime` internally.
+///
+/// See [module level][mod] documentation for more details.
+///
+/// [mod]: index.html
+/// [`new`]: #method.new
+/// [`Builder`]: struct.Builder.html
+/// [`tokio::run`]: fn.run.html
+#[derive(Debug)]
+pub struct Runtime {
+ inner: Option<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner {
+ /// A handle to the reactor in the background thread.
+ reactor_handle: Handle,
+
+ // TODO: This should go away in 0.2
+ reactor: Mutex<Option<Reactor>>,
+
+ /// Task execution pool.
+ pool: threadpool::ThreadPool,
+}
+
+// ===== impl Runtime =====
+
+/// Start the Tokio runtime using the supplied future to bootstrap execution.
+///
+/// This function is used to bootstrap the execution of a Tokio application. It
+/// does the following:
+///
+/// * Start the Tokio runtime using a default configuration.
+/// * Spawn the given future onto the thread pool.
+/// * Block the current thread until the runtime shuts down.
+///
+/// Note that the function will not return immediately once `future` has
+/// completed. Instead it waits for the entire runtime to become idle.
+///
+/// See the [module level][mod] documentation for more details.
+///
+/// # Examples
+///
+/// ```rust
+/// # extern crate tokio;
+/// # extern crate futures;
+/// # use futures::{Future, Stream};
+/// use tokio::net::TcpListener;
+///
+/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
+/// # unimplemented!();
+/// # }
+/// # fn dox() {
+/// # let addr = "127.0.0.1:8080".parse().unwrap();
+/// let listener = TcpListener::bind(&addr).unwrap();
+///
+/// let server = listener.incoming()
+/// .map_err(|e| println!("error = {:?}", e))
+/// .for_each(|socket| {
+/// tokio::spawn(process(socket))
+/// });
+///
+/// tokio::run(server);
+/// # }
+/// # pub fn main() {}
+/// ```
+///
+/// # Panics
+///
+/// This function panics if called from the context of an executor.
+///
+/// [mod]: ../index.html
+pub fn run<F>(future: F)
+where F: Future<Item = (), Error = ()> + Send + 'static,
+{
+ // Check enter before creating a new Runtime...
+ let mut entered = enter().expect("nested tokio::run");
+ let mut runtime = Runtime::new().expect("failed to start new Runtime");
+ runtime.spawn(future);
+ entered
+ .block_on(runtime.shutdown_on_idle())
+ .expect("shutdown cannot error")
+}
+
+impl Runtime {
+ /// Create a new runtime instance with default configuration values.
+ ///
+ /// This results in a reactor, thread pool, and timer being initialized. The
+ /// thread pool will not spawn any worker threads until it needs to, i.e.
+ /// tasks are scheduled to run.
+ ///
+ /// Most users will not need to call this function directly, instead they
+ /// will use [`tokio::run`](fn.run.html).
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// # Examples
+ ///
+ /// Creating a new `Runtime` with default configuration values.
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::prelude::*;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// // Use the runtime...
+ ///
+ /// // Shutdown the runtime
+ /// rt.shutdown_now()
+ /// .wait().unwrap();
+ /// ```
+ ///
+ /// [mod]: index.html
+ pub fn new() -> io::Result<Self> {
+ Builder::new().build()
+ }
+
+ #[deprecated(since = "0.1.5", note = "use `reactor` instead")]
+ #[doc(hidden)]
+ pub fn handle(&self) -> &Handle {
+ #[allow(deprecated)]
+ self.reactor()
+ }
+
+ /// Return a reference to the reactor handle for this runtime instance.
+ ///
+ /// The returned handle reference can be cloned in order to get an owned
+ /// value of the handle. This handle can be used to initialize I/O resources
+ /// (like TCP or UDP sockets) that will not be used on the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// let reactor_handle = rt.reactor().clone();
+ ///
+ /// // use `reactor_handle`
+ /// ```
+ #[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")]
+ pub fn reactor(&self) -> &Handle {
+ let mut reactor = self.inner().reactor.lock().unwrap();
+ if let Some(reactor) = reactor.take() {
+ if let Ok(background) = reactor.background() {
+ background.forget();
+ }
+ }
+
+ &self.inner().reactor_handle
+ }
+
+ /// Return a handle to the runtime's executor.
+ ///
+ /// The returned handle can be used to spawn tasks that run on this runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// let executor_handle = rt.executor();
+ ///
+ /// // use `executor_handle`
+ /// ```
+ pub fn executor(&self) -> TaskExecutor {
+ let inner = self.inner().pool.sender().clone();
+ TaskExecutor { inner }
+ }
+
+ /// Spawn a future onto the Tokio runtime.
+ ///
+ /// This spawns the given future onto the runtime's executor, usually a
+ /// thread pool. The thread pool is then responsible for polling the future
+ /// until it completes.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// [mod]: index.html
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate tokio;
+ /// # extern crate futures;
+ /// # use futures::{future, Future, Stream};
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # fn dox() {
+ /// // Create the runtime
+ /// let mut rt = Runtime::new().unwrap();
+ ///
+ /// // Spawn a future onto the runtime
+ /// rt.spawn(future::lazy(|| {
+ /// println!("now running on a worker thread");
+ /// Ok(())
+ /// }));
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the spawn fails. Failure occurs if the executor
+ /// is currently at capacity and is unable to spawn a new future.
+ pub fn spawn<F>(&mut self, future: F) -> &mut Self
+ where F: Future<Item = (), Error = ()> + Send + 'static,
+ {
+ self.inner_mut().pool.sender().spawn(future).unwrap();
+ self
+ }
+
+ /// Run a future to completion on the Tokio runtime.
+ ///
+ /// This runs the given future on the runtime, blocking until it is
+ /// complete, and yielding its resolved result. Any tasks or timers which
+ /// the future spawns internally will be executed on the runtime.
+ ///
+ /// This method should not be called from an asynchronous context.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the executor is at capacity, if the provided
+ /// future panics, or if called within an asynchronous execution context.
+ pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
+ where
+ F: Send + 'static + Future<Item = R, Error = E>,
+ R: Send + 'static,
+ E: Send + 'static,
+ {
+ let mut entered = enter().expect("nested block_on");
+ let (tx, rx) = futures::sync::oneshot::channel();
+ self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
+ entered.block_on(rx).unwrap()
+ }
+
+ /// Run a future to completion on the Tokio runtime, then wait for all
+ /// background futures to complete too.
+ ///
+ /// This runs the given future on the runtime, blocking until it is
+ /// complete, waiting for background futures to complete, and yielding
+ /// its resolved result. Any tasks or timers which the future spawns
+ /// internally will be executed on the runtime and waited for completion.
+ ///
+ /// This method should not be called from an asynchronous context.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the executor is at capacity, if the provided
+ /// future panics, or if called within an asynchronous execution context.
+ pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E>
+ where
+ F: Send + 'static + Future<Item = R, Error = E>,
+ R: Send + 'static,
+ E: Send + 'static,
+ {
+ let mut entered = enter().expect("nested block_on_all");
+ let (tx, rx) = futures::sync::oneshot::channel();
+ self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
+ let block = rx
+ .map_err(|_| unreachable!())
+ .and_then(move |r| {
+ self.shutdown_on_idle()
+ .map(move |()| r)
+ });
+ entered.block_on(block).unwrap()
+ }
+
+ /// Signals the runtime to shutdown once it becomes idle.
+ ///
+ /// Returns a future that completes once the shutdown operation has
+ /// completed.
+ ///
+ /// This function can be used to perform a graceful shutdown of the runtime.
+ ///
+ /// The runtime enters an idle state once **all** of the following occur.
+ ///
+ /// * The thread pool has no tasks to execute, i.e., all tasks that were
+ /// spawned have completed.
+ /// * The reactor is not managing any I/O resources.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::prelude::*;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// // Use the runtime...
+ ///
+ /// // Shutdown the runtime
+ /// rt.shutdown_on_idle()
+ /// .wait().unwrap();
+ /// ```
+ ///
+ /// [mod]: index.html
+ pub fn shutdown_on_idle(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ let inner = inner.pool.shutdown_on_idle();
+ Shutdown { inner }
+ }
+
+ /// Signals the runtime to shutdown immediately.
+ ///
+ /// Returns a future that completes once the shutdown operation has
+ /// completed.
+ ///
+ /// This function will forcibly shutdown the runtime, causing any
+ /// in-progress work to become canceled. The shutdown steps are:
+ ///
+ /// * Drain any scheduled work queues.
+ /// * Drop any futures that have not yet completed.
+ /// * Drop the reactor.
+ ///
+ /// Once the reactor has dropped, any outstanding I/O resources bound to
+ /// that reactor will no longer function. Calling any method on them will
+ /// result in an error.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::prelude::*;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// // Use the runtime...
+ ///
+ /// // Shutdown the runtime
+ /// rt.shutdown_now()
+ /// .wait().unwrap();
+ /// ```
+ ///
+ /// [mod]: index.html
+ pub fn shutdown_now(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ Shutdown::shutdown_now(inner)
+ }
+
+ fn inner(&self) -> &Inner {
+ self.inner.as_ref().unwrap()
+ }
+
+ fn inner_mut(&mut self) -> &mut Inner {
+ self.inner.as_mut().unwrap()
+ }
+}
+
+impl Drop for Runtime {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.take() {
+ let shutdown = Shutdown::shutdown_now(inner);
+ let _ = shutdown.wait();
+ }
+ }
+}