diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-threadpool/src | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src')
25 files changed, 5386 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/blocking/global.rs b/third_party/rust/tokio-threadpool/src/blocking/global.rs new file mode 100644 index 0000000000..c732a58335 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/blocking/global.rs @@ -0,0 +1,218 @@ +use super::{BlockingError, BlockingImpl}; +use futures::Poll; +use std::cell::Cell; +use std::fmt; +use std::marker::PhantomData; +use tokio_executor::Enter; + +thread_local! { + static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking); +} + +/// Ensures that the executor is removed from the thread-local context +/// when leaving the scope. This handles cases that involve panicking. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +pub struct DefaultGuard<'a> { + prior: BlockingImpl, + _lifetime: PhantomData<&'a ()>, +} + +/// Set the default blocking implementation, returning a guard that resets the +/// blocking implementation when dropped. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> { + CURRENT.with(|cell| { + let prior = cell.replace(blocking); + DefaultGuard { + prior, + _lifetime: PhantomData, + } + }) +} + +/// Set the default blocking implementation for the duration of the closure. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R +where + F: FnOnce(&mut Enter) -> R, +{ + let _guard = set_default(blocking); + f(enter) +} + +/// Enter a blocking section of code. +/// +/// The `blocking` function annotates a section of code that performs a blocking +/// operation, either by issuing a blocking syscall or by performing a long +/// running CPU-bound computation. +/// +/// When the `blocking` function enters, it hands off the responsibility of +/// processing the current work queue to another thread. Then, it calls the +/// supplied closure. The closure is permitted to block indefinitely. +/// +/// If the maximum number of concurrent `blocking` calls has been reached, then +/// `NotReady` is returned and the task is notified once existing `blocking` +/// calls complete. The maximum value is specified when creating a thread pool +/// using [`Builder::max_blocking`][build] +/// +/// NB: The entire task that called `blocking` is blocked whenever the supplied +/// closure blocks, even if you have used future combinators such as `select` - +/// the other futures in this task will not make progress until the closure +/// returns. +/// If this is not desired, ensure that `blocking` runs in its own task (e.g. +/// using `futures::sync::oneshot::spawn`). +/// +/// [build]: struct.Builder.html#method.max_blocking +/// +/// # Return +/// +/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where +/// `T` is the closure's return value. +/// +/// If the thread pool has shutdown, `Err` is returned. +/// +/// If the number of concurrent `blocking` calls has reached the maximum, +/// `Ok(NotReady)` is returned and the current task is notified when a call to +/// `blocking` will succeed. +/// +/// If `blocking` is called from outside the context of a Tokio thread pool, +/// `Err` is returned. +/// +/// # Background +/// +/// By default, the Tokio thread pool expects that tasks will only run for short +/// periods at a time before yielding back to the thread pool. This is the basic +/// premise of cooperative multitasking. +/// +/// However, it is common to want to perform a blocking operation while +/// processing an asynchronous computation. Examples of blocking operation +/// include: +/// +/// * Performing synchronous file operations (reading and writing). +/// * Blocking on acquiring a mutex. +/// * Performing a CPU bound computation, like cryptographic encryption or +/// decryption. +/// +/// One option for dealing with blocking operations in an asynchronous context +/// is to use a thread pool dedicated to performing these operations. This not +/// ideal as it requires bidirectional message passing as well as a channel to +/// communicate which adds a level of buffering. +/// +/// Instead, `blocking` hands off the responsibility of processing the work queue +/// to another thread. This hand off is light compared to a channel and does not +/// require buffering. +/// +/// # Examples +/// +/// Block on receiving a message from a `std` channel. This example is a little +/// silly as using the non-blocking channel from the `futures` crate would make +/// more sense. The blocking receive can be replaced with any blocking operation +/// that needs to be performed. +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tokio_threadpool; +/// +/// use tokio_threadpool::{ThreadPool, blocking}; +/// +/// use futures::Future; +/// use futures::future::{lazy, poll_fn}; +/// +/// use std::sync::mpsc; +/// use std::thread; +/// use std::time::Duration; +/// +/// pub fn main() { +/// // This is a *blocking* channel +/// let (tx, rx) = mpsc::channel(); +/// +/// // Spawn a thread to send a message +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// tx.send("hello").unwrap(); +/// }); +/// +/// let pool = ThreadPool::new(); +/// +/// pool.spawn(lazy(move || { +/// // Because `blocking` returns `Poll`, it is intended to be used +/// // from the context of a `Future` implementation. Since we don't +/// // have a complicated requirement, we can use `poll_fn` in this +/// // case. +/// poll_fn(move || { +/// blocking(|| { +/// let msg = rx.recv().unwrap(); +/// println!("message = {}", msg); +/// }).map_err(|_| panic!("the threadpool shut down")) +/// }) +/// })); +/// +/// // Wait for the task we just spawned to complete. +/// pool.shutdown_on_idle().wait().unwrap(); +/// } +/// ``` +pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError> +where + F: FnOnce() -> T, +{ + CURRENT.with(|cell| { + let blocking = cell.get(); + + // Object-safety workaround: the `Blocking` trait must be object-safe, + // since we use a trait object in the thread-local. However, a blocking + // _operation_ will be generic over the return type of the blocking + // function. Therefore, rather than passing a function with a return + // type to `Blocking::run_blocking`, we pass a _new_ closure which + // doesn't have a return value. That closure invokes the blocking + // function and assigns its value to `ret`, which we then unpack when + // the blocking call finishes. + let mut f = Some(f); + let mut ret = None; + { + let ret2 = &mut ret; + let mut run = move || { + let f = f + .take() + .expect("blocking closure invoked twice; this is a bug!"); + *ret2 = Some((f)()); + }; + + try_ready!((blocking)(&mut run)); + } + + // Return the result + let ret = + ret.expect("blocking function finished, but return value was unset; this is a bug!"); + Ok(ret.into()) + }) +} + +// === impl DefaultGuard === + +impl<'a> fmt::Debug for DefaultGuard<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("DefaultGuard { .. }") + } +} + +impl<'a> Drop for DefaultGuard<'a> { + fn drop(&mut self) { + // if the TLS value has already been torn down, there's nothing else we + // can do. we're almost certainly panicking anyway. + let _ = CURRENT.try_with(|cell| { + cell.set(self.prior); + }); + } +} diff --git a/third_party/rust/tokio-threadpool/src/blocking/mod.rs b/third_party/rust/tokio-threadpool/src/blocking/mod.rs new file mode 100644 index 0000000000..27151d44b8 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/blocking/mod.rs @@ -0,0 +1,88 @@ +use worker::Worker; + +use futures::{Async, Poll}; +use tokio_executor; + +use std::error::Error; +use std::fmt; + +mod global; +pub use self::global::blocking; +#[doc(hidden)] +pub use self::global::{set_default, with_default, DefaultGuard}; + +/// Error raised by `blocking`. +pub struct BlockingError { + _p: (), +} + +/// A function implementing the behavior run on calls to `blocking`. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +#[doc(hidden)] +pub type BlockingImpl = fn(&mut dyn FnMut()) -> Poll<(), BlockingError>; + +fn default_blocking(f: &mut dyn FnMut()) -> Poll<(), BlockingError> { + let res = Worker::with_current(|worker| { + let worker = match worker { + Some(worker) => worker, + None => { + return Err(BlockingError::new()); + } + }; + + // Transition the worker state to blocking. This will exit the fn early + // with `NotReady` if the pool does not have enough capacity to enter + // blocking mode. + worker.transition_to_blocking() + }); + + // If the transition cannot happen, exit early + try_ready!(res); + + // Currently in blocking mode, so call the inner closure. + // + // "Exit" the current executor in case the blocking function wants + // to call a different executor. + tokio_executor::exit(move || (f)()); + + // Try to transition out of blocking mode. This is a fast path that takes + // back ownership of the worker if the worker handoff didn't complete yet. + Worker::with_current(|worker| { + // Worker must be set since it was above. + worker.unwrap().transition_from_blocking(); + }); + + Ok(Async::Ready(())) +} + +impl BlockingError { + /// Returns a new `BlockingError`. + #[doc(hidden)] + pub fn new() -> Self { + Self { _p: () } + } +} + +impl fmt::Display for BlockingError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl fmt::Debug for BlockingError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BlockingError") + .field("reason", &self.description()) + .finish() + } +} + +impl Error for BlockingError { + fn description(&self) -> &str { + "`blocking` annotation used from outside the context of a thread pool" + } +} diff --git a/third_party/rust/tokio-threadpool/src/builder.rs b/third_party/rust/tokio-threadpool/src/builder.rs new file mode 100644 index 0000000000..b06568e6ae --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/builder.rs @@ -0,0 +1,476 @@ +use callback::Callback; +use config::{Config, MAX_WORKERS}; +use park::{BoxPark, BoxedPark, DefaultPark}; +use pool::{Pool, MAX_BACKUP}; +use shutdown::ShutdownTrigger; +use thread_pool::ThreadPool; +use worker::{self, Worker, WorkerId}; + +use std::any::Any; +use std::cmp::max; +use std::error::Error; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; + +use crossbeam_deque::Injector; +use num_cpus; +use tokio_executor::park::Park; +use tokio_executor::Enter; + +/// Builds a thread pool with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The thread +/// pool is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// # extern crate tokio_threadpool; +/// # extern crate futures; +/// # use tokio_threadpool::Builder; +/// use futures::future::{Future, lazy}; +/// use std::time::Duration; +/// +/// # pub fn main() { +/// let thread_pool = Builder::new() +/// .pool_size(4) +/// .keep_alive(Some(Duration::from_secs(30))) +/// .build(); +/// +/// thread_pool.spawn(lazy(|| { +/// println!("called from a worker thread"); +/// Ok(()) +/// })); +/// +/// // Gracefully shutdown the threadpool +/// thread_pool.shutdown().wait().unwrap(); +/// # } +/// ``` +pub struct Builder { + /// Thread pool specific configuration values + config: Config, + + /// Number of workers to spawn + pool_size: usize, + + /// Maximum number of futures that can be in a blocking section + /// concurrently. + max_blocking: usize, + + /// Generates the `Park` instances + new_park: Box<dyn Fn(&WorkerId) -> BoxPark>, +} + +impl Builder { + /// Returns a new thread pool builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// use std::time::Duration; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .pool_size(4) + /// .keep_alive(Some(Duration::from_secs(30))) + /// .build(); + /// # } + /// ``` + pub fn new() -> Builder { + let num_cpus = max(1, num_cpus::get()); + + let new_park = + Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark); + + Builder { + pool_size: num_cpus, + max_blocking: 100, + config: Config { + keep_alive: None, + name_prefix: None, + stack_size: None, + around_worker: None, + after_start: None, + before_stop: None, + panic_handler: None, + }, + new_park, + } + } + + /// Set the maximum number of worker threads for the thread pool instance. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .pool_size(4) + /// .build(); + /// # } + /// ``` + pub fn pool_size(&mut self, val: usize) -> &mut Self { + assert!(val >= 1, "at least one thread required"); + assert!(val <= MAX_WORKERS, "max value is {}", MAX_WORKERS); + + self.pool_size = val; + self + } + + /// Set the maximum number of concurrent blocking sections. + /// + /// When the maximum concurrent `blocking` calls is reached, any further + /// calls to `blocking` will return `NotReady` and the task is notified once + /// previously in-flight calls to `blocking` return. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is 100. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .max_blocking(200) + /// .build(); + /// # } + /// ``` + pub fn max_blocking(&mut self, val: usize) -> &mut Self { + assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP); + self.max_blocking = val; + self + } + + /// Set the thread keep alive duration + /// + /// If set, a thread that has completed a `blocking` call will wait for up + /// to the specified duration to become a worker thread again. Once the + /// duration elapses, the thread will shutdown. + /// + /// When the value is `None`, the thread will wait to become a worker + /// thread forever. + /// + /// The default value is `None`. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// use std::time::Duration; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .keep_alive(Some(Duration::from_secs(30))) + /// .build(); + /// # } + /// ``` + pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self { + self.config.keep_alive = val; + self + } + + /// Sets a callback to be triggered when a panic during a future bubbles up + /// to Tokio. By default Tokio catches these panics, and they will be + /// ignored. The parameter passed to this callback is the same error value + /// returned from std::panic::catch_unwind(). To abort the process on + /// panics, use std::panic::resume_unwind() in this callback as shown + /// below. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .panic_handler(|err| std::panic::resume_unwind(err)) + /// .build(); + /// # } + /// ``` + pub fn panic_handler<F>(&mut self, f: F) -> &mut Self + where + F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, + { + self.config.panic_handler = Some(Arc::new(f)); + self + } + + /// Set name prefix of threads spawned by the scheduler + /// + /// Thread name prefix is used for generating thread names. For example, if + /// prefix is `my-pool-`, then threads in the pool will get names like + /// `my-pool-1` etc. + /// + /// If this configuration is not set, then the thread will use the system + /// default naming scheme. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .name_prefix("my-pool-") + /// .build(); + /// # } + /// ``` + pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { + self.config.name_prefix = Some(val.into()); + self + } + + /// Set the stack size (in bytes) for worker threads. + /// + /// The actual stack size may be greater than this value if the platform + /// specifies minimal stack size. + /// + /// The default stack size for spawned threads is 2 MiB, though this + /// particular stack size is subject to change in the future. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .stack_size(32 * 1024) + /// .build(); + /// # } + /// ``` + pub fn stack_size(&mut self, val: usize) -> &mut Self { + self.config.stack_size = Some(val); + self + } + + /// Execute function `f` on each worker thread. + /// + /// This function is provided a handle to the worker and is expected to call + /// [`Worker::run`], otherwise the worker thread will shutdown without doing + /// any work. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .around_worker(|worker, _| { + /// println!("worker is starting up"); + /// worker.run(); + /// println!("worker is shutting down"); + /// }) + /// .build(); + /// # } + /// ``` + /// + /// [`Worker::run`]: struct.Worker.html#method.run + pub fn around_worker<F>(&mut self, f: F) -> &mut Self + where + F: Fn(&Worker, &mut Enter) + Send + Sync + 'static, + { + self.config.around_worker = Some(Callback::new(f)); + self + } + + /// Execute function `f` after each thread is started but before it starts + /// doing work. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .after_start(|| { + /// println!("thread started"); + /// }) + /// .build(); + /// # } + /// ``` + pub fn after_start<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.config.after_start = Some(Arc::new(f)); + self + } + + /// Execute function `f` before each thread stops. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .before_stop(|| { + /// println!("thread stopping"); + /// }) + /// .build(); + /// # } + /// ``` + pub fn before_stop<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.config.before_stop = Some(Arc::new(f)); + self + } + + /// Customize the `park` instance used by each worker thread. + /// + /// The provided closure `f` is called once per worker and returns a `Park` + /// instance that is used by the worker to put itself to sleep. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// # fn decorate<F>(f: F) -> F { f } + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .custom_park(|_| { + /// use tokio_threadpool::park::DefaultPark; + /// + /// // This is the default park type that the worker would use if we + /// // did not customize it. + /// let park = DefaultPark::new(); + /// + /// // Decorate the `park` instance, allowing us to customize work + /// // that happens when a worker thread goes to sleep. + /// decorate(park) + /// }) + /// .build(); + /// # } + /// ``` + pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self + where + F: Fn(&WorkerId) -> P + 'static, + P: Park + Send + 'static, + P::Error: Error, + { + self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id)))); + + self + } + + /// Create the configured `ThreadPool`. + /// + /// The returned `ThreadPool` instance is ready to spawn tasks. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// let thread_pool = Builder::new() + /// .build(); + /// # } + /// ``` + pub fn build(&self) -> ThreadPool { + trace!("build; num-workers={}", self.pool_size); + + // Create the worker entry list + let workers: Arc<[worker::Entry]> = { + let mut workers = vec![]; + + for i in 0..self.pool_size { + let id = WorkerId::new(i); + let park = (self.new_park)(&id); + let unpark = park.unpark(); + + workers.push(worker::Entry::new(park, unpark)); + } + + workers.into() + }; + + let queue = Arc::new(Injector::new()); + + // Create a trigger that will clean up resources on shutdown. + // + // The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain + // strong references. + let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone())); + + // Create the pool + let pool = Arc::new(Pool::new( + workers, + Arc::downgrade(&trigger), + self.max_blocking, + self.config.clone(), + queue, + )); + + ThreadPool::new2(pool, trigger) + } +} + +impl fmt::Debug for Builder { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Builder") + .field("config", &self.config) + .field("pool_size", &self.pool_size) + .field("new_park", &"Box<Fn() -> BoxPark>") + .finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/callback.rs b/third_party/rust/tokio-threadpool/src/callback.rs new file mode 100644 index 0000000000..4fea4cac8e --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/callback.rs @@ -0,0 +1,30 @@ +use worker::Worker; + +use std::fmt; +use std::sync::Arc; + +use tokio_executor::Enter; + +#[derive(Clone)] +pub(crate) struct Callback { + f: Arc<dyn Fn(&Worker, &mut Enter) + Send + Sync>, +} + +impl Callback { + pub fn new<F>(f: F) -> Self + where + F: Fn(&Worker, &mut Enter) + Send + Sync + 'static, + { + Callback { f: Arc::new(f) } + } + + pub fn call(&self, worker: &Worker, enter: &mut Enter) { + (self.f)(worker, enter) + } +} + +impl fmt::Debug for Callback { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Fn") + } +} diff --git a/third_party/rust/tokio-threadpool/src/config.rs b/third_party/rust/tokio-threadpool/src/config.rs new file mode 100644 index 0000000000..94b3c06a0b --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/config.rs @@ -0,0 +1,34 @@ +use callback::Callback; + +use std::any::Any; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; + +/// Thread pool specific configuration values +#[derive(Clone)] +pub(crate) struct Config { + pub keep_alive: Option<Duration>, + // Used to configure a worker thread + pub name_prefix: Option<String>, + pub stack_size: Option<usize>, + pub around_worker: Option<Callback>, + pub after_start: Option<Arc<dyn Fn() + Send + Sync>>, + pub before_stop: Option<Arc<dyn Fn() + Send + Sync>>, + pub panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>, +} + +/// Max number of workers that can be part of a pool. This is the most that can +/// fit in the scheduler state. Note, that this is the max number of **active** +/// threads. There can be more standby threads. +pub(crate) const MAX_WORKERS: usize = 1 << 15; + +impl fmt::Debug for Config { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Config") + .field("keep_alive", &self.keep_alive) + .field("name_prefix", &self.name_prefix) + .field("stack_size", &self.stack_size) + .finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/lib.rs b/third_party/rust/tokio-threadpool/src/lib.rs new file mode 100644 index 0000000000..a879af1771 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/lib.rs @@ -0,0 +1,164 @@ +#![doc(html_root_url = "https://docs.rs/tokio-threadpool/0.1.17")] +#![deny(missing_docs, missing_debug_implementations)] + +//! A work-stealing based thread pool for executing futures. +//! +//! The Tokio thread pool supports scheduling futures and processing them on +//! multiple CPU cores. It is optimized for the primary Tokio use case of many +//! independent tasks with limited computation and with most tasks waiting on +//! I/O. Usually, users will not create a `ThreadPool` instance directly, but +//! will use one via a [`runtime`]. +//! +//! The `ThreadPool` structure manages two sets of threads: +//! +//! * Worker threads. +//! * Backup threads. +//! +//! Worker threads are used to schedule futures using a work-stealing strategy. +//! Backup threads, on the other hand, are intended only to support the +//! `blocking` API. Threads will transition between the two sets. +//! +//! The advantage of the work-stealing strategy is minimal cross-thread +//! coordination. The thread pool attempts to make as much progress as possible +//! without communicating across threads. +//! +//! ## Worker overview +//! +//! Each worker has two queues: a deque and a mpsc channel. The deque is the +//! primary queue for tasks that are scheduled to run on the worker thread. Tasks +//! can only be pushed onto the deque by the worker, but other workers may +//! "steal" from that deque. The mpsc channel is used to submit futures while +//! external to the pool. +//! +//! As long as the thread pool has not been shutdown, a worker will run in a +//! loop. Each loop, it consumes all tasks on its mpsc channel and pushes it onto +//! the deque. It then pops tasks off of the deque and executes them. +//! +//! If a worker has no work, i.e., both queues are empty. It attempts to steal. +//! To do this, it randomly scans other workers' deques and tries to pop a task. +//! If it finds no work to steal, the thread goes to sleep. +//! +//! When the worker detects that the pool has been shut down, it exits the loop, +//! cleans up its state, and shuts the thread down. +//! +//! ## Thread pool initialization +//! +//! Note, users normally will use the threadpool created by a [`runtime`]. +//! +//! By default, no threads are spawned on creation. Instead, when new futures are +//! spawned, the pool first checks if there are enough active worker threads. If +//! not, a new worker thread is spawned. +//! +//! ## Spawning futures +//! +//! The spawning behavior depends on whether a future was spawned from within a +//! worker or thread or if it was spawned from an external handle. +//! +//! When spawning a future while external to the thread pool, the current +//! strategy is to randomly pick a worker to submit the task to. The task is then +//! pushed onto that worker's mpsc channel. +//! +//! When spawning a future while on a worker thread, the task is pushed onto the +//! back of the current worker's deque. +//! +//! ## Blocking annotation strategy +//! +//! The [`blocking`] function is used to annotate a section of code that +//! performs a blocking operation, either by issuing a blocking syscall or +//! performing any long running CPU-bound computation. +//! +//! The strategy for handling blocking closures is to hand off the worker to a +//! new thread. This implies handing off the `deque` and `mpsc`. Once this is +//! done, the new thread continues to process the work queue and the original +//! thread is able to block. Once it finishes processing the blocking future, the +//! thread has no additional work and is inserted into the backup pool. This +//! makes it available to other workers that encounter a [`blocking`] call. +//! +//! [`blocking`]: fn.blocking.html +//! [`runtime`]: https://docs.rs/tokio/0.1/tokio/runtime/ + +extern crate tokio_executor; + +extern crate crossbeam_deque; +extern crate crossbeam_queue; +extern crate crossbeam_utils; +#[macro_use] +extern crate futures; +#[macro_use] +extern crate lazy_static; +extern crate num_cpus; +extern crate slab; + +#[macro_use] +extern crate log; + +// ## Crate layout +// +// The primary type, `Pool`, holds the majority of a thread pool's state, +// including the state for each worker. Each worker's state is maintained in an +// instance of `worker::Entry`. +// +// `Worker` contains the logic that runs on each worker thread. It holds an +// `Arc` to `Pool` and is able to access its state from `Pool`. +// +// `Task` is a harness around an individual future. It manages polling and +// scheduling that future. +// +// ## Sleeping workers +// +// Sleeping workers are tracked using a [Treiber stack]. This results in the +// thread that most recently went to sleep getting woken up first. When the pool +// is not under load, this helps threads shutdown faster. +// +// Sleeping is done by using `tokio_executor::Park` implementations. This allows +// the user of the thread pool to customize the work that is performed to sleep. +// This is how injecting timers and other functionality into the thread pool is +// done. +// +// ## Notifying workers +// +// When there is work to be done, workers must be notified. However, notifying a +// worker requires cross thread coordination. Ideally, a worker would only be +// notified when it is sleeping, but there is no way to know if a worker is +// sleeping without cross thread communication. +// +// The two cases when a worker might need to be notified are: +// +// 1. A task is externally submitted to a worker via the mpsc channel. +// 2. A worker has a back log of work and needs other workers to steal from it. +// +// In the first case, the worker will always be notified. However, it could be +// possible to avoid the notification if the mpsc channel has two or greater +// number of tasks *after* the task is submitted. In this case, we are able to +// assume that the worker has previously been notified. +// +// The second case is trickier. Currently, whenever a worker spawns a new future +// (pushing it onto its deque) and when it pops a future from its mpsc, it tries +// to notify a sleeping worker to wake up and start stealing. This is a lot of +// notification and it **might** be possible to reduce it. +// +// Also, whenever a worker is woken up via a signal and it does find work, it, +// in turn, will try to wake up a new worker. +// +// [Treiber stack]: https://en.wikipedia.org/wiki/Treiber_Stack + +#[doc(hidden)] +pub mod blocking; +mod builder; +mod callback; +mod config; +mod notifier; +pub mod park; +mod pool; +mod sender; +mod shutdown; +mod task; +mod thread_pool; +mod worker; + +pub use blocking::{blocking, BlockingError}; +pub use builder::Builder; +pub use sender::Sender; +pub use shutdown::Shutdown; +pub use thread_pool::{SpawnHandle, ThreadPool}; +pub use worker::{Worker, WorkerId}; diff --git a/third_party/rust/tokio-threadpool/src/notifier.rs b/third_party/rust/tokio-threadpool/src/notifier.rs new file mode 100644 index 0000000000..fe8e8f4ad0 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/notifier.rs @@ -0,0 +1,92 @@ +use pool::Pool; +use task::Task; + +use std::mem; +use std::ops; +use std::sync::Arc; + +use futures::executor::Notify; + +/// Implements the future `Notify` API. +/// +/// This is how external events are able to signal the task, informing it to try +/// to poll the future again. +#[derive(Debug)] +pub(crate) struct Notifier { + pub pool: Arc<Pool>, +} + +/// A guard that ensures that the inner value gets forgotten. +#[derive(Debug)] +struct Forget<T>(Option<T>); + +impl Notify for Notifier { + fn notify(&self, id: usize) { + trace!("Notifier::notify; id=0x{:x}", id); + + unsafe { + let ptr = id as *const Task; + + // We did not actually take ownership of the `Arc` in this function + // so we must ensure that the Arc is forgotten. + let task = Forget::new(Arc::from_raw(ptr)); + + // TODO: Unify this with Task::notify + if task.schedule() { + // TODO: Check if the pool is still running + // + // Bump the ref count + let task = task.clone(); + + let _ = self.pool.submit(task, &self.pool); + } + } + } + + fn clone_id(&self, id: usize) -> usize { + let ptr = id as *const Task; + + // This function doesn't actually get a strong ref to the task here. + // However, the only method we have to convert a raw pointer -> &Arc<T> + // is to call `Arc::from_raw` which returns a strong ref. So, to + // maintain the invariants, `t1` has to be forgotten. This prevents the + // ref count from being decremented. + let t1 = Forget::new(unsafe { Arc::from_raw(ptr) }); + + // The clone is forgotten so that the fn exits without decrementing the ref + // count. The caller of `clone_id` ensures that `drop_id` is called when + // the ref count needs to be decremented. + let _ = Forget::new(t1.clone()); + + id + } + + fn drop_id(&self, id: usize) { + unsafe { + let ptr = id as *const Task; + let _ = Arc::from_raw(ptr); + } + } +} + +// ===== impl Forget ===== + +impl<T> Forget<T> { + fn new(t: T) -> Self { + Forget(Some(t)) + } +} + +impl<T> ops::Deref for Forget<T> { + type Target = T; + + fn deref(&self) -> &T { + self.0.as_ref().unwrap() + } +} + +impl<T> Drop for Forget<T> { + fn drop(&mut self) { + mem::forget(self.0.take()); + } +} diff --git a/third_party/rust/tokio-threadpool/src/park/boxed.rs b/third_party/rust/tokio-threadpool/src/park/boxed.rs new file mode 100644 index 0000000000..030866450d --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/park/boxed.rs @@ -0,0 +1,45 @@ +use tokio_executor::park::{Park, Unpark}; + +use std::error::Error; +use std::time::Duration; + +pub(crate) type BoxPark = Box<dyn Park<Unpark = BoxUnpark, Error = ()> + Send>; +pub(crate) type BoxUnpark = Box<dyn Unpark>; + +pub(crate) struct BoxedPark<T>(T); + +impl<T> BoxedPark<T> { + pub fn new(inner: T) -> Self { + BoxedPark(inner) + } +} + +impl<T: Park + Send> Park for BoxedPark<T> +where + T::Error: Error, +{ + type Unpark = BoxUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + Box::new(self.0.unpark()) + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.0.park().map_err(|e| { + warn!( + "calling `park` on worker thread errored -- shutting down thread: {}", + e + ); + }) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.0.park_timeout(duration).map_err(|e| { + warn!( + "calling `park` on worker thread errored -- shutting down thread: {}", + e + ); + }) + } +} diff --git a/third_party/rust/tokio-threadpool/src/park/default_park.rs b/third_party/rust/tokio-threadpool/src/park/default_park.rs new file mode 100644 index 0000000000..ecc22350f0 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/park/default_park.rs @@ -0,0 +1,97 @@ +use tokio_executor::park::{Park, Unpark}; + +use std::error::Error; +use std::fmt; +use std::time::Duration; + +use crossbeam_utils::sync::{Parker, Unparker}; + +/// Parks the thread. +#[derive(Debug)] +pub struct DefaultPark { + inner: Parker, +} + +/// Unparks threads that were parked by `DefaultPark`. +#[derive(Debug)] +pub struct DefaultUnpark { + inner: Unparker, +} + +/// Error returned by [`ParkThread`] +/// +/// This currently is never returned, but might at some point in the future. +/// +/// [`ParkThread`]: struct.ParkThread.html +#[derive(Debug)] +pub struct ParkError { + _p: (), +} + +// ===== impl DefaultPark ===== + +impl DefaultPark { + /// Creates a new `DefaultPark` instance. + pub fn new() -> DefaultPark { + DefaultPark { + inner: Parker::new(), + } + } + + /// Unpark the thread without having to clone the unpark handle. + /// + /// Named `notify` to avoid conflicting with the `unpark` fn. + pub(crate) fn notify(&self) { + self.inner.unparker().unpark(); + } + + pub(crate) fn park_sync(&self, duration: Option<Duration>) { + match duration { + None => self.inner.park(), + Some(duration) => self.inner.park_timeout(duration), + } + } +} + +impl Park for DefaultPark { + type Unpark = DefaultUnpark; + type Error = ParkError; + + fn unpark(&self) -> Self::Unpark { + DefaultUnpark { + inner: self.inner.unparker().clone(), + } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration); + Ok(()) + } +} + +// ===== impl DefaultUnpark ===== + +impl Unpark for DefaultUnpark { + fn unpark(&self) { + self.inner.unpark(); + } +} + +// ===== impl ParkError ===== + +impl fmt::Display for ParkError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + self.description().fmt(fmt) + } +} + +impl Error for ParkError { + fn description(&self) -> &str { + "unknown park error" + } +} diff --git a/third_party/rust/tokio-threadpool/src/park/mod.rs b/third_party/rust/tokio-threadpool/src/park/mod.rs new file mode 100644 index 0000000000..e7c5f40d36 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/park/mod.rs @@ -0,0 +1,8 @@ +//! Thread parking utilities. + +mod boxed; +mod default_park; + +pub use self::default_park::{DefaultPark, DefaultUnpark, ParkError}; + +pub(crate) use self::boxed::{BoxPark, BoxUnpark, BoxedPark}; diff --git a/third_party/rust/tokio-threadpool/src/pool/backup.rs b/third_party/rust/tokio-threadpool/src/pool/backup.rs new file mode 100644 index 0000000000..e94e95d6f2 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/backup.rs @@ -0,0 +1,308 @@ +use park::DefaultPark; +use worker::WorkerId; + +use std::cell::UnsafeCell; +use std::fmt; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed}; +use std::time::{Duration, Instant}; + +/// State associated with a thread in the thread pool. +/// +/// The pool manages a number of threads. Some of those threads are considered +/// "primary" threads and process the work queue. When a task being run on a +/// primary thread enters a blocking context, the responsibility of processing +/// the work queue must be handed off to another thread. This is done by first +/// checking for idle threads on the backup stack. If one is found, the worker +/// token (`WorkerId`) is handed off to that running thread. If none are found, +/// a new thread is spawned. +/// +/// This state manages the exchange. A thread that is idle, not assigned to a +/// work queue, sits around for a specified amount of time. When the worker +/// token is handed off, it is first stored in `handoff`. The backup thread is +/// then signaled. At this point, the backup thread wakes up from sleep and +/// reads `handoff`. At that point, it has been promoted to a primary thread and +/// will begin processing inbound work on the work queue. +/// +/// The name `Backup` isn't really great for what the type does, but I have not +/// come up with a better name... Maybe it should just be named `Thread`. +#[derive(Debug)] +pub(crate) struct Backup { + /// Worker ID that is being handed to this thread. + handoff: UnsafeCell<Option<WorkerId>>, + + /// Thread state. + /// + /// This tracks: + /// + /// * Is queued flag + /// * If the pool is shutting down. + /// * If the thread is running + state: AtomicUsize, + + /// Next entry in the Treiber stack. + next_sleeper: UnsafeCell<BackupId>, + + /// Used to put the thread to sleep + park: DefaultPark, +} + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub(crate) struct BackupId(pub(crate) usize); + +#[derive(Debug)] +pub(crate) enum Handoff { + Worker(WorkerId), + Idle, + Terminated, +} + +/// Tracks thread state. +#[derive(Clone, Copy, Eq, PartialEq)] +struct State(usize); + +/// Set when the worker is pushed onto the scheduler's stack of sleeping +/// threads. +/// +/// This flag also serves as a "notification" bit. If another thread is +/// attempting to hand off a worker to the backup thread, then the pushed bit +/// will not be set when the thread tries to shutdown. +pub const PUSHED: usize = 0b001; + +/// Set when the thread is running +pub const RUNNING: usize = 0b010; + +/// Set when the thread pool has terminated +pub const TERMINATED: usize = 0b100; + +// ===== impl Backup ===== + +impl Backup { + pub fn new() -> Backup { + Backup { + handoff: UnsafeCell::new(None), + state: AtomicUsize::new(State::new().into()), + next_sleeper: UnsafeCell::new(BackupId(0)), + park: DefaultPark::new(), + } + } + + /// Called when the thread is starting + pub fn start(&self, worker_id: &WorkerId) { + debug_assert!({ + let state: State = self.state.load(Relaxed).into(); + + debug_assert!(!state.is_pushed()); + debug_assert!(state.is_running()); + debug_assert!(!state.is_terminated()); + + true + }); + + // The handoff value is equal to `worker_id` + debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id)); + + unsafe { + *self.handoff.get() = None; + } + } + + pub fn is_running(&self) -> bool { + let state: State = self.state.load(Relaxed).into(); + state.is_running() + } + + /// Hands off the worker to a thread. + /// + /// Returns `true` if the thread needs to be spawned. + pub fn worker_handoff(&self, worker_id: WorkerId) -> bool { + unsafe { + // The backup worker should not already have been handoff a worker. + debug_assert!((*self.handoff.get()).is_none()); + + // Set the handoff + *self.handoff.get() = Some(worker_id); + } + + // This *probably* can just be `Release`... memory orderings, how do + // they work? + let prev = State::worker_handoff(&self.state); + debug_assert!(prev.is_pushed()); + + if prev.is_running() { + // Wakeup the backup thread + self.park.notify(); + false + } else { + true + } + } + + /// Terminate the worker + pub fn signal_stop(&self) { + let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into(); + + debug_assert!(!prev.is_terminated()); + debug_assert!(prev.is_pushed()); + + if prev.is_running() { + self.park.notify(); + } + } + + /// Release the worker + pub fn release(&self) { + let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into(); + + debug_assert!(prev.is_running()); + } + + /// Wait for a worker handoff + pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff { + let sleep_until = timeout.map(|dur| Instant::now() + dur); + let mut state: State = self.state.load(Acquire).into(); + + // Run in a loop since there can be spurious wakeups + loop { + if !state.is_pushed() { + if state.is_terminated() { + return Handoff::Terminated; + } + + let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") }; + return Handoff::Worker(worker_id); + } + + match sleep_until { + None => { + self.park.park_sync(None); + state = self.state.load(Acquire).into(); + } + Some(when) => { + let now = Instant::now(); + + if now < when { + self.park.park_sync(Some(when - now)); + state = self.state.load(Acquire).into(); + } else { + debug_assert!(state.is_running()); + + // Transition out of running + let mut next = state; + next.unset_running(); + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + debug_assert!(!next.is_running()); + return Handoff::Idle; + } + + state = actual; + } + } + } + } + } + + pub fn is_pushed(&self) -> bool { + let state: State = self.state.load(Relaxed).into(); + state.is_pushed() + } + + pub fn set_pushed(&self, ordering: Ordering) { + let prev: State = self.state.fetch_or(PUSHED, ordering).into(); + debug_assert!(!prev.is_pushed()); + } + + #[inline] + pub fn next_sleeper(&self) -> BackupId { + unsafe { *self.next_sleeper.get() } + } + + #[inline] + pub fn set_next_sleeper(&self, val: BackupId) { + unsafe { + *self.next_sleeper.get() = val; + } + } +} + +// ===== impl State ===== + +impl State { + /// Returns a new, default, thread `State` + pub fn new() -> State { + State(0) + } + + /// Returns true if the thread entry is pushed in the sleeper stack + pub fn is_pushed(&self) -> bool { + self.0 & PUSHED == PUSHED + } + + fn unset_pushed(&mut self) { + self.0 &= !PUSHED; + } + + pub fn is_running(&self) -> bool { + self.0 & RUNNING == RUNNING + } + + pub fn set_running(&mut self) { + self.0 |= RUNNING; + } + + pub fn unset_running(&mut self) { + self.0 &= !RUNNING; + } + + pub fn is_terminated(&self) -> bool { + self.0 & TERMINATED == TERMINATED + } + + fn worker_handoff(state: &AtomicUsize) -> State { + let mut curr: State = state.load(Acquire).into(); + + loop { + let mut next = curr; + next.set_running(); + next.unset_pushed(); + + let actual = state + .compare_and_swap(curr.into(), next.into(), AcqRel) + .into(); + + if actual == curr { + return curr; + } + + curr = actual; + } + } +} + +impl From<usize> for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("backup::State") + .field("is_pushed", &self.is_pushed()) + .field("is_running", &self.is_running()) + .field("is_terminated", &self.is_terminated()) + .finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs new file mode 100644 index 0000000000..b9a46d08ef --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs @@ -0,0 +1,191 @@ +use pool::{Backup, BackupId}; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; + +#[derive(Debug)] +pub(crate) struct BackupStack { + state: AtomicUsize, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +struct State(usize); + +pub(crate) const MAX_BACKUP: usize = 1 << 15; + +/// Extracts the head of the backup stack from the state +const STACK_MASK: usize = ((1 << 16) - 1); + +/// Used to mark the stack as empty +pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP); + +/// Used to mark the stack as terminated +pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1); + +/// How many bits the Treiber ABA guard is offset by +const ABA_GUARD_SHIFT: usize = 16; + +#[cfg(target_pointer_width = "64")] +const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; + +#[cfg(target_pointer_width = "32")] +const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1; + +// ===== impl BackupStack ===== + +impl BackupStack { + pub fn new() -> BackupStack { + let state = AtomicUsize::new(State::new().into()); + BackupStack { state } + } + + /// Push a backup thread onto the stack + /// + /// # Return + /// + /// Returns `Ok` on success. + /// + /// Returns `Err` if the pool has transitioned to the `TERMINATED` state. + /// When terminated, pushing new entries is no longer permitted. + pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> { + let mut state: State = self.state.load(Acquire).into(); + + entries[id.0].set_pushed(AcqRel); + + loop { + let mut next = state; + + let head = state.head(); + + if head == TERMINATED { + // The pool is terminated, cannot push the sleeper. + return Err(()); + } + + entries[id.0].set_next_sleeper(head); + next.set_head(id); + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if state == actual { + return Ok(()); + } + + state = actual; + } + } + + /// Pop a backup thread off the stack. + /// + /// If `terminate` is set and the stack is empty when this function is + /// called, the state of the stack is transitioned to "terminated". At this + /// point, no further entries can be pushed onto the stack. + /// + /// # Return + /// + /// * Returns the index of the popped worker and the worker's observed + /// state. + /// + /// * `Ok(None)` if the stack is empty. + /// * `Err(_)` is returned if the pool has been shutdown. + pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()> { + // Figure out the empty value + let terminal = match terminate { + true => TERMINATED, + false => EMPTY, + }; + + let mut state: State = self.state.load(Acquire).into(); + + loop { + let head = state.head(); + + if head == EMPTY { + let mut next = state; + next.set_head(terminal); + + if next == state { + debug_assert!(terminal == EMPTY); + return Ok(None); + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual != state { + state = actual; + continue; + } + + return Ok(None); + } else if head == TERMINATED { + return Err(()); + } + + debug_assert!(head.0 < MAX_BACKUP); + + let mut next = state; + + let next_head = entries[head.0].next_sleeper(); + + // TERMINATED can never be set as the "next pointer" on a worker. + debug_assert!(next_head != TERMINATED); + + if next_head == EMPTY { + next.set_head(terminal); + } else { + next.set_head(next_head); + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + debug_assert!(entries[head.0].is_pushed()); + return Ok(Some(head)); + } + + state = actual; + } + } +} + +// ===== impl State ===== + +impl State { + fn new() -> State { + State(EMPTY.0) + } + + fn head(&self) -> BackupId { + BackupId(self.0 & STACK_MASK) + } + + fn set_head(&mut self, val: BackupId) { + let val = val.0; + + // The ABA guard protects against the ABA problem w/ Treiber stacks + let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK; + + self.0 = (aba_guard << ABA_GUARD_SHIFT) | val; + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src.0 + } +} diff --git a/third_party/rust/tokio-threadpool/src/pool/mod.rs b/third_party/rust/tokio-threadpool/src/pool/mod.rs new file mode 100644 index 0000000000..0a42359b3c --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/mod.rs @@ -0,0 +1,475 @@ +mod backup; +mod backup_stack; +mod state; + +pub(crate) use self::backup::{Backup, BackupId}; +pub(crate) use self::backup_stack::MAX_BACKUP; +pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES}; + +use self::backup::Handoff; +use self::backup_stack::BackupStack; + +use config::Config; +use shutdown::ShutdownTrigger; +use task::{Blocking, Task}; +use worker::{self, Worker, WorkerId}; + +use futures::Poll; + +use std::cell::Cell; +use std::collections::hash_map::RandomState; +use std::hash::{BuildHasher, Hash, Hasher}; +use std::num::Wrapping; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; +use std::sync::{Arc, Weak}; +use std::thread; + +use crossbeam_deque::Injector; +use crossbeam_utils::CachePadded; + +#[derive(Debug)] +pub(crate) struct Pool { + // Tracks the state of the thread pool (running, shutting down, ...). + // + // While workers check this field as a hint to detect shutdown, it is + // **not** used as a primary point of coordination for workers. The sleep + // stack is used as the primary point of coordination for workers. + // + // The value of this atomic is deserialized into a `pool::State` instance. + // See comments for that type. + pub state: CachePadded<AtomicUsize>, + + // Stack tracking sleeping workers. + sleep_stack: CachePadded<worker::Stack>, + + // Worker state + // + // A worker is a thread that is processing the work queue and polling + // futures. + // + // The number of workers will *usually* be small. + pub workers: Arc<[worker::Entry]>, + + // The global MPMC queue of tasks. + // + // Spawned tasks are pushed into this queue. Although worker threads have their own dedicated + // task queues, they periodically steal tasks from this global queue, too. + pub queue: Arc<Injector<Arc<Task>>>, + + // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped. + // + // When spawning a new `Worker`, this weak reference is upgraded and handed out to the new + // thread. + pub trigger: Weak<ShutdownTrigger>, + + // Backup thread state + // + // In order to efficiently support `blocking`, a pool of backup threads is + // needed. These backup threads are ready to take over a worker if the + // future being processed requires blocking. + backup: Box<[Backup]>, + + // Stack of sleeping backup threads + pub backup_stack: BackupStack, + + // State regarding coordinating blocking sections and tracking tasks that + // are pending blocking capacity. + blocking: Blocking, + + // Configuration + pub config: Config, +} + +impl Pool { + /// Create a new `Pool` + pub fn new( + workers: Arc<[worker::Entry]>, + trigger: Weak<ShutdownTrigger>, + max_blocking: usize, + config: Config, + queue: Arc<Injector<Arc<Task>>>, + ) -> Pool { + let pool_size = workers.len(); + let total_size = max_blocking + pool_size; + + // Create the set of backup entries + // + // This is `backup + pool_size` because the core thread pool running the + // workers is spawned from backup as well. + let backup = (0..total_size) + .map(|_| Backup::new()) + .collect::<Vec<_>>() + .into_boxed_slice(); + + let backup_stack = BackupStack::new(); + + for i in (0..backup.len()).rev() { + backup_stack.push(&backup, BackupId(i)).unwrap(); + } + + // Initialize the blocking state + let blocking = Blocking::new(max_blocking); + + let ret = Pool { + state: CachePadded::new(AtomicUsize::new(State::new().into())), + sleep_stack: CachePadded::new(worker::Stack::new()), + workers, + queue, + trigger, + backup, + backup_stack, + blocking, + config, + }; + + // Now, we prime the sleeper stack + for i in 0..pool_size { + ret.sleep_stack.push(&ret.workers, i).unwrap(); + } + + ret + } + + /// Start shutting down the pool. This means that no new futures will be + /// accepted. + pub fn shutdown(&self, now: bool, purge_queue: bool) { + let mut state: State = self.state.load(Acquire).into(); + + trace!("shutdown; state={:?}", state); + + // For now, this must be true + debug_assert!(!purge_queue || now); + + // Start by setting the shutdown flag + loop { + let mut next = state; + + let num_futures = next.num_futures(); + + if next.lifecycle() == Lifecycle::ShutdownNow { + // Already transitioned to shutting down state + + if !purge_queue || num_futures == 0 { + // Nothing more to do + return; + } + + // The queue must be purged + debug_assert!(purge_queue); + next.clear_num_futures(); + } else { + next.set_lifecycle(if now || num_futures == 0 { + // If already idle, always transition to shutdown now. + Lifecycle::ShutdownNow + } else { + Lifecycle::ShutdownOnIdle + }); + + if purge_queue { + next.clear_num_futures(); + } + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if state == actual { + state = next; + break; + } + + state = actual; + } + + trace!(" -> transitioned to shutdown"); + + // Only transition to terminate if there are no futures currently on the + // pool + if state.num_futures() != 0 { + return; + } + + self.terminate_sleeping_workers(); + } + + /// Called by `Worker` as it tries to enter a sleeping state. Before it + /// sleeps, it must push itself onto the sleep stack. This enables other + /// threads to see it when signaling work. + pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> { + self.sleep_stack.push(&self.workers, idx) + } + + pub fn terminate_sleeping_workers(&self) { + use worker::Lifecycle::Signaled; + + trace!(" -> shutting down workers"); + // Wakeup all sleeping workers. They will wake up, see the state + // transition, and terminate. + while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) { + self.workers[idx].signal_stop(worker_state); + } + + // Now terminate any backup threads + // + // The call to `pop` must be successful because shutting down the pool + // is coordinated and at this point, this is the only thread that will + // attempt to transition the backup stack to "terminated". + while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) { + self.backup[backup_id.0].signal_stop(); + } + } + + pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> { + self.blocking.poll_blocking_capacity(task) + } + + /// Submit a task to the scheduler. + /// + /// Called from either inside or outside of the scheduler. If currently on + /// the scheduler, then a fast path is taken. + pub fn submit(&self, task: Arc<Task>, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + Worker::with_current(|worker| { + if let Some(worker) = worker { + // If the worker is in blocking mode, then even though the + // thread-local variable is set, the current thread does not + // have ownership of that worker entry. This is because the + // worker entry has already been handed off to another thread. + // + // The second check handles the case where the current thread is + // part of a different threadpool than the one being submitted + // to. + if !worker.is_blocking() && *self == *worker.pool { + let idx = worker.id.0; + + trace!(" -> submit internal; idx={}", idx); + + worker.pool.workers[idx].submit_internal(task); + worker.pool.signal_work(pool); + return; + } + } + + self.submit_external(task, pool); + }); + } + + /// Submit a task to the scheduler from off worker + /// + /// Called from outside of the scheduler, this function is how new tasks + /// enter the system. + pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + trace!(" -> submit external"); + + self.queue.push(task); + self.signal_work(pool); + } + + pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> { + // First update the state, this cannot fail because the caller must have + // exclusive access to the backup token. + self.backup[backup_id.0].release(); + + // Push the backup entry back on the stack + self.backup_stack.push(&self.backup, backup_id) + } + + pub fn notify_blocking_task(&self, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + self.blocking.notify_task(&pool); + } + + /// Provision a thread to run a worker + pub fn spawn_thread(&self, id: WorkerId, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + let backup_id = match self.backup_stack.pop(&self.backup, false) { + Ok(Some(backup_id)) => backup_id, + Ok(None) => panic!("no thread available"), + Err(_) => { + debug!("failed to spawn worker thread due to the thread pool shutting down"); + return; + } + }; + + let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone()); + + if !need_spawn { + return; + } + + let trigger = match self.trigger.upgrade() { + None => { + // The pool is shutting down. + return; + } + Some(t) => t, + }; + + let mut th = thread::Builder::new(); + + if let Some(ref prefix) = pool.config.name_prefix { + th = th.name(format!("{}{}", prefix, backup_id.0)); + } + + if let Some(stack) = pool.config.stack_size { + th = th.stack_size(stack); + } + + let pool = pool.clone(); + + let res = th.spawn(move || { + if let Some(ref f) = pool.config.after_start { + f(); + } + + let mut worker_id = id; + + pool.backup[backup_id.0].start(&worker_id); + + loop { + // The backup token should be in the running state. + debug_assert!(pool.backup[backup_id.0].is_running()); + + // TODO: Avoid always cloning + let worker = Worker::new(worker_id, backup_id, pool.clone(), trigger.clone()); + + // Run the worker. If the worker transitioned to a "blocking" + // state, then `is_blocking` will be true. + if !worker.do_run() { + // The worker shutdown, so exit the thread. + break; + } + + debug_assert!(!pool.backup[backup_id.0].is_pushed()); + + // Push the thread back onto the backup stack. This makes it + // available for future handoffs. + // + // This **must** happen before notifying the task. + let res = pool.backup_stack.push(&pool.backup, backup_id); + + if res.is_err() { + // The pool is being shutdown. + break; + } + + // The task switched the current thread to blocking mode. + // Now that the blocking task completed, any tasks + pool.notify_blocking_task(&pool); + + debug_assert!(pool.backup[backup_id.0].is_running()); + + // Wait for a handoff + let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive); + + match handoff { + Handoff::Worker(id) => { + debug_assert!(pool.backup[backup_id.0].is_running()); + worker_id = id; + } + Handoff::Idle | Handoff::Terminated => { + break; + } + } + } + + if let Some(ref f) = pool.config.before_stop { + f(); + } + }); + + if let Err(e) = res { + error!("failed to spawn worker thread; err={:?}", e); + panic!("failed to spawn worker thread: {:?}", e); + } + } + + /// If there are any other workers currently relaxing, signal them that work + /// is available so that they can try to find more work to process. + pub fn signal_work(&self, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + use worker::Lifecycle::Signaled; + + if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) { + let entry = &self.workers[idx]; + + debug_assert!( + worker_state.lifecycle() != Signaled, + "actual={:?}", + worker_state.lifecycle(), + ); + + trace!("signal_work -- notify; idx={}", idx); + + if !entry.notify(worker_state) { + trace!("signal_work -- spawn; idx={}", idx); + self.spawn_thread(WorkerId(idx), pool); + } + } + } + + /// Generates a random number + /// + /// Uses a thread-local random number generator based on XorShift. + pub fn rand_usize(&self) -> usize { + thread_local! { + static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(prng_seed())); + } + + RNG.with(|rng| { + // This is the 32-bit variant of Xorshift. + // https://en.wikipedia.org/wiki/Xorshift + let mut x = rng.get(); + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + rng.set(x); + x.0 as usize + }) + } +} + +impl PartialEq for Pool { + fn eq(&self, other: &Pool) -> bool { + self as *const _ == other as *const _ + } +} + +unsafe impl Send for Pool {} +unsafe impl Sync for Pool {} + +// Return a thread-specific, 32-bit, non-zero seed value suitable for a 32-bit +// PRNG. This uses one libstd RandomState for a default hasher and hashes on +// the current thread ID to obtain an unpredictable, collision resistant seed. +fn prng_seed() -> u32 { + // This obtains a small number of random bytes from the host system (for + // example, on unix via getrandom(2)) in order to seed an unpredictable and + // HashDoS resistant 64-bit hash function (currently: `SipHasher13` with + // 128-bit state). We only need one of these, to make the seeds for all + // process threads different via hashed IDs, collision resistant, and + // unpredictable. + lazy_static! { + static ref RND_STATE: RandomState = RandomState::new(); + } + + // Hash the current thread ID to produce a u32 value + let mut hasher = RND_STATE.build_hasher(); + thread::current().id().hash(&mut hasher); + let hash: u64 = hasher.finish(); + let seed = (hash as u32) ^ ((hash >> 32) as u32); + + // Ensure non-zero seed (Xorshift yields only zero's for that seed) + if seed == 0 { + 0x9b4e_6d25 // misc bits, could be any non-zero + } else { + seed + } +} diff --git a/third_party/rust/tokio-threadpool/src/pool/state.rs b/third_party/rust/tokio-threadpool/src/pool/state.rs new file mode 100644 index 0000000000..5ecb514e5c --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/state.rs @@ -0,0 +1,132 @@ +use std::{fmt, usize}; + +/// ThreadPool state. +/// +/// The two least significant bits are the shutdown flags. (0 for active, 1 for +/// shutdown on idle, 2 for shutting down). The remaining bits represent the +/// number of futures that still need to complete. +#[derive(Eq, PartialEq, Clone, Copy)] +pub(crate) struct State(usize); + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] +#[repr(usize)] +pub(crate) enum Lifecycle { + /// The thread pool is currently running + Running = 0, + + /// The thread pool should shutdown once it reaches an idle state. + ShutdownOnIdle = 1, + + /// The thread pool should start the process of shutting down. + ShutdownNow = 2, +} + +/// Mask used to extract the number of futures from the state +const LIFECYCLE_MASK: usize = 0b11; +const NUM_FUTURES_MASK: usize = !LIFECYCLE_MASK; +const NUM_FUTURES_OFFSET: usize = 2; + +/// Max number of futures the pool can handle. +pub(crate) const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET; + +// ===== impl State ===== + +impl State { + #[inline] + pub fn new() -> State { + State(0) + } + + /// Returns the number of futures still pending completion. + pub fn num_futures(&self) -> usize { + self.0 >> NUM_FUTURES_OFFSET + } + + /// Increment the number of futures pending completion. + /// + /// Returns false on failure. + pub fn inc_num_futures(&mut self) { + debug_assert!(self.num_futures() < MAX_FUTURES); + debug_assert!(self.lifecycle() < Lifecycle::ShutdownNow); + + self.0 += 1 << NUM_FUTURES_OFFSET; + } + + /// Decrement the number of futures pending completion. + pub fn dec_num_futures(&mut self) { + let num_futures = self.num_futures(); + + if num_futures == 0 { + // Already zero + return; + } + + self.0 -= 1 << NUM_FUTURES_OFFSET; + + if self.lifecycle() == Lifecycle::ShutdownOnIdle && num_futures == 1 { + self.set_lifecycle(Lifecycle::ShutdownNow); + } + } + + /// Set the number of futures pending completion to zero + pub fn clear_num_futures(&mut self) { + self.0 = self.0 & LIFECYCLE_MASK; + } + + pub fn lifecycle(&self) -> Lifecycle { + (self.0 & LIFECYCLE_MASK).into() + } + + pub fn set_lifecycle(&mut self, val: Lifecycle) { + self.0 = (self.0 & NUM_FUTURES_MASK) | (val as usize); + } + + pub fn is_terminated(&self) -> bool { + self.lifecycle() == Lifecycle::ShutdownNow && self.num_futures() == 0 + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("pool::State") + .field("lifecycle", &self.lifecycle()) + .field("num_futures", &self.num_futures()) + .finish() + } +} + +// ===== impl Lifecycle ===== + +impl From<usize> for Lifecycle { + fn from(src: usize) -> Lifecycle { + use self::Lifecycle::*; + + debug_assert!( + src == Running as usize + || src == ShutdownOnIdle as usize + || src == ShutdownNow as usize + ); + + unsafe { ::std::mem::transmute(src) } + } +} + +impl From<Lifecycle> for usize { + fn from(src: Lifecycle) -> usize { + let v = src as usize; + debug_assert!(v & LIFECYCLE_MASK == v); + v + } +} diff --git a/third_party/rust/tokio-threadpool/src/sender.rs b/third_party/rust/tokio-threadpool/src/sender.rs new file mode 100644 index 0000000000..3ae3deca4d --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/sender.rs @@ -0,0 +1,218 @@ +use pool::{self, Lifecycle, Pool, MAX_FUTURES}; +use task::Task; + +use std::sync::atomic::Ordering::{AcqRel, Acquire}; +use std::sync::Arc; + +use futures::{future, Future}; +use tokio_executor::{self, SpawnError}; + +/// Submit futures to the associated thread pool for execution. +/// +/// A `Sender` instance is a handle to a single thread pool, allowing the owner +/// of the handle to spawn futures onto the thread pool. New futures are spawned +/// using [`Sender::spawn`]. +/// +/// The `Sender` handle is *only* used for spawning new futures. It does not +/// impact the lifecycle of the thread pool in any way. +/// +/// `Sender` instances are obtained by calling [`ThreadPool::sender`]. The +/// `Sender` struct implements the `Executor` trait. +/// +/// [`Sender::spawn`]: #method.spawn +/// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender +#[derive(Debug)] +pub struct Sender { + pub(crate) pool: Arc<Pool>, +} + +impl Sender { + /// Spawn a future onto the thread pool + /// + /// This function takes ownership of the future and spawns it onto the + /// thread pool, assigning it to a worker thread. The exact strategy used to + /// assign a future to a worker depends on if the caller is already on a + /// worker thread or external to the thread pool. + /// + /// If the caller is currently on the thread pool, the spawned future will + /// be assigned to the same worker that the caller is on. If the caller is + /// external to the thread pool, the future will be assigned to a random + /// worker. + /// + /// If `spawn` returns `Ok`, this does not mean that the future will be + /// executed. The thread pool can be forcibly shutdown between the time + /// `spawn` is called and the future has a chance to execute. + /// + /// If `spawn` returns `Err`, then the future failed to be spawned. There + /// are two possible causes: + /// + /// * The thread pool is at capacity and is unable to spawn a new future. + /// This is a temporary failure. At some point in the future, the thread + /// pool might be able to spawn new futures. + /// * The thread pool is shutdown. This is a permanent failure indicating + /// that the handle will never be able to spawn new futures. + /// + /// The status of the thread pool can be queried before calling `spawn` + /// using the `status` function (part of the `Executor` trait). + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::ThreadPool; + /// use futures::future::{Future, lazy}; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = ThreadPool::new(); + /// + /// thread_pool.sender().spawn(lazy(|| { + /// println!("called from a worker thread"); + /// Ok(()) + /// })).unwrap(); + /// + /// // Gracefully shutdown the threadpool + /// thread_pool.shutdown().wait().unwrap(); + /// # } + /// ``` + pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> + where + F: Future<Item = (), Error = ()> + Send + 'static, + { + let mut s = self; + tokio_executor::Executor::spawn(&mut s, Box::new(future)) + } + + /// Logic to prepare for spawning + fn prepare_for_spawn(&self) -> Result<(), SpawnError> { + let mut state: pool::State = self.pool.state.load(Acquire).into(); + + // Increment the number of futures spawned on the pool as well as + // validate that the pool is still running/ + loop { + let mut next = state; + + if next.num_futures() == MAX_FUTURES { + // No capacity + return Err(SpawnError::at_capacity()); + } + + if next.lifecycle() == Lifecycle::ShutdownNow { + // Cannot execute the future, executor is shutdown. + return Err(SpawnError::shutdown()); + } + + next.inc_num_futures(); + + let actual = self + .pool + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + trace!("execute; count={:?}", next.num_futures()); + break; + } + + state = actual; + } + + Ok(()) + } +} + +impl tokio_executor::Executor for Sender { + fn status(&self) -> Result<(), tokio_executor::SpawnError> { + let s = self; + tokio_executor::Executor::status(&s) + } + + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + let mut s = &*self; + tokio_executor::Executor::spawn(&mut s, future) + } +} + +impl<'a> tokio_executor::Executor for &'a Sender { + fn status(&self) -> Result<(), tokio_executor::SpawnError> { + let state: pool::State = self.pool.state.load(Acquire).into(); + + if state.num_futures() == MAX_FUTURES { + // No capacity + return Err(SpawnError::at_capacity()); + } + + if state.lifecycle() == Lifecycle::ShutdownNow { + // Cannot execute the future, executor is shutdown. + return Err(SpawnError::shutdown()); + } + + Ok(()) + } + + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + self.prepare_for_spawn()?; + + // At this point, the pool has accepted the future, so schedule it for + // execution. + + // Create a new task for the future + let task = Arc::new(Task::new(future)); + + // Call `submit_external()` in order to place the task into the global + // queue. This way all workers have equal chance of running this task, + // which means IO handles will be assigned to reactors more evenly. + self.pool.submit_external(task, &self.pool); + + Ok(()) + } +} + +impl<T> tokio_executor::TypedExecutor<T> for Sender +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn status(&self) -> Result<(), tokio_executor::SpawnError> { + tokio_executor::Executor::status(self) + } + + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + tokio_executor::Executor::spawn(self, Box::new(future)) + } +} + +impl<T> future::Executor<T> for Sender +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = tokio_executor::Executor::status(self) { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = self.spawn(future); + Ok(()) + } +} + +impl Clone for Sender { + #[inline] + fn clone(&self) -> Sender { + let pool = self.pool.clone(); + Sender { pool } + } +} diff --git a/third_party/rust/tokio-threadpool/src/shutdown.rs b/third_party/rust/tokio-threadpool/src/shutdown.rs new file mode 100644 index 0000000000..c3d04a002d --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/shutdown.rs @@ -0,0 +1,103 @@ +use task::Task; +use worker; + +use crossbeam_deque::Injector; +use futures::task::AtomicTask; +use futures::{Async, Future, Poll}; + +use std::sync::{Arc, Mutex}; + +/// Future that resolves when the thread pool is shutdown. +/// +/// A `ThreadPool` is shutdown once all the worker have drained their queues and +/// shutdown their threads. +/// +/// `Shutdown` is returned by [`shutdown`], [`shutdown_on_idle`], and +/// [`shutdown_now`]. +/// +/// [`shutdown`]: struct.ThreadPool.html#method.shutdown +/// [`shutdown_on_idle`]: struct.ThreadPool.html#method.shutdown_on_idle +/// [`shutdown_now`]: struct.ThreadPool.html#method.shutdown_now +#[derive(Debug)] +pub struct Shutdown { + inner: Arc<Mutex<Inner>>, +} + +/// Shared state between `Shutdown` and `ShutdownTrigger`. +/// +/// This is used for notifying the `Shutdown` future when `ShutdownTrigger` gets dropped. +#[derive(Debug)] +struct Inner { + /// The task to notify when the threadpool completes the shutdown process. + task: AtomicTask, + /// `true` if the threadpool has been shut down. + completed: bool, +} + +impl Shutdown { + pub(crate) fn new(trigger: &ShutdownTrigger) -> Shutdown { + Shutdown { + inner: trigger.inner.clone(), + } + } +} + +impl Future for Shutdown { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let inner = self.inner.lock().unwrap(); + + if !inner.completed { + inner.task.register(); + Ok(Async::NotReady) + } else { + Ok(().into()) + } + } +} + +/// When dropped, cleans up threadpool's resources and completes the shutdown process. +#[derive(Debug)] +pub(crate) struct ShutdownTrigger { + inner: Arc<Mutex<Inner>>, + workers: Arc<[worker::Entry]>, + queue: Arc<Injector<Arc<Task>>>, +} + +unsafe impl Send for ShutdownTrigger {} +unsafe impl Sync for ShutdownTrigger {} + +impl ShutdownTrigger { + pub(crate) fn new( + workers: Arc<[worker::Entry]>, + queue: Arc<Injector<Arc<Task>>>, + ) -> ShutdownTrigger { + ShutdownTrigger { + inner: Arc::new(Mutex::new(Inner { + task: AtomicTask::new(), + completed: false, + })), + workers, + queue, + } + } +} + +impl Drop for ShutdownTrigger { + fn drop(&mut self) { + // Drain the global task queue. + while !self.queue.steal().is_empty() {} + + // Drop the remaining incomplete tasks and parkers assosicated with workers. + for worker in self.workers.iter() { + worker.shutdown(); + } + + // Notify the task interested in shutdown. + let mut inner = self.inner.lock().unwrap(); + inner.completed = true; + inner.task.notify(); + } +} diff --git a/third_party/rust/tokio-threadpool/src/task/blocking.rs b/third_party/rust/tokio-threadpool/src/task/blocking.rs new file mode 100644 index 0000000000..ded59edfef --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/task/blocking.rs @@ -0,0 +1,496 @@ +use pool::Pool; +use task::{BlockingState, Task}; + +use futures::{Async, Poll}; + +use std::cell::UnsafeCell; +use std::fmt; +use std::ptr; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use std::sync::Arc; +use std::thread; + +/// Manages the state around entering a blocking section and tasks that are +/// queued pending the ability to block. +/// +/// This is a hybrid counter and intrusive mpsc channel (like `Queue`). +#[derive(Debug)] +pub(crate) struct Blocking { + /// Queue head. + /// + /// This is either the current remaining capacity for blocking sections + /// **or** if the max has been reached, the head of a pending blocking + /// capacity channel of tasks. + /// + /// When this points to a task, it represents a strong reference, i.e. + /// `Arc<Task>`. + state: AtomicUsize, + + /// Tail pointer. This is `Arc<Task>` unless it points to `stub`. + tail: UnsafeCell<*mut Task>, + + /// Stub pointer, used as part of the intrusive mpsc channel algorithm + /// described by 1024cores. + stub: Box<Task>, + + /// The channel algorithm is MPSC. This means that, in order to pop tasks, + /// coordination is required. + /// + /// Since it doesn't matter *which* task pops & notifies the queued task, we + /// can avoid a full mutex and make the "lock" lock free. + /// + /// Instead, threads race to set the "entered" bit. When the transition is + /// successfully made, the thread has permission to pop tasks off of the + /// queue. If a thread loses the race, instead of waiting to pop a task, it + /// signals to the winning thread that it should pop an additional task. + lock: AtomicUsize, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum CanBlock { + /// Blocking capacity has been allocated to this task. + /// + /// The capacity allocation is initially checked before a task is polled. If + /// capacity has been allocated, it is consumed and tracked as `Allocated`. + Allocated, + + /// Allocation capacity must be either available to the task when it is + /// polled or not available. This means that a task can only ask for + /// capacity once. This state is used to track a task that has not yet asked + /// for blocking capacity. When a task needs blocking capacity, if it is in + /// this state, it can immediately try to get an allocation. + CanRequest, + + /// The task has requested blocking capacity, but none is available. + NoCapacity, +} + +/// Decorates the `usize` value of `Blocking::state`, providing fns to +/// manipulate the state instead of requiring bit ops. +#[derive(Copy, Clone, Eq, PartialEq)] +struct State(usize); + +/// Flag differentiating between remaining capacity and task pointers. +/// +/// If we assume pointers are properly aligned, then the least significant bit +/// will always be zero. So, we use that bit to track if the value represents a +/// number. +const NUM_FLAG: usize = 1; + +/// When representing "numbers", the state has to be shifted this much (to get +/// rid of the flag bit). +const NUM_SHIFT: usize = 1; + +// ====== impl Blocking ===== +// +impl Blocking { + /// Create a new `Blocking`. + pub fn new(capacity: usize) -> Blocking { + assert!(capacity > 0, "blocking capacity must be greater than zero"); + + let stub = Box::new(Task::stub()); + let ptr = &*stub as *const _ as *mut _; + + // Allocations are aligned + debug_assert!(ptr as usize & NUM_FLAG == 0); + + // The initial state value. This starts at the max capacity. + let init = State::new(capacity); + + Blocking { + state: AtomicUsize::new(init.into()), + tail: UnsafeCell::new(ptr), + stub: stub, + lock: AtomicUsize::new(0), + } + } + + /// Atomically either acquire blocking capacity or queue the task to be + /// notified once capacity becomes available. + /// + /// The caller must ensure that `task` has not previously been queued to be + /// notified when capacity becomes available. + pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> { + // This requires atomically claiming blocking capacity and if none is + // available, queuing &task. + + // The task cannot be queued at this point. The caller must ensure this. + debug_assert!(!BlockingState::from(task.blocking.load(Acquire)).is_queued()); + + // Don't bump the ref count unless necessary. + let mut strong: Option<*const Task> = None; + + // Load the state + let mut curr: State = self.state.load(Acquire).into(); + + loop { + let mut next = curr; + + if !next.claim_capacity(&self.stub) { + debug_assert!(curr.ptr().is_some()); + + // Unable to claim capacity, so we must queue `task` onto the + // channel. + // + // This guard also serves to ensure that queuing work that is + // only needed to run once only gets run once. + if strong.is_none() { + // First, transition the task to a "queued" state. This + // prevents double queuing. + // + // This is also the only thread that can set the queued flag + // at this point. And, the goal is for this to only be + // visible when the task node is polled from the channel. + // The memory ordering is established by MPSC queue + // operation. + // + // Note that, if the task doesn't get queued (because the + // CAS fails and capacity is now available) then this flag + // must be unset. Again, there is no race because until the + // task is queued, no other thread can see it. + let prev = BlockingState::toggle_queued(&task.blocking, Relaxed); + debug_assert!(!prev.is_queued()); + + // Bump the ref count + strong = Some(Arc::into_raw(task.clone())); + + // Set the next pointer. This does not require an atomic + // operation as this node is not currently accessible to + // other threads via the queue. + task.next_blocking.store(ptr::null_mut(), Relaxed); + } + + let ptr = strong.unwrap(); + + // Update the head to point to the new node. We need to see the + // previous node in order to update the next pointer as well as + // release `task` to any other threads calling `push`. + next.set_ptr(ptr); + } + + debug_assert_ne!(curr.0, 0); + debug_assert_ne!(next.0, 0); + + let actual = self + .state + .compare_and_swap(curr.into(), next.into(), AcqRel) + .into(); + + if curr == actual { + break; + } + + curr = actual; + } + + match curr.ptr() { + Some(prev) => { + let ptr = strong.unwrap(); + + // Finish pushing + unsafe { + (*prev).next_blocking.store(ptr as *mut _, Release); + } + + // The node was queued to be notified once capacity is made + // available. + Ok(Async::NotReady) + } + None => { + debug_assert!(curr.remaining_capacity() > 0); + + // If `strong` is set, gotta undo a bunch of work + if let Some(ptr) = strong { + let _ = unsafe { Arc::from_raw(ptr) }; + + // Unset the queued flag. + let prev = BlockingState::toggle_queued(&task.blocking, Relaxed); + debug_assert!(prev.is_queued()); + } + + // Capacity has been obtained + Ok(().into()) + } + } + } + + unsafe fn push_stub(&self) { + let task: *mut Task = &*self.stub as *const _ as *mut _; + + // Set the next pointer. This does not require an atomic operation as + // this node is not accessible. The write will be flushed with the next + // operation + (*task).next_blocking.store(ptr::null_mut(), Relaxed); + + // Update the head to point to the new node. We need to see the previous + // node in order to update the next pointer as well as release `task` + // to any other threads calling `push`. + let prev = self.state.swap(task as usize, AcqRel); + + // The stub is only pushed when there are pending tasks. Because of + // this, the state must *always* be in pointer mode. + debug_assert!(State::from(prev).is_ptr()); + + let prev = prev as *const Task; + + // We don't want the *existing* pointer to be a stub. + debug_assert_ne!(prev, task); + + // Release `task` to the consume end. + (*prev).next_blocking.store(task, Release); + } + + pub fn notify_task(&self, pool: &Arc<Pool>) { + let prev = self.lock.fetch_add(1, AcqRel); + + if prev != 0 { + // Another thread has the lock and will be responsible for notifying + // pending tasks. + return; + } + + let mut dec = 1; + + loop { + let mut remaining_pops = dec; + while remaining_pops > 0 { + remaining_pops -= 1; + + let task = match self.pop(remaining_pops) { + Some(t) => t, + None => break, + }; + + Task::notify_blocking(task, pool); + } + + // Decrement the number of handled notifications + let actual = self.lock.fetch_sub(dec, AcqRel); + + if actual == dec { + break; + } + + // This can only be greater than expected as we are the only thread + // that is decrementing. + debug_assert!(actual > dec); + dec = actual - dec; + } + } + + /// Pop a task + /// + /// `rem` represents the remaining number of times the caller will pop. If + /// there are no more tasks to pop, `rem` is used to set the remaining + /// capacity. + fn pop(&self, rem: usize) -> Option<Arc<Task>> { + 'outer: loop { + unsafe { + let mut tail = *self.tail.get(); + let mut next = (*tail).next_blocking.load(Acquire); + + let stub = &*self.stub as *const _ as *mut _; + + if tail == stub { + if next.is_null() { + // This loop is not part of the standard intrusive mpsc + // channel algorithm. This is where we atomically pop + // the last task and add `rem` to the remaining capacity. + // + // This modification to the pop algorithm works because, + // at this point, we have not done any work (only done + // reading). We have a *pretty* good idea that there is + // no concurrent pusher. + // + // The capacity is then atomically added by doing an + // AcqRel CAS on `state`. The `state` cell is the + // linchpin of the algorithm. + // + // By successfully CASing `head` w/ AcqRel, we ensure + // that, if any thread was racing and entered a push, we + // see that and abort pop, retrying as it is + // "inconsistent". + let mut curr: State = self.state.load(Acquire).into(); + + loop { + if curr.has_task(&self.stub) { + // Inconsistent state, yield the thread and try + // again. + thread::yield_now(); + continue 'outer; + } + + let mut after = curr; + + // +1 here because `rem` represents the number of + // pops that will come after the current one. + after.add_capacity(rem + 1, &self.stub); + + let actual: State = self + .state + .compare_and_swap(curr.into(), after.into(), AcqRel) + .into(); + + if actual == curr { + // Successfully returned the remaining capacity + return None; + } + + curr = actual; + } + } + + *self.tail.get() = next; + tail = next; + next = (*next).next_blocking.load(Acquire); + } + + if !next.is_null() { + *self.tail.get() = next; + + // No ref_count inc is necessary here as this poll is paired + // with a `push` which "forgets" the handle. + return Some(Arc::from_raw(tail)); + } + + let state = self.state.load(Acquire); + + // This must always be a pointer + debug_assert!(State::from(state).is_ptr()); + + if state != tail as usize { + // Try again + thread::yield_now(); + continue 'outer; + } + + self.push_stub(); + + next = (*tail).next_blocking.load(Acquire); + + if !next.is_null() { + *self.tail.get() = next; + + return Some(Arc::from_raw(tail)); + } + + thread::yield_now(); + // Try again + } + } + } +} + +// ====== impl State ===== + +impl State { + /// Return a new `State` representing the remaining capacity at the maximum + /// value. + fn new(capacity: usize) -> State { + State((capacity << NUM_SHIFT) | NUM_FLAG) + } + + fn remaining_capacity(&self) -> usize { + if !self.has_remaining_capacity() { + return 0; + } + + self.0 >> 1 + } + + fn has_remaining_capacity(&self) -> bool { + self.0 & NUM_FLAG == NUM_FLAG + } + + fn has_task(&self, stub: &Task) -> bool { + !(self.has_remaining_capacity() || self.is_stub(stub)) + } + + fn is_stub(&self, stub: &Task) -> bool { + self.0 == stub as *const _ as usize + } + + /// Try to claim blocking capacity. + /// + /// # Return + /// + /// Returns `true` if the capacity was claimed, `false` otherwise. If + /// `false` is returned, it can be assumed that `State` represents the head + /// pointer in the mpsc channel. + fn claim_capacity(&mut self, stub: &Task) -> bool { + if !self.has_remaining_capacity() { + return false; + } + + debug_assert!(self.0 != 1); + + self.0 -= 1 << NUM_SHIFT; + + if self.0 == NUM_FLAG { + // Set the state to the stub pointer. + self.0 = stub as *const _ as usize; + } + + true + } + + /// Add blocking capacity. + fn add_capacity(&mut self, capacity: usize, stub: &Task) -> bool { + debug_assert!(capacity > 0); + + if self.is_stub(stub) { + self.0 = (capacity << NUM_SHIFT) | NUM_FLAG; + true + } else if self.has_remaining_capacity() { + self.0 += capacity << NUM_SHIFT; + true + } else { + false + } + } + + fn is_ptr(&self) -> bool { + self.0 & NUM_FLAG == 0 + } + + fn ptr(&self) -> Option<*const Task> { + if self.is_ptr() { + Some(self.0 as *const Task) + } else { + None + } + } + + fn set_ptr(&mut self, ptr: *const Task) { + let ptr = ptr as usize; + debug_assert!(ptr & NUM_FLAG == 0); + self.0 = ptr + } +} + +impl From<usize> for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let mut fmt = fmt.debug_struct("State"); + + if self.is_ptr() { + fmt.field("ptr", &self.0); + } else { + fmt.field("remaining", &self.remaining_capacity()); + } + + fmt.finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/task/blocking_state.rs b/third_party/rust/tokio-threadpool/src/task/blocking_state.rs new file mode 100644 index 0000000000..b41fc4868d --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/task/blocking_state.rs @@ -0,0 +1,89 @@ +use task::CanBlock; + +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// State tracking task level state to support `blocking`. +/// +/// This tracks two separate flags. +/// +/// a) If the task is queued in the pending blocking channel. This prevents +/// double queuing (which would break the linked list). +/// +/// b) If the task has been allocated capacity to block. +#[derive(Eq, PartialEq)] +pub(crate) struct BlockingState(usize); + +const QUEUED: usize = 0b01; +const ALLOCATED: usize = 0b10; + +impl BlockingState { + /// Create a new, default, `BlockingState`. + pub fn new() -> BlockingState { + BlockingState(0) + } + + /// Returns `true` if the state represents the associated task being queued + /// in the pending blocking capacity channel + pub fn is_queued(&self) -> bool { + self.0 & QUEUED == QUEUED + } + + /// Toggle the queued flag + /// + /// Returns the state before the flag has been toggled. + pub fn toggle_queued(state: &AtomicUsize, ordering: Ordering) -> BlockingState { + state.fetch_xor(QUEUED, ordering).into() + } + + /// Returns `true` if the state represents the associated task having been + /// allocated capacity to block. + pub fn is_allocated(&self) -> bool { + self.0 & ALLOCATED == ALLOCATED + } + + /// Atomically consume the capacity allocation and return if the allocation + /// was present. + /// + /// If this returns `true`, then the task has the ability to block for the + /// duration of the `poll`. + pub fn consume_allocation(state: &AtomicUsize, ordering: Ordering) -> CanBlock { + let state: Self = state.fetch_and(!ALLOCATED, ordering).into(); + + if state.is_allocated() { + CanBlock::Allocated + } else if state.is_queued() { + CanBlock::NoCapacity + } else { + CanBlock::CanRequest + } + } + + pub fn notify_blocking(state: &AtomicUsize, ordering: Ordering) { + let prev: Self = state.fetch_xor(ALLOCATED | QUEUED, ordering).into(); + + debug_assert!(prev.is_queued()); + debug_assert!(!prev.is_allocated()); + } +} + +impl From<usize> for BlockingState { + fn from(src: usize) -> BlockingState { + BlockingState(src) + } +} + +impl From<BlockingState> for usize { + fn from(src: BlockingState) -> usize { + src.0 + } +} + +impl fmt::Debug for BlockingState { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BlockingState") + .field("is_queued", &self.is_queued()) + .field("is_allocated", &self.is_allocated()) + .finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/task/mod.rs b/third_party/rust/tokio-threadpool/src/task/mod.rs new file mode 100644 index 0000000000..9c5a448fcd --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/task/mod.rs @@ -0,0 +1,308 @@ +mod blocking; +mod blocking_state; +mod state; + +pub(crate) use self::blocking::{Blocking, CanBlock}; +use self::blocking_state::BlockingState; +use self::state::State; + +use notifier::Notifier; +use pool::Pool; + +use futures::executor::{self, Spawn}; +use futures::{self, Async, Future}; + +use std::cell::{Cell, UnsafeCell}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use std::sync::atomic::{AtomicPtr, AtomicUsize}; +use std::sync::Arc; +use std::{fmt, panic, ptr}; + +/// Harness around a future. +/// +/// This also behaves as a node in the inbound work queue and the blocking +/// queue. +pub(crate) struct Task { + /// Task lifecycle state + state: AtomicUsize, + + /// Task blocking related state + blocking: AtomicUsize, + + /// Next pointer in the queue of tasks pending blocking capacity. + next_blocking: AtomicPtr<Task>, + + /// ID of the worker that polled this task first. + /// + /// This field can be a `Cell` because it's only accessed by the worker thread that is + /// executing the task. + /// + /// The worker ID is represented by a `u32` rather than `usize` in order to save some space + /// on 64-bit platforms. + pub reg_worker: Cell<Option<u32>>, + + /// The key associated with this task in the `Slab` it was registered in. + /// + /// This field can be a `Cell` because it's only accessed by the worker thread that has + /// registered the task. + pub reg_index: Cell<usize>, + + /// Store the future at the head of the struct + /// + /// The future is dropped immediately when it transitions to Complete + future: UnsafeCell<Option<Spawn<BoxFuture>>>, +} + +#[derive(Debug)] +pub(crate) enum Run { + Idle, + Schedule, + Complete, +} + +type BoxFuture = Box<dyn Future<Item = (), Error = ()> + Send + 'static>; + +// ===== impl Task ===== + +impl Task { + /// Create a new `Task` as a harness for `future`. + pub fn new(future: BoxFuture) -> Task { + // Wrap the future with an execution context. + let task_fut = executor::spawn(future); + + Task { + state: AtomicUsize::new(State::new().into()), + blocking: AtomicUsize::new(BlockingState::new().into()), + next_blocking: AtomicPtr::new(ptr::null_mut()), + reg_worker: Cell::new(None), + reg_index: Cell::new(0), + future: UnsafeCell::new(Some(task_fut)), + } + } + + /// Create a fake `Task` to be used as part of the intrusive mpsc channel + /// algorithm. + fn stub() -> Task { + let future = Box::new(futures::empty()) as BoxFuture; + let task_fut = executor::spawn(future); + + Task { + state: AtomicUsize::new(State::stub().into()), + blocking: AtomicUsize::new(BlockingState::new().into()), + next_blocking: AtomicPtr::new(ptr::null_mut()), + reg_worker: Cell::new(None), + reg_index: Cell::new(0), + future: UnsafeCell::new(Some(task_fut)), + } + } + + /// Execute the task returning `Run::Schedule` if the task needs to be + /// scheduled again. + pub fn run(&self, unpark: &Arc<Notifier>) -> Run { + use self::State::*; + + // Transition task to running state. At this point, the task must be + // scheduled. + let actual: State = self + .state + .compare_and_swap(Scheduled.into(), Running.into(), AcqRel) + .into(); + + match actual { + Scheduled => {} + _ => panic!("unexpected task state; {:?}", actual), + } + + trace!( + "Task::run; state={:?}", + State::from(self.state.load(Relaxed)) + ); + + // The transition to `Running` done above ensures that a lock on the + // future has been obtained. + let fut = unsafe { &mut (*self.future.get()) }; + + // This block deals with the future panicking while being polled. + // + // If the future panics, then the drop handler must be called such that + // `thread::panicking() -> true`. To do this, the future is dropped from + // within the catch_unwind block. + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + struct Guard<'a>(&'a mut Option<Spawn<BoxFuture>>, bool); + + impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + // This drops the future + if self.1 { + let _ = self.0.take(); + } + } + } + + let mut g = Guard(fut, true); + + let ret = + g.0.as_mut() + .unwrap() + .poll_future_notify(unpark, self as *const _ as usize); + + g.1 = false; + + ret + })); + + match res { + Ok(Ok(Async::Ready(_))) | Ok(Err(_)) | Err(_) => { + trace!(" -> task complete"); + + // The future has completed. Drop it immediately to free + // resources and run drop handlers. + // + // The `Task` harness will stay around longer if it is contained + // by any of the various queues. + self.drop_future(); + + // Transition to the completed state + self.state.store(State::Complete.into(), Release); + + if let Err(panic_err) = res { + if let Some(ref f) = unpark.pool.config.panic_handler { + f(panic_err); + } + } + + Run::Complete + } + Ok(Ok(Async::NotReady)) => { + trace!(" -> not ready"); + + // Attempt to transition from Running -> Idle, if successful, + // then the task does not need to be scheduled again. If the CAS + // fails, then the task has been unparked concurrent to running, + // in which case it transitions immediately back to scheduled + // and we return `true`. + let prev: State = self + .state + .compare_and_swap(Running.into(), Idle.into(), AcqRel) + .into(); + + match prev { + Running => Run::Idle, + Notified => { + self.state.store(Scheduled.into(), Release); + Run::Schedule + } + _ => unreachable!(), + } + } + } + } + + /// Aborts this task. + /// + /// This is called when the threadpool shuts down and the task has already beed polled but not + /// completed. + pub fn abort(&self) { + use self::State::*; + + let mut state = self.state.load(Acquire).into(); + + loop { + match state { + Idle | Scheduled => {} + Running | Notified | Complete | Aborted => { + // It is assumed that no worker threads are running so the task must be either + // in the idle or scheduled state. + panic!("unexpected state while aborting task: {:?}", state); + } + } + + let actual = self + .state + .compare_and_swap(state.into(), Aborted.into(), AcqRel) + .into(); + + if actual == state { + // The future has been aborted. Drop it immediately to free resources and run drop + // handlers. + self.drop_future(); + break; + } + + state = actual; + } + } + + /// Notify the task + pub fn notify(me: Arc<Task>, pool: &Arc<Pool>) { + if me.schedule() { + let _ = pool.submit(me, pool); + } + } + + /// Notify the task it has been allocated blocking capacity + pub fn notify_blocking(me: Arc<Task>, pool: &Arc<Pool>) { + BlockingState::notify_blocking(&me.blocking, AcqRel); + Task::notify(me, pool); + } + + /// Transition the task state to scheduled. + /// + /// Returns `true` if the caller is permitted to schedule the task. + pub fn schedule(&self) -> bool { + use self::State::*; + + loop { + // Scheduling can only be done from the `Idle` state. + let actual = self + .state + .compare_and_swap(Idle.into(), Scheduled.into(), AcqRel) + .into(); + + match actual { + Idle => return true, + Running => { + // The task is already running on another thread. Transition + // the state to `Notified`. If this CAS fails, then restart + // the logic again from `Idle`. + let actual = self + .state + .compare_and_swap(Running.into(), Notified.into(), AcqRel) + .into(); + + match actual { + Idle => continue, + _ => return false, + } + } + Complete | Aborted | Notified | Scheduled => return false, + } + } + } + + /// Consumes any allocated capacity to block. + /// + /// Returns `true` if capacity was allocated, `false` otherwise. + pub fn consume_blocking_allocation(&self) -> CanBlock { + // This flag is the primary point of coordination. The queued flag + // happens "around" setting the blocking capacity. + BlockingState::consume_allocation(&self.blocking, AcqRel) + } + + /// Drop the future + /// + /// This must only be called by the thread that successfully transitioned + /// the future state to `Running`. + fn drop_future(&self) { + let _ = unsafe { (*self.future.get()).take() }; + } +} + +impl fmt::Debug for Task { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Task") + .field("state", &self.state) + .field("future", &"Spawn<BoxFuture>") + .finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/task/state.rs b/third_party/rust/tokio-threadpool/src/task/state.rs new file mode 100644 index 0000000000..3e00f89bc5 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/task/state.rs @@ -0,0 +1,57 @@ +#[repr(usize)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum State { + /// Task is currently idle + Idle = 0, + + /// Task is currently running + Running = 1, + + /// Task is currently running, but has been notified that it must run again. + Notified = 2, + + /// Task has been scheduled + Scheduled = 3, + + /// Task is complete + Complete = 4, + + /// Task was aborted because the thread pool has been shut down + Aborted = 5, +} + +// ===== impl State ===== + +impl State { + /// Returns the initial task state. + /// + /// Tasks start in the scheduled state as they are immediately scheduled on + /// creation. + pub fn new() -> State { + State::Scheduled + } + + pub fn stub() -> State { + State::Idle + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + use self::State::*; + + debug_assert!( + src >= Idle as usize && src <= Aborted as usize, + "actual={}", + src + ); + + unsafe { ::std::mem::transmute(src) } + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src as usize + } +} diff --git a/third_party/rust/tokio-threadpool/src/thread_pool.rs b/third_party/rust/tokio-threadpool/src/thread_pool.rs new file mode 100644 index 0000000000..30f58e96ad --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/thread_pool.rs @@ -0,0 +1,217 @@ +use builder::Builder; +use pool::Pool; +use sender::Sender; +use shutdown::{Shutdown, ShutdownTrigger}; + +use futures::sync::oneshot; +use futures::{Future, Poll}; + +use std::sync::Arc; + +/// Work-stealing based thread pool for executing futures. +/// +/// If a `ThreadPool` instance is dropped without explicitly being shutdown, +/// `shutdown_now` is called implicitly, forcing all tasks that have not yet +/// completed to be dropped. +/// +/// Create `ThreadPool` instances using `Builder`. +#[derive(Debug)] +pub struct ThreadPool { + inner: Option<Inner>, +} + +#[derive(Debug)] +struct Inner { + sender: Sender, + trigger: Arc<ShutdownTrigger>, +} + +impl ThreadPool { + /// Create a new `ThreadPool` with default values. + /// + /// Use [`Builder`] for creating a configured thread pool. + /// + /// [`Builder`]: struct.Builder.html + pub fn new() -> ThreadPool { + Builder::new().build() + } + + pub(crate) fn new2(pool: Arc<Pool>, trigger: Arc<ShutdownTrigger>) -> ThreadPool { + ThreadPool { + inner: Some(Inner { + sender: Sender { pool }, + trigger, + }), + } + } + + /// Spawn a future onto the thread pool. + /// + /// This function takes ownership of the future and randomly assigns it to a + /// worker thread. The thread will then start executing the future. + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::ThreadPool; + /// use futures::future::{Future, lazy}; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = ThreadPool::new(); + /// + /// thread_pool.spawn(lazy(|| { + /// println!("called from a worker thread"); + /// Ok(()) + /// })); + /// + /// // Gracefully shutdown the threadpool + /// thread_pool.shutdown().wait().unwrap(); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Use [`Sender::spawn`] for a + /// version that returns a `Result` instead of panicking. + pub fn spawn<F>(&self, future: F) + where + F: Future<Item = (), Error = ()> + Send + 'static, + { + self.sender().spawn(future).unwrap(); + } + + /// Spawn a future on to the thread pool, return a future representing + /// the produced value. + /// + /// The SpawnHandle returned is a future that is a proxy for future itself. + /// When future completes on this thread pool then the SpawnHandle will itself + /// be resolved. + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::ThreadPool; + /// use futures::future::{Future, lazy}; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = ThreadPool::new(); + /// + /// let handle = thread_pool.spawn_handle(lazy(|| Ok::<_, ()>(42))); + /// + /// let value = handle.wait().unwrap(); + /// assert_eq!(value, 42); + /// + /// // Gracefully shutdown the threadpool + /// thread_pool.shutdown().wait().unwrap(); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. + pub fn spawn_handle<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error> + where + F: Future + Send + 'static, + F::Item: Send + 'static, + F::Error: Send + 'static, + { + SpawnHandle(oneshot::spawn(future, self.sender())) + } + + /// Return a reference to the sender handle + /// + /// The handle is used to spawn futures onto the thread pool. It also + /// implements the `Executor` trait. + pub fn sender(&self) -> &Sender { + &self.inner.as_ref().unwrap().sender + } + + /// Return a mutable reference to the sender handle + pub fn sender_mut(&mut self) -> &mut Sender { + &mut self.inner.as_mut().unwrap().sender + } + + /// Shutdown the pool once it becomes idle. + /// + /// Idle is defined as the completion of all futures that have been spawned + /// onto the thread pool. There may still be outstanding handles when the + /// thread pool reaches an idle state. + /// + /// Once the idle state is reached, calling `spawn` on any outstanding + /// handle will result in an error. All worker threads are signaled and will + /// shutdown. The returned future completes once all worker threads have + /// completed the shutdown process. + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.sender.pool.shutdown(false, false); + Shutdown::new(&inner.trigger) + } + + /// Shutdown the pool + /// + /// This prevents the thread pool from accepting new tasks but will allow + /// any existing tasks to complete. + /// + /// Calling `spawn` on any outstanding handle will result in an error. All + /// worker threads are signaled and will shutdown. The returned future + /// completes once all worker threads have completed the shutdown process. + pub fn shutdown(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.sender.pool.shutdown(true, false); + Shutdown::new(&inner.trigger) + } + + /// Shutdown the pool immediately + /// + /// This will prevent the thread pool from accepting new tasks **and** + /// abort any tasks that are currently running on the thread pool. + /// + /// Calling `spawn` on any outstanding handle will result in an error. All + /// worker threads are signaled and will shutdown. The returned future + /// completes once all worker threads have completed the shutdown process. + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.sender.pool.shutdown(true, true); + Shutdown::new(&inner.trigger) + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + // Begin the shutdown process. + inner.sender.pool.shutdown(true, true); + let shutdown = Shutdown::new(&inner.trigger); + + // Drop `inner` in order to drop its shutdown trigger. + drop(inner); + + // Wait until all worker threads terminate and the threadpool's resources clean up. + let _ = shutdown.wait(); + } + } +} + +/// Handle returned from ThreadPool::spawn_handle. +/// +/// This handle is a future representing the completion of a different future +/// spawned on to the thread pool. Created through the ThreadPool::spawn_handle +/// function this handle will resolve when the future provided resolves on the +/// thread pool. +#[derive(Debug)] +pub struct SpawnHandle<T, E>(oneshot::SpawnHandle<T, E>); + +impl<T, E> Future for SpawnHandle<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + self.0.poll() + } +} diff --git a/third_party/rust/tokio-threadpool/src/worker/entry.rs b/third_party/rust/tokio-threadpool/src/worker/entry.rs new file mode 100644 index 0000000000..0dcf5108b8 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/worker/entry.rs @@ -0,0 +1,330 @@ +use park::{BoxPark, BoxUnpark}; +use task::Task; +use worker::state::{State, PUSHED_MASK}; + +use std::cell::UnsafeCell; +use std::fmt; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use crossbeam_deque::{Steal, Stealer, Worker}; +use crossbeam_queue::SegQueue; +use crossbeam_utils::CachePadded; +use slab::Slab; + +// TODO: None of the fields should be public +// +// It would also be helpful to split up the state across what fields / +// operations are thread-safe vs. which ones require ownership of the worker. +pub(crate) struct WorkerEntry { + // Worker state. This is mutated when notifying the worker. + // + // The `usize` value is deserialized to a `worker::State` instance. See + // comments on that type. + pub state: CachePadded<AtomicUsize>, + + // Next entry in the parked Trieber stack + next_sleeper: UnsafeCell<usize>, + + // Worker half of deque + pub worker: Worker<Arc<Task>>, + + // Stealer half of deque + stealer: Stealer<Arc<Task>>, + + // Thread parker + park: UnsafeCell<Option<BoxPark>>, + + // Thread unparker + unpark: UnsafeCell<Option<BoxUnpark>>, + + // Tasks that have been first polled by this worker, but not completed yet. + running_tasks: UnsafeCell<Slab<Arc<Task>>>, + + // Tasks that have been first polled by this worker, but completed by another worker. + remotely_completed_tasks: SegQueue<Arc<Task>>, + + // Set to `true` when `remotely_completed_tasks` has tasks that need to be removed from + // `running_tasks`. + needs_drain: AtomicBool, +} + +impl WorkerEntry { + pub fn new(park: BoxPark, unpark: BoxUnpark) -> Self { + let w = Worker::new_fifo(); + let s = w.stealer(); + + WorkerEntry { + state: CachePadded::new(AtomicUsize::new(State::default().into())), + next_sleeper: UnsafeCell::new(0), + worker: w, + stealer: s, + park: UnsafeCell::new(Some(park)), + unpark: UnsafeCell::new(Some(unpark)), + running_tasks: UnsafeCell::new(Slab::new()), + remotely_completed_tasks: SegQueue::new(), + needs_drain: AtomicBool::new(false), + } + } + + /// Atomically unset the pushed flag. + /// + /// # Return + /// + /// The state *before* the push flag is unset. + /// + /// # Ordering + /// + /// The specified ordering is established on the entry's state variable. + pub fn fetch_unset_pushed(&self, ordering: Ordering) -> State { + self.state.fetch_and(!PUSHED_MASK, ordering).into() + } + + /// Submit a task to this worker while currently on the same thread that is + /// running the worker. + #[inline] + pub fn submit_internal(&self, task: Arc<Task>) { + self.push_internal(task); + } + + /// Notifies the worker and returns `false` if it needs to be spawned. + /// + /// # Ordering + /// + /// The `state` must have been obtained with an `Acquire` ordering. + #[inline] + pub fn notify(&self, mut state: State) -> bool { + use worker::Lifecycle::*; + + loop { + let mut next = state; + next.notify(); + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if state == actual { + break; + } + + state = actual; + } + + match state.lifecycle() { + Sleeping => { + // The worker is currently sleeping, the condition variable must + // be signaled + self.unpark(); + true + } + Shutdown => false, + Running | Notified | Signaled => { + // In these states, the worker is active and will eventually see + // the task that was just submitted. + true + } + } + } + + /// Signals to the worker that it should stop + /// + /// `state` is the last observed state for the worker. This allows skipping + /// the initial load from the state atomic. + /// + /// # Return + /// + /// Returns `Ok` when the worker was successfully signaled. + /// + /// Returns `Err` if the worker has already terminated. + pub fn signal_stop(&self, mut state: State) { + use worker::Lifecycle::*; + + // Transition the worker state to signaled + loop { + let mut next = state; + + match state.lifecycle() { + Shutdown => { + return; + } + Running | Sleeping => {} + Notified | Signaled => { + // These two states imply that the worker is active, thus it + // will eventually see the shutdown signal, so we don't need + // to do anything. + // + // The worker is forced to see the shutdown signal + // eventually as: + // + // a) No more work will arrive + // b) The shutdown signal is stored as the head of the + // sleep, stack which will prevent the worker from going to + // sleep again. + return; + } + } + + next.set_lifecycle(Signaled); + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + break; + } + + state = actual; + } + + // Wakeup the worker + self.unpark(); + } + + /// Pop a task + /// + /// This **must** only be called by the thread that owns the worker entry. + /// This function is not `Sync`. + #[inline] + pub fn pop_task(&self) -> Option<Arc<Task>> { + self.worker.pop() + } + + /// Steal tasks + /// + /// This is called by *other* workers to steal a task for processing. This + /// function is `Sync`. + /// + /// At the same time, this method steals some additional tasks and moves + /// them into `dest` in order to balance the work distribution among + /// workers. + pub fn steal_tasks(&self, dest: &Self) -> Steal<Arc<Task>> { + self.stealer.steal_batch_and_pop(&dest.worker) + } + + /// Drain (and drop) all tasks that are queued for work. + /// + /// This is called when the pool is shutting down. + pub fn drain_tasks(&self) { + while self.worker.pop().is_some() {} + } + + /// Parks the worker thread. + pub fn park(&self) { + if let Some(park) = unsafe { (*self.park.get()).as_mut() } { + park.park().unwrap(); + } + } + + /// Parks the worker thread for at most `duration`. + pub fn park_timeout(&self, duration: Duration) { + if let Some(park) = unsafe { (*self.park.get()).as_mut() } { + park.park_timeout(duration).unwrap(); + } + } + + /// Unparks the worker thread. + #[inline] + pub fn unpark(&self) { + if let Some(park) = unsafe { (*self.unpark.get()).as_ref() } { + park.unpark(); + } + } + + /// Registers a task in this worker. + /// + /// Called when the task is being polled for the first time. + #[inline] + pub fn register_task(&self, task: &Arc<Task>) { + let running_tasks = unsafe { &mut *self.running_tasks.get() }; + + let key = running_tasks.insert(task.clone()); + task.reg_index.set(key); + } + + /// Unregisters a task from this worker. + /// + /// Called when the task is completed and was previously registered in this worker. + #[inline] + pub fn unregister_task(&self, task: Arc<Task>) { + let running_tasks = unsafe { &mut *self.running_tasks.get() }; + running_tasks.remove(task.reg_index.get()); + self.drain_remotely_completed_tasks(); + } + + /// Unregisters a task from this worker. + /// + /// Called when the task is completed by another worker and was previously registered in this + /// worker. + #[inline] + pub fn remotely_complete_task(&self, task: Arc<Task>) { + self.remotely_completed_tasks.push(task); + self.needs_drain.store(true, Release); + } + + /// Drops the remaining incomplete tasks and the parker associated with this worker. + /// + /// This function is called by the shutdown trigger. + pub fn shutdown(&self) { + self.drain_remotely_completed_tasks(); + + // Abort all incomplete tasks. + let running_tasks = unsafe { &mut *self.running_tasks.get() }; + for (_, task) in running_tasks.iter() { + task.abort(); + } + running_tasks.clear(); + + unsafe { + *self.park.get() = None; + *self.unpark.get() = None; + } + } + + /// Drains the `remotely_completed_tasks` queue and removes tasks from `running_tasks`. + #[inline] + fn drain_remotely_completed_tasks(&self) { + if self.needs_drain.compare_and_swap(true, false, Acquire) { + let running_tasks = unsafe { &mut *self.running_tasks.get() }; + + while let Ok(task) = self.remotely_completed_tasks.pop() { + running_tasks.remove(task.reg_index.get()); + } + } + } + + #[inline] + pub fn push_internal(&self, task: Arc<Task>) { + self.worker.push(task); + } + + #[inline] + pub fn next_sleeper(&self) -> usize { + unsafe { *self.next_sleeper.get() } + } + + #[inline] + pub fn set_next_sleeper(&self, val: usize) { + unsafe { + *self.next_sleeper.get() = val; + } + } +} + +impl fmt::Debug for WorkerEntry { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("WorkerEntry") + .field("state", &self.state.load(Relaxed)) + .field("next_sleeper", &"UnsafeCell<usize>") + .field("worker", &self.worker) + .field("stealer", &self.stealer) + .field("park", &"UnsafeCell<BoxPark>") + .field("unpark", &"BoxUnpark") + .finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/worker/mod.rs b/third_party/rust/tokio-threadpool/src/worker/mod.rs new file mode 100644 index 0000000000..d380c5d561 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/worker/mod.rs @@ -0,0 +1,797 @@ +mod entry; +mod stack; +mod state; + +pub(crate) use self::entry::WorkerEntry as Entry; +pub(crate) use self::stack::Stack; +pub(crate) use self::state::{Lifecycle, State}; + +use notifier::Notifier; +use pool::{self, BackupId, Pool}; +use sender::Sender; +use shutdown::ShutdownTrigger; +use task::{self, CanBlock, Task}; + +use tokio_executor; + +use futures::{Async, Poll}; + +use std::cell::Cell; +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +/// Thread worker +/// +/// This is passed to the [`around_worker`] callback set on [`Builder`]. This +/// callback is only expected to call [`run`] on it. +/// +/// [`Builder`]: struct.Builder.html +/// [`around_worker`]: struct.Builder.html#method.around_worker +/// [`run`]: struct.Worker.html#method.run +#[derive(Debug)] +pub struct Worker { + // Shared scheduler data + pub(crate) pool: Arc<Pool>, + + // WorkerEntry index + pub(crate) id: WorkerId, + + // Backup thread ID assigned to processing this worker. + backup_id: BackupId, + + // Set to the task that is currently being polled by the worker. This is + // needed so that `blocking` blocks are able to interact with this task. + // + // This has to be a raw pointer to make it compile, but great care is taken + // when this is set. + current_task: CurrentTask, + + // Set when the thread is in blocking mode. + is_blocking: Cell<bool>, + + // Set when the worker should finalize on drop + should_finalize: Cell<bool>, + + // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped. + trigger: Arc<ShutdownTrigger>, + + // Keep the value on the current thread. + _p: PhantomData<Rc<()>>, +} + +/// Tracks the state related to the currently running task. +#[derive(Debug)] +struct CurrentTask { + /// This has to be a raw pointer to make it compile, but great care is taken + /// when this is set. + task: Cell<Option<*const Arc<Task>>>, + + /// Tracks the blocking capacity allocation state. + can_block: Cell<CanBlock>, +} + +/// Identifies a thread pool worker. +/// +/// This identifier is unique scoped by the thread pool. It is possible that +/// different thread pool instances share worker identifier values. +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub struct WorkerId(pub(crate) usize); + +// Pointer to the current worker info +thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _)); + +impl Worker { + pub(crate) fn new( + id: WorkerId, + backup_id: BackupId, + pool: Arc<Pool>, + trigger: Arc<ShutdownTrigger>, + ) -> Worker { + Worker { + pool, + id, + backup_id, + current_task: CurrentTask::new(), + is_blocking: Cell::new(false), + should_finalize: Cell::new(false), + trigger, + _p: PhantomData, + } + } + + pub(crate) fn is_blocking(&self) -> bool { + self.is_blocking.get() + } + + /// Run the worker + /// + /// Returns `true` if the thread should keep running as a `backup` thread. + pub(crate) fn do_run(&self) -> bool { + // Create another worker... It's ok, this is just a new type around + // `Pool` that is expected to stay on the current thread. + CURRENT_WORKER.with(|c| { + c.set(self as *const _); + + let pool = self.pool.clone(); + let mut sender = Sender { pool }; + + // Enter an execution context + let mut enter = tokio_executor::enter().unwrap(); + + tokio_executor::with_default(&mut sender, &mut enter, |enter| { + if let Some(ref callback) = self.pool.config.around_worker { + callback.call(self, enter); + } else { + self.run(); + } + }); + }); + + // Can't be in blocking mode and finalization mode + debug_assert!(!self.is_blocking.get() || !self.should_finalize.get()); + + self.is_blocking.get() + } + + pub(crate) fn with_current<F: FnOnce(Option<&Worker>) -> R, R>(f: F) -> R { + CURRENT_WORKER.with(move |c| { + let ptr = c.get(); + + if ptr.is_null() { + f(None) + } else { + f(Some(unsafe { &*ptr })) + } + }) + } + + /// Transition the current worker to a blocking worker + pub(crate) fn transition_to_blocking(&self) -> Poll<(), ::BlockingError> { + use self::CanBlock::*; + + // If we get this far, then `current_task` has been set. + let task_ref = self.current_task.get_ref(); + + // First step is to acquire blocking capacity for the task. + match self.current_task.can_block() { + // Capacity to block has already been allocated to this task. + Allocated => {} + + // The task has already requested capacity to block, but there is + // none yet available. + NoCapacity => return Ok(Async::NotReady), + + // The task has yet to ask for capacity + CanRequest => { + // Atomically attempt to acquire blocking capacity, and if none + // is available, register the task to be notified once capacity + // becomes available. + match self.pool.poll_blocking_capacity(task_ref)? { + Async::Ready(()) => { + self.current_task.set_can_block(Allocated); + } + Async::NotReady => { + self.current_task.set_can_block(NoCapacity); + return Ok(Async::NotReady); + } + } + } + } + + // The task has been allocated blocking capacity. At this point, this is + // when the current thread transitions from a worker to a backup thread. + // To do so requires handing over the worker to another backup thread. + + if self.is_blocking.get() { + // The thread is already in blocking mode, so there is nothing else + // to do. Return `Ready` and allow the caller to block the thread. + return Ok(().into()); + } + + trace!("transition to blocking state"); + + // Transitioning to blocking requires handing over the worker state to + // another thread so that the work queue can continue to be processed. + + self.pool.spawn_thread(self.id.clone(), &self.pool); + + // Track that the thread has now fully entered the blocking state. + self.is_blocking.set(true); + + Ok(().into()) + } + + /// Transition from blocking + pub(crate) fn transition_from_blocking(&self) { + // TODO: Attempt to take ownership of the worker again. + } + + /// Returns a reference to the worker's identifier. + /// + /// This identifier is unique scoped by the thread pool. It is possible that + /// different thread pool instances share worker identifier values. + pub fn id(&self) -> &WorkerId { + &self.id + } + + /// Run the worker + /// + /// This function blocks until the worker is shutting down. + pub fn run(&self) { + const MAX_SPINS: usize = 3; + const LIGHT_SLEEP_INTERVAL: usize = 32; + + // Get the notifier. + let notify = Arc::new(Notifier { + pool: self.pool.clone(), + }); + + let mut first = true; + let mut spin_cnt = 0; + let mut tick = 0; + + while self.check_run_state(first) { + first = false; + + // Run the next available task + if self.try_run_task(¬ify) { + if self.is_blocking.get() { + // Exit out of the run state + return; + } + + // Poll the reactor and the global queue every now and then to + // ensure no task gets left behind. + if tick % LIGHT_SLEEP_INTERVAL == 0 { + self.sleep_light(); + } + + tick = tick.wrapping_add(1); + spin_cnt = 0; + + // As long as there is work, keep looping. + continue; + } + + spin_cnt += 1; + + // Yield the thread several times before it actually goes to sleep. + if spin_cnt <= MAX_SPINS { + thread::yield_now(); + continue; + } + + tick = 0; + spin_cnt = 0; + + // Starting to get sleeeeepy + if !self.sleep() { + return; + } + + // If there still isn't any work to do, shutdown the worker? + } + + // The pool is terminating. However, transitioning the pool state to + // terminated is the very first step of the finalization process. Other + // threads may not see this state and try to spawn a new thread. To + // ensure consistency, before the current thread shuts down, it must + // return the backup token to the stack. + // + // The returned result is ignored because `Err` represents the pool + // shutting down. We are currently aware of this fact. + let _ = self.pool.release_backup(self.backup_id); + + self.should_finalize.set(true); + } + + /// Try to run a task + /// + /// Returns `true` if work was found. + #[inline] + fn try_run_task(&self, notify: &Arc<Notifier>) -> bool { + if self.try_run_owned_task(notify) { + return true; + } + + self.try_steal_task(notify) + } + + /// Checks the worker's current state, updating it as needed. + /// + /// Returns `true` if the worker should run. + #[inline] + fn check_run_state(&self, first: bool) -> bool { + use self::Lifecycle::*; + + debug_assert!(!self.is_blocking.get()); + + let mut state: State = self.entry().state.load(Acquire).into(); + + loop { + let pool_state: pool::State = self.pool.state.load(Acquire).into(); + + if pool_state.is_terminated() { + return false; + } + + let mut next = state; + + match state.lifecycle() { + Running => break, + Notified | Signaled => { + // transition back to running + next.set_lifecycle(Running); + } + Shutdown | Sleeping => { + // The worker should never be in these states when calling + // this function. + panic!("unexpected worker state; lifecycle={:?}", state.lifecycle()); + } + } + + let actual = self + .entry() + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + break; + } + + state = actual; + } + + // `first` is set to true the first time this function is called after + // the thread has started. + // + // This check is to handle the scenario where a worker gets signaled + // while it is already happily running. The `is_signaled` state is + // intended to wake up a worker that has been previously sleeping in + // effect increasing the number of active workers. If this is the first + // time `check_run_state` is called, then being in a signalled state is + // normal and the thread was started to handle it. However, if this is + // **not** the first time the fn was called, then the number of active + // workers has not been increased by the signal, so `signal_work` has to + // be called again to try to wake up another worker. + // + // For example, if the thread pool is configured to allow 4 workers. + // Worker 1 is processing tasks from its `deque`. Worker 2 receives its + // first task. Worker 2 will pick a random worker to signal. It does + // this by popping off the sleep stack, but there is no guarantee that + // workers on the sleep stack are actually sleeping. It is possible that + // Worker 1 gets signaled. + // + // Without this check, in the above case, no additional workers will get + // started, which results in the thread pool permanently being at 2 + // workers even though it should reach 4. + if !first && state.is_signaled() { + trace!("Worker::check_run_state; delegate signal"); + // This worker is not ready to be signaled, so delegate the signal + // to another worker. + self.pool.signal_work(&self.pool); + } + + true + } + + /// Runs the next task on this worker's queue. + /// + /// Returns `true` if work was found. + fn try_run_owned_task(&self, notify: &Arc<Notifier>) -> bool { + // Poll the internal queue for a task to run + match self.entry().pop_task() { + Some(task) => { + self.run_task(task, notify); + true + } + None => false, + } + } + + /// Tries to steal a task from another worker. + /// + /// Returns `true` if work was found + fn try_steal_task(&self, notify: &Arc<Notifier>) -> bool { + use crossbeam_deque::Steal; + + debug_assert!(!self.is_blocking.get()); + + let len = self.pool.workers.len(); + let mut idx = self.pool.rand_usize() % len; + let mut found_work = false; + let start = idx; + + loop { + if idx < len { + match self.pool.workers[idx].steal_tasks(self.entry()) { + Steal::Success(task) => { + trace!("stole task from another worker"); + + self.run_task(task, notify); + + trace!( + "try_steal_task -- signal_work; self={}; from={}", + self.id.0, + idx + ); + + // Signal other workers that work is available + // + // TODO: Should this be called here or before + // `run_task`? + self.pool.signal_work(&self.pool); + + return true; + } + Steal::Empty => {} + Steal::Retry => found_work = true, + } + + idx += 1; + } else { + idx = 0; + } + + if idx == start { + break; + } + } + + found_work + } + + fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>) { + use task::Run::*; + + // If this is the first time this task is being polled, register it so that we can keep + // track of tasks that are in progress. + if task.reg_worker.get().is_none() { + task.reg_worker.set(Some(self.id.0 as u32)); + self.entry().register_task(&task); + } + + let run = self.run_task2(&task, notify); + + // TODO: Try to claim back the worker state in case the backup thread + // did not start up fast enough. This is a performance optimization. + + match run { + Idle => {} + Schedule => { + if self.is_blocking.get() { + // The future has been notified while it was running. + // However, the future also entered a blocking section, + // which released the worker state from this thread. + // + // This means that scheduling the future must be done from + // a point of view external to the worker set. + // + // We have to call `submit_external` instead of `submit` + // here because `self` is still set as the current worker. + self.pool.submit_external(task, &self.pool); + } else { + self.entry().push_internal(task); + } + } + Complete => { + let mut state: pool::State = self.pool.state.load(Acquire).into(); + + loop { + let mut next = state; + next.dec_num_futures(); + + let actual = self + .pool + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + trace!("task complete; state={:?}", next); + + if state.num_futures() == 1 { + // If the thread pool has been flagged as shutdown, + // start terminating workers. This involves waking + // up any sleeping worker so that they can notice + // the shutdown state. + if next.is_terminated() { + self.pool.terminate_sleeping_workers(); + } + } + + // Find which worker polled this task first. + let worker = task.reg_worker.get().unwrap() as usize; + + // Unregister the task from the worker it was registered in. + if !self.is_blocking.get() && worker == self.id.0 { + self.entry().unregister_task(task); + } else { + self.pool.workers[worker].remotely_complete_task(task); + } + + // The worker's run loop will detect the shutdown state + // next iteration. + return; + } + + state = actual; + } + } + } + } + + /// Actually run the task. This is where `Worker::current_task` is set. + /// + /// Great care is needed to ensure that `current_task` is unset in this + /// function. + fn run_task2(&self, task: &Arc<Task>, notify: &Arc<Notifier>) -> task::Run { + struct Guard<'a> { + worker: &'a Worker, + } + + impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + // A task is allocated at run when it was explicitly notified + // that the task has capacity to block. When this happens, that + // capacity is automatically allocated to the notified task. + // This capacity is "use it or lose it", so if the thread is not + // transitioned to blocking in this call, then another task has + // to be notified. + // + // If the task has consumed its blocking allocation but hasn't + // used it, it must be given to some other task instead. + if !self.worker.is_blocking.get() { + let can_block = self.worker.current_task.can_block(); + if can_block == CanBlock::Allocated { + self.worker.pool.notify_blocking_task(&self.worker.pool); + } + } + + self.worker.current_task.clear(); + } + } + + // Set `current_task` + self.current_task.set(task, CanBlock::CanRequest); + + // Create the guard, this ensures that `current_task` is unset when the + // function returns, even if the return is caused by a panic. + let _g = Guard { worker: self }; + + task.run(notify) + } + + /// Put the worker to sleep + /// + /// Returns `true` if woken up due to new work arriving. + fn sleep(&self) -> bool { + use self::Lifecycle::*; + + // Putting a worker to sleep is a multipart operation. This is, in part, + // due to the fact that a worker can be notified without it being popped + // from the sleep stack. Extra care is needed to deal with this. + + trace!("Worker::sleep; worker={:?}", self.id); + + let mut state: State = self.entry().state.load(Acquire).into(); + + // The first part of the sleep process is to transition the worker state + // to "pushed". Now, it may be that the worker is already pushed on the + // sleeper stack, in which case, we don't push again. + + loop { + let mut next = state; + + match state.lifecycle() { + Running => { + // Try setting the pushed state + next.set_pushed(); + + // Transition the worker state to sleeping + next.set_lifecycle(Sleeping); + } + Notified | Signaled => { + // No need to sleep, transition back to running and move on. + next.set_lifecycle(Running); + } + Shutdown | Sleeping => { + // The worker cannot transition to sleep when already in a + // sleeping state. + panic!("unexpected worker state; actual={:?}", state.lifecycle()); + } + } + + let actual = self + .entry() + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + if state.is_notified() { + // The previous state was notified, so we don't need to + // sleep. + return true; + } + + if !state.is_pushed() { + debug_assert!(next.is_pushed()); + + trace!(" sleeping -- push to stack; idx={}", self.id.0); + + // We obtained permission to push the worker into the + // sleeper queue. + if let Err(_) = self.pool.push_sleeper(self.id.0) { + trace!(" sleeping -- push to stack failed; idx={}", self.id.0); + // The push failed due to the pool being terminated. + // + // This is true because the "work" being woken up for is + // shutting down. + return true; + } + } + + break; + } + + state = actual; + } + + trace!(" -> starting to sleep; idx={}", self.id.0); + + // Do a quick check to see if there are any notifications in the + // reactor or new tasks in the global queue. Since this call will + // clear the wakeup token, we need to check the state again and + // only after that go to sleep. + self.sleep_light(); + + // The state has been transitioned to sleeping, we can now wait by + // calling the parker. This is done in a loop as condvars can wakeup + // spuriously. + loop { + // Reload the state + state = self.entry().state.load(Acquire).into(); + + // If the worker has been notified, transition back to running. + match state.lifecycle() { + Sleeping => { + // Still sleeping. Park again. + } + Notified | Signaled => { + // Transition back to running + loop { + let mut next = state; + next.set_lifecycle(Running); + + let actual = self + .entry() + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + return true; + } + + state = actual; + } + } + Shutdown | Running => { + // To get here, the block above transitioned the state to + // `Sleeping`. No other thread can concurrently + // transition to `Shutdown` or `Running`. + unreachable!(); + } + } + + self.entry().park(); + + trace!(" -> wakeup; idx={}", self.id.0); + } + } + + /// This doesn't actually put the thread to sleep. It calls + /// `park.park_timeout` with a duration of 0. This allows the park + /// implementation to perform any work that might be done on an interval. + /// + /// Returns `true` if this worker has tasks in its queue. + fn sleep_light(&self) { + self.entry().park_timeout(Duration::from_millis(0)); + + use crossbeam_deque::Steal; + loop { + match self.pool.queue.steal_batch(&self.entry().worker) { + Steal::Success(()) => { + self.pool.signal_work(&self.pool); + break; + } + Steal::Empty => break, + Steal::Retry => {} + } + } + } + + fn entry(&self) -> &Entry { + debug_assert!(!self.is_blocking.get()); + &self.pool.workers[self.id.0] + } +} + +impl Drop for Worker { + fn drop(&mut self) { + trace!("shutting down thread; idx={}", self.id.0); + + if self.should_finalize.get() { + // Drain the work queue + self.entry().drain_tasks(); + } + } +} + +// ===== impl CurrentTask ===== + +impl CurrentTask { + /// Returns a default `CurrentTask` representing no task. + fn new() -> CurrentTask { + CurrentTask { + task: Cell::new(None), + can_block: Cell::new(CanBlock::CanRequest), + } + } + + /// Returns a reference to the task. + fn get_ref(&self) -> &Arc<Task> { + unsafe { &*self.task.get().unwrap() } + } + + fn can_block(&self) -> CanBlock { + use self::CanBlock::*; + + match self.can_block.get() { + Allocated => Allocated, + CanRequest | NoCapacity => { + let can_block = self.get_ref().consume_blocking_allocation(); + self.can_block.set(can_block); + can_block + } + } + } + + fn set_can_block(&self, can_block: CanBlock) { + self.can_block.set(can_block); + } + + fn set(&self, task: &Arc<Task>, can_block: CanBlock) { + self.task.set(Some(task as *const _)); + self.can_block.set(can_block); + } + + /// Reset the `CurrentTask` to null state. + fn clear(&self) { + self.task.set(None); + self.can_block.set(CanBlock::CanRequest); + } +} + +// ===== impl WorkerId ===== + +impl WorkerId { + /// Returns a `WorkerId` representing the worker entry at index `idx`. + pub(crate) fn new(idx: usize) -> WorkerId { + WorkerId(idx) + } + + /// Returns this identifier represented as an integer. + /// + /// Worker identifiers in a single thread pool are guaranteed to correspond to integers in the + /// range `0..pool_size`. + pub fn to_usize(&self) -> usize { + self.0 + } +} diff --git a/third_party/rust/tokio-threadpool/src/worker/stack.rs b/third_party/rust/tokio-threadpool/src/worker/stack.rs new file mode 100644 index 0000000000..d02c277fed --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/worker/stack.rs @@ -0,0 +1,260 @@ +use config::MAX_WORKERS; +use worker; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed}; +use std::{fmt, usize}; + +/// Lock-free stack of sleeping workers. +/// +/// This is implemented as a Treiber stack and references to nodes are +/// `usize` values, indexing the entry in the `[worker::Entry]` array stored by +/// `Pool`. Each `Entry` instance maintains a `pushed` bit in its state. This +/// bit tracks if the entry is already pushed onto the stack or not. A single +/// entry can only be stored on the stack a single time. +/// +/// By using indexes instead of pointers, that allows a much greater amount of +/// data to be used for the ABA guard (see correctness section of wikipedia +/// page). +/// +/// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack +#[derive(Debug)] +pub(crate) struct Stack { + state: AtomicUsize, +} + +/// State related to the stack of sleeping workers. +/// +/// - Parked head 16 bits +/// - Sequence remaining +/// +/// The parked head value has a couple of special values: +/// +/// - EMPTY: No sleepers +/// - TERMINATED: Don't spawn more threads +#[derive(Eq, PartialEq, Clone, Copy)] +pub struct State(usize); + +/// Extracts the head of the worker stack from the scheduler state +/// +/// The 16 relates to the value of MAX_WORKERS +const STACK_MASK: usize = ((1 << 16) - 1); + +/// Used to mark the stack as empty +pub(crate) const EMPTY: usize = MAX_WORKERS; + +/// Used to mark the stack as terminated +pub(crate) const TERMINATED: usize = EMPTY + 1; + +/// How many bits the Treiber ABA guard is offset by +const ABA_GUARD_SHIFT: usize = 16; + +#[cfg(target_pointer_width = "64")] +const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; + +#[cfg(target_pointer_width = "32")] +const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1; + +// ===== impl Stack ===== + +impl Stack { + /// Create a new `Stack` representing the empty state. + pub fn new() -> Stack { + let state = AtomicUsize::new(State::new().into()); + Stack { state } + } + + /// Push a worker onto the stack + /// + /// # Return + /// + /// Returns `Ok` on success. + /// + /// Returns `Err` if the pool has transitioned to the `TERMINATED` state. + /// When terminated, pushing new entries is no longer permitted. + pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> { + let mut state: State = self.state.load(Acquire).into(); + + debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed()); + + loop { + let mut next = state; + + let head = state.head(); + + if head == TERMINATED { + // The pool is terminated, cannot push the sleeper. + return Err(()); + } + + entries[idx].set_next_sleeper(head); + next.set_head(idx); + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if state == actual { + return Ok(()); + } + + state = actual; + } + } + + /// Pop a worker off the stack. + /// + /// If `terminate` is set and the stack is empty when this function is + /// called, the state of the stack is transitioned to "terminated". At this + /// point, no further workers can be pushed onto the stack. + /// + /// # Return + /// + /// Returns the index of the popped worker and the worker's observed state. + /// + /// `None` if the stack is empty. + pub fn pop( + &self, + entries: &[worker::Entry], + max_lifecycle: worker::Lifecycle, + terminate: bool, + ) -> Option<(usize, worker::State)> { + // Figure out the empty value + let terminal = match terminate { + true => TERMINATED, + false => EMPTY, + }; + + // If terminating, the max lifecycle *must* be `Signaled`, which is the + // highest lifecycle. By passing the greatest possible lifecycle value, + // no entries are skipped by this function. + // + // TODO: It would be better to terminate in a separate function that + // atomically takes all values and transitions to a terminated state. + debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled); + + let mut state: State = self.state.load(Acquire).into(); + + loop { + let head = state.head(); + + if head == EMPTY { + let mut next = state; + next.set_head(terminal); + + if next == state { + debug_assert!(terminal == EMPTY); + return None; + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual != state { + state = actual; + continue; + } + + return None; + } else if head == TERMINATED { + return None; + } + + debug_assert!(head < MAX_WORKERS); + + let mut next = state; + + let next_head = entries[head].next_sleeper(); + + // TERMINATED can never be set as the "next pointer" on a worker. + debug_assert!(next_head != TERMINATED); + + if next_head == EMPTY { + next.set_head(terminal); + } else { + next.set_head(next_head); + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + // Release ordering is needed to ensure that unsetting the + // `pushed` flag happens after popping the sleeper from the + // stack. + // + // Acquire ordering is required to acquire any memory associated + // with transitioning the worker's lifecycle. + let state = entries[head].fetch_unset_pushed(AcqRel); + + if state.lifecycle() >= max_lifecycle { + // If the worker has already been notified, then it is + // warming up to do more work. In this case, try to pop + // another thread that might be in a relaxed state. + continue; + } + + return Some((head, state)); + } + + state = actual; + } + } +} + +// ===== impl State ===== + +impl State { + #[inline] + fn new() -> State { + State(EMPTY) + } + + #[inline] + fn head(&self) -> usize { + self.0 & STACK_MASK + } + + #[inline] + fn set_head(&mut self, val: usize) { + // The ABA guard protects against the ABA problem w/ Treiber stacks + let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK; + + self.0 = (aba_guard << ABA_GUARD_SHIFT) | val; + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let head = self.head(); + + let mut fmt = fmt.debug_struct("stack::State"); + + if head < MAX_WORKERS { + fmt.field("head", &head); + } else if head == EMPTY { + fmt.field("head", &"EMPTY"); + } else if head == TERMINATED { + fmt.field("head", &"TERMINATED"); + } + + fmt.finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/worker/state.rs b/third_party/rust/tokio-threadpool/src/worker/state.rs new file mode 100644 index 0000000000..c388f6c99e --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/worker/state.rs @@ -0,0 +1,153 @@ +use std::fmt; + +/// Tracks worker state +#[derive(Clone, Copy, Eq, PartialEq)] +pub(crate) struct State(usize); + +/// Set when the worker is pushed onto the scheduler's stack of sleeping +/// threads. +pub(crate) const PUSHED_MASK: usize = 0b001; + +/// Manages the worker lifecycle part of the state +const LIFECYCLE_MASK: usize = 0b1110; +const LIFECYCLE_SHIFT: usize = 1; + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] +#[repr(usize)] +pub(crate) enum Lifecycle { + /// The worker does not currently have an associated thread. + Shutdown = 0 << LIFECYCLE_SHIFT, + + /// The worker is doing work + Running = 1 << LIFECYCLE_SHIFT, + + /// The worker is currently asleep in the condvar + Sleeping = 2 << LIFECYCLE_SHIFT, + + /// The worker has been notified it should process more work. + Notified = 3 << LIFECYCLE_SHIFT, + + /// A stronger form of notification. In this case, the worker is expected to + /// wakeup and try to acquire more work... if it enters this state while + /// already busy with other work, it is expected to signal another worker. + Signaled = 4 << LIFECYCLE_SHIFT, +} + +impl State { + /// Returns true if the worker entry is pushed in the sleeper stack + pub fn is_pushed(&self) -> bool { + self.0 & PUSHED_MASK == PUSHED_MASK + } + + pub fn set_pushed(&mut self) { + self.0 |= PUSHED_MASK + } + + pub fn is_notified(&self) -> bool { + use self::Lifecycle::*; + + match self.lifecycle() { + Notified | Signaled => true, + _ => false, + } + } + + pub fn lifecycle(&self) -> Lifecycle { + Lifecycle::from(self.0 & LIFECYCLE_MASK) + } + + pub fn set_lifecycle(&mut self, val: Lifecycle) { + self.0 = (self.0 & !LIFECYCLE_MASK) | (val as usize) + } + + pub fn is_signaled(&self) -> bool { + self.lifecycle() == Lifecycle::Signaled + } + + pub fn notify(&mut self) { + use self::Lifecycle::Signaled; + + if self.lifecycle() != Signaled { + self.set_lifecycle(Signaled) + } + } +} + +impl Default for State { + fn default() -> State { + // All workers will start pushed in the sleeping stack + State(PUSHED_MASK) + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("worker::State") + .field("lifecycle", &self.lifecycle()) + .field("is_pushed", &self.is_pushed()) + .finish() + } +} + +// ===== impl Lifecycle ===== + +impl From<usize> for Lifecycle { + fn from(src: usize) -> Lifecycle { + use self::Lifecycle::*; + + debug_assert!( + src == Shutdown as usize + || src == Running as usize + || src == Sleeping as usize + || src == Notified as usize + || src == Signaled as usize + ); + + unsafe { ::std::mem::transmute(src) } + } +} + +impl From<Lifecycle> for usize { + fn from(src: Lifecycle) -> usize { + let v = src as usize; + debug_assert!(v & LIFECYCLE_MASK == v); + v + } +} + +#[cfg(test)] +mod test { + use super::Lifecycle::*; + use super::*; + + #[test] + fn lifecycle_encode() { + let lifecycles = &[Shutdown, Running, Sleeping, Notified, Signaled]; + + for &lifecycle in lifecycles { + let mut v: usize = lifecycle.into(); + v &= LIFECYCLE_MASK; + + assert_eq!(lifecycle, Lifecycle::from(v)); + } + } + + #[test] + fn lifecycle_ord() { + assert!(Running >= Shutdown); + assert!(Signaled >= Notified); + assert!(Signaled >= Sleeping); + } +} |