summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/tests/task_local_set.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tokio/tests/task_local_set.rs')
-rw-r--r--vendor/tokio/tests/task_local_set.rs243
1 files changed, 214 insertions, 29 deletions
diff --git a/vendor/tokio/tests/task_local_set.rs b/vendor/tokio/tests/task_local_set.rs
index 58d510948..2da87f5ae 100644
--- a/vendor/tokio/tests/task_local_set.rs
+++ b/vendor/tokio/tests/task_local_set.rs
@@ -6,18 +6,23 @@ use futures::{
FutureExt,
};
-use tokio::runtime::{self, Runtime};
+use tokio::runtime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{self, LocalSet};
use tokio::time;
+#[cfg(not(tokio_wasi))]
use std::cell::Cell;
-use std::sync::atomic::Ordering::{self, SeqCst};
-use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::atomic::AtomicBool;
+#[cfg(not(tokio_wasi))]
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+#[cfg(not(tokio_wasi))]
+use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
#[tokio::test(flavor = "current_thread")]
-async fn local_basic_scheduler() {
+async fn local_current_thread_scheduler() {
LocalSet::new()
.run_until(async {
task::spawn_local(async {}).await.unwrap();
@@ -25,6 +30,7 @@ async fn local_basic_scheduler() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool() {
thread_local! {
@@ -45,6 +51,7 @@ async fn local_threadpool() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_threadpool() {
thread_local! {
@@ -60,6 +67,7 @@ async fn localset_future_threadpool() {
local.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_timers() {
static RAN1: AtomicBool = AtomicBool::new(false);
@@ -67,11 +75,11 @@ async fn localset_future_timers() {
let local = LocalSet::new();
local.spawn_local(async move {
- time::sleep(Duration::from_millis(10)).await;
+ time::sleep(Duration::from_millis(5)).await;
RAN1.store(true, Ordering::SeqCst);
});
local.spawn_local(async move {
- time::sleep(Duration::from_millis(20)).await;
+ time::sleep(Duration::from_millis(10)).await;
RAN2.store(true, Ordering::SeqCst);
});
local.await;
@@ -104,6 +112,7 @@ async fn localset_future_drives_all_local_futs() {
assert!(RAN3.load(Ordering::SeqCst));
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_timer() {
// This test ensures that runtime services like the timer are properly
@@ -126,7 +135,23 @@ async fn local_threadpool_timer() {
})
.await;
}
+#[test]
+fn enter_guard_spawn() {
+ let local = LocalSet::new();
+ let _guard = local.enter();
+ // Run the local task set.
+
+ let join = task::spawn_local(async { true });
+ let rt = runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ local.block_on(&rt, async move {
+ assert!(join.await.unwrap());
+ });
+}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[test]
// This will panic, since the thread that calls `block_on` cannot use
// in-place blocking inside of `block_on`.
@@ -153,6 +178,7 @@ fn local_threadpool_blocking_in_place() {
});
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_blocking_run() {
thread_local! {
@@ -181,6 +207,7 @@ async fn local_threadpool_blocking_run() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn all_spawns_are_local() {
use futures::future;
@@ -207,6 +234,7 @@ async fn all_spawns_are_local() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn nested_spawn_is_local() {
thread_local! {
@@ -242,6 +270,7 @@ async fn nested_spawn_is_local() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[test]
fn join_local_future_elsewhere() {
thread_local! {
@@ -255,14 +284,12 @@ fn join_local_future_elsewhere() {
local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
let join = task::spawn_local(async move {
- println!("hello world running...");
assert!(
ON_RT_THREAD.with(|cell| cell.get()),
"local task must run on local thread, no matter where it is awaited"
);
rx.await.unwrap();
- println!("hello world task done");
"hello world"
});
let join2 = task::spawn(async move {
@@ -272,16 +299,34 @@ fn join_local_future_elsewhere() {
);
tx.send(()).expect("task shouldn't have ended yet");
- println!("waking up hello world...");
join.await.expect("task should complete successfully");
-
- println!("hello world task joined");
});
join2.await.unwrap()
});
}
+// Tests for <https://github.com/tokio-rs/tokio/issues/4973>
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
+#[tokio::test(flavor = "multi_thread")]
+async fn localset_in_thread_local() {
+ thread_local! {
+ static LOCAL_SET: LocalSet = LocalSet::new();
+ }
+
+ // holds runtime thread until end of main fn.
+ let (_tx, rx) = oneshot::channel::<()>();
+ let handle = tokio::runtime::Handle::current();
+
+ std::thread::spawn(move || {
+ LOCAL_SET.with(|local_set| {
+ handle.block_on(local_set.run_until(async move {
+ let _ = rx.await;
+ }))
+ });
+ });
+}
+
#[test]
fn drop_cancels_tasks() {
use std::rc::Rc;
@@ -299,9 +344,7 @@ fn drop_cancels_tasks() {
let _rc2 = rc2;
started_tx.send(()).unwrap();
- loop {
- time::sleep(Duration::from_secs(3600)).await;
- }
+ futures::future::pending::<()>().await;
});
local.block_on(&rt, async {
@@ -347,9 +390,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
),
// Did the test thread panic? We'll find out for sure when we `join`
// with it.
- Err(RecvTimeoutError::Disconnected) => {
- println!("done_rx dropped, did the test thread panic?");
- }
+ Err(RecvTimeoutError::Disconnected) => {}
// Test completed successfully!
Ok(()) => {}
}
@@ -357,6 +398,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
thread.join().expect("test thread should not panic!")
}
+#[cfg_attr(tokio_wasi, ignore = "`unwrap()` in `with_timeout()` panics on Wasi")]
#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
@@ -379,6 +421,10 @@ fn drop_cancels_remote_tasks() {
});
}
+#[cfg_attr(
+ tokio_wasi,
+ ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
+)]
#[test]
fn local_tasks_wake_join_all() {
// This test reproduces issue #2460.
@@ -400,13 +446,34 @@ fn local_tasks_wake_join_all() {
});
}
-#[tokio::test]
-async fn local_tasks_are_polled_after_tick() {
+#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
+#[test]
+fn local_tasks_are_polled_after_tick() {
+ // This test depends on timing, so we run it up to five times.
+ for _ in 0..4 {
+ let res = std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner);
+ if res.is_ok() {
+ // success
+ return;
+ }
+ }
+
+ // Test failed 4 times. Try one more time without catching panics. If it
+ // fails again, the test fails.
+ local_tasks_are_polled_after_tick_inner();
+}
+
+#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
+#[tokio::main(flavor = "current_thread")]
+async fn local_tasks_are_polled_after_tick_inner() {
// Reproduces issues #1899 and #1900
static RX1: AtomicUsize = AtomicUsize::new(0);
static RX2: AtomicUsize = AtomicUsize::new(0);
- static EXPECTED: usize = 500;
+ const EXPECTED: usize = 500;
+
+ RX1.store(0, SeqCst);
+ RX2.store(0, SeqCst);
let (tx, mut rx) = mpsc::unbounded_channel();
@@ -416,7 +483,7 @@ async fn local_tasks_are_polled_after_tick() {
.run_until(async {
let task2 = task::spawn(async move {
// Wait a bit
- time::sleep(Duration::from_millis(100)).await;
+ time::sleep(Duration::from_millis(10)).await;
let mut oneshots = Vec::with_capacity(EXPECTED);
@@ -427,18 +494,21 @@ async fn local_tasks_are_polled_after_tick() {
tx.send(oneshot_rx).unwrap();
}
- time::sleep(Duration::from_millis(100)).await;
+ time::sleep(Duration::from_millis(10)).await;
for tx in oneshots.drain(..) {
tx.send(()).unwrap();
}
- time::sleep(Duration::from_millis(300)).await;
- let rx1 = RX1.load(SeqCst);
- let rx2 = RX2.load(SeqCst);
- println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
- assert_eq!(EXPECTED, rx1);
- assert_eq!(EXPECTED, rx2);
+ loop {
+ time::sleep(Duration::from_millis(20)).await;
+ let rx1 = RX1.load(SeqCst);
+ let rx2 = RX2.load(SeqCst);
+
+ if rx1 == EXPECTED && rx2 == EXPECTED {
+ break;
+ }
+ }
});
while let Some(oneshot) = rx.recv().await {
@@ -500,7 +570,122 @@ async fn spawn_wakes_localset() {
}
}
-fn rt() -> Runtime {
+#[test]
+fn store_local_set_in_thread_local_with_runtime() {
+ use tokio::runtime::Runtime;
+
+ thread_local! {
+ static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
+ }
+
+ struct RtAndLocalSet {
+ rt: Runtime,
+ local: LocalSet,
+ }
+
+ impl RtAndLocalSet {
+ fn new() -> RtAndLocalSet {
+ RtAndLocalSet {
+ rt: tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap(),
+ local: LocalSet::new(),
+ }
+ }
+
+ async fn inner_method(&self) {
+ self.local
+ .run_until(async move {
+ tokio::task::spawn_local(async {});
+ })
+ .await
+ }
+
+ fn method(&self) {
+ self.rt.block_on(self.inner_method());
+ }
+ }
+
+ CURRENT.with(|f| {
+ f.method();
+ });
+}
+
+#[cfg(tokio_unstable)]
+mod unstable {
+ use tokio::runtime::UnhandledPanic;
+ use tokio::task::LocalSet;
+
+ #[tokio::test]
+ #[should_panic(
+ expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
+ )]
+ async fn shutdown_on_panic() {
+ LocalSet::new()
+ .unhandled_panic(UnhandledPanic::ShutdownRuntime)
+ .run_until(async {
+ tokio::task::spawn_local(async {
+ panic!("boom");
+ });
+
+ futures::future::pending::<()>().await;
+ })
+ .await;
+ }
+
+ // This test compares that, when the task driving `run_until` has already
+ // consumed budget, the `run_until` future has less budget than a "spawned"
+ // task.
+ //
+ // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
+ // internally. This is why the test uses indirection to test this.
+ #[tokio::test]
+ async fn run_until_does_not_get_own_budget() {
+ // Consume some budget
+ tokio::task::consume_budget().await;
+
+ LocalSet::new()
+ .run_until(async {
+ let spawned = tokio::spawn(async {
+ let mut spawned_n = 0;
+
+ {
+ let mut spawned = tokio_test::task::spawn(async {
+ loop {
+ spawned_n += 1;
+ tokio::task::consume_budget().await;
+ }
+ });
+ // Poll once
+ assert!(!spawned.poll().is_ready());
+ }
+
+ spawned_n
+ });
+
+ let mut run_until_n = 0;
+ {
+ let mut run_until = tokio_test::task::spawn(async {
+ loop {
+ run_until_n += 1;
+ tokio::task::consume_budget().await;
+ }
+ });
+ // Poll once
+ assert!(!run_until.poll().is_ready());
+ }
+
+ let spawned_n = spawned.await.unwrap();
+ assert_ne!(spawned_n, 0);
+ assert_ne!(run_until_n, 0);
+ assert!(spawned_n > run_until_n);
+ })
+ .await
+ }
+}
+
+fn rt() -> runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()