summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/src/builder.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/builder.rs')
-rw-r--r--third_party/rust/tokio-threadpool/src/builder.rs476
1 files changed, 476 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/builder.rs b/third_party/rust/tokio-threadpool/src/builder.rs
new file mode 100644
index 0000000000..b06568e6ae
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/builder.rs
@@ -0,0 +1,476 @@
+use callback::Callback;
+use config::{Config, MAX_WORKERS};
+use park::{BoxPark, BoxedPark, DefaultPark};
+use pool::{Pool, MAX_BACKUP};
+use shutdown::ShutdownTrigger;
+use thread_pool::ThreadPool;
+use worker::{self, Worker, WorkerId};
+
+use std::any::Any;
+use std::cmp::max;
+use std::error::Error;
+use std::fmt;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crossbeam_deque::Injector;
+use num_cpus;
+use tokio_executor::park::Park;
+use tokio_executor::Enter;
+
+/// Builds a thread pool with custom configuration values.
+///
+/// Methods can be chained in order to set the configuration values. The thread
+/// pool is constructed by calling [`build`].
+///
+/// New instances of `Builder` are obtained via [`Builder::new`].
+///
+/// See function level documentation for details on the various configuration
+/// settings.
+///
+/// [`build`]: #method.build
+/// [`Builder::new`]: #method.new
+///
+/// # Examples
+///
+/// ```
+/// # extern crate tokio_threadpool;
+/// # extern crate futures;
+/// # use tokio_threadpool::Builder;
+/// use futures::future::{Future, lazy};
+/// use std::time::Duration;
+///
+/// # pub fn main() {
+/// let thread_pool = Builder::new()
+/// .pool_size(4)
+/// .keep_alive(Some(Duration::from_secs(30)))
+/// .build();
+///
+/// thread_pool.spawn(lazy(|| {
+/// println!("called from a worker thread");
+/// Ok(())
+/// }));
+///
+/// // Gracefully shutdown the threadpool
+/// thread_pool.shutdown().wait().unwrap();
+/// # }
+/// ```
+pub struct Builder {
+ /// Thread pool specific configuration values
+ config: Config,
+
+ /// Number of workers to spawn
+ pool_size: usize,
+
+ /// Maximum number of futures that can be in a blocking section
+ /// concurrently.
+ max_blocking: usize,
+
+ /// Generates the `Park` instances
+ new_park: Box<dyn Fn(&WorkerId) -> BoxPark>,
+}
+
+impl Builder {
+ /// Returns a new thread pool builder initialized with default configuration
+ /// values.
+ ///
+ /// Configuration methods can be chained on the return value.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ /// use std::time::Duration;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .pool_size(4)
+ /// .keep_alive(Some(Duration::from_secs(30)))
+ /// .build();
+ /// # }
+ /// ```
+ pub fn new() -> Builder {
+ let num_cpus = max(1, num_cpus::get());
+
+ let new_park =
+ Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark);
+
+ Builder {
+ pool_size: num_cpus,
+ max_blocking: 100,
+ config: Config {
+ keep_alive: None,
+ name_prefix: None,
+ stack_size: None,
+ around_worker: None,
+ after_start: None,
+ before_stop: None,
+ panic_handler: None,
+ },
+ new_park,
+ }
+ }
+
+ /// Set the maximum number of worker threads for the thread pool instance.
+ ///
+ /// This must be a number between 1 and 32,768 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .pool_size(4)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn pool_size(&mut self, val: usize) -> &mut Self {
+ assert!(val >= 1, "at least one thread required");
+ assert!(val <= MAX_WORKERS, "max value is {}", MAX_WORKERS);
+
+ self.pool_size = val;
+ self
+ }
+
+ /// Set the maximum number of concurrent blocking sections.
+ ///
+ /// When the maximum concurrent `blocking` calls is reached, any further
+ /// calls to `blocking` will return `NotReady` and the task is notified once
+ /// previously in-flight calls to `blocking` return.
+ ///
+ /// This must be a number between 1 and 32,768 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is 100.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .max_blocking(200)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn max_blocking(&mut self, val: usize) -> &mut Self {
+ assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP);
+ self.max_blocking = val;
+ self
+ }
+
+ /// Set the thread keep alive duration
+ ///
+ /// If set, a thread that has completed a `blocking` call will wait for up
+ /// to the specified duration to become a worker thread again. Once the
+ /// duration elapses, the thread will shutdown.
+ ///
+ /// When the value is `None`, the thread will wait to become a worker
+ /// thread forever.
+ ///
+ /// The default value is `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ /// use std::time::Duration;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .keep_alive(Some(Duration::from_secs(30)))
+ /// .build();
+ /// # }
+ /// ```
+ pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
+ self.config.keep_alive = val;
+ self
+ }
+
+ /// Sets a callback to be triggered when a panic during a future bubbles up
+ /// to Tokio. By default Tokio catches these panics, and they will be
+ /// ignored. The parameter passed to this callback is the same error value
+ /// returned from std::panic::catch_unwind(). To abort the process on
+ /// panics, use std::panic::resume_unwind() in this callback as shown
+ /// below.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .panic_handler(|err| std::panic::resume_unwind(err))
+ /// .build();
+ /// # }
+ /// ```
+ pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
+ {
+ self.config.panic_handler = Some(Arc::new(f));
+ self
+ }
+
+ /// Set name prefix of threads spawned by the scheduler
+ ///
+ /// Thread name prefix is used for generating thread names. For example, if
+ /// prefix is `my-pool-`, then threads in the pool will get names like
+ /// `my-pool-1` etc.
+ ///
+ /// If this configuration is not set, then the thread will use the system
+ /// default naming scheme.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .name_prefix("my-pool-")
+ /// .build();
+ /// # }
+ /// ```
+ pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
+ self.config.name_prefix = Some(val.into());
+ self
+ }
+
+ /// Set the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .stack_size(32 * 1024)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn stack_size(&mut self, val: usize) -> &mut Self {
+ self.config.stack_size = Some(val);
+ self
+ }
+
+ /// Execute function `f` on each worker thread.
+ ///
+ /// This function is provided a handle to the worker and is expected to call
+ /// [`Worker::run`], otherwise the worker thread will shutdown without doing
+ /// any work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .around_worker(|worker, _| {
+ /// println!("worker is starting up");
+ /// worker.run();
+ /// println!("worker is shutting down");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ ///
+ /// [`Worker::run`]: struct.Worker.html#method.run
+ pub fn around_worker<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
+ {
+ self.config.around_worker = Some(Callback::new(f));
+ self
+ }
+
+ /// Execute function `f` after each thread is started but before it starts
+ /// doing work.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .after_start(|| {
+ /// println!("thread started");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ pub fn after_start<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.config.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Execute function `f` before each thread stops.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .before_stop(|| {
+ /// println!("thread stopping");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ pub fn before_stop<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.config.before_stop = Some(Arc::new(f));
+ self
+ }
+
+ /// Customize the `park` instance used by each worker thread.
+ ///
+ /// The provided closure `f` is called once per worker and returns a `Park`
+ /// instance that is used by the worker to put itself to sleep.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ /// # fn decorate<F>(f: F) -> F { f }
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .custom_park(|_| {
+ /// use tokio_threadpool::park::DefaultPark;
+ ///
+ /// // This is the default park type that the worker would use if we
+ /// // did not customize it.
+ /// let park = DefaultPark::new();
+ ///
+ /// // Decorate the `park` instance, allowing us to customize work
+ /// // that happens when a worker thread goes to sleep.
+ /// decorate(park)
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(&WorkerId) -> P + 'static,
+ P: Park + Send + 'static,
+ P::Error: Error,
+ {
+ self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id))));
+
+ self
+ }
+
+ /// Create the configured `ThreadPool`.
+ ///
+ /// The returned `ThreadPool` instance is ready to spawn tasks.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .build();
+ /// # }
+ /// ```
+ pub fn build(&self) -> ThreadPool {
+ trace!("build; num-workers={}", self.pool_size);
+
+ // Create the worker entry list
+ let workers: Arc<[worker::Entry]> = {
+ let mut workers = vec![];
+
+ for i in 0..self.pool_size {
+ let id = WorkerId::new(i);
+ let park = (self.new_park)(&id);
+ let unpark = park.unpark();
+
+ workers.push(worker::Entry::new(park, unpark));
+ }
+
+ workers.into()
+ };
+
+ let queue = Arc::new(Injector::new());
+
+ // Create a trigger that will clean up resources on shutdown.
+ //
+ // The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain
+ // strong references.
+ let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone()));
+
+ // Create the pool
+ let pool = Arc::new(Pool::new(
+ workers,
+ Arc::downgrade(&trigger),
+ self.max_blocking,
+ self.config.clone(),
+ queue,
+ ));
+
+ ThreadPool::new2(pool, trigger)
+ }
+}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Builder")
+ .field("config", &self.config)
+ .field("pool_size", &self.pool_size)
+ .field("new_park", &"Box<Fn() -> BoxPark>")
+ .finish()
+ }
+}