// File-viewer metadata: 58 lines, 1.5 KiB, Rust
#![feature(test)]
|
|
|
|
extern crate test;
|
|
use crate::test::Bencher;
|
|
|
|
use futures::channel::oneshot;
|
|
use futures::executor::block_on;
|
|
use futures::future;
|
|
use futures::stream::{self, StreamExt};
|
|
use futures::task::Poll;
|
|
use futures_util::FutureExt;
|
|
use std::collections::VecDeque;
|
|
use std::thread;
|
|
|
|
#[bench]
|
|
fn oneshot_streams(b: &mut Bencher) {
|
|
const STREAM_COUNT: usize = 10_000;
|
|
const STREAM_ITEM_COUNT: usize = 1;
|
|
|
|
b.iter(|| {
|
|
let mut txs = VecDeque::with_capacity(STREAM_COUNT);
|
|
let mut rxs = Vec::new();
|
|
|
|
for _ in 0..STREAM_COUNT {
|
|
let (tx, rx) = oneshot::channel();
|
|
txs.push_back(tx);
|
|
rxs.push(rx);
|
|
}
|
|
|
|
thread::spawn(move || {
|
|
let mut last = 1;
|
|
while let Some(tx) = txs.pop_front() {
|
|
let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
|
|
last += STREAM_ITEM_COUNT;
|
|
}
|
|
});
|
|
|
|
let mut flatten = stream::iter(rxs)
|
|
.map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten())
|
|
.flatten_unordered(None);
|
|
|
|
block_on(future::poll_fn(move |cx| {
|
|
let mut count = 0;
|
|
loop {
|
|
match flatten.poll_next_unpin(cx) {
|
|
Poll::Ready(None) => break,
|
|
Poll::Ready(Some(_)) => {
|
|
count += 1;
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
|
|
|
|
Poll::Ready(())
|
|
}))
|
|
});
|
|
}
|