summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/benches/blocking.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/benches/blocking.rs')
-rw-r--r--third_party/rust/tokio-threadpool/benches/blocking.rs137
1 files changed, 137 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/benches/blocking.rs b/third_party/rust/tokio-threadpool/benches/blocking.rs
new file mode 100644
index 0000000000..bc5121545a
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/benches/blocking.rs
@@ -0,0 +1,137 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate rand;
+extern crate test;
+extern crate threadpool;
+extern crate tokio_threadpool;
+
+const ITER: usize = 1_000;
+
+mod blocking {
+ use super::*;
+
+ use futures::future::*;
+ use tokio_threadpool::{blocking, Builder};
+
+ #[bench]
+ fn cpu_bound(b: &mut test::Bencher) {
+ let pool = Builder::new().pool_size(2).max_blocking(20).build();
+
+ b.iter(|| {
+ let count_down = Arc::new(CountDown::new(::ITER));
+
+ for _ in 0..::ITER {
+ let count_down = count_down.clone();
+
+ pool.spawn(lazy(move || {
+ poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!()))
+ .and_then(move |_| {
+ // Do something with the value
+ count_down.dec();
+ Ok(())
+ })
+ }));
+ }
+
+ count_down.wait();
+ })
+ }
+}
+
+mod message_passing {
+ use super::*;
+
+ use futures::future::*;
+ use futures::sync::oneshot;
+ use tokio_threadpool::Builder;
+
+ #[bench]
+ fn cpu_bound(b: &mut test::Bencher) {
+ let pool = Builder::new().pool_size(2).max_blocking(20).build();
+
+ let blocking = threadpool::ThreadPool::new(20);
+
+ b.iter(|| {
+ let count_down = Arc::new(CountDown::new(::ITER));
+
+ for _ in 0..::ITER {
+ let count_down = count_down.clone();
+ let blocking = blocking.clone();
+
+ pool.spawn(lazy(move || {
+ // Create a channel to receive the return value.
+ let (tx, rx) = oneshot::channel();
+
+ // Spawn a task on the blocking thread pool to process the
+ // computation.
+ blocking.execute(move || {
+ let res = perform_complex_computation();
+ tx.send(res).unwrap();
+ });
+
+ rx.and_then(move |_| {
+ count_down.dec();
+ Ok(())
+ })
+ .map_err(|_| panic!())
+ }));
+ }
+
+ count_down.wait();
+ })
+ }
+}
+
+fn perform_complex_computation() -> usize {
+ use rand::*;
+
+ // Simulate a CPU heavy computation
+ let mut rng = rand::thread_rng();
+ rng.gen()
+}
+
+// Util for waiting until the tasks complete
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::*;
+use std::sync::*;
+
+struct CountDown {
+ rem: AtomicUsize,
+ mutex: Mutex<()>,
+ condvar: Condvar,
+}
+
+impl CountDown {
+ fn new(rem: usize) -> Self {
+ CountDown {
+ rem: AtomicUsize::new(rem),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ }
+ }
+
+ fn dec(&self) {
+ let prev = self.rem.fetch_sub(1, AcqRel);
+
+ if prev != 1 {
+ return;
+ }
+
+ let _lock = self.mutex.lock().unwrap();
+ self.condvar.notify_all();
+ }
+
+ fn wait(&self) {
+ let mut lock = self.mutex.lock().unwrap();
+
+ loop {
+ if self.rem.load(Acquire) == 0 {
+ return;
+ }
+
+ lock = self.condvar.wait(lock).unwrap();
+ }
+ }
+}