summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.11/benches/latency.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.11/benches/latency.rs')
-rw-r--r--third_party/rust/tokio-0.1.11/benches/latency.rs117
1 files changed, 117 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/benches/latency.rs b/third_party/rust/tokio-0.1.11/benches/latency.rs
new file mode 100644
index 0000000000..c2619b7115
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/benches/latency.rs
@@ -0,0 +1,117 @@
+#![feature(test)]
+#![deny(warnings)]
+
+extern crate test;
+#[macro_use]
+extern crate futures;
+extern crate tokio;
+
+use std::io;
+use std::net::SocketAddr;
+use std::thread;
+
+use futures::sync::oneshot;
+use futures::sync::mpsc;
+use futures::{Future, Poll, Sink, Stream};
+use test::Bencher;
+use tokio::net::UdpSocket;
+
+/// UDP echo server
+struct EchoServer {
+ socket: UdpSocket,
+ buf: Vec<u8>,
+ to_send: Option<(usize, SocketAddr)>,
+}
+
+impl EchoServer {
+ fn new(s: UdpSocket) -> Self {
+ EchoServer {
+ socket: s,
+ to_send: None,
+ buf: vec![0u8; 1600],
+ }
+ }
+}
+
+impl Future for EchoServer {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ loop {
+ if let Some(&(size, peer)) = self.to_send.as_ref() {
+ try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
+ self.to_send = None;
+ }
+ self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
+ }
+ }
+}
+
+#[bench]
+fn udp_echo_latency(b: &mut Bencher) {
+ let any_addr = "127.0.0.1:0".to_string();
+ let any_addr = any_addr.parse::<SocketAddr>().unwrap();
+
+ let (stop_c, stop_p) = oneshot::channel::<()>();
+ let (tx, rx) = oneshot::channel();
+
+ let child = thread::spawn(move || {
+
+ let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap();
+ tx.send(socket.local_addr().unwrap()).unwrap();
+
+ let server = EchoServer::new(socket);
+ let server = server.select(stop_p.map_err(|_| panic!()));
+ let server = server.map_err(|_| ());
+ server.wait().unwrap();
+ });
+
+
+ let client = std::net::UdpSocket::bind(&any_addr).unwrap();
+
+ let server_addr = rx.wait().unwrap();
+ let mut buf = [0u8; 1000];
+
+ // warmup phase; for some reason initial couple of
+ // runs are much slower
+ //
+ // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
+ for _ in 0..8 {
+ client.send_to(&buf, &server_addr).unwrap();
+ let _ = client.recv_from(&mut buf).unwrap();
+ }
+
+ b.iter(|| {
+ client.send_to(&buf, &server_addr).unwrap();
+ let _ = client.recv_from(&mut buf).unwrap();
+ });
+
+ stop_c.send(()).unwrap();
+ child.join().unwrap();
+}
+
+#[bench]
+fn futures_channel_latency(b: &mut Bencher) {
+ let (mut in_tx, in_rx) = mpsc::channel(32);
+ let (out_tx, out_rx) = mpsc::channel::<_>(32);
+
+ let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait());
+ let mut rx_iter = out_rx.wait();
+
+ // warmup phase; for some reason initial couple of runs are much slower
+ //
+ // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
+ for _ in 0..8 {
+ in_tx.start_send(Ok(1usize)).unwrap();
+ let _ = rx_iter.next();
+ }
+
+ b.iter(|| {
+ in_tx.start_send(Ok(1usize)).unwrap();
+ let _ = rx_iter.next();
+ });
+
+ drop(in_tx);
+ child.join().unwrap().unwrap();
+}