diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/tests | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/tests')
-rw-r--r-- | vendor/tokio/src/runtime/tests/inject.rs | 54 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs | 82 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/loom_blocking.rs | 71 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs | 142 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/loom_join_set.rs | 82 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/loom_pool.rs | 157 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/loom_queue.rs | 95 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/loom_yield.rs | 37 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/mod.rs | 64 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/queue.rs | 171 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/task.rs | 269 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/tests/task_combinations.rs | 153 |
12 files changed, 1074 insertions, 303 deletions
diff --git a/vendor/tokio/src/runtime/tests/inject.rs b/vendor/tokio/src/runtime/tests/inject.rs new file mode 100644 index 000000000..ccead5e02 --- /dev/null +++ b/vendor/tokio/src/runtime/tests/inject.rs @@ -0,0 +1,54 @@ +use crate::runtime::scheduler::inject; + +#[test] +fn push_and_pop() { + const N: usize = 2; + + let (inject, mut synced) = inject::Shared::new(); + + for i in 0..N { + assert_eq!(inject.len(), i); + let (task, _) = super::unowned(async {}); + unsafe { inject.push(&mut synced, task) }; + } + + for i in 0..N { + assert_eq!(inject.len(), N - i); + assert!(unsafe { inject.pop(&mut synced) }.is_some()); + } + + println!("--------------"); + + assert!(unsafe { inject.pop(&mut synced) }.is_none()); +} + +#[test] +fn push_batch_and_pop() { + let (inject, mut inject_synced) = inject::Shared::new(); + + unsafe { + inject.push_batch( + &mut inject_synced, + (0..10).map(|_| super::unowned(async {}).0), + ); + + assert_eq!(5, inject.pop_n(&mut inject_synced, 5).count()); + assert_eq!(5, inject.pop_n(&mut inject_synced, 5).count()); + assert_eq!(0, inject.pop_n(&mut inject_synced, 5).count()); + } +} + +#[test] +fn pop_n_drains_on_drop() { + let (inject, mut inject_synced) = inject::Shared::new(); + + unsafe { + inject.push_batch( + &mut inject_synced, + (0..10).map(|_| super::unowned(async {}).0), + ); + let _ = inject.pop_n(&mut inject_synced, 10); + + assert_eq!(inject.len(), 0); + } +} diff --git a/vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs b/vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs deleted file mode 100644 index e6221d3b1..000000000 --- a/vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::Arc; -use crate::loom::thread; -use crate::runtime::{Builder, Runtime}; -use crate::sync::oneshot::{self, Receiver}; -use crate::task; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::Ordering::{Acquire, Release}; -use std::task::{Context, Poll}; - -fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) { - let (tx, rx) = oneshot::channel(); - let num_polls = Arc::new(AtomicUsize::new(0)); - rt.spawn(async move { - for _ in 0..12 { - task::yield_now().await; - } - tx.send(()).unwrap(); - }); - - rt.block_on(async { - BlockedFuture { - rx, - num_polls: num_polls.clone(), - } - .await; - }); - - let polls = num_polls.load(Acquire); - assert!(polls <= at_most_polls); -} - -#[test] -fn block_on_num_polls() { - loom::model(|| { - // we expect at most 3 number of polls because there are - // three points at which we poll the future. At any of these - // points it can be ready: - // - // - when we fail to steal the parker and we block on a - // notification that it is available. - // - // - when we steal the parker and we schedule the future - // - // - when the future is woken up and we have ran the max - // number of tasks for the current tick or there are no - // more tasks to run. - // - let at_most = 3; - - let rt1 = Arc::new(Builder::new_current_thread().build().unwrap()); - let rt2 = rt1.clone(); - let rt3 = rt1.clone(); - - let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most)); - let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most)); - let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most)); - - th1.join().unwrap(); - th2.join().unwrap(); - th3.join().unwrap(); - }); -} - -struct BlockedFuture { - rx: Receiver<()>, - num_polls: Arc<AtomicUsize>, -} - -impl Future for BlockedFuture { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - self.num_polls.fetch_add(1, Release); - - match Pin::new(&mut self.rx).poll(cx) { - Poll::Pending => Poll::Pending, - _ => Poll::Ready(()), - } - } -} diff --git a/vendor/tokio/src/runtime/tests/loom_blocking.rs b/vendor/tokio/src/runtime/tests/loom_blocking.rs index 8fb54c565..5c4aeae39 100644 --- a/vendor/tokio/src/runtime/tests/loom_blocking.rs +++ b/vendor/tokio/src/runtime/tests/loom_blocking.rs @@ -23,6 +23,77 @@ fn blocking_shutdown() { }); } +#[test] +fn spawn_mandatory_blocking_should_always_run() { + use crate::runtime::tests::loom_oneshot; + loom::model(|| { + let rt = runtime::Builder::new_current_thread().build().unwrap(); + + let (tx, rx) = loom_oneshot::channel(); + let _enter = rt.enter(); + runtime::spawn_blocking(|| {}); + runtime::spawn_mandatory_blocking(move || { + let _ = tx.send(()); + }) + .unwrap(); + + drop(rt); + + // This call will deadlock if `spawn_mandatory_blocking` doesn't run. + let () = rx.recv(); + }); +} + +#[test] +fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread() { + use crate::runtime::tests::loom_oneshot; + loom::model(|| { + let rt = runtime::Builder::new_current_thread().build().unwrap(); + let handle = rt.handle().clone(); + + // Drop the runtime in a different thread + { + loom::thread::spawn(move || { + drop(rt); + }); + } + + let _enter = handle.enter(); + let (tx, rx) = loom_oneshot::channel(); + let handle = runtime::spawn_mandatory_blocking(move || { + let _ = tx.send(()); + }); + + // handle.is_some() means that `spawn_mandatory_blocking` + // promised us to run the blocking task + if handle.is_some() { + // This call will deadlock if `spawn_mandatory_blocking` doesn't run. + let () = rx.recv(); + } + }); +} + +#[test] +fn spawn_blocking_when_paused() { + use std::time::Duration; + loom::model(|| { + let rt = crate::runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + let handle = rt.handle(); + let _enter = handle.enter(); + let a = crate::task::spawn_blocking(|| {}); + let b = crate::task::spawn_blocking(|| {}); + rt.block_on(crate::time::timeout(Duration::from_millis(1), async move { + a.await.expect("blocking task should finish"); + b.await.expect("blocking task should finish"); + })) + .expect("timeout should not trigger"); + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) diff --git a/vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs b/vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs new file mode 100644 index 000000000..a772603f7 --- /dev/null +++ b/vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs @@ -0,0 +1,142 @@ +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Arc; +use crate::loom::thread; +use crate::runtime::{Builder, Runtime}; +use crate::sync::oneshot::{self, Receiver}; +use crate::task; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::Ordering::{Acquire, Release}; +use std::task::{Context, Poll}; + +fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) { + let (tx, rx) = oneshot::channel(); + let num_polls = Arc::new(AtomicUsize::new(0)); + rt.spawn(async move { + for _ in 0..12 { + task::yield_now().await; + } + tx.send(()).unwrap(); + }); + + rt.block_on(async { + BlockedFuture { + rx, + num_polls: num_polls.clone(), + } + .await; + }); + + let polls = num_polls.load(Acquire); + assert!(polls <= at_most_polls); +} + +#[test] +fn block_on_num_polls() { + loom::model(|| { + // we expect at most 4 number of polls because there are three points at + // which we poll the future and an opportunity for a false-positive.. At + // any of these points it can be ready: + // + // - when we fail to steal the parker and we block on a notification + // that it is available. + // + // - when we steal the parker and we schedule the future + // + // - when the future is woken up and we have ran the max number of tasks + // for the current tick or there are no more tasks to run. + // + // - a thread is notified that the parker is available but a third + // thread acquires it before the notified thread can. + // + let at_most = 4; + + let rt1 = Arc::new(Builder::new_current_thread().build().unwrap()); + let rt2 = rt1.clone(); + let rt3 = rt1.clone(); + + let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most)); + let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most)); + let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most)); + + th1.join().unwrap(); + th2.join().unwrap(); + th3.join().unwrap(); + }); +} + +#[test] +fn assert_no_unnecessary_polls() { + loom::model(|| { + // // After we poll outer future, woken should reset to false + let rt = Builder::new_current_thread().build().unwrap(); + let (tx, rx) = oneshot::channel(); + let pending_cnt = Arc::new(AtomicUsize::new(0)); + + rt.spawn(async move { + for _ in 0..24 { + task::yield_now().await; + } + tx.send(()).unwrap(); + }); + + let pending_cnt_clone = pending_cnt.clone(); + rt.block_on(async move { + // use task::yield_now() to ensure woken set to true + // ResetFuture will be polled at most once + // Here comes two cases + // 1. recv no message from channel, ResetFuture will be polled + // but get Pending and we record ResetFuture.pending_cnt ++. + // Then when message arrive, ResetFuture returns Ready. So we + // expect ResetFuture.pending_cnt = 1 + // 2. recv message from channel, ResetFuture returns Ready immediately. + // We expect ResetFuture.pending_cnt = 0 + task::yield_now().await; + ResetFuture { + rx, + pending_cnt: pending_cnt_clone, + } + .await; + }); + + let pending_cnt = pending_cnt.load(Acquire); + assert!(pending_cnt <= 1); + }); +} + +struct BlockedFuture { + rx: Receiver<()>, + num_polls: Arc<AtomicUsize>, +} + +impl Future for BlockedFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.num_polls.fetch_add(1, Release); + + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => Poll::Pending, + _ => Poll::Ready(()), + } + } +} + +struct ResetFuture { + rx: Receiver<()>, + pending_cnt: Arc<AtomicUsize>, +} + +impl Future for ResetFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => { + self.pending_cnt.fetch_add(1, Release); + Poll::Pending + } + _ => Poll::Ready(()), + } + } +} diff --git a/vendor/tokio/src/runtime/tests/loom_join_set.rs b/vendor/tokio/src/runtime/tests/loom_join_set.rs new file mode 100644 index 000000000..bd343876a --- /dev/null +++ b/vendor/tokio/src/runtime/tests/loom_join_set.rs @@ -0,0 +1,82 @@ +use crate::runtime::Builder; +use crate::task::JoinSet; + +#[test] +fn test_join_set() { + loom::model(|| { + let rt = Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + let mut set = JoinSet::new(); + + rt.block_on(async { + assert_eq!(set.len(), 0); + set.spawn(async { () }); + assert_eq!(set.len(), 1); + set.spawn(async { () }); + assert_eq!(set.len(), 2); + let () = set.join_next().await.unwrap().unwrap(); + assert_eq!(set.len(), 1); + set.spawn(async { () }); + assert_eq!(set.len(), 2); + let () = set.join_next().await.unwrap().unwrap(); + assert_eq!(set.len(), 1); + let () = set.join_next().await.unwrap().unwrap(); + assert_eq!(set.len(), 0); + set.spawn(async { () }); + assert_eq!(set.len(), 1); + }); + + drop(set); + drop(rt); + }); +} + +#[test] +fn abort_all_during_completion() { + use std::sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }; + + // These booleans assert that at least one execution had the task complete first, and that at + // least one execution had the task be cancelled before it completed. + let complete_happened = Arc::new(AtomicBool::new(false)); + let cancel_happened = Arc::new(AtomicBool::new(false)); + + { + let complete_happened = complete_happened.clone(); + let cancel_happened = cancel_happened.clone(); + loom::model(move || { + let rt = Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + let mut set = JoinSet::new(); + + rt.block_on(async { + set.spawn(async { () }); + set.abort_all(); + + match set.join_next().await { + Some(Ok(())) => complete_happened.store(true, SeqCst), + Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst), + Some(Err(err)) => panic!("fail: {}", err), + None => { + unreachable!("Aborting the task does not remove it from the JoinSet.") + } + } + + assert!(matches!(set.join_next().await, None)); + }); + + drop(set); + drop(rt); + }); + } + + assert!(complete_happened.load(SeqCst)); + assert!(cancel_happened.load(SeqCst)); +} diff --git a/vendor/tokio/src/runtime/tests/loom_pool.rs b/vendor/tokio/src/runtime/tests/loom_pool.rs index 06ad6412f..fb42e1eb4 100644 --- a/vendor/tokio/src/runtime/tests/loom_pool.rs +++ b/vendor/tokio/src/runtime/tests/loom_pool.rs @@ -11,7 +11,7 @@ use crate::{spawn, task}; use tokio_test::assert_ok; use loom::sync::atomic::{AtomicBool, AtomicUsize}; -use loom::sync::{Arc, Mutex}; +use loom::sync::Arc; use pin_project_lite::pin_project; use std::future::Future; @@ -19,6 +19,57 @@ use std::pin::Pin; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::task::{Context, Poll}; +mod atomic_take { + use loom::sync::atomic::AtomicBool; + use std::mem::MaybeUninit; + use std::sync::atomic::Ordering::SeqCst; + + pub(super) struct AtomicTake<T> { + inner: MaybeUninit<T>, + taken: AtomicBool, + } + + impl<T> AtomicTake<T> { + pub(super) fn new(value: T) -> Self { + Self { + inner: MaybeUninit::new(value), + taken: AtomicBool::new(false), + } + } + + pub(super) fn take(&self) -> Option<T> { + // safety: Only one thread will see the boolean change from false + // to true, so that thread is able to take the value. + match self.taken.fetch_or(true, SeqCst) { + false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) }, + true => None, + } + } + } + + impl<T> Drop for AtomicTake<T> { + fn drop(&mut self) { + drop(self.take()); + } + } +} + +#[derive(Clone)] +struct AtomicOneshot<T> { + value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>, +} +impl<T> AtomicOneshot<T> { + fn new(sender: oneshot::Sender<T>) -> Self { + Self { + value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)), + } + } + + fn assert_send(&self, value: T) { + self.value.take().unwrap().send(value); + } +} + /// Tests are divided into groups to make the runs faster on CI. mod group_a { use super::*; @@ -52,7 +103,7 @@ mod group_a { let c1 = Arc::new(AtomicUsize::new(0)); let (tx, rx) = oneshot::channel(); - let tx1 = Arc::new(Mutex::new(Some(tx))); + let tx1 = AtomicOneshot::new(tx); // Spawn a task let c2 = c1.clone(); @@ -60,7 +111,7 @@ mod group_a { pool.spawn(track(async move { spawn(track(async move { if 1 == c1.fetch_add(1, Relaxed) { - tx1.lock().unwrap().take().unwrap().send(()); + tx1.assert_send(()); } })); })); @@ -69,7 +120,7 @@ mod group_a { pool.spawn(track(async move { spawn(track(async move { if 1 == c2.fetch_add(1, Relaxed) { - tx2.lock().unwrap().take().unwrap().send(()); + tx2.assert_send(()); } })); })); @@ -119,7 +170,7 @@ mod group_b { let (block_tx, block_rx) = oneshot::channel(); let (done_tx, done_rx) = oneshot::channel(); - let done_tx = Arc::new(Mutex::new(Some(done_tx))); + let done_tx = AtomicOneshot::new(done_tx); pool.spawn(track(async move { crate::task::block_in_place(move || { @@ -136,7 +187,7 @@ mod group_b { pool.spawn(track(async move { if NUM == cnt.fetch_add(1, Relaxed) + 1 { - done_tx.lock().unwrap().take().unwrap().send(()); + done_tx.assert_send(()); } })); } @@ -159,23 +210,6 @@ mod group_b { } #[test] - fn pool_shutdown() { - loom::model(|| { - let pool = mk_pool(2); - - pool.spawn(track(async move { - gated2(true).await; - })); - - pool.spawn(track(async move { - gated2(false).await; - })); - - drop(pool); - }); - } - - #[test] fn join_output() { loom::model(|| { let rt = mk_pool(1); @@ -223,10 +257,6 @@ mod group_b { }); }); } -} - -mod group_c { - use super::*; #[test] fn shutdown_with_notification() { @@ -255,6 +285,27 @@ mod group_c { } } +mod group_c { + use super::*; + + #[test] + fn pool_shutdown() { + loom::model(|| { + let pool = mk_pool(2); + + pool.spawn(track(async move { + gated2(true).await; + })); + + pool.spawn(track(async move { + gated2(false).await; + })); + + drop(pool); + }); + } +} + mod group_d { use super::*; @@ -266,27 +317,25 @@ mod group_d { let c1 = Arc::new(AtomicUsize::new(0)); let (done_tx, done_rx) = oneshot::channel(); - let done_tx1 = Arc::new(Mutex::new(Some(done_tx))); + let done_tx1 = AtomicOneshot::new(done_tx); + let done_tx2 = done_tx1.clone(); // Spawn a task let c2 = c1.clone(); - let done_tx2 = done_tx1.clone(); pool.spawn(track(async move { - gated().await; - gated().await; + multi_gated().await; if 1 == c1.fetch_add(1, Relaxed) { - done_tx1.lock().unwrap().take().unwrap().send(()); + done_tx1.assert_send(()); } })); // Spawn a second task pool.spawn(track(async move { - gated().await; - gated().await; + multi_gated().await; if 1 == c2.fetch_add(1, Relaxed) { - done_tx2.lock().unwrap().take().unwrap().send(()); + done_tx2.assert_send(()); } })); @@ -298,14 +347,12 @@ mod group_d { fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) + // Set the intervals to avoid tuning logic + .event_interval(2) .build() .unwrap() } -fn gated() -> impl Future<Output = &'static str> { - gated2(false) -} - fn gated2(thread: bool) -> impl Future<Output = &'static str> { use loom::thread; use std::sync::Arc; @@ -343,6 +390,38 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> { }) } +async fn multi_gated() { + struct Gate { + waker: loom::future::AtomicWaker, + count: AtomicUsize, + } + + let gate = Arc::new(Gate { + waker: loom::future::AtomicWaker::new(), + count: AtomicUsize::new(0), + }); + + { + let gate = gate.clone(); + spawn(track(async move { + for i in 1..3 { + gate.count.store(i, SeqCst); + gate.waker.wake(); + } + })); + } + + poll_fn(move |cx| { + if gate.count.load(SeqCst) < 2 { + gate.waker.register_by_ref(cx.waker()); + Poll::Pending + } else { + Poll::Ready(()) + } + }) + .await; +} + fn track<T: Future>(f: T) -> Track<T> { Track { inner: f, diff --git a/vendor/tokio/src/runtime/tests/loom_queue.rs b/vendor/tokio/src/runtime/tests/loom_queue.rs index 34da7fd66..b60e039b9 100644 --- a/vendor/tokio/src/runtime/tests/loom_queue.rs +++ b/vendor/tokio/src/runtime/tests/loom_queue.rs @@ -1,20 +1,27 @@ -use crate::runtime::queue; -use crate::runtime::task::{self, Schedule, Task}; +use crate::runtime::scheduler::multi_thread::{queue, Stats}; +use crate::runtime::tests::NoopSchedule; use loom::thread; +use std::cell::RefCell; + +fn new_stats() -> Stats { + Stats::new(&crate::runtime::WorkerMetrics::new()) +} #[test] fn basic() { loom::model(|| { let (steal, mut local) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); + let mut stats = new_stats(); let th = thread::spawn(move || { + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -30,8 +37,8 @@ fn basic() { for _ in 0..2 { for _ in 0..2 { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); } if local.pop().is_some() { @@ -39,17 +46,15 @@ fn basic() { } // Push another task - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); while local.pop().is_some() { n += 1; } } - while inject.pop().is_some() { - n += 1; - } + n += inject.borrow_mut().drain(..).count(); n += th.join().unwrap(); @@ -61,13 +66,15 @@ fn basic() { fn steal_overflow() { loom::model(|| { let (steal, mut local) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); + let mut stats = new_stats(); let th = thread::spawn(move || { + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -81,16 +88,16 @@ fn steal_overflow() { let mut n = 0; // push a task, pop a task - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); if local.pop().is_some() { n += 1; } for _ in 0..6 { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); } n += th.join().unwrap(); @@ -99,9 +106,7 @@ fn steal_overflow() { n += 1; } - while inject.pop().is_some() { - n += 1; - } + n += inject.borrow_mut().drain(..).count(); assert_eq!(7, n); }); @@ -111,10 +116,11 @@ fn steal_overflow() { fn multi_stealer() { const NUM_TASKS: usize = 5; - fn steal_tasks(steal: queue::Steal<Runtime>) -> usize { + fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize { + let mut stats = new_stats(); let (_, mut local) = queue::local(); - if steal.steal_into(&mut local).is_none() { + if steal.steal_into(&mut local, &mut stats).is_none() { return 0; } @@ -129,12 +135,13 @@ fn multi_stealer() { loom::model(|| { let (steal, mut local) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); + let mut stats = new_stats(); // Push work for _ in 0..NUM_TASKS { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); } let th1 = { @@ -150,9 +157,7 @@ fn multi_stealer() { n += 1; } - while inject.pop().is_some() { - n += 1; - } + n += inject.borrow_mut().drain(..).count(); n += th1.join().unwrap(); n += th2.join().unwrap(); @@ -164,23 +169,25 @@ fn multi_stealer() { #[test] fn chained_steal() { loom::model(|| { + let mut stats = new_stats(); let (s1, mut l1) = queue::local(); let (s2, mut l2) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); // Load up some tasks for _ in 0..4 { - let (task, _) = super::joinable::<_, Runtime>(async {}); - l1.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + l1.push_back_or_overflow(task, &inject, &mut stats); - let (task, _) = super::joinable::<_, Runtime>(async {}); - l2.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + l2.push_back_or_overflow(task, &inject, &mut stats); } // Spawn a task to steal from **our** queue let th = thread::spawn(move || { + let mut stats = new_stats(); let (_, mut local) = queue::local(); - s1.steal_into(&mut local); + s1.steal_into(&mut local, &mut stats); while local.pop().is_some() {} }); @@ -188,29 +195,11 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1); + s2.steal_into(&mut l1, &mut stats); th.join().unwrap(); while l1.pop().is_some() {} while l2.pop().is_some() {} - while inject.pop().is_some() {} }); } - -struct Runtime; - -impl Schedule for Runtime { - fn bind(task: Task<Self>) -> Runtime { - std::mem::forget(task); - Runtime - } - - fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { - None - } - - fn schedule(&self, _task: task::Notified<Self>) { - unreachable!(); - } -} diff --git a/vendor/tokio/src/runtime/tests/loom_yield.rs b/vendor/tokio/src/runtime/tests/loom_yield.rs new file mode 100644 index 000000000..ba506e5a4 --- /dev/null +++ b/vendor/tokio/src/runtime/tests/loom_yield.rs @@ -0,0 +1,37 @@ +use crate::runtime::park; +use crate::runtime::tests::loom_oneshot as oneshot; +use crate::runtime::{self, Runtime}; + +#[test] +fn yield_calls_park_before_scheduling_again() { + // Don't need to check all permutations + let mut loom = loom::model::Builder::default(); + loom.max_permutations = Some(1); + loom.check(|| { + let rt = mk_runtime(2); + let (tx, rx) = oneshot::channel::<()>(); + + rt.spawn(async { + let tid = loom::thread::current().id(); + let park_count = park::current_thread_park_count(); + + crate::task::yield_now().await; + + if tid == loom::thread::current().id() { + let new_park_count = park::current_thread_park_count(); + assert_eq!(park_count + 1, new_park_count); + } + + tx.send(()); + }); + + rx.recv(); + }); +} + +fn mk_runtime(num_threads: usize) -> Runtime { + runtime::Builder::new_multi_thread() + .worker_threads(num_threads) + .build() + .unwrap() +} diff --git a/vendor/tokio/src/runtime/tests/mod.rs b/vendor/tokio/src/runtime/tests/mod.rs index 3f2cc9825..b12a76e26 100644 --- a/vendor/tokio/src/runtime/tests/mod.rs +++ b/vendor/tokio/src/runtime/tests/mod.rs @@ -1,35 +1,73 @@ -#[cfg(not(all(tokio_unstable, feature = "tracing")))] -use crate::runtime::task::joinable; +// Enable dead_code / unreachable_pub here. It has been disabled in lib.rs for +// other code when running loom tests. +#![cfg_attr(loom, warn(dead_code, unreachable_pub))] -#[cfg(all(tokio_unstable, feature = "tracing"))] -use self::joinable_wrapper::joinable; +use self::noop_scheduler::NoopSchedule; +use self::unowned_wrapper::unowned; -#[cfg(all(tokio_unstable, feature = "tracing"))] -mod joinable_wrapper { - use crate::runtime::task::{JoinHandle, Notified, Schedule}; - use tracing::Instrument; +mod noop_scheduler { + use crate::runtime::task::{self, Task}; - pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>) + /// `task::Schedule` implementation that does nothing, for testing. + pub(crate) struct NoopSchedule; + + impl task::Schedule for NoopSchedule { + fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + None + } + + fn schedule(&self, _task: task::Notified<Self>) { + unreachable!(); + } + } +} + +mod unowned_wrapper { + use crate::runtime::task::{Id, JoinHandle, Notified}; + use crate::runtime::tests::NoopSchedule; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>) where T: std::future::Future + Send + 'static, - S: Schedule, + T::Output: Send + 'static, { + use tracing::Instrument; let span = tracing::trace_span!("test_span"); - crate::runtime::task::joinable(task.instrument(span)) + let task = task.instrument(span); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); + (task.into_notified(), handle) + } + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>) + where + T: std::future::Future + Send + 'static, + T::Output: Send + 'static, + { + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); + (task.into_notified(), handle) } } cfg_loom! { - mod loom_basic_scheduler; - mod loom_local; mod loom_blocking; + mod loom_current_thread_scheduler; + mod loom_local; mod loom_oneshot; mod loom_pool; mod loom_queue; mod loom_shutdown_join; + mod loom_join_set; + mod loom_yield; + + // Make sure debug assertions are enabled + #[cfg(not(debug_assertions))] + compiler_error!("these tests require debug assertions to be enabled"); } cfg_not_loom! { + mod inject; mod queue; #[cfg(not(miri))] diff --git a/vendor/tokio/src/runtime/tests/queue.rs b/vendor/tokio/src/runtime/tests/queue.rs index b2962f154..5df92b7a2 100644 --- a/vendor/tokio/src/runtime/tests/queue.rs +++ b/vendor/tokio/src/runtime/tests/queue.rs @@ -1,39 +1,107 @@ -use crate::runtime::queue; +use crate::runtime::scheduler::multi_thread::{queue, Stats}; use crate::runtime::task::{self, Schedule, Task}; +use std::cell::RefCell; use std::thread; use std::time::Duration; +#[allow(unused)] +macro_rules! assert_metrics { + ($stats:ident, $field:ident == $v:expr) => {{ + use crate::runtime::WorkerMetrics; + use std::sync::atomic::Ordering::Relaxed; + + let worker = WorkerMetrics::new(); + $stats.submit(&worker); + + let expect = $v; + let actual = worker.$field.load(Relaxed); + + assert!(actual == expect, "expect = {}; actual = {}", expect, actual) + }}; +} + +fn new_stats() -> Stats { + use crate::runtime::WorkerMetrics; + Stats::new(&WorkerMetrics::new()) +} + #[test] -fn fits_256() { +fn fits_256_one_at_a_time() { let (_, mut local) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); + let mut stats = new_stats(); for _ in 0..256 { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); } - assert!(inject.pop().is_none()); + cfg_metrics! { + assert_metrics!(stats, overflow_count == 0); + } + + assert!(inject.borrow_mut().pop().is_none()); while local.pop().is_some() {} } #[test] +fn fits_256_all_at_once() { + let (_, mut local) = queue::local(); + + let mut tasks = (0..256) + .map(|_| super::unowned(async {}).0) + .collect::<Vec<_>>(); + local.push_back(tasks.drain(..)); + + let mut i = 0; + while local.pop().is_some() { + i += 1; + } + + assert_eq!(i, 256); +} + +#[test] +fn fits_256_all_in_chunks() { + let (_, mut local) = queue::local(); + + let mut tasks = (0..256) + .map(|_| super::unowned(async {}).0) + .collect::<Vec<_>>(); + + local.push_back(tasks.drain(..10)); + local.push_back(tasks.drain(..100)); + local.push_back(tasks.drain(..46)); + local.push_back(tasks.drain(..100)); + + let mut i = 0; + while local.pop().is_some() { + i += 1; + } + + assert_eq!(i, 256); +} + +#[test] fn overflow() { let (_, mut local) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); + let mut stats = new_stats(); for _ in 0..257 { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); + } + + cfg_metrics! { + assert_metrics!(stats, overflow_count == 1); } let mut n = 0; - while inject.pop().is_some() { - n += 1; - } + n += inject.borrow_mut().drain(..).count(); while local.pop().is_some() { n += 1; @@ -44,16 +112,22 @@ fn overflow() { #[test] fn steal_batch() { + let mut stats = new_stats(); + let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); for _ in 0..4 { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local1.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local1.push_back_or_overflow(task, &inject, &mut stats); } - assert!(steal1.steal_into(&mut local2).is_some()); + assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); + + cfg_metrics! { + assert_metrics!(stats, steal_count == 2); + } for _ in 0..1 { assert!(local2.pop().is_some()); @@ -68,24 +142,35 @@ fn steal_batch() { assert!(local1.pop().is_none()); } +const fn normal_or_miri(normal: usize, miri: usize) -> usize { + if cfg!(miri) { + miri + } else { + normal + } +} + #[test] fn stress1() { - const NUM_ITER: usize = 1; - const NUM_STEAL: usize = 1_000; - const NUM_LOCAL: usize = 1_000; - const NUM_PUSH: usize = 500; - const NUM_POP: usize = 250; + const NUM_ITER: usize = 5; + const NUM_STEAL: usize = normal_or_miri(1_000, 10); + const NUM_LOCAL: usize = normal_or_miri(1_000, 10); + const NUM_PUSH: usize = normal_or_miri(500, 10); + const NUM_POP: usize = normal_or_miri(250, 10); + + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); let th = thread::spawn(move || { + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -96,6 +181,10 @@ fn stress1() { thread::yield_now(); } + cfg_metrics! { + assert_metrics!(stats, steal_count == n as _); + } + n }); @@ -103,8 +192,8 @@ fn stress1() { for _ in 0..NUM_LOCAL { for _ in 0..NUM_PUSH { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); } for _ in 0..NUM_POP { @@ -116,9 +205,7 @@ fn stress1() { } } - while inject.pop().is_some() { - n += 1; - } + n += inject.borrow_mut().drain(..).count(); n += th.join().unwrap(); @@ -129,19 +216,22 @@ fn stress1() { #[test] fn stress2() { const NUM_ITER: usize = 1; - const NUM_TASKS: usize = 1_000_000; - const NUM_STEAL: usize = 1_000; + const NUM_TASKS: usize = normal_or_miri(1_000_000, 50); + const NUM_STEAL: usize = normal_or_miri(1_000, 10); + + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); - let inject = queue::Inject::new(); + let inject = RefCell::new(vec![]); let th = thread::spawn(move || { + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -158,16 +248,14 @@ fn stress2() { let mut num_pop = 0; for i in 0..NUM_TASKS { - let (task, _) = super::joinable::<_, Runtime>(async {}); - local.push_back(task, &inject); + let (task, _) = super::unowned(async {}); + local.push_back_or_overflow(task, &inject, &mut stats); if i % 128 == 0 && local.pop().is_some() { num_pop += 1; } - while inject.pop().is_some() { - num_pop += 1; - } + num_pop += inject.borrow_mut().drain(..).count(); } num_pop += th.join().unwrap(); @@ -176,9 +264,7 @@ fn stress2() { num_pop += 1; } - while inject.pop().is_some() { - num_pop += 1; - } + num_pop += inject.borrow_mut().drain(..).count(); assert_eq!(num_pop, NUM_TASKS); } @@ -187,11 +273,6 @@ fn stress2() { struct Runtime; impl Schedule for Runtime { - fn bind(task: Task<Self>) -> Runtime { - std::mem::forget(task); - Runtime - } - fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { None } diff --git a/vendor/tokio/src/runtime/tests/task.rs b/vendor/tokio/src/runtime/tests/task.rs index 7c2012523..a79c0f50d 100644 --- a/vendor/tokio/src/runtime/tests/task.rs +++ b/vendor/tokio/src/runtime/tests/task.rs @@ -1,44 +1,235 @@ -use crate::runtime::task::{self, Schedule, Task}; -use crate::util::linked_list::{Link, LinkedList}; +use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::tests::NoopSchedule; use crate::util::TryLock; use std::collections::VecDeque; +use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +struct AssertDropHandle { + is_dropped: Arc<AtomicBool>, +} +impl AssertDropHandle { + #[track_caller] + fn assert_dropped(&self) { + assert!(self.is_dropped.load(Ordering::SeqCst)); + } + + #[track_caller] + fn assert_not_dropped(&self) { + assert!(!self.is_dropped.load(Ordering::SeqCst)); + } +} + +struct AssertDrop { + is_dropped: Arc<AtomicBool>, +} +impl AssertDrop { + fn new() -> (Self, AssertDropHandle) { + let shared = Arc::new(AtomicBool::new(false)); + ( + AssertDrop { + is_dropped: shared.clone(), + }, + AssertDropHandle { + is_dropped: shared.clone(), + }, + ) + } +} +impl Drop for AssertDrop { + fn drop(&mut self) { + self.is_dropped.store(true, Ordering::SeqCst); + } +} + +// A Notified does not shut down on drop, but it is dropped once the ref-count +// hits zero. +#[test] +fn create_drop1() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + drop(notified); + handle.assert_not_dropped(); + drop(join); + handle.assert_dropped(); +} + #[test] -fn create_drop() { - let _ = super::joinable::<_, Runtime>(async { unreachable!() }); +fn create_drop2() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + drop(join); + handle.assert_not_dropped(); + drop(notified); + handle.assert_dropped(); +} + +#[test] +fn drop_abort_handle1() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + let abort = join.abort_handle(); + drop(join); + handle.assert_not_dropped(); + drop(notified); + handle.assert_not_dropped(); + drop(abort); + handle.assert_dropped(); +} + +#[test] +fn drop_abort_handle2() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + let abort = join.abort_handle(); + drop(notified); + handle.assert_not_dropped(); + drop(abort); + handle.assert_not_dropped(); + drop(join); + handle.assert_dropped(); +} + +// Shutting down through Notified works +#[test] +fn create_shutdown1() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + drop(join); + handle.assert_not_dropped(); + notified.shutdown(); + handle.assert_dropped(); +} + +#[test] +fn create_shutdown2() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + handle.assert_not_dropped(); + notified.shutdown(); + handle.assert_dropped(); + drop(join); +} + +#[test] +fn unowned_poll() { + let (task, _) = unowned(async {}, NoopSchedule, Id::next()); + task.run(); } #[test] fn schedule() { with(|rt| { - let (task, _) = super::joinable(async { + rt.spawn(async { crate::task::yield_now().await; }); - rt.schedule(task); - assert_eq!(2, rt.tick()); + rt.shutdown(); }) } #[test] fn shutdown() { with(|rt| { - let (task, _) = super::joinable(async { + rt.spawn(async { loop { crate::task::yield_now().await; } }); - rt.schedule(task); rt.tick_max(1); rt.shutdown(); }) } +#[test] +fn shutdown_immediately() { + with(|rt| { + rt.spawn(async { + loop { + crate::task::yield_now().await; + } + }); + + rt.shutdown(); + }) +} + +#[test] +fn spawn_during_shutdown() { + static DID_SPAWN: AtomicBool = AtomicBool::new(false); + + struct SpawnOnDrop(Runtime); + impl Drop for SpawnOnDrop { + fn drop(&mut self) { + DID_SPAWN.store(true, Ordering::SeqCst); + self.0.spawn(async {}); + } + } + + with(|rt| { + let rt2 = rt.clone(); + rt.spawn(async move { + let _spawn_on_drop = SpawnOnDrop(rt2); + + loop { + crate::task::yield_now().await; + } + }); + + rt.tick_max(1); + rt.shutdown(); + }); + + assert!(DID_SPAWN.load(Ordering::SeqCst)); +} + fn with(f: impl FnOnce(Runtime)) { struct Reset; @@ -51,10 +242,9 @@ fn with(f: impl FnOnce(Runtime)) { let _reset = Reset; let rt = Runtime(Arc::new(Inner { - released: task::TransferStack::new(), + owned: OwnedTasks::new(), core: TryLock::new(Core { queue: VecDeque::new(), - tasks: LinkedList::new(), }), })); @@ -66,18 +256,31 @@ fn with(f: impl FnOnce(Runtime)) { struct Runtime(Arc<Inner>); struct Inner { - released: task::TransferStack<Runtime>, core: TryLock<Core>, + owned: OwnedTasks<Runtime>, } struct Core { queue: VecDeque<task::Notified<Runtime>>, - tasks: LinkedList<Task<Runtime>, <Task<Runtime> as Link>::Target>, } static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None); impl Runtime { + fn spawn<T>(&self, future: T) -> JoinHandle<T::Output> + where + T: 'static + Send + Future, + T::Output: 'static + Send, + { + let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next()); + + if let Some(notified) = notified { + self.schedule(notified); + } + + handle + } + fn tick(&self) -> usize { self.tick_max(usize::MAX) } @@ -88,11 +291,10 @@ impl Runtime { while !self.is_empty() && n < max { let task = self.next_task(); n += 1; + let task = self.0.owned.assert_owner(task); task.run(); } - self.0.maintenance(); - n } @@ -107,50 +309,21 @@ impl Runtime { fn shutdown(&self) { let mut core = self.0.core.try_lock().unwrap(); - for task in core.tasks.iter() { - task.shutdown(); - } + self.0.owned.close_and_shutdown_all(); while let Some(task) = core.queue.pop_back() { - task.shutdown(); + drop(task); } drop(core); - while !self.0.core.try_lock().unwrap().tasks.is_empty() { - self.0.maintenance(); - } - } -} - -impl Inner { - fn maintenance(&self) { - use std::mem::ManuallyDrop; - - for task in self.released.drain() { - let task = ManuallyDrop::new(task); - - // safety: see worker.rs - unsafe { - let ptr = task.header().into(); - self.core.try_lock().unwrap().tasks.remove(ptr); - } - } + assert!(self.0.owned.is_empty()); } } impl Schedule for Runtime { - fn bind(task: Task<Self>) -> Runtime { - let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone(); - rt.0.core.try_lock().unwrap().tasks.push_front(task); - rt - } - fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { - // safety: copying worker.rs - let task = unsafe { Task::from_raw(task.header().into()) }; - self.0.released.push(task); - None + self.0.owned.remove(task) } fn schedule(&self, task: task::Notified<Self>) { diff --git a/vendor/tokio/src/runtime/tests/task_combinations.rs b/vendor/tokio/src/runtime/tests/task_combinations.rs index 76ce2330c..73a20d976 100644 --- a/vendor/tokio/src/runtime/tests/task_combinations.rs +++ b/vendor/tokio/src/runtime/tests/task_combinations.rs @@ -1,8 +1,10 @@ +use std::fmt; use std::future::Future; use std::panic; use std::pin::Pin; use std::task::{Context, Poll}; +use crate::runtime::task::AbortHandle; use crate::runtime::Builder; use crate::sync::oneshot; use crate::task::JoinHandle; @@ -56,6 +58,12 @@ enum CombiAbort { AbortedAfterConsumeOutput = 4, } +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiAbortSource { + JoinHandle, + AbortHandle, +} + #[test] fn test_combinations() { let mut rt = &[ @@ -90,6 +98,13 @@ fn test_combinations() { CombiAbort::AbortedAfterFinish, CombiAbort::AbortedAfterConsumeOutput, ]; + let ah = [ + None, + Some(CombiJoinHandle::DropImmediately), + Some(CombiJoinHandle::DropFirstPoll), + Some(CombiJoinHandle::DropAfterNoConsume), + Some(CombiJoinHandle::DropAfterConsume), + ]; for rt in rt.iter().copied() { for ls in ls.iter().copied() { @@ -98,7 +113,34 @@ fn test_combinations() { for ji in ji.iter().copied() { for jh in jh.iter().copied() { for abort in abort.iter().copied() { - test_combination(rt, ls, task, output, ji, jh, abort); + // abort via join handle --- abort handles + // may be dropped at any point + for ah in ah.iter().copied() { + test_combination( + rt, + ls, + task, + output, + ji, + jh, + ah, + abort, + CombiAbortSource::JoinHandle, + ); + } + // if aborting via AbortHandle, it will + // never be dropped. + test_combination( + rt, + ls, + task, + output, + ji, + jh, + None, + abort, + CombiAbortSource::AbortHandle, + ); } } } @@ -108,6 +150,9 @@ fn test_combinations() { } } +fn is_debug<T: fmt::Debug>(_: &T) {} + +#[allow(clippy::too_many_arguments)] fn test_combination( rt: CombiRuntime, ls: CombiLocalSet, @@ -115,12 +160,24 @@ fn test_combination( output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, + ah: Option<CombiJoinHandle>, abort: CombiAbort, + abort_src: CombiAbortSource, ) { - if (jh as usize) < (abort as usize) { - // drop before abort not possible - return; + match (abort_src, ah) { + (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => { + // join handle dropped prior to abort + return; + } + (CombiAbortSource::AbortHandle, Some(_)) => { + // abort handle dropped, we can't abort through the + // abort handle + return; + } + + _ => {} } + if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { // this causes double panic return; @@ -130,7 +187,15 @@ fn test_combination( return; } - println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + is_debug(&rt); + is_debug(&ls); + is_debug(&task); + is_debug(&output); + is_debug(&ji); + is_debug(&jh); + is_debug(&ah); + is_debug(&abort); + is_debug(&abort_src); // A runtime optionally with a LocalSet struct Rt { @@ -282,8 +347,24 @@ fn test_combination( ); } + // If we are either aborting the task via an abort handle, or dropping via + // an abort handle, do that now. + let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle { + handle.as_ref().map(JoinHandle::abort_handle) + } else { + None + }; + + let do_abort = |abort_handle: &mut Option<AbortHandle>, + join_handle: Option<&mut JoinHandle<_>>| { + match abort_src { + CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(), + CombiAbortSource::JoinHandle => join_handle.unwrap().abort(), + } + }; + if abort == CombiAbort::AbortedImmediately { - handle.as_mut().unwrap().abort(); + do_abort(&mut abort_handle, handle.as_mut()); aborted = true; } if jh == CombiJoinHandle::DropImmediately { @@ -301,12 +382,15 @@ fn test_combination( } if abort == CombiAbort::AbortedFirstPoll { - handle.as_mut().unwrap().abort(); + do_abort(&mut abort_handle, handle.as_mut()); aborted = true; } if jh == CombiJoinHandle::DropFirstPoll { drop(handle.take().unwrap()); } + if ah == Some(CombiJoinHandle::DropFirstPoll) { + drop(abort_handle.take().unwrap()); + } // Signal the future that it can return now let _ = on_complete.send(()); @@ -318,23 +402,42 @@ fn test_combination( if abort == CombiAbort::AbortedAfterFinish { // Don't set aborted to true here as the task already finished - handle.as_mut().unwrap().abort(); + do_abort(&mut abort_handle, handle.as_mut()); } if jh == CombiJoinHandle::DropAfterNoConsume { - // The runtime will usually have dropped every ref-count at this point, - // in which case dropping the JoinHandle drops the output. - // - // (But it might race and still hold a ref-count) - let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + if ah == Some(CombiJoinHandle::DropAfterNoConsume) { drop(handle.take().unwrap()); - })); - if panic.is_err() { - assert!( - (output == CombiOutput::PanicOnDrop) - && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) - && !aborted, - "Dropping JoinHandle shouldn't panic here" - ); + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the AbortHandle drops the output. + // + // (But it might race and still hold a ref-count) + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(abort_handle.take().unwrap()); + })); + if panic.is_err() { + assert!( + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping AbortHandle shouldn't panic here" + ); + } + } else { + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the JoinHandle drops the output. + // + // (But it might race and still hold a ref-count) + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(handle.take().unwrap()); + })); + if panic.is_err() { + assert!( + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping JoinHandle shouldn't panic here" + ); + } } } @@ -362,11 +465,15 @@ fn test_combination( _ => unreachable!(), } - let handle = handle.take().unwrap(); + let mut handle = handle.take().unwrap(); if abort == CombiAbort::AbortedAfterConsumeOutput { - handle.abort(); + do_abort(&mut abort_handle, Some(&mut handle)); } drop(handle); + + if ah == Some(CombiJoinHandle::DropAfterConsume) { + drop(abort_handle.take()); + } } // The output should have been dropped now. Check whether the output |