diff options
Diffstat (limited to 'third_party/rust/tokio-0.1.22/tests/runtime.rs')
-rw-r--r-- | third_party/rust/tokio-0.1.22/tests/runtime.rs | 532 |
1 files changed, 532 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/tests/runtime.rs b/third_party/rust/tokio-0.1.22/tests/runtime.rs new file mode 100644 index 0000000000..f84c66738b --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/runtime.rs @@ -0,0 +1,532 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; + +use futures::sync::oneshot; +use std::sync::{atomic, Arc, Mutex}; +use std::thread; +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::future::lazy; +use tokio::prelude::*; +use tokio::runtime::Runtime; + +// this import is used in all child modules that have it in scope +// from importing super::*, but the compiler doesn't realise that +// and warns about it. +pub use futures::future::Executor; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +fn create_client_server_future() -> Box<Future<Item = (), Error = ()> + Send> { + let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(server.local_addr()); + let client = TcpStream::connect(&addr); + + let server = server + .incoming() + .take(1) + .map_err(|e| panic!("accept err = {:?}", e)) + .for_each(|socket| { + tokio::spawn({ + io::write_all(socket, b"hello") + .map(|_| ()) + .map_err(|e| panic!("write err = {:?}", e)) + }) + }) + .map(|_| ()); + + let client = client + .map_err(|e| panic!("connect err = {:?}", e)) + .and_then(|client| { + // Read all + io::read_to_end(client, vec![]) + .map(|_| ()) + .map_err(|e| panic!("read err = {:?}", e)) + }); + + let future = server.join(client).map(|_| ()); + Box::new(future) +} + +#[test] +fn runtime_tokio_run() { + let _ = env_logger::try_init(); + + tokio::run(create_client_server_future()); +} + +#[test] +fn runtime_single_threaded() { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(create_client_server_future()).unwrap(); + runtime.run().unwrap(); +} + +#[test] +fn runtime_single_threaded_block_on() { + let _ = env_logger::try_init(); + + tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap(); +} + +mod runtime_single_threaded_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let msg = tokio::runtime::current_thread::block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } +} + +mod runtime_single_threaded_racy { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(tokio::runtime::current_thread::Handle, Box<Future<Item = (), Error = ()> + Send>), + { + let (trigger, exit) = futures::sync::oneshot::channel(); + let (handle_tx, handle_rx) = ::std::sync::mpsc::channel(); + let jh = ::std::thread::spawn(move || { + let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); + handle_tx.send(rt.handle()).unwrap(); + + // don't exit until we are told to + rt.block_on(exit.map_err(|_| ())).unwrap(); + + // run until all spawned futures (incl. the "exit" signal future) have completed. + rt.run().unwrap(); + }); + + let (tx, rx) = futures::sync::oneshot::channel(); + + let handle = handle_rx.recv().unwrap(); + spawn( + handle, + Box::new(futures::future::lazy(move || { + tx.send(()).unwrap(); + Ok(()) + })), + ); + + // signal runtime thread to exit + trigger.send(()).unwrap(); + + // wait for runtime thread to exit + jh.join().unwrap(); + + assert_eq!(rx.wait().unwrap(), ()); + } + + #[test] + fn spawn() { + test(|handle, f| { + handle.spawn(f).unwrap(); + }) + } + + #[test] + fn execute() { + test(|handle, f| { + handle.execute(f).unwrap(); + }) + } +} + +mod runtime_multi_threaded { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime) + Send + 'static, + { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::Builder::new().build().unwrap(); + spawn(&mut runtime); + runtime.shutdown_on_idle().wait().unwrap(); + } + + #[test] + fn spawn() { + test(|rt| { + rt.spawn(create_client_server_future()); + }); + } + + #[test] + fn execute() { + test(|rt| { + rt.executor() + .execute(create_client_server_future()) + .unwrap(); + }); + } +} + +#[test] +fn block_on_timer() { + use std::time::{Duration, Instant}; + use tokio::timer::{Delay, Error}; + + fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send> + where + T: Send + 'static, + { + Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) + } + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod from_block_on { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + use std::time::Duration; + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + runtime + .block_on(rx.then(move |_| { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, *cnt.lock().unwrap()); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod many { + use super::*; + + const ITER: usize = 200; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime, Box<Future<Item = (), Error = ()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let mut runtime = Runtime::new().unwrap(); + + for _ in 0..ITER { + let c = cnt.clone(); + spawn( + &mut runtime, + Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + })), + ); + } + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(ITER, *cnt.lock().unwrap()); + } + + #[test] + fn spawn() { + test(|rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test(|rt, f| { + rt.executor().execute(f).unwrap(); + }) + } +} + +mod from_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +mod nested_enter { + use super::*; + use std::panic; + use tokio::runtime::current_thread; + + fn test<F1, F2>(first: F1, nested: F2) + where + F1: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + F2: Fn(Box<Future<Item = (), Error = ()> + Send>) + panic::UnwindSafe + Send + 'static, + { + let panicked = Arc::new(Mutex::new(false)); + let panicked2 = panicked.clone(); + + // Since this is testing panics in other threads, printing about panics + // is noisy and can give the impression that the test is ignoring panics. + // + // It *is* ignoring them, but on purpose. + let prev_hook = panic::take_hook(); + panic::set_hook(Box::new(|info| { + let s = info.to_string(); + if s.starts_with("panicked at 'nested ") + || s.starts_with("panicked at 'Multiple executors at once") + { + // expected, noop + } else { + println!("{}", s); + } + })); + + first(Box::new(lazy(move || { + panic::catch_unwind(move || nested(Box::new(lazy(|| Ok::<(), ()>(()))))) + .expect_err("nested should panic"); + *panicked2.lock().unwrap() = true; + Ok::<(), ()>(()) + }))); + + panic::set_hook(prev_hook); + + assert!( + *panicked.lock().unwrap(), + "nested call should have panicked" + ); + } + + fn threadpool_new() -> Runtime { + Runtime::new().expect("rt new") + } + + #[test] + fn run_in_run() { + test(tokio::run, tokio::run); + } + + #[test] + fn threadpool_block_on_in_run() { + test(tokio::run, |fut| { + let mut rt = threadpool_new(); + rt.block_on(fut).unwrap(); + }); + } + + #[test] + fn threadpool_block_on_all_in_run() { + test(tokio::run, |fut| { + let rt = threadpool_new(); + rt.block_on_all(fut).unwrap(); + }); + } + + #[test] + fn current_thread_block_on_all_in_run() { + test(tokio::run, |fut| { + current_thread::block_on_all(fut).unwrap(); + }); + } +} + +#[test] +fn runtime_reactor_handle() { + #![allow(deprecated)] + + use futures::Stream; + use std::net::{TcpListener as StdListener, TcpStream as StdStream}; + + let rt = Runtime::new().unwrap(); + + let std_listener = StdListener::bind("127.0.0.1:0").unwrap(); + let tk_listener = TcpListener::from_std(std_listener, rt.handle()).unwrap(); + + let addr = tk_listener.local_addr().unwrap(); + + // Spawn a thread since we are avoiding the runtime + let th = thread::spawn(|| for _ in tk_listener.incoming().take(1).wait() {}); + + let _ = StdStream::connect(&addr).unwrap(); + + th.join().unwrap(); +} + +#[test] +fn after_start_and_before_stop_is_called() { + let _ = env_logger::try_init(); + + let after_start = Arc::new(atomic::AtomicUsize::new(0)); + let before_stop = Arc::new(atomic::AtomicUsize::new(0)); + + let after_inner = after_start.clone(); + let before_inner = before_stop.clone(); + let runtime = tokio::runtime::Builder::new() + .after_start(move || { + after_inner.clone().fetch_add(1, atomic::Ordering::Relaxed); + }) + .before_stop(move || { + before_inner.clone().fetch_add(1, atomic::Ordering::Relaxed); + }) + .build() + .unwrap(); + + runtime.block_on_all(create_client_server_future()).unwrap(); + + assert!(after_start.load(atomic::Ordering::Relaxed) > 0); + assert!(before_stop.load(atomic::Ordering::Relaxed) > 0); +} |