diff options
Diffstat (limited to 'third_party/rust/futures-cpupool')
-rw-r--r-- | third_party/rust/futures-cpupool/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/futures-cpupool/Cargo.toml | 32 | ||||
-rw-r--r-- | third_party/rust/futures-cpupool/LICENSE-APACHE | 201 | ||||
-rw-r--r-- | third_party/rust/futures-cpupool/LICENSE-MIT | 25 | ||||
-rw-r--r-- | third_party/rust/futures-cpupool/README.md | 36 | ||||
-rw-r--r-- | third_party/rust/futures-cpupool/src/lib.rs | 450 | ||||
-rw-r--r-- | third_party/rust/futures-cpupool/tests/smoke.rs | 110 |
7 files changed, 855 insertions, 0 deletions
diff --git a/third_party/rust/futures-cpupool/.cargo-checksum.json b/third_party/rust/futures-cpupool/.cargo-checksum.json new file mode 100644 index 0000000000..123f51fc57 --- /dev/null +++ b/third_party/rust/futures-cpupool/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"d65d12c309bb5af442353ceb79339c2d426b1ed643f5eddee14ad22637225ca2","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"69036b033e4bb951821964dbc3d9b1efe6913a6e36d9c1f206de4035a1a85cc4","README.md":"09c5f4bacff34b3f7e1969f5b9590c062a8aabac7c2442944eab1d2fc1301373","src/lib.rs":"2bffe7435a2c13028978955882338fbb9df3644f725a7e9d27b5f1495e3e9f90","tests/smoke.rs":"4c07aad02b0dd17f4723f3be1abbe320629b9e0756c885b44cbc1268141668f1"},"package":"ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"}
\ No newline at end of file diff --git a/third_party/rust/futures-cpupool/Cargo.toml b/third_party/rust/futures-cpupool/Cargo.toml new file mode 100644 index 0000000000..4bfc90332d --- /dev/null +++ b/third_party/rust/futures-cpupool/Cargo.toml @@ -0,0 +1,32 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g. crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "futures-cpupool" +version = "0.1.8" +authors = ["Alex Crichton <alex@alexcrichton.com>"] +description = "An implementation of thread pools which hand out futures to the results of the\ncomputation on the threads themselves.\n" +homepage = "https://github.com/alexcrichton/futures-rs" +documentation = "https://docs.rs/futures-cpupool" +license = "MIT/Apache-2.0" +repository = "https://github.com/alexcrichton/futures-rs" +[dependencies.futures] +version = "0.1" +features = ["use_std"] +default-features = false + +[dependencies.num_cpus] +version = "1.0" + +[features] +default = ["with-deprecated"] +with-deprecated = ["futures/with-deprecated"] diff --git a/third_party/rust/futures-cpupool/LICENSE-APACHE b/third_party/rust/futures-cpupool/LICENSE-APACHE new file mode 100644 index 0000000000..16fe87b06e --- /dev/null +++ b/third_party/rust/futures-cpupool/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/futures-cpupool/LICENSE-MIT b/third_party/rust/futures-cpupool/LICENSE-MIT new file mode 100644 index 0000000000..28e630cf40 --- /dev/null +++ b/third_party/rust/futures-cpupool/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2016 Alex Crichton + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/futures-cpupool/README.md b/third_party/rust/futures-cpupool/README.md new file mode 100644 index 0000000000..b022994fb5 --- /dev/null +++ b/third_party/rust/futures-cpupool/README.md @@ -0,0 +1,36 @@ +# futures-cpupool + +A library for creating futures representing work happening concurrently on a +dedicated thread pool. + +[![Build Status](https://travis-ci.org/alexcrichton/futures-rs.svg?branch=master)](https://travis-ci.org/alexcrichton/futures-rs) +[![Build status](https://ci.appveyor.com/api/projects/status/yl5w3ittk4kggfsh?svg=true)](https://ci.appveyor.com/project/alexcrichton/futures-rs) + +[Documentation](https://docs.rs/futures-cpupool) + +## Usage + +First, add this to your `Cargo.toml`: + +```toml +[dependencies] +futures = "0.1" +futures-cpupool = "0.1" +``` + +Next, add this to your crate: + +```rust +extern crate futures; +extern crate futures_cpupool; + +use futures_cpupool::CpuPool; +``` + +# License + +`futures-cpupool` is primarily distributed under the terms of both the MIT +license and the Apache License (Version 2.0), with portions covered by various +BSD-like licenses. + +See LICENSE-APACHE, and LICENSE-MIT for details. diff --git a/third_party/rust/futures-cpupool/src/lib.rs b/third_party/rust/futures-cpupool/src/lib.rs new file mode 100644 index 0000000000..0614368ba3 --- /dev/null +++ b/third_party/rust/futures-cpupool/src/lib.rs @@ -0,0 +1,450 @@ +//! A simple crate for executing work on a thread pool, and getting back a +//! future. +//! +//! This crate provides a simple thread pool abstraction for running work +//! externally from the current thread that's running. An instance of `Future` +//! is handed back to represent that the work may be done later, and further +//! computations can be chained along with it as well. +//! +//! ```rust +//! extern crate futures; +//! extern crate futures_cpupool; +//! +//! use futures::Future; +//! use futures_cpupool::CpuPool; +//! +//! # fn long_running_future(a: u32) -> Box<futures::future::Future<Item = u32, Error = ()> + Send> { +//! # Box::new(futures::future::result(Ok(a))) +//! # } +//! # fn main() { +//! +//! // Create a worker thread pool with four threads +//! let pool = CpuPool::new(4); +//! +//! // Execute some work on the thread pool, optionally closing over data. +//! let a = pool.spawn(long_running_future(2)); +//! let b = pool.spawn(long_running_future(100)); +//! +//! // Express some further computation once the work is completed on the thread +//! // pool. +//! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap(); +//! +//! // Print out the result +//! println!("{:?}", c); +//! # } +//! ``` + +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] + +extern crate futures; +extern crate num_cpus; + +use std::panic::{self, AssertUnwindSafe}; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::thread; +use std::fmt; + +use futures::{IntoFuture, Future, Poll, Async}; +use futures::future::{lazy, Executor, ExecuteError}; +use futures::sync::oneshot::{channel, Sender, Receiver}; +use futures::executor::{self, Run, Executor as OldExecutor}; + +/// A thread pool intended to run CPU intensive work. +/// +/// This thread pool will hand out futures representing the completed work +/// that happens on the thread pool itself, and the futures can then be later +/// composed with other work as part of an overall computation. +/// +/// The worker threads associated with a thread pool are kept alive so long as +/// there is an open handle to the `CpuPool` or there is work running on them. Once +/// all work has been drained and all references have gone away the worker +/// threads will be shut down. +/// +/// Currently `CpuPool` implements `Clone` which just clones a new reference to +/// the underlying thread pool. +/// +/// **Note:** if you use CpuPool inside a library it's better accept a +/// `Builder` object for thread configuration rather than configuring just +/// pool size. This not only future proof for other settings but also allows +/// user to attach monitoring tools to lifecycle hooks. +pub struct CpuPool { + inner: Arc<Inner>, +} + +/// Thread pool configuration object +/// +/// Builder starts with a number of workers equal to the number +/// of CPUs on the host. But you can change it until you call `create()`. +pub struct Builder { + pool_size: usize, + stack_size: usize, + name_prefix: Option<String>, + after_start: Option<Arc<Fn() + Send + Sync>>, + before_stop: Option<Arc<Fn() + Send + Sync>>, +} + +struct MySender<F, T> { + fut: F, + tx: Option<Sender<T>>, + keep_running_flag: Arc<AtomicBool>, +} + +trait AssertSendSync: Send + Sync {} +impl AssertSendSync for CpuPool {} + +struct Inner { + tx: Mutex<mpsc::Sender<Message>>, + rx: Mutex<mpsc::Receiver<Message>>, + cnt: AtomicUsize, + size: usize, +} + +impl fmt::Debug for CpuPool { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("CpuPool") + .field("size", &self.inner.size) + .finish() + } +} + +impl fmt::Debug for Builder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Builder") + .field("pool_size", &self.pool_size) + .field("name_prefix", &self.name_prefix) + .finish() + } +} + +/// The type of future returned from the `CpuPool::spawn` function, which +/// proxies the futures running on the thread pool. +/// +/// This future will resolve in the same way as the underlying future, and it +/// will propagate panics. +#[must_use] +#[derive(Debug)] +pub struct CpuFuture<T, E> { + inner: Receiver<thread::Result<Result<T, E>>>, + keep_running_flag: Arc<AtomicBool>, +} + +enum Message { + Run(Run), + Close, +} + +impl CpuPool { + /// Creates a new thread pool with `size` worker threads associated with it. + /// + /// The returned handle can use `execute` to run work on this thread pool, + /// and clones can be made of it to get multiple references to the same + /// thread pool. + /// + /// This is a shortcut for: + /// + /// ```rust + /// # use futures_cpupool::{Builder, CpuPool}; + /// # + /// # fn new(size: usize) -> CpuPool { + /// Builder::new().pool_size(size).create() + /// # } + /// ``` + /// + /// # Panics + /// + /// Panics if `size == 0`. + pub fn new(size: usize) -> CpuPool { + Builder::new().pool_size(size).create() + } + + /// Creates a new thread pool with a number of workers equal to the number + /// of CPUs on the host. + /// + /// This is a shortcut for: + /// + /// ```rust + /// # use futures_cpupool::{Builder, CpuPool}; + /// # + /// # fn new_num_cpus() -> CpuPool { + /// Builder::new().create() + /// # } + /// ``` + pub fn new_num_cpus() -> CpuPool { + Builder::new().create() + } + + /// Spawns a future to run on this thread pool, returning a future + /// representing the produced value. + /// + /// This function will execute the future `f` on the associated thread + /// pool, and return a future representing the finished computation. The + /// returned future serves as a proxy to the computation that `F` is + /// running. + /// + /// To simply run an arbitrary closure on a thread pool and extract the + /// result, you can use the `future::lazy` combinator to defer work to + /// executing on the thread pool itself. + /// + /// Note that if the future `f` panics it will be caught by default and the + /// returned future will propagate the panic. That is, panics will not tear + /// down the thread pool and will be propagated to the returned future's + /// `poll` method if queried. + /// + /// If the returned future is dropped then this `CpuPool` will attempt to + /// cancel the computation, if possible. That is, if the computation is in + /// the middle of working, it will be interrupted when possible. + pub fn spawn<F>(&self, f: F) -> CpuFuture<F::Item, F::Error> + where F: Future + Send + 'static, + F::Item: Send + 'static, + F::Error: Send + 'static, + { + let (tx, rx) = channel(); + let keep_running_flag = Arc::new(AtomicBool::new(false)); + // AssertUnwindSafe is used here because `Send + 'static` is basically + // an alias for an implementation of the `UnwindSafe` trait but we can't + // express that in the standard library right now. + let sender = MySender { + fut: AssertUnwindSafe(f).catch_unwind(), + tx: Some(tx), + keep_running_flag: keep_running_flag.clone(), + }; + executor::spawn(sender).execute(self.inner.clone()); + CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() } + } + + /// Spawns a closure on this thread pool. + /// + /// This function is a convenience wrapper around the `spawn` function above + /// for running a closure wrapped in `future::lazy`. It will spawn the + /// function `f` provided onto the thread pool, and continue to run the + /// future returned by `f` on the thread pool as well. + /// + /// The returned future will be a handle to the result produced by the + /// future that `f` returns. + pub fn spawn_fn<F, R>(&self, f: F) -> CpuFuture<R::Item, R::Error> + where F: FnOnce() -> R + Send + 'static, + R: IntoFuture + 'static, + R::Future: Send + 'static, + R::Item: Send + 'static, + R::Error: Send + 'static, + { + self.spawn(lazy(f)) + } +} + +impl<F> Executor<F> for CpuPool + where F: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + executor::spawn(future).execute(self.inner.clone()); + Ok(()) + } +} + +impl Inner { + fn send(&self, msg: Message) { + self.tx.lock().unwrap().send(msg).unwrap(); + } + + fn work(&self, after_start: Option<Arc<Fn() + Send + Sync>>, before_stop: Option<Arc<Fn() + Send + Sync>>) { + after_start.map(|fun| fun()); + loop { + let msg = self.rx.lock().unwrap().recv().unwrap(); + match msg { + Message::Run(r) => r.run(), + Message::Close => break, + } + } + before_stop.map(|fun| fun()); + } +} + +impl Clone for CpuPool { + fn clone(&self) -> CpuPool { + self.inner.cnt.fetch_add(1, Ordering::Relaxed); + CpuPool { inner: self.inner.clone() } + } +} + +impl Drop for CpuPool { + fn drop(&mut self) { + if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 { + for _ in 0..self.inner.size { + self.inner.send(Message::Close); + } + } + } +} + +impl OldExecutor for Inner { + fn execute(&self, run: Run) { + self.send(Message::Run(run)) + } +} + +impl<T, E> CpuFuture<T, E> { + /// Drop this future without canceling the underlying future. + /// + /// When `CpuFuture` is dropped, `CpuPool` will try to abort the underlying + /// future. This function can be used when user wants to drop but keep + /// executing the underlying future. + pub fn forget(self) { + self.keep_running_flag.store(true, Ordering::SeqCst); + } +} + +impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + match self.inner.poll().expect("cannot poll CpuFuture twice") { + Async::Ready(Ok(Ok(e))) => Ok(e.into()), + Async::Ready(Ok(Err(e))) => Err(e), + Async::Ready(Err(e)) => panic::resume_unwind(e), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() { + if !self.keep_running_flag.load(Ordering::SeqCst) { + // Cancelled, bail out + return Ok(().into()) + } + } + + let res = match self.fut.poll() { + Ok(Async::Ready(e)) => Ok(e), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => Err(e), + }; + + // if the receiving end has gone away then that's ok, we just ignore the + // send error here. + drop(self.tx.take().unwrap().send(res)); + Ok(Async::Ready(())) + } +} + +impl Builder { + /// Create a builder a number of workers equal to the number + /// of CPUs on the host. + pub fn new() -> Builder { + Builder { + pool_size: num_cpus::get(), + stack_size: 0, + name_prefix: None, + after_start: None, + before_stop: None, + } + } + + /// Set size of a future CpuPool + /// + /// The size of a thread pool is the number of worker threads spawned + pub fn pool_size(&mut self, size: usize) -> &mut Self { + self.pool_size = size; + self + } + + /// Set stack size of threads in the pool. + pub fn stack_size(&mut self, stack_size: usize) -> &mut Self { + self.stack_size = stack_size; + self + } + + /// Set thread name prefix of a future CpuPool + /// + /// Thread name prefix is used for generating thread names. For example, if prefix is + /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc. + pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self { + self.name_prefix = Some(name_prefix.into()); + self + } + + /// Execute function `f` right after each thread is started but before + /// running any jobs on it. + /// + /// This is initially intended for bookkeeping and monitoring uses. + /// The `f` will be deconstructed after the `builder` is deconstructed + /// and all threads in the pool has executed it. + pub fn after_start<F>(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.after_start = Some(Arc::new(f)); + self + } + + /// Execute function `f` before each worker thread stops. + /// + /// This is initially intended for bookkeeping and monitoring uses. + /// The `f` will be deconstructed after the `builder` is deconstructed + /// and all threads in the pool has executed it. + pub fn before_stop<F>(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.before_stop = Some(Arc::new(f)); + self + } + + /// Create CpuPool with configured parameters + /// + /// # Panics + /// + /// Panics if `pool_size == 0`. + pub fn create(&mut self) -> CpuPool { + let (tx, rx) = mpsc::channel(); + let pool = CpuPool { + inner: Arc::new(Inner { + tx: Mutex::new(tx), + rx: Mutex::new(rx), + cnt: AtomicUsize::new(1), + size: self.pool_size, + }), + }; + assert!(self.pool_size > 0); + + for counter in 0..self.pool_size { + let inner = pool.inner.clone(); + let after_start = self.after_start.clone(); + let before_stop = self.before_stop.clone(); + let mut thread_builder = thread::Builder::new(); + if let Some(ref name_prefix) = self.name_prefix { + thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter)); + } + if self.stack_size > 0 { + thread_builder = thread_builder.stack_size(self.stack_size); + } + thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap(); + } + return pool + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::mpsc; + + #[test] + fn test_drop_after_start() { + let (tx, rx) = mpsc::sync_channel(2); + let _cpu_pool = Builder::new() + .pool_size(2) + .after_start(move || tx.send(1).unwrap()).create(); + + // After Builder is deconstructed, the tx should be droped + // so that we can use rx as an iterator. + let count = rx.into_iter().count(); + assert_eq!(count, 2); + } +} diff --git a/third_party/rust/futures-cpupool/tests/smoke.rs b/third_party/rust/futures-cpupool/tests/smoke.rs new file mode 100644 index 0000000000..1b267f2f02 --- /dev/null +++ b/third_party/rust/futures-cpupool/tests/smoke.rs @@ -0,0 +1,110 @@ +extern crate futures; +extern crate futures_cpupool; + +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::thread; +use std::time::Duration; + +use futures::future::Future; +use futures_cpupool::{CpuPool, Builder}; + +fn done<T: Send + 'static>(t: T) -> Box<Future<Item = T, Error = ()> + Send> { + Box::new(futures::future::ok(t)) +} + +#[test] +fn join() { + let pool = CpuPool::new(2); + let a = pool.spawn(done(1)); + let b = pool.spawn(done(2)); + let res = a.join(b).map(|(a, b)| a + b).wait(); + + assert_eq!(res.unwrap(), 3); +} + +#[test] +fn select() { + let pool = CpuPool::new(2); + let a = pool.spawn(done(1)); + let b = pool.spawn(done(2)); + let (item1, next) = a.select(b).wait().ok().unwrap(); + let item2 = next.wait().unwrap(); + + assert!(item1 != item2); + assert!((item1 == 1 && item2 == 2) || (item1 == 2 && item2 == 1)); +} + +#[test] +fn threads_go_away() { + static CNT: AtomicUsize = ATOMIC_USIZE_INIT; + + struct A; + + impl Drop for A { + fn drop(&mut self) { + CNT.fetch_add(1, Ordering::SeqCst); + } + } + + thread_local!(static FOO: A = A); + + let pool = CpuPool::new(2); + let _handle = pool.spawn_fn(|| { + FOO.with(|_| ()); + Ok::<(), ()>(()) + }); + drop(pool); + + for _ in 0..100 { + if CNT.load(Ordering::SeqCst) == 1 { + return + } + thread::sleep(Duration::from_millis(10)); + } + panic!("thread didn't exit"); +} + +#[test] +fn lifecycle_test() { + static NUM_STARTS: AtomicUsize = ATOMIC_USIZE_INIT; + static NUM_STOPS: AtomicUsize = ATOMIC_USIZE_INIT; + + fn after_start() { + NUM_STARTS.fetch_add(1, Ordering::SeqCst); + } + + fn before_stop() { + NUM_STOPS.fetch_add(1, Ordering::SeqCst); + } + + let pool = Builder::new() + .pool_size(4) + .after_start(after_start) + .before_stop(before_stop) + .create(); + let _handle = pool.spawn_fn(|| { + Ok::<(), ()>(()) + }); + drop(pool); + + for _ in 0..100 { + if NUM_STOPS.load(Ordering::SeqCst) == 4 { + assert_eq!(NUM_STARTS.load(Ordering::SeqCst), 4); + return; + } + thread::sleep(Duration::from_millis(10)); + } + panic!("thread didn't exit"); +} + +#[test] +fn thread_name() { + let pool = Builder::new() + .name_prefix("my-pool-") + .create(); + let future = pool.spawn_fn(|| { + assert!(thread::current().name().unwrap().starts_with("my-pool-")); + Ok::<(), ()>(()) + }); + let _ = future.wait(); +} |