summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-channel/benches/sync_mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-channel/benches/sync_mpsc.rs')
-rw-r--r--third_party/rust/futures-channel/benches/sync_mpsc.rs135
1 files changed, 135 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/benches/sync_mpsc.rs b/third_party/rust/futures-channel/benches/sync_mpsc.rs
new file mode 100644
index 0000000000..7c3c3d3a80
--- /dev/null
+++ b/third_party/rust/futures-channel/benches/sync_mpsc.rs
@@ -0,0 +1,135 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use {
+ futures::{
+ channel::mpsc::{self, Sender, UnboundedSender},
+ ready,
+ sink::Sink,
+ stream::{Stream, StreamExt},
+ task::{Context, Poll},
+ },
+ futures_test::task::noop_context,
+ std::pin::Pin,
+};
+
+/// Single producer, single consumer
+#[bench]
+fn unbounded_1_tx(b: &mut Bencher) {
+ let mut cx = noop_context();
+ b.iter(|| {
+ let (tx, mut rx) = mpsc::unbounded();
+
+ // 1000 iterations to avoid measuring overhead of initialization
+ // Result should be divided by 1000
+ for i in 0..1000 {
+ // Poll, not ready, park
+ assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
+
+ UnboundedSender::unbounded_send(&tx, i).unwrap();
+
+ // Now poll ready
+ assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
+ }
+ })
+}
+
+/// 100 producers, single consumer
+#[bench]
+fn unbounded_100_tx(b: &mut Bencher) {
+ let mut cx = noop_context();
+ b.iter(|| {
+ let (tx, mut rx) = mpsc::unbounded();
+
+ let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
+
+ // 1000 send/recv operations total, result should be divided by 1000
+ for _ in 0..10 {
+ for (i, x) in tx.iter().enumerate() {
+ assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
+
+ UnboundedSender::unbounded_send(x, i).unwrap();
+
+ assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
+ }
+ }
+ })
+}
+
+#[bench]
+fn unbounded_uncontended(b: &mut Bencher) {
+ let mut cx = noop_context();
+ b.iter(|| {
+ let (tx, mut rx) = mpsc::unbounded();
+
+ for i in 0..1000 {
+ UnboundedSender::unbounded_send(&tx, i).expect("send");
+ // No need to create a task, because poll is not going to park.
+ assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
+ }
+ })
+}
+
+/// A Stream that continuously sends incrementing number of the queue
+struct TestSender {
+ tx: Sender<u32>,
+ last: u32, // Last number sent
+}
+
+// Could be a Future, it doesn't matter
+impl Stream for TestSender {
+ type Item = u32;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = &mut *self;
+ let mut tx = Pin::new(&mut this.tx);
+
+ ready!(tx.as_mut().poll_ready(cx)).unwrap();
+ tx.as_mut().start_send(this.last + 1).unwrap();
+ this.last += 1;
+ assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
+ Poll::Ready(Some(this.last))
+ }
+}
+
+/// Single producers, single consumer
+#[bench]
+fn bounded_1_tx(b: &mut Bencher) {
+ let mut cx = noop_context();
+ b.iter(|| {
+ let (tx, mut rx) = mpsc::channel(0);
+
+ let mut tx = TestSender { tx, last: 0 };
+
+ for i in 0..1000 {
+ assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
+ }
+ })
+}
+
+/// 100 producers, single consumer
+#[bench]
+fn bounded_100_tx(b: &mut Bencher) {
+ let mut cx = noop_context();
+ b.iter(|| {
+ // Each sender can send one item after specified capacity
+ let (tx, mut rx) = mpsc::channel(0);
+
+ let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();
+
+ for i in 0..10 {
+ for x in &mut tx {
+ // Send an item
+ assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
+ // Then block
+ assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
+ // Recv the item
+ assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
+ }
+ }
+ })
+}