#![feature(test)] extern crate test; use crate::test::Bencher; use { futures::{ channel::mpsc::{self, Sender, UnboundedSender}, ready, sink::Sink, stream::{Stream, StreamExt}, task::{Context, Poll}, }, futures_test::task::noop_context, std::pin::Pin, }; /// Single producer, single consumer #[bench] fn unbounded_1_tx(b: &mut Bencher) { let mut cx = noop_context(); b.iter(|| { let (tx, mut rx) = mpsc::unbounded(); // 1000 iterations to avoid measuring overhead of initialization // Result should be divided by 1000 for i in 0..1000 { // Poll, not ready, park assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); UnboundedSender::unbounded_send(&tx, i).unwrap(); // Now poll ready assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); } }) } /// 100 producers, single consumer #[bench] fn unbounded_100_tx(b: &mut Bencher) { let mut cx = noop_context(); b.iter(|| { let (tx, mut rx) = mpsc::unbounded(); let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect(); // 1000 send/recv operations total, result should be divided by 1000 for _ in 0..10 { for (i, x) in tx.iter().enumerate() { assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); UnboundedSender::unbounded_send(x, i).unwrap(); assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); } } }) } #[bench] fn unbounded_uncontended(b: &mut Bencher) { let mut cx = noop_context(); b.iter(|| { let (tx, mut rx) = mpsc::unbounded(); for i in 0..1000 { UnboundedSender::unbounded_send(&tx, i).expect("send"); // No need to create a task, because poll is not going to park. assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); } }) } /// A Stream that continuously sends incrementing number of the queue struct TestSender { tx: Sender, last: u32, // Last number sent } // Could be a Future, it doesn't matter impl Stream for TestSender { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut tx = Pin::new(&mut this.tx); ready!(tx.as_mut().poll_ready(cx)).unwrap(); tx.as_mut().start_send(this.last + 1).unwrap(); this.last += 1; assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx)); Poll::Ready(Some(this.last)) } } /// Single producers, single consumer #[bench] fn bounded_1_tx(b: &mut Bencher) { let mut cx = noop_context(); b.iter(|| { let (tx, mut rx) = mpsc::channel(0); let mut tx = TestSender { tx, last: 0 }; for i in 0..1000 { assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx)); assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx)); assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); } }) } /// 100 producers, single consumer #[bench] fn bounded_100_tx(b: &mut Bencher) { let mut cx = noop_context(); b.iter(|| { // Each sender can send one item after specified capacity let (tx, mut rx) = mpsc::channel(0); let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect(); for i in 0..10 { for x in &mut tx { // Send an item assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx)); // Then block assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx)); // Recv the item assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); } } }) }