summaryrefslogtreecommitdiffstats
path: root/third_party/rust/mio/test
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/mio/test
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/mio/test')
-rw-r--r--third_party/rust/mio/test/benchmark.rs80
-rw-r--r--third_party/rust/mio/test/mod.rs214
-rw-r--r--third_party/rust/mio/test/test_battery.rs269
-rw-r--r--third_party/rust/mio/test/test_broken_pipe.rs28
-rw-r--r--third_party/rust/mio/test/test_close_on_drop.rs119
-rw-r--r--third_party/rust/mio/test/test_custom_evented.rs394
-rw-r--r--third_party/rust/mio/test/test_double_register.rs17
-rw-r--r--third_party/rust/mio/test/test_echo_server.rs303
-rw-r--r--third_party/rust/mio/test/test_fuchsia_handles.rs30
-rw-r--r--third_party/rust/mio/test/test_local_addr_ready.rs67
-rw-r--r--third_party/rust/mio/test/test_multicast.rs107
-rw-r--r--third_party/rust/mio/test/test_notify.rs192
-rw-r--r--third_party/rust/mio/test/test_oneshot.rs64
-rw-r--r--third_party/rust/mio/test/test_poll.rs18
-rw-r--r--third_party/rust/mio/test/test_poll_channel.rs285
-rw-r--r--third_party/rust/mio/test/test_register_deregister.rs123
-rw-r--r--third_party/rust/mio/test/test_register_multiple_event_loops.rs63
-rw-r--r--third_party/rust/mio/test/test_reregister_without_poll.rs28
-rw-r--r--third_party/rust/mio/test/test_smoke.rs23
-rw-r--r--third_party/rust/mio/test/test_subprocess_pipe.rs249
-rw-r--r--third_party/rust/mio/test/test_tcp.rs660
-rw-r--r--third_party/rust/mio/test/test_tcp_level.rs142
-rw-r--r--third_party/rust/mio/test/test_tcp_shutdown.rs248
-rw-r--r--third_party/rust/mio/test/test_tick.rs64
-rw-r--r--third_party/rust/mio/test/test_udp_level.rs52
-rw-r--r--third_party/rust/mio/test/test_udp_socket.rs252
-rw-r--r--third_party/rust/mio/test/test_uds_shutdown.rs300
-rw-r--r--third_party/rust/mio/test/test_unix_echo_server.rs292
-rw-r--r--third_party/rust/mio/test/test_unix_pass_fd.rs306
-rw-r--r--third_party/rust/mio/test/test_write_then_drop.rs123
30 files changed, 5112 insertions, 0 deletions
diff --git a/third_party/rust/mio/test/benchmark.rs b/third_party/rust/mio/test/benchmark.rs
new file mode 100644
index 0000000000..1e4fac517c
--- /dev/null
+++ b/third_party/rust/mio/test/benchmark.rs
@@ -0,0 +1,80 @@
+use std::mem;
+use mio::net::{AddressFamily, Inet, Inet6, SockAddr, InetAddr, IPv4Addr, SocketType, Dgram, Stream};
+use std::io::net::ip::IpAddr;
+use native::NativeTaskBuilder;
+use std::task::TaskBuilder;
+use mio::os::{from_sockaddr};
+use time::Instant;
+use std::vec::*;
+use std::io::timer;
+
+mod nix {
+ pub use nix::c_int;
+ pub use nix::fcntl::{Fd, O_NONBLOCK, O_CLOEXEC};
+ pub use nix::errno::{EWOULDBLOCK, EINPROGRESS};
+ pub use nix::sys::socket::*;
+ pub use nix::unistd::*;
+ pub use nix::sys::epoll::*;
+}
+
+fn timed(label: &str, f: ||) {
+ let start = Instant::now();
+ f();
+ let elapsed = start.elapsed();
+ println!(" {}: {}", label, elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0);
+}
+
+fn init(saddr: &str) -> (nix::Fd, nix::Fd) {
+ let optval = 1i;
+ let addr = SockAddr::parse(saddr.as_slice()).expect("could not parse InetAddr");
+ let srvfd = nix::socket(nix::AF_INET, nix::SOCK_STREAM, nix::SOCK_CLOEXEC).unwrap();
+ nix::setsockopt(srvfd, nix::SOL_SOCKET, nix::SO_REUSEADDR, &optval).unwrap();
+ nix::bind(srvfd, &from_sockaddr(&addr)).unwrap();
+ nix::listen(srvfd, 256u).unwrap();
+
+ let fd = nix::socket(nix::AF_INET, nix::SOCK_STREAM, nix::SOCK_CLOEXEC | nix::SOCK_NONBLOCK).unwrap();
+ let res = nix::connect(fd, &from_sockaddr(&addr));
+ let start = Instant::now();
+ println!("connecting : {}", res);
+
+ let clifd = nix::accept4(srvfd, nix::SOCK_CLOEXEC | nix::SOCK_NONBLOCK).unwrap();
+ let elapsed = start.elapsed();
+ println!("accepted : {} - {}", clifd, elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0);
+
+ (clifd, srvfd)
+}
+
+#[test]
+fn read_bench() {
+ let (clifd, srvfd) = init("10.10.1.5:11111");
+ let mut buf = Vec::with_capacity(1600);
+ unsafe { buf.set_len(1600); }
+ timed("read", || {
+ let mut i = 0u;
+ while i < 10000000 {
+ let res = nix::read(clifd, buf.as_mut_slice());
+ assert_eq!(res.unwrap_err().kind, nix::EWOULDBLOCK);
+ i = i + 1;
+ }
+ });
+}
+
+#[test]
+fn epollctl_bench() {
+ let (clifd, srvfd) = init("10.10.1.5:22222");
+
+ let epfd = nix::epoll_create().unwrap();
+ let info = nix::EpollEvent { events: nix::EPOLLIN | nix::EPOLLONESHOT | nix::EPOLLET,
+ data: 0u64 };
+
+ nix::epoll_ctl(epfd, nix::EpollCtlAdd, clifd, &info);
+
+ timed("epoll_ctl", || {
+ let mut i = 0u;
+ while i < 10000000 {
+ nix::epoll_ctl(epfd, nix::EpollCtlMod, clifd, &info);
+ i = i + 1;
+ }
+ });
+
+}
diff --git a/third_party/rust/mio/test/mod.rs b/third_party/rust/mio/test/mod.rs
new file mode 100644
index 0000000000..e49034f17e
--- /dev/null
+++ b/third_party/rust/mio/test/mod.rs
@@ -0,0 +1,214 @@
+#![allow(deprecated)]
+
+extern crate mio;
+extern crate bytes;
+extern crate net2;
+
+#[macro_use]
+extern crate log;
+extern crate env_logger;
+extern crate iovec;
+extern crate slab;
+extern crate tempdir;
+
+#[cfg(target_os = "fuchsia")]
+extern crate fuchsia_zircon as zircon;
+
+pub use ports::localhost;
+
+mod test_custom_evented;
+mod test_close_on_drop;
+mod test_double_register;
+mod test_echo_server;
+mod test_local_addr_ready;
+mod test_multicast;
+mod test_oneshot;
+mod test_poll;
+mod test_register_deregister;
+mod test_register_multiple_event_loops;
+mod test_reregister_without_poll;
+mod test_smoke;
+mod test_tcp;
+mod test_tcp_level;
+mod test_tcp_shutdown;
+mod test_udp_level;
+mod test_udp_socket;
+mod test_write_then_drop;
+
+#[cfg(feature = "with-deprecated")]
+mod test_notify;
+#[cfg(feature = "with-deprecated")]
+mod test_poll_channel;
+#[cfg(feature = "with-deprecated")]
+mod test_tick;
+
+// The following tests are for deprecated features. Only run these tests on
+// platforms that were supported from before the features were deprecated
+#[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
+#[cfg(feature = "with-deprecated")]
+mod test_battery;
+
+#[cfg(any(target_os = "macos", target_os = "linux"))]
+#[cfg(feature = "with-deprecated")]
+mod test_unix_echo_server;
+#[cfg(any(target_os = "macos", target_os = "linux"))]
+#[cfg(feature = "with-deprecated")]
+mod test_unix_pass_fd;
+#[cfg(any(target_os = "macos", target_os = "linux"))]
+#[cfg(feature = "with-deprecated")]
+mod test_uds_shutdown;
+#[cfg(any(target_os = "macos", target_os = "linux"))]
+#[cfg(feature = "with-deprecated")]
+mod test_subprocess_pipe;
+#[cfg(any(target_os = "macos", target_os = "linux"))]
+#[cfg(feature = "with-deprecated")]
+mod test_broken_pipe;
+
+#[cfg(any(target_os = "fuchsia"))]
+mod test_fuchsia_handles;
+
+use bytes::{Buf, MutBuf};
+use std::io::{self, Read, Write};
+use std::time::Duration;
+use mio::{Events, Poll};
+use mio::event::Event;
+
+pub trait TryRead {
+ fn try_read_buf<B: MutBuf>(&mut self, buf: &mut B) -> io::Result<Option<usize>>
+ where Self : Sized
+ {
+ // Reads the length of the slice supplied by buf.mut_bytes into the buffer
+ // This is not guaranteed to consume an entire datagram or segment.
+ // If your protocol is msg based (instead of continuous stream) you should
+ // ensure that your buffer is large enough to hold an entire segment (1532 bytes if not jumbo
+ // frames)
+ let res = self.try_read(unsafe { buf.mut_bytes() });
+
+ if let Ok(Some(cnt)) = res {
+ unsafe { buf.advance(cnt); }
+ }
+
+ res
+ }
+
+ fn try_read(&mut self, buf: &mut [u8]) -> io::Result<Option<usize>>;
+}
+
+pub trait TryWrite {
+ fn try_write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<Option<usize>>
+ where Self : Sized
+ {
+ let res = self.try_write(buf.bytes());
+
+ if let Ok(Some(cnt)) = res {
+ buf.advance(cnt);
+ }
+
+ res
+ }
+
+ fn try_write(&mut self, buf: &[u8]) -> io::Result<Option<usize>>;
+}
+
+impl<T: Read> TryRead for T {
+ fn try_read(&mut self, dst: &mut [u8]) -> io::Result<Option<usize>> {
+ self.read(dst).map_non_block()
+ }
+}
+
+impl<T: Write> TryWrite for T {
+ fn try_write(&mut self, src: &[u8]) -> io::Result<Option<usize>> {
+ self.write(src).map_non_block()
+ }
+}
+
+/*
+ *
+ * ===== Helpers =====
+ *
+ */
+
+/// A helper trait to provide the map_non_block function on Results.
+trait MapNonBlock<T> {
+ /// Maps a `Result<T>` to a `Result<Option<T>>` by converting
+ /// operation-would-block errors into `Ok(None)`.
+ fn map_non_block(self) -> io::Result<Option<T>>;
+}
+
+impl<T> MapNonBlock<T> for io::Result<T> {
+ fn map_non_block(self) -> io::Result<Option<T>> {
+ use std::io::ErrorKind::WouldBlock;
+
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(err) => {
+ if let WouldBlock = err.kind() {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ }
+ }
+ }
+}
+
+mod ports {
+ use std::net::SocketAddr;
+ use std::str::FromStr;
+ use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
+ use std::sync::atomic::Ordering::SeqCst;
+
+ // Helper for getting a unique port for the task run
+ // TODO: Reuse ports to not spam the system
+ static mut NEXT_PORT: AtomicUsize = ATOMIC_USIZE_INIT;
+ const FIRST_PORT: usize = 18080;
+
+ fn next_port() -> usize {
+ unsafe {
+ // If the atomic was never used, set it to the initial port
+ NEXT_PORT.compare_and_swap(0, FIRST_PORT, SeqCst);
+
+ // Get and increment the port list
+ NEXT_PORT.fetch_add(1, SeqCst)
+ }
+ }
+
+ pub fn localhost() -> SocketAddr {
+ let s = format!("127.0.0.1:{}", next_port());
+ FromStr::from_str(&s).unwrap()
+ }
+}
+
+pub fn sleep_ms(ms: u64) {
+ use std::thread;
+ thread::sleep(Duration::from_millis(ms));
+}
+
+pub fn expect_events(poll: &Poll,
+ event_buffer: &mut Events,
+ poll_try_count: usize,
+ mut expected: Vec<Event>)
+{
+ const MS: u64 = 1_000;
+
+ for _ in 0..poll_try_count {
+ poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap();
+ for event in event_buffer.iter() {
+ let pos_opt = match expected.iter().position(|exp_event| {
+ (event.token() == exp_event.token()) &&
+ event.readiness().contains(exp_event.readiness())
+ }) {
+ Some(x) => Some(x),
+ None => None,
+ };
+ if let Some(pos) = pos_opt { expected.remove(pos); }
+ }
+
+ if expected.is_empty() {
+ break;
+ }
+ }
+
+ assert!(expected.is_empty(), "The following expected events were not found: {:?}", expected);
+}
+
diff --git a/third_party/rust/mio/test/test_battery.rs b/third_party/rust/mio/test/test_battery.rs
new file mode 100644
index 0000000000..fa3aff04df
--- /dev/null
+++ b/third_party/rust/mio/test/test_battery.rs
@@ -0,0 +1,269 @@
+use {localhost, sleep_ms, TryRead, TryWrite};
+use mio::*;
+use mio::deprecated::{EventLoop, EventLoopBuilder, Handler};
+use mio::net::{TcpListener, TcpStream};
+use std::collections::LinkedList;
+use slab::Slab;
+use std::{io, thread};
+use std::time::Duration;
+
+// Don't touch the connection slab
+const SERVER: Token = Token(10_000_000);
+const CLIENT: Token = Token(10_000_001);
+
+#[cfg(windows)]
+const N: usize = 10_000;
+#[cfg(unix)]
+const N: usize = 1_000_000;
+
+struct EchoConn {
+ sock: TcpStream,
+ token: Option<Token>,
+ count: usize,
+ buf: Vec<u8>
+}
+
+impl EchoConn {
+ fn new(sock: TcpStream) -> EchoConn {
+ let mut ec =
+ EchoConn {
+ sock: sock,
+ token: None,
+ buf: Vec::with_capacity(22),
+ count: 0
+ };
+ unsafe { ec.buf.set_len(22) };
+ ec
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ event_loop.reregister(&self.sock, self.token.unwrap(),
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ loop {
+ match self.sock.try_read(&mut self.buf[..]) {
+ Ok(None) => {
+ break;
+ }
+ Ok(Some(_)) => {
+ self.count += 1;
+ if self.count % 10000 == 0 {
+ info!("Received {} messages", self.count);
+ }
+ if self.count == N {
+ event_loop.shutdown();
+ }
+ }
+ Err(_) => {
+ break;
+ }
+
+ };
+ }
+
+ event_loop.reregister(&self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
+ }
+}
+
+struct EchoServer {
+ sock: TcpListener,
+ conns: Slab<EchoConn>
+}
+
+impl EchoServer {
+ fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("server accepting socket");
+
+ let sock = self.sock.accept().unwrap().0;
+ let conn = EchoConn::new(sock,);
+ let tok = self.conns.insert(conn);
+
+ // Register the connection
+ self.conns[tok].token = Some(Token(tok));
+ event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot())
+ .expect("could not register socket with event loop");
+
+ Ok(())
+ }
+
+ fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>,
+ tok: Token) -> io::Result<()> {
+ debug!("server conn readable; tok={:?}", tok);
+ self.conn(tok).readable(event_loop)
+ }
+
+ fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>,
+ tok: Token) -> io::Result<()> {
+ debug!("server conn writable; tok={:?}", tok);
+ self.conn(tok).writable(event_loop)
+ }
+
+ fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
+ &mut self.conns[tok.into()]
+ }
+}
+
+struct EchoClient {
+ sock: TcpStream,
+ backlog: LinkedList<String>,
+ token: Token,
+ count: u32
+}
+
+
+// Sends a message and expects to receive the same exact message, one at a time
+impl EchoClient {
+ fn new(sock: TcpStream, tok: Token) -> EchoClient {
+
+ EchoClient {
+ sock: sock,
+ backlog: LinkedList::new(),
+ token: tok,
+ count: 0
+ }
+ }
+
+ fn readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ Ok(())
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("client socket writable");
+
+ while self.backlog.len() > 0 {
+ match self.sock.try_write(self.backlog.front().unwrap().as_bytes()) {
+ Ok(None) => {
+ break;
+ }
+ Ok(Some(_)) => {
+ self.backlog.pop_front();
+ self.count += 1;
+ if self.count % 10000 == 0 {
+ info!("Sent {} messages", self.count);
+ }
+ }
+ Err(e) => { debug!("not implemented; client err={:?}", e); break; }
+ }
+ }
+ if self.backlog.len() > 0 {
+ event_loop.reregister(&self.sock, self.token, Ready::writable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+ }
+
+ Ok(())
+ }
+}
+
+struct Echo {
+ server: EchoServer,
+ client: EchoClient,
+}
+
+impl Echo {
+ fn new(srv: TcpListener, client: TcpStream) -> Echo {
+ Echo {
+ server: EchoServer {
+ sock: srv,
+ conns: Slab::with_capacity(128),
+ },
+ client: EchoClient::new(client, CLIENT),
+ }
+ }
+}
+
+impl Handler for Echo {
+ type Timeout = usize;
+ type Message = String;
+
+ fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token,
+ events: Ready) {
+
+ if events.is_readable() {
+ match token {
+ SERVER => self.server.accept(event_loop).unwrap(),
+ CLIENT => self.client.readable(event_loop).unwrap(),
+ i => self.server.conn_readable(event_loop, i).unwrap()
+ }
+ }
+ if events.is_writable() {
+ match token {
+ SERVER => panic!("received writable for token 0"),
+ CLIENT => self.client.writable(event_loop).unwrap(),
+ _ => self.server.conn_writable(event_loop, token).unwrap()
+ }
+ }
+ }
+
+ fn notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String) {
+ match self.client.sock.try_write(msg.as_bytes()) {
+ Ok(Some(n)) => {
+ self.client.count += 1;
+ if self.client.count % 10000 == 0 {
+ info!("Sent {} bytes: count {}", n, self.client.count);
+ }
+ },
+
+ _ => {
+ self.client.backlog.push_back(msg);
+ event_loop.reregister(
+ &self.client.sock,
+ self.client.token,
+ Ready::writable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+ }
+ }
+ }
+}
+
+#[test]
+pub fn test_echo_server() {
+ debug!("Starting TEST_ECHO_SERVER");
+ let mut b = EventLoopBuilder::new();
+ b.notify_capacity(1_048_576)
+ .messages_per_tick(64)
+ .timer_tick(Duration::from_millis(100))
+ .timer_wheel_size(1_024)
+ .timer_capacity(65_536);
+
+ let mut event_loop = b.build().unwrap();
+
+ let addr = localhost();
+
+ let srv = TcpListener::bind(&addr).unwrap();
+
+ info!("listen for connections");
+ event_loop.register(&srv, SERVER, Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ let sock = TcpStream::connect(&addr).unwrap();
+
+ // Connect to the server
+ event_loop.register(&sock, CLIENT, Ready::writable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+ let chan = event_loop.channel();
+
+ let go = move || {
+ let mut i = N;
+
+ sleep_ms(1_000);
+
+ let message = "THIS IS A TEST MESSAGE".to_string();
+ while i > 0 {
+ chan.send(message.clone()).unwrap();
+ i -= 1;
+ if i % 10000 == 0 {
+ info!("Enqueued {} messages", N - i);
+ }
+ }
+ };
+
+ let t = thread::spawn(go);
+
+ // Start the event loop
+ event_loop.run(&mut Echo::new(srv, sock)).unwrap();
+ t.join().unwrap();
+}
diff --git a/third_party/rust/mio/test/test_broken_pipe.rs b/third_party/rust/mio/test/test_broken_pipe.rs
new file mode 100644
index 0000000000..1cd0ca7465
--- /dev/null
+++ b/third_party/rust/mio/test/test_broken_pipe.rs
@@ -0,0 +1,28 @@
+use mio::{Token, Ready, PollOpt};
+use mio::deprecated::{unix, EventLoop, Handler};
+use std::time::Duration;
+
+pub struct BrokenPipeHandler;
+
+impl Handler for BrokenPipeHandler {
+ type Timeout = ();
+ type Message = ();
+ fn ready(&mut self, _: &mut EventLoop<Self>, token: Token, _: Ready) {
+ if token == Token(1) {
+ panic!("Received ready() on a closed pipe.");
+ }
+ }
+}
+
+#[test]
+pub fn broken_pipe() {
+ let mut event_loop: EventLoop<BrokenPipeHandler> = EventLoop::new().unwrap();
+ let (reader, _) = unix::pipe().unwrap();
+
+ event_loop.register(&reader, Token(1), Ready::all(), PollOpt::edge())
+ .unwrap();
+
+ let mut handler = BrokenPipeHandler;
+ drop(reader);
+ event_loop.run_once(&mut handler, Some(Duration::from_millis(1000))).unwrap();
+}
diff --git a/third_party/rust/mio/test/test_close_on_drop.rs b/third_party/rust/mio/test/test_close_on_drop.rs
new file mode 100644
index 0000000000..6cd7d1365c
--- /dev/null
+++ b/third_party/rust/mio/test/test_close_on_drop.rs
@@ -0,0 +1,119 @@
+use {localhost, TryRead};
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use bytes::ByteBuf;
+use mio::net::{TcpListener, TcpStream};
+
+use self::TestState::{Initial, AfterRead};
+
+const SERVER: Token = Token(0);
+const CLIENT: Token = Token(1);
+
+#[derive(Debug, PartialEq)]
+enum TestState {
+ Initial,
+ AfterRead,
+}
+
+struct TestHandler {
+ srv: TcpListener,
+ cli: TcpStream,
+ state: TestState,
+ shutdown: bool,
+}
+
+impl TestHandler {
+ fn new(srv: TcpListener, cli: TcpStream) -> TestHandler {
+ TestHandler {
+ srv,
+ cli,
+ state: Initial,
+ shutdown: false,
+ }
+ }
+
+ fn handle_read(&mut self, poll: &mut Poll, tok: Token, events: Ready) {
+ debug!("readable; tok={:?}; hint={:?}", tok, events);
+
+ match tok {
+ SERVER => {
+ debug!("server connection ready for accept");
+ let _ = self.srv.accept().unwrap();
+ }
+ CLIENT => {
+ debug!("client readable");
+
+ match self.state {
+ Initial => {
+ let mut buf = [0; 4096];
+ debug!("GOT={:?}", self.cli.try_read(&mut buf[..]));
+ self.state = AfterRead;
+ },
+ AfterRead => {}
+ }
+
+ let mut buf = ByteBuf::mut_with_capacity(1024);
+
+ match self.cli.try_read_buf(&mut buf) {
+ Ok(Some(0)) => self.shutdown = true,
+ Ok(_) => panic!("the client socket should not be readable"),
+ Err(e) => panic!("Unexpected error {:?}", e)
+ }
+ }
+ _ => panic!("received unknown token {:?}", tok)
+ }
+ poll.reregister(&self.cli, CLIENT, Ready::readable(), PollOpt::edge()).unwrap();
+ }
+
+ fn handle_write(&mut self, poll: &mut Poll, tok: Token, _: Ready) {
+ match tok {
+ SERVER => panic!("received writable for token 0"),
+ CLIENT => {
+ debug!("client connected");
+ poll.reregister(&self.cli, CLIENT, Ready::readable(), PollOpt::edge()).unwrap();
+ }
+ _ => panic!("received unknown token {:?}", tok)
+ }
+ }
+}
+
+#[test]
+pub fn test_close_on_drop() {
+ let _ = ::env_logger::init();
+ debug!("Starting TEST_CLOSE_ON_DROP");
+ let mut poll = Poll::new().unwrap();
+
+ // The address to connect to - localhost + a unique port
+ let addr = localhost();
+
+ // == Create & setup server socket
+ let srv = TcpListener::bind(&addr).unwrap();
+
+ poll.register(&srv, SERVER, Ready::readable(), PollOpt::edge()).unwrap();
+
+ // == Create & setup client socket
+ let sock = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&sock, CLIENT, Ready::writable(), PollOpt::edge()).unwrap();
+
+ // == Create storage for events
+ let mut events = Events::with_capacity(1024);
+
+ // == Setup test handler
+ let mut handler = TestHandler::new(srv, sock);
+
+ // == Run test
+ while !handler.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.readiness().is_readable() {
+ handler.handle_read(&mut poll, event.token(), event.readiness());
+ }
+
+ if event.readiness().is_writable() {
+ handler.handle_write(&mut poll, event.token(), event.readiness());
+ }
+ }
+ }
+ assert!(handler.state == AfterRead, "actual={:?}", handler.state);
+}
diff --git a/third_party/rust/mio/test/test_custom_evented.rs b/third_party/rust/mio/test/test_custom_evented.rs
new file mode 100644
index 0000000000..08842fc289
--- /dev/null
+++ b/third_party/rust/mio/test/test_custom_evented.rs
@@ -0,0 +1,394 @@
+use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
+use mio::event::Evented;
+use std::time::Duration;
+
+#[test]
+fn smoke() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(128);
+
+ let (r, set) = Registration::new2();
+ r.register(&poll, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let n = poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
+ assert_eq!(n, 0);
+
+ set.set_readiness(Ready::readable()).unwrap();
+
+ let n = poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
+ assert_eq!(n, 1);
+
+ assert_eq!(events.get(0).unwrap().token(), Token(0));
+}
+
+#[test]
+fn set_readiness_before_register() {
+ use std::sync::{Arc, Barrier};
+ use std::thread;
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(128);
+
+ for _ in 0..5_000 {
+ let (r, set) = Registration::new2();
+
+ let b1 = Arc::new(Barrier::new(2));
+ let b2 = b1.clone();
+
+ let th = thread::spawn(move || {
+ // set readiness before register
+ set.set_readiness(Ready::readable()).unwrap();
+
+ // run into barrier so both can pass
+ b2.wait();
+ });
+
+ // wait for readiness
+ b1.wait();
+
+ // now register
+ poll.register(&r, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
+
+ loop {
+ let n = poll.poll(&mut events, None).unwrap();
+
+ if n == 0 {
+ continue;
+ }
+
+ assert_eq!(n, 1);
+ assert_eq!(events.get(0).unwrap().token(), Token(123));
+ break;
+ }
+
+ th.join().unwrap();
+ }
+}
+
+#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
+mod stress {
+ use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
+ use mio::event::Evented;
+ use std::time::Duration;
+
+ #[test]
+ fn single_threaded_poll() {
+ use std::sync::Arc;
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::atomic::Ordering::{Acquire, Release};
+ use std::thread;
+
+ const NUM_ATTEMPTS: usize = 30;
+ const NUM_ITERS: usize = 500;
+ const NUM_THREADS: usize = 4;
+ const NUM_REGISTRATIONS: usize = 128;
+
+ for _ in 0..NUM_ATTEMPTS {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(NUM_REGISTRATIONS);
+
+ let registrations: Vec<_> = (0..NUM_REGISTRATIONS).map(|i| {
+ let (r, s) = Registration::new2();
+ r.register(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
+ (r, s)
+ }).collect();
+
+ let mut ready: Vec<_> = (0..NUM_REGISTRATIONS).map(|_| Ready::empty()).collect();
+
+ let remaining = Arc::new(AtomicUsize::new(NUM_THREADS));
+
+ for _ in 0..NUM_THREADS {
+ let remaining = remaining.clone();
+
+ let set_readiness: Vec<SetReadiness> =
+ registrations.iter().map(|r| r.1.clone()).collect();
+
+ thread::spawn(move || {
+ for _ in 0..NUM_ITERS {
+ for i in 0..NUM_REGISTRATIONS {
+ set_readiness[i].set_readiness(Ready::readable()).unwrap();
+ set_readiness[i].set_readiness(Ready::empty()).unwrap();
+ set_readiness[i].set_readiness(Ready::writable()).unwrap();
+ set_readiness[i].set_readiness(Ready::readable() | Ready::writable()).unwrap();
+ set_readiness[i].set_readiness(Ready::empty()).unwrap();
+ }
+ }
+
+ for i in 0..NUM_REGISTRATIONS {
+ set_readiness[i].set_readiness(Ready::readable()).unwrap();
+ }
+
+ remaining.fetch_sub(1, Release);
+ });
+ }
+
+ while remaining.load(Acquire) > 0 {
+ // Set interest
+ for (i, &(ref r, _)) in registrations.iter().enumerate() {
+ r.reregister(&poll, Token(i), Ready::writable(), PollOpt::edge()).unwrap();
+ }
+
+ poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
+
+ for event in &events {
+ ready[event.token().0] = event.readiness();
+ }
+
+ // Update registration
+ // Set interest
+ for (i, &(ref r, _)) in registrations.iter().enumerate() {
+ r.reregister(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
+ }
+ }
+
+ // Finall polls, repeat until readiness-queue empty
+ loop {
+ // Might not read all events from custom-event-queue at once, implementation dependend
+ poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
+ if events.is_empty() {
+ // no more events in readiness queue pending
+ break;
+ }
+ for event in &events {
+ ready[event.token().0] = event.readiness();
+ }
+ }
+
+ // Everything should be flagged as readable
+ for ready in ready {
+ assert_eq!(ready, Ready::readable());
+ }
+ }
+ }
+
+ #[test]
+ fn multi_threaded_poll() {
+ use std::sync::{Arc, Barrier};
+ use std::sync::atomic::{AtomicUsize};
+ use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+ use std::thread;
+
+ const ENTRIES: usize = 10_000;
+ const PER_ENTRY: usize = 16;
+ const THREADS: usize = 4;
+ const NUM: usize = ENTRIES * PER_ENTRY;
+
+ struct Entry {
+ #[allow(dead_code)]
+ registration: Registration,
+ set_readiness: SetReadiness,
+ num: AtomicUsize,
+ }
+
+ impl Entry {
+ fn fire(&self) {
+ self.set_readiness.set_readiness(Ready::readable()).unwrap();
+ }
+ }
+
+ let poll = Arc::new(Poll::new().unwrap());
+ let mut entries = vec![];
+
+ // Create entries
+ for i in 0..ENTRIES {
+ let (registration, set_readiness) = Registration::new2();
+ registration.register(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
+
+ entries.push(Entry {
+ registration,
+ set_readiness,
+ num: AtomicUsize::new(0),
+ });
+ }
+
+ let total = Arc::new(AtomicUsize::new(0));
+ let entries = Arc::new(entries);
+ let barrier = Arc::new(Barrier::new(THREADS));
+
+ let mut threads = vec![];
+
+ for th in 0..THREADS {
+ let poll = poll.clone();
+ let total = total.clone();
+ let entries = entries.clone();
+ let barrier = barrier.clone();
+
+ threads.push(thread::spawn(move || {
+ let mut events = Events::with_capacity(128);
+
+ barrier.wait();
+
+ // Prime all the registrations
+ let mut i = th;
+ while i < ENTRIES {
+ entries[i].fire();
+ i += THREADS;
+ }
+
+ let mut n = 0;
+
+
+ while total.load(SeqCst) < NUM {
+ // A poll timeout is necessary here because there may be more
+ // than one threads blocked in `poll` when the final wakeup
+ // notification arrives (and only notifies one thread).
+ n += poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
+
+ let mut num_this_tick = 0;
+
+ for event in &events {
+ let e = &entries[event.token().0];
+
+ let mut num = e.num.load(Relaxed);
+
+ loop {
+ if num < PER_ENTRY {
+ let actual = e.num.compare_and_swap(num, num + 1, Relaxed);
+
+ if actual == num {
+ num_this_tick += 1;
+ e.fire();
+ break;
+ }
+
+ num = actual;
+ } else {
+ break;
+ }
+ }
+ }
+
+ total.fetch_add(num_this_tick, SeqCst);
+ }
+
+ n
+ }));
+ }
+
+ let _: Vec<_> = threads.into_iter()
+ .map(|th| th.join().unwrap())
+ .collect();
+
+ for entry in entries.iter() {
+ assert_eq!(PER_ENTRY, entry.num.load(Relaxed));
+ }
+ }
+
+ #[test]
+ fn with_small_events_collection() {
+ const N: usize = 8;
+ const ITER: usize = 1_000;
+
+ use std::sync::{Arc, Barrier};
+ use std::sync::atomic::AtomicBool;
+ use std::sync::atomic::Ordering::{Acquire, Release};
+ use std::thread;
+
+ let poll = Poll::new().unwrap();
+ let mut registrations = vec![];
+
+ let barrier = Arc::new(Barrier::new(N + 1));
+ let done = Arc::new(AtomicBool::new(false));
+
+ for i in 0..N {
+ let (registration, set_readiness) = Registration::new2();
+ poll.register(&registration, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
+
+ registrations.push(registration);
+
+ let barrier = barrier.clone();
+ let done = done.clone();
+
+ thread::spawn(move || {
+ barrier.wait();
+
+ while !done.load(Acquire) {
+ set_readiness.set_readiness(Ready::readable()).unwrap();
+ }
+
+ // Set one last time
+ set_readiness.set_readiness(Ready::readable()).unwrap();
+ });
+ }
+
+ let mut events = Events::with_capacity(4);
+
+ barrier.wait();
+
+ for _ in 0..ITER {
+ poll.poll(&mut events, None).unwrap();
+ }
+
+ done.store(true, Release);
+
+ let mut final_ready = vec![false; N];
+
+
+ for _ in 0..5 {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ final_ready[event.token().0] = true;
+ }
+
+ if final_ready.iter().all(|v| *v) {
+ return;
+ }
+
+ thread::sleep(Duration::from_millis(10));
+ }
+
+ panic!("dead lock?");
+ }
+}
+
+#[test]
+fn drop_registration_from_non_main_thread() {
+ use std::thread;
+ use std::sync::mpsc::channel;
+
+ const THREADS: usize = 8;
+ const ITERS: usize = 50_000;
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut senders = Vec::with_capacity(THREADS);
+ let mut token_index = 0;
+
+ // spawn threads, which will send messages to single receiver
+ for _ in 0..THREADS {
+ let (tx, rx) = channel::<(Registration, SetReadiness)>();
+ senders.push(tx);
+
+ thread::spawn(move || {
+ for (registration, set_readiness) in rx {
+ let _ = set_readiness.set_readiness(Ready::readable());
+ drop(registration);
+ drop(set_readiness);
+ }
+ });
+ }
+
+ let mut index: usize = 0;
+ for _ in 0..ITERS {
+ let (registration, set_readiness) = Registration::new2();
+ registration.register(&poll, Token(token_index), Ready::readable(), PollOpt::edge()).unwrap();
+ let _ = senders[index].send((registration, set_readiness));
+
+ token_index += 1;
+ index += 1;
+ if index == THREADS {
+ index = 0;
+
+ let (registration, set_readiness) = Registration::new2();
+ registration.register(&poll, Token(token_index), Ready::readable(), PollOpt::edge()).unwrap();
+ let _ = set_readiness.set_readiness(Ready::readable());
+ drop(registration);
+ drop(set_readiness);
+ token_index += 1;
+
+ thread::park_timeout(Duration::from_millis(0));
+ let _ = poll.poll(&mut events, None).unwrap();
+ }
+ }
+}
diff --git a/third_party/rust/mio/test/test_double_register.rs b/third_party/rust/mio/test/test_double_register.rs
new file mode 100644
index 0000000000..c3d011c81e
--- /dev/null
+++ b/third_party/rust/mio/test/test_double_register.rs
@@ -0,0 +1,17 @@
+//! A smoke test for windows compatibility
+
+#[test]
+#[cfg(any(target_os = "linux", target_os = "windows"))]
+pub fn test_double_register() {
+ use mio::*;
+ use mio::net::TcpListener;
+
+ let poll = Poll::new().unwrap();
+
+ // Create the listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Register the listener with `Poll`
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+ assert!(poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).is_err());
+}
diff --git a/third_party/rust/mio/test/test_echo_server.rs b/third_party/rust/mio/test/test_echo_server.rs
new file mode 100644
index 0000000000..e20ae98e5a
--- /dev/null
+++ b/third_party/rust/mio/test/test_echo_server.rs
@@ -0,0 +1,303 @@
+use {localhost, TryRead, TryWrite};
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::net::{TcpListener, TcpStream};
+use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf};
+use slab::Slab;
+use std::io;
+
+const SERVER: Token = Token(10_000_000);
+const CLIENT: Token = Token(10_000_001);
+
+struct EchoConn {
+ sock: TcpStream,
+ buf: Option<ByteBuf>,
+ mut_buf: Option<MutByteBuf>,
+ token: Option<Token>,
+ interest: Ready
+}
+
+impl EchoConn {
+ fn new(sock: TcpStream) -> EchoConn {
+ EchoConn {
+ sock,
+ buf: None,
+ mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
+ token: None,
+ interest: Ready::empty(),
+ }
+ }
+
+ fn writable(&mut self, poll: &mut Poll) -> io::Result<()> {
+ let mut buf = self.buf.take().unwrap();
+
+ match self.sock.try_write_buf(&mut buf) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+
+ self.buf = Some(buf);
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we wrote {} bytes!", r);
+
+ self.mut_buf = Some(buf.flip());
+
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e),
+ }
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ poll.reregister(&self.sock, self.token.unwrap(), self.interest,
+ PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn readable(&mut self, poll: &mut Poll) -> io::Result<()> {
+ let mut buf = self.mut_buf.take().unwrap();
+
+ match self.sock.try_read_buf(&mut buf) {
+ Ok(None) => {
+ debug!("CONN : spurious read wakeup");
+ self.mut_buf = Some(buf);
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we read {} bytes!", r);
+
+ // prepare to provide this to writable
+ self.buf = Some(buf.flip());
+
+ self.interest.remove(Ready::readable());
+ self.interest.insert(Ready::writable());
+ }
+ Err(e) => {
+ debug!("not implemented; client err={:?}", e);
+ self.interest.remove(Ready::readable());
+ }
+
+ };
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ poll.reregister(&self.sock, self.token.unwrap(), self.interest,
+ PollOpt::edge())
+ }
+}
+
+struct EchoServer {
+ sock: TcpListener,
+ conns: Slab<EchoConn>
+}
+
+impl EchoServer {
+ fn accept(&mut self, poll: &mut Poll) -> io::Result<()> {
+ debug!("server accepting socket");
+
+ let sock = self.sock.accept().unwrap().0;
+ let conn = EchoConn::new(sock,);
+ let tok = self.conns.insert(conn);
+
+ // Register the connection
+ self.conns[tok].token = Some(Token(tok));
+ poll.register(&self.conns[tok].sock, Token(tok), Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot())
+ .expect("could not register socket with event loop");
+
+ Ok(())
+ }
+
+ fn conn_readable(&mut self, poll: &mut Poll,
+ tok: Token) -> io::Result<()> {
+ debug!("server conn readable; tok={:?}", tok);
+ self.conn(tok).readable(poll)
+ }
+
+ fn conn_writable(&mut self, poll: &mut Poll,
+ tok: Token) -> io::Result<()> {
+ debug!("server conn writable; tok={:?}", tok);
+ self.conn(tok).writable(poll)
+ }
+
+ fn conn(&mut self, tok: Token) -> &mut EchoConn {
+ &mut self.conns[tok.into()]
+ }
+}
+
+struct EchoClient {
+ sock: TcpStream,
+ msgs: Vec<&'static str>,
+ tx: SliceBuf<'static>,
+ rx: SliceBuf<'static>,
+ mut_buf: Option<MutByteBuf>,
+ token: Token,
+ interest: Ready,
+ shutdown: bool,
+}
+
+
+// Sends a message and expects to receive the same exact message, one at a time
+impl EchoClient {
+ fn new(sock: TcpStream, token: Token, mut msgs: Vec<&'static str>) -> EchoClient {
+ let curr = msgs.remove(0);
+
+ EchoClient {
+ sock,
+ msgs,
+ tx: SliceBuf::wrap(curr.as_bytes()),
+ rx: SliceBuf::wrap(curr.as_bytes()),
+ mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
+ token,
+ interest: Ready::empty(),
+ shutdown: false,
+ }
+ }
+
+ fn readable(&mut self, poll: &mut Poll) -> io::Result<()> {
+ debug!("client socket readable");
+
+ let mut buf = self.mut_buf.take().unwrap();
+
+ match self.sock.try_read_buf(&mut buf) {
+ Ok(None) => {
+ debug!("CLIENT : spurious read wakeup");
+ self.mut_buf = Some(buf);
+ }
+ Ok(Some(r)) => {
+ debug!("CLIENT : We read {} bytes!", r);
+
+ // prepare for reading
+ let mut buf = buf.flip();
+
+ while buf.has_remaining() {
+ let actual = buf.read_byte().unwrap();
+ let expect = self.rx.read_byte().unwrap();
+
+ assert!(actual == expect, "actual={}; expect={}", actual, expect);
+ }
+
+ self.mut_buf = Some(buf.flip());
+
+ self.interest.remove(Ready::readable());
+
+ if !self.rx.has_remaining() {
+ self.next_msg(poll).unwrap();
+ }
+ }
+ Err(e) => {
+ panic!("not implemented; client err={:?}", e);
+ }
+ };
+
+ if !self.interest.is_empty() {
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ poll.reregister(&self.sock, self.token, self.interest,
+ PollOpt::edge() | PollOpt::oneshot())?;
+ }
+
+ Ok(())
+ }
+
+ fn writable(&mut self, poll: &mut Poll) -> io::Result<()> {
+ debug!("client socket writable");
+
+ match self.sock.try_write_buf(&mut self.tx) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CLIENT : we wrote {} bytes!", r);
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e)
+ }
+
+ if self.interest.is_readable() || self.interest.is_writable() {
+ try!(poll.reregister(&self.sock, self.token, self.interest,
+ PollOpt::edge() | PollOpt::oneshot()));
+ }
+
+ Ok(())
+ }
+
+ fn next_msg(&mut self, poll: &mut Poll) -> io::Result<()> {
+ if self.msgs.is_empty() {
+ self.shutdown = true;
+ return Ok(());
+ }
+
+ let curr = self.msgs.remove(0);
+
+ debug!("client prepping next message");
+ self.tx = SliceBuf::wrap(curr.as_bytes());
+ self.rx = SliceBuf::wrap(curr.as_bytes());
+
+ self.interest.insert(Ready::writable());
+ poll.reregister(&self.sock, self.token, self.interest,
+ PollOpt::edge() | PollOpt::oneshot())
+ }
+}
+
+struct Echo {
+ server: EchoServer,
+ client: EchoClient,
+}
+
+impl Echo {
+ fn new(srv: TcpListener, client: TcpStream, msgs: Vec<&'static str>) -> Echo {
+ Echo {
+ server: EchoServer {
+ sock: srv,
+ conns: Slab::with_capacity(128)
+ },
+ client: EchoClient::new(client, CLIENT, msgs)
+ }
+ }
+}
+
+#[test]
+pub fn test_echo_server() {
+ debug!("Starting TEST_ECHO_SERVER");
+ let mut poll = Poll::new().unwrap();
+
+ let addr = localhost();
+ let srv = TcpListener::bind(&addr).unwrap();
+
+ info!("listen for connections");
+ poll.register(&srv, SERVER, Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ let sock = TcpStream::connect(&addr).unwrap();
+
+ // Connect to the server
+ poll.register(&sock, CLIENT, Ready::writable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+ // == Create storage for events
+ let mut events = Events::with_capacity(1024);
+
+ let mut handler = Echo::new(srv, sock, vec!["foo", "bar"]);
+
+ // Start the event loop
+ while !handler.client.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ debug!("ready {:?} {:?}", event.token(), event.readiness());
+ if event.readiness().is_readable() {
+ match event.token() {
+ SERVER => handler.server.accept(&mut poll).unwrap(),
+ CLIENT => handler.client.readable(&mut poll).unwrap(),
+ i => handler.server.conn_readable(&mut poll, i).unwrap()
+ }
+ }
+
+ if event.readiness().is_writable() {
+ match event.token() {
+ SERVER => panic!("received writable for token 0"),
+ CLIENT => handler.client.writable(&mut poll).unwrap(),
+ i => handler.server.conn_writable(&mut poll, i).unwrap()
+ };
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio/test/test_fuchsia_handles.rs b/third_party/rust/mio/test/test_fuchsia_handles.rs
new file mode 100644
index 0000000000..85a14327f9
--- /dev/null
+++ b/third_party/rust/mio/test/test_fuchsia_handles.rs
@@ -0,0 +1,30 @@
+use mio::*;
+use mio::fuchsia::EventedHandle;
+use zircon::{self, AsHandleRef};
+use std::time::Duration;
+
+const MS: u64 = 1_000;
+
+#[test]
+pub fn test_fuchsia_channel() {
+ let poll = Poll::new().unwrap();
+ let mut event_buffer = Events::with_capacity(1);
+ let event_buffer = &mut event_buffer;
+
+ let (channel0, channel1) = zircon::Channel::create(zircon::ChannelOpts::Normal).unwrap();
+ let channel1_evented = unsafe { EventedHandle::new(channel1.raw_handle()) };
+
+ poll.register(&channel1_evented, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap();
+ assert_eq!(event_buffer.len(), 0);
+
+ channel0.write(&[1, 2, 3], &mut vec![], 0).unwrap();
+
+ poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap();
+ let event = event_buffer.get(0).unwrap();
+ assert_eq!(event.token(), Token(1));
+ assert!(event.readiness().is_readable());
+
+ poll.deregister(&channel1_evented).unwrap();
+} \ No newline at end of file
diff --git a/third_party/rust/mio/test/test_local_addr_ready.rs b/third_party/rust/mio/test/test_local_addr_ready.rs
new file mode 100644
index 0000000000..2e97f52449
--- /dev/null
+++ b/third_party/rust/mio/test/test_local_addr_ready.rs
@@ -0,0 +1,67 @@
+use {TryWrite};
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::net::{TcpListener, TcpStream};
+
+const LISTEN: Token = Token(0);
+const CLIENT: Token = Token(1);
+const SERVER: Token = Token(2);
+
+struct MyHandler {
+ listener: TcpListener,
+ connected: TcpStream,
+ accepted: Option<TcpStream>,
+ shutdown: bool,
+}
+
+#[test]
+fn local_addr_ready() {
+ let addr = "127.0.0.1:0".parse().unwrap();
+ let server = TcpListener::bind(&addr).unwrap();
+ let addr = server.local_addr().unwrap();
+
+ let poll = Poll::new().unwrap();
+ poll.register(&server, LISTEN, Ready::readable(),
+ PollOpt::edge()).unwrap();
+
+ let sock = TcpStream::connect(&addr).unwrap();
+ poll.register(&sock, CLIENT, Ready::readable(),
+ PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+
+ let mut handler = MyHandler {
+ listener: server,
+ connected: sock,
+ accepted: None,
+ shutdown: false,
+ };
+
+ while !handler.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ match event.token() {
+ LISTEN => {
+ let sock = handler.listener.accept().unwrap().0;
+ poll.register(&sock,
+ SERVER,
+ Ready::writable(),
+ PollOpt::edge()).unwrap();
+ handler.accepted = Some(sock);
+ }
+ SERVER => {
+ handler.accepted.as_ref().unwrap().peer_addr().unwrap();
+ handler.accepted.as_ref().unwrap().local_addr().unwrap();
+ handler.accepted.as_mut().unwrap().try_write(&[1, 2, 3]).unwrap();
+ handler.accepted = None;
+ }
+ CLIENT => {
+ handler.connected.peer_addr().unwrap();
+ handler.connected.local_addr().unwrap();
+ handler.shutdown = true;
+ }
+ _ => panic!("unexpected token"),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio/test/test_multicast.rs b/third_party/rust/mio/test/test_multicast.rs
new file mode 100644
index 0000000000..b73e0d5c3a
--- /dev/null
+++ b/third_party/rust/mio/test/test_multicast.rs
@@ -0,0 +1,107 @@
+// TODO: This doesn't pass on android 64bit CI...
+// Figure out why!
+#![cfg(not(target_os = "android"))]
+
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::net::UdpSocket;
+use bytes::{Buf, MutBuf, RingBuf, SliceBuf};
+use std::str;
+use std::net::IpAddr;
+use localhost;
+
+const LISTENER: Token = Token(0);
+const SENDER: Token = Token(1);
+
+pub struct UdpHandler {
+ tx: UdpSocket,
+ rx: UdpSocket,
+ msg: &'static str,
+ buf: SliceBuf<'static>,
+ rx_buf: RingBuf,
+ localhost: IpAddr,
+ shutdown: bool,
+}
+
+impl UdpHandler {
+ fn new(tx: UdpSocket, rx: UdpSocket, msg: &'static str) -> UdpHandler {
+ let sock = UdpSocket::bind(&"127.0.0.1:12345".parse().unwrap()).unwrap();
+ UdpHandler {
+ tx,
+ rx,
+ msg,
+ buf: SliceBuf::wrap(msg.as_bytes()),
+ rx_buf: RingBuf::new(1024),
+ localhost: sock.local_addr().unwrap().ip(),
+ shutdown: false,
+ }
+ }
+
+ fn handle_read(&mut self, _: &mut Poll, token: Token, _: Ready) {
+ if let LISTENER = token {
+ debug!("We are receiving a datagram now...");
+ match unsafe { self.rx.recv_from(self.rx_buf.mut_bytes()) } {
+ Ok((cnt, addr)) => {
+ unsafe { MutBuf::advance(&mut self.rx_buf, cnt); }
+ assert_eq!(addr.ip(), self.localhost);
+ }
+ res => panic!("unexpected result: {:?}", res),
+ }
+ assert!(str::from_utf8(self.rx_buf.bytes()).unwrap() == self.msg);
+ self.shutdown = true;
+ }
+ }
+
+ fn handle_write(&mut self, _: &mut Poll, token: Token, _: Ready) {
+ if let SENDER = token {
+ let addr = self.rx.local_addr().unwrap();
+ let cnt = self.tx.send_to(self.buf.bytes(), &addr).unwrap();
+ self.buf.advance(cnt);
+ }
+ }
+}
+
+#[test]
+pub fn test_multicast() {
+ drop(::env_logger::init());
+ debug!("Starting TEST_UDP_CONNECTIONLESS");
+ let mut poll = Poll::new().unwrap();
+
+ let addr = localhost();
+ let any = "0.0.0.0:0".parse().unwrap();
+
+ let tx = UdpSocket::bind(&any).unwrap();
+ let rx = UdpSocket::bind(&addr).unwrap();
+
+ info!("Joining group 227.1.1.100");
+ let any = "0.0.0.0".parse().unwrap();
+ rx.join_multicast_v4(&"227.1.1.100".parse().unwrap(), &any).unwrap();
+
+ info!("Joining group 227.1.1.101");
+ rx.join_multicast_v4(&"227.1.1.101".parse().unwrap(), &any).unwrap();
+
+ info!("Registering SENDER");
+ poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();
+
+ info!("Registering LISTENER");
+ poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+
+ let mut handler = UdpHandler::new(tx, rx, "hello world");
+
+ info!("Starting event loop to test with...");
+
+ while !handler.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.readiness().is_readable() {
+ handler.handle_read(&mut poll, event.token(), event.readiness());
+ }
+
+ if event.readiness().is_writable() {
+ handler.handle_write(&mut poll, event.token(), event.readiness());
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio/test/test_notify.rs b/third_party/rust/mio/test/test_notify.rs
new file mode 100644
index 0000000000..a6a8e51f67
--- /dev/null
+++ b/third_party/rust/mio/test/test_notify.rs
@@ -0,0 +1,192 @@
+use {localhost, sleep_ms};
+use mio::*;
+use mio::deprecated::{EventLoop, EventLoopBuilder, Handler, Sender, NotifyError};
+use mio::net::TcpListener;
+use std::thread;
+
+struct TestHandler {
+ sender: Sender<String>,
+ notify: usize
+}
+
+impl TestHandler {
+ fn new(sender: Sender<String>) -> TestHandler {
+ TestHandler {
+ sender,
+ notify: 0
+ }
+ }
+}
+
+impl Handler for TestHandler {
+ type Timeout = usize;
+ type Message = String;
+
+ fn notify(&mut self, event_loop: &mut EventLoop<TestHandler>, msg: String) {
+ match self.notify {
+ 0 => {
+ assert!(msg == "First", "actual={}", msg);
+ self.sender.send("Second".to_string()).unwrap();
+ }
+ 1 => {
+ assert!(msg == "Second", "actual={}", msg);
+ event_loop.shutdown();
+ }
+ v => panic!("unexpected value for notify; val={}", v)
+ }
+
+ self.notify += 1;
+ }
+}
+
+#[test]
+pub fn test_notify() {
+ debug!("Starting TEST_NOTIFY");
+ let mut event_loop = EventLoop::new().unwrap();
+
+ let addr = localhost();
+
+ // Setup a server socket so that the event loop blocks
+ let srv = TcpListener::bind(&addr).unwrap();
+
+ event_loop.register(&srv, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ let sender = event_loop.channel();
+
+ thread::spawn(move || {
+ sleep_ms(1_000);
+ sender.send("First".to_string()).unwrap();
+ });
+
+ let sender = event_loop.channel();
+ let mut handler = TestHandler::new(sender);
+
+ // Start the event loop
+ event_loop.run(&mut handler).unwrap();
+
+ assert!(handler.notify == 2, "actual={}", handler.notify);
+}
+
+#[test]
+pub fn test_notify_capacity() {
+ use std::sync::mpsc::*;
+ use std::thread;
+
+ struct Capacity(Receiver<i32>);
+
+ impl Handler for Capacity {
+ type Message = i32;
+ type Timeout = ();
+
+ fn notify(&mut self, event_loop: &mut EventLoop<Capacity>, msg: i32) {
+ if msg == 1 {
+ self.0.recv().unwrap();
+ } else if msg == 3 {
+ event_loop.shutdown();
+ }
+ }
+ }
+
+ let mut builder = EventLoopBuilder::new();
+ builder.notify_capacity(1);
+
+ let (tx, rx) = channel::<i32>();
+ let mut event_loop = builder.build().unwrap();
+ let notify = event_loop.channel();
+
+ let handle = thread::spawn(move || {
+ let mut handler = Capacity(rx);
+ event_loop.run(&mut handler).unwrap();
+ });
+
+ assert!(notify.send(1).is_ok());
+
+ loop {
+ if notify.send(2).is_err() {
+ break;
+ }
+ }
+
+ tx.send(1).unwrap();
+
+ loop {
+ if notify.send(3).is_ok() {
+ break;
+ }
+ }
+
+ handle.join().unwrap();
+}
+
+#[test]
+pub fn test_notify_drop() {
+ use std::sync::mpsc::{self,Sender};
+ use std::thread;
+
+ struct MessageDrop(Sender<u8>);
+
+ impl Drop for MessageDrop {
+ fn drop(&mut self) {
+ self.0.send(0).unwrap();
+ }
+ }
+
+ struct DummyHandler;
+
+ impl Handler for DummyHandler {
+ type Timeout = ();
+ type Message = MessageDrop;
+
+ fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: MessageDrop) {
+ msg.0.send(1).unwrap();
+ drop(msg);
+ // We stop after the first message
+ event_loop.shutdown();
+ }
+ }
+
+ let (tx_notif_1, rx_notif_1) = mpsc::channel();
+ let (tx_notif_2, rx_notif_2) = mpsc::channel();
+ let (tx_notif_3, _unused) = mpsc::channel();
+ let (tx_exit_loop, rx_exit_loop) = mpsc::channel();
+ let (tx_drop_loop, rx_drop_loop) = mpsc::channel();
+
+ let mut event_loop = EventLoop::new().unwrap();
+ let notify = event_loop.channel();
+
+ let handle = thread::spawn(move || {
+ let mut handler = DummyHandler;
+ event_loop.run(&mut handler).unwrap();
+
+ // Confirmation we exited the loop
+ tx_exit_loop.send(()).unwrap();
+
+ // Order to drop the loop
+ rx_drop_loop.recv().unwrap();
+ drop(event_loop);
+ });
+ notify.send(MessageDrop(tx_notif_1)).unwrap();
+ assert_eq!(rx_notif_1.recv().unwrap(), 1); // Response from the loop
+ assert_eq!(rx_notif_1.recv().unwrap(), 0); // Drop notification
+
+ // We wait for the event loop to exit before sending the second notification
+ rx_exit_loop.recv().unwrap();
+ notify.send(MessageDrop(tx_notif_2)).unwrap();
+
+ // We ensure the message is indeed stuck in the queue
+ sleep_ms(100);
+ assert!(rx_notif_2.try_recv().is_err());
+
+ // Give the order to drop the event loop
+ tx_drop_loop.send(()).unwrap();
+ assert_eq!(rx_notif_2.recv().unwrap(), 0); // Drop notification
+
+ // Check that sending a new notification will return an error
+ // We should also get our message back
+ match notify.send(MessageDrop(tx_notif_3)).unwrap_err() {
+ NotifyError::Closed(Some(..)) => {}
+ _ => panic!(),
+ }
+
+ handle.join().unwrap();
+}
diff --git a/third_party/rust/mio/test/test_oneshot.rs b/third_party/rust/mio/test/test_oneshot.rs
new file mode 100644
index 0000000000..4dca219b73
--- /dev/null
+++ b/third_party/rust/mio/test/test_oneshot.rs
@@ -0,0 +1,64 @@
+use mio::*;
+use mio::net::{TcpListener, TcpStream};
+use std::io::*;
+use std::time::Duration;
+
+const MS: u64 = 1_000;
+
+#[test]
+pub fn test_tcp_edge_oneshot() {
+ let _ = ::env_logger::init();
+
+ let mut poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+
+ // Create the listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Register the listener with `Poll`
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::level()).unwrap();
+
+ // Connect a socket, we are going to write to it
+ let mut s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+ poll.register(&s1, Token(1), Ready::writable(), PollOpt::level()).unwrap();
+
+ wait_for(&mut poll, &mut events, Token(0));
+
+ // Get pair
+ let (mut s2, _) = l.accept().unwrap();
+ poll.register(&s2, Token(2), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ wait_for(&mut poll, &mut events, Token(1));
+
+ let res = s1.write(b"foo").unwrap();
+ assert_eq!(3, res);
+
+ let mut buf = [0; 1];
+
+ for byte in b"foo" {
+ wait_for(&mut poll, &mut events, Token(2));
+
+ assert_eq!(1, s2.read(&mut buf).unwrap());
+ assert_eq!(*byte, buf[0]);
+
+ poll.reregister(&s2, Token(2), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ if *byte == b'o' {
+ poll.reregister(&s2, Token(2), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+ }
+ }
+}
+
+fn wait_for(poll: &mut Poll, events: &mut Events, token: Token) {
+ loop {
+ poll.poll(events, Some(Duration::from_millis(MS))).unwrap();
+
+ let cnt = (0..events.len()).map(|i| events.get(i).unwrap())
+ .filter(|e| e.token() == token)
+ .count();
+
+ assert!(cnt < 2, "token appeared multiple times in poll results; cnt={:}", cnt);
+
+ if cnt == 1 { return };
+ }
+}
diff --git a/third_party/rust/mio/test/test_poll.rs b/third_party/rust/mio/test/test_poll.rs
new file mode 100644
index 0000000000..e259d89e24
--- /dev/null
+++ b/third_party/rust/mio/test/test_poll.rs
@@ -0,0 +1,18 @@
+use mio::*;
+use std::time::Duration;
+
+#[test]
+fn test_poll_closes_fd() {
+ for _ in 0..2000 {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(4);
+ let (registration, set_readiness) = Registration::new2();
+
+ poll.register(&registration, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+ poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
+
+ drop(poll);
+ drop(set_readiness);
+ drop(registration);
+ }
+}
diff --git a/third_party/rust/mio/test/test_poll_channel.rs b/third_party/rust/mio/test/test_poll_channel.rs
new file mode 100644
index 0000000000..f7ce050537
--- /dev/null
+++ b/third_party/rust/mio/test/test_poll_channel.rs
@@ -0,0 +1,285 @@
+use {expect_events, sleep_ms};
+use mio::{channel, Events, Poll, PollOpt, Ready, Token};
+use mio::event::Event;
+use std::sync::mpsc::TryRecvError;
+use std::thread;
+use std::time::Duration;
+
+#[test]
+pub fn test_poll_channel_edge() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Polling will contain the event
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(1, num);
+
+ let event = events.get(0).unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ // Poll again and there should be no events
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Read the value
+ assert_eq!("hello", rx.try_recv().unwrap());
+
+ // Poll again, nothing
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Push a value
+ tx.send("goodbye").unwrap();
+
+ // Have an event
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(1, num);
+
+ let event = events.get(0).unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ // Read the value
+ rx.try_recv().unwrap();
+
+ // Drop the sender half
+ drop(tx);
+
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(1, num);
+
+ let event = events.get(0).unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ match rx.try_recv() {
+ Err(TryRecvError::Disconnected) => {}
+ no => panic!("unexpected value {:?}", no),
+ }
+
+}
+
+#[test]
+pub fn test_poll_channel_oneshot() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Polling will contain the event
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(1, num);
+
+ let event = events.get(0).unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ // Poll again and there should be no events
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Read the value
+ assert_eq!("hello", rx.try_recv().unwrap());
+
+ // Poll again, nothing
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Push a value
+ tx.send("goodbye").unwrap();
+
+ // Poll again, nothing
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Reregistering will re-trigger the notification
+ for _ in 0..3 {
+ poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ // Have an event
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(1, num);
+
+ let event = events.get(0).unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+ }
+
+ // Get the value
+ assert_eq!("goodbye", rx.try_recv().unwrap());
+
+ poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ // Have an event
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ // Have an event
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_poll_channel_level() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()).unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Polling will contain the event
+ for i in 0..5 {
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert!(1 == num, "actually got {} on iteration {}", num, i);
+
+ let event = events.get(0).unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+ }
+
+ // Read the value
+ assert_eq!("hello", rx.try_recv().unwrap());
+
+ // Wait, but nothing should happen
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_poll_channel_writable() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()).unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_dropping_receive_before_poll() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Drop the receive end
+ drop(rx);
+
+ // Wait, but nothing should happen
+ let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_mixing_channel_with_socket() {
+ use mio::net::{TcpListener, TcpStream};
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ // Create the listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Register the listener with `Poll`
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+ poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ // Push a value onto the channel
+ tx.send("hello").unwrap();
+
+ // Connect a TCP socket
+ let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+
+ // Register the socket
+ poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
+
+ // Sleep a bit to ensure it arrives at dest
+ sleep_ms(250);
+
+ expect_events(&poll, &mut events, 2, vec![
+ Event::new(Ready::empty(), Token(0)),
+ Event::new(Ready::empty(), Token(1)),
+ ]);
+}
+
+#[test]
+pub fn test_sending_from_other_thread_while_polling() {
+ const ITERATIONS: usize = 20;
+ const THREADS: usize = 5;
+
+ // Make sure to run multiple times
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+
+ for _ in 0..ITERATIONS {
+ let (tx, rx) = channel::channel();
+ poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+
+ for _ in 0..THREADS {
+ let tx = tx.clone();
+
+ thread::spawn(move || {
+ sleep_ms(50);
+ tx.send("ping").unwrap();
+ });
+ }
+
+ let mut recv = 0;
+
+ while recv < THREADS {
+ let num = poll.poll(&mut events, None).unwrap();
+
+ if num != 0 {
+ assert_eq!(1, num);
+ assert_eq!(events.get(0).unwrap().token(), Token(0));
+
+ while let Ok(_) = rx.try_recv() {
+ recv += 1;
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio/test/test_register_deregister.rs b/third_party/rust/mio/test/test_register_deregister.rs
new file mode 100644
index 0000000000..ae84a029ec
--- /dev/null
+++ b/third_party/rust/mio/test/test_register_deregister.rs
@@ -0,0 +1,123 @@
+use {expect_events, localhost, TryWrite};
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::event::Event;
+use mio::net::{TcpListener, TcpStream};
+use bytes::SliceBuf;
+use std::time::Duration;
+
+const SERVER: Token = Token(0);
+const CLIENT: Token = Token(1);
+
+struct TestHandler {
+ server: TcpListener,
+ client: TcpStream,
+ state: usize,
+}
+
+impl TestHandler {
+ fn new(srv: TcpListener, cli: TcpStream) -> TestHandler {
+ TestHandler {
+ server: srv,
+ client: cli,
+ state: 0,
+ }
+ }
+
+ fn handle_read(&mut self, poll: &mut Poll, token: Token) {
+ match token {
+ SERVER => {
+ trace!("handle_read; token=SERVER");
+ let mut sock = self.server.accept().unwrap().0;
+ sock.try_write_buf(&mut SliceBuf::wrap(b"foobar")).unwrap();
+ }
+ CLIENT => {
+ trace!("handle_read; token=CLIENT");
+ assert!(self.state == 0, "unexpected state {}", self.state);
+ self.state = 1;
+ poll.reregister(&self.client, CLIENT, Ready::writable(), PollOpt::level()).unwrap();
+ }
+ _ => panic!("unexpected token"),
+ }
+ }
+
+ fn handle_write(&mut self, poll: &mut Poll, token: Token) {
+ debug!("handle_write; token={:?}; state={:?}", token, self.state);
+
+ assert!(token == CLIENT, "unexpected token {:?}", token);
+ assert!(self.state == 1, "unexpected state {}", self.state);
+
+ self.state = 2;
+ poll.deregister(&self.client).unwrap();
+ poll.deregister(&self.server).unwrap();
+ }
+}
+
+#[test]
+pub fn test_register_deregister() {
+ let _ = ::env_logger::init();
+
+ debug!("Starting TEST_REGISTER_DEREGISTER");
+ let mut poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+
+ let addr = localhost();
+
+ let server = TcpListener::bind(&addr).unwrap();
+
+ info!("register server socket");
+ poll.register(&server, SERVER, Ready::readable(), PollOpt::edge()).unwrap();
+
+ let client = TcpStream::connect(&addr).unwrap();
+
+ // Register client socket only as writable
+ poll.register(&client, CLIENT, Ready::readable(), PollOpt::level()).unwrap();
+
+ let mut handler = TestHandler::new(server, client);
+
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ if let Some(event) = events.get(0) {
+ if event.readiness().is_readable() {
+ handler.handle_read(&mut poll, event.token());
+ }
+
+ if event.readiness().is_writable() {
+ handler.handle_write(&mut poll, event.token());
+ break;
+ }
+ }
+ }
+
+ poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
+ assert_eq!(events.len(), 0);
+}
+
+#[test]
+pub fn test_register_empty_interest() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let addr = localhost();
+
+ let sock = TcpListener::bind(&addr).unwrap();
+
+ poll.register(&sock, Token(0), Ready::empty(), PollOpt::edge()).unwrap();
+
+ let client = TcpStream::connect(&addr).unwrap();
+
+ // The connect is not guaranteed to have started until it is registered
+ // https://docs.rs/mio/0.6.10/mio/struct.Poll.html#registering-handles
+ poll.register(&client, Token(1), Ready::empty(), PollOpt::edge()).unwrap();
+
+ // sock is registered with empty interest, we should not receive any event
+ poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
+ assert_eq!(events.len(), 0, "Received unexpected event: {:?}", events.get(0).unwrap());
+
+ // now sock is reregistered with readable, we should receive the pending event
+ poll.reregister(&sock, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+ expect_events(&poll, &mut events, 2, vec![
+ Event::new(Ready::readable(), Token(0))
+ ]);
+
+ poll.reregister(&sock, Token(0), Ready::empty(), PollOpt::edge()).unwrap();
+}
diff --git a/third_party/rust/mio/test/test_register_multiple_event_loops.rs b/third_party/rust/mio/test/test_register_multiple_event_loops.rs
new file mode 100644
index 0000000000..9204afaf68
--- /dev/null
+++ b/third_party/rust/mio/test/test_register_multiple_event_loops.rs
@@ -0,0 +1,63 @@
+use localhost;
+use mio::*;
+use mio::net::{TcpListener, TcpStream, UdpSocket};
+use std::io::ErrorKind;
+
+#[test]
+fn test_tcp_register_multiple_event_loops() {
+ let addr = localhost();
+ let listener = TcpListener::bind(&addr).unwrap();
+
+ let poll1 = Poll::new().unwrap();
+ poll1.register(&listener, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ let poll2 = Poll::new().unwrap();
+
+ // Try registering the same socket with the initial one
+ let res = poll2.register(&listener, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge());
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Other);
+
+ // Try cloning the socket and registering it again
+ let listener2 = listener.try_clone().unwrap();
+ let res = poll2.register(&listener2, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge());
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Other);
+
+ // Try the stream
+ let stream = TcpStream::connect(&addr).unwrap();
+
+ poll1.register(&stream, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ let res = poll2.register(&stream, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge());
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Other);
+
+ // Try cloning the socket and registering it again
+ let stream2 = stream.try_clone().unwrap();
+ let res = poll2.register(&stream2, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge());
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Other);
+}
+
+#[test]
+fn test_udp_register_multiple_event_loops() {
+ let addr = localhost();
+ let socket = UdpSocket::bind(&addr).unwrap();
+
+ let poll1 = Poll::new().unwrap();
+ poll1.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ let poll2 = Poll::new().unwrap();
+
+ // Try registering the same socket with the initial one
+ let res = poll2.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge());
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Other);
+
+ // Try cloning the socket and registering it again
+ let socket2 = socket.try_clone().unwrap();
+ let res = poll2.register(&socket2, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge());
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Other);
+}
diff --git a/third_party/rust/mio/test/test_reregister_without_poll.rs b/third_party/rust/mio/test/test_reregister_without_poll.rs
new file mode 100644
index 0000000000..45d5aca49c
--- /dev/null
+++ b/third_party/rust/mio/test/test_reregister_without_poll.rs
@@ -0,0 +1,28 @@
+use {sleep_ms};
+use mio::*;
+use mio::net::{TcpListener, TcpStream};
+use std::time::Duration;
+
+const MS: u64 = 1_000;
+
+#[test]
+pub fn test_reregister_different_without_poll() {
+ let mut events = Events::with_capacity(1024);
+ let poll = Poll::new().unwrap();
+
+ // Create the listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Register the listener with `Poll`
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+ poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
+
+ sleep_ms(MS);
+
+ poll.reregister(&l, Token(0), Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ poll.poll(&mut events, Some(Duration::from_millis(MS))).unwrap();
+ assert_eq!(events.len(), 0);
+}
diff --git a/third_party/rust/mio/test/test_smoke.rs b/third_party/rust/mio/test/test_smoke.rs
new file mode 100644
index 0000000000..96f7d3c9e4
--- /dev/null
+++ b/third_party/rust/mio/test/test_smoke.rs
@@ -0,0 +1,23 @@
+extern crate mio;
+
+use mio::{Events, Poll, Token, Ready, PollOpt};
+use mio::net::TcpListener;
+use std::time::Duration;
+
+#[test]
+fn run_once_with_nothing() {
+ let mut events = Events::with_capacity(1024);
+ let poll = Poll::new().unwrap();
+ poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
+}
+
+#[test]
+fn add_then_drop() {
+ let mut events = Events::with_capacity(1024);
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let poll = Poll::new().unwrap();
+ poll.register(&l, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+ drop(l);
+ poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
+
+}
diff --git a/third_party/rust/mio/test/test_subprocess_pipe.rs b/third_party/rust/mio/test/test_subprocess_pipe.rs
new file mode 100644
index 0000000000..2bcf132486
--- /dev/null
+++ b/third_party/rust/mio/test/test_subprocess_pipe.rs
@@ -0,0 +1,249 @@
+use {TryRead, TryWrite};
+use std::mem;
+use mio::*;
+use std::io;
+use mio::deprecated::{EventLoop, Handler};
+use mio::deprecated::unix::{PipeReader, PipeWriter};
+use std::process::{Command, Stdio, Child};
+
+
+struct SubprocessClient {
+ stdin: Option<PipeWriter>,
+ stdout: Option<PipeReader>,
+ stderr: Option<PipeReader>,
+ stdin_token : Token,
+ stdout_token : Token,
+ stderr_token : Token,
+ output : Vec<u8>,
+ output_stderr : Vec<u8>,
+ input : Vec<u8>,
+ input_offset : usize,
+ buf : [u8; 65536],
+}
+
+
+// Sends a message and expects to receive the same exact message, one at a time
+impl SubprocessClient {
+ fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient {
+ SubprocessClient {
+ stdin: stdin,
+ stdout: stdout,
+ stderr: stderr,
+ stdin_token : Token(0),
+ stdout_token : Token(1),
+ stderr_token : Token(2),
+ output : Vec::<u8>::new(),
+ output_stderr : Vec::<u8>::new(),
+ buf : [0; 65536],
+ input : data.to_vec(),
+ input_offset : 0,
+ }
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
+ let mut eof = false;
+ match self.stdout {
+ None => unreachable!(),
+ Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) {
+ Ok(None) => {
+ }
+ Ok(Some(r)) => {
+ if r == 0 {
+ eof = true;
+ } else {
+ self.output.extend(&self.buf[0..r]);
+ }
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ };
+ if eof {
+ drop(self.stdout.take());
+ match self.stderr {
+ None => event_loop.shutdown(),
+ Some(_) => {},
+ }
+ }
+ return Ok(());
+ }
+
+ fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
+ let mut eof = false;
+ match self.stderr {
+ None => unreachable!(),
+ Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) {
+ Ok(None) => {
+ }
+ Ok(Some(r)) => {
+ if r == 0 {
+ eof = true;
+ } else {
+ self.output_stderr.extend(&self.buf[0..r]);
+ }
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ };
+ if eof {
+ drop(self.stderr.take());
+ match self.stdout {
+ None => event_loop.shutdown(),
+ Some(_) => {},
+ }
+ }
+ return Ok(());
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
+ let mut ok = true;
+ match self.stdin {
+ None => unreachable!(),
+ Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
+ Ok(None) => {
+ },
+ Ok(Some(r)) => {
+ if r == 0 {
+ ok = false;
+ } else {
+ self.input_offset += r;
+ }
+ },
+ Err(_) => {
+ ok = false;
+ },
+ }
+ }
+ if self.input_offset == self.input.len() || !ok {
+ drop(self.stdin.take());
+ match self.stderr {
+ None => match self.stdout {
+ None => event_loop.shutdown(),
+ Some(_) => {},
+ },
+ Some(_) => {},
+ }
+ }
+ return Ok(());
+ }
+
+}
+
+impl Handler for SubprocessClient {
+ type Timeout = usize;
+ type Message = ();
+
+ fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
+ _: Ready) {
+ if token == self.stderr_token {
+ let _x = self.readable_stderr(event_loop);
+ } else {
+ let _x = self.readable(event_loop);
+ }
+ if token == self.stdin_token {
+ let _y = self.writable(event_loop);
+ }
+ }
+}
+
+
+
+
+const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096];
+pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) {
+ let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
+ let stdin : Option<PipeWriter>;
+ let stdin_exists : bool;
+ match process.stdin {
+ None => stdin_exists = false,
+ Some(_) => stdin_exists = true,
+ }
+ if stdin_exists {
+ match PipeWriter::from_stdin(process.stdin.take().unwrap()) {
+ Err(e) => panic!(e),
+ Ok(pipe) => stdin = Some(pipe),
+ }
+ } else {
+ stdin = None;
+ }
+ let stdout_exists : bool;
+ let stdout : Option<PipeReader>;
+ match process.stdout {
+ None => stdout_exists = false,
+ Some(_) => stdout_exists = true,
+ }
+ if stdout_exists {
+ match PipeReader::from_stdout(process.stdout.take().unwrap()) {
+ Err(e) => panic!(e),
+ Ok(pipe) => stdout = Some(pipe),
+ }
+ } else {
+ stdout = None;
+ }
+ let stderr_exists : bool;
+ let stderr : Option<PipeReader>;
+ match process.stderr {
+ None => stderr_exists = false,
+ Some(_) => stderr_exists = true,
+ }
+ if stderr_exists {
+ match PipeReader::from_stderr(process.stderr.take().unwrap()) {
+ Err(e) => panic!(e),
+ Ok(pipe) => stderr = Some(pipe),
+ }
+ } else {
+ stderr = None
+ }
+
+ let mut subprocess = SubprocessClient::new(stdin,
+ stdout,
+ stderr,
+ input);
+ match subprocess.stdout {
+ Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, Ready::readable(),
+ PollOpt::level()).unwrap(),
+ None => {},
+ }
+
+ match subprocess.stderr {
+ Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, Ready::readable(),
+ PollOpt::level()).unwrap(),
+ None => {},
+ }
+
+ // Connect to the server
+ match subprocess.stdin {
+ Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, Ready::writable(),
+ PollOpt::level()).unwrap(),
+ None => {},
+ }
+
+ // Start the event loop
+ event_loop.run(&mut subprocess).unwrap();
+ let _ = process.wait();
+
+ let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
+ let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
+ return (ret_stdout, ret_stderr);
+}
+
+#[test]
+fn test_subprocess_pipe() {
+ let process =
+ Command::new("/bin/cat")
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn().unwrap();
+ let (ret_stdout, ret_stderr) = subprocess_communicate(process, &TEST_DATA[..]);
+ assert_eq!(TEST_DATA.len(), ret_stdout.len());
+ assert_eq!(0usize, ret_stderr.len());
+ let mut i : usize = 0;
+ for item in TEST_DATA.iter() {
+ assert_eq!(*item, ret_stdout[i]);
+ i += 1;
+ }
+}
diff --git a/third_party/rust/mio/test/test_tcp.rs b/third_party/rust/mio/test/test_tcp.rs
new file mode 100644
index 0000000000..ae569ac5e8
--- /dev/null
+++ b/third_party/rust/mio/test/test_tcp.rs
@@ -0,0 +1,660 @@
+use std::cmp;
+use std::io::prelude::*;
+use std::io;
+use std::net;
+use std::sync::mpsc::channel;
+use std::thread;
+use std::time::Duration;
+
+use net2::{self, TcpStreamExt};
+
+use {TryRead, TryWrite};
+use mio::{Token, Ready, PollOpt, Poll, Events};
+use iovec::IoVec;
+use mio::net::{TcpListener, TcpStream};
+
+#[test]
+fn accept() {
+ struct H { hit: bool, listener: TcpListener, shutdown: bool }
+
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ net::TcpStream::connect(&addr).unwrap();
+ });
+
+ let poll = Poll::new().unwrap();
+
+ poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { hit: false, listener: l, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ h.hit = true;
+ assert_eq!(event.token(), Token(1));
+ assert!(event.readiness().is_readable());
+ assert!(h.listener.accept().is_ok());
+ h.shutdown = true;
+ }
+ }
+ assert!(h.hit);
+ assert!(h.listener.accept().unwrap_err().kind() == io::ErrorKind::WouldBlock);
+ t.join().unwrap();
+}
+
+#[test]
+fn connect() {
+ struct H { hit: u32, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let (tx, rx) = channel();
+ let (tx2, rx2) = channel();
+ let t = thread::spawn(move || {
+ let s = l.accept().unwrap();
+ rx.recv().unwrap();
+ drop(s);
+ tx2.send(()).unwrap();
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { hit: 0, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ match h.hit {
+ 0 => assert!(event.readiness().is_writable()),
+ 1 => assert!(event.readiness().is_readable()),
+ _ => panic!(),
+ }
+ h.hit += 1;
+ h.shutdown = true;
+ }
+ }
+ assert_eq!(h.hit, 1);
+ tx.send(()).unwrap();
+ rx2.recv().unwrap();
+ h.shutdown = false;
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ match h.hit {
+ 0 => assert!(event.readiness().is_writable()),
+ 1 => assert!(event.readiness().is_readable()),
+ _ => panic!(),
+ }
+ h.hit += 1;
+ h.shutdown = true;
+ }
+ }
+ assert_eq!(h.hit, 2);
+ t.join().unwrap();
+}
+
+#[test]
+fn read() {
+ const N: usize = 16 * 1024 * 1024;
+ struct H { amt: usize, socket: TcpStream, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.write(&b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { amt: 0, socket: s, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ let mut b = [0; 1024];
+ loop {
+ if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
+ h.amt += amt;
+ } else {
+ break
+ }
+ if h.amt >= N {
+ h.shutdown = true;
+ break
+ }
+ }
+ }
+ }
+ t.join().unwrap();
+}
+
+#[test]
+fn peek() {
+ const N: usize = 16 * 1024 * 1024;
+ struct H { amt: usize, socket: TcpStream, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.write(&b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { amt: 0, socket: s, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ let mut b = [0; 1024];
+ match h.socket.peek(&mut b) {
+ Ok(_) => (),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ continue
+ },
+ Err(e) => panic!("unexpected error: {:?}", e),
+ }
+
+ loop {
+ if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
+ h.amt += amt;
+ } else {
+ break
+ }
+ if h.amt >= N {
+ h.shutdown = true;
+ break
+ }
+ }
+ }
+ }
+ t.join().unwrap();
+}
+
+#[test]
+fn read_bufs() {
+ const N: usize = 16 * 1024 * 1024;
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let b = [1; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.write(&b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(128);
+
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable(), PollOpt::level()).unwrap();
+
+ let b1 = &mut [0; 10][..];
+ let b2 = &mut [0; 383][..];
+ let b3 = &mut [0; 28][..];
+ let b4 = &mut [0; 8][..];
+ let b5 = &mut [0; 128][..];
+ let mut b: [&mut IoVec; 5] = [
+ b1.into(),
+ b2.into(),
+ b3.into(),
+ b4.into(),
+ b5.into(),
+ ];
+
+ let mut so_far = 0;
+ loop {
+ for buf in b.iter_mut() {
+ for byte in buf.as_mut_bytes() {
+ *byte = 0;
+ }
+ }
+
+ poll.poll(&mut events, None).unwrap();
+
+ match s.read_bufs(&mut b) {
+ Ok(0) => {
+ assert_eq!(so_far, N);
+ break
+ }
+ Ok(mut n) => {
+ so_far += n;
+ for buf in b.iter() {
+ let buf = buf.as_bytes();
+ for byte in buf[..cmp::min(n, buf.len())].iter() {
+ assert_eq!(*byte, 1);
+ }
+ n = n.saturating_sub(buf.len());
+ if n == 0 {
+ break
+ }
+ }
+ assert_eq!(n, 0);
+ }
+ Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
+ }
+ }
+
+ t.join().unwrap();
+}
+
+#[test]
+fn write() {
+ const N: usize = 16 * 1024 * 1024;
+ struct H { amt: usize, socket: TcpStream, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let mut b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.read(&mut b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::writable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { amt: 0, socket: s, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ let b = [0; 1024];
+ loop {
+ if let Some(amt) = h.socket.try_write(&b).unwrap() {
+ h.amt += amt;
+ } else {
+ break
+ }
+ if h.amt >= N {
+ h.shutdown = true;
+ break
+ }
+ }
+ }
+ }
+ t.join().unwrap();
+}
+
+#[test]
+fn write_bufs() {
+ const N: usize = 16 * 1024 * 1024;
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let mut b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ for byte in b.iter_mut() {
+ *byte = 0;
+ }
+ let n = s.read(&mut b).unwrap();
+ amt += n;
+ for byte in b[..n].iter() {
+ assert_eq!(*byte, 1);
+ }
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(128);
+ let s = TcpStream::connect(&addr).unwrap();
+ poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
+
+ let b1 = &[1; 10][..];
+ let b2 = &[1; 383][..];
+ let b3 = &[1; 28][..];
+ let b4 = &[1; 8][..];
+ let b5 = &[1; 128][..];
+ let b: [&IoVec; 5] = [
+ b1.into(),
+ b2.into(),
+ b3.into(),
+ b4.into(),
+ b5.into(),
+ ];
+
+ let mut so_far = 0;
+ while so_far < N {
+ poll.poll(&mut events, None).unwrap();
+
+ match s.write_bufs(&b) {
+ Ok(n) => so_far += n,
+ Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
+ }
+ }
+
+ t.join().unwrap();
+}
+
+#[test]
+fn connect_then_close() {
+ struct H { listener: TcpListener, shutdown: bool }
+
+ let poll = Poll::new().unwrap();
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let s = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+
+ poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+ poll.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { listener: l, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(1) {
+ let s = h.listener.accept().unwrap().0;
+ poll.register(&s, Token(3), Ready::readable() | Ready::writable(),
+ PollOpt::edge()).unwrap();
+ drop(s);
+ } else if event.token() == Token(2) {
+ h.shutdown = true;
+ }
+ }
+ }
+}
+
+#[test]
+fn listen_then_close() {
+ let poll = Poll::new().unwrap();
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+ drop(l);
+
+ let mut events = Events::with_capacity(128);
+
+ poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
+
+ for event in &events {
+ if event.token() == Token(1) {
+ panic!("recieved ready() on a closed TcpListener")
+ }
+ }
+}
+
+fn assert_send<T: Send>() {
+}
+
+fn assert_sync<T: Sync>() {
+}
+
+#[test]
+fn test_tcp_sockets_are_send() {
+ assert_send::<TcpListener>();
+ assert_send::<TcpStream>();
+ assert_sync::<TcpListener>();
+ assert_sync::<TcpStream>();
+}
+
+#[test]
+fn bind_twice_bad() {
+ let l1 = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = l1.local_addr().unwrap();
+ assert!(TcpListener::bind(&addr).is_err());
+}
+
+#[test]
+fn multiple_writes_immediate_success() {
+ const N: usize = 16;
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let mut b = [0; 1024];
+ let mut amt = 0;
+ while amt < 1024*N {
+ for byte in b.iter_mut() {
+ *byte = 0;
+ }
+ let n = s.read(&mut b).unwrap();
+ amt += n;
+ for byte in b[..n].iter() {
+ assert_eq!(*byte, 1);
+ }
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let mut s = TcpStream::connect(&addr).unwrap();
+ poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
+ let mut events = Events::with_capacity(16);
+
+ // Wait for our TCP stream to connect
+ 'outer: loop {
+ poll.poll(&mut events, None).unwrap();
+ for event in events.iter() {
+ if event.token() == Token(1) && event.readiness().is_writable() {
+ break 'outer
+ }
+ }
+ }
+
+ for _ in 0..N {
+ s.write_all(&[1; 1024]).unwrap();
+ }
+
+ t.join().unwrap();
+}
+
+#[test]
+fn connection_reset_by_peer() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(16);
+ let mut buf = [0u8; 16];
+
+ // Create listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = l.local_addr().unwrap();
+
+ // Connect client
+ let client = net2::TcpBuilder::new_v4().unwrap()
+ .to_tcp_stream().unwrap();
+
+ client.set_linger(Some(Duration::from_millis(0))).unwrap();
+ client.connect(&addr).unwrap();
+
+ // Convert to Mio stream
+ let client = TcpStream::from_stream(client).unwrap();
+
+ // Register server
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+
+ // Register interest in the client
+ poll.register(&client, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ // Wait for listener to be ready
+ let mut server;
+ 'outer:
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(0) {
+ match l.accept() {
+ Ok((sock, _)) => {
+ server = sock;
+ break 'outer;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ Err(e) => panic!("unexpected error {:?}", e),
+ }
+ }
+ }
+ }
+
+ // Close the connection
+ drop(client);
+
+ // Wait a moment
+ thread::sleep(Duration::from_millis(100));
+
+ // Register interest in the server socket
+ poll.register(&server, Token(3), Ready::readable(), PollOpt::edge()).unwrap();
+
+
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(3) {
+ assert!(event.readiness().is_readable());
+
+ match server.read(&mut buf) {
+ Ok(0) |
+ Err(_) => {},
+
+ Ok(x) => panic!("expected empty buffer but read {} bytes", x),
+ }
+ return;
+ }
+ }
+ }
+
+}
+
+#[test]
+#[cfg_attr(target_os = "fuchsia", ignore)]
+fn connect_error() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(16);
+
+ // Pick a "random" port that shouldn't be in use.
+ let l = match TcpStream::connect(&"127.0.0.1:38381".parse().unwrap()) {
+ Ok(l) => l,
+ Err(ref e) if e.kind() == io::ErrorKind::ConnectionRefused => {
+ // Connection failed synchronously. This is not a bug, but it
+ // unfortunately doesn't get us the code coverage we want.
+ return;
+ },
+ Err(e) => panic!("TcpStream::connect unexpected error {:?}", e)
+ };
+
+ poll.register(&l, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
+
+ 'outer:
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(0) {
+ assert!(event.readiness().is_writable());
+ break 'outer
+ }
+ }
+ }
+
+ assert!(l.take_error().unwrap().is_some());
+}
+
+#[test]
+fn write_error() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(16);
+ let (tx, rx) = channel();
+
+ let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = listener.local_addr().unwrap();
+ let t = thread::spawn(move || {
+ let (conn, _addr) = listener.accept().unwrap();
+ rx.recv().unwrap();
+ drop(conn);
+ });
+
+ let mut s = TcpStream::connect(&addr).unwrap();
+ poll.register(&s,
+ Token(0),
+ Ready::readable() | Ready::writable(),
+ PollOpt::edge()).unwrap();
+
+ let mut wait_writable = || {
+ 'outer:
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(0) && event.readiness().is_writable() {
+ break 'outer
+ }
+ }
+ }
+ };
+
+ wait_writable();
+
+ tx.send(()).unwrap();
+ t.join().unwrap();
+
+ let buf = [0; 1024];
+ loop {
+ match s.write(&buf) {
+ Ok(_) => {}
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ wait_writable()
+ }
+ Err(e) => {
+ println!("good error: {}", e);
+ break
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio/test/test_tcp_level.rs b/third_party/rust/mio/test/test_tcp_level.rs
new file mode 100644
index 0000000000..c384caac53
--- /dev/null
+++ b/third_party/rust/mio/test/test_tcp_level.rs
@@ -0,0 +1,142 @@
+use {expect_events, sleep_ms, TryRead};
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::event::Event;
+use mio::net::{TcpListener, TcpStream};
+use std::io::Write;
+use std::time::Duration;
+
+const MS: u64 = 1_000;
+
+#[test]
+pub fn test_tcp_listener_level_triggered() {
+ let poll = Poll::new().unwrap();
+ let mut pevents = Events::with_capacity(1024);
+
+ // Create the listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Register the listener with `Poll`
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::level()).unwrap();
+
+ let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+ poll.register(&s1, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ while filter(&pevents, Token(0)).is_empty() {
+ poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
+ }
+ let events = filter(&pevents, Token(0));
+
+ assert_eq!(events.len(), 1);
+ assert_eq!(events[0], Event::new(Ready::readable(), Token(0)));
+
+ poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
+ let events = filter(&pevents, Token(0));
+ assert_eq!(events.len(), 1);
+ assert_eq!(events[0], Event::new(Ready::readable(), Token(0)));
+
+ // Accept the connection then test that the events stop
+ let _ = l.accept().unwrap();
+
+ poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
+ let events = filter(&pevents, Token(0));
+ assert!(events.is_empty(), "actual={:?}", events);
+
+ let s3 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+ poll.register(&s3, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
+
+ while filter(&pevents, Token(0)).is_empty() {
+ poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
+ }
+ let events = filter(&pevents, Token(0));
+
+ assert_eq!(events.len(), 1);
+ assert_eq!(events[0], Event::new(Ready::readable(), Token(0)));
+
+ drop(l);
+
+ poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
+ let events = filter(&pevents, Token(0));
+ assert!(events.is_empty());
+}
+
+#[test]
+pub fn test_tcp_stream_level_triggered() {
+ drop(::env_logger::init());
+ let poll = Poll::new().unwrap();
+ let mut pevents = Events::with_capacity(1024);
+
+ // Create the listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Register the listener with `Poll`
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+ poll.register(&s1, Token(1), Ready::readable() | Ready::writable(), PollOpt::level()).unwrap();
+
+ // Sleep a bit to ensure it arrives at dest
+ sleep_ms(250);
+
+ expect_events(&poll, &mut pevents, 2, vec![
+ Event::new(Ready::readable(), Token(0)),
+ Event::new(Ready::writable(), Token(1)),
+ ]);
+
+ // Server side of socket
+ let (mut s1_tx, _) = l.accept().unwrap();
+
+ // Sleep a bit to ensure it arrives at dest
+ sleep_ms(250);
+
+ expect_events(&poll, &mut pevents, 2, vec![
+ Event::new(Ready::writable(), Token(1))
+ ]);
+
+ // Register the socket
+ poll.register(&s1_tx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
+
+ debug!("writing some data ----------");
+
+ // Write some data
+ let res = s1_tx.write(b"hello world!");
+ assert!(res.unwrap() > 0);
+
+ // Sleep a bit to ensure it arrives at dest
+ sleep_ms(250);
+
+ debug!("looking at rx end ----------");
+
+ // Poll rx end
+ expect_events(&poll, &mut pevents, 2, vec![
+ Event::new(Ready::readable(), Token(1))
+ ]);
+
+ debug!("reading ----------");
+
+ // Reading the data should clear it
+ let mut res = vec![];
+ while s1.try_read_buf(&mut res).unwrap().is_some() {
+ }
+
+ assert_eq!(res, b"hello world!");
+
+ debug!("checking just read ----------");
+
+ expect_events(&poll, &mut pevents, 1, vec![
+ Event::new(Ready::writable(), Token(1))]);
+
+ // Closing the socket clears all active level events
+ drop(s1);
+
+ debug!("checking everything is gone ----------");
+
+ poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
+ let events = filter(&pevents, Token(1));
+ assert!(events.is_empty());
+}
+
+fn filter(events: &Events, token: Token) -> Vec<Event> {
+ (0..events.len()).map(|i| events.get(i).unwrap())
+ .filter(|e| e.token() == token)
+ .collect()
+}
diff --git a/third_party/rust/mio/test/test_tcp_shutdown.rs b/third_party/rust/mio/test/test_tcp_shutdown.rs
new file mode 100644
index 0000000000..9363f83401
--- /dev/null
+++ b/third_party/rust/mio/test/test_tcp_shutdown.rs
@@ -0,0 +1,248 @@
+use std::collections::HashMap;
+use std::net::{self, Shutdown};
+use std::time::{Duration, Instant};
+
+use mio::{Token, Ready, PollOpt, Poll, Events};
+use mio::event::{Evented, Event};
+use mio::net::TcpStream;
+
+struct TestPoll {
+ poll: Poll,
+ events: Events,
+ buf: HashMap<Token, Ready>,
+}
+
+impl TestPoll {
+ fn new() -> TestPoll {
+ TestPoll {
+ poll: Poll::new().unwrap(),
+ events: Events::with_capacity(1024),
+ buf: HashMap::new(),
+ }
+ }
+
+ fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt)
+ where E: Evented
+ {
+ self.poll.register(handle, token, interest, opts).unwrap();
+ }
+
+ fn wait_for(&mut self, token: Token, ready: Ready) -> Result<(), &'static str> {
+ let now = Instant::now();
+
+ loop {
+ if now.elapsed() > Duration::from_secs(1) {
+ return Err("not ready");
+ }
+
+ if let Some(curr) = self.buf.get(&token) {
+ if curr.contains(ready) {
+ break;
+ }
+ }
+
+ self.poll.poll(&mut self.events, Some(Duration::from_millis(250))).unwrap();
+
+ for event in &self.events {
+ let curr = self.buf.entry(event.token())
+ .or_insert(Ready::empty());
+
+ *curr |= event.readiness();
+ }
+ }
+
+ *self.buf.get_mut(&token).unwrap() -= ready;
+ Ok(())
+ }
+
+ fn check_idle(&mut self) -> Result<(), Event> {
+ self.poll.poll(&mut self.events, Some(Duration::from_millis(100))).unwrap();
+
+ if let Some(e) = self.events.iter().next() {
+ Err(e)
+ } else {
+ Ok(())
+ }
+ }
+}
+
+macro_rules! assert_ready {
+ ($poll:expr, $token:expr, $ready:expr) => {{
+ match $poll.wait_for($token, $ready) {
+ Ok(_) => {}
+ Err(_) => panic!("not ready; token = {:?}; interest = {:?}", $token, $ready),
+ }
+ }}
+}
+
+macro_rules! assert_not_ready {
+ ($poll:expr, $token:expr, $ready:expr) => {{
+ match $poll.wait_for($token, $ready) {
+ Ok(_) => panic!("is ready; token = {:?}; interest = {:?}", $token, $ready),
+ Err(_) => {}
+ }
+ }}
+}
+
+macro_rules! assert_hup_ready {
+ ($poll:expr) => {
+ #[cfg(unix)]
+ {
+ use mio::unix::UnixReady;
+ assert_ready!($poll, Token(0), Ready::from(UnixReady::hup()))
+ }
+ }
+}
+
+macro_rules! assert_not_hup_ready {
+ ($poll:expr) => {
+ #[cfg(unix)]
+ {
+ use mio::unix::UnixReady;
+ assert_not_ready!($poll, Token(0), Ready::from(UnixReady::hup()))
+ }
+ }
+}
+
+macro_rules! assert_idle {
+ ($poll:expr) => {
+ match $poll.check_idle() {
+ Ok(()) => {}
+ Err(e) => panic!("not idle; event = {:?}", e),
+ }
+ }
+}
+
+// TODO: replace w/ assertive
+// https://github.com/carllerche/assertive
+macro_rules! assert_ok {
+ ($e:expr) => {
+ assert_ok!($e,)
+ };
+ ($e:expr,) => {{
+ use std::result::Result::*;
+ match $e {
+ Ok(v) => v,
+ Err(e) => panic!("assertion failed: error = {:?}", e),
+ }
+ }};
+ ($e:expr, $($arg:tt)+) => {{
+ use std::result::Result::*;
+ match $e {
+ Ok(v) => v,
+ Err(e) => panic!("assertion failed: error = {:?}: {}", e, format_args!($($arg)+)),
+ }
+ }};
+}
+
+#[test]
+fn test_write_shutdown() {
+ use std::io::prelude::*;
+
+ let mut poll = TestPoll::new();
+ let mut buf = [0; 1024];
+
+ let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0"));
+ let addr = assert_ok!(listener.local_addr());
+
+ let mut client = assert_ok!(TcpStream::connect(&addr));
+ poll.register(&client,
+ Token(0),
+ Ready::readable() | Ready::writable(),
+ PollOpt::edge());
+
+ let (socket, _) = assert_ok!(listener.accept());
+
+ assert_ready!(poll, Token(0), Ready::writable());
+
+ // Polling should not have any events
+ assert_idle!(poll);
+
+ // Now, shutdown the write half of the socket.
+ assert_ok!(socket.shutdown(Shutdown::Write));
+
+ assert_ready!(poll, Token(0), Ready::readable());
+
+ assert_not_hup_ready!(poll);
+
+ let n = assert_ok!(client.read(&mut buf));
+ assert_eq!(n, 0);
+}
+
+#[test]
+fn test_graceful_shutdown() {
+ use std::io::prelude::*;
+
+ let mut poll = TestPoll::new();
+ let mut buf = [0; 1024];
+
+ let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0"));
+ let addr = assert_ok!(listener.local_addr());
+
+ let mut client = assert_ok!(TcpStream::connect(&addr));
+ poll.register(&client,
+ Token(0),
+ Ready::readable() | Ready::writable(),
+ PollOpt::edge());
+
+ let (mut socket, _) = assert_ok!(listener.accept());
+
+ assert_ready!(poll, Token(0), Ready::writable());
+
+ // Polling should not have any events
+ assert_idle!(poll);
+
+ // Now, shutdown the write half of the socket.
+ assert_ok!(client.shutdown(Shutdown::Write));
+
+ let n = assert_ok!(socket.read(&mut buf));
+ assert_eq!(0, n);
+ drop(socket);
+
+ assert_ready!(poll, Token(0), Ready::readable());
+ #[cfg(not(any(target_os = "bitrig", target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos",
+ target_os = "netbsd", target_os = "openbsd")))]
+ assert_hup_ready!(poll);
+
+ let mut buf = [0; 1024];
+ let n = assert_ok!(client.read(&mut buf));
+ assert_eq!(n, 0);
+}
+
+#[test]
+fn test_abrupt_shutdown() {
+ use net2::TcpStreamExt;
+ use std::io::Read;
+
+ let mut poll = TestPoll::new();
+ let mut buf = [0; 1024];
+
+ let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0"));
+ let addr = assert_ok!(listener.local_addr());
+
+ let mut client = assert_ok!(TcpStream::connect(&addr));
+ poll.register(&client,
+ Token(0),
+ Ready::readable() | Ready::writable(),
+ PollOpt::edge());
+
+ let (socket, _) = assert_ok!(listener.accept());
+ assert_ok!(socket.set_linger(Some(Duration::from_millis(0))));
+ // assert_ok!(socket.set_linger(None));
+
+ // Wait to be connected
+ assert_ready!(poll, Token(0), Ready::writable());
+
+ drop(socket);
+
+ #[cfg(not(any(target_os = "bitrig", target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos",
+ target_os = "netbsd", target_os = "openbsd")))]
+ assert_hup_ready!(poll);
+ assert_ready!(poll, Token(0), Ready::writable());
+ assert_ready!(poll, Token(0), Ready::readable());
+
+ let res = client.read(&mut buf);
+ assert!(res.is_err(), "not err = {:?}", res);
+}
diff --git a/third_party/rust/mio/test/test_tick.rs b/third_party/rust/mio/test/test_tick.rs
new file mode 100644
index 0000000000..8c4cec9b25
--- /dev/null
+++ b/third_party/rust/mio/test/test_tick.rs
@@ -0,0 +1,64 @@
+use mio::*;
+use mio::deprecated::{EventLoop, Handler};
+use mio::net::{TcpListener, TcpStream};
+use {sleep_ms};
+
+struct TestHandler {
+ tick: usize,
+ state: usize,
+}
+
+impl TestHandler {
+ fn new() -> TestHandler {
+ TestHandler {
+ tick: 0,
+ state: 0,
+ }
+ }
+}
+
+impl Handler for TestHandler {
+ type Timeout = usize;
+ type Message = String;
+
+ fn tick(&mut self, _event_loop: &mut EventLoop<TestHandler>) {
+ debug!("Handler::tick()");
+ self.tick += 1;
+
+ assert_eq!(self.state, 1);
+ self.state = 0;
+ }
+
+ fn ready(&mut self, _event_loop: &mut EventLoop<TestHandler>, token: Token, events: Ready) {
+ debug!("READY: {:?} - {:?}", token, events);
+ if events.is_readable() {
+ debug!("Handler::ready() readable event");
+ assert_eq!(token, Token(0));
+ assert_eq!(self.state, 0);
+ self.state = 1;
+ }
+ }
+}
+
+#[test]
+pub fn test_tick() {
+ debug!("Starting TEST_TICK");
+ let mut event_loop = EventLoop::new().expect("Couldn't make event loop");
+
+ let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ event_loop.register(&listener, Token(0), Ready::readable(), PollOpt::level()).unwrap();
+
+ let client = TcpStream::connect(&listener.local_addr().unwrap()).unwrap();
+ event_loop.register(&client, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ sleep_ms(250);
+
+ let mut handler = TestHandler::new();
+
+ for _ in 0..2 {
+ event_loop.run_once(&mut handler, None).unwrap();
+ }
+
+ assert!(handler.tick == 2, "actual={}", handler.tick);
+ assert!(handler.state == 0, "actual={}", handler.state);
+}
diff --git a/third_party/rust/mio/test/test_udp_level.rs b/third_party/rust/mio/test/test_udp_level.rs
new file mode 100644
index 0000000000..7e19d54b3e
--- /dev/null
+++ b/third_party/rust/mio/test/test_udp_level.rs
@@ -0,0 +1,52 @@
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::event::Event;
+use mio::net::UdpSocket;
+use {expect_events, sleep_ms};
+
+#[test]
+pub fn test_udp_level_triggered() {
+ let poll = Poll::new().unwrap();
+ let poll = &poll;
+ let mut events = Events::with_capacity(1024);
+ let events = &mut events;
+
+ // Create the listener
+ let tx = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let rx = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ poll.register(&tx, Token(0), Ready::readable() | Ready::writable(), PollOpt::level()).unwrap();
+ poll.register(&rx, Token(1), Ready::readable() | Ready::writable(), PollOpt::level()).unwrap();
+
+
+ for _ in 0..2 {
+ expect_events(poll, events, 2, vec![
+ Event::new(Ready::writable(), Token(0)),
+ Event::new(Ready::writable(), Token(1)),
+ ]);
+ }
+
+ tx.send_to(b"hello world!", &rx.local_addr().unwrap()).unwrap();
+
+ sleep_ms(250);
+
+ for _ in 0..2 {
+ expect_events(poll, events, 2, vec![
+ Event::new(Ready::readable() | Ready::writable(), Token(1))
+ ]);
+ }
+
+ let mut buf = [0; 200];
+ while rx.recv_from(&mut buf).is_ok() {}
+
+ for _ in 0..2 {
+ expect_events(poll, events, 4, vec![Event::new(Ready::writable(), Token(1))]);
+ }
+
+ tx.send_to(b"hello world!", &rx.local_addr().unwrap()).unwrap();
+ sleep_ms(250);
+
+ expect_events(poll, events, 10,
+ vec![Event::new(Ready::readable() | Ready::writable(), Token(1))]);
+
+ drop(rx);
+}
diff --git a/third_party/rust/mio/test/test_udp_socket.rs b/third_party/rust/mio/test/test_udp_socket.rs
new file mode 100644
index 0000000000..c3311592ac
--- /dev/null
+++ b/third_party/rust/mio/test/test_udp_socket.rs
@@ -0,0 +1,252 @@
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::net::UdpSocket;
+use bytes::{Buf, RingBuf, SliceBuf, MutBuf};
+use std::io::ErrorKind;
+use std::str;
+use std::time;
+use localhost;
+use iovec::IoVec;
+
+const LISTENER: Token = Token(0);
+const SENDER: Token = Token(1);
+
+pub struct UdpHandlerSendRecv {
+ tx: UdpSocket,
+ rx: UdpSocket,
+ msg: &'static str,
+ buf: SliceBuf<'static>,
+ rx_buf: RingBuf,
+ connected: bool,
+ shutdown: bool,
+}
+
+impl UdpHandlerSendRecv {
+ fn new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv {
+ UdpHandlerSendRecv {
+ tx,
+ rx,
+ msg,
+ buf: SliceBuf::wrap(msg.as_bytes()),
+ rx_buf: RingBuf::new(1024),
+ connected,
+ shutdown: false,
+ }
+ }
+}
+
+fn assert_send<T: Send>() {
+}
+
+fn assert_sync<T: Sync>() {
+}
+
+#[cfg(test)]
+fn test_send_recv_udp(tx: UdpSocket, rx: UdpSocket, connected: bool) {
+ debug!("Starting TEST_UDP_SOCKETS");
+ let poll = Poll::new().unwrap();
+
+ assert_send::<UdpSocket>();
+ assert_sync::<UdpSocket>();
+
+ // ensure that the sockets are non-blocking
+ let mut buf = [0; 128];
+ assert_eq!(ErrorKind::WouldBlock, rx.recv_from(&mut buf).unwrap_err().kind());
+
+ info!("Registering SENDER");
+ poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();
+
+ info!("Registering LISTENER");
+ poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+
+ info!("Starting event loop to test with...");
+ let mut handler = UdpHandlerSendRecv::new(tx, rx, connected, "hello world");
+
+ while !handler.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.readiness().is_readable() {
+ if let LISTENER = event.token() {
+ debug!("We are receiving a datagram now...");
+ let cnt = unsafe {
+ if !handler.connected {
+ handler.rx.recv_from(handler.rx_buf.mut_bytes()).unwrap().0
+ } else {
+ handler.rx.recv(handler.rx_buf.mut_bytes()).unwrap()
+ }
+ };
+
+ unsafe { MutBuf::advance(&mut handler.rx_buf, cnt); }
+ assert!(str::from_utf8(handler.rx_buf.bytes()).unwrap() == handler.msg);
+ handler.shutdown = true;
+ }
+ }
+
+ if event.readiness().is_writable() {
+ if let SENDER = event.token() {
+ let cnt = if !handler.connected {
+ let addr = handler.rx.local_addr().unwrap();
+ handler.tx.send_to(handler.buf.bytes(), &addr).unwrap()
+ } else {
+ handler.tx.send(handler.buf.bytes()).unwrap()
+ };
+
+ handler.buf.advance(cnt);
+ }
+ }
+ }
+ }
+}
+
+/// Returns the sender and the receiver
+fn connected_sockets() -> (UdpSocket, UdpSocket) {
+ let addr = localhost();
+ let any = localhost();
+
+ let tx = UdpSocket::bind(&any).unwrap();
+ let rx = UdpSocket::bind(&addr).unwrap();
+
+ let tx_addr = tx.local_addr().unwrap();
+ let rx_addr = rx.local_addr().unwrap();
+
+ assert!(tx.connect(rx_addr).is_ok());
+ assert!(rx.connect(tx_addr).is_ok());
+
+ (tx, rx)
+}
+
+#[test]
+pub fn test_udp_socket() {
+ let addr = localhost();
+ let any = localhost();
+
+ let tx = UdpSocket::bind(&any).unwrap();
+ let rx = UdpSocket::bind(&addr).unwrap();
+
+ test_send_recv_udp(tx, rx, false);
+}
+
+#[test]
+pub fn test_udp_socket_send_recv() {
+ let (tx, rx) = connected_sockets();
+
+ test_send_recv_udp(tx, rx, true);
+}
+
+#[test]
+pub fn test_udp_socket_discard() {
+ let addr = localhost();
+ let any = localhost();
+ let outside = localhost();
+
+ let tx = UdpSocket::bind(&any).unwrap();
+ let rx = UdpSocket::bind(&addr).unwrap();
+ let udp_outside = UdpSocket::bind(&outside).unwrap();
+
+ let tx_addr = tx.local_addr().unwrap();
+ let rx_addr = rx.local_addr().unwrap();
+
+ assert!(tx.connect(rx_addr).is_ok());
+ assert!(udp_outside.connect(rx_addr).is_ok());
+ assert!(rx.connect(tx_addr).is_ok());
+
+ let poll = Poll::new().unwrap();
+
+ let r = udp_outside.send(b"hello world");
+ assert!(r.is_ok() || r.unwrap_err().kind() == ErrorKind::WouldBlock);
+
+ poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();
+ poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+
+ poll.poll(&mut events, Some(time::Duration::from_secs(5))).unwrap();
+
+ for event in &events {
+ if event.readiness().is_readable() {
+ if let LISTENER = event.token() {
+ assert!(false, "Expected to no receive a packet but got something")
+ }
+ }
+ }
+}
+
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+#[test]
+pub fn test_udp_socket_send_recv_bufs() {
+ let (tx, rx) = connected_sockets();
+
+ let poll = Poll::new().unwrap();
+
+ poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge())
+ .unwrap();
+
+ poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ let mut events = Events::with_capacity(1024);
+
+ let data = b"hello, world";
+ let write_bufs: Vec<_> = vec![b"hello, " as &[u8], b"world"]
+ .into_iter()
+ .flat_map(IoVec::from_bytes)
+ .collect();
+ let (a, b, c) = (
+ &mut [0u8; 4] as &mut [u8],
+ &mut [0u8; 6] as &mut [u8],
+ &mut [0u8; 8] as &mut [u8],
+ );
+ let mut read_bufs: Vec<_> = vec![a, b, c]
+ .into_iter()
+ .flat_map(IoVec::from_bytes_mut)
+ .collect();
+
+ let times = 5;
+ let mut rtimes = 0;
+ let mut wtimes = 0;
+
+ 'outer: loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.readiness().is_readable() {
+ if let LISTENER = event.token() {
+ loop {
+ let cnt = match rx.recv_bufs(read_bufs.as_mut()) {
+ Ok(cnt) => cnt,
+ Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
+ Err(e) => panic!("read error {}", e),
+ };
+ assert_eq!(cnt, data.len());
+ let res: Vec<u8> = read_bufs
+ .iter()
+ .flat_map(|buf| buf.iter())
+ .cloned()
+ .collect();
+ assert_eq!(&res[..cnt], &data[..cnt]);
+ rtimes += 1;
+ if rtimes == times {
+ break 'outer;
+ }
+ }
+ }
+ }
+
+ if event.readiness().is_writable() {
+ if let SENDER = event.token() {
+ while wtimes < times {
+ let cnt = match tx.send_bufs(write_bufs.as_slice()) {
+ Ok(cnt) => cnt,
+ Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
+ Err(e) => panic!("write error {}", e),
+ };
+ assert_eq!(cnt, data.len());
+ wtimes += 1;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio/test/test_uds_shutdown.rs b/third_party/rust/mio/test/test_uds_shutdown.rs
new file mode 100644
index 0000000000..bf9644599d
--- /dev/null
+++ b/third_party/rust/mio/test/test_uds_shutdown.rs
@@ -0,0 +1,300 @@
+use {TryRead, TryWrite};
+use mio::*;
+use mio::deprecated::{EventLoop, Handler};
+use mio::deprecated::unix::*;
+use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf};
+use slab::Slab;
+use std::io;
+use std::path::PathBuf;
+use tempdir::TempDir;
+
+const SERVER: Token = Token(10_000_000);
+const CLIENT: Token = Token(10_000_001);
+
+struct EchoConn {
+ sock: UnixStream,
+ buf: Option<ByteBuf>,
+ mut_buf: Option<MutByteBuf>,
+ token: Option<Token>,
+ interest: Ready
+}
+
+impl EchoConn {
+ fn new(sock: UnixStream) -> EchoConn {
+ EchoConn {
+ sock: sock,
+ buf: None,
+ mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
+ token: None,
+ interest: Ready::hup()
+ }
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ let mut buf = self.buf.take().unwrap();
+
+ match self.sock.try_write_buf(&mut buf) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+
+ self.buf = Some(buf);
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we wrote {} bytes!", r);
+
+ self.mut_buf = Some(buf.flip());
+
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ match self.sock.shutdown(Shutdown::Write) {
+ Err(e) => panic!(e),
+ _ => {},
+ }
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e),
+ }
+
+ event_loop.reregister(&self.sock, self.token.unwrap(), self.interest,
+ PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ let mut buf = self.mut_buf.take().unwrap();
+
+ match self.sock.try_read_buf(&mut buf) {
+ Ok(None) => {
+ debug!("CONN : spurious read wakeup");
+ self.mut_buf = Some(buf);
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we read {} bytes!", r);
+
+ // prepare to provide this to writable
+ self.buf = Some(buf.flip());
+
+ self.interest.remove(Ready::readable());
+ self.interest.insert(Ready::writable());
+ }
+ Err(e) => {
+ debug!("not implemented; client err={:?}", e);
+ self.interest.remove(Ready::readable());
+ }
+
+ };
+
+ event_loop.reregister(&self.sock, self.token.unwrap(), self.interest,
+ PollOpt::edge())
+ }
+}
+
+struct EchoServer {
+ sock: UnixListener,
+ conns: Slab<EchoConn>
+}
+
+impl EchoServer {
+ fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("server accepting socket");
+
+ let sock = self.sock.accept().unwrap();
+ let conn = EchoConn::new(sock,);
+ let tok = self.conns.insert(conn);
+
+ // Register the connection
+ self.conns[tok].token = Some(Token(tok));
+ event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot())
+ .expect("could not register socket with event loop");
+
+ Ok(())
+ }
+
+ fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>,
+ tok: Token) -> io::Result<()> {
+ debug!("server conn readable; tok={:?}", tok);
+ self.conn(tok).readable(event_loop)
+ }
+
+ fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>,
+ tok: Token) -> io::Result<()> {
+ debug!("server conn writable; tok={:?}", tok);
+ self.conn(tok).writable(event_loop)
+ }
+
+ fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
+ &mut self.conns[tok.into()]
+ }
+}
+
+struct EchoClient {
+ sock: UnixStream,
+ msgs: Vec<&'static str>,
+ tx: SliceBuf<'static>,
+ rx: SliceBuf<'static>,
+ mut_buf: Option<MutByteBuf>,
+ token: Token,
+ interest: Ready
+}
+
+
+// Sends a message and expects to receive the same exact message, one at a time
+impl EchoClient {
+ fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient {
+ let curr = msgs.remove(0);
+
+ EchoClient {
+ sock: sock,
+ msgs: msgs,
+ tx: SliceBuf::wrap(curr.as_bytes()),
+ rx: SliceBuf::wrap(curr.as_bytes()),
+ mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
+ token: tok,
+ interest: Ready::none()
+ }
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("client socket readable");
+
+ let mut buf = self.mut_buf.take().unwrap();
+
+ match self.sock.try_read_buf(&mut buf) {
+ Ok(None) => {
+ debug!("CLIENT : spurious read wakeup");
+ self.mut_buf = Some(buf);
+ }
+ Ok(Some(r)) => {
+ if r == 0 {
+ self.interest.remove(Ready::readable());
+ event_loop.shutdown();
+ } else {
+ debug!("CLIENT : We read {} bytes!", r);
+
+ // prepare for reading
+ let mut buf = buf.flip();
+
+ while buf.has_remaining() {
+ let actual = buf.read_byte().unwrap();
+ let expect = self.rx.read_byte().unwrap();
+
+ assert!(actual == expect, "actual={}; expect={}", actual, expect);
+ }
+
+ self.mut_buf = Some(buf.flip());
+ if !self.rx.has_remaining() {
+ self.next_msg(event_loop).unwrap();
+ }
+ }
+ }
+ Err(e) => {
+ panic!("not implemented; client err={:?}", e);
+ }
+ };
+
+ event_loop.reregister(&self.sock, self.token, self.interest,
+ PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("client socket writable");
+
+ match self.sock.try_write_buf(&mut self.tx) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CLIENT : we wrote {} bytes!", r);
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e)
+ }
+
+ event_loop.reregister(&self.sock, self.token, self.interest,
+ PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ if self.msgs.is_empty() {
+
+ return Ok(());
+ }
+
+ let curr = self.msgs.remove(0);
+
+ debug!("client prepping next message");
+ self.tx = SliceBuf::wrap(curr.as_bytes());
+ self.rx = SliceBuf::wrap(curr.as_bytes());
+
+ self.interest.insert(Ready::writable());
+ event_loop.reregister(&self.sock, self.token, self.interest,
+ PollOpt::edge() | PollOpt::oneshot())
+ }
+}
+
+struct Echo {
+ server: EchoServer,
+ client: EchoClient,
+}
+
+impl Echo {
+ fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo {
+ Echo {
+ server: EchoServer {
+ sock: srv,
+ conns: Slab::with_capacity(128)
+ },
+ client: EchoClient::new(client, CLIENT, msgs)
+ }
+ }
+}
+
+impl Handler for Echo {
+ type Timeout = usize;
+ type Message = ();
+
+ fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token,
+ events: Ready) {
+ debug!("ready {:?} {:?}", token, events);
+ if events.is_readable() {
+ match token {
+ SERVER => self.server.accept(event_loop).unwrap(),
+ CLIENT => self.client.readable(event_loop).unwrap(),
+ i => self.server.conn_readable(event_loop, i).unwrap()
+ }
+ }
+
+ if events.is_writable() {
+ match token {
+ SERVER => panic!("received writable for token 0"),
+ CLIENT => self.client.writable(event_loop).unwrap(),
+ _ => self.server.conn_writable(event_loop, token).unwrap()
+ };
+ }
+ }
+}
+
+#[test]
+pub fn test_echo_server() {
+ debug!("Starting TEST_ECHO_SERVER");
+ let mut event_loop = EventLoop::new().unwrap();
+
+ let tmp_dir = TempDir::new("mio").unwrap();
+ let addr = tmp_dir.path().join(&PathBuf::from("sock"));
+
+ let srv = UnixListener::bind(&addr).unwrap();
+
+ event_loop.register(&srv, SERVER, Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ let sock = UnixStream::connect(&addr).unwrap();
+
+ // Connect to the server
+ event_loop.register(&sock, CLIENT, Ready::writable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ // Start the event loop
+ event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap();
+}
diff --git a/third_party/rust/mio/test/test_unix_echo_server.rs b/third_party/rust/mio/test/test_unix_echo_server.rs
new file mode 100644
index 0000000000..6531deac17
--- /dev/null
+++ b/third_party/rust/mio/test/test_unix_echo_server.rs
@@ -0,0 +1,292 @@
+use {TryRead, TryWrite};
+use mio::*;
+use mio::deprecated::{EventLoop, Handler};
+use mio::deprecated::unix::*;
+use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf};
+use slab::Slab;
+use std::path::PathBuf;
+use std::io;
+use tempdir::TempDir;
+
+const SERVER: Token = Token(10_000_000);
+const CLIENT: Token = Token(10_000_001);
+
+struct EchoConn {
+ sock: UnixStream,
+ buf: Option<ByteBuf>,
+ mut_buf: Option<MutByteBuf>,
+ token: Option<Token>,
+ interest: Ready,
+}
+
+impl EchoConn {
+ fn new(sock: UnixStream) -> EchoConn {
+ EchoConn {
+ sock: sock,
+ buf: None,
+ mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
+ token: None,
+ interest: Ready::hup(),
+ }
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ let mut buf = self.buf.take().unwrap();
+
+ match self.sock.try_write_buf(&mut buf) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+
+ self.buf = Some(buf);
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we wrote {} bytes!", r);
+
+ self.mut_buf = Some(buf.flip());
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e),
+ }
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ let mut buf = self.mut_buf.take().unwrap();
+
+ match self.sock.try_read_buf(&mut buf) {
+ Ok(None) => {
+ debug!("CONN : spurious read wakeup");
+ self.mut_buf = Some(buf);
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we read {} bytes!", r);
+
+ // prepare to provide this to writable
+ self.buf = Some(buf.flip());
+
+ self.interest.remove(Ready::readable());
+ self.interest.insert(Ready::writable());
+ }
+ Err(e) => {
+ debug!("not implemented; client err={:?}", e);
+ self.interest.remove(Ready::readable());
+ }
+
+ };
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+}
+
+struct EchoServer {
+ sock: UnixListener,
+ conns: Slab<EchoConn>
+}
+
+impl EchoServer {
+ fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("server accepting socket");
+
+ let sock = self.sock.accept().unwrap();
+ let conn = EchoConn::new(sock);
+ let tok = self.conns.insert(conn);
+
+ // Register the connection
+ self.conns[tok].token = Some(Token(tok));
+ event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
+ .expect("could not register socket with event loop");
+
+ Ok(())
+ }
+
+ fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
+ debug!("server conn readable; tok={:?}", tok);
+ self.conn(tok).readable(event_loop)
+ }
+
+ fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
+ debug!("server conn writable; tok={:?}", tok);
+ self.conn(tok).writable(event_loop)
+ }
+
+ fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
+ &mut self.conns[tok.into()]
+ }
+}
+
+struct EchoClient {
+ sock: UnixStream,
+ msgs: Vec<&'static str>,
+ tx: SliceBuf<'static>,
+ rx: SliceBuf<'static>,
+ mut_buf: Option<MutByteBuf>,
+ token: Token,
+ interest: Ready,
+}
+
+
+// Sends a message and expects to receive the same exact message, one at a time
+impl EchoClient {
+ fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient {
+ let curr = msgs.remove(0);
+
+ EchoClient {
+ sock: sock,
+ msgs: msgs,
+ tx: SliceBuf::wrap(curr.as_bytes()),
+ rx: SliceBuf::wrap(curr.as_bytes()),
+ mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
+ token: tok,
+ interest: Ready::none(),
+ }
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("client socket readable");
+
+ let mut buf = self.mut_buf.take().unwrap();
+
+ match self.sock.try_read_buf(&mut buf) {
+ Ok(None) => {
+ debug!("CLIENT : spurious read wakeup");
+ self.mut_buf = Some(buf);
+ }
+ Ok(Some(r)) => {
+ debug!("CLIENT : We read {} bytes!", r);
+
+ // prepare for reading
+ let mut buf = buf.flip();
+
+ debug!("CLIENT : buf = {:?} -- rx = {:?}", buf.bytes(), self.rx.bytes());
+ while buf.has_remaining() {
+ let actual = buf.read_byte().unwrap();
+ let expect = self.rx.read_byte().unwrap();
+
+ assert!(actual == expect, "actual={}; expect={}", actual, expect);
+ }
+
+ self.mut_buf = Some(buf.flip());
+
+ self.interest.remove(Ready::readable());
+
+ if !self.rx.has_remaining() {
+ self.next_msg(event_loop).unwrap();
+ }
+ }
+ Err(e) => {
+ panic!("not implemented; client err={:?}", e);
+ }
+ };
+
+ if !self.interest.is_none() {
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())?;
+ }
+
+ Ok(())
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("client socket writable");
+
+ match self.sock.try_write_buf(&mut self.tx) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CLIENT : we wrote {} bytes!", r);
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e)
+ }
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ if self.msgs.is_empty() {
+ event_loop.shutdown();
+ return Ok(());
+ }
+
+ let curr = self.msgs.remove(0);
+
+ debug!("client prepping next message");
+ self.tx = SliceBuf::wrap(curr.as_bytes());
+ self.rx = SliceBuf::wrap(curr.as_bytes());
+
+ self.interest.insert(Ready::writable());
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+}
+
+struct Echo {
+ server: EchoServer,
+ client: EchoClient,
+}
+
+impl Echo {
+ fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo {
+ Echo {
+ server: EchoServer {
+ sock: srv,
+ conns: Slab::with_capacity(128)
+ },
+ client: EchoClient::new(client, CLIENT, msgs)
+ }
+ }
+}
+
+impl Handler for Echo {
+ type Timeout = usize;
+ type Message = ();
+
+ fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready) {
+ if events.is_readable() {
+ match token {
+ SERVER => self.server.accept(event_loop).unwrap(),
+ CLIENT => self.client.readable(event_loop).unwrap(),
+ i => self.server.conn_readable(event_loop, i).unwrap()
+ };
+ }
+
+ if events.is_writable() {
+ match token {
+ SERVER => panic!("received writable for token 0"),
+ CLIENT => self.client.writable(event_loop).unwrap(),
+ _ => self.server.conn_writable(event_loop, token).unwrap()
+ };
+ }
+ }
+}
+
+#[test]
+pub fn test_unix_echo_server() {
+ debug!("Starting TEST_UNIX_ECHO_SERVER");
+ let mut event_loop = EventLoop::new().unwrap();
+
+ let tmp_dir = TempDir::new("mio").unwrap();
+ let addr = tmp_dir.path().join(&PathBuf::from("sock"));
+
+ let srv = UnixListener::bind(&addr).unwrap();
+
+ info!("listen for connections");
+ event_loop.register(&srv, SERVER, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ let sock = UnixStream::connect(&addr).unwrap();
+
+ // Connect to the server
+ event_loop.register(&sock, CLIENT, Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ // Start the event loop
+ event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap();
+}
diff --git a/third_party/rust/mio/test/test_unix_pass_fd.rs b/third_party/rust/mio/test/test_unix_pass_fd.rs
new file mode 100644
index 0000000000..a948bb2ff6
--- /dev/null
+++ b/third_party/rust/mio/test/test_unix_pass_fd.rs
@@ -0,0 +1,306 @@
+use {TryRead, TryWrite};
+use mio::*;
+use mio::deprecated::{EventLoop, Handler};
+use mio::deprecated::unix::*;
+use bytes::{Buf, ByteBuf, SliceBuf};
+use slab::Slab;
+use std::path::PathBuf;
+use std::io::{self, Read};
+use std::os::unix::io::{AsRawFd, FromRawFd};
+use tempdir::TempDir;
+
+const SERVER: Token = Token(10_000_000);
+const CLIENT: Token = Token(10_000_001);
+
+struct EchoConn {
+ sock: UnixStream,
+ pipe_fd: Option<PipeReader>,
+ token: Option<Token>,
+ interest: Ready,
+}
+
+impl EchoConn {
+ fn new(sock: UnixStream) -> EchoConn {
+ EchoConn {
+ sock: sock,
+ pipe_fd: None,
+ token: None,
+ interest: Ready::hup(),
+ }
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ let fd = self.pipe_fd.take().unwrap();
+
+ match self.sock.try_write_send_fd(b"x", fd.as_raw_fd()) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+
+ self.pipe_fd = Some(fd);
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we wrote {} bytes!", r);
+
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e),
+ }
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ let mut buf = ByteBuf::mut_with_capacity(2048);
+
+ match self.sock.try_read_buf(&mut buf) {
+ Ok(None) => {
+ panic!("We just got readable, but were unable to read from the socket?");
+ }
+ Ok(Some(r)) => {
+ debug!("CONN : we read {} bytes!", r);
+ self.interest.remove(Ready::readable());
+ self.interest.insert(Ready::writable());
+ }
+ Err(e) => {
+ debug!("not implemented; client err={:?}", e);
+ self.interest.remove(Ready::readable());
+ }
+
+ };
+
+ // create fd to pass back. Assume that the write will work
+ // without blocking, for simplicity -- we're only testing that
+ // the FD makes it through somehow
+ let (rd, mut wr) = pipe().unwrap();
+ let mut buf = buf.flip();
+ match wr.try_write_buf(&mut buf) {
+ Ok(None) => {
+ panic!("writing to our own pipe blocked :(");
+ }
+ Ok(Some(r)) => {
+ debug!("CONN: we wrote {} bytes to the FD", r);
+ }
+ Err(e) => {
+ panic!("not implemented; client err={:?}", e);
+ }
+ }
+ self.pipe_fd = Some(rd);
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+}
+
+struct EchoServer {
+ sock: UnixListener,
+ conns: Slab<EchoConn>
+}
+
+impl EchoServer {
+ fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("server accepting socket");
+
+ let sock = self.sock.accept().unwrap();
+ let conn = EchoConn::new(sock);
+ let tok = self.conns.insert(conn);
+
+ // Register the connection
+ self.conns[tok].token = Some(Token(tok));
+ event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
+ .expect("could not register socket with event loop");
+
+ Ok(())
+ }
+
+ fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
+ debug!("server conn readable; tok={:?}", tok);
+ self.conn(tok).readable(event_loop)
+ }
+
+ fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
+ debug!("server conn writable; tok={:?}", tok);
+ self.conn(tok).writable(event_loop)
+ }
+
+ fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
+ &mut self.conns[tok.into()]
+ }
+}
+
+struct EchoClient {
+ sock: UnixStream,
+ msgs: Vec<&'static str>,
+ tx: SliceBuf<'static>,
+ rx: SliceBuf<'static>,
+ token: Token,
+ interest: Ready,
+}
+
+
+// Sends a message and expects to receive the same exact message, one at a time
+impl EchoClient {
+ fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient {
+ let curr = msgs.remove(0);
+
+ EchoClient {
+ sock: sock,
+ msgs: msgs,
+ tx: SliceBuf::wrap(curr.as_bytes()),
+ rx: SliceBuf::wrap(curr.as_bytes()),
+ token: tok,
+ interest: Ready::none(),
+ }
+ }
+
+ fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("client socket readable");
+
+ let mut pipe: PipeReader;
+ let mut buf = [0; 256];
+
+ match self.sock.read_recv_fd(&mut buf) {
+ Ok((_, None)) => {
+ panic!("Did not receive passed file descriptor");
+ }
+ Ok((r, Some(fd))) => {
+ assert_eq!(r, 1);
+ assert_eq!(b'x', buf[0]);
+ debug!("CLIENT : We read {} bytes!", r);
+ pipe = From::<Io>::from(unsafe { Io::from_raw_fd(fd) });
+ }
+ Err(e) => {
+ panic!("not implemented; client err={:?}", e);
+ }
+ };
+
+ // read the data out of the FD itself
+ let n = match pipe.read(&mut buf) {
+ Ok(r) => {
+ debug!("CLIENT : We read {} bytes from the FD", r);
+ r
+ }
+ Err(e) => {
+ panic!("not implemented, client err={:?}", e);
+ }
+ };
+
+ for &actual in buf[0..n].iter() {
+ let expect = self.rx.read_byte().unwrap();
+ assert!(actual == expect, "actual={}; expect={}", actual, expect);
+ }
+
+ self.interest.remove(Ready::readable());
+
+ if !self.rx.has_remaining() {
+ self.next_msg(event_loop).unwrap();
+ }
+
+ if !self.interest.is_none() {
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())?;
+ }
+
+ Ok(())
+ }
+
+ fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ debug!("client socket writable");
+
+ match self.sock.try_write_buf(&mut self.tx) {
+ Ok(None) => {
+ debug!("client flushing buf; WOULDBLOCK");
+ self.interest.insert(Ready::writable());
+ }
+ Ok(Some(r)) => {
+ debug!("CLIENT : we wrote {} bytes!", r);
+ self.interest.insert(Ready::readable());
+ self.interest.remove(Ready::writable());
+ }
+ Err(e) => debug!("not implemented; client err={:?}", e)
+ }
+
+ assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
+ event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+
+ fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
+ if self.msgs.is_empty() {
+ event_loop.shutdown();
+ return Ok(());
+ }
+
+ let curr = self.msgs.remove(0);
+
+ debug!("client prepping next message");
+ self.tx = SliceBuf::wrap(curr.as_bytes());
+ self.rx = SliceBuf::wrap(curr.as_bytes());
+
+ self.interest.insert(Ready::writable());
+ event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
+ }
+}
+
+struct Echo {
+ server: EchoServer,
+ client: EchoClient,
+}
+
+impl Echo {
+ fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo {
+ Echo {
+ server: EchoServer {
+ sock: srv,
+ conns: Slab::with_capacity(128)
+ },
+ client: EchoClient::new(client, CLIENT, msgs)
+ }
+ }
+}
+
+impl Handler for Echo {
+ type Timeout = usize;
+ type Message = ();
+
+ fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready) {
+ if events.is_readable() {
+ match token {
+ SERVER => self.server.accept(event_loop).unwrap(),
+ CLIENT => self.client.readable(event_loop).unwrap(),
+ i => self.server.conn_readable(event_loop, i).unwrap()
+ };
+ }
+
+ if events.is_writable() {
+ match token {
+ SERVER => panic!("received writable for token 0"),
+ CLIENT => self.client.writable(event_loop).unwrap(),
+ _ => self.server.conn_writable(event_loop, token).unwrap()
+ };
+ }
+ }
+}
+
+#[test]
+pub fn test_unix_pass_fd() {
+ debug!("Starting TEST_UNIX_PASS_FD");
+ let mut event_loop = EventLoop::new().unwrap();
+
+ let tmp_dir = TempDir::new("mio").unwrap();
+ let addr = tmp_dir.path().join(&PathBuf::from("sock"));
+
+ let srv = UnixListener::bind(&addr).unwrap();
+
+ info!("listen for connections");
+ event_loop.register(&srv, SERVER, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ let sock = UnixStream::connect(&addr).unwrap();
+
+ // Connect to the server
+ event_loop.register(&sock, CLIENT, Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
+
+ // Start the event loop
+ event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap();
+}
diff --git a/third_party/rust/mio/test/test_write_then_drop.rs b/third_party/rust/mio/test/test_write_then_drop.rs
new file mode 100644
index 0000000000..971ffff7c2
--- /dev/null
+++ b/third_party/rust/mio/test/test_write_then_drop.rs
@@ -0,0 +1,123 @@
+use std::io::{Write, Read};
+
+use mio::event::Evented;
+use mio::net::{TcpListener, TcpStream};
+use mio::{Poll, Events, Ready, PollOpt, Token};
+
+#[test]
+fn write_then_drop() {
+ drop(::env_logger::init());
+
+ let a = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = a.local_addr().unwrap();
+ let mut s = TcpStream::connect(&addr).unwrap();
+
+ let poll = Poll::new().unwrap();
+
+ a.register(&poll,
+ Token(1),
+ Ready::readable(),
+ PollOpt::edge()).unwrap();
+ s.register(&poll,
+ Token(3),
+ Ready::empty(),
+ PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+ while events.is_empty() {
+ poll.poll(&mut events, None).unwrap();
+ }
+ assert_eq!(events.len(), 1);
+ assert_eq!(events.get(0).unwrap().token(), Token(1));
+
+ let mut s2 = a.accept().unwrap().0;
+
+ s2.register(&poll,
+ Token(2),
+ Ready::writable(),
+ PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+ while events.is_empty() {
+ poll.poll(&mut events, None).unwrap();
+ }
+ assert_eq!(events.len(), 1);
+ assert_eq!(events.get(0).unwrap().token(), Token(2));
+
+ s2.write_all(&[1, 2, 3, 4]).unwrap();
+ drop(s2);
+
+ s.reregister(&poll,
+ Token(3),
+ Ready::readable(),
+ PollOpt::edge()).unwrap();
+ let mut events = Events::with_capacity(1024);
+ while events.is_empty() {
+ poll.poll(&mut events, None).unwrap();
+ }
+ assert_eq!(events.len(), 1);
+ assert_eq!(events.get(0).unwrap().token(), Token(3));
+
+ let mut buf = [0; 10];
+ assert_eq!(s.read(&mut buf).unwrap(), 4);
+ assert_eq!(&buf[0..4], &[1, 2, 3, 4]);
+}
+
+#[test]
+fn write_then_deregister() {
+ drop(::env_logger::init());
+
+ let a = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = a.local_addr().unwrap();
+ let mut s = TcpStream::connect(&addr).unwrap();
+
+ let poll = Poll::new().unwrap();
+
+ a.register(&poll,
+ Token(1),
+ Ready::readable(),
+ PollOpt::edge()).unwrap();
+ s.register(&poll,
+ Token(3),
+ Ready::empty(),
+ PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+ while events.is_empty() {
+ poll.poll(&mut events, None).unwrap();
+ }
+ assert_eq!(events.len(), 1);
+ assert_eq!(events.get(0).unwrap().token(), Token(1));
+
+ let mut s2 = a.accept().unwrap().0;
+
+ s2.register(&poll,
+ Token(2),
+ Ready::writable(),
+ PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(1024);
+ while events.is_empty() {
+ poll.poll(&mut events, None).unwrap();
+ }
+ assert_eq!(events.len(), 1);
+ assert_eq!(events.get(0).unwrap().token(), Token(2));
+
+ s2.write_all(&[1, 2, 3, 4]).unwrap();
+ s2.deregister(&poll).unwrap();
+
+ s.reregister(&poll,
+ Token(3),
+ Ready::readable(),
+ PollOpt::edge()).unwrap();
+ let mut events = Events::with_capacity(1024);
+ while events.is_empty() {
+ poll.poll(&mut events, None).unwrap();
+ }
+ assert_eq!(events.len(), 1);
+ assert_eq!(events.get(0).unwrap().token(), Token(3));
+
+ let mut buf = [0; 10];
+ assert_eq!(s.read(&mut buf).unwrap(), 4);
+ assert_eq!(&buf[0..4], &[1, 2, 3, 4]);
+}