From 36d22d82aa202bb199967e9512281e9a53db42c9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 21:33:14 +0200 Subject: Adding upstream version 115.7.0esr. Signed-off-by: Daniel Baumann --- .../rust/futures-util/benches/flatten_unordered.rs | 66 ++++++++++++++++++++++ .../rust/futures-util/benches/futures_unordered.rs | 43 ++++++++++++++ third_party/rust/futures-util/benches/select.rs | 35 ++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 third_party/rust/futures-util/benches/flatten_unordered.rs create mode 100644 third_party/rust/futures-util/benches/futures_unordered.rs create mode 100644 third_party/rust/futures-util/benches/select.rs (limited to 'third_party/rust/futures-util/benches') diff --git a/third_party/rust/futures-util/benches/flatten_unordered.rs b/third_party/rust/futures-util/benches/flatten_unordered.rs new file mode 100644 index 0000000000..64d5f9a4e3 --- /dev/null +++ b/third_party/rust/futures-util/benches/flatten_unordered.rs @@ -0,0 +1,66 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{self, FutureExt}; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshot_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + const STREAM_ITEM_COUNT: usize = 1; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(STREAM_COUNT); + let mut rxs = Vec::new(); + + for _ in 0..STREAM_COUNT { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + let mut last = 1; + while let Some(tx) = txs.pop_front() { + let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); + last += STREAM_ITEM_COUNT; + } + }); + + let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { + async { + if let Some(next) = vals.next() { + let val = next.await.unwrap(); + Some((val, vals)) + } else { + None + } + } + .boxed() + }) + .flatten_unordered(None); + + block_on(future::poll_fn(move |cx| { + let mut count = 0; + loop { + match flatten.poll_next_unpin(cx) { + Poll::Ready(None) => break, + Poll::Ready(Some(_)) => { + count += 1; + } + _ => {} + } + } + assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); + + Poll::Ready(()) + })) + }); +} diff --git a/third_party/rust/futures-util/benches/futures_unordered.rs b/third_party/rust/futures-util/benches/futures_unordered.rs new file mode 100644 index 0000000000..d5fe7a59de --- /dev/null +++ b/third_party/rust/futures-util/benches/futures_unordered.rs @@ -0,0 +1,43 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshots(b: &mut Bencher) { + const NUM: usize = 10_000; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(NUM); + let mut rxs = FuturesUnordered::new(); + + for _ in 0..NUM { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + while let Some(tx) = txs.pop_front() { + let _ = tx.send("hello"); + } + }); + + block_on(future::poll_fn(move |cx| { + loop { + if let Poll::Ready(None) = rxs.poll_next_unpin(cx) { + break; + } + } + Poll::Ready(()) + })) + }); +} diff --git a/third_party/rust/futures-util/benches/select.rs b/third_party/rust/futures-util/benches/select.rs new file mode 100644 index 0000000000..5410a95299 --- /dev/null +++ b/third_party/rust/futures-util/benches/select.rs @@ -0,0 +1,35 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::executor::block_on; +use futures::stream::{repeat, select, StreamExt}; + +#[bench] +fn select_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + + b.iter(|| { + let stream1 = repeat(1).take(STREAM_COUNT); + let stream2 = repeat(2).take(STREAM_COUNT); + let stream3 = repeat(3).take(STREAM_COUNT); + let stream4 = repeat(4).take(STREAM_COUNT); + let stream5 = repeat(5).take(STREAM_COUNT); + let stream6 = repeat(6).take(STREAM_COUNT); + let stream7 = repeat(7).take(STREAM_COUNT); + let count = block_on(async { + let count = select( + stream1, + select( + stream2, + select(stream3, select(stream4, select(stream5, select(stream6, stream7)))), + ), + ) + .count() + .await; + count + }); + assert_eq!(count, STREAM_COUNT * 7); + }); +} -- cgit v1.2.3