summaryrefslogtreecommitdiffstats
path: root/vendor/rayon-core/src/thread_pool
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-18 02:49:50 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-18 02:49:50 +0000
commit9835e2ae736235810b4ea1c162ca5e65c547e770 (patch)
tree3fcebf40ed70e581d776a8a4c65923e8ec20e026 /vendor/rayon-core/src/thread_pool
parentReleasing progress-linux version 1.70.0+dfsg2-1~progress7.99u1. (diff)
downloadrustc-9835e2ae736235810b4ea1c162ca5e65c547e770.tar.xz
rustc-9835e2ae736235810b4ea1c162ca5e65c547e770.zip
Merging upstream version 1.71.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/rayon-core/src/thread_pool')
-rw-r--r--vendor/rayon-core/src/thread_pool/mod.rs69
-rw-r--r--vendor/rayon-core/src/thread_pool/test.rs52
2 files changed, 121 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs
index 0fc06dd6b..c37826ef5 100644
--- a/vendor/rayon-core/src/thread_pool/mod.rs
+++ b/vendor/rayon-core/src/thread_pool/mod.rs
@@ -339,6 +339,30 @@ impl ThreadPool {
// We assert that `self.registry` has not terminated.
unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
}
+
+ /// Cooperatively yields execution to Rayon.
+ ///
+ /// This is similar to the general [`yield_now()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part this pool.
+ pub fn yield_now(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_now())
+ }
+
+ /// Cooperatively yields execution to local Rayon work.
+ ///
+ /// This is similar to the general [`yield_local()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part this pool.
+ pub fn yield_local(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_local())
+ }
}
impl Drop for ThreadPool {
@@ -400,3 +424,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> {
Some(!curr.local_deque_is_empty())
}
}
+
+/// Cooperatively yields execution to Rayon.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in the pool, then executes it. Completion of
+/// that work might include nested work or further work stealing.
+///
+/// This is similar to [`std::thread::yield_now()`], but does not literally make
+/// that call. If you are implementing a polling loop, you may want to also
+/// yield to the OS scheduler yourself if no Rayon work was found.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_now() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_now())
+ }
+}
+
+/// Cooperatively yields execution to local Rayon work.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in this thread's queue, then executes it.
+/// Completion of that work might include nested work or further work stealing.
+///
+/// This is similar to [`yield_now()`], but does not steal from other threads.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_local() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_local())
+ }
+}
+
+/// Result of [`yield_now()`] or [`yield_local()`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Yield {
+ /// Work was found and executed.
+ Executed,
+ /// No available work was found.
+ Idle,
+}
diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs
index ac750a6dc..6143e5799 100644
--- a/vendor/rayon-core/src/thread_pool/test.rs
+++ b/vendor/rayon-core/src/thread_pool/test.rs
@@ -16,6 +16,7 @@ fn panic_propagate() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
let registry;
@@ -43,6 +44,7 @@ fn join_a_lot(n: usize) {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
use std::{thread, time};
@@ -89,6 +91,7 @@ fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
// Note: we first tried to force failure with a `usize::MAX` stack, but
// macOS and Windows weren't fazed, or at least didn't fail the way we want.
@@ -115,6 +118,7 @@ fn failed_thread_stack() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
let (start_count, start_handler) = count_handler();
let (exit_count, exit_handler) = count_handler();
@@ -139,6 +143,7 @@ fn panic_thread_name() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -147,6 +152,7 @@ fn self_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -166,6 +172,7 @@ fn mutual_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
use std::{thread, time};
@@ -194,6 +201,7 @@ fn mutual_install_sleepy() {
#[test]
#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -219,6 +227,7 @@ macro_rules! test_scope_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
let vec = test_scope_order!(scope => spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -226,6 +235,7 @@ fn scope_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
let vec = test_scope_order!(scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -250,6 +260,7 @@ macro_rules! test_spawn_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
let vec = test_spawn_order!(spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -257,6 +268,7 @@ fn spawn_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
let vec = test_spawn_order!(spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -264,6 +276,7 @@ fn spawn_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
// Create matching scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
@@ -300,6 +313,7 @@ fn nested_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
// Create matching fifo scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
@@ -336,6 +350,7 @@ fn nested_fifo_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -351,6 +366,7 @@ fn in_place_scope_no_deadlock() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -364,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() {
rx_ref.recv().unwrap();
});
}
+
+#[test]
+fn yield_now_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_now();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn yield_local_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_local();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}