summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.22/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-0.1.22/tests
parentInitial commit. (diff)
downloadfirefox-upstream.tar.xz
firefox-upstream.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-0.1.22/tests')
-rw-r--r--third_party/rust/tokio-0.1.22/tests/buffered.rs65
-rw-r--r--third_party/rust/tokio-0.1.22/tests/clock.rs64
-rw-r--r--third_party/rust/tokio-0.1.22/tests/drop-core.rs42
-rw-r--r--third_party/rust/tokio-0.1.22/tests/enumerate.rs26
-rw-r--r--third_party/rust/tokio-0.1.22/tests/global.rs141
-rw-r--r--third_party/rust/tokio-0.1.22/tests/length_delimited.rs627
-rw-r--r--third_party/rust/tokio-0.1.22/tests/line-frames.rs90
-rw-r--r--third_party/rust/tokio-0.1.22/tests/pipe-hup.rs103
-rw-r--r--third_party/rust/tokio-0.1.22/tests/reactor.rs91
-rw-r--r--third_party/rust/tokio-0.1.22/tests/runtime.rs532
-rw-r--r--third_party/rust/tokio-0.1.22/tests/timer.rs113
11 files changed, 1894 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/tests/buffered.rs b/third_party/rust/tokio-0.1.22/tests/buffered.rs
new file mode 100644
index 0000000000..45560ad203
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/buffered.rs
@@ -0,0 +1,65 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+extern crate tokio_io;
+
+use std::io::{BufReader, BufWriter, Read, Write};
+use std::net::TcpStream;
+use std::thread;
+
+use futures::stream::Stream;
+use futures::Future;
+use tokio::net::TcpListener;
+use tokio_io::io::copy;
+
+macro_rules! t {
+ ($e:expr) => {
+ match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ }
+ };
+}
+
+#[test]
+fn echo_server() {
+ const N: usize = 1024;
+ drop(env_logger::try_init());
+
+ let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse())));
+ let addr = t!(srv.local_addr());
+
+ let msg = "foo bar baz";
+ let t = thread::spawn(move || {
+ let mut s = t!(TcpStream::connect(&addr));
+
+ let t2 = thread::spawn(move || {
+ let mut s = t!(TcpStream::connect(&addr));
+ let mut b = vec![0; msg.len() * N];
+ t!(s.read_exact(&mut b));
+ b
+ });
+
+ let mut expected = Vec::<u8>::new();
+ for _i in 0..N {
+ expected.extend(msg.as_bytes());
+ assert_eq!(t!(s.write(msg.as_bytes())), msg.len());
+ }
+ (expected, t2)
+ });
+
+ let clients = srv.incoming().take(2).collect();
+ let copied = clients.and_then(|clients| {
+ let mut clients = clients.into_iter();
+ let a = BufReader::new(clients.next().unwrap());
+ let b = BufWriter::new(clients.next().unwrap());
+ copy(a, b)
+ });
+
+ let (amt, _, _) = t!(copied.wait());
+ let (expected, t2) = t.join().unwrap();
+ let actual = t2.join().unwrap();
+
+ assert!(expected == actual);
+ assert_eq!(amt, msg.len() as u64 * 1024);
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/clock.rs b/third_party/rust/tokio-0.1.22/tests/clock.rs
new file mode 100644
index 0000000000..184705aede
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/clock.rs
@@ -0,0 +1,64 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+extern crate tokio_timer;
+
+use tokio::prelude::*;
+use tokio::runtime::{self, current_thread};
+use tokio::timer::*;
+use tokio_timer::clock::Clock;
+
+use std::sync::mpsc;
+use std::time::{Duration, Instant};
+
+struct MockNow(Instant);
+
+impl tokio_timer::clock::Now for MockNow {
+ fn now(&self) -> Instant {
+ self.0
+ }
+}
+
+#[test]
+fn clock_and_timer_concurrent() {
+ let _ = env_logger::try_init();
+
+ let when = Instant::now() + Duration::from_millis(5_000);
+ let clock = Clock::new_with_now(MockNow(when));
+
+ let mut rt = runtime::Builder::new().clock(clock).build().unwrap();
+
+ let (tx, rx) = mpsc::channel();
+
+ rt.spawn({
+ Delay::new(when)
+ .map_err(|e| panic!("unexpected error; err={:?}", e))
+ .and_then(move |_| {
+ assert!(Instant::now() < when);
+ tx.send(()).unwrap();
+ Ok(())
+ })
+ });
+
+ rx.recv().unwrap();
+}
+
+#[test]
+fn clock_and_timer_single_threaded() {
+ let _ = env_logger::try_init();
+
+ let when = Instant::now() + Duration::from_millis(5_000);
+ let clock = Clock::new_with_now(MockNow(when));
+
+ let mut rt = current_thread::Builder::new().clock(clock).build().unwrap();
+
+ rt.block_on({
+ Delay::new(when)
+ .map_err(|e| panic!("unexpected error; err={:?}", e))
+ .and_then(move |_| {
+ assert!(Instant::now() < when);
+ Ok(())
+ })
+ })
+ .unwrap();
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/drop-core.rs b/third_party/rust/tokio-0.1.22/tests/drop-core.rs
new file mode 100644
index 0000000000..8be0d711d7
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/drop-core.rs
@@ -0,0 +1,42 @@
+extern crate futures;
+extern crate tokio;
+
+use std::net;
+use std::thread;
+
+use futures::future;
+use futures::prelude::*;
+use futures::sync::oneshot;
+use tokio::net::TcpListener;
+use tokio::reactor::Reactor;
+
+#[test]
+fn tcp_doesnt_block() {
+ let core = Reactor::new().unwrap();
+ let handle = core.handle();
+ let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let listener = TcpListener::from_std(listener, &handle).unwrap();
+ drop(core);
+ assert!(listener.incoming().wait().next().unwrap().is_err());
+}
+
+#[test]
+fn drop_wakes() {
+ let core = Reactor::new().unwrap();
+ let handle = core.handle();
+ let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let listener = TcpListener::from_std(listener, &handle).unwrap();
+ let (tx, rx) = oneshot::channel::<()>();
+ let t = thread::spawn(move || {
+ let incoming = listener.incoming();
+ let new_socket = incoming.into_future().map_err(|_| ());
+ let drop_tx = future::lazy(|| {
+ drop(tx);
+ future::ok(())
+ });
+ assert!(new_socket.join(drop_tx).wait().is_err());
+ });
+ drop(rx.wait());
+ drop(core);
+ t.join().unwrap();
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/enumerate.rs b/third_party/rust/tokio-0.1.22/tests/enumerate.rs
new file mode 100644
index 0000000000..c71b7a24c9
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/enumerate.rs
@@ -0,0 +1,26 @@
+extern crate futures;
+extern crate tokio;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+use futures::sync::mpsc;
+use tokio::util::StreamExt;
+
+#[test]
+fn enumerate() {
+ use futures::*;
+
+ let (mut tx, rx) = mpsc::channel(1);
+
+ std::thread::spawn(|| {
+ for i in 0..5 {
+ tx = tx.send(i * 2).wait().unwrap();
+ }
+ });
+
+ let result = rx.enumerate().collect();
+ assert_eq!(
+ result.wait(),
+ Ok(vec![(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)])
+ );
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/global.rs b/third_party/rust/tokio-0.1.22/tests/global.rs
new file mode 100644
index 0000000000..1bf45a66f6
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/global.rs
@@ -0,0 +1,141 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+extern crate tokio_io;
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::Arc;
+use std::{io, thread};
+
+use futures::prelude::*;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::runtime::Runtime;
+
+macro_rules! t {
+ ($e:expr) => {
+ match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ }
+ };
+}
+
+#[test]
+fn hammer_old() {
+ let _ = env_logger::try_init();
+
+ let threads = (0..10)
+ .map(|_| {
+ thread::spawn(|| {
+ let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
+ let addr = t!(srv.local_addr());
+ let mine = TcpStream::connect(&addr);
+ let theirs = srv
+ .incoming()
+ .into_future()
+ .map(|(s, _)| s.unwrap())
+ .map_err(|(s, _)| s);
+ let (mine, theirs) = t!(mine.join(theirs).wait());
+
+ assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
+ assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));
+ })
+ })
+ .collect::<Vec<_>>();
+ for thread in threads {
+ thread.join().unwrap();
+ }
+}
+
+struct Rd(Arc<TcpStream>);
+struct Wr(Arc<TcpStream>);
+
+impl io::Read for Rd {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ <&TcpStream>::read(&mut &*self.0, dst)
+ }
+}
+
+impl tokio_io::AsyncRead for Rd {}
+
+impl io::Write for Wr {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ <&TcpStream>::write(&mut &*self.0, src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl tokio_io::AsyncWrite for Wr {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+}
+
+#[test]
+fn hammer_split() {
+ use tokio_io::io;
+
+ const N: usize = 100;
+ const ITER: usize = 10;
+
+ let _ = env_logger::try_init();
+
+ for _ in 0..ITER {
+ let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
+ let addr = t!(srv.local_addr());
+
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ let mut rt = Runtime::new().unwrap();
+
+ fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) {
+ let socket = Arc::new(socket);
+ let rd = Rd(socket.clone());
+ let wr = Wr(socket);
+
+ let cnt2 = cnt.clone();
+
+ let rd = io::read(rd, vec![0; 1])
+ .map(move |_| {
+ cnt2.fetch_add(1, Relaxed);
+ })
+ .map_err(|e| panic!("read error = {:?}", e));
+
+ let wr = io::write_all(wr, b"1")
+ .map(move |_| {
+ cnt.fetch_add(1, Relaxed);
+ })
+ .map_err(move |e| panic!("write error = {:?}", e));
+
+ tokio::spawn(rd);
+ tokio::spawn(wr);
+ }
+
+ rt.spawn({
+ let cnt = cnt.clone();
+ srv.incoming()
+ .map_err(|e| panic!("accept error = {:?}", e))
+ .take(N as u64)
+ .for_each(move |socket| {
+ split(socket, cnt.clone());
+ Ok(())
+ })
+ });
+
+ for _ in 0..N {
+ rt.spawn({
+ let cnt = cnt.clone();
+ TcpStream::connect(&addr)
+ .map_err(move |e| panic!("connect error = {:?}", e))
+ .map(move |socket| split(socket, cnt))
+ });
+ }
+
+ rt.shutdown_on_idle().wait().unwrap();
+ assert_eq!(N * 4, cnt.load(Relaxed));
+ }
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/length_delimited.rs b/third_party/rust/tokio-0.1.22/tests/length_delimited.rs
new file mode 100644
index 0000000000..f87cfa936d
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/length_delimited.rs
@@ -0,0 +1,627 @@
+extern crate bytes;
+extern crate futures;
+extern crate tokio;
+
+use tokio::codec::*;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::Async::*;
+use futures::{Poll, Sink, Stream};
+
+use std::collections::VecDeque;
+use std::io;
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+#[test]
+fn read_empty_io_yields_nothing() {
+ let mut io = FramedRead::new(mock!(), LengthDelimitedCodec::new());
+
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_frame_one_packet() {
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_frame_one_packet_little_endian() {
+ let mut io = length_delimited::Builder::new()
+ .little_endian()
+ .new_read(mock! {
+ Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()),
+ });
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_frame_one_packet_native_endian() {
+ let data = if cfg!(target_endian = "big") {
+ b"\x00\x00\x00\x09abcdefghi"
+ } else {
+ b"\x09\x00\x00\x00abcdefghi"
+ };
+ let mut io = length_delimited::Builder::new()
+ .native_endian()
+ .new_read(mock! {
+ Ok(data[..].into()),
+ });
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_multi_frame_one_packet() {
+ let mut data: Vec<u8> = vec![];
+ data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
+ data.extend_from_slice(b"\x00\x00\x00\x03123");
+ data.extend_from_slice(b"\x00\x00\x00\x0bhello world");
+
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(data.into()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_frame_multi_packet() {
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(b"\x00\x00"[..].into()),
+ Ok(b"\x00\x09abc"[..].into()),
+ Ok(b"defghi"[..].into()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_multi_frame_multi_packet() {
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(b"\x00\x00"[..].into()),
+ Ok(b"\x00\x09abc"[..].into()),
+ Ok(b"defghi"[..].into()),
+ Ok(b"\x00\x00\x00\x0312"[..].into()),
+ Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_frame_multi_packet_wait() {
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(b"\x00\x00"[..].into()),
+ Err(would_block()),
+ Ok(b"\x00\x09abc"[..].into()),
+ Err(would_block()),
+ Ok(b"defghi"[..].into()),
+ Err(would_block()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_multi_frame_multi_packet_wait() {
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(b"\x00\x00"[..].into()),
+ Err(would_block()),
+ Ok(b"\x00\x09abc"[..].into()),
+ Err(would_block()),
+ Ok(b"defghi"[..].into()),
+ Err(would_block()),
+ Ok(b"\x00\x00\x00\x0312"[..].into()),
+ Err(would_block()),
+ Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
+ Err(would_block()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_incomplete_head() {
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(b"\x00\x00"[..].into()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert!(io.poll().is_err());
+}
+
+#[test]
+fn read_incomplete_head_multi() {
+ let mut io = FramedRead::new(
+ mock! {
+ Err(would_block()),
+ Ok(b"\x00"[..].into()),
+ Err(would_block()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert!(io.poll().is_err());
+}
+
+#[test]
+fn read_incomplete_payload() {
+ let mut io = FramedRead::new(
+ mock! {
+ Ok(b"\x00\x00\x00\x09ab"[..].into()),
+ Err(would_block()),
+ Ok(b"cd"[..].into()),
+ Err(would_block()),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert_eq!(io.poll().unwrap(), NotReady);
+ assert!(io.poll().is_err());
+}
+
+#[test]
+fn read_max_frame_len() {
+ let mut io = length_delimited::Builder::new()
+ .max_frame_length(5)
+ .new_read(mock! {
+ Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
+ });
+
+ assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
+}
+
+#[test]
+fn read_update_max_frame_len_at_rest() {
+ let mut io = length_delimited::Builder::new().new_read(mock! {
+ Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
+ Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
+ });
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ io.decoder_mut().set_max_frame_length(5);
+ assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
+}
+
+#[test]
+fn read_update_max_frame_len_in_flight() {
+ let mut io = length_delimited::Builder::new().new_read(mock! {
+ Ok(b"\x00\x00\x00\x09abcd"[..].into()),
+ Err(would_block()),
+ Ok(b"efghi"[..].into()),
+ Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
+ });
+
+ assert_eq!(io.poll().unwrap(), NotReady);
+ io.decoder_mut().set_max_frame_length(5);
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
+}
+
+#[test]
+fn read_one_byte_length_field() {
+ let mut io = length_delimited::Builder::new()
+ .length_field_length(1)
+ .new_read(mock! {
+ Ok(b"\x09abcdefghi"[..].into()),
+ });
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_header_offset() {
+ let mut io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_field_offset(4)
+ .new_read(mock! {
+ Ok(b"zzzz\x00\x09abcdefghi"[..].into()),
+ });
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_multi_frame_one_packet_skip_none_adjusted() {
+ let mut data: Vec<u8> = vec![];
+ data.extend_from_slice(b"xx\x00\x09abcdefghi");
+ data.extend_from_slice(b"yy\x00\x03123");
+ data.extend_from_slice(b"zz\x00\x0bhello world");
+
+ let mut io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_field_offset(2)
+ .num_skip(0)
+ .length_adjustment(4)
+ .new_read(mock! {
+ Ok(data.into()),
+ });
+
+ assert_eq!(
+ io.poll().unwrap(),
+ Ready(Some(b"xx\x00\x09abcdefghi"[..].into()))
+ );
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into())));
+ assert_eq!(
+ io.poll().unwrap(),
+ Ready(Some(b"zz\x00\x0bhello world"[..].into()))
+ );
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn read_single_multi_frame_one_packet_length_includes_head() {
+ let mut data: Vec<u8> = vec![];
+ data.extend_from_slice(b"\x00\x0babcdefghi");
+ data.extend_from_slice(b"\x00\x05123");
+ data.extend_from_slice(b"\x00\x0dhello world");
+
+ let mut io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_adjustment(-2)
+ .new_read(mock! {
+ Ok(data.into()),
+ });
+
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
+ assert_eq!(io.poll().unwrap(), Ready(None));
+}
+
+#[test]
+fn write_single_frame_length_adjusted() {
+ let mut io = length_delimited::Builder::new()
+ .length_adjustment(-2)
+ .new_write(mock! {
+ Ok(b"\x00\x00\x00\x0b"[..].into()),
+ Ok(b"abcdefghi"[..].into()),
+ Ok(Flush),
+ });
+ assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_nothing_yields_nothing() {
+ let mut io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
+ assert!(io.poll_complete().unwrap().is_ready());
+}
+
+#[test]
+fn write_single_frame_one_packet() {
+ let mut io = FramedWrite::new(
+ mock! {
+ Ok(b"\x00\x00\x00\x09"[..].into()),
+ Ok(b"abcdefghi"[..].into()),
+ Ok(Flush),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_single_multi_frame_one_packet() {
+ let mut io = FramedWrite::new(
+ mock! {
+ Ok(b"\x00\x00\x00\x09"[..].into()),
+ Ok(b"abcdefghi"[..].into()),
+ Ok(b"\x00\x00\x00\x03"[..].into()),
+ Ok(b"123"[..].into()),
+ Ok(b"\x00\x00\x00\x0b"[..].into()),
+ Ok(b"hello world"[..].into()),
+ Ok(Flush),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
+ assert!(io.start_send(Bytes::from("123")).unwrap().is_ready());
+ assert!(io
+ .start_send(Bytes::from("hello world"))
+ .unwrap()
+ .is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_single_multi_frame_multi_packet() {
+ let mut io = FramedWrite::new(
+ mock! {
+ Ok(b"\x00\x00\x00\x09"[..].into()),
+ Ok(b"abcdefghi"[..].into()),
+ Ok(Flush),
+ Ok(b"\x00\x00\x00\x03"[..].into()),
+ Ok(b"123"[..].into()),
+ Ok(Flush),
+ Ok(b"\x00\x00\x00\x0b"[..].into()),
+ Ok(b"hello world"[..].into()),
+ Ok(Flush),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io.start_send(Bytes::from("123")).unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io
+ .start_send(Bytes::from("hello world"))
+ .unwrap()
+ .is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_single_frame_would_block() {
+ let mut io = FramedWrite::new(
+ mock! {
+ Err(would_block()),
+ Ok(b"\x00\x00"[..].into()),
+ Err(would_block()),
+ Ok(b"\x00\x09"[..].into()),
+ Ok(b"abcdefghi"[..].into()),
+ Ok(Flush),
+ },
+ LengthDelimitedCodec::new(),
+ );
+
+ assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
+ assert!(!io.poll_complete().unwrap().is_ready());
+ assert!(!io.poll_complete().unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_single_frame_little_endian() {
+ let mut io = length_delimited::Builder::new()
+ .little_endian()
+ .new_write(mock! {
+ Ok(b"\x09\x00\x00\x00"[..].into()),
+ Ok(b"abcdefghi"[..].into()),
+ Ok(Flush),
+ });
+
+ assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_single_frame_with_short_length_field() {
+ let mut io = length_delimited::Builder::new()
+ .length_field_length(1)
+ .new_write(mock! {
+ Ok(b"\x09"[..].into()),
+ Ok(b"abcdefghi"[..].into()),
+ Ok(Flush),
+ });
+
+ assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_max_frame_len() {
+ let mut io = length_delimited::Builder::new()
+ .max_frame_length(5)
+ .new_write(mock! {});
+
+ assert_eq!(
+ io.start_send(Bytes::from("abcdef")).unwrap_err().kind(),
+ io::ErrorKind::InvalidInput
+ );
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_update_max_frame_len_at_rest() {
+ let mut io = length_delimited::Builder::new().new_write(mock! {
+ Ok(b"\x00\x00\x00\x06"[..].into()),
+ Ok(b"abcdef"[..].into()),
+ Ok(Flush),
+ });
+
+ assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
+ assert!(io.poll_complete().unwrap().is_ready());
+ io.encoder_mut().set_max_frame_length(5);
+ assert_eq!(
+ io.start_send(Bytes::from("abcdef")).unwrap_err().kind(),
+ io::ErrorKind::InvalidInput
+ );
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_update_max_frame_len_in_flight() {
+ let mut io = length_delimited::Builder::new().new_write(mock! {
+ Ok(b"\x00\x00\x00\x06"[..].into()),
+ Ok(b"ab"[..].into()),
+ Err(would_block()),
+ Ok(b"cdef"[..].into()),
+ Ok(Flush),
+ });
+
+ assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
+ assert!(!io.poll_complete().unwrap().is_ready());
+ io.encoder_mut().set_max_frame_length(5);
+ assert!(io.poll_complete().unwrap().is_ready());
+ assert_eq!(
+ io.start_send(Bytes::from("abcdef")).unwrap_err().kind(),
+ io::ErrorKind::InvalidInput
+ );
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn write_zero() {
+ let mut io = length_delimited::Builder::new().new_write(mock! {});
+
+ assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
+ assert_eq!(
+ io.poll_complete().unwrap_err().kind(),
+ io::ErrorKind::WriteZero
+ );
+ assert!(io.get_ref().calls.is_empty());
+}
+
+#[test]
+fn encode_overflow() {
+ // Test reproducing tokio-rs/tokio#681.
+ let mut codec = length_delimited::Builder::new().new_codec();
+ let mut buf = BytesMut::with_capacity(1024);
+
+ // Put some data into the buffer without resizing it to hold more.
+ let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
+ buf.put_slice(&some_as[..]);
+
+ // Trying to encode the length header should resize the buffer if it won't fit.
+ codec.encode(Bytes::from("hello"), &mut buf).unwrap();
+}
+
+// ===== Test utils =====
+
+fn would_block() -> io::Error {
+ io::Error::new(io::ErrorKind::WouldBlock, "would block")
+}
+
+struct Mock {
+ calls: VecDeque<io::Result<Op>>,
+}
+
+enum Op {
+ Data(Vec<u8>),
+ Flush,
+}
+
+use self::Op::*;
+
+impl io::Read for Mock {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ match self.calls.pop_front() {
+ Some(Ok(Op::Data(data))) => {
+ debug_assert!(dst.len() >= data.len());
+ dst[..data.len()].copy_from_slice(&data[..]);
+ Ok(data.len())
+ }
+ Some(Ok(_)) => panic!(),
+ Some(Err(e)) => Err(e),
+ None => Ok(0),
+ }
+ }
+}
+
+impl AsyncRead for Mock {}
+
+impl io::Write for Mock {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ match self.calls.pop_front() {
+ Some(Ok(Op::Data(data))) => {
+ let len = data.len();
+ assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
+ assert_eq!(&data[..], &src[..len]);
+ Ok(len)
+ }
+ Some(Ok(_)) => panic!(),
+ Some(Err(e)) => Err(e),
+ None => Ok(0),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ match self.calls.pop_front() {
+ Some(Ok(Op::Flush)) => Ok(()),
+ Some(Ok(_)) => panic!(),
+ Some(Err(e)) => Err(e),
+ None => Ok(()),
+ }
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(Ready(()))
+ }
+}
+
+impl<'a> From<&'a [u8]> for Op {
+ fn from(src: &'a [u8]) -> Op {
+ Op::Data(src.into())
+ }
+}
+
+impl From<Vec<u8>> for Op {
+ fn from(src: Vec<u8>) -> Op {
+ Op::Data(src)
+ }
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/line-frames.rs b/third_party/rust/tokio-0.1.22/tests/line-frames.rs
new file mode 100644
index 0000000000..84b860a2a3
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/line-frames.rs
@@ -0,0 +1,90 @@
+extern crate bytes;
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate tokio_threadpool;
+
+use std::io;
+use std::net::Shutdown;
+
+use bytes::{BufMut, BytesMut};
+use futures::{Future, Sink, Stream};
+use tokio::net::{TcpListener, TcpStream};
+use tokio_codec::{Decoder, Encoder};
+use tokio_io::io::{read, write_all};
+use tokio_threadpool::Builder;
+
+pub struct LineCodec;
+
+impl Decoder for LineCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ match buf.iter().position(|&b| b == b'\n') {
+ Some(i) => Ok(Some(buf.split_to(i + 1).into())),
+ None => Ok(None),
+ }
+ }
+
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ if buf.len() == 0 {
+ Ok(None)
+ } else {
+ let amt = buf.len();
+ Ok(Some(buf.split_to(amt)))
+ }
+ }
+}
+
+impl Encoder for LineCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> {
+ into.put(&item[..]);
+ Ok(())
+ }
+}
+
+#[test]
+fn echo() {
+ drop(env_logger::try_init());
+
+ let pool = Builder::new().pool_size(1).build();
+
+ let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = listener.local_addr().unwrap();
+ let sender = pool.sender().clone();
+ let srv = listener.incoming().for_each(move |socket| {
+ let (sink, stream) = LineCodec.framed(socket).split();
+ sender
+ .spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ()))
+ .unwrap();
+ Ok(())
+ });
+
+ pool.sender()
+ .spawn(srv.map_err(|e| panic!("srv error: {}", e)))
+ .unwrap();
+
+ let client = TcpStream::connect(&addr);
+ let client = client.wait().unwrap();
+ let (client, _) = write_all(client, b"a\n").wait().unwrap();
+ let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap();
+ assert_eq!(amt, 2);
+ assert_eq!(&buf[..2], b"a\n");
+
+ let (client, _) = write_all(client, b"\n").wait().unwrap();
+ let (client, buf, amt) = read(client, buf).wait().unwrap();
+ assert_eq!(amt, 1);
+ assert_eq!(&buf[..1], b"\n");
+
+ let (client, _) = write_all(client, b"b").wait().unwrap();
+ client.shutdown(Shutdown::Write).unwrap();
+ let (_client, buf, amt) = read(client, buf).wait().unwrap();
+ assert_eq!(amt, 1);
+ assert_eq!(&buf[..1], b"b");
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs b/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs
new file mode 100644
index 0000000000..eabdec4c80
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs
@@ -0,0 +1,103 @@
+#![cfg(unix)]
+
+extern crate env_logger;
+extern crate futures;
+extern crate libc;
+extern crate mio;
+extern crate tokio;
+extern crate tokio_io;
+
+use std::fs::File;
+use std::io::{self, Write};
+use std::os::unix::io::{AsRawFd, FromRawFd};
+use std::thread;
+use std::time::Duration;
+
+use futures::Future;
+use mio::event::Evented;
+use mio::unix::{EventedFd, UnixReady};
+use mio::{PollOpt, Ready, Token};
+use tokio::reactor::{Handle, PollEvented2};
+use tokio_io::io::read_to_end;
+
+macro_rules! t {
+ ($e:expr) => {
+ match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ }
+ };
+}
+
+struct MyFile(File);
+
+impl MyFile {
+ fn new(file: File) -> MyFile {
+ unsafe {
+ let r = libc::fcntl(file.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
+ assert!(r != -1, "fcntl error: {}", io::Error::last_os_error());
+ }
+ MyFile(file)
+ }
+}
+
+impl io::Read for MyFile {
+ fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
+ self.0.read(bytes)
+ }
+}
+
+impl Evented for MyFile {
+ fn register(
+ &self,
+ poll: &mio::Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ let hup: Ready = UnixReady::hup().into();
+ EventedFd(&self.0.as_raw_fd()).register(poll, token, interest | hup, opts)
+ }
+ fn reregister(
+ &self,
+ poll: &mio::Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ let hup: Ready = UnixReady::hup().into();
+ EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest | hup, opts)
+ }
+ fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
+ EventedFd(&self.0.as_raw_fd()).deregister(poll)
+ }
+}
+
+#[test]
+fn hup() {
+ drop(env_logger::try_init());
+
+ let handle = Handle::default();
+ unsafe {
+ let mut pipes = [0; 2];
+ assert!(
+ libc::pipe(pipes.as_mut_ptr()) != -1,
+ "pipe error: {}",
+ io::Error::last_os_error()
+ );
+ let read = File::from_raw_fd(pipes[0]);
+ let mut write = File::from_raw_fd(pipes[1]);
+ let t = thread::spawn(move || {
+ write.write_all(b"Hello!\n").unwrap();
+ write.write_all(b"Good bye!\n").unwrap();
+ thread::sleep(Duration::from_millis(100));
+ });
+
+ let source = PollEvented2::new_with_handle(MyFile::new(read), &handle).unwrap();
+
+ let reader = read_to_end(source, Vec::new());
+ let (_, content) = t!(reader.wait());
+ assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]);
+ t.join().unwrap();
+ }
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/reactor.rs b/third_party/rust/tokio-0.1.22/tests/reactor.rs
new file mode 100644
index 0000000000..fd3a8eea69
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/reactor.rs
@@ -0,0 +1,91 @@
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_reactor;
+extern crate tokio_tcp;
+
+use tokio_reactor::Reactor;
+use tokio_tcp::TcpListener;
+
+use futures::executor::{spawn, Notify, Spawn};
+use futures::{Future, Stream};
+
+use std::mem;
+use std::net::TcpStream;
+use std::sync::{Arc, Mutex};
+
+#[test]
+fn test_drop_on_notify() {
+ // When the reactor receives a kernel notification, it notifies the
+ // task that holds the associated socket. If this notification results in
+ // the task being dropped, the socket will also be dropped.
+ //
+ // Previously, there was a deadlock scenario where the reactor, while
+ // notifying, held a lock and the task being dropped attempted to acquire
+ // that same lock in order to clean up state.
+ //
+ // To simulate this case, we create a fake executor that does nothing when
+ // the task is notified. This simulates an executor in the process of
+ // shutting down. Then, when the task handle is dropped, the task itself is
+ // dropped.
+
+ struct MyNotify;
+
+ type Task = Mutex<Spawn<Box<Future<Item = (), Error = ()>>>>;
+
+ impl Notify for MyNotify {
+ fn notify(&self, _: usize) {
+ // Do nothing
+ }
+
+ fn clone_id(&self, id: usize) -> usize {
+ let ptr = id as *const Task;
+ let task = unsafe { Arc::from_raw(ptr) };
+
+ mem::forget(task.clone());
+ mem::forget(task);
+
+ id
+ }
+
+ fn drop_id(&self, id: usize) {
+ let ptr = id as *const Task;
+ let _ = unsafe { Arc::from_raw(ptr) };
+ }
+ }
+
+ let addr = "127.0.0.1:0".parse().unwrap();
+ let mut reactor = Reactor::new().unwrap();
+
+ // Create a listener
+ let listener = TcpListener::bind(&addr).unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ // Define a task that just drains the listener
+ let task = Box::new({
+ listener
+ .incoming()
+ .for_each(|_| Ok(()))
+ .map_err(|_| panic!())
+ }) as Box<Future<Item = (), Error = ()>>;
+
+ let task = Arc::new(Mutex::new(spawn(task)));
+ let notify = Arc::new(MyNotify);
+
+ let mut enter = tokio_executor::enter().unwrap();
+
+ tokio_reactor::with_default(&reactor.handle(), &mut enter, |_| {
+ let id = &*task as *const Task as usize;
+
+ task.lock()
+ .unwrap()
+ .poll_future_notify(&notify, id)
+ .unwrap();
+ });
+
+ drop(task);
+
+ // Establish a connection to the acceptor
+ let _s = TcpStream::connect(&addr).unwrap();
+
+ reactor.turn(None).unwrap();
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/runtime.rs b/third_party/rust/tokio-0.1.22/tests/runtime.rs
new file mode 100644
index 0000000000..f84c66738b
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/runtime.rs
@@ -0,0 +1,532 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+
+use futures::sync::oneshot;
+use std::sync::{atomic, Arc, Mutex};
+use std::thread;
+use tokio::io;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::future::lazy;
+use tokio::prelude::*;
+use tokio::runtime::Runtime;
+
+// this import is used in all child modules that have it in scope
+// from importing super::*, but the compiler doesn't realise that
+// and warns about it.
+pub use futures::future::Executor;
+
+macro_rules! t {
+ ($e:expr) => {
+ match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ }
+ };
+}
+
+fn create_client_server_future() -> Box<Future<Item = (), Error = ()> + Send> {
+ let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
+ let addr = t!(server.local_addr());
+ let client = TcpStream::connect(&addr);
+
+ let server = server
+ .incoming()
+ .take(1)
+ .map_err(|e| panic!("accept err = {:?}", e))
+ .for_each(|socket| {
+ tokio::spawn({
+ io::write_all(socket, b"hello")
+ .map(|_| ())
+ .map_err(|e| panic!("write err = {:?}", e))
+ })
+ })
+ .map(|_| ());
+
+ let client = client
+ .map_err(|e| panic!("connect err = {:?}", e))
+ .and_then(|client| {
+ // Read all
+ io::read_to_end(client, vec![])
+ .map(|_| ())
+ .map_err(|e| panic!("read err = {:?}", e))
+ });
+
+ let future = server.join(client).map(|_| ());
+ Box::new(future)
+}
+
+#[test]
+fn runtime_tokio_run() {
+ let _ = env_logger::try_init();
+
+ tokio::run(create_client_server_future());
+}
+
+#[test]
+fn runtime_single_threaded() {
+ let _ = env_logger::try_init();
+
+ let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
+ runtime.block_on(create_client_server_future()).unwrap();
+ runtime.run().unwrap();
+}
+
+#[test]
+fn runtime_single_threaded_block_on() {
+ let _ = env_logger::try_init();
+
+ tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap();
+}
+
+mod runtime_single_threaded_block_on_all {
+ use super::*;
+
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()> + Send>),
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let msg = tokio::runtime::current_thread::block_on_all(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| {
+ tokio::spawn(f);
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+}
+
+mod runtime_single_threaded_racy {
+ use super::*;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(tokio::runtime::current_thread::Handle, Box<Future<Item = (), Error = ()> + Send>),
+ {
+ let (trigger, exit) = futures::sync::oneshot::channel();
+ let (handle_tx, handle_rx) = ::std::sync::mpsc::channel();
+ let jh = ::std::thread::spawn(move || {
+ let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
+ handle_tx.send(rt.handle()).unwrap();
+
+ // don't exit until we are told to
+ rt.block_on(exit.map_err(|_| ())).unwrap();
+
+ // run until all spawned futures (incl. the "exit" signal future) have completed.
+ rt.run().unwrap();
+ });
+
+ let (tx, rx) = futures::sync::oneshot::channel();
+
+ let handle = handle_rx.recv().unwrap();
+ spawn(
+ handle,
+ Box::new(futures::future::lazy(move || {
+ tx.send(()).unwrap();
+ Ok(())
+ })),
+ );
+
+ // signal runtime thread to exit
+ trigger.send(()).unwrap();
+
+ // wait for runtime thread to exit
+ jh.join().unwrap();
+
+ assert_eq!(rx.wait().unwrap(), ());
+ }
+
+ #[test]
+ fn spawn() {
+ test(|handle, f| {
+ handle.spawn(f).unwrap();
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(|handle, f| {
+ handle.execute(f).unwrap();
+ })
+ }
+}
+
+mod runtime_multi_threaded {
+ use super::*;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(&mut Runtime) + Send + 'static,
+ {
+ let _ = env_logger::try_init();
+
+ let mut runtime = tokio::runtime::Builder::new().build().unwrap();
+ spawn(&mut runtime);
+ runtime.shutdown_on_idle().wait().unwrap();
+ }
+
+ #[test]
+ fn spawn() {
+ test(|rt| {
+ rt.spawn(create_client_server_future());
+ });
+ }
+
+ #[test]
+ fn execute() {
+ test(|rt| {
+ rt.executor()
+ .execute(create_client_server_future())
+ .unwrap();
+ });
+ }
+}
+
+#[test]
+fn block_on_timer() {
+ use std::time::{Duration, Instant};
+ use tokio::timer::{Delay, Error};
+
+ fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send>
+ where
+ T: Send + 'static,
+ {
+ Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x))
+ }
+
+ let mut runtime = Runtime::new().unwrap();
+ assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42);
+ runtime.shutdown_on_idle().wait().unwrap();
+}
+
+mod from_block_on {
+ use super::*;
+
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static,
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let mut runtime = Runtime::new().unwrap();
+ let msg = runtime
+ .block_on(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ runtime.shutdown_on_idle().wait().unwrap();
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| {
+ tokio::spawn(f);
+ })
+ }
+}
+
+#[test]
+fn block_waits() {
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(|| {
+ use std::time::Duration;
+ thread::sleep(Duration::from_millis(1000));
+ tx.send(()).unwrap();
+ });
+
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let mut runtime = Runtime::new().unwrap();
+ runtime
+ .block_on(rx.then(move |_| {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<_, ()>(())
+ }))
+ .unwrap();
+
+ assert_eq!(1, *cnt.lock().unwrap());
+ runtime.shutdown_on_idle().wait().unwrap();
+}
+
+mod many {
+ use super::*;
+
+ const ITER: usize = 200;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(&mut Runtime, Box<Future<Item = (), Error = ()> + Send>),
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let mut runtime = Runtime::new().unwrap();
+
+ for _ in 0..ITER {
+ let c = cnt.clone();
+ spawn(
+ &mut runtime,
+ Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })),
+ );
+ }
+
+ runtime.shutdown_on_idle().wait().unwrap();
+ assert_eq!(ITER, *cnt.lock().unwrap());
+ }
+
+ #[test]
+ fn spawn() {
+ test(|rt, f| {
+ rt.spawn(f);
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(|rt, f| {
+ rt.executor().execute(f).unwrap();
+ })
+ }
+}
+
+mod from_block_on_all {
+ use super::*;
+
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static,
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let runtime = Runtime::new().unwrap();
+ let msg = runtime
+ .block_on_all(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| {
+ tokio::spawn(f);
+ })
+ }
+}
+
+mod nested_enter {
+ use super::*;
+ use std::panic;
+ use tokio::runtime::current_thread;
+
+ fn test<F1, F2>(first: F1, nested: F2)
+ where
+ F1: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static,
+ F2: Fn(Box<Future<Item = (), Error = ()> + Send>) + panic::UnwindSafe + Send + 'static,
+ {
+ let panicked = Arc::new(Mutex::new(false));
+ let panicked2 = panicked.clone();
+
+ // Since this is testing panics in other threads, printing about panics
+ // is noisy and can give the impression that the test is ignoring panics.
+ //
+ // It *is* ignoring them, but on purpose.
+ let prev_hook = panic::take_hook();
+ panic::set_hook(Box::new(|info| {
+ let s = info.to_string();
+ if s.starts_with("panicked at 'nested ")
+ || s.starts_with("panicked at 'Multiple executors at once")
+ {
+ // expected, noop
+ } else {
+ println!("{}", s);
+ }
+ }));
+
+ first(Box::new(lazy(move || {
+ panic::catch_unwind(move || nested(Box::new(lazy(|| Ok::<(), ()>(())))))
+ .expect_err("nested should panic");
+ *panicked2.lock().unwrap() = true;
+ Ok::<(), ()>(())
+ })));
+
+ panic::set_hook(prev_hook);
+
+ assert!(
+ *panicked.lock().unwrap(),
+ "nested call should have panicked"
+ );
+ }
+
+ fn threadpool_new() -> Runtime {
+ Runtime::new().expect("rt new")
+ }
+
+ #[test]
+ fn run_in_run() {
+ test(tokio::run, tokio::run);
+ }
+
+ #[test]
+ fn threadpool_block_on_in_run() {
+ test(tokio::run, |fut| {
+ let mut rt = threadpool_new();
+ rt.block_on(fut).unwrap();
+ });
+ }
+
+ #[test]
+ fn threadpool_block_on_all_in_run() {
+ test(tokio::run, |fut| {
+ let rt = threadpool_new();
+ rt.block_on_all(fut).unwrap();
+ });
+ }
+
+ #[test]
+ fn current_thread_block_on_all_in_run() {
+ test(tokio::run, |fut| {
+ current_thread::block_on_all(fut).unwrap();
+ });
+ }
+}
+
+#[test]
+fn runtime_reactor_handle() {
+ #![allow(deprecated)]
+
+ use futures::Stream;
+ use std::net::{TcpListener as StdListener, TcpStream as StdStream};
+
+ let rt = Runtime::new().unwrap();
+
+ let std_listener = StdListener::bind("127.0.0.1:0").unwrap();
+ let tk_listener = TcpListener::from_std(std_listener, rt.handle()).unwrap();
+
+ let addr = tk_listener.local_addr().unwrap();
+
+ // Spawn a thread since we are avoiding the runtime
+ let th = thread::spawn(|| for _ in tk_listener.incoming().take(1).wait() {});
+
+ let _ = StdStream::connect(&addr).unwrap();
+
+ th.join().unwrap();
+}
+
+#[test]
+fn after_start_and_before_stop_is_called() {
+ let _ = env_logger::try_init();
+
+ let after_start = Arc::new(atomic::AtomicUsize::new(0));
+ let before_stop = Arc::new(atomic::AtomicUsize::new(0));
+
+ let after_inner = after_start.clone();
+ let before_inner = before_stop.clone();
+ let runtime = tokio::runtime::Builder::new()
+ .after_start(move || {
+ after_inner.clone().fetch_add(1, atomic::Ordering::Relaxed);
+ })
+ .before_stop(move || {
+ before_inner.clone().fetch_add(1, atomic::Ordering::Relaxed);
+ })
+ .build()
+ .unwrap();
+
+ runtime.block_on_all(create_client_server_future()).unwrap();
+
+ assert!(after_start.load(atomic::Ordering::Relaxed) > 0);
+ assert!(before_stop.load(atomic::Ordering::Relaxed) > 0);
+}
diff --git a/third_party/rust/tokio-0.1.22/tests/timer.rs b/third_party/rust/tokio-0.1.22/tests/timer.rs
new file mode 100644
index 0000000000..54c3b9d31d
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/timer.rs
@@ -0,0 +1,113 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+extern crate tokio_io;
+
+use tokio::prelude::*;
+use tokio::timer::*;
+
+use std::sync::mpsc;
+use std::time::{Duration, Instant};
+
+#[test]
+fn timer_with_runtime() {
+ let _ = env_logger::try_init();
+
+ let when = Instant::now() + Duration::from_millis(100);
+ let (tx, rx) = mpsc::channel();
+
+ tokio::run({
+ Delay::new(when)
+ .map_err(|e| panic!("unexpected error; err={:?}", e))
+ .and_then(move |_| {
+ assert!(Instant::now() >= when);
+ tx.send(()).unwrap();
+ Ok(())
+ })
+ });
+
+ rx.recv().unwrap();
+}
+
+#[test]
+fn starving() {
+ use futures::{task, Async, Poll};
+
+ let _ = env_logger::try_init();
+
+ struct Starve(Delay, u64);
+
+ impl Future for Starve {
+ type Item = u64;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, ()> {
+ if self.0.poll().unwrap().is_ready() {
+ return Ok(self.1.into());
+ }
+
+ self.1 += 1;
+
+ task::current().notify();
+
+ Ok(Async::NotReady)
+ }
+ }
+
+ let when = Instant::now() + Duration::from_millis(20);
+ let starve = Starve(Delay::new(when), 0);
+
+ let (tx, rx) = mpsc::channel();
+
+ tokio::run({
+ starve.and_then(move |_ticks| {
+ assert!(Instant::now() >= when);
+ tx.send(()).unwrap();
+ Ok(())
+ })
+ });
+
+ rx.recv().unwrap();
+}
+
+#[test]
+fn deadline() {
+ use futures::future;
+
+ let _ = env_logger::try_init();
+
+ let when = Instant::now() + Duration::from_millis(20);
+ let (tx, rx) = mpsc::channel();
+
+ #[allow(deprecated)]
+ tokio::run({
+ future::empty::<(), ()>().deadline(when).then(move |res| {
+ assert!(res.is_err());
+ tx.send(()).unwrap();
+ Ok(())
+ })
+ });
+
+ rx.recv().unwrap();
+}
+
+#[test]
+fn timeout() {
+ use futures::future;
+
+ let _ = env_logger::try_init();
+
+ let (tx, rx) = mpsc::channel();
+
+ tokio::run({
+ future::empty::<(), ()>()
+ .timeout(Duration::from_millis(20))
+ .then(move |res| {
+ assert!(res.is_err());
+ tx.send(()).unwrap();
+ Ok(())
+ })
+ });
+
+ rx.recv().unwrap();
+}