diff options
Diffstat (limited to 'third_party/rust/tokio-tcp/tests')
-rw-r--r-- | third_party/rust/tokio-tcp/tests/chain.rs | 49 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/echo.rs | 51 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/limit.rs | 43 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/stream-buffered.rs | 54 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/tcp.rs | 132 |
5 files changed, 329 insertions, 0 deletions
diff --git a/third_party/rust/tokio-tcp/tests/chain.rs b/third_party/rust/tokio-tcp/tests/chain.rs new file mode 100644 index 0000000000..c4e37f1030 --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/chain.rs @@ -0,0 +1,49 @@ +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::net::TcpStream; +use std::thread; +use std::io::{Write, Read}; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::read_to_end; +use tokio_tcp::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn chain_clients() { + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + let mut s1 = TcpStream::connect(&addr).unwrap(); + s1.write_all(b"foo ").unwrap(); + let mut s2 = TcpStream::connect(&addr).unwrap(); + s2.write_all(b"bar ").unwrap(); + let mut s3 = TcpStream::connect(&addr).unwrap(); + s3.write_all(b"baz").unwrap(); + }); + + let clients = srv.incoming().take(3); + let copied = clients.collect().and_then(|clients| { + let mut clients = clients.into_iter(); + let a = clients.next().unwrap(); + let b = clients.next().unwrap(); + let c = clients.next().unwrap(); + + read_to_end(a.chain(b).chain(c), Vec::new()) + }); + + let (_, data) = t!(copied.wait()); + t.join().unwrap(); + + assert_eq!(data, b"foo bar baz"); +} diff --git a/third_party/rust/tokio-tcp/tests/echo.rs b/third_party/rust/tokio-tcp/tests/echo.rs new file mode 100644 index 0000000000..3c020b193e --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/echo.rs @@ -0,0 +1,51 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::thread; + +use futures::Future; +use futures::stream::Stream; +use tokio_tcp::TcpListener; +use tokio_io::AsyncRead; +use tokio_io::io::copy; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo_server() { + drop(env_logger::init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let msg = "foo bar baz"; + let t = thread::spawn(move || { + let mut s = TcpStream::connect(&addr).unwrap(); + + for _i in 0..1024 { + assert_eq!(t!(s.write(msg.as_bytes())), msg.len()); + let mut buf = [0; 1024]; + assert_eq!(t!(s.read(&mut buf)), msg.len()); + assert_eq!(&buf[..msg.len()], msg.as_bytes()); + } + }); + + let clients = srv.incoming(); + let client = clients.into_future().map(|e| e.0.unwrap()).map_err(|e| e.0); + let halves = client.map(|s| s.split()); + let copied = halves.and_then(|(a, b)| copy(a, b)); + + let (amt, _, _) = t!(copied.wait()); + t.join().unwrap(); + + assert_eq!(amt, msg.len() as u64 * 1024); +} diff --git a/third_party/rust/tokio-tcp/tests/limit.rs b/third_party/rust/tokio-tcp/tests/limit.rs new file mode 100644 index 0000000000..8714da9a51 --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/limit.rs @@ -0,0 +1,43 @@ +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::net::TcpStream; +use std::thread; +use std::io::{Write, Read}; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::read_to_end; +use tokio_tcp::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn limit() { + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + let mut s1 = TcpStream::connect(&addr).unwrap(); + s1.write_all(b"foo bar baz").unwrap(); + }); + + let clients = srv.incoming().take(1); + let copied = clients.collect().and_then(|clients| { + let mut clients = clients.into_iter(); + let a = clients.next().unwrap(); + + read_to_end(a.take(4), Vec::new()) + }); + + let (_, data) = t!(copied.wait()); + t.join().unwrap(); + + assert_eq!(data, b"foo "); +} diff --git a/third_party/rust/tokio-tcp/tests/stream-buffered.rs b/third_party/rust/tokio-tcp/tests/stream-buffered.rs new file mode 100644 index 0000000000..a6d71298dc --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/stream-buffered.rs @@ -0,0 +1,54 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::thread; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::copy; +use tokio_io::AsyncRead; +use tokio_tcp::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo_server() { + drop(env_logger::init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + let mut s1 = t!(TcpStream::connect(&addr)); + let mut s2 = t!(TcpStream::connect(&addr)); + + let msg = b"foo"; + assert_eq!(t!(s1.write(msg)), msg.len()); + assert_eq!(t!(s2.write(msg)), msg.len()); + let mut buf = [0; 1024]; + assert_eq!(t!(s1.read(&mut buf)), msg.len()); + assert_eq!(&buf[..msg.len()], msg); + assert_eq!(t!(s2.read(&mut buf)), msg.len()); + assert_eq!(&buf[..msg.len()], msg); + }); + + let future = srv.incoming() + .map(|s| s.split()) + .map(|(a, b)| copy(a, b).map(|_| ())) + .buffered(10) + .take(2) + .collect(); + + t!(future.wait()); + + t.join().unwrap(); +} diff --git a/third_party/rust/tokio-tcp/tests/tcp.rs b/third_party/rust/tokio-tcp/tests/tcp.rs new file mode 100644 index 0000000000..c905711b20 --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/tcp.rs @@ -0,0 +1,132 @@ +extern crate env_logger; +extern crate tokio_io; +extern crate tokio_tcp; +extern crate mio; +extern crate futures; + +use std::{net, thread}; +use std::sync::mpsc::channel; + +use futures::{Future, Stream}; +use tokio_tcp::{TcpListener, TcpStream}; + + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn connect() { + drop(env_logger::init()); + let srv = t!(net::TcpListener::bind("127.0.0.1:0")); + let addr = t!(srv.local_addr()); + let t = thread::spawn(move || { + t!(srv.accept()).0 + }); + + let stream = TcpStream::connect(&addr); + let mine = t!(stream.wait()); + let theirs = t.join().unwrap(); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); +} + +#[test] +fn accept() { + drop(env_logger::init()); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let (tx, rx) = channel(); + let client = srv.incoming().map(move |t| { + tx.send(()).unwrap(); + t + }).into_future().map_err(|e| e.0); + assert!(rx.try_recv().is_err()); + let t = thread::spawn(move || { + net::TcpStream::connect(&addr).unwrap() + }); + + let (mine, _remaining) = t!(client.wait()); + let mine = mine.unwrap(); + let theirs = t.join().unwrap(); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); +} + +#[test] +fn accept2() { + drop(env_logger::init()); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + net::TcpStream::connect(&addr).unwrap() + }); + + let (tx, rx) = channel(); + let client = srv.incoming().map(move |t| { + tx.send(()).unwrap(); + t + }).into_future().map_err(|e| e.0); + assert!(rx.try_recv().is_err()); + + let (mine, _remaining) = t!(client.wait()); + mine.unwrap(); + t.join().unwrap(); +} + +#[cfg(unix)] +mod unix { + use tokio_tcp::TcpStream; + + use env_logger; + use futures::{Future, future}; + use mio::unix::UnixReady; + use tokio_io::AsyncRead; + + use std::io::Write; + use std::{net, thread}; + use std::time::Duration; + + #[test] + fn poll_hup() { + drop(env_logger::init()); + + let srv = t!(net::TcpListener::bind("127.0.0.1:0")); + let addr = t!(srv.local_addr()); + let t = thread::spawn(move || { + let mut client = t!(srv.accept()).0; + client.write(b"hello world").unwrap(); + thread::sleep(Duration::from_millis(200)); + }); + + let mut stream = t!(TcpStream::connect(&addr).wait()); + + // Poll for HUP before reading. + future::poll_fn(|| { + stream.poll_read_ready(UnixReady::hup().into()) + }).wait().unwrap(); + + // Same for write half + future::poll_fn(|| { + stream.poll_write_ready() + }).wait().unwrap(); + + let mut buf = vec![0; 11]; + + // Read the data + future::poll_fn(|| { + stream.poll_read(&mut buf) + }).wait().unwrap(); + + assert_eq!(b"hello world", &buf[..]); + + t.join().unwrap(); + } +} |