Diffstat (limited to 'vendor/tokio/tests/rt_common.rs')
-rw-r--r-- | vendor/tokio/tests/rt_common.rs | 334
1 file changed, 290 insertions, 44 deletions
diff --git a/vendor/tokio/tests/rt_common.rs b/vendor/tokio/tests/rt_common.rs
index cb1d0f661..9c6add047 100644
--- a/vendor/tokio/tests/rt_common.rs
+++ b/vendor/tokio/tests/rt_common.rs
@@ -2,13 +2,16 @@
 #![warn(rust_2018_idioms)]
 #![cfg(feature = "full")]
 
-// Tests to run on both current-thread & thread-pool runtime variants.
+// Tests to run on both current-thread & multi-thread runtime variants.
 
 macro_rules! rt_test {
     ($($t:tt)*) => {
         mod current_thread_scheduler {
             $($t)*
 
+            #[cfg(not(target_os="wasi"))]
+            const NUM_WORKERS: usize = 1;
+
             fn rt() -> Arc<Runtime> {
                 tokio::runtime::Builder::new_current_thread()
                     .enable_all()
@@ -18,9 +21,12 @@ macro_rules! rt_test {
             }
         }
 
+        #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
         mod threaded_scheduler_4_threads {
             $($t)*
 
+            const NUM_WORKERS: usize = 4;
+
             fn rt() -> Arc<Runtime> {
                 tokio::runtime::Builder::new_multi_thread()
                     .worker_threads(4)
@@ -31,9 +37,12 @@ macro_rules! rt_test {
             }
         }
 
+        #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
         mod threaded_scheduler_1_thread {
             $($t)*
 
+            const NUM_WORKERS: usize = 1;
+
             fn rt() -> Arc<Runtime> {
                 tokio::runtime::Builder::new_multi_thread()
                     .worker_threads(1)
@@ -55,18 +64,30 @@ fn send_sync_bound() {
 }
 
 rt_test! {
-    use tokio::net::{TcpListener, TcpStream, UdpSocket};
+    #[cfg(not(target_os="wasi"))]
+    use tokio::net::{TcpListener, TcpStream};
+    #[cfg(not(target_os="wasi"))]
     use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
     use tokio::runtime::Runtime;
     use tokio::sync::oneshot;
     use tokio::{task, time};
-    use tokio_test::{assert_err, assert_ok};
+
+    #[cfg(not(target_os="wasi"))]
+    use tokio_test::assert_err;
+    use tokio_test::assert_ok;
 
     use futures::future::poll_fn;
     use std::future::Future;
     use std::pin::Pin;
-    use std::sync::{mpsc, Arc};
+
+    #[cfg(not(target_os="wasi"))]
+    use std::sync::mpsc;
+
+    use std::sync::Arc;
     use std::task::{Context, Poll};
+
+    #[cfg(not(target_os="wasi"))]
     use std::thread;
+
     use std::time::{Duration, Instant};
 
@@ -83,6 +104,7 @@ rt_test! {
     }
 
+    #[cfg(not(target_os="wasi"))]
     #[test]
     fn block_on_async() {
         let rt = rt();
@@ -164,6 +186,7 @@ rt_test! {
         assert_eq!(out, "ZOMG");
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
     fn spawn_many_from_block_on() {
         use tokio::sync::mpsc;
@@ -214,6 +237,7 @@ rt_test! {
         }
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn spawn_many_from_task() {
         use tokio::sync::mpsc;
@@ -226,14 +250,6 @@ rt_test! {
         tokio::spawn(async move {
             let (done_tx, mut done_rx) = mpsc::unbounded_channel();
 
-            /*
-            for _ in 0..100 {
-                tokio::spawn(async move { });
-            }
-
-            tokio::task::yield_now().await;
-            */
-
             let mut txs = (0..ITER)
                 .map(|i| {
                     let (tx, rx) = oneshot::channel();
@@ -275,6 +291,31 @@ rt_test! {
     }
 
     #[test]
+    fn spawn_one_from_block_on_called_on_handle() {
+        let rt = rt();
+        let (tx, rx) = oneshot::channel();
+
+        #[allow(clippy::async_yields_async)]
+        let handle = rt.handle().block_on(async {
+            tokio::spawn(async move {
+                tx.send("ZOMG").unwrap();
+                "DONE"
+            })
+        });
+
+        let out = rt.block_on(async {
+            let msg = assert_ok!(rx.await);
+
+            let out = assert_ok!(handle.await);
+            assert_eq!(out, "DONE");
+
+            msg
+        });
+
+        assert_eq!(out, "ZOMG");
+    }
+
+    #[test]
     fn spawn_await_chain() {
         let rt = rt();
@@ -329,6 +370,7 @@ rt_test! {
         assert_eq!(out, "ZOMG");
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn complete_block_on_under_load() {
         let rt = rt();
@@ -352,6 +394,7 @@ rt_test! {
         });
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn complete_task_under_load() {
         let rt = rt();
@@ -381,6 +424,7 @@ rt_test! {
         });
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn spawn_from_other_thread_idle() {
         let rt = rt();
@@ -401,6 +445,7 @@ rt_test! {
         });
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn spawn_from_other_thread_under_load() {
         let rt = rt();
@@ -461,6 +506,7 @@ rt_test! {
         assert!(now.elapsed() >= dur);
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
     #[test]
     fn block_on_socket() {
         let rt = rt();
@@ -481,6 +527,7 @@ rt_test! {
         });
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn spawn_from_blocking() {
         let rt = rt();
@@ -496,6 +543,7 @@ rt_test! {
         assert_eq!(out, "hello")
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn spawn_blocking_from_blocking() {
         let rt = rt();
@@ -511,6 +559,7 @@ rt_test! {
         assert_eq!(out, "hello")
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn sleep_from_blocking() {
         let rt = rt();
@@ -531,6 +580,7 @@ rt_test! {
         });
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
     #[test]
     fn socket_from_blocking() {
         let rt = rt();
@@ -554,6 +604,7 @@ rt_test! {
         });
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn always_active_parker() {
         // This test is to show that we will always have
@@ -600,6 +651,7 @@ rt_test! {
     // concern. There also isn't a great/obvious solution to take. For now, the
     // test is disabled.
     #[cfg(not(windows))]
+    #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
     fn io_driver_called_when_under_load() {
         let rt = rt();
@@ -607,7 +659,12 @@ rt_test! {
         for _ in 0..100 {
             rt.spawn(async {
                 loop {
-                    tokio::task::yield_now().await;
+                    // Don't use Tokio's `yield_now()` to avoid special defer
+                    // logic.
+                    futures::future::poll_fn::<(), _>(|cx| {
+                        cx.waker().wake_by_ref();
+                        std::task::Poll::Pending
+                    }).await;
                 }
             });
         }
@@ -635,6 +692,113 @@ rt_test! {
         });
     }
 
+    /// Tests that yielded tasks are not scheduled until **after** resource
+    /// drivers are polled.
+    ///
+    /// The OS does not guarantee when I/O events are delivered, so there may be
+    /// more yields than anticipated. This makes the test slightly flaky. To
+    /// help avoid flakiness, we run the test 10 times and only fail it after
+    /// 10 failures in a row.
+    ///
+    /// Note that if the test fails by panicking rather than by returning false,
+    /// then we fail it immediately. That kind of failure should not happen
+    /// spuriously.
+    #[test]
+    #[cfg(not(target_os="wasi"))]
+    fn yield_defers_until_park() {
+        for _ in 0..10 {
+            if yield_defers_until_park_inner() {
+                // test passed
+                return;
+            }
+
+            // Wait a bit and run the test again.
+            std::thread::sleep(std::time::Duration::from_secs(2));
+        }
+
+        panic!("yield_defers_until_park is failing consistently");
+    }
+
+    /// Implementation of `yield_defers_until_park` test. Returns `true` if the
+    /// test passed.
+    #[cfg(not(target_os="wasi"))]
+    fn yield_defers_until_park_inner() -> bool {
+        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
+        use std::sync::Barrier;
+
+        let rt = rt();
+
+        let flag = Arc::new(AtomicBool::new(false));
+        let barrier = Arc::new(Barrier::new(NUM_WORKERS));
+
+        rt.block_on(async {
+            // Make sure other workers cannot steal tasks
+            #[allow(clippy::reversed_empty_ranges)]
+            for _ in 0..(NUM_WORKERS-1) {
+                let flag = flag.clone();
+                let barrier = barrier.clone();
+
+                tokio::spawn(async move {
+                    barrier.wait();
+
+                    while !flag.load(SeqCst) {
+                        std::thread::sleep(std::time::Duration::from_millis(1));
+                    }
+                });
+            }
+
+            barrier.wait();
+
+            let (fail_test, fail_test_recv) = oneshot::channel::<()>();
+
+            let jh = tokio::spawn(async move {
+                // Create a TCP listener
+                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+                let addr = listener.local_addr().unwrap();
+
+                tokio::join!(
+                    async {
+                        // Done in a blocking manner intentionally.
+                        let _socket = std::net::TcpStream::connect(addr).unwrap();
+
+                        // Yield until connected
+                        let mut cnt = 0;
+                        while !flag.load(SeqCst) {
+                            tokio::task::yield_now().await;
+                            cnt += 1;
+
+                            if cnt >= 10 {
+                                // yielded too many times; report failure and
+                                // sleep forever so that the `fail_test` branch
+                                // of the `select!` below triggers.
+                                let _ = fail_test.send(());
+                                futures::future::pending::<()>().await;
+                                break;
+                            }
+                        }
+                    },
+                    async {
+                        let _ = listener.accept().await.unwrap();
+                        flag.store(true, SeqCst);
+                    }
+                );
+            });
+
+            // Wait until the spawned task completes or fails. If no message is
+            // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
+            let success = fail_test_recv.await.is_err();
+
+            if success {
+                // Check for panics in spawned task.
+                jh.abort();
+                jh.await.unwrap();
+            }
+
+            success
+        })
+    }
+
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn client_server_block_on() {
         let rt = rt();
@@ -646,6 +810,7 @@ rt_test! {
         assert_err!(rx.try_recv());
     }
 
+    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")]
     #[test]
     fn panic_in_task() {
         let rt = rt();
@@ -674,11 +839,13 @@ rt_test! {
 
     #[test]
     #[should_panic]
+    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
     fn panic_in_block_on() {
         let rt = rt();
         rt.block_on(async { panic!() });
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     async fn yield_once() {
         let mut yielded = false;
         poll_fn(|cx| {
@@ -748,7 +915,11 @@ rt_test! {
     #[test]
     fn wake_while_rt_is_dropping() {
-        use tokio::task;
+        use tokio::sync::Barrier;
+        use core::sync::atomic::{AtomicBool, Ordering};
+
+        let drop_triggered = Arc::new(AtomicBool::new(false));
+        let set_drop_triggered = drop_triggered.clone();
 
         struct OnDrop<F: FnMut()>(F);
@@ -762,17 +933,21 @@ rt_test! {
         let (tx2, rx2) = oneshot::channel();
         let (tx3, rx3) = oneshot::channel();
 
-        let rt = rt();
+        let barrier = Arc::new(Barrier::new(4));
+        let barrier1 = barrier.clone();
+        let barrier2 = barrier.clone();
+        let barrier3 = barrier.clone();
 
-        let h1 = rt.clone();
+        let rt = rt();
 
         rt.spawn(async move {
             // Ensure a waker gets stored in oneshot 1.
-            let _ = rx1.await;
+            let _ = tokio::join!(rx1, barrier1.wait());
             tx3.send(()).unwrap();
         });
 
         rt.spawn(async move {
+            let h1 = tokio::runtime::Handle::current();
             // When this task is dropped, we'll be "closing remotes".
             // We spawn a new task that owns the `tx1`, to move its Drop
             // out of here.
@@ -785,36 +960,40 @@ rt_test! {
                 h1.spawn(async move {
                     tx1.send(()).unwrap();
                 });
+                // Just a sanity check that this entire thing actually happened
+                set_drop_triggered.store(true, Ordering::Relaxed);
             });
 
-            let _ = rx2.await;
+            let _ = tokio::join!(rx2, barrier2.wait());
         });
 
         rt.spawn(async move {
-            let _ = rx3.await;
+            let _ = tokio::join!(rx3, barrier3.wait());
             // We'll never get here, but once task 3 drops, this will
             // force task 2 to re-schedule since it's waiting on oneshot 2.
             tx2.send(()).unwrap();
         });
 
-        // Tick the loop
-        rt.block_on(async {
-            task::yield_now().await;
-        });
+        // Wait until every oneshot channel has been polled.
+        rt.block_on(barrier.wait());
 
         // Drop the rt
         drop(rt);
+
+        // Make sure that the spawn actually happened
+        assert!(drop_triggered.load(Ordering::Relaxed));
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
     #[test]
     fn io_notify_while_shutting_down() {
-        use std::net::Ipv6Addr;
+        use tokio::net::UdpSocket;
         use std::sync::Arc;
 
         for _ in 1..10 {
             let runtime = rt();
 
             runtime.block_on(async {
-                let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
+                let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
                 let addr = socket.local_addr().unwrap();
                 let send_half = Arc::new(socket);
                 let recv_half = send_half.clone();
@@ -840,6 +1019,7 @@ rt_test! {
         }
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn shutdown_timeout() {
         let (tx, rx) = oneshot::channel();
@@ -857,6 +1037,7 @@ rt_test! {
         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
     #[test]
     fn shutdown_timeout_0() {
         let runtime = rt();
@@ -888,6 +1069,7 @@ rt_test! {
     // See https://github.com/rust-lang/rust/issues/74875
     #[test]
     #[cfg(not(windows))]
+    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")]
     fn runtime_in_thread_local() {
         use std::cell::RefCell;
         use std::thread;
@@ -907,6 +1089,7 @@ rt_test! {
         }).join().unwrap();
     }
 
+    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
     async fn client_server(tx: mpsc::Sender<()>) {
         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -931,6 +1114,7 @@ rt_test! {
         tx.send(()).unwrap();
     }
 
+    #[cfg(not(tokio_wasi))] // Wasi does not support bind
     #[test]
     fn local_set_block_on_socket() {
         let rt = rt();
@@ -952,6 +1136,7 @@ rt_test! {
         });
     }
 
+    #[cfg(not(tokio_wasi))] // Wasi does not support bind
     #[test]
     fn local_set_client_server_block_on() {
         let rt = rt();
@@ -965,6 +1150,7 @@ rt_test! {
         assert_err!(rx.try_recv());
     }
 
+    #[cfg(not(tokio_wasi))] // Wasi does not support bind
     async fn client_server_local(tx: mpsc::Sender<()>) {
         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -992,22 +1178,22 @@ rt_test! {
     #[test]
     fn coop() {
         use std::task::Poll::Ready;
+        use tokio::sync::mpsc;
 
         let rt = rt();
 
         rt.block_on(async {
-            // Create a bunch of tasks
-            let mut tasks = (0..1_000).map(|_| {
-                tokio::spawn(async { })
-            }).collect::<Vec<_>>();
+            let (send, mut recv) = mpsc::unbounded_channel();
 
-            // Hope that all the tasks complete...
-            time::sleep(Duration::from_millis(100)).await;
+            // Send a bunch of messages.
+            for _ in 0..1_000 {
+                send.send(()).unwrap();
+            }
 
             poll_fn(|cx| {
-                // At least one task should not be ready
-                for task in &mut tasks {
-                    if Pin::new(task).poll(cx).is_pending() {
+                // At least one response should return pending.
+                for _ in 0..1_000 {
+                    if recv.poll_recv(cx).is_pending() {
                         return Ready(());
                     }
@@ -1020,22 +1206,22 @@ rt_test! {
     #[test]
     fn coop_unconstrained() {
         use std::task::Poll::Ready;
+        use tokio::sync::mpsc;
 
         let rt = rt();
 
         rt.block_on(async {
-            // Create a bunch of tasks
-            let mut tasks = (0..1_000).map(|_| {
-                tokio::spawn(async { })
-            }).collect::<Vec<_>>();
+            let (send, mut recv) = mpsc::unbounded_channel();
 
-            // Hope that all the tasks complete...
-            time::sleep(Duration::from_millis(100)).await;
+            // Send a bunch of messages.
+            for _ in 0..1_000 {
+                send.send(()).unwrap();
+            }
 
             tokio::task::unconstrained(poll_fn(|cx| {
-                // All the tasks should be ready
-                for task in &mut tasks {
-                    assert!(Pin::new(task).poll(cx).is_ready());
+                // All the responses should be ready.
+                for _ in 0..1_000 {
+                    assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
                 }
 
                 Ready(())
@@ -1043,6 +1229,31 @@ rt_test! {
         });
     }
 
+    #[cfg(tokio_unstable)]
+    #[test]
+    fn coop_consume_budget() {
+        let rt = rt();
+
+        rt.block_on(async {
+            poll_fn(|cx| {
+                let counter = Arc::new(std::sync::Mutex::new(0));
+                let counter_clone = Arc::clone(&counter);
+                let mut worker = Box::pin(async move {
+                    // Consume the budget until a yield happens
+                    for _ in 0..1000 {
+                        *counter.lock().unwrap() += 1;
+                        task::consume_budget().await
+                    }
+                });
+                // Assert that the worker was yielded and it didn't manage
+                // to finish the whole work (assuming the total budget of 128)
+                assert!(Pin::new(&mut worker).poll(cx).is_pending());
+                assert!(*counter_clone.lock().unwrap() < 1000);
+                std::task::Poll::Ready(())
+            }).await;
+        });
+    }
+
     // Tests that the "next task" scheduler optimization is not able to starve
     // other tasks.
     #[test]
@@ -1060,7 +1271,7 @@ rt_test! {
         let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
         let mut tasks = vec![];
 
-        // Spawn a bunch of tasks that ping ping between each other to
+        // Spawn a bunch of tasks that ping-pong between each other to
         // saturate the runtime.
         for _ in 0..NUM {
             let (tx1, mut rx1) = mpsc::unbounded_channel();
@@ -1106,4 +1317,39 @@ rt_test! {
             }
         });
     }
+
+    #[test]
+    #[cfg(not(target_os="wasi"))]
+    fn shutdown_concurrent_spawn() {
+        const NUM_TASKS: usize = 10_000;
+        for _ in 0..5 {
+            let (tx, rx) = std::sync::mpsc::channel();
+            let rt = rt();
+
+            let mut txs = vec![];
+
+            for _ in 0..NUM_TASKS {
+                let (tx, rx) = tokio::sync::oneshot::channel();
+                txs.push(tx);
+                rt.spawn(async move {
+                    rx.await.unwrap();
+                });
+            }
+
+            // Prime the tasks
+            rt.block_on(async { tokio::task::yield_now().await });
+
+            let th = std::thread::spawn(move || {
+                tx.send(()).unwrap();
+                for tx in txs.drain(..) {
+                    let _ = tx.send(());
+                }
+            });
+
+            rx.recv().unwrap();
+            drop(rt);
+
+            th.join().unwrap();
+        }
+    }
 }
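
Note on the `rt_test!` change at the top of the file: the macro stamps the same test body into one module per scheduler variant, and this patch adds a per-module `NUM_WORKERS` constant (1 for the current-thread and single-worker schedulers, 4 for the four-worker one) so that shared tests such as `yield_defers_until_park_inner` can size their barriers to the worker count. A minimal, self-contained sketch of the pattern, with illustrative module and test names that are not part of this diff:

    macro_rules! rt_test {
        ($($t:tt)*) => {
            mod current_thread {
                $($t)*

                const NUM_WORKERS: usize = 1;

                fn rt() -> tokio::runtime::Runtime {
                    tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .unwrap()
                }
            }

            mod multi_thread {
                $($t)*

                const NUM_WORKERS: usize = 4;

                fn rt() -> tokio::runtime::Runtime {
                    tokio::runtime::Builder::new_multi_thread()
                        .worker_threads(NUM_WORKERS)
                        .enable_all()
                        .build()
                        .unwrap()
                }
            }
        };
    }

    rt_test! {
        // Every test written once here runs under both schedulers.
        #[test]
        fn block_on_works() {
            assert_eq!(rt().block_on(async { 1 + 1 }), 2);
        }
    }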
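
Note on the new `spawn_one_from_block_on_called_on_handle` test: it pins down that `block_on` called on a `runtime::Handle` behaves like `Runtime::block_on`, including when the future spawns a task. A minimal usage sketch, assuming tokio 1.x (keeping in mind that on a current-thread runtime only `Runtime::block_on` can drive the I/O and timer drivers, which is fine here because the future is immediately ready):

    fn main() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        // `Handle::block_on` drives a future to completion from outside
        // the runtime, just like `Runtime::block_on`.
        let out = rt.handle().block_on(async { "hello" });
        assert_eq!(out, "hello");
    }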
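
Note on the loop-body change in `io_driver_called_when_under_load`: `tokio::task::yield_now()` now participates in the scheduler's defer logic, so a yielded task is not re-polled until the runtime parks and polls the resource drivers, which is exactly the behavior the new `yield_defers_until_park` test asserts. To keep the runtime saturated, the test switches to a raw wake-then-`Pending` yield. A sketch of that raw yield as a standalone helper (the helper name is illustrative), assuming only the `futures` crate:

    use std::task::Poll;

    /// Yield exactly once without going through Tokio's deferred-yield
    /// path: wake the task first, then return `Pending`, so the scheduler
    /// sees the task as immediately runnable again and never gets a
    /// chance to park.
    async fn raw_yield() {
        let mut yielded = false;
        futures::future::poll_fn(move |cx| {
            if yielded {
                Poll::Ready(())
            } else {
                yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        })
        .await;
    }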
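
Note on the rewritten `coop` and `coop_unconstrained` tests: instead of spawning a thousand tasks and hoping they finish in time, they now exercise Tokio's cooperative budget directly. Each task gets a fixed budget of resource operations per poll (128 at the time of writing, per the comment in `coop_consume_budget`); once it is spent, an otherwise-ready resource such as `UnboundedReceiver::poll_recv` returns `Pending` and schedules the task to be woken, unless the future is wrapped in `tokio::task::unconstrained`. A small runnable sketch of the budget taking effect, assuming tokio with the "full" feature set:

    use std::task::Poll;

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();

        // Queue far more messages than one poll's budget allows.
        for _ in 0..1_000 {
            send.send(()).unwrap();
        }

        // Count how many receives complete before the budget forces a yield.
        let before_yield = futures::future::poll_fn(|cx| {
            let mut n = 0;
            while recv.poll_recv(cx).is_ready() {
                n += 1;
            }
            Poll::Ready(n)
        })
        .await;

        // The budget cuts us off well before all 1_000 messages drain.
        assert!(before_yield < 1_000);
        println!("receives before forced yield: {}", before_yield);
    }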