diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-0.1.11/tests | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-0.1.11/tests')
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/buffered.rs | 63 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/clock.rs | 69 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/drop-core.rs | 42 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/global.rs | 136 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/length_delimited.rs | 564 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/line-frames.rs | 88 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/pipe-hup.rs | 88 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/reactor.rs | 89 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/runtime.rs | 404 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/timer.rs | 116 |
10 files changed, 1659 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/tests/buffered.rs b/third_party/rust/tokio-0.1.11/tests/buffered.rs new file mode 100644 index 0000000000..3605eba38a --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/buffered.rs @@ -0,0 +1,63 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::net::TcpStream; +use std::thread; +use std::io::{Read, Write, BufReader, BufWriter}; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::copy; +use tokio::net::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo_server() { + const N: usize = 1024; + drop(env_logger::try_init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let msg = "foo bar baz"; + let t = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + + let t2 = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + let mut b = vec![0; msg.len() * N]; + t!(s.read_exact(&mut b)); + b + }); + + let mut expected = Vec::<u8>::new(); + for _i in 0..N { + expected.extend(msg.as_bytes()); + assert_eq!(t!(s.write(msg.as_bytes())), msg.len()); + } + (expected, t2) + }); + + let clients = srv.incoming().take(2).collect(); + let copied = clients.and_then(|clients| { + let mut clients = clients.into_iter(); + let a = BufReader::new(clients.next().unwrap()); + let b = BufWriter::new(clients.next().unwrap()); + copy(a, b) + }); + + let (amt, _, _) = t!(copied.wait()); + let (expected, t2) = t.join().unwrap(); + let actual = t2.join().unwrap(); + + assert!(expected == actual); + assert_eq!(amt, msg.len() as u64 * 1024); +} diff --git a/third_party/rust/tokio-0.1.11/tests/clock.rs b/third_party/rust/tokio-0.1.11/tests/clock.rs new file mode 100644 index 0000000000..6e9d9121fc --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/clock.rs @@ -0,0 +1,69 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_timer; +extern crate env_logger; + +use tokio::prelude::*; +use tokio::runtime::{self, current_thread}; +use tokio::timer::*; +use tokio_timer::clock::Clock; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +struct MockNow(Instant); + +impl tokio_timer::clock::Now for MockNow { + fn now(&self) -> Instant { + self.0 + } +} + +#[test] +fn clock_and_timer_concurrent() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = runtime::Builder::new() + .clock(clock) + .build() + .unwrap(); + + let (tx, rx) = mpsc::channel(); + + rt.spawn({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn clock_and_timer_single_threaded() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = current_thread::Builder::new() + .clock(clock) + .build() + .unwrap(); + + rt.block_on({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + Ok(()) + }) + }).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/tests/drop-core.rs b/third_party/rust/tokio-0.1.11/tests/drop-core.rs new file mode 100644 index 0000000000..75ac9b7eb1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/drop-core.rs @@ -0,0 +1,42 @@ +extern crate tokio; +extern crate futures; + +use std::thread; +use std::net; + +use futures::future; +use futures::prelude::*; +use futures::sync::oneshot; +use tokio::net::TcpListener; +use tokio::reactor::Reactor; + +#[test] +fn tcp_doesnt_block() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + drop(core); + assert!(listener.incoming().wait().next().unwrap().is_err()); +} + +#[test] +fn drop_wakes() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + let (tx, rx) = oneshot::channel::<()>(); + let t = thread::spawn(move || { + let incoming = listener.incoming(); + let new_socket = incoming.into_future().map_err(|_| ()); + let drop_tx = future::lazy(|| { + drop(tx); + future::ok(()) + }); + assert!(new_socket.join(drop_tx).wait().is_err()); + }); + drop(rx.wait()); + drop(core); + t.join().unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/tests/global.rs b/third_party/rust/tokio-0.1.11/tests/global.rs new file mode 100644 index 0000000000..d3bc09315a --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/global.rs @@ -0,0 +1,136 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_io; +extern crate env_logger; + +use std::{io, thread}; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; + +use futures::prelude::*; +use tokio::net::{TcpStream, TcpListener}; +use tokio::runtime::Runtime; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn hammer_old() { + let _ = env_logger::try_init(); + + let threads = (0..10).map(|_| { + thread::spawn(|| { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + let mine = TcpStream::connect(&addr); + let theirs = srv.incoming().into_future() + .map(|(s, _)| s.unwrap()) + .map_err(|(s, _)| s); + let (mine, theirs) = t!(mine.join(theirs).wait()); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); + }) + }).collect::<Vec<_>>(); + for thread in threads { + thread.join().unwrap(); + } +} + +struct Rd(Arc<TcpStream>); +struct Wr(Arc<TcpStream>); + +impl io::Read for Rd { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + <&TcpStream>::read(&mut &*self.0, dst) + } +} + +impl tokio_io::AsyncRead for Rd { +} + +impl io::Write for Wr { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + <&TcpStream>::write(&mut &*self.0, src) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl tokio_io::AsyncWrite for Wr { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +#[test] +fn hammer_split() { + use tokio_io::io; + + const N: usize = 100; + const ITER: usize = 10; + + let _ = env_logger::try_init(); + + for _ in 0..ITER { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + + let cnt = Arc::new(AtomicUsize::new(0)); + + let mut rt = Runtime::new().unwrap(); + + fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) { + let socket = Arc::new(socket); + let rd = Rd(socket.clone()); + let wr = Wr(socket); + + let cnt2 = cnt.clone(); + + let rd = io::read(rd, vec![0; 1]) + .map(move |_| { + cnt2.fetch_add(1, Relaxed); + }) + .map_err(|e| panic!("read error = {:?}", e)); + + let wr = io::write_all(wr, b"1") + .map(move |_| { + cnt.fetch_add(1, Relaxed); + }) + .map_err(move |e| panic!("write error = {:?}", e)); + + tokio::spawn(rd); + tokio::spawn(wr); + } + + rt.spawn({ + let cnt = cnt.clone(); + srv.incoming() + .map_err(|e| panic!("accept error = {:?}", e)) + .take(N as u64) + .for_each(move |socket| { + split(socket, cnt.clone()); + Ok(()) + }) + }); + + for _ in 0..N { + rt.spawn({ + let cnt = cnt.clone(); + TcpStream::connect(&addr) + .map_err(move |e| panic!("connect error = {:?}", e)) + .map(move |socket| split(socket, cnt)) + }); + } + + rt.shutdown_on_idle().wait().unwrap(); + assert_eq!(N * 4, cnt.load(Relaxed)); + } +} diff --git a/third_party/rust/tokio-0.1.11/tests/length_delimited.rs b/third_party/rust/tokio-0.1.11/tests/length_delimited.rs new file mode 100644 index 0000000000..318f35ef33 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/length_delimited.rs @@ -0,0 +1,564 @@ +extern crate tokio; +extern crate futures; +extern crate bytes; + +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::codec::*; + +use bytes::Bytes; +use futures::{Stream, Sink, Poll}; +use futures::Async::*; + +use std::io; +use std::collections::VecDeque; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + + +#[test] +fn read_empty_io_yields_nothing() { + let mut io = FramedRead::new(mock!(), LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_read(mock! { + Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_native_endian() { + let data = if cfg!(target_endian = "big") { + b"\x00\x00\x00\x09abcdefghi" + } else { + b"\x09\x00\x00\x00abcdefghi" + }; + let mut io = length_delimited::Builder::new() + .native_endian() + .new_read(mock! { + Ok(data[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi"); + data.extend_from_slice(b"\x00\x00\x00\x03123"); + data.extend_from_slice(b"\x00\x00\x00\x0bhello world"); + + let mut io = FramedRead::new(mock! { + Ok(data.into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet_wait() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet_wait() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Err(would_block()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_incomplete_head() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + }, LengthDelimitedCodec::new()); + + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_head_multi() { + let mut io = FramedRead::new(mock! { + Err(would_block()), + Ok(b"\x00"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_payload() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00\x00\x09ab"[..].into()), + Err(would_block()), + Ok(b"cd"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new() + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new() + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcd"[..].into()), + Err(would_block()), + Ok(b"efghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_one_byte_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_read(mock! { + Ok(b"\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_header_offset() { + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(4) + .new_read(mock! { + Ok(b"zzzz\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_skip_none_adjusted() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"xx\x00\x09abcdefghi"); + data.extend_from_slice(b"yy\x00\x03123"); + data.extend_from_slice(b"zz\x00\x0bhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(2) + .num_skip(0) + .length_adjustment(4) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"xx\x00\x09abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"zz\x00\x0bhello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_length_includes_head() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x0babcdefghi"); + data.extend_from_slice(b"\x00\x05123"); + data.extend_from_slice(b"\x00\x0dhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_adjustment(-2) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn write_single_frame_length_adjusted() { + let mut io = length_delimited::Builder::new() + .length_adjustment(-2) + .new_write(mock! { + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_nothing_yields_nothing() { + let mut io = FramedWrite::new( + mock!(), + LengthDelimitedCodec::new() + ); + assert!(io.poll_complete().unwrap().is_ready()); +} + +#[test] +fn write_single_frame_one_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_one_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io.start_send(Bytes::from("hello world")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_multi_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send(Bytes::from("hello world")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_would_block() { + let mut io = FramedWrite::new(mock! { + Err(would_block()), + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_write(mock! { + Ok(b"\x09\x00\x00\x00"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + + +#[test] +fn write_single_frame_with_short_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_write(mock! { + Ok(b"\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_write(mock! { }); + + assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new() + .new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"abcdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new() + .new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"ab"[..].into()), + Err(would_block()), + Ok(b"cdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert!(io.poll_complete().unwrap().is_ready()); + assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_zero() { + let mut io = length_delimited::Builder::new() + .new_write(mock! { }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert_eq!(io.poll_complete().unwrap_err().kind(), io::ErrorKind::WriteZero); + assert!(io.get_ref().calls.is_empty()); +} + +// ===== Test utils ===== + +fn would_block() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "would block") +} + +struct Mock { + calls: VecDeque<io::Result<Op>>, +} + +enum Op { + Data(Vec<u8>), + Flush, +} + +use self::Op::*; + +impl io::Read for Mock { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ok(data.len()) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } +} + +impl AsyncRead for Mock { +} + +impl io::Write for Mock { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + let len = data.len(); + assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); + assert_eq!(&data[..], &src[..len]); + Ok(len) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self.calls.pop_front() { + Some(Ok(Op::Flush)) => { + Ok(()) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(()), + } + } +} + +impl AsyncWrite for Mock { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Ready(())) + } +} + +impl<'a> From<&'a [u8]> for Op { + fn from(src: &'a [u8]) -> Op { + Op::Data(src.into()) + } +} + +impl From<Vec<u8>> for Op { + fn from(src: Vec<u8>) -> Op { + Op::Data(src) + } +} diff --git a/third_party/rust/tokio-0.1.11/tests/line-frames.rs b/third_party/rust/tokio-0.1.11/tests/line-frames.rs new file mode 100644 index 0000000000..e36d5a73ee --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/line-frames.rs @@ -0,0 +1,88 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; +extern crate tokio_threadpool; +extern crate bytes; + +use std::io; +use std::net::Shutdown; + +use bytes::{BytesMut, BufMut}; +use futures::{Future, Stream, Sink}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_codec::{Encoder, Decoder}; +use tokio_io::io::{write_all, read}; +use tokio_threadpool::Builder; + +pub struct LineCodec; + +impl Decoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + match buf.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some(buf.split_to(i + 1).into())), + None => Ok(None), + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() == 0 { + Ok(None) + } else { + let amt = buf.len(); + Ok(Some(buf.split_to(amt))) + } + } +} + +impl Encoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> { + into.put(&item[..]); + Ok(()) + } +} + +#[test] +fn echo() { + drop(env_logger::try_init()); + + let pool = Builder::new() + .pool_size(1) + .build(); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + let sender = pool.sender().clone(); + let srv = listener.incoming().for_each(move |socket| { + let (sink, stream) = LineCodec.framed(socket).split(); + sender.spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())).unwrap(); + Ok(()) + }); + + pool.sender().spawn(srv.map_err(|e| panic!("srv error: {}", e))).unwrap(); + + let client = TcpStream::connect(&addr); + let client = client.wait().unwrap(); + let (client, _) = write_all(client, b"a\n").wait().unwrap(); + let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); + assert_eq!(amt, 2); + assert_eq!(&buf[..2], b"a\n"); + + let (client, _) = write_all(client, b"\n").wait().unwrap(); + let (client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"\n"); + + let (client, _) = write_all(client, b"b").wait().unwrap(); + client.shutdown(Shutdown::Write).unwrap(); + let (_client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"b"); +} diff --git a/third_party/rust/tokio-0.1.11/tests/pipe-hup.rs b/third_party/rust/tokio-0.1.11/tests/pipe-hup.rs new file mode 100644 index 0000000000..a23ae7f6ba --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/pipe-hup.rs @@ -0,0 +1,88 @@ +#![cfg(unix)] + +extern crate env_logger; +extern crate futures; +extern crate libc; +extern crate mio; +extern crate tokio; +extern crate tokio_io; + +use std::fs::File; +use std::io::{self, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::thread; +use std::time::Duration; + +use mio::event::Evented; +use mio::unix::{UnixReady, EventedFd}; +use mio::{PollOpt, Ready, Token}; +use tokio::reactor::{Handle, PollEvented2}; +use tokio_io::io::read_to_end; +use futures::Future; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +struct MyFile(File); + +impl MyFile { + fn new(file: File) -> MyFile { + unsafe { + let r = libc::fcntl(file.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); + assert!(r != -1, "fcntl error: {}", io::Error::last_os_error()); + } + MyFile(file) + } +} + +impl io::Read for MyFile { + fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { + self.0.read(bytes) + } +} + +impl Evented for MyFile { + fn register(&self, poll: &mio::Poll, token: Token, interest: Ready, opts: PollOpt) + -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).register(poll, token, interest | hup, opts) + } + fn reregister(&self, poll: &mio::Poll, token: Token, interest: Ready, opts: PollOpt) + -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest | hup, opts) + } + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + EventedFd(&self.0.as_raw_fd()).deregister(poll) + } +} + +#[test] +fn hup() { + drop(env_logger::try_init()); + + let handle = Handle::default(); + unsafe { + let mut pipes = [0; 2]; + assert!(libc::pipe(pipes.as_mut_ptr()) != -1, + "pipe error: {}", io::Error::last_os_error()); + let read = File::from_raw_fd(pipes[0]); + let mut write = File::from_raw_fd(pipes[1]); + let t = thread::spawn(move || { + write.write_all(b"Hello!\n").unwrap(); + write.write_all(b"Good bye!\n").unwrap(); + thread::sleep(Duration::from_millis(100)); + }); + + let source = PollEvented2::new_with_handle(MyFile::new(read), &handle).unwrap(); + + let reader = read_to_end(source, Vec::new()); + let (_, content) = t!(reader.wait()); + assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); + t.join().unwrap(); + } +} diff --git a/third_party/rust/tokio-0.1.11/tests/reactor.rs b/third_party/rust/tokio-0.1.11/tests/reactor.rs new file mode 100644 index 0000000000..1bac13ad40 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/reactor.rs @@ -0,0 +1,89 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_reactor; +extern crate tokio_tcp; + +use tokio_reactor::Reactor; +use tokio_tcp::TcpListener; + +use futures::{Future, Stream}; +use futures::executor::{spawn, Notify, Spawn}; + +use std::mem; +use std::net::TcpStream; +use std::sync::{Arc, Mutex}; + +#[test] +fn test_drop_on_notify() { + // When the reactor receives a kernel notification, it notifies the + // task that holds the associated socket. If this notification results in + // the task being dropped, the socket will also be dropped. + // + // Previously, there was a deadlock scenario where the reactor, while + // notifying, held a lock and the task being dropped attempted to acquire + // that same lock in order to clean up state. + // + // To simulate this case, we create a fake executor that does nothing when + // the task is notified. This simulates an executor in the process of + // shutting down. Then, when the task handle is dropped, the task itself is + // dropped. + + struct MyNotify; + + type Task = Mutex<Spawn<Box<Future<Item = (), Error = ()>>>>; + + impl Notify for MyNotify { + fn notify(&self, _: usize) { + // Do nothing + } + + fn clone_id(&self, id: usize) -> usize { + let ptr = id as *const Task; + let task = unsafe { Arc::from_raw(ptr) }; + + mem::forget(task.clone()); + mem::forget(task); + + id + } + + fn drop_id(&self, id: usize) { + let ptr = id as *const Task; + let _ = unsafe { Arc::from_raw(ptr) }; + } + } + + let addr = "127.0.0.1:0".parse().unwrap(); + let mut reactor = Reactor::new().unwrap(); + + // Create a listener + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + // Define a task that just drains the listener + let task = Box::new({ + listener.incoming() + .for_each(|_| Ok(())) + .map_err(|_| panic!()) + }) as Box<Future<Item = (), Error = ()>>; + + let task = Arc::new(Mutex::new(spawn(task))); + let notify = Arc::new(MyNotify); + + let mut enter = tokio_executor::enter().unwrap(); + + tokio_reactor::with_default(&reactor.handle(), &mut enter, |_| { + let id = &*task as *const Task as usize; + + task.lock().unwrap() + .poll_future_notify(¬ify, id) + .unwrap(); + }); + + drop(task); + + // Establish a connection to the acceptor + let _s = TcpStream::connect(&addr).unwrap(); + + reactor.turn(None).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/tests/runtime.rs b/third_party/rust/tokio-0.1.11/tests/runtime.rs new file mode 100644 index 0000000000..66d10b9510 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/runtime.rs @@ -0,0 +1,404 @@ +extern crate tokio; +extern crate env_logger; +extern crate futures; + +use futures::sync::oneshot; +use std::sync::{Arc, Mutex}; +use std::thread; +use tokio::io; +use tokio::net::{TcpStream, TcpListener}; +use tokio::prelude::future::lazy; +use tokio::prelude::*; +use tokio::runtime::Runtime; + +// this import is used in all child modules that have it in scope +// from importing super::*, but the compiler doesn't realise that +// and warns about it. +pub use futures::future::Executor; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +fn create_client_server_future() -> Box<Future<Item=(), Error=()> + Send> { + let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(server.local_addr()); + let client = TcpStream::connect(&addr); + + let server = server.incoming().take(1) + .map_err(|e| panic!("accept err = {:?}", e)) + .for_each(|socket| { + tokio::spawn({ + io::write_all(socket, b"hello") + .map(|_| ()) + .map_err(|e| panic!("write err = {:?}", e)) + }) + }) + .map(|_| ()); + + let client = client + .map_err(|e| panic!("connect err = {:?}", e)) + .and_then(|client| { + // Read all + io::read_to_end(client, vec![]) + .map(|_| ()) + .map_err(|e| panic!("read err = {:?}", e)) + }); + + let future = server.join(client) + .map(|_| ()); + Box::new(future) +} + +#[test] +fn runtime_tokio_run() { + let _ = env_logger::try_init(); + + tokio::run(create_client_server_future()); +} + +#[test] +fn runtime_single_threaded() { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::current_thread::Runtime::new() + .unwrap(); + runtime.block_on(create_client_server_future()).unwrap(); + runtime.run().unwrap(); +} + +#[test] +fn runtime_single_threaded_block_on() { + let _ = env_logger::try_init(); + + tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap(); +} + +mod runtime_single_threaded_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let msg = tokio::runtime::current_thread::block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })).unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(|f| { tokio::spawn(f); }) + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } +} + +mod runtime_single_threaded_racy { + use super::*; + fn test<F>(spawn: F) + where + F: Fn( + tokio::runtime::current_thread::Handle, + Box<Future<Item=(), Error=()> + Send>, + ), + { + let (trigger, exit) = futures::sync::oneshot::channel(); + let (handle_tx, handle_rx) = ::std::sync::mpsc::channel(); + let jh = ::std::thread::spawn(move || { + let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); + handle_tx.send(rt.handle()).unwrap(); + + // don't exit until we are told to + rt.block_on(exit.map_err(|_| ())).unwrap(); + + // run until all spawned futures (incl. the "exit" signal future) have completed. + rt.run().unwrap(); + }); + + let (tx, rx) = futures::sync::oneshot::channel(); + + let handle = handle_rx.recv().unwrap(); + spawn(handle, Box::new(futures::future::lazy(move || { + tx.send(()).unwrap(); + Ok(()) + }))); + + // signal runtime thread to exit + trigger.send(()).unwrap(); + + // wait for runtime thread to exit + jh.join().unwrap(); + + assert_eq!(rx.wait().unwrap(), ()); + } + + #[test] + fn spawn() { + test(|handle, f| { handle.spawn(f).unwrap(); }) + } + + #[test] + fn execute() { + test(|handle, f| { handle.execute(f).unwrap(); }) + } +} + +mod runtime_multi_threaded { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime) + Send + 'static, + { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::Builder::new() + .build() + .unwrap(); + spawn(&mut runtime); + runtime.shutdown_on_idle().wait().unwrap(); + } + + #[test] + fn spawn() { + test(|rt| { rt.spawn(create_client_server_future()); }); + } + + #[test] + fn execute() { + test(|rt| { rt.executor().execute(create_client_server_future()).unwrap(); }); + } +} + + +#[test] +fn block_on_timer() { + use std::time::{Duration, Instant}; + use tokio::timer::{Delay, Error}; + + fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send> + where + T: Send + 'static, + { + Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) + } + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod from_block_on { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + use std::time::Duration; + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + runtime + .block_on(rx.then(move |_| { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, *cnt.lock().unwrap()); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod many { + use super::*; + + const ITER: usize = 200; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime, Box<Future<Item=(), Error=()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let mut runtime = Runtime::new().unwrap(); + + for _ in 0..ITER { + let c = cnt.clone(); + spawn(&mut runtime, Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + } + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(ITER, *cnt.lock().unwrap()); + } + + #[test] + fn spawn() { + test(|rt, f| { rt.spawn(f); }) + } + + #[test] + fn execute() { + test(|rt, f| { + rt.executor() + .execute(f) + .unwrap(); + }) + } +} + + +mod from_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { tokio::spawn(f); }) + } +} + +#[test] +fn run_in_run() { + use std::panic; + + tokio::run(lazy(|| { + panic::catch_unwind(|| { + tokio::run(lazy(|| { Ok::<(), ()>(()) })) + }).unwrap_err(); + Ok::<(), ()>(()) + })); +} diff --git a/third_party/rust/tokio-0.1.11/tests/timer.rs b/third_party/rust/tokio-0.1.11/tests/timer.rs new file mode 100644 index 0000000000..72a5595d76 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/timer.rs @@ -0,0 +1,116 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_io; +extern crate env_logger; + +use tokio::prelude::*; +use tokio::timer::*; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +#[test] +fn timer_with_runtime() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(100); + let (tx, rx) = mpsc::channel(); + + tokio::run({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn starving() { + use futures::{task, Poll, Async}; + + let _ = env_logger::try_init(); + + struct Starve(Delay, u64); + + impl Future for Starve { + type Item = u64; + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, ()> { + if self.0.poll().unwrap().is_ready() { + return Ok(self.1.into()); + } + + self.1 += 1; + + task::current().notify(); + + Ok(Async::NotReady) + } + } + + let when = Instant::now() + Duration::from_millis(20); + let starve = Starve(Delay::new(when), 0); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + starve + .and_then(move |_ticks| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn deadline() { + use futures::future; + + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(20); + let (tx, rx) = mpsc::channel(); + + #[allow(deprecated)] + tokio::run({ + future::empty::<(), ()>() + .deadline(when) + .then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn timeout() { + use futures::future; + + let _ = env_logger::try_init(); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + future::empty::<(), ()>() + .timeout(Duration::from_millis(20)) + .then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} |