diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-18 02:49:50 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-18 02:49:50 +0000 |
commit | 9835e2ae736235810b4ea1c162ca5e65c547e770 (patch) | |
tree | 3fcebf40ed70e581d776a8a4c65923e8ec20e026 /vendor/rayon-core/src/thread_pool | |
parent | Releasing progress-linux version 1.70.0+dfsg2-1~progress7.99u1. (diff) | |
download | rustc-9835e2ae736235810b4ea1c162ca5e65c547e770.tar.xz rustc-9835e2ae736235810b4ea1c162ca5e65c547e770.zip |
Merging upstream version 1.71.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/rayon-core/src/thread_pool')
-rw-r--r-- | vendor/rayon-core/src/thread_pool/mod.rs | 69 | ||||
-rw-r--r-- | vendor/rayon-core/src/thread_pool/test.rs | 52 |
2 files changed, 121 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs index 0fc06dd6b..c37826ef5 100644 --- a/vendor/rayon-core/src/thread_pool/mod.rs +++ b/vendor/rayon-core/src/thread_pool/mod.rs @@ -339,6 +339,30 @@ impl ThreadPool { // We assert that `self.registry` has not terminated. unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } } + + /// Cooperatively yields execution to Rayon. + /// + /// This is similar to the general [`yield_now()`], but only if the current + /// thread is part of *this* thread pool. + /// + /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if + /// nothing was available, or `None` if the current thread is not part this pool. + pub fn yield_now(&self) -> Option<Yield> { + let curr = self.registry.current_thread()?; + Some(curr.yield_now()) + } + + /// Cooperatively yields execution to local Rayon work. + /// + /// This is similar to the general [`yield_local()`], but only if the current + /// thread is part of *this* thread pool. + /// + /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if + /// nothing was available, or `None` if the current thread is not part this pool. + pub fn yield_local(&self) -> Option<Yield> { + let curr = self.registry.current_thread()?; + Some(curr.yield_local()) + } } impl Drop for ThreadPool { @@ -400,3 +424,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> { Some(!curr.local_deque_is_empty()) } } + +/// Cooperatively yields execution to Rayon. +/// +/// If the current thread is part of a rayon thread pool, this looks for a +/// single unit of pending work in the pool, then executes it. Completion of +/// that work might include nested work or further work stealing. +/// +/// This is similar to [`std::thread::yield_now()`], but does not literally make +/// that call. If you are implementing a polling loop, you may want to also +/// yield to the OS scheduler yourself if no Rayon work was found. +/// +/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if +/// nothing was available, or `None` if this thread is not part of any pool at all. +pub fn yield_now() -> Option<Yield> { + unsafe { + let thread = WorkerThread::current().as_ref()?; + Some(thread.yield_now()) + } +} + +/// Cooperatively yields execution to local Rayon work. +/// +/// If the current thread is part of a rayon thread pool, this looks for a +/// single unit of pending work in this thread's queue, then executes it. +/// Completion of that work might include nested work or further work stealing. +/// +/// This is similar to [`yield_now()`], but does not steal from other threads. +/// +/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if +/// nothing was available, or `None` if this thread is not part of any pool at all. +pub fn yield_local() -> Option<Yield> { + unsafe { + let thread = WorkerThread::current().as_ref()?; + Some(thread.yield_local()) + } +} + +/// Result of [`yield_now()`] or [`yield_local()`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Yield { + /// Work was found and executed. + Executed, + /// No available work was found. + Idle, +} diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs index ac750a6dc..6143e5799 100644 --- a/vendor/rayon-core/src/thread_pool/test.rs +++ b/vendor/rayon-core/src/thread_pool/test.rs @@ -16,6 +16,7 @@ fn panic_propagate() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn workers_stop() { let registry; @@ -43,6 +44,7 @@ fn join_a_lot(n: usize) { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn sleeper_stop() { use std::{thread, time}; @@ -89,6 +91,7 @@ fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn failed_thread_stack() { // Note: we first tried to force failure with a `usize::MAX` stack, but // macOS and Windows weren't fazed, or at least didn't fail the way we want. @@ -115,6 +118,7 @@ fn failed_thread_stack() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_thread_name() { let (start_count, start_handler) = count_handler(); let (exit_count, exit_handler) = count_handler(); @@ -139,6 +143,7 @@ fn panic_thread_name() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn self_install() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); @@ -147,6 +152,7 @@ fn self_install() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mutual_install() { let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); @@ -166,6 +172,7 @@ fn mutual_install() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mutual_install_sleepy() { use std::{thread, time}; @@ -194,6 +201,7 @@ fn mutual_install_sleepy() { #[test] #[allow(deprecated)] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn check_thread_pool_new() { let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); assert_eq!(pool.current_num_threads(), 22); @@ -219,6 +227,7 @@ macro_rules! test_scope_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_lifo_order() { let vec = test_scope_order!(scope => spawn); let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed @@ -226,6 +235,7 @@ fn scope_lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_fifo_order() { let vec = test_scope_order!(scope_fifo => spawn_fifo); let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order @@ -250,6 +260,7 @@ macro_rules! test_spawn_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_lifo_order() { let vec = test_spawn_order!(spawn); let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed @@ -257,6 +268,7 @@ fn spawn_lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_fifo_order() { let vec = test_spawn_order!(spawn_fifo); let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order @@ -264,6 +276,7 @@ fn spawn_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_scopes() { // Create matching scopes for every thread pool. fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) @@ -300,6 +313,7 @@ fn nested_scopes() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_fifo_scopes() { // Create matching fifo scopes for every thread pool. fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) @@ -336,6 +350,7 @@ fn nested_fifo_scopes() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn in_place_scope_no_deadlock() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let (tx, rx) = channel(); @@ -351,6 +366,7 @@ fn in_place_scope_no_deadlock() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn in_place_scope_fifo_no_deadlock() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let (tx, rx) = channel(); @@ -364,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() { rx_ref.recv().unwrap(); }); } + +#[test] +fn yield_now_to_spawn() { + let (tx, rx) = crossbeam_channel::bounded(1); + + // Queue a regular spawn. + crate::spawn(move || tx.send(22).unwrap()); + + // The single-threaded fallback mode (for wasm etc.) won't + // get a chance to run the spawn if we never yield to it. + crate::registry::in_worker(move |_, _| { + crate::yield_now(); + }); + + // The spawn **must** have started by now, but we still might have to wait + // for it to finish if a different thread stole it first. + assert_eq!(22, rx.recv().unwrap()); +} + +#[test] +fn yield_local_to_spawn() { + let (tx, rx) = crossbeam_channel::bounded(1); + + // Queue a regular spawn. + crate::spawn(move || tx.send(22).unwrap()); + + // The single-threaded fallback mode (for wasm etc.) won't + // get a chance to run the spawn if we never yield to it. + crate::registry::in_worker(move |_, _| { + crate::yield_local(); + }); + + // The spawn **must** have started by now, but we still might have to wait + // for it to finish if a different thread stole it first. + assert_eq!(22, rx.recv().unwrap()); +} |