summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.11/tests/global.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.11/tests/global.rs')
-rw-r--r--third_party/rust/tokio-0.1.11/tests/global.rs136
1 files changed, 136 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/tests/global.rs b/third_party/rust/tokio-0.1.11/tests/global.rs
new file mode 100644
index 0000000000..d3bc09315a
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/tests/global.rs
@@ -0,0 +1,136 @@
+extern crate futures;
+extern crate tokio;
+extern crate tokio_io;
+extern crate env_logger;
+
+use std::{io, thread};
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::Relaxed;
+
+use futures::prelude::*;
+use tokio::net::{TcpStream, TcpListener};
+use tokio::runtime::Runtime;
+
+macro_rules! t {
+ ($e:expr) => (match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ })
+}
+
+#[test]
+fn hammer_old() {
+ let _ = env_logger::try_init();
+
+ let threads = (0..10).map(|_| {
+ thread::spawn(|| {
+ let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
+ let addr = t!(srv.local_addr());
+ let mine = TcpStream::connect(&addr);
+ let theirs = srv.incoming().into_future()
+ .map(|(s, _)| s.unwrap())
+ .map_err(|(s, _)| s);
+ let (mine, theirs) = t!(mine.join(theirs).wait());
+
+ assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
+ assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));
+ })
+ }).collect::<Vec<_>>();
+ for thread in threads {
+ thread.join().unwrap();
+ }
+}
+
+struct Rd(Arc<TcpStream>);
+struct Wr(Arc<TcpStream>);
+
+impl io::Read for Rd {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ <&TcpStream>::read(&mut &*self.0, dst)
+ }
+}
+
+impl tokio_io::AsyncRead for Rd {
+}
+
+impl io::Write for Wr {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ <&TcpStream>::write(&mut &*self.0, src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl tokio_io::AsyncWrite for Wr {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+}
+
+#[test]
+fn hammer_split() {
+ use tokio_io::io;
+
+ const N: usize = 100;
+ const ITER: usize = 10;
+
+ let _ = env_logger::try_init();
+
+ for _ in 0..ITER {
+ let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
+ let addr = t!(srv.local_addr());
+
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ let mut rt = Runtime::new().unwrap();
+
+ fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) {
+ let socket = Arc::new(socket);
+ let rd = Rd(socket.clone());
+ let wr = Wr(socket);
+
+ let cnt2 = cnt.clone();
+
+ let rd = io::read(rd, vec![0; 1])
+ .map(move |_| {
+ cnt2.fetch_add(1, Relaxed);
+ })
+ .map_err(|e| panic!("read error = {:?}", e));
+
+ let wr = io::write_all(wr, b"1")
+ .map(move |_| {
+ cnt.fetch_add(1, Relaxed);
+ })
+ .map_err(move |e| panic!("write error = {:?}", e));
+
+ tokio::spawn(rd);
+ tokio::spawn(wr);
+ }
+
+ rt.spawn({
+ let cnt = cnt.clone();
+ srv.incoming()
+ .map_err(|e| panic!("accept error = {:?}", e))
+ .take(N as u64)
+ .for_each(move |socket| {
+ split(socket, cnt.clone());
+ Ok(())
+ })
+ });
+
+ for _ in 0..N {
+ rt.spawn({
+ let cnt = cnt.clone();
+ TcpStream::connect(&addr)
+ .map_err(move |e| panic!("connect error = {:?}", e))
+ .map(move |socket| split(socket, cnt))
+ });
+ }
+
+ rt.shutdown_on_idle().wait().unwrap();
+ assert_eq!(N * 4, cnt.load(Relaxed));
+ }
+}