diff options
Diffstat (limited to 'third_party/rust/futures-channel/benches/sync_mpsc.rs')
-rw-r--r-- | third_party/rust/futures-channel/benches/sync_mpsc.rs | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/benches/sync_mpsc.rs b/third_party/rust/futures-channel/benches/sync_mpsc.rs new file mode 100644 index 0000000000..7c3c3d3a80 --- /dev/null +++ b/third_party/rust/futures-channel/benches/sync_mpsc.rs @@ -0,0 +1,135 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use { + futures::{ + channel::mpsc::{self, Sender, UnboundedSender}, + ready, + sink::Sink, + stream::{Stream, StreamExt}, + task::{Context, Poll}, + }, + futures_test::task::noop_context, + std::pin::Pin, +}; + +/// Single producer, single consumer +#[bench] +fn unbounded_1_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::unbounded(); + + // 1000 iterations to avoid measuring overhead of initialization + // Result should be divided by 1000 + for i in 0..1000 { + // Poll, not ready, park + assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); + + UnboundedSender::unbounded_send(&tx, i).unwrap(); + + // Now poll ready + assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn unbounded_100_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::unbounded(); + + let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect(); + + // 1000 send/recv operations total, result should be divided by 1000 + for _ in 0..10 { + for (i, x) in tx.iter().enumerate() { + assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); + + UnboundedSender::unbounded_send(x, i).unwrap(); + + assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); + } + } + }) +} + +#[bench] +fn unbounded_uncontended(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::unbounded(); + + for i in 0..1000 { + UnboundedSender::unbounded_send(&tx, i).expect("send"); + // No need to create a task, because poll is not going to park. + assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); + } + }) +} + +/// A Stream that continuously sends incrementing number of the queue +struct TestSender { + tx: Sender<u32>, + last: u32, // Last number sent +} + +// Could be a Future, it doesn't matter +impl Stream for TestSender { + type Item = u32; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = &mut *self; + let mut tx = Pin::new(&mut this.tx); + + ready!(tx.as_mut().poll_ready(cx)).unwrap(); + tx.as_mut().start_send(this.last + 1).unwrap(); + this.last += 1; + assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx)); + Poll::Ready(Some(this.last)) + } +} + +/// Single producers, single consumer +#[bench] +fn bounded_1_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::channel(0); + + let mut tx = TestSender { tx, last: 0 }; + + for i in 0..1000 { + assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn bounded_100_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + // Each sender can send one item after specified capacity + let (tx, mut rx) = mpsc::channel(0); + + let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect(); + + for i in 0..10 { + for x in &mut tx { + // Send an item + assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx)); + // Then block + assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx)); + // Recv the item + assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); + } + } + }) +} |