From 698f8c2f01ea549d77d7dc3338a12e04c11057b9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 14:02:58 +0200 Subject: Adding upstream version 1.64.0+dfsg1. Signed-off-by: Daniel Baumann --- vendor/rayon-core/src/thread_pool/test.rs | 368 ++++++++++++++++++++++++++++++ 1 file changed, 368 insertions(+) create mode 100644 vendor/rayon-core/src/thread_pool/test.rs (limited to 'vendor/rayon-core/src/thread_pool/test.rs') diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs new file mode 100644 index 000000000..8de65a5e4 --- /dev/null +++ b/vendor/rayon-core/src/thread_pool/test.rs @@ -0,0 +1,368 @@ +#![cfg(test)] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; + +#[allow(deprecated)] +use crate::Configuration; +use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + thread_pool.install(|| { + panic!("Hello, world!"); + }); +} + +#[test] +fn workers_stop() { + let registry; + + { + // once we exit this block, thread-pool will be dropped + let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + registry = thread_pool.install(|| { + // do some work on these threads + join_a_lot(22); + + Arc::clone(&thread_pool.registry) + }); + assert_eq!(registry.num_threads(), 22); + } + + // once thread-pool is dropped, registry should terminate, which + // should lead to worker threads stopping + registry.wait_until_stopped(); +} + +fn join_a_lot(n: usize) { + if n > 0 { + join(|| join_a_lot(n - 1), || join_a_lot(n - 1)); + } +} + +#[test] +fn sleeper_stop() { + use std::{thread, time}; + + let registry; + + { + // once we exit this block, thread-pool will be dropped + let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + registry = Arc::clone(&thread_pool.registry); + + // Give time for at least some of the thread pool to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + } + + // once thread-pool is dropped, registry should terminate, which + // should lead to worker threads stopping + registry.wait_until_stopped(); +} + +/// Creates a start/exit handler that increments an atomic counter. +fn count_handler() -> (Arc, impl Fn(usize)) { + let count = Arc::new(AtomicUsize::new(0)); + (Arc::clone(&count), move |_| { + count.fetch_add(1, Ordering::SeqCst); + }) +} + +/// Wait until a counter is no longer shared, then return its value. +fn wait_for_counter(mut counter: Arc) -> usize { + use std::{thread, time}; + + for _ in 0..60 { + counter = match Arc::try_unwrap(counter) { + Ok(counter) => return counter.into_inner(), + Err(counter) => { + thread::sleep(time::Duration::from_secs(1)); + counter + } + }; + } + + // That's too long! + panic!("Counter is still shared!"); +} + +#[test] +fn failed_thread_stack() { + // Note: we first tried to force failure with a `usize::MAX` stack, but + // macOS and Windows weren't fazed, or at least didn't fail the way we want. + // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a + // 2GB stack, so it might not fail until the second thread. + let stack_size = ::std::isize::MAX as usize; + + let (start_count, start_handler) = count_handler(); + let (exit_count, exit_handler) = count_handler(); + let builder = ThreadPoolBuilder::new() + .num_threads(10) + .stack_size(stack_size) + .start_handler(start_handler) + .exit_handler(exit_handler); + + let pool = builder.build(); + assert!(pool.is_err(), "thread stack should have failed!"); + + // With such a huge stack, 64-bit will probably fail on the first thread; + // 32-bit might manage the first 2GB, but certainly fail the second. + let start_count = wait_for_counter(start_count); + assert!(start_count <= 1); + assert_eq!(start_count, wait_for_counter(exit_count)); +} + +#[test] +fn panic_thread_name() { + let (start_count, start_handler) = count_handler(); + let (exit_count, exit_handler) = count_handler(); + let builder = ThreadPoolBuilder::new() + .num_threads(10) + .start_handler(start_handler) + .exit_handler(exit_handler) + .thread_name(|i| { + if i >= 5 { + panic!(); + } + format!("panic_thread_name#{}", i) + }); + + let pool = crate::unwind::halt_unwinding(|| builder.build()); + assert!(pool.is_err(), "thread-name panic should propagate!"); + + // Assuming they're created in order, threads 0 through 4 should have + // been started already, and then terminated by the panic. + assert_eq!(5, wait_for_counter(start_count)); + assert_eq!(5, wait_for_counter(exit_count)); +} + +#[test] +fn self_install() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + // If the inner `install` blocks, then nothing will actually run it! + assert!(pool.install(|| pool.install(|| true))); +} + +#[test] +fn mutual_install() { + let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + let ok = pool1.install(|| { + // This creates a dependency from `pool1` -> `pool2` + pool2.install(|| { + // This creates a dependency from `pool2` -> `pool1` + pool1.install(|| { + // If they blocked on inter-pool installs, there would be no + // threads left to run this! + true + }) + }) + }); + assert!(ok); +} + +#[test] +fn mutual_install_sleepy() { + use std::{thread, time}; + + let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + let ok = pool1.install(|| { + // This creates a dependency from `pool1` -> `pool2` + pool2.install(|| { + // Give `pool1` time to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + + // This creates a dependency from `pool2` -> `pool1` + pool1.install(|| { + // Give `pool2` time to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + + // If they blocked on inter-pool installs, there would be no + // threads left to run this! + true + }) + }) + }); + assert!(ok); +} + +#[test] +#[allow(deprecated)] +fn check_thread_pool_new() { + let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap(); + assert_eq!(pool.current_num_threads(), 22); +} + +macro_rules! test_scope_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + pool.$scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn scope_lifo_order() { + let vec = test_scope_order!(scope => spawn); + let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn scope_fifo_order() { + let vec = test_scope_order!(scope_fifo => spawn_fifo); + let expected: Vec = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_spawn_order { + ($spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = &builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + for i in 0..10 { + let tx = tx.clone(); + pool.$spawn(move || { + tx.send(i).unwrap(); + }); + } + }); + rx.iter().collect::>() + }}; +} + +#[test] +fn spawn_lifo_order() { + let vec = test_spawn_order!(spawn); + let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn spawn_fifo_order() { + let vec = test_spawn_order!(spawn_fifo); + let expected: Vec = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +fn nested_scopes() { + // Create matching scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) + where + OP: FnOnce(&[&Scope<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +} + +#[test] +fn nested_fifo_scopes() { + // Create matching fifo scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) + where + OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope_fifo(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn_fifo(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +} + +#[test] +fn in_place_scope_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} + +#[test] +fn in_place_scope_fifo_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope_fifo(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn_fifo(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} -- cgit v1.2.3