diff options
Diffstat (limited to 'third_party/rust/tokio-threadpool/benches')
-rw-r--r-- | third_party/rust/tokio-threadpool/benches/basic.rs | 165 | ||||
-rw-r--r-- | third_party/rust/tokio-threadpool/benches/blocking.rs | 137 | ||||
-rw-r--r-- | third_party/rust/tokio-threadpool/benches/depth.rs | 76 |
3 files changed, 378 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/benches/basic.rs b/third_party/rust/tokio-threadpool/benches/basic.rs new file mode 100644 index 0000000000..cd1f5673c0 --- /dev/null +++ b/third_party/rust/tokio-threadpool/benches/basic.rs @@ -0,0 +1,165 @@ +#![feature(test)] + +extern crate futures; +extern crate futures_cpupool; +extern crate num_cpus; +extern crate test; +extern crate tokio_threadpool; + +const NUM_SPAWN: usize = 10_000; +const NUM_YIELD: usize = 1_000; +const TASKS_PER_CPU: usize = 50; + +mod threadpool { + use futures::{future, task, Async}; + use num_cpus; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::SeqCst; + use std::sync::{mpsc, Arc}; + use test; + use tokio_threadpool::*; + + #[bench] + fn spawn_many(b: &mut test::Bencher) { + let threadpool = ThreadPool::new(); + + let (tx, rx) = mpsc::sync_channel(10); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(move || { + rem.store(super::NUM_SPAWN, SeqCst); + + for _ in 0..super::NUM_SPAWN { + let tx = tx.clone(); + let rem = rem.clone(); + + threadpool.spawn(future::lazy(move || { + if 1 == rem.fetch_sub(1, SeqCst) { + tx.send(()).unwrap(); + } + + Ok(()) + })); + } + + let _ = rx.recv().unwrap(); + }); + } + + #[bench] + fn yield_many(b: &mut test::Bencher) { + let threadpool = ThreadPool::new(); + let tasks = super::TASKS_PER_CPU * num_cpus::get(); + + let (tx, rx) = mpsc::sync_channel(tasks); + + b.iter(move || { + for _ in 0..tasks { + let mut rem = super::NUM_YIELD; + let tx = tx.clone(); + + threadpool.spawn(future::poll_fn(move || { + rem -= 1; + + if rem == 0 { + tx.send(()).unwrap(); + Ok(Async::Ready(())) + } else { + // Notify the current task + task::current().notify(); + + // Not ready + Ok(Async::NotReady) + } + })); + } + + for _ in 0..tasks { + let _ = rx.recv().unwrap(); + } + }); + } +} + +// In this case, CPU pool completes the benchmark faster, but this is due to how +// CpuPool currently behaves, starving other futures. This completes the +// benchmark quickly but results in poor runtime characteristics for a thread +// pool. +// +// See rust-lang-nursery/futures-rs#617 +// +mod cpupool { + use futures::future::{self, Executor}; + use futures::{task, Async}; + use futures_cpupool::*; + use num_cpus; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::SeqCst; + use std::sync::{mpsc, Arc}; + use test; + + #[bench] + fn spawn_many(b: &mut test::Bencher) { + let pool = CpuPool::new(num_cpus::get()); + + let (tx, rx) = mpsc::sync_channel(10); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(move || { + rem.store(super::NUM_SPAWN, SeqCst); + + for _ in 0..super::NUM_SPAWN { + let tx = tx.clone(); + let rem = rem.clone(); + + pool.execute(future::lazy(move || { + if 1 == rem.fetch_sub(1, SeqCst) { + tx.send(()).unwrap(); + } + + Ok(()) + })) + .ok() + .unwrap(); + } + + let _ = rx.recv().unwrap(); + }); + } + + #[bench] + fn yield_many(b: &mut test::Bencher) { + let pool = CpuPool::new(num_cpus::get()); + let tasks = super::TASKS_PER_CPU * num_cpus::get(); + + let (tx, rx) = mpsc::sync_channel(tasks); + + b.iter(move || { + for _ in 0..tasks { + let mut rem = super::NUM_YIELD; + let tx = tx.clone(); + + pool.execute(future::poll_fn(move || { + rem -= 1; + + if rem == 0 { + tx.send(()).unwrap(); + Ok(Async::Ready(())) + } else { + // Notify the current task + task::current().notify(); + + // Not ready + Ok(Async::NotReady) + } + })) + .ok() + .unwrap(); + } + + for _ in 0..tasks { + let _ = rx.recv().unwrap(); + } + }); + } +} diff --git a/third_party/rust/tokio-threadpool/benches/blocking.rs b/third_party/rust/tokio-threadpool/benches/blocking.rs new file mode 100644 index 0000000000..bc5121545a --- /dev/null +++ b/third_party/rust/tokio-threadpool/benches/blocking.rs @@ -0,0 +1,137 @@ +#![feature(test)] + +extern crate futures; +extern crate rand; +extern crate test; +extern crate threadpool; +extern crate tokio_threadpool; + +const ITER: usize = 1_000; + +mod blocking { + use super::*; + + use futures::future::*; + use tokio_threadpool::{blocking, Builder}; + + #[bench] + fn cpu_bound(b: &mut test::Bencher) { + let pool = Builder::new().pool_size(2).max_blocking(20).build(); + + b.iter(|| { + let count_down = Arc::new(CountDown::new(::ITER)); + + for _ in 0..::ITER { + let count_down = count_down.clone(); + + pool.spawn(lazy(move || { + poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!())) + .and_then(move |_| { + // Do something with the value + count_down.dec(); + Ok(()) + }) + })); + } + + count_down.wait(); + }) + } +} + +mod message_passing { + use super::*; + + use futures::future::*; + use futures::sync::oneshot; + use tokio_threadpool::Builder; + + #[bench] + fn cpu_bound(b: &mut test::Bencher) { + let pool = Builder::new().pool_size(2).max_blocking(20).build(); + + let blocking = threadpool::ThreadPool::new(20); + + b.iter(|| { + let count_down = Arc::new(CountDown::new(::ITER)); + + for _ in 0..::ITER { + let count_down = count_down.clone(); + let blocking = blocking.clone(); + + pool.spawn(lazy(move || { + // Create a channel to receive the return value. + let (tx, rx) = oneshot::channel(); + + // Spawn a task on the blocking thread pool to process the + // computation. + blocking.execute(move || { + let res = perform_complex_computation(); + tx.send(res).unwrap(); + }); + + rx.and_then(move |_| { + count_down.dec(); + Ok(()) + }) + .map_err(|_| panic!()) + })); + } + + count_down.wait(); + }) + } +} + +fn perform_complex_computation() -> usize { + use rand::*; + + // Simulate a CPU heavy computation + let mut rng = rand::thread_rng(); + rng.gen() +} + +// Util for waiting until the tasks complete + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::*; +use std::sync::*; + +struct CountDown { + rem: AtomicUsize, + mutex: Mutex<()>, + condvar: Condvar, +} + +impl CountDown { + fn new(rem: usize) -> Self { + CountDown { + rem: AtomicUsize::new(rem), + mutex: Mutex::new(()), + condvar: Condvar::new(), + } + } + + fn dec(&self) { + let prev = self.rem.fetch_sub(1, AcqRel); + + if prev != 1 { + return; + } + + let _lock = self.mutex.lock().unwrap(); + self.condvar.notify_all(); + } + + fn wait(&self) { + let mut lock = self.mutex.lock().unwrap(); + + loop { + if self.rem.load(Acquire) == 0 { + return; + } + + lock = self.condvar.wait(lock).unwrap(); + } + } +} diff --git a/third_party/rust/tokio-threadpool/benches/depth.rs b/third_party/rust/tokio-threadpool/benches/depth.rs new file mode 100644 index 0000000000..b4f2bcb095 --- /dev/null +++ b/third_party/rust/tokio-threadpool/benches/depth.rs @@ -0,0 +1,76 @@ +#![feature(test)] + +extern crate futures; +extern crate futures_cpupool; +extern crate num_cpus; +extern crate test; +extern crate tokio_threadpool; + +const ITER: usize = 20_000; + +mod us { + use futures::future; + use std::sync::mpsc; + use test; + use tokio_threadpool::*; + + #[bench] + fn chained_spawn(b: &mut test::Bencher) { + let threadpool = ThreadPool::new(); + + fn spawn(pool_tx: Sender, res_tx: mpsc::Sender<()>, n: usize) { + if n == 0 { + res_tx.send(()).unwrap(); + } else { + let pool_tx2 = pool_tx.clone(); + pool_tx + .spawn(future::lazy(move || { + spawn(pool_tx2, res_tx, n - 1); + Ok(()) + })) + .unwrap(); + } + } + + b.iter(move || { + let (res_tx, res_rx) = mpsc::channel(); + + spawn(threadpool.sender().clone(), res_tx, super::ITER); + res_rx.recv().unwrap(); + }); + } +} + +mod cpupool { + use futures::future::{self, Executor}; + use futures_cpupool::*; + use num_cpus; + use std::sync::mpsc; + use test; + + #[bench] + fn chained_spawn(b: &mut test::Bencher) { + let pool = CpuPool::new(num_cpus::get()); + + fn spawn(pool: CpuPool, res_tx: mpsc::Sender<()>, n: usize) { + if n == 0 { + res_tx.send(()).unwrap(); + } else { + let pool2 = pool.clone(); + pool.execute(future::lazy(move || { + spawn(pool2, res_tx, n - 1); + Ok(()) + })) + .ok() + .unwrap(); + } + } + + b.iter(move || { + let (res_tx, res_rx) = mpsc::channel(); + + spawn(pool.clone(), res_tx, super::ITER); + res_rx.recv().unwrap(); + }); + } +} |