Diffstat (limited to 'third_party/rust/tokio-0.1.11/benches')
-rw-r--r-- | third_party/rust/tokio-0.1.11/benches/latency.rs | 117
-rw-r--r-- | third_party/rust/tokio-0.1.11/benches/mio-ops.rs | 58
-rw-r--r-- | third_party/rust/tokio-0.1.11/benches/tcp.rs | 248
3 files changed, 423 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/benches/latency.rs b/third_party/rust/tokio-0.1.11/benches/latency.rs
new file mode 100644
index 0000000000..c2619b7115
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/benches/latency.rs
@@ -0,0 +1,117 @@
+#![feature(test)]
+#![deny(warnings)]
+
+extern crate test;
+#[macro_use]
+extern crate futures;
+extern crate tokio;
+
+use std::io;
+use std::net::SocketAddr;
+use std::thread;
+
+use futures::sync::oneshot;
+use futures::sync::mpsc;
+use futures::{Future, Poll, Sink, Stream};
+use test::Bencher;
+use tokio::net::UdpSocket;
+
+/// UDP echo server
+struct EchoServer {
+    socket: UdpSocket,
+    buf: Vec<u8>,
+    to_send: Option<(usize, SocketAddr)>,
+}
+
+impl EchoServer {
+    fn new(s: UdpSocket) -> Self {
+        EchoServer {
+            socket: s,
+            to_send: None,
+            buf: vec![0u8; 1600],
+        }
+    }
+}
+
+impl Future for EchoServer {
+    type Item = ();
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<(), io::Error> {
+        loop {
+            if let Some(&(size, peer)) = self.to_send.as_ref() {
+                try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
+                self.to_send = None;
+            }
+            self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
+        }
+    }
+}
+
+#[bench]
+fn udp_echo_latency(b: &mut Bencher) {
+    let any_addr = "127.0.0.1:0".to_string();
+    let any_addr = any_addr.parse::<SocketAddr>().unwrap();
+
+    let (stop_c, stop_p) = oneshot::channel::<()>();
+    let (tx, rx) = oneshot::channel();
+
+    let child = thread::spawn(move || {
+
+        let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap();
+        tx.send(socket.local_addr().unwrap()).unwrap();
+
+        let server = EchoServer::new(socket);
+        let server = server.select(stop_p.map_err(|_| panic!()));
+        let server = server.map_err(|_| ());
+        server.wait().unwrap();
+    });
+
+
+    let client = std::net::UdpSocket::bind(&any_addr).unwrap();
+
+    let server_addr = rx.wait().unwrap();
+    let mut buf = [0u8; 1000];
+
+    // warmup phase; for some reason initial couple of
+    // runs are much slower
+    //
+    // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
+    for _ in 0..8 {
+        client.send_to(&buf, &server_addr).unwrap();
+        let _ = client.recv_from(&mut buf).unwrap();
+    }
+
+    b.iter(|| {
+        client.send_to(&buf, &server_addr).unwrap();
+        let _ = client.recv_from(&mut buf).unwrap();
+    });
+
+    stop_c.send(()).unwrap();
+    child.join().unwrap();
+}
+
+#[bench]
+fn futures_channel_latency(b: &mut Bencher) {
+    let (mut in_tx, in_rx) = mpsc::channel(32);
+    let (out_tx, out_rx) = mpsc::channel::<_>(32);
+
+    let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait());
+    let mut rx_iter = out_rx.wait();
+
+    // warmup phase; for some reason initial couple of runs are much slower
+    //
+    // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
+    for _ in 0..8 {
+        in_tx.start_send(Ok(1usize)).unwrap();
+        let _ = rx_iter.next();
+    }
+
+    b.iter(|| {
+        in_tx.start_send(Ok(1usize)).unwrap();
+        let _ = rx_iter.next();
+    });
+
+    drop(in_tx);
+    child.join().unwrap().unwrap();
+}
diff --git a/third_party/rust/tokio-0.1.11/benches/mio-ops.rs b/third_party/rust/tokio-0.1.11/benches/mio-ops.rs
new file mode 100644
index 0000000000..6a71bebfe0
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/benches/mio-ops.rs
@@ -0,0 +1,58 @@
+// Measure cost of different operations
+// to get a sense of performance tradeoffs
+#![feature(test)]
+#![deny(warnings)]
+
+extern crate test;
+extern crate mio;
+
+use test::Bencher;
+
+use mio::tcp::TcpListener;
+use mio::{Token, Ready, PollOpt};
+
+
+#[bench]
+fn mio_register_deregister(b: &mut Bencher) {
+    let addr = "127.0.0.1:0".parse().unwrap();
+    // Setup the server socket
+    let sock = TcpListener::bind(&addr).unwrap();
+    let poll = mio::Poll::new().unwrap();
+
+    const CLIENT: Token = Token(1);
+
+    b.iter(|| {
+        poll.register(&sock, CLIENT, Ready::readable(),
+                      PollOpt::edge()).unwrap();
+        poll.deregister(&sock).unwrap();
+    });
+}
+
+#[bench]
+fn mio_reregister(b: &mut Bencher) {
+    let addr = "127.0.0.1:0".parse().unwrap();
+    // Setup the server socket
+    let sock = TcpListener::bind(&addr).unwrap();
+    let poll = mio::Poll::new().unwrap();
+
+    const CLIENT: Token = Token(1);
+    poll.register(&sock, CLIENT, Ready::readable(),
+                  PollOpt::edge()).unwrap();
+
+    b.iter(|| {
+        poll.reregister(&sock, CLIENT, Ready::readable(),
+                        PollOpt::edge()).unwrap();
+    });
+    poll.deregister(&sock).unwrap();
+}
+
+#[bench]
+fn mio_poll(b: &mut Bencher) {
+    let poll = mio::Poll::new().unwrap();
+    let timeout = std::time::Duration::new(0, 0);
+    let mut events = mio::Events::with_capacity(1024);
+
+    b.iter(|| {
+        poll.poll(&mut events, Some(timeout)).unwrap();
+    });
+}
diff --git a/third_party/rust/tokio-0.1.11/benches/tcp.rs b/third_party/rust/tokio-0.1.11/benches/tcp.rs
new file mode 100644
index 0000000000..fde72ce092
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/benches/tcp.rs
@@ -0,0 +1,248 @@
+#![feature(test)]
+#![deny(warnings)]
+
+extern crate futures;
+extern crate tokio;
+
+#[macro_use]
+extern crate tokio_io;
+
+pub extern crate test;
+
+mod prelude {
+    pub use futures::*;
+    pub use tokio::reactor::Reactor;
+    pub use tokio::net::{TcpListener, TcpStream};
+    pub use tokio_io::io::read_to_end;
+
+    pub use test::{self, Bencher};
+    pub use std::thread;
+    pub use std::time::Duration;
+    pub use std::io::{self, Read, Write};
+}
+
+mod connect_churn {
+    use ::prelude::*;
+
+    const NUM: usize = 300;
+    const CONCURRENT: usize = 8;
+
+    #[bench]
+    fn one_thread(b: &mut Bencher) {
+        let addr = "127.0.0.1:0".parse().unwrap();
+
+        b.iter(move || {
+            let listener = TcpListener::bind(&addr).unwrap();
+            let addr = listener.local_addr().unwrap();
+
+            // Spawn a single future that accepts & drops connections
+            let serve_incomings = listener.incoming()
+                .map_err(|e| panic!("server err: {:?}", e))
+                .for_each(|_| Ok(()));
+
+            let connects = stream::iter_result((0..NUM).map(|_| {
+                Ok(TcpStream::connect(&addr)
+                    .and_then(|sock| {
+                        sock.set_linger(Some(Duration::from_secs(0))).unwrap();
+                        read_to_end(sock, vec![])
+                    }))
+            }));
+
+            let connects_concurrent = connects.buffer_unordered(CONCURRENT)
+                .map_err(|e| panic!("client err: {:?}", e))
+                .for_each(|_| Ok(()));
+
+            serve_incomings.select(connects_concurrent)
+                .map(|_| ()).map_err(|_| ())
+                .wait().unwrap();
+        });
+    }
+
+    fn n_workers(n: usize, b: &mut Bencher) {
+        let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
+        let (addr_tx, addr_rx) = sync::oneshot::channel();
+
+        // Spawn reactor thread
+        let server_thread = thread::spawn(move || {
+            // Bind the TCP listener
+            let listener = TcpListener::bind(
+                &"127.0.0.1:0".parse().unwrap()).unwrap();
+
+            // Get the address being listened on.
+            let addr = listener.local_addr().unwrap();
+
+            // Send the remote & address back to the main thread
+            addr_tx.send(addr).unwrap();
+
+            // Spawn a single future that accepts & drops connections
+            let serve_incomings = listener.incoming()
+                .map_err(|e| panic!("server err: {:?}", e))
+                .for_each(|_| Ok(()));
+
+            // Run server
+            serve_incomings.select(shutdown_rx)
+                .map(|_| ()).map_err(|_| ())
+                .wait().unwrap();
+        });
+
+        // Get the bind addr of the server
+        let addr = addr_rx.wait().unwrap();
+
+        b.iter(move || {
+            use std::sync::{Barrier, Arc};
+
+            // Create a barrier to coordinate threads
+            let barrier = Arc::new(Barrier::new(n + 1));
+
+            // Spawn worker threads
+            let threads: Vec<_> = (0..n).map(|_| {
+                let barrier = barrier.clone();
+                let addr = addr.clone();
+
+                thread::spawn(move || {
+                    let connects = stream::iter_result((0..(NUM / n)).map(|_| {
+                        Ok(TcpStream::connect(&addr)
+                            .map_err(|e| panic!("connect err: {:?}", e))
+                            .and_then(|sock| {
+                                sock.set_linger(Some(Duration::from_secs(0))).unwrap();
+                                read_to_end(sock, vec![])
+                            }))
+                    }));
+
+                    barrier.wait();
+
+                    connects.buffer_unordered(CONCURRENT)
+                        .map_err(|e| panic!("client err: {:?}", e))
+                        .for_each(|_| Ok(())).wait().unwrap();
+                })
+            }).collect();
+
+            barrier.wait();
+
+            for th in threads {
+                th.join().unwrap();
+            }
+        });
+
+        // Shutdown the server
+        shutdown_tx.send(()).unwrap();
+        server_thread.join().unwrap();
+    }
+
+    #[bench]
+    fn two_threads(b: &mut Bencher) {
+        n_workers(1, b);
+    }
+
+    #[bench]
+    fn multi_threads(b: &mut Bencher) {
+        n_workers(4, b);
+    }
+}
+
+mod transfer {
+    use ::prelude::*;
+    use std::{cmp, mem};
+
+    const MB: usize = 3 * 1024 * 1024;
+
+    struct Drain {
+        sock: TcpStream,
+        chunk: usize,
+    }
+
+    impl Future for Drain {
+        type Item = ();
+        type Error = io::Error;
+
+        fn poll(&mut self) -> Poll<(), io::Error> {
+            let mut buf: [u8; 1024] = unsafe { mem::uninitialized() };
+
+            loop {
+                match try_nb!(self.sock.read(&mut buf[..self.chunk])) {
+                    0 => return Ok(Async::Ready(())),
+                    _ => {}
+                }
+            }
+        }
+    }
+
+    struct Transfer {
+        sock: TcpStream,
+        rem: usize,
+        chunk: usize,
+    }
+
+    impl Future for Transfer {
+        type Item = ();
+        type Error = io::Error;
+
+        fn poll(&mut self) -> Poll<(), io::Error> {
+            while self.rem > 0 {
+                let len = cmp::min(self.rem, self.chunk);
+                let buf = &DATA[..len];
+
+                let n = try_nb!(self.sock.write(&buf));
+                self.rem -= n;
+            }
+
+            Ok(Async::Ready(()))
+        }
+    }
+
+    static DATA: [u8; 1024] = [0; 1024];
+
+    fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
+        let addr = "127.0.0.1:0".parse().unwrap();
+
+        b.iter(move || {
+            let listener = TcpListener::bind(&addr).unwrap();
+            let addr = listener.local_addr().unwrap();
+
+            // Spawn a single future that accepts 1 connection, Drain it and drops
+            let server = listener.incoming()
+                .into_future() // take the first connection
+                .map_err(|(e, _other_incomings)| e)
+                .map(|(connection, _other_incomings)| connection.unwrap())
+                .and_then(|sock| {
+                    sock.set_linger(Some(Duration::from_secs(0))).unwrap();
+                    let drain = Drain {
+                        sock: sock,
+                        chunk: read_size,
+                    };
+                    drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e))
+                })
+                .map_err(|e| panic!("server err: {:?}", e));
+
+            let client = TcpStream::connect(&addr)
+                .and_then(move |sock| {
+                    Transfer {
+                        sock: sock,
+                        rem: MB,
+                        chunk: write_size,
+                    }
+                })
+                .map_err(|e| panic!("client err: {:?}", e));
+
+            server.join(client).wait().unwrap();
+        });
+    }
+
+    mod small_chunks {
+        use ::prelude::*;
+
+        #[bench]
+        fn one_thread(b: &mut Bencher) {
+            super::one_thread(b, 32, 32);
+        }
+    }
+
+    mod big_chunks {
+        use ::prelude::*;
+
+        #[bench]
+        fn one_thread(b: &mut Bencher) {
+            super::one_thread(b, 1_024, 1_024);
+        }
+    }
+}
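
Note (not part of the diff above): all three benchmark files opt into the unstable `test` crate via `#![feature(test)]`, so they build only on a nightly toolchain and run through cargo's bench profile inside the vendored crate. As a rough, std-only illustration of what `udp_echo_latency` measures, the sketch below times the same blocking UDP round trip with `std::time::Instant`; the helper name `measure_udp_rtt` is invented for this example and does not appear in the benchmarks.

// Illustrative sketch only (assumed names, not from tokio-0.1.11): a
// std-only UDP round-trip timer mirroring the udp_echo_latency pattern.
use std::net::UdpSocket;
use std::time::{Duration, Instant};

fn measure_udp_rtt(iters: u32) -> std::io::Result<Duration> {
    // Echo peer on a background thread: send every datagram straight back.
    let server = UdpSocket::bind("127.0.0.1:0")?;
    let server_addr = server.local_addr()?;
    std::thread::spawn(move || {
        let mut buf = [0u8; 1600];
        while let Ok((n, peer)) = server.recv_from(&mut buf) {
            if server.send_to(&buf[..n], peer).is_err() {
                break;
            }
        }
    });

    let client = UdpSocket::bind("127.0.0.1:0")?;
    let mut buf = [0u8; 1000];

    // Warm-up, analogous to the benchmark's initial eight round trips.
    for _ in 0..8 {
        client.send_to(&buf, server_addr)?;
        client.recv_from(&mut buf)?;
    }

    // Time the round trips and report the mean latency.
    let start = Instant::now();
    for _ in 0..iters {
        client.send_to(&buf, server_addr)?;
        client.recv_from(&mut buf)?;
    }
    Ok(start.elapsed() / iters)
}

fn main() -> std::io::Result<()> {
    println!("mean UDP round trip: {:?}", measure_udp_rtt(1_000)?);
    Ok(())
}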