diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/buffered.rs | 65 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/clock.rs | 64 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/drop-core.rs | 42 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/enumerate.rs | 26 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/global.rs | 141 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/length_delimited.rs | 627 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/line-frames.rs | 90 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/pipe-hup.rs | 103 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/reactor.rs | 91 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/runtime.rs | 532 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/timer.rs | 113 |
11 files changed, 1894 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/tests/buffered.rs b/third_party/rust/tokio-0.1.22/tests/buffered.rs new file mode 100644 index 0000000000..45560ad203 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/buffered.rs @@ -0,0 +1,65 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::io::{BufReader, BufWriter, Read, Write}; +use std::net::TcpStream; +use std::thread; + +use futures::stream::Stream; +use futures::Future; +use tokio::net::TcpListener; +use tokio_io::io::copy; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +#[test] +fn echo_server() { + const N: usize = 1024; + drop(env_logger::try_init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let msg = "foo bar baz"; + let t = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + + let t2 = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + let mut b = vec![0; msg.len() * N]; + t!(s.read_exact(&mut b)); + b + }); + + let mut expected = Vec::<u8>::new(); + for _i in 0..N { + expected.extend(msg.as_bytes()); + assert_eq!(t!(s.write(msg.as_bytes())), msg.len()); + } + (expected, t2) + }); + + let clients = srv.incoming().take(2).collect(); + let copied = clients.and_then(|clients| { + let mut clients = clients.into_iter(); + let a = BufReader::new(clients.next().unwrap()); + let b = BufWriter::new(clients.next().unwrap()); + copy(a, b) + }); + + let (amt, _, _) = t!(copied.wait()); + let (expected, t2) = t.join().unwrap(); + let actual = t2.join().unwrap(); + + assert!(expected == actual); + assert_eq!(amt, msg.len() as u64 * 1024); +} diff --git a/third_party/rust/tokio-0.1.22/tests/clock.rs b/third_party/rust/tokio-0.1.22/tests/clock.rs new file mode 100644 index 0000000000..184705aede --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/clock.rs @@ -0,0 +1,64 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_timer; + +use tokio::prelude::*; +use tokio::runtime::{self, current_thread}; +use tokio::timer::*; +use tokio_timer::clock::Clock; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +struct MockNow(Instant); + +impl tokio_timer::clock::Now for MockNow { + fn now(&self) -> Instant { + self.0 + } +} + +#[test] +fn clock_and_timer_concurrent() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = runtime::Builder::new().clock(clock).build().unwrap(); + + let (tx, rx) = mpsc::channel(); + + rt.spawn({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn clock_and_timer_single_threaded() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = current_thread::Builder::new().clock(clock).build().unwrap(); + + rt.block_on({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + Ok(()) + }) + }) + .unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/tests/drop-core.rs b/third_party/rust/tokio-0.1.22/tests/drop-core.rs new file mode 100644 index 0000000000..8be0d711d7 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/drop-core.rs @@ -0,0 +1,42 @@ +extern crate futures; +extern crate tokio; + +use std::net; +use std::thread; + +use futures::future; +use futures::prelude::*; +use futures::sync::oneshot; +use tokio::net::TcpListener; +use tokio::reactor::Reactor; + +#[test] +fn tcp_doesnt_block() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + drop(core); + assert!(listener.incoming().wait().next().unwrap().is_err()); +} + +#[test] +fn drop_wakes() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + let (tx, rx) = oneshot::channel::<()>(); + let t = thread::spawn(move || { + let incoming = listener.incoming(); + let new_socket = incoming.into_future().map_err(|_| ()); + let drop_tx = future::lazy(|| { + drop(tx); + future::ok(()) + }); + assert!(new_socket.join(drop_tx).wait().is_err()); + }); + drop(rx.wait()); + drop(core); + t.join().unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/tests/enumerate.rs b/third_party/rust/tokio-0.1.22/tests/enumerate.rs new file mode 100644 index 0000000000..c71b7a24c9 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/enumerate.rs @@ -0,0 +1,26 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_executor; +extern crate tokio_timer; + +use futures::sync::mpsc; +use tokio::util::StreamExt; + +#[test] +fn enumerate() { + use futures::*; + + let (mut tx, rx) = mpsc::channel(1); + + std::thread::spawn(|| { + for i in 0..5 { + tx = tx.send(i * 2).wait().unwrap(); + } + }); + + let result = rx.enumerate().collect(); + assert_eq!( + result.wait(), + Ok(vec![(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)]) + ); +} diff --git a/third_party/rust/tokio-0.1.22/tests/global.rs b/third_party/rust/tokio-0.1.22/tests/global.rs new file mode 100644 index 0000000000..1bf45a66f6 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/global.rs @@ -0,0 +1,141 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; +use std::{io, thread}; + +use futures::prelude::*; +use tokio::net::{TcpListener, TcpStream}; +use tokio::runtime::Runtime; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +#[test] +fn hammer_old() { + let _ = env_logger::try_init(); + + let threads = (0..10) + .map(|_| { + thread::spawn(|| { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + let mine = TcpStream::connect(&addr); + let theirs = srv + .incoming() + .into_future() + .map(|(s, _)| s.unwrap()) + .map_err(|(s, _)| s); + let (mine, theirs) = t!(mine.join(theirs).wait()); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); + }) + }) + .collect::<Vec<_>>(); + for thread in threads { + thread.join().unwrap(); + } +} + +struct Rd(Arc<TcpStream>); +struct Wr(Arc<TcpStream>); + +impl io::Read for Rd { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + <&TcpStream>::read(&mut &*self.0, dst) + } +} + +impl tokio_io::AsyncRead for Rd {} + +impl io::Write for Wr { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + <&TcpStream>::write(&mut &*self.0, src) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl tokio_io::AsyncWrite for Wr { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +#[test] +fn hammer_split() { + use tokio_io::io; + + const N: usize = 100; + const ITER: usize = 10; + + let _ = env_logger::try_init(); + + for _ in 0..ITER { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + + let cnt = Arc::new(AtomicUsize::new(0)); + + let mut rt = Runtime::new().unwrap(); + + fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) { + let socket = Arc::new(socket); + let rd = Rd(socket.clone()); + let wr = Wr(socket); + + let cnt2 = cnt.clone(); + + let rd = io::read(rd, vec![0; 1]) + .map(move |_| { + cnt2.fetch_add(1, Relaxed); + }) + .map_err(|e| panic!("read error = {:?}", e)); + + let wr = io::write_all(wr, b"1") + .map(move |_| { + cnt.fetch_add(1, Relaxed); + }) + .map_err(move |e| panic!("write error = {:?}", e)); + + tokio::spawn(rd); + tokio::spawn(wr); + } + + rt.spawn({ + let cnt = cnt.clone(); + srv.incoming() + .map_err(|e| panic!("accept error = {:?}", e)) + .take(N as u64) + .for_each(move |socket| { + split(socket, cnt.clone()); + Ok(()) + }) + }); + + for _ in 0..N { + rt.spawn({ + let cnt = cnt.clone(); + TcpStream::connect(&addr) + .map_err(move |e| panic!("connect error = {:?}", e)) + .map(move |socket| split(socket, cnt)) + }); + } + + rt.shutdown_on_idle().wait().unwrap(); + assert_eq!(N * 4, cnt.load(Relaxed)); + } +} diff --git a/third_party/rust/tokio-0.1.22/tests/length_delimited.rs b/third_party/rust/tokio-0.1.22/tests/length_delimited.rs new file mode 100644 index 0000000000..f87cfa936d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/length_delimited.rs @@ -0,0 +1,627 @@ +extern crate bytes; +extern crate futures; +extern crate tokio; + +use tokio::codec::*; +use tokio::io::{AsyncRead, AsyncWrite}; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::Async::*; +use futures::{Poll, Sink, Stream}; + +use std::collections::VecDeque; +use std::io; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +#[test] +fn read_empty_io_yields_nothing() { + let mut io = FramedRead::new(mock!(), LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_read(mock! { + Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_native_endian() { + let data = if cfg!(target_endian = "big") { + b"\x00\x00\x00\x09abcdefghi" + } else { + b"\x09\x00\x00\x00abcdefghi" + }; + let mut io = length_delimited::Builder::new() + .native_endian() + .new_read(mock! { + Ok(data[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi"); + data.extend_from_slice(b"\x00\x00\x00\x03123"); + data.extend_from_slice(b"\x00\x00\x00\x0bhello world"); + + let mut io = FramedRead::new( + mock! { + Ok(data.into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet_wait() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet_wait() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Err(would_block()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_incomplete_head() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_head_multi() { + let mut io = FramedRead::new( + mock! { + Err(would_block()), + Ok(b"\x00"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_payload() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00\x00\x09ab"[..].into()), + Err(would_block()), + Ok(b"cd"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new().new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new().new_read(mock! { + Ok(b"\x00\x00\x00\x09abcd"[..].into()), + Err(would_block()), + Ok(b"efghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_one_byte_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_read(mock! { + Ok(b"\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_header_offset() { + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(4) + .new_read(mock! { + Ok(b"zzzz\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_skip_none_adjusted() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"xx\x00\x09abcdefghi"); + data.extend_from_slice(b"yy\x00\x03123"); + data.extend_from_slice(b"zz\x00\x0bhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(2) + .num_skip(0) + .length_adjustment(4) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!( + io.poll().unwrap(), + Ready(Some(b"xx\x00\x09abcdefghi"[..].into())) + ); + assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into()))); + assert_eq!( + io.poll().unwrap(), + Ready(Some(b"zz\x00\x0bhello world"[..].into())) + ); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_length_includes_head() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x0babcdefghi"); + data.extend_from_slice(b"\x00\x05123"); + data.extend_from_slice(b"\x00\x0dhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_adjustment(-2) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn write_single_frame_length_adjusted() { + let mut io = length_delimited::Builder::new() + .length_adjustment(-2) + .new_write(mock! { + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_nothing_yields_nothing() { + let mut io = FramedWrite::new(mock!(), LengthDelimitedCodec::new()); + assert!(io.poll_complete().unwrap().is_ready()); +} + +#[test] +fn write_single_frame_one_packet() { + let mut io = FramedWrite::new( + mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_one_packet() { + let mut io = FramedWrite::new( + mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io + .start_send(Bytes::from("hello world")) + .unwrap() + .is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_multi_packet() { + let mut io = FramedWrite::new( + mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io + .start_send(Bytes::from("hello world")) + .unwrap() + .is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_would_block() { + let mut io = FramedWrite::new( + mock! { + Err(would_block()), + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_write(mock! { + Ok(b"\x09\x00\x00\x00"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_with_short_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_write(mock! { + Ok(b"\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_write(mock! {}); + + assert_eq!( + io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new().new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"abcdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert_eq!( + io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new().new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"ab"[..].into()), + Err(would_block()), + Ok(b"cdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert!(io.poll_complete().unwrap().is_ready()); + assert_eq!( + io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_zero() { + let mut io = length_delimited::Builder::new().new_write(mock! {}); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert_eq!( + io.poll_complete().unwrap_err().kind(), + io::ErrorKind::WriteZero + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn encode_overflow() { + // Test reproducing tokio-rs/tokio#681. + let mut codec = length_delimited::Builder::new().new_codec(); + let mut buf = BytesMut::with_capacity(1024); + + // Put some data into the buffer without resizing it to hold more. + let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>(); + buf.put_slice(&some_as[..]); + + // Trying to encode the length header should resize the buffer if it won't fit. + codec.encode(Bytes::from("hello"), &mut buf).unwrap(); +} + +// ===== Test utils ===== + +fn would_block() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "would block") +} + +struct Mock { + calls: VecDeque<io::Result<Op>>, +} + +enum Op { + Data(Vec<u8>), + Flush, +} + +use self::Op::*; + +impl io::Read for Mock { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ok(data.len()) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } +} + +impl AsyncRead for Mock {} + +impl io::Write for Mock { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + let len = data.len(); + assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); + assert_eq!(&data[..], &src[..len]); + Ok(len) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self.calls.pop_front() { + Some(Ok(Op::Flush)) => Ok(()), + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(()), + } + } +} + +impl AsyncWrite for Mock { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Ready(())) + } +} + +impl<'a> From<&'a [u8]> for Op { + fn from(src: &'a [u8]) -> Op { + Op::Data(src.into()) + } +} + +impl From<Vec<u8>> for Op { + fn from(src: Vec<u8>) -> Op { + Op::Data(src) + } +} diff --git a/third_party/rust/tokio-0.1.22/tests/line-frames.rs b/third_party/rust/tokio-0.1.22/tests/line-frames.rs new file mode 100644 index 0000000000..84b860a2a3 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/line-frames.rs @@ -0,0 +1,90 @@ +extern crate bytes; +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; +extern crate tokio_threadpool; + +use std::io; +use std::net::Shutdown; + +use bytes::{BufMut, BytesMut}; +use futures::{Future, Sink, Stream}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_codec::{Decoder, Encoder}; +use tokio_io::io::{read, write_all}; +use tokio_threadpool::Builder; + +pub struct LineCodec; + +impl Decoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + match buf.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some(buf.split_to(i + 1).into())), + None => Ok(None), + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() == 0 { + Ok(None) + } else { + let amt = buf.len(); + Ok(Some(buf.split_to(amt))) + } + } +} + +impl Encoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> { + into.put(&item[..]); + Ok(()) + } +} + +#[test] +fn echo() { + drop(env_logger::try_init()); + + let pool = Builder::new().pool_size(1).build(); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + let sender = pool.sender().clone(); + let srv = listener.incoming().for_each(move |socket| { + let (sink, stream) = LineCodec.framed(socket).split(); + sender + .spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())) + .unwrap(); + Ok(()) + }); + + pool.sender() + .spawn(srv.map_err(|e| panic!("srv error: {}", e))) + .unwrap(); + + let client = TcpStream::connect(&addr); + let client = client.wait().unwrap(); + let (client, _) = write_all(client, b"a\n").wait().unwrap(); + let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); + assert_eq!(amt, 2); + assert_eq!(&buf[..2], b"a\n"); + + let (client, _) = write_all(client, b"\n").wait().unwrap(); + let (client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"\n"); + + let (client, _) = write_all(client, b"b").wait().unwrap(); + client.shutdown(Shutdown::Write).unwrap(); + let (_client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"b"); +} diff --git a/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs b/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs new file mode 100644 index 0000000000..eabdec4c80 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs @@ -0,0 +1,103 @@ +#![cfg(unix)] + +extern crate env_logger; +extern crate futures; +extern crate libc; +extern crate mio; +extern crate tokio; +extern crate tokio_io; + +use std::fs::File; +use std::io::{self, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::thread; +use std::time::Duration; + +use futures::Future; +use mio::event::Evented; +use mio::unix::{EventedFd, UnixReady}; +use mio::{PollOpt, Ready, Token}; +use tokio::reactor::{Handle, PollEvented2}; +use tokio_io::io::read_to_end; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +struct MyFile(File); + +impl MyFile { + fn new(file: File) -> MyFile { + unsafe { + let r = libc::fcntl(file.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); + assert!(r != -1, "fcntl error: {}", io::Error::last_os_error()); + } + MyFile(file) + } +} + +impl io::Read for MyFile { + fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { + self.0.read(bytes) + } +} + +impl Evented for MyFile { + fn register( + &self, + poll: &mio::Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).register(poll, token, interest | hup, opts) + } + fn reregister( + &self, + poll: &mio::Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest | hup, opts) + } + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + EventedFd(&self.0.as_raw_fd()).deregister(poll) + } +} + +#[test] +fn hup() { + drop(env_logger::try_init()); + + let handle = Handle::default(); + unsafe { + let mut pipes = [0; 2]; + assert!( + libc::pipe(pipes.as_mut_ptr()) != -1, + "pipe error: {}", + io::Error::last_os_error() + ); + let read = File::from_raw_fd(pipes[0]); + let mut write = File::from_raw_fd(pipes[1]); + let t = thread::spawn(move || { + write.write_all(b"Hello!\n").unwrap(); + write.write_all(b"Good bye!\n").unwrap(); + thread::sleep(Duration::from_millis(100)); + }); + + let source = PollEvented2::new_with_handle(MyFile::new(read), &handle).unwrap(); + + let reader = read_to_end(source, Vec::new()); + let (_, content) = t!(reader.wait()); + assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); + t.join().unwrap(); + } +} diff --git a/third_party/rust/tokio-0.1.22/tests/reactor.rs b/third_party/rust/tokio-0.1.22/tests/reactor.rs new file mode 100644 index 0000000000..fd3a8eea69 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/reactor.rs @@ -0,0 +1,91 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_reactor; +extern crate tokio_tcp; + +use tokio_reactor::Reactor; +use tokio_tcp::TcpListener; + +use futures::executor::{spawn, Notify, Spawn}; +use futures::{Future, Stream}; + +use std::mem; +use std::net::TcpStream; +use std::sync::{Arc, Mutex}; + +#[test] +fn test_drop_on_notify() { + // When the reactor receives a kernel notification, it notifies the + // task that holds the associated socket. If this notification results in + // the task being dropped, the socket will also be dropped. + // + // Previously, there was a deadlock scenario where the reactor, while + // notifying, held a lock and the task being dropped attempted to acquire + // that same lock in order to clean up state. + // + // To simulate this case, we create a fake executor that does nothing when + // the task is notified. This simulates an executor in the process of + // shutting down. Then, when the task handle is dropped, the task itself is + // dropped. + + struct MyNotify; + + type Task = Mutex<Spawn<Box<Future<Item = (), Error = ()>>>>; + + impl Notify for MyNotify { + fn notify(&self, _: usize) { + // Do nothing + } + + fn clone_id(&self, id: usize) -> usize { + let ptr = id as *const Task; + let task = unsafe { Arc::from_raw(ptr) }; + + mem::forget(task.clone()); + mem::forget(task); + + id + } + + fn drop_id(&self, id: usize) { + let ptr = id as *const Task; + let _ = unsafe { Arc::from_raw(ptr) }; + } + } + + let addr = "127.0.0.1:0".parse().unwrap(); + let mut reactor = Reactor::new().unwrap(); + + // Create a listener + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + // Define a task that just drains the listener + let task = Box::new({ + listener + .incoming() + .for_each(|_| Ok(())) + .map_err(|_| panic!()) + }) as Box<Future<Item = (), Error = ()>>; + + let task = Arc::new(Mutex::new(spawn(task))); + let notify = Arc::new(MyNotify); + + let mut enter = tokio_executor::enter().unwrap(); + + tokio_reactor::with_default(&reactor.handle(), &mut enter, |_| { + let id = &*task as *const Task as usize; + + task.lock() + .unwrap() + .poll_future_notify(¬ify, id) + .unwrap(); + }); + + drop(task); + + // Establish a connection to the acceptor + let _s = TcpStream::connect(&addr).unwrap(); + + reactor.turn(None).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/tests/runtime.rs b/third_party/rust/tokio-0.1.22/tests/runtime.rs new file mode 100644 index 0000000000..f84c66738b --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/runtime.rs @@ -0,0 +1,532 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; + +use futures::sync::oneshot; +use std::sync::{atomic, Arc, Mutex}; +use std::thread; +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::future::lazy; +use tokio::prelude::*; +use tokio::runtime::Runtime; + +// this import is used in all child modules that have it in scope +// from importing super::*, but the compiler doesn't realise that +// and warns about it. +pub use futures::future::Executor; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +fn create_client_server_future() -> Box<Future<Item = (), Error = ()> + Send> { + let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(server.local_addr()); + let client = TcpStream::connect(&addr); + + let server = server + .incoming() + .take(1) + .map_err(|e| panic!("accept err = {:?}", e)) + .for_each(|socket| { + tokio::spawn({ + io::write_all(socket, b"hello") + .map(|_| ()) + .map_err(|e| panic!("write err = {:?}", e)) + }) + }) + .map(|_| ()); + + let client = client + .map_err(|e| panic!("connect err = {:?}", e)) + .and_then(|client| { + // Read all + io::read_to_end(client, vec![]) + .map(|_| ()) + .map_err(|e| panic!("read err = {:?}", e)) + }); + + let future = server.join(client).map(|_| ()); + Box::new(future) +} + +#[test] +fn runtime_tokio_run() { + let _ = env_logger::try_init(); + + tokio::run(create_client_server_future()); +} + +#[test] +fn runtime_single_threaded() { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(create_client_server_future()).unwrap(); + runtime.run().unwrap(); +} + +#[test] +fn runtime_single_threaded_block_on() { + let _ = env_logger::try_init(); + + tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap(); +} + +mod runtime_single_threaded_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let msg = tokio::runtime::current_thread::block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } +} + +mod runtime_single_threaded_racy { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(tokio::runtime::current_thread::Handle, Box<Future<Item = (), Error = ()> + Send>), + { + let (trigger, exit) = futures::sync::oneshot::channel(); + let (handle_tx, handle_rx) = ::std::sync::mpsc::channel(); + let jh = ::std::thread::spawn(move || { + let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); + handle_tx.send(rt.handle()).unwrap(); + + // don't exit until we are told to + rt.block_on(exit.map_err(|_| ())).unwrap(); + + // run until all spawned futures (incl. the "exit" signal future) have completed. + rt.run().unwrap(); + }); + + let (tx, rx) = futures::sync::oneshot::channel(); + + let handle = handle_rx.recv().unwrap(); + spawn( + handle, + Box::new(futures::future::lazy(move || { + tx.send(()).unwrap(); + Ok(()) + })), + ); + + // signal runtime thread to exit + trigger.send(()).unwrap(); + + // wait for runtime thread to exit + jh.join().unwrap(); + + assert_eq!(rx.wait().unwrap(), ()); + } + + #[test] + fn spawn() { + test(|handle, f| { + handle.spawn(f).unwrap(); + }) + } + + #[test] + fn execute() { + test(|handle, f| { + handle.execute(f).unwrap(); + }) + } +} + +mod runtime_multi_threaded { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime) + Send + 'static, + { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::Builder::new().build().unwrap(); + spawn(&mut runtime); + runtime.shutdown_on_idle().wait().unwrap(); + } + + #[test] + fn spawn() { + test(|rt| { + rt.spawn(create_client_server_future()); + }); + } + + #[test] + fn execute() { + test(|rt| { + rt.executor() + .execute(create_client_server_future()) + .unwrap(); + }); + } +} + +#[test] +fn block_on_timer() { + use std::time::{Duration, Instant}; + use tokio::timer::{Delay, Error}; + + fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send> + where + T: Send + 'static, + { + Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) + } + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod from_block_on { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + use std::time::Duration; + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + runtime + .block_on(rx.then(move |_| { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, *cnt.lock().unwrap()); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod many { + use super::*; + + const ITER: usize = 200; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime, Box<Future<Item = (), Error = ()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let mut runtime = Runtime::new().unwrap(); + + for _ in 0..ITER { + let c = cnt.clone(); + spawn( + &mut runtime, + Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + })), + ); + } + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(ITER, *cnt.lock().unwrap()); + } + + #[test] + fn spawn() { + test(|rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test(|rt, f| { + rt.executor().execute(f).unwrap(); + }) + } +} + +mod from_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +mod nested_enter { + use super::*; + use std::panic; + use tokio::runtime::current_thread; + + fn test<F1, F2>(first: F1, nested: F2) + where + F1: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + F2: Fn(Box<Future<Item = (), Error = ()> + Send>) + panic::UnwindSafe + Send + 'static, + { + let panicked = Arc::new(Mutex::new(false)); + let panicked2 = panicked.clone(); + + // Since this is testing panics in other threads, printing about panics + // is noisy and can give the impression that the test is ignoring panics. + // + // It *is* ignoring them, but on purpose. + let prev_hook = panic::take_hook(); + panic::set_hook(Box::new(|info| { + let s = info.to_string(); + if s.starts_with("panicked at 'nested ") + || s.starts_with("panicked at 'Multiple executors at once") + { + // expected, noop + } else { + println!("{}", s); + } + })); + + first(Box::new(lazy(move || { + panic::catch_unwind(move || nested(Box::new(lazy(|| Ok::<(), ()>(()))))) + .expect_err("nested should panic"); + *panicked2.lock().unwrap() = true; + Ok::<(), ()>(()) + }))); + + panic::set_hook(prev_hook); + + assert!( + *panicked.lock().unwrap(), + "nested call should have panicked" + ); + } + + fn threadpool_new() -> Runtime { + Runtime::new().expect("rt new") + } + + #[test] + fn run_in_run() { + test(tokio::run, tokio::run); + } + + #[test] + fn threadpool_block_on_in_run() { + test(tokio::run, |fut| { + let mut rt = threadpool_new(); + rt.block_on(fut).unwrap(); + }); + } + + #[test] + fn threadpool_block_on_all_in_run() { + test(tokio::run, |fut| { + let rt = threadpool_new(); + rt.block_on_all(fut).unwrap(); + }); + } + + #[test] + fn current_thread_block_on_all_in_run() { + test(tokio::run, |fut| { + current_thread::block_on_all(fut).unwrap(); + }); + } +} + +#[test] +fn runtime_reactor_handle() { + #![allow(deprecated)] + + use futures::Stream; + use std::net::{TcpListener as StdListener, TcpStream as StdStream}; + + let rt = Runtime::new().unwrap(); + + let std_listener = StdListener::bind("127.0.0.1:0").unwrap(); + let tk_listener = TcpListener::from_std(std_listener, rt.handle()).unwrap(); + + let addr = tk_listener.local_addr().unwrap(); + + // Spawn a thread since we are avoiding the runtime + let th = thread::spawn(|| for _ in tk_listener.incoming().take(1).wait() {}); + + let _ = StdStream::connect(&addr).unwrap(); + + th.join().unwrap(); +} + +#[test] +fn after_start_and_before_stop_is_called() { + let _ = env_logger::try_init(); + + let after_start = Arc::new(atomic::AtomicUsize::new(0)); + let before_stop = Arc::new(atomic::AtomicUsize::new(0)); + + let after_inner = after_start.clone(); + let before_inner = before_stop.clone(); + let runtime = tokio::runtime::Builder::new() + .after_start(move || { + after_inner.clone().fetch_add(1, atomic::Ordering::Relaxed); + }) + .before_stop(move || { + before_inner.clone().fetch_add(1, atomic::Ordering::Relaxed); + }) + .build() + .unwrap(); + + runtime.block_on_all(create_client_server_future()).unwrap(); + + assert!(after_start.load(atomic::Ordering::Relaxed) > 0); + assert!(before_stop.load(atomic::Ordering::Relaxed) > 0); +} diff --git a/third_party/rust/tokio-0.1.22/tests/timer.rs b/third_party/rust/tokio-0.1.22/tests/timer.rs new file mode 100644 index 0000000000..54c3b9d31d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/timer.rs @@ -0,0 +1,113 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use tokio::prelude::*; +use tokio::timer::*; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +#[test] +fn timer_with_runtime() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(100); + let (tx, rx) = mpsc::channel(); + + tokio::run({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn starving() { + use futures::{task, Async, Poll}; + + let _ = env_logger::try_init(); + + struct Starve(Delay, u64); + + impl Future for Starve { + type Item = u64; + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, ()> { + if self.0.poll().unwrap().is_ready() { + return Ok(self.1.into()); + } + + self.1 += 1; + + task::current().notify(); + + Ok(Async::NotReady) + } + } + + let when = Instant::now() + Duration::from_millis(20); + let starve = Starve(Delay::new(when), 0); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + starve.and_then(move |_ticks| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn deadline() { + use futures::future; + + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(20); + let (tx, rx) = mpsc::channel(); + + #[allow(deprecated)] + tokio::run({ + future::empty::<(), ()>().deadline(when).then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn timeout() { + use futures::future; + + let _ = env_logger::try_init(); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + future::empty::<(), ()>() + .timeout(Duration::from_millis(20)) + .then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} |