//! A simple crate for executing work on a thread pool, and getting back a //! future. //! //! This crate provides a simple thread pool abstraction for running work //! externally from the current thread that's running. An instance of `Future` //! is handed back to represent that the work may be done later, and further //! computations can be chained along with it as well. //! //! ```rust //! extern crate futures; //! extern crate futures_cpupool; //! //! use futures::Future; //! use futures_cpupool::CpuPool; //! //! # fn long_running_future(a: u32) -> Box + Send> { //! # Box::new(futures::future::result(Ok(a))) //! # } //! # fn main() { //! //! // Create a worker thread pool with four threads //! let pool = CpuPool::new(4); //! //! // Execute some work on the thread pool, optionally closing over data. //! let a = pool.spawn(long_running_future(2)); //! let b = pool.spawn(long_running_future(100)); //! //! // Express some further computation once the work is completed on the thread //! // pool. //! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap(); //! //! // Print out the result //! println!("{:?}", c); //! # } //! ``` #![deny(missing_docs)] #![deny(missing_debug_implementations)] extern crate futures; extern crate num_cpus; use std::panic::{self, AssertUnwindSafe}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc; use std::thread; use std::fmt; use futures::{IntoFuture, Future, Poll, Async}; use futures::future::{lazy, Executor, ExecuteError}; use futures::sync::oneshot::{channel, Sender, Receiver}; use futures::executor::{self, Run, Executor as OldExecutor}; /// A thread pool intended to run CPU intensive work. /// /// This thread pool will hand out futures representing the completed work /// that happens on the thread pool itself, and the futures can then be later /// composed with other work as part of an overall computation. /// /// The worker threads associated with a thread pool are kept alive so long as /// there is an open handle to the `CpuPool` or there is work running on them. Once /// all work has been drained and all references have gone away the worker /// threads will be shut down. /// /// Currently `CpuPool` implements `Clone` which just clones a new reference to /// the underlying thread pool. /// /// **Note:** if you use CpuPool inside a library it's better accept a /// `Builder` object for thread configuration rather than configuring just /// pool size. This not only future proof for other settings but also allows /// user to attach monitoring tools to lifecycle hooks. pub struct CpuPool { inner: Arc, } /// Thread pool configuration object /// /// Builder starts with a number of workers equal to the number /// of CPUs on the host. But you can change it until you call `create()`. pub struct Builder { pool_size: usize, stack_size: usize, name_prefix: Option, after_start: Option>, before_stop: Option>, } struct MySender { fut: F, tx: Option>, keep_running_flag: Arc, } trait AssertSendSync: Send + Sync {} impl AssertSendSync for CpuPool {} struct Inner { tx: Mutex>, rx: Mutex>, cnt: AtomicUsize, size: usize, } impl fmt::Debug for CpuPool { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("CpuPool") .field("size", &self.inner.size) .finish() } } impl fmt::Debug for Builder { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Builder") .field("pool_size", &self.pool_size) .field("name_prefix", &self.name_prefix) .finish() } } /// The type of future returned from the `CpuPool::spawn` function, which /// proxies the futures running on the thread pool. /// /// This future will resolve in the same way as the underlying future, and it /// will propagate panics. #[must_use] #[derive(Debug)] pub struct CpuFuture { inner: Receiver>>, keep_running_flag: Arc, } enum Message { Run(Run), Close, } impl CpuPool { /// Creates a new thread pool with `size` worker threads associated with it. /// /// The returned handle can use `execute` to run work on this thread pool, /// and clones can be made of it to get multiple references to the same /// thread pool. /// /// This is a shortcut for: /// /// ```rust /// # use futures_cpupool::{Builder, CpuPool}; /// # /// # fn new(size: usize) -> CpuPool { /// Builder::new().pool_size(size).create() /// # } /// ``` /// /// # Panics /// /// Panics if `size == 0`. pub fn new(size: usize) -> CpuPool { Builder::new().pool_size(size).create() } /// Creates a new thread pool with a number of workers equal to the number /// of CPUs on the host. /// /// This is a shortcut for: /// /// ```rust /// # use futures_cpupool::{Builder, CpuPool}; /// # /// # fn new_num_cpus() -> CpuPool { /// Builder::new().create() /// # } /// ``` pub fn new_num_cpus() -> CpuPool { Builder::new().create() } /// Spawns a future to run on this thread pool, returning a future /// representing the produced value. /// /// This function will execute the future `f` on the associated thread /// pool, and return a future representing the finished computation. The /// returned future serves as a proxy to the computation that `F` is /// running. /// /// To simply run an arbitrary closure on a thread pool and extract the /// result, you can use the `future::lazy` combinator to defer work to /// executing on the thread pool itself. /// /// Note that if the future `f` panics it will be caught by default and the /// returned future will propagate the panic. That is, panics will not tear /// down the thread pool and will be propagated to the returned future's /// `poll` method if queried. /// /// If the returned future is dropped then this `CpuPool` will attempt to /// cancel the computation, if possible. That is, if the computation is in /// the middle of working, it will be interrupted when possible. pub fn spawn(&self, f: F) -> CpuFuture where F: Future + Send + 'static, F::Item: Send + 'static, F::Error: Send + 'static, { let (tx, rx) = channel(); let keep_running_flag = Arc::new(AtomicBool::new(false)); // AssertUnwindSafe is used here because `Send + 'static` is basically // an alias for an implementation of the `UnwindSafe` trait but we can't // express that in the standard library right now. let sender = MySender { fut: AssertUnwindSafe(f).catch_unwind(), tx: Some(tx), keep_running_flag: keep_running_flag.clone(), }; executor::spawn(sender).execute(self.inner.clone()); CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() } } /// Spawns a closure on this thread pool. /// /// This function is a convenience wrapper around the `spawn` function above /// for running a closure wrapped in `future::lazy`. It will spawn the /// function `f` provided onto the thread pool, and continue to run the /// future returned by `f` on the thread pool as well. /// /// The returned future will be a handle to the result produced by the /// future that `f` returns. pub fn spawn_fn(&self, f: F) -> CpuFuture where F: FnOnce() -> R + Send + 'static, R: IntoFuture + 'static, R::Future: Send + 'static, R::Item: Send + 'static, R::Error: Send + 'static, { self.spawn(lazy(f)) } } impl Executor for CpuPool where F: Future + Send + 'static, { fn execute(&self, future: F) -> Result<(), ExecuteError> { executor::spawn(future).execute(self.inner.clone()); Ok(()) } } impl Inner { fn send(&self, msg: Message) { self.tx.lock().unwrap().send(msg).unwrap(); } fn work(&self, after_start: Option>, before_stop: Option>) { after_start.map(|fun| fun()); loop { let msg = self.rx.lock().unwrap().recv().unwrap(); match msg { Message::Run(r) => r.run(), Message::Close => break, } } before_stop.map(|fun| fun()); } } impl Clone for CpuPool { fn clone(&self) -> CpuPool { self.inner.cnt.fetch_add(1, Ordering::Relaxed); CpuPool { inner: self.inner.clone() } } } impl Drop for CpuPool { fn drop(&mut self) { if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 { for _ in 0..self.inner.size { self.inner.send(Message::Close); } } } } impl OldExecutor for Inner { fn execute(&self, run: Run) { self.send(Message::Run(run)) } } impl CpuFuture { /// Drop this future without canceling the underlying future. /// /// When `CpuFuture` is dropped, `CpuPool` will try to abort the underlying /// future. This function can be used when user wants to drop but keep /// executing the underlying future. pub fn forget(self) { self.keep_running_flag.store(true, Ordering::SeqCst); } } impl Future for CpuFuture { type Item = T; type Error = E; fn poll(&mut self) -> Poll { match self.inner.poll().expect("cannot poll CpuFuture twice") { Async::Ready(Ok(Ok(e))) => Ok(e.into()), Async::Ready(Ok(Err(e))) => Err(e), Async::Ready(Err(e)) => panic::resume_unwind(e), Async::NotReady => Ok(Async::NotReady), } } } impl Future for MySender> { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() { if !self.keep_running_flag.load(Ordering::SeqCst) { // Cancelled, bail out return Ok(().into()) } } let res = match self.fut.poll() { Ok(Async::Ready(e)) => Ok(e), Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => Err(e), }; // if the receiving end has gone away then that's ok, we just ignore the // send error here. drop(self.tx.take().unwrap().send(res)); Ok(Async::Ready(())) } } impl Builder { /// Create a builder a number of workers equal to the number /// of CPUs on the host. pub fn new() -> Builder { Builder { pool_size: num_cpus::get(), stack_size: 0, name_prefix: None, after_start: None, before_stop: None, } } /// Set size of a future CpuPool /// /// The size of a thread pool is the number of worker threads spawned pub fn pool_size(&mut self, size: usize) -> &mut Self { self.pool_size = size; self } /// Set stack size of threads in the pool. pub fn stack_size(&mut self, stack_size: usize) -> &mut Self { self.stack_size = stack_size; self } /// Set thread name prefix of a future CpuPool /// /// Thread name prefix is used for generating thread names. For example, if prefix is /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc. pub fn name_prefix>(&mut self, name_prefix: S) -> &mut Self { self.name_prefix = Some(name_prefix.into()); self } /// Execute function `f` right after each thread is started but before /// running any jobs on it. /// /// This is initially intended for bookkeeping and monitoring uses. /// The `f` will be deconstructed after the `builder` is deconstructed /// and all threads in the pool has executed it. pub fn after_start(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static { self.after_start = Some(Arc::new(f)); self } /// Execute function `f` before each worker thread stops. /// /// This is initially intended for bookkeeping and monitoring uses. /// The `f` will be deconstructed after the `builder` is deconstructed /// and all threads in the pool has executed it. pub fn before_stop(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static { self.before_stop = Some(Arc::new(f)); self } /// Create CpuPool with configured parameters /// /// # Panics /// /// Panics if `pool_size == 0`. pub fn create(&mut self) -> CpuPool { let (tx, rx) = mpsc::channel(); let pool = CpuPool { inner: Arc::new(Inner { tx: Mutex::new(tx), rx: Mutex::new(rx), cnt: AtomicUsize::new(1), size: self.pool_size, }), }; assert!(self.pool_size > 0); for counter in 0..self.pool_size { let inner = pool.inner.clone(); let after_start = self.after_start.clone(); let before_stop = self.before_stop.clone(); let mut thread_builder = thread::Builder::new(); if let Some(ref name_prefix) = self.name_prefix { thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter)); } if self.stack_size > 0 { thread_builder = thread_builder.stack_size(self.stack_size); } thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap(); } return pool } } #[cfg(test)] mod tests { use super::*; use std::sync::mpsc; #[test] fn test_drop_after_start() { let (tx, rx) = mpsc::sync_channel(2); let _cpu_pool = Builder::new() .pool_size(2) .after_start(move || tx.send(1).unwrap()).create(); // After Builder is deconstructed, the tx should be droped // so that we can use rx as an iterator. let count = rx.into_iter().count(); assert_eq!(count, 2); } }