summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.22/tests/runtime.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.22/tests/runtime.rs')
-rw-r--r--third_party/rust/tokio-0.1.22/tests/runtime.rs532
1 files changed, 532 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/tests/runtime.rs b/third_party/rust/tokio-0.1.22/tests/runtime.rs
new file mode 100644
index 0000000000..f84c66738b
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/tests/runtime.rs
@@ -0,0 +1,532 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+
+use futures::sync::oneshot;
+use std::sync::{atomic, Arc, Mutex};
+use std::thread;
+use tokio::io;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::future::lazy;
+use tokio::prelude::*;
+use tokio::runtime::Runtime;
+
+// this import is used in all child modules that have it in scope
+// from importing super::*, but the compiler doesn't realise that
+// and warns about it.
+pub use futures::future::Executor;
+
+macro_rules! t {
+ ($e:expr) => {
+ match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ }
+ };
+}
+
+fn create_client_server_future() -> Box<Future<Item = (), Error = ()> + Send> {
+ let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
+ let addr = t!(server.local_addr());
+ let client = TcpStream::connect(&addr);
+
+ let server = server
+ .incoming()
+ .take(1)
+ .map_err(|e| panic!("accept err = {:?}", e))
+ .for_each(|socket| {
+ tokio::spawn({
+ io::write_all(socket, b"hello")
+ .map(|_| ())
+ .map_err(|e| panic!("write err = {:?}", e))
+ })
+ })
+ .map(|_| ());
+
+ let client = client
+ .map_err(|e| panic!("connect err = {:?}", e))
+ .and_then(|client| {
+ // Read all
+ io::read_to_end(client, vec![])
+ .map(|_| ())
+ .map_err(|e| panic!("read err = {:?}", e))
+ });
+
+ let future = server.join(client).map(|_| ());
+ Box::new(future)
+}
+
+#[test]
+fn runtime_tokio_run() {
+ let _ = env_logger::try_init();
+
+ tokio::run(create_client_server_future());
+}
+
+#[test]
+fn runtime_single_threaded() {
+ let _ = env_logger::try_init();
+
+ let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
+ runtime.block_on(create_client_server_future()).unwrap();
+ runtime.run().unwrap();
+}
+
+#[test]
+fn runtime_single_threaded_block_on() {
+ let _ = env_logger::try_init();
+
+ tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap();
+}
+
+mod runtime_single_threaded_block_on_all {
+ use super::*;
+
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()> + Send>),
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let msg = tokio::runtime::current_thread::block_on_all(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| {
+ tokio::spawn(f);
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+}
+
+mod runtime_single_threaded_racy {
+ use super::*;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(tokio::runtime::current_thread::Handle, Box<Future<Item = (), Error = ()> + Send>),
+ {
+ let (trigger, exit) = futures::sync::oneshot::channel();
+ let (handle_tx, handle_rx) = ::std::sync::mpsc::channel();
+ let jh = ::std::thread::spawn(move || {
+ let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
+ handle_tx.send(rt.handle()).unwrap();
+
+ // don't exit until we are told to
+ rt.block_on(exit.map_err(|_| ())).unwrap();
+
+ // run until all spawned futures (incl. the "exit" signal future) have completed.
+ rt.run().unwrap();
+ });
+
+ let (tx, rx) = futures::sync::oneshot::channel();
+
+ let handle = handle_rx.recv().unwrap();
+ spawn(
+ handle,
+ Box::new(futures::future::lazy(move || {
+ tx.send(()).unwrap();
+ Ok(())
+ })),
+ );
+
+ // signal runtime thread to exit
+ trigger.send(()).unwrap();
+
+ // wait for runtime thread to exit
+ jh.join().unwrap();
+
+ assert_eq!(rx.wait().unwrap(), ());
+ }
+
+ #[test]
+ fn spawn() {
+ test(|handle, f| {
+ handle.spawn(f).unwrap();
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(|handle, f| {
+ handle.execute(f).unwrap();
+ })
+ }
+}
+
+mod runtime_multi_threaded {
+ use super::*;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(&mut Runtime) + Send + 'static,
+ {
+ let _ = env_logger::try_init();
+
+ let mut runtime = tokio::runtime::Builder::new().build().unwrap();
+ spawn(&mut runtime);
+ runtime.shutdown_on_idle().wait().unwrap();
+ }
+
+ #[test]
+ fn spawn() {
+ test(|rt| {
+ rt.spawn(create_client_server_future());
+ });
+ }
+
+ #[test]
+ fn execute() {
+ test(|rt| {
+ rt.executor()
+ .execute(create_client_server_future())
+ .unwrap();
+ });
+ }
+}
+
+#[test]
+fn block_on_timer() {
+ use std::time::{Duration, Instant};
+ use tokio::timer::{Delay, Error};
+
+ fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send>
+ where
+ T: Send + 'static,
+ {
+ Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x))
+ }
+
+ let mut runtime = Runtime::new().unwrap();
+ assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42);
+ runtime.shutdown_on_idle().wait().unwrap();
+}
+
+mod from_block_on {
+ use super::*;
+
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static,
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let mut runtime = Runtime::new().unwrap();
+ let msg = runtime
+ .block_on(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ runtime.shutdown_on_idle().wait().unwrap();
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| {
+ tokio::spawn(f);
+ })
+ }
+}
+
+#[test]
+fn block_waits() {
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(|| {
+ use std::time::Duration;
+ thread::sleep(Duration::from_millis(1000));
+ tx.send(()).unwrap();
+ });
+
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let mut runtime = Runtime::new().unwrap();
+ runtime
+ .block_on(rx.then(move |_| {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<_, ()>(())
+ }))
+ .unwrap();
+
+ assert_eq!(1, *cnt.lock().unwrap());
+ runtime.shutdown_on_idle().wait().unwrap();
+}
+
+mod many {
+ use super::*;
+
+ const ITER: usize = 200;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(&mut Runtime, Box<Future<Item = (), Error = ()> + Send>),
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let mut runtime = Runtime::new().unwrap();
+
+ for _ in 0..ITER {
+ let c = cnt.clone();
+ spawn(
+ &mut runtime,
+ Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })),
+ );
+ }
+
+ runtime.shutdown_on_idle().wait().unwrap();
+ assert_eq!(ITER, *cnt.lock().unwrap());
+ }
+
+ #[test]
+ fn spawn() {
+ test(|rt, f| {
+ rt.spawn(f);
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(|rt, f| {
+ rt.executor().execute(f).unwrap();
+ })
+ }
+}
+
+mod from_block_on_all {
+ use super::*;
+
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static,
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let runtime = Runtime::new().unwrap();
+ let msg = runtime
+ .block_on_all(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| {
+ tokio::spawn(f);
+ })
+ }
+}
+
+mod nested_enter {
+ use super::*;
+ use std::panic;
+ use tokio::runtime::current_thread;
+
+ fn test<F1, F2>(first: F1, nested: F2)
+ where
+ F1: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static,
+ F2: Fn(Box<Future<Item = (), Error = ()> + Send>) + panic::UnwindSafe + Send + 'static,
+ {
+ let panicked = Arc::new(Mutex::new(false));
+ let panicked2 = panicked.clone();
+
+ // Since this is testing panics in other threads, printing about panics
+ // is noisy and can give the impression that the test is ignoring panics.
+ //
+ // It *is* ignoring them, but on purpose.
+ let prev_hook = panic::take_hook();
+ panic::set_hook(Box::new(|info| {
+ let s = info.to_string();
+ if s.starts_with("panicked at 'nested ")
+ || s.starts_with("panicked at 'Multiple executors at once")
+ {
+ // expected, noop
+ } else {
+ println!("{}", s);
+ }
+ }));
+
+ first(Box::new(lazy(move || {
+ panic::catch_unwind(move || nested(Box::new(lazy(|| Ok::<(), ()>(())))))
+ .expect_err("nested should panic");
+ *panicked2.lock().unwrap() = true;
+ Ok::<(), ()>(())
+ })));
+
+ panic::set_hook(prev_hook);
+
+ assert!(
+ *panicked.lock().unwrap(),
+ "nested call should have panicked"
+ );
+ }
+
+ fn threadpool_new() -> Runtime {
+ Runtime::new().expect("rt new")
+ }
+
+ #[test]
+ fn run_in_run() {
+ test(tokio::run, tokio::run);
+ }
+
+ #[test]
+ fn threadpool_block_on_in_run() {
+ test(tokio::run, |fut| {
+ let mut rt = threadpool_new();
+ rt.block_on(fut).unwrap();
+ });
+ }
+
+ #[test]
+ fn threadpool_block_on_all_in_run() {
+ test(tokio::run, |fut| {
+ let rt = threadpool_new();
+ rt.block_on_all(fut).unwrap();
+ });
+ }
+
+ #[test]
+ fn current_thread_block_on_all_in_run() {
+ test(tokio::run, |fut| {
+ current_thread::block_on_all(fut).unwrap();
+ });
+ }
+}
+
+#[test]
+fn runtime_reactor_handle() {
+ #![allow(deprecated)]
+
+ use futures::Stream;
+ use std::net::{TcpListener as StdListener, TcpStream as StdStream};
+
+ let rt = Runtime::new().unwrap();
+
+ let std_listener = StdListener::bind("127.0.0.1:0").unwrap();
+ let tk_listener = TcpListener::from_std(std_listener, rt.handle()).unwrap();
+
+ let addr = tk_listener.local_addr().unwrap();
+
+ // Spawn a thread since we are avoiding the runtime
+ let th = thread::spawn(|| for _ in tk_listener.incoming().take(1).wait() {});
+
+ let _ = StdStream::connect(&addr).unwrap();
+
+ th.join().unwrap();
+}
+
+#[test]
+fn after_start_and_before_stop_is_called() {
+ let _ = env_logger::try_init();
+
+ let after_start = Arc::new(atomic::AtomicUsize::new(0));
+ let before_stop = Arc::new(atomic::AtomicUsize::new(0));
+
+ let after_inner = after_start.clone();
+ let before_inner = before_stop.clone();
+ let runtime = tokio::runtime::Builder::new()
+ .after_start(move || {
+ after_inner.clone().fetch_add(1, atomic::Ordering::Relaxed);
+ })
+ .before_stop(move || {
+ before_inner.clone().fetch_add(1, atomic::Ordering::Relaxed);
+ })
+ .build()
+ .unwrap();
+
+ runtime.block_on_all(create_client_server_future()).unwrap();
+
+ assert!(after_start.load(atomic::Ordering::Relaxed) > 0);
+ assert!(before_stop.load(atomic::Ordering::Relaxed) > 0);
+}