diff options
Diffstat (limited to 'third_party/rust/flate2/tests/tokio.rs')
-rw-r--r-- | third_party/rust/flate2/tests/tokio.rs | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/third_party/rust/flate2/tests/tokio.rs b/third_party/rust/flate2/tests/tokio.rs new file mode 100644 index 0000000000..0f73646407 --- /dev/null +++ b/third_party/rust/flate2/tests/tokio.rs @@ -0,0 +1,133 @@ +#![cfg(feature = "tokio")] + +extern crate flate2; +extern crate futures; +extern crate rand; +extern crate tokio_io; +extern crate tokio_tcp; +extern crate tokio_threadpool; + +use std::io::{Read, Write}; +use std::iter; +use std::net::{Shutdown, TcpListener}; +use std::thread; + +use flate2::read; +use flate2::write; +use flate2::Compression; +use futures::Future; +use rand::{thread_rng, Rng}; +use tokio_io::io::{copy, shutdown}; +use tokio_io::AsyncRead; +use tokio_tcp::TcpStream; + +#[test] +fn tcp_stream_echo_pattern() { + const N: u8 = 16; + const M: usize = 16 * 1024; + + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let t = thread::spawn(move || { + let a = listener.accept().unwrap().0; + let b = a.try_clone().unwrap(); + + let t = thread::spawn(move || { + let mut b = read::DeflateDecoder::new(b); + let mut buf = [0; M]; + for i in 0..N { + b.read_exact(&mut buf).unwrap(); + for byte in buf.iter() { + assert_eq!(*byte, i); + } + } + + assert_eq!(b.read(&mut buf).unwrap(), 0); + }); + + let mut a = write::ZlibEncoder::new(a, Compression::default()); + for i in 0..N { + let buf = [i; M]; + a.write_all(&buf).unwrap(); + } + a.finish().unwrap().shutdown(Shutdown::Write).unwrap(); + + t.join().unwrap(); + }); + + let stream = TcpStream::connect(&addr); + let copy = stream + .and_then(|s| { + let (a, b) = s.split(); + let a = read::ZlibDecoder::new(a); + let b = write::DeflateEncoder::new(b, Compression::default()); + copy(a, b) + }) + .then(|result| { + let (amt, _a, b) = result.unwrap(); + assert_eq!(amt, (N as u64) * (M as u64)); + shutdown(b).map(|_| ()) + }) + .map_err(|err| panic!("{}", err)); + + let threadpool = tokio_threadpool::Builder::new().build(); + threadpool.spawn(copy); + threadpool.shutdown().wait().unwrap(); + t.join().unwrap(); +} + +#[test] +fn echo_random() { + let v = iter::repeat(()) + .take(1024 * 1024) + .map(|()| thread_rng().gen::<u8>()) + .collect::<Vec<_>>(); + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let v2 = v.clone(); + let t = thread::spawn(move || { + let a = listener.accept().unwrap().0; + let b = a.try_clone().unwrap(); + + let mut v3 = v2.clone(); + let t = thread::spawn(move || { + let mut b = read::DeflateDecoder::new(b); + let mut buf = [0; 1024]; + while v3.len() > 0 { + let n = b.read(&mut buf).unwrap(); + for (actual, expected) in buf[..n].iter().zip(&v3) { + assert_eq!(*actual, *expected); + } + v3.drain(..n); + } + + assert_eq!(b.read(&mut buf).unwrap(), 0); + }); + + let mut a = write::ZlibEncoder::new(a, Compression::default()); + a.write_all(&v2).unwrap(); + a.finish().unwrap().shutdown(Shutdown::Write).unwrap(); + + t.join().unwrap(); + }); + + let stream = TcpStream::connect(&addr); + let copy = stream + .and_then(|s| { + let (a, b) = s.split(); + let a = read::ZlibDecoder::new(a); + let b = write::DeflateEncoder::new(b, Compression::default()); + copy(a, b) + }) + .then(move |result| { + let (amt, _a, b) = result.unwrap(); + assert_eq!(amt, v.len() as u64); + shutdown(b).map(|_| ()) + }) + .map_err(|err| panic!("{}", err)); + + let threadpool = tokio_threadpool::Builder::new().build(); + threadpool.spawn(copy); + threadpool.shutdown().wait().unwrap(); + t.join().unwrap(); +} |