summaryrefslogtreecommitdiffstats
path: root/third_party/rust/flate2/tests/tokio.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/flate2/tests/tokio.rs')
-rw-r--r--third_party/rust/flate2/tests/tokio.rs133
1 files changed, 133 insertions, 0 deletions
diff --git a/third_party/rust/flate2/tests/tokio.rs b/third_party/rust/flate2/tests/tokio.rs
new file mode 100644
index 0000000000..0f73646407
--- /dev/null
+++ b/third_party/rust/flate2/tests/tokio.rs
@@ -0,0 +1,133 @@
+#![cfg(feature = "tokio")]
+
+extern crate flate2;
+extern crate futures;
+extern crate rand;
+extern crate tokio_io;
+extern crate tokio_tcp;
+extern crate tokio_threadpool;
+
+use std::io::{Read, Write};
+use std::iter;
+use std::net::{Shutdown, TcpListener};
+use std::thread;
+
+use flate2::read;
+use flate2::write;
+use flate2::Compression;
+use futures::Future;
+use rand::{thread_rng, Rng};
+use tokio_io::io::{copy, shutdown};
+use tokio_io::AsyncRead;
+use tokio_tcp::TcpStream;
+
+#[test]
+fn tcp_stream_echo_pattern() {
+ const N: u8 = 16;
+ const M: usize = 16 * 1024;
+
+ let listener = TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = listener.local_addr().unwrap();
+ let t = thread::spawn(move || {
+ let a = listener.accept().unwrap().0;
+ let b = a.try_clone().unwrap();
+
+ let t = thread::spawn(move || {
+ let mut b = read::DeflateDecoder::new(b);
+ let mut buf = [0; M];
+ for i in 0..N {
+ b.read_exact(&mut buf).unwrap();
+ for byte in buf.iter() {
+ assert_eq!(*byte, i);
+ }
+ }
+
+ assert_eq!(b.read(&mut buf).unwrap(), 0);
+ });
+
+ let mut a = write::ZlibEncoder::new(a, Compression::default());
+ for i in 0..N {
+ let buf = [i; M];
+ a.write_all(&buf).unwrap();
+ }
+ a.finish().unwrap().shutdown(Shutdown::Write).unwrap();
+
+ t.join().unwrap();
+ });
+
+ let stream = TcpStream::connect(&addr);
+ let copy = stream
+ .and_then(|s| {
+ let (a, b) = s.split();
+ let a = read::ZlibDecoder::new(a);
+ let b = write::DeflateEncoder::new(b, Compression::default());
+ copy(a, b)
+ })
+ .then(|result| {
+ let (amt, _a, b) = result.unwrap();
+ assert_eq!(amt, (N as u64) * (M as u64));
+ shutdown(b).map(|_| ())
+ })
+ .map_err(|err| panic!("{}", err));
+
+ let threadpool = tokio_threadpool::Builder::new().build();
+ threadpool.spawn(copy);
+ threadpool.shutdown().wait().unwrap();
+ t.join().unwrap();
+}
+
+#[test]
+fn echo_random() {
+ let v = iter::repeat(())
+ .take(1024 * 1024)
+ .map(|()| thread_rng().gen::<u8>())
+ .collect::<Vec<_>>();
+ let listener = TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = listener.local_addr().unwrap();
+ let v2 = v.clone();
+ let t = thread::spawn(move || {
+ let a = listener.accept().unwrap().0;
+ let b = a.try_clone().unwrap();
+
+ let mut v3 = v2.clone();
+ let t = thread::spawn(move || {
+ let mut b = read::DeflateDecoder::new(b);
+ let mut buf = [0; 1024];
+ while v3.len() > 0 {
+ let n = b.read(&mut buf).unwrap();
+ for (actual, expected) in buf[..n].iter().zip(&v3) {
+ assert_eq!(*actual, *expected);
+ }
+ v3.drain(..n);
+ }
+
+ assert_eq!(b.read(&mut buf).unwrap(), 0);
+ });
+
+ let mut a = write::ZlibEncoder::new(a, Compression::default());
+ a.write_all(&v2).unwrap();
+ a.finish().unwrap().shutdown(Shutdown::Write).unwrap();
+
+ t.join().unwrap();
+ });
+
+ let stream = TcpStream::connect(&addr);
+ let copy = stream
+ .and_then(|s| {
+ let (a, b) = s.split();
+ let a = read::ZlibDecoder::new(a);
+ let b = write::DeflateEncoder::new(b, Compression::default());
+ copy(a, b)
+ })
+ .then(move |result| {
+ let (amt, _a, b) = result.unwrap();
+ assert_eq!(amt, v.len() as u64);
+ shutdown(b).map(|_| ())
+ })
+ .map_err(|err| panic!("{}", err));
+
+ let threadpool = tokio_threadpool::Builder::new().build();
+ threadpool.spawn(copy);
+ threadpool.shutdown().wait().unwrap();
+ t.join().unwrap();
+}