author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-18 02:49:50 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-18 02:49:50 +0000
commit    9835e2ae736235810b4ea1c162ca5e65c547e770 (patch)
tree      3fcebf40ed70e581d776a8a4c65923e8ec20e026 /vendor/rayon-core/src
parent    Releasing progress-linux version 1.70.0+dfsg2-1~progress7.99u1. (diff)
Merging upstream version 1.71.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/rayon-core/src')
 -rw-r--r--  vendor/rayon-core/src/broadcast/mod.rs     |   5
 -rw-r--r--  vendor/rayon-core/src/broadcast/test.rs    |  46   (was -rwxr-xr-x)
 -rw-r--r--  vendor/rayon-core/src/job.rs               |  10
 -rw-r--r--  vendor/rayon-core/src/join/mod.rs          |   3
 -rw-r--r--  vendor/rayon-core/src/join/test.rs         |   6
 -rw-r--r--  vendor/rayon-core/src/latch.rs             |  90
 -rw-r--r--  vendor/rayon-core/src/lib.rs               |  24
 -rw-r--r--  vendor/rayon-core/src/registry.rs          | 155
 -rw-r--r--  vendor/rayon-core/src/scope/mod.rs         |  72
 -rw-r--r--  vendor/rayon-core/src/scope/test.rs        |  18
 -rw-r--r--  vendor/rayon-core/src/spawn/mod.rs         |   2
 -rw-r--r--  vendor/rayon-core/src/spawn/test.rs        |  12
 -rw-r--r--  vendor/rayon-core/src/test.rs              |   8
 -rw-r--r--  vendor/rayon-core/src/thread_pool/mod.rs   |  69
 -rw-r--r--  vendor/rayon-core/src/thread_pool/test.rs  |  52
 15 files changed, 457 insertions(+), 115 deletions(-)
diff --git a/vendor/rayon-core/src/broadcast/mod.rs b/vendor/rayon-core/src/broadcast/mod.rs
index 452aa71b6..d991c5461 100644
--- a/vendor/rayon-core/src/broadcast/mod.rs
+++ b/vendor/rayon-core/src/broadcast/mod.rs
@@ -1,4 +1,5 @@
use crate::job::{ArcJob, StackJob};
+use crate::latch::LatchRef;
use crate::registry::{Registry, WorkerThread};
use crate::scope::ScopeLatch;
use std::fmt;
@@ -107,7 +108,9 @@ where
let n_threads = registry.num_threads();
let current_thread = WorkerThread::current().as_ref();
let latch = ScopeLatch::with_count(n_threads, current_thread);
- let jobs: Vec<_> = (0..n_threads).map(|_| StackJob::new(&f, &latch)).collect();
+ let jobs: Vec<_> = (0..n_threads)
+ .map(|_| StackJob::new(&f, LatchRef::new(&latch)))
+ .collect();
let job_refs = jobs.iter().map(|job| job.as_job_ref());
registry.inject_broadcast(job_refs);
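This hunk only changes how the per-thread StackJobs hold the scope latch: plain `&latch` is replaced by the new `LatchRef` wrapper introduced in latch.rs below, since `&L` no longer implements `Latch`. For orientation, a minimal usage sketch of the public API this path serves, assuming the `rayon_core` crate and its `broadcast`/`current_num_threads` exports (not taken from the patch):

    fn main() {
        // `broadcast` runs the closure once on every thread of the global pool
        // and returns the closures' results, one per thread.
        let indexes: Vec<usize> = rayon_core::broadcast(|ctx| ctx.index());
        assert_eq!(indexes.len(), rayon_core::current_num_threads());
        println!("broadcast ran on {} threads", indexes.len());
    }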
diff --git a/vendor/rayon-core/src/broadcast/test.rs b/vendor/rayon-core/src/broadcast/test.rs
index a765cb034..3ae11f7f6 100755..100644
--- a/vendor/rayon-core/src/broadcast/test.rs
+++ b/vendor/rayon-core/src/broadcast/test.rs
@@ -12,6 +12,7 @@ fn broadcast_global() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
let (tx, rx) = crossbeam_channel::unbounded();
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
@@ -22,6 +23,7 @@ fn spawn_broadcast_global() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.broadcast(|ctx| ctx.index());
@@ -29,6 +31,7 @@ fn broadcast_pool() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -40,6 +43,7 @@ fn spawn_broadcast_pool() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
@@ -47,6 +51,7 @@ fn broadcast_self() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -58,6 +63,7 @@ fn spawn_broadcast_self() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
@@ -73,6 +79,7 @@ fn broadcast_mutual() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
@@ -90,6 +97,7 @@ fn spawn_broadcast_mutual() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
@@ -108,6 +116,7 @@ fn broadcast_mutual_sleepy() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
@@ -130,6 +139,7 @@ fn spawn_broadcast_mutual_sleepy() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -146,6 +156,7 @@ fn broadcast_panic_one() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
@@ -166,6 +177,7 @@ fn spawn_broadcast_panic_one() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -182,6 +194,7 @@ fn broadcast_panic_many() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
@@ -202,6 +215,7 @@ fn spawn_broadcast_panic_many() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
let test_duration = time::Duration::from_secs(1);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -214,3 +228,35 @@ fn broadcast_sleep_race() {
});
}
}
+
+#[test]
+fn broadcast_after_spawn_broadcast() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+
+ // Queue a non-blocking spawn_broadcast.
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ // This blocking broadcast runs after all prior broadcasts.
+ crate::broadcast(|_| {});
+
+ // The spawn_broadcast **must** have run by now on all threads.
+ let mut v: Vec<_> = rx.try_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+fn broadcast_after_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn on a thread-local deque.
+ crate::registry::in_worker(move |_, _| {
+ crate::spawn(move || tx.send(22).unwrap());
+ });
+
+ // Broadcast runs after the local deque is empty.
+ crate::broadcast(|_| {});
+
+ // The spawn **must** have run by now.
+ assert_eq!(22, rx.try_recv().unwrap());
+}
diff --git a/vendor/rayon-core/src/job.rs b/vendor/rayon-core/src/job.rs
index b7a3dae18..5664bb385 100644
--- a/vendor/rayon-core/src/job.rs
+++ b/vendor/rayon-core/src/job.rs
@@ -30,7 +30,6 @@ pub(super) trait Job {
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) struct JobRef {
pointer: *const (),
execute_fn: unsafe fn(*const ()),
@@ -53,6 +52,13 @@ impl JobRef {
}
}
+ /// Returns an opaque handle that can be saved and compared,
+ /// without making `JobRef` itself `Copy + Eq`.
+ #[inline]
+ pub(super) fn id(&self) -> impl Eq {
+ (self.pointer, self.execute_fn)
+ }
+
#[inline]
pub(super) unsafe fn execute(self) {
(self.execute_fn)(self.pointer)
@@ -112,7 +118,7 @@ where
let abort = unwind::AbortIfPanic;
let func = (*this.func.get()).take().unwrap();
(*this.result.get()) = JobResult::call(func);
- this.latch.set();
+ Latch::set(&this.latch);
mem::forget(abort);
}
}
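With the `Copy`/`Clone`/`Eq` derives removed, a `JobRef` can no longer be duplicated and compared directly; callers keep the opaque value returned by `id()` instead (see the `join` hunk below). A standalone sketch of that pattern, using a hypothetical `RawJob` type rather than the crate's private one:

    struct RawJob {
        pointer: *const (),
        execute_fn: unsafe fn(*const ()),
    }

    impl RawJob {
        // Returning `impl Eq` hands out something that can be stored and
        // compared later, without making `RawJob` itself `Copy + Eq`.
        fn id(&self) -> impl Eq {
            (self.pointer, self.execute_fn)
        }
    }

    unsafe fn noop(_: *const ()) {}

    fn main() {
        let job = RawJob { pointer: std::ptr::null(), execute_fn: noop };
        let remembered = job.id();
        // Later, recognize the same job by comparing ids.
        assert!(remembered == job.id());
    }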
diff --git a/vendor/rayon-core/src/join/mod.rs b/vendor/rayon-core/src/join/mod.rs
index d72c7e61c..5ab9f6b32 100644
--- a/vendor/rayon-core/src/join/mod.rs
+++ b/vendor/rayon-core/src/join/mod.rs
@@ -135,6 +135,7 @@ where
// long enough.
let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
let job_b_ref = job_b.as_job_ref();
+ let job_b_id = job_b_ref.id();
worker_thread.push(job_b_ref);
// Execute task a; hopefully b gets stolen in the meantime.
@@ -151,7 +152,7 @@ where
// those off to get to it.
while !job_b.latch.probe() {
if let Some(job) = worker_thread.take_local_job() {
- if job == job_b_ref {
+ if job_b_id == job.id() {
// Found it! Let's run it.
//
// Note that this could panic, but it's ok if we unwind here.
diff --git a/vendor/rayon-core/src/join/test.rs b/vendor/rayon-core/src/join/test.rs
index e7f287f6f..b303dbc81 100644
--- a/vendor/rayon-core/src/join/test.rs
+++ b/vendor/rayon-core/src/join/test.rs
@@ -47,6 +47,7 @@ fn sort() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sort_in_pool() {
let rng = seeded_rng();
let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
@@ -77,6 +78,7 @@ fn panic_propagate_both() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_b_still_executes() {
let mut x = false;
match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
@@ -86,6 +88,7 @@ fn panic_b_still_executes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_both() {
// If we're not in a pool, both should be marked stolen as they're injected.
let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
@@ -94,6 +97,7 @@ fn join_context_both() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_neither() {
// If we're already in a 1-thread pool, neither job should be stolen.
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -104,6 +108,7 @@ fn join_context_neither() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_second() {
use std::sync::Barrier;
@@ -127,6 +132,7 @@ fn join_context_second() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_counter_overflow() {
const MAX: u32 = 500_000;
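The `join` change above (comparing the saved `job_b_id` instead of the `JobRef` itself) is invisible to callers; `join` and `join_context` keep their public behaviour. A hedged usage sketch, assuming the `rayon_core` crate name:

    fn main() {
        // `join` runs both closures, possibly in parallel, and returns both results.
        let (a, b) = rayon_core::join(|| (1u32..=10).sum::<u32>(), || 6 * 7);
        assert_eq!((a, b), (55, 42));

        // `join_context` also reports whether each closure migrated to another
        // thread, i.e. was stolen rather than run by the calling thread.
        let (a_stolen, b_stolen) =
            rayon_core::join_context(|ctx| ctx.migrated(), |ctx| ctx.migrated());
        println!("a stolen: {a_stolen}, b stolen: {b_stolen}");
    }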
diff --git a/vendor/rayon-core/src/latch.rs b/vendor/rayon-core/src/latch.rs
index 090929374..de4327234 100644
--- a/vendor/rayon-core/src/latch.rs
+++ b/vendor/rayon-core/src/latch.rs
@@ -1,3 +1,5 @@
+use std::marker::PhantomData;
+use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;
@@ -37,10 +39,15 @@ pub(super) trait Latch {
///
/// Setting a latch triggers other threads to wake up and (in some
/// cases) complete. This may, in turn, cause memory to be
- /// allocated and so forth. One must be very careful about this,
+ /// deallocated and so forth. One must be very careful about this,
/// and it's typically better to read all the fields you will need
/// to access *before* a latch is set!
- fn set(&self);
+ ///
+ /// This function operates on `*const Self` instead of `&self` to allow it
+ /// to become dangling during this call. The caller must ensure that the
+ /// pointer is valid upon entry, and not invalidated during the call by any
+ /// actions other than `set` itself.
+ unsafe fn set(this: *const Self);
}
pub(super) trait AsCoreLatch {
@@ -123,8 +130,8 @@ impl CoreLatch {
/// doing some wakeups; those are encapsulated in the surrounding
/// latch code.
#[inline]
- fn set(&self) -> bool {
- let old_state = self.state.swap(SET, Ordering::AcqRel);
+ unsafe fn set(this: *const Self) -> bool {
+ let old_state = (*this).state.swap(SET, Ordering::AcqRel);
old_state == SLEEPING
}
@@ -186,16 +193,16 @@ impl<'r> AsCoreLatch for SpinLatch<'r> {
impl<'r> Latch for SpinLatch<'r> {
#[inline]
- fn set(&self) {
+ unsafe fn set(this: *const Self) {
let cross_registry;
- let registry: &Registry = if self.cross {
+ let registry: &Registry = if (*this).cross {
// Ensure the registry stays alive while we notify it.
// Otherwise, it would be possible that we set the spin
// latch and the other thread sees it and exits, causing
// the registry to be deallocated, all before we get a
// chance to invoke `registry.notify_worker_latch_is_set`.
- cross_registry = Arc::clone(self.registry);
+ cross_registry = Arc::clone((*this).registry);
&cross_registry
} else {
// If this is not a "cross-registry" spin-latch, then the
@@ -203,12 +210,12 @@ impl<'r> Latch for SpinLatch<'r> {
// that the registry stays alive. However, that doesn't
// include this *particular* `Arc` handle if the waiting
// thread then exits, so we must completely dereference it.
- self.registry
+ (*this).registry
};
- let target_worker_index = self.target_worker_index;
+ let target_worker_index = (*this).target_worker_index;
- // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
- if self.core_latch.set() {
+ // NOTE: Once we `set`, the target may proceed and invalidate `this`!
+ if CoreLatch::set(&(*this).core_latch) {
// Subtle: at this point, we can no longer read from
// `self`, because the thread owning this spin latch may
// have awoken and deallocated the latch. Therefore, we
@@ -255,10 +262,10 @@ impl LockLatch {
impl Latch for LockLatch {
#[inline]
- fn set(&self) {
- let mut guard = self.m.lock().unwrap();
+ unsafe fn set(this: *const Self) {
+ let mut guard = (*this).m.lock().unwrap();
*guard = true;
- self.v.notify_all();
+ (*this).v.notify_all();
}
}
@@ -307,9 +314,9 @@ impl CountLatch {
/// count, then the latch is **set**, and calls to `probe()` will
/// return true. Returns whether the latch was set.
#[inline]
- pub(super) fn set(&self) -> bool {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.core_latch.set();
+ pub(super) unsafe fn set(this: *const Self) -> bool {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ CoreLatch::set(&(*this).core_latch);
true
} else {
false
@@ -320,8 +327,12 @@ impl CountLatch {
/// the latch is set, then the specific worker thread is tickled,
/// which should be the one that owns this latch.
#[inline]
- pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
- if self.set() {
+ pub(super) unsafe fn set_and_tickle_one(
+ this: *const Self,
+ registry: &Registry,
+ target_worker_index: usize,
+ ) {
+ if Self::set(this) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
@@ -362,19 +373,42 @@ impl CountLockLatch {
impl Latch for CountLockLatch {
#[inline]
- fn set(&self) {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.lock_latch.set();
+ unsafe fn set(this: *const Self) {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ LockLatch::set(&(*this).lock_latch);
}
}
}
-impl<'a, L> Latch for &'a L
-where
- L: Latch,
-{
+/// `&L` without any implication of `dereferenceable` for `Latch::set`
+pub(super) struct LatchRef<'a, L> {
+ inner: *const L,
+ marker: PhantomData<&'a L>,
+}
+
+impl<L> LatchRef<'_, L> {
+ pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
+ LatchRef {
+ inner,
+ marker: PhantomData,
+ }
+ }
+}
+
+unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
+
+impl<L> Deref for LatchRef<'_, L> {
+ type Target = L;
+
+ fn deref(&self) -> &L {
+ // SAFETY: if we have &self, the inner latch is still alive
+ unsafe { &*self.inner }
+ }
+}
+
+impl<L: Latch> Latch for LatchRef<'_, L> {
#[inline]
- fn set(&self) {
- L::set(self);
+ unsafe fn set(this: *const Self) {
+ L::set((*this).inner);
}
}
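The key change in this file: `Latch::set` now takes `this: *const Self` and is `unsafe`, because the woken thread may free the latch while `set` is still executing. A minimal sketch of implementing that discipline on a hypothetical latch type (the trait shape mirrors the patch; the type is illustrative):

    use std::sync::atomic::{AtomicBool, Ordering};

    // Mirror of the (private) trait shape after this patch.
    trait Latch {
        /// Caller guarantees `this` is valid on entry and is not invalidated
        /// during the call by anything other than `set` itself.
        unsafe fn set(this: *const Self);
    }

    // Hypothetical latch used only to illustrate the raw-pointer discipline.
    struct FlagLatch {
        flag: AtomicBool,
    }

    impl Latch for FlagLatch {
        unsafe fn set(this: *const Self) {
            // Read through `(*this)` and do nothing after the store: once the
            // flag is visible, the owner may wake up and free this latch.
            (*this).flag.store(true, Ordering::Release);
        }
    }

    fn main() {
        let latch = FlagLatch { flag: AtomicBool::new(false) };
        unsafe { FlagLatch::set(&latch) };
        assert!(latch.flag.load(Ordering::Acquire));
    }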
diff --git a/vendor/rayon-core/src/lib.rs b/vendor/rayon-core/src/lib.rs
index b31a2d7e0..c9694ee16 100644
--- a/vendor/rayon-core/src/lib.rs
+++ b/vendor/rayon-core/src/lib.rs
@@ -26,7 +26,24 @@
//! [`join()`]: struct.ThreadPool.html#method.join
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
-//! ## Restricting multiple versions
+//! # Global fallback when threading is unsupported
+//!
+//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
+//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
+//! targets are notable examples of this. Rather than panicking on the unsupported error when
+//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
+//!
+//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
+//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
+//! there is no other thread to share the work. However, since the pool is not running independent
+//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
+//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
+//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
+//! can also volunteer execution time.
+//!
+//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
+//!
+//! # Restricting multiple versions
//!
//! In order to ensure proper coordination between threadpools, and especially
//! to make sure there's only one global threadpool, `rayon-core` is actively
@@ -85,6 +102,7 @@ pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
+pub use self::thread_pool::{yield_local, yield_now, Yield};
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
@@ -707,6 +725,10 @@ impl ThreadPoolBuildError {
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
ThreadPoolBuildError { kind }
}
+
+ fn is_unsupported(&self) -> bool {
+ matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
+ }
}
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
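The module docs added above describe the single-threaded fallback and the newly re-exported `yield_now`/`yield_local`/`Yield`. A hedged sketch of how queued `spawn` work can be driven to completion under that fallback (crate name and channel plumbing are illustrative; on a normal multi-threaded host the yield loop simply finds nothing to do):

    use std::sync::mpsc::channel;

    fn main() {
        let (tx, rx) = channel();

        // `spawn` only queues the job; in the wasm-style fallback "pool" there is
        // no independent worker thread to pick it up.
        rayon_core::spawn(move || tx.send(42).unwrap());

        // Volunteer some execution time. In fallback mode the current thread is
        // the lone worker, so this runs the queued job; otherwise it returns
        // `None` (not a pool thread) and the real workers handle the job.
        while rayon_core::yield_now() == Some(rayon_core::Yield::Executed) {}

        assert_eq!(rx.recv().unwrap(), 42);
    }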
diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs
index 279e298d2..5d56ac927 100644
--- a/vendor/rayon-core/src/registry.rs
+++ b/vendor/rayon-core/src/registry.rs
@@ -1,11 +1,12 @@
use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+ Yield,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::cell::Cell;
@@ -50,7 +51,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
- unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
+ unsafe { main_loop(self) }
}
}
@@ -164,7 +165,7 @@ static THE_REGISTRY_SET: Once = Once::new();
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
- set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
+ set_global_registry(default_global_registry)
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
@@ -198,6 +199,46 @@ where
result
}
+fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
+ let result = Registry::new(ThreadPoolBuilder::new());
+
+ // If we're running in an environment that doesn't support threads at all, we can fall back to
+ // using the current thread alone. This is crude, and probably won't work for non-blocking
+ // calls like `spawn` or `spawn_broadcast`, but a lot of stuff does work fine.
+ //
+ // Notably, this allows current WebAssembly targets to work even though their threading support
+ // is stubbed out, and we won't have to change anything if they do add real threading.
+ let unsupported = matches!(&result, Err(e) if e.is_unsupported());
+ if unsupported && WorkerThread::current().is_null() {
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(1)
+ .spawn_handler(|thread| {
+ // Rather than starting a new thread, we're just taking over the current thread
+ // *without* running the main loop, so we can still return from here.
+ // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+ let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
+ let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
+
+ unsafe {
+ WorkerThread::set_current(worker_thread);
+
+ // let registry know we are ready to do work
+ Latch::set(&registry.thread_infos[index].primed);
+ }
+
+ Ok(())
+ });
+
+ let fallback_result = Registry::new(builder);
+ if fallback_result.is_ok() {
+ return fallback_result;
+ }
+ }
+
+ result
+}
+
struct Terminator<'a>(&'a Arc<Registry>);
impl<'a> Drop for Terminator<'a> {
@@ -376,7 +417,7 @@ impl Registry {
if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
(*worker_thread).push(job_ref);
} else {
- self.inject(&[job_ref]);
+ self.inject(job_ref);
}
}
}
@@ -384,10 +425,8 @@ impl Registry {
/// Push a job into the "external jobs" queue; it will be taken by
/// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
- pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
- self.log(|| JobsInjected {
- count: injected_jobs.len(),
- });
+ pub(super) fn inject(&self, injected_job: JobRef) {
+ self.log(|| JobsInjected { count: 1 });
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
@@ -402,12 +441,8 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
- for &job_ref in injected_jobs {
- self.injected_jobs.push(job_ref);
- }
-
- self.sleep
- .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+ self.injected_jobs.push(injected_job);
+ self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
}
fn has_injected_job(&self) -> bool {
@@ -505,9 +540,9 @@ impl Registry {
assert!(injected && !worker_thread.is_null());
op(&*worker_thread, true)
},
- l,
+ LatchRef::new(l),
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
// flush accumulated logs as we exit the thread
@@ -535,7 +570,7 @@ impl Registry {
},
latch,
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
current_thread.wait_until(&job.latch);
job.into_result()
}
@@ -575,7 +610,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
- thread_info.terminate.set_and_tickle_one(self, i);
+ unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@@ -652,7 +687,20 @@ pub(super) struct WorkerThread {
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
- static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
+ static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
+}
+
+impl From<ThreadBuilder> for WorkerThread {
+ fn from(thread: ThreadBuilder) -> Self {
+ Self {
+ worker: thread.worker,
+ stealer: thread.stealer,
+ fifo: JobFifo::new(),
+ index: thread.index,
+ rng: XorShift64Star::new(),
+ registry: thread.registry,
+ }
+ }
}
impl Drop for WorkerThread {
@@ -725,7 +773,7 @@ impl WorkerThread {
/// for breadth-first execution, it would mean dequeuing from the
/// bottom.
#[inline]
- pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
+ pub(super) fn take_local_job(&self) -> Option<JobRef> {
let popped_job = self.worker.pop();
if popped_job.is_some() {
@@ -767,16 +815,7 @@ impl WorkerThread {
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
- // Try to find some work to do. We give preference first
- // to things in our local deque, then in other workers
- // deques, and finally to injected jobs from the
- // outside. The idea is to finish what we started before
- // we take on something new.
- if let Some(job) = self
- .take_local_job()
- .or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job(self.index))
- {
+ if let Some(job) = self.find_work() {
self.registry.sleep.work_found(idle_state);
self.execute(job);
idle_state = self.registry.sleep.start_looking(self.index, latch);
@@ -799,6 +838,37 @@ impl WorkerThread {
mem::forget(abort_guard); // successful execution, do not abort
}
+ fn find_work(&self) -> Option<JobRef> {
+ // Try to find some work to do. We give preference first
+ // to things in our local deque, then in other workers
+ // deques, and finally to injected jobs from the
+ // outside. The idea is to finish what we started before
+ // we take on something new.
+ self.take_local_job()
+ .or_else(|| self.steal())
+ .or_else(|| self.registry.pop_injected_job(self.index))
+ }
+
+ pub(super) fn yield_now(&self) -> Yield {
+ match self.find_work() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
+ pub(super) fn yield_local(&self) -> Yield {
+ match self.take_local_job() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
#[inline]
pub(super) unsafe fn execute(&self, job: JobRef) {
job.execute();
@@ -808,7 +878,7 @@ impl WorkerThread {
///
/// This should only be done as a last resort, when there is no
/// local work to do.
- unsafe fn steal(&self) -> Option<JobRef> {
+ fn steal(&self) -> Option<JobRef> {
// we only steal when we don't have any work to do locally
debug_assert!(self.local_deque_is_empty());
@@ -851,25 +921,14 @@ impl WorkerThread {
/// ////////////////////////////////////////////////////////////////////////
-unsafe fn main_loop(
- worker: Worker<JobRef>,
- stealer: Stealer<JobRef>,
- registry: Arc<Registry>,
- index: usize,
-) {
- let worker_thread = &WorkerThread {
- worker,
- stealer,
- fifo: JobFifo::new(),
- index,
- rng: XorShift64Star::new(),
- registry,
- };
+unsafe fn main_loop(thread: ThreadBuilder) {
+ let worker_thread = &WorkerThread::from(thread);
WorkerThread::set_current(worker_thread);
let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
// let registry know we are ready to do work
- registry.thread_infos[index].primed.set();
+ Latch::set(&registry.thread_infos[index].primed);
// Worker threads should not panic. If they do, just abort, as the
// internal state of the threadpool is corrupted. Note that if
@@ -892,7 +951,7 @@ unsafe fn main_loop(
debug_assert!(worker_thread.take_local_job().is_none());
// let registry know we are done
- registry.thread_infos[index].stopped.set();
+ Latch::set(&registry.thread_infos[index].stopped);
// Normal termination, do not abort.
mem::forget(abort_guard);
@@ -924,7 +983,7 @@ where
// invalidated until we return.
op(&*owner_thread, false)
} else {
- global_registry().in_worker_cold(op)
+ global_registry().in_worker(op)
}
}
}
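`default_global_registry` above builds its fallback pool with a `spawn_handler` that adopts the current thread instead of spawning one. The same hook is part of the public builder API; a sketch of its ordinary use with `std::thread` (the options shown are illustrative, but the handler shape follows the rayon-core documentation):

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let pool = rayon_core::ThreadPoolBuilder::new()
            .num_threads(2)
            .spawn_handler(|thread| {
                // `thread` is a ThreadBuilder; `run()` enters the worker's main
                // loop and returns only when the pool shuts down.
                let mut builder = std::thread::Builder::new();
                if let Some(name) = thread.name() {
                    builder = builder.name(name.to_owned());
                }
                if let Some(size) = thread.stack_size() {
                    builder = builder.stack_size(size);
                }
                builder.spawn(|| thread.run())?;
                Ok(())
            })
            .build()?;

        let (a, b) = pool.install(|| rayon_core::join(|| 2 + 2, || 3 * 3));
        assert_eq!((a, b), (4, 9));
        Ok(())
    }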
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs
index 25cda832e..f460dd79d 100644
--- a/vendor/rayon-core/src/scope/mod.rs
+++ b/vendor/rayon-core/src/scope/mod.rs
@@ -13,7 +13,7 @@ use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
-use std::mem;
+use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
@@ -540,10 +540,10 @@ impl<'scope> Scope<'scope> {
BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || {
+ let job = HeapJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
- scope.base.execute_job(move || body(scope))
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);
@@ -562,12 +562,12 @@ impl<'scope> Scope<'scope> {
BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || {
+ let job = ArcJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
+ let scope = scope_ptr.as_ref();
let body = &body;
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- scope.base.execute_job(func);
+ ScopeBase::execute_job(&scope.base, func)
});
self.base.inject_broadcast(job)
}
@@ -600,10 +600,10 @@ impl<'scope> ScopeFifo<'scope> {
BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || {
+ let job = HeapJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
- scope.base.execute_job(move || body(scope))
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);
@@ -615,7 +615,7 @@ impl<'scope> ScopeFifo<'scope> {
// SAFETY: this job will execute before the scope ends.
unsafe { worker.push(fifo.push(job_ref)) };
}
- None => self.base.registry.inject(&[job_ref]),
+ None => self.base.registry.inject(job_ref),
}
}
@@ -628,12 +628,12 @@ impl<'scope> ScopeFifo<'scope> {
BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || {
+ let job = ArcJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
+ let scope = scope_ptr.as_ref();
let body = &body;
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- scope.base.execute_job(func);
+ ScopeBase::execute_job(&scope.base, func)
});
self.base.inject_broadcast(job)
}
@@ -688,7 +688,7 @@ impl<'scope> ScopeBase<'scope> {
where
FUNC: FnOnce() -> R,
{
- let result = self.execute_job_closure(func);
+ let result = unsafe { Self::execute_job_closure(self, func) };
self.job_completed_latch.wait(owner);
self.maybe_propagate_panic();
result.unwrap() // only None if `op` panicked, and that would have been propagated
@@ -696,28 +696,28 @@ impl<'scope> ScopeBase<'scope> {
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
- fn execute_job<FUNC>(&self, func: FUNC)
+ unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
where
FUNC: FnOnce(),
{
- let _: Option<()> = self.execute_job_closure(func);
+ let _: Option<()> = Self::execute_job_closure(this, func);
}
/// Executes `func` as a job in scope. Adjusts the "job completed"
/// counters and also catches any panic and stores it into
/// `scope`.
- fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
+ unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
where
FUNC: FnOnce() -> R,
{
match unwind::halt_unwinding(func) {
Ok(r) => {
- self.job_completed_latch.set();
+ Latch::set(&(*this).job_completed_latch);
Some(r)
}
Err(err) => {
- self.job_panicked(err);
- self.job_completed_latch.set();
+ (*this).job_panicked(err);
+ Latch::set(&(*this).job_completed_latch);
None
}
}
@@ -725,14 +725,20 @@ impl<'scope> ScopeBase<'scope> {
fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
// capture the first error we see, free the rest
- let nil = ptr::null_mut();
- let mut err = Box::new(err); // box up the fat ptr
- if self
- .panic
- .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- mem::forget(err); // ownership now transferred into self.panic
+ if self.panic.load(Ordering::Relaxed).is_null() {
+ let nil = ptr::null_mut();
+ let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
+ let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
+ if self
+ .panic
+ .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ // ownership now transferred into self.panic
+ } else {
+ // another panic raced in ahead of us, so drop ours
+ let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
+ }
}
}
@@ -791,14 +797,14 @@ impl ScopeLatch {
}
impl Latch for ScopeLatch {
- fn set(&self) {
- match self {
+ unsafe fn set(this: *const Self) {
+ match &*this {
ScopeLatch::Stealing {
latch,
registry,
worker_index,
- } => latch.set_and_tickle_one(registry, *worker_index),
- ScopeLatch::Blocking { latch } => latch.set(),
+ } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
+ ScopeLatch::Blocking { latch } => Latch::set(latch),
}
}
}
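`job_panicked` now publishes the first panic payload with `compare_exchange` on the `AtomicPtr` and drops any later payload it fails to install. A standalone sketch of that "first error wins" pattern with simplified types (not the crate's code):

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};

    // Keeps at most the first recorded message; later ones are dropped.
    struct FirstError {
        slot: AtomicPtr<String>,
    }

    impl FirstError {
        fn record(&self, msg: String) {
            let raw = Box::into_raw(Box::new(msg));
            let install = self
                .slot
                .compare_exchange(ptr::null_mut(), raw, Ordering::Release, Ordering::Relaxed);
            if install.is_err() {
                // Lost the race: another error is already stored, so reclaim
                // and drop our own allocation.
                drop(unsafe { Box::from_raw(raw) });
            }
        }

        fn take(&self) -> Option<String> {
            let raw = self.slot.swap(ptr::null_mut(), Ordering::Acquire);
            if raw.is_null() {
                None
            } else {
                let boxed = unsafe { Box::from_raw(raw) };
                Some(*boxed)
            }
        }
    }

    fn main() {
        let errors = FirstError { slot: AtomicPtr::new(ptr::null_mut()) };
        errors.record("first".to_string());
        errors.record("second".to_string()); // silently dropped
        assert_eq!(errors.take().as_deref(), Some("first"));
    }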
diff --git a/vendor/rayon-core/src/scope/test.rs b/vendor/rayon-core/src/scope/test.rs
index 00dd18c92..ad8c4af0b 100644
--- a/vendor/rayon-core/src/scope/test.rs
+++ b/vendor/rayon-core/src/scope/test.rs
@@ -148,6 +148,7 @@ fn update_tree() {
/// linearly with N. We test this by some unsafe hackery and
/// permitting an approx 10% change with a 10x input change.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn linear_stack_growth() {
let builder = ThreadPoolBuilder::new().num_threads(1);
let pool = builder.build().unwrap();
@@ -213,6 +214,7 @@ fn panic_propagate_nested_scope_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_1() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -227,6 +229,7 @@ fn panic_propagate_still_execute_1() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_2() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -241,6 +244,7 @@ fn panic_propagate_still_execute_2() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_3() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -255,6 +259,7 @@ fn panic_propagate_still_execute_3() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_4() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -292,6 +297,7 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
// In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_order!(scope => spawn);
@@ -300,6 +306,7 @@ fn lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
// In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_order!(scope_fifo => spawn_fifo);
@@ -334,6 +341,7 @@ macro_rules! test_nested_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_order() {
// In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_nested_order!(scope => spawn, scope => spawn);
@@ -342,6 +350,7 @@ fn nested_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_order() {
// In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
@@ -350,6 +359,7 @@ fn nested_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
@@ -361,6 +371,7 @@ fn nested_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
@@ -403,6 +414,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -412,6 +424,7 @@ fn mixed_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
let expected = vec![-1, 0, -2, 1, -3, 2, 3];
@@ -419,6 +432,7 @@ fn mixed_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -428,6 +442,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
let expected = vec![-3, 0, -2, 1, -1, 2, 3];
@@ -553,6 +568,7 @@ fn scope_spawn_broadcast_nested() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_spawn_broadcast_barrier() {
let barrier = Barrier::new(8);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -565,6 +581,7 @@ fn scope_spawn_broadcast_barrier() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_spawn_broadcast_panic_one() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -583,6 +600,7 @@ fn scope_spawn_broadcast_panic_one() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_spawn_broadcast_panic_many() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
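These test hunks only add `cfg_attr` ignores for wasm/emscripten and for builds without unwinding; the scope API they exercise is unchanged. A hedged usage sketch of `scope` with `spawn` and the newer `spawn_broadcast`, assuming the `rayon_core` crate name:

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        let count = AtomicUsize::new(0);
        rayon_core::scope(|s| {
            // One asynchronous task borrowing from the enclosing stack frame...
            s.spawn(|_| {
                count.fetch_add(1, Ordering::Relaxed);
            });
            // ...plus one task per pool thread; the scope waits for all of them.
            s.spawn_broadcast(|_, _ctx| {
                count.fetch_add(1, Ordering::Relaxed);
            });
        });
        assert_eq!(
            count.load(Ordering::Relaxed),
            1 + rayon_core::current_num_threads()
        );
    }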
diff --git a/vendor/rayon-core/src/spawn/mod.rs b/vendor/rayon-core/src/spawn/mod.rs
index ae1f211ef..1aa9edb3c 100644
--- a/vendor/rayon-core/src/spawn/mod.rs
+++ b/vendor/rayon-core/src/spawn/mod.rs
@@ -154,7 +154,7 @@ where
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
match registry.current_thread() {
Some(worker) => worker.push_fifo(job_ref),
- None => registry.inject(&[job_ref]),
+ None => registry.inject(job_ref),
}
mem::forget(abort_guard);
}
diff --git a/vendor/rayon-core/src/spawn/test.rs b/vendor/rayon-core/src/spawn/test.rs
index 761fafc77..b7a0535aa 100644
--- a/vendor/rayon-core/src/spawn/test.rs
+++ b/vendor/rayon-core/src/spawn/test.rs
@@ -7,6 +7,7 @@ use super::{spawn, spawn_fifo};
use crate::ThreadPoolBuilder;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_in_worker() {
let (tx, rx) = channel();
scope(move |_| {
@@ -16,6 +17,7 @@ fn spawn_then_join_in_worker() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_outside_worker() {
let (tx, rx) = channel();
spawn(move || tx.send(22).unwrap());
@@ -23,6 +25,7 @@ fn spawn_then_join_outside_worker() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_fwd() {
let (tx, rx) = channel();
@@ -54,6 +57,7 @@ fn panic_fwd() {
/// still active asynchronous tasks. We expect the thread-pool to stay
/// alive and executing until those threads are complete.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn termination_while_things_are_executing() {
let (tx0, rx0) = channel();
let (tx1, rx1) = channel();
@@ -80,6 +84,7 @@ fn termination_while_things_are_executing() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_spawn() {
let (tx, rx) = channel();
@@ -107,6 +112,7 @@ fn custom_panic_handler_and_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_nested_spawn() {
let (tx, rx) = channel();
@@ -165,6 +171,7 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
// In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
let vec = test_order!(spawn, spawn);
@@ -173,6 +180,7 @@ fn lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
// In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
let vec = test_order!(spawn_fifo, spawn_fifo);
@@ -181,6 +189,7 @@ fn fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_order!(spawn, spawn_fifo);
@@ -192,6 +201,7 @@ fn lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_order!(spawn_fifo, spawn);
@@ -229,6 +239,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
let vec = test_mixed_order!(spawn, spawn_fifo);
let expected = vec![3, -1, 2, -2, 1, -3, 0];
@@ -236,6 +247,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(spawn_fifo, spawn);
let expected = vec![0, -3, 1, -2, 2, -1, 3];
diff --git a/vendor/rayon-core/src/test.rs b/vendor/rayon-core/src/test.rs
index 46d63a7df..25b8487f7 100644
--- a/vendor/rayon-core/src/test.rs
+++ b/vendor/rayon-core/src/test.rs
@@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn worker_thread_index() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -14,6 +15,7 @@ fn worker_thread_index() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn start_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -40,6 +42,7 @@ fn start_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn exit_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -69,6 +72,7 @@ fn exit_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn handler_panics_handled_correctly() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -119,6 +123,7 @@ fn handler_panics_handled_correctly() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_config_build() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -134,6 +139,7 @@ fn check_error_send_sync() {
#[allow(deprecated)]
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn configuration() {
let start_handler = move |_| {};
let exit_handler = move |_| {};
@@ -154,6 +160,7 @@ fn configuration() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn default_pool() {
ThreadPoolBuilder::default().build().unwrap();
}
@@ -162,6 +169,7 @@ fn default_pool() {
/// the pool is done with them, allowing them to be used with rayon again
/// later. e.g. WebAssembly want to have their own pool of available threads.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
let n_threads = 5;
let mut handles = vec![];
diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs
index 0fc06dd6b..c37826ef5 100644
--- a/vendor/rayon-core/src/thread_pool/mod.rs
+++ b/vendor/rayon-core/src/thread_pool/mod.rs
@@ -339,6 +339,30 @@ impl ThreadPool {
// We assert that `self.registry` has not terminated.
unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
}
+
+ /// Cooperatively yields execution to Rayon.
+ ///
+ /// This is similar to the general [`yield_now()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_now(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_now())
+ }
+
+ /// Cooperatively yields execution to local Rayon work.
+ ///
+ /// This is similar to the general [`yield_local()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_local(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_local())
+ }
}
impl Drop for ThreadPool {
@@ -400,3 +424,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> {
Some(!curr.local_deque_is_empty())
}
}
+
+/// Cooperatively yields execution to Rayon.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in the pool, then executes it. Completion of
+/// that work might include nested work or further work stealing.
+///
+/// This is similar to [`std::thread::yield_now()`], but does not literally make
+/// that call. If you are implementing a polling loop, you may want to also
+/// yield to the OS scheduler yourself if no Rayon work was found.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_now() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_now())
+ }
+}
+
+/// Cooperatively yields execution to local Rayon work.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in this thread's queue, then executes it.
+/// Completion of that work might include nested work or further work stealing.
+///
+/// This is similar to [`yield_now()`], but does not steal from other threads.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_local() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_local())
+ }
+}
+
+/// Result of [`yield_now()`] or [`yield_local()`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Yield {
+ /// Work was found and executed.
+ Executed,
+ /// No available work was found.
+ Idle,
+}
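The doc comments above recommend pairing `yield_now` with an OS-level yield in polling loops. A small sketch of that advice (the completion flag and function name are illustrative):

    use std::sync::atomic::{AtomicBool, Ordering};

    // Poll until `done` is set, helping Rayon along instead of spinning idly.
    fn wait_until_done(done: &AtomicBool) {
        while !done.load(Ordering::Acquire) {
            match rayon_core::yield_now() {
                // Executed one unit of Rayon work; poll again right away.
                Some(rayon_core::Yield::Executed) => {}
                // No Rayon work found, or this is not a pool thread:
                // yield to the OS scheduler as the docs suggest.
                _ => std::thread::yield_now(),
            }
        }
    }

    fn main() {
        static DONE: AtomicBool = AtomicBool::new(false);
        rayon_core::spawn(|| DONE.store(true, Ordering::Release));
        wait_until_done(&DONE);
    }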
diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs
index ac750a6dc..6143e5799 100644
--- a/vendor/rayon-core/src/thread_pool/test.rs
+++ b/vendor/rayon-core/src/thread_pool/test.rs
@@ -16,6 +16,7 @@ fn panic_propagate() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
let registry;
@@ -43,6 +44,7 @@ fn join_a_lot(n: usize) {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
use std::{thread, time};
@@ -89,6 +91,7 @@ fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
// Note: we first tried to force failure with a `usize::MAX` stack, but
// macOS and Windows weren't fazed, or at least didn't fail the way we want.
@@ -115,6 +118,7 @@ fn failed_thread_stack() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
let (start_count, start_handler) = count_handler();
let (exit_count, exit_handler) = count_handler();
@@ -139,6 +143,7 @@ fn panic_thread_name() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -147,6 +152,7 @@ fn self_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -166,6 +172,7 @@ fn mutual_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
use std::{thread, time};
@@ -194,6 +201,7 @@ fn mutual_install_sleepy() {
#[test]
#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -219,6 +227,7 @@ macro_rules! test_scope_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
let vec = test_scope_order!(scope => spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -226,6 +235,7 @@ fn scope_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
let vec = test_scope_order!(scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -250,6 +260,7 @@ macro_rules! test_spawn_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
let vec = test_spawn_order!(spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -257,6 +268,7 @@ fn spawn_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
let vec = test_spawn_order!(spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -264,6 +276,7 @@ fn spawn_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
// Create matching scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
@@ -300,6 +313,7 @@ fn nested_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
// Create matching fifo scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
@@ -336,6 +350,7 @@ fn nested_fifo_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -351,6 +366,7 @@ fn in_place_scope_no_deadlock() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -364,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() {
rx_ref.recv().unwrap();
});
}
+
+#[test]
+fn yield_now_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_now();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn yield_local_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_local();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}