diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/tests/rt_basic.rs | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/tests/rt_basic.rs')
-rw-r--r-- | vendor/tokio/tests/rt_basic.rs | 292 |
1 files changed, 291 insertions, 1 deletions
diff --git a/vendor/tokio/tests/rt_basic.rs b/vendor/tokio/tests/rt_basic.rs index 4b1bdadc1..6caf0a44b 100644 --- a/vendor/tokio/tests/rt_basic.rs +++ b/vendor/tokio/tests/rt_basic.rs @@ -3,15 +3,28 @@ use tokio::runtime::Runtime; use tokio::sync::oneshot; +use tokio::time::{timeout, Duration}; use tokio_test::{assert_err, assert_ok}; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::task::{Context, Poll}; use std::thread; -use tokio::time::{timeout, Duration}; mod support { pub(crate) mod mpsc_stream; } +macro_rules! cfg_metrics { + ($($t:tt)*) => { + #[cfg(tokio_unstable)] + { + $( $t )* + } + } +} + #[test] fn spawned_task_does_not_progress_without_block_on() { let (tx, mut rx) = oneshot::channel(); @@ -136,6 +149,134 @@ fn acquire_mutex_in_drop() { } #[test] +fn drop_tasks_in_context() { + static SUCCESS: AtomicBool = AtomicBool::new(false); + + struct ContextOnDrop; + + impl Future for ContextOnDrop { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for ContextOnDrop { + fn drop(&mut self) { + if tokio::runtime::Handle::try_current().is_ok() { + SUCCESS.store(true, Ordering::SeqCst); + } + } + } + + let rt = rt(); + rt.spawn(ContextOnDrop); + drop(rt); + + assert!(SUCCESS.load(Ordering::SeqCst)); +} + +#[test] +#[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")] +#[should_panic(expected = "boom")] +fn wake_in_drop_after_panic() { + let (tx, rx) = oneshot::channel::<()>(); + + struct WakeOnDrop(Option<oneshot::Sender<()>>); + + impl Drop for WakeOnDrop { + fn drop(&mut self) { + self.0.take().unwrap().send(()).unwrap(); + } + } + + let rt = rt(); + + rt.spawn(async move { + let _wake_on_drop = WakeOnDrop(Some(tx)); + // wait forever + futures::future::pending::<()>().await; + }); + + let _join = rt.spawn(async move { rx.await }); + + rt.block_on(async { + tokio::task::yield_now().await; + panic!("boom"); + }); +} + +#[test] +fn spawn_two() { + let rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + tokio::spawn(async move { + tx.send("ZOMG").unwrap(); + }); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); + + cfg_metrics! { + let metrics = rt.metrics(); + drop(rt); + assert_eq!(0, metrics.remote_schedule_count()); + + let mut local = 0; + for i in 0..metrics.num_workers() { + local += metrics.worker_local_schedule_count(i); + } + + assert_eq!(2, local); + } +} + +#[cfg_attr(tokio_wasi, ignore = "WASI: std::thread::spawn not supported")] +#[test] +fn spawn_remote() { + let rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + let handle = tokio::spawn(async move { + std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(10)); + tx.send("ZOMG").unwrap(); + }); + + rx.await.unwrap() + }); + + handle.await.unwrap() + }); + + assert_eq!(out, "ZOMG"); + + cfg_metrics! { + let metrics = rt.metrics(); + drop(rt); + assert_eq!(1, metrics.remote_schedule_count()); + + let mut local = 0; + for i in 0..metrics.num_workers() { + local += metrics.worker_local_schedule_count(i); + } + + assert_eq!(1, local); + } +} + +#[test] +#[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")] #[should_panic( expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers." )] @@ -150,6 +291,155 @@ fn timeout_panics_when_no_time_handle() { }); } +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::{Builder, RngSeed, UnhandledPanic}; + + #[test] + #[should_panic( + expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic" + )] + fn shutdown_on_panic() { + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + } + + #[test] + #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")] + fn spawns_do_nothing() { + use std::sync::Arc; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt1 = Arc::new(rt); + let rt2 = rt1.clone(); + + let _ = std::thread::spawn(move || { + rt2.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + }) + .join(); + + let task = rt1.spawn(async {}); + let res = futures::executor::block_on(task); + assert!(res.is_err()); + } + + #[test] + #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")] + fn shutdown_all_concurrent_block_on() { + const N: usize = 2; + use std::sync::{mpsc, Arc}; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt = Arc::new(rt); + let mut ths = vec![]; + let (tx, rx) = mpsc::channel(); + + for _ in 0..N { + let rt = rt.clone(); + let tx = tx.clone(); + ths.push(std::thread::spawn(move || { + rt.block_on(async { + tx.send(()).unwrap(); + futures::future::pending::<()>().await; + }); + })); + } + + for _ in 0..N { + rx.recv().unwrap(); + } + + rt.spawn(async { + panic!("boom"); + }); + + for th in ths { + assert!(th.join().is_err()); + } + } + + #[test] + fn rng_seed() { + let seed = b"bytes used to generate seed"; + let rt1 = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + let rt1_values = rt1.block_on(async { + let rand_1 = tokio::macros::support::thread_rng_n(100); + let rand_2 = tokio::macros::support::thread_rng_n(100); + + (rand_1, rand_2) + }); + + let rt2 = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + let rt2_values = rt2.block_on(async { + let rand_1 = tokio::macros::support::thread_rng_n(100); + let rand_2 = tokio::macros::support::thread_rng_n(100); + + (rand_1, rand_2) + }); + + assert_eq!(rt1_values, rt2_values); + } + + #[test] + fn rng_seed_multi_enter() { + let seed = b"bytes used to generate seed"; + + fn two_rand_values() -> (u32, u32) { + let rand_1 = tokio::macros::support::thread_rng_n(100); + let rand_2 = tokio::macros::support::thread_rng_n(100); + + (rand_1, rand_2) + } + + let rt1 = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + let rt1_values_1 = rt1.block_on(async { two_rand_values() }); + let rt1_values_2 = rt1.block_on(async { two_rand_values() }); + + let rt2 = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + let rt2_values_1 = rt2.block_on(async { two_rand_values() }); + let rt2_values_2 = rt2.block_on(async { two_rand_values() }); + + assert_eq!(rt1_values_1, rt2_values_1); + assert_eq!(rt1_values_2, rt2_values_2); + } +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() |