diff options
Diffstat (limited to '')
4 files changed, 266 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/benches/flatten_unordered.rs b/third_party/rust/futures-util/benches/flatten_unordered.rs new file mode 100644 index 0000000000..64d5f9a4e3 --- /dev/null +++ b/third_party/rust/futures-util/benches/flatten_unordered.rs @@ -0,0 +1,66 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{self, FutureExt}; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshot_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + const STREAM_ITEM_COUNT: usize = 1; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(STREAM_COUNT); + let mut rxs = Vec::new(); + + for _ in 0..STREAM_COUNT { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + let mut last = 1; + while let Some(tx) = txs.pop_front() { + let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); + last += STREAM_ITEM_COUNT; + } + }); + + let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { + async { + if let Some(next) = vals.next() { + let val = next.await.unwrap(); + Some((val, vals)) + } else { + None + } + } + .boxed() + }) + .flatten_unordered(None); + + block_on(future::poll_fn(move |cx| { + let mut count = 0; + loop { + match flatten.poll_next_unpin(cx) { + Poll::Ready(None) => break, + Poll::Ready(Some(_)) => { + count += 1; + } + _ => {} + } + } + assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); + + Poll::Ready(()) + })) + }); +} diff --git a/third_party/rust/futures-util/benches/futures_unordered.rs b/third_party/rust/futures-util/benches/futures_unordered.rs new file mode 100644 index 0000000000..d5fe7a59de --- /dev/null +++ b/third_party/rust/futures-util/benches/futures_unordered.rs @@ -0,0 +1,43 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshots(b: &mut Bencher) { + const NUM: usize = 10_000; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(NUM); + let mut rxs = FuturesUnordered::new(); + + for _ in 0..NUM { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + while let Some(tx) = txs.pop_front() { + let _ = tx.send("hello"); + } + }); + + block_on(future::poll_fn(move |cx| { + loop { + if let Poll::Ready(None) = rxs.poll_next_unpin(cx) { + break; + } + } + Poll::Ready(()) + })) + }); +} diff --git a/third_party/rust/futures-util/benches/select.rs b/third_party/rust/futures-util/benches/select.rs new file mode 100644 index 0000000000..5410a95299 --- /dev/null +++ b/third_party/rust/futures-util/benches/select.rs @@ -0,0 +1,35 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::executor::block_on; +use futures::stream::{repeat, select, StreamExt}; + +#[bench] +fn select_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + + b.iter(|| { + let stream1 = repeat(1).take(STREAM_COUNT); + let stream2 = repeat(2).take(STREAM_COUNT); + let stream3 = repeat(3).take(STREAM_COUNT); + let stream4 = repeat(4).take(STREAM_COUNT); + let stream5 = repeat(5).take(STREAM_COUNT); + let stream6 = repeat(6).take(STREAM_COUNT); + let stream7 = repeat(7).take(STREAM_COUNT); + let count = block_on(async { + let count = select( + stream1, + select( + stream2, + select(stream3, select(stream4, select(stream5, select(stream6, stream7)))), + ), + ) + .count() + .await; + count + }); + assert_eq!(count, STREAM_COUNT * 7); + }); +} diff --git a/third_party/rust/futures-util/benches_disabled/bilock.rs b/third_party/rust/futures-util/benches_disabled/bilock.rs new file mode 100644 index 0000000000..417f75d31e --- /dev/null +++ b/third_party/rust/futures-util/benches_disabled/bilock.rs @@ -0,0 +1,122 @@ +#![feature(test)] + +#[cfg(feature = "bilock")] +mod bench { + use futures::executor::LocalPool; + use futures::task::{Context, Waker}; + use futures_util::lock::BiLock; + use futures_util::lock::BiLockAcquire; + use futures_util::lock::BiLockAcquired; + use futures_util::task::ArcWake; + + use std::sync::Arc; + use test::Bencher; + + fn notify_noop() -> Waker { + struct Noop; + + impl ArcWake for Noop { + fn wake(_: &Arc<Self>) {} + } + + ArcWake::into_waker(Arc::new(Noop)) + } + + /// Pseudo-stream which simply calls `lock.poll()` on `poll` + struct LockStream { + lock: BiLockAcquire<u32>, + } + + impl LockStream { + fn new(lock: BiLock<u32>) -> Self { + Self { lock: lock.lock() } + } + + /// Release a lock after it was acquired in `poll`, + /// so `poll` could be called again. + fn release_lock(&mut self, guard: BiLockAcquired<u32>) { + self.lock = guard.unlock().lock() + } + } + + impl Stream for LockStream { + type Item = BiLockAcquired<u32>; + type Error = (); + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> { + self.lock.poll(cx).map(|a| a.map(Some)) + } + } + + #[bench] + fn contended(b: &mut Bencher) { + let pool = LocalPool::new(); + let mut exec = pool.executor(); + let waker = notify_noop(); + let mut map = task::LocalMap::new(); + let mut waker = task::Context::new(&mut map, &waker, &mut exec); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = LockStream::new(x); + let mut y = LockStream::new(y); + + for _ in 0..1000 { + let x_guard = match x.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_next(&mut waker) { + Ok(Poll::Pending) => (), + _ => panic!(), + }; + + x.release_lock(x_guard); + + let y_guard = match y.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.release_lock(y_guard); + } + (x, y) + }); + } + + #[bench] + fn lock_unlock(b: &mut Bencher) { + let pool = LocalPool::new(); + let mut exec = pool.executor(); + let waker = notify_noop(); + let mut map = task::LocalMap::new(); + let mut waker = task::Context::new(&mut map, &waker, &mut exec); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = LockStream::new(x); + let mut y = LockStream::new(y); + + for _ in 0..1000 { + let x_guard = match x.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + x.release_lock(x_guard); + + let y_guard = match y.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.release_lock(y_guard); + } + (x, y) + }) + } +} |