summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-cpupool/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/futures-cpupool/src/lib.rs450
1 files changed, 450 insertions, 0 deletions
diff --git a/third_party/rust/futures-cpupool/src/lib.rs b/third_party/rust/futures-cpupool/src/lib.rs
new file mode 100644
index 0000000000..0614368ba3
--- /dev/null
+++ b/third_party/rust/futures-cpupool/src/lib.rs
@@ -0,0 +1,450 @@
+//! A simple crate for executing work on a thread pool, and getting back a
+//! future.
+//!
+//! This crate provides a simple thread pool abstraction for running work
+//! externally from the current thread that's running. An instance of `Future`
+//! is handed back to represent that the work may be done later, and further
+//! computations can be chained along with it as well.
+//!
+//! ```rust
+//! extern crate futures;
+//! extern crate futures_cpupool;
+//!
+//! use futures::Future;
+//! use futures_cpupool::CpuPool;
+//!
+//! # fn long_running_future(a: u32) -> Box<futures::future::Future<Item = u32, Error = ()> + Send> {
+//! # Box::new(futures::future::result(Ok(a)))
+//! # }
+//! # fn main() {
+//!
+//! // Create a worker thread pool with four threads
+//! let pool = CpuPool::new(4);
+//!
+//! // Execute some work on the thread pool, optionally closing over data.
+//! let a = pool.spawn(long_running_future(2));
+//! let b = pool.spawn(long_running_future(100));
+//!
+//! // Express some further computation once the work is completed on the thread
+//! // pool.
+//! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap();
+//!
+//! // Print out the result
+//! println!("{:?}", c);
+//! # }
+//! ```
+
+#![deny(missing_docs)]
+#![deny(missing_debug_implementations)]
+
+extern crate futures;
+extern crate num_cpus;
+
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::mpsc;
+use std::thread;
+use std::fmt;
+
+use futures::{IntoFuture, Future, Poll, Async};
+use futures::future::{lazy, Executor, ExecuteError};
+use futures::sync::oneshot::{channel, Sender, Receiver};
+use futures::executor::{self, Run, Executor as OldExecutor};
+
+/// A thread pool intended to run CPU intensive work.
+///
+/// This thread pool will hand out futures representing the completed work
+/// that happens on the thread pool itself, and the futures can then be later
+/// composed with other work as part of an overall computation.
+///
+/// The worker threads associated with a thread pool are kept alive so long as
+/// there is an open handle to the `CpuPool` or there is work running on them. Once
+/// all work has been drained and all references have gone away the worker
+/// threads will be shut down.
+///
+/// Currently `CpuPool` implements `Clone` which just clones a new reference to
+/// the underlying thread pool.
+///
+/// **Note:** if you use CpuPool inside a library it's better accept a
+/// `Builder` object for thread configuration rather than configuring just
+/// pool size. This not only future proof for other settings but also allows
+/// user to attach monitoring tools to lifecycle hooks.
+pub struct CpuPool {
+ inner: Arc<Inner>,
+}
+
+/// Thread pool configuration object
+///
+/// Builder starts with a number of workers equal to the number
+/// of CPUs on the host. But you can change it until you call `create()`.
+pub struct Builder {
+ pool_size: usize,
+ stack_size: usize,
+ name_prefix: Option<String>,
+ after_start: Option<Arc<Fn() + Send + Sync>>,
+ before_stop: Option<Arc<Fn() + Send + Sync>>,
+}
+
+struct MySender<F, T> {
+ fut: F,
+ tx: Option<Sender<T>>,
+ keep_running_flag: Arc<AtomicBool>,
+}
+
+trait AssertSendSync: Send + Sync {}
+impl AssertSendSync for CpuPool {}
+
+struct Inner {
+ tx: Mutex<mpsc::Sender<Message>>,
+ rx: Mutex<mpsc::Receiver<Message>>,
+ cnt: AtomicUsize,
+ size: usize,
+}
+
+impl fmt::Debug for CpuPool {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("CpuPool")
+ .field("size", &self.inner.size)
+ .finish()
+ }
+}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Builder")
+ .field("pool_size", &self.pool_size)
+ .field("name_prefix", &self.name_prefix)
+ .finish()
+ }
+}
+
+/// The type of future returned from the `CpuPool::spawn` function, which
+/// proxies the futures running on the thread pool.
+///
+/// This future will resolve in the same way as the underlying future, and it
+/// will propagate panics.
+#[must_use]
+#[derive(Debug)]
+pub struct CpuFuture<T, E> {
+ inner: Receiver<thread::Result<Result<T, E>>>,
+ keep_running_flag: Arc<AtomicBool>,
+}
+
+enum Message {
+ Run(Run),
+ Close,
+}
+
+impl CpuPool {
+ /// Creates a new thread pool with `size` worker threads associated with it.
+ ///
+ /// The returned handle can use `execute` to run work on this thread pool,
+ /// and clones can be made of it to get multiple references to the same
+ /// thread pool.
+ ///
+ /// This is a shortcut for:
+ ///
+ /// ```rust
+ /// # use futures_cpupool::{Builder, CpuPool};
+ /// #
+ /// # fn new(size: usize) -> CpuPool {
+ /// Builder::new().pool_size(size).create()
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// Panics if `size == 0`.
+ pub fn new(size: usize) -> CpuPool {
+ Builder::new().pool_size(size).create()
+ }
+
+ /// Creates a new thread pool with a number of workers equal to the number
+ /// of CPUs on the host.
+ ///
+ /// This is a shortcut for:
+ ///
+ /// ```rust
+ /// # use futures_cpupool::{Builder, CpuPool};
+ /// #
+ /// # fn new_num_cpus() -> CpuPool {
+ /// Builder::new().create()
+ /// # }
+ /// ```
+ pub fn new_num_cpus() -> CpuPool {
+ Builder::new().create()
+ }
+
+ /// Spawns a future to run on this thread pool, returning a future
+ /// representing the produced value.
+ ///
+ /// This function will execute the future `f` on the associated thread
+ /// pool, and return a future representing the finished computation. The
+ /// returned future serves as a proxy to the computation that `F` is
+ /// running.
+ ///
+ /// To simply run an arbitrary closure on a thread pool and extract the
+ /// result, you can use the `future::lazy` combinator to defer work to
+ /// executing on the thread pool itself.
+ ///
+ /// Note that if the future `f` panics it will be caught by default and the
+ /// returned future will propagate the panic. That is, panics will not tear
+ /// down the thread pool and will be propagated to the returned future's
+ /// `poll` method if queried.
+ ///
+ /// If the returned future is dropped then this `CpuPool` will attempt to
+ /// cancel the computation, if possible. That is, if the computation is in
+ /// the middle of working, it will be interrupted when possible.
+ pub fn spawn<F>(&self, f: F) -> CpuFuture<F::Item, F::Error>
+ where F: Future + Send + 'static,
+ F::Item: Send + 'static,
+ F::Error: Send + 'static,
+ {
+ let (tx, rx) = channel();
+ let keep_running_flag = Arc::new(AtomicBool::new(false));
+ // AssertUnwindSafe is used here because `Send + 'static` is basically
+ // an alias for an implementation of the `UnwindSafe` trait but we can't
+ // express that in the standard library right now.
+ let sender = MySender {
+ fut: AssertUnwindSafe(f).catch_unwind(),
+ tx: Some(tx),
+ keep_running_flag: keep_running_flag.clone(),
+ };
+ executor::spawn(sender).execute(self.inner.clone());
+ CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() }
+ }
+
+ /// Spawns a closure on this thread pool.
+ ///
+ /// This function is a convenience wrapper around the `spawn` function above
+ /// for running a closure wrapped in `future::lazy`. It will spawn the
+ /// function `f` provided onto the thread pool, and continue to run the
+ /// future returned by `f` on the thread pool as well.
+ ///
+ /// The returned future will be a handle to the result produced by the
+ /// future that `f` returns.
+ pub fn spawn_fn<F, R>(&self, f: F) -> CpuFuture<R::Item, R::Error>
+ where F: FnOnce() -> R + Send + 'static,
+ R: IntoFuture + 'static,
+ R::Future: Send + 'static,
+ R::Item: Send + 'static,
+ R::Error: Send + 'static,
+ {
+ self.spawn(lazy(f))
+ }
+}
+
+impl<F> Executor<F> for CpuPool
+ where F: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ executor::spawn(future).execute(self.inner.clone());
+ Ok(())
+ }
+}
+
+impl Inner {
+ fn send(&self, msg: Message) {
+ self.tx.lock().unwrap().send(msg).unwrap();
+ }
+
+ fn work(&self, after_start: Option<Arc<Fn() + Send + Sync>>, before_stop: Option<Arc<Fn() + Send + Sync>>) {
+ after_start.map(|fun| fun());
+ loop {
+ let msg = self.rx.lock().unwrap().recv().unwrap();
+ match msg {
+ Message::Run(r) => r.run(),
+ Message::Close => break,
+ }
+ }
+ before_stop.map(|fun| fun());
+ }
+}
+
+impl Clone for CpuPool {
+ fn clone(&self) -> CpuPool {
+ self.inner.cnt.fetch_add(1, Ordering::Relaxed);
+ CpuPool { inner: self.inner.clone() }
+ }
+}
+
+impl Drop for CpuPool {
+ fn drop(&mut self) {
+ if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 {
+ for _ in 0..self.inner.size {
+ self.inner.send(Message::Close);
+ }
+ }
+ }
+}
+
+impl OldExecutor for Inner {
+ fn execute(&self, run: Run) {
+ self.send(Message::Run(run))
+ }
+}
+
+impl<T, E> CpuFuture<T, E> {
+ /// Drop this future without canceling the underlying future.
+ ///
+ /// When `CpuFuture` is dropped, `CpuPool` will try to abort the underlying
+ /// future. This function can be used when user wants to drop but keep
+ /// executing the underlying future.
+ pub fn forget(self) {
+ self.keep_running_flag.store(true, Ordering::SeqCst);
+ }
+}
+
+impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<T, E> {
+ match self.inner.poll().expect("cannot poll CpuFuture twice") {
+ Async::Ready(Ok(Ok(e))) => Ok(e.into()),
+ Async::Ready(Ok(Err(e))) => Err(e),
+ Async::Ready(Err(e)) => panic::resume_unwind(e),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+}
+
+impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
+ if !self.keep_running_flag.load(Ordering::SeqCst) {
+ // Cancelled, bail out
+ return Ok(().into())
+ }
+ }
+
+ let res = match self.fut.poll() {
+ Ok(Async::Ready(e)) => Ok(e),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => Err(e),
+ };
+
+ // if the receiving end has gone away then that's ok, we just ignore the
+ // send error here.
+ drop(self.tx.take().unwrap().send(res));
+ Ok(Async::Ready(()))
+ }
+}
+
+impl Builder {
+ /// Create a builder a number of workers equal to the number
+ /// of CPUs on the host.
+ pub fn new() -> Builder {
+ Builder {
+ pool_size: num_cpus::get(),
+ stack_size: 0,
+ name_prefix: None,
+ after_start: None,
+ before_stop: None,
+ }
+ }
+
+ /// Set size of a future CpuPool
+ ///
+ /// The size of a thread pool is the number of worker threads spawned
+ pub fn pool_size(&mut self, size: usize) -> &mut Self {
+ self.pool_size = size;
+ self
+ }
+
+ /// Set stack size of threads in the pool.
+ pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
+ self.stack_size = stack_size;
+ self
+ }
+
+ /// Set thread name prefix of a future CpuPool
+ ///
+ /// Thread name prefix is used for generating thread names. For example, if prefix is
+ /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
+ pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
+ self.name_prefix = Some(name_prefix.into());
+ self
+ }
+
+ /// Execute function `f` right after each thread is started but before
+ /// running any jobs on it.
+ ///
+ /// This is initially intended for bookkeeping and monitoring uses.
+ /// The `f` will be deconstructed after the `builder` is deconstructed
+ /// and all threads in the pool has executed it.
+ pub fn after_start<F>(&mut self, f: F) -> &mut Self
+ where F: Fn() + Send + Sync + 'static
+ {
+ self.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Execute function `f` before each worker thread stops.
+ ///
+ /// This is initially intended for bookkeeping and monitoring uses.
+ /// The `f` will be deconstructed after the `builder` is deconstructed
+ /// and all threads in the pool has executed it.
+ pub fn before_stop<F>(&mut self, f: F) -> &mut Self
+ where F: Fn() + Send + Sync + 'static
+ {
+ self.before_stop = Some(Arc::new(f));
+ self
+ }
+
+ /// Create CpuPool with configured parameters
+ ///
+ /// # Panics
+ ///
+ /// Panics if `pool_size == 0`.
+ pub fn create(&mut self) -> CpuPool {
+ let (tx, rx) = mpsc::channel();
+ let pool = CpuPool {
+ inner: Arc::new(Inner {
+ tx: Mutex::new(tx),
+ rx: Mutex::new(rx),
+ cnt: AtomicUsize::new(1),
+ size: self.pool_size,
+ }),
+ };
+ assert!(self.pool_size > 0);
+
+ for counter in 0..self.pool_size {
+ let inner = pool.inner.clone();
+ let after_start = self.after_start.clone();
+ let before_stop = self.before_stop.clone();
+ let mut thread_builder = thread::Builder::new();
+ if let Some(ref name_prefix) = self.name_prefix {
+ thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
+ }
+ if self.stack_size > 0 {
+ thread_builder = thread_builder.stack_size(self.stack_size);
+ }
+ thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap();
+ }
+ return pool
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::mpsc;
+
+ #[test]
+ fn test_drop_after_start() {
+ let (tx, rx) = mpsc::sync_channel(2);
+ let _cpu_pool = Builder::new()
+ .pool_size(2)
+ .after_start(move || tx.send(1).unwrap()).create();
+
+ // After Builder is deconstructed, the tx should be droped
+ // so that we can use rx as an iterator.
+ let count = rx.into_iter().count();
+ assert_eq!(count, 2);
+ }
+}