diff options
Diffstat (limited to 'vendor/tokio/tests/rt_threaded.rs')
-rw-r--r-- | vendor/tokio/tests/rt_threaded.rs | 305 |
1 files changed, 296 insertions, 9 deletions
diff --git a/vendor/tokio/tests/rt_threaded.rs b/vendor/tokio/tests/rt_threaded.rs index 9e76c4ed0..69b186947 100644 --- a/vendor/tokio/tests/rt_threaded.rs +++ b/vendor/tokio/tests/rt_threaded.rs @@ -1,9 +1,9 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "full")] +#![cfg(all(feature = "full", not(tokio_wasi)))] use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::runtime::{self, Runtime}; +use tokio::runtime; use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; @@ -15,13 +15,23 @@ use std::sync::atomic::Ordering::Relaxed; use std::sync::{mpsc, Arc, Mutex}; use std::task::{Context, Poll, Waker}; +macro_rules! cfg_metrics { + ($($t:tt)*) => { + #[cfg(tokio_unstable)] + { + $( $t )* + } + } +} + #[test] fn single_thread() { // No panic when starting a runtime w/ a single thread let _ = runtime::Builder::new_multi_thread() .enable_all() .worker_threads(1) - .build(); + .build() + .unwrap(); } #[test] @@ -54,6 +64,39 @@ fn many_oneshot_futures() { drop(rt); } } + +#[test] +fn spawn_two() { + let rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + tokio::spawn(async move { + tx.send("ZOMG").unwrap(); + }); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); + + cfg_metrics! { + let metrics = rt.metrics(); + drop(rt); + assert_eq!(1, metrics.remote_schedule_count()); + + let mut local = 0; + for i in 0..metrics.num_workers() { + local += metrics.worker_local_schedule_count(i); + } + + assert_eq!(1, local); + } +} + #[test] fn many_multishot_futures() { const CHAIN: usize = 200; @@ -119,6 +162,32 @@ fn many_multishot_futures() { } #[test] +fn lifo_slot_budget() { + async fn my_fn() { + spawn_another(); + } + + fn spawn_another() { + tokio::spawn(my_fn()); + } + + let rt = runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .unwrap(); + + let (send, recv) = oneshot::channel(); + + rt.spawn(async move { + tokio::spawn(my_fn()); + let _ = send.send(()); + }); + + let _ = rt.block_on(recv); +} + +#[test] fn spawn_shutdown() { let rt = rt(); let (tx, rx) = mpsc::channel(); @@ -373,6 +442,32 @@ fn coop_and_block_in_place() { }); } +#[test] +fn yield_after_block_in_place() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async move { + // Block in place then enter a new runtime + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + rt.block_on(async {}); + }); + + // Yield, then complete + tokio::task::yield_now().await; + }) + .await + .unwrap() + }); +} + // Testing this does not panic #[test] fn max_blocking_threads() { @@ -438,9 +533,7 @@ fn wake_during_shutdown() { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let me = Pin::into_inner(self); let mut lock = me.shared.lock().unwrap(); - println!("poll {}", me.put_waker); if me.put_waker { - println!("putting"); lock.waker = Some(cx.waker().clone()); } Poll::Pending @@ -449,13 +542,11 @@ fn wake_during_shutdown() { impl Drop for MyFuture { fn drop(&mut self) { - println!("drop {} start", self.put_waker); let mut lock = self.shared.lock().unwrap(); if !self.put_waker { lock.waker.take().unwrap().wake(); } drop(lock); - println!("drop {} stop", self.put_waker); } } @@ -473,6 +564,202 @@ fn wake_during_shutdown() { rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await }); } -fn rt() -> Runtime { - Runtime::new().unwrap() +#[should_panic] +#[tokio::test] +async fn test_block_in_place1() { + tokio::task::block_in_place(|| {}); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_block_in_place2() { + tokio::task::block_in_place(|| {}); +} + +#[should_panic] +#[tokio::main(flavor = "current_thread")] +#[test] +async fn test_block_in_place3() { + tokio::task::block_in_place(|| {}); +} + +#[tokio::main] +#[test] +async fn test_block_in_place4() { + tokio::task::block_in_place(|| {}); +} + +// Repro for tokio-rs/tokio#5239 +#[test] +fn test_nested_block_in_place_with_block_on_between() { + let rt = runtime::Builder::new_multi_thread() + .worker_threads(1) + // Needs to be more than 0 + .max_blocking_threads(1) + .build() + .unwrap(); + + // Triggered by a race condition, so run a few times to make sure it is OK. + for _ in 0..100 { + let h = rt.handle().clone(); + + rt.block_on(async move { + tokio::spawn(async move { + tokio::task::block_in_place(|| { + h.block_on(async { + tokio::task::block_in_place(|| {}); + }); + }) + }) + .await + .unwrap() + }); + } +} + +// Testing the tuning logic is tricky as it is inherently timing based, and more +// of a heuristic than an exact behavior. This test checks that the interval +// changes over time based on load factors. There are no assertions, completion +// is sufficient. If there is a regression, this test will hang. In theory, we +// could add limits, but that would be likely to fail on CI. +#[test] +#[cfg(not(tokio_no_tuning_tests))] +fn test_tuning() { + use std::sync::atomic::AtomicBool; + use std::time::Duration; + + let rt = runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + fn iter(flag: Arc<AtomicBool>, counter: Arc<AtomicUsize>, stall: bool) { + if flag.load(Relaxed) { + if stall { + std::thread::sleep(Duration::from_micros(5)); + } + + counter.fetch_add(1, Relaxed); + tokio::spawn(async move { iter(flag, counter, stall) }); + } + } + + let flag = Arc::new(AtomicBool::new(true)); + let counter = Arc::new(AtomicUsize::new(61)); + let interval = Arc::new(AtomicUsize::new(61)); + + { + let flag = flag.clone(); + let counter = counter.clone(); + rt.spawn(async move { iter(flag, counter, true) }); + } + + // Now, hammer the injection queue until the interval drops. + let mut n = 0; + loop { + let curr = interval.load(Relaxed); + + if curr <= 8 { + n += 1; + } else { + n = 0; + } + + // Make sure we get a few good rounds. Jitter in the tuning could result + // in one "good" value without being representative of reaching a good + // state. + if n == 3 { + break; + } + + if Arc::strong_count(&interval) < 5_000 { + let counter = counter.clone(); + let interval = interval.clone(); + + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + + std::thread::yield_now(); + } + } + + flag.store(false, Relaxed); + + let w = Arc::downgrade(&interval); + drop(interval); + + while w.strong_count() > 0 { + std::thread::sleep(Duration::from_micros(500)); + } + + // Now, run it again with a faster task + let flag = Arc::new(AtomicBool::new(true)); + // Set it high, we know it shouldn't ever really be this high + let counter = Arc::new(AtomicUsize::new(10_000)); + let interval = Arc::new(AtomicUsize::new(10_000)); + + { + let flag = flag.clone(); + let counter = counter.clone(); + rt.spawn(async move { iter(flag, counter, false) }); + } + + // Now, hammer the injection queue until the interval reaches the expected range. + let mut n = 0; + loop { + let curr = interval.load(Relaxed); + + if curr <= 1_000 && curr > 32 { + n += 1; + } else { + n = 0; + } + + if n == 3 { + break; + } + + if Arc::strong_count(&interval) <= 5_000 { + let counter = counter.clone(); + let interval = interval.clone(); + + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + } + + std::thread::yield_now(); + } + + flag.store(false, Relaxed); +} + +fn rt() -> runtime::Runtime { + runtime::Runtime::new().unwrap() +} + +#[cfg(tokio_unstable)] +mod unstable { + use super::*; + + #[test] + fn test_disable_lifo_slot() { + let rt = runtime::Builder::new_multi_thread() + .disable_lifo_slot() + .worker_threads(2) + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + // Spawn another task and block the thread until completion. If the LIFO slot + // is used then the test doesn't complete. + futures::executor::block_on(tokio::spawn(async {})).unwrap(); + }) + .await + .unwrap(); + }) + } } |