diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/mio/test | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/mio/test')
30 files changed, 5112 insertions, 0 deletions
diff --git a/third_party/rust/mio/test/benchmark.rs b/third_party/rust/mio/test/benchmark.rs new file mode 100644 index 0000000000..1e4fac517c --- /dev/null +++ b/third_party/rust/mio/test/benchmark.rs @@ -0,0 +1,80 @@ +use std::mem; +use mio::net::{AddressFamily, Inet, Inet6, SockAddr, InetAddr, IPv4Addr, SocketType, Dgram, Stream}; +use std::io::net::ip::IpAddr; +use native::NativeTaskBuilder; +use std::task::TaskBuilder; +use mio::os::{from_sockaddr}; +use time::Instant; +use std::vec::*; +use std::io::timer; + +mod nix { + pub use nix::c_int; + pub use nix::fcntl::{Fd, O_NONBLOCK, O_CLOEXEC}; + pub use nix::errno::{EWOULDBLOCK, EINPROGRESS}; + pub use nix::sys::socket::*; + pub use nix::unistd::*; + pub use nix::sys::epoll::*; +} + +fn timed(label: &str, f: ||) { + let start = Instant::now(); + f(); + let elapsed = start.elapsed(); + println!(" {}: {}", label, elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0); +} + +fn init(saddr: &str) -> (nix::Fd, nix::Fd) { + let optval = 1i; + let addr = SockAddr::parse(saddr.as_slice()).expect("could not parse InetAddr"); + let srvfd = nix::socket(nix::AF_INET, nix::SOCK_STREAM, nix::SOCK_CLOEXEC).unwrap(); + nix::setsockopt(srvfd, nix::SOL_SOCKET, nix::SO_REUSEADDR, &optval).unwrap(); + nix::bind(srvfd, &from_sockaddr(&addr)).unwrap(); + nix::listen(srvfd, 256u).unwrap(); + + let fd = nix::socket(nix::AF_INET, nix::SOCK_STREAM, nix::SOCK_CLOEXEC | nix::SOCK_NONBLOCK).unwrap(); + let res = nix::connect(fd, &from_sockaddr(&addr)); + let start = Instant::now(); + println!("connecting : {}", res); + + let clifd = nix::accept4(srvfd, nix::SOCK_CLOEXEC | nix::SOCK_NONBLOCK).unwrap(); + let elapsed = start.elapsed(); + println!("accepted : {} - {}", clifd, elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0); + + (clifd, srvfd) +} + +#[test] +fn read_bench() { + let (clifd, srvfd) = init("10.10.1.5:11111"); + let mut buf = Vec::with_capacity(1600); + unsafe { buf.set_len(1600); } + timed("read", || { + let mut i = 0u; + while i < 10000000 { + let res = nix::read(clifd, buf.as_mut_slice()); + assert_eq!(res.unwrap_err().kind, nix::EWOULDBLOCK); + i = i + 1; + } + }); +} + +#[test] +fn epollctl_bench() { + let (clifd, srvfd) = init("10.10.1.5:22222"); + + let epfd = nix::epoll_create().unwrap(); + let info = nix::EpollEvent { events: nix::EPOLLIN | nix::EPOLLONESHOT | nix::EPOLLET, + data: 0u64 }; + + nix::epoll_ctl(epfd, nix::EpollCtlAdd, clifd, &info); + + timed("epoll_ctl", || { + let mut i = 0u; + while i < 10000000 { + nix::epoll_ctl(epfd, nix::EpollCtlMod, clifd, &info); + i = i + 1; + } + }); + +} diff --git a/third_party/rust/mio/test/mod.rs b/third_party/rust/mio/test/mod.rs new file mode 100644 index 0000000000..e49034f17e --- /dev/null +++ b/third_party/rust/mio/test/mod.rs @@ -0,0 +1,214 @@ +#![allow(deprecated)] + +extern crate mio; +extern crate bytes; +extern crate net2; + +#[macro_use] +extern crate log; +extern crate env_logger; +extern crate iovec; +extern crate slab; +extern crate tempdir; + +#[cfg(target_os = "fuchsia")] +extern crate fuchsia_zircon as zircon; + +pub use ports::localhost; + +mod test_custom_evented; +mod test_close_on_drop; +mod test_double_register; +mod test_echo_server; +mod test_local_addr_ready; +mod test_multicast; +mod test_oneshot; +mod test_poll; +mod test_register_deregister; +mod test_register_multiple_event_loops; +mod test_reregister_without_poll; +mod test_smoke; +mod test_tcp; +mod test_tcp_level; +mod test_tcp_shutdown; +mod test_udp_level; +mod test_udp_socket; +mod test_write_then_drop; + +#[cfg(feature = "with-deprecated")] +mod test_notify; +#[cfg(feature = "with-deprecated")] +mod test_poll_channel; +#[cfg(feature = "with-deprecated")] +mod test_tick; + +// The following tests are for deprecated features. Only run these tests on +// platforms that were supported from before the features were deprecated +#[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))] +#[cfg(feature = "with-deprecated")] +mod test_battery; + +#[cfg(any(target_os = "macos", target_os = "linux"))] +#[cfg(feature = "with-deprecated")] +mod test_unix_echo_server; +#[cfg(any(target_os = "macos", target_os = "linux"))] +#[cfg(feature = "with-deprecated")] +mod test_unix_pass_fd; +#[cfg(any(target_os = "macos", target_os = "linux"))] +#[cfg(feature = "with-deprecated")] +mod test_uds_shutdown; +#[cfg(any(target_os = "macos", target_os = "linux"))] +#[cfg(feature = "with-deprecated")] +mod test_subprocess_pipe; +#[cfg(any(target_os = "macos", target_os = "linux"))] +#[cfg(feature = "with-deprecated")] +mod test_broken_pipe; + +#[cfg(any(target_os = "fuchsia"))] +mod test_fuchsia_handles; + +use bytes::{Buf, MutBuf}; +use std::io::{self, Read, Write}; +use std::time::Duration; +use mio::{Events, Poll}; +use mio::event::Event; + +pub trait TryRead { + fn try_read_buf<B: MutBuf>(&mut self, buf: &mut B) -> io::Result<Option<usize>> + where Self : Sized + { + // Reads the length of the slice supplied by buf.mut_bytes into the buffer + // This is not guaranteed to consume an entire datagram or segment. + // If your protocol is msg based (instead of continuous stream) you should + // ensure that your buffer is large enough to hold an entire segment (1532 bytes if not jumbo + // frames) + let res = self.try_read(unsafe { buf.mut_bytes() }); + + if let Ok(Some(cnt)) = res { + unsafe { buf.advance(cnt); } + } + + res + } + + fn try_read(&mut self, buf: &mut [u8]) -> io::Result<Option<usize>>; +} + +pub trait TryWrite { + fn try_write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<Option<usize>> + where Self : Sized + { + let res = self.try_write(buf.bytes()); + + if let Ok(Some(cnt)) = res { + buf.advance(cnt); + } + + res + } + + fn try_write(&mut self, buf: &[u8]) -> io::Result<Option<usize>>; +} + +impl<T: Read> TryRead for T { + fn try_read(&mut self, dst: &mut [u8]) -> io::Result<Option<usize>> { + self.read(dst).map_non_block() + } +} + +impl<T: Write> TryWrite for T { + fn try_write(&mut self, src: &[u8]) -> io::Result<Option<usize>> { + self.write(src).map_non_block() + } +} + +/* + * + * ===== Helpers ===== + * + */ + +/// A helper trait to provide the map_non_block function on Results. +trait MapNonBlock<T> { + /// Maps a `Result<T>` to a `Result<Option<T>>` by converting + /// operation-would-block errors into `Ok(None)`. + fn map_non_block(self) -> io::Result<Option<T>>; +} + +impl<T> MapNonBlock<T> for io::Result<T> { + fn map_non_block(self) -> io::Result<Option<T>> { + use std::io::ErrorKind::WouldBlock; + + match self { + Ok(value) => Ok(Some(value)), + Err(err) => { + if let WouldBlock = err.kind() { + Ok(None) + } else { + Err(err) + } + } + } + } +} + +mod ports { + use std::net::SocketAddr; + use std::str::FromStr; + use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; + use std::sync::atomic::Ordering::SeqCst; + + // Helper for getting a unique port for the task run + // TODO: Reuse ports to not spam the system + static mut NEXT_PORT: AtomicUsize = ATOMIC_USIZE_INIT; + const FIRST_PORT: usize = 18080; + + fn next_port() -> usize { + unsafe { + // If the atomic was never used, set it to the initial port + NEXT_PORT.compare_and_swap(0, FIRST_PORT, SeqCst); + + // Get and increment the port list + NEXT_PORT.fetch_add(1, SeqCst) + } + } + + pub fn localhost() -> SocketAddr { + let s = format!("127.0.0.1:{}", next_port()); + FromStr::from_str(&s).unwrap() + } +} + +pub fn sleep_ms(ms: u64) { + use std::thread; + thread::sleep(Duration::from_millis(ms)); +} + +pub fn expect_events(poll: &Poll, + event_buffer: &mut Events, + poll_try_count: usize, + mut expected: Vec<Event>) +{ + const MS: u64 = 1_000; + + for _ in 0..poll_try_count { + poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap(); + for event in event_buffer.iter() { + let pos_opt = match expected.iter().position(|exp_event| { + (event.token() == exp_event.token()) && + event.readiness().contains(exp_event.readiness()) + }) { + Some(x) => Some(x), + None => None, + }; + if let Some(pos) = pos_opt { expected.remove(pos); } + } + + if expected.is_empty() { + break; + } + } + + assert!(expected.is_empty(), "The following expected events were not found: {:?}", expected); +} + diff --git a/third_party/rust/mio/test/test_battery.rs b/third_party/rust/mio/test/test_battery.rs new file mode 100644 index 0000000000..fa3aff04df --- /dev/null +++ b/third_party/rust/mio/test/test_battery.rs @@ -0,0 +1,269 @@ +use {localhost, sleep_ms, TryRead, TryWrite}; +use mio::*; +use mio::deprecated::{EventLoop, EventLoopBuilder, Handler}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::LinkedList; +use slab::Slab; +use std::{io, thread}; +use std::time::Duration; + +// Don't touch the connection slab +const SERVER: Token = Token(10_000_000); +const CLIENT: Token = Token(10_000_001); + +#[cfg(windows)] +const N: usize = 10_000; +#[cfg(unix)] +const N: usize = 1_000_000; + +struct EchoConn { + sock: TcpStream, + token: Option<Token>, + count: usize, + buf: Vec<u8> +} + +impl EchoConn { + fn new(sock: TcpStream) -> EchoConn { + let mut ec = + EchoConn { + sock: sock, + token: None, + buf: Vec::with_capacity(22), + count: 0 + }; + unsafe { ec.buf.set_len(22) }; + ec + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + event_loop.reregister(&self.sock, self.token.unwrap(), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()) + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + loop { + match self.sock.try_read(&mut self.buf[..]) { + Ok(None) => { + break; + } + Ok(Some(_)) => { + self.count += 1; + if self.count % 10000 == 0 { + info!("Received {} messages", self.count); + } + if self.count == N { + event_loop.shutdown(); + } + } + Err(_) => { + break; + } + + }; + } + + event_loop.reregister(&self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct EchoServer { + sock: TcpListener, + conns: Slab<EchoConn> +} + +impl EchoServer { + fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("server accepting socket"); + + let sock = self.sock.accept().unwrap().0; + let conn = EchoConn::new(sock,); + let tok = self.conns.insert(conn); + + // Register the connection + self.conns[tok].token = Some(Token(tok)); + event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()) + .expect("could not register socket with event loop"); + + Ok(()) + } + + fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, + tok: Token) -> io::Result<()> { + debug!("server conn readable; tok={:?}", tok); + self.conn(tok).readable(event_loop) + } + + fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, + tok: Token) -> io::Result<()> { + debug!("server conn writable; tok={:?}", tok); + self.conn(tok).writable(event_loop) + } + + fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn { + &mut self.conns[tok.into()] + } +} + +struct EchoClient { + sock: TcpStream, + backlog: LinkedList<String>, + token: Token, + count: u32 +} + + +// Sends a message and expects to receive the same exact message, one at a time +impl EchoClient { + fn new(sock: TcpStream, tok: Token) -> EchoClient { + + EchoClient { + sock: sock, + backlog: LinkedList::new(), + token: tok, + count: 0 + } + } + + fn readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + Ok(()) + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket writable"); + + while self.backlog.len() > 0 { + match self.sock.try_write(self.backlog.front().unwrap().as_bytes()) { + Ok(None) => { + break; + } + Ok(Some(_)) => { + self.backlog.pop_front(); + self.count += 1; + if self.count % 10000 == 0 { + info!("Sent {} messages", self.count); + } + } + Err(e) => { debug!("not implemented; client err={:?}", e); break; } + } + } + if self.backlog.len() > 0 { + event_loop.reregister(&self.sock, self.token, Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + } + + Ok(()) + } +} + +struct Echo { + server: EchoServer, + client: EchoClient, +} + +impl Echo { + fn new(srv: TcpListener, client: TcpStream) -> Echo { + Echo { + server: EchoServer { + sock: srv, + conns: Slab::with_capacity(128), + }, + client: EchoClient::new(client, CLIENT), + } + } +} + +impl Handler for Echo { + type Timeout = usize; + type Message = String; + + fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, + events: Ready) { + + if events.is_readable() { + match token { + SERVER => self.server.accept(event_loop).unwrap(), + CLIENT => self.client.readable(event_loop).unwrap(), + i => self.server.conn_readable(event_loop, i).unwrap() + } + } + if events.is_writable() { + match token { + SERVER => panic!("received writable for token 0"), + CLIENT => self.client.writable(event_loop).unwrap(), + _ => self.server.conn_writable(event_loop, token).unwrap() + } + } + } + + fn notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String) { + match self.client.sock.try_write(msg.as_bytes()) { + Ok(Some(n)) => { + self.client.count += 1; + if self.client.count % 10000 == 0 { + info!("Sent {} bytes: count {}", n, self.client.count); + } + }, + + _ => { + self.client.backlog.push_back(msg); + event_loop.reregister( + &self.client.sock, + self.client.token, + Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + } + } + } +} + +#[test] +pub fn test_echo_server() { + debug!("Starting TEST_ECHO_SERVER"); + let mut b = EventLoopBuilder::new(); + b.notify_capacity(1_048_576) + .messages_per_tick(64) + .timer_tick(Duration::from_millis(100)) + .timer_wheel_size(1_024) + .timer_capacity(65_536); + + let mut event_loop = b.build().unwrap(); + + let addr = localhost(); + + let srv = TcpListener::bind(&addr).unwrap(); + + info!("listen for connections"); + event_loop.register(&srv, SERVER, Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + let sock = TcpStream::connect(&addr).unwrap(); + + // Connect to the server + event_loop.register(&sock, CLIENT, Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + let chan = event_loop.channel(); + + let go = move || { + let mut i = N; + + sleep_ms(1_000); + + let message = "THIS IS A TEST MESSAGE".to_string(); + while i > 0 { + chan.send(message.clone()).unwrap(); + i -= 1; + if i % 10000 == 0 { + info!("Enqueued {} messages", N - i); + } + } + }; + + let t = thread::spawn(go); + + // Start the event loop + event_loop.run(&mut Echo::new(srv, sock)).unwrap(); + t.join().unwrap(); +} diff --git a/third_party/rust/mio/test/test_broken_pipe.rs b/third_party/rust/mio/test/test_broken_pipe.rs new file mode 100644 index 0000000000..1cd0ca7465 --- /dev/null +++ b/third_party/rust/mio/test/test_broken_pipe.rs @@ -0,0 +1,28 @@ +use mio::{Token, Ready, PollOpt}; +use mio::deprecated::{unix, EventLoop, Handler}; +use std::time::Duration; + +pub struct BrokenPipeHandler; + +impl Handler for BrokenPipeHandler { + type Timeout = (); + type Message = (); + fn ready(&mut self, _: &mut EventLoop<Self>, token: Token, _: Ready) { + if token == Token(1) { + panic!("Received ready() on a closed pipe."); + } + } +} + +#[test] +pub fn broken_pipe() { + let mut event_loop: EventLoop<BrokenPipeHandler> = EventLoop::new().unwrap(); + let (reader, _) = unix::pipe().unwrap(); + + event_loop.register(&reader, Token(1), Ready::all(), PollOpt::edge()) + .unwrap(); + + let mut handler = BrokenPipeHandler; + drop(reader); + event_loop.run_once(&mut handler, Some(Duration::from_millis(1000))).unwrap(); +} diff --git a/third_party/rust/mio/test/test_close_on_drop.rs b/third_party/rust/mio/test/test_close_on_drop.rs new file mode 100644 index 0000000000..6cd7d1365c --- /dev/null +++ b/third_party/rust/mio/test/test_close_on_drop.rs @@ -0,0 +1,119 @@ +use {localhost, TryRead}; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use bytes::ByteBuf; +use mio::net::{TcpListener, TcpStream}; + +use self::TestState::{Initial, AfterRead}; + +const SERVER: Token = Token(0); +const CLIENT: Token = Token(1); + +#[derive(Debug, PartialEq)] +enum TestState { + Initial, + AfterRead, +} + +struct TestHandler { + srv: TcpListener, + cli: TcpStream, + state: TestState, + shutdown: bool, +} + +impl TestHandler { + fn new(srv: TcpListener, cli: TcpStream) -> TestHandler { + TestHandler { + srv, + cli, + state: Initial, + shutdown: false, + } + } + + fn handle_read(&mut self, poll: &mut Poll, tok: Token, events: Ready) { + debug!("readable; tok={:?}; hint={:?}", tok, events); + + match tok { + SERVER => { + debug!("server connection ready for accept"); + let _ = self.srv.accept().unwrap(); + } + CLIENT => { + debug!("client readable"); + + match self.state { + Initial => { + let mut buf = [0; 4096]; + debug!("GOT={:?}", self.cli.try_read(&mut buf[..])); + self.state = AfterRead; + }, + AfterRead => {} + } + + let mut buf = ByteBuf::mut_with_capacity(1024); + + match self.cli.try_read_buf(&mut buf) { + Ok(Some(0)) => self.shutdown = true, + Ok(_) => panic!("the client socket should not be readable"), + Err(e) => panic!("Unexpected error {:?}", e) + } + } + _ => panic!("received unknown token {:?}", tok) + } + poll.reregister(&self.cli, CLIENT, Ready::readable(), PollOpt::edge()).unwrap(); + } + + fn handle_write(&mut self, poll: &mut Poll, tok: Token, _: Ready) { + match tok { + SERVER => panic!("received writable for token 0"), + CLIENT => { + debug!("client connected"); + poll.reregister(&self.cli, CLIENT, Ready::readable(), PollOpt::edge()).unwrap(); + } + _ => panic!("received unknown token {:?}", tok) + } + } +} + +#[test] +pub fn test_close_on_drop() { + let _ = ::env_logger::init(); + debug!("Starting TEST_CLOSE_ON_DROP"); + let mut poll = Poll::new().unwrap(); + + // The address to connect to - localhost + a unique port + let addr = localhost(); + + // == Create & setup server socket + let srv = TcpListener::bind(&addr).unwrap(); + + poll.register(&srv, SERVER, Ready::readable(), PollOpt::edge()).unwrap(); + + // == Create & setup client socket + let sock = TcpStream::connect(&addr).unwrap(); + + poll.register(&sock, CLIENT, Ready::writable(), PollOpt::edge()).unwrap(); + + // == Create storage for events + let mut events = Events::with_capacity(1024); + + // == Setup test handler + let mut handler = TestHandler::new(srv, sock); + + // == Run test + while !handler.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.readiness().is_readable() { + handler.handle_read(&mut poll, event.token(), event.readiness()); + } + + if event.readiness().is_writable() { + handler.handle_write(&mut poll, event.token(), event.readiness()); + } + } + } + assert!(handler.state == AfterRead, "actual={:?}", handler.state); +} diff --git a/third_party/rust/mio/test/test_custom_evented.rs b/third_party/rust/mio/test/test_custom_evented.rs new file mode 100644 index 0000000000..08842fc289 --- /dev/null +++ b/third_party/rust/mio/test/test_custom_evented.rs @@ -0,0 +1,394 @@ +use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use mio::event::Evented; +use std::time::Duration; + +#[test] +fn smoke() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(128); + + let (r, set) = Registration::new2(); + r.register(&poll, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + + let n = poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap(); + assert_eq!(n, 0); + + set.set_readiness(Ready::readable()).unwrap(); + + let n = poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap(); + assert_eq!(n, 1); + + assert_eq!(events.get(0).unwrap().token(), Token(0)); +} + +#[test] +fn set_readiness_before_register() { + use std::sync::{Arc, Barrier}; + use std::thread; + + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(128); + + for _ in 0..5_000 { + let (r, set) = Registration::new2(); + + let b1 = Arc::new(Barrier::new(2)); + let b2 = b1.clone(); + + let th = thread::spawn(move || { + // set readiness before register + set.set_readiness(Ready::readable()).unwrap(); + + // run into barrier so both can pass + b2.wait(); + }); + + // wait for readiness + b1.wait(); + + // now register + poll.register(&r, Token(123), Ready::readable(), PollOpt::edge()).unwrap(); + + loop { + let n = poll.poll(&mut events, None).unwrap(); + + if n == 0 { + continue; + } + + assert_eq!(n, 1); + assert_eq!(events.get(0).unwrap().token(), Token(123)); + break; + } + + th.join().unwrap(); + } +} + +#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))] +mod stress { + use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; + use mio::event::Evented; + use std::time::Duration; + + #[test] + fn single_threaded_poll() { + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::{Acquire, Release}; + use std::thread; + + const NUM_ATTEMPTS: usize = 30; + const NUM_ITERS: usize = 500; + const NUM_THREADS: usize = 4; + const NUM_REGISTRATIONS: usize = 128; + + for _ in 0..NUM_ATTEMPTS { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(NUM_REGISTRATIONS); + + let registrations: Vec<_> = (0..NUM_REGISTRATIONS).map(|i| { + let (r, s) = Registration::new2(); + r.register(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap(); + (r, s) + }).collect(); + + let mut ready: Vec<_> = (0..NUM_REGISTRATIONS).map(|_| Ready::empty()).collect(); + + let remaining = Arc::new(AtomicUsize::new(NUM_THREADS)); + + for _ in 0..NUM_THREADS { + let remaining = remaining.clone(); + + let set_readiness: Vec<SetReadiness> = + registrations.iter().map(|r| r.1.clone()).collect(); + + thread::spawn(move || { + for _ in 0..NUM_ITERS { + for i in 0..NUM_REGISTRATIONS { + set_readiness[i].set_readiness(Ready::readable()).unwrap(); + set_readiness[i].set_readiness(Ready::empty()).unwrap(); + set_readiness[i].set_readiness(Ready::writable()).unwrap(); + set_readiness[i].set_readiness(Ready::readable() | Ready::writable()).unwrap(); + set_readiness[i].set_readiness(Ready::empty()).unwrap(); + } + } + + for i in 0..NUM_REGISTRATIONS { + set_readiness[i].set_readiness(Ready::readable()).unwrap(); + } + + remaining.fetch_sub(1, Release); + }); + } + + while remaining.load(Acquire) > 0 { + // Set interest + for (i, &(ref r, _)) in registrations.iter().enumerate() { + r.reregister(&poll, Token(i), Ready::writable(), PollOpt::edge()).unwrap(); + } + + poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap(); + + for event in &events { + ready[event.token().0] = event.readiness(); + } + + // Update registration + // Set interest + for (i, &(ref r, _)) in registrations.iter().enumerate() { + r.reregister(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap(); + } + } + + // Finall polls, repeat until readiness-queue empty + loop { + // Might not read all events from custom-event-queue at once, implementation dependend + poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap(); + if events.is_empty() { + // no more events in readiness queue pending + break; + } + for event in &events { + ready[event.token().0] = event.readiness(); + } + } + + // Everything should be flagged as readable + for ready in ready { + assert_eq!(ready, Ready::readable()); + } + } + } + + #[test] + fn multi_threaded_poll() { + use std::sync::{Arc, Barrier}; + use std::sync::atomic::{AtomicUsize}; + use std::sync::atomic::Ordering::{Relaxed, SeqCst}; + use std::thread; + + const ENTRIES: usize = 10_000; + const PER_ENTRY: usize = 16; + const THREADS: usize = 4; + const NUM: usize = ENTRIES * PER_ENTRY; + + struct Entry { + #[allow(dead_code)] + registration: Registration, + set_readiness: SetReadiness, + num: AtomicUsize, + } + + impl Entry { + fn fire(&self) { + self.set_readiness.set_readiness(Ready::readable()).unwrap(); + } + } + + let poll = Arc::new(Poll::new().unwrap()); + let mut entries = vec![]; + + // Create entries + for i in 0..ENTRIES { + let (registration, set_readiness) = Registration::new2(); + registration.register(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap(); + + entries.push(Entry { + registration, + set_readiness, + num: AtomicUsize::new(0), + }); + } + + let total = Arc::new(AtomicUsize::new(0)); + let entries = Arc::new(entries); + let barrier = Arc::new(Barrier::new(THREADS)); + + let mut threads = vec![]; + + for th in 0..THREADS { + let poll = poll.clone(); + let total = total.clone(); + let entries = entries.clone(); + let barrier = barrier.clone(); + + threads.push(thread::spawn(move || { + let mut events = Events::with_capacity(128); + + barrier.wait(); + + // Prime all the registrations + let mut i = th; + while i < ENTRIES { + entries[i].fire(); + i += THREADS; + } + + let mut n = 0; + + + while total.load(SeqCst) < NUM { + // A poll timeout is necessary here because there may be more + // than one threads blocked in `poll` when the final wakeup + // notification arrives (and only notifies one thread). + n += poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap(); + + let mut num_this_tick = 0; + + for event in &events { + let e = &entries[event.token().0]; + + let mut num = e.num.load(Relaxed); + + loop { + if num < PER_ENTRY { + let actual = e.num.compare_and_swap(num, num + 1, Relaxed); + + if actual == num { + num_this_tick += 1; + e.fire(); + break; + } + + num = actual; + } else { + break; + } + } + } + + total.fetch_add(num_this_tick, SeqCst); + } + + n + })); + } + + let _: Vec<_> = threads.into_iter() + .map(|th| th.join().unwrap()) + .collect(); + + for entry in entries.iter() { + assert_eq!(PER_ENTRY, entry.num.load(Relaxed)); + } + } + + #[test] + fn with_small_events_collection() { + const N: usize = 8; + const ITER: usize = 1_000; + + use std::sync::{Arc, Barrier}; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering::{Acquire, Release}; + use std::thread; + + let poll = Poll::new().unwrap(); + let mut registrations = vec![]; + + let barrier = Arc::new(Barrier::new(N + 1)); + let done = Arc::new(AtomicBool::new(false)); + + for i in 0..N { + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, Token(i), Ready::readable(), PollOpt::edge()).unwrap(); + + registrations.push(registration); + + let barrier = barrier.clone(); + let done = done.clone(); + + thread::spawn(move || { + barrier.wait(); + + while !done.load(Acquire) { + set_readiness.set_readiness(Ready::readable()).unwrap(); + } + + // Set one last time + set_readiness.set_readiness(Ready::readable()).unwrap(); + }); + } + + let mut events = Events::with_capacity(4); + + barrier.wait(); + + for _ in 0..ITER { + poll.poll(&mut events, None).unwrap(); + } + + done.store(true, Release); + + let mut final_ready = vec![false; N]; + + + for _ in 0..5 { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + final_ready[event.token().0] = true; + } + + if final_ready.iter().all(|v| *v) { + return; + } + + thread::sleep(Duration::from_millis(10)); + } + + panic!("dead lock?"); + } +} + +#[test] +fn drop_registration_from_non_main_thread() { + use std::thread; + use std::sync::mpsc::channel; + + const THREADS: usize = 8; + const ITERS: usize = 50_000; + + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut senders = Vec::with_capacity(THREADS); + let mut token_index = 0; + + // spawn threads, which will send messages to single receiver + for _ in 0..THREADS { + let (tx, rx) = channel::<(Registration, SetReadiness)>(); + senders.push(tx); + + thread::spawn(move || { + for (registration, set_readiness) in rx { + let _ = set_readiness.set_readiness(Ready::readable()); + drop(registration); + drop(set_readiness); + } + }); + } + + let mut index: usize = 0; + for _ in 0..ITERS { + let (registration, set_readiness) = Registration::new2(); + registration.register(&poll, Token(token_index), Ready::readable(), PollOpt::edge()).unwrap(); + let _ = senders[index].send((registration, set_readiness)); + + token_index += 1; + index += 1; + if index == THREADS { + index = 0; + + let (registration, set_readiness) = Registration::new2(); + registration.register(&poll, Token(token_index), Ready::readable(), PollOpt::edge()).unwrap(); + let _ = set_readiness.set_readiness(Ready::readable()); + drop(registration); + drop(set_readiness); + token_index += 1; + + thread::park_timeout(Duration::from_millis(0)); + let _ = poll.poll(&mut events, None).unwrap(); + } + } +} diff --git a/third_party/rust/mio/test/test_double_register.rs b/third_party/rust/mio/test/test_double_register.rs new file mode 100644 index 0000000000..c3d011c81e --- /dev/null +++ b/third_party/rust/mio/test/test_double_register.rs @@ -0,0 +1,17 @@ +//! A smoke test for windows compatibility + +#[test] +#[cfg(any(target_os = "linux", target_os = "windows"))] +pub fn test_double_register() { + use mio::*; + use mio::net::TcpListener; + + let poll = Poll::new().unwrap(); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + assert!(poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).is_err()); +} diff --git a/third_party/rust/mio/test/test_echo_server.rs b/third_party/rust/mio/test/test_echo_server.rs new file mode 100644 index 0000000000..e20ae98e5a --- /dev/null +++ b/third_party/rust/mio/test/test_echo_server.rs @@ -0,0 +1,303 @@ +use {localhost, TryRead, TryWrite}; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio::net::{TcpListener, TcpStream}; +use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf}; +use slab::Slab; +use std::io; + +const SERVER: Token = Token(10_000_000); +const CLIENT: Token = Token(10_000_001); + +struct EchoConn { + sock: TcpStream, + buf: Option<ByteBuf>, + mut_buf: Option<MutByteBuf>, + token: Option<Token>, + interest: Ready +} + +impl EchoConn { + fn new(sock: TcpStream) -> EchoConn { + EchoConn { + sock, + buf: None, + mut_buf: Some(ByteBuf::mut_with_capacity(2048)), + token: None, + interest: Ready::empty(), + } + } + + fn writable(&mut self, poll: &mut Poll) -> io::Result<()> { + let mut buf = self.buf.take().unwrap(); + + match self.sock.try_write_buf(&mut buf) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + + self.buf = Some(buf); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CONN : we wrote {} bytes!", r); + + self.mut_buf = Some(buf.flip()); + + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + } + Err(e) => debug!("not implemented; client err={:?}", e), + } + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + poll.reregister(&self.sock, self.token.unwrap(), self.interest, + PollOpt::edge() | PollOpt::oneshot()) + } + + fn readable(&mut self, poll: &mut Poll) -> io::Result<()> { + let mut buf = self.mut_buf.take().unwrap(); + + match self.sock.try_read_buf(&mut buf) { + Ok(None) => { + debug!("CONN : spurious read wakeup"); + self.mut_buf = Some(buf); + } + Ok(Some(r)) => { + debug!("CONN : we read {} bytes!", r); + + // prepare to provide this to writable + self.buf = Some(buf.flip()); + + self.interest.remove(Ready::readable()); + self.interest.insert(Ready::writable()); + } + Err(e) => { + debug!("not implemented; client err={:?}", e); + self.interest.remove(Ready::readable()); + } + + }; + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + poll.reregister(&self.sock, self.token.unwrap(), self.interest, + PollOpt::edge()) + } +} + +struct EchoServer { + sock: TcpListener, + conns: Slab<EchoConn> +} + +impl EchoServer { + fn accept(&mut self, poll: &mut Poll) -> io::Result<()> { + debug!("server accepting socket"); + + let sock = self.sock.accept().unwrap().0; + let conn = EchoConn::new(sock,); + let tok = self.conns.insert(conn); + + // Register the connection + self.conns[tok].token = Some(Token(tok)); + poll.register(&self.conns[tok].sock, Token(tok), Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()) + .expect("could not register socket with event loop"); + + Ok(()) + } + + fn conn_readable(&mut self, poll: &mut Poll, + tok: Token) -> io::Result<()> { + debug!("server conn readable; tok={:?}", tok); + self.conn(tok).readable(poll) + } + + fn conn_writable(&mut self, poll: &mut Poll, + tok: Token) -> io::Result<()> { + debug!("server conn writable; tok={:?}", tok); + self.conn(tok).writable(poll) + } + + fn conn(&mut self, tok: Token) -> &mut EchoConn { + &mut self.conns[tok.into()] + } +} + +struct EchoClient { + sock: TcpStream, + msgs: Vec<&'static str>, + tx: SliceBuf<'static>, + rx: SliceBuf<'static>, + mut_buf: Option<MutByteBuf>, + token: Token, + interest: Ready, + shutdown: bool, +} + + +// Sends a message and expects to receive the same exact message, one at a time +impl EchoClient { + fn new(sock: TcpStream, token: Token, mut msgs: Vec<&'static str>) -> EchoClient { + let curr = msgs.remove(0); + + EchoClient { + sock, + msgs, + tx: SliceBuf::wrap(curr.as_bytes()), + rx: SliceBuf::wrap(curr.as_bytes()), + mut_buf: Some(ByteBuf::mut_with_capacity(2048)), + token, + interest: Ready::empty(), + shutdown: false, + } + } + + fn readable(&mut self, poll: &mut Poll) -> io::Result<()> { + debug!("client socket readable"); + + let mut buf = self.mut_buf.take().unwrap(); + + match self.sock.try_read_buf(&mut buf) { + Ok(None) => { + debug!("CLIENT : spurious read wakeup"); + self.mut_buf = Some(buf); + } + Ok(Some(r)) => { + debug!("CLIENT : We read {} bytes!", r); + + // prepare for reading + let mut buf = buf.flip(); + + while buf.has_remaining() { + let actual = buf.read_byte().unwrap(); + let expect = self.rx.read_byte().unwrap(); + + assert!(actual == expect, "actual={}; expect={}", actual, expect); + } + + self.mut_buf = Some(buf.flip()); + + self.interest.remove(Ready::readable()); + + if !self.rx.has_remaining() { + self.next_msg(poll).unwrap(); + } + } + Err(e) => { + panic!("not implemented; client err={:?}", e); + } + }; + + if !self.interest.is_empty() { + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + poll.reregister(&self.sock, self.token, self.interest, + PollOpt::edge() | PollOpt::oneshot())?; + } + + Ok(()) + } + + fn writable(&mut self, poll: &mut Poll) -> io::Result<()> { + debug!("client socket writable"); + + match self.sock.try_write_buf(&mut self.tx) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CLIENT : we wrote {} bytes!", r); + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + } + Err(e) => debug!("not implemented; client err={:?}", e) + } + + if self.interest.is_readable() || self.interest.is_writable() { + try!(poll.reregister(&self.sock, self.token, self.interest, + PollOpt::edge() | PollOpt::oneshot())); + } + + Ok(()) + } + + fn next_msg(&mut self, poll: &mut Poll) -> io::Result<()> { + if self.msgs.is_empty() { + self.shutdown = true; + return Ok(()); + } + + let curr = self.msgs.remove(0); + + debug!("client prepping next message"); + self.tx = SliceBuf::wrap(curr.as_bytes()); + self.rx = SliceBuf::wrap(curr.as_bytes()); + + self.interest.insert(Ready::writable()); + poll.reregister(&self.sock, self.token, self.interest, + PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct Echo { + server: EchoServer, + client: EchoClient, +} + +impl Echo { + fn new(srv: TcpListener, client: TcpStream, msgs: Vec<&'static str>) -> Echo { + Echo { + server: EchoServer { + sock: srv, + conns: Slab::with_capacity(128) + }, + client: EchoClient::new(client, CLIENT, msgs) + } + } +} + +#[test] +pub fn test_echo_server() { + debug!("Starting TEST_ECHO_SERVER"); + let mut poll = Poll::new().unwrap(); + + let addr = localhost(); + let srv = TcpListener::bind(&addr).unwrap(); + + info!("listen for connections"); + poll.register(&srv, SERVER, Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + let sock = TcpStream::connect(&addr).unwrap(); + + // Connect to the server + poll.register(&sock, CLIENT, Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + // == Create storage for events + let mut events = Events::with_capacity(1024); + + let mut handler = Echo::new(srv, sock, vec!["foo", "bar"]); + + // Start the event loop + while !handler.client.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + debug!("ready {:?} {:?}", event.token(), event.readiness()); + if event.readiness().is_readable() { + match event.token() { + SERVER => handler.server.accept(&mut poll).unwrap(), + CLIENT => handler.client.readable(&mut poll).unwrap(), + i => handler.server.conn_readable(&mut poll, i).unwrap() + } + } + + if event.readiness().is_writable() { + match event.token() { + SERVER => panic!("received writable for token 0"), + CLIENT => handler.client.writable(&mut poll).unwrap(), + i => handler.server.conn_writable(&mut poll, i).unwrap() + }; + } + } + } +} diff --git a/third_party/rust/mio/test/test_fuchsia_handles.rs b/third_party/rust/mio/test/test_fuchsia_handles.rs new file mode 100644 index 0000000000..85a14327f9 --- /dev/null +++ b/third_party/rust/mio/test/test_fuchsia_handles.rs @@ -0,0 +1,30 @@ +use mio::*; +use mio::fuchsia::EventedHandle; +use zircon::{self, AsHandleRef}; +use std::time::Duration; + +const MS: u64 = 1_000; + +#[test] +pub fn test_fuchsia_channel() { + let poll = Poll::new().unwrap(); + let mut event_buffer = Events::with_capacity(1); + let event_buffer = &mut event_buffer; + + let (channel0, channel1) = zircon::Channel::create(zircon::ChannelOpts::Normal).unwrap(); + let channel1_evented = unsafe { EventedHandle::new(channel1.raw_handle()) }; + + poll.register(&channel1_evented, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + + poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap(); + assert_eq!(event_buffer.len(), 0); + + channel0.write(&[1, 2, 3], &mut vec![], 0).unwrap(); + + poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap(); + let event = event_buffer.get(0).unwrap(); + assert_eq!(event.token(), Token(1)); + assert!(event.readiness().is_readable()); + + poll.deregister(&channel1_evented).unwrap(); +}
\ No newline at end of file diff --git a/third_party/rust/mio/test/test_local_addr_ready.rs b/third_party/rust/mio/test/test_local_addr_ready.rs new file mode 100644 index 0000000000..2e97f52449 --- /dev/null +++ b/third_party/rust/mio/test/test_local_addr_ready.rs @@ -0,0 +1,67 @@ +use {TryWrite}; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio::net::{TcpListener, TcpStream}; + +const LISTEN: Token = Token(0); +const CLIENT: Token = Token(1); +const SERVER: Token = Token(2); + +struct MyHandler { + listener: TcpListener, + connected: TcpStream, + accepted: Option<TcpStream>, + shutdown: bool, +} + +#[test] +fn local_addr_ready() { + let addr = "127.0.0.1:0".parse().unwrap(); + let server = TcpListener::bind(&addr).unwrap(); + let addr = server.local_addr().unwrap(); + + let poll = Poll::new().unwrap(); + poll.register(&server, LISTEN, Ready::readable(), + PollOpt::edge()).unwrap(); + + let sock = TcpStream::connect(&addr).unwrap(); + poll.register(&sock, CLIENT, Ready::readable(), + PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + + let mut handler = MyHandler { + listener: server, + connected: sock, + accepted: None, + shutdown: false, + }; + + while !handler.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + match event.token() { + LISTEN => { + let sock = handler.listener.accept().unwrap().0; + poll.register(&sock, + SERVER, + Ready::writable(), + PollOpt::edge()).unwrap(); + handler.accepted = Some(sock); + } + SERVER => { + handler.accepted.as_ref().unwrap().peer_addr().unwrap(); + handler.accepted.as_ref().unwrap().local_addr().unwrap(); + handler.accepted.as_mut().unwrap().try_write(&[1, 2, 3]).unwrap(); + handler.accepted = None; + } + CLIENT => { + handler.connected.peer_addr().unwrap(); + handler.connected.local_addr().unwrap(); + handler.shutdown = true; + } + _ => panic!("unexpected token"), + } + } + } +} diff --git a/third_party/rust/mio/test/test_multicast.rs b/third_party/rust/mio/test/test_multicast.rs new file mode 100644 index 0000000000..b73e0d5c3a --- /dev/null +++ b/third_party/rust/mio/test/test_multicast.rs @@ -0,0 +1,107 @@ +// TODO: This doesn't pass on android 64bit CI... +// Figure out why! +#![cfg(not(target_os = "android"))] + +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio::net::UdpSocket; +use bytes::{Buf, MutBuf, RingBuf, SliceBuf}; +use std::str; +use std::net::IpAddr; +use localhost; + +const LISTENER: Token = Token(0); +const SENDER: Token = Token(1); + +pub struct UdpHandler { + tx: UdpSocket, + rx: UdpSocket, + msg: &'static str, + buf: SliceBuf<'static>, + rx_buf: RingBuf, + localhost: IpAddr, + shutdown: bool, +} + +impl UdpHandler { + fn new(tx: UdpSocket, rx: UdpSocket, msg: &'static str) -> UdpHandler { + let sock = UdpSocket::bind(&"127.0.0.1:12345".parse().unwrap()).unwrap(); + UdpHandler { + tx, + rx, + msg, + buf: SliceBuf::wrap(msg.as_bytes()), + rx_buf: RingBuf::new(1024), + localhost: sock.local_addr().unwrap().ip(), + shutdown: false, + } + } + + fn handle_read(&mut self, _: &mut Poll, token: Token, _: Ready) { + if let LISTENER = token { + debug!("We are receiving a datagram now..."); + match unsafe { self.rx.recv_from(self.rx_buf.mut_bytes()) } { + Ok((cnt, addr)) => { + unsafe { MutBuf::advance(&mut self.rx_buf, cnt); } + assert_eq!(addr.ip(), self.localhost); + } + res => panic!("unexpected result: {:?}", res), + } + assert!(str::from_utf8(self.rx_buf.bytes()).unwrap() == self.msg); + self.shutdown = true; + } + } + + fn handle_write(&mut self, _: &mut Poll, token: Token, _: Ready) { + if let SENDER = token { + let addr = self.rx.local_addr().unwrap(); + let cnt = self.tx.send_to(self.buf.bytes(), &addr).unwrap(); + self.buf.advance(cnt); + } + } +} + +#[test] +pub fn test_multicast() { + drop(::env_logger::init()); + debug!("Starting TEST_UDP_CONNECTIONLESS"); + let mut poll = Poll::new().unwrap(); + + let addr = localhost(); + let any = "0.0.0.0:0".parse().unwrap(); + + let tx = UdpSocket::bind(&any).unwrap(); + let rx = UdpSocket::bind(&addr).unwrap(); + + info!("Joining group 227.1.1.100"); + let any = "0.0.0.0".parse().unwrap(); + rx.join_multicast_v4(&"227.1.1.100".parse().unwrap(), &any).unwrap(); + + info!("Joining group 227.1.1.101"); + rx.join_multicast_v4(&"227.1.1.101".parse().unwrap(), &any).unwrap(); + + info!("Registering SENDER"); + poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap(); + + info!("Registering LISTENER"); + poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + + let mut handler = UdpHandler::new(tx, rx, "hello world"); + + info!("Starting event loop to test with..."); + + while !handler.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.readiness().is_readable() { + handler.handle_read(&mut poll, event.token(), event.readiness()); + } + + if event.readiness().is_writable() { + handler.handle_write(&mut poll, event.token(), event.readiness()); + } + } + } +} diff --git a/third_party/rust/mio/test/test_notify.rs b/third_party/rust/mio/test/test_notify.rs new file mode 100644 index 0000000000..a6a8e51f67 --- /dev/null +++ b/third_party/rust/mio/test/test_notify.rs @@ -0,0 +1,192 @@ +use {localhost, sleep_ms}; +use mio::*; +use mio::deprecated::{EventLoop, EventLoopBuilder, Handler, Sender, NotifyError}; +use mio::net::TcpListener; +use std::thread; + +struct TestHandler { + sender: Sender<String>, + notify: usize +} + +impl TestHandler { + fn new(sender: Sender<String>) -> TestHandler { + TestHandler { + sender, + notify: 0 + } + } +} + +impl Handler for TestHandler { + type Timeout = usize; + type Message = String; + + fn notify(&mut self, event_loop: &mut EventLoop<TestHandler>, msg: String) { + match self.notify { + 0 => { + assert!(msg == "First", "actual={}", msg); + self.sender.send("Second".to_string()).unwrap(); + } + 1 => { + assert!(msg == "Second", "actual={}", msg); + event_loop.shutdown(); + } + v => panic!("unexpected value for notify; val={}", v) + } + + self.notify += 1; + } +} + +#[test] +pub fn test_notify() { + debug!("Starting TEST_NOTIFY"); + let mut event_loop = EventLoop::new().unwrap(); + + let addr = localhost(); + + // Setup a server socket so that the event loop blocks + let srv = TcpListener::bind(&addr).unwrap(); + + event_loop.register(&srv, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); + + let sender = event_loop.channel(); + + thread::spawn(move || { + sleep_ms(1_000); + sender.send("First".to_string()).unwrap(); + }); + + let sender = event_loop.channel(); + let mut handler = TestHandler::new(sender); + + // Start the event loop + event_loop.run(&mut handler).unwrap(); + + assert!(handler.notify == 2, "actual={}", handler.notify); +} + +#[test] +pub fn test_notify_capacity() { + use std::sync::mpsc::*; + use std::thread; + + struct Capacity(Receiver<i32>); + + impl Handler for Capacity { + type Message = i32; + type Timeout = (); + + fn notify(&mut self, event_loop: &mut EventLoop<Capacity>, msg: i32) { + if msg == 1 { + self.0.recv().unwrap(); + } else if msg == 3 { + event_loop.shutdown(); + } + } + } + + let mut builder = EventLoopBuilder::new(); + builder.notify_capacity(1); + + let (tx, rx) = channel::<i32>(); + let mut event_loop = builder.build().unwrap(); + let notify = event_loop.channel(); + + let handle = thread::spawn(move || { + let mut handler = Capacity(rx); + event_loop.run(&mut handler).unwrap(); + }); + + assert!(notify.send(1).is_ok()); + + loop { + if notify.send(2).is_err() { + break; + } + } + + tx.send(1).unwrap(); + + loop { + if notify.send(3).is_ok() { + break; + } + } + + handle.join().unwrap(); +} + +#[test] +pub fn test_notify_drop() { + use std::sync::mpsc::{self,Sender}; + use std::thread; + + struct MessageDrop(Sender<u8>); + + impl Drop for MessageDrop { + fn drop(&mut self) { + self.0.send(0).unwrap(); + } + } + + struct DummyHandler; + + impl Handler for DummyHandler { + type Timeout = (); + type Message = MessageDrop; + + fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: MessageDrop) { + msg.0.send(1).unwrap(); + drop(msg); + // We stop after the first message + event_loop.shutdown(); + } + } + + let (tx_notif_1, rx_notif_1) = mpsc::channel(); + let (tx_notif_2, rx_notif_2) = mpsc::channel(); + let (tx_notif_3, _unused) = mpsc::channel(); + let (tx_exit_loop, rx_exit_loop) = mpsc::channel(); + let (tx_drop_loop, rx_drop_loop) = mpsc::channel(); + + let mut event_loop = EventLoop::new().unwrap(); + let notify = event_loop.channel(); + + let handle = thread::spawn(move || { + let mut handler = DummyHandler; + event_loop.run(&mut handler).unwrap(); + + // Confirmation we exited the loop + tx_exit_loop.send(()).unwrap(); + + // Order to drop the loop + rx_drop_loop.recv().unwrap(); + drop(event_loop); + }); + notify.send(MessageDrop(tx_notif_1)).unwrap(); + assert_eq!(rx_notif_1.recv().unwrap(), 1); // Response from the loop + assert_eq!(rx_notif_1.recv().unwrap(), 0); // Drop notification + + // We wait for the event loop to exit before sending the second notification + rx_exit_loop.recv().unwrap(); + notify.send(MessageDrop(tx_notif_2)).unwrap(); + + // We ensure the message is indeed stuck in the queue + sleep_ms(100); + assert!(rx_notif_2.try_recv().is_err()); + + // Give the order to drop the event loop + tx_drop_loop.send(()).unwrap(); + assert_eq!(rx_notif_2.recv().unwrap(), 0); // Drop notification + + // Check that sending a new notification will return an error + // We should also get our message back + match notify.send(MessageDrop(tx_notif_3)).unwrap_err() { + NotifyError::Closed(Some(..)) => {} + _ => panic!(), + } + + handle.join().unwrap(); +} diff --git a/third_party/rust/mio/test/test_oneshot.rs b/third_party/rust/mio/test/test_oneshot.rs new file mode 100644 index 0000000000..4dca219b73 --- /dev/null +++ b/third_party/rust/mio/test/test_oneshot.rs @@ -0,0 +1,64 @@ +use mio::*; +use mio::net::{TcpListener, TcpStream}; +use std::io::*; +use std::time::Duration; + +const MS: u64 = 1_000; + +#[test] +pub fn test_tcp_edge_oneshot() { + let _ = ::env_logger::init(); + + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::level()).unwrap(); + + // Connect a socket, we are going to write to it + let mut s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + poll.register(&s1, Token(1), Ready::writable(), PollOpt::level()).unwrap(); + + wait_for(&mut poll, &mut events, Token(0)); + + // Get pair + let (mut s2, _) = l.accept().unwrap(); + poll.register(&s2, Token(2), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + wait_for(&mut poll, &mut events, Token(1)); + + let res = s1.write(b"foo").unwrap(); + assert_eq!(3, res); + + let mut buf = [0; 1]; + + for byte in b"foo" { + wait_for(&mut poll, &mut events, Token(2)); + + assert_eq!(1, s2.read(&mut buf).unwrap()); + assert_eq!(*byte, buf[0]); + + poll.reregister(&s2, Token(2), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + if *byte == b'o' { + poll.reregister(&s2, Token(2), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + } + } +} + +fn wait_for(poll: &mut Poll, events: &mut Events, token: Token) { + loop { + poll.poll(events, Some(Duration::from_millis(MS))).unwrap(); + + let cnt = (0..events.len()).map(|i| events.get(i).unwrap()) + .filter(|e| e.token() == token) + .count(); + + assert!(cnt < 2, "token appeared multiple times in poll results; cnt={:}", cnt); + + if cnt == 1 { return }; + } +} diff --git a/third_party/rust/mio/test/test_poll.rs b/third_party/rust/mio/test/test_poll.rs new file mode 100644 index 0000000000..e259d89e24 --- /dev/null +++ b/third_party/rust/mio/test/test_poll.rs @@ -0,0 +1,18 @@ +use mio::*; +use std::time::Duration; + +#[test] +fn test_poll_closes_fd() { + for _ in 0..2000 { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(4); + let (registration, set_readiness) = Registration::new2(); + + poll.register(®istration, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap(); + + drop(poll); + drop(set_readiness); + drop(registration); + } +} diff --git a/third_party/rust/mio/test/test_poll_channel.rs b/third_party/rust/mio/test/test_poll_channel.rs new file mode 100644 index 0000000000..f7ce050537 --- /dev/null +++ b/third_party/rust/mio/test/test_poll_channel.rs @@ -0,0 +1,285 @@ +use {expect_events, sleep_ms}; +use mio::{channel, Events, Poll, PollOpt, Ready, Token}; +use mio::event::Event; +use std::sync::mpsc::TryRecvError; +use std::thread; +use std::time::Duration; + +#[test] +pub fn test_poll_channel_edge() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(1, num); + + let event = events.get(0).unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Poll again and there should be no events + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Poll again, nothing + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Push a value + tx.send("goodbye").unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(1, num); + + let event = events.get(0).unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Read the value + rx.try_recv().unwrap(); + + // Drop the sender half + drop(tx); + + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(1, num); + + let event = events.get(0).unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + match rx.try_recv() { + Err(TryRecvError::Disconnected) => {} + no => panic!("unexpected value {:?}", no), + } + +} + +#[test] +pub fn test_poll_channel_oneshot() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(1, num); + + let event = events.get(0).unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Poll again and there should be no events + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Poll again, nothing + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Push a value + tx.send("goodbye").unwrap(); + + // Poll again, nothing + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Reregistering will re-trigger the notification + for _ in 0..3 { + poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(1, num); + + let event = events.get(0).unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + } + + // Get the value + assert_eq!("goodbye", rx.try_recv().unwrap()); + + poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_poll_channel_level() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()).unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + for i in 0..5 { + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert!(1 == num, "actually got {} on iteration {}", num, i); + + let event = events.get(0).unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + } + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_poll_channel_writable() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()).unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_dropping_receive_before_poll() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap(); + + // Push the value + tx.send("hello").unwrap(); + + // Drop the receive end + drop(rx); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_mixing_channel_with_socket() { + use mio::net::{TcpListener, TcpStream}; + + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + + // Push a value onto the channel + tx.send("hello").unwrap(); + + // Connect a TCP socket + let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + + // Register the socket + poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()).unwrap(); + + // Sleep a bit to ensure it arrives at dest + sleep_ms(250); + + expect_events(&poll, &mut events, 2, vec![ + Event::new(Ready::empty(), Token(0)), + Event::new(Ready::empty(), Token(1)), + ]); +} + +#[test] +pub fn test_sending_from_other_thread_while_polling() { + const ITERATIONS: usize = 20; + const THREADS: usize = 5; + + // Make sure to run multiple times + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + + for _ in 0..ITERATIONS { + let (tx, rx) = channel::channel(); + poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + + for _ in 0..THREADS { + let tx = tx.clone(); + + thread::spawn(move || { + sleep_ms(50); + tx.send("ping").unwrap(); + }); + } + + let mut recv = 0; + + while recv < THREADS { + let num = poll.poll(&mut events, None).unwrap(); + + if num != 0 { + assert_eq!(1, num); + assert_eq!(events.get(0).unwrap().token(), Token(0)); + + while let Ok(_) = rx.try_recv() { + recv += 1; + } + } + } + } +} diff --git a/third_party/rust/mio/test/test_register_deregister.rs b/third_party/rust/mio/test/test_register_deregister.rs new file mode 100644 index 0000000000..ae84a029ec --- /dev/null +++ b/third_party/rust/mio/test/test_register_deregister.rs @@ -0,0 +1,123 @@ +use {expect_events, localhost, TryWrite}; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio::event::Event; +use mio::net::{TcpListener, TcpStream}; +use bytes::SliceBuf; +use std::time::Duration; + +const SERVER: Token = Token(0); +const CLIENT: Token = Token(1); + +struct TestHandler { + server: TcpListener, + client: TcpStream, + state: usize, +} + +impl TestHandler { + fn new(srv: TcpListener, cli: TcpStream) -> TestHandler { + TestHandler { + server: srv, + client: cli, + state: 0, + } + } + + fn handle_read(&mut self, poll: &mut Poll, token: Token) { + match token { + SERVER => { + trace!("handle_read; token=SERVER"); + let mut sock = self.server.accept().unwrap().0; + sock.try_write_buf(&mut SliceBuf::wrap(b"foobar")).unwrap(); + } + CLIENT => { + trace!("handle_read; token=CLIENT"); + assert!(self.state == 0, "unexpected state {}", self.state); + self.state = 1; + poll.reregister(&self.client, CLIENT, Ready::writable(), PollOpt::level()).unwrap(); + } + _ => panic!("unexpected token"), + } + } + + fn handle_write(&mut self, poll: &mut Poll, token: Token) { + debug!("handle_write; token={:?}; state={:?}", token, self.state); + + assert!(token == CLIENT, "unexpected token {:?}", token); + assert!(self.state == 1, "unexpected state {}", self.state); + + self.state = 2; + poll.deregister(&self.client).unwrap(); + poll.deregister(&self.server).unwrap(); + } +} + +#[test] +pub fn test_register_deregister() { + let _ = ::env_logger::init(); + + debug!("Starting TEST_REGISTER_DEREGISTER"); + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + + let addr = localhost(); + + let server = TcpListener::bind(&addr).unwrap(); + + info!("register server socket"); + poll.register(&server, SERVER, Ready::readable(), PollOpt::edge()).unwrap(); + + let client = TcpStream::connect(&addr).unwrap(); + + // Register client socket only as writable + poll.register(&client, CLIENT, Ready::readable(), PollOpt::level()).unwrap(); + + let mut handler = TestHandler::new(server, client); + + loop { + poll.poll(&mut events, None).unwrap(); + + if let Some(event) = events.get(0) { + if event.readiness().is_readable() { + handler.handle_read(&mut poll, event.token()); + } + + if event.readiness().is_writable() { + handler.handle_write(&mut poll, event.token()); + break; + } + } + } + + poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap(); + assert_eq!(events.len(), 0); +} + +#[test] +pub fn test_register_empty_interest() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let addr = localhost(); + + let sock = TcpListener::bind(&addr).unwrap(); + + poll.register(&sock, Token(0), Ready::empty(), PollOpt::edge()).unwrap(); + + let client = TcpStream::connect(&addr).unwrap(); + + // The connect is not guaranteed to have started until it is registered + // https://docs.rs/mio/0.6.10/mio/struct.Poll.html#registering-handles + poll.register(&client, Token(1), Ready::empty(), PollOpt::edge()).unwrap(); + + // sock is registered with empty interest, we should not receive any event + poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap(); + assert_eq!(events.len(), 0, "Received unexpected event: {:?}", events.get(0).unwrap()); + + // now sock is reregistered with readable, we should receive the pending event + poll.reregister(&sock, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + expect_events(&poll, &mut events, 2, vec![ + Event::new(Ready::readable(), Token(0)) + ]); + + poll.reregister(&sock, Token(0), Ready::empty(), PollOpt::edge()).unwrap(); +} diff --git a/third_party/rust/mio/test/test_register_multiple_event_loops.rs b/third_party/rust/mio/test/test_register_multiple_event_loops.rs new file mode 100644 index 0000000000..9204afaf68 --- /dev/null +++ b/third_party/rust/mio/test/test_register_multiple_event_loops.rs @@ -0,0 +1,63 @@ +use localhost; +use mio::*; +use mio::net::{TcpListener, TcpStream, UdpSocket}; +use std::io::ErrorKind; + +#[test] +fn test_tcp_register_multiple_event_loops() { + let addr = localhost(); + let listener = TcpListener::bind(&addr).unwrap(); + + let poll1 = Poll::new().unwrap(); + poll1.register(&listener, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); + + let poll2 = Poll::new().unwrap(); + + // Try registering the same socket with the initial one + let res = poll2.register(&listener, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()); + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Other); + + // Try cloning the socket and registering it again + let listener2 = listener.try_clone().unwrap(); + let res = poll2.register(&listener2, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()); + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Other); + + // Try the stream + let stream = TcpStream::connect(&addr).unwrap(); + + poll1.register(&stream, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); + + let res = poll2.register(&stream, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()); + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Other); + + // Try cloning the socket and registering it again + let stream2 = stream.try_clone().unwrap(); + let res = poll2.register(&stream2, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()); + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Other); +} + +#[test] +fn test_udp_register_multiple_event_loops() { + let addr = localhost(); + let socket = UdpSocket::bind(&addr).unwrap(); + + let poll1 = Poll::new().unwrap(); + poll1.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); + + let poll2 = Poll::new().unwrap(); + + // Try registering the same socket with the initial one + let res = poll2.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()); + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Other); + + // Try cloning the socket and registering it again + let socket2 = socket.try_clone().unwrap(); + let res = poll2.register(&socket2, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()); + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Other); +} diff --git a/third_party/rust/mio/test/test_reregister_without_poll.rs b/third_party/rust/mio/test/test_reregister_without_poll.rs new file mode 100644 index 0000000000..45d5aca49c --- /dev/null +++ b/third_party/rust/mio/test/test_reregister_without_poll.rs @@ -0,0 +1,28 @@ +use {sleep_ms}; +use mio::*; +use mio::net::{TcpListener, TcpStream}; +use std::time::Duration; + +const MS: u64 = 1_000; + +#[test] +pub fn test_reregister_different_without_poll() { + let mut events = Events::with_capacity(1024); + let poll = Poll::new().unwrap(); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()).unwrap(); + + sleep_ms(MS); + + poll.reregister(&l, Token(0), Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + poll.poll(&mut events, Some(Duration::from_millis(MS))).unwrap(); + assert_eq!(events.len(), 0); +} diff --git a/third_party/rust/mio/test/test_smoke.rs b/third_party/rust/mio/test/test_smoke.rs new file mode 100644 index 0000000000..96f7d3c9e4 --- /dev/null +++ b/third_party/rust/mio/test/test_smoke.rs @@ -0,0 +1,23 @@ +extern crate mio; + +use mio::{Events, Poll, Token, Ready, PollOpt}; +use mio::net::TcpListener; +use std::time::Duration; + +#[test] +fn run_once_with_nothing() { + let mut events = Events::with_capacity(1024); + let poll = Poll::new().unwrap(); + poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap(); +} + +#[test] +fn add_then_drop() { + let mut events = Events::with_capacity(1024); + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let poll = Poll::new().unwrap(); + poll.register(&l, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); + drop(l); + poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap(); + +} diff --git a/third_party/rust/mio/test/test_subprocess_pipe.rs b/third_party/rust/mio/test/test_subprocess_pipe.rs new file mode 100644 index 0000000000..2bcf132486 --- /dev/null +++ b/third_party/rust/mio/test/test_subprocess_pipe.rs @@ -0,0 +1,249 @@ +use {TryRead, TryWrite}; +use std::mem; +use mio::*; +use std::io; +use mio::deprecated::{EventLoop, Handler}; +use mio::deprecated::unix::{PipeReader, PipeWriter}; +use std::process::{Command, Stdio, Child}; + + +struct SubprocessClient { + stdin: Option<PipeWriter>, + stdout: Option<PipeReader>, + stderr: Option<PipeReader>, + stdin_token : Token, + stdout_token : Token, + stderr_token : Token, + output : Vec<u8>, + output_stderr : Vec<u8>, + input : Vec<u8>, + input_offset : usize, + buf : [u8; 65536], +} + + +// Sends a message and expects to receive the same exact message, one at a time +impl SubprocessClient { + fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient { + SubprocessClient { + stdin: stdin, + stdout: stdout, + stderr: stderr, + stdin_token : Token(0), + stdout_token : Token(1), + stderr_token : Token(2), + output : Vec::<u8>::new(), + output_stderr : Vec::<u8>::new(), + buf : [0; 65536], + input : data.to_vec(), + input_offset : 0, + } + } + + fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> { + let mut eof = false; + match self.stdout { + None => unreachable!(), + Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) { + Ok(None) => { + } + Ok(Some(r)) => { + if r == 0 { + eof = true; + } else { + self.output.extend(&self.buf[0..r]); + } + } + Err(e) => { + return Err(e); + } + } + }; + if eof { + drop(self.stdout.take()); + match self.stderr { + None => event_loop.shutdown(), + Some(_) => {}, + } + } + return Ok(()); + } + + fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> { + let mut eof = false; + match self.stderr { + None => unreachable!(), + Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) { + Ok(None) => { + } + Ok(Some(r)) => { + if r == 0 { + eof = true; + } else { + self.output_stderr.extend(&self.buf[0..r]); + } + } + Err(e) => { + return Err(e); + } + } + }; + if eof { + drop(self.stderr.take()); + match self.stdout { + None => event_loop.shutdown(), + Some(_) => {}, + } + } + return Ok(()); + } + + fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> { + let mut ok = true; + match self.stdin { + None => unreachable!(), + Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) { + Ok(None) => { + }, + Ok(Some(r)) => { + if r == 0 { + ok = false; + } else { + self.input_offset += r; + } + }, + Err(_) => { + ok = false; + }, + } + } + if self.input_offset == self.input.len() || !ok { + drop(self.stdin.take()); + match self.stderr { + None => match self.stdout { + None => event_loop.shutdown(), + Some(_) => {}, + }, + Some(_) => {}, + } + } + return Ok(()); + } + +} + +impl Handler for SubprocessClient { + type Timeout = usize; + type Message = (); + + fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token, + _: Ready) { + if token == self.stderr_token { + let _x = self.readable_stderr(event_loop); + } else { + let _x = self.readable(event_loop); + } + if token == self.stdin_token { + let _y = self.writable(event_loop); + } + } +} + + + + +const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096]; +pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) { + let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap(); + let stdin : Option<PipeWriter>; + let stdin_exists : bool; + match process.stdin { + None => stdin_exists = false, + Some(_) => stdin_exists = true, + } + if stdin_exists { + match PipeWriter::from_stdin(process.stdin.take().unwrap()) { + Err(e) => panic!(e), + Ok(pipe) => stdin = Some(pipe), + } + } else { + stdin = None; + } + let stdout_exists : bool; + let stdout : Option<PipeReader>; + match process.stdout { + None => stdout_exists = false, + Some(_) => stdout_exists = true, + } + if stdout_exists { + match PipeReader::from_stdout(process.stdout.take().unwrap()) { + Err(e) => panic!(e), + Ok(pipe) => stdout = Some(pipe), + } + } else { + stdout = None; + } + let stderr_exists : bool; + let stderr : Option<PipeReader>; + match process.stderr { + None => stderr_exists = false, + Some(_) => stderr_exists = true, + } + if stderr_exists { + match PipeReader::from_stderr(process.stderr.take().unwrap()) { + Err(e) => panic!(e), + Ok(pipe) => stderr = Some(pipe), + } + } else { + stderr = None + } + + let mut subprocess = SubprocessClient::new(stdin, + stdout, + stderr, + input); + match subprocess.stdout { + Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, Ready::readable(), + PollOpt::level()).unwrap(), + None => {}, + } + + match subprocess.stderr { + Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, Ready::readable(), + PollOpt::level()).unwrap(), + None => {}, + } + + // Connect to the server + match subprocess.stdin { + Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, Ready::writable(), + PollOpt::level()).unwrap(), + None => {}, + } + + // Start the event loop + event_loop.run(&mut subprocess).unwrap(); + let _ = process.wait(); + + let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new()); + let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new()); + return (ret_stdout, ret_stderr); +} + +#[test] +fn test_subprocess_pipe() { + let process = + Command::new("/bin/cat") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn().unwrap(); + let (ret_stdout, ret_stderr) = subprocess_communicate(process, &TEST_DATA[..]); + assert_eq!(TEST_DATA.len(), ret_stdout.len()); + assert_eq!(0usize, ret_stderr.len()); + let mut i : usize = 0; + for item in TEST_DATA.iter() { + assert_eq!(*item, ret_stdout[i]); + i += 1; + } +} diff --git a/third_party/rust/mio/test/test_tcp.rs b/third_party/rust/mio/test/test_tcp.rs new file mode 100644 index 0000000000..ae569ac5e8 --- /dev/null +++ b/third_party/rust/mio/test/test_tcp.rs @@ -0,0 +1,660 @@ +use std::cmp; +use std::io::prelude::*; +use std::io; +use std::net; +use std::sync::mpsc::channel; +use std::thread; +use std::time::Duration; + +use net2::{self, TcpStreamExt}; + +use {TryRead, TryWrite}; +use mio::{Token, Ready, PollOpt, Poll, Events}; +use iovec::IoVec; +use mio::net::{TcpListener, TcpStream}; + +#[test] +fn accept() { + struct H { hit: bool, listener: TcpListener, shutdown: bool } + + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = l.local_addr().unwrap(); + + let t = thread::spawn(move || { + net::TcpStream::connect(&addr).unwrap(); + }); + + let poll = Poll::new().unwrap(); + + poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(128); + + let mut h = H { hit: false, listener: l, shutdown: false }; + while !h.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + h.hit = true; + assert_eq!(event.token(), Token(1)); + assert!(event.readiness().is_readable()); + assert!(h.listener.accept().is_ok()); + h.shutdown = true; + } + } + assert!(h.hit); + assert!(h.listener.accept().unwrap_err().kind() == io::ErrorKind::WouldBlock); + t.join().unwrap(); +} + +#[test] +fn connect() { + struct H { hit: u32, shutdown: bool } + + let l = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = l.local_addr().unwrap(); + + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let t = thread::spawn(move || { + let s = l.accept().unwrap(); + rx.recv().unwrap(); + drop(s); + tx2.send(()).unwrap(); + }); + + let poll = Poll::new().unwrap(); + let s = TcpStream::connect(&addr).unwrap(); + + poll.register(&s, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(128); + + let mut h = H { hit: 0, shutdown: false }; + while !h.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + assert_eq!(event.token(), Token(1)); + match h.hit { + 0 => assert!(event.readiness().is_writable()), + 1 => assert!(event.readiness().is_readable()), + _ => panic!(), + } + h.hit += 1; + h.shutdown = true; + } + } + assert_eq!(h.hit, 1); + tx.send(()).unwrap(); + rx2.recv().unwrap(); + h.shutdown = false; + while !h.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + assert_eq!(event.token(), Token(1)); + match h.hit { + 0 => assert!(event.readiness().is_writable()), + 1 => assert!(event.readiness().is_readable()), + _ => panic!(), + } + h.hit += 1; + h.shutdown = true; + } + } + assert_eq!(h.hit, 2); + t.join().unwrap(); +} + +#[test] +fn read() { + const N: usize = 16 * 1024 * 1024; + struct H { amt: usize, socket: TcpStream, shutdown: bool } + + let l = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = l.local_addr().unwrap(); + + let t = thread::spawn(move || { + let mut s = l.accept().unwrap().0; + let b = [0; 1024]; + let mut amt = 0; + while amt < N { + amt += s.write(&b).unwrap(); + } + }); + + let poll = Poll::new().unwrap(); + let s = TcpStream::connect(&addr).unwrap(); + + poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(128); + + let mut h = H { amt: 0, socket: s, shutdown: false }; + while !h.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + assert_eq!(event.token(), Token(1)); + let mut b = [0; 1024]; + loop { + if let Some(amt) = h.socket.try_read(&mut b).unwrap() { + h.amt += amt; + } else { + break + } + if h.amt >= N { + h.shutdown = true; + break + } + } + } + } + t.join().unwrap(); +} + +#[test] +fn peek() { + const N: usize = 16 * 1024 * 1024; + struct H { amt: usize, socket: TcpStream, shutdown: bool } + + let l = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = l.local_addr().unwrap(); + + let t = thread::spawn(move || { + let mut s = l.accept().unwrap().0; + let b = [0; 1024]; + let mut amt = 0; + while amt < N { + amt += s.write(&b).unwrap(); + } + }); + + let poll = Poll::new().unwrap(); + let s = TcpStream::connect(&addr).unwrap(); + + poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(128); + + let mut h = H { amt: 0, socket: s, shutdown: false }; + while !h.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + assert_eq!(event.token(), Token(1)); + let mut b = [0; 1024]; + match h.socket.peek(&mut b) { + Ok(_) => (), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + continue + }, + Err(e) => panic!("unexpected error: {:?}", e), + } + + loop { + if let Some(amt) = h.socket.try_read(&mut b).unwrap() { + h.amt += amt; + } else { + break + } + if h.amt >= N { + h.shutdown = true; + break + } + } + } + } + t.join().unwrap(); +} + +#[test] +fn read_bufs() { + const N: usize = 16 * 1024 * 1024; + + let l = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = l.local_addr().unwrap(); + + let t = thread::spawn(move || { + let mut s = l.accept().unwrap().0; + let b = [1; 1024]; + let mut amt = 0; + while amt < N { + amt += s.write(&b).unwrap(); + } + }); + + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(128); + + let s = TcpStream::connect(&addr).unwrap(); + + poll.register(&s, Token(1), Ready::readable(), PollOpt::level()).unwrap(); + + let b1 = &mut [0; 10][..]; + let b2 = &mut [0; 383][..]; + let b3 = &mut [0; 28][..]; + let b4 = &mut [0; 8][..]; + let b5 = &mut [0; 128][..]; + let mut b: [&mut IoVec; 5] = [ + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + ]; + + let mut so_far = 0; + loop { + for buf in b.iter_mut() { + for byte in buf.as_mut_bytes() { + *byte = 0; + } + } + + poll.poll(&mut events, None).unwrap(); + + match s.read_bufs(&mut b) { + Ok(0) => { + assert_eq!(so_far, N); + break + } + Ok(mut n) => { + so_far += n; + for buf in b.iter() { + let buf = buf.as_bytes(); + for byte in buf[..cmp::min(n, buf.len())].iter() { + assert_eq!(*byte, 1); + } + n = n.saturating_sub(buf.len()); + if n == 0 { + break + } + } + assert_eq!(n, 0); + } + Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock), + } + } + + t.join().unwrap(); +} + +#[test] +fn write() { + const N: usize = 16 * 1024 * 1024; + struct H { amt: usize, socket: TcpStream, shutdown: bool } + + let l = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = l.local_addr().unwrap(); + + let t = thread::spawn(move || { + let mut s = l.accept().unwrap().0; + let mut b = [0; 1024]; + let mut amt = 0; + while amt < N { + amt += s.read(&mut b).unwrap(); + } + }); + + let poll = Poll::new().unwrap(); + let s = TcpStream::connect(&addr).unwrap(); + + poll.register(&s, Token(1), Ready::writable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(128); + + let mut h = H { amt: 0, socket: s, shutdown: false }; + while !h.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + assert_eq!(event.token(), Token(1)); + let b = [0; 1024]; + loop { + if let Some(amt) = h.socket.try_write(&b).unwrap() { + h.amt += amt; + } else { + break + } + if h.amt >= N { + h.shutdown = true; + break + } + } + } + } + t.join().unwrap(); +} + +#[test] +fn write_bufs() { + const N: usize = 16 * 1024 * 1024; + + let l = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = l.local_addr().unwrap(); + + let t = thread::spawn(move || { + let mut s = l.accept().unwrap().0; + let mut b = [0; 1024]; + let mut amt = 0; + while amt < N { + for byte in b.iter_mut() { + *byte = 0; + } + let n = s.read(&mut b).unwrap(); + amt += n; + for byte in b[..n].iter() { + assert_eq!(*byte, 1); + } + } + }); + + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(128); + let s = TcpStream::connect(&addr).unwrap(); + poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap(); + + let b1 = &[1; 10][..]; + let b2 = &[1; 383][..]; + let b3 = &[1; 28][..]; + let b4 = &[1; 8][..]; + let b5 = &[1; 128][..]; + let b: [&IoVec; 5] = [ + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + ]; + + let mut so_far = 0; + while so_far < N { + poll.poll(&mut events, None).unwrap(); + + match s.write_bufs(&b) { + Ok(n) => so_far += n, + Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock), + } + } + + t.join().unwrap(); +} + +#[test] +fn connect_then_close() { + struct H { listener: TcpListener, shutdown: bool } + + let poll = Poll::new().unwrap(); + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let s = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + + poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + poll.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(128); + + let mut h = H { listener: l, shutdown: false }; + while !h.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.token() == Token(1) { + let s = h.listener.accept().unwrap().0; + poll.register(&s, Token(3), Ready::readable() | Ready::writable(), + PollOpt::edge()).unwrap(); + drop(s); + } else if event.token() == Token(2) { + h.shutdown = true; + } + } + } +} + +#[test] +fn listen_then_close() { + let poll = Poll::new().unwrap(); + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + drop(l); + + let mut events = Events::with_capacity(128); + + poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap(); + + for event in &events { + if event.token() == Token(1) { + panic!("recieved ready() on a closed TcpListener") + } + } +} + +fn assert_send<T: Send>() { +} + +fn assert_sync<T: Sync>() { +} + +#[test] +fn test_tcp_sockets_are_send() { + assert_send::<TcpListener>(); + assert_send::<TcpStream>(); + assert_sync::<TcpListener>(); + assert_sync::<TcpStream>(); +} + +#[test] +fn bind_twice_bad() { + let l1 = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = l1.local_addr().unwrap(); + assert!(TcpListener::bind(&addr).is_err()); +} + +#[test] +fn multiple_writes_immediate_success() { + const N: usize = 16; + let l = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = l.local_addr().unwrap(); + + let t = thread::spawn(move || { + let mut s = l.accept().unwrap().0; + let mut b = [0; 1024]; + let mut amt = 0; + while amt < 1024*N { + for byte in b.iter_mut() { + *byte = 0; + } + let n = s.read(&mut b).unwrap(); + amt += n; + for byte in b[..n].iter() { + assert_eq!(*byte, 1); + } + } + }); + + let poll = Poll::new().unwrap(); + let mut s = TcpStream::connect(&addr).unwrap(); + poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap(); + let mut events = Events::with_capacity(16); + + // Wait for our TCP stream to connect + 'outer: loop { + poll.poll(&mut events, None).unwrap(); + for event in events.iter() { + if event.token() == Token(1) && event.readiness().is_writable() { + break 'outer + } + } + } + + for _ in 0..N { + s.write_all(&[1; 1024]).unwrap(); + } + + t.join().unwrap(); +} + +#[test] +fn connection_reset_by_peer() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(16); + let mut buf = [0u8; 16]; + + // Create listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = l.local_addr().unwrap(); + + // Connect client + let client = net2::TcpBuilder::new_v4().unwrap() + .to_tcp_stream().unwrap(); + + client.set_linger(Some(Duration::from_millis(0))).unwrap(); + client.connect(&addr).unwrap(); + + // Convert to Mio stream + let client = TcpStream::from_stream(client).unwrap(); + + // Register server + poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + + // Register interest in the client + poll.register(&client, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); + + // Wait for listener to be ready + let mut server; + 'outer: + loop { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.token() == Token(0) { + match l.accept() { + Ok((sock, _)) => { + server = sock; + break 'outer; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + Err(e) => panic!("unexpected error {:?}", e), + } + } + } + } + + // Close the connection + drop(client); + + // Wait a moment + thread::sleep(Duration::from_millis(100)); + + // Register interest in the server socket + poll.register(&server, Token(3), Ready::readable(), PollOpt::edge()).unwrap(); + + + loop { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.token() == Token(3) { + assert!(event.readiness().is_readable()); + + match server.read(&mut buf) { + Ok(0) | + Err(_) => {}, + + Ok(x) => panic!("expected empty buffer but read {} bytes", x), + } + return; + } + } + } + +} + +#[test] +#[cfg_attr(target_os = "fuchsia", ignore)] +fn connect_error() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(16); + + // Pick a "random" port that shouldn't be in use. + let l = match TcpStream::connect(&"127.0.0.1:38381".parse().unwrap()) { + Ok(l) => l, + Err(ref e) if e.kind() == io::ErrorKind::ConnectionRefused => { + // Connection failed synchronously. This is not a bug, but it + // unfortunately doesn't get us the code coverage we want. + return; + }, + Err(e) => panic!("TcpStream::connect unexpected error {:?}", e) + }; + + poll.register(&l, Token(0), Ready::writable(), PollOpt::edge()).unwrap(); + + 'outer: + loop { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.token() == Token(0) { + assert!(event.readiness().is_writable()); + break 'outer + } + } + } + + assert!(l.take_error().unwrap().is_some()); +} + +#[test] +fn write_error() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(16); + let (tx, rx) = channel(); + + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let t = thread::spawn(move || { + let (conn, _addr) = listener.accept().unwrap(); + rx.recv().unwrap(); + drop(conn); + }); + + let mut s = TcpStream::connect(&addr).unwrap(); + poll.register(&s, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge()).unwrap(); + + let mut wait_writable = || { + 'outer: + loop { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.token() == Token(0) && event.readiness().is_writable() { + break 'outer + } + } + } + }; + + wait_writable(); + + tx.send(()).unwrap(); + t.join().unwrap(); + + let buf = [0; 1024]; + loop { + match s.write(&buf) { + Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + wait_writable() + } + Err(e) => { + println!("good error: {}", e); + break + } + } + } +} diff --git a/third_party/rust/mio/test/test_tcp_level.rs b/third_party/rust/mio/test/test_tcp_level.rs new file mode 100644 index 0000000000..c384caac53 --- /dev/null +++ b/third_party/rust/mio/test/test_tcp_level.rs @@ -0,0 +1,142 @@ +use {expect_events, sleep_ms, TryRead}; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio::event::Event; +use mio::net::{TcpListener, TcpStream}; +use std::io::Write; +use std::time::Duration; + +const MS: u64 = 1_000; + +#[test] +pub fn test_tcp_listener_level_triggered() { + let poll = Poll::new().unwrap(); + let mut pevents = Events::with_capacity(1024); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::level()).unwrap(); + + let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + poll.register(&s1, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + + while filter(&pevents, Token(0)).is_empty() { + poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap(); + } + let events = filter(&pevents, Token(0)); + + assert_eq!(events.len(), 1); + assert_eq!(events[0], Event::new(Ready::readable(), Token(0))); + + poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap(); + let events = filter(&pevents, Token(0)); + assert_eq!(events.len(), 1); + assert_eq!(events[0], Event::new(Ready::readable(), Token(0))); + + // Accept the connection then test that the events stop + let _ = l.accept().unwrap(); + + poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap(); + let events = filter(&pevents, Token(0)); + assert!(events.is_empty(), "actual={:?}", events); + + let s3 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + poll.register(&s3, Token(2), Ready::readable(), PollOpt::edge()).unwrap(); + + while filter(&pevents, Token(0)).is_empty() { + poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap(); + } + let events = filter(&pevents, Token(0)); + + assert_eq!(events.len(), 1); + assert_eq!(events[0], Event::new(Ready::readable(), Token(0))); + + drop(l); + + poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap(); + let events = filter(&pevents, Token(0)); + assert!(events.is_empty()); +} + +#[test] +pub fn test_tcp_stream_level_triggered() { + drop(::env_logger::init()); + let poll = Poll::new().unwrap(); + let mut pevents = Events::with_capacity(1024); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); + + let mut s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + poll.register(&s1, Token(1), Ready::readable() | Ready::writable(), PollOpt::level()).unwrap(); + + // Sleep a bit to ensure it arrives at dest + sleep_ms(250); + + expect_events(&poll, &mut pevents, 2, vec![ + Event::new(Ready::readable(), Token(0)), + Event::new(Ready::writable(), Token(1)), + ]); + + // Server side of socket + let (mut s1_tx, _) = l.accept().unwrap(); + + // Sleep a bit to ensure it arrives at dest + sleep_ms(250); + + expect_events(&poll, &mut pevents, 2, vec![ + Event::new(Ready::writable(), Token(1)) + ]); + + // Register the socket + poll.register(&s1_tx, Token(123), Ready::readable(), PollOpt::edge()).unwrap(); + + debug!("writing some data ----------"); + + // Write some data + let res = s1_tx.write(b"hello world!"); + assert!(res.unwrap() > 0); + + // Sleep a bit to ensure it arrives at dest + sleep_ms(250); + + debug!("looking at rx end ----------"); + + // Poll rx end + expect_events(&poll, &mut pevents, 2, vec![ + Event::new(Ready::readable(), Token(1)) + ]); + + debug!("reading ----------"); + + // Reading the data should clear it + let mut res = vec![]; + while s1.try_read_buf(&mut res).unwrap().is_some() { + } + + assert_eq!(res, b"hello world!"); + + debug!("checking just read ----------"); + + expect_events(&poll, &mut pevents, 1, vec![ + Event::new(Ready::writable(), Token(1))]); + + // Closing the socket clears all active level events + drop(s1); + + debug!("checking everything is gone ----------"); + + poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap(); + let events = filter(&pevents, Token(1)); + assert!(events.is_empty()); +} + +fn filter(events: &Events, token: Token) -> Vec<Event> { + (0..events.len()).map(|i| events.get(i).unwrap()) + .filter(|e| e.token() == token) + .collect() +} diff --git a/third_party/rust/mio/test/test_tcp_shutdown.rs b/third_party/rust/mio/test/test_tcp_shutdown.rs new file mode 100644 index 0000000000..9363f83401 --- /dev/null +++ b/third_party/rust/mio/test/test_tcp_shutdown.rs @@ -0,0 +1,248 @@ +use std::collections::HashMap; +use std::net::{self, Shutdown}; +use std::time::{Duration, Instant}; + +use mio::{Token, Ready, PollOpt, Poll, Events}; +use mio::event::{Evented, Event}; +use mio::net::TcpStream; + +struct TestPoll { + poll: Poll, + events: Events, + buf: HashMap<Token, Ready>, +} + +impl TestPoll { + fn new() -> TestPoll { + TestPoll { + poll: Poll::new().unwrap(), + events: Events::with_capacity(1024), + buf: HashMap::new(), + } + } + + fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) + where E: Evented + { + self.poll.register(handle, token, interest, opts).unwrap(); + } + + fn wait_for(&mut self, token: Token, ready: Ready) -> Result<(), &'static str> { + let now = Instant::now(); + + loop { + if now.elapsed() > Duration::from_secs(1) { + return Err("not ready"); + } + + if let Some(curr) = self.buf.get(&token) { + if curr.contains(ready) { + break; + } + } + + self.poll.poll(&mut self.events, Some(Duration::from_millis(250))).unwrap(); + + for event in &self.events { + let curr = self.buf.entry(event.token()) + .or_insert(Ready::empty()); + + *curr |= event.readiness(); + } + } + + *self.buf.get_mut(&token).unwrap() -= ready; + Ok(()) + } + + fn check_idle(&mut self) -> Result<(), Event> { + self.poll.poll(&mut self.events, Some(Duration::from_millis(100))).unwrap(); + + if let Some(e) = self.events.iter().next() { + Err(e) + } else { + Ok(()) + } + } +} + +macro_rules! assert_ready { + ($poll:expr, $token:expr, $ready:expr) => {{ + match $poll.wait_for($token, $ready) { + Ok(_) => {} + Err(_) => panic!("not ready; token = {:?}; interest = {:?}", $token, $ready), + } + }} +} + +macro_rules! assert_not_ready { + ($poll:expr, $token:expr, $ready:expr) => {{ + match $poll.wait_for($token, $ready) { + Ok(_) => panic!("is ready; token = {:?}; interest = {:?}", $token, $ready), + Err(_) => {} + } + }} +} + +macro_rules! assert_hup_ready { + ($poll:expr) => { + #[cfg(unix)] + { + use mio::unix::UnixReady; + assert_ready!($poll, Token(0), Ready::from(UnixReady::hup())) + } + } +} + +macro_rules! assert_not_hup_ready { + ($poll:expr) => { + #[cfg(unix)] + { + use mio::unix::UnixReady; + assert_not_ready!($poll, Token(0), Ready::from(UnixReady::hup())) + } + } +} + +macro_rules! assert_idle { + ($poll:expr) => { + match $poll.check_idle() { + Ok(()) => {} + Err(e) => panic!("not idle; event = {:?}", e), + } + } +} + +// TODO: replace w/ assertive +// https://github.com/carllerche/assertive +macro_rules! assert_ok { + ($e:expr) => { + assert_ok!($e,) + }; + ($e:expr,) => {{ + use std::result::Result::*; + match $e { + Ok(v) => v, + Err(e) => panic!("assertion failed: error = {:?}", e), + } + }}; + ($e:expr, $($arg:tt)+) => {{ + use std::result::Result::*; + match $e { + Ok(v) => v, + Err(e) => panic!("assertion failed: error = {:?}: {}", e, format_args!($($arg)+)), + } + }}; +} + +#[test] +fn test_write_shutdown() { + use std::io::prelude::*; + + let mut poll = TestPoll::new(); + let mut buf = [0; 1024]; + + let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0")); + let addr = assert_ok!(listener.local_addr()); + + let mut client = assert_ok!(TcpStream::connect(&addr)); + poll.register(&client, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge()); + + let (socket, _) = assert_ok!(listener.accept()); + + assert_ready!(poll, Token(0), Ready::writable()); + + // Polling should not have any events + assert_idle!(poll); + + // Now, shutdown the write half of the socket. + assert_ok!(socket.shutdown(Shutdown::Write)); + + assert_ready!(poll, Token(0), Ready::readable()); + + assert_not_hup_ready!(poll); + + let n = assert_ok!(client.read(&mut buf)); + assert_eq!(n, 0); +} + +#[test] +fn test_graceful_shutdown() { + use std::io::prelude::*; + + let mut poll = TestPoll::new(); + let mut buf = [0; 1024]; + + let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0")); + let addr = assert_ok!(listener.local_addr()); + + let mut client = assert_ok!(TcpStream::connect(&addr)); + poll.register(&client, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge()); + + let (mut socket, _) = assert_ok!(listener.accept()); + + assert_ready!(poll, Token(0), Ready::writable()); + + // Polling should not have any events + assert_idle!(poll); + + // Now, shutdown the write half of the socket. + assert_ok!(client.shutdown(Shutdown::Write)); + + let n = assert_ok!(socket.read(&mut buf)); + assert_eq!(0, n); + drop(socket); + + assert_ready!(poll, Token(0), Ready::readable()); + #[cfg(not(any(target_os = "bitrig", target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos", + target_os = "netbsd", target_os = "openbsd")))] + assert_hup_ready!(poll); + + let mut buf = [0; 1024]; + let n = assert_ok!(client.read(&mut buf)); + assert_eq!(n, 0); +} + +#[test] +fn test_abrupt_shutdown() { + use net2::TcpStreamExt; + use std::io::Read; + + let mut poll = TestPoll::new(); + let mut buf = [0; 1024]; + + let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0")); + let addr = assert_ok!(listener.local_addr()); + + let mut client = assert_ok!(TcpStream::connect(&addr)); + poll.register(&client, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge()); + + let (socket, _) = assert_ok!(listener.accept()); + assert_ok!(socket.set_linger(Some(Duration::from_millis(0)))); + // assert_ok!(socket.set_linger(None)); + + // Wait to be connected + assert_ready!(poll, Token(0), Ready::writable()); + + drop(socket); + + #[cfg(not(any(target_os = "bitrig", target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos", + target_os = "netbsd", target_os = "openbsd")))] + assert_hup_ready!(poll); + assert_ready!(poll, Token(0), Ready::writable()); + assert_ready!(poll, Token(0), Ready::readable()); + + let res = client.read(&mut buf); + assert!(res.is_err(), "not err = {:?}", res); +} diff --git a/third_party/rust/mio/test/test_tick.rs b/third_party/rust/mio/test/test_tick.rs new file mode 100644 index 0000000000..8c4cec9b25 --- /dev/null +++ b/third_party/rust/mio/test/test_tick.rs @@ -0,0 +1,64 @@ +use mio::*; +use mio::deprecated::{EventLoop, Handler}; +use mio::net::{TcpListener, TcpStream}; +use {sleep_ms}; + +struct TestHandler { + tick: usize, + state: usize, +} + +impl TestHandler { + fn new() -> TestHandler { + TestHandler { + tick: 0, + state: 0, + } + } +} + +impl Handler for TestHandler { + type Timeout = usize; + type Message = String; + + fn tick(&mut self, _event_loop: &mut EventLoop<TestHandler>) { + debug!("Handler::tick()"); + self.tick += 1; + + assert_eq!(self.state, 1); + self.state = 0; + } + + fn ready(&mut self, _event_loop: &mut EventLoop<TestHandler>, token: Token, events: Ready) { + debug!("READY: {:?} - {:?}", token, events); + if events.is_readable() { + debug!("Handler::ready() readable event"); + assert_eq!(token, Token(0)); + assert_eq!(self.state, 0); + self.state = 1; + } + } +} + +#[test] +pub fn test_tick() { + debug!("Starting TEST_TICK"); + let mut event_loop = EventLoop::new().expect("Couldn't make event loop"); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + event_loop.register(&listener, Token(0), Ready::readable(), PollOpt::level()).unwrap(); + + let client = TcpStream::connect(&listener.local_addr().unwrap()).unwrap(); + event_loop.register(&client, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); + + sleep_ms(250); + + let mut handler = TestHandler::new(); + + for _ in 0..2 { + event_loop.run_once(&mut handler, None).unwrap(); + } + + assert!(handler.tick == 2, "actual={}", handler.tick); + assert!(handler.state == 0, "actual={}", handler.state); +} diff --git a/third_party/rust/mio/test/test_udp_level.rs b/third_party/rust/mio/test/test_udp_level.rs new file mode 100644 index 0000000000..7e19d54b3e --- /dev/null +++ b/third_party/rust/mio/test/test_udp_level.rs @@ -0,0 +1,52 @@ +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio::event::Event; +use mio::net::UdpSocket; +use {expect_events, sleep_ms}; + +#[test] +pub fn test_udp_level_triggered() { + let poll = Poll::new().unwrap(); + let poll = &poll; + let mut events = Events::with_capacity(1024); + let events = &mut events; + + // Create the listener + let tx = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let rx = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + poll.register(&tx, Token(0), Ready::readable() | Ready::writable(), PollOpt::level()).unwrap(); + poll.register(&rx, Token(1), Ready::readable() | Ready::writable(), PollOpt::level()).unwrap(); + + + for _ in 0..2 { + expect_events(poll, events, 2, vec![ + Event::new(Ready::writable(), Token(0)), + Event::new(Ready::writable(), Token(1)), + ]); + } + + tx.send_to(b"hello world!", &rx.local_addr().unwrap()).unwrap(); + + sleep_ms(250); + + for _ in 0..2 { + expect_events(poll, events, 2, vec![ + Event::new(Ready::readable() | Ready::writable(), Token(1)) + ]); + } + + let mut buf = [0; 200]; + while rx.recv_from(&mut buf).is_ok() {} + + for _ in 0..2 { + expect_events(poll, events, 4, vec![Event::new(Ready::writable(), Token(1))]); + } + + tx.send_to(b"hello world!", &rx.local_addr().unwrap()).unwrap(); + sleep_ms(250); + + expect_events(poll, events, 10, + vec![Event::new(Ready::readable() | Ready::writable(), Token(1))]); + + drop(rx); +} diff --git a/third_party/rust/mio/test/test_udp_socket.rs b/third_party/rust/mio/test/test_udp_socket.rs new file mode 100644 index 0000000000..c3311592ac --- /dev/null +++ b/third_party/rust/mio/test/test_udp_socket.rs @@ -0,0 +1,252 @@ +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio::net::UdpSocket; +use bytes::{Buf, RingBuf, SliceBuf, MutBuf}; +use std::io::ErrorKind; +use std::str; +use std::time; +use localhost; +use iovec::IoVec; + +const LISTENER: Token = Token(0); +const SENDER: Token = Token(1); + +pub struct UdpHandlerSendRecv { + tx: UdpSocket, + rx: UdpSocket, + msg: &'static str, + buf: SliceBuf<'static>, + rx_buf: RingBuf, + connected: bool, + shutdown: bool, +} + +impl UdpHandlerSendRecv { + fn new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv { + UdpHandlerSendRecv { + tx, + rx, + msg, + buf: SliceBuf::wrap(msg.as_bytes()), + rx_buf: RingBuf::new(1024), + connected, + shutdown: false, + } + } +} + +fn assert_send<T: Send>() { +} + +fn assert_sync<T: Sync>() { +} + +#[cfg(test)] +fn test_send_recv_udp(tx: UdpSocket, rx: UdpSocket, connected: bool) { + debug!("Starting TEST_UDP_SOCKETS"); + let poll = Poll::new().unwrap(); + + assert_send::<UdpSocket>(); + assert_sync::<UdpSocket>(); + + // ensure that the sockets are non-blocking + let mut buf = [0; 128]; + assert_eq!(ErrorKind::WouldBlock, rx.recv_from(&mut buf).unwrap_err().kind()); + + info!("Registering SENDER"); + poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap(); + + info!("Registering LISTENER"); + poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + + info!("Starting event loop to test with..."); + let mut handler = UdpHandlerSendRecv::new(tx, rx, connected, "hello world"); + + while !handler.shutdown { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.readiness().is_readable() { + if let LISTENER = event.token() { + debug!("We are receiving a datagram now..."); + let cnt = unsafe { + if !handler.connected { + handler.rx.recv_from(handler.rx_buf.mut_bytes()).unwrap().0 + } else { + handler.rx.recv(handler.rx_buf.mut_bytes()).unwrap() + } + }; + + unsafe { MutBuf::advance(&mut handler.rx_buf, cnt); } + assert!(str::from_utf8(handler.rx_buf.bytes()).unwrap() == handler.msg); + handler.shutdown = true; + } + } + + if event.readiness().is_writable() { + if let SENDER = event.token() { + let cnt = if !handler.connected { + let addr = handler.rx.local_addr().unwrap(); + handler.tx.send_to(handler.buf.bytes(), &addr).unwrap() + } else { + handler.tx.send(handler.buf.bytes()).unwrap() + }; + + handler.buf.advance(cnt); + } + } + } + } +} + +/// Returns the sender and the receiver +fn connected_sockets() -> (UdpSocket, UdpSocket) { + let addr = localhost(); + let any = localhost(); + + let tx = UdpSocket::bind(&any).unwrap(); + let rx = UdpSocket::bind(&addr).unwrap(); + + let tx_addr = tx.local_addr().unwrap(); + let rx_addr = rx.local_addr().unwrap(); + + assert!(tx.connect(rx_addr).is_ok()); + assert!(rx.connect(tx_addr).is_ok()); + + (tx, rx) +} + +#[test] +pub fn test_udp_socket() { + let addr = localhost(); + let any = localhost(); + + let tx = UdpSocket::bind(&any).unwrap(); + let rx = UdpSocket::bind(&addr).unwrap(); + + test_send_recv_udp(tx, rx, false); +} + +#[test] +pub fn test_udp_socket_send_recv() { + let (tx, rx) = connected_sockets(); + + test_send_recv_udp(tx, rx, true); +} + +#[test] +pub fn test_udp_socket_discard() { + let addr = localhost(); + let any = localhost(); + let outside = localhost(); + + let tx = UdpSocket::bind(&any).unwrap(); + let rx = UdpSocket::bind(&addr).unwrap(); + let udp_outside = UdpSocket::bind(&outside).unwrap(); + + let tx_addr = tx.local_addr().unwrap(); + let rx_addr = rx.local_addr().unwrap(); + + assert!(tx.connect(rx_addr).is_ok()); + assert!(udp_outside.connect(rx_addr).is_ok()); + assert!(rx.connect(tx_addr).is_ok()); + + let poll = Poll::new().unwrap(); + + let r = udp_outside.send(b"hello world"); + assert!(r.is_ok() || r.unwrap_err().kind() == ErrorKind::WouldBlock); + + poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap(); + poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + + poll.poll(&mut events, Some(time::Duration::from_secs(5))).unwrap(); + + for event in &events { + if event.readiness().is_readable() { + if let LISTENER = event.token() { + assert!(false, "Expected to no receive a packet but got something") + } + } + } +} + +#[cfg(all(unix, not(target_os = "fuchsia")))] +#[test] +pub fn test_udp_socket_send_recv_bufs() { + let (tx, rx) = connected_sockets(); + + let poll = Poll::new().unwrap(); + + poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()) + .unwrap(); + + poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()) + .unwrap(); + + let mut events = Events::with_capacity(1024); + + let data = b"hello, world"; + let write_bufs: Vec<_> = vec![b"hello, " as &[u8], b"world"] + .into_iter() + .flat_map(IoVec::from_bytes) + .collect(); + let (a, b, c) = ( + &mut [0u8; 4] as &mut [u8], + &mut [0u8; 6] as &mut [u8], + &mut [0u8; 8] as &mut [u8], + ); + let mut read_bufs: Vec<_> = vec![a, b, c] + .into_iter() + .flat_map(IoVec::from_bytes_mut) + .collect(); + + let times = 5; + let mut rtimes = 0; + let mut wtimes = 0; + + 'outer: loop { + poll.poll(&mut events, None).unwrap(); + + for event in &events { + if event.readiness().is_readable() { + if let LISTENER = event.token() { + loop { + let cnt = match rx.recv_bufs(read_bufs.as_mut()) { + Ok(cnt) => cnt, + Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, + Err(e) => panic!("read error {}", e), + }; + assert_eq!(cnt, data.len()); + let res: Vec<u8> = read_bufs + .iter() + .flat_map(|buf| buf.iter()) + .cloned() + .collect(); + assert_eq!(&res[..cnt], &data[..cnt]); + rtimes += 1; + if rtimes == times { + break 'outer; + } + } + } + } + + if event.readiness().is_writable() { + if let SENDER = event.token() { + while wtimes < times { + let cnt = match tx.send_bufs(write_bufs.as_slice()) { + Ok(cnt) => cnt, + Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, + Err(e) => panic!("write error {}", e), + }; + assert_eq!(cnt, data.len()); + wtimes += 1; + } + } + } + } + } +} diff --git a/third_party/rust/mio/test/test_uds_shutdown.rs b/third_party/rust/mio/test/test_uds_shutdown.rs new file mode 100644 index 0000000000..bf9644599d --- /dev/null +++ b/third_party/rust/mio/test/test_uds_shutdown.rs @@ -0,0 +1,300 @@ +use {TryRead, TryWrite}; +use mio::*; +use mio::deprecated::{EventLoop, Handler}; +use mio::deprecated::unix::*; +use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf}; +use slab::Slab; +use std::io; +use std::path::PathBuf; +use tempdir::TempDir; + +const SERVER: Token = Token(10_000_000); +const CLIENT: Token = Token(10_000_001); + +struct EchoConn { + sock: UnixStream, + buf: Option<ByteBuf>, + mut_buf: Option<MutByteBuf>, + token: Option<Token>, + interest: Ready +} + +impl EchoConn { + fn new(sock: UnixStream) -> EchoConn { + EchoConn { + sock: sock, + buf: None, + mut_buf: Some(ByteBuf::mut_with_capacity(2048)), + token: None, + interest: Ready::hup() + } + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + let mut buf = self.buf.take().unwrap(); + + match self.sock.try_write_buf(&mut buf) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + + self.buf = Some(buf); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CONN : we wrote {} bytes!", r); + + self.mut_buf = Some(buf.flip()); + + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + match self.sock.shutdown(Shutdown::Write) { + Err(e) => panic!(e), + _ => {}, + } + } + Err(e) => debug!("not implemented; client err={:?}", e), + } + + event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, + PollOpt::edge() | PollOpt::oneshot()) + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + let mut buf = self.mut_buf.take().unwrap(); + + match self.sock.try_read_buf(&mut buf) { + Ok(None) => { + debug!("CONN : spurious read wakeup"); + self.mut_buf = Some(buf); + } + Ok(Some(r)) => { + debug!("CONN : we read {} bytes!", r); + + // prepare to provide this to writable + self.buf = Some(buf.flip()); + + self.interest.remove(Ready::readable()); + self.interest.insert(Ready::writable()); + } + Err(e) => { + debug!("not implemented; client err={:?}", e); + self.interest.remove(Ready::readable()); + } + + }; + + event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, + PollOpt::edge()) + } +} + +struct EchoServer { + sock: UnixListener, + conns: Slab<EchoConn> +} + +impl EchoServer { + fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("server accepting socket"); + + let sock = self.sock.accept().unwrap(); + let conn = EchoConn::new(sock,); + let tok = self.conns.insert(conn); + + // Register the connection + self.conns[tok].token = Some(Token(tok)); + event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()) + .expect("could not register socket with event loop"); + + Ok(()) + } + + fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, + tok: Token) -> io::Result<()> { + debug!("server conn readable; tok={:?}", tok); + self.conn(tok).readable(event_loop) + } + + fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, + tok: Token) -> io::Result<()> { + debug!("server conn writable; tok={:?}", tok); + self.conn(tok).writable(event_loop) + } + + fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn { + &mut self.conns[tok.into()] + } +} + +struct EchoClient { + sock: UnixStream, + msgs: Vec<&'static str>, + tx: SliceBuf<'static>, + rx: SliceBuf<'static>, + mut_buf: Option<MutByteBuf>, + token: Token, + interest: Ready +} + + +// Sends a message and expects to receive the same exact message, one at a time +impl EchoClient { + fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient { + let curr = msgs.remove(0); + + EchoClient { + sock: sock, + msgs: msgs, + tx: SliceBuf::wrap(curr.as_bytes()), + rx: SliceBuf::wrap(curr.as_bytes()), + mut_buf: Some(ByteBuf::mut_with_capacity(2048)), + token: tok, + interest: Ready::none() + } + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket readable"); + + let mut buf = self.mut_buf.take().unwrap(); + + match self.sock.try_read_buf(&mut buf) { + Ok(None) => { + debug!("CLIENT : spurious read wakeup"); + self.mut_buf = Some(buf); + } + Ok(Some(r)) => { + if r == 0 { + self.interest.remove(Ready::readable()); + event_loop.shutdown(); + } else { + debug!("CLIENT : We read {} bytes!", r); + + // prepare for reading + let mut buf = buf.flip(); + + while buf.has_remaining() { + let actual = buf.read_byte().unwrap(); + let expect = self.rx.read_byte().unwrap(); + + assert!(actual == expect, "actual={}; expect={}", actual, expect); + } + + self.mut_buf = Some(buf.flip()); + if !self.rx.has_remaining() { + self.next_msg(event_loop).unwrap(); + } + } + } + Err(e) => { + panic!("not implemented; client err={:?}", e); + } + }; + + event_loop.reregister(&self.sock, self.token, self.interest, + PollOpt::edge() | PollOpt::oneshot()) + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket writable"); + + match self.sock.try_write_buf(&mut self.tx) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CLIENT : we wrote {} bytes!", r); + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + } + Err(e) => debug!("not implemented; client err={:?}", e) + } + + event_loop.reregister(&self.sock, self.token, self.interest, + PollOpt::edge() | PollOpt::oneshot()) + } + + fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + if self.msgs.is_empty() { + + return Ok(()); + } + + let curr = self.msgs.remove(0); + + debug!("client prepping next message"); + self.tx = SliceBuf::wrap(curr.as_bytes()); + self.rx = SliceBuf::wrap(curr.as_bytes()); + + self.interest.insert(Ready::writable()); + event_loop.reregister(&self.sock, self.token, self.interest, + PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct Echo { + server: EchoServer, + client: EchoClient, +} + +impl Echo { + fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo { + Echo { + server: EchoServer { + sock: srv, + conns: Slab::with_capacity(128) + }, + client: EchoClient::new(client, CLIENT, msgs) + } + } +} + +impl Handler for Echo { + type Timeout = usize; + type Message = (); + + fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, + events: Ready) { + debug!("ready {:?} {:?}", token, events); + if events.is_readable() { + match token { + SERVER => self.server.accept(event_loop).unwrap(), + CLIENT => self.client.readable(event_loop).unwrap(), + i => self.server.conn_readable(event_loop, i).unwrap() + } + } + + if events.is_writable() { + match token { + SERVER => panic!("received writable for token 0"), + CLIENT => self.client.writable(event_loop).unwrap(), + _ => self.server.conn_writable(event_loop, token).unwrap() + }; + } + } +} + +#[test] +pub fn test_echo_server() { + debug!("Starting TEST_ECHO_SERVER"); + let mut event_loop = EventLoop::new().unwrap(); + + let tmp_dir = TempDir::new("mio").unwrap(); + let addr = tmp_dir.path().join(&PathBuf::from("sock")); + + let srv = UnixListener::bind(&addr).unwrap(); + + event_loop.register(&srv, SERVER, Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + let sock = UnixStream::connect(&addr).unwrap(); + + // Connect to the server + event_loop.register(&sock, CLIENT, Ready::writable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + // Start the event loop + event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap(); +} diff --git a/third_party/rust/mio/test/test_unix_echo_server.rs b/third_party/rust/mio/test/test_unix_echo_server.rs new file mode 100644 index 0000000000..6531deac17 --- /dev/null +++ b/third_party/rust/mio/test/test_unix_echo_server.rs @@ -0,0 +1,292 @@ +use {TryRead, TryWrite}; +use mio::*; +use mio::deprecated::{EventLoop, Handler}; +use mio::deprecated::unix::*; +use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf}; +use slab::Slab; +use std::path::PathBuf; +use std::io; +use tempdir::TempDir; + +const SERVER: Token = Token(10_000_000); +const CLIENT: Token = Token(10_000_001); + +struct EchoConn { + sock: UnixStream, + buf: Option<ByteBuf>, + mut_buf: Option<MutByteBuf>, + token: Option<Token>, + interest: Ready, +} + +impl EchoConn { + fn new(sock: UnixStream) -> EchoConn { + EchoConn { + sock: sock, + buf: None, + mut_buf: Some(ByteBuf::mut_with_capacity(2048)), + token: None, + interest: Ready::hup(), + } + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + let mut buf = self.buf.take().unwrap(); + + match self.sock.try_write_buf(&mut buf) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + + self.buf = Some(buf); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CONN : we wrote {} bytes!", r); + + self.mut_buf = Some(buf.flip()); + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + } + Err(e) => debug!("not implemented; client err={:?}", e), + } + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot()) + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + let mut buf = self.mut_buf.take().unwrap(); + + match self.sock.try_read_buf(&mut buf) { + Ok(None) => { + debug!("CONN : spurious read wakeup"); + self.mut_buf = Some(buf); + } + Ok(Some(r)) => { + debug!("CONN : we read {} bytes!", r); + + // prepare to provide this to writable + self.buf = Some(buf.flip()); + + self.interest.remove(Ready::readable()); + self.interest.insert(Ready::writable()); + } + Err(e) => { + debug!("not implemented; client err={:?}", e); + self.interest.remove(Ready::readable()); + } + + }; + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct EchoServer { + sock: UnixListener, + conns: Slab<EchoConn> +} + +impl EchoServer { + fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("server accepting socket"); + + let sock = self.sock.accept().unwrap(); + let conn = EchoConn::new(sock); + let tok = self.conns.insert(conn); + + // Register the connection + self.conns[tok].token = Some(Token(tok)); + event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()) + .expect("could not register socket with event loop"); + + Ok(()) + } + + fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> { + debug!("server conn readable; tok={:?}", tok); + self.conn(tok).readable(event_loop) + } + + fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> { + debug!("server conn writable; tok={:?}", tok); + self.conn(tok).writable(event_loop) + } + + fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn { + &mut self.conns[tok.into()] + } +} + +struct EchoClient { + sock: UnixStream, + msgs: Vec<&'static str>, + tx: SliceBuf<'static>, + rx: SliceBuf<'static>, + mut_buf: Option<MutByteBuf>, + token: Token, + interest: Ready, +} + + +// Sends a message and expects to receive the same exact message, one at a time +impl EchoClient { + fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient { + let curr = msgs.remove(0); + + EchoClient { + sock: sock, + msgs: msgs, + tx: SliceBuf::wrap(curr.as_bytes()), + rx: SliceBuf::wrap(curr.as_bytes()), + mut_buf: Some(ByteBuf::mut_with_capacity(2048)), + token: tok, + interest: Ready::none(), + } + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket readable"); + + let mut buf = self.mut_buf.take().unwrap(); + + match self.sock.try_read_buf(&mut buf) { + Ok(None) => { + debug!("CLIENT : spurious read wakeup"); + self.mut_buf = Some(buf); + } + Ok(Some(r)) => { + debug!("CLIENT : We read {} bytes!", r); + + // prepare for reading + let mut buf = buf.flip(); + + debug!("CLIENT : buf = {:?} -- rx = {:?}", buf.bytes(), self.rx.bytes()); + while buf.has_remaining() { + let actual = buf.read_byte().unwrap(); + let expect = self.rx.read_byte().unwrap(); + + assert!(actual == expect, "actual={}; expect={}", actual, expect); + } + + self.mut_buf = Some(buf.flip()); + + self.interest.remove(Ready::readable()); + + if !self.rx.has_remaining() { + self.next_msg(event_loop).unwrap(); + } + } + Err(e) => { + panic!("not implemented; client err={:?}", e); + } + }; + + if !self.interest.is_none() { + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())?; + } + + Ok(()) + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket writable"); + + match self.sock.try_write_buf(&mut self.tx) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CLIENT : we wrote {} bytes!", r); + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + } + Err(e) => debug!("not implemented; client err={:?}", e) + } + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()) + } + + fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + if self.msgs.is_empty() { + event_loop.shutdown(); + return Ok(()); + } + + let curr = self.msgs.remove(0); + + debug!("client prepping next message"); + self.tx = SliceBuf::wrap(curr.as_bytes()); + self.rx = SliceBuf::wrap(curr.as_bytes()); + + self.interest.insert(Ready::writable()); + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct Echo { + server: EchoServer, + client: EchoClient, +} + +impl Echo { + fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo { + Echo { + server: EchoServer { + sock: srv, + conns: Slab::with_capacity(128) + }, + client: EchoClient::new(client, CLIENT, msgs) + } + } +} + +impl Handler for Echo { + type Timeout = usize; + type Message = (); + + fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready) { + if events.is_readable() { + match token { + SERVER => self.server.accept(event_loop).unwrap(), + CLIENT => self.client.readable(event_loop).unwrap(), + i => self.server.conn_readable(event_loop, i).unwrap() + }; + } + + if events.is_writable() { + match token { + SERVER => panic!("received writable for token 0"), + CLIENT => self.client.writable(event_loop).unwrap(), + _ => self.server.conn_writable(event_loop, token).unwrap() + }; + } + } +} + +#[test] +pub fn test_unix_echo_server() { + debug!("Starting TEST_UNIX_ECHO_SERVER"); + let mut event_loop = EventLoop::new().unwrap(); + + let tmp_dir = TempDir::new("mio").unwrap(); + let addr = tmp_dir.path().join(&PathBuf::from("sock")); + + let srv = UnixListener::bind(&addr).unwrap(); + + info!("listen for connections"); + event_loop.register(&srv, SERVER, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + let sock = UnixStream::connect(&addr).unwrap(); + + // Connect to the server + event_loop.register(&sock, CLIENT, Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + // Start the event loop + event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap(); +} diff --git a/third_party/rust/mio/test/test_unix_pass_fd.rs b/third_party/rust/mio/test/test_unix_pass_fd.rs new file mode 100644 index 0000000000..a948bb2ff6 --- /dev/null +++ b/third_party/rust/mio/test/test_unix_pass_fd.rs @@ -0,0 +1,306 @@ +use {TryRead, TryWrite}; +use mio::*; +use mio::deprecated::{EventLoop, Handler}; +use mio::deprecated::unix::*; +use bytes::{Buf, ByteBuf, SliceBuf}; +use slab::Slab; +use std::path::PathBuf; +use std::io::{self, Read}; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use tempdir::TempDir; + +const SERVER: Token = Token(10_000_000); +const CLIENT: Token = Token(10_000_001); + +struct EchoConn { + sock: UnixStream, + pipe_fd: Option<PipeReader>, + token: Option<Token>, + interest: Ready, +} + +impl EchoConn { + fn new(sock: UnixStream) -> EchoConn { + EchoConn { + sock: sock, + pipe_fd: None, + token: None, + interest: Ready::hup(), + } + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + let fd = self.pipe_fd.take().unwrap(); + + match self.sock.try_write_send_fd(b"x", fd.as_raw_fd()) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + + self.pipe_fd = Some(fd); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CONN : we wrote {} bytes!", r); + + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + } + Err(e) => debug!("not implemented; client err={:?}", e), + } + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot()) + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + let mut buf = ByteBuf::mut_with_capacity(2048); + + match self.sock.try_read_buf(&mut buf) { + Ok(None) => { + panic!("We just got readable, but were unable to read from the socket?"); + } + Ok(Some(r)) => { + debug!("CONN : we read {} bytes!", r); + self.interest.remove(Ready::readable()); + self.interest.insert(Ready::writable()); + } + Err(e) => { + debug!("not implemented; client err={:?}", e); + self.interest.remove(Ready::readable()); + } + + }; + + // create fd to pass back. Assume that the write will work + // without blocking, for simplicity -- we're only testing that + // the FD makes it through somehow + let (rd, mut wr) = pipe().unwrap(); + let mut buf = buf.flip(); + match wr.try_write_buf(&mut buf) { + Ok(None) => { + panic!("writing to our own pipe blocked :("); + } + Ok(Some(r)) => { + debug!("CONN: we wrote {} bytes to the FD", r); + } + Err(e) => { + panic!("not implemented; client err={:?}", e); + } + } + self.pipe_fd = Some(rd); + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct EchoServer { + sock: UnixListener, + conns: Slab<EchoConn> +} + +impl EchoServer { + fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("server accepting socket"); + + let sock = self.sock.accept().unwrap(); + let conn = EchoConn::new(sock); + let tok = self.conns.insert(conn); + + // Register the connection + self.conns[tok].token = Some(Token(tok)); + event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()) + .expect("could not register socket with event loop"); + + Ok(()) + } + + fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> { + debug!("server conn readable; tok={:?}", tok); + self.conn(tok).readable(event_loop) + } + + fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> { + debug!("server conn writable; tok={:?}", tok); + self.conn(tok).writable(event_loop) + } + + fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn { + &mut self.conns[tok.into()] + } +} + +struct EchoClient { + sock: UnixStream, + msgs: Vec<&'static str>, + tx: SliceBuf<'static>, + rx: SliceBuf<'static>, + token: Token, + interest: Ready, +} + + +// Sends a message and expects to receive the same exact message, one at a time +impl EchoClient { + fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient { + let curr = msgs.remove(0); + + EchoClient { + sock: sock, + msgs: msgs, + tx: SliceBuf::wrap(curr.as_bytes()), + rx: SliceBuf::wrap(curr.as_bytes()), + token: tok, + interest: Ready::none(), + } + } + + fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket readable"); + + let mut pipe: PipeReader; + let mut buf = [0; 256]; + + match self.sock.read_recv_fd(&mut buf) { + Ok((_, None)) => { + panic!("Did not receive passed file descriptor"); + } + Ok((r, Some(fd))) => { + assert_eq!(r, 1); + assert_eq!(b'x', buf[0]); + debug!("CLIENT : We read {} bytes!", r); + pipe = From::<Io>::from(unsafe { Io::from_raw_fd(fd) }); + } + Err(e) => { + panic!("not implemented; client err={:?}", e); + } + }; + + // read the data out of the FD itself + let n = match pipe.read(&mut buf) { + Ok(r) => { + debug!("CLIENT : We read {} bytes from the FD", r); + r + } + Err(e) => { + panic!("not implemented, client err={:?}", e); + } + }; + + for &actual in buf[0..n].iter() { + let expect = self.rx.read_byte().unwrap(); + assert!(actual == expect, "actual={}; expect={}", actual, expect); + } + + self.interest.remove(Ready::readable()); + + if !self.rx.has_remaining() { + self.next_msg(event_loop).unwrap(); + } + + if !self.interest.is_none() { + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())?; + } + + Ok(()) + } + + fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + debug!("client socket writable"); + + match self.sock.try_write_buf(&mut self.tx) { + Ok(None) => { + debug!("client flushing buf; WOULDBLOCK"); + self.interest.insert(Ready::writable()); + } + Ok(Some(r)) => { + debug!("CLIENT : we wrote {} bytes!", r); + self.interest.insert(Ready::readable()); + self.interest.remove(Ready::writable()); + } + Err(e) => debug!("not implemented; client err={:?}", e) + } + + assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest); + event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()) + } + + fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> { + if self.msgs.is_empty() { + event_loop.shutdown(); + return Ok(()); + } + + let curr = self.msgs.remove(0); + + debug!("client prepping next message"); + self.tx = SliceBuf::wrap(curr.as_bytes()); + self.rx = SliceBuf::wrap(curr.as_bytes()); + + self.interest.insert(Ready::writable()); + event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()) + } +} + +struct Echo { + server: EchoServer, + client: EchoClient, +} + +impl Echo { + fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo { + Echo { + server: EchoServer { + sock: srv, + conns: Slab::with_capacity(128) + }, + client: EchoClient::new(client, CLIENT, msgs) + } + } +} + +impl Handler for Echo { + type Timeout = usize; + type Message = (); + + fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready) { + if events.is_readable() { + match token { + SERVER => self.server.accept(event_loop).unwrap(), + CLIENT => self.client.readable(event_loop).unwrap(), + i => self.server.conn_readable(event_loop, i).unwrap() + }; + } + + if events.is_writable() { + match token { + SERVER => panic!("received writable for token 0"), + CLIENT => self.client.writable(event_loop).unwrap(), + _ => self.server.conn_writable(event_loop, token).unwrap() + }; + } + } +} + +#[test] +pub fn test_unix_pass_fd() { + debug!("Starting TEST_UNIX_PASS_FD"); + let mut event_loop = EventLoop::new().unwrap(); + + let tmp_dir = TempDir::new("mio").unwrap(); + let addr = tmp_dir.path().join(&PathBuf::from("sock")); + + let srv = UnixListener::bind(&addr).unwrap(); + + info!("listen for connections"); + event_loop.register(&srv, SERVER, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + let sock = UnixStream::connect(&addr).unwrap(); + + // Connect to the server + event_loop.register(&sock, CLIENT, Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + // Start the event loop + event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap(); +} diff --git a/third_party/rust/mio/test/test_write_then_drop.rs b/third_party/rust/mio/test/test_write_then_drop.rs new file mode 100644 index 0000000000..971ffff7c2 --- /dev/null +++ b/third_party/rust/mio/test/test_write_then_drop.rs @@ -0,0 +1,123 @@ +use std::io::{Write, Read}; + +use mio::event::Evented; +use mio::net::{TcpListener, TcpStream}; +use mio::{Poll, Events, Ready, PollOpt, Token}; + +#[test] +fn write_then_drop() { + drop(::env_logger::init()); + + let a = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = a.local_addr().unwrap(); + let mut s = TcpStream::connect(&addr).unwrap(); + + let poll = Poll::new().unwrap(); + + a.register(&poll, + Token(1), + Ready::readable(), + PollOpt::edge()).unwrap(); + s.register(&poll, + Token(3), + Ready::empty(), + PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + while events.is_empty() { + poll.poll(&mut events, None).unwrap(); + } + assert_eq!(events.len(), 1); + assert_eq!(events.get(0).unwrap().token(), Token(1)); + + let mut s2 = a.accept().unwrap().0; + + s2.register(&poll, + Token(2), + Ready::writable(), + PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + while events.is_empty() { + poll.poll(&mut events, None).unwrap(); + } + assert_eq!(events.len(), 1); + assert_eq!(events.get(0).unwrap().token(), Token(2)); + + s2.write_all(&[1, 2, 3, 4]).unwrap(); + drop(s2); + + s.reregister(&poll, + Token(3), + Ready::readable(), + PollOpt::edge()).unwrap(); + let mut events = Events::with_capacity(1024); + while events.is_empty() { + poll.poll(&mut events, None).unwrap(); + } + assert_eq!(events.len(), 1); + assert_eq!(events.get(0).unwrap().token(), Token(3)); + + let mut buf = [0; 10]; + assert_eq!(s.read(&mut buf).unwrap(), 4); + assert_eq!(&buf[0..4], &[1, 2, 3, 4]); +} + +#[test] +fn write_then_deregister() { + drop(::env_logger::init()); + + let a = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = a.local_addr().unwrap(); + let mut s = TcpStream::connect(&addr).unwrap(); + + let poll = Poll::new().unwrap(); + + a.register(&poll, + Token(1), + Ready::readable(), + PollOpt::edge()).unwrap(); + s.register(&poll, + Token(3), + Ready::empty(), + PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + while events.is_empty() { + poll.poll(&mut events, None).unwrap(); + } + assert_eq!(events.len(), 1); + assert_eq!(events.get(0).unwrap().token(), Token(1)); + + let mut s2 = a.accept().unwrap().0; + + s2.register(&poll, + Token(2), + Ready::writable(), + PollOpt::edge()).unwrap(); + + let mut events = Events::with_capacity(1024); + while events.is_empty() { + poll.poll(&mut events, None).unwrap(); + } + assert_eq!(events.len(), 1); + assert_eq!(events.get(0).unwrap().token(), Token(2)); + + s2.write_all(&[1, 2, 3, 4]).unwrap(); + s2.deregister(&poll).unwrap(); + + s.reregister(&poll, + Token(3), + Ready::readable(), + PollOpt::edge()).unwrap(); + let mut events = Events::with_capacity(1024); + while events.is_empty() { + poll.poll(&mut events, None).unwrap(); + } + assert_eq!(events.len(), 1); + assert_eq!(events.get(0).unwrap().token(), Token(3)); + + let mut buf = [0; 10]; + assert_eq!(s.read(&mut buf).unwrap(), 4); + assert_eq!(&buf[0..4], &[1, 2, 3, 4]); +} |