diff options
Diffstat (limited to 'vendor/rayon-core/src')
-rw-r--r-- | vendor/rayon-core/src/broadcast/mod.rs | 5 | ||||
-rw-r--r--[-rwxr-xr-x] | vendor/rayon-core/src/broadcast/test.rs | 46 | ||||
-rw-r--r-- | vendor/rayon-core/src/job.rs | 10 | ||||
-rw-r--r-- | vendor/rayon-core/src/join/mod.rs | 3 | ||||
-rw-r--r-- | vendor/rayon-core/src/join/test.rs | 6 | ||||
-rw-r--r-- | vendor/rayon-core/src/latch.rs | 90 | ||||
-rw-r--r-- | vendor/rayon-core/src/lib.rs | 24 | ||||
-rw-r--r-- | vendor/rayon-core/src/registry.rs | 155 | ||||
-rw-r--r-- | vendor/rayon-core/src/scope/mod.rs | 72 | ||||
-rw-r--r-- | vendor/rayon-core/src/scope/test.rs | 18 | ||||
-rw-r--r-- | vendor/rayon-core/src/spawn/mod.rs | 2 | ||||
-rw-r--r-- | vendor/rayon-core/src/spawn/test.rs | 12 | ||||
-rw-r--r-- | vendor/rayon-core/src/test.rs | 8 | ||||
-rw-r--r-- | vendor/rayon-core/src/thread_pool/mod.rs | 69 | ||||
-rw-r--r-- | vendor/rayon-core/src/thread_pool/test.rs | 52 |
15 files changed, 457 insertions, 115 deletions
diff --git a/vendor/rayon-core/src/broadcast/mod.rs b/vendor/rayon-core/src/broadcast/mod.rs index 452aa71b6..d991c5461 100644 --- a/vendor/rayon-core/src/broadcast/mod.rs +++ b/vendor/rayon-core/src/broadcast/mod.rs @@ -1,4 +1,5 @@ use crate::job::{ArcJob, StackJob}; +use crate::latch::LatchRef; use crate::registry::{Registry, WorkerThread}; use crate::scope::ScopeLatch; use std::fmt; @@ -107,7 +108,9 @@ where let n_threads = registry.num_threads(); let current_thread = WorkerThread::current().as_ref(); let latch = ScopeLatch::with_count(n_threads, current_thread); - let jobs: Vec<_> = (0..n_threads).map(|_| StackJob::new(&f, &latch)).collect(); + let jobs: Vec<_> = (0..n_threads) + .map(|_| StackJob::new(&f, LatchRef::new(&latch))) + .collect(); let job_refs = jobs.iter().map(|job| job.as_job_ref()); registry.inject_broadcast(job_refs); diff --git a/vendor/rayon-core/src/broadcast/test.rs b/vendor/rayon-core/src/broadcast/test.rs index a765cb034..3ae11f7f6 100755..100644 --- a/vendor/rayon-core/src/broadcast/test.rs +++ b/vendor/rayon-core/src/broadcast/test.rs @@ -12,6 +12,7 @@ fn broadcast_global() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_global() { let (tx, rx) = crossbeam_channel::unbounded(); crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); @@ -22,6 +23,7 @@ fn spawn_broadcast_global() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn broadcast_pool() { let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); let v = pool.broadcast(|ctx| ctx.index()); @@ -29,6 +31,7 @@ fn broadcast_pool() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_pool() { let (tx, rx) = crossbeam_channel::unbounded(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); @@ -40,6 +43,7 @@ fn spawn_broadcast_pool() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn broadcast_self() { let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); let v = pool.install(|| crate::broadcast(|ctx| ctx.index())); @@ -47,6 +51,7 @@ fn broadcast_self() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_self() { let (tx, rx) = crossbeam_channel::unbounded(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); @@ -58,6 +63,7 @@ fn spawn_broadcast_self() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn broadcast_mutual() { let count = AtomicUsize::new(0); let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); @@ -73,6 +79,7 @@ fn broadcast_mutual() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual() { let (tx, rx) = crossbeam_channel::unbounded(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); @@ -90,6 +97,7 @@ fn spawn_broadcast_mutual() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn broadcast_mutual_sleepy() { let count = AtomicUsize::new(0); let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); @@ -108,6 +116,7 @@ fn broadcast_mutual_sleepy() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual_sleepy() { let (tx, rx) = crossbeam_channel::unbounded(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); @@ -130,6 +139,7 @@ fn spawn_broadcast_mutual_sleepy() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn broadcast_panic_one() { let count = AtomicUsize::new(0); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); @@ -146,6 +156,7 @@ fn broadcast_panic_one() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_one() { let (tx, rx) = crossbeam_channel::unbounded(); let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); @@ -166,6 +177,7 @@ fn spawn_broadcast_panic_one() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn broadcast_panic_many() { let count = AtomicUsize::new(0); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); @@ -182,6 +194,7 @@ fn broadcast_panic_many() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_many() { let (tx, rx) = crossbeam_channel::unbounded(); let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); @@ -202,6 +215,7 @@ fn spawn_broadcast_panic_many() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn broadcast_sleep_race() { let test_duration = time::Duration::from_secs(1); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); @@ -214,3 +228,35 @@ fn broadcast_sleep_race() { }); } } + +#[test] +fn broadcast_after_spawn_broadcast() { + let (tx, rx) = crossbeam_channel::unbounded(); + + // Queue a non-blocking spawn_broadcast. + crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); + + // This blocking broadcast runs after all prior broadcasts. + crate::broadcast(|_| {}); + + // The spawn_broadcast **must** have run by now on all threads. + let mut v: Vec<_> = rx.try_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..crate::current_num_threads())); +} + +#[test] +fn broadcast_after_spawn() { + let (tx, rx) = crossbeam_channel::bounded(1); + + // Queue a regular spawn on a thread-local deque. + crate::registry::in_worker(move |_, _| { + crate::spawn(move || tx.send(22).unwrap()); + }); + + // Broadcast runs after the local deque is empty. + crate::broadcast(|_| {}); + + // The spawn **must** have run by now. + assert_eq!(22, rx.try_recv().unwrap()); +} diff --git a/vendor/rayon-core/src/job.rs b/vendor/rayon-core/src/job.rs index b7a3dae18..5664bb385 100644 --- a/vendor/rayon-core/src/job.rs +++ b/vendor/rayon-core/src/job.rs @@ -30,7 +30,6 @@ pub(super) trait Job { /// Internally, we store the job's data in a `*const ()` pointer. The /// true type is something like `*const StackJob<...>`, but we hide /// it. We also carry the "execute fn" from the `Job` trait. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub(super) struct JobRef { pointer: *const (), execute_fn: unsafe fn(*const ()), @@ -53,6 +52,13 @@ impl JobRef { } } + /// Returns an opaque handle that can be saved and compared, + /// without making `JobRef` itself `Copy + Eq`. + #[inline] + pub(super) fn id(&self) -> impl Eq { + (self.pointer, self.execute_fn) + } + #[inline] pub(super) unsafe fn execute(self) { (self.execute_fn)(self.pointer) @@ -112,7 +118,7 @@ where let abort = unwind::AbortIfPanic; let func = (*this.func.get()).take().unwrap(); (*this.result.get()) = JobResult::call(func); - this.latch.set(); + Latch::set(&this.latch); mem::forget(abort); } } diff --git a/vendor/rayon-core/src/join/mod.rs b/vendor/rayon-core/src/join/mod.rs index d72c7e61c..5ab9f6b32 100644 --- a/vendor/rayon-core/src/join/mod.rs +++ b/vendor/rayon-core/src/join/mod.rs @@ -135,6 +135,7 @@ where // long enough. let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread)); let job_b_ref = job_b.as_job_ref(); + let job_b_id = job_b_ref.id(); worker_thread.push(job_b_ref); // Execute task a; hopefully b gets stolen in the meantime. @@ -151,7 +152,7 @@ where // those off to get to it. while !job_b.latch.probe() { if let Some(job) = worker_thread.take_local_job() { - if job == job_b_ref { + if job_b_id == job.id() { // Found it! Let's run it. // // Note that this could panic, but it's ok if we unwind here. diff --git a/vendor/rayon-core/src/join/test.rs b/vendor/rayon-core/src/join/test.rs index e7f287f6f..b303dbc81 100644 --- a/vendor/rayon-core/src/join/test.rs +++ b/vendor/rayon-core/src/join/test.rs @@ -47,6 +47,7 @@ fn sort() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn sort_in_pool() { let rng = seeded_rng(); let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect(); @@ -77,6 +78,7 @@ fn panic_propagate_both() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_b_still_executes() { let mut x = false; match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) { @@ -86,6 +88,7 @@ fn panic_b_still_executes() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn join_context_both() { // If we're not in a pool, both should be marked stolen as they're injected. let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated()); @@ -94,6 +97,7 @@ fn join_context_both() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn join_context_neither() { // If we're already in a 1-thread pool, neither job should be stolen. let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); @@ -104,6 +108,7 @@ fn join_context_neither() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn join_context_second() { use std::sync::Barrier; @@ -127,6 +132,7 @@ fn join_context_second() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn join_counter_overflow() { const MAX: u32 = 500_000; diff --git a/vendor/rayon-core/src/latch.rs b/vendor/rayon-core/src/latch.rs index 090929374..de4327234 100644 --- a/vendor/rayon-core/src/latch.rs +++ b/vendor/rayon-core/src/latch.rs @@ -1,3 +1,5 @@ +use std::marker::PhantomData; +use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::usize; @@ -37,10 +39,15 @@ pub(super) trait Latch { /// /// Setting a latch triggers other threads to wake up and (in some /// cases) complete. This may, in turn, cause memory to be - /// allocated and so forth. One must be very careful about this, + /// deallocated and so forth. One must be very careful about this, /// and it's typically better to read all the fields you will need /// to access *before* a latch is set! - fn set(&self); + /// + /// This function operates on `*const Self` instead of `&self` to allow it + /// to become dangling during this call. The caller must ensure that the + /// pointer is valid upon entry, and not invalidated during the call by any + /// actions other than `set` itself. + unsafe fn set(this: *const Self); } pub(super) trait AsCoreLatch { @@ -123,8 +130,8 @@ impl CoreLatch { /// doing some wakeups; those are encapsulated in the surrounding /// latch code. #[inline] - fn set(&self) -> bool { - let old_state = self.state.swap(SET, Ordering::AcqRel); + unsafe fn set(this: *const Self) -> bool { + let old_state = (*this).state.swap(SET, Ordering::AcqRel); old_state == SLEEPING } @@ -186,16 +193,16 @@ impl<'r> AsCoreLatch for SpinLatch<'r> { impl<'r> Latch for SpinLatch<'r> { #[inline] - fn set(&self) { + unsafe fn set(this: *const Self) { let cross_registry; - let registry: &Registry = if self.cross { + let registry: &Registry = if (*this).cross { // Ensure the registry stays alive while we notify it. // Otherwise, it would be possible that we set the spin // latch and the other thread sees it and exits, causing // the registry to be deallocated, all before we get a // chance to invoke `registry.notify_worker_latch_is_set`. - cross_registry = Arc::clone(self.registry); + cross_registry = Arc::clone((*this).registry); &cross_registry } else { // If this is not a "cross-registry" spin-latch, then the @@ -203,12 +210,12 @@ impl<'r> Latch for SpinLatch<'r> { // that the registry stays alive. However, that doesn't // include this *particular* `Arc` handle if the waiting // thread then exits, so we must completely dereference it. - self.registry + (*this).registry }; - let target_worker_index = self.target_worker_index; + let target_worker_index = (*this).target_worker_index; - // NOTE: Once we `set`, the target may proceed and invalidate `&self`! - if self.core_latch.set() { + // NOTE: Once we `set`, the target may proceed and invalidate `this`! + if CoreLatch::set(&(*this).core_latch) { // Subtle: at this point, we can no longer read from // `self`, because the thread owning this spin latch may // have awoken and deallocated the latch. Therefore, we @@ -255,10 +262,10 @@ impl LockLatch { impl Latch for LockLatch { #[inline] - fn set(&self) { - let mut guard = self.m.lock().unwrap(); + unsafe fn set(this: *const Self) { + let mut guard = (*this).m.lock().unwrap(); *guard = true; - self.v.notify_all(); + (*this).v.notify_all(); } } @@ -307,9 +314,9 @@ impl CountLatch { /// count, then the latch is **set**, and calls to `probe()` will /// return true. Returns whether the latch was set. #[inline] - pub(super) fn set(&self) -> bool { - if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { - self.core_latch.set(); + pub(super) unsafe fn set(this: *const Self) -> bool { + if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { + CoreLatch::set(&(*this).core_latch); true } else { false @@ -320,8 +327,12 @@ impl CountLatch { /// the latch is set, then the specific worker thread is tickled, /// which should be the one that owns this latch. #[inline] - pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) { - if self.set() { + pub(super) unsafe fn set_and_tickle_one( + this: *const Self, + registry: &Registry, + target_worker_index: usize, + ) { + if Self::set(this) { registry.notify_worker_latch_is_set(target_worker_index); } } @@ -362,19 +373,42 @@ impl CountLockLatch { impl Latch for CountLockLatch { #[inline] - fn set(&self) { - if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { - self.lock_latch.set(); + unsafe fn set(this: *const Self) { + if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { + LockLatch::set(&(*this).lock_latch); } } } -impl<'a, L> Latch for &'a L -where - L: Latch, -{ +/// `&L` without any implication of `dereferenceable` for `Latch::set` +pub(super) struct LatchRef<'a, L> { + inner: *const L, + marker: PhantomData<&'a L>, +} + +impl<L> LatchRef<'_, L> { + pub(super) fn new(inner: &L) -> LatchRef<'_, L> { + LatchRef { + inner, + marker: PhantomData, + } + } +} + +unsafe impl<L: Sync> Sync for LatchRef<'_, L> {} + +impl<L> Deref for LatchRef<'_, L> { + type Target = L; + + fn deref(&self) -> &L { + // SAFETY: if we have &self, the inner latch is still alive + unsafe { &*self.inner } + } +} + +impl<L: Latch> Latch for LatchRef<'_, L> { #[inline] - fn set(&self) { - L::set(self); + unsafe fn set(this: *const Self) { + L::set((*this).inner); } } diff --git a/vendor/rayon-core/src/lib.rs b/vendor/rayon-core/src/lib.rs index b31a2d7e0..c9694ee16 100644 --- a/vendor/rayon-core/src/lib.rs +++ b/vendor/rayon-core/src/lib.rs @@ -26,7 +26,24 @@ //! [`join()`]: struct.ThreadPool.html#method.join //! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html //! -//! ## Restricting multiple versions +//! # Global fallback when threading is unsupported +//! +//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that +//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi` +//! targets are notable examples of this. Rather than panicking on the unsupported error when +//! creating the implicit global threadpool, Rayon configures a fallback mode instead. +//! +//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting +//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since +//! there is no other thread to share the work. However, since the pool is not running independent +//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower- +//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate +//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local` +//! can also volunteer execution time. +//! +//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback. +//! +//! # Restricting multiple versions //! //! In order to ensure proper coordination between threadpools, and especially //! to make sure there's only one global threadpool, `rayon-core` is actively @@ -85,6 +102,7 @@ pub use self::spawn::{spawn, spawn_fifo}; pub use self::thread_pool::current_thread_has_pending_tasks; pub use self::thread_pool::current_thread_index; pub use self::thread_pool::ThreadPool; +pub use self::thread_pool::{yield_local, yield_now, Yield}; use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; @@ -707,6 +725,10 @@ impl ThreadPoolBuildError { fn new(kind: ErrorKind) -> ThreadPoolBuildError { ThreadPoolBuildError { kind } } + + fn is_unsupported(&self) -> bool { + matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported) + } } const GLOBAL_POOL_ALREADY_INITIALIZED: &str = diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs index 279e298d2..5d56ac927 100644 --- a/vendor/rayon-core/src/registry.rs +++ b/vendor/rayon-core/src/registry.rs @@ -1,11 +1,12 @@ use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch}; +use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch}; use crate::log::Event::*; use crate::log::Logger; use crate::sleep::Sleep; use crate::unwind; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, + Yield, }; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use std::cell::Cell; @@ -50,7 +51,7 @@ impl ThreadBuilder { /// Executes the main loop for this thread. This will not return until the /// thread pool is dropped. pub fn run(self) { - unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) } + unsafe { main_loop(self) } } } @@ -164,7 +165,7 @@ static THE_REGISTRY_SET: Once = Once::new(); /// initialization has not already occurred, use the default /// configuration. pub(super) fn global_registry() -> &'static Arc<Registry> { - set_global_registry(|| Registry::new(ThreadPoolBuilder::new())) + set_global_registry(default_global_registry) .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) .expect("The global thread pool has not been initialized.") } @@ -198,6 +199,46 @@ where result } +fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { + let result = Registry::new(ThreadPoolBuilder::new()); + + // If we're running in an environment that doesn't support threads at all, we can fall back to + // using the current thread alone. This is crude, and probably won't work for non-blocking + // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine. + // + // Notably, this allows current WebAssembly targets to work even though their threading support + // is stubbed out, and we won't have to change anything if they do add real threading. + let unsupported = matches!(&result, Err(e) if e.is_unsupported()); + if unsupported && WorkerThread::current().is_null() { + let builder = ThreadPoolBuilder::new() + .num_threads(1) + .spawn_handler(|thread| { + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); + let registry = &*worker_thread.registry; + let index = worker_thread.index; + + unsafe { + WorkerThread::set_current(worker_thread); + + // let registry know we are ready to do work + Latch::set(®istry.thread_infos[index].primed); + } + + Ok(()) + }); + + let fallback_result = Registry::new(builder); + if fallback_result.is_ok() { + return fallback_result; + } + } + + result +} + struct Terminator<'a>(&'a Arc<Registry>); impl<'a> Drop for Terminator<'a> { @@ -376,7 +417,7 @@ impl Registry { if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { (*worker_thread).push(job_ref); } else { - self.inject(&[job_ref]); + self.inject(job_ref); } } } @@ -384,10 +425,8 @@ impl Registry { /// Push a job into the "external jobs" queue; it will be taken by /// whatever worker has nothing to do. Use this if you know that /// you are not on a worker of this registry. - pub(super) fn inject(&self, injected_jobs: &[JobRef]) { - self.log(|| JobsInjected { - count: injected_jobs.len(), - }); + pub(super) fn inject(&self, injected_job: JobRef) { + self.log(|| JobsInjected { count: 1 }); // It should not be possible for `state.terminate` to be true // here. It is only set to true when the user creates (and @@ -402,12 +441,8 @@ impl Registry { let queue_was_empty = self.injected_jobs.is_empty(); - for &job_ref in injected_jobs { - self.injected_jobs.push(job_ref); - } - - self.sleep - .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty); + self.injected_jobs.push(injected_job); + self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty); } fn has_injected_job(&self) -> bool { @@ -505,9 +540,9 @@ impl Registry { assert!(injected && !worker_thread.is_null()); op(&*worker_thread, true) }, - l, + LatchRef::new(l), ); - self.inject(&[job.as_job_ref()]); + self.inject(job.as_job_ref()); job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. // flush accumulated logs as we exit the thread @@ -535,7 +570,7 @@ impl Registry { }, latch, ); - self.inject(&[job.as_job_ref()]); + self.inject(job.as_job_ref()); current_thread.wait_until(&job.latch); job.into_result() } @@ -575,7 +610,7 @@ impl Registry { pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { - thread_info.terminate.set_and_tickle_one(self, i); + unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } @@ -652,7 +687,20 @@ pub(super) struct WorkerThread { // worker is fully unwound. Using an unsafe pointer avoids the need // for a RefCell<T> etc. thread_local! { - static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null()); + static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) }; +} + +impl From<ThreadBuilder> for WorkerThread { + fn from(thread: ThreadBuilder) -> Self { + Self { + worker: thread.worker, + stealer: thread.stealer, + fifo: JobFifo::new(), + index: thread.index, + rng: XorShift64Star::new(), + registry: thread.registry, + } + } } impl Drop for WorkerThread { @@ -725,7 +773,7 @@ impl WorkerThread { /// for breadth-first execution, it would mean dequeuing from the /// bottom. #[inline] - pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> { + pub(super) fn take_local_job(&self) -> Option<JobRef> { let popped_job = self.worker.pop(); if popped_job.is_some() { @@ -767,16 +815,7 @@ impl WorkerThread { let mut idle_state = self.registry.sleep.start_looking(self.index, latch); while !latch.probe() { - // Try to find some work to do. We give preference first - // to things in our local deque, then in other workers - // deques, and finally to injected jobs from the - // outside. The idea is to finish what we started before - // we take on something new. - if let Some(job) = self - .take_local_job() - .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job(self.index)) - { + if let Some(job) = self.find_work() { self.registry.sleep.work_found(idle_state); self.execute(job); idle_state = self.registry.sleep.start_looking(self.index, latch); @@ -799,6 +838,37 @@ impl WorkerThread { mem::forget(abort_guard); // successful execution, do not abort } + fn find_work(&self) -> Option<JobRef> { + // Try to find some work to do. We give preference first + // to things in our local deque, then in other workers + // deques, and finally to injected jobs from the + // outside. The idea is to finish what we started before + // we take on something new. + self.take_local_job() + .or_else(|| self.steal()) + .or_else(|| self.registry.pop_injected_job(self.index)) + } + + pub(super) fn yield_now(&self) -> Yield { + match self.find_work() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + + pub(super) fn yield_local(&self) -> Yield { + match self.take_local_job() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + #[inline] pub(super) unsafe fn execute(&self, job: JobRef) { job.execute(); @@ -808,7 +878,7 @@ impl WorkerThread { /// /// This should only be done as a last resort, when there is no /// local work to do. - unsafe fn steal(&self) -> Option<JobRef> { + fn steal(&self) -> Option<JobRef> { // we only steal when we don't have any work to do locally debug_assert!(self.local_deque_is_empty()); @@ -851,25 +921,14 @@ impl WorkerThread { /// //////////////////////////////////////////////////////////////////////// -unsafe fn main_loop( - worker: Worker<JobRef>, - stealer: Stealer<JobRef>, - registry: Arc<Registry>, - index: usize, -) { - let worker_thread = &WorkerThread { - worker, - stealer, - fifo: JobFifo::new(), - index, - rng: XorShift64Star::new(), - registry, - }; +unsafe fn main_loop(thread: ThreadBuilder) { + let worker_thread = &WorkerThread::from(thread); WorkerThread::set_current(worker_thread); let registry = &*worker_thread.registry; + let index = worker_thread.index; // let registry know we are ready to do work - registry.thread_infos[index].primed.set(); + Latch::set(®istry.thread_infos[index].primed); // Worker threads should not panic. If they do, just abort, as the // internal state of the threadpool is corrupted. Note that if @@ -892,7 +951,7 @@ unsafe fn main_loop( debug_assert!(worker_thread.take_local_job().is_none()); // let registry know we are done - registry.thread_infos[index].stopped.set(); + Latch::set(®istry.thread_infos[index].stopped); // Normal termination, do not abort. mem::forget(abort_guard); @@ -924,7 +983,7 @@ where // invalidated until we return. op(&*owner_thread, false) } else { - global_registry().in_worker_cold(op) + global_registry().in_worker(op) } } } diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs index 25cda832e..f460dd79d 100644 --- a/vendor/rayon-core/src/scope/mod.rs +++ b/vendor/rayon-core/src/scope/mod.rs @@ -13,7 +13,7 @@ use crate::unwind; use std::any::Any; use std::fmt; use std::marker::PhantomData; -use std::mem; +use std::mem::ManuallyDrop; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; @@ -540,10 +540,10 @@ impl<'scope> Scope<'scope> { BODY: FnOnce(&Scope<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || { + let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; - scope.base.execute_job(move || body(scope)) + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); @@ -562,12 +562,12 @@ impl<'scope> Scope<'scope> { BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || { + let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; + let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - scope.base.execute_job(func); + ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } @@ -600,10 +600,10 @@ impl<'scope> ScopeFifo<'scope> { BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || { + let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; - scope.base.execute_job(move || body(scope)) + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); @@ -615,7 +615,7 @@ impl<'scope> ScopeFifo<'scope> { // SAFETY: this job will execute before the scope ends. unsafe { worker.push(fifo.push(job_ref)) }; } - None => self.base.registry.inject(&[job_ref]), + None => self.base.registry.inject(job_ref), } } @@ -628,12 +628,12 @@ impl<'scope> ScopeFifo<'scope> { BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || { + let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; + let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - scope.base.execute_job(func); + ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } @@ -688,7 +688,7 @@ impl<'scope> ScopeBase<'scope> { where FUNC: FnOnce() -> R, { - let result = self.execute_job_closure(func); + let result = unsafe { Self::execute_job_closure(self, func) }; self.job_completed_latch.wait(owner); self.maybe_propagate_panic(); result.unwrap() // only None if `op` panicked, and that would have been propagated @@ -696,28 +696,28 @@ impl<'scope> ScopeBase<'scope> { /// Executes `func` as a job, either aborting or executing as /// appropriate. - fn execute_job<FUNC>(&self, func: FUNC) + unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC) where FUNC: FnOnce(), { - let _: Option<()> = self.execute_job_closure(func); + let _: Option<()> = Self::execute_job_closure(this, func); } /// Executes `func` as a job in scope. Adjusts the "job completed" /// counters and also catches any panic and stores it into /// `scope`. - fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> + unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R> where FUNC: FnOnce() -> R, { match unwind::halt_unwinding(func) { Ok(r) => { - self.job_completed_latch.set(); + Latch::set(&(*this).job_completed_latch); Some(r) } Err(err) => { - self.job_panicked(err); - self.job_completed_latch.set(); + (*this).job_panicked(err); + Latch::set(&(*this).job_completed_latch); None } } @@ -725,14 +725,20 @@ impl<'scope> ScopeBase<'scope> { fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { // capture the first error we see, free the rest - let nil = ptr::null_mut(); - let mut err = Box::new(err); // box up the fat ptr - if self - .panic - .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed) - .is_ok() - { - mem::forget(err); // ownership now transferred into self.panic + if self.panic.load(Ordering::Relaxed).is_null() { + let nil = ptr::null_mut(); + let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr + let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err; + if self + .panic + .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + // ownership now transferred into self.panic + } else { + // another panic raced in ahead of us, so drop ours + let _: Box<Box<_>> = ManuallyDrop::into_inner(err); + } } } @@ -791,14 +797,14 @@ impl ScopeLatch { } impl Latch for ScopeLatch { - fn set(&self) { - match self { + unsafe fn set(this: *const Self) { + match &*this { ScopeLatch::Stealing { latch, registry, worker_index, - } => latch.set_and_tickle_one(registry, *worker_index), - ScopeLatch::Blocking { latch } => latch.set(), + } => CountLatch::set_and_tickle_one(latch, registry, *worker_index), + ScopeLatch::Blocking { latch } => Latch::set(latch), } } } diff --git a/vendor/rayon-core/src/scope/test.rs b/vendor/rayon-core/src/scope/test.rs index 00dd18c92..ad8c4af0b 100644 --- a/vendor/rayon-core/src/scope/test.rs +++ b/vendor/rayon-core/src/scope/test.rs @@ -148,6 +148,7 @@ fn update_tree() { /// linearly with N. We test this by some unsafe hackery and /// permitting an approx 10% change with a 10x input change. #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn linear_stack_growth() { let builder = ThreadPoolBuilder::new().num_threads(1); let pool = builder.build().unwrap(); @@ -213,6 +214,7 @@ fn panic_propagate_nested_scope_spawn() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_1() { let mut x = false; match unwind::halt_unwinding(|| { @@ -227,6 +229,7 @@ fn panic_propagate_still_execute_1() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_2() { let mut x = false; match unwind::halt_unwinding(|| { @@ -241,6 +244,7 @@ fn panic_propagate_still_execute_2() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_3() { let mut x = false; match unwind::halt_unwinding(|| { @@ -255,6 +259,7 @@ fn panic_propagate_still_execute_3() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_4() { let mut x = false; match unwind::halt_unwinding(|| { @@ -292,6 +297,7 @@ macro_rules! test_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn lifo_order() { // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. let vec = test_order!(scope => spawn); @@ -300,6 +306,7 @@ fn lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn fifo_order() { // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. let vec = test_order!(scope_fifo => spawn_fifo); @@ -334,6 +341,7 @@ macro_rules! test_nested_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_lifo_order() { // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. let vec = test_nested_order!(scope => spawn, scope => spawn); @@ -342,6 +350,7 @@ fn nested_lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_fifo_order() { // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); @@ -350,6 +359,7 @@ fn nested_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_lifo_fifo_order() { // LIFO on the outside, FIFO on the inside let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo); @@ -361,6 +371,7 @@ fn nested_lifo_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_fifo_lifo_order() { // FIFO on the outside, LIFO on the inside let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn); @@ -403,6 +414,7 @@ macro_rules! test_mixed_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mixed_lifo_order() { // NB: the end of the inner scope makes us execute some of the outer scope // before they've all been spawned, so they're not perfectly LIFO. @@ -412,6 +424,7 @@ fn mixed_lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mixed_fifo_order() { let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); let expected = vec![-1, 0, -2, 1, -3, 2, 3]; @@ -419,6 +432,7 @@ fn mixed_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mixed_lifo_fifo_order() { // NB: the end of the inner scope makes us execute some of the outer scope // before they've all been spawned, so they're not perfectly LIFO. @@ -428,6 +442,7 @@ fn mixed_lifo_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mixed_fifo_lifo_order() { let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn); let expected = vec![-3, 0, -2, 1, -1, 2, 3]; @@ -553,6 +568,7 @@ fn scope_spawn_broadcast_nested() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_spawn_broadcast_barrier() { let barrier = Barrier::new(8); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); @@ -565,6 +581,7 @@ fn scope_spawn_broadcast_barrier() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_spawn_broadcast_panic_one() { let count = AtomicUsize::new(0); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); @@ -583,6 +600,7 @@ fn scope_spawn_broadcast_panic_one() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_spawn_broadcast_panic_many() { let count = AtomicUsize::new(0); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); diff --git a/vendor/rayon-core/src/spawn/mod.rs b/vendor/rayon-core/src/spawn/mod.rs index ae1f211ef..1aa9edb3c 100644 --- a/vendor/rayon-core/src/spawn/mod.rs +++ b/vendor/rayon-core/src/spawn/mod.rs @@ -154,7 +154,7 @@ where // in a locally-FIFO order. Otherwise, just use the pool's global injector. match registry.current_thread() { Some(worker) => worker.push_fifo(job_ref), - None => registry.inject(&[job_ref]), + None => registry.inject(job_ref), } mem::forget(abort_guard); } diff --git a/vendor/rayon-core/src/spawn/test.rs b/vendor/rayon-core/src/spawn/test.rs index 761fafc77..b7a0535aa 100644 --- a/vendor/rayon-core/src/spawn/test.rs +++ b/vendor/rayon-core/src/spawn/test.rs @@ -7,6 +7,7 @@ use super::{spawn, spawn_fifo}; use crate::ThreadPoolBuilder; #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_then_join_in_worker() { let (tx, rx) = channel(); scope(move |_| { @@ -16,6 +17,7 @@ fn spawn_then_join_in_worker() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_then_join_outside_worker() { let (tx, rx) = channel(); spawn(move || tx.send(22).unwrap()); @@ -23,6 +25,7 @@ fn spawn_then_join_outside_worker() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_fwd() { let (tx, rx) = channel(); @@ -54,6 +57,7 @@ fn panic_fwd() { /// still active asynchronous tasks. We expect the thread-pool to stay /// alive and executing until those threads are complete. #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn termination_while_things_are_executing() { let (tx0, rx0) = channel(); let (tx1, rx1) = channel(); @@ -80,6 +84,7 @@ fn termination_while_things_are_executing() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn custom_panic_handler_and_spawn() { let (tx, rx) = channel(); @@ -107,6 +112,7 @@ fn custom_panic_handler_and_spawn() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn custom_panic_handler_and_nested_spawn() { let (tx, rx) = channel(); @@ -165,6 +171,7 @@ macro_rules! test_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn lifo_order() { // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order. let vec = test_order!(spawn, spawn); @@ -173,6 +180,7 @@ fn lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn fifo_order() { // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order. let vec = test_order!(spawn_fifo, spawn_fifo); @@ -181,6 +189,7 @@ fn fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn lifo_fifo_order() { // LIFO on the outside, FIFO on the inside let vec = test_order!(spawn, spawn_fifo); @@ -192,6 +201,7 @@ fn lifo_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn fifo_lifo_order() { // FIFO on the outside, LIFO on the inside let vec = test_order!(spawn_fifo, spawn); @@ -229,6 +239,7 @@ macro_rules! test_mixed_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mixed_lifo_fifo_order() { let vec = test_mixed_order!(spawn, spawn_fifo); let expected = vec![3, -1, 2, -2, 1, -3, 0]; @@ -236,6 +247,7 @@ fn mixed_lifo_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mixed_fifo_lifo_order() { let vec = test_mixed_order!(spawn_fifo, spawn); let expected = vec![0, -3, 1, -2, 2, -1, 3]; diff --git a/vendor/rayon-core/src/test.rs b/vendor/rayon-core/src/test.rs index 46d63a7df..25b8487f7 100644 --- a/vendor/rayon-core/src/test.rs +++ b/vendor/rayon-core/src/test.rs @@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Barrier}; #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn worker_thread_index() { let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); assert_eq!(pool.current_num_threads(), 22); @@ -14,6 +15,7 @@ fn worker_thread_index() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn start_callback_called() { let n_threads = 16; let n_called = Arc::new(AtomicUsize::new(0)); @@ -40,6 +42,7 @@ fn start_callback_called() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn exit_callback_called() { let n_threads = 16; let n_called = Arc::new(AtomicUsize::new(0)); @@ -69,6 +72,7 @@ fn exit_callback_called() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn handler_panics_handled_correctly() { let n_threads = 16; let n_called = Arc::new(AtomicUsize::new(0)); @@ -119,6 +123,7 @@ fn handler_panics_handled_correctly() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn check_config_build() { let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); assert_eq!(pool.current_num_threads(), 22); @@ -134,6 +139,7 @@ fn check_error_send_sync() { #[allow(deprecated)] #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn configuration() { let start_handler = move |_| {}; let exit_handler = move |_| {}; @@ -154,6 +160,7 @@ fn configuration() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn default_pool() { ThreadPoolBuilder::default().build().unwrap(); } @@ -162,6 +169,7 @@ fn default_pool() { /// the pool is done with them, allowing them to be used with rayon again /// later. e.g. WebAssembly want to have their own pool of available threads. #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> { let n_threads = 5; let mut handles = vec![]; diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs index 0fc06dd6b..c37826ef5 100644 --- a/vendor/rayon-core/src/thread_pool/mod.rs +++ b/vendor/rayon-core/src/thread_pool/mod.rs @@ -339,6 +339,30 @@ impl ThreadPool { // We assert that `self.registry` has not terminated. unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } } + + /// Cooperatively yields execution to Rayon. + /// + /// This is similar to the general [`yield_now()`], but only if the current + /// thread is part of *this* thread pool. + /// + /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if + /// nothing was available, or `None` if the current thread is not part this pool. + pub fn yield_now(&self) -> Option<Yield> { + let curr = self.registry.current_thread()?; + Some(curr.yield_now()) + } + + /// Cooperatively yields execution to local Rayon work. + /// + /// This is similar to the general [`yield_local()`], but only if the current + /// thread is part of *this* thread pool. + /// + /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if + /// nothing was available, or `None` if the current thread is not part this pool. + pub fn yield_local(&self) -> Option<Yield> { + let curr = self.registry.current_thread()?; + Some(curr.yield_local()) + } } impl Drop for ThreadPool { @@ -400,3 +424,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> { Some(!curr.local_deque_is_empty()) } } + +/// Cooperatively yields execution to Rayon. +/// +/// If the current thread is part of a rayon thread pool, this looks for a +/// single unit of pending work in the pool, then executes it. Completion of +/// that work might include nested work or further work stealing. +/// +/// This is similar to [`std::thread::yield_now()`], but does not literally make +/// that call. If you are implementing a polling loop, you may want to also +/// yield to the OS scheduler yourself if no Rayon work was found. +/// +/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if +/// nothing was available, or `None` if this thread is not part of any pool at all. +pub fn yield_now() -> Option<Yield> { + unsafe { + let thread = WorkerThread::current().as_ref()?; + Some(thread.yield_now()) + } +} + +/// Cooperatively yields execution to local Rayon work. +/// +/// If the current thread is part of a rayon thread pool, this looks for a +/// single unit of pending work in this thread's queue, then executes it. +/// Completion of that work might include nested work or further work stealing. +/// +/// This is similar to [`yield_now()`], but does not steal from other threads. +/// +/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if +/// nothing was available, or `None` if this thread is not part of any pool at all. +pub fn yield_local() -> Option<Yield> { + unsafe { + let thread = WorkerThread::current().as_ref()?; + Some(thread.yield_local()) + } +} + +/// Result of [`yield_now()`] or [`yield_local()`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Yield { + /// Work was found and executed. + Executed, + /// No available work was found. + Idle, +} diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs index ac750a6dc..6143e5799 100644 --- a/vendor/rayon-core/src/thread_pool/test.rs +++ b/vendor/rayon-core/src/thread_pool/test.rs @@ -16,6 +16,7 @@ fn panic_propagate() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn workers_stop() { let registry; @@ -43,6 +44,7 @@ fn join_a_lot(n: usize) { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn sleeper_stop() { use std::{thread, time}; @@ -89,6 +91,7 @@ fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn failed_thread_stack() { // Note: we first tried to force failure with a `usize::MAX` stack, but // macOS and Windows weren't fazed, or at least didn't fail the way we want. @@ -115,6 +118,7 @@ fn failed_thread_stack() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore)] fn panic_thread_name() { let (start_count, start_handler) = count_handler(); let (exit_count, exit_handler) = count_handler(); @@ -139,6 +143,7 @@ fn panic_thread_name() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn self_install() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); @@ -147,6 +152,7 @@ fn self_install() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mutual_install() { let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); @@ -166,6 +172,7 @@ fn mutual_install() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mutual_install_sleepy() { use std::{thread, time}; @@ -194,6 +201,7 @@ fn mutual_install_sleepy() { #[test] #[allow(deprecated)] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn check_thread_pool_new() { let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); assert_eq!(pool.current_num_threads(), 22); @@ -219,6 +227,7 @@ macro_rules! test_scope_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_lifo_order() { let vec = test_scope_order!(scope => spawn); let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed @@ -226,6 +235,7 @@ fn scope_lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_fifo_order() { let vec = test_scope_order!(scope_fifo => spawn_fifo); let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order @@ -250,6 +260,7 @@ macro_rules! test_spawn_order { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_lifo_order() { let vec = test_spawn_order!(spawn); let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed @@ -257,6 +268,7 @@ fn spawn_lifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_fifo_order() { let vec = test_spawn_order!(spawn_fifo); let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order @@ -264,6 +276,7 @@ fn spawn_fifo_order() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_scopes() { // Create matching scopes for every thread pool. fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) @@ -300,6 +313,7 @@ fn nested_scopes() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_fifo_scopes() { // Create matching fifo scopes for every thread pool. fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) @@ -336,6 +350,7 @@ fn nested_fifo_scopes() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn in_place_scope_no_deadlock() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let (tx, rx) = channel(); @@ -351,6 +366,7 @@ fn in_place_scope_no_deadlock() { } #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn in_place_scope_fifo_no_deadlock() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let (tx, rx) = channel(); @@ -364,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() { rx_ref.recv().unwrap(); }); } + +#[test] +fn yield_now_to_spawn() { + let (tx, rx) = crossbeam_channel::bounded(1); + + // Queue a regular spawn. + crate::spawn(move || tx.send(22).unwrap()); + + // The single-threaded fallback mode (for wasm etc.) won't + // get a chance to run the spawn if we never yield to it. + crate::registry::in_worker(move |_, _| { + crate::yield_now(); + }); + + // The spawn **must** have started by now, but we still might have to wait + // for it to finish if a different thread stole it first. + assert_eq!(22, rx.recv().unwrap()); +} + +#[test] +fn yield_local_to_spawn() { + let (tx, rx) = crossbeam_channel::bounded(1); + + // Queue a regular spawn. + crate::spawn(move || tx.send(22).unwrap()); + + // The single-threaded fallback mode (for wasm etc.) won't + // get a chance to run the spawn if we never yield to it. + crate::registry::in_worker(move |_, _| { + crate::yield_local(); + }); + + // The spawn **must** have started by now, but we still might have to wait + // for it to finish if a different thread stole it first. + assert_eq!(22, rx.recv().unwrap()); +} |