diff options
Diffstat (limited to 'third_party/rust/futures-cpupool/src')
-rw-r--r-- | third_party/rust/futures-cpupool/src/lib.rs | 450 |
1 files changed, 450 insertions, 0 deletions
diff --git a/third_party/rust/futures-cpupool/src/lib.rs b/third_party/rust/futures-cpupool/src/lib.rs new file mode 100644 index 0000000000..0614368ba3 --- /dev/null +++ b/third_party/rust/futures-cpupool/src/lib.rs @@ -0,0 +1,450 @@ +//! A simple crate for executing work on a thread pool, and getting back a +//! future. +//! +//! This crate provides a simple thread pool abstraction for running work +//! externally from the current thread that's running. An instance of `Future` +//! is handed back to represent that the work may be done later, and further +//! computations can be chained along with it as well. +//! +//! ```rust +//! extern crate futures; +//! extern crate futures_cpupool; +//! +//! use futures::Future; +//! use futures_cpupool::CpuPool; +//! +//! # fn long_running_future(a: u32) -> Box<futures::future::Future<Item = u32, Error = ()> + Send> { +//! # Box::new(futures::future::result(Ok(a))) +//! # } +//! # fn main() { +//! +//! // Create a worker thread pool with four threads +//! let pool = CpuPool::new(4); +//! +//! // Execute some work on the thread pool, optionally closing over data. +//! let a = pool.spawn(long_running_future(2)); +//! let b = pool.spawn(long_running_future(100)); +//! +//! // Express some further computation once the work is completed on the thread +//! // pool. +//! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap(); +//! +//! // Print out the result +//! println!("{:?}", c); +//! # } +//! ``` + +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] + +extern crate futures; +extern crate num_cpus; + +use std::panic::{self, AssertUnwindSafe}; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::thread; +use std::fmt; + +use futures::{IntoFuture, Future, Poll, Async}; +use futures::future::{lazy, Executor, ExecuteError}; +use futures::sync::oneshot::{channel, Sender, Receiver}; +use futures::executor::{self, Run, Executor as OldExecutor}; + +/// A thread pool intended to run CPU intensive work. +/// +/// This thread pool will hand out futures representing the completed work +/// that happens on the thread pool itself, and the futures can then be later +/// composed with other work as part of an overall computation. +/// +/// The worker threads associated with a thread pool are kept alive so long as +/// there is an open handle to the `CpuPool` or there is work running on them. Once +/// all work has been drained and all references have gone away the worker +/// threads will be shut down. +/// +/// Currently `CpuPool` implements `Clone` which just clones a new reference to +/// the underlying thread pool. +/// +/// **Note:** if you use CpuPool inside a library it's better accept a +/// `Builder` object for thread configuration rather than configuring just +/// pool size. This not only future proof for other settings but also allows +/// user to attach monitoring tools to lifecycle hooks. +pub struct CpuPool { + inner: Arc<Inner>, +} + +/// Thread pool configuration object +/// +/// Builder starts with a number of workers equal to the number +/// of CPUs on the host. But you can change it until you call `create()`. +pub struct Builder { + pool_size: usize, + stack_size: usize, + name_prefix: Option<String>, + after_start: Option<Arc<Fn() + Send + Sync>>, + before_stop: Option<Arc<Fn() + Send + Sync>>, +} + +struct MySender<F, T> { + fut: F, + tx: Option<Sender<T>>, + keep_running_flag: Arc<AtomicBool>, +} + +trait AssertSendSync: Send + Sync {} +impl AssertSendSync for CpuPool {} + +struct Inner { + tx: Mutex<mpsc::Sender<Message>>, + rx: Mutex<mpsc::Receiver<Message>>, + cnt: AtomicUsize, + size: usize, +} + +impl fmt::Debug for CpuPool { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("CpuPool") + .field("size", &self.inner.size) + .finish() + } +} + +impl fmt::Debug for Builder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Builder") + .field("pool_size", &self.pool_size) + .field("name_prefix", &self.name_prefix) + .finish() + } +} + +/// The type of future returned from the `CpuPool::spawn` function, which +/// proxies the futures running on the thread pool. +/// +/// This future will resolve in the same way as the underlying future, and it +/// will propagate panics. +#[must_use] +#[derive(Debug)] +pub struct CpuFuture<T, E> { + inner: Receiver<thread::Result<Result<T, E>>>, + keep_running_flag: Arc<AtomicBool>, +} + +enum Message { + Run(Run), + Close, +} + +impl CpuPool { + /// Creates a new thread pool with `size` worker threads associated with it. + /// + /// The returned handle can use `execute` to run work on this thread pool, + /// and clones can be made of it to get multiple references to the same + /// thread pool. + /// + /// This is a shortcut for: + /// + /// ```rust + /// # use futures_cpupool::{Builder, CpuPool}; + /// # + /// # fn new(size: usize) -> CpuPool { + /// Builder::new().pool_size(size).create() + /// # } + /// ``` + /// + /// # Panics + /// + /// Panics if `size == 0`. + pub fn new(size: usize) -> CpuPool { + Builder::new().pool_size(size).create() + } + + /// Creates a new thread pool with a number of workers equal to the number + /// of CPUs on the host. + /// + /// This is a shortcut for: + /// + /// ```rust + /// # use futures_cpupool::{Builder, CpuPool}; + /// # + /// # fn new_num_cpus() -> CpuPool { + /// Builder::new().create() + /// # } + /// ``` + pub fn new_num_cpus() -> CpuPool { + Builder::new().create() + } + + /// Spawns a future to run on this thread pool, returning a future + /// representing the produced value. + /// + /// This function will execute the future `f` on the associated thread + /// pool, and return a future representing the finished computation. The + /// returned future serves as a proxy to the computation that `F` is + /// running. + /// + /// To simply run an arbitrary closure on a thread pool and extract the + /// result, you can use the `future::lazy` combinator to defer work to + /// executing on the thread pool itself. + /// + /// Note that if the future `f` panics it will be caught by default and the + /// returned future will propagate the panic. That is, panics will not tear + /// down the thread pool and will be propagated to the returned future's + /// `poll` method if queried. + /// + /// If the returned future is dropped then this `CpuPool` will attempt to + /// cancel the computation, if possible. That is, if the computation is in + /// the middle of working, it will be interrupted when possible. + pub fn spawn<F>(&self, f: F) -> CpuFuture<F::Item, F::Error> + where F: Future + Send + 'static, + F::Item: Send + 'static, + F::Error: Send + 'static, + { + let (tx, rx) = channel(); + let keep_running_flag = Arc::new(AtomicBool::new(false)); + // AssertUnwindSafe is used here because `Send + 'static` is basically + // an alias for an implementation of the `UnwindSafe` trait but we can't + // express that in the standard library right now. + let sender = MySender { + fut: AssertUnwindSafe(f).catch_unwind(), + tx: Some(tx), + keep_running_flag: keep_running_flag.clone(), + }; + executor::spawn(sender).execute(self.inner.clone()); + CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() } + } + + /// Spawns a closure on this thread pool. + /// + /// This function is a convenience wrapper around the `spawn` function above + /// for running a closure wrapped in `future::lazy`. It will spawn the + /// function `f` provided onto the thread pool, and continue to run the + /// future returned by `f` on the thread pool as well. + /// + /// The returned future will be a handle to the result produced by the + /// future that `f` returns. + pub fn spawn_fn<F, R>(&self, f: F) -> CpuFuture<R::Item, R::Error> + where F: FnOnce() -> R + Send + 'static, + R: IntoFuture + 'static, + R::Future: Send + 'static, + R::Item: Send + 'static, + R::Error: Send + 'static, + { + self.spawn(lazy(f)) + } +} + +impl<F> Executor<F> for CpuPool + where F: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + executor::spawn(future).execute(self.inner.clone()); + Ok(()) + } +} + +impl Inner { + fn send(&self, msg: Message) { + self.tx.lock().unwrap().send(msg).unwrap(); + } + + fn work(&self, after_start: Option<Arc<Fn() + Send + Sync>>, before_stop: Option<Arc<Fn() + Send + Sync>>) { + after_start.map(|fun| fun()); + loop { + let msg = self.rx.lock().unwrap().recv().unwrap(); + match msg { + Message::Run(r) => r.run(), + Message::Close => break, + } + } + before_stop.map(|fun| fun()); + } +} + +impl Clone for CpuPool { + fn clone(&self) -> CpuPool { + self.inner.cnt.fetch_add(1, Ordering::Relaxed); + CpuPool { inner: self.inner.clone() } + } +} + +impl Drop for CpuPool { + fn drop(&mut self) { + if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 { + for _ in 0..self.inner.size { + self.inner.send(Message::Close); + } + } + } +} + +impl OldExecutor for Inner { + fn execute(&self, run: Run) { + self.send(Message::Run(run)) + } +} + +impl<T, E> CpuFuture<T, E> { + /// Drop this future without canceling the underlying future. + /// + /// When `CpuFuture` is dropped, `CpuPool` will try to abort the underlying + /// future. This function can be used when user wants to drop but keep + /// executing the underlying future. + pub fn forget(self) { + self.keep_running_flag.store(true, Ordering::SeqCst); + } +} + +impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + match self.inner.poll().expect("cannot poll CpuFuture twice") { + Async::Ready(Ok(Ok(e))) => Ok(e.into()), + Async::Ready(Ok(Err(e))) => Err(e), + Async::Ready(Err(e)) => panic::resume_unwind(e), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() { + if !self.keep_running_flag.load(Ordering::SeqCst) { + // Cancelled, bail out + return Ok(().into()) + } + } + + let res = match self.fut.poll() { + Ok(Async::Ready(e)) => Ok(e), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => Err(e), + }; + + // if the receiving end has gone away then that's ok, we just ignore the + // send error here. + drop(self.tx.take().unwrap().send(res)); + Ok(Async::Ready(())) + } +} + +impl Builder { + /// Create a builder a number of workers equal to the number + /// of CPUs on the host. + pub fn new() -> Builder { + Builder { + pool_size: num_cpus::get(), + stack_size: 0, + name_prefix: None, + after_start: None, + before_stop: None, + } + } + + /// Set size of a future CpuPool + /// + /// The size of a thread pool is the number of worker threads spawned + pub fn pool_size(&mut self, size: usize) -> &mut Self { + self.pool_size = size; + self + } + + /// Set stack size of threads in the pool. + pub fn stack_size(&mut self, stack_size: usize) -> &mut Self { + self.stack_size = stack_size; + self + } + + /// Set thread name prefix of a future CpuPool + /// + /// Thread name prefix is used for generating thread names. For example, if prefix is + /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc. + pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self { + self.name_prefix = Some(name_prefix.into()); + self + } + + /// Execute function `f` right after each thread is started but before + /// running any jobs on it. + /// + /// This is initially intended for bookkeeping and monitoring uses. + /// The `f` will be deconstructed after the `builder` is deconstructed + /// and all threads in the pool has executed it. + pub fn after_start<F>(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.after_start = Some(Arc::new(f)); + self + } + + /// Execute function `f` before each worker thread stops. + /// + /// This is initially intended for bookkeeping and monitoring uses. + /// The `f` will be deconstructed after the `builder` is deconstructed + /// and all threads in the pool has executed it. + pub fn before_stop<F>(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.before_stop = Some(Arc::new(f)); + self + } + + /// Create CpuPool with configured parameters + /// + /// # Panics + /// + /// Panics if `pool_size == 0`. + pub fn create(&mut self) -> CpuPool { + let (tx, rx) = mpsc::channel(); + let pool = CpuPool { + inner: Arc::new(Inner { + tx: Mutex::new(tx), + rx: Mutex::new(rx), + cnt: AtomicUsize::new(1), + size: self.pool_size, + }), + }; + assert!(self.pool_size > 0); + + for counter in 0..self.pool_size { + let inner = pool.inner.clone(); + let after_start = self.after_start.clone(); + let before_stop = self.before_stop.clone(); + let mut thread_builder = thread::Builder::new(); + if let Some(ref name_prefix) = self.name_prefix { + thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter)); + } + if self.stack_size > 0 { + thread_builder = thread_builder.stack_size(self.stack_size); + } + thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap(); + } + return pool + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::mpsc; + + #[test] + fn test_drop_after_start() { + let (tx, rx) = mpsc::sync_channel(2); + let _cpu_pool = Builder::new() + .pool_size(2) + .after_start(move || tx.send(1).unwrap()).create(); + + // After Builder is deconstructed, the tx should be droped + // so that we can use rx as an iterator. + let count = rx.into_iter().count(); + assert_eq!(count, 2); + } +} |