summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/runtime/tests/queue.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/tests/queue.rs
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/tests/queue.rs')
-rw-r--r--vendor/tokio/src/runtime/tests/queue.rs171
1 files changed, 126 insertions, 45 deletions
diff --git a/vendor/tokio/src/runtime/tests/queue.rs b/vendor/tokio/src/runtime/tests/queue.rs
index b2962f154..5df92b7a2 100644
--- a/vendor/tokio/src/runtime/tests/queue.rs
+++ b/vendor/tokio/src/runtime/tests/queue.rs
@@ -1,39 +1,107 @@
-use crate::runtime::queue;
+use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::task::{self, Schedule, Task};
+use std::cell::RefCell;
use std::thread;
use std::time::Duration;
+#[allow(unused)]
+macro_rules! assert_metrics {
+ ($stats:ident, $field:ident == $v:expr) => {{
+ use crate::runtime::WorkerMetrics;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ let worker = WorkerMetrics::new();
+ $stats.submit(&worker);
+
+ let expect = $v;
+ let actual = worker.$field.load(Relaxed);
+
+ assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
+ }};
+}
+
+fn new_stats() -> Stats {
+ use crate::runtime::WorkerMetrics;
+ Stats::new(&WorkerMetrics::new())
+}
+
#[test]
-fn fits_256() {
+fn fits_256_one_at_a_time() {
let (_, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
+ let mut stats = new_stats();
for _ in 0..256 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
}
- assert!(inject.pop().is_none());
+ cfg_metrics! {
+ assert_metrics!(stats, overflow_count == 0);
+ }
+
+ assert!(inject.borrow_mut().pop().is_none());
while local.pop().is_some() {}
}
#[test]
+fn fits_256_all_at_once() {
+ let (_, mut local) = queue::local();
+
+ let mut tasks = (0..256)
+ .map(|_| super::unowned(async {}).0)
+ .collect::<Vec<_>>();
+ local.push_back(tasks.drain(..));
+
+ let mut i = 0;
+ while local.pop().is_some() {
+ i += 1;
+ }
+
+ assert_eq!(i, 256);
+}
+
+#[test]
+fn fits_256_all_in_chunks() {
+ let (_, mut local) = queue::local();
+
+ let mut tasks = (0..256)
+ .map(|_| super::unowned(async {}).0)
+ .collect::<Vec<_>>();
+
+ local.push_back(tasks.drain(..10));
+ local.push_back(tasks.drain(..100));
+ local.push_back(tasks.drain(..46));
+ local.push_back(tasks.drain(..100));
+
+ let mut i = 0;
+ while local.pop().is_some() {
+ i += 1;
+ }
+
+ assert_eq!(i, 256);
+}
+
+#[test]
fn overflow() {
let (_, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
+ let mut stats = new_stats();
for _ in 0..257 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
+ }
+
+ cfg_metrics! {
+ assert_metrics!(stats, overflow_count == 1);
}
let mut n = 0;
- while inject.pop().is_some() {
- n += 1;
- }
+ n += inject.borrow_mut().drain(..).count();
while local.pop().is_some() {
n += 1;
@@ -44,16 +112,22 @@ fn overflow() {
#[test]
fn steal_batch() {
+ let mut stats = new_stats();
+
let (steal1, mut local1) = queue::local();
let (_, mut local2) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
for _ in 0..4 {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local1.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local1.push_back_or_overflow(task, &inject, &mut stats);
}
- assert!(steal1.steal_into(&mut local2).is_some());
+ assert!(steal1.steal_into(&mut local2, &mut stats).is_some());
+
+ cfg_metrics! {
+ assert_metrics!(stats, steal_count == 2);
+ }
for _ in 0..1 {
assert!(local2.pop().is_some());
@@ -68,24 +142,35 @@ fn steal_batch() {
assert!(local1.pop().is_none());
}
+const fn normal_or_miri(normal: usize, miri: usize) -> usize {
+ if cfg!(miri) {
+ miri
+ } else {
+ normal
+ }
+}
+
#[test]
fn stress1() {
- const NUM_ITER: usize = 1;
- const NUM_STEAL: usize = 1_000;
- const NUM_LOCAL: usize = 1_000;
- const NUM_PUSH: usize = 500;
- const NUM_POP: usize = 250;
+ const NUM_ITER: usize = 5;
+ const NUM_STEAL: usize = normal_or_miri(1_000, 10);
+ const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
+ const NUM_PUSH: usize = normal_or_miri(500, 10);
+ const NUM_POP: usize = normal_or_miri(250, 10);
+
+ let mut stats = new_stats();
for _ in 0..NUM_ITER {
let (steal, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
let th = thread::spawn(move || {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..NUM_STEAL {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -96,6 +181,10 @@ fn stress1() {
thread::yield_now();
}
+ cfg_metrics! {
+ assert_metrics!(stats, steal_count == n as _);
+ }
+
n
});
@@ -103,8 +192,8 @@ fn stress1() {
for _ in 0..NUM_LOCAL {
for _ in 0..NUM_PUSH {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
}
for _ in 0..NUM_POP {
@@ -116,9 +205,7 @@ fn stress1() {
}
}
- while inject.pop().is_some() {
- n += 1;
- }
+ n += inject.borrow_mut().drain(..).count();
n += th.join().unwrap();
@@ -129,19 +216,22 @@ fn stress1() {
#[test]
fn stress2() {
const NUM_ITER: usize = 1;
- const NUM_TASKS: usize = 1_000_000;
- const NUM_STEAL: usize = 1_000;
+ const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
+ const NUM_STEAL: usize = normal_or_miri(1_000, 10);
+
+ let mut stats = new_stats();
for _ in 0..NUM_ITER {
let (steal, mut local) = queue::local();
- let inject = queue::Inject::new();
+ let inject = RefCell::new(vec![]);
let th = thread::spawn(move || {
+ let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..NUM_STEAL {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -158,16 +248,14 @@ fn stress2() {
let mut num_pop = 0;
for i in 0..NUM_TASKS {
- let (task, _) = super::joinable::<_, Runtime>(async {});
- local.push_back(task, &inject);
+ let (task, _) = super::unowned(async {});
+ local.push_back_or_overflow(task, &inject, &mut stats);
if i % 128 == 0 && local.pop().is_some() {
num_pop += 1;
}
- while inject.pop().is_some() {
- num_pop += 1;
- }
+ num_pop += inject.borrow_mut().drain(..).count();
}
num_pop += th.join().unwrap();
@@ -176,9 +264,7 @@ fn stress2() {
num_pop += 1;
}
- while inject.pop().is_some() {
- num_pop += 1;
- }
+ num_pop += inject.borrow_mut().drain(..).count();
assert_eq!(num_pop, NUM_TASKS);
}
@@ -187,11 +273,6 @@ fn stress2() {
struct Runtime;
impl Schedule for Runtime {
- fn bind(task: Task<Self>) -> Runtime {
- std::mem::forget(task);
- Runtime
- }
-
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}