summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/runtime/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/tests
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/tests')
-rw-r--r--vendor/tokio/src/runtime/tests/inject.rs54
-rw-r--r--vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs82
-rw-r--r--vendor/tokio/src/runtime/tests/loom_blocking.rs71
-rw-r--r--vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs142
-rw-r--r--vendor/tokio/src/runtime/tests/loom_join_set.rs82
-rw-r--r--vendor/tokio/src/runtime/tests/loom_pool.rs157
-rw-r--r--vendor/tokio/src/runtime/tests/loom_queue.rs95
-rw-r--r--vendor/tokio/src/runtime/tests/loom_yield.rs37
-rw-r--r--vendor/tokio/src/runtime/tests/mod.rs64
-rw-r--r--vendor/tokio/src/runtime/tests/queue.rs171
-rw-r--r--vendor/tokio/src/runtime/tests/task.rs269
-rw-r--r--vendor/tokio/src/runtime/tests/task_combinations.rs153
12 files changed, 1074 insertions, 303 deletions
diff --git a/vendor/tokio/src/runtime/tests/inject.rs b/vendor/tokio/src/runtime/tests/inject.rs
new file mode 100644
index 000000000..ccead5e02
--- /dev/null
+++ b/vendor/tokio/src/runtime/tests/inject.rs
@@ -0,0 +1,54 @@
+use crate::runtime::scheduler::inject;
+
+#[test]
+fn push_and_pop() {
+ const N: usize = 2;
+
+ let (inject, mut synced) = inject::Shared::new();
+
+ for i in 0..N {
+ assert_eq!(inject.len(), i);
+ let (task, _) = super::unowned(async {});
+ unsafe { inject.push(&mut synced, task) };
+ }
+
+ for i in 0..N {
+ assert_eq!(inject.len(), N - i);
+ assert!(unsafe { inject.pop(&mut synced) }.is_some());
+ }
+
+ println!("--------------");
+
+ assert!(unsafe { inject.pop(&mut synced) }.is_none());
+}
+
+#[test]
+fn push_batch_and_pop() {
+ let (inject, mut inject_synced) = inject::Shared::new();
+
+ unsafe {
+ inject.push_batch(
+ &mut inject_synced,
+ (0..10).map(|_| super::unowned(async {}).0),
+ );
+
+ assert_eq!(5, inject.pop_n(&mut inject_synced, 5).count());
+ assert_eq!(5, inject.pop_n(&mut inject_synced, 5).count());
+ assert_eq!(0, inject.pop_n(&mut inject_synced, 5).count());
+ }
+}
+
+#[test]
+fn pop_n_drains_on_drop() {
+ let (inject, mut inject_synced) = inject::Shared::new();
+
+ unsafe {
+ inject.push_batch(
+ &mut inject_synced,
+ (0..10).map(|_| super::unowned(async {}).0),
+ );
+ let _ = inject.pop_n(&mut inject_synced, 10);
+
+ assert_eq!(inject.len(), 0);
+ }
+}
diff --git a/vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs b/vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs
deleted file mode 100644
index e6221d3b1..000000000
--- a/vendor/tokio/src/runtime/tests/loom_basic_scheduler.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-use crate::loom::sync::atomic::AtomicUsize;
-use crate::loom::sync::Arc;
-use crate::loom::thread;
-use crate::runtime::{Builder, Runtime};
-use crate::sync::oneshot::{self, Receiver};
-use crate::task;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::atomic::Ordering::{Acquire, Release};
-use std::task::{Context, Poll};
-
-fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
- let (tx, rx) = oneshot::channel();
- let num_polls = Arc::new(AtomicUsize::new(0));
- rt.spawn(async move {
- for _ in 0..12 {
- task::yield_now().await;
- }
- tx.send(()).unwrap();
- });
-
- rt.block_on(async {
- BlockedFuture {
- rx,
- num_polls: num_polls.clone(),
- }
- .await;
- });
-
- let polls = num_polls.load(Acquire);
- assert!(polls <= at_most_polls);
-}
-
-#[test]
-fn block_on_num_polls() {
- loom::model(|| {
- // we expect at most 3 number of polls because there are
- // three points at which we poll the future. At any of these
- // points it can be ready:
- //
- // - when we fail to steal the parker and we block on a
- // notification that it is available.
- //
- // - when we steal the parker and we schedule the future
- //
- // - when the future is woken up and we have ran the max
- // number of tasks for the current tick or there are no
- // more tasks to run.
- //
- let at_most = 3;
-
- let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
- let rt2 = rt1.clone();
- let rt3 = rt1.clone();
-
- let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most));
- let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most));
- let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most));
-
- th1.join().unwrap();
- th2.join().unwrap();
- th3.join().unwrap();
- });
-}
-
-struct BlockedFuture {
- rx: Receiver<()>,
- num_polls: Arc<AtomicUsize>,
-}
-
-impl Future for BlockedFuture {
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- self.num_polls.fetch_add(1, Release);
-
- match Pin::new(&mut self.rx).poll(cx) {
- Poll::Pending => Poll::Pending,
- _ => Poll::Ready(()),
- }
- }
-}
diff --git a/vendor/tokio/src/runtime/tests/loom_blocking.rs b/vendor/tokio/src/runtime/tests/loom_blocking.rs
index 8fb54c565..5c4aeae39 100644
--- a/vendor/tokio/src/runtime/tests/loom_blocking.rs
+++ b/vendor/tokio/src/runtime/tests/loom_blocking.rs
@@ -23,6 +23,77 @@ fn blocking_shutdown() {
});
}
+#[test]
+fn spawn_mandatory_blocking_should_always_run() {
+ use crate::runtime::tests::loom_oneshot;
+ loom::model(|| {
+ let rt = runtime::Builder::new_current_thread().build().unwrap();
+
+ let (tx, rx) = loom_oneshot::channel();
+ let _enter = rt.enter();
+ runtime::spawn_blocking(|| {});
+ runtime::spawn_mandatory_blocking(move || {
+ let _ = tx.send(());
+ })
+ .unwrap();
+
+ drop(rt);
+
+ // This call will deadlock if `spawn_mandatory_blocking` doesn't run.
+ let () = rx.recv();
+ });
+}
+
+#[test]
+fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread() {
+ use crate::runtime::tests::loom_oneshot;
+ loom::model(|| {
+ let rt = runtime::Builder::new_current_thread().build().unwrap();
+ let handle = rt.handle().clone();
+
+ // Drop the runtime in a different thread
+ {
+ loom::thread::spawn(move || {
+ drop(rt);
+ });
+ }
+
+ let _enter = handle.enter();
+ let (tx, rx) = loom_oneshot::channel();
+ let handle = runtime::spawn_mandatory_blocking(move || {
+ let _ = tx.send(());
+ });
+
+ // handle.is_some() means that `spawn_mandatory_blocking`
+ // promised us to run the blocking task
+ if handle.is_some() {
+ // This call will deadlock if `spawn_mandatory_blocking` doesn't run.
+ let () = rx.recv();
+ }
+ });
+}
+
+#[test]
+fn spawn_blocking_when_paused() {
+ use std::time::Duration;
+ loom::model(|| {
+ let rt = crate::runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+ let handle = rt.handle();
+ let _enter = handle.enter();
+ let a = crate::task::spawn_blocking(|| {});
+ let b = crate::task::spawn_blocking(|| {});
+ rt.block_on(crate::time::timeout(Duration::from_millis(1), async move {
+ a.await.expect("blocking task should finish");
+ b.await.expect("blocking task should finish");
+ }))
+ .expect("timeout should not trigger");
+ });
+}
+
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
diff --git a/vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs b/vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs
new file mode 100644
index 000000000..a772603f7
--- /dev/null
+++ b/vendor/tokio/src/runtime/tests/loom_current_thread_scheduler.rs
@@ -0,0 +1,142 @@
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Arc;
+use crate::loom::thread;
+use crate::runtime::{Builder, Runtime};
+use crate::sync::oneshot::{self, Receiver};
+use crate::task;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::{Acquire, Release};
+use std::task::{Context, Poll};
+
+fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
+ let (tx, rx) = oneshot::channel();
+ let num_polls = Arc::new(AtomicUsize::new(0));
+ rt.spawn(async move {
+ for _ in 0..12 {
+ task::yield_now().await;
+ }
+ tx.send(()).unwrap();
+ });
+
+ rt.block_on(async {
+ BlockedFuture {
+ rx,
+ num_polls: num_polls.clone(),
+ }
+ .await;
+ });
+
+ let polls = num_polls.load(Acquire);
+ assert!(polls <= at_most_polls);
+}
+
+#[test]
+fn block_on_num_polls() {
+ loom::model(|| {
+ // we expect at most 4 number of polls because there are three points at
+ // which we poll the future and an opportunity for a false-positive.. At
+ // any of these points it can be ready:
+ //
+ // - when we fail to steal the parker and we block on a notification
+ // that it is available.
+ //
+ // - when we steal the parker and we schedule the future
+ //
+ // - when the future is woken up and we have ran the max number of tasks
+ // for the current tick or there are no more tasks to run.
+ //
+ // - a thread is notified that the parker is available but a third
+ // thread acquires it before the notified thread can.
+ //
+ let at_most = 4;
+
+ let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
+ let rt2 = rt1.clone();
+ let rt3 = rt1.clone();
+
+ let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most));
+ let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most));
+ let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most));
+
+ th1.join().unwrap();
+ th2.join().unwrap();
+ th3.join().unwrap();
+ });
+}
+
+#[test]
+fn assert_no_unnecessary_polls() {
+ loom::model(|| {
+ // // After we poll outer future, woken should reset to false
+ let rt = Builder::new_current_thread().build().unwrap();
+ let (tx, rx) = oneshot::channel();
+ let pending_cnt = Arc::new(AtomicUsize::new(0));
+
+ rt.spawn(async move {
+ for _ in 0..24 {
+ task::yield_now().await;
+ }
+ tx.send(()).unwrap();
+ });
+
+ let pending_cnt_clone = pending_cnt.clone();
+ rt.block_on(async move {
+ // use task::yield_now() to ensure woken set to true
+ // ResetFuture will be polled at most once
+ // Here comes two cases
+ // 1. recv no message from channel, ResetFuture will be polled
+ // but get Pending and we record ResetFuture.pending_cnt ++.
+ // Then when message arrive, ResetFuture returns Ready. So we
+ // expect ResetFuture.pending_cnt = 1
+ // 2. recv message from channel, ResetFuture returns Ready immediately.
+ // We expect ResetFuture.pending_cnt = 0
+ task::yield_now().await;
+ ResetFuture {
+ rx,
+ pending_cnt: pending_cnt_clone,
+ }
+ .await;
+ });
+
+ let pending_cnt = pending_cnt.load(Acquire);
+ assert!(pending_cnt <= 1);
+ });
+}
+
+struct BlockedFuture {
+ rx: Receiver<()>,
+ num_polls: Arc<AtomicUsize>,
+}
+
+impl Future for BlockedFuture {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.num_polls.fetch_add(1, Release);
+
+ match Pin::new(&mut self.rx).poll(cx) {
+ Poll::Pending => Poll::Pending,
+ _ => Poll::Ready(()),
+ }
+ }
+}
+
+struct ResetFuture {
+ rx: Receiver<()>,
+ pending_cnt: Arc<AtomicUsize>,
+}
+
+impl Future for ResetFuture {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match Pin::new(&mut self.rx).poll(cx) {
+ Poll::Pending => {
+ self.pending_cnt.fetch_add(1, Release);
+ Poll::Pending
+ }
+ _ => Poll::Ready(()),
+ }
+ }
+}
diff --git a/vendor/tokio/src/runtime/tests/loom_join_set.rs b/vendor/tokio/src/runtime/tests/loom_join_set.rs
new file mode 100644
index 000000000..bd343876a
--- /dev/null
+++ b/vendor/tokio/src/runtime/tests/loom_join_set.rs
@@ -0,0 +1,82 @@
+use crate::runtime::Builder;
+use crate::task::JoinSet;
+
+#[test]
+fn test_join_set() {
+ loom::model(|| {
+ let rt = Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+ let mut set = JoinSet::new();
+
+ rt.block_on(async {
+ assert_eq!(set.len(), 0);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 1);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 2);
+ let () = set.join_next().await.unwrap().unwrap();
+ assert_eq!(set.len(), 1);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 2);
+ let () = set.join_next().await.unwrap().unwrap();
+ assert_eq!(set.len(), 1);
+ let () = set.join_next().await.unwrap().unwrap();
+ assert_eq!(set.len(), 0);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 1);
+ });
+
+ drop(set);
+ drop(rt);
+ });
+}
+
+#[test]
+fn abort_all_during_completion() {
+ use std::sync::{
+ atomic::{AtomicBool, Ordering::SeqCst},
+ Arc,
+ };
+
+ // These booleans assert that at least one execution had the task complete first, and that at
+ // least one execution had the task be cancelled before it completed.
+ let complete_happened = Arc::new(AtomicBool::new(false));
+ let cancel_happened = Arc::new(AtomicBool::new(false));
+
+ {
+ let complete_happened = complete_happened.clone();
+ let cancel_happened = cancel_happened.clone();
+ loom::model(move || {
+ let rt = Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ let mut set = JoinSet::new();
+
+ rt.block_on(async {
+ set.spawn(async { () });
+ set.abort_all();
+
+ match set.join_next().await {
+ Some(Ok(())) => complete_happened.store(true, SeqCst),
+ Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
+ Some(Err(err)) => panic!("fail: {}", err),
+ None => {
+ unreachable!("Aborting the task does not remove it from the JoinSet.")
+ }
+ }
+
+ assert!(matches!(set.join_next().await, None));
+ });
+
+ drop(set);
+ drop(rt);
+ });
+ }
+
+ assert!(complete_happened.load(SeqCst));
+ assert!(cancel_happened.load(SeqCst));
+}
diff --git a/vendor/tokio/src/runtime/tests/loom_pool.rs b/vendor/tokio/src/runtime/tests/loom_pool.rs
index 06ad6412f..fb42e1eb4 100644
--- a/vendor/tokio/src/runtime/tests/loom_pool.rs
+++ b/vendor/tokio/src/runtime/tests/loom_pool.rs
@@ -11,7 +11,7 @@ use crate::{spawn, task};
use tokio_test::assert_ok;
use loom::sync::atomic::{AtomicBool, AtomicUsize};
-use loom::sync::{Arc, Mutex};
+use loom::sync::Arc;
use pin_project_lite::pin_project;
use std::future::Future;
@@ -19,6 +19,57 @@ use std::pin::Pin;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::task::{Context, Poll};
+mod atomic_take {
+ use loom::sync::atomic::AtomicBool;
+ use std::mem::MaybeUninit;
+ use std::sync::atomic::Ordering::SeqCst;
+
+ pub(super) struct AtomicTake<T> {
+ inner: MaybeUninit<T>,
+ taken: AtomicBool,
+ }
+
+ impl<T> AtomicTake<T> {
+ pub(super) fn new(value: T) -> Self {
+ Self {
+ inner: MaybeUninit::new(value),
+ taken: AtomicBool::new(false),
+ }
+ }
+
+ pub(super) fn take(&self) -> Option<T> {
+ // safety: Only one thread will see the boolean change from false
+ // to true, so that thread is able to take the value.
+ match self.taken.fetch_or(true, SeqCst) {
+ false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) },
+ true => None,
+ }
+ }
+ }
+
+ impl<T> Drop for AtomicTake<T> {
+ fn drop(&mut self) {
+ drop(self.take());
+ }
+ }
+}
+
+#[derive(Clone)]
+struct AtomicOneshot<T> {
+ value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>,
+}
+impl<T> AtomicOneshot<T> {
+ fn new(sender: oneshot::Sender<T>) -> Self {
+ Self {
+ value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)),
+ }
+ }
+
+ fn assert_send(&self, value: T) {
+ self.value.take().unwrap().send(value);
+ }
+}
+
/// Tests are divided into groups to make the runs faster on CI.
mod group_a {
use super::*;
@@ -52,7 +103,7 @@ mod group_a {
let c1 = Arc::new(AtomicUsize::new(0));
let (tx, rx) = oneshot::channel();
- let tx1 = Arc::new(Mutex::new(Some(tx)));
+ let tx1 = AtomicOneshot::new(tx);
// Spawn a task
let c2 = c1.clone();
@@ -60,7 +111,7 @@ mod group_a {
pool.spawn(track(async move {
spawn(track(async move {
if 1 == c1.fetch_add(1, Relaxed) {
- tx1.lock().unwrap().take().unwrap().send(());
+ tx1.assert_send(());
}
}));
}));
@@ -69,7 +120,7 @@ mod group_a {
pool.spawn(track(async move {
spawn(track(async move {
if 1 == c2.fetch_add(1, Relaxed) {
- tx2.lock().unwrap().take().unwrap().send(());
+ tx2.assert_send(());
}
}));
}));
@@ -119,7 +170,7 @@ mod group_b {
let (block_tx, block_rx) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
- let done_tx = Arc::new(Mutex::new(Some(done_tx)));
+ let done_tx = AtomicOneshot::new(done_tx);
pool.spawn(track(async move {
crate::task::block_in_place(move || {
@@ -136,7 +187,7 @@ mod group_b {
pool.spawn(track(async move {
if NUM == cnt.fetch_add(1, Relaxed) + 1 {
- done_tx.lock().unwrap().take().unwrap().send(());
+ done_tx.assert_send(());
}
}));
}
@@ -159,23 +210,6 @@ mod group_b {
}
#[test]
- fn pool_shutdown() {
- loom::model(|| {
- let pool = mk_pool(2);
-
- pool.spawn(track(async move {
- gated2(true).await;
- }));
-
- pool.spawn(track(async move {
- gated2(false).await;
- }));
-
- drop(pool);
- });
- }
-
- #[test]
fn join_output() {
loom::model(|| {
let rt = mk_pool(1);
@@ -223,10 +257,6 @@ mod group_b {
});
});
}
-}
-
-mod group_c {
- use super::*;
#[test]
fn shutdown_with_notification() {
@@ -255,6 +285,27 @@ mod group_c {
}
}
+mod group_c {
+ use super::*;
+
+ #[test]
+ fn pool_shutdown() {
+ loom::model(|| {
+ let pool = mk_pool(2);
+
+ pool.spawn(track(async move {
+ gated2(true).await;
+ }));
+
+ pool.spawn(track(async move {
+ gated2(false).await;
+ }));
+
+ drop(pool);
+ });
+ }
+}
+
mod group_d {
use super::*;
@@ -266,27 +317,25 @@ mod group_d {
let c1 = Arc::new(AtomicUsize::new(0));
let (done_tx, done_rx) = oneshot::channel();
- let done_tx1 = Arc::new(Mutex::new(Some(done_tx)));
+ let done_tx1 = AtomicOneshot::new(done_tx);
+ let done_tx2 = done_tx1.clone();
// Spawn a task
let c2 = c1.clone();
- let done_tx2 = done_tx1.clone();
pool.spawn(track(async move {
- gated().await;
- gated().await;
+ multi_gated().await;
if 1 == c1.fetch_add(1, Relaxed) {
- done_tx1.lock().unwrap().take().unwrap().send(());
+ done_tx1.assert_send(());
}
}));
// Spawn a second task
pool.spawn(track(async move {
- gated().await;
- gated().await;
+ multi_gated().await;
if 1 == c2.fetch_add(1, Relaxed) {
- done_tx2.lock().unwrap().take().unwrap().send(());
+ done_tx2.assert_send(());
}
}));
@@ -298,14 +347,12 @@ mod group_d {
fn mk_pool(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
+ // Set the intervals to avoid tuning logic
+ .event_interval(2)
.build()
.unwrap()
}
-fn gated() -> impl Future<Output = &'static str> {
- gated2(false)
-}
-
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
use loom::thread;
use std::sync::Arc;
@@ -343,6 +390,38 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> {
})
}
+async fn multi_gated() {
+ struct Gate {
+ waker: loom::future::AtomicWaker,
+ count: AtomicUsize,
+ }
+
+ let gate = Arc::new(Gate {
+ waker: loom::future::AtomicWaker::new(),
+ count: AtomicUsize::new(0),
+ });
+
+ {
+ let gate = gate.clone();
+ spawn(track(async move {
+ for i in 1..3 {
+ gate.count.store(i, SeqCst);
+ gate.waker.wake();
+ }
+ }));
+ }
+
+ poll_fn(move |cx| {
+ if gate.count.load(SeqCst) < 2 {
+ gate.waker.register_by_ref(cx.waker());
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+ })
+ .await;
+}
+
fn track<T: Future>(f: T) -> Track<T> {
Track {
inner: f,
diff --git a/vendor/tokio/src/runtime/tests/loom_queue.rs b/vendor/tokio/src/runtime/tests/loom_queue.rs
index 34da7fd66..b60e039b9 100644
--- a/vendor/tokio/src/runtime/tests/loom_queue.rs
+++ b/vendor/tokio/src/runtime/tests/loom_queue.rs
@@ -1,20 +1,27 @@
-use crate::runtime::queue;
-use crate::runtime::task::{self, Schedule, Task};
+use crate::runtime::scheduler::multi_thread::{queue, Stats};
+use crate::runtime::tests::NoopSchedule;
use loom::thread;
+use std::cell::RefCell;
+
+fn new_stats() -> Stats {
+ Stats::new(&crate::runtime::WorkerMetrics::new())
+}
#[test]
fn basic() {
loom::model(|| {
let (steal, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
+ let mut stats = new_stats();
let th = thread::spawn(move || {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..3 {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -30,8 +37,8 @@ fn basic() {
for _ in 0..2 {
for _ in 0..2 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
}
if local.pop().is_some() {
@@ -39,17 +46,15 @@ fn basic() {
}
// Push another task
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
while local.pop().is_some() {
n += 1;
}
}
- while inject.pop().is_some() {
- n += 1;
- }
+ n += inject.borrow_mut().drain(..).count();
n += th.join().unwrap();
@@ -61,13 +66,15 @@ fn basic() {
fn steal_overflow() {
loom::model(|| {
let (steal, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
+ let mut stats = new_stats();
let th = thread::spawn(move || {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -81,16 +88,16 @@ fn steal_overflow() {
let mut n = 0;
// push a task, pop a task
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
if local.pop().is_some() {
n += 1;
}
for _ in 0..6 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
}
n += th.join().unwrap();
@@ -99,9 +106,7 @@ fn steal_overflow() {
n += 1;
}
- while inject.pop().is_some() {
- n += 1;
- }
+ n += inject.borrow_mut().drain(..).count();
assert_eq!(7, n);
});
@@ -111,10 +116,11 @@ fn steal_overflow() {
fn multi_stealer() {
const NUM_TASKS: usize = 5;
- fn steal_tasks(steal: queue::Steal<Runtime>) -> usize {
+ fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
- if steal.steal_into(&mut local).is_none() {
+ if steal.steal_into(&mut local, &mut stats).is_none() {
return 0;
}
@@ -129,12 +135,13 @@ fn multi_stealer() {
loom::model(|| {
let (steal, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
+ let mut stats = new_stats();
// Push work
for _ in 0..NUM_TASKS {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
}
let th1 = {
@@ -150,9 +157,7 @@ fn multi_stealer() {
n += 1;
}
- while inject.pop().is_some() {
- n += 1;
- }
+ n += inject.borrow_mut().drain(..).count();
n += th1.join().unwrap();
n += th2.join().unwrap();
@@ -164,23 +169,25 @@ fn multi_stealer() {
#[test]
fn chained_steal() {
loom::model(|| {
+ let mut stats = new_stats();
let (s1, mut l1) = queue::local();
let (s2, mut l2) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
// Load up some tasks
for _ in 0..4 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- l1.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ l1.push_back_or_overflow(task, &inject, &mut stats);
- let (task, _) = super::joinable::<_, Runtime>(async {});
- l2.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ l2.push_back_or_overflow(task, &inject, &mut stats);
}
// Spawn a task to steal from **our** queue
let th = thread::spawn(move || {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
- s1.steal_into(&mut local);
+ s1.steal_into(&mut local, &mut stats);
while local.pop().is_some() {}
});
@@ -188,29 +195,11 @@ fn chained_steal() {
// Drain our tasks, then attempt to steal
while l1.pop().is_some() {}
- s2.steal_into(&mut l1);
+ s2.steal_into(&mut l1, &mut stats);
th.join().unwrap();
while l1.pop().is_some() {}
while l2.pop().is_some() {}
- while inject.pop().is_some() {}
});
}
-
-struct Runtime;
-
-impl Schedule for Runtime {
- fn bind(task: Task<Self>) -> Runtime {
- std::mem::forget(task);
- Runtime
- }
-
- fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
- None
- }
-
- fn schedule(&self, _task: task::Notified<Self>) {
- unreachable!();
- }
-}
diff --git a/vendor/tokio/src/runtime/tests/loom_yield.rs b/vendor/tokio/src/runtime/tests/loom_yield.rs
new file mode 100644
index 000000000..ba506e5a4
--- /dev/null
+++ b/vendor/tokio/src/runtime/tests/loom_yield.rs
@@ -0,0 +1,37 @@
+use crate::runtime::park;
+use crate::runtime::tests::loom_oneshot as oneshot;
+use crate::runtime::{self, Runtime};
+
+#[test]
+fn yield_calls_park_before_scheduling_again() {
+ // Don't need to check all permutations
+ let mut loom = loom::model::Builder::default();
+ loom.max_permutations = Some(1);
+ loom.check(|| {
+ let rt = mk_runtime(2);
+ let (tx, rx) = oneshot::channel::<()>();
+
+ rt.spawn(async {
+ let tid = loom::thread::current().id();
+ let park_count = park::current_thread_park_count();
+
+ crate::task::yield_now().await;
+
+ if tid == loom::thread::current().id() {
+ let new_park_count = park::current_thread_park_count();
+ assert_eq!(park_count + 1, new_park_count);
+ }
+
+ tx.send(());
+ });
+
+ rx.recv();
+ });
+}
+
+fn mk_runtime(num_threads: usize) -> Runtime {
+ runtime::Builder::new_multi_thread()
+ .worker_threads(num_threads)
+ .build()
+ .unwrap()
+}
diff --git a/vendor/tokio/src/runtime/tests/mod.rs b/vendor/tokio/src/runtime/tests/mod.rs
index 3f2cc9825..b12a76e26 100644
--- a/vendor/tokio/src/runtime/tests/mod.rs
+++ b/vendor/tokio/src/runtime/tests/mod.rs
@@ -1,35 +1,73 @@
-#[cfg(not(all(tokio_unstable, feature = "tracing")))]
-use crate::runtime::task::joinable;
+// Enable dead_code / unreachable_pub here. It has been disabled in lib.rs for
+// other code when running loom tests.
+#![cfg_attr(loom, warn(dead_code, unreachable_pub))]
-#[cfg(all(tokio_unstable, feature = "tracing"))]
-use self::joinable_wrapper::joinable;
+use self::noop_scheduler::NoopSchedule;
+use self::unowned_wrapper::unowned;
-#[cfg(all(tokio_unstable, feature = "tracing"))]
-mod joinable_wrapper {
- use crate::runtime::task::{JoinHandle, Notified, Schedule};
- use tracing::Instrument;
+mod noop_scheduler {
+ use crate::runtime::task::{self, Task};
- pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+ /// `task::Schedule` implementation that does nothing, for testing.
+ pub(crate) struct NoopSchedule;
+
+ impl task::Schedule for NoopSchedule {
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+ }
+}
+
+mod unowned_wrapper {
+ use crate::runtime::task::{Id, JoinHandle, Notified};
+ use crate::runtime::tests::NoopSchedule;
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>)
where
T: std::future::Future + Send + 'static,
- S: Schedule,
+ T::Output: Send + 'static,
{
+ use tracing::Instrument;
let span = tracing::trace_span!("test_span");
- crate::runtime::task::joinable(task.instrument(span))
+ let task = task.instrument(span);
+ let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next());
+ (task.into_notified(), handle)
+ }
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>)
+ where
+ T: std::future::Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next());
+ (task.into_notified(), handle)
}
}
cfg_loom! {
- mod loom_basic_scheduler;
- mod loom_local;
mod loom_blocking;
+ mod loom_current_thread_scheduler;
+ mod loom_local;
mod loom_oneshot;
mod loom_pool;
mod loom_queue;
mod loom_shutdown_join;
+ mod loom_join_set;
+ mod loom_yield;
+
+ // Make sure debug assertions are enabled
+ #[cfg(not(debug_assertions))]
+ compiler_error!("these tests require debug assertions to be enabled");
}
cfg_not_loom! {
+ mod inject;
mod queue;
#[cfg(not(miri))]
diff --git a/vendor/tokio/src/runtime/tests/queue.rs b/vendor/tokio/src/runtime/tests/queue.rs
index b2962f154..5df92b7a2 100644
--- a/vendor/tokio/src/runtime/tests/queue.rs
+++ b/vendor/tokio/src/runtime/tests/queue.rs
@@ -1,39 +1,107 @@
-use crate::runtime::queue;
+use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::task::{self, Schedule, Task};
+use std::cell::RefCell;
use std::thread;
use std::time::Duration;
+#[allow(unused)]
+macro_rules! assert_metrics {
+ ($stats:ident, $field:ident == $v:expr) => {{
+ use crate::runtime::WorkerMetrics;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ let worker = WorkerMetrics::new();
+ $stats.submit(&worker);
+
+ let expect = $v;
+ let actual = worker.$field.load(Relaxed);
+
+ assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
+ }};
+}
+
+fn new_stats() -> Stats {
+ use crate::runtime::WorkerMetrics;
+ Stats::new(&WorkerMetrics::new())
+}
+
#[test]
-fn fits_256() {
+fn fits_256_one_at_a_time() {
let (_, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
+ let mut stats = new_stats();
for _ in 0..256 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
}
- assert!(inject.pop().is_none());
+ cfg_metrics! {
+ assert_metrics!(stats, overflow_count == 0);
+ }
+
+ assert!(inject.borrow_mut().pop().is_none());
while local.pop().is_some() {}
}
#[test]
+fn fits_256_all_at_once() {
+ let (_, mut local) = queue::local();
+
+ let mut tasks = (0..256)
+ .map(|_| super::unowned(async {}).0)
+ .collect::<Vec<_>>();
+ local.push_back(tasks.drain(..));
+
+ let mut i = 0;
+ while local.pop().is_some() {
+ i += 1;
+ }
+
+ assert_eq!(i, 256);
+}
+
+#[test]
+fn fits_256_all_in_chunks() {
+ let (_, mut local) = queue::local();
+
+ let mut tasks = (0..256)
+ .map(|_| super::unowned(async {}).0)
+ .collect::<Vec<_>>();
+
+ local.push_back(tasks.drain(..10));
+ local.push_back(tasks.drain(..100));
+ local.push_back(tasks.drain(..46));
+ local.push_back(tasks.drain(..100));
+
+ let mut i = 0;
+ while local.pop().is_some() {
+ i += 1;
+ }
+
+ assert_eq!(i, 256);
+}
+
+#[test]
fn overflow() {
let (_, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
+ let mut stats = new_stats();
for _ in 0..257 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
+ }
+
+ cfg_metrics! {
+ assert_metrics!(stats, overflow_count == 1);
}
let mut n = 0;
- while inject.pop().is_some() {
- n += 1;
- }
+ n += inject.borrow_mut().drain(..).count();
while local.pop().is_some() {
n += 1;
@@ -44,16 +112,22 @@ fn overflow() {
#[test]
fn steal_batch() {
+ let mut stats = new_stats();
+
let (steal1, mut local1) = queue::local();
let (_, mut local2) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
for _ in 0..4 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local1.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local1.push_back_or_overflow(task, &inject, &mut stats);
}
- assert!(steal1.steal_into(&mut local2).is_some());
+ assert!(steal1.steal_into(&mut local2, &mut stats).is_some());
+
+ cfg_metrics! {
+ assert_metrics!(stats, steal_count == 2);
+ }
for _ in 0..1 {
assert!(local2.pop().is_some());
@@ -68,24 +142,35 @@ fn steal_batch() {
assert!(local1.pop().is_none());
}
+const fn normal_or_miri(normal: usize, miri: usize) -> usize {
+ if cfg!(miri) {
+ miri
+ } else {
+ normal
+ }
+}
+
#[test]
fn stress1() {
- const NUM_ITER: usize = 1;
- const NUM_STEAL: usize = 1_000;
- const NUM_LOCAL: usize = 1_000;
- const NUM_PUSH: usize = 500;
- const NUM_POP: usize = 250;
+ const NUM_ITER: usize = 5;
+ const NUM_STEAL: usize = normal_or_miri(1_000, 10);
+ const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
+ const NUM_PUSH: usize = normal_or_miri(500, 10);
+ const NUM_POP: usize = normal_or_miri(250, 10);
+
+ let mut stats = new_stats();
for _ in 0..NUM_ITER {
let (steal, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
let th = thread::spawn(move || {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..NUM_STEAL {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -96,6 +181,10 @@ fn stress1() {
thread::yield_now();
}
+ cfg_metrics! {
+ assert_metrics!(stats, steal_count == n as _);
+ }
+
n
});
@@ -103,8 +192,8 @@ fn stress1() {
for _ in 0..NUM_LOCAL {
for _ in 0..NUM_PUSH {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
}
for _ in 0..NUM_POP {
@@ -116,9 +205,7 @@ fn stress1() {
}
}
- while inject.pop().is_some() {
- n += 1;
- }
+ n += inject.borrow_mut().drain(..).count();
n += th.join().unwrap();
@@ -129,19 +216,22 @@ fn stress1() {
#[test]
fn stress2() {
const NUM_ITER: usize = 1;
- const NUM_TASKS: usize = 1_000_000;
- const NUM_STEAL: usize = 1_000;
+ const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
+ const NUM_STEAL: usize = normal_or_miri(1_000, 10);
+
+ let mut stats = new_stats();
for _ in 0..NUM_ITER {
let (steal, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
let th = thread::spawn(move || {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..NUM_STEAL {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -158,16 +248,14 @@ fn stress2() {
let mut num_pop = 0;
for i in 0..NUM_TASKS {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
if i % 128 == 0 && local.pop().is_some() {
num_pop += 1;
}
- while inject.pop().is_some() {
- num_pop += 1;
- }
+ num_pop += inject.borrow_mut().drain(..).count();
}
num_pop += th.join().unwrap();
@@ -176,9 +264,7 @@ fn stress2() {
num_pop += 1;
}
- while inject.pop().is_some() {
- num_pop += 1;
- }
+ num_pop += inject.borrow_mut().drain(..).count();
assert_eq!(num_pop, NUM_TASKS);
}
@@ -187,11 +273,6 @@ fn stress2() {
struct Runtime;
impl Schedule for Runtime {
- fn bind(task: Task<Self>) -> Runtime {
- std::mem::forget(task);
- Runtime
- }
-
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}
diff --git a/vendor/tokio/src/runtime/tests/task.rs b/vendor/tokio/src/runtime/tests/task.rs
index 7c2012523..a79c0f50d 100644
--- a/vendor/tokio/src/runtime/tests/task.rs
+++ b/vendor/tokio/src/runtime/tests/task.rs
@@ -1,44 +1,235 @@
-use crate::runtime::task::{self, Schedule, Task};
-use crate::util::linked_list::{Link, LinkedList};
+use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;
use std::collections::VecDeque;
+use std::future::Future;
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+struct AssertDropHandle {
+ is_dropped: Arc<AtomicBool>,
+}
+impl AssertDropHandle {
+ #[track_caller]
+ fn assert_dropped(&self) {
+ assert!(self.is_dropped.load(Ordering::SeqCst));
+ }
+
+ #[track_caller]
+ fn assert_not_dropped(&self) {
+ assert!(!self.is_dropped.load(Ordering::SeqCst));
+ }
+}
+
+struct AssertDrop {
+ is_dropped: Arc<AtomicBool>,
+}
+impl AssertDrop {
+ fn new() -> (Self, AssertDropHandle) {
+ let shared = Arc::new(AtomicBool::new(false));
+ (
+ AssertDrop {
+ is_dropped: shared.clone(),
+ },
+ AssertDropHandle {
+ is_dropped: shared.clone(),
+ },
+ )
+ }
+}
+impl Drop for AssertDrop {
+ fn drop(&mut self) {
+ self.is_dropped.store(true, Ordering::SeqCst);
+ }
+}
+
+// A Notified does not shut down on drop, but it is dropped once the ref-count
+// hits zero.
+#[test]
+fn create_drop1() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(join);
+ handle.assert_dropped();
+}
+
#[test]
-fn create_drop() {
- let _ = super::joinable::<_, Runtime>(async { unreachable!() });
+fn create_drop2() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ drop(join);
+ handle.assert_not_dropped();
+ drop(notified);
+ handle.assert_dropped();
+}
+
+#[test]
+fn drop_abort_handle1() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ let abort = join.abort_handle();
+ drop(join);
+ handle.assert_not_dropped();
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(abort);
+ handle.assert_dropped();
+}
+
+#[test]
+fn drop_abort_handle2() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ let abort = join.abort_handle();
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(abort);
+ handle.assert_not_dropped();
+ drop(join);
+ handle.assert_dropped();
+}
+
+// Shutting down through Notified works
+#[test]
+fn create_shutdown1() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ drop(join);
+ handle.assert_not_dropped();
+ notified.shutdown();
+ handle.assert_dropped();
+}
+
+#[test]
+fn create_shutdown2() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ handle.assert_not_dropped();
+ notified.shutdown();
+ handle.assert_dropped();
+ drop(join);
+}
+
+#[test]
+fn unowned_poll() {
+ let (task, _) = unowned(async {}, NoopSchedule, Id::next());
+ task.run();
}
#[test]
fn schedule() {
with(|rt| {
- let (task, _) = super::joinable(async {
+ rt.spawn(async {
crate::task::yield_now().await;
});
- rt.schedule(task);
-
assert_eq!(2, rt.tick());
+ rt.shutdown();
})
}
#[test]
fn shutdown() {
with(|rt| {
- let (task, _) = super::joinable(async {
+ rt.spawn(async {
loop {
crate::task::yield_now().await;
}
});
- rt.schedule(task);
rt.tick_max(1);
rt.shutdown();
})
}
+#[test]
+fn shutdown_immediately() {
+ with(|rt| {
+ rt.spawn(async {
+ loop {
+ crate::task::yield_now().await;
+ }
+ });
+
+ rt.shutdown();
+ })
+}
+
+#[test]
+fn spawn_during_shutdown() {
+ static DID_SPAWN: AtomicBool = AtomicBool::new(false);
+
+ struct SpawnOnDrop(Runtime);
+ impl Drop for SpawnOnDrop {
+ fn drop(&mut self) {
+ DID_SPAWN.store(true, Ordering::SeqCst);
+ self.0.spawn(async {});
+ }
+ }
+
+ with(|rt| {
+ let rt2 = rt.clone();
+ rt.spawn(async move {
+ let _spawn_on_drop = SpawnOnDrop(rt2);
+
+ loop {
+ crate::task::yield_now().await;
+ }
+ });
+
+ rt.tick_max(1);
+ rt.shutdown();
+ });
+
+ assert!(DID_SPAWN.load(Ordering::SeqCst));
+}
+
fn with(f: impl FnOnce(Runtime)) {
struct Reset;
@@ -51,10 +242,9 @@ fn with(f: impl FnOnce(Runtime)) {
let _reset = Reset;
let rt = Runtime(Arc::new(Inner {
- released: task::TransferStack::new(),
+ owned: OwnedTasks::new(),
core: TryLock::new(Core {
queue: VecDeque::new(),
- tasks: LinkedList::new(),
}),
}));
@@ -66,18 +256,31 @@ fn with(f: impl FnOnce(Runtime)) {
struct Runtime(Arc<Inner>);
struct Inner {
- released: task::TransferStack<Runtime>,
core: TryLock<Core>,
+ owned: OwnedTasks<Runtime>,
}
struct Core {
queue: VecDeque<task::Notified<Runtime>>,
- tasks: LinkedList<Task<Runtime>, <Task<Runtime> as Link>::Target>,
}
static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
impl Runtime {
+ fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
+ where
+ T: 'static + Send + Future,
+ T::Output: 'static + Send,
+ {
+ let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());
+
+ if let Some(notified) = notified {
+ self.schedule(notified);
+ }
+
+ handle
+ }
+
fn tick(&self) -> usize {
self.tick_max(usize::MAX)
}
@@ -88,11 +291,10 @@ impl Runtime {
while !self.is_empty() && n < max {
let task = self.next_task();
n += 1;
+ let task = self.0.owned.assert_owner(task);
task.run();
}
- self.0.maintenance();
-
n
}
@@ -107,50 +309,21 @@ impl Runtime {
fn shutdown(&self) {
let mut core = self.0.core.try_lock().unwrap();
- for task in core.tasks.iter() {
- task.shutdown();
- }
+ self.0.owned.close_and_shutdown_all();
while let Some(task) = core.queue.pop_back() {
- task.shutdown();
+ drop(task);
}
drop(core);
- while !self.0.core.try_lock().unwrap().tasks.is_empty() {
- self.0.maintenance();
- }
- }
-}
-
-impl Inner {
- fn maintenance(&self) {
- use std::mem::ManuallyDrop;
-
- for task in self.released.drain() {
- let task = ManuallyDrop::new(task);
-
- // safety: see worker.rs
- unsafe {
- let ptr = task.header().into();
- self.core.try_lock().unwrap().tasks.remove(ptr);
- }
- }
+ assert!(self.0.owned.is_empty());
}
}
impl Schedule for Runtime {
- fn bind(task: Task<Self>) -> Runtime {
- let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
- rt.0.core.try_lock().unwrap().tasks.push_front(task);
- rt
- }
-
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
- // safety: copying worker.rs
- let task = unsafe { Task::from_raw(task.header().into()) };
- self.0.released.push(task);
- None
+ self.0.owned.remove(task)
}
fn schedule(&self, task: task::Notified<Self>) {
diff --git a/vendor/tokio/src/runtime/tests/task_combinations.rs b/vendor/tokio/src/runtime/tests/task_combinations.rs
index 76ce2330c..73a20d976 100644
--- a/vendor/tokio/src/runtime/tests/task_combinations.rs
+++ b/vendor/tokio/src/runtime/tests/task_combinations.rs
@@ -1,8 +1,10 @@
+use std::fmt;
use std::future::Future;
use std::panic;
use std::pin::Pin;
use std::task::{Context, Poll};
+use crate::runtime::task::AbortHandle;
use crate::runtime::Builder;
use crate::sync::oneshot;
use crate::task::JoinHandle;
@@ -56,6 +58,12 @@ enum CombiAbort {
AbortedAfterConsumeOutput = 4,
}
+#[derive(Copy, Clone, Debug, PartialEq)]
+enum CombiAbortSource {
+ JoinHandle,
+ AbortHandle,
+}
+
#[test]
fn test_combinations() {
let mut rt = &[
@@ -90,6 +98,13 @@ fn test_combinations() {
CombiAbort::AbortedAfterFinish,
CombiAbort::AbortedAfterConsumeOutput,
];
+ let ah = [
+ None,
+ Some(CombiJoinHandle::DropImmediately),
+ Some(CombiJoinHandle::DropFirstPoll),
+ Some(CombiJoinHandle::DropAfterNoConsume),
+ Some(CombiJoinHandle::DropAfterConsume),
+ ];
for rt in rt.iter().copied() {
for ls in ls.iter().copied() {
@@ -98,7 +113,34 @@ fn test_combinations() {
for ji in ji.iter().copied() {
for jh in jh.iter().copied() {
for abort in abort.iter().copied() {
- test_combination(rt, ls, task, output, ji, jh, abort);
+ // abort via join handle --- abort handles
+ // may be dropped at any point
+ for ah in ah.iter().copied() {
+ test_combination(
+ rt,
+ ls,
+ task,
+ output,
+ ji,
+ jh,
+ ah,
+ abort,
+ CombiAbortSource::JoinHandle,
+ );
+ }
+ // if aborting via AbortHandle, it will
+ // never be dropped.
+ test_combination(
+ rt,
+ ls,
+ task,
+ output,
+ ji,
+ jh,
+ None,
+ abort,
+ CombiAbortSource::AbortHandle,
+ );
}
}
}
@@ -108,6 +150,9 @@ fn test_combinations() {
}
}
+fn is_debug<T: fmt::Debug>(_: &T) {}
+
+#[allow(clippy::too_many_arguments)]
fn test_combination(
rt: CombiRuntime,
ls: CombiLocalSet,
@@ -115,12 +160,24 @@ fn test_combination(
output: CombiOutput,
ji: CombiJoinInterest,
jh: CombiJoinHandle,
+ ah: Option<CombiJoinHandle>,
abort: CombiAbort,
+ abort_src: CombiAbortSource,
) {
- if (jh as usize) < (abort as usize) {
- // drop before abort not possible
- return;
+ match (abort_src, ah) {
+ (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => {
+ // join handle dropped prior to abort
+ return;
+ }
+ (CombiAbortSource::AbortHandle, Some(_)) => {
+ // abort handle dropped, we can't abort through the
+ // abort handle
+ return;
+ }
+
+ _ => {}
}
+
if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
// this causes double panic
return;
@@ -130,7 +187,15 @@ fn test_combination(
return;
}
- println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort);
+ is_debug(&rt);
+ is_debug(&ls);
+ is_debug(&task);
+ is_debug(&output);
+ is_debug(&ji);
+ is_debug(&jh);
+ is_debug(&ah);
+ is_debug(&abort);
+ is_debug(&abort_src);
// A runtime optionally with a LocalSet
struct Rt {
@@ -282,8 +347,24 @@ fn test_combination(
);
}
+ // If we are either aborting the task via an abort handle, or dropping via
+ // an abort handle, do that now.
+ let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle {
+ handle.as_ref().map(JoinHandle::abort_handle)
+ } else {
+ None
+ };
+
+ let do_abort = |abort_handle: &mut Option<AbortHandle>,
+ join_handle: Option<&mut JoinHandle<_>>| {
+ match abort_src {
+ CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(),
+ CombiAbortSource::JoinHandle => join_handle.unwrap().abort(),
+ }
+ };
+
if abort == CombiAbort::AbortedImmediately {
- handle.as_mut().unwrap().abort();
+ do_abort(&mut abort_handle, handle.as_mut());
aborted = true;
}
if jh == CombiJoinHandle::DropImmediately {
@@ -301,12 +382,15 @@ fn test_combination(
}
if abort == CombiAbort::AbortedFirstPoll {
- handle.as_mut().unwrap().abort();
+ do_abort(&mut abort_handle, handle.as_mut());
aborted = true;
}
if jh == CombiJoinHandle::DropFirstPoll {
drop(handle.take().unwrap());
}
+ if ah == Some(CombiJoinHandle::DropFirstPoll) {
+ drop(abort_handle.take().unwrap());
+ }
// Signal the future that it can return now
let _ = on_complete.send(());
@@ -318,23 +402,42 @@ fn test_combination(
if abort == CombiAbort::AbortedAfterFinish {
// Don't set aborted to true here as the task already finished
- handle.as_mut().unwrap().abort();
+ do_abort(&mut abort_handle, handle.as_mut());
}
if jh == CombiJoinHandle::DropAfterNoConsume {
- // The runtime will usually have dropped every ref-count at this point,
- // in which case dropping the JoinHandle drops the output.
- //
- // (But it might race and still hold a ref-count)
- let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ if ah == Some(CombiJoinHandle::DropAfterNoConsume) {
drop(handle.take().unwrap());
- }));
- if panic.is_err() {
- assert!(
- (output == CombiOutput::PanicOnDrop)
- && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
- && !aborted,
- "Dropping JoinHandle shouldn't panic here"
- );
+ // The runtime will usually have dropped every ref-count at this point,
+ // in which case dropping the AbortHandle drops the output.
+ //
+ // (But it might race and still hold a ref-count)
+ let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ drop(abort_handle.take().unwrap());
+ }));
+ if panic.is_err() {
+ assert!(
+ (output == CombiOutput::PanicOnDrop)
+ && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
+ && !aborted,
+ "Dropping AbortHandle shouldn't panic here"
+ );
+ }
+ } else {
+ // The runtime will usually have dropped every ref-count at this point,
+ // in which case dropping the JoinHandle drops the output.
+ //
+ // (But it might race and still hold a ref-count)
+ let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ drop(handle.take().unwrap());
+ }));
+ if panic.is_err() {
+ assert!(
+ (output == CombiOutput::PanicOnDrop)
+ && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
+ && !aborted,
+ "Dropping JoinHandle shouldn't panic here"
+ );
+ }
}
}
@@ -362,11 +465,15 @@ fn test_combination(
_ => unreachable!(),
}
- let handle = handle.take().unwrap();
+ let mut handle = handle.take().unwrap();
if abort == CombiAbort::AbortedAfterConsumeOutput {
- handle.abort();
+ do_abort(&mut abort_handle, Some(&mut handle));
}
drop(handle);
+
+ if ah == Some(CombiJoinHandle::DropAfterConsume) {
+ drop(abort_handle.take());
+ }
}
// The output should have been dropped now. Check whether the output