diff options
Diffstat (limited to 'third_party/rust/futures-0.1.29/benches/sync_mpsc.rs')
-rw-r--r-- | third_party/rust/futures-0.1.29/benches/sync_mpsc.rs | 168 |
1 files changed, 168 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.29/benches/sync_mpsc.rs b/third_party/rust/futures-0.1.29/benches/sync_mpsc.rs new file mode 100644 index 0000000000..c0365c5fed --- /dev/null +++ b/third_party/rust/futures-0.1.29/benches/sync_mpsc.rs @@ -0,0 +1,168 @@ +#![feature(test)] + +#[macro_use] +extern crate futures; +extern crate test; + +use futures::{Async, Poll, AsyncSink}; +use futures::executor; +use futures::executor::{Notify, NotifyHandle}; + +use futures::sink::Sink; +use futures::stream::Stream; + +use futures::sync::mpsc::unbounded; +use futures::sync::mpsc::channel; +use futures::sync::mpsc::Sender; +use futures::sync::mpsc::UnboundedSender; + + +use test::Bencher; + +fn notify_noop() -> NotifyHandle { + struct Noop; + + impl Notify for Noop { + fn notify(&self, _id: usize) {} + } + + const NOOP : &'static Noop = &Noop; + + NotifyHandle::from(NOOP) +} + +/// Single producer, single consumer +#[bench] +fn unbounded_1_tx(b: &mut Bencher) { + b.iter(|| { + let (tx, rx) = unbounded(); + + let mut rx = executor::spawn(rx); + + // 1000 iterations to avoid measuring overhead of initialization + // Result should be divided by 1000 + for i in 0..1000 { + + // Poll, not ready, park + assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(¬ify_noop(), 1)); + + UnboundedSender::unbounded_send(&tx, i).unwrap(); + + // Now poll ready + assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn unbounded_100_tx(b: &mut Bencher) { + b.iter(|| { + let (tx, rx) = unbounded(); + + let mut rx = executor::spawn(rx); + + let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect(); + + // 1000 send/recv operations total, result should be divided by 1000 + for _ in 0..10 { + for i in 0..tx.len() { + assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(¬ify_noop(), 1)); + + UnboundedSender::unbounded_send(&tx[i], i).unwrap(); + + assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + } + }) +} + +#[bench] +fn unbounded_uncontended(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = unbounded(); + + for i in 0..1000 { + UnboundedSender::unbounded_send(&tx, i).expect("send"); + // No need to create a task, because poll is not going to park. + assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); + } + }) +} + + +/// A Stream that continuously sends incrementing number of the queue +struct TestSender { + tx: Sender<u32>, + last: u32, // Last number sent +} + +// Could be a Future, it doesn't matter +impl Stream for TestSender { + type Item = u32; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + match self.tx.start_send(self.last + 1) { + Err(_) => panic!(), + Ok(AsyncSink::Ready) => { + self.last += 1; + Ok(Async::Ready(Some(self.last))) + } + Ok(AsyncSink::NotReady(_)) => { + Ok(Async::NotReady) + } + } + } +} + + +/// Single producers, single consumer +#[bench] +fn bounded_1_tx(b: &mut Bencher) { + b.iter(|| { + let (tx, rx) = channel(0); + + let mut tx = executor::spawn(TestSender { + tx: tx, + last: 0, + }); + + let mut rx = executor::spawn(rx); + + for i in 0..1000 { + assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(¬ify_noop(), 1)); + assert_eq!(Ok(Async::NotReady), tx.poll_stream_notify(¬ify_noop(), 1)); + assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn bounded_100_tx(b: &mut Bencher) { + b.iter(|| { + // Each sender can send one item after specified capacity + let (tx, rx) = channel(0); + + let mut tx: Vec<_> = (0..100).map(|_| { + executor::spawn(TestSender { + tx: tx.clone(), + last: 0 + }) + }).collect(); + + let mut rx = executor::spawn(rx); + + for i in 0..10 { + for j in 0..tx.len() { + // Send an item + assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(¬ify_noop(), 1)); + // Then block + assert_eq!(Ok(Async::NotReady), tx[j].poll_stream_notify(¬ify_noop(), 1)); + // Recv the item + assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(¬ify_noop(), 1)); + } + } + }) +} |