summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.11/benches/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.11/benches/tcp.rs')
-rw-r--r--third_party/rust/tokio-0.1.11/benches/tcp.rs248
1 files changed, 248 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/benches/tcp.rs b/third_party/rust/tokio-0.1.11/benches/tcp.rs
new file mode 100644
index 0000000000..fde72ce092
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/benches/tcp.rs
@@ -0,0 +1,248 @@
+#![feature(test)]
+#![deny(warnings)]
+
+extern crate futures;
+extern crate tokio;
+
+#[macro_use]
+extern crate tokio_io;
+
+pub extern crate test;
+
+mod prelude {
+ pub use futures::*;
+ pub use tokio::reactor::Reactor;
+ pub use tokio::net::{TcpListener, TcpStream};
+ pub use tokio_io::io::read_to_end;
+
+ pub use test::{self, Bencher};
+ pub use std::thread;
+ pub use std::time::Duration;
+ pub use std::io::{self, Read, Write};
+}
+
+mod connect_churn {
+ use ::prelude::*;
+
+ const NUM: usize = 300;
+ const CONCURRENT: usize = 8;
+
+ #[bench]
+ fn one_thread(b: &mut Bencher) {
+ let addr = "127.0.0.1:0".parse().unwrap();
+
+ b.iter(move || {
+ let listener = TcpListener::bind(&addr).unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ // Spawn a single future that accepts & drops connections
+ let serve_incomings = listener.incoming()
+ .map_err(|e| panic!("server err: {:?}", e))
+ .for_each(|_| Ok(()));
+
+ let connects = stream::iter_result((0..NUM).map(|_| {
+ Ok(TcpStream::connect(&addr)
+ .and_then(|sock| {
+ sock.set_linger(Some(Duration::from_secs(0))).unwrap();
+ read_to_end(sock, vec![])
+ }))
+ }));
+
+ let connects_concurrent = connects.buffer_unordered(CONCURRENT)
+ .map_err(|e| panic!("client err: {:?}", e))
+ .for_each(|_| Ok(()));
+
+ serve_incomings.select(connects_concurrent)
+ .map(|_| ()).map_err(|_| ())
+ .wait().unwrap();
+ });
+ }
+
+ fn n_workers(n: usize, b: &mut Bencher) {
+ let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
+ let (addr_tx, addr_rx) = sync::oneshot::channel();
+
+ // Spawn reactor thread
+ let server_thread = thread::spawn(move || {
+ // Bind the TCP listener
+ let listener = TcpListener::bind(
+ &"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Get the address being listened on.
+ let addr = listener.local_addr().unwrap();
+
+ // Send the remote & address back to the main thread
+ addr_tx.send(addr).unwrap();
+
+ // Spawn a single future that accepts & drops connections
+ let serve_incomings = listener.incoming()
+ .map_err(|e| panic!("server err: {:?}", e))
+ .for_each(|_| Ok(()));
+
+ // Run server
+ serve_incomings.select(shutdown_rx)
+ .map(|_| ()).map_err(|_| ())
+ .wait().unwrap();
+ });
+
+ // Get the bind addr of the server
+ let addr = addr_rx.wait().unwrap();
+
+ b.iter(move || {
+ use std::sync::{Barrier, Arc};
+
+ // Create a barrier to coordinate threads
+ let barrier = Arc::new(Barrier::new(n + 1));
+
+ // Spawn worker threads
+ let threads: Vec<_> = (0..n).map(|_| {
+ let barrier = barrier.clone();
+ let addr = addr.clone();
+
+ thread::spawn(move || {
+ let connects = stream::iter_result((0..(NUM / n)).map(|_| {
+ Ok(TcpStream::connect(&addr)
+ .map_err(|e| panic!("connect err: {:?}", e))
+ .and_then(|sock| {
+ sock.set_linger(Some(Duration::from_secs(0))).unwrap();
+ read_to_end(sock, vec![])
+ }))
+ }));
+
+ barrier.wait();
+
+ connects.buffer_unordered(CONCURRENT)
+ .map_err(|e| panic!("client err: {:?}", e))
+ .for_each(|_| Ok(())).wait().unwrap();
+ })
+ }).collect();
+
+ barrier.wait();
+
+ for th in threads {
+ th.join().unwrap();
+ }
+ });
+
+ // Shutdown the server
+ shutdown_tx.send(()).unwrap();
+ server_thread.join().unwrap();
+ }
+
+ #[bench]
+ fn two_threads(b: &mut Bencher) {
+ n_workers(1, b);
+ }
+
+ #[bench]
+ fn multi_threads(b: &mut Bencher) {
+ n_workers(4, b);
+ }
+}
+
+mod transfer {
+ use ::prelude::*;
+ use std::{cmp, mem};
+
+ const MB: usize = 3 * 1024 * 1024;
+
+ struct Drain {
+ sock: TcpStream,
+ chunk: usize,
+ }
+
+ impl Future for Drain {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ let mut buf: [u8; 1024] = unsafe { mem::uninitialized() };
+
+ loop {
+ match try_nb!(self.sock.read(&mut buf[..self.chunk])) {
+ 0 => return Ok(Async::Ready(())),
+ _ => {}
+ }
+ }
+ }
+ }
+
+ struct Transfer {
+ sock: TcpStream,
+ rem: usize,
+ chunk: usize,
+ }
+
+ impl Future for Transfer {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ while self.rem > 0 {
+ let len = cmp::min(self.rem, self.chunk);
+ let buf = &DATA[..len];
+
+ let n = try_nb!(self.sock.write(&buf));
+ self.rem -= n;
+ }
+
+ Ok(Async::Ready(()))
+ }
+ }
+
+ static DATA: [u8; 1024] = [0; 1024];
+
+ fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
+ let addr = "127.0.0.1:0".parse().unwrap();
+
+ b.iter(move || {
+ let listener = TcpListener::bind(&addr).unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ // Spawn a single future that accepts 1 connection, Drain it and drops
+ let server = listener.incoming()
+ .into_future() // take the first connection
+ .map_err(|(e, _other_incomings)| e)
+ .map(|(connection, _other_incomings)| connection.unwrap())
+ .and_then(|sock| {
+ sock.set_linger(Some(Duration::from_secs(0))).unwrap();
+ let drain = Drain {
+ sock: sock,
+ chunk: read_size,
+ };
+ drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e))
+ })
+ .map_err(|e| panic!("server err: {:?}", e));
+
+ let client = TcpStream::connect(&addr)
+ .and_then(move |sock| {
+ Transfer {
+ sock: sock,
+ rem: MB,
+ chunk: write_size,
+ }
+ })
+ .map_err(|e| panic!("client err: {:?}", e));
+
+ server.join(client).wait().unwrap();
+ });
+ }
+
+ mod small_chunks {
+ use ::prelude::*;
+
+ #[bench]
+ fn one_thread(b: &mut Bencher) {
+ super::one_thread(b, 32, 32);
+ }
+ }
+
+ mod big_chunks {
+ use ::prelude::*;
+
+ #[bench]
+ fn one_thread(b: &mut Bencher) {
+ super::one_thread(b, 1_024, 1_024);
+ }
+ }
+}