diff options
Diffstat (limited to 'vendor/tokio/tests/task_local_set.rs')
-rw-r--r-- | vendor/tokio/tests/task_local_set.rs | 243 |
1 files changed, 214 insertions, 29 deletions
diff --git a/vendor/tokio/tests/task_local_set.rs b/vendor/tokio/tests/task_local_set.rs index 58d510948..2da87f5ae 100644 --- a/vendor/tokio/tests/task_local_set.rs +++ b/vendor/tokio/tests/task_local_set.rs @@ -6,18 +6,23 @@ use futures::{ FutureExt, }; -use tokio::runtime::{self, Runtime}; +use tokio::runtime; use tokio::sync::{mpsc, oneshot}; use tokio::task::{self, LocalSet}; use tokio::time; +#[cfg(not(tokio_wasi))] use std::cell::Cell; -use std::sync::atomic::Ordering::{self, SeqCst}; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::AtomicBool; +#[cfg(not(tokio_wasi))] +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +#[cfg(not(tokio_wasi))] +use std::sync::atomic::Ordering::SeqCst; use std::time::Duration; #[tokio::test(flavor = "current_thread")] -async fn local_basic_scheduler() { +async fn local_current_thread_scheduler() { LocalSet::new() .run_until(async { task::spawn_local(async {}).await.unwrap(); @@ -25,6 +30,7 @@ async fn local_basic_scheduler() { .await; } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[tokio::test(flavor = "multi_thread")] async fn local_threadpool() { thread_local! { @@ -45,6 +51,7 @@ async fn local_threadpool() { .await; } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[tokio::test(flavor = "multi_thread")] async fn localset_future_threadpool() { thread_local! { @@ -60,6 +67,7 @@ async fn localset_future_threadpool() { local.await; } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[tokio::test(flavor = "multi_thread")] async fn localset_future_timers() { static RAN1: AtomicBool = AtomicBool::new(false); @@ -67,11 +75,11 @@ async fn localset_future_timers() { let local = LocalSet::new(); local.spawn_local(async move { - time::sleep(Duration::from_millis(10)).await; + time::sleep(Duration::from_millis(5)).await; RAN1.store(true, Ordering::SeqCst); }); local.spawn_local(async move { - time::sleep(Duration::from_millis(20)).await; + time::sleep(Duration::from_millis(10)).await; RAN2.store(true, Ordering::SeqCst); }); local.await; @@ -104,6 +112,7 @@ async fn localset_future_drives_all_local_futs() { assert!(RAN3.load(Ordering::SeqCst)); } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[tokio::test(flavor = "multi_thread")] async fn local_threadpool_timer() { // This test ensures that runtime services like the timer are properly @@ -126,7 +135,23 @@ async fn local_threadpool_timer() { }) .await; } +#[test] +fn enter_guard_spawn() { + let local = LocalSet::new(); + let _guard = local.enter(); + // Run the local task set. + + let join = task::spawn_local(async { true }); + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + local.block_on(&rt, async move { + assert!(join.await.unwrap()); + }); +} +#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery #[test] // This will panic, since the thread that calls `block_on` cannot use // in-place blocking inside of `block_on`. @@ -153,6 +178,7 @@ fn local_threadpool_blocking_in_place() { }); } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[tokio::test(flavor = "multi_thread")] async fn local_threadpool_blocking_run() { thread_local! { @@ -181,6 +207,7 @@ async fn local_threadpool_blocking_run() { .await; } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[tokio::test(flavor = "multi_thread")] async fn all_spawns_are_local() { use futures::future; @@ -207,6 +234,7 @@ async fn all_spawns_are_local() { .await; } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[tokio::test(flavor = "multi_thread")] async fn nested_spawn_is_local() { thread_local! { @@ -242,6 +270,7 @@ async fn nested_spawn_is_local() { .await; } +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads #[test] fn join_local_future_elsewhere() { thread_local! { @@ -255,14 +284,12 @@ fn join_local_future_elsewhere() { local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let join = task::spawn_local(async move { - println!("hello world running..."); assert!( ON_RT_THREAD.with(|cell| cell.get()), "local task must run on local thread, no matter where it is awaited" ); rx.await.unwrap(); - println!("hello world task done"); "hello world" }); let join2 = task::spawn(async move { @@ -272,16 +299,34 @@ fn join_local_future_elsewhere() { ); tx.send(()).expect("task shouldn't have ended yet"); - println!("waking up hello world..."); join.await.expect("task should complete successfully"); - - println!("hello world task joined"); }); join2.await.unwrap() }); } +// Tests for <https://github.com/tokio-rs/tokio/issues/4973> +#[cfg(not(tokio_wasi))] // Wasi doesn't support threads +#[tokio::test(flavor = "multi_thread")] +async fn localset_in_thread_local() { + thread_local! { + static LOCAL_SET: LocalSet = LocalSet::new(); + } + + // holds runtime thread until end of main fn. + let (_tx, rx) = oneshot::channel::<()>(); + let handle = tokio::runtime::Handle::current(); + + std::thread::spawn(move || { + LOCAL_SET.with(|local_set| { + handle.block_on(local_set.run_until(async move { + let _ = rx.await; + })) + }); + }); +} + #[test] fn drop_cancels_tasks() { use std::rc::Rc; @@ -299,9 +344,7 @@ fn drop_cancels_tasks() { let _rc2 = rc2; started_tx.send(()).unwrap(); - loop { - time::sleep(Duration::from_secs(3600)).await; - } + futures::future::pending::<()>().await; }); local.block_on(&rt, async { @@ -347,9 +390,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) { ), // Did the test thread panic? We'll find out for sure when we `join` // with it. - Err(RecvTimeoutError::Disconnected) => { - println!("done_rx dropped, did the test thread panic?"); - } + Err(RecvTimeoutError::Disconnected) => {} // Test completed successfully! Ok(()) => {} } @@ -357,6 +398,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) { thread.join().expect("test thread should not panic!") } +#[cfg_attr(tokio_wasi, ignore = "`unwrap()` in `with_timeout()` panics on Wasi")] #[test] fn drop_cancels_remote_tasks() { // This test reproduces issue #1885. @@ -379,6 +421,10 @@ fn drop_cancels_remote_tasks() { }); } +#[cfg_attr( + tokio_wasi, + ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi" +)] #[test] fn local_tasks_wake_join_all() { // This test reproduces issue #2460. @@ -400,13 +446,34 @@ fn local_tasks_wake_join_all() { }); } -#[tokio::test] -async fn local_tasks_are_polled_after_tick() { +#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery +#[test] +fn local_tasks_are_polled_after_tick() { + // This test depends on timing, so we run it up to five times. + for _ in 0..4 { + let res = std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner); + if res.is_ok() { + // success + return; + } + } + + // Test failed 4 times. Try one more time without catching panics. If it + // fails again, the test fails. + local_tasks_are_polled_after_tick_inner(); +} + +#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery +#[tokio::main(flavor = "current_thread")] +async fn local_tasks_are_polled_after_tick_inner() { // Reproduces issues #1899 and #1900 static RX1: AtomicUsize = AtomicUsize::new(0); static RX2: AtomicUsize = AtomicUsize::new(0); - static EXPECTED: usize = 500; + const EXPECTED: usize = 500; + + RX1.store(0, SeqCst); + RX2.store(0, SeqCst); let (tx, mut rx) = mpsc::unbounded_channel(); @@ -416,7 +483,7 @@ async fn local_tasks_are_polled_after_tick() { .run_until(async { let task2 = task::spawn(async move { // Wait a bit - time::sleep(Duration::from_millis(100)).await; + time::sleep(Duration::from_millis(10)).await; let mut oneshots = Vec::with_capacity(EXPECTED); @@ -427,18 +494,21 @@ async fn local_tasks_are_polled_after_tick() { tx.send(oneshot_rx).unwrap(); } - time::sleep(Duration::from_millis(100)).await; + time::sleep(Duration::from_millis(10)).await; for tx in oneshots.drain(..) { tx.send(()).unwrap(); } - time::sleep(Duration::from_millis(300)).await; - let rx1 = RX1.load(SeqCst); - let rx2 = RX2.load(SeqCst); - println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2); - assert_eq!(EXPECTED, rx1); - assert_eq!(EXPECTED, rx2); + loop { + time::sleep(Duration::from_millis(20)).await; + let rx1 = RX1.load(SeqCst); + let rx2 = RX2.load(SeqCst); + + if rx1 == EXPECTED && rx2 == EXPECTED { + break; + } + } }); while let Some(oneshot) = rx.recv().await { @@ -500,7 +570,122 @@ async fn spawn_wakes_localset() { } } -fn rt() -> Runtime { +#[test] +fn store_local_set_in_thread_local_with_runtime() { + use tokio::runtime::Runtime; + + thread_local! { + static CURRENT: RtAndLocalSet = RtAndLocalSet::new(); + } + + struct RtAndLocalSet { + rt: Runtime, + local: LocalSet, + } + + impl RtAndLocalSet { + fn new() -> RtAndLocalSet { + RtAndLocalSet { + rt: tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + local: LocalSet::new(), + } + } + + async fn inner_method(&self) { + self.local + .run_until(async move { + tokio::task::spawn_local(async {}); + }) + .await + } + + fn method(&self) { + self.rt.block_on(self.inner_method()); + } + } + + CURRENT.with(|f| { + f.method(); + }); +} + +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::UnhandledPanic; + use tokio::task::LocalSet; + + #[tokio::test] + #[should_panic( + expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic" + )] + async fn shutdown_on_panic() { + LocalSet::new() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .run_until(async { + tokio::task::spawn_local(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + .await; + } + + // This test compares that, when the task driving `run_until` has already + // consumed budget, the `run_until` future has less budget than a "spawned" + // task. + // + // "Budget" is a fuzzy metric as the Tokio runtime is able to change values + // internally. This is why the test uses indirection to test this. + #[tokio::test] + async fn run_until_does_not_get_own_budget() { + // Consume some budget + tokio::task::consume_budget().await; + + LocalSet::new() + .run_until(async { + let spawned = tokio::spawn(async { + let mut spawned_n = 0; + + { + let mut spawned = tokio_test::task::spawn(async { + loop { + spawned_n += 1; + tokio::task::consume_budget().await; + } + }); + // Poll once + assert!(!spawned.poll().is_ready()); + } + + spawned_n + }); + + let mut run_until_n = 0; + { + let mut run_until = tokio_test::task::spawn(async { + loop { + run_until_n += 1; + tokio::task::consume_budget().await; + } + }); + // Poll once + assert!(!run_until.poll().is_ready()); + } + + let spawned_n = spawned.await.unwrap(); + assert_ne!(spawned_n, 0); + assert_ne!(run_until_n, 0); + assert!(spawned_n > run_until_n); + }) + .await + } +} + +fn rt() -> runtime::Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() .build() |