summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/benches
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/futures-util/benches/flatten_unordered.rs66
-rw-r--r--third_party/rust/futures-util/benches/futures_unordered.rs43
-rw-r--r--third_party/rust/futures-util/benches/select.rs35
-rw-r--r--third_party/rust/futures-util/benches_disabled/bilock.rs122
4 files changed, 266 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/benches/flatten_unordered.rs b/third_party/rust/futures-util/benches/flatten_unordered.rs
new file mode 100644
index 0000000000..64d5f9a4e3
--- /dev/null
+++ b/third_party/rust/futures-util/benches/flatten_unordered.rs
@@ -0,0 +1,66 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+use futures::stream::{self, StreamExt};
+use futures::task::Poll;
+use std::collections::VecDeque;
+use std::thread;
+
+#[bench]
+fn oneshot_streams(b: &mut Bencher) {
+ const STREAM_COUNT: usize = 10_000;
+ const STREAM_ITEM_COUNT: usize = 1;
+
+ b.iter(|| {
+ let mut txs = VecDeque::with_capacity(STREAM_COUNT);
+ let mut rxs = Vec::new();
+
+ for _ in 0..STREAM_COUNT {
+ let (tx, rx) = oneshot::channel();
+ txs.push_back(tx);
+ rxs.push(rx);
+ }
+
+ thread::spawn(move || {
+ let mut last = 1;
+ while let Some(tx) = txs.pop_front() {
+ let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
+ last += STREAM_ITEM_COUNT;
+ }
+ });
+
+ let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
+ async {
+ if let Some(next) = vals.next() {
+ let val = next.await.unwrap();
+ Some((val, vals))
+ } else {
+ None
+ }
+ }
+ .boxed()
+ })
+ .flatten_unordered(None);
+
+ block_on(future::poll_fn(move |cx| {
+ let mut count = 0;
+ loop {
+ match flatten.poll_next_unpin(cx) {
+ Poll::Ready(None) => break,
+ Poll::Ready(Some(_)) => {
+ count += 1;
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
+
+ Poll::Ready(())
+ }))
+ });
+}
diff --git a/third_party/rust/futures-util/benches/futures_unordered.rs b/third_party/rust/futures-util/benches/futures_unordered.rs
new file mode 100644
index 0000000000..d5fe7a59de
--- /dev/null
+++ b/third_party/rust/futures-util/benches/futures_unordered.rs
@@ -0,0 +1,43 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::Poll;
+use std::collections::VecDeque;
+use std::thread;
+
+#[bench]
+fn oneshots(b: &mut Bencher) {
+ const NUM: usize = 10_000;
+
+ b.iter(|| {
+ let mut txs = VecDeque::with_capacity(NUM);
+ let mut rxs = FuturesUnordered::new();
+
+ for _ in 0..NUM {
+ let (tx, rx) = oneshot::channel();
+ txs.push_back(tx);
+ rxs.push(rx);
+ }
+
+ thread::spawn(move || {
+ while let Some(tx) = txs.pop_front() {
+ let _ = tx.send("hello");
+ }
+ });
+
+ block_on(future::poll_fn(move |cx| {
+ loop {
+ if let Poll::Ready(None) = rxs.poll_next_unpin(cx) {
+ break;
+ }
+ }
+ Poll::Ready(())
+ }))
+ });
+}
diff --git a/third_party/rust/futures-util/benches/select.rs b/third_party/rust/futures-util/benches/select.rs
new file mode 100644
index 0000000000..5410a95299
--- /dev/null
+++ b/third_party/rust/futures-util/benches/select.rs
@@ -0,0 +1,35 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::executor::block_on;
+use futures::stream::{repeat, select, StreamExt};
+
+#[bench]
+fn select_streams(b: &mut Bencher) {
+ const STREAM_COUNT: usize = 10_000;
+
+ b.iter(|| {
+ let stream1 = repeat(1).take(STREAM_COUNT);
+ let stream2 = repeat(2).take(STREAM_COUNT);
+ let stream3 = repeat(3).take(STREAM_COUNT);
+ let stream4 = repeat(4).take(STREAM_COUNT);
+ let stream5 = repeat(5).take(STREAM_COUNT);
+ let stream6 = repeat(6).take(STREAM_COUNT);
+ let stream7 = repeat(7).take(STREAM_COUNT);
+ let count = block_on(async {
+ let count = select(
+ stream1,
+ select(
+ stream2,
+ select(stream3, select(stream4, select(stream5, select(stream6, stream7)))),
+ ),
+ )
+ .count()
+ .await;
+ count
+ });
+ assert_eq!(count, STREAM_COUNT * 7);
+ });
+}
diff --git a/third_party/rust/futures-util/benches_disabled/bilock.rs b/third_party/rust/futures-util/benches_disabled/bilock.rs
new file mode 100644
index 0000000000..417f75d31e
--- /dev/null
+++ b/third_party/rust/futures-util/benches_disabled/bilock.rs
@@ -0,0 +1,122 @@
+#![feature(test)]
+
+#[cfg(feature = "bilock")]
+mod bench {
+ use futures::executor::LocalPool;
+ use futures::task::{Context, Waker};
+ use futures_util::lock::BiLock;
+ use futures_util::lock::BiLockAcquire;
+ use futures_util::lock::BiLockAcquired;
+ use futures_util::task::ArcWake;
+
+ use std::sync::Arc;
+ use test::Bencher;
+
+ fn notify_noop() -> Waker {
+ struct Noop;
+
+ impl ArcWake for Noop {
+ fn wake(_: &Arc<Self>) {}
+ }
+
+ ArcWake::into_waker(Arc::new(Noop))
+ }
+
+ /// Pseudo-stream which simply calls `lock.poll()` on `poll`
+ struct LockStream {
+ lock: BiLockAcquire<u32>,
+ }
+
+ impl LockStream {
+ fn new(lock: BiLock<u32>) -> Self {
+ Self { lock: lock.lock() }
+ }
+
+ /// Release a lock after it was acquired in `poll`,
+ /// so `poll` could be called again.
+ fn release_lock(&mut self, guard: BiLockAcquired<u32>) {
+ self.lock = guard.unlock().lock()
+ }
+ }
+
+ impl Stream for LockStream {
+ type Item = BiLockAcquired<u32>;
+ type Error = ();
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> {
+ self.lock.poll(cx).map(|a| a.map(Some))
+ }
+ }
+
+ #[bench]
+ fn contended(b: &mut Bencher) {
+ let pool = LocalPool::new();
+ let mut exec = pool.executor();
+ let waker = notify_noop();
+ let mut map = task::LocalMap::new();
+ let mut waker = task::Context::new(&mut map, &waker, &mut exec);
+
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ let mut x = LockStream::new(x);
+ let mut y = LockStream::new(y);
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_next(&mut waker) {
+ Ok(Poll::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ // Try poll second lock while first lock still holds the lock
+ match y.poll_next(&mut waker) {
+ Ok(Poll::Pending) => (),
+ _ => panic!(),
+ };
+
+ x.release_lock(x_guard);
+
+ let y_guard = match y.poll_next(&mut waker) {
+ Ok(Poll::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ y.release_lock(y_guard);
+ }
+ (x, y)
+ });
+ }
+
+ #[bench]
+ fn lock_unlock(b: &mut Bencher) {
+ let pool = LocalPool::new();
+ let mut exec = pool.executor();
+ let waker = notify_noop();
+ let mut map = task::LocalMap::new();
+ let mut waker = task::Context::new(&mut map, &waker, &mut exec);
+
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ let mut x = LockStream::new(x);
+ let mut y = LockStream::new(y);
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_next(&mut waker) {
+ Ok(Poll::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ x.release_lock(x_guard);
+
+ let y_guard = match y.poll_next(&mut waker) {
+ Ok(Poll::Ready(Some(guard))) => guard,
+ _ => panic!(),
+ };
+
+ y.release_lock(y_guard);
+ }
+ (x, y)
+ })
+ }
+}