summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/benches
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/benches')
-rw-r--r--third_party/rust/tokio-threadpool/benches/basic.rs165
-rw-r--r--third_party/rust/tokio-threadpool/benches/blocking.rs137
-rw-r--r--third_party/rust/tokio-threadpool/benches/depth.rs76
3 files changed, 378 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/benches/basic.rs b/third_party/rust/tokio-threadpool/benches/basic.rs
new file mode 100644
index 0000000000..cd1f5673c0
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/benches/basic.rs
@@ -0,0 +1,165 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate futures_cpupool;
+extern crate num_cpus;
+extern crate test;
+extern crate tokio_threadpool;
+
+// Number of trivial futures spawned per `spawn_many` bench iteration.
+const NUM_SPAWN: usize = 10_000;
+// Times each `yield_many` task re-notifies itself before completing.
+const NUM_YIELD: usize = 1_000;
+// Tasks spawned per logical CPU in `yield_many`.
+const TASKS_PER_CPU: usize = 50;
+
+// Benchmarks for `tokio_threadpool::ThreadPool`; compare against the
+// `cpupool` module below, which runs the same workloads on `CpuPool`.
+mod threadpool {
+    use futures::{future, task, Async};
+    use num_cpus;
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering::SeqCst;
+    use std::sync::{mpsc, Arc};
+    use test;
+    use tokio_threadpool::*;
+
+    /// Measures the cost of spawning `NUM_SPAWN` trivial futures per
+    /// iteration. Each task decrements a shared counter; the task that
+    /// brings it to zero signals the bench thread over a channel, so an
+    /// iteration ends only after every task has actually run.
+    #[bench]
+    fn spawn_many(b: &mut test::Bencher) {
+        let threadpool = ThreadPool::new();
+
+        // Completion channel; only the final task of an iteration sends.
+        let (tx, rx) = mpsc::sync_channel(10);
+        // Number of tasks still pending within the current iteration.
+        let rem = Arc::new(AtomicUsize::new(0));
+
+        b.iter(move || {
+            rem.store(super::NUM_SPAWN, SeqCst);
+
+            for _ in 0..super::NUM_SPAWN {
+                let tx = tx.clone();
+                let rem = rem.clone();
+
+                threadpool.spawn(future::lazy(move || {
+                    // `fetch_sub` returns the previous value, so the task
+                    // observing 1 is the last one to finish.
+                    if 1 == rem.fetch_sub(1, SeqCst) {
+                        tx.send(()).unwrap();
+                    }
+
+                    Ok(())
+                }));
+            }
+
+            // Block until the last task reports completion.
+            let _ = rx.recv().unwrap();
+        });
+    }
+
+    /// Measures task re-scheduling throughput: spawns `TASKS_PER_CPU`
+    /// tasks per core, each of which yields back to the scheduler
+    /// `NUM_YIELD` times by notifying itself and returning `NotReady`.
+    #[bench]
+    fn yield_many(b: &mut test::Bencher) {
+        let threadpool = ThreadPool::new();
+        let tasks = super::TASKS_PER_CPU * num_cpus::get();
+
+        // Capacity `tasks` so no completing task ever blocks on send.
+        let (tx, rx) = mpsc::sync_channel(tasks);
+
+        b.iter(move || {
+            for _ in 0..tasks {
+                // Per-task countdown of remaining yields.
+                let mut rem = super::NUM_YIELD;
+                let tx = tx.clone();
+
+                threadpool.spawn(future::poll_fn(move || {
+                    rem -= 1;
+
+                    if rem == 0 {
+                        tx.send(()).unwrap();
+                        Ok(Async::Ready(()))
+                    } else {
+                        // Notify the current task
+                        task::current().notify();
+
+                        // Not ready
+                        Ok(Async::NotReady)
+                    }
+                }));
+            }
+
+            // Wait for every task to complete all of its yields.
+            for _ in 0..tasks {
+                let _ = rx.recv().unwrap();
+            }
+        });
+    }
+}
+
+// In this case, CPU pool completes the benchmark faster, but this is due to how
+// CpuPool currently behaves, starving other futures. This completes the
+// benchmark quickly but results in poor runtime characteristics for a thread
+// pool.
+//
+// See rust-lang-nursery/futures-rs#617
+//
+mod cpupool {
+    use futures::future::{self, Executor};
+    use futures::{task, Async};
+    use futures_cpupool::*;
+    use num_cpus;
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering::SeqCst;
+    use std::sync::{mpsc, Arc};
+    use test;
+
+    /// `CpuPool` counterpart of `threadpool::spawn_many`: spawns
+    /// `NUM_SPAWN` trivial futures and waits for the last one to finish.
+    #[bench]
+    fn spawn_many(b: &mut test::Bencher) {
+        let pool = CpuPool::new(num_cpus::get());
+
+        // Completion channel; only the final task of an iteration sends.
+        let (tx, rx) = mpsc::sync_channel(10);
+        // Number of tasks still pending within the current iteration.
+        let rem = Arc::new(AtomicUsize::new(0));
+
+        b.iter(move || {
+            rem.store(super::NUM_SPAWN, SeqCst);
+
+            for _ in 0..super::NUM_SPAWN {
+                let tx = tx.clone();
+                let rem = rem.clone();
+
+                pool.execute(future::lazy(move || {
+                    // `fetch_sub` returns the previous value, so the task
+                    // observing 1 is the last one to finish.
+                    if 1 == rem.fetch_sub(1, SeqCst) {
+                        tx.send(()).unwrap();
+                    }
+
+                    Ok(())
+                }))
+                .ok()
+                .unwrap();
+            }
+
+            // Block until the last task reports completion.
+            let _ = rx.recv().unwrap();
+        });
+    }
+
+    /// `CpuPool` counterpart of `threadpool::yield_many`: each task
+    /// re-notifies itself `NUM_YIELD` times before completing.
+    #[bench]
+    fn yield_many(b: &mut test::Bencher) {
+        let pool = CpuPool::new(num_cpus::get());
+        let tasks = super::TASKS_PER_CPU * num_cpus::get();
+
+        // Capacity `tasks` so no completing task ever blocks on send.
+        let (tx, rx) = mpsc::sync_channel(tasks);
+
+        b.iter(move || {
+            for _ in 0..tasks {
+                // Per-task countdown of remaining yields.
+                let mut rem = super::NUM_YIELD;
+                let tx = tx.clone();
+
+                pool.execute(future::poll_fn(move || {
+                    rem -= 1;
+
+                    if rem == 0 {
+                        tx.send(()).unwrap();
+                        Ok(Async::Ready(()))
+                    } else {
+                        // Notify the current task
+                        task::current().notify();
+
+                        // Not ready
+                        Ok(Async::NotReady)
+                    }
+                }))
+                .ok()
+                .unwrap();
+            }
+
+            // Wait for every task to complete all of its yields.
+            for _ in 0..tasks {
+                let _ = rx.recv().unwrap();
+            }
+        });
+    }
+}
diff --git a/third_party/rust/tokio-threadpool/benches/blocking.rs b/third_party/rust/tokio-threadpool/benches/blocking.rs
new file mode 100644
index 0000000000..bc5121545a
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/benches/blocking.rs
@@ -0,0 +1,137 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate rand;
+extern crate test;
+extern crate threadpool;
+extern crate tokio_threadpool;
+
+// Number of jobs completed per bench iteration.
+const ITER: usize = 1_000;
+
+mod blocking {
+    use super::*;
+
+    use futures::future::*;
+    use tokio_threadpool::{blocking, Builder};
+
+    /// Runs `ITER` jobs through `tokio_threadpool::blocking`; contrast
+    /// with the `message_passing` module below, which ships the same jobs
+    /// to a separate dedicated thread pool instead.
+    #[bench]
+    fn cpu_bound(b: &mut test::Bencher) {
+        // Small worker pool with a larger blocking allowance (up to 20
+        // concurrent blocking sections).
+        let pool = Builder::new().pool_size(2).max_blocking(20).build();
+
+        b.iter(|| {
+            // Latch released once all ITER tasks have finished.
+            let count_down = Arc::new(CountDown::new(::ITER));
+
+            for _ in 0..::ITER {
+                let count_down = count_down.clone();
+
+                pool.spawn(lazy(move || {
+                    // `blocking` returns a poll result, so it is driven via
+                    // `poll_fn`; any BlockingError aborts the bench.
+                    poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!()))
+                        .and_then(move |_| {
+                            // Do something with the value
+                            count_down.dec();
+                            Ok(())
+                        })
+                }));
+            }
+
+            // Block the bench thread until every task has completed.
+            count_down.wait();
+        })
+    }
+}
+
+mod message_passing {
+    use super::*;
+
+    use futures::future::*;
+    use futures::sync::oneshot;
+    use tokio_threadpool::Builder;
+
+    /// Baseline for `blocking::cpu_bound` above: the same `ITER` jobs,
+    /// but each one is handed to a dedicated `threadpool::ThreadPool` and
+    /// its result returned over a oneshot channel.
+    #[bench]
+    fn cpu_bound(b: &mut test::Bencher) {
+        let pool = Builder::new().pool_size(2).max_blocking(20).build();
+
+        // Dedicated pool that performs the computations off the futures
+        // workers; sized to match `max_blocking` above for a fair compare.
+        let blocking = threadpool::ThreadPool::new(20);
+
+        b.iter(|| {
+            // Latch released once all ITER tasks have finished.
+            let count_down = Arc::new(CountDown::new(::ITER));
+
+            for _ in 0..::ITER {
+                let count_down = count_down.clone();
+                let blocking = blocking.clone();
+
+                pool.spawn(lazy(move || {
+                    // Create a channel to receive the return value.
+                    let (tx, rx) = oneshot::channel();
+
+                    // Spawn a task on the blocking thread pool to process the
+                    // computation.
+                    blocking.execute(move || {
+                        let res = perform_complex_computation();
+                        tx.send(res).unwrap();
+                    });
+
+                    // Complete the futures task once the result arrives.
+                    rx.and_then(move |_| {
+                        count_down.dec();
+                        Ok(())
+                    })
+                    .map_err(|_| panic!())
+                }));
+            }
+
+            // Block the bench thread until every task has completed.
+            count_down.wait();
+        })
+    }
+}
+
+/// Stand-in for a CPU-bound job: draws one value from the thread-local RNG.
+///
+/// NOTE(review): a single `gen()` call is very cheap, so despite the name
+/// these benches mostly measure dispatch/synchronization overhead rather
+/// than compute time — confirm that is the intent.
+fn perform_complex_computation() -> usize {
+    use rand::*;
+
+    // Simulate a CPU heavy computation
+    let mut rng = rand::thread_rng();
+    rng.gen()
+}
+
+// Util for waiting until the tasks complete
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::*;
+use std::sync::*;
+
+// Latch used by the benches above: `dec()` is called once per finished
+// task and `wait()` blocks until the count reaches zero.
+struct CountDown {
+    // Number of outstanding decrements remaining.
+    rem: AtomicUsize,
+    // Mutex/condvar pair used only to park and wake the waiting thread.
+    mutex: Mutex<()>,
+    condvar: Condvar,
+}
+
+impl CountDown {
+    /// Creates a latch that releases `wait()` after `rem` calls to `dec()`.
+    fn new(rem: usize) -> Self {
+        CountDown {
+            rem: AtomicUsize::new(rem),
+            mutex: Mutex::new(()),
+            condvar: Condvar::new(),
+        }
+    }
+
+    /// Decrements the counter; the call that reaches zero wakes the waiter.
+    fn dec(&self) {
+        let prev = self.rem.fetch_sub(1, AcqRel);
+
+        // Only the final decrement (previous value 1) needs to signal.
+        if prev != 1 {
+            return;
+        }
+
+        // Acquiring the mutex before notifying prevents a lost wakeup:
+        // the waiter holds the lock between its zero check and its call
+        // to `condvar.wait`, so this notify cannot fall in that window.
+        let _lock = self.mutex.lock().unwrap();
+        self.condvar.notify_all();
+    }
+
+    /// Blocks the calling thread until the count reaches zero.
+    fn wait(&self) {
+        let mut lock = self.mutex.lock().unwrap();
+
+        loop {
+            if self.rem.load(Acquire) == 0 {
+                return;
+            }
+
+            // Re-check after every wakeup; condvar waits may be spurious.
+            lock = self.condvar.wait(lock).unwrap();
+        }
+    }
+}
diff --git a/third_party/rust/tokio-threadpool/benches/depth.rs b/third_party/rust/tokio-threadpool/benches/depth.rs
new file mode 100644
index 0000000000..b4f2bcb095
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/benches/depth.rs
@@ -0,0 +1,76 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate futures_cpupool;
+extern crate num_cpus;
+extern crate test;
+extern crate tokio_threadpool;
+
+// Depth of the spawn chain per bench iteration.
+const ITER: usize = 20_000;
+
+// Benchmarks `tokio_threadpool::ThreadPool` (see the `cpupool` module
+// below for the `futures_cpupool` counterpart).
+mod us {
+    use futures::future;
+    use std::sync::mpsc;
+    use test;
+    use tokio_threadpool::*;
+
+    /// Measures spawn latency under chaining: each task spawns its
+    /// successor, `ITER` levels deep, so spawns are strictly sequential
+    /// rather than batched as in the `basic` benches.
+    #[bench]
+    fn chained_spawn(b: &mut test::Bencher) {
+        let threadpool = ThreadPool::new();
+
+        // Recursively spawns the next task until `n` reaches zero, then
+        // signals completion back to the bench thread.
+        fn spawn(pool_tx: Sender, res_tx: mpsc::Sender<()>, n: usize) {
+            if n == 0 {
+                res_tx.send(()).unwrap();
+            } else {
+                let pool_tx2 = pool_tx.clone();
+                pool_tx
+                    .spawn(future::lazy(move || {
+                        spawn(pool_tx2, res_tx, n - 1);
+                        Ok(())
+                    }))
+                    .unwrap();
+            }
+        }
+
+        b.iter(move || {
+            let (res_tx, res_rx) = mpsc::channel();
+
+            // Kick off the chain and wait for the final task's signal.
+            spawn(threadpool.sender().clone(), res_tx, super::ITER);
+            res_rx.recv().unwrap();
+        });
+    }
+}
+
+mod cpupool {
+    use futures::future::{self, Executor};
+    use futures_cpupool::*;
+    use num_cpus;
+    use std::sync::mpsc;
+    use test;
+
+    /// `CpuPool` counterpart of `us::chained_spawn`: the same
+    /// sequentially-chained spawns, `ITER` levels deep.
+    #[bench]
+    fn chained_spawn(b: &mut test::Bencher) {
+        let pool = CpuPool::new(num_cpus::get());
+
+        // Recursively spawns the next task until `n` reaches zero, then
+        // signals completion back to the bench thread.
+        fn spawn(pool: CpuPool, res_tx: mpsc::Sender<()>, n: usize) {
+            if n == 0 {
+                res_tx.send(()).unwrap();
+            } else {
+                let pool2 = pool.clone();
+                pool.execute(future::lazy(move || {
+                    spawn(pool2, res_tx, n - 1);
+                    Ok(())
+                }))
+                .ok()
+                .unwrap();
+            }
+        }
+
+        b.iter(move || {
+            let (res_tx, res_rx) = mpsc::channel();
+
+            // Kick off the chain and wait for the final task's signal.
+            spawn(pool.clone(), res_tx, super::ITER);
+            res_rx.recv().unwrap();
+        });
+    }
+}