path: root/vendor/tokio/tests/rt_threaded.rs
Diffstat (limited to 'vendor/tokio/tests/rt_threaded.rs')
-rw-r--r--  vendor/tokio/tests/rt_threaded.rs | 305
1 file changed, 296 insertions, 9 deletions
diff --git a/vendor/tokio/tests/rt_threaded.rs b/vendor/tokio/tests/rt_threaded.rs
index 9e76c4ed0..69b186947 100644
--- a/vendor/tokio/tests/rt_threaded.rs
+++ b/vendor/tokio/tests/rt_threaded.rs
@@ -1,9 +1,9 @@
#![warn(rust_2018_idioms)]
-#![cfg(feature = "full")]
+#![cfg(all(feature = "full", not(tokio_wasi)))]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
-use tokio::runtime::{self, Runtime};
+use tokio::runtime;
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
@@ -15,13 +15,23 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};
+macro_rules! cfg_metrics {
+ ($($t:tt)*) => {
+ #[cfg(tokio_unstable)]
+ {
+ $( $t )*
+ }
+ }
+}
+
#[test]
fn single_thread() {
// No panic when starting a runtime w/ a single thread
let _ = runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
- .build();
+ .build()
+ .unwrap();
}
#[test]
@@ -54,6 +64,39 @@ fn many_oneshot_futures() {
drop(rt);
}
}
+
+#[test]
+fn spawn_two() {
+ let rt = rt();
+
+ let out = rt.block_on(async {
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ tokio::spawn(async move {
+ tx.send("ZOMG").unwrap();
+ });
+ });
+
+ assert_ok!(rx.await)
+ });
+
+ assert_eq!(out, "ZOMG");
+
+ cfg_metrics! {
+ let metrics = rt.metrics();
+ drop(rt);
+ assert_eq!(1, metrics.remote_schedule_count());
+
+ let mut local = 0;
+ for i in 0..metrics.num_workers() {
+ local += metrics.worker_local_schedule_count(i);
+ }
+
+ assert_eq!(1, local);
+ }
+}
+
#[test]
fn many_multishot_futures() {
const CHAIN: usize = 200;
@@ -119,6 +162,32 @@ fn many_multishot_futures() {
}
#[test]
+fn lifo_slot_budget() {
+ async fn my_fn() {
+ spawn_another();
+ }
+
+ fn spawn_another() {
+ tokio::spawn(my_fn());
+ }
+
+ let rt = runtime::Builder::new_multi_thread()
+ .enable_all()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ let (send, recv) = oneshot::channel();
+
+ rt.spawn(async move {
+ tokio::spawn(my_fn());
+ let _ = send.send(());
+ });
+
+ let _ = rt.block_on(recv);
+}
+
+#[test]
fn spawn_shutdown() {
let rt = rt();
let (tx, rx) = mpsc::channel();
@@ -373,6 +442,32 @@ fn coop_and_block_in_place() {
});
}
+#[test]
+fn yield_after_block_in_place() {
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ rt.block_on(async {
+ tokio::spawn(async move {
+ // Block in place then enter a new runtime
+ tokio::task::block_in_place(|| {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+
+ rt.block_on(async {});
+ });
+
+ // Yield, then complete
+ tokio::task::yield_now().await;
+ })
+ .await
+ .unwrap()
+ });
+}
+
// Testing this does not panic
#[test]
fn max_blocking_threads() {
@@ -438,9 +533,7 @@ fn wake_during_shutdown() {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let me = Pin::into_inner(self);
let mut lock = me.shared.lock().unwrap();
- println!("poll {}", me.put_waker);
if me.put_waker {
- println!("putting");
lock.waker = Some(cx.waker().clone());
}
Poll::Pending
@@ -449,13 +542,11 @@ fn wake_during_shutdown() {
impl Drop for MyFuture {
fn drop(&mut self) {
- println!("drop {} start", self.put_waker);
let mut lock = self.shared.lock().unwrap();
if !self.put_waker {
lock.waker.take().unwrap().wake();
}
drop(lock);
- println!("drop {} stop", self.put_waker);
}
}
@@ -473,6 +564,202 @@ fn wake_during_shutdown() {
rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
}
-fn rt() -> Runtime {
- Runtime::new().unwrap()
+#[should_panic]
+#[tokio::test]
+async fn test_block_in_place1() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_block_in_place2() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[should_panic]
+#[tokio::main(flavor = "current_thread")]
+#[test]
+async fn test_block_in_place3() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[tokio::main]
+#[test]
+async fn test_block_in_place4() {
+ tokio::task::block_in_place(|| {});
+}
+
+// Repro for tokio-rs/tokio#5239
+#[test]
+fn test_nested_block_in_place_with_block_on_between() {
+ let rt = runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ // Needs to be more than 0
+ .max_blocking_threads(1)
+ .build()
+ .unwrap();
+
+ // Triggered by a race condition, so run a few times to make sure it is OK.
+ for _ in 0..100 {
+ let h = rt.handle().clone();
+
+ rt.block_on(async move {
+ tokio::spawn(async move {
+ tokio::task::block_in_place(|| {
+ h.block_on(async {
+ tokio::task::block_in_place(|| {});
+ });
+ })
+ })
+ .await
+ .unwrap()
+ });
+ }
+}
+
+// Testing the tuning logic is tricky as it is inherently timing based, and more
+// of a heuristic than an exact behavior. This test checks that the interval
+// changes over time based on load factors. There are no assertions, completion
+// is sufficient. If there is a regression, this test will hang. In theory, we
+// could add limits, but that would be likely to fail on CI.
+#[test]
+#[cfg(not(tokio_no_tuning_tests))]
+fn test_tuning() {
+ use std::sync::atomic::AtomicBool;
+ use std::time::Duration;
+
+ let rt = runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ fn iter(flag: Arc<AtomicBool>, counter: Arc<AtomicUsize>, stall: bool) {
+ if flag.load(Relaxed) {
+ if stall {
+ std::thread::sleep(Duration::from_micros(5));
+ }
+
+ counter.fetch_add(1, Relaxed);
+ tokio::spawn(async move { iter(flag, counter, stall) });
+ }
+ }
+
+ let flag = Arc::new(AtomicBool::new(true));
+ let counter = Arc::new(AtomicUsize::new(61));
+ let interval = Arc::new(AtomicUsize::new(61));
+
+ {
+ let flag = flag.clone();
+ let counter = counter.clone();
+ rt.spawn(async move { iter(flag, counter, true) });
+ }
+
+ // Now, hammer the injection queue until the interval drops.
+ let mut n = 0;
+ loop {
+ let curr = interval.load(Relaxed);
+
+ if curr <= 8 {
+ n += 1;
+ } else {
+ n = 0;
+ }
+
+ // Make sure we get a few good rounds. Jitter in the tuning could result
+ // in one "good" value without being representative of reaching a good
+ // state.
+ if n == 3 {
+ break;
+ }
+
+ if Arc::strong_count(&interval) < 5_000 {
+ let counter = counter.clone();
+ let interval = interval.clone();
+
+ rt.spawn(async move {
+ let prev = counter.swap(0, Relaxed);
+ interval.store(prev, Relaxed);
+ });
+
+ std::thread::yield_now();
+ }
+ }
+
+ flag.store(false, Relaxed);
+
+ let w = Arc::downgrade(&interval);
+ drop(interval);
+
+ while w.strong_count() > 0 {
+ std::thread::sleep(Duration::from_micros(500));
+ }
+
+ // Now, run it again with a faster task
+ let flag = Arc::new(AtomicBool::new(true));
+ // Set it high, we know it shouldn't ever really be this high
+ let counter = Arc::new(AtomicUsize::new(10_000));
+ let interval = Arc::new(AtomicUsize::new(10_000));
+
+ {
+ let flag = flag.clone();
+ let counter = counter.clone();
+ rt.spawn(async move { iter(flag, counter, false) });
+ }
+
+ // Now, hammer the injection queue until the interval reaches the expected range.
+ let mut n = 0;
+ loop {
+ let curr = interval.load(Relaxed);
+
+ if curr <= 1_000 && curr > 32 {
+ n += 1;
+ } else {
+ n = 0;
+ }
+
+ if n == 3 {
+ break;
+ }
+
+ if Arc::strong_count(&interval) <= 5_000 {
+ let counter = counter.clone();
+ let interval = interval.clone();
+
+ rt.spawn(async move {
+ let prev = counter.swap(0, Relaxed);
+ interval.store(prev, Relaxed);
+ });
+ }
+
+ std::thread::yield_now();
+ }
+
+ flag.store(false, Relaxed);
+}
+
+fn rt() -> runtime::Runtime {
+ runtime::Runtime::new().unwrap()
+}
+
+#[cfg(tokio_unstable)]
+mod unstable {
+ use super::*;
+
+ #[test]
+ fn test_disable_lifo_slot() {
+ let rt = runtime::Builder::new_multi_thread()
+ .disable_lifo_slot()
+ .worker_threads(2)
+ .build()
+ .unwrap();
+
+ rt.block_on(async {
+ tokio::spawn(async {
+ // Spawn another task and block the thread until completion. If the LIFO slot
+ // is used then the test doesn't complete.
+ futures::executor::block_on(tokio::spawn(async {})).unwrap();
+ })
+ .await
+ .unwrap();
+ })
+ }
}
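
Note (sketch, not part of the patch above): the new test_nested_block_in_place_with_block_on_between test exercises a nested block_in_place / Handle::block_on shape (the tokio-rs/tokio#5239 repro). A minimal standalone version of that shape, assuming only a stock Tokio multi-thread runtime with the "full" feature, might look like the following; the builder settings simply mirror the test (one worker, a non-zero blocking pool) and are not requirements of the API itself.

use tokio::runtime;

fn main() {
    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(1)
        // Mirrors the test above: needs to be more than 0.
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let handle = rt.handle().clone();

    rt.block_on(async move {
        tokio::spawn(async move {
            // Tell the scheduler this thread is about to block...
            tokio::task::block_in_place(|| {
                // ...then re-enter the runtime synchronously through its handle.
                handle.block_on(async {
                    tokio::task::block_in_place(|| {});
                });
            });
        })
        .await
        .unwrap();
    });
}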