#![feature(test)]

extern crate test;
use crate::test::Bencher;

use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::{self, FutureExt};
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::collections::VecDeque;
use std::thread;

/// Benchmarks `StreamExt::flatten_unordered` over many short inner streams.
///
/// Each iteration:
/// 1. creates `STREAM_COUNT` oneshot channels;
/// 2. spawns a producer thread that sends one inner stream of
///    `STREAM_ITEM_COUNT` consecutive integers through every channel, in
///    creation order;
/// 3. builds an outer stream that awaits the receivers one by one and yields
///    the inner streams they deliver, flattened with `flatten_unordered(None)`;
/// 4. drains the flattened stream on the current thread and asserts that
///    exactly `STREAM_COUNT * STREAM_ITEM_COUNT` items came out.
///
/// # Panics
///
/// Panics if a receiver resolves to an error (the `unwrap` on the awaited
/// receiver) or if the number of items observed differs from the expected
/// total.
#[bench]
fn oneshot_streams(b: &mut Bencher) {
    const STREAM_COUNT: usize = 10_000;
    const STREAM_ITEM_COUNT: usize = 1;

    b.iter(|| {
        // The number of channels is known up front, so size both containers
        // once instead of letting the receiver list grow-and-copy inside the
        // timed section.
        let mut txs = VecDeque::with_capacity(STREAM_COUNT);
        let mut rxs = Vec::with_capacity(STREAM_COUNT);

        for _ in 0..STREAM_COUNT {
            let (tx, rx) = oneshot::channel();
            txs.push_back(tx);
            rxs.push(rx);
        }

        // Producer: walks the senders front to back (the same order the
        // receivers are awaited in) and hands each one an inner stream over
        // the next `STREAM_ITEM_COUNT` integers. The thread is detached; the
        // consumer below cannot finish before every inner stream was sent.
        thread::spawn(move || {
            let mut last = 1;
            for tx in txs {
                // The send result is deliberately ignored: the benchmark only
                // cares about items that actually reach the consumer, and the
                // final count assertion catches anything that went missing.
                let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
                last += STREAM_ITEM_COUNT;
            }
        });

        // Outer stream: the unfold state is the iterator over receivers; each
        // step awaits the next receiver and yields the inner stream it
        // delivers, ending once the receivers are exhausted.
        let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
            async {
                if let Some(next) = vals.next() {
                    let val = next.await.unwrap();
                    Some((val, vals))
                } else {
                    None
                }
            }
            .boxed()
        })
        // `None` is passed as the concurrency limit argument.
        .flatten_unordered(None);

        block_on(future::poll_fn(move |cx| {
            let mut count = 0;
            loop {
                match flatten.poll_next_unpin(cx) {
                    Poll::Ready(None) => break,
                    Poll::Ready(Some(_)) => {
                        count += 1;
                    }
                    // Intentionally re-poll right away instead of returning
                    // `Poll::Pending`: the whole drain happens inside a single
                    // `poll_fn` call, so `count` never has to survive across
                    // separate polls of this future.
                    Poll::Pending => {}
                }
            }
            assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);

            Poll::Ready(())
        }))
    });
}