summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/tests/rt_basic.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tokio/tests/rt_basic.rs')
-rw-r--r--vendor/tokio/tests/rt_basic.rs292
1 files changed, 291 insertions, 1 deletions
diff --git a/vendor/tokio/tests/rt_basic.rs b/vendor/tokio/tests/rt_basic.rs
index 4b1bdadc1..6caf0a44b 100644
--- a/vendor/tokio/tests/rt_basic.rs
+++ b/vendor/tokio/tests/rt_basic.rs
@@ -3,15 +3,28 @@
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
+use tokio::time::{timeout, Duration};
use tokio_test::{assert_err, assert_ok};
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::task::{Context, Poll};
use std::thread;
-use tokio::time::{timeout, Duration};
mod support {
pub(crate) mod mpsc_stream;
}
+macro_rules! cfg_metrics {
+ ($($t:tt)*) => {
+ #[cfg(tokio_unstable)]
+ {
+ $( $t )*
+ }
+ }
+}
+
#[test]
fn spawned_task_does_not_progress_without_block_on() {
let (tx, mut rx) = oneshot::channel();
@@ -136,6 +149,134 @@ fn acquire_mutex_in_drop() {
}
#[test]
+fn drop_tasks_in_context() {
+ static SUCCESS: AtomicBool = AtomicBool::new(false);
+
+ struct ContextOnDrop;
+
+ impl Future for ContextOnDrop {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+ }
+
+ impl Drop for ContextOnDrop {
+ fn drop(&mut self) {
+ if tokio::runtime::Handle::try_current().is_ok() {
+ SUCCESS.store(true, Ordering::SeqCst);
+ }
+ }
+ }
+
+ let rt = rt();
+ rt.spawn(ContextOnDrop);
+ drop(rt);
+
+ assert!(SUCCESS.load(Ordering::SeqCst));
+}
+
+#[test]
+#[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
+#[should_panic(expected = "boom")]
+fn wake_in_drop_after_panic() {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ struct WakeOnDrop(Option<oneshot::Sender<()>>);
+
+ impl Drop for WakeOnDrop {
+ fn drop(&mut self) {
+ self.0.take().unwrap().send(()).unwrap();
+ }
+ }
+
+ let rt = rt();
+
+ rt.spawn(async move {
+ let _wake_on_drop = WakeOnDrop(Some(tx));
+ // wait forever
+ futures::future::pending::<()>().await;
+ });
+
+ let _join = rt.spawn(async move { rx.await });
+
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ panic!("boom");
+ });
+}
+
+#[test]
+fn spawn_two() {
+ let rt = rt();
+
+ let out = rt.block_on(async {
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ tokio::spawn(async move {
+ tx.send("ZOMG").unwrap();
+ });
+ });
+
+ assert_ok!(rx.await)
+ });
+
+ assert_eq!(out, "ZOMG");
+
+ cfg_metrics! {
+ let metrics = rt.metrics();
+ drop(rt);
+ assert_eq!(0, metrics.remote_schedule_count());
+
+ let mut local = 0;
+ for i in 0..metrics.num_workers() {
+ local += metrics.worker_local_schedule_count(i);
+ }
+
+ assert_eq!(2, local);
+ }
+}
+
+#[cfg_attr(tokio_wasi, ignore = "WASI: std::thread::spawn not supported")]
+#[test]
+fn spawn_remote() {
+ let rt = rt();
+
+ let out = rt.block_on(async {
+ let (tx, rx) = oneshot::channel();
+
+ let handle = tokio::spawn(async move {
+ std::thread::spawn(move || {
+ std::thread::sleep(Duration::from_millis(10));
+ tx.send("ZOMG").unwrap();
+ });
+
+ rx.await.unwrap()
+ });
+
+ handle.await.unwrap()
+ });
+
+ assert_eq!(out, "ZOMG");
+
+ cfg_metrics! {
+ let metrics = rt.metrics();
+ drop(rt);
+ assert_eq!(1, metrics.remote_schedule_count());
+
+ let mut local = 0;
+ for i in 0..metrics.num_workers() {
+ local += metrics.worker_local_schedule_count(i);
+ }
+
+ assert_eq!(1, local);
+ }
+}
+
+#[test]
+#[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
#[should_panic(
expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
)]
@@ -150,6 +291,155 @@ fn timeout_panics_when_no_time_handle() {
});
}
+#[cfg(tokio_unstable)]
+mod unstable {
+ use tokio::runtime::{Builder, RngSeed, UnhandledPanic};
+
+ #[test]
+ #[should_panic(
+ expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic"
+ )]
+ fn shutdown_on_panic() {
+ let rt = Builder::new_current_thread()
+ .unhandled_panic(UnhandledPanic::ShutdownRuntime)
+ .build()
+ .unwrap();
+
+ rt.block_on(async {
+ tokio::spawn(async {
+ panic!("boom");
+ });
+
+ futures::future::pending::<()>().await;
+ })
+ }
+
+ #[test]
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
+ fn spawns_do_nothing() {
+ use std::sync::Arc;
+
+ let rt = Builder::new_current_thread()
+ .unhandled_panic(UnhandledPanic::ShutdownRuntime)
+ .build()
+ .unwrap();
+
+ let rt1 = Arc::new(rt);
+ let rt2 = rt1.clone();
+
+ let _ = std::thread::spawn(move || {
+ rt2.block_on(async {
+ tokio::spawn(async {
+ panic!("boom");
+ });
+
+ futures::future::pending::<()>().await;
+ })
+ })
+ .join();
+
+ let task = rt1.spawn(async {});
+ let res = futures::executor::block_on(task);
+ assert!(res.is_err());
+ }
+
+ #[test]
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
+ fn shutdown_all_concurrent_block_on() {
+ const N: usize = 2;
+ use std::sync::{mpsc, Arc};
+
+ let rt = Builder::new_current_thread()
+ .unhandled_panic(UnhandledPanic::ShutdownRuntime)
+ .build()
+ .unwrap();
+
+ let rt = Arc::new(rt);
+ let mut ths = vec![];
+ let (tx, rx) = mpsc::channel();
+
+ for _ in 0..N {
+ let rt = rt.clone();
+ let tx = tx.clone();
+ ths.push(std::thread::spawn(move || {
+ rt.block_on(async {
+ tx.send(()).unwrap();
+ futures::future::pending::<()>().await;
+ });
+ }));
+ }
+
+ for _ in 0..N {
+ rx.recv().unwrap();
+ }
+
+ rt.spawn(async {
+ panic!("boom");
+ });
+
+ for th in ths {
+ assert!(th.join().is_err());
+ }
+ }
+
+ #[test]
+ fn rng_seed() {
+ let seed = b"bytes used to generate seed";
+ let rt1 = tokio::runtime::Builder::new_current_thread()
+ .rng_seed(RngSeed::from_bytes(seed))
+ .build()
+ .unwrap();
+ let rt1_values = rt1.block_on(async {
+ let rand_1 = tokio::macros::support::thread_rng_n(100);
+ let rand_2 = tokio::macros::support::thread_rng_n(100);
+
+ (rand_1, rand_2)
+ });
+
+ let rt2 = tokio::runtime::Builder::new_current_thread()
+ .rng_seed(RngSeed::from_bytes(seed))
+ .build()
+ .unwrap();
+ let rt2_values = rt2.block_on(async {
+ let rand_1 = tokio::macros::support::thread_rng_n(100);
+ let rand_2 = tokio::macros::support::thread_rng_n(100);
+
+ (rand_1, rand_2)
+ });
+
+ assert_eq!(rt1_values, rt2_values);
+ }
+
+ #[test]
+ fn rng_seed_multi_enter() {
+ let seed = b"bytes used to generate seed";
+
+ fn two_rand_values() -> (u32, u32) {
+ let rand_1 = tokio::macros::support::thread_rng_n(100);
+ let rand_2 = tokio::macros::support::thread_rng_n(100);
+
+ (rand_1, rand_2)
+ }
+
+ let rt1 = tokio::runtime::Builder::new_current_thread()
+ .rng_seed(RngSeed::from_bytes(seed))
+ .build()
+ .unwrap();
+ let rt1_values_1 = rt1.block_on(async { two_rand_values() });
+ let rt1_values_2 = rt1.block_on(async { two_rand_values() });
+
+ let rt2 = tokio::runtime::Builder::new_current_thread()
+ .rng_seed(RngSeed::from_bytes(seed))
+ .build()
+ .unwrap();
+ let rt2_values_1 = rt2.block_on(async { two_rand_values() });
+ let rt2_values_2 = rt2.block_on(async { two_rand_values() });
+
+ assert_eq!(rt1_values_1, rt2_values_1);
+ assert_eq!(rt1_values_2, rt2_values_2);
+ }
+}
+
fn rt() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()