#![allow(clippy::needless_range_loop)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

// Tests to run on both current-thread & multi-thread runtime variants.
//
// `rt_test!` pastes the token stream it is given into three sibling modules.
// Each module supplies its own `rt()` constructor and `NUM_WORKERS` constant,
// so every test in the body is compiled (and run) once per runtime flavor.

macro_rules! rt_test {
    ($($t:tt)*) => {
        mod current_thread_scheduler {
            $($t)*

            #[cfg(not(target_os="wasi"))]
            const NUM_WORKERS: usize = 1;

            // Current-thread runtime with all drivers enabled.
            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
        mod threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            // Multi-thread runtime with four workers and all drivers enabled.
            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
        mod threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            // Multi-thread runtime with a single worker and all drivers enabled.
            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }
    }
}

// Compile-time check: `Runtime` must satisfy the bounds on `is_send`.
#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;
    fn is_send<T: Send + Sync>() {}

    is_send::<Runtime>();
}

rt_test! {
    #[cfg(not(target_os="wasi"))]
    use tokio::net::{TcpListener, TcpStream};
    #[cfg(not(target_os="wasi"))]
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use tokio::runtime::Runtime;
    use tokio::sync::oneshot;
    use tokio::{task, time};

    #[cfg(not(target_os="wasi"))]
    use tokio_test::assert_err;
    use tokio_test::assert_ok;

    use futures::future::poll_fn;
    use std::future::Future;
    use std::pin::Pin;

    #[cfg(not(target_os="wasi"))]
    use std::sync::mpsc;

    use std::sync::Arc;
    use std::task::{Context, Poll};

    #[cfg(not(target_os="wasi"))]
    use std::thread;
    use std::time::{Duration, Instant};

    // `block_on` drives a future that never awaits anything.
    #[test]
    fn block_on_sync() {
        let rt = rt();

        let mut win = false;
        rt.block_on(async {
            win = true;
        });

        assert!(win);
    }

    // `block_on` is woken by a value sent from a plain OS thread.
    #[cfg(not(target_os="wasi"))]
    #[test]
    fn block_on_async() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    // A detached spawned task can report back through a oneshot channel.
    #[test]
    fn spawn_one_bg() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    // Both the channel message and the join handle output are observed.
    #[test]
    fn spawn_one_join() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            let handle = tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            });

            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    // Two spawned tasks relay a message: task 1 -> task 2 -> `block_on`.
    #[test]
    fn spawn_two() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            tokio::spawn(async move {
                assert_ok!(tx1.send("ZOMG"));
            });

            tokio::spawn(async move {
                let msg = assert_ok!(rx1.await);
                assert_ok!(tx2.send(msg));
            });

            assert_ok!(rx2.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_block_on() {
        use tokio::sync::mpsc;

        const ITER: usize = 200;

        let rt = rt();

        let out = rt.block_on(async {
            let (done_tx, mut done_rx) = mpsc::unbounded_channel();

            let mut txs = (0..ITER)
                .map(|i| {
                    let (tx, rx) = oneshot::channel();
                    let done_tx = done_tx.clone();

                    tokio::spawn(async move {
                        let msg = assert_ok!(rx.await);
                        assert_eq!(i, msg);
                        assert_ok!(done_tx.send(msg));
                    });

                    tx
                })
                .collect::<Vec<_>>();

            // Drop the original sender so `done_rx` closes once every task
            // has finished and dropped its clone.
            drop(done_tx);

            thread::spawn(move || {
                for (i, tx) in txs.drain(..).enumerate() {
                    assert_ok!(tx.send(i));
                }
            });

            let mut out = vec![];
            while let Some(i) = done_rx.recv().await {
                out.push(i);
            }

            out.sort_unstable();
            out
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_task() {
        use tokio::sync::mpsc;

        const ITER: usize = 500;

        let rt = rt();

        let out = rt.block_on(async {
            tokio::spawn(async move {
                let (done_tx, mut done_rx) = mpsc::unbounded_channel();

                let mut txs = (0..ITER)
                    .map(|i| {
                        let (tx, rx) = oneshot::channel();
                        let done_tx = done_tx.clone();

                        tokio::spawn(async move {
                            let msg = assert_ok!(rx.await);
                            assert_eq!(i, msg);
                            assert_ok!(done_tx.send(msg));
                        });

                        tx
                    })
                    .collect::<Vec<_>>();

                // Drop the original sender so `done_rx` closes once every
                // task has finished and dropped its clone.
                drop(done_tx);

                thread::spawn(move || {
                    for (i, tx) in txs.drain(..).enumerate() {
                        assert_ok!(tx.send(i));
                    }
                });

                let mut out = vec![];
                while let Some(i) = done_rx.recv().await {
                    out.push(i);
                }

                out.sort_unstable();
                out
            }).await.unwrap()
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    // A task spawned inside `Handle::block_on` is joined from `Runtime::block_on`.
    #[test]
    fn spawn_one_from_block_on_called_on_handle() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        #[allow(clippy::async_yields_async)]
        let handle = rt.handle().block_on(async {
            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            })
        });

        let out = rt.block_on(async {
            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    // A spawned task awaits a task it spawned itself.
    #[test]
    fn spawn_await_chain() {
        let rt = rt();

        let out = rt.block_on(async {
            assert_ok!(tokio::spawn(async {
                assert_ok!(tokio::spawn(async {
                    "hello"
                }).await)
            }).await)
        });

        assert_eq!(out, "hello");
    }

    // The `Arc` strong count tracks whether the never-completing task (which
    // owns a clone) is still alive; it must drop back to 1 once the runtime
    // is dropped.
    #[test]
    fn outstanding_tasks_dropped() {
        let rt = rt();

        let cnt = Arc::new(());

        rt.block_on(async {
            let cnt = cnt.clone();

            tokio::spawn(poll_fn(move |_| {
                assert_eq!(2, Arc::strong_count(&cnt));
                Poll::<()>::Pending
            }));
        });

        assert_eq!(2, Arc::strong_count(&cnt));

        drop(rt);

        assert_eq!(1, Arc::strong_count(&cnt));
    }

    // Calling `block_on` from inside another `block_on` is expected to panic.
    #[test]
    #[should_panic]
    fn nested_rt() {
        let rt1 = rt();
        let rt2 = rt();

        rt1.block_on(async { rt2.block_on(async { "hello" }) });
    }

    // Building a runtime inside `block_on` is fine as long as it is only
    // driven after the outer `block_on` has returned.
    #[test]
    fn create_rt_in_block_on() {
        let rt1 = rt();
        let rt2 = rt1.block_on(async { rt() });
        let out = rt2.block_on(async { "ZOMG" });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_block_on_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_task_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx1.send(()));
            });

            tokio::spawn(async move {
                assert_ok!(rx1.await);
                assert_ok!(tx2.send(()));
            });

            assert_ok!(rx2.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_idle() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));

            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_under_load() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            assert_ok!(rx.await);
        });
    }

    // `time::sleep` awaited directly in `block_on` lasts at least `dur`.
    #[test]
    fn sleep_at_root() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            time::sleep(dur).await;
        });

        assert!(now.elapsed() >= dur);
    }

    // `time::sleep` awaited inside a spawned task lasts at least `dur`.
    #[test]
    fn sleep_in_spawn() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                time::sleep(dur).await;
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });

        assert!(now.elapsed() >= dur);
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[test]
    fn block_on_socket() {
        let rt = rt();

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            tokio::spawn(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::spawn(async move { "hello" })
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_blocking_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::task::spawn_blocking(|| "hello")
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn sleep_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            assert_ok!(tokio::task::spawn_blocking(|| {
                let now = std::time::Instant::now();
                let dur = Duration::from_millis(1);

                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    tokio::time::sleep(dur).await;
                });

                assert!(now.elapsed() >= dur);
            }).await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[test]
    fn socket_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let peer = tokio::task::spawn_blocking(move || {
                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    assert_ok!(TcpStream::connect(addr).await);
                });
            });

            // Wait for the client to connect
            let _ = assert_ok!(listener.accept().await);

            assert_ok!(peer.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn always_active_parker() {
        // This test is to show that we will always have
        // an active parker even if we call block_on concurrently

        let rt = rt();
        let rt2 = rt.clone();

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let jh1 = thread::spawn(move || {
            rt.block_on(async move {
                rx2.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
                tx1.send(()).unwrap();
            });
        });

        let jh2 = thread::spawn(move || {
            rt2.block_on(async move {
                tx2.send(()).unwrap();
                time::sleep(Duration::from_millis(5)).await;
                rx1.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
            });
        });

        jh1.join().unwrap();
        jh2.join().unwrap();
    }

    #[test]
    // IOCP requires setting the "max thread" concurrency value. The sane
    // default is to set this to the number of cores. Threads that poll I/O
    // become associated with the IOCP handle. Once those threads sleep for any
    // reason (mutex), they yield their ownership.
    //
    // This test hits an edge case on windows where more threads than cores are
    // created, none of those threads ever yield due to being at capacity, so
    // IOCP gets "starved".
    //
    // For now, this is a very edge case that is probably not a real production
    // concern. There also isn't a great/obvious solution to take. For now, the
    // test is disabled.
    #[cfg(not(windows))]
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
    fn io_driver_called_when_under_load() {
        let rt = rt();

        // Create a lot of constant load. The scheduler will always be busy.
        for _ in 0..100 {
            rt.spawn(async {
                loop {
                    // Don't use Tokio's `yield_now()` to avoid special defer
                    // logic.
                    futures::future::poll_fn::<(), _>(|cx| {
                        cx.waker().wake_by_ref();
                        std::task::Poll::Pending
                    }).await;
                }
            });
        }

        // Do some I/O work
        rt.block_on(async {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let srv = tokio::spawn(async move {
                let (mut stream, _) = assert_ok!(listener.accept().await);
                assert_ok!(stream.write_all(b"hello world").await);
            });

            let cli = tokio::spawn(async move {
                let mut stream = assert_ok!(TcpStream::connect(addr).await);
                let mut dst = vec![0; 11];

                assert_ok!(stream.read_exact(&mut dst).await);
                assert_eq!(dst, b"hello world");
            });

            assert_ok!(srv.await);
            assert_ok!(cli.await);
        });
    }

    /// Tests that yielded tasks are not scheduled until **after** resource
    /// drivers are polled.
    ///
    /// The OS does not guarantee when I/O events are delivered, so there may be
    /// more yields than anticipated. This makes the test slightly flaky. To
    /// help avoid flakiness, we run the test 10 times and only fail it after
    /// 10 failures in a row.
    ///
    /// Note that if the test fails by panicking rather than by returning false,
    /// then we fail it immediately. That kind of failure should not happen
    /// spuriously.
    #[test]
    #[cfg(not(target_os="wasi"))]
    fn yield_defers_until_park() {
        for _ in 0..10 {
            if yield_defers_until_park_inner() {
                // test passed
                return;
            }

            // Wait a bit and run the test again.
            std::thread::sleep(std::time::Duration::from_secs(2));
        }

        panic!("yield_defers_until_park is failing consistently");
    }

    /// Implementation of `yield_defers_until_park` test. Returns `true` if the
    /// test passed.
    #[cfg(not(target_os="wasi"))]
    fn yield_defers_until_park_inner() -> bool {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
        use std::sync::Barrier;

        let rt = rt();

        let flag = Arc::new(AtomicBool::new(false));
        let barrier = Arc::new(Barrier::new(NUM_WORKERS));

        rt.block_on(async {
            // Make sure other workers cannot steal tasks
            #[allow(clippy::reversed_empty_ranges)]
            for _ in 0..(NUM_WORKERS-1) {
                let flag = flag.clone();
                let barrier = barrier.clone();

                tokio::spawn(async move {
                    barrier.wait();

                    while !flag.load(SeqCst) {
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                });
            }

            barrier.wait();

            let (fail_test, fail_test_recv) = oneshot::channel::<()>();

            let jh = tokio::spawn(async move {
                // Create a TCP listener
                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
                let addr = listener.local_addr().unwrap();

                tokio::join!(
                    async {
                        // Done in a blocking manner intentionally.
                        let _socket = std::net::TcpStream::connect(addr).unwrap();

                        // Yield until connected
                        let mut cnt = 0;
                        while !flag.load(SeqCst){
                            tokio::task::yield_now().await;
                            cnt += 1;

                            if cnt >= 10 {
                                // yielded too many times; report failure and
                                // sleep forever so that the `fail_test` branch
                                // of the `select!` below triggers.
                                let _ = fail_test.send(());
                                futures::future::pending::<()>().await;
                                break;
                            }
                        }
                    },
                    async {
                        let _ = listener.accept().await.unwrap();
                        flag.store(true, SeqCst);
                    }
                );
            });

            // Wait until the spawned task completes or fails. If no message is
            // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
            let success = fail_test_recv.await.is_err();

            if success {
                // Check for panics in spawned task.
                jh.abort();
                jh.await.unwrap();
            }

            success
        })
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        rt.block_on(async move { client_server(tx).await });

        // Exactly one completion message is expected.
        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")]
    #[test]
    fn panic_in_task() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        // Future that panics when polled and signals from its destructor.
        struct Boom(Option<oneshot::Sender<()>>);

        impl Future for Boom {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                panic!();
            }
        }

        impl Drop for Boom {
            fn drop(&mut self) {
                assert!(std::thread::panicking());
                self.0.take().unwrap().send(()).unwrap();
            }
        }

        rt.spawn(Boom(Some(tx)));
        assert_ok!(rt.block_on(rx));
    }

    #[test]
    #[should_panic]
    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
    fn panic_in_block_on() {
        let rt = rt();
        rt.block_on(async { panic!() });
    }

    // Returns `Pending` exactly once (after waking itself), then `Ready`.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    async fn yield_once() {
        let mut yielded = false;
        poll_fn(|cx| {
            if yielded {
                Poll::Ready(())
            } else {
                yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        })
        .await
    }

    // `tokio::spawn` works outside `block_on` while an enter guard is held.
    #[test]
    fn enter_and_spawn() {
        let rt = rt();
        let handle = {
            let _enter = rt.enter();
            tokio::spawn(async {})
        };

        assert_ok!(rt.block_on(handle));
    }

    #[test]
    fn eagerly_drops_futures_on_shutdown() {
        use std::sync::mpsc;

        // Future that never completes and reports when it is dropped.
        struct Never {
            drop_tx: mpsc::Sender<()>,
        }

        impl Future for Never {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                Poll::Pending
            }
        }

        impl Drop for Never {
            fn drop(&mut self) {
                self.drop_tx.send(()).unwrap();
            }
        }

        let rt = rt();

        let (drop_tx, drop_rx) = mpsc::channel();
        let (run_tx, run_rx) = oneshot::channel();

        rt.block_on(async move {
            tokio::spawn(async move {
                assert_ok!(run_tx.send(()));

                Never { drop_tx }.await
            });

            assert_ok!(run_rx.await);
        });

        drop(rt);

        assert_ok!(drop_rx.recv());
    }

    #[test]
    fn wake_while_rt_is_dropping() {
        use tokio::sync::Barrier;
        use core::sync::atomic::{AtomicBool, Ordering};

        let drop_triggered = Arc::new(AtomicBool::new(false));
        let set_drop_triggered = drop_triggered.clone();

        // Runs the wrapped closure when dropped.
        struct OnDrop<F: FnMut()>(F);

        impl<F: FnMut()> Drop for OnDrop<F> {
            fn drop(&mut self) {
                (self.0)()
            }
        }

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();

        let barrier = Arc::new(Barrier::new(4));
        let barrier1 = barrier.clone();
        let barrier2 = barrier.clone();
        let barrier3 = barrier.clone();

        let rt = rt();

        rt.spawn(async move {
            // Ensure a waker gets stored in oneshot 1.
            let _ = tokio::join!(rx1, barrier1.wait());
            tx3.send(()).unwrap();
        });

        rt.spawn(async move {
            let h1 = tokio::runtime::Handle::current();
            // When this task is dropped, we'll be "closing remotes".
            // We spawn a new task that owns the `tx1`, to move its Drop
            // out of here.
            //
            // Importantly, the oneshot 1 has a waker already stored, so
            // the eventual drop here will try to re-schedule again.
            let mut opt_tx1 = Some(tx1);
            let _d = OnDrop(move || {
                let tx1 = opt_tx1.take().unwrap();
                h1.spawn(async move {
                    tx1.send(()).unwrap();
                });
                // Just a sanity check that this entire thing actually happened
                set_drop_triggered.store(true, Ordering::Relaxed);
            });
            let _ = tokio::join!(rx2, barrier2.wait());
        });

        rt.spawn(async move {
            let _ = tokio::join!(rx3, barrier3.wait());
            // We'll never get here, but once task 3 drops, this will
            // force task 2 to re-schedule since it's waiting on oneshot 2.
            tx2.send(()).unwrap();
        });

        // Wait until every oneshot channel has been polled.
        rt.block_on(barrier.wait());

        // Drop the rt
        drop(rt);

        // Make sure that the spawn actually happened
        assert!(drop_triggered.load(Ordering::Relaxed));
    }

    #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
    #[test]
    fn io_notify_while_shutting_down() {
        use tokio::net::UdpSocket;
        use std::sync::Arc;

        for _ in 1..10 {
            let runtime = rt();

            runtime.block_on(async {
                let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
                let addr = socket.local_addr().unwrap();
                let send_half = Arc::new(socket);
                let recv_half = send_half.clone();

                tokio::spawn(async move {
                    let mut buf = [0];
                    loop {
                        recv_half.recv_from(&mut buf).await.unwrap();
                        std::thread::sleep(Duration::from_millis(2));
                    }
                });

                tokio::spawn(async move {
                    let buf = [0];
                    loop {
                        send_half.send_to(&buf, &addr).await.unwrap();
                        tokio::time::sleep(Duration::from_millis(1)).await;
                    }
                });

                tokio::time::sleep(Duration::from_millis(5)).await;
            });
        }
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout() {
        let (tx, rx) = oneshot::channel();
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                tx.send(()).unwrap();
                thread::sleep(Duration::from_secs(10_000));
            });

            rx.await.unwrap();
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout_0() {
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                thread::sleep(Duration::from_secs(10_000));
            });
        });

        let now = Instant::now();
        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
        assert!(now.elapsed().as_secs() < 1);
    }

    #[test]
    fn shutdown_wakeup_time() {
        let runtime = rt();

        runtime.block_on(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
    }

    // This test is currently ignored on Windows because of a
    // rust-lang issue in thread local storage destructors.
    // See https://github.com/rust-lang/rust/issues/74875
    #[test]
    #[cfg(not(windows))]
    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")]
    fn runtime_in_thread_local() {
        use std::cell::RefCell;
        use std::thread;

        thread_local!(
            static R: RefCell<Option<Runtime>> = RefCell::new(None);
        );

        thread::spawn(|| {
            R.with(|cell| {
                let rt = rt();
                let rt = Arc::try_unwrap(rt).unwrap();
                *cell.borrow_mut() = Some(rt);
            });

            let _rt = rt();
        }).join().unwrap();
    }

    // Runs a one-shot TCP exchange (server writes "hello", client reads it to
    // EOF) and then signals completion on `tx`.
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    async fn client_server(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        tokio::spawn(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

    #[cfg(not(tokio_wasi))] // Wasi does not support bind
    #[test]
    fn local_set_block_on_socket() {
        let rt = rt();
        let local = task::LocalSet::new();

        local.block_on(&rt, async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            task::spawn_local(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(tokio_wasi))] // Wasi does not support bind
    #[test]
    fn local_set_client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        let local = task::LocalSet::new();

        local.block_on(&rt, async move { client_server_local(tx).await });

        // Exactly one completion message is expected.
        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    // Same exchange as `client_server`, but the server side is spawned with
    // `task::spawn_local`.
    #[cfg(not(tokio_wasi))] // Wasi does not support bind
    async fn client_server_local(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        task::spawn_local(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

    #[test]
    fn coop() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            poll_fn(|cx| {
                // At least one response should return pending.
                for _ in 0..1_000 {
                    if recv.poll_recv(cx).is_pending() {
                        return Ready(());
                    }
                }

                panic!("did not yield");
            }).await;
        });
    }

    #[test]
    fn coop_unconstrained() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            tokio::task::unconstrained(poll_fn(|cx| {
                // All the responses should be ready.
                for _ in 0..1_000 {
                    assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
                }

                Ready(())
            })).await;
        });
    }

    #[cfg(tokio_unstable)]
    #[test]
    fn coop_consume_budget() {
        let rt = rt();

        rt.block_on(async {
            poll_fn(|cx| {
                let counter = Arc::new(std::sync::Mutex::new(0));
                let counter_clone = Arc::clone(&counter);
                let mut worker = Box::pin(async move {
                    // Consume the budget until a yield happens
                    for _ in 0..1000 {
                        *counter.lock().unwrap() += 1;
                        task::consume_budget().await
                    }
                });
                // Assert that the worker was yielded and it didn't manage
                // to finish the whole work (assuming the total budget of 128)
                assert!(Pin::new(&mut worker).poll(cx).is_pending());
                assert!(*counter_clone.lock().unwrap() < 1000);
                std::task::Poll::Ready(())
            }).await;
        });
    }

    // Tests that the "next task" scheduler optimization is not able to starve
    // other tasks.
    #[test]
    fn ping_pong_saturation() {
        use std::sync::atomic::{Ordering, AtomicBool};
        use tokio::sync::mpsc;

        const NUM: usize = 100;

        let rt = rt();

        let running = Arc::new(AtomicBool::new(true));

        rt.block_on(async {
            let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();

            let mut tasks = vec![];
            // Spawn a bunch of tasks that ping-pong between each other to
            // saturate the runtime.
            for _ in 0..NUM {
                let (tx1, mut rx1) = mpsc::unbounded_channel();
                let (tx2, mut rx2) = mpsc::unbounded_channel();
                let spawned_tx = spawned_tx.clone();
                let running = running.clone();
                tasks.push(task::spawn(async move {
                    spawned_tx.send(()).unwrap();

                    while running.load(Ordering::Relaxed) {
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                    }

                    // Close the channel and wait for the other task to exit.
                    drop(tx1);
                    assert!(rx2.recv().await.is_none());
                }));

                tasks.push(task::spawn(async move {
                    while rx1.recv().await.is_some() {
                        tx2.send(()).unwrap();
                    }
                }));
            }

            for _ in 0..NUM {
                spawned_rx.recv().await.unwrap();
            }

            // spawn another task and wait for it to complete
            let handle = task::spawn(async {
                for _ in 0..5 {
                    // Yielding forces it back into the local queue.
                    task::yield_now().await;
                }
            });
            handle.await.unwrap();
            running.store(false, Ordering::Relaxed);
            for t in tasks {
                t.await.unwrap();
            }
        });
    }

    // Drops the runtime while another thread is concurrently completing the
    // oneshot channels that the spawned tasks are waiting on.
    #[test]
    #[cfg(not(target_os="wasi"))]
    fn shutdown_concurrent_spawn() {
        const NUM_TASKS: usize = 10_000;
        for _ in 0..5 {
            let (tx, rx) = std::sync::mpsc::channel();
            let rt = rt();

            let mut txs = vec![];

            for _ in 0..NUM_TASKS {
                let (tx, rx) = tokio::sync::oneshot::channel();
                txs.push(tx);
                rt.spawn(async move {
                    rx.await.unwrap();
                });
            }

            // Prime the tasks
            rt.block_on(async { tokio::task::yield_now().await });

            let th = std::thread::spawn(move || {
                tx.send(()).unwrap();
                for tx in txs.drain(..) {
                    let _ = tx.send(());
                }
            });

            rx.recv().unwrap();
            drop(rt);

            th.join().unwrap();
        }
    }
}