diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/benches')
-rw-r--r-- | third_party/rust/futures-0.1.31/benches/bilock.rs | 121 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/benches/futures_unordered.rs | 43 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/benches/poll.rs | 72 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/benches/sync_mpsc.rs | 168 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/benches/thread_notify.rs | 114 |
5 files changed, 518 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/benches/bilock.rs b/third_party/rust/futures-0.1.31/benches/bilock.rs new file mode 100644 index 0000000000..0f840289ab --- /dev/null +++ b/third_party/rust/futures-0.1.31/benches/bilock.rs @@ -0,0 +1,121 @@ +#![feature(test)] + +extern crate futures; +extern crate test; + +use futures::{Async, Poll}; +use futures::executor; +use futures::executor::{Notify, NotifyHandle}; +use futures::sync::BiLock; +use futures::sync::BiLockAcquire; +use futures::sync::BiLockAcquired; +use futures::future::Future; +use futures::stream::Stream; + + +use test::Bencher; + +fn notify_noop() -> NotifyHandle { + struct Noop; + + impl Notify for Noop { + fn notify(&self, _id: usize) {} + } + + const NOOP : &'static Noop = &Noop; + + NotifyHandle::from(NOOP) +} + + +/// Pseudo-stream which simply calls `lock.poll()` on `poll` +struct LockStream { + lock: BiLockAcquire<u32>, +} + +impl LockStream { + fn new(lock: BiLock<u32>) -> LockStream { + LockStream { + lock: lock.lock() + } + } + + /// Release a lock after it was acquired in `poll`, + /// so `poll` could be called again. + fn release_lock(&mut self, guard: BiLockAcquired<u32>) { + self.lock = guard.unlock().lock() + } +} + +impl Stream for LockStream { + type Item = BiLockAcquired<u32>; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.lock.poll().map(|a| match a { + Async::Ready(a) => Async::Ready(Some(a)), + Async::NotReady => Async::NotReady, + }) + } +} + + +#[bench] +fn contended(b: &mut Bencher) { + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = executor::spawn(LockStream::new(x)); + let mut y = executor::spawn(LockStream::new(y)); + + for _ in 0..1000 { + let x_guard = match x.poll_stream_notify(¬ify_noop(), 11) { + Ok(Async::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_stream_notify(¬ify_noop(), 11) { + Ok(Async::NotReady) => (), + _ => panic!(), + }; + + x.get_mut().release_lock(x_guard); + + let y_guard = match y.poll_stream_notify(¬ify_noop(), 11) { + Ok(Async::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.get_mut().release_lock(y_guard); + } + (x, y) + }); +} + +#[bench] +fn lock_unlock(b: &mut Bencher) { + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = executor::spawn(LockStream::new(x)); + let mut y = executor::spawn(LockStream::new(y)); + + for _ in 0..1000 { + let x_guard = match x.poll_stream_notify(¬ify_noop(), 11) { + Ok(Async::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + x.get_mut().release_lock(x_guard); + + let y_guard = match y.poll_stream_notify(¬ify_noop(), 11) { + Ok(Async::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.get_mut().release_lock(y_guard); + } + (x, y) + }) +} diff --git a/third_party/rust/futures-0.1.31/benches/futures_unordered.rs b/third_party/rust/futures-0.1.31/benches/futures_unordered.rs new file mode 100644 index 0000000000..c922df5541 --- /dev/null +++ b/third_party/rust/futures-0.1.31/benches/futures_unordered.rs @@ -0,0 +1,43 @@ +#![feature(test)] + +extern crate futures; +extern crate test; + +use futures::*; +use futures::stream::FuturesUnordered; +use futures::sync::oneshot; + +use test::Bencher; + +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshots(b: &mut Bencher) { + const NUM: usize = 10_000; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(NUM); + let mut rxs = FuturesUnordered::new(); + + for _ in 0..NUM { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + while let Some(tx) = txs.pop_front() { + let _ = tx.send("hello"); + } + }); + + future::lazy(move || { + loop { + if let Ok(Async::Ready(None)) = rxs.poll() { + return Ok::<(), ()>(()); + } + } + }).wait().unwrap(); + }); +} diff --git a/third_party/rust/futures-0.1.31/benches/poll.rs b/third_party/rust/futures-0.1.31/benches/poll.rs new file mode 100644 index 0000000000..1fec653fa6 --- /dev/null +++ b/third_party/rust/futures-0.1.31/benches/poll.rs @@ -0,0 +1,72 @@ +#![feature(test)] + +extern crate futures; +extern crate test; + +use futures::*; +use futures::executor::{Notify, NotifyHandle}; +use futures::task::Task; + +use test::Bencher; + +fn notify_noop() -> NotifyHandle { + struct Noop; + + impl Notify for Noop { + fn notify(&self, _id: usize) {} + } + + const NOOP : &'static Noop = &Noop; + + NotifyHandle::from(NOOP) +} + +#[bench] +fn task_init(b: &mut Bencher) { + const NUM: u32 = 100_000; + + struct MyFuture { + num: u32, + task: Option<Task>, + }; + + impl Future for MyFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.num == NUM { + Ok(Async::Ready(())) + } else { + self.num += 1; + + if let Some(ref t) = self.task { + if t.will_notify_current() { + t.notify(); + return Ok(Async::NotReady); + } + } + + let t = task::current(); + t.notify(); + self.task = Some(t); + + Ok(Async::NotReady) + } + } + } + + let notify = notify_noop(); + + let mut fut = executor::spawn(MyFuture { + num: 0, + task: None, + }); + + b.iter(|| { + fut.get_mut().num = 0; + + while let Ok(Async::NotReady) = fut.poll_future_notify(¬ify, 0) { + } + }); +} diff --git a/third_party/rust/futures-0.1.31/benches/sync_mpsc.rs b/third_party/rust/futures-0.1.31/benches/sync_mpsc.rs new file mode 100644 index 0000000000..c0365c5fed --- /dev/null +++ b/third_party/rust/futures-0.1.31/benches/sync_mpsc.rs @@ -0,0 +1,168 @@ +#![feature(test)] + +#[macro_use] +extern crate futures; +extern crate test; + +use futures::{Async, Poll, AsyncSink}; +use futures::executor; +use futures::executor::{Notify, NotifyHandle}; + +use futures::sink::Sink; +use futures::stream::Stream; + +use futures::sync::mpsc::unbounded; +use futures::sync::mpsc::channel; +use futures::sync::mpsc::Sender; +use futures::sync::mpsc::UnboundedSender; + + +use test::Bencher; + +fn notify_noop() -> NotifyHandle { + struct Noop; + + impl Notify for Noop { + fn notify(&self, _id: usize) {} + } + + const NOOP : &'static Noop = &Noop; + + NotifyHandle::from(NOOP) +} + +/// Single producer, single consumer +#[bench] +fn unbounded_1_tx(b: &mut Bencher) { + b.iter(|| { + let (tx, rx) = unbounded(); + + let mut rx = executor::spawn(rx); + + // 1000 iterations to avoid measuring overhead of initialization + // Result should be divided by 1000 + for i in 0..1000 { + + // Poll, not ready, park + assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(¬ify_noop(), 1)); + + UnboundedSender::unbounded_send(&tx, i).unwrap(); + + // Now poll ready + assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn unbounded_100_tx(b: &mut Bencher) { + b.iter(|| { + let (tx, rx) = unbounded(); + + let mut rx = executor::spawn(rx); + + let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect(); + + // 1000 send/recv operations total, result should be divided by 1000 + for _ in 0..10 { + for i in 0..tx.len() { + assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(¬ify_noop(), 1)); + + UnboundedSender::unbounded_send(&tx[i], i).unwrap(); + + assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + } + }) +} + +#[bench] +fn unbounded_uncontended(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = unbounded(); + + for i in 0..1000 { + UnboundedSender::unbounded_send(&tx, i).expect("send"); + // No need to create a task, because poll is not going to park. + assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); + } + }) +} + + +/// A Stream that continuously sends incrementing number of the queue +struct TestSender { + tx: Sender<u32>, + last: u32, // Last number sent +} + +// Could be a Future, it doesn't matter +impl Stream for TestSender { + type Item = u32; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + match self.tx.start_send(self.last + 1) { + Err(_) => panic!(), + Ok(AsyncSink::Ready) => { + self.last += 1; + Ok(Async::Ready(Some(self.last))) + } + Ok(AsyncSink::NotReady(_)) => { + Ok(Async::NotReady) + } + } + } +} + + +/// Single producers, single consumer +#[bench] +fn bounded_1_tx(b: &mut Bencher) { + b.iter(|| { + let (tx, rx) = channel(0); + + let mut tx = executor::spawn(TestSender { + tx: tx, + last: 0, + }); + + let mut rx = executor::spawn(rx); + + for i in 0..1000 { + assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(¬ify_noop(), 1)); + assert_eq!(Ok(Async::NotReady), tx.poll_stream_notify(¬ify_noop(), 1)); + assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn bounded_100_tx(b: &mut Bencher) { + b.iter(|| { + // Each sender can send one item after specified capacity + let (tx, rx) = channel(0); + + let mut tx: Vec<_> = (0..100).map(|_| { + executor::spawn(TestSender { + tx: tx.clone(), + last: 0 + }) + }).collect(); + + let mut rx = executor::spawn(rx); + + for i in 0..10 { + for j in 0..tx.len() { + // Send an item + assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(¬ify_noop(), 1)); + // Then block + assert_eq!(Ok(Async::NotReady), tx[j].poll_stream_notify(¬ify_noop(), 1)); + // Recv the item + assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + } + }) +} diff --git a/third_party/rust/futures-0.1.31/benches/thread_notify.rs b/third_party/rust/futures-0.1.31/benches/thread_notify.rs new file mode 100644 index 0000000000..92932353d8 --- /dev/null +++ b/third_party/rust/futures-0.1.31/benches/thread_notify.rs @@ -0,0 +1,114 @@ +#![feature(test)] + +extern crate futures; +extern crate test; + +use futures::{Future, Poll, Async}; +use futures::task::{self, Task}; + +use test::Bencher; + +#[bench] +fn thread_yield_single_thread_one_wait(b: &mut Bencher) { + const NUM: usize = 10_000; + + struct Yield { + rem: usize, + } + + impl Future for Yield { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.rem == 0 { + Ok(Async::Ready(())) + } else { + self.rem -= 1; + task::current().notify(); + Ok(Async::NotReady) + } + } + } + + b.iter(|| { + let y = Yield { rem: NUM }; + y.wait().unwrap(); + }); +} + +#[bench] +fn thread_yield_single_thread_many_wait(b: &mut Bencher) { + const NUM: usize = 10_000; + + struct Yield { + rem: usize, + } + + impl Future for Yield { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.rem == 0 { + Ok(Async::Ready(())) + } else { + self.rem -= 1; + task::current().notify(); + Ok(Async::NotReady) + } + } + } + + b.iter(|| { + for _ in 0..NUM { + let y = Yield { rem: 1 }; + y.wait().unwrap(); + } + }); +} + +#[bench] +fn thread_yield_multi_thread(b: &mut Bencher) { + use std::sync::mpsc; + use std::thread; + + const NUM: usize = 1_000; + + let (tx, rx) = mpsc::sync_channel::<Task>(10_000); + + struct Yield { + rem: usize, + tx: mpsc::SyncSender<Task>, + } + + impl Future for Yield { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.rem == 0 { + Ok(Async::Ready(())) + } else { + self.rem -= 1; + self.tx.send(task::current()).unwrap(); + Ok(Async::NotReady) + } + } + } + + thread::spawn(move || { + while let Ok(task) = rx.recv() { + task.notify(); + } + }); + + b.iter(move || { + let y = Yield { + rem: NUM, + tx: tx.clone(), + }; + + y.wait().unwrap(); + }); +} |