summaryrefslogtreecommitdiffstats
path: root/third_party/rust/mio/test/test_tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/mio/test/test_tcp.rs')
-rw-r--r--third_party/rust/mio/test/test_tcp.rs660
1 files changed, 660 insertions, 0 deletions
diff --git a/third_party/rust/mio/test/test_tcp.rs b/third_party/rust/mio/test/test_tcp.rs
new file mode 100644
index 0000000000..ae569ac5e8
--- /dev/null
+++ b/third_party/rust/mio/test/test_tcp.rs
@@ -0,0 +1,660 @@
+use std::cmp;
+use std::io::prelude::*;
+use std::io;
+use std::net;
+use std::sync::mpsc::channel;
+use std::thread;
+use std::time::Duration;
+
+use net2::{self, TcpStreamExt};
+
+use {TryRead, TryWrite};
+use mio::{Token, Ready, PollOpt, Poll, Events};
+use iovec::IoVec;
+use mio::net::{TcpListener, TcpStream};
+
+#[test]
+fn accept() {
+ struct H { hit: bool, listener: TcpListener, shutdown: bool }
+
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ net::TcpStream::connect(&addr).unwrap();
+ });
+
+ let poll = Poll::new().unwrap();
+
+ poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { hit: false, listener: l, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ h.hit = true;
+ assert_eq!(event.token(), Token(1));
+ assert!(event.readiness().is_readable());
+ assert!(h.listener.accept().is_ok());
+ h.shutdown = true;
+ }
+ }
+ assert!(h.hit);
+ assert!(h.listener.accept().unwrap_err().kind() == io::ErrorKind::WouldBlock);
+ t.join().unwrap();
+}
+
+#[test]
+fn connect() {
+ struct H { hit: u32, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let (tx, rx) = channel();
+ let (tx2, rx2) = channel();
+ let t = thread::spawn(move || {
+ let s = l.accept().unwrap();
+ rx.recv().unwrap();
+ drop(s);
+ tx2.send(()).unwrap();
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { hit: 0, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ match h.hit {
+ 0 => assert!(event.readiness().is_writable()),
+ 1 => assert!(event.readiness().is_readable()),
+ _ => panic!(),
+ }
+ h.hit += 1;
+ h.shutdown = true;
+ }
+ }
+ assert_eq!(h.hit, 1);
+ tx.send(()).unwrap();
+ rx2.recv().unwrap();
+ h.shutdown = false;
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ match h.hit {
+ 0 => assert!(event.readiness().is_writable()),
+ 1 => assert!(event.readiness().is_readable()),
+ _ => panic!(),
+ }
+ h.hit += 1;
+ h.shutdown = true;
+ }
+ }
+ assert_eq!(h.hit, 2);
+ t.join().unwrap();
+}
+
+#[test]
+fn read() {
+ const N: usize = 16 * 1024 * 1024;
+ struct H { amt: usize, socket: TcpStream, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.write(&b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { amt: 0, socket: s, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ let mut b = [0; 1024];
+ loop {
+ if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
+ h.amt += amt;
+ } else {
+ break
+ }
+ if h.amt >= N {
+ h.shutdown = true;
+ break
+ }
+ }
+ }
+ }
+ t.join().unwrap();
+}
+
+#[test]
+fn peek() {
+ const N: usize = 16 * 1024 * 1024;
+ struct H { amt: usize, socket: TcpStream, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.write(&b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { amt: 0, socket: s, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ let mut b = [0; 1024];
+ match h.socket.peek(&mut b) {
+ Ok(_) => (),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ continue
+ },
+ Err(e) => panic!("unexpected error: {:?}", e),
+ }
+
+ loop {
+ if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
+ h.amt += amt;
+ } else {
+ break
+ }
+ if h.amt >= N {
+ h.shutdown = true;
+ break
+ }
+ }
+ }
+ }
+ t.join().unwrap();
+}
+
+#[test]
+fn read_bufs() {
+ const N: usize = 16 * 1024 * 1024;
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let b = [1; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.write(&b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(128);
+
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::readable(), PollOpt::level()).unwrap();
+
+ let b1 = &mut [0; 10][..];
+ let b2 = &mut [0; 383][..];
+ let b3 = &mut [0; 28][..];
+ let b4 = &mut [0; 8][..];
+ let b5 = &mut [0; 128][..];
+ let mut b: [&mut IoVec; 5] = [
+ b1.into(),
+ b2.into(),
+ b3.into(),
+ b4.into(),
+ b5.into(),
+ ];
+
+ let mut so_far = 0;
+ loop {
+ for buf in b.iter_mut() {
+ for byte in buf.as_mut_bytes() {
+ *byte = 0;
+ }
+ }
+
+ poll.poll(&mut events, None).unwrap();
+
+ match s.read_bufs(&mut b) {
+ Ok(0) => {
+ assert_eq!(so_far, N);
+ break
+ }
+ Ok(mut n) => {
+ so_far += n;
+ for buf in b.iter() {
+ let buf = buf.as_bytes();
+ for byte in buf[..cmp::min(n, buf.len())].iter() {
+ assert_eq!(*byte, 1);
+ }
+ n = n.saturating_sub(buf.len());
+ if n == 0 {
+ break
+ }
+ }
+ assert_eq!(n, 0);
+ }
+ Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
+ }
+ }
+
+ t.join().unwrap();
+}
+
+#[test]
+fn write() {
+ const N: usize = 16 * 1024 * 1024;
+ struct H { amt: usize, socket: TcpStream, shutdown: bool }
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let mut b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ amt += s.read(&mut b).unwrap();
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let s = TcpStream::connect(&addr).unwrap();
+
+ poll.register(&s, Token(1), Ready::writable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { amt: 0, socket: s, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ assert_eq!(event.token(), Token(1));
+ let b = [0; 1024];
+ loop {
+ if let Some(amt) = h.socket.try_write(&b).unwrap() {
+ h.amt += amt;
+ } else {
+ break
+ }
+ if h.amt >= N {
+ h.shutdown = true;
+ break
+ }
+ }
+ }
+ }
+ t.join().unwrap();
+}
+
+#[test]
+fn write_bufs() {
+ const N: usize = 16 * 1024 * 1024;
+
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let mut b = [0; 1024];
+ let mut amt = 0;
+ while amt < N {
+ for byte in b.iter_mut() {
+ *byte = 0;
+ }
+ let n = s.read(&mut b).unwrap();
+ amt += n;
+ for byte in b[..n].iter() {
+ assert_eq!(*byte, 1);
+ }
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(128);
+ let s = TcpStream::connect(&addr).unwrap();
+ poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
+
+ let b1 = &[1; 10][..];
+ let b2 = &[1; 383][..];
+ let b3 = &[1; 28][..];
+ let b4 = &[1; 8][..];
+ let b5 = &[1; 128][..];
+ let b: [&IoVec; 5] = [
+ b1.into(),
+ b2.into(),
+ b3.into(),
+ b4.into(),
+ b5.into(),
+ ];
+
+ let mut so_far = 0;
+ while so_far < N {
+ poll.poll(&mut events, None).unwrap();
+
+ match s.write_bufs(&b) {
+ Ok(n) => so_far += n,
+ Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
+ }
+ }
+
+ t.join().unwrap();
+}
+
+#[test]
+fn connect_then_close() {
+ struct H { listener: TcpListener, shutdown: bool }
+
+ let poll = Poll::new().unwrap();
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let s = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+
+ poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+ poll.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
+
+ let mut events = Events::with_capacity(128);
+
+ let mut h = H { listener: l, shutdown: false };
+ while !h.shutdown {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(1) {
+ let s = h.listener.accept().unwrap().0;
+ poll.register(&s, Token(3), Ready::readable() | Ready::writable(),
+ PollOpt::edge()).unwrap();
+ drop(s);
+ } else if event.token() == Token(2) {
+ h.shutdown = true;
+ }
+ }
+ }
+}
+
+#[test]
+fn listen_then_close() {
+ let poll = Poll::new().unwrap();
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+ drop(l);
+
+ let mut events = Events::with_capacity(128);
+
+ poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
+
+ for event in &events {
+ if event.token() == Token(1) {
+ panic!("recieved ready() on a closed TcpListener")
+ }
+ }
+}
+
+fn assert_send<T: Send>() {
+}
+
+fn assert_sync<T: Sync>() {
+}
+
+#[test]
+fn test_tcp_sockets_are_send() {
+ assert_send::<TcpListener>();
+ assert_send::<TcpStream>();
+ assert_sync::<TcpListener>();
+ assert_sync::<TcpStream>();
+}
+
+#[test]
+fn bind_twice_bad() {
+ let l1 = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = l1.local_addr().unwrap();
+ assert!(TcpListener::bind(&addr).is_err());
+}
+
+#[test]
+fn multiple_writes_immediate_success() {
+ const N: usize = 16;
+ let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = l.local_addr().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut s = l.accept().unwrap().0;
+ let mut b = [0; 1024];
+ let mut amt = 0;
+ while amt < 1024*N {
+ for byte in b.iter_mut() {
+ *byte = 0;
+ }
+ let n = s.read(&mut b).unwrap();
+ amt += n;
+ for byte in b[..n].iter() {
+ assert_eq!(*byte, 1);
+ }
+ }
+ });
+
+ let poll = Poll::new().unwrap();
+ let mut s = TcpStream::connect(&addr).unwrap();
+ poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
+ let mut events = Events::with_capacity(16);
+
+ // Wait for our TCP stream to connect
+ 'outer: loop {
+ poll.poll(&mut events, None).unwrap();
+ for event in events.iter() {
+ if event.token() == Token(1) && event.readiness().is_writable() {
+ break 'outer
+ }
+ }
+ }
+
+ for _ in 0..N {
+ s.write_all(&[1; 1024]).unwrap();
+ }
+
+ t.join().unwrap();
+}
+
+#[test]
+fn connection_reset_by_peer() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(16);
+ let mut buf = [0u8; 16];
+
+ // Create listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = l.local_addr().unwrap();
+
+ // Connect client
+ let client = net2::TcpBuilder::new_v4().unwrap()
+ .to_tcp_stream().unwrap();
+
+ client.set_linger(Some(Duration::from_millis(0))).unwrap();
+ client.connect(&addr).unwrap();
+
+ // Convert to Mio stream
+ let client = TcpStream::from_stream(client).unwrap();
+
+ // Register server
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
+
+ // Register interest in the client
+ poll.register(&client, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
+
+ // Wait for listener to be ready
+ let mut server;
+ 'outer:
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(0) {
+ match l.accept() {
+ Ok((sock, _)) => {
+ server = sock;
+ break 'outer;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ Err(e) => panic!("unexpected error {:?}", e),
+ }
+ }
+ }
+ }
+
+ // Close the connection
+ drop(client);
+
+ // Wait a moment
+ thread::sleep(Duration::from_millis(100));
+
+ // Register interest in the server socket
+ poll.register(&server, Token(3), Ready::readable(), PollOpt::edge()).unwrap();
+
+
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(3) {
+ assert!(event.readiness().is_readable());
+
+ match server.read(&mut buf) {
+ Ok(0) |
+ Err(_) => {},
+
+ Ok(x) => panic!("expected empty buffer but read {} bytes", x),
+ }
+ return;
+ }
+ }
+ }
+
+}
+
+#[test]
+#[cfg_attr(target_os = "fuchsia", ignore)]
+fn connect_error() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(16);
+
+ // Pick a "random" port that shouldn't be in use.
+ let l = match TcpStream::connect(&"127.0.0.1:38381".parse().unwrap()) {
+ Ok(l) => l,
+ Err(ref e) if e.kind() == io::ErrorKind::ConnectionRefused => {
+ // Connection failed synchronously. This is not a bug, but it
+ // unfortunately doesn't get us the code coverage we want.
+ return;
+ },
+ Err(e) => panic!("TcpStream::connect unexpected error {:?}", e)
+ };
+
+ poll.register(&l, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
+
+ 'outer:
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(0) {
+ assert!(event.readiness().is_writable());
+ break 'outer
+ }
+ }
+ }
+
+ assert!(l.take_error().unwrap().is_some());
+}
+
+#[test]
+fn write_error() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(16);
+ let (tx, rx) = channel();
+
+ let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = listener.local_addr().unwrap();
+ let t = thread::spawn(move || {
+ let (conn, _addr) = listener.accept().unwrap();
+ rx.recv().unwrap();
+ drop(conn);
+ });
+
+ let mut s = TcpStream::connect(&addr).unwrap();
+ poll.register(&s,
+ Token(0),
+ Ready::readable() | Ready::writable(),
+ PollOpt::edge()).unwrap();
+
+ let mut wait_writable = || {
+ 'outer:
+ loop {
+ poll.poll(&mut events, None).unwrap();
+
+ for event in &events {
+ if event.token() == Token(0) && event.readiness().is_writable() {
+ break 'outer
+ }
+ }
+ }
+ };
+
+ wait_writable();
+
+ tx.send(()).unwrap();
+ t.join().unwrap();
+
+ let buf = [0; 1024];
+ loop {
+ match s.write(&buf) {
+ Ok(_) => {}
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ wait_writable()
+ }
+ Err(e) => {
+ println!("good error: {}", e);
+ break
+ }
+ }
+ }
+}