summaryrefslogtreecommitdiffstats
path: root/vendor/rayon-core/src/registry.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/rayon-core/src/registry.rs')
-rw-r--r--vendor/rayon-core/src/registry.rs155
1 files changed, 107 insertions, 48 deletions
diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs
index 279e298d2..5d56ac927 100644
--- a/vendor/rayon-core/src/registry.rs
+++ b/vendor/rayon-core/src/registry.rs
@@ -1,11 +1,12 @@
use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+ Yield,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::cell::Cell;
@@ -50,7 +51,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
- unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
+ unsafe { main_loop(self) }
}
}
@@ -164,7 +165,7 @@ static THE_REGISTRY_SET: Once = Once::new();
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
- set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
+ set_global_registry(default_global_registry)
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
@@ -198,6 +199,46 @@ where
result
}
+fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
+ let result = Registry::new(ThreadPoolBuilder::new());
+
+ // If we're running in an environment that doesn't support threads at all, we can fall back to
+ // using the current thread alone. This is crude, and probably won't work for non-blocking
+ // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
+ //
+ // Notably, this allows current WebAssembly targets to work even though their threading support
+ // is stubbed out, and we won't have to change anything if they do add real threading.
+ let unsupported = matches!(&result, Err(e) if e.is_unsupported());
+ if unsupported && WorkerThread::current().is_null() {
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(1)
+ .spawn_handler(|thread| {
+ // Rather than starting a new thread, we're just taking over the current thread
+ // *without* running the main loop, so we can still return from here.
+ // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+ let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
+ let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
+
+ unsafe {
+ WorkerThread::set_current(worker_thread);
+
+ // let registry know we are ready to do work
+ Latch::set(&registry.thread_infos[index].primed);
+ }
+
+ Ok(())
+ });
+
+ let fallback_result = Registry::new(builder);
+ if fallback_result.is_ok() {
+ return fallback_result;
+ }
+ }
+
+ result
+}
+
struct Terminator<'a>(&'a Arc<Registry>);
impl<'a> Drop for Terminator<'a> {
@@ -376,7 +417,7 @@ impl Registry {
if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
(*worker_thread).push(job_ref);
} else {
- self.inject(&[job_ref]);
+ self.inject(job_ref);
}
}
}
@@ -384,10 +425,8 @@ impl Registry {
/// Push a job into the "external jobs" queue; it will be taken by
/// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
- pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
- self.log(|| JobsInjected {
- count: injected_jobs.len(),
- });
+ pub(super) fn inject(&self, injected_job: JobRef) {
+ self.log(|| JobsInjected { count: 1 });
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
@@ -402,12 +441,8 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
- for &job_ref in injected_jobs {
- self.injected_jobs.push(job_ref);
- }
-
- self.sleep
- .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+ self.injected_jobs.push(injected_job);
+ self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
}
fn has_injected_job(&self) -> bool {
@@ -505,9 +540,9 @@ impl Registry {
assert!(injected && !worker_thread.is_null());
op(&*worker_thread, true)
},
- l,
+ LatchRef::new(l),
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
// flush accumulated logs as we exit the thread
@@ -535,7 +570,7 @@ impl Registry {
},
latch,
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
current_thread.wait_until(&job.latch);
job.into_result()
}
@@ -575,7 +610,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
- thread_info.terminate.set_and_tickle_one(self, i);
+ unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@@ -652,7 +687,20 @@ pub(super) struct WorkerThread {
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
- static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
+ static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
+}
+
+impl From<ThreadBuilder> for WorkerThread {
+ fn from(thread: ThreadBuilder) -> Self {
+ Self {
+ worker: thread.worker,
+ stealer: thread.stealer,
+ fifo: JobFifo::new(),
+ index: thread.index,
+ rng: XorShift64Star::new(),
+ registry: thread.registry,
+ }
+ }
}
impl Drop for WorkerThread {
@@ -725,7 +773,7 @@ impl WorkerThread {
/// for breadth-first execution, it would mean dequeuing from the
/// bottom.
#[inline]
- pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
+ pub(super) fn take_local_job(&self) -> Option<JobRef> {
let popped_job = self.worker.pop();
if popped_job.is_some() {
@@ -767,16 +815,7 @@ impl WorkerThread {
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
- // Try to find some work to do. We give preference first
- // to things in our local deque, then in other workers
- // deques, and finally to injected jobs from the
- // outside. The idea is to finish what we started before
- // we take on something new.
- if let Some(job) = self
- .take_local_job()
- .or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job(self.index))
- {
+ if let Some(job) = self.find_work() {
self.registry.sleep.work_found(idle_state);
self.execute(job);
idle_state = self.registry.sleep.start_looking(self.index, latch);
@@ -799,6 +838,37 @@ impl WorkerThread {
mem::forget(abort_guard); // successful execution, do not abort
}
+ fn find_work(&self) -> Option<JobRef> {
+ // Try to find some work to do. We give preference first
+ // to things in our local deque, then in other workers
+ // deques, and finally to injected jobs from the
+ // outside. The idea is to finish what we started before
+ // we take on something new.
+ self.take_local_job()
+ .or_else(|| self.steal())
+ .or_else(|| self.registry.pop_injected_job(self.index))
+ }
+
+ pub(super) fn yield_now(&self) -> Yield {
+ match self.find_work() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
+ pub(super) fn yield_local(&self) -> Yield {
+ match self.take_local_job() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
#[inline]
pub(super) unsafe fn execute(&self, job: JobRef) {
job.execute();
@@ -808,7 +878,7 @@ impl WorkerThread {
///
/// This should only be done as a last resort, when there is no
/// local work to do.
- unsafe fn steal(&self) -> Option<JobRef> {
+ fn steal(&self) -> Option<JobRef> {
// we only steal when we don't have any work to do locally
debug_assert!(self.local_deque_is_empty());
@@ -851,25 +921,14 @@ impl WorkerThread {
/// ////////////////////////////////////////////////////////////////////////
-unsafe fn main_loop(
- worker: Worker<JobRef>,
- stealer: Stealer<JobRef>,
- registry: Arc<Registry>,
- index: usize,
-) {
- let worker_thread = &WorkerThread {
- worker,
- stealer,
- fifo: JobFifo::new(),
- index,
- rng: XorShift64Star::new(),
- registry,
- };
+unsafe fn main_loop(thread: ThreadBuilder) {
+ let worker_thread = &WorkerThread::from(thread);
WorkerThread::set_current(worker_thread);
let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
// let registry know we are ready to do work
- registry.thread_infos[index].primed.set();
+ Latch::set(&registry.thread_infos[index].primed);
// Worker threads should not panic. If they do, just abort, as the
// internal state of the threadpool is corrupted. Note that if
@@ -892,7 +951,7 @@ unsafe fn main_loop(
debug_assert!(worker_thread.take_local_job().is_none());
// let registry know we are done
- registry.thread_infos[index].stopped.set();
+ Latch::set(&registry.thread_infos[index].stopped);
// Normal termination, do not abort.
mem::forget(abort_guard);
@@ -924,7 +983,7 @@ where
// invalidated until we return.
op(&*owner_thread, false)
} else {
- global_registry().in_worker_cold(op)
+ global_registry().in_worker(op)
}
}
}