diff options
Diffstat (limited to 'third_party/rust/mio/test/test_battery.rs')
-rw-r--r-- | third_party/rust/mio/test/test_battery.rs | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/third_party/rust/mio/test/test_battery.rs b/third_party/rust/mio/test/test_battery.rs new file mode 100644 index 0000000000..fa3aff04df --- /dev/null +++ b/third_party/rust/mio/test/test_battery.rs @@ -0,0 +1,269 @@ +use {localhost, sleep_ms, TryRead, TryWrite}; +use mio::*; +use mio::deprecated::{EventLoop, EventLoopBuilder, Handler}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::LinkedList; +use slab::Slab; +use std::{io, thread}; +use std::time::Duration; + +// Don't touch the connection slab +const SERVER: Token = Token(10_000_000); +const CLIENT: Token = Token(10_000_001); + +#[cfg(windows)] +const N: usize = 10_000; +#[cfg(unix)] +const N: usize = 1_000_000; + +struct EchoConn { + sock: TcpStream, + token: Option<Token>, + count: usize, + buf: Vec<u8> +} + +impl EchoConn { + fn new(sock: TcpStream) -> EchoConn { + let mut ec = + EchoConn { + sock: sock, + token: None, + buf: Vec::with_capacity(22), + count: 0 + }; + unsafe { ec.buf.set_len(22) }; + ec + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + event_loop.reregister(&self.sock, self.token.unwrap(), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()) + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + loop { + match self.sock.try_read(&mut self.buf[..]) { + Ok(None) => { + break; + } + Ok(Some(_)) => { + self.count += 1; + if self.count % 10000 == 0 { + info!("Received {} messages", self.count); + } + if self.count == N { + event_loop.shutdown(); + } + } + Err(_) => { + break; + } + + }; + } + + event_loop.reregister(&self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct EchoServer { + sock: TcpListener, + conns: Slab<EchoConn> +} + +impl EchoServer { + fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("server accepting socket"); + + let sock = self.sock.accept().unwrap().0; + let conn = EchoConn::new(sock,); + let tok = self.conns.insert(conn); + + // Register the connection + self.conns[tok].token = Some(Token(tok)); + event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()) + .expect("could not register socket with event loop"); + + Ok(()) + } + + fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, + tok: Token) -> io::Result<()> { + debug!("server conn readable; tok={:?}", tok); + self.conn(tok).readable(event_loop) + } + + fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, + tok: Token) -> io::Result<()> { + debug!("server conn writable; tok={:?}", tok); + self.conn(tok).writable(event_loop) + } + + fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn { + &mut self.conns[tok.into()] + } +} + +struct EchoClient { + sock: TcpStream, + backlog: LinkedList<String>, + token: Token, + count: u32 +} + + +// Sends a message and expects to receive the same exact message, one at a time +impl EchoClient { + fn new(sock: TcpStream, tok: Token) -> EchoClient { + + EchoClient { + sock: sock, + backlog: LinkedList::new(), + token: tok, + count: 0 + } + } + + fn readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + Ok(()) + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket writable"); + + while self.backlog.len() > 0 { + match self.sock.try_write(self.backlog.front().unwrap().as_bytes()) { + Ok(None) => { + break; + } + Ok(Some(_)) => { + self.backlog.pop_front(); + self.count += 1; + if self.count % 10000 == 0 { + info!("Sent {} messages", self.count); + } + } + Err(e) => { debug!("not implemented; client err={:?}", e); break; } + } + } + if self.backlog.len() > 0 { + event_loop.reregister(&self.sock, self.token, Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + } + + Ok(()) + } +} + +struct Echo { + server: EchoServer, + client: EchoClient, +} + +impl Echo { + fn new(srv: TcpListener, client: TcpStream) -> Echo { + Echo { + server: EchoServer { + sock: srv, + conns: Slab::with_capacity(128), + }, + client: EchoClient::new(client, CLIENT), + } + } +} + +impl Handler for Echo { + type Timeout = usize; + type Message = String; + + fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, + events: Ready) { + + if events.is_readable() { + match token { + SERVER => self.server.accept(event_loop).unwrap(), + CLIENT => self.client.readable(event_loop).unwrap(), + i => self.server.conn_readable(event_loop, i).unwrap() + } + } + if events.is_writable() { + match token { + SERVER => panic!("received writable for token 0"), + CLIENT => self.client.writable(event_loop).unwrap(), + _ => self.server.conn_writable(event_loop, token).unwrap() + } + } + } + + fn notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String) { + match self.client.sock.try_write(msg.as_bytes()) { + Ok(Some(n)) => { + self.client.count += 1; + if self.client.count % 10000 == 0 { + info!("Sent {} bytes: count {}", n, self.client.count); + } + }, + + _ => { + self.client.backlog.push_back(msg); + event_loop.reregister( + &self.client.sock, + self.client.token, + Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + } + } + } +} + +#[test] +pub fn test_echo_server() { + debug!("Starting TEST_ECHO_SERVER"); + let mut b = EventLoopBuilder::new(); + b.notify_capacity(1_048_576) + .messages_per_tick(64) + .timer_tick(Duration::from_millis(100)) + .timer_wheel_size(1_024) + .timer_capacity(65_536); + + let mut event_loop = b.build().unwrap(); + + let addr = localhost(); + + let srv = TcpListener::bind(&addr).unwrap(); + + info!("listen for connections"); + event_loop.register(&srv, SERVER, Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + let sock = TcpStream::connect(&addr).unwrap(); + + // Connect to the server + event_loop.register(&sock, CLIENT, Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + let chan = event_loop.channel(); + + let go = move || { + let mut i = N; + + sleep_ms(1_000); + + let message = "THIS IS A TEST MESSAGE".to_string(); + while i > 0 { + chan.send(message.clone()).unwrap(); + i -= 1; + if i % 10000 == 0 { + info!("Enqueued {} messages", N - i); + } + } + }; + + let t = thread::spawn(go); + + // Start the event loop + event_loop.run(&mut Echo::new(srv, sock)).unwrap(); + t.join().unwrap(); +} |