summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/tests/rt_common.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tokio/tests/rt_common.rs')
-rw-r--r--vendor/tokio/tests/rt_common.rs334
1 files changed, 290 insertions, 44 deletions
diff --git a/vendor/tokio/tests/rt_common.rs b/vendor/tokio/tests/rt_common.rs
index cb1d0f661..9c6add047 100644
--- a/vendor/tokio/tests/rt_common.rs
+++ b/vendor/tokio/tests/rt_common.rs
@@ -2,13 +2,16 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-// Tests to run on both current-thread & thread-pool runtime variants.
+// Tests to run on both current-thread & multi-thread runtime variants.
macro_rules! rt_test {
($($t:tt)*) => {
mod current_thread_scheduler {
$($t)*
+ #[cfg(not(target_os="wasi"))]
+ const NUM_WORKERS: usize = 1;
+
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -18,9 +21,12 @@ macro_rules! rt_test {
}
}
+ #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
mod threaded_scheduler_4_threads {
$($t)*
+ const NUM_WORKERS: usize = 4;
+
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
@@ -31,9 +37,12 @@ macro_rules! rt_test {
}
}
+ #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
mod threaded_scheduler_1_thread {
$($t)*
+ const NUM_WORKERS: usize = 1;
+
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
@@ -55,18 +64,30 @@ fn send_sync_bound() {
}
rt_test! {
- use tokio::net::{TcpListener, TcpStream, UdpSocket};
+ #[cfg(not(target_os="wasi"))]
+ use tokio::net::{TcpListener, TcpStream};
+ #[cfg(not(target_os="wasi"))]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::{task, time};
- use tokio_test::{assert_err, assert_ok};
+
+ #[cfg(not(target_os="wasi"))]
+ use tokio_test::assert_err;
+ use tokio_test::assert_ok;
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
- use std::sync::{mpsc, Arc};
+
+ #[cfg(not(target_os="wasi"))]
+ use std::sync::mpsc;
+
+ use std::sync::Arc;
use std::task::{Context, Poll};
+
+ #[cfg(not(target_os="wasi"))]
use std::thread;
use std::time::{Duration, Instant};
@@ -83,6 +104,7 @@ rt_test! {
}
+ #[cfg(not(target_os="wasi"))]
#[test]
fn block_on_async() {
let rt = rt();
@@ -164,6 +186,7 @@ rt_test! {
assert_eq!(out, "ZOMG");
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_many_from_block_on() {
use tokio::sync::mpsc;
@@ -214,6 +237,7 @@ rt_test! {
}
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_many_from_task() {
use tokio::sync::mpsc;
@@ -226,14 +250,6 @@ rt_test! {
tokio::spawn(async move {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
- /*
- for _ in 0..100 {
- tokio::spawn(async move { });
- }
-
- tokio::task::yield_now().await;
- */
-
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
@@ -275,6 +291,31 @@ rt_test! {
}
#[test]
+ fn spawn_one_from_block_on_called_on_handle() {
+ let rt = rt();
+ let (tx, rx) = oneshot::channel();
+
+ #[allow(clippy::async_yields_async)]
+ let handle = rt.handle().block_on(async {
+ tokio::spawn(async move {
+ tx.send("ZOMG").unwrap();
+ "DONE"
+ })
+ });
+
+ let out = rt.block_on(async {
+ let msg = assert_ok!(rx.await);
+
+ let out = assert_ok!(handle.await);
+ assert_eq!(out, "DONE");
+
+ msg
+ });
+
+ assert_eq!(out, "ZOMG");
+ }
+
+ #[test]
fn spawn_await_chain() {
let rt = rt();
@@ -329,6 +370,7 @@ rt_test! {
assert_eq!(out, "ZOMG");
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn complete_block_on_under_load() {
let rt = rt();
@@ -352,6 +394,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn complete_task_under_load() {
let rt = rt();
@@ -381,6 +424,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_other_thread_idle() {
let rt = rt();
@@ -401,6 +445,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_other_thread_under_load() {
let rt = rt();
@@ -461,6 +506,7 @@ rt_test! {
assert!(now.elapsed() >= dur);
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind
#[test]
fn block_on_socket() {
let rt = rt();
@@ -481,6 +527,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_blocking() {
let rt = rt();
@@ -496,6 +543,7 @@ rt_test! {
assert_eq!(out, "hello")
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_blocking_from_blocking() {
let rt = rt();
@@ -511,6 +559,7 @@ rt_test! {
assert_eq!(out, "hello")
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn sleep_from_blocking() {
let rt = rt();
@@ -531,6 +580,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind
#[test]
fn socket_from_blocking() {
let rt = rt();
@@ -554,6 +604,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn always_active_parker() {
// This test it to show that we will always have
@@ -600,6 +651,7 @@ rt_test! {
// concern. There also isn't a great/obvious solution to take. For now, the
// test is disabled.
#[cfg(not(windows))]
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
fn io_driver_called_when_under_load() {
let rt = rt();
@@ -607,7 +659,12 @@ rt_test! {
for _ in 0..100 {
rt.spawn(async {
loop {
- tokio::task::yield_now().await;
+ // Don't use Tokio's `yield_now()` to avoid special defer
+ // logic.
+ futures::future::poll_fn::<(), _>(|cx| {
+ cx.waker().wake_by_ref();
+ std::task::Poll::Pending
+ }).await;
}
});
}
@@ -635,6 +692,113 @@ rt_test! {
});
}
+ /// Tests that yielded tasks are not scheduled until **after** resource
+ /// drivers are polled.
+ ///
+ /// The OS does not guarantee when I/O events are delivered, so there may be
+ /// more yields than anticipated. This makes the test slightly flaky. To
+ /// help avoid flakiness, we run the test 10 times and only fail it after
+ /// 10 failures in a row.
+ ///
+ /// Note that if the test fails by panicking rather than by returning false,
+ /// then we fail it immediately. That kind of failure should not happen
+ /// spuriously.
+ #[test]
+ #[cfg(not(target_os="wasi"))]
+ fn yield_defers_until_park() {
+ for _ in 0..10 {
+ if yield_defers_until_park_inner() {
+ // test passed
+ return;
+ }
+
+ // Wait a bit and run the test again.
+ std::thread::sleep(std::time::Duration::from_secs(2));
+ }
+
+ panic!("yield_defers_until_park is failing consistently");
+ }
+
+ /// Implementation of `yield_defers_until_park` test. Returns `true` if the
+ /// test passed.
+ #[cfg(not(target_os="wasi"))]
+ fn yield_defers_until_park_inner() -> bool {
+ use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
+ use std::sync::Barrier;
+
+ let rt = rt();
+
+ let flag = Arc::new(AtomicBool::new(false));
+ let barrier = Arc::new(Barrier::new(NUM_WORKERS));
+
+ rt.block_on(async {
+ // Make sure other workers cannot steal tasks
+ #[allow(clippy::reversed_empty_ranges)]
+ for _ in 0..(NUM_WORKERS-1) {
+ let flag = flag.clone();
+ let barrier = barrier.clone();
+
+ tokio::spawn(async move {
+ barrier.wait();
+
+ while !flag.load(SeqCst) {
+ std::thread::sleep(std::time::Duration::from_millis(1));
+ }
+ });
+ }
+
+ barrier.wait();
+
+ let (fail_test, fail_test_recv) = oneshot::channel::<()>();
+
+ let jh = tokio::spawn(async move {
+ // Create a TCP litener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ tokio::join!(
+ async {
+ // Done in a blocking manner intentionally.
+ let _socket = std::net::TcpStream::connect(addr).unwrap();
+
+ // Yield until connected
+ let mut cnt = 0;
+ while !flag.load(SeqCst){
+ tokio::task::yield_now().await;
+ cnt += 1;
+
+ if cnt >= 10 {
+ // yielded too many times; report failure and
+ // sleep forever so that the `fail_test` branch
+ // of the `select!` below triggers.
+ let _ = fail_test.send(());
+ futures::future::pending::<()>().await;
+ break;
+ }
+ }
+ },
+ async {
+ let _ = listener.accept().await.unwrap();
+ flag.store(true, SeqCst);
+ }
+ );
+ });
+
+ // Wait until the spawned task completes or fails. If no message is
+ // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
+ let success = fail_test_recv.await.is_err();
+
+ if success {
+ // Check for panics in spawned task.
+ jh.abort();
+ jh.await.unwrap();
+ }
+
+ success
+ })
+ }
+
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn client_server_block_on() {
let rt = rt();
@@ -646,6 +810,7 @@ rt_test! {
assert_err!(rx.try_recv());
}
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")]
#[test]
fn panic_in_task() {
let rt = rt();
@@ -674,11 +839,13 @@ rt_test! {
#[test]
#[should_panic]
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
fn panic_in_block_on() {
let rt = rt();
rt.block_on(async { panic!() });
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
async fn yield_once() {
let mut yielded = false;
poll_fn(|cx| {
@@ -748,7 +915,11 @@ rt_test! {
#[test]
fn wake_while_rt_is_dropping() {
- use tokio::task;
+ use tokio::sync::Barrier;
+ use core::sync::atomic::{AtomicBool, Ordering};
+
+ let drop_triggered = Arc::new(AtomicBool::new(false));
+ let set_drop_triggered = drop_triggered.clone();
struct OnDrop<F: FnMut()>(F);
@@ -762,17 +933,21 @@ rt_test! {
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
- let rt = rt();
+ let barrier = Arc::new(Barrier::new(4));
+ let barrier1 = barrier.clone();
+ let barrier2 = barrier.clone();
+ let barrier3 = barrier.clone();
- let h1 = rt.clone();
+ let rt = rt();
rt.spawn(async move {
// Ensure a waker gets stored in oneshot 1.
- let _ = rx1.await;
+ let _ = tokio::join!(rx1, barrier1.wait());
tx3.send(()).unwrap();
});
rt.spawn(async move {
+ let h1 = tokio::runtime::Handle::current();
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
@@ -785,36 +960,40 @@ rt_test! {
h1.spawn(async move {
tx1.send(()).unwrap();
});
+ // Just a sanity check that this entire thing actually happened
+ set_drop_triggered.store(true, Ordering::Relaxed);
});
- let _ = rx2.await;
+ let _ = tokio::join!(rx2, barrier2.wait());
});
rt.spawn(async move {
- let _ = rx3.await;
+ let _ = tokio::join!(rx3, barrier3.wait());
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
tx2.send(()).unwrap();
});
- // Tick the loop
- rt.block_on(async {
- task::yield_now().await;
- });
+ // Wait until every oneshot channel has been polled.
+ rt.block_on(barrier.wait());
// Drop the rt
drop(rt);
+
+ // Make sure that the spawn actually happened
+ assert!(drop_triggered.load(Ordering::Relaxed));
}
+ #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
#[test]
fn io_notify_while_shutting_down() {
- use std::net::Ipv6Addr;
+ use tokio::net::UdpSocket;
use std::sync::Arc;
for _ in 1..10 {
let runtime = rt();
runtime.block_on(async {
- let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
+ let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = socket.local_addr().unwrap();
let send_half = Arc::new(socket);
let recv_half = send_half.clone();
@@ -840,6 +1019,7 @@ rt_test! {
}
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
@@ -857,6 +1037,7 @@ rt_test! {
Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn shutdown_timeout_0() {
let runtime = rt();
@@ -888,6 +1069,7 @@ rt_test! {
// See https://github.com/rust-lang/rust/issues/74875
#[test]
#[cfg(not(windows))]
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")]
fn runtime_in_thread_local() {
use std::cell::RefCell;
use std::thread;
@@ -907,6 +1089,7 @@ rt_test! {
}).join().unwrap();
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind
async fn client_server(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -931,6 +1114,7 @@ rt_test! {
tx.send(()).unwrap();
}
+ #[cfg(not(tokio_wasi))] // Wasi does not support bind
#[test]
fn local_set_block_on_socket() {
let rt = rt();
@@ -952,6 +1136,7 @@ rt_test! {
});
}
+ #[cfg(not(tokio_wasi))] // Wasi does not support bind
#[test]
fn local_set_client_server_block_on() {
let rt = rt();
@@ -965,6 +1150,7 @@ rt_test! {
assert_err!(rx.try_recv());
}
+ #[cfg(not(tokio_wasi))] // Wasi does not support bind
async fn client_server_local(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -992,22 +1178,22 @@ rt_test! {
#[test]
fn coop() {
use std::task::Poll::Ready;
+ use tokio::sync::mpsc;
let rt = rt();
rt.block_on(async {
- // Create a bunch of tasks
- let mut tasks = (0..1_000).map(|_| {
- tokio::spawn(async { })
- }).collect::<Vec<_>>();
+ let (send, mut recv) = mpsc::unbounded_channel();
- // Hope that all the tasks complete...
- time::sleep(Duration::from_millis(100)).await;
+ // Send a bunch of messages.
+ for _ in 0..1_000 {
+ send.send(()).unwrap();
+ }
poll_fn(|cx| {
- // At least one task should not be ready
- for task in &mut tasks {
- if Pin::new(task).poll(cx).is_pending() {
+ // At least one response should return pending.
+ for _ in 0..1_000 {
+ if recv.poll_recv(cx).is_pending() {
return Ready(());
}
}
@@ -1020,22 +1206,22 @@ rt_test! {
#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;
+ use tokio::sync::mpsc;
let rt = rt();
rt.block_on(async {
- // Create a bunch of tasks
- let mut tasks = (0..1_000).map(|_| {
- tokio::spawn(async { })
- }).collect::<Vec<_>>();
+ let (send, mut recv) = mpsc::unbounded_channel();
- // Hope that all the tasks complete...
- time::sleep(Duration::from_millis(100)).await;
+ // Send a bunch of messages.
+ for _ in 0..1_000 {
+ send.send(()).unwrap();
+ }
tokio::task::unconstrained(poll_fn(|cx| {
- // All the tasks should be ready
- for task in &mut tasks {
- assert!(Pin::new(task).poll(cx).is_ready());
+ // All the responses should be ready.
+ for _ in 0..1_000 {
+ assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
}
Ready(())
@@ -1043,6 +1229,31 @@ rt_test! {
});
}
+ #[cfg(tokio_unstable)]
+ #[test]
+ fn coop_consume_budget() {
+ let rt = rt();
+
+ rt.block_on(async {
+ poll_fn(|cx| {
+ let counter = Arc::new(std::sync::Mutex::new(0));
+ let counter_clone = Arc::clone(&counter);
+ let mut worker = Box::pin(async move {
+ // Consume the budget until a yield happens
+ for _ in 0..1000 {
+ *counter.lock().unwrap() += 1;
+ task::consume_budget().await
+ }
+ });
+ // Assert that the worker was yielded and it didn't manage
+ // to finish the whole work (assuming the total budget of 128)
+ assert!(Pin::new(&mut worker).poll(cx).is_pending());
+ assert!(*counter_clone.lock().unwrap() < 1000);
+ std::task::Poll::Ready(())
+ }).await;
+ });
+ }
+
// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
@@ -1060,7 +1271,7 @@ rt_test! {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
let mut tasks = vec![];
- // Spawn a bunch of tasks that ping ping between each other to
+ // Spawn a bunch of tasks that ping-pong between each other to
// saturate the runtime.
for _ in 0..NUM {
let (tx1, mut rx1) = mpsc::unbounded_channel();
@@ -1106,4 +1317,39 @@ rt_test! {
}
});
}
+
+ #[test]
+ #[cfg(not(target_os="wasi"))]
+ fn shutdown_concurrent_spawn() {
+ const NUM_TASKS: usize = 10_000;
+ for _ in 0..5 {
+ let (tx, rx) = std::sync::mpsc::channel();
+ let rt = rt();
+
+ let mut txs = vec![];
+
+ for _ in 0..NUM_TASKS {
+ let (tx, rx) = tokio::sync::oneshot::channel();
+ txs.push(tx);
+ rt.spawn(async move {
+ rx.await.unwrap();
+ });
+ }
+
+ // Prime the tasks
+ rt.block_on(async { tokio::task::yield_now().await });
+
+ let th = std::thread::spawn(move || {
+ tx.send(()).unwrap();
+ for tx in txs.drain(..) {
+ let _ = tx.send(());
+ }
+ });
+
+ rx.recv().unwrap();
+ drop(rt);
+
+ th.join().unwrap();
+ }
+ }
}