diff options
Diffstat (limited to 'third_party/rust/futures-util/benches/flatten_unordered.rs')
-rw-r--r-- | third_party/rust/futures-util/benches/flatten_unordered.rs | 66 |
1 files changed, 66 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/benches/flatten_unordered.rs b/third_party/rust/futures-util/benches/flatten_unordered.rs new file mode 100644 index 0000000000..64d5f9a4e3 --- /dev/null +++ b/third_party/rust/futures-util/benches/flatten_unordered.rs @@ -0,0 +1,66 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{self, FutureExt}; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshot_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + const STREAM_ITEM_COUNT: usize = 1; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(STREAM_COUNT); + let mut rxs = Vec::new(); + + for _ in 0..STREAM_COUNT { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + let mut last = 1; + while let Some(tx) = txs.pop_front() { + let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); + last += STREAM_ITEM_COUNT; + } + }); + + let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { + async { + if let Some(next) = vals.next() { + let val = next.await.unwrap(); + Some((val, vals)) + } else { + None + } + } + .boxed() + }) + .flatten_unordered(None); + + block_on(future::poll_fn(move |cx| { + let mut count = 0; + loop { + match flatten.poll_next_unpin(cx) { + Poll::Ready(None) => break, + Poll::Ready(Some(_)) => { + count += 1; + } + _ => {} + } + } + assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); + + Poll::Ready(()) + })) + }); +} |