summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/benches
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/benches')
-rw-r--r--third_party/rust/futures-0.1.31/benches/bilock.rs121
-rw-r--r--third_party/rust/futures-0.1.31/benches/futures_unordered.rs43
-rw-r--r--third_party/rust/futures-0.1.31/benches/poll.rs72
-rw-r--r--third_party/rust/futures-0.1.31/benches/sync_mpsc.rs168
-rw-r--r--third_party/rust/futures-0.1.31/benches/thread_notify.rs114
5 files changed, 518 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/benches/bilock.rs b/third_party/rust/futures-0.1.31/benches/bilock.rs
new file mode 100644
index 0000000000..0f840289ab
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/benches/bilock.rs
@@ -0,0 +1,121 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate test;
+
+use futures::{Async, Poll};
+use futures::executor;
+use futures::executor::{Notify, NotifyHandle};
+use futures::sync::BiLock;
+use futures::sync::BiLockAcquire;
+use futures::sync::BiLockAcquired;
+use futures::future::Future;
+use futures::stream::Stream;
+
+
+use test::Bencher;
+
+fn notify_noop() -> NotifyHandle {
+ struct Noop;
+
+ impl Notify for Noop {
+ fn notify(&self, _id: usize) {}
+ }
+
+ const NOOP : &'static Noop = &Noop;
+
+ NotifyHandle::from(NOOP)
+}
+
+
+/// Pseudo-stream which simply calls `lock.poll()` on `poll`
+struct LockStream {
+ lock: BiLockAcquire<u32>,
+}
+
+impl LockStream {
+ fn new(lock: BiLock<u32>) -> LockStream {
+ LockStream {
+ lock: lock.lock()
+ }
+ }
+
+ /// Release a lock after it was acquired in `poll`,
+ /// so `poll` could be called again.
+ fn release_lock(&mut self, guard: BiLockAcquired<u32>) {
+ self.lock = guard.unlock().lock()
+ }
+}
+
+impl Stream for LockStream {
+ type Item = BiLockAcquired<u32>;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.lock.poll().map(|a| match a {
+ Async::Ready(a) => Async::Ready(Some(a)),
+ Async::NotReady => Async::NotReady,
+ })
+ }
+}
+
+
+#[bench]
+fn contended(b: &mut Bencher) {
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ let mut x = executor::spawn(LockStream::new(x));
+ let mut y = executor::spawn(LockStream::new(y));
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_stream_notify(&notify_noop(), 11) {
+ Ok(Async::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ // Try poll second lock while first lock still holds the lock
+ match y.poll_stream_notify(&notify_noop(), 11) {
+ Ok(Async::NotReady) => (),
+ _ => panic!(),
+ };
+
+ x.get_mut().release_lock(x_guard);
+
+ let y_guard = match y.poll_stream_notify(&notify_noop(), 11) {
+ Ok(Async::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ y.get_mut().release_lock(y_guard);
+ }
+ (x, y)
+ });
+}
+
+#[bench]
+fn lock_unlock(b: &mut Bencher) {
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ let mut x = executor::spawn(LockStream::new(x));
+ let mut y = executor::spawn(LockStream::new(y));
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_stream_notify(&notify_noop(), 11) {
+ Ok(Async::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ x.get_mut().release_lock(x_guard);
+
+ let y_guard = match y.poll_stream_notify(&notify_noop(), 11) {
+ Ok(Async::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ y.get_mut().release_lock(y_guard);
+ }
+ (x, y)
+ })
+}
diff --git a/third_party/rust/futures-0.1.31/benches/futures_unordered.rs b/third_party/rust/futures-0.1.31/benches/futures_unordered.rs
new file mode 100644
index 0000000000..c922df5541
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/benches/futures_unordered.rs
@@ -0,0 +1,43 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate test;
+
+use futures::*;
+use futures::stream::FuturesUnordered;
+use futures::sync::oneshot;
+
+use test::Bencher;
+
+use std::collections::VecDeque;
+use std::thread;
+
+#[bench]
+fn oneshots(b: &mut Bencher) {
+ const NUM: usize = 10_000;
+
+ b.iter(|| {
+ let mut txs = VecDeque::with_capacity(NUM);
+ let mut rxs = FuturesUnordered::new();
+
+ for _ in 0..NUM {
+ let (tx, rx) = oneshot::channel();
+ txs.push_back(tx);
+ rxs.push(rx);
+ }
+
+ thread::spawn(move || {
+ while let Some(tx) = txs.pop_front() {
+ let _ = tx.send("hello");
+ }
+ });
+
+ future::lazy(move || {
+ loop {
+ if let Ok(Async::Ready(None)) = rxs.poll() {
+ return Ok::<(), ()>(());
+ }
+ }
+ }).wait().unwrap();
+ });
+}
diff --git a/third_party/rust/futures-0.1.31/benches/poll.rs b/third_party/rust/futures-0.1.31/benches/poll.rs
new file mode 100644
index 0000000000..1fec653fa6
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/benches/poll.rs
@@ -0,0 +1,72 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate test;
+
+use futures::*;
+use futures::executor::{Notify, NotifyHandle};
+use futures::task::Task;
+
+use test::Bencher;
+
+fn notify_noop() -> NotifyHandle {
+ struct Noop;
+
+ impl Notify for Noop {
+ fn notify(&self, _id: usize) {}
+ }
+
+ const NOOP : &'static Noop = &Noop;
+
+ NotifyHandle::from(NOOP)
+}
+
+#[bench]
+fn task_init(b: &mut Bencher) {
+ const NUM: u32 = 100_000;
+
+ struct MyFuture {
+ num: u32,
+ task: Option<Task>,
+ };
+
+ impl Future for MyFuture {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ if self.num == NUM {
+ Ok(Async::Ready(()))
+ } else {
+ self.num += 1;
+
+ if let Some(ref t) = self.task {
+ if t.will_notify_current() {
+ t.notify();
+ return Ok(Async::NotReady);
+ }
+ }
+
+ let t = task::current();
+ t.notify();
+ self.task = Some(t);
+
+ Ok(Async::NotReady)
+ }
+ }
+ }
+
+ let notify = notify_noop();
+
+ let mut fut = executor::spawn(MyFuture {
+ num: 0,
+ task: None,
+ });
+
+ b.iter(|| {
+ fut.get_mut().num = 0;
+
+ while let Ok(Async::NotReady) = fut.poll_future_notify(&notify, 0) {
+ }
+ });
+}
diff --git a/third_party/rust/futures-0.1.31/benches/sync_mpsc.rs b/third_party/rust/futures-0.1.31/benches/sync_mpsc.rs
new file mode 100644
index 0000000000..c0365c5fed
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/benches/sync_mpsc.rs
@@ -0,0 +1,168 @@
+#![feature(test)]
+
+#[macro_use]
+extern crate futures;
+extern crate test;
+
+use futures::{Async, Poll, AsyncSink};
+use futures::executor;
+use futures::executor::{Notify, NotifyHandle};
+
+use futures::sink::Sink;
+use futures::stream::Stream;
+
+use futures::sync::mpsc::unbounded;
+use futures::sync::mpsc::channel;
+use futures::sync::mpsc::Sender;
+use futures::sync::mpsc::UnboundedSender;
+
+
+use test::Bencher;
+
+fn notify_noop() -> NotifyHandle {
+ struct Noop;
+
+ impl Notify for Noop {
+ fn notify(&self, _id: usize) {}
+ }
+
+ const NOOP : &'static Noop = &Noop;
+
+ NotifyHandle::from(NOOP)
+}
+
+/// Single producer, single consumer
+#[bench]
+fn unbounded_1_tx(b: &mut Bencher) {
+ b.iter(|| {
+ let (tx, rx) = unbounded();
+
+ let mut rx = executor::spawn(rx);
+
+ // 1000 iterations to avoid measuring overhead of initialization
+ // Result should be divided by 1000
+ for i in 0..1000 {
+
+ // Poll, not ready, park
+ assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(&notify_noop(), 1));
+
+ UnboundedSender::unbounded_send(&tx, i).unwrap();
+
+ // Now poll ready
+ assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
+ }
+ })
+}
+
+/// 100 producers, single consumer
+#[bench]
+fn unbounded_100_tx(b: &mut Bencher) {
+ b.iter(|| {
+ let (tx, rx) = unbounded();
+
+ let mut rx = executor::spawn(rx);
+
+ let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
+
+ // 1000 send/recv operations total, result should be divided by 1000
+ for _ in 0..10 {
+ for i in 0..tx.len() {
+ assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(&notify_noop(), 1));
+
+ UnboundedSender::unbounded_send(&tx[i], i).unwrap();
+
+ assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
+ }
+ }
+ })
+}
+
+#[bench]
+fn unbounded_uncontended(b: &mut Bencher) {
+ b.iter(|| {
+ let (tx, mut rx) = unbounded();
+
+ for i in 0..1000 {
+ UnboundedSender::unbounded_send(&tx, i).expect("send");
+ // No need to create a task, because poll is not going to park.
+ assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
+ }
+ })
+}
+
+
+/// A Stream that continuously sends incrementing number of the queue
+struct TestSender {
+ tx: Sender<u32>,
+ last: u32, // Last number sent
+}
+
+// Could be a Future, it doesn't matter
+impl Stream for TestSender {
+ type Item = u32;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ match self.tx.start_send(self.last + 1) {
+ Err(_) => panic!(),
+ Ok(AsyncSink::Ready) => {
+ self.last += 1;
+ Ok(Async::Ready(Some(self.last)))
+ }
+ Ok(AsyncSink::NotReady(_)) => {
+ Ok(Async::NotReady)
+ }
+ }
+ }
+}
+
+
+/// Single producers, single consumer
+#[bench]
+fn bounded_1_tx(b: &mut Bencher) {
+ b.iter(|| {
+ let (tx, rx) = channel(0);
+
+ let mut tx = executor::spawn(TestSender {
+ tx: tx,
+ last: 0,
+ });
+
+ let mut rx = executor::spawn(rx);
+
+ for i in 0..1000 {
+ assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(&notify_noop(), 1));
+ assert_eq!(Ok(Async::NotReady), tx.poll_stream_notify(&notify_noop(), 1));
+ assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
+ }
+ })
+}
+
+/// 100 producers, single consumer
+#[bench]
+fn bounded_100_tx(b: &mut Bencher) {
+ b.iter(|| {
+ // Each sender can send one item after specified capacity
+ let (tx, rx) = channel(0);
+
+ let mut tx: Vec<_> = (0..100).map(|_| {
+ executor::spawn(TestSender {
+ tx: tx.clone(),
+ last: 0
+ })
+ }).collect();
+
+ let mut rx = executor::spawn(rx);
+
+ for i in 0..10 {
+ for j in 0..tx.len() {
+ // Send an item
+ assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(&notify_noop(), 1));
+ // Then block
+ assert_eq!(Ok(Async::NotReady), tx[j].poll_stream_notify(&notify_noop(), 1));
+ // Recv the item
+ assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
+ }
+ }
+ })
+}
diff --git a/third_party/rust/futures-0.1.31/benches/thread_notify.rs b/third_party/rust/futures-0.1.31/benches/thread_notify.rs
new file mode 100644
index 0000000000..92932353d8
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/benches/thread_notify.rs
@@ -0,0 +1,114 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate test;
+
+use futures::{Future, Poll, Async};
+use futures::task::{self, Task};
+
+use test::Bencher;
+
+#[bench]
+fn thread_yield_single_thread_one_wait(b: &mut Bencher) {
+ const NUM: usize = 10_000;
+
+ struct Yield {
+ rem: usize,
+ }
+
+ impl Future for Yield {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ if self.rem == 0 {
+ Ok(Async::Ready(()))
+ } else {
+ self.rem -= 1;
+ task::current().notify();
+ Ok(Async::NotReady)
+ }
+ }
+ }
+
+ b.iter(|| {
+ let y = Yield { rem: NUM };
+ y.wait().unwrap();
+ });
+}
+
+#[bench]
+fn thread_yield_single_thread_many_wait(b: &mut Bencher) {
+ const NUM: usize = 10_000;
+
+ struct Yield {
+ rem: usize,
+ }
+
+ impl Future for Yield {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ if self.rem == 0 {
+ Ok(Async::Ready(()))
+ } else {
+ self.rem -= 1;
+ task::current().notify();
+ Ok(Async::NotReady)
+ }
+ }
+ }
+
+ b.iter(|| {
+ for _ in 0..NUM {
+ let y = Yield { rem: 1 };
+ y.wait().unwrap();
+ }
+ });
+}
+
+#[bench]
+fn thread_yield_multi_thread(b: &mut Bencher) {
+ use std::sync::mpsc;
+ use std::thread;
+
+ const NUM: usize = 1_000;
+
+ let (tx, rx) = mpsc::sync_channel::<Task>(10_000);
+
+ struct Yield {
+ rem: usize,
+ tx: mpsc::SyncSender<Task>,
+ }
+
+ impl Future for Yield {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ if self.rem == 0 {
+ Ok(Async::Ready(()))
+ } else {
+ self.rem -= 1;
+ self.tx.send(task::current()).unwrap();
+ Ok(Async::NotReady)
+ }
+ }
+ }
+
+ thread::spawn(move || {
+ while let Ok(task) = rx.recv() {
+ task.notify();
+ }
+ });
+
+ b.iter(move || {
+ let y = Yield {
+ rem: NUM,
+ tx: tx.clone(),
+ };
+
+ y.wait().unwrap();
+ });
+}