diff options
Diffstat (limited to 'third_party/rust/tokio/src/runtime/tests/queue.rs')
-rw-r--r-- | third_party/rust/tokio/src/runtime/tests/queue.rs | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/tests/queue.rs b/third_party/rust/tokio/src/runtime/tests/queue.rs new file mode 100644 index 0000000000..0fd1e0c6d9 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/queue.rs @@ -0,0 +1,248 @@ +use crate::runtime::queue; +use crate::runtime::task::{self, Inject, Schedule, Task}; +use crate::runtime::MetricsBatch; + +use std::thread; +use std::time::Duration; + +#[allow(unused)] +macro_rules! assert_metrics { + ($metrics:ident, $field:ident == $v:expr) => {{ + use crate::runtime::WorkerMetrics; + use std::sync::atomic::Ordering::Relaxed; + + let worker = WorkerMetrics::new(); + $metrics.submit(&worker); + + let expect = $v; + let actual = worker.$field.load(Relaxed); + + assert!(actual == expect, "expect = {}; actual = {}", expect, actual) + }}; +} + +#[test] +fn fits_256() { + let (_, mut local) = queue::local(); + let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); + + for _ in 0..256 { + let (task, _) = super::unowned(async {}); + local.push_back(task, &inject, &mut metrics); + } + + cfg_metrics! { + assert_metrics!(metrics, overflow_count == 0); + } + + assert!(inject.pop().is_none()); + + while local.pop().is_some() {} +} + +#[test] +fn overflow() { + let (_, mut local) = queue::local(); + let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); + + for _ in 0..257 { + let (task, _) = super::unowned(async {}); + local.push_back(task, &inject, &mut metrics); + } + + cfg_metrics! { + assert_metrics!(metrics, overflow_count == 1); + } + + let mut n = 0; + + while inject.pop().is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + + assert_eq!(n, 257); +} + +#[test] +fn steal_batch() { + let mut metrics = MetricsBatch::new(); + + let (steal1, mut local1) = queue::local(); + let (_, mut local2) = queue::local(); + let inject = Inject::new(); + + for _ in 0..4 { + let (task, _) = super::unowned(async {}); + local1.push_back(task, &inject, &mut metrics); + } + + assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + + cfg_metrics! { + assert_metrics!(metrics, steal_count == 2); + } + + for _ in 0..1 { + assert!(local2.pop().is_some()); + } + + assert!(local2.pop().is_none()); + + for _ in 0..2 { + assert!(local1.pop().is_some()); + } + + assert!(local1.pop().is_none()); +} + +const fn normal_or_miri(normal: usize, miri: usize) -> usize { + if cfg!(miri) { + miri + } else { + normal + } +} + +#[test] +fn stress1() { + const NUM_ITER: usize = 1; + const NUM_STEAL: usize = normal_or_miri(1_000, 10); + const NUM_LOCAL: usize = normal_or_miri(1_000, 10); + const NUM_PUSH: usize = normal_or_miri(500, 10); + const NUM_POP: usize = normal_or_miri(250, 10); + + let mut metrics = MetricsBatch::new(); + + for _ in 0..NUM_ITER { + let (steal, mut local) = queue::local(); + let inject = Inject::new(); + + let th = thread::spawn(move || { + let mut metrics = MetricsBatch::new(); + let (_, mut local) = queue::local(); + let mut n = 0; + + for _ in 0..NUM_STEAL { + if steal.steal_into(&mut local, &mut metrics).is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + + thread::yield_now(); + } + + cfg_metrics! { + assert_metrics!(metrics, steal_count == n as _); + } + + n + }); + + let mut n = 0; + + for _ in 0..NUM_LOCAL { + for _ in 0..NUM_PUSH { + let (task, _) = super::unowned(async {}); + local.push_back(task, &inject, &mut metrics); + } + + for _ in 0..NUM_POP { + if local.pop().is_some() { + n += 1; + } else { + break; + } + } + } + + while inject.pop().is_some() { + n += 1; + } + + n += th.join().unwrap(); + + assert_eq!(n, NUM_LOCAL * NUM_PUSH); + } +} + +#[test] +fn stress2() { + const NUM_ITER: usize = 1; + const NUM_TASKS: usize = normal_or_miri(1_000_000, 50); + const NUM_STEAL: usize = normal_or_miri(1_000, 10); + + let mut metrics = MetricsBatch::new(); + + for _ in 0..NUM_ITER { + let (steal, mut local) = queue::local(); + let inject = Inject::new(); + + let th = thread::spawn(move || { + let mut stats = MetricsBatch::new(); + let (_, mut local) = queue::local(); + let mut n = 0; + + for _ in 0..NUM_STEAL { + if steal.steal_into(&mut local, &mut stats).is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + + thread::sleep(Duration::from_micros(10)); + } + + n + }); + + let mut num_pop = 0; + + for i in 0..NUM_TASKS { + let (task, _) = super::unowned(async {}); + local.push_back(task, &inject, &mut metrics); + + if i % 128 == 0 && local.pop().is_some() { + num_pop += 1; + } + + while inject.pop().is_some() { + num_pop += 1; + } + } + + num_pop += th.join().unwrap(); + + while local.pop().is_some() { + num_pop += 1; + } + + while inject.pop().is_some() { + num_pop += 1; + } + + assert_eq!(num_pop, NUM_TASKS); + } +} + +struct Runtime; + +impl Schedule for Runtime { + fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + None + } + + fn schedule(&self, _task: task::Notified<Self>) { + unreachable!(); + } +} |