diff options
Diffstat (limited to 'third_party/rust/tokio-0.1.11/tests/global.rs')
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/global.rs | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/tests/global.rs b/third_party/rust/tokio-0.1.11/tests/global.rs new file mode 100644 index 0000000000..d3bc09315a --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/global.rs @@ -0,0 +1,136 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_io; +extern crate env_logger; + +use std::{io, thread}; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; + +use futures::prelude::*; +use tokio::net::{TcpStream, TcpListener}; +use tokio::runtime::Runtime; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn hammer_old() { + let _ = env_logger::try_init(); + + let threads = (0..10).map(|_| { + thread::spawn(|| { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + let mine = TcpStream::connect(&addr); + let theirs = srv.incoming().into_future() + .map(|(s, _)| s.unwrap()) + .map_err(|(s, _)| s); + let (mine, theirs) = t!(mine.join(theirs).wait()); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); + }) + }).collect::<Vec<_>>(); + for thread in threads { + thread.join().unwrap(); + } +} + +struct Rd(Arc<TcpStream>); +struct Wr(Arc<TcpStream>); + +impl io::Read for Rd { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + <&TcpStream>::read(&mut &*self.0, dst) + } +} + +impl tokio_io::AsyncRead for Rd { +} + +impl io::Write for Wr { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + <&TcpStream>::write(&mut &*self.0, src) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl tokio_io::AsyncWrite for Wr { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +#[test] +fn hammer_split() { + use tokio_io::io; + + const N: usize = 100; + const ITER: usize = 10; + + let _ = env_logger::try_init(); + + for _ in 0..ITER { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + + let cnt = Arc::new(AtomicUsize::new(0)); + + let mut rt = Runtime::new().unwrap(); + + fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) { + let socket = Arc::new(socket); + let rd = Rd(socket.clone()); + let wr = Wr(socket); + + let cnt2 = cnt.clone(); + + let rd = io::read(rd, vec![0; 1]) + .map(move |_| { + cnt2.fetch_add(1, Relaxed); + }) + .map_err(|e| panic!("read error = {:?}", e)); + + let wr = io::write_all(wr, b"1") + .map(move |_| { + cnt.fetch_add(1, Relaxed); + }) + .map_err(move |e| panic!("write error = {:?}", e)); + + tokio::spawn(rd); + tokio::spawn(wr); + } + + rt.spawn({ + let cnt = cnt.clone(); + srv.incoming() + .map_err(|e| panic!("accept error = {:?}", e)) + .take(N as u64) + .for_each(move |socket| { + split(socket, cnt.clone()); + Ok(()) + }) + }); + + for _ in 0..N { + rt.spawn({ + let cnt = cnt.clone(); + TcpStream::connect(&addr) + .map_err(move |e| panic!("connect error = {:?}", e)) + .map(move |socket| split(socket, cnt)) + }); + } + + rt.shutdown_on_idle().wait().unwrap(); + assert_eq!(N * 4, cnt.load(Relaxed)); + } +} |