diff options
Diffstat (limited to 'vendor/rayon-core/src/registry.rs')
-rw-r--r-- | vendor/rayon-core/src/registry.rs | 155 |
1 files changed, 107 insertions, 48 deletions
diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs index 279e298d2..5d56ac927 100644 --- a/vendor/rayon-core/src/registry.rs +++ b/vendor/rayon-core/src/registry.rs @@ -1,11 +1,12 @@ use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch}; +use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch}; use crate::log::Event::*; use crate::log::Logger; use crate::sleep::Sleep; use crate::unwind; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, + Yield, }; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use std::cell::Cell; @@ -50,7 +51,7 @@ impl ThreadBuilder { /// Executes the main loop for this thread. This will not return until the /// thread pool is dropped. pub fn run(self) { - unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) } + unsafe { main_loop(self) } } } @@ -164,7 +165,7 @@ static THE_REGISTRY_SET: Once = Once::new(); /// initialization has not already occurred, use the default /// configuration. pub(super) fn global_registry() -> &'static Arc<Registry> { - set_global_registry(|| Registry::new(ThreadPoolBuilder::new())) + set_global_registry(default_global_registry) .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) .expect("The global thread pool has not been initialized.") } @@ -198,6 +199,46 @@ where result } +fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { + let result = Registry::new(ThreadPoolBuilder::new()); + + // If we're running in an environment that doesn't support threads at all, we can fall back to + // using the current thread alone. This is crude, and probably won't work for non-blocking + // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine. + // + // Notably, this allows current WebAssembly targets to work even though their threading support + // is stubbed out, and we won't have to change anything if they do add real threading. + let unsupported = matches!(&result, Err(e) if e.is_unsupported()); + if unsupported && WorkerThread::current().is_null() { + let builder = ThreadPoolBuilder::new() + .num_threads(1) + .spawn_handler(|thread| { + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); + let registry = &*worker_thread.registry; + let index = worker_thread.index; + + unsafe { + WorkerThread::set_current(worker_thread); + + // let registry know we are ready to do work + Latch::set(®istry.thread_infos[index].primed); + } + + Ok(()) + }); + + let fallback_result = Registry::new(builder); + if fallback_result.is_ok() { + return fallback_result; + } + } + + result +} + struct Terminator<'a>(&'a Arc<Registry>); impl<'a> Drop for Terminator<'a> { @@ -376,7 +417,7 @@ impl Registry { if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { (*worker_thread).push(job_ref); } else { - self.inject(&[job_ref]); + self.inject(job_ref); } } } @@ -384,10 +425,8 @@ impl Registry { /// Push a job into the "external jobs" queue; it will be taken by /// whatever worker has nothing to do. Use this if you know that /// you are not on a worker of this registry. - pub(super) fn inject(&self, injected_jobs: &[JobRef]) { - self.log(|| JobsInjected { - count: injected_jobs.len(), - }); + pub(super) fn inject(&self, injected_job: JobRef) { + self.log(|| JobsInjected { count: 1 }); // It should not be possible for `state.terminate` to be true // here. It is only set to true when the user creates (and @@ -402,12 +441,8 @@ impl Registry { let queue_was_empty = self.injected_jobs.is_empty(); - for &job_ref in injected_jobs { - self.injected_jobs.push(job_ref); - } - - self.sleep - .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty); + self.injected_jobs.push(injected_job); + self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty); } fn has_injected_job(&self) -> bool { @@ -505,9 +540,9 @@ impl Registry { assert!(injected && !worker_thread.is_null()); op(&*worker_thread, true) }, - l, + LatchRef::new(l), ); - self.inject(&[job.as_job_ref()]); + self.inject(job.as_job_ref()); job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. // flush accumulated logs as we exit the thread @@ -535,7 +570,7 @@ impl Registry { }, latch, ); - self.inject(&[job.as_job_ref()]); + self.inject(job.as_job_ref()); current_thread.wait_until(&job.latch); job.into_result() } @@ -575,7 +610,7 @@ impl Registry { pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { - thread_info.terminate.set_and_tickle_one(self, i); + unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } @@ -652,7 +687,20 @@ pub(super) struct WorkerThread { // worker is fully unwound. Using an unsafe pointer avoids the need // for a RefCell<T> etc. thread_local! { - static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null()); + static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) }; +} + +impl From<ThreadBuilder> for WorkerThread { + fn from(thread: ThreadBuilder) -> Self { + Self { + worker: thread.worker, + stealer: thread.stealer, + fifo: JobFifo::new(), + index: thread.index, + rng: XorShift64Star::new(), + registry: thread.registry, + } + } } impl Drop for WorkerThread { @@ -725,7 +773,7 @@ impl WorkerThread { /// for breadth-first execution, it would mean dequeuing from the /// bottom. #[inline] - pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> { + pub(super) fn take_local_job(&self) -> Option<JobRef> { let popped_job = self.worker.pop(); if popped_job.is_some() { @@ -767,16 +815,7 @@ impl WorkerThread { let mut idle_state = self.registry.sleep.start_looking(self.index, latch); while !latch.probe() { - // Try to find some work to do. We give preference first - // to things in our local deque, then in other workers - // deques, and finally to injected jobs from the - // outside. The idea is to finish what we started before - // we take on something new. - if let Some(job) = self - .take_local_job() - .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job(self.index)) - { + if let Some(job) = self.find_work() { self.registry.sleep.work_found(idle_state); self.execute(job); idle_state = self.registry.sleep.start_looking(self.index, latch); @@ -799,6 +838,37 @@ impl WorkerThread { mem::forget(abort_guard); // successful execution, do not abort } + fn find_work(&self) -> Option<JobRef> { + // Try to find some work to do. We give preference first + // to things in our local deque, then in other workers + // deques, and finally to injected jobs from the + // outside. The idea is to finish what we started before + // we take on something new. + self.take_local_job() + .or_else(|| self.steal()) + .or_else(|| self.registry.pop_injected_job(self.index)) + } + + pub(super) fn yield_now(&self) -> Yield { + match self.find_work() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + + pub(super) fn yield_local(&self) -> Yield { + match self.take_local_job() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + #[inline] pub(super) unsafe fn execute(&self, job: JobRef) { job.execute(); @@ -808,7 +878,7 @@ impl WorkerThread { /// /// This should only be done as a last resort, when there is no /// local work to do. - unsafe fn steal(&self) -> Option<JobRef> { + fn steal(&self) -> Option<JobRef> { // we only steal when we don't have any work to do locally debug_assert!(self.local_deque_is_empty()); @@ -851,25 +921,14 @@ impl WorkerThread { /// //////////////////////////////////////////////////////////////////////// -unsafe fn main_loop( - worker: Worker<JobRef>, - stealer: Stealer<JobRef>, - registry: Arc<Registry>, - index: usize, -) { - let worker_thread = &WorkerThread { - worker, - stealer, - fifo: JobFifo::new(), - index, - rng: XorShift64Star::new(), - registry, - }; +unsafe fn main_loop(thread: ThreadBuilder) { + let worker_thread = &WorkerThread::from(thread); WorkerThread::set_current(worker_thread); let registry = &*worker_thread.registry; + let index = worker_thread.index; // let registry know we are ready to do work - registry.thread_infos[index].primed.set(); + Latch::set(®istry.thread_infos[index].primed); // Worker threads should not panic. If they do, just abort, as the // internal state of the threadpool is corrupted. Note that if @@ -892,7 +951,7 @@ unsafe fn main_loop( debug_assert!(worker_thread.take_local_job().is_none()); // let registry know we are done - registry.thread_infos[index].stopped.set(); + Latch::set(®istry.thread_infos[index].stopped); // Normal termination, do not abort. mem::forget(abort_guard); @@ -924,7 +983,7 @@ where // invalidated until we return. op(&*owner_thread, false) } else { - global_registry().in_worker_cold(op) + global_registry().in_worker(op) } } } |