From 43a97878ce14b72f0981164f87f2e35e14151312 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 11:22:09 +0200 Subject: Adding upstream version 110.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/rayon-core/src/broadcast/mod.rs | 148 ++++ third_party/rust/rayon-core/src/broadcast/test.rs | 216 +++++ .../rust/rayon-core/src/compile_fail/mod.rs | 7 + .../rayon-core/src/compile_fail/quicksort_race1.rs | 28 + .../rayon-core/src/compile_fail/quicksort_race2.rs | 28 + .../rayon-core/src/compile_fail/quicksort_race3.rs | 28 + .../rust/rayon-core/src/compile_fail/rc_return.rs | 17 + .../rust/rayon-core/src/compile_fail/rc_upvar.rs | 9 + .../rayon-core/src/compile_fail/scope_join_bad.rs | 24 + third_party/rust/rayon-core/src/job.rs | 264 ++++++ third_party/rust/rayon-core/src/join/mod.rs | 187 ++++ third_party/rust/rayon-core/src/join/test.rs | 145 +++ third_party/rust/rayon-core/src/latch.rs | 380 ++++++++ third_party/rust/rayon-core/src/lib.rs | 819 +++++++++++++++++ third_party/rust/rayon-core/src/log.rs | 413 +++++++++ third_party/rust/rayon-core/src/private.rs | 26 + third_party/rust/rayon-core/src/registry.rs | 970 +++++++++++++++++++++ third_party/rust/rayon-core/src/scope/mod.rs | 859 ++++++++++++++++++ third_party/rust/rayon-core/src/scope/test.rs | 601 +++++++++++++ third_party/rust/rayon-core/src/sleep/README.md | 219 +++++ third_party/rust/rayon-core/src/sleep/counters.rs | 277 ++++++ third_party/rust/rayon-core/src/sleep/mod.rs | 394 +++++++++ third_party/rust/rayon-core/src/spawn/mod.rs | 163 ++++ third_party/rust/rayon-core/src/spawn/test.rs | 243 ++++++ third_party/rust/rayon-core/src/test.rs | 192 ++++ third_party/rust/rayon-core/src/thread_pool/mod.rs | 402 +++++++++ .../rust/rayon-core/src/thread_pool/test.rs | 366 ++++++++ third_party/rust/rayon-core/src/unwind.rs | 31 + 28 files changed, 7456 insertions(+) create mode 100644 third_party/rust/rayon-core/src/broadcast/mod.rs create mode 100755 third_party/rust/rayon-core/src/broadcast/test.rs create mode 100644 third_party/rust/rayon-core/src/compile_fail/mod.rs create mode 100644 third_party/rust/rayon-core/src/compile_fail/quicksort_race1.rs create mode 100644 third_party/rust/rayon-core/src/compile_fail/quicksort_race2.rs create mode 100644 third_party/rust/rayon-core/src/compile_fail/quicksort_race3.rs create mode 100644 third_party/rust/rayon-core/src/compile_fail/rc_return.rs create mode 100644 third_party/rust/rayon-core/src/compile_fail/rc_upvar.rs create mode 100644 third_party/rust/rayon-core/src/compile_fail/scope_join_bad.rs create mode 100644 third_party/rust/rayon-core/src/job.rs create mode 100644 third_party/rust/rayon-core/src/join/mod.rs create mode 100644 third_party/rust/rayon-core/src/join/test.rs create mode 100644 third_party/rust/rayon-core/src/latch.rs create mode 100644 third_party/rust/rayon-core/src/lib.rs create mode 100644 third_party/rust/rayon-core/src/log.rs create mode 100644 third_party/rust/rayon-core/src/private.rs create mode 100644 third_party/rust/rayon-core/src/registry.rs create mode 100644 third_party/rust/rayon-core/src/scope/mod.rs create mode 100644 third_party/rust/rayon-core/src/scope/test.rs create mode 100644 third_party/rust/rayon-core/src/sleep/README.md create mode 100644 third_party/rust/rayon-core/src/sleep/counters.rs create mode 100644 third_party/rust/rayon-core/src/sleep/mod.rs create mode 100644 third_party/rust/rayon-core/src/spawn/mod.rs create mode 100644 third_party/rust/rayon-core/src/spawn/test.rs create mode 100644 third_party/rust/rayon-core/src/test.rs create mode 100644 third_party/rust/rayon-core/src/thread_pool/mod.rs create mode 100644 third_party/rust/rayon-core/src/thread_pool/test.rs create mode 100644 third_party/rust/rayon-core/src/unwind.rs (limited to 'third_party/rust/rayon-core/src') diff --git a/third_party/rust/rayon-core/src/broadcast/mod.rs b/third_party/rust/rayon-core/src/broadcast/mod.rs new file mode 100644 index 0000000000..452aa71b67 --- /dev/null +++ b/third_party/rust/rayon-core/src/broadcast/mod.rs @@ -0,0 +1,148 @@ +use crate::job::{ArcJob, StackJob}; +use crate::registry::{Registry, WorkerThread}; +use crate::scope::ScopeLatch; +use std::fmt; +use std::marker::PhantomData; +use std::sync::Arc; + +mod test; + +/// Executes `op` within every thread in the current threadpool. If this is +/// called from a non-Rayon thread, it will execute in the global threadpool. +/// Any attempts to use `join`, `scope`, or parallel iterators will then operate +/// within that threadpool. When the call has completed on each thread, returns +/// a vector containing all of their return values. +/// +/// For more information, see the [`ThreadPool::broadcast()`][m] method. +/// +/// [m]: struct.ThreadPool.html#method.broadcast +pub fn broadcast(op: OP) -> Vec +where + OP: Fn(BroadcastContext<'_>) -> R + Sync, + R: Send, +{ + // We assert that current registry has not terminated. + unsafe { broadcast_in(op, &Registry::current()) } +} + +/// Spawns an asynchronous task on every thread in this thread-pool. This task +/// will run in the implicit, global scope, which means that it may outlast the +/// current stack frame -- therefore, it cannot capture any references onto the +/// stack (you will likely need a `move` closure). +/// +/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method. +/// +/// [m]: struct.ThreadPool.html#method.spawn_broadcast +pub fn spawn_broadcast(op: OP) +where + OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, +{ + // We assert that current registry has not terminated. + unsafe { spawn_broadcast_in(op, &Registry::current()) } +} + +/// Provides context to a closure called by `broadcast`. +pub struct BroadcastContext<'a> { + worker: &'a WorkerThread, + + /// Make sure to prevent auto-traits like `Send` and `Sync`. + _marker: PhantomData<&'a mut dyn Fn()>, +} + +impl<'a> BroadcastContext<'a> { + pub(super) fn with(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R { + let worker_thread = WorkerThread::current(); + assert!(!worker_thread.is_null()); + f(BroadcastContext { + worker: unsafe { &*worker_thread }, + _marker: PhantomData, + }) + } + + /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`). + #[inline] + pub fn index(&self) -> usize { + self.worker.index() + } + + /// The number of threads receiving the broadcast in the thread pool. + /// + /// # Future compatibility note + /// + /// Future versions of Rayon might vary the number of threads over time, but + /// this method will always return the number of threads which are actually + /// receiving your particular `broadcast` call. + #[inline] + pub fn num_threads(&self) -> usize { + self.worker.registry().num_threads() + } +} + +impl<'a> fmt::Debug for BroadcastContext<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BroadcastContext") + .field("index", &self.index()) + .field("num_threads", &self.num_threads()) + .field("pool_id", &self.worker.registry().id()) + .finish() + } +} + +/// Execute `op` on every thread in the pool. It will be executed on each +/// thread when they have nothing else to do locally, before they try to +/// steal work from other threads. This function will not return until all +/// threads have completed the `op`. +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn broadcast_in(op: OP, registry: &Arc) -> Vec +where + OP: Fn(BroadcastContext<'_>) -> R + Sync, + R: Send, +{ + let f = move |injected: bool| { + debug_assert!(injected); + BroadcastContext::with(&op) + }; + + let n_threads = registry.num_threads(); + let current_thread = WorkerThread::current().as_ref(); + let latch = ScopeLatch::with_count(n_threads, current_thread); + let jobs: Vec<_> = (0..n_threads).map(|_| StackJob::new(&f, &latch)).collect(); + let job_refs = jobs.iter().map(|job| job.as_job_ref()); + + registry.inject_broadcast(job_refs); + + // Wait for all jobs to complete, then collect the results, maybe propagating a panic. + latch.wait(current_thread); + jobs.into_iter().map(|job| job.into_result()).collect() +} + +/// Execute `op` on every thread in the pool. It will be executed on each +/// thread when they have nothing else to do locally, before they try to +/// steal work from other threads. This function returns immediately after +/// injecting the jobs. +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn spawn_broadcast_in(op: OP, registry: &Arc) +where + OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, +{ + let job = ArcJob::new({ + let registry = Arc::clone(registry); + move || { + registry.catch_unwind(|| BroadcastContext::with(&op)); + registry.terminate(); // (*) permit registry to terminate now + } + }); + + let n_threads = registry.num_threads(); + let job_refs = (0..n_threads).map(|_| { + // Ensure that registry cannot terminate until this job has executed + // on each thread. This ref is decremented at the (*) above. + registry.increment_terminate_count(); + + ArcJob::as_static_job_ref(&job) + }); + + registry.inject_broadcast(job_refs); +} diff --git a/third_party/rust/rayon-core/src/broadcast/test.rs b/third_party/rust/rayon-core/src/broadcast/test.rs new file mode 100755 index 0000000000..a765cb034b --- /dev/null +++ b/third_party/rust/rayon-core/src/broadcast/test.rs @@ -0,0 +1,216 @@ +#![cfg(test)] + +use crate::ThreadPoolBuilder; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::{thread, time}; + +#[test] +fn broadcast_global() { + let v = crate::broadcast(|ctx| ctx.index()); + assert!(v.into_iter().eq(0..crate::current_num_threads())); +} + +#[test] +fn spawn_broadcast_global() { + let (tx, rx) = crossbeam_channel::unbounded(); + crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); + + let mut v: Vec<_> = rx.into_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..crate::current_num_threads())); +} + +#[test] +fn broadcast_pool() { + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let v = pool.broadcast(|ctx| ctx.index()); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +fn spawn_broadcast_pool() { + let (tx, rx) = crossbeam_channel::unbounded(); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); + + let mut v: Vec<_> = rx.into_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +fn broadcast_self() { + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let v = pool.install(|| crate::broadcast(|ctx| ctx.index())); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +fn spawn_broadcast_self() { + let (tx, rx) = crossbeam_channel::unbounded(); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap())); + + let mut v: Vec<_> = rx.into_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +fn broadcast_mutual() { + let count = AtomicUsize::new(0); + let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.install(|| { + pool2.broadcast(|_| { + pool1.broadcast(|_| { + count.fetch_add(1, Ordering::Relaxed); + }) + }) + }); + assert_eq!(count.into_inner(), 3 * 7); +} + +#[test] +fn spawn_broadcast_mutual() { + let (tx, rx) = crossbeam_channel::unbounded(); + let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.spawn({ + let pool1 = Arc::clone(&pool1); + move || { + pool2.spawn_broadcast(move |_| { + let tx = tx.clone(); + pool1.spawn_broadcast(move |_| tx.send(()).unwrap()) + }) + } + }); + assert_eq!(rx.into_iter().count(), 3 * 7); +} + +#[test] +fn broadcast_mutual_sleepy() { + let count = AtomicUsize::new(0); + let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.install(|| { + thread::sleep(time::Duration::from_secs(1)); + pool2.broadcast(|_| { + thread::sleep(time::Duration::from_secs(1)); + pool1.broadcast(|_| { + thread::sleep(time::Duration::from_millis(100)); + count.fetch_add(1, Ordering::Relaxed); + }) + }) + }); + assert_eq!(count.into_inner(), 3 * 7); +} + +#[test] +fn spawn_broadcast_mutual_sleepy() { + let (tx, rx) = crossbeam_channel::unbounded(); + let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.spawn({ + let pool1 = Arc::clone(&pool1); + move || { + thread::sleep(time::Duration::from_secs(1)); + pool2.spawn_broadcast(move |_| { + let tx = tx.clone(); + thread::sleep(time::Duration::from_secs(1)); + pool1.spawn_broadcast(move |_| { + thread::sleep(time::Duration::from_millis(100)); + tx.send(()).unwrap(); + }) + }) + } + }); + assert_eq!(rx.into_iter().count(), 3 * 7); +} + +#[test] +fn broadcast_panic_one() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.broadcast(|ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() == 3 { + panic!("Hello, world!"); + } + }) + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} + +#[test] +fn spawn_broadcast_panic_one() { + let (tx, rx) = crossbeam_channel::unbounded(); + let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let pool = ThreadPoolBuilder::new() + .num_threads(7) + .panic_handler(move |e| panic_tx.send(e).unwrap()) + .build() + .unwrap(); + pool.spawn_broadcast(move |ctx| { + tx.send(()).unwrap(); + if ctx.index() == 3 { + panic!("Hello, world!"); + } + }); + drop(pool); // including panic_tx + assert_eq!(rx.into_iter().count(), 7); + assert_eq!(panic_rx.into_iter().count(), 1); +} + +#[test] +fn broadcast_panic_many() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.broadcast(|ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() % 2 == 0 { + panic!("Hello, world!"); + } + }) + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} + +#[test] +fn spawn_broadcast_panic_many() { + let (tx, rx) = crossbeam_channel::unbounded(); + let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let pool = ThreadPoolBuilder::new() + .num_threads(7) + .panic_handler(move |e| panic_tx.send(e).unwrap()) + .build() + .unwrap(); + pool.spawn_broadcast(move |ctx| { + tx.send(()).unwrap(); + if ctx.index() % 2 == 0 { + panic!("Hello, world!"); + } + }); + drop(pool); // including panic_tx + assert_eq!(rx.into_iter().count(), 7); + assert_eq!(panic_rx.into_iter().count(), 4); +} + +#[test] +fn broadcast_sleep_race() { + let test_duration = time::Duration::from_secs(1); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let start = time::Instant::now(); + while start.elapsed() < test_duration { + pool.broadcast(|ctx| { + // A slight spread of sleep duration increases the chance that one + // of the threads will race in the pool's idle sleep afterward. + thread::sleep(time::Duration::from_micros(ctx.index() as u64)); + }); + } +} diff --git a/third_party/rust/rayon-core/src/compile_fail/mod.rs b/third_party/rust/rayon-core/src/compile_fail/mod.rs new file mode 100644 index 0000000000..f2ec646a4d --- /dev/null +++ b/third_party/rust/rayon-core/src/compile_fail/mod.rs @@ -0,0 +1,7 @@ +// These modules contain `compile_fail` doc tests. +mod quicksort_race1; +mod quicksort_race2; +mod quicksort_race3; +mod rc_return; +mod rc_upvar; +mod scope_join_bad; diff --git a/third_party/rust/rayon-core/src/compile_fail/quicksort_race1.rs b/third_party/rust/rayon-core/src/compile_fail/quicksort_race1.rs new file mode 100644 index 0000000000..5615033895 --- /dev/null +++ b/third_party/rust/rayon-core/src/compile_fail/quicksort_race1.rs @@ -0,0 +1,28 @@ +/*! ```compile_fail,E0524 + +fn quick_sort(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, _hi) = v.split_at_mut(mid); + rayon_core::join(|| quick_sort(lo), || quick_sort(lo)); //~ ERROR +} + +fn partition(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ diff --git a/third_party/rust/rayon-core/src/compile_fail/quicksort_race2.rs b/third_party/rust/rayon-core/src/compile_fail/quicksort_race2.rs new file mode 100644 index 0000000000..020589c29a --- /dev/null +++ b/third_party/rust/rayon-core/src/compile_fail/quicksort_race2.rs @@ -0,0 +1,28 @@ +/*! ```compile_fail,E0500 + +fn quick_sort(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, _hi) = v.split_at_mut(mid); + rayon_core::join(|| quick_sort(lo), || quick_sort(v)); //~ ERROR +} + +fn partition(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ diff --git a/third_party/rust/rayon-core/src/compile_fail/quicksort_race3.rs b/third_party/rust/rayon-core/src/compile_fail/quicksort_race3.rs new file mode 100644 index 0000000000..16fbf3b824 --- /dev/null +++ b/third_party/rust/rayon-core/src/compile_fail/quicksort_race3.rs @@ -0,0 +1,28 @@ +/*! ```compile_fail,E0524 + +fn quick_sort(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (_lo, hi) = v.split_at_mut(mid); + rayon_core::join(|| quick_sort(hi), || quick_sort(hi)); //~ ERROR +} + +fn partition(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ diff --git a/third_party/rust/rayon-core/src/compile_fail/rc_return.rs b/third_party/rust/rayon-core/src/compile_fail/rc_return.rs new file mode 100644 index 0000000000..93e3a60384 --- /dev/null +++ b/third_party/rust/rayon-core/src/compile_fail/rc_return.rs @@ -0,0 +1,17 @@ +/** ```compile_fail,E0277 + +use std::rc::Rc; + +rayon_core::join(|| Rc::new(22), || ()); //~ ERROR + +``` */ +mod left {} + +/** ```compile_fail,E0277 + +use std::rc::Rc; + +rayon_core::join(|| (), || Rc::new(23)); //~ ERROR + +``` */ +mod right {} diff --git a/third_party/rust/rayon-core/src/compile_fail/rc_upvar.rs b/third_party/rust/rayon-core/src/compile_fail/rc_upvar.rs new file mode 100644 index 0000000000..d8aebcfcbf --- /dev/null +++ b/third_party/rust/rayon-core/src/compile_fail/rc_upvar.rs @@ -0,0 +1,9 @@ +/*! ```compile_fail,E0277 + +use std::rc::Rc; + +let r = Rc::new(22); +rayon_core::join(|| r.clone(), || r.clone()); +//~^ ERROR + +``` */ diff --git a/third_party/rust/rayon-core/src/compile_fail/scope_join_bad.rs b/third_party/rust/rayon-core/src/compile_fail/scope_join_bad.rs new file mode 100644 index 0000000000..75e4c5ca6c --- /dev/null +++ b/third_party/rust/rayon-core/src/compile_fail/scope_join_bad.rs @@ -0,0 +1,24 @@ +/*! ```compile_fail,E0373 + +fn bad_scope(f: F) + where F: FnOnce(&i32) + Send, +{ + rayon_core::scope(|s| { + let x = 22; + s.spawn(|_| f(&x)); //~ ERROR `x` does not live long enough + }); +} + +fn good_scope(f: F) + where F: FnOnce(&i32) + Send, +{ + let x = 22; + rayon_core::scope(|s| { + s.spawn(|_| f(&x)); + }); +} + +fn main() { +} + +``` */ diff --git a/third_party/rust/rayon-core/src/job.rs b/third_party/rust/rayon-core/src/job.rs new file mode 100644 index 0000000000..b7a3dae18b --- /dev/null +++ b/third_party/rust/rayon-core/src/job.rs @@ -0,0 +1,264 @@ +use crate::latch::Latch; +use crate::unwind; +use crossbeam_deque::{Injector, Steal}; +use std::any::Any; +use std::cell::UnsafeCell; +use std::mem; +use std::sync::Arc; + +pub(super) enum JobResult { + None, + Ok(T), + Panic(Box), +} + +/// A `Job` is used to advertise work for other threads that they may +/// want to steal. In accordance with time honored tradition, jobs are +/// arranged in a deque, so that thieves can take from the top of the +/// deque while the main worker manages the bottom of the deque. This +/// deque is managed by the `thread_pool` module. +pub(super) trait Job { + /// Unsafe: this may be called from a different thread than the one + /// which scheduled the job, so the implementer must ensure the + /// appropriate traits are met, whether `Send`, `Sync`, or both. + unsafe fn execute(this: *const ()); +} + +/// Effectively a Job trait object. Each JobRef **must** be executed +/// exactly once, or else data may leak. +/// +/// Internally, we store the job's data in a `*const ()` pointer. The +/// true type is something like `*const StackJob<...>`, but we hide +/// it. We also carry the "execute fn" from the `Job` trait. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(super) struct JobRef { + pointer: *const (), + execute_fn: unsafe fn(*const ()), +} + +unsafe impl Send for JobRef {} +unsafe impl Sync for JobRef {} + +impl JobRef { + /// Unsafe: caller asserts that `data` will remain valid until the + /// job is executed. + pub(super) unsafe fn new(data: *const T) -> JobRef + where + T: Job, + { + // erase types: + JobRef { + pointer: data as *const (), + execute_fn: ::execute, + } + } + + #[inline] + pub(super) unsafe fn execute(self) { + (self.execute_fn)(self.pointer) + } +} + +/// A job that will be owned by a stack slot. This means that when it +/// executes it need not free any heap data, the cleanup occurs when +/// the stack frame is later popped. The function parameter indicates +/// `true` if the job was stolen -- executed on a different thread. +pub(super) struct StackJob +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + pub(super) latch: L, + func: UnsafeCell>, + result: UnsafeCell>, +} + +impl StackJob +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + pub(super) fn new(func: F, latch: L) -> StackJob { + StackJob { + latch, + func: UnsafeCell::new(Some(func)), + result: UnsafeCell::new(JobResult::None), + } + } + + pub(super) unsafe fn as_job_ref(&self) -> JobRef { + JobRef::new(self) + } + + pub(super) unsafe fn run_inline(self, stolen: bool) -> R { + self.func.into_inner().unwrap()(stolen) + } + + pub(super) unsafe fn into_result(self) -> R { + self.result.into_inner().into_return_value() + } +} + +impl Job for StackJob +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + unsafe fn execute(this: *const ()) { + let this = &*(this as *const Self); + let abort = unwind::AbortIfPanic; + let func = (*this.func.get()).take().unwrap(); + (*this.result.get()) = JobResult::call(func); + this.latch.set(); + mem::forget(abort); + } +} + +/// Represents a job stored in the heap. Used to implement +/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply +/// invokes a closure, which then triggers the appropriate logic to +/// signal that the job executed. +/// +/// (Probably `StackJob` should be refactored in a similar fashion.) +pub(super) struct HeapJob +where + BODY: FnOnce() + Send, +{ + job: BODY, +} + +impl HeapJob +where + BODY: FnOnce() + Send, +{ + pub(super) fn new(job: BODY) -> Box { + Box::new(HeapJob { job }) + } + + /// Creates a `JobRef` from this job -- note that this hides all + /// lifetimes, so it is up to you to ensure that this JobRef + /// doesn't outlive any data that it closes over. + pub(super) unsafe fn into_job_ref(self: Box) -> JobRef { + JobRef::new(Box::into_raw(self)) + } + + /// Creates a static `JobRef` from this job. + pub(super) fn into_static_job_ref(self: Box) -> JobRef + where + BODY: 'static, + { + unsafe { self.into_job_ref() } + } +} + +impl Job for HeapJob +where + BODY: FnOnce() + Send, +{ + unsafe fn execute(this: *const ()) { + let this = Box::from_raw(this as *mut Self); + (this.job)(); + } +} + +/// Represents a job stored in an `Arc` -- like `HeapJob`, but may +/// be turned into multiple `JobRef`s and called multiple times. +pub(super) struct ArcJob +where + BODY: Fn() + Send + Sync, +{ + job: BODY, +} + +impl ArcJob +where + BODY: Fn() + Send + Sync, +{ + pub(super) fn new(job: BODY) -> Arc { + Arc::new(ArcJob { job }) + } + + /// Creates a `JobRef` from this job -- note that this hides all + /// lifetimes, so it is up to you to ensure that this JobRef + /// doesn't outlive any data that it closes over. + pub(super) unsafe fn as_job_ref(this: &Arc) -> JobRef { + JobRef::new(Arc::into_raw(Arc::clone(this))) + } + + /// Creates a static `JobRef` from this job. + pub(super) fn as_static_job_ref(this: &Arc) -> JobRef + where + BODY: 'static, + { + unsafe { Self::as_job_ref(this) } + } +} + +impl Job for ArcJob +where + BODY: Fn() + Send + Sync, +{ + unsafe fn execute(this: *const ()) { + let this = Arc::from_raw(this as *mut Self); + (this.job)(); + } +} + +impl JobResult { + fn call(func: impl FnOnce(bool) -> T) -> Self { + match unwind::halt_unwinding(|| func(true)) { + Ok(x) => JobResult::Ok(x), + Err(x) => JobResult::Panic(x), + } + } + + /// Convert the `JobResult` for a job that has finished (and hence + /// its JobResult is populated) into its return value. + /// + /// NB. This will panic if the job panicked. + pub(super) fn into_return_value(self) -> T { + match self { + JobResult::None => unreachable!(), + JobResult::Ok(x) => x, + JobResult::Panic(x) => unwind::resume_unwinding(x), + } + } +} + +/// Indirect queue to provide FIFO job priority. +pub(super) struct JobFifo { + inner: Injector, +} + +impl JobFifo { + pub(super) fn new() -> Self { + JobFifo { + inner: Injector::new(), + } + } + + pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { + // A little indirection ensures that spawns are always prioritized in FIFO order. The + // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front + // (FIFO), but either way they will end up popping from the front of this queue. + self.inner.push(job_ref); + JobRef::new(self) + } +} + +impl Job for JobFifo { + unsafe fn execute(this: *const ()) { + // We "execute" a queue by executing its first job, FIFO. + let this = &*(this as *const Self); + loop { + match this.inner.steal() { + Steal::Success(job_ref) => break job_ref.execute(), + Steal::Empty => panic!("FIFO is empty"), + Steal::Retry => {} + } + } + } +} diff --git a/third_party/rust/rayon-core/src/join/mod.rs b/third_party/rust/rayon-core/src/join/mod.rs new file mode 100644 index 0000000000..d72c7e61c7 --- /dev/null +++ b/third_party/rust/rayon-core/src/join/mod.rs @@ -0,0 +1,187 @@ +use crate::job::StackJob; +use crate::latch::SpinLatch; +use crate::registry::{self, WorkerThread}; +use crate::unwind; +use std::any::Any; + +use crate::FnContext; + +#[cfg(test)] +mod test; + +/// Takes two closures and *potentially* runs them in parallel. It +/// returns a pair of the results from those closures. +/// +/// Conceptually, calling `join()` is similar to spawning two threads, +/// one executing each of the two closures. However, the +/// implementation is quite different and incurs very low +/// overhead. The underlying technique is called "work stealing": the +/// Rayon runtime uses a fixed pool of worker threads and attempts to +/// only execute code in parallel when there are idle CPUs to handle +/// it. +/// +/// When `join` is called from outside the thread pool, the calling +/// thread will block while the closures execute in the pool. When +/// `join` is called within the pool, the calling thread still actively +/// participates in the thread pool. It will begin by executing closure +/// A (on the current thread). While it is doing that, it will advertise +/// closure B as being available for other threads to execute. Once closure A +/// has completed, the current thread will try to execute closure B; +/// if however closure B has been stolen, then it will look for other work +/// while waiting for the thief to fully execute closure B. (This is the +/// typical work-stealing strategy). +/// +/// # Examples +/// +/// This example uses join to perform a quick-sort (note this is not a +/// particularly optimized implementation: if you **actually** want to +/// sort for real, you should prefer [the `par_sort` method] offered +/// by Rayon). +/// +/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort +/// +/// ```rust +/// # use rayon_core as rayon; +/// let mut v = vec![5, 1, 8, 22, 0, 44]; +/// quick_sort(&mut v); +/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]); +/// +/// fn quick_sort(v: &mut [T]) { +/// if v.len() > 1 { +/// let mid = partition(v); +/// let (lo, hi) = v.split_at_mut(mid); +/// rayon::join(|| quick_sort(lo), +/// || quick_sort(hi)); +/// } +/// } +/// +/// // Partition rearranges all items `<=` to the pivot +/// // item (arbitrary selected to be the last item in the slice) +/// // to the first half of the slice. It then returns the +/// // "dividing point" where the pivot is placed. +/// fn partition(v: &mut [T]) -> usize { +/// let pivot = v.len() - 1; +/// let mut i = 0; +/// for j in 0..pivot { +/// if v[j] <= v[pivot] { +/// v.swap(i, j); +/// i += 1; +/// } +/// } +/// v.swap(i, pivot); +/// i +/// } +/// ``` +/// +/// # Warning about blocking I/O +/// +/// The assumption is that the closures given to `join()` are +/// CPU-bound tasks that do not perform I/O or other blocking +/// operations. If you do perform I/O, and that I/O should block +/// (e.g., waiting for a network request), the overall performance may +/// be poor. Moreover, if you cause one closure to be blocked waiting +/// on another (for example, using a channel), that could lead to a +/// deadlock. +/// +/// # Panics +/// +/// No matter what happens, both closures will always be executed. If +/// a single closure panics, whether it be the first or second +/// closure, that panic will be propagated and hence `join()` will +/// panic with the same panic value. If both closures panic, `join()` +/// will panic with the panic value from the first closure. +pub fn join(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, +{ + #[inline] + fn call(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R { + move |_| f() + } + + join_context(call(oper_a), call(oper_b)) +} + +/// Identical to `join`, except that the closures have a parameter +/// that provides context for the way the closure has been called, +/// especially indicating whether they're executing on a different +/// thread than where `join_context` was called. This will occur if +/// the second job is stolen by a different thread, or if +/// `join_context` was called from outside the thread pool to begin +/// with. +pub fn join_context(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce(FnContext) -> RA + Send, + B: FnOnce(FnContext) -> RB + Send, + RA: Send, + RB: Send, +{ + #[inline] + fn call_a(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R { + move || f(FnContext::new(injected)) + } + + #[inline] + fn call_b(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R { + move |migrated| f(FnContext::new(migrated)) + } + + registry::in_worker(|worker_thread, injected| unsafe { + // Create virtual wrapper for task b; this all has to be + // done here so that the stack frame can keep it all live + // long enough. + let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread)); + let job_b_ref = job_b.as_job_ref(); + worker_thread.push(job_b_ref); + + // Execute task a; hopefully b gets stolen in the meantime. + let status_a = unwind::halt_unwinding(call_a(oper_a, injected)); + let result_a = match status_a { + Ok(v) => v, + Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err), + }; + + // Now that task A has finished, try to pop job B from the + // local stack. It may already have been popped by job A; it + // may also have been stolen. There may also be some tasks + // pushed on top of it in the stack, and we will have to pop + // those off to get to it. + while !job_b.latch.probe() { + if let Some(job) = worker_thread.take_local_job() { + if job == job_b_ref { + // Found it! Let's run it. + // + // Note that this could panic, but it's ok if we unwind here. + let result_b = job_b.run_inline(injected); + return (result_a, result_b); + } else { + worker_thread.execute(job); + } + } else { + // Local deque is empty. Time to steal from other + // threads. + worker_thread.wait_until(&job_b.latch); + debug_assert!(job_b.latch.probe()); + break; + } + } + + (result_a, job_b.into_result()) + }) +} + +/// If job A panics, we still cannot return until we are sure that job +/// B is complete. This is because it may contain references into the +/// enclosing stack frame(s). +#[cold] // cold path +unsafe fn join_recover_from_panic( + worker_thread: &WorkerThread, + job_b_latch: &SpinLatch<'_>, + err: Box, +) -> ! { + worker_thread.wait_until(job_b_latch); + unwind::resume_unwinding(err) +} diff --git a/third_party/rust/rayon-core/src/join/test.rs b/third_party/rust/rayon-core/src/join/test.rs new file mode 100644 index 0000000000..e7f287f6fc --- /dev/null +++ b/third_party/rust/rayon-core/src/join/test.rs @@ -0,0 +1,145 @@ +//! Tests for the join code. + +use crate::join::*; +use crate::unwind; +use crate::ThreadPoolBuilder; +use rand::distributions::Standard; +use rand::{Rng, SeedableRng}; +use rand_xorshift::XorShiftRng; + +fn quick_sort(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, hi) = v.split_at_mut(mid); + join(|| quick_sort(lo), || quick_sort(hi)); +} + +fn partition(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn seeded_rng() -> XorShiftRng { + let mut seed = ::Seed::default(); + (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); + XorShiftRng::from_seed(seed) +} + +#[test] +fn sort() { + let rng = seeded_rng(); + let mut data: Vec = rng.sample_iter(&Standard).take(6 * 1024).collect(); + let mut sorted_data = data.clone(); + sorted_data.sort(); + quick_sort(&mut data); + assert_eq!(data, sorted_data); +} + +#[test] +fn sort_in_pool() { + let rng = seeded_rng(); + let mut data: Vec = rng.sample_iter(&Standard).take(12 * 1024).collect(); + + let pool = ThreadPoolBuilder::new().build().unwrap(); + let mut sorted_data = data.clone(); + sorted_data.sort(); + pool.install(|| quick_sort(&mut data)); + assert_eq!(data, sorted_data); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_a() { + join(|| panic!("Hello, world!"), || ()); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_b() { + join(|| (), || panic!("Hello, world!")); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_both() { + join(|| panic!("Hello, world!"), || panic!("Goodbye, world!")); +} + +#[test] +fn panic_b_still_executes() { + let mut x = false; + match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) { + Ok(_) => panic!("failed to propagate panic from closure A,"), + Err(_) => assert!(x, "closure b failed to execute"), + } +} + +#[test] +fn join_context_both() { + // If we're not in a pool, both should be marked stolen as they're injected. + let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated()); + assert!(a_migrated); + assert!(b_migrated); +} + +#[test] +fn join_context_neither() { + // If we're already in a 1-thread pool, neither job should be stolen. + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (a_migrated, b_migrated) = + pool.install(|| join_context(|a| a.migrated(), |b| b.migrated())); + assert!(!a_migrated); + assert!(!b_migrated); +} + +#[test] +fn join_context_second() { + use std::sync::Barrier; + + // If we're already in a 2-thread pool, the second job should be stolen. + let barrier = Barrier::new(2); + let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + let (a_migrated, b_migrated) = pool.install(|| { + join_context( + |a| { + barrier.wait(); + a.migrated() + }, + |b| { + barrier.wait(); + b.migrated() + }, + ) + }); + assert!(!a_migrated); + assert!(b_migrated); +} + +#[test] +fn join_counter_overflow() { + const MAX: u32 = 500_000; + + let mut i = 0; + let mut j = 0; + let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + + // Hammer on join a bunch of times -- used to hit overflow debug-assertions + // in JEC on 32-bit targets: https://github.com/rayon-rs/rayon/issues/797 + for _ in 0..MAX { + pool.join(|| i += 1, || j += 1); + } + + assert_eq!(i, MAX); + assert_eq!(j, MAX); +} diff --git a/third_party/rust/rayon-core/src/latch.rs b/third_party/rust/rayon-core/src/latch.rs new file mode 100644 index 0000000000..0909293741 --- /dev/null +++ b/third_party/rust/rayon-core/src/latch.rs @@ -0,0 +1,380 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::usize; + +use crate::registry::{Registry, WorkerThread}; + +/// We define various kinds of latches, which are all a primitive signaling +/// mechanism. A latch starts as false. Eventually someone calls `set()` and +/// it becomes true. You can test if it has been set by calling `probe()`. +/// +/// Some kinds of latches, but not all, support a `wait()` operation +/// that will wait until the latch is set, blocking efficiently. That +/// is not part of the trait since it is not possibly to do with all +/// latches. +/// +/// The intention is that `set()` is called once, but `probe()` may be +/// called any number of times. Once `probe()` returns true, the memory +/// effects that occurred before `set()` become visible. +/// +/// It'd probably be better to refactor the API into two paired types, +/// but that's a bit of work, and this is not a public API. +/// +/// ## Memory ordering +/// +/// Latches need to guarantee two things: +/// +/// - Once `probe()` returns true, all memory effects from the `set()` +/// are visible (in other words, the set should synchronize-with +/// the probe). +/// - Once `set()` occurs, the next `probe()` *will* observe it. This +/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep +/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details. +pub(super) trait Latch { + /// Set the latch, signalling others. + /// + /// # WARNING + /// + /// Setting a latch triggers other threads to wake up and (in some + /// cases) complete. This may, in turn, cause memory to be + /// allocated and so forth. One must be very careful about this, + /// and it's typically better to read all the fields you will need + /// to access *before* a latch is set! + fn set(&self); +} + +pub(super) trait AsCoreLatch { + fn as_core_latch(&self) -> &CoreLatch; +} + +/// Latch is not set, owning thread is awake +const UNSET: usize = 0; + +/// Latch is not set, owning thread is going to sleep on this latch +/// (but has not yet fallen asleep). +const SLEEPY: usize = 1; + +/// Latch is not set, owning thread is asleep on this latch and +/// must be awoken. +const SLEEPING: usize = 2; + +/// Latch is set. +const SET: usize = 3; + +/// Spin latches are the simplest, most efficient kind, but they do +/// not support a `wait()` operation. They just have a boolean flag +/// that becomes true when `set()` is called. +#[derive(Debug)] +pub(super) struct CoreLatch { + state: AtomicUsize, +} + +impl CoreLatch { + #[inline] + fn new() -> Self { + Self { + state: AtomicUsize::new(0), + } + } + + /// Returns the address of this core latch as an integer. Used + /// for logging. + #[inline] + pub(super) fn addr(&self) -> usize { + self as *const CoreLatch as usize + } + + /// Invoked by owning thread as it prepares to sleep. Returns true + /// if the owning thread may proceed to fall asleep, false if the + /// latch was set in the meantime. + #[inline] + pub(super) fn get_sleepy(&self) -> bool { + self.state + .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + } + + /// Invoked by owning thread as it falls asleep sleep. Returns + /// true if the owning thread should block, or false if the latch + /// was set in the meantime. + #[inline] + pub(super) fn fall_asleep(&self) -> bool { + self.state + .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + } + + /// Invoked by owning thread as it falls asleep sleep. Returns + /// true if the owning thread should block, or false if the latch + /// was set in the meantime. + #[inline] + pub(super) fn wake_up(&self) { + if !self.probe() { + let _ = + self.state + .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); + } + } + + /// Set the latch. If this returns true, the owning thread was sleeping + /// and must be awoken. + /// + /// This is private because, typically, setting a latch involves + /// doing some wakeups; those are encapsulated in the surrounding + /// latch code. + #[inline] + fn set(&self) -> bool { + let old_state = self.state.swap(SET, Ordering::AcqRel); + old_state == SLEEPING + } + + /// Test if this latch has been set. + #[inline] + pub(super) fn probe(&self) -> bool { + self.state.load(Ordering::Acquire) == SET + } +} + +/// Spin latches are the simplest, most efficient kind, but they do +/// not support a `wait()` operation. They just have a boolean flag +/// that becomes true when `set()` is called. +pub(super) struct SpinLatch<'r> { + core_latch: CoreLatch, + registry: &'r Arc, + target_worker_index: usize, + cross: bool, +} + +impl<'r> SpinLatch<'r> { + /// Creates a new spin latch that is owned by `thread`. This means + /// that `thread` is the only thread that should be blocking on + /// this latch -- it also means that when the latch is set, we + /// will wake `thread` if it is sleeping. + #[inline] + pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { + SpinLatch { + core_latch: CoreLatch::new(), + registry: thread.registry(), + target_worker_index: thread.index(), + cross: false, + } + } + + /// Creates a new spin latch for cross-threadpool blocking. Notably, we + /// need to make sure the registry is kept alive after setting, so we can + /// safely call the notification. + #[inline] + pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { + SpinLatch { + cross: true, + ..SpinLatch::new(thread) + } + } + + #[inline] + pub(super) fn probe(&self) -> bool { + self.core_latch.probe() + } +} + +impl<'r> AsCoreLatch for SpinLatch<'r> { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + &self.core_latch + } +} + +impl<'r> Latch for SpinLatch<'r> { + #[inline] + fn set(&self) { + let cross_registry; + + let registry: &Registry = if self.cross { + // Ensure the registry stays alive while we notify it. + // Otherwise, it would be possible that we set the spin + // latch and the other thread sees it and exits, causing + // the registry to be deallocated, all before we get a + // chance to invoke `registry.notify_worker_latch_is_set`. + cross_registry = Arc::clone(self.registry); + &cross_registry + } else { + // If this is not a "cross-registry" spin-latch, then the + // thread which is performing `set` is itself ensuring + // that the registry stays alive. However, that doesn't + // include this *particular* `Arc` handle if the waiting + // thread then exits, so we must completely dereference it. + self.registry + }; + let target_worker_index = self.target_worker_index; + + // NOTE: Once we `set`, the target may proceed and invalidate `&self`! + if self.core_latch.set() { + // Subtle: at this point, we can no longer read from + // `self`, because the thread owning this spin latch may + // have awoken and deallocated the latch. Therefore, we + // only use fields whose values we already read. + registry.notify_worker_latch_is_set(target_worker_index); + } + } +} + +/// A Latch starts as false and eventually becomes true. You can block +/// until it becomes true. +#[derive(Debug)] +pub(super) struct LockLatch { + m: Mutex, + v: Condvar, +} + +impl LockLatch { + #[inline] + pub(super) fn new() -> LockLatch { + LockLatch { + m: Mutex::new(false), + v: Condvar::new(), + } + } + + /// Block until latch is set, then resets this lock latch so it can be reused again. + pub(super) fn wait_and_reset(&self) { + let mut guard = self.m.lock().unwrap(); + while !*guard { + guard = self.v.wait(guard).unwrap(); + } + *guard = false; + } + + /// Block until latch is set. + pub(super) fn wait(&self) { + let mut guard = self.m.lock().unwrap(); + while !*guard { + guard = self.v.wait(guard).unwrap(); + } + } +} + +impl Latch for LockLatch { + #[inline] + fn set(&self) { + let mut guard = self.m.lock().unwrap(); + *guard = true; + self.v.notify_all(); + } +} + +/// Counting latches are used to implement scopes. They track a +/// counter. Unlike other latches, calling `set()` does not +/// necessarily make the latch be considered `set()`; instead, it just +/// decrements the counter. The latch is only "set" (in the sense that +/// `probe()` returns true) once the counter reaches zero. +/// +/// Note: like a `SpinLatch`, count laches are always associated with +/// some registry that is probing them, which must be tickled when +/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a +/// reference to that registry. This is because in some cases the +/// registry owns the count-latch, and that would create a cycle. So a +/// `CountLatch` must be given a reference to its owning registry when +/// it is set. For this reason, it does not implement the `Latch` +/// trait (but it doesn't have to, as it is not used in those generic +/// contexts). +#[derive(Debug)] +pub(super) struct CountLatch { + core_latch: CoreLatch, + counter: AtomicUsize, +} + +impl CountLatch { + #[inline] + pub(super) fn new() -> CountLatch { + Self::with_count(1) + } + + #[inline] + pub(super) fn with_count(n: usize) -> CountLatch { + CountLatch { + core_latch: CoreLatch::new(), + counter: AtomicUsize::new(n), + } + } + + #[inline] + pub(super) fn increment(&self) { + debug_assert!(!self.core_latch.probe()); + self.counter.fetch_add(1, Ordering::Relaxed); + } + + /// Decrements the latch counter by one. If this is the final + /// count, then the latch is **set**, and calls to `probe()` will + /// return true. Returns whether the latch was set. + #[inline] + pub(super) fn set(&self) -> bool { + if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { + self.core_latch.set(); + true + } else { + false + } + } + + /// Decrements the latch counter by one and possibly set it. If + /// the latch is set, then the specific worker thread is tickled, + /// which should be the one that owns this latch. + #[inline] + pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) { + if self.set() { + registry.notify_worker_latch_is_set(target_worker_index); + } + } +} + +impl AsCoreLatch for CountLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + &self.core_latch + } +} + +#[derive(Debug)] +pub(super) struct CountLockLatch { + lock_latch: LockLatch, + counter: AtomicUsize, +} + +impl CountLockLatch { + #[inline] + pub(super) fn with_count(n: usize) -> CountLockLatch { + CountLockLatch { + lock_latch: LockLatch::new(), + counter: AtomicUsize::new(n), + } + } + + #[inline] + pub(super) fn increment(&self) { + let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); + debug_assert!(old_counter != 0); + } + + pub(super) fn wait(&self) { + self.lock_latch.wait(); + } +} + +impl Latch for CountLockLatch { + #[inline] + fn set(&self) { + if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { + self.lock_latch.set(); + } + } +} + +impl<'a, L> Latch for &'a L +where + L: Latch, +{ + #[inline] + fn set(&self) { + L::set(self); + } +} diff --git a/third_party/rust/rayon-core/src/lib.rs b/third_party/rust/rayon-core/src/lib.rs new file mode 100644 index 0000000000..b31a2d7e0e --- /dev/null +++ b/third_party/rust/rayon-core/src/lib.rs @@ -0,0 +1,819 @@ +//! Rayon-core houses the core stable APIs of Rayon. +//! +//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there. +//! +//! [`join`] is used to take two closures and potentially run them in parallel. +//! - It will run in parallel if task B gets stolen before task A can finish. +//! - It will run sequentially if task A finishes before task B is stolen and can continue on task B. +//! +//! [`scope`] creates a scope in which you can run any number of parallel tasks. +//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed. +//! The scope will exist until all tasks spawned within the scope have been completed. +//! +//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function. +//! +//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one. +//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque, +//! where it becomes available for work stealing from other threads in the local threadpool. +//! +//! [`join`]: fn.join.html +//! [`scope`]: fn.scope.html +//! [`scope()`]: fn.scope.html +//! [`spawn`]: fn.spawn.html +//! [`ThreadPool`]: struct.threadpool.html +//! [`install()`]: struct.ThreadPool.html#method.install +//! [`spawn()`]: struct.ThreadPool.html#method.spawn +//! [`join()`]: struct.ThreadPool.html#method.join +//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html +//! +//! ## Restricting multiple versions +//! +//! In order to ensure proper coordination between threadpools, and especially +//! to make sure there's only one global threadpool, `rayon-core` is actively +//! restricted from building multiple versions of itself into a single target. +//! You may see a build error like this in violation: +//! +//! ```text +//! error: native library `rayon-core` is being linked to by more +//! than one package, and can only be linked to by one package +//! ``` +//! +//! While we strive to keep `rayon-core` semver-compatible, it's still +//! possible to arrive at this situation if different crates have overly +//! restrictive tilde or inequality requirements for `rayon-core`. The +//! conflicting requirements will need to be resolved before the build will +//! succeed. + +#![deny(missing_debug_implementations)] +#![deny(missing_docs)] +#![deny(unreachable_pub)] +#![warn(rust_2018_idioms)] + +use std::any::Any; +use std::env; +use std::error::Error; +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::str::FromStr; + +#[macro_use] +mod log; +#[macro_use] +mod private; + +mod broadcast; +mod job; +mod join; +mod latch; +mod registry; +mod scope; +mod sleep; +mod spawn; +mod thread_pool; +mod unwind; + +mod compile_fail; +mod test; + +pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; +pub use self::join::{join, join_context}; +pub use self::registry::ThreadBuilder; +pub use self::scope::{in_place_scope, scope, Scope}; +pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo}; +pub use self::spawn::{spawn, spawn_fifo}; +pub use self::thread_pool::current_thread_has_pending_tasks; +pub use self::thread_pool::current_thread_index; +pub use self::thread_pool::ThreadPool; + +use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; + +/// Returns the maximum number of threads that Rayon supports in a single thread-pool. +/// +/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting +/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum. +/// +/// The value may vary between different targets, and is subject to change in new Rayon versions. +pub fn max_num_threads() -> usize { + // We are limited by the bits available in the sleep counter's `AtomicUsize`. + crate::sleep::THREADS_MAX +} + +/// Returns the number of threads in the current registry. If this +/// code is executing within a Rayon thread-pool, then this will be +/// the number of threads for the thread-pool of the current +/// thread. Otherwise, it will be the number of threads for the global +/// thread-pool. +/// +/// This can be useful when trying to judge how many times to split +/// parallel work (the parallel iterator traits use this value +/// internally for this purpose). +/// +/// # Future compatibility note +/// +/// Note that unless this thread-pool was created with a +/// builder that specifies the number of threads, then this +/// number may vary over time in future versions (see [the +/// `num_threads()` method for details][snt]). +/// +/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads +pub fn current_num_threads() -> usize { + crate::registry::Registry::current_num_threads() +} + +/// Error when initializing a thread pool. +#[derive(Debug)] +pub struct ThreadPoolBuildError { + kind: ErrorKind, +} + +#[derive(Debug)] +enum ErrorKind { + GlobalPoolAlreadyInitialized, + IOError(io::Error), +} + +/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool. +/// ## Creating a ThreadPool +/// The following creates a thread pool with 22 threads. +/// +/// ```rust +/// # use rayon_core as rayon; +/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap(); +/// ``` +/// +/// To instead configure the global thread pool, use [`build_global()`]: +/// +/// ```rust +/// # use rayon_core as rayon; +/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap(); +/// ``` +/// +/// [`ThreadPool`]: struct.ThreadPool.html +/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global +pub struct ThreadPoolBuilder { + /// The number of threads in the rayon thread pool. + /// If zero will use the RAYON_NUM_THREADS environment variable. + /// If RAYON_NUM_THREADS is invalid or zero will use the default. + num_threads: usize, + + /// Custom closure, if any, to handle a panic that we cannot propagate + /// anywhere else. + panic_handler: Option>, + + /// Closure to compute the name of a thread. + get_thread_name: Option String>>, + + /// The stack size for the created worker threads + stack_size: Option, + + /// Closure invoked on worker thread start. + start_handler: Option>, + + /// Closure invoked on worker thread exit. + exit_handler: Option>, + + /// Closure invoked to spawn threads. + spawn_handler: S, + + /// If false, worker threads will execute spawned jobs in a + /// "depth-first" fashion. If true, they will do a "breadth-first" + /// fashion. Depth-first is the default. + breadth_first: bool, +} + +/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead. +/// +/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html +#[deprecated(note = "Use `ThreadPoolBuilder`")] +#[derive(Default)] +pub struct Configuration { + builder: ThreadPoolBuilder, +} + +/// The type for a panic handling closure. Note that this same closure +/// may be invoked multiple times in parallel. +type PanicHandler = dyn Fn(Box) + Send + Sync; + +/// The type for a closure that gets invoked when a thread starts. The +/// closure is passed the index of the thread on which it is invoked. +/// Note that this same closure may be invoked multiple times in parallel. +type StartHandler = dyn Fn(usize) + Send + Sync; + +/// The type for a closure that gets invoked when a thread exits. The +/// closure is passed the index of the thread on which is is invoked. +/// Note that this same closure may be invoked multiple times in parallel. +type ExitHandler = dyn Fn(usize) + Send + Sync; + +// NB: We can't `#[derive(Default)]` because `S` is left ambiguous. +impl Default for ThreadPoolBuilder { + fn default() -> Self { + ThreadPoolBuilder { + num_threads: 0, + panic_handler: None, + get_thread_name: None, + stack_size: None, + start_handler: None, + exit_handler: None, + spawn_handler: DefaultSpawn, + breadth_first: false, + } + } +} + +impl ThreadPoolBuilder { + /// Creates and returns a valid rayon thread pool builder, but does not initialize it. + pub fn new() -> Self { + Self::default() + } +} + +/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the +/// default spawn and those set by [`spawn_handler`](#method.spawn_handler). +impl ThreadPoolBuilder +where + S: ThreadSpawn, +{ + /// Creates a new `ThreadPool` initialized using this configuration. + pub fn build(self) -> Result { + ThreadPool::build(self) + } + + /// Initializes the global thread pool. This initialization is + /// **optional**. If you do not call this function, the thread pool + /// will be automatically initialized with the default + /// configuration. Calling `build_global` is not recommended, except + /// in two scenarios: + /// + /// - You wish to change the default configuration. + /// - You are running a benchmark, in which case initializing may + /// yield slightly more consistent results, since the worker threads + /// will already be ready to go even in the first iteration. But + /// this cost is minimal. + /// + /// Initialization of the global thread pool happens exactly + /// once. Once started, the configuration cannot be + /// changed. Therefore, if you call `build_global` a second time, it + /// will return an error. An `Ok` result indicates that this + /// is the first initialization of the thread pool. + pub fn build_global(self) -> Result<(), ThreadPoolBuildError> { + let registry = registry::init_global_registry(self)?; + registry.wait_until_primed(); + Ok(()) + } +} + +impl ThreadPoolBuilder { + /// Creates a scoped `ThreadPool` initialized using this configuration. + /// + /// This is a convenience function for building a pool using [`crossbeam::scope`] + /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). + /// The threads in this pool will start by calling `wrapper`, which should + /// do initialization and continue by calling `ThreadBuilder::run()`. + /// + /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html + /// + /// # Examples + /// + /// A scoped pool may be useful in combination with scoped thread-local variables. + /// + /// ``` + /// # use rayon_core as rayon; + /// + /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec); + /// + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// let pool_data = vec![1, 2, 3]; + /// + /// // We haven't assigned any TLS data yet. + /// assert!(!POOL_DATA.is_set()); + /// + /// rayon::ThreadPoolBuilder::new() + /// .build_scoped( + /// // Borrow `pool_data` in TLS for each thread. + /// |thread| POOL_DATA.set(&pool_data, || thread.run()), + /// // Do some work that needs the TLS data. + /// |pool| pool.install(|| assert!(POOL_DATA.is_set())), + /// )?; + /// + /// // Once we've returned, `pool_data` is no longer borrowed. + /// drop(pool_data); + /// Ok(()) + /// } + /// ``` + pub fn build_scoped(self, wrapper: W, with_pool: F) -> Result + where + W: Fn(ThreadBuilder) + Sync, // expected to call `run()` + F: FnOnce(&ThreadPool) -> R, + { + let result = crossbeam_utils::thread::scope(|scope| { + let wrapper = &wrapper; + let pool = self + .spawn_handler(|thread| { + let mut builder = scope.builder(); + if let Some(name) = thread.name() { + builder = builder.name(name.to_string()); + } + if let Some(size) = thread.stack_size() { + builder = builder.stack_size(size); + } + builder.spawn(move |_| wrapper(thread))?; + Ok(()) + }) + .build()?; + Ok(with_pool(&pool)) + }); + + match result { + Ok(result) => result, + Err(err) => unwind::resume_unwinding(err), + } + } +} + +impl ThreadPoolBuilder { + /// Sets a custom function for spawning threads. + /// + /// Note that the threads will not exit until after the pool is dropped. It + /// is up to the caller to wait for thread termination if that is important + /// for any invariants. For instance, threads created in [`crossbeam::scope`] + /// will be joined before that scope returns, and this will block indefinitely + /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate + /// until the entire process exits! + /// + /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html + /// + /// # Examples + /// + /// A minimal spawn handler just needs to call `run()` from an independent thread. + /// + /// ``` + /// # use rayon_core as rayon; + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// let pool = rayon::ThreadPoolBuilder::new() + /// .spawn_handler(|thread| { + /// std::thread::spawn(|| thread.run()); + /// Ok(()) + /// }) + /// .build()?; + /// + /// pool.install(|| println!("Hello from my custom thread!")); + /// Ok(()) + /// } + /// ``` + /// + /// The default spawn handler sets the name and stack size if given, and propagates + /// any errors from the thread builder. + /// + /// ``` + /// # use rayon_core as rayon; + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// let pool = rayon::ThreadPoolBuilder::new() + /// .spawn_handler(|thread| { + /// let mut b = std::thread::Builder::new(); + /// if let Some(name) = thread.name() { + /// b = b.name(name.to_owned()); + /// } + /// if let Some(stack_size) = thread.stack_size() { + /// b = b.stack_size(stack_size); + /// } + /// b.spawn(|| thread.run())?; + /// Ok(()) + /// }) + /// .build()?; + /// + /// pool.install(|| println!("Hello from my fully custom thread!")); + /// Ok(()) + /// } + /// ``` + /// + /// This can also be used for a pool of scoped threads like [`crossbeam::scope`], + /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in + /// [`build_scoped`](#method.build_scoped). + /// + /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html + /// + /// ``` + /// # use rayon_core as rayon; + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// std::thread::scope(|scope| { + /// let pool = rayon::ThreadPoolBuilder::new() + /// .spawn_handler(|thread| { + /// let mut builder = std::thread::Builder::new(); + /// if let Some(name) = thread.name() { + /// builder = builder.name(name.to_string()); + /// } + /// if let Some(size) = thread.stack_size() { + /// builder = builder.stack_size(size); + /// } + /// builder.spawn_scoped(scope, || { + /// // Add any scoped initialization here, then run! + /// thread.run() + /// })?; + /// Ok(()) + /// }) + /// .build()?; + /// + /// pool.install(|| println!("Hello from my custom scoped thread!")); + /// Ok(()) + /// }) + /// } + /// ``` + pub fn spawn_handler(self, spawn: F) -> ThreadPoolBuilder> + where + F: FnMut(ThreadBuilder) -> io::Result<()>, + { + ThreadPoolBuilder { + spawn_handler: CustomSpawn::new(spawn), + // ..self + num_threads: self.num_threads, + panic_handler: self.panic_handler, + get_thread_name: self.get_thread_name, + stack_size: self.stack_size, + start_handler: self.start_handler, + exit_handler: self.exit_handler, + breadth_first: self.breadth_first, + } + } + + /// Returns a reference to the current spawn handler. + fn get_spawn_handler(&mut self) -> &mut S { + &mut self.spawn_handler + } + + /// Get the number of threads that will be used for the thread + /// pool. See `num_threads()` for more information. + fn get_num_threads(&self) -> usize { + if self.num_threads > 0 { + self.num_threads + } else { + match env::var("RAYON_NUM_THREADS") + .ok() + .and_then(|s| usize::from_str(&s).ok()) + { + Some(x) if x > 0 => return x, + Some(x) if x == 0 => return num_cpus::get(), + _ => {} + } + + // Support for deprecated `RAYON_RS_NUM_CPUS`. + match env::var("RAYON_RS_NUM_CPUS") + .ok() + .and_then(|s| usize::from_str(&s).ok()) + { + Some(x) if x > 0 => x, + _ => num_cpus::get(), + } + } + } + + /// Get the thread name for the thread with the given index. + fn get_thread_name(&mut self, index: usize) -> Option { + let f = self.get_thread_name.as_mut()?; + Some(f(index)) + } + + /// Sets a closure which takes a thread index and returns + /// the thread's name. + pub fn thread_name(mut self, closure: F) -> Self + where + F: FnMut(usize) -> String + 'static, + { + self.get_thread_name = Some(Box::new(closure)); + self + } + + /// Sets the number of threads to be used in the rayon threadpool. + /// + /// If you specify a non-zero number of threads using this + /// function, then the resulting thread-pools are guaranteed to + /// start at most this number of threads. + /// + /// If `num_threads` is 0, or you do not call this function, then + /// the Rayon runtime will select the number of threads + /// automatically. At present, this is based on the + /// `RAYON_NUM_THREADS` environment variable (if set), + /// or the number of logical CPUs (otherwise). + /// In the future, however, the default behavior may + /// change to dynamically add or remove threads as needed. + /// + /// **Future compatibility warning:** Given the default behavior + /// may change in the future, if you wish to rely on a fixed + /// number of threads, you should use this function to specify + /// that number. To reproduce the current default behavior, you + /// may wish to use the [`num_cpus` + /// crate](https://crates.io/crates/num_cpus) to query the number + /// of CPUs dynamically. + /// + /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one + /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment + /// variable. If both variables are specified, `RAYON_NUM_THREADS` will + /// be preferred. + pub fn num_threads(mut self, num_threads: usize) -> Self { + self.num_threads = num_threads; + self + } + + /// Returns a copy of the current panic handler. + fn take_panic_handler(&mut self) -> Option> { + self.panic_handler.take() + } + + /// Normally, whenever Rayon catches a panic, it tries to + /// propagate it to someplace sensible, to try and reflect the + /// semantics of sequential execution. But in some cases, + /// particularly with the `spawn()` APIs, there is no + /// obvious place where we should propagate the panic to. + /// In that case, this panic handler is invoked. + /// + /// If no panic handler is set, the default is to abort the + /// process, under the principle that panics should not go + /// unobserved. + /// + /// If the panic handler itself panics, this will abort the + /// process. To prevent this, wrap the body of your panic handler + /// in a call to `std::panic::catch_unwind()`. + pub fn panic_handler(mut self, panic_handler: H) -> Self + where + H: Fn(Box) + Send + Sync + 'static, + { + self.panic_handler = Some(Box::new(panic_handler)); + self + } + + /// Get the stack size of the worker threads + fn get_stack_size(&self) -> Option { + self.stack_size + } + + /// Sets the stack size of the worker threads + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } + + /// **(DEPRECATED)** Suggest to worker threads that they execute + /// spawned jobs in a "breadth-first" fashion. + /// + /// Typically, when a worker thread is idle or blocked, it will + /// attempt to execute the job from the *top* of its local deque of + /// work (i.e., the job most recently spawned). If this flag is set + /// to true, however, workers will prefer to execute in a + /// *breadth-first* fashion -- that is, they will search for jobs at + /// the *bottom* of their local deque. (At present, workers *always* + /// steal from the bottom of other workers' deques, regardless of + /// the setting of this flag.) + /// + /// If you think of the tasks as a tree, where a parent task + /// spawns its children in the tree, then this flag loosely + /// corresponds to doing a breadth-first traversal of the tree, + /// whereas the default would be to do a depth-first traversal. + /// + /// **Note that this is an "execution hint".** Rayon's task + /// execution is highly dynamic and the precise order in which + /// independent tasks are executed is not intended to be + /// guaranteed. + /// + /// This `breadth_first()` method is now deprecated per [RFC #1], + /// and in the future its effect may be removed. Consider using + /// [`scope_fifo()`] for a similar effect. + /// + /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md + /// [`scope_fifo()`]: fn.scope_fifo.html + #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")] + pub fn breadth_first(mut self) -> Self { + self.breadth_first = true; + self + } + + fn get_breadth_first(&self) -> bool { + self.breadth_first + } + + /// Takes the current thread start callback, leaving `None`. + fn take_start_handler(&mut self) -> Option> { + self.start_handler.take() + } + + /// Sets a callback to be invoked on thread start. + /// + /// The closure is passed the index of the thread on which it is invoked. + /// Note that this same closure may be invoked multiple times in parallel. + /// If this closure panics, the panic will be passed to the panic handler. + /// If that handler returns, then startup will continue normally. + pub fn start_handler(mut self, start_handler: H) -> Self + where + H: Fn(usize) + Send + Sync + 'static, + { + self.start_handler = Some(Box::new(start_handler)); + self + } + + /// Returns a current thread exit callback, leaving `None`. + fn take_exit_handler(&mut self) -> Option> { + self.exit_handler.take() + } + + /// Sets a callback to be invoked on thread exit. + /// + /// The closure is passed the index of the thread on which it is invoked. + /// Note that this same closure may be invoked multiple times in parallel. + /// If this closure panics, the panic will be passed to the panic handler. + /// If that handler returns, then the thread will exit normally. + pub fn exit_handler(mut self, exit_handler: H) -> Self + where + H: Fn(usize) + Send + Sync + 'static, + { + self.exit_handler = Some(Box::new(exit_handler)); + self + } +} + +#[allow(deprecated)] +impl Configuration { + /// Creates and return a valid rayon thread pool configuration, but does not initialize it. + pub fn new() -> Configuration { + Configuration { + builder: ThreadPoolBuilder::new(), + } + } + + /// Deprecated in favor of `ThreadPoolBuilder::build`. + pub fn build(self) -> Result> { + self.builder.build().map_err(Box::from) + } + + /// Deprecated in favor of `ThreadPoolBuilder::thread_name`. + pub fn thread_name(mut self, closure: F) -> Self + where + F: FnMut(usize) -> String + 'static, + { + self.builder = self.builder.thread_name(closure); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::num_threads`. + pub fn num_threads(mut self, num_threads: usize) -> Configuration { + self.builder = self.builder.num_threads(num_threads); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`. + pub fn panic_handler(mut self, panic_handler: H) -> Configuration + where + H: Fn(Box) + Send + Sync + 'static, + { + self.builder = self.builder.panic_handler(panic_handler); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::stack_size`. + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.builder = self.builder.stack_size(stack_size); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`. + pub fn breadth_first(mut self) -> Self { + self.builder = self.builder.breadth_first(); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::start_handler`. + pub fn start_handler(mut self, start_handler: H) -> Configuration + where + H: Fn(usize) + Send + Sync + 'static, + { + self.builder = self.builder.start_handler(start_handler); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`. + pub fn exit_handler(mut self, exit_handler: H) -> Configuration + where + H: Fn(usize) + Send + Sync + 'static, + { + self.builder = self.builder.exit_handler(exit_handler); + self + } + + /// Returns a ThreadPoolBuilder with identical parameters. + fn into_builder(self) -> ThreadPoolBuilder { + self.builder + } +} + +impl ThreadPoolBuildError { + fn new(kind: ErrorKind) -> ThreadPoolBuildError { + ThreadPoolBuildError { kind } + } +} + +const GLOBAL_POOL_ALREADY_INITIALIZED: &str = + "The global thread pool has already been initialized."; + +impl Error for ThreadPoolBuildError { + #[allow(deprecated)] + fn description(&self) -> &str { + match self.kind { + ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED, + ErrorKind::IOError(ref e) => e.description(), + } + } + + fn source(&self) -> Option<&(dyn Error + 'static)> { + match &self.kind { + ErrorKind::GlobalPoolAlreadyInitialized => None, + ErrorKind::IOError(e) => Some(e), + } + } +} + +impl fmt::Display for ThreadPoolBuildError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.kind { + ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f), + ErrorKind::IOError(e) => e.fmt(f), + } + } +} + +/// Deprecated in favor of `ThreadPoolBuilder::build_global`. +#[deprecated(note = "use `ThreadPoolBuilder::build_global`")] +#[allow(deprecated)] +pub fn initialize(config: Configuration) -> Result<(), Box> { + config.into_builder().build_global().map_err(Box::from) +} + +impl fmt::Debug for ThreadPoolBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let ThreadPoolBuilder { + ref num_threads, + ref get_thread_name, + ref panic_handler, + ref stack_size, + ref start_handler, + ref exit_handler, + spawn_handler: _, + ref breadth_first, + } = *self; + + // Just print `Some()` or `None` to the debug + // output. + struct ClosurePlaceholder; + impl fmt::Debug for ClosurePlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder); + let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder); + let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); + let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); + + f.debug_struct("ThreadPoolBuilder") + .field("num_threads", num_threads) + .field("get_thread_name", &get_thread_name) + .field("panic_handler", &panic_handler) + .field("stack_size", &stack_size) + .field("start_handler", &start_handler) + .field("exit_handler", &exit_handler) + .field("breadth_first", &breadth_first) + .finish() + } +} + +#[allow(deprecated)] +impl fmt::Debug for Configuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.builder.fmt(f) + } +} + +/// Provides the calling context to a closure called by `join_context`. +#[derive(Debug)] +pub struct FnContext { + migrated: bool, + + /// disable `Send` and `Sync`, just for a little future-proofing. + _marker: PhantomData<*mut ()>, +} + +impl FnContext { + #[inline] + fn new(migrated: bool) -> Self { + FnContext { + migrated, + _marker: PhantomData, + } + } +} + +impl FnContext { + /// Returns `true` if the closure was called from a different thread + /// than it was provided from. + #[inline] + pub fn migrated(&self) -> bool { + self.migrated + } +} diff --git a/third_party/rust/rayon-core/src/log.rs b/third_party/rust/rayon-core/src/log.rs new file mode 100644 index 0000000000..7b6daf0ab6 --- /dev/null +++ b/third_party/rust/rayon-core/src/log.rs @@ -0,0 +1,413 @@ +//! Debug Logging +//! +//! To use in a debug build, set the env var `RAYON_LOG` as +//! described below. In a release build, logs are compiled out by +//! default unless Rayon is built with `--cfg rayon_rs_log` (try +//! `RUSTFLAGS="--cfg rayon_rs_log"`). +//! +//! Note that logs are an internally debugging tool and their format +//! is considered unstable, as are the details of how to enable them. +//! +//! # Valid values for RAYON_LOG +//! +//! The `RAYON_LOG` variable can take on the following values: +//! +//! * `tail:` -- dumps the last 10,000 events into the given file; +//! useful for tracking down deadlocks +//! * `profile:` -- dumps only those events needed to reconstruct how +//! many workers are active at a given time +//! * `all:` -- dumps every event to the file; useful for debugging + +use crossbeam_channel::{self, Receiver, Sender}; +use std::collections::VecDeque; +use std::env; +use std::fs::File; +use std::io::{self, BufWriter, Write}; + +/// True if logs are compiled in. +pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions)); + +#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] +pub(super) enum Event { + /// Flushes events to disk, used to terminate benchmarking. + Flush, + + /// Indicates that a worker thread started execution. + ThreadStart { + worker: usize, + terminate_addr: usize, + }, + + /// Indicates that a worker thread started execution. + ThreadTerminate { worker: usize }, + + /// Indicates that a worker thread became idle, blocked on `latch_addr`. + ThreadIdle { worker: usize, latch_addr: usize }, + + /// Indicates that an idle worker thread found work to do, after + /// yield rounds. It should no longer be considered idle. + ThreadFoundWork { worker: usize, yields: u32 }, + + /// Indicates that a worker blocked on a latch observed that it was set. + /// + /// Internal debugging event that does not affect the state + /// machine. + ThreadSawLatchSet { worker: usize, latch_addr: usize }, + + /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal + /// sleep state that we saw at the time. + ThreadSleepy { worker: usize, jobs_counter: usize }, + + /// Indicates that the thread's attempt to fall asleep was + /// interrupted because the latch was set. (This is not, in and of + /// itself, a change to the thread state.) + ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize }, + + /// Indicates that the thread's attempt to fall asleep was + /// interrupted because a job was posted. (This is not, in and of + /// itself, a change to the thread state.) + ThreadSleepInterruptedByJob { worker: usize }, + + /// Indicates that an idle worker has gone to sleep. + ThreadSleeping { worker: usize, latch_addr: usize }, + + /// Indicates that a sleeping worker has awoken. + ThreadAwoken { worker: usize, latch_addr: usize }, + + /// Indicates that the given worker thread was notified it should + /// awaken. + ThreadNotify { worker: usize }, + + /// The given worker has pushed a job to its local deque. + JobPushed { worker: usize }, + + /// The given worker has popped a job from its local deque. + JobPopped { worker: usize }, + + /// The given worker has stolen a job from the deque of another. + JobStolen { worker: usize, victim: usize }, + + /// N jobs were injected into the global queue. + JobsInjected { count: usize }, + + /// A job was removed from the global queue. + JobUninjected { worker: usize }, + + /// A job was broadcasted to N threads. + JobBroadcast { count: usize }, + + /// When announcing a job, this was the value of the counters we observed. + /// + /// No effect on thread state, just a debugging event. + JobThreadCounts { + worker: usize, + num_idle: u16, + num_sleepers: u16, + }, +} + +/// Handle to the logging thread, if any. You can use this to deliver +/// logs. You can also clone it freely. +#[derive(Clone)] +pub(super) struct Logger { + sender: Option>, +} + +impl Logger { + pub(super) fn new(num_workers: usize) -> Logger { + if !LOG_ENABLED { + return Self::disabled(); + } + + // see the doc comment for the format + let env_log = match env::var("RAYON_LOG") { + Ok(s) => s, + Err(_) => return Self::disabled(), + }; + + let (sender, receiver) = crossbeam_channel::unbounded(); + + if let Some(filename) = env_log.strip_prefix("tail:") { + let filename = filename.to_string(); + ::std::thread::spawn(move || { + Self::tail_logger_thread(num_workers, filename, 10_000, receiver) + }); + } else if env_log == "all" { + ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver)); + } else if let Some(filename) = env_log.strip_prefix("profile:") { + let filename = filename.to_string(); + ::std::thread::spawn(move || { + Self::profile_logger_thread(num_workers, filename, 10_000, receiver) + }); + } else { + panic!("RAYON_LOG should be 'tail:' or 'profile:'"); + } + + Logger { + sender: Some(sender), + } + } + + fn disabled() -> Logger { + Logger { sender: None } + } + + #[inline] + pub(super) fn log(&self, event: impl FnOnce() -> Event) { + if !LOG_ENABLED { + return; + } + + if let Some(sender) = &self.sender { + sender.send(event()).unwrap(); + } + } + + fn profile_logger_thread( + num_workers: usize, + log_filename: String, + capacity: usize, + receiver: Receiver, + ) { + let file = File::create(&log_filename) + .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); + + let mut writer = BufWriter::new(file); + let mut events = Vec::with_capacity(capacity); + let mut state = SimulatorState::new(num_workers); + let timeout = std::time::Duration::from_secs(30); + + loop { + while let Ok(event) = receiver.recv_timeout(timeout) { + if let Event::Flush = event { + break; + } + + events.push(event); + if events.len() == capacity { + break; + } + } + + for event in events.drain(..) { + if state.simulate(&event) { + state.dump(&mut writer, &event).unwrap(); + } + } + + writer.flush().unwrap(); + } + } + + fn tail_logger_thread( + num_workers: usize, + log_filename: String, + capacity: usize, + receiver: Receiver, + ) { + let file = File::create(&log_filename) + .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); + + let mut writer = BufWriter::new(file); + let mut events: VecDeque = VecDeque::with_capacity(capacity); + let mut state = SimulatorState::new(num_workers); + let timeout = std::time::Duration::from_secs(30); + let mut skipped = false; + + loop { + while let Ok(event) = receiver.recv_timeout(timeout) { + if let Event::Flush = event { + // We ignore Flush events in tail mode -- + // we're really just looking for + // deadlocks. + continue; + } else { + if events.len() == capacity { + let event = events.pop_front().unwrap(); + state.simulate(&event); + skipped = true; + } + + events.push_back(event); + } + } + + if skipped { + writeln!(writer, "...").unwrap(); + skipped = false; + } + + for event in events.drain(..) { + // In tail mode, we dump *all* events out, whether or + // not they were 'interesting' to the state machine. + state.simulate(&event); + state.dump(&mut writer, &event).unwrap(); + } + + writer.flush().unwrap(); + } + } + + fn all_logger_thread(num_workers: usize, receiver: Receiver) { + let stderr = std::io::stderr(); + let mut state = SimulatorState::new(num_workers); + + for event in receiver { + let mut writer = BufWriter::new(stderr.lock()); + state.simulate(&event); + state.dump(&mut writer, &event).unwrap(); + writer.flush().unwrap(); + } + } +} + +#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] +enum State { + Working, + Idle, + Notified, + Sleeping, + Terminated, +} + +impl State { + fn letter(&self) -> char { + match self { + State::Working => 'W', + State::Idle => 'I', + State::Notified => 'N', + State::Sleeping => 'S', + State::Terminated => 'T', + } + } +} + +struct SimulatorState { + local_queue_size: Vec, + thread_states: Vec, + injector_size: usize, +} + +impl SimulatorState { + fn new(num_workers: usize) -> Self { + Self { + local_queue_size: (0..num_workers).map(|_| 0).collect(), + thread_states: (0..num_workers).map(|_| State::Working).collect(), + injector_size: 0, + } + } + + fn simulate(&mut self, event: &Event) -> bool { + match *event { + Event::ThreadIdle { worker, .. } => { + assert_eq!(self.thread_states[worker], State::Working); + self.thread_states[worker] = State::Idle; + true + } + + Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => { + self.thread_states[worker] = State::Working; + true + } + + Event::ThreadTerminate { worker, .. } => { + self.thread_states[worker] = State::Terminated; + true + } + + Event::ThreadSleeping { worker, .. } => { + assert_eq!(self.thread_states[worker], State::Idle); + self.thread_states[worker] = State::Sleeping; + true + } + + Event::ThreadAwoken { worker, .. } => { + assert_eq!(self.thread_states[worker], State::Notified); + self.thread_states[worker] = State::Idle; + true + } + + Event::JobPushed { worker } => { + self.local_queue_size[worker] += 1; + true + } + + Event::JobPopped { worker } => { + self.local_queue_size[worker] -= 1; + true + } + + Event::JobStolen { victim, .. } => { + self.local_queue_size[victim] -= 1; + true + } + + Event::JobsInjected { count } => { + self.injector_size += count; + true + } + + Event::JobUninjected { .. } => { + self.injector_size -= 1; + true + } + + Event::ThreadNotify { worker } => { + // Currently, this log event occurs while holding the + // thread lock, so we should *always* see it before + // the worker awakens. + assert_eq!(self.thread_states[worker], State::Sleeping); + self.thread_states[worker] = State::Notified; + true + } + + // remaining events are no-ops from pov of simulating the + // thread state + _ => false, + } + } + + fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> { + let num_idle_threads = self + .thread_states + .iter() + .filter(|s| **s == State::Idle) + .count(); + + let num_sleeping_threads = self + .thread_states + .iter() + .filter(|s| **s == State::Sleeping) + .count(); + + let num_notified_threads = self + .thread_states + .iter() + .filter(|s| **s == State::Notified) + .count(); + + let num_pending_jobs: usize = self.local_queue_size.iter().sum(); + + write!(w, "{:2},", num_idle_threads)?; + write!(w, "{:2},", num_sleeping_threads)?; + write!(w, "{:2},", num_notified_threads)?; + write!(w, "{:4},", num_pending_jobs)?; + write!(w, "{:4},", self.injector_size)?; + + let event_str = format!("{:?}", event); + write!(w, r#""{:60}","#, event_str)?; + + for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) { + write!(w, " T{:02},{}", i, state.letter(),)?; + + if *queue_size > 0 { + write!(w, ",{:03},", queue_size)?; + } else { + write!(w, ", ,")?; + } + } + + writeln!(w)?; + Ok(()) + } +} diff --git a/third_party/rust/rayon-core/src/private.rs b/third_party/rust/rayon-core/src/private.rs new file mode 100644 index 0000000000..c85e77b9cb --- /dev/null +++ b/third_party/rust/rayon-core/src/private.rs @@ -0,0 +1,26 @@ +//! The public parts of this private module are used to create traits +//! that cannot be implemented outside of our own crate. This way we +//! can feel free to extend those traits without worrying about it +//! being a breaking change for other implementations. + +/// If this type is pub but not publicly reachable, third parties +/// can't name it and can't implement traits using it. +#[allow(missing_debug_implementations)] +pub struct PrivateMarker; + +macro_rules! private_decl { + () => { + /// This trait is private; this method exists to make it + /// impossible to implement outside the crate. + #[doc(hidden)] + fn __rayon_private__(&self) -> crate::private::PrivateMarker; + }; +} + +macro_rules! private_impl { + () => { + fn __rayon_private__(&self) -> crate::private::PrivateMarker { + crate::private::PrivateMarker + } + }; +} diff --git a/third_party/rust/rayon-core/src/registry.rs b/third_party/rust/rayon-core/src/registry.rs new file mode 100644 index 0000000000..279e298d21 --- /dev/null +++ b/third_party/rust/rayon-core/src/registry.rs @@ -0,0 +1,970 @@ +use crate::job::{JobFifo, JobRef, StackJob}; +use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch}; +use crate::log::Event::*; +use crate::log::Logger; +use crate::sleep::Sleep; +use crate::unwind; +use crate::{ + ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, +}; +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use std::cell::Cell; +use std::collections::hash_map::DefaultHasher; +use std::fmt; +use std::hash::Hasher; +use std::io; +use std::mem; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, Once}; +use std::thread; +use std::usize; + +/// Thread builder used for customization via +/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). +pub struct ThreadBuilder { + name: Option, + stack_size: Option, + worker: Worker, + stealer: Stealer, + registry: Arc, + index: usize, +} + +impl ThreadBuilder { + /// Gets the index of this thread in the pool, within `0..num_threads`. + pub fn index(&self) -> usize { + self.index + } + + /// Gets the string that was specified by `ThreadPoolBuilder::name()`. + pub fn name(&self) -> Option<&str> { + self.name.as_deref() + } + + /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`. + pub fn stack_size(&self) -> Option { + self.stack_size + } + + /// Executes the main loop for this thread. This will not return until the + /// thread pool is dropped. + pub fn run(self) { + unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) } + } +} + +impl fmt::Debug for ThreadBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadBuilder") + .field("pool", &self.registry.id()) + .field("index", &self.index) + .field("name", &self.name) + .field("stack_size", &self.stack_size) + .finish() + } +} + +/// Generalized trait for spawning a thread in the `Registry`. +/// +/// This trait is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +pub trait ThreadSpawn { + private_decl! {} + + /// Spawn a thread with the `ThreadBuilder` parameters, and then + /// call `ThreadBuilder::run()`. + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>; +} + +/// Spawns a thread in the "normal" way with `std::thread::Builder`. +/// +/// This type is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +#[derive(Debug, Default)] +pub struct DefaultSpawn; + +impl ThreadSpawn for DefaultSpawn { + private_impl! {} + + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { + let mut b = thread::Builder::new(); + if let Some(name) = thread.name() { + b = b.name(name.to_owned()); + } + if let Some(stack_size) = thread.stack_size() { + b = b.stack_size(stack_size); + } + b.spawn(|| thread.run())?; + Ok(()) + } +} + +/// Spawns a thread with a user's custom callback. +/// +/// This type is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +#[derive(Debug)] +pub struct CustomSpawn(F); + +impl CustomSpawn +where + F: FnMut(ThreadBuilder) -> io::Result<()>, +{ + pub(super) fn new(spawn: F) -> Self { + CustomSpawn(spawn) + } +} + +impl ThreadSpawn for CustomSpawn +where + F: FnMut(ThreadBuilder) -> io::Result<()>, +{ + private_impl! {} + + #[inline] + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { + (self.0)(thread) + } +} + +pub(super) struct Registry { + logger: Logger, + thread_infos: Vec, + sleep: Sleep, + injected_jobs: Injector, + broadcasts: Mutex>>, + panic_handler: Option>, + start_handler: Option>, + exit_handler: Option>, + + // When this latch reaches 0, it means that all work on this + // registry must be complete. This is ensured in the following ways: + // + // - if this is the global registry, there is a ref-count that never + // gets released. + // - if this is a user-created thread-pool, then so long as the thread-pool + // exists, it holds a reference. + // - when we inject a "blocking job" into the registry with `ThreadPool::install()`, + // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't + // return until the blocking job is complete, that ref will continue to be held. + // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed. + // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) + // and that job will keep the pool alive. + terminate_count: AtomicUsize, +} + +/// //////////////////////////////////////////////////////////////////////// +/// Initialization + +static mut THE_REGISTRY: Option> = None; +static THE_REGISTRY_SET: Once = Once::new(); + +/// Starts the worker threads (if that has not already happened). If +/// initialization has not already occurred, use the default +/// configuration. +pub(super) fn global_registry() -> &'static Arc { + set_global_registry(|| Registry::new(ThreadPoolBuilder::new())) + .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) + .expect("The global thread pool has not been initialized.") +} + +/// Starts the worker threads (if that has not already happened) with +/// the given builder. +pub(super) fn init_global_registry( + builder: ThreadPoolBuilder, +) -> Result<&'static Arc, ThreadPoolBuildError> +where + S: ThreadSpawn, +{ + set_global_registry(|| Registry::new(builder)) +} + +/// Starts the worker threads (if that has not already happened) +/// by creating a registry with the given callback. +fn set_global_registry(registry: F) -> Result<&'static Arc, ThreadPoolBuildError> +where + F: FnOnce() -> Result, ThreadPoolBuildError>, +{ + let mut result = Err(ThreadPoolBuildError::new( + ErrorKind::GlobalPoolAlreadyInitialized, + )); + + THE_REGISTRY_SET.call_once(|| { + result = registry() + .map(|registry: Arc| unsafe { &*THE_REGISTRY.get_or_insert(registry) }) + }); + + result +} + +struct Terminator<'a>(&'a Arc); + +impl<'a> Drop for Terminator<'a> { + fn drop(&mut self) { + self.0.terminate() + } +} + +impl Registry { + pub(super) fn new( + mut builder: ThreadPoolBuilder, + ) -> Result, ThreadPoolBuildError> + where + S: ThreadSpawn, + { + // Soft-limit the number of threads that we can actually support. + let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads()); + + let breadth_first = builder.get_breadth_first(); + + let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) + .map(|_| { + let worker = if breadth_first { + Worker::new_fifo() + } else { + Worker::new_lifo() + }; + + let stealer = worker.stealer(); + (worker, stealer) + }) + .unzip(); + + let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads) + .map(|_| { + let worker = Worker::new_fifo(); + let stealer = worker.stealer(); + (worker, stealer) + }) + .unzip(); + + let logger = Logger::new(n_threads); + let registry = Arc::new(Registry { + logger: logger.clone(), + thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), + sleep: Sleep::new(logger, n_threads), + injected_jobs: Injector::new(), + broadcasts: Mutex::new(broadcasts), + terminate_count: AtomicUsize::new(1), + panic_handler: builder.take_panic_handler(), + start_handler: builder.take_start_handler(), + exit_handler: builder.take_exit_handler(), + }); + + // If we return early or panic, make sure to terminate existing threads. + let t1000 = Terminator(®istry); + + for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() { + let thread = ThreadBuilder { + name: builder.get_thread_name(index), + stack_size: builder.get_stack_size(), + registry: Arc::clone(®istry), + worker, + stealer, + index, + }; + if let Err(e) = builder.get_spawn_handler().spawn(thread) { + return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); + } + } + + // Returning normally now, without termination. + mem::forget(t1000); + + Ok(registry) + } + + pub(super) fn current() -> Arc { + unsafe { + let worker_thread = WorkerThread::current(); + let registry = if worker_thread.is_null() { + global_registry() + } else { + &(*worker_thread).registry + }; + Arc::clone(registry) + } + } + + /// Returns the number of threads in the current registry. This + /// is better than `Registry::current().num_threads()` because it + /// avoids incrementing the `Arc`. + pub(super) fn current_num_threads() -> usize { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() { + global_registry().num_threads() + } else { + (*worker_thread).registry.num_threads() + } + } + } + + /// Returns the current `WorkerThread` if it's part of this `Registry`. + pub(super) fn current_thread(&self) -> Option<&WorkerThread> { + unsafe { + let worker = WorkerThread::current().as_ref()?; + if worker.registry().id() == self.id() { + Some(worker) + } else { + None + } + } + } + + /// Returns an opaque identifier for this registry. + pub(super) fn id(&self) -> RegistryId { + // We can rely on `self` not to change since we only ever create + // registries that are boxed up in an `Arc` (see `new()` above). + RegistryId { + addr: self as *const Self as usize, + } + } + + #[inline] + pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { + self.logger.log(event) + } + + pub(super) fn num_threads(&self) -> usize { + self.thread_infos.len() + } + + pub(super) fn catch_unwind(&self, f: impl FnOnce()) { + if let Err(err) = unwind::halt_unwinding(f) { + // If there is no handler, or if that handler itself panics, then we abort. + let abort_guard = unwind::AbortIfPanic; + if let Some(ref handler) = self.panic_handler { + handler(err); + mem::forget(abort_guard); + } + } + } + + /// Waits for the worker threads to get up and running. This is + /// meant to be used for benchmarking purposes, primarily, so that + /// you can get more consistent numbers by having everything + /// "ready to go". + pub(super) fn wait_until_primed(&self) { + for info in &self.thread_infos { + info.primed.wait(); + } + } + + /// Waits for the worker threads to stop. This is used for testing + /// -- so we can check that termination actually works. + #[cfg(test)] + pub(super) fn wait_until_stopped(&self) { + for info in &self.thread_infos { + info.stopped.wait(); + } + } + + /// //////////////////////////////////////////////////////////////////////// + /// MAIN LOOP + /// + /// So long as all of the worker threads are hanging out in their + /// top-level loop, there is no work to be done. + + /// Push a job into the given `registry`. If we are running on a + /// worker thread for the registry, this will push onto the + /// deque. Else, it will inject from the outside (which is slower). + pub(super) fn inject_or_push(&self, job_ref: JobRef) { + let worker_thread = WorkerThread::current(); + unsafe { + if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { + (*worker_thread).push(job_ref); + } else { + self.inject(&[job_ref]); + } + } + } + + /// Push a job into the "external jobs" queue; it will be taken by + /// whatever worker has nothing to do. Use this if you know that + /// you are not on a worker of this registry. + pub(super) fn inject(&self, injected_jobs: &[JobRef]) { + self.log(|| JobsInjected { + count: injected_jobs.len(), + }); + + // It should not be possible for `state.terminate` to be true + // here. It is only set to true when the user creates (and + // drops) a `ThreadPool`; and, in that case, they cannot be + // calling `inject()` later, since they dropped their + // `ThreadPool`. + debug_assert_ne!( + self.terminate_count.load(Ordering::Acquire), + 0, + "inject() sees state.terminate as true" + ); + + let queue_was_empty = self.injected_jobs.is_empty(); + + for &job_ref in injected_jobs { + self.injected_jobs.push(job_ref); + } + + self.sleep + .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty); + } + + fn has_injected_job(&self) -> bool { + !self.injected_jobs.is_empty() + } + + fn pop_injected_job(&self, worker_index: usize) -> Option { + loop { + match self.injected_jobs.steal() { + Steal::Success(job) => { + self.log(|| JobUninjected { + worker: worker_index, + }); + return Some(job); + } + Steal::Empty => return None, + Steal::Retry => {} + } + } + } + + /// Push a job into each thread's own "external jobs" queue; it will be + /// executed only on that thread, when it has nothing else to do locally, + /// before it tries to steal other work. + /// + /// **Panics** if not given exactly as many jobs as there are threads. + pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator) { + assert_eq!(self.num_threads(), injected_jobs.len()); + self.log(|| JobBroadcast { + count: self.num_threads(), + }); + { + let broadcasts = self.broadcasts.lock().unwrap(); + + // It should not be possible for `state.terminate` to be true + // here. It is only set to true when the user creates (and + // drops) a `ThreadPool`; and, in that case, they cannot be + // calling `inject_broadcast()` later, since they dropped their + // `ThreadPool`. + debug_assert_ne!( + self.terminate_count.load(Ordering::Acquire), + 0, + "inject_broadcast() sees state.terminate as true" + ); + + assert_eq!(broadcasts.len(), injected_jobs.len()); + for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) { + worker.push(job_ref); + } + } + for i in 0..self.num_threads() { + self.sleep.notify_worker_latch_is_set(i); + } + } + + /// If already in a worker-thread of this registry, just execute `op`. + /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` + /// completes and return its return value. If `op` panics, that panic will + /// be propagated as well. The second argument indicates `true` if injection + /// was performed, `false` if executed directly. + pub(super) fn in_worker(&self, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() { + self.in_worker_cold(op) + } else if (*worker_thread).registry().id() != self.id() { + self.in_worker_cross(&*worker_thread, op) + } else { + // Perfectly valid to give them a `&T`: this is the + // current thread, so we know the data structure won't be + // invalidated until we return. + op(&*worker_thread, false) + } + } + } + + #[cold] + unsafe fn in_worker_cold(&self, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new()); + + LOCK_LATCH.with(|l| { + // This thread isn't a member of *any* thread pool, so just block. + debug_assert!(WorkerThread::current().is_null()); + let job = StackJob::new( + |injected| { + let worker_thread = WorkerThread::current(); + assert!(injected && !worker_thread.is_null()); + op(&*worker_thread, true) + }, + l, + ); + self.inject(&[job.as_job_ref()]); + job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. + + // flush accumulated logs as we exit the thread + self.logger.log(|| Flush); + + job.into_result() + }) + } + + #[cold] + unsafe fn in_worker_cross(&self, current_thread: &WorkerThread, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + // This thread is a member of a different pool, so let it process + // other work while waiting for this `op` to complete. + debug_assert!(current_thread.registry().id() != self.id()); + let latch = SpinLatch::cross(current_thread); + let job = StackJob::new( + |injected| { + let worker_thread = WorkerThread::current(); + assert!(injected && !worker_thread.is_null()); + op(&*worker_thread, true) + }, + latch, + ); + self.inject(&[job.as_job_ref()]); + current_thread.wait_until(&job.latch); + job.into_result() + } + + /// Increments the terminate counter. This increment should be + /// balanced by a call to `terminate`, which will decrement. This + /// is used when spawning asynchronous work, which needs to + /// prevent the registry from terminating so long as it is active. + /// + /// Note that blocking functions such as `join` and `scope` do not + /// need to concern themselves with this fn; their context is + /// responsible for ensuring the current thread-pool will not + /// terminate until they return. + /// + /// The global thread-pool always has an outstanding reference + /// (the initial one). Custom thread-pools have one outstanding + /// reference that is dropped when the `ThreadPool` is dropped: + /// since installing the thread-pool blocks until any joins/scopes + /// complete, this ensures that joins/scopes are covered. + /// + /// The exception is `::spawn()`, which can create a job outside + /// of any blocking scope. In that case, the job itself holds a + /// terminate count and is responsible for invoking `terminate()` + /// when finished. + pub(super) fn increment_terminate_count(&self) { + let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); + debug_assert!(previous != 0, "registry ref count incremented from zero"); + assert!( + previous != std::usize::MAX, + "overflow in registry ref count" + ); + } + + /// Signals that the thread-pool which owns this registry has been + /// dropped. The worker threads will gradually terminate, once any + /// extant work is completed. + pub(super) fn terminate(&self) { + if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { + for (i, thread_info) in self.thread_infos.iter().enumerate() { + thread_info.terminate.set_and_tickle_one(self, i); + } + } + } + + /// Notify the worker that the latch they are sleeping on has been "set". + pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { + self.sleep.notify_worker_latch_is_set(target_worker_index); + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(super) struct RegistryId { + addr: usize, +} + +struct ThreadInfo { + /// Latch set once thread has started and we are entering into the + /// main loop. Used to wait for worker threads to become primed, + /// primarily of interest for benchmarking. + primed: LockLatch, + + /// Latch is set once worker thread has completed. Used to wait + /// until workers have stopped; only used for tests. + stopped: LockLatch, + + /// The latch used to signal that terminated has been requested. + /// This latch is *set* by the `terminate` method on the + /// `Registry`, once the registry's main "terminate" counter + /// reaches zero. + /// + /// NB. We use a `CountLatch` here because it has no lifetimes and is + /// meant for async use, but the count never gets higher than one. + terminate: CountLatch, + + /// the "stealer" half of the worker's deque + stealer: Stealer, +} + +impl ThreadInfo { + fn new(stealer: Stealer) -> ThreadInfo { + ThreadInfo { + primed: LockLatch::new(), + stopped: LockLatch::new(), + terminate: CountLatch::new(), + stealer, + } + } +} + +/// //////////////////////////////////////////////////////////////////////// +/// WorkerThread identifiers + +pub(super) struct WorkerThread { + /// the "worker" half of our local deque + worker: Worker, + + /// the "stealer" half of the worker's broadcast deque + stealer: Stealer, + + /// local queue used for `spawn_fifo` indirection + fifo: JobFifo, + + index: usize, + + /// A weak random number generator. + rng: XorShift64Star, + + registry: Arc, +} + +// This is a bit sketchy, but basically: the WorkerThread is +// allocated on the stack of the worker on entry and stored into this +// thread local variable. So it will remain valid at least until the +// worker is fully unwound. Using an unsafe pointer avoids the need +// for a RefCell etc. +thread_local! { + static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null()); +} + +impl Drop for WorkerThread { + fn drop(&mut self) { + // Undo `set_current` + WORKER_THREAD_STATE.with(|t| { + assert!(t.get().eq(&(self as *const _))); + t.set(ptr::null()); + }); + } +} + +impl WorkerThread { + /// Gets the `WorkerThread` index for the current thread; returns + /// NULL if this is not a worker thread. This pointer is valid + /// anywhere on the current thread. + #[inline] + pub(super) fn current() -> *const WorkerThread { + WORKER_THREAD_STATE.with(Cell::get) + } + + /// Sets `self` as the worker thread index for the current thread. + /// This is done during worker thread startup. + unsafe fn set_current(thread: *const WorkerThread) { + WORKER_THREAD_STATE.with(|t| { + assert!(t.get().is_null()); + t.set(thread); + }); + } + + /// Returns the registry that owns this worker thread. + #[inline] + pub(super) fn registry(&self) -> &Arc { + &self.registry + } + + #[inline] + pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { + self.registry.logger.log(event) + } + + /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). + #[inline] + pub(super) fn index(&self) -> usize { + self.index + } + + #[inline] + pub(super) unsafe fn push(&self, job: JobRef) { + self.log(|| JobPushed { worker: self.index }); + let queue_was_empty = self.worker.is_empty(); + self.worker.push(job); + self.registry + .sleep + .new_internal_jobs(self.index, 1, queue_was_empty); + } + + #[inline] + pub(super) unsafe fn push_fifo(&self, job: JobRef) { + self.push(self.fifo.push(job)); + } + + #[inline] + pub(super) fn local_deque_is_empty(&self) -> bool { + self.worker.is_empty() + } + + /// Attempts to obtain a "local" job -- typically this means + /// popping from the top of the stack, though if we are configured + /// for breadth-first execution, it would mean dequeuing from the + /// bottom. + #[inline] + pub(super) unsafe fn take_local_job(&self) -> Option { + let popped_job = self.worker.pop(); + + if popped_job.is_some() { + self.log(|| JobPopped { worker: self.index }); + return popped_job; + } + + loop { + match self.stealer.steal() { + Steal::Success(job) => return Some(job), + Steal::Empty => return None, + Steal::Retry => {} + } + } + } + + fn has_injected_job(&self) -> bool { + !self.stealer.is_empty() || self.registry.has_injected_job() + } + + /// Wait until the latch is set. Try to keep busy by popping and + /// stealing tasks as necessary. + #[inline] + pub(super) unsafe fn wait_until(&self, latch: &L) { + let latch = latch.as_core_latch(); + if !latch.probe() { + self.wait_until_cold(latch); + } + } + + #[cold] + unsafe fn wait_until_cold(&self, latch: &CoreLatch) { + // the code below should swallow all panics and hence never + // unwind; but if something does wrong, we want to abort, + // because otherwise other code in rayon may assume that the + // latch has been signaled, and that can lead to random memory + // accesses, which would be *very bad* + let abort_guard = unwind::AbortIfPanic; + + let mut idle_state = self.registry.sleep.start_looking(self.index, latch); + while !latch.probe() { + // Try to find some work to do. We give preference first + // to things in our local deque, then in other workers + // deques, and finally to injected jobs from the + // outside. The idea is to finish what we started before + // we take on something new. + if let Some(job) = self + .take_local_job() + .or_else(|| self.steal()) + .or_else(|| self.registry.pop_injected_job(self.index)) + { + self.registry.sleep.work_found(idle_state); + self.execute(job); + idle_state = self.registry.sleep.start_looking(self.index, latch); + } else { + self.registry + .sleep + .no_work_found(&mut idle_state, latch, || self.has_injected_job()) + } + } + + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to + // wait. + self.registry.sleep.work_found(idle_state); + + self.log(|| ThreadSawLatchSet { + worker: self.index, + latch_addr: latch.addr(), + }); + mem::forget(abort_guard); // successful execution, do not abort + } + + #[inline] + pub(super) unsafe fn execute(&self, job: JobRef) { + job.execute(); + } + + /// Try to steal a single job and return it. + /// + /// This should only be done as a last resort, when there is no + /// local work to do. + unsafe fn steal(&self) -> Option { + // we only steal when we don't have any work to do locally + debug_assert!(self.local_deque_is_empty()); + + // otherwise, try to steal + let thread_infos = &self.registry.thread_infos.as_slice(); + let num_threads = thread_infos.len(); + if num_threads <= 1 { + return None; + } + + loop { + let mut retry = false; + let start = self.rng.next_usize(num_threads); + let job = (start..num_threads) + .chain(0..start) + .filter(move |&i| i != self.index) + .find_map(|victim_index| { + let victim = &thread_infos[victim_index]; + match victim.stealer.steal() { + Steal::Success(job) => { + self.log(|| JobStolen { + worker: self.index, + victim: victim_index, + }); + Some(job) + } + Steal::Empty => None, + Steal::Retry => { + retry = true; + None + } + } + }); + if job.is_some() || !retry { + return job; + } + } + } +} + +/// //////////////////////////////////////////////////////////////////////// + +unsafe fn main_loop( + worker: Worker, + stealer: Stealer, + registry: Arc, + index: usize, +) { + let worker_thread = &WorkerThread { + worker, + stealer, + fifo: JobFifo::new(), + index, + rng: XorShift64Star::new(), + registry, + }; + WorkerThread::set_current(worker_thread); + let registry = &*worker_thread.registry; + + // let registry know we are ready to do work + registry.thread_infos[index].primed.set(); + + // Worker threads should not panic. If they do, just abort, as the + // internal state of the threadpool is corrupted. Note that if + // **user code** panics, we should catch that and redirect. + let abort_guard = unwind::AbortIfPanic; + + // Inform a user callback that we started a thread. + if let Some(ref handler) = registry.start_handler { + registry.catch_unwind(|| handler(index)); + } + + let my_terminate_latch = ®istry.thread_infos[index].terminate; + worker_thread.log(|| ThreadStart { + worker: index, + terminate_addr: my_terminate_latch.as_core_latch().addr(), + }); + worker_thread.wait_until(my_terminate_latch); + + // Should not be any work left in our queue. + debug_assert!(worker_thread.take_local_job().is_none()); + + // let registry know we are done + registry.thread_infos[index].stopped.set(); + + // Normal termination, do not abort. + mem::forget(abort_guard); + + worker_thread.log(|| ThreadTerminate { worker: index }); + + // Inform a user callback that we exited a thread. + if let Some(ref handler) = registry.exit_handler { + registry.catch_unwind(|| handler(index)); + // We're already exiting the thread, there's nothing else to do. + } +} + +/// If already in a worker-thread, just execute `op`. Otherwise, +/// execute `op` in the default thread-pool. Either way, block until +/// `op` completes and return its return value. If `op` panics, that +/// panic will be propagated as well. The second argument indicates +/// `true` if injection was performed, `false` if executed directly. +pub(super) fn in_worker(op: OP) -> R +where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, +{ + unsafe { + let owner_thread = WorkerThread::current(); + if !owner_thread.is_null() { + // Perfectly valid to give them a `&T`: this is the + // current thread, so we know the data structure won't be + // invalidated until we return. + op(&*owner_thread, false) + } else { + global_registry().in_worker_cold(op) + } + } +} + +/// [xorshift*] is a fast pseudorandom number generator which will +/// even tolerate weak seeding, as long as it's not zero. +/// +/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift* +struct XorShift64Star { + state: Cell, +} + +impl XorShift64Star { + fn new() -> Self { + // Any non-zero seed will do -- this uses the hash of a global counter. + let mut seed = 0; + while seed == 0 { + let mut hasher = DefaultHasher::new(); + static COUNTER: AtomicUsize = AtomicUsize::new(0); + hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed)); + seed = hasher.finish(); + } + + XorShift64Star { + state: Cell::new(seed), + } + } + + fn next(&self) -> u64 { + let mut x = self.state.get(); + debug_assert_ne!(x, 0); + x ^= x >> 12; + x ^= x << 25; + x ^= x >> 27; + self.state.set(x); + x.wrapping_mul(0x2545_f491_4f6c_dd1d) + } + + /// Return a value from `0..n`. + fn next_usize(&self, n: usize) -> usize { + (self.next() % n as u64) as usize + } +} diff --git a/third_party/rust/rayon-core/src/scope/mod.rs b/third_party/rust/rayon-core/src/scope/mod.rs new file mode 100644 index 0000000000..25cda832e8 --- /dev/null +++ b/third_party/rust/rayon-core/src/scope/mod.rs @@ -0,0 +1,859 @@ +//! Methods for custom fork-join scopes, created by the [`scope()`] +//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`]. +//! +//! [`scope()`]: fn.scope.html +//! [`in_place_scope()`]: fn.in_place_scope.html +//! [`join()`]: ../join/join.fn.html + +use crate::broadcast::BroadcastContext; +use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; +use crate::latch::{CountLatch, CountLockLatch, Latch}; +use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; +use crate::unwind; +use std::any::Any; +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::Arc; + +#[cfg(test)] +mod test; + +/// Represents a fork-join scope which can be used to spawn any number of tasks. +/// See [`scope()`] for more information. +/// +///[`scope()`]: fn.scope.html +pub struct Scope<'scope> { + base: ScopeBase<'scope>, +} + +/// Represents a fork-join scope which can be used to spawn any number of tasks. +/// Those spawned from the same thread are prioritized in relative FIFO order. +/// See [`scope_fifo()`] for more information. +/// +///[`scope_fifo()`]: fn.scope_fifo.html +pub struct ScopeFifo<'scope> { + base: ScopeBase<'scope>, + fifos: Vec, +} + +pub(super) enum ScopeLatch { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CountLatch, + /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool + /// with registry B, when a job completes in a thread of registry B, we may + /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A. + /// That means we need a reference to registry A (since at that point we will + /// only have a reference to registry B), so we stash it here. + registry: Arc, + /// The index of the worker to wake in `registry` + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: CountLockLatch }, +} + +struct ScopeBase<'scope> { + /// thread registry where `scope()` was executed or where `in_place_scope()` + /// should spawn jobs. + registry: Arc, + + /// if some job panicked, the error is stored here; it will be + /// propagated to the one who created the scope + panic: AtomicPtr>, + + /// latch to track job counts + job_completed_latch: ScopeLatch, + + /// You can think of a scope as containing a list of closures to execute, + /// all of which outlive `'scope`. They're not actually required to be + /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because + /// the closures are only *moved* across threads to be executed. + marker: PhantomData) + Send + Sync + 'scope>>, +} + +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// `scope()` is a more flexible building block compared to `join()`, +/// since a loop can be used to spawn any number of tasks without +/// recursing. However, that flexibility comes at a performance price: +/// tasks spawned using `scope()` must be allocated onto the heap, +/// whereas `join()` can make exclusive use of the stack. **Prefer +/// `join()` (or, even better, parallel iterators) where possible.** +/// +/// # Example +/// +/// The Rayon `join()` function launches two closures and waits for them +/// to stop. One could implement `join()` using a scope like so, although +/// it would be less efficient than the real implementation: +/// +/// ```rust +/// # use rayon_core as rayon; +/// pub fn join(oper_a: A, oper_b: B) -> (RA, RB) +/// where A: FnOnce() -> RA + Send, +/// B: FnOnce() -> RB + Send, +/// RA: Send, +/// RB: Send, +/// { +/// let mut result_a: Option = None; +/// let mut result_b: Option = None; +/// rayon::scope(|s| { +/// s.spawn(|_| result_a = Some(oper_a())); +/// s.spawn(|_| result_b = Some(oper_b())); +/// }); +/// (result_a.unwrap(), result_b.unwrap()) +/// } +/// ``` +/// +/// # A note on threading +/// +/// The closure given to `scope()` executes in the Rayon thread-pool, +/// as do those given to `spawn()`. This means that you can't access +/// thread-local variables (well, you can, but they may have +/// unexpected values). +/// +/// # Task execution +/// +/// Task execution potentially starts as soon as `spawn()` is called. +/// The task will end sometime before `scope()` returns. Note that the +/// *closure* given to scope may return much earlier. In general +/// the lifetime of a scope created like `scope(body)` goes something like this: +/// +/// - Scope begins when `scope(body)` is called +/// - Scope body `body()` is invoked +/// - Scope tasks may be spawned +/// - Scope body returns +/// - Scope tasks execute, possibly spawning more tasks +/// - Once all tasks are done, scope ends and `scope()` returns +/// +/// To see how and when tasks are joined, consider this example: +/// +/// ```rust +/// # use rayon_core as rayon; +/// // point start +/// rayon::scope(|s| { +/// s.spawn(|s| { // task s.1 +/// s.spawn(|s| { // task s.1.1 +/// rayon::scope(|t| { +/// t.spawn(|_| ()); // task t.1 +/// t.spawn(|_| ()); // task t.2 +/// }); +/// }); +/// }); +/// s.spawn(|s| { // task s.2 +/// }); +/// // point mid +/// }); +/// // point end +/// ``` +/// +/// The various tasks that are run will execute roughly like so: +/// +/// ```notrust +/// | (start) +/// | +/// | (scope `s` created) +/// +-----------------------------------------------+ (task s.2) +/// +-------+ (task s.1) | +/// | | | +/// | +---+ (task s.1.1) | +/// | | | | +/// | | | (scope `t` created) | +/// | | +----------------+ (task t.2) | +/// | | +---+ (task t.1) | | +/// | (mid) | | | | | +/// : | + <-+------------+ (scope `t` ends) | +/// : | | | +/// |<------+---+-----------------------------------+ (scope `s` ends) +/// | +/// | (end) +/// ``` +/// +/// The point here is that everything spawned into scope `s` will +/// terminate (at latest) at the same point -- right before the +/// original call to `rayon::scope` returns. This includes new +/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new +/// scope is created (such as `t`), the things spawned into that scope +/// will be joined before that scope returns, which in turn occurs +/// before the creating task (task `s.1.1` in this case) finishes. +/// +/// There is no guaranteed order of execution for spawns in a scope, +/// given that other threads may steal tasks at any time. However, they +/// are generally prioritized in a LIFO order on the thread from which +/// they were spawned. So in this example, absent any stealing, we can +/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other +/// threads always steal from the other end of the deque, like FIFO +/// order. The idea is that "recent" tasks are most likely to be fresh +/// in the local CPU's cache, while other threads can steal older +/// "stale" tasks. For an alternate approach, consider +/// [`scope_fifo()`] instead. +/// +/// [`scope_fifo()`]: fn.scope_fifo.html +/// +/// # Accessing stack data +/// +/// In general, spawned tasks may access stack data in place that +/// outlives the scope itself. Other data must be fully owned by the +/// spawned task. +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec = vec![4, 5, 6]; +/// s.spawn(|_| { +/// // We can access `ok` because outlives the scope `s`. +/// println!("ok: {:?}", ok); +/// +/// // If we just try to use `bad` here, the closure will borrow `bad` +/// // (because we are just printing it out, and that only requires a +/// // borrow), which will result in a compilation error. Read on +/// // for options. +/// // println!("bad: {:?}", bad); +/// }); +/// }); +/// ``` +/// +/// As the comments example above suggest, to reference `bad` we must +/// take ownership of it. One way to do this is to detach the closure +/// from the surrounding stack frame, using the `move` keyword. This +/// will cause it to take ownership of *all* the variables it touches, +/// in this case including both `ok` *and* `bad`: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec = vec![4, 5, 6]; +/// s.spawn(move |_| { +/// println!("ok: {:?}", ok); +/// println!("bad: {:?}", bad); +/// }); +/// +/// // That closure is fine, but now we can't use `ok` anywhere else, +/// // since it is owned by the previous task: +/// // s.spawn(|_| println!("ok: {:?}", ok)); +/// }); +/// ``` +/// +/// While this works, it could be a problem if we want to use `ok` elsewhere. +/// There are two choices. We can keep the closure as a `move` closure, but +/// instead of referencing the variable `ok`, we create a shadowed variable that +/// is a borrow of `ok` and capture *that*: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec = vec![4, 5, 6]; +/// let ok: &Vec = &ok; // shadow the original `ok` +/// s.spawn(move |_| { +/// println!("ok: {:?}", ok); // captures the shadowed version +/// println!("bad: {:?}", bad); +/// }); +/// +/// // Now we too can use the shadowed `ok`, since `&Vec` references +/// // can be shared freely. Note that we need a `move` closure here though, +/// // because otherwise we'd be trying to borrow the shadowed `ok`, +/// // and that doesn't outlive `scope`. +/// s.spawn(move |_| println!("ok: {:?}", ok)); +/// }); +/// ``` +/// +/// Another option is not to use the `move` keyword but instead to take ownership +/// of individual variables: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec = vec![4, 5, 6]; +/// s.spawn(|_| { +/// // Transfer ownership of `bad` into a local variable (also named `bad`). +/// // This will force the closure to take ownership of `bad` from the environment. +/// let bad = bad; +/// println!("ok: {:?}", ok); // `ok` is only borrowed. +/// println!("bad: {:?}", bad); // refers to our local variable, above. +/// }); +/// +/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok` +/// }); +/// ``` +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `scope()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R + Send, + R: Send, +{ + in_worker(|owner_thread, _| { + let scope = Scope::<'scope>::new(Some(owner_thread), None); + scope.base.complete(Some(owner_thread), || op(&scope)) + }) +} + +/// Creates a "fork-join" scope `s` with FIFO order, and invokes the +/// closure with a reference to `s`. This closure can then spawn +/// asynchronous tasks into `s`. Those tasks may run asynchronously with +/// respect to the closure; they may themselves spawn additional tasks +/// into `s`. When the closure returns, it will block until all tasks +/// that have been spawned into `s` complete. +/// +/// # Task execution +/// +/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a +/// difference in the order of execution. Consider a similar example: +/// +/// [`scope()`]: fn.scope.html +/// +/// ```rust +/// # use rayon_core as rayon; +/// // point start +/// rayon::scope_fifo(|s| { +/// s.spawn_fifo(|s| { // task s.1 +/// s.spawn_fifo(|s| { // task s.1.1 +/// rayon::scope_fifo(|t| { +/// t.spawn_fifo(|_| ()); // task t.1 +/// t.spawn_fifo(|_| ()); // task t.2 +/// }); +/// }); +/// }); +/// s.spawn_fifo(|s| { // task s.2 +/// }); +/// // point mid +/// }); +/// // point end +/// ``` +/// +/// The various tasks that are run will execute roughly like so: +/// +/// ```notrust +/// | (start) +/// | +/// | (FIFO scope `s` created) +/// +--------------------+ (task s.1) +/// +-------+ (task s.2) | +/// | | +---+ (task s.1.1) +/// | | | | +/// | | | | (FIFO scope `t` created) +/// | | | +----------------+ (task t.1) +/// | | | +---+ (task t.2) | +/// | (mid) | | | | | +/// : | | + <-+------------+ (scope `t` ends) +/// : | | | +/// |<------+------------+---+ (scope `s` ends) +/// | +/// | (end) +/// ``` +/// +/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on +/// the thread from which they were spawned, as opposed to `scope()`'s +/// LIFO. So in this example, we can expect `s.1` to execute before +/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in +/// FIFO order, as usual. Overall, this has roughly the same order as +/// the now-deprecated [`breadth_first`] option, except the effect is +/// isolated to a particular scope. If spawns are intermingled from any +/// combination of `scope()` and `scope_fifo()`, or from different +/// threads, their order is only specified with respect to spawns in the +/// same scope and thread. +/// +/// For more details on this design, see Rayon [RFC #1]. +/// +/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first +/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `scope_fifo()` or +/// in any of the spawned jobs, that panic will be propagated and the +/// call to `scope_fifo()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it +/// will execute, even if the spawning task should later panic. +/// `scope_fifo()` returns once all spawned jobs have completed, and any +/// panics are propagated at that point. +pub fn scope_fifo<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, + R: Send, +{ + in_worker(|owner_thread, _| { + let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None); + scope.base.complete(Some(owner_thread), || op(&scope)) + }) +} + +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// This is just like `scope()` except the closure runs on the same thread +/// that calls `in_place_scope()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn in_place_scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + do_in_place_scope(None, op) +} + +pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc>, op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = Scope::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + +/// Creates a "fork-join" scope `s` with FIFO order, and invokes the +/// closure with a reference to `s`. This closure can then spawn +/// asynchronous tasks into `s`. Those tasks may run asynchronously with +/// respect to the closure; they may themselves spawn additional tasks +/// into `s`. When the closure returns, it will block until all tasks +/// that have been spawned into `s` complete. +/// +/// This is just like `scope_fifo()` except the closure runs on the same thread +/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope_fifo()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + do_in_place_scope_fifo(None, op) +} + +pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc>, op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = ScopeFifo::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + +impl<'scope> Scope<'scope> { + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc>) -> Self { + let base = ScopeBase::new(owner, registry); + Scope { base } + } + + /// Spawns a job into the fork-join scope `self`. This job will + /// execute sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its + /// own reference to the scope `self` as argument. This can be + /// used to inject new jobs into `self`. + /// + /// # Returns + /// + /// Nothing. The spawned closures cannot pass back values to the + /// caller directly, though they can write to local variables on + /// the stack (if those variables outlive the scope) or + /// communicate through shared channels. + /// + /// (The intention is to eventually integrate with Rust futures to + /// support spawns of functions that compute a value.) + /// + /// # Examples + /// + /// ```rust + /// # use rayon_core as rayon; + /// let mut value_a = None; + /// let mut value_b = None; + /// let mut value_c = None; + /// rayon::scope(|s| { + /// s.spawn(|s1| { + /// // ^ this is the same scope as `s`; this handle `s1` + /// // is intended for use by the spawned task, + /// // since scope handles cannot cross thread boundaries. + /// + /// value_a = Some(22); + /// + /// // the scope `s` will not end until all these tasks are done + /// s1.spawn(|_| { + /// value_b = Some(44); + /// }); + /// }); + /// + /// s.spawn(|_| { + /// value_c = Some(66); + /// }); + /// }); + /// assert_eq!(value_a, Some(22)); + /// assert_eq!(value_b, Some(44)); + /// assert_eq!(value_c, Some(66)); + /// ``` + /// + /// # See also + /// + /// The [`scope` function] has more extensive documentation about + /// task spawning. + /// + /// [`scope` function]: fn.scope.html + pub fn spawn(&self, body: BODY) + where + BODY: FnOnce(&Scope<'scope>) + Send + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; + scope.base.execute_job(move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); + + // Since `Scope` implements `Sync`, we can't be sure that we're still in a + // thread of this pool, so we can't just push to the local worker thread. + // Also, this might be an in-place scope. + self.base.registry.inject_or_push(job_ref); + } + + /// Spawns a job into every thread of the fork-join scope `self`. This job will + /// execute on each thread sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its own reference + /// to the scope `self` as argument, as well as a `BroadcastContext`. + pub fn spawn_broadcast(&self, body: BODY) + where + BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = ArcJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; + let body = &body; + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + scope.base.execute_job(func); + }); + self.base.inject_broadcast(job) + } +} + +impl<'scope> ScopeFifo<'scope> { + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc>) -> Self { + let base = ScopeBase::new(owner, registry); + let num_threads = base.registry.num_threads(); + let fifos = (0..num_threads).map(|_| JobFifo::new()).collect(); + ScopeFifo { base, fifos } + } + + /// Spawns a job into the fork-join scope `self`. This job will + /// execute sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its + /// own reference to the scope `self` as argument. This can be + /// used to inject new jobs into `self`. + /// + /// # See also + /// + /// This method is akin to [`Scope::spawn()`], but with a FIFO + /// priority. The [`scope_fifo` function] has more details about + /// this distinction. + /// + /// [`Scope::spawn()`]: struct.Scope.html#method.spawn + /// [`scope_fifo` function]: fn.scope_fifo.html + pub fn spawn_fifo(&self, body: BODY) + where + BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; + scope.base.execute_job(move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); + + // If we're in the pool, use our scope's private fifo for this thread to execute + // in a locally-FIFO order. Otherwise, just use the pool's global injector. + match self.base.registry.current_thread() { + Some(worker) => { + let fifo = &self.fifos[worker.index()]; + // SAFETY: this job will execute before the scope ends. + unsafe { worker.push(fifo.push(job_ref)) }; + } + None => self.base.registry.inject(&[job_ref]), + } + } + + /// Spawns a job into every thread of the fork-join scope `self`. This job will + /// execute on each thread sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its own reference + /// to the scope `self` as argument, as well as a `BroadcastContext`. + pub fn spawn_broadcast(&self, body: BODY) + where + BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = ArcJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; + let body = &body; + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + scope.base.execute_job(func); + }); + self.base.inject_broadcast(job) + } +} + +impl<'scope> ScopeBase<'scope> { + /// Creates the base of a new scope for the given registry + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc>) -> Self { + let registry = registry.unwrap_or_else(|| match owner { + Some(owner) => owner.registry(), + None => global_registry(), + }); + + ScopeBase { + registry: Arc::clone(registry), + panic: AtomicPtr::new(ptr::null_mut()), + job_completed_latch: ScopeLatch::new(owner), + marker: PhantomData, + } + } + + fn increment(&self) { + self.job_completed_latch.increment(); + } + + fn heap_job_ref(&self, job: Box>) -> JobRef + where + FUNC: FnOnce() + Send + 'scope, + { + unsafe { + self.increment(); + job.into_job_ref() + } + } + + fn inject_broadcast(&self, job: Arc>) + where + FUNC: Fn() + Send + Sync + 'scope, + { + let n_threads = self.registry.num_threads(); + let job_refs = (0..n_threads).map(|_| unsafe { + self.increment(); + ArcJob::as_job_ref(&job) + }); + + self.registry.inject_broadcast(job_refs); + } + + /// Executes `func` as a job, either aborting or executing as + /// appropriate. + fn complete(&self, owner: Option<&WorkerThread>, func: FUNC) -> R + where + FUNC: FnOnce() -> R, + { + let result = self.execute_job_closure(func); + self.job_completed_latch.wait(owner); + self.maybe_propagate_panic(); + result.unwrap() // only None if `op` panicked, and that would have been propagated + } + + /// Executes `func` as a job, either aborting or executing as + /// appropriate. + fn execute_job(&self, func: FUNC) + where + FUNC: FnOnce(), + { + let _: Option<()> = self.execute_job_closure(func); + } + + /// Executes `func` as a job in scope. Adjusts the "job completed" + /// counters and also catches any panic and stores it into + /// `scope`. + fn execute_job_closure(&self, func: FUNC) -> Option + where + FUNC: FnOnce() -> R, + { + match unwind::halt_unwinding(func) { + Ok(r) => { + self.job_completed_latch.set(); + Some(r) + } + Err(err) => { + self.job_panicked(err); + self.job_completed_latch.set(); + None + } + } + } + + fn job_panicked(&self, err: Box) { + // capture the first error we see, free the rest + let nil = ptr::null_mut(); + let mut err = Box::new(err); // box up the fat ptr + if self + .panic + .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + mem::forget(err); // ownership now transferred into self.panic + } + } + + fn maybe_propagate_panic(&self) { + // propagate panic, if any occurred; at this point, all + // outstanding jobs have completed, so we can use a relaxed + // ordering: + let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); + if !panic.is_null() { + let value = unsafe { Box::from_raw(panic) }; + unwind::resume_unwinding(*value); + } + } +} + +impl ScopeLatch { + fn new(owner: Option<&WorkerThread>) -> Self { + Self::with_count(1, owner) + } + + pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { + match owner { + Some(owner) => ScopeLatch::Stealing { + latch: CountLatch::with_count(count), + registry: Arc::clone(owner.registry()), + worker_index: owner.index(), + }, + None => ScopeLatch::Blocking { + latch: CountLockLatch::with_count(count), + }, + } + } + + fn increment(&self) { + match self { + ScopeLatch::Stealing { latch, .. } => latch.increment(), + ScopeLatch::Blocking { latch } => latch.increment(), + } + } + + pub(super) fn wait(&self, owner: Option<&WorkerThread>) { + match self { + ScopeLatch::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + ScopeLatch::Blocking { latch } => latch.wait(), + } + } +} + +impl Latch for ScopeLatch { + fn set(&self) { + match self { + ScopeLatch::Stealing { + latch, + registry, + worker_index, + } => latch.set_and_tickle_one(registry, *worker_index), + ScopeLatch::Blocking { latch } => latch.set(), + } + } +} + +impl<'scope> fmt::Debug for Scope<'scope> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Scope") + .field("pool_id", &self.base.registry.id()) + .field("panic", &self.base.panic) + .field("job_completed_latch", &self.base.job_completed_latch) + .finish() + } +} + +impl<'scope> fmt::Debug for ScopeFifo<'scope> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ScopeFifo") + .field("num_fifos", &self.fifos.len()) + .field("pool_id", &self.base.registry.id()) + .field("panic", &self.base.panic) + .field("job_completed_latch", &self.base.job_completed_latch) + .finish() + } +} + +impl fmt::Debug for ScopeLatch { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ScopeLatch::Stealing { latch, .. } => fmt + .debug_tuple("ScopeLatch::Stealing") + .field(latch) + .finish(), + ScopeLatch::Blocking { latch } => fmt + .debug_tuple("ScopeLatch::Blocking") + .field(latch) + .finish(), + } + } +} + +/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. +/// +/// Unsafe code is still required to dereference the pointer, but that's fine in +/// scope jobs that are guaranteed to execute before the scope ends. +struct ScopePtr(*const T); + +// SAFETY: !Send for raw pointers is not for safety, just as a lint +unsafe impl Send for ScopePtr {} + +// SAFETY: !Sync for raw pointers is not for safety, just as a lint +unsafe impl Sync for ScopePtr {} + +impl ScopePtr { + // Helper to avoid disjoint captures of `scope_ptr.0` + unsafe fn as_ref(&self) -> &T { + &*self.0 + } +} diff --git a/third_party/rust/rayon-core/src/scope/test.rs b/third_party/rust/rayon-core/src/scope/test.rs new file mode 100644 index 0000000000..00dd18c920 --- /dev/null +++ b/third_party/rust/rayon-core/src/scope/test.rs @@ -0,0 +1,601 @@ +use crate::unwind; +use crate::ThreadPoolBuilder; +use crate::{scope, scope_fifo, Scope, ScopeFifo}; +use rand::{Rng, SeedableRng}; +use rand_xorshift::XorShiftRng; +use std::cmp; +use std::iter::once; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Barrier, Mutex}; +use std::vec; + +#[test] +fn scope_empty() { + scope(|_| {}); +} + +#[test] +fn scope_result() { + let x = scope(|_| 22); + assert_eq!(x, 22); +} + +#[test] +fn scope_two() { + let counter = &AtomicUsize::new(0); + scope(|s| { + s.spawn(move |_| { + counter.fetch_add(1, Ordering::SeqCst); + }); + s.spawn(move |_| { + counter.fetch_add(10, Ordering::SeqCst); + }); + }); + + let v = counter.load(Ordering::SeqCst); + assert_eq!(v, 11); +} + +#[test] +fn scope_divide_and_conquer() { + let counter_p = &AtomicUsize::new(0); + scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024))); + + let counter_s = &AtomicUsize::new(0); + divide_and_conquer_seq(counter_s, 1024); + + let p = counter_p.load(Ordering::SeqCst); + let s = counter_s.load(Ordering::SeqCst); + assert_eq!(p, s); +} + +fn divide_and_conquer<'scope>(scope: &Scope<'scope>, counter: &'scope AtomicUsize, size: usize) { + if size > 1 { + scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); + scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); + } else { + // count the leaves + counter.fetch_add(1, Ordering::SeqCst); + } +} + +fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) { + if size > 1 { + divide_and_conquer_seq(counter, size / 2); + divide_and_conquer_seq(counter, size / 2); + } else { + // count the leaves + counter.fetch_add(1, Ordering::SeqCst); + } +} + +struct Tree { + value: T, + children: Vec>, +} + +impl Tree { + fn iter(&self) -> vec::IntoIter<&T> { + once(&self.value) + .chain(self.children.iter().flat_map(Tree::iter)) + .collect::>() // seems like it shouldn't be needed... but prevents overflow + .into_iter() + } + + fn update(&mut self, op: OP) + where + OP: Fn(&mut T) + Sync, + T: Send, + { + scope(|s| self.update_in_scope(&op, s)); + } + + fn update_in_scope<'scope, OP>(&'scope mut self, op: &'scope OP, scope: &Scope<'scope>) + where + OP: Fn(&mut T) + Sync, + { + let Tree { + ref mut value, + ref mut children, + } = *self; + scope.spawn(move |scope| { + for child in children { + scope.spawn(move |scope| child.update_in_scope(op, scope)); + } + }); + + op(value); + } +} + +fn random_tree(depth: usize) -> Tree { + assert!(depth > 0); + let mut seed = ::Seed::default(); + (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); + let mut rng = XorShiftRng::from_seed(seed); + random_tree1(depth, &mut rng) +} + +fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree { + let children = if depth == 0 { + vec![] + } else { + (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level + .map(|_| random_tree1(depth - 1, rng)) + .collect() + }; + + Tree { + value: rng.gen_range(0..1_000_000), + children, + } +} + +#[test] +fn update_tree() { + let mut tree: Tree = random_tree(10); + let values: Vec = tree.iter().cloned().collect(); + tree.update(|v| *v += 1); + let new_values: Vec = tree.iter().cloned().collect(); + assert_eq!(values.len(), new_values.len()); + for (&i, &j) in values.iter().zip(&new_values) { + assert_eq!(i + 1, j); + } +} + +/// Check that if you have a chain of scoped tasks where T0 spawns T1 +/// spawns T2 and so forth down to Tn, the stack space should not grow +/// linearly with N. We test this by some unsafe hackery and +/// permitting an approx 10% change with a 10x input change. +#[test] +fn linear_stack_growth() { + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let mut max_diff = Mutex::new(0); + let bottom_of_stack = 0; + scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5)); + let diff_when_5 = *max_diff.get_mut().unwrap() as f64; + + scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500)); + let diff_when_500 = *max_diff.get_mut().unwrap() as f64; + + let ratio = diff_when_5 / diff_when_500; + assert!( + ratio > 0.9 && ratio < 1.1, + "stack usage ratio out of bounds: {}", + ratio + ); + }); +} + +fn the_final_countdown<'scope>( + s: &Scope<'scope>, + bottom_of_stack: &'scope i32, + max: &'scope Mutex, + n: usize, +) { + let top_of_stack = 0; + let p = bottom_of_stack as *const i32 as usize; + let q = &top_of_stack as *const i32 as usize; + let diff = if p > q { p - q } else { q - p }; + + let mut data = max.lock().unwrap(); + *data = cmp::max(diff, *data); + + if n > 0 { + s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1)); + } +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_scope() { + scope(|_| panic!("Hello, world!")); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_spawn() { + scope(|s| s.spawn(|_| panic!("Hello, world!"))); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_nested_spawn() { + scope(|s| s.spawn(|s| s.spawn(|s| s.spawn(|_| panic!("Hello, world!"))))); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_nested_scope_spawn() { + scope(|s| s.spawn(|_| scope(|s| s.spawn(|_| panic!("Hello, world!"))))); +} + +#[test] +fn panic_propagate_still_execute_1() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| panic!("Hello, world!")); // job A + s.spawn(|_| x = true); // job B, should still execute even though A panics + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "job b failed to execute"), + } +} + +#[test] +fn panic_propagate_still_execute_2() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| x = true); // job B, should still execute even though A panics + s.spawn(|_| panic!("Hello, world!")); // job A + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "job b failed to execute"), + } +} + +#[test] +fn panic_propagate_still_execute_3() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| x = true); // spawned job should still execute despite later panic + panic!("Hello, world!"); + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "panic after spawn, spawn failed to execute"), + } +} + +#[test] +fn panic_propagate_still_execute_4() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| panic!("Hello, world!")); + x = true; + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "panic in spawn tainted scope"), + } +} + +macro_rules! test_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |scope| { + for j in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn lifo_order() { + // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. + let vec = test_order!(scope => spawn); + let expected: Vec = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn fifo_order() { + // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_order!(scope_fifo => spawn_fifo); + let expected: Vec = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_nested_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$outer_spawn(move |_| { + $inner_scope(|scope| { + for j in 0..10 { + scope.$inner_spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn nested_lifo_order() { + // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. + let vec = test_nested_order!(scope => spawn, scope => spawn); + let expected: Vec = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn nested_fifo_order() { + // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected: Vec = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +fn nested_lifo_fifo_order() { + // LIFO on the outside, FIFO on the inside + let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected: Vec = (0..10) + .rev() + .flat_map(|i| (0..10).map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +#[test] +fn nested_fifo_lifo_order() { + // FIFO on the outside, LIFO on the inside + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected: Vec = (0..10) + .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +macro_rules! spawn_push { + ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{ + $scope.$spawn(move |_| $vec.lock().unwrap().push($i)); + }}; +} + +/// Test spawns pushing a series of numbers, interleaved +/// such that negative values are using an inner scope. +macro_rules! test_mixed_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|outer_scope| { + let vec = &vec; + spawn_push!(outer_scope.$outer_spawn, vec, 0); + $inner_scope(|inner_scope| { + spawn_push!(inner_scope.$inner_spawn, vec, -1); + spawn_push!(outer_scope.$outer_spawn, vec, 1); + spawn_push!(inner_scope.$inner_spawn, vec, -2); + spawn_push!(outer_scope.$outer_spawn, vec, 2); + spawn_push!(inner_scope.$inner_spawn, vec, -3); + }); + spawn_push!(outer_scope.$outer_spawn, vec, 3); + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn mixed_lifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. + let vec = test_mixed_order!(scope => spawn, scope => spawn); + let expected = vec![-3, 2, -2, 1, -1, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_fifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected = vec![-1, 0, -2, 1, -3, 2, 3]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_lifo_fifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. + let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected = vec![-1, 2, -2, 1, -3, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_fifo_lifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected = vec![-3, 0, -2, 1, -1, 2, 3]; + assert_eq!(vec, expected); +} + +#[test] +fn static_scope() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope(|s: &Scope<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn static_scope_fifo() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope_fifo(|s: &ScopeFifo<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn_fifo(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn mixed_lifetime_scope() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope(move |s: &Scope<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. + for &c in counters { + s.spawn(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} + +#[test] +fn mixed_lifetime_scope_fifo() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope_fifo(move |s: &ScopeFifo<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. + for &c in counters { + s.spawn_fifo(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} + +#[test] +fn scope_spawn_broadcast() { + let sum = AtomicUsize::new(0); + let n = scope(|s| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * (n - 1) / 2); +} + +#[test] +fn scope_fifo_spawn_broadcast() { + let sum = AtomicUsize::new(0); + let n = scope_fifo(|s| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * (n - 1) / 2); +} + +#[test] +fn scope_spawn_broadcast_nested() { + let sum = AtomicUsize::new(0); + let n = scope(|s| { + s.spawn_broadcast(|s, _| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * n * (n - 1) / 2); +} + +#[test] +fn scope_spawn_broadcast_barrier() { + let barrier = Barrier::new(8); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool.in_place_scope(|s| { + s.spawn_broadcast(|_, _| { + barrier.wait(); + }); + barrier.wait(); + }); +} + +#[test] +fn scope_spawn_broadcast_panic_one() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.scope(|s| { + s.spawn_broadcast(|_, ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() == 3 { + panic!("Hello, world!"); + } + }); + }); + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} + +#[test] +fn scope_spawn_broadcast_panic_many() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.scope(|s| { + s.spawn_broadcast(|_, ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() % 2 == 0 { + panic!("Hello, world!"); + } + }); + }); + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} diff --git a/third_party/rust/rayon-core/src/sleep/README.md b/third_party/rust/rayon-core/src/sleep/README.md new file mode 100644 index 0000000000..55426c88c9 --- /dev/null +++ b/third_party/rust/rayon-core/src/sleep/README.md @@ -0,0 +1,219 @@ +# Introduction: the sleep module + +The code in this module governs when worker threads should go to +sleep. The system used in this code was introduced in [Rayon RFC #5]. +There is also a [video walkthrough] available. Both of those may be +valuable resources to understanding the code, though naturally they +will also grow stale over time. The comments in this file are +extracted from the RFC and meant to be kept up to date. + +[Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5 +[video walkthrough]: https://youtu.be/HvmQsE5M4cY + +# The `Sleep` struct + +The `Sleep` struct is embedded into each registry. It performs several functions: + +* It tracks when workers are awake or asleep. +* It decides how long a worker should look for work before it goes to sleep, + via a callback that is invoked periodically from the worker's search loop. +* It is notified when latches are set, jobs are published, or other + events occur, and it will go and wake the appropriate threads if + they are sleeping. + +# Thread states + +There are three main thread states: + +* An **active** thread is one that is actively executing a job. +* An **idle** thread is one that is searching for work to do. It will be + trying to steal work or pop work from the global injector queue. +* A **sleeping** thread is one that is blocked on a condition variable, + waiting to be awoken. + +We sometimes refer to the final two states collectively as **inactive**. +Threads begin as idle but transition to idle and finally sleeping when +they're unable to find work to do. + +## Sleepy threads + +There is one other special state worth mentioning. During the idle state, +threads can get **sleepy**. A sleepy thread is still idle, in that it is still +searching for work, but it is *about* to go to sleep after it does one more +search (or some other number, potentially). When a thread enters the sleepy +state, it signals (via the **jobs event counter**, described below) that it is +about to go to sleep. If new work is published, this will lead to the counter +being adjusted. When the thread actually goes to sleep, it will (hopefully, but +not guaranteed) see that the counter has changed and elect not to sleep, but +instead to search again. See the section on the **jobs event counter** for more +details. + +# The counters + +One of the key structs in the sleep module is `AtomicCounters`, found in +`counters.rs`. It packs three counters into one atomically managed value: + +* Two **thread counters**, which track the number of threads in a particular state. +* The **jobs event counter**, which is used to signal when new work is available. + It (sort of) tracks the number of jobs posted, but not quite, and it can rollover. + +## Thread counters + +There are two thread counters, one that tracks **inactive** threads and one that +tracks **sleeping** threads. From this, one can deduce the number of threads +that are idle by subtracting sleeping threads from inactive threads. We track +the counters in this way because it permits simpler atomic operations. One can +increment the number of sleeping threads (and thus decrease the number of idle +threads) simply by doing one atomic increment, for example. Similarly, one can +decrease the number of sleeping threads (and increase the number of idle +threads) through one atomic decrement. + +These counters are adjusted as follows: + +* When a thread enters the idle state: increment the inactive thread counter. +* When a thread enters the sleeping state: increment the sleeping thread counter. +* When a thread awakens a sleeping thread: decrement the sleeping thread counter. + * Subtle point: the thread that *awakens* the sleeping thread decrements the + counter, not the thread that is *sleeping*. This is because there is a delay + between signaling a thread to wake and the thread actually waking: + decrementing the counter when awakening the thread means that other threads + that may be posting work will see the up-to-date value that much faster. +* When a thread finds work, exiting the idle state: decrement the inactive + thread counter. + +## Jobs event counter + +The final counter is the **jobs event counter**. The role of this counter is to +help sleepy threads detect when new work is posted in a lightweight fashion. In +its simplest form, we would simply have a counter that gets incremented each +time a new job is posted. This way, when a thread gets sleepy, it could read the +counter, and then compare to see if the value has changed before it actually +goes to sleep. But this [turns out to be too expensive] in practice, so we use a +somewhat more complex scheme. + +[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747 + +The idea is that the counter toggles between two states, depending on whether +its value is even or odd (or, equivalently, on the value of its low bit): + +* Even -- If the low bit is zero, then it means that there has been no new work + since the last thread got sleepy. +* Odd -- If the low bit is one, then it means that new work was posted since + the last thread got sleepy. + +### New work is posted + +When new work is posted, we check the value of the counter: if it is even, +then we increment it by one, so that it becomes odd. + +### Worker thread gets sleepy + +When a worker thread gets sleepy, it will read the value of the counter. If the +counter is odd, it will increment the counter so that it is even. Either way, it +remembers the final value of the counter. The final value will be used later, +when the thread is going to sleep. If at that time the counter has not changed, +then we can assume no new jobs have been posted (though note the remote +possibility of rollover, discussed in detail below). + +# Protocol for a worker thread to post work + +The full protocol for a thread to post work is as follows + +* If the work is posted into the injection queue, then execute a seq-cst fence (see below). +* Load the counters, incrementing the JEC if it is even so that it is odd. +* Check if there are idle threads available to handle this new job. If not, + and there are sleeping threads, then wake one or more threads. + +# Protocol for a worker thread to fall asleep + +The full protocol for a thread to fall asleep is as follows: + +* After completing all its jobs, the worker goes idle and begins to + search for work. As it searches, it counts "rounds". In each round, + it searches all other work threads' queues, plus the 'injector queue' for + work injected from the outside. If work is found in this search, the thread + becomes active again and hence restarts this protocol from the top. +* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy` + above, remembering the `final_value` of the JEC. It does one more search for work. +* If no work is found, the thread atomically: + * Checks the JEC to see that it has not changed from `final_value`. + * If it has, then the thread goes back to searching for work. We reset to + just before we got sleepy, so that we will do one more search + before attending to sleep again (rather than searching for many rounds). + * Increments the number of sleeping threads by 1. +* The thread then executes a seq-cst fence operation (see below). +* The thread then does one final check for injected jobs (see below). If any + are available, it returns to the 'pre-sleepy' state as if the JEC had changed. +* The thread waits to be signaled. Once signaled, it returns to the idle state. + +# The jobs event counter and deadlock + +As described in the section on the JEC, the main concern around going to sleep +is avoiding a race condition wherein: + +* Thread A looks for work, finds none. +* Thread B posts work but sees no sleeping threads. +* Thread A goes to sleep. + +The JEC protocol largely prevents this, but due to rollover, this prevention is +not complete. It is possible -- if unlikely -- that enough activity occurs for +Thread A to observe the same JEC value that it saw when getting sleepy. If the +new work being published came from *inside* the thread-pool, then this race +condition isn't too harmful. It means that we have fewer workers processing the +work then we should, but we won't deadlock. This seems like an acceptable risk +given that this is unlikely in practice. + +However, if the work was posted as an *external* job, that is a problem. In that +case, it's possible that all of our workers could go to sleep, and the external +job would never get processed. To prevent that, the sleeping protocol includes +one final check to see if the injector queue is empty before fully falling +asleep. Note that this final check occurs **after** the number of sleeping +threads has been incremented. We are not concerned therefore with races against +injections that occur after that increment, only before. + +Unfortunately, there is one rather subtle point concerning this final check: +we wish to avoid the possibility that: + +* work is pushed into the injection queue by an outside thread X, +* the sleepy thread S sees the JEC but it has rolled over and is equal +* the sleepy thread S reads the injection queue but does not see the work posted by X. + +This is possible because the C++ memory model typically offers guarantees of the +form "if you see the access A, then you must see those other accesses" -- but it +doesn't guarantee that you will see the access A (i.e., if you think of +processors with independent caches, you may be operating on very out of date +cache state). + +## Using seq-cst fences to prevent deadlock + +To overcome this problem, we have inserted two sequentially consistent fence +operations into the protocols above: + +* One fence occurs after work is posted into the injection queue, but before the + counters are read (including the number of sleeping threads). + * Note that no fence is needed for work posted to internal queues, since it is ok + to overlook work in that case. +* One fence occurs after the number of sleeping threads is incremented, but + before the injection queue is read. + +### Proof sketch + +What follows is a "proof sketch" that the protocol is deadlock free. We model +two relevant bits of memory, the job injector queue J and the atomic counters C. + +Consider the actions of the injecting thread: + +* PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics. +* PushFence: A sequentially consistent fence is executed. +* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first). + +Meanwhile, the sleepy thread does the following: + +* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C. +* SleepFence: A sequentially consistent fence is executed. +* ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics. + +Either PushFence or SleepFence must come first: + +* If PushFence comes first, then PushJob must be visible to ReadJob. +* If SleepFence comes first, then IncSleepers is visible to ReadSleepers. \ No newline at end of file diff --git a/third_party/rust/rayon-core/src/sleep/counters.rs b/third_party/rust/rayon-core/src/sleep/counters.rs new file mode 100644 index 0000000000..f2a3de3e13 --- /dev/null +++ b/third_party/rust/rayon-core/src/sleep/counters.rs @@ -0,0 +1,277 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + +pub(super) struct AtomicCounters { + /// Packs together a number of counters. The counters are ordered as + /// follows, from least to most significant bits (here, we assuming + /// that [`THREADS_BITS`] is equal to 10): + /// + /// * Bits 0..10: Stores the number of **sleeping threads** + /// * Bits 10..20: Stores the number of **inactive threads** + /// * Bits 20..: Stores the **job event counter** (JEC) + /// + /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note + /// that the total number of bits (and hence the number of bits used for the + /// JEC) will depend on whether we are using a 32- or 64-bit architecture. + value: AtomicUsize, +} + +#[derive(Copy, Clone)] +pub(super) struct Counters { + word: usize, +} + +/// A value read from the **Jobs Event Counter**. +/// See the [`README.md`](README.md) for more +/// coverage of how the jobs event counter works. +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)] +pub(super) struct JobsEventCounter(usize); + +impl JobsEventCounter { + pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX); + + #[inline] + pub(super) fn as_usize(self) -> usize { + self.0 + } + + /// The JEC "is sleepy" if the last thread to increment it was in the + /// process of becoming sleepy. This is indicated by its value being *even*. + /// When new jobs are posted, they check if the JEC is sleepy, and if so + /// they incremented it. + #[inline] + pub(super) fn is_sleepy(self) -> bool { + (self.as_usize() & 1) == 0 + } + + /// The JEC "is active" if the last thread to increment it was posting new + /// work. This is indicated by its value being *odd*. When threads get + /// sleepy, they will check if the JEC is active, and increment it. + #[inline] + pub(super) fn is_active(self) -> bool { + !self.is_sleepy() + } +} + +/// Number of bits used for the thread counters. +#[cfg(target_pointer_width = "64")] +const THREADS_BITS: usize = 16; + +#[cfg(target_pointer_width = "32")] +const THREADS_BITS: usize = 8; + +/// Bits to shift to select the sleeping threads +/// (used with `select_bits`). +#[allow(clippy::erasing_op)] +const SLEEPING_SHIFT: usize = 0 * THREADS_BITS; + +/// Bits to shift to select the inactive threads +/// (used with `select_bits`). +#[allow(clippy::identity_op)] +const INACTIVE_SHIFT: usize = 1 * THREADS_BITS; + +/// Bits to shift to select the JEC +/// (use JOBS_BITS). +const JEC_SHIFT: usize = 2 * THREADS_BITS; + +/// Max value for the thread counters. +pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1; + +/// Constant that can be added to add one sleeping thread. +const ONE_SLEEPING: usize = 1; + +/// Constant that can be added to add one inactive thread. +/// An inactive thread is either idle, sleepy, or sleeping. +const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT; + +/// Constant that can be added to add one to the JEC. +const ONE_JEC: usize = 1 << JEC_SHIFT; + +impl AtomicCounters { + #[inline] + pub(super) fn new() -> AtomicCounters { + AtomicCounters { + value: AtomicUsize::new(0), + } + } + + /// Load and return the current value of the various counters. + /// This value can then be given to other method which will + /// attempt to update the counters via compare-and-swap. + #[inline] + pub(super) fn load(&self, ordering: Ordering) -> Counters { + Counters::new(self.value.load(ordering)) + } + + #[inline] + fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool { + self.value + .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed) + .is_ok() + } + + /// Adds an inactive thread. This cannot fail. + /// + /// This should be invoked when a thread enters its idle loop looking + /// for work. It is decremented when work is found. Note that it is + /// not decremented if the thread transitions from idle to sleepy or sleeping; + /// so the number of inactive threads is always greater-than-or-equal + /// to the number of sleeping threads. + #[inline] + pub(super) fn add_inactive_thread(&self) { + self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst); + } + + /// Increments the jobs event counter if `increment_when`, when applied to + /// the current value, is true. Used to toggle the JEC from even (sleepy) to + /// odd (active) or vice versa. Returns the final value of the counters, for + /// which `increment_when` is guaranteed to return false. + pub(super) fn increment_jobs_event_counter_if( + &self, + increment_when: impl Fn(JobsEventCounter) -> bool, + ) -> Counters { + loop { + let old_value = self.load(Ordering::SeqCst); + if increment_when(old_value.jobs_counter()) { + let new_value = old_value.increment_jobs_counter(); + if self.try_exchange(old_value, new_value, Ordering::SeqCst) { + return new_value; + } + } else { + return old_value; + } + } + } + + /// Subtracts an inactive thread. This cannot fail. It is invoked + /// when a thread finds work and hence becomes active. It returns the + /// number of sleeping threads to wake up (if any). + /// + /// See `add_inactive_thread`. + #[inline] + pub(super) fn sub_inactive_thread(&self) -> usize { + let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst)); + debug_assert!( + old_value.inactive_threads() > 0, + "sub_inactive_thread: old_value {:?} has no inactive threads", + old_value, + ); + debug_assert!( + old_value.sleeping_threads() <= old_value.inactive_threads(), + "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads", + old_value, + old_value.sleeping_threads(), + old_value.inactive_threads(), + ); + + // Current heuristic: whenever an inactive thread goes away, if + // there are any sleeping threads, wake 'em up. + let sleeping_threads = old_value.sleeping_threads(); + std::cmp::min(sleeping_threads, 2) + } + + /// Subtracts a sleeping thread. This cannot fail, but it is only + /// safe to do if you you know the number of sleeping threads is + /// non-zero (i.e., because you have just awoken a sleeping + /// thread). + #[inline] + pub(super) fn sub_sleeping_thread(&self) { + let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst)); + debug_assert!( + old_value.sleeping_threads() > 0, + "sub_sleeping_thread: old_value {:?} had no sleeping threads", + old_value, + ); + debug_assert!( + old_value.sleeping_threads() <= old_value.inactive_threads(), + "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads", + old_value, + old_value.sleeping_threads(), + old_value.inactive_threads(), + ); + } + + #[inline] + pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool { + debug_assert!( + old_value.inactive_threads() > 0, + "try_add_sleeping_thread: old_value {:?} has no inactive threads", + old_value, + ); + debug_assert!( + old_value.sleeping_threads() < THREADS_MAX, + "try_add_sleeping_thread: old_value {:?} has too many sleeping threads", + old_value, + ); + + let mut new_value = old_value; + new_value.word += ONE_SLEEPING; + + self.try_exchange(old_value, new_value, Ordering::SeqCst) + } +} + +#[inline] +fn select_thread(word: usize, shift: usize) -> usize { + ((word >> shift) as usize) & THREADS_MAX +} + +#[inline] +fn select_jec(word: usize) -> usize { + (word >> JEC_SHIFT) as usize +} + +impl Counters { + #[inline] + fn new(word: usize) -> Counters { + Counters { word } + } + + #[inline] + fn increment_jobs_counter(self) -> Counters { + // We can freely add to JEC because it occupies the most significant bits. + // Thus it doesn't overflow into the other counters, just wraps itself. + Counters { + word: self.word.wrapping_add(ONE_JEC), + } + } + + #[inline] + pub(super) fn jobs_counter(self) -> JobsEventCounter { + JobsEventCounter(select_jec(self.word)) + } + + /// The number of threads that are not actively + /// executing work. They may be idle, sleepy, or asleep. + #[inline] + pub(super) fn inactive_threads(self) -> usize { + select_thread(self.word, INACTIVE_SHIFT) + } + + #[inline] + pub(super) fn awake_but_idle_threads(self) -> usize { + debug_assert!( + self.sleeping_threads() <= self.inactive_threads(), + "sleeping threads: {} > raw idle threads {}", + self.sleeping_threads(), + self.inactive_threads() + ); + self.inactive_threads() - self.sleeping_threads() + } + + #[inline] + pub(super) fn sleeping_threads(self) -> usize { + select_thread(self.word, SLEEPING_SHIFT) + } +} + +impl std::fmt::Debug for Counters { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let word = format!("{:016x}", self.word); + fmt.debug_struct("Counters") + .field("word", &word) + .field("jobs", &self.jobs_counter().0) + .field("inactive", &self.inactive_threads()) + .field("sleeping", &self.sleeping_threads()) + .finish() + } +} diff --git a/third_party/rust/rayon-core/src/sleep/mod.rs b/third_party/rust/rayon-core/src/sleep/mod.rs new file mode 100644 index 0000000000..af7225a35e --- /dev/null +++ b/third_party/rust/rayon-core/src/sleep/mod.rs @@ -0,0 +1,394 @@ +//! Code that decides when workers should go to sleep. See README.md +//! for an overview. + +use crate::latch::CoreLatch; +use crate::log::Event::*; +use crate::log::Logger; +use crossbeam_utils::CachePadded; +use std::sync::atomic::Ordering; +use std::sync::{Condvar, Mutex}; +use std::thread; +use std::usize; + +mod counters; +pub(crate) use self::counters::THREADS_MAX; +use self::counters::{AtomicCounters, JobsEventCounter}; + +/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping +/// of workers. It has callbacks that are invoked periodically at significant events, +/// such as when workers are looping and looking for work, when latches are set, or when +/// jobs are published, and it either blocks threads or wakes them in response to these +/// events. See the [`README.md`] in this module for more details. +/// +/// [`README.md`] README.md +pub(super) struct Sleep { + logger: Logger, + + /// One "sleep state" per worker. Used to track if a worker is sleeping and to have + /// them block. + worker_sleep_states: Vec>, + + counters: AtomicCounters, +} + +/// An instance of this struct is created when a thread becomes idle. +/// It is consumed when the thread finds work, and passed by `&mut` +/// reference for operations that preserve the idle state. (In other +/// words, producing one of these structs is evidence the thread is +/// idle.) It tracks state such as how long the thread has been idle. +pub(super) struct IdleState { + /// What is worker index of the idle thread? + worker_index: usize, + + /// How many rounds have we been circling without sleeping? + rounds: u32, + + /// Once we become sleepy, what was the sleepy counter value? + /// Set to `INVALID_SLEEPY_COUNTER` otherwise. + jobs_counter: JobsEventCounter, +} + +/// The "sleep state" for an individual worker. +#[derive(Default)] +struct WorkerSleepState { + /// Set to true when the worker goes to sleep; set to false when + /// the worker is notified or when it wakes. + is_blocked: Mutex, + + condvar: Condvar, +} + +const ROUNDS_UNTIL_SLEEPY: u32 = 32; +const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1; + +impl Sleep { + pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep { + assert!(n_threads <= THREADS_MAX); + Sleep { + logger, + worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(), + counters: AtomicCounters::new(), + } + } + + #[inline] + pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState { + self.logger.log(|| ThreadIdle { + worker: worker_index, + latch_addr: latch.addr(), + }); + + self.counters.add_inactive_thread(); + + IdleState { + worker_index, + rounds: 0, + jobs_counter: JobsEventCounter::DUMMY, + } + } + + #[inline] + pub(super) fn work_found(&self, idle_state: IdleState) { + self.logger.log(|| ThreadFoundWork { + worker: idle_state.worker_index, + yields: idle_state.rounds, + }); + + // If we were the last idle thread and other threads are still sleeping, + // then we should wake up another thread. + let threads_to_wake = self.counters.sub_inactive_thread(); + self.wake_any_threads(threads_to_wake as u32); + } + + #[inline] + pub(super) fn no_work_found( + &self, + idle_state: &mut IdleState, + latch: &CoreLatch, + has_injected_jobs: impl FnOnce() -> bool, + ) { + if idle_state.rounds < ROUNDS_UNTIL_SLEEPY { + thread::yield_now(); + idle_state.rounds += 1; + } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY { + idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index); + idle_state.rounds += 1; + thread::yield_now(); + } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING { + idle_state.rounds += 1; + thread::yield_now(); + } else { + debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING); + self.sleep(idle_state, latch, has_injected_jobs); + } + } + + #[cold] + fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter { + let counters = self + .counters + .increment_jobs_event_counter_if(JobsEventCounter::is_active); + let jobs_counter = counters.jobs_counter(); + self.logger.log(|| ThreadSleepy { + worker: worker_index, + jobs_counter: jobs_counter.as_usize(), + }); + jobs_counter + } + + #[cold] + fn sleep( + &self, + idle_state: &mut IdleState, + latch: &CoreLatch, + has_injected_jobs: impl FnOnce() -> bool, + ) { + let worker_index = idle_state.worker_index; + + if !latch.get_sleepy() { + self.logger.log(|| ThreadSleepInterruptedByLatch { + worker: worker_index, + latch_addr: latch.addr(), + }); + + return; + } + + let sleep_state = &self.worker_sleep_states[worker_index]; + let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); + debug_assert!(!*is_blocked); + + // Our latch was signalled. We should wake back up fully as we + // will have some stuff to do. + if !latch.fall_asleep() { + self.logger.log(|| ThreadSleepInterruptedByLatch { + worker: worker_index, + latch_addr: latch.addr(), + }); + + idle_state.wake_fully(); + return; + } + + loop { + let counters = self.counters.load(Ordering::SeqCst); + + // Check if the JEC has changed since we got sleepy. + debug_assert!(idle_state.jobs_counter.is_sleepy()); + if counters.jobs_counter() != idle_state.jobs_counter { + // JEC has changed, so a new job was posted, but for some reason + // we didn't see it. We should return to just before the SLEEPY + // state so we can do another search and (if we fail to find + // work) go back to sleep. + self.logger.log(|| ThreadSleepInterruptedByJob { + worker: worker_index, + }); + + idle_state.wake_partly(); + latch.wake_up(); + return; + } + + // Otherwise, let's move from IDLE to SLEEPING. + if self.counters.try_add_sleeping_thread(counters) { + break; + } + } + + // Successfully registered as asleep. + + self.logger.log(|| ThreadSleeping { + worker: worker_index, + latch_addr: latch.addr(), + }); + + // We have one last check for injected jobs to do. This protects against + // deadlock in the very unlikely event that + // + // - an external job is being injected while we are sleepy + // - that job triggers the rollover over the JEC such that we don't see it + // - we are the last active worker thread + std::sync::atomic::fence(Ordering::SeqCst); + if has_injected_jobs() { + // If we see an externally injected job, then we have to 'wake + // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by + // the one that wakes us.) + self.counters.sub_sleeping_thread(); + } else { + // If we don't see an injected job (the normal case), then flag + // ourselves as asleep and wait till we are notified. + // + // (Note that `is_blocked` is held under a mutex and the mutex was + // acquired *before* we incremented the "sleepy counter". This means + // that whomever is coming to wake us will have to wait until we + // release the mutex in the call to `wait`, so they will see this + // boolean as true.) + *is_blocked = true; + while *is_blocked { + is_blocked = sleep_state.condvar.wait(is_blocked).unwrap(); + } + } + + // Update other state: + idle_state.wake_fully(); + latch.wake_up(); + + self.logger.log(|| ThreadAwoken { + worker: worker_index, + latch_addr: latch.addr(), + }); + } + + /// Notify the given thread that it should wake up (if it is + /// sleeping). When this method is invoked, we typically know the + /// thread is asleep, though in rare cases it could have been + /// awoken by (e.g.) new work having been posted. + pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { + self.wake_specific_thread(target_worker_index); + } + + /// Signals that `num_jobs` new jobs were injected into the thread + /// pool from outside. This function will ensure that there are + /// threads available to process them, waking threads from sleep + /// if necessary. + /// + /// # Parameters + /// + /// - `source_worker_index` -- index of the thread that did the + /// push, or `usize::MAX` if this came from outside the thread + /// pool -- it is used only for logging. + /// - `num_jobs` -- lower bound on number of jobs available for stealing. + /// We'll try to get at least one thread per job. + #[inline] + pub(super) fn new_injected_jobs( + &self, + source_worker_index: usize, + num_jobs: u32, + queue_was_empty: bool, + ) { + // This fence is needed to guarantee that threads + // as they are about to fall asleep, observe any + // new jobs that may have been injected. + std::sync::atomic::fence(Ordering::SeqCst); + + self.new_jobs(source_worker_index, num_jobs, queue_was_empty) + } + + /// Signals that `num_jobs` new jobs were pushed onto a thread's + /// local deque. This function will try to ensure that there are + /// threads available to process them, waking threads from sleep + /// if necessary. However, this is not guaranteed: under certain + /// race conditions, the function may fail to wake any new + /// threads; in that case the existing thread should eventually + /// pop the job. + /// + /// # Parameters + /// + /// - `source_worker_index` -- index of the thread that did the + /// push, or `usize::MAX` if this came from outside the thread + /// pool -- it is used only for logging. + /// - `num_jobs` -- lower bound on number of jobs available for stealing. + /// We'll try to get at least one thread per job. + #[inline] + pub(super) fn new_internal_jobs( + &self, + source_worker_index: usize, + num_jobs: u32, + queue_was_empty: bool, + ) { + self.new_jobs(source_worker_index, num_jobs, queue_was_empty) + } + + /// Common helper for `new_injected_jobs` and `new_internal_jobs`. + #[inline] + fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) { + // Read the counters and -- if sleepy workers have announced themselves + // -- announce that there is now work available. The final value of `counters` + // with which we exit the loop thus corresponds to a state when + let counters = self + .counters + .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy); + let num_awake_but_idle = counters.awake_but_idle_threads(); + let num_sleepers = counters.sleeping_threads(); + + self.logger.log(|| JobThreadCounts { + worker: source_worker_index, + num_idle: num_awake_but_idle as u16, + num_sleepers: num_sleepers as u16, + }); + + if num_sleepers == 0 { + // nobody to wake + return; + } + + // Promote from u16 to u32 so we can interoperate with + // num_jobs more easily. + let num_awake_but_idle = num_awake_but_idle as u32; + let num_sleepers = num_sleepers as u32; + + // If the queue is non-empty, then we always wake up a worker + // -- clearly the existing idle jobs aren't enough. Otherwise, + // check to see if we have enough idle workers. + if !queue_was_empty { + let num_to_wake = std::cmp::min(num_jobs, num_sleepers); + self.wake_any_threads(num_to_wake); + } else if num_awake_but_idle < num_jobs { + let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers); + self.wake_any_threads(num_to_wake); + } + } + + #[cold] + fn wake_any_threads(&self, mut num_to_wake: u32) { + if num_to_wake > 0 { + for i in 0..self.worker_sleep_states.len() { + if self.wake_specific_thread(i) { + num_to_wake -= 1; + if num_to_wake == 0 { + return; + } + } + } + } + } + + fn wake_specific_thread(&self, index: usize) -> bool { + let sleep_state = &self.worker_sleep_states[index]; + + let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); + if *is_blocked { + *is_blocked = false; + sleep_state.condvar.notify_one(); + + // When the thread went to sleep, it will have incremented + // this value. When we wake it, its our job to decrement + // it. We could have the thread do it, but that would + // introduce a delay between when the thread was + // *notified* and when this counter was decremented. That + // might mislead people with new work into thinking that + // there are sleeping threads that they should try to + // wake, when in fact there is nothing left for them to + // do. + self.counters.sub_sleeping_thread(); + + self.logger.log(|| ThreadNotify { worker: index }); + + true + } else { + false + } + } +} + +impl IdleState { + fn wake_fully(&mut self) { + self.rounds = 0; + self.jobs_counter = JobsEventCounter::DUMMY; + } + + fn wake_partly(&mut self) { + self.rounds = ROUNDS_UNTIL_SLEEPY; + self.jobs_counter = JobsEventCounter::DUMMY; + } +} diff --git a/third_party/rust/rayon-core/src/spawn/mod.rs b/third_party/rust/rayon-core/src/spawn/mod.rs new file mode 100644 index 0000000000..ae1f211efe --- /dev/null +++ b/third_party/rust/rayon-core/src/spawn/mod.rs @@ -0,0 +1,163 @@ +use crate::job::*; +use crate::registry::Registry; +use crate::unwind; +use std::mem; +use std::sync::Arc; + +/// Fires off a task into the Rayon threadpool in the "static" or +/// "global" scope. Just like a standard thread, this task is not +/// tied to the current stack frame, and hence it cannot hold any +/// references other than those with `'static` lifetime. If you want +/// to spawn a task that references stack data, use [the `scope()` +/// function][scope] to create a scope. +/// +/// [scope]: fn.scope.html +/// +/// Since tasks spawned with this function cannot hold references into +/// the enclosing stack frame, you almost certainly want to use a +/// `move` closure as their argument (otherwise, the closure will +/// typically hold references to any variables from the enclosing +/// function that you happen to use). +/// +/// This API assumes that the closure is executed purely for its +/// side-effects (i.e., it might send messages, modify data protected +/// by a mutex, or some such thing). +/// +/// There is no guaranteed order of execution for spawns, given that +/// other threads may steal tasks at any time. However, they are +/// generally prioritized in a LIFO order on the thread from which +/// they were spawned. Other threads always steal from the other end of +/// the deque, like FIFO order. The idea is that "recent" tasks are +/// most likely to be fresh in the local CPU's cache, while other +/// threads can steal older "stale" tasks. For an alternate approach, +/// consider [`spawn_fifo()`] instead. +/// +/// [`spawn_fifo()`]: fn.spawn_fifo.html +/// +/// # Panic handling +/// +/// If this closure should panic, the resulting panic will be +/// propagated to the panic handler registered in the `ThreadPoolBuilder`, +/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more +/// details. +/// +/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler +/// +/// # Examples +/// +/// This code creates a Rayon task that increments a global counter. +/// +/// ```rust +/// # use rayon_core as rayon; +/// use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +/// +/// static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT; +/// +/// rayon::spawn(move || { +/// GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst); +/// }); +/// ``` +pub fn spawn(func: F) +where + F: FnOnce() + Send + 'static, +{ + // We assert that current registry has not terminated. + unsafe { spawn_in(func, &Registry::current()) } +} + +/// Spawns an asynchronous job in `registry.` +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn spawn_in(func: F, registry: &Arc) +where + F: FnOnce() + Send + 'static, +{ + // We assert that this does not hold any references (we know + // this because of the `'static` bound in the interface); + // moreover, we assert that the code below is not supposed to + // be able to panic, and hence the data won't leak but will be + // enqueued into some deque for later execution. + let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic + let job_ref = spawn_job(func, registry); + registry.inject_or_push(job_ref); + mem::forget(abort_guard); +} + +unsafe fn spawn_job(func: F, registry: &Arc) -> JobRef +where + F: FnOnce() + Send + 'static, +{ + // Ensure that registry cannot terminate until this job has + // executed. This ref is decremented at the (*) below. + registry.increment_terminate_count(); + + HeapJob::new({ + let registry = Arc::clone(registry); + move || { + registry.catch_unwind(func); + registry.terminate(); // (*) permit registry to terminate now + } + }) + .into_static_job_ref() +} + +/// Fires off a task into the Rayon threadpool in the "static" or +/// "global" scope. Just like a standard thread, this task is not +/// tied to the current stack frame, and hence it cannot hold any +/// references other than those with `'static` lifetime. If you want +/// to spawn a task that references stack data, use [the `scope_fifo()` +/// function](fn.scope_fifo.html) to create a scope. +/// +/// The behavior is essentially the same as [the `spawn` +/// function](fn.spawn.html), except that calls from the same thread +/// will be prioritized in FIFO order. This is similar to the now- +/// deprecated [`breadth_first`] option, except the effect is isolated +/// to relative `spawn_fifo` calls, not all threadpool tasks. +/// +/// For more details on this design, see Rayon [RFC #1]. +/// +/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first +/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md +/// +/// # Panic handling +/// +/// If this closure should panic, the resulting panic will be +/// propagated to the panic handler registered in the `ThreadPoolBuilder`, +/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more +/// details. +/// +/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler +pub fn spawn_fifo(func: F) +where + F: FnOnce() + Send + 'static, +{ + // We assert that current registry has not terminated. + unsafe { spawn_fifo_in(func, &Registry::current()) } +} + +/// Spawns an asynchronous FIFO job in `registry.` +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn spawn_fifo_in(func: F, registry: &Arc) +where + F: FnOnce() + Send + 'static, +{ + // We assert that this does not hold any references (we know + // this because of the `'static` bound in the interface); + // moreover, we assert that the code below is not supposed to + // be able to panic, and hence the data won't leak but will be + // enqueued into some deque for later execution. + let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic + let job_ref = spawn_job(func, registry); + + // If we're in the pool, use our thread's private fifo for this thread to execute + // in a locally-FIFO order. Otherwise, just use the pool's global injector. + match registry.current_thread() { + Some(worker) => worker.push_fifo(job_ref), + None => registry.inject(&[job_ref]), + } + mem::forget(abort_guard); +} + +#[cfg(test)] +mod test; diff --git a/third_party/rust/rayon-core/src/spawn/test.rs b/third_party/rust/rayon-core/src/spawn/test.rs new file mode 100644 index 0000000000..761fafc777 --- /dev/null +++ b/third_party/rust/rayon-core/src/spawn/test.rs @@ -0,0 +1,243 @@ +use crate::scope; +use std::any::Any; +use std::sync::mpsc::channel; +use std::sync::Mutex; + +use super::{spawn, spawn_fifo}; +use crate::ThreadPoolBuilder; + +#[test] +fn spawn_then_join_in_worker() { + let (tx, rx) = channel(); + scope(move |_| { + spawn(move || tx.send(22).unwrap()); + }); + assert_eq!(22, rx.recv().unwrap()); +} + +#[test] +fn spawn_then_join_outside_worker() { + let (tx, rx) = channel(); + spawn(move || tx.send(22).unwrap()); + assert_eq!(22, rx.recv().unwrap()); +} + +#[test] +fn panic_fwd() { + let (tx, rx) = channel(); + + let tx = Mutex::new(tx); + let panic_handler = move |err: Box| { + let tx = tx.lock().unwrap(); + if let Some(&msg) = err.downcast_ref::<&str>() { + if msg == "Hello, world!" { + tx.send(1).unwrap(); + } else { + tx.send(2).unwrap(); + } + } else { + tx.send(3).unwrap(); + } + }; + + let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); + + builder + .build() + .unwrap() + .spawn(move || panic!("Hello, world!")); + + assert_eq!(1, rx.recv().unwrap()); +} + +/// Test what happens when the thread-pool is dropped but there are +/// still active asynchronous tasks. We expect the thread-pool to stay +/// alive and executing until those threads are complete. +#[test] +fn termination_while_things_are_executing() { + let (tx0, rx0) = channel(); + let (tx1, rx1) = channel(); + + // Create a thread-pool and spawn some code in it, but then drop + // our reference to it. + { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + thread_pool.spawn(move || { + let data = rx0.recv().unwrap(); + + // At this point, we know the "main" reference to the + // `ThreadPool` has been dropped, but there are still + // active threads. Launch one more. + spawn(move || { + tx1.send(data).unwrap(); + }); + }); + } + + tx0.send(22).unwrap(); + let v = rx1.recv().unwrap(); + assert_eq!(v, 22); +} + +#[test] +fn custom_panic_handler_and_spawn() { + let (tx, rx) = channel(); + + // Create a parallel closure that will send panics on the + // channel; since the closure is potentially executed in parallel + // with itself, we have to wrap `tx` in a mutex. + let tx = Mutex::new(tx); + let panic_handler = move |e: Box| { + tx.lock().unwrap().send(e).unwrap(); + }; + + // Execute an async that will panic. + let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); + builder.build().unwrap().spawn(move || { + panic!("Hello, world!"); + }); + + // Check that we got back the panic we expected. + let error = rx.recv().unwrap(); + if let Some(&msg) = error.downcast_ref::<&str>() { + assert_eq!(msg, "Hello, world!"); + } else { + panic!("did not receive a string from panic handler"); + } +} + +#[test] +fn custom_panic_handler_and_nested_spawn() { + let (tx, rx) = channel(); + + // Create a parallel closure that will send panics on the + // channel; since the closure is potentially executed in parallel + // with itself, we have to wrap `tx` in a mutex. + let tx = Mutex::new(tx); + let panic_handler = move |e| { + tx.lock().unwrap().send(e).unwrap(); + }; + + // Execute an async that will (eventually) panic. + const PANICS: usize = 3; + let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); + builder.build().unwrap().spawn(move || { + // launch 3 nested spawn-asyncs; these should be in the same + // thread-pool and hence inherit the same panic handler + for _ in 0..PANICS { + spawn(move || { + panic!("Hello, world!"); + }); + } + }); + + // Check that we get back the panics we expected. + for _ in 0..PANICS { + let error = rx.recv().unwrap(); + if let Some(&msg) = error.downcast_ref::<&str>() { + assert_eq!(msg, "Hello, world!"); + } else { + panic!("did not receive a string from panic handler"); + } + } +} + +macro_rules! test_order { + ($outer_spawn:ident, $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + for i in 0..10 { + let tx = tx.clone(); + $outer_spawn(move || { + for j in 0..10 { + let tx = tx.clone(); + $inner_spawn(move || { + tx.send(i * 10 + j).unwrap(); + }); + } + }); + } + }); + rx.iter().collect::>() + }}; +} + +#[test] +fn lifo_order() { + // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order. + let vec = test_order!(spawn, spawn); + let expected: Vec = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn fifo_order() { + // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order. + let vec = test_order!(spawn_fifo, spawn_fifo); + let expected: Vec = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +fn lifo_fifo_order() { + // LIFO on the outside, FIFO on the inside + let vec = test_order!(spawn, spawn_fifo); + let expected: Vec = (0..10) + .rev() + .flat_map(|i| (0..10).map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +#[test] +fn fifo_lifo_order() { + // FIFO on the outside, LIFO on the inside + let vec = test_order!(spawn_fifo, spawn); + let expected: Vec = (0..10) + .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +macro_rules! spawn_send { + ($spawn:ident, $tx:ident, $i:expr) => {{ + let tx = $tx.clone(); + $spawn(move || tx.send($i).unwrap()); + }}; +} + +/// Test mixed spawns pushing a series of numbers, interleaved such +/// such that negative values are using the second kind of spawn. +macro_rules! test_mixed_order { + ($pos_spawn:ident, $neg_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + spawn_send!($pos_spawn, tx, 0); + spawn_send!($neg_spawn, tx, -1); + spawn_send!($pos_spawn, tx, 1); + spawn_send!($neg_spawn, tx, -2); + spawn_send!($pos_spawn, tx, 2); + spawn_send!($neg_spawn, tx, -3); + spawn_send!($pos_spawn, tx, 3); + }); + rx.iter().collect::>() + }}; +} + +#[test] +fn mixed_lifo_fifo_order() { + let vec = test_mixed_order!(spawn, spawn_fifo); + let expected = vec![3, -1, 2, -2, 1, -3, 0]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_fifo_lifo_order() { + let vec = test_mixed_order!(spawn_fifo, spawn); + let expected = vec![0, -3, 1, -2, 2, -1, 3]; + assert_eq!(vec, expected); +} diff --git a/third_party/rust/rayon-core/src/test.rs b/third_party/rust/rayon-core/src/test.rs new file mode 100644 index 0000000000..46d63a7dff --- /dev/null +++ b/third_party/rust/rayon-core/src/test.rs @@ -0,0 +1,192 @@ +#![cfg(test)] + +use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Barrier}; + +#[test] +fn worker_thread_index() { + let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + assert_eq!(pool.current_num_threads(), 22); + assert_eq!(pool.current_thread_index(), None); + let index = pool.install(|| pool.current_thread_index().unwrap()); + assert!(index < 22); +} + +#[test] +fn start_callback_called() { + let n_threads = 16; + let n_called = Arc::new(AtomicUsize::new(0)); + // Wait for all the threads in the pool plus the one running tests. + let barrier = Arc::new(Barrier::new(n_threads + 1)); + + let b = Arc::clone(&barrier); + let nc = Arc::clone(&n_called); + let start_handler = move |_| { + nc.fetch_add(1, Ordering::SeqCst); + b.wait(); + }; + + let conf = ThreadPoolBuilder::new() + .num_threads(n_threads) + .start_handler(start_handler); + let _ = conf.build().unwrap(); + + // Wait for all the threads to have been scheduled to run. + barrier.wait(); + + // The handler must have been called on every started thread. + assert_eq!(n_called.load(Ordering::SeqCst), n_threads); +} + +#[test] +fn exit_callback_called() { + let n_threads = 16; + let n_called = Arc::new(AtomicUsize::new(0)); + // Wait for all the threads in the pool plus the one running tests. + let barrier = Arc::new(Barrier::new(n_threads + 1)); + + let b = Arc::clone(&barrier); + let nc = Arc::clone(&n_called); + let exit_handler = move |_| { + nc.fetch_add(1, Ordering::SeqCst); + b.wait(); + }; + + let conf = ThreadPoolBuilder::new() + .num_threads(n_threads) + .exit_handler(exit_handler); + { + let _ = conf.build().unwrap(); + // Drop the pool so it stops the running threads. + } + + // Wait for all the threads to have been scheduled to run. + barrier.wait(); + + // The handler must have been called on every exiting thread. + assert_eq!(n_called.load(Ordering::SeqCst), n_threads); +} + +#[test] +fn handler_panics_handled_correctly() { + let n_threads = 16; + let n_called = Arc::new(AtomicUsize::new(0)); + // Wait for all the threads in the pool plus the one running tests. + let start_barrier = Arc::new(Barrier::new(n_threads + 1)); + let exit_barrier = Arc::new(Barrier::new(n_threads + 1)); + + let start_handler = move |_| { + panic!("ensure panic handler is called when starting"); + }; + let exit_handler = move |_| { + panic!("ensure panic handler is called when exiting"); + }; + + let sb = Arc::clone(&start_barrier); + let eb = Arc::clone(&exit_barrier); + let nc = Arc::clone(&n_called); + let panic_handler = move |_| { + let val = nc.fetch_add(1, Ordering::SeqCst); + if val < n_threads { + sb.wait(); + } else { + eb.wait(); + } + }; + + let conf = ThreadPoolBuilder::new() + .num_threads(n_threads) + .start_handler(start_handler) + .exit_handler(exit_handler) + .panic_handler(panic_handler); + { + let _ = conf.build().unwrap(); + + // Wait for all the threads to start, panic in the start handler, + // and been taken care of by the panic handler. + start_barrier.wait(); + + // Drop the pool so it stops the running threads. + } + + // Wait for all the threads to exit, panic in the exit handler, + // and been taken care of by the panic handler. + exit_barrier.wait(); + + // The panic handler must have been called twice on every thread. + assert_eq!(n_called.load(Ordering::SeqCst), 2 * n_threads); +} + +#[test] +fn check_config_build() { + let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + assert_eq!(pool.current_num_threads(), 22); +} + +/// Helper used by check_error_send_sync to ensure ThreadPoolBuildError is Send + Sync +fn _send_sync() {} + +#[test] +fn check_error_send_sync() { + _send_sync::(); +} + +#[allow(deprecated)] +#[test] +fn configuration() { + let start_handler = move |_| {}; + let exit_handler = move |_| {}; + let panic_handler = move |_| {}; + let thread_name = move |i| format!("thread_name_{}", i); + + // Ensure we can call all public methods on Configuration + crate::Configuration::new() + .thread_name(thread_name) + .num_threads(5) + .panic_handler(panic_handler) + .stack_size(4e6 as usize) + .breadth_first() + .start_handler(start_handler) + .exit_handler(exit_handler) + .build() + .unwrap(); +} + +#[test] +fn default_pool() { + ThreadPoolBuilder::default().build().unwrap(); +} + +/// Test that custom spawned threads get their `WorkerThread` cleared once +/// the pool is done with them, allowing them to be used with rayon again +/// later. e.g. WebAssembly want to have their own pool of available threads. +#[test] +fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> { + let n_threads = 5; + let mut handles = vec![]; + let pool = ThreadPoolBuilder::new() + .num_threads(n_threads) + .spawn_handler(|thread| { + let handle = std::thread::spawn(move || { + thread.run(); + + // Afterward, the current thread shouldn't be set anymore. + assert_eq!(crate::current_thread_index(), None); + }); + handles.push(handle); + Ok(()) + }) + .build()?; + assert_eq!(handles.len(), n_threads); + + pool.install(|| assert!(crate::current_thread_index().is_some())); + drop(pool); + + // Wait for all threads to make their assertions and exit + for handle in handles { + handle.join().unwrap(); + } + + Ok(()) +} diff --git a/third_party/rust/rayon-core/src/thread_pool/mod.rs b/third_party/rust/rayon-core/src/thread_pool/mod.rs new file mode 100644 index 0000000000..0fc06dd6bc --- /dev/null +++ b/third_party/rust/rayon-core/src/thread_pool/mod.rs @@ -0,0 +1,402 @@ +//! Contains support for user-managed thread pools, represented by the +//! the [`ThreadPool`] type (see that struct for details). +//! +//! [`ThreadPool`]: struct.ThreadPool.html + +use crate::broadcast::{self, BroadcastContext}; +use crate::join; +use crate::registry::{Registry, ThreadSpawn, WorkerThread}; +use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; +use crate::spawn; +use crate::{scope, Scope}; +use crate::{scope_fifo, ScopeFifo}; +use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; +use std::error::Error; +use std::fmt; +use std::sync::Arc; + +mod test; + +/// Represents a user created [thread-pool]. +/// +/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads +/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then +/// execute functions explicitly within this [`ThreadPool`] using +/// [`ThreadPool::install()`]. By contrast, top level rayon functions +/// (like `join()`) will execute implicitly within the current thread-pool. +/// +/// +/// ## Creating a ThreadPool +/// +/// ```rust +/// # use rayon_core as rayon; +/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); +/// ``` +/// +/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s +/// threads. In addition, any other rayon operations called inside of `install()` will also +/// execute in the context of the `ThreadPool`. +/// +/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate, +/// they will complete executing any remaining work that you have spawned, and automatically +/// terminate. +/// +/// +/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool +/// [`ThreadPool`]: struct.ThreadPool.html +/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new +/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html +/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build +/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install +pub struct ThreadPool { + registry: Arc, +} + +impl ThreadPool { + #[deprecated(note = "Use `ThreadPoolBuilder::build`")] + #[allow(deprecated)] + /// Deprecated in favor of `ThreadPoolBuilder::build`. + pub fn new(configuration: crate::Configuration) -> Result> { + Self::build(configuration.into_builder()).map_err(Box::from) + } + + pub(super) fn build( + builder: ThreadPoolBuilder, + ) -> Result + where + S: ThreadSpawn, + { + let registry = Registry::new(builder)?; + Ok(ThreadPool { registry }) + } + + /// Executes `op` within the threadpool. Any attempts to use + /// `join`, `scope`, or parallel iterators will then operate + /// within that threadpool. + /// + /// # Warning: thread-local data + /// + /// Because `op` is executing within the Rayon thread-pool, + /// thread-local data from the current thread will not be + /// accessible. + /// + /// # Panics + /// + /// If `op` should panic, that panic will be propagated. + /// + /// ## Using `install()` + /// + /// ```rust + /// # use rayon_core as rayon; + /// fn main() { + /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); + /// let n = pool.install(|| fib(20)); + /// println!("{}", n); + /// } + /// + /// fn fib(n: usize) -> usize { + /// if n == 0 || n == 1 { + /// return n; + /// } + /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool` + /// return a + b; + /// } + /// ``` + pub fn install(&self, op: OP) -> R + where + OP: FnOnce() -> R + Send, + R: Send, + { + self.registry.in_worker(|_, _| op()) + } + + /// Executes `op` within every thread in the threadpool. Any attempts to use + /// `join`, `scope`, or parallel iterators will then operate within that + /// threadpool. + /// + /// Broadcasts are executed on each thread after they have exhausted their + /// local work queue, before they attempt work-stealing from other threads. + /// The goal of that strategy is to run everywhere in a timely manner + /// *without* being too disruptive to current work. There may be alternative + /// broadcast styles added in the future for more or less aggressive + /// injection, if the need arises. + /// + /// # Warning: thread-local data + /// + /// Because `op` is executing within the Rayon thread-pool, + /// thread-local data from the current thread will not be + /// accessible. + /// + /// # Panics + /// + /// If `op` should panic on one or more threads, exactly one panic + /// will be propagated, only after all threads have completed + /// (or panicked) their own `op`. + /// + /// # Examples + /// + /// ``` + /// # use rayon_core as rayon; + /// use std::sync::atomic::{AtomicUsize, Ordering}; + /// + /// fn main() { + /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap(); + /// + /// // The argument gives context, including the index of each thread. + /// let v: Vec = pool.broadcast(|ctx| ctx.index() * ctx.index()); + /// assert_eq!(v, &[0, 1, 4, 9, 16]); + /// + /// // The closure can reference the local stack + /// let count = AtomicUsize::new(0); + /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed)); + /// assert_eq!(count.into_inner(), 5); + /// } + /// ``` + pub fn broadcast(&self, op: OP) -> Vec + where + OP: Fn(BroadcastContext<'_>) -> R + Sync, + R: Send, + { + // We assert that `self.registry` has not terminated. + unsafe { broadcast::broadcast_in(op, &self.registry) } + } + + /// Returns the (current) number of threads in the thread pool. + /// + /// # Future compatibility note + /// + /// Note that unless this thread-pool was created with a + /// [`ThreadPoolBuilder`] that specifies the number of threads, + /// then this number may vary over time in future versions (see [the + /// `num_threads()` method for details][snt]). + /// + /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads + /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html + #[inline] + pub fn current_num_threads(&self) -> usize { + self.registry.num_threads() + } + + /// If called from a Rayon worker thread in this thread-pool, + /// returns the index of that thread; if not called from a Rayon + /// thread, or called from a Rayon thread that belongs to a + /// different thread-pool, returns `None`. + /// + /// The index for a given thread will not change over the thread's + /// lifetime. However, multiple threads may share the same index if + /// they are in distinct thread-pools. + /// + /// # Future compatibility note + /// + /// Currently, every thread-pool (including the global + /// thread-pool) has a fixed number of threads, but this may + /// change in future Rayon versions (see [the `num_threads()` method + /// for details][snt]). In that case, the index for a + /// thread would not change during its lifetime, but thread + /// indices may wind up being reused if threads are terminated and + /// restarted. + /// + /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads + #[inline] + pub fn current_thread_index(&self) -> Option { + let curr = self.registry.current_thread()?; + Some(curr.index()) + } + + /// Returns true if the current worker thread currently has "local + /// tasks" pending. This can be useful as part of a heuristic for + /// deciding whether to spawn a new task or execute code on the + /// current thread, particularly in breadth-first + /// schedulers. However, keep in mind that this is an inherently + /// racy check, as other worker threads may be actively "stealing" + /// tasks from our local deque. + /// + /// **Background:** Rayon's uses a [work-stealing] scheduler. The + /// key idea is that each thread has its own [deque] of + /// tasks. Whenever a new task is spawned -- whether through + /// `join()`, `Scope::spawn()`, or some other means -- that new + /// task is pushed onto the thread's *local* deque. Worker threads + /// have a preference for executing their own tasks; if however + /// they run out of tasks, they will go try to "steal" tasks from + /// other threads. This function therefore has an inherent race + /// with other active worker threads, which may be removing items + /// from the local deque. + /// + /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing + /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue + #[inline] + pub fn current_thread_has_pending_tasks(&self) -> Option { + let curr = self.registry.current_thread()?; + Some(!curr.local_deque_is_empty()) + } + + /// Execute `oper_a` and `oper_b` in the thread-pool and return + /// the results. Equivalent to `self.install(|| join(oper_a, + /// oper_b))`. + pub fn join(&self, oper_a: A, oper_b: B) -> (RA, RB) + where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, + { + self.install(|| join(oper_a, oper_b)) + } + + /// Creates a scope that executes within this thread-pool. + /// Equivalent to `self.install(|| scope(...))`. + /// + /// See also: [the `scope()` function][scope]. + /// + /// [scope]: fn.scope.html + pub fn scope<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&Scope<'scope>) -> R + Send, + R: Send, + { + self.install(|| scope(op)) + } + + /// Creates a scope that executes within this thread-pool. + /// Spawns from the same thread are prioritized in relative FIFO order. + /// Equivalent to `self.install(|| scope_fifo(...))`. + /// + /// See also: [the `scope_fifo()` function][scope_fifo]. + /// + /// [scope_fifo]: fn.scope_fifo.html + pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, + R: Send, + { + self.install(|| scope_fifo(op)) + } + + /// Creates a scope that spawns work into this thread-pool. + /// + /// See also: [the `in_place_scope()` function][in_place_scope]. + /// + /// [in_place_scope]: fn.in_place_scope.html + pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&Scope<'scope>) -> R, + { + do_in_place_scope(Some(&self.registry), op) + } + + /// Creates a scope that spawns work into this thread-pool in FIFO order. + /// + /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo]. + /// + /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html + pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&ScopeFifo<'scope>) -> R, + { + do_in_place_scope_fifo(Some(&self.registry), op) + } + + /// Spawns an asynchronous task in this thread-pool. This task will + /// run in the implicit, global scope, which means that it may outlast + /// the current stack frame -- therefore, it cannot capture any references + /// onto the stack (you will likely need a `move` closure). + /// + /// See also: [the `spawn()` function defined on scopes][spawn]. + /// + /// [spawn]: struct.Scope.html#method.spawn + pub fn spawn(&self, op: OP) + where + OP: FnOnce() + Send + 'static, + { + // We assert that `self.registry` has not terminated. + unsafe { spawn::spawn_in(op, &self.registry) } + } + + /// Spawns an asynchronous task in this thread-pool. This task will + /// run in the implicit, global scope, which means that it may outlast + /// the current stack frame -- therefore, it cannot capture any references + /// onto the stack (you will likely need a `move` closure). + /// + /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo]. + /// + /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo + pub fn spawn_fifo(&self, op: OP) + where + OP: FnOnce() + Send + 'static, + { + // We assert that `self.registry` has not terminated. + unsafe { spawn::spawn_fifo_in(op, &self.registry) } + } + + /// Spawns an asynchronous task on every thread in this thread-pool. This task + /// will run in the implicit, global scope, which means that it may outlast the + /// current stack frame -- therefore, it cannot capture any references onto the + /// stack (you will likely need a `move` closure). + pub fn spawn_broadcast(&self, op: OP) + where + OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, + { + // We assert that `self.registry` has not terminated. + unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + self.registry.terminate(); + } +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ThreadPool") + .field("num_threads", &self.current_num_threads()) + .field("id", &self.registry.id()) + .finish() + } +} + +/// If called from a Rayon worker thread, returns the index of that +/// thread within its current pool; if not called from a Rayon thread, +/// returns `None`. +/// +/// The index for a given thread will not change over the thread's +/// lifetime. However, multiple threads may share the same index if +/// they are in distinct thread-pools. +/// +/// See also: [the `ThreadPool::current_thread_index()` method]. +/// +/// [m]: struct.ThreadPool.html#method.current_thread_index +/// +/// # Future compatibility note +/// +/// Currently, every thread-pool (including the global +/// thread-pool) has a fixed number of threads, but this may +/// change in future Rayon versions (see [the `num_threads()` method +/// for details][snt]). In that case, the index for a +/// thread would not change during its lifetime, but thread +/// indices may wind up being reused if threads are terminated and +/// restarted. +/// +/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads +#[inline] +pub fn current_thread_index() -> Option { + unsafe { + let curr = WorkerThread::current().as_ref()?; + Some(curr.index()) + } +} + +/// If called from a Rayon worker thread, indicates whether that +/// thread's local deque still has pending tasks. Otherwise, returns +/// `None`. For more information, see [the +/// `ThreadPool::current_thread_has_pending_tasks()` method][m]. +/// +/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks +#[inline] +pub fn current_thread_has_pending_tasks() -> Option { + unsafe { + let curr = WorkerThread::current().as_ref()?; + Some(!curr.local_deque_is_empty()) + } +} diff --git a/third_party/rust/rayon-core/src/thread_pool/test.rs b/third_party/rust/rayon-core/src/thread_pool/test.rs new file mode 100644 index 0000000000..ac750a6dc4 --- /dev/null +++ b/third_party/rust/rayon-core/src/thread_pool/test.rs @@ -0,0 +1,366 @@ +#![cfg(test)] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; + +use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + thread_pool.install(|| { + panic!("Hello, world!"); + }); +} + +#[test] +fn workers_stop() { + let registry; + + { + // once we exit this block, thread-pool will be dropped + let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + registry = thread_pool.install(|| { + // do some work on these threads + join_a_lot(22); + + Arc::clone(&thread_pool.registry) + }); + assert_eq!(registry.num_threads(), 22); + } + + // once thread-pool is dropped, registry should terminate, which + // should lead to worker threads stopping + registry.wait_until_stopped(); +} + +fn join_a_lot(n: usize) { + if n > 0 { + join(|| join_a_lot(n - 1), || join_a_lot(n - 1)); + } +} + +#[test] +fn sleeper_stop() { + use std::{thread, time}; + + let registry; + + { + // once we exit this block, thread-pool will be dropped + let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + registry = Arc::clone(&thread_pool.registry); + + // Give time for at least some of the thread pool to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + } + + // once thread-pool is dropped, registry should terminate, which + // should lead to worker threads stopping + registry.wait_until_stopped(); +} + +/// Creates a start/exit handler that increments an atomic counter. +fn count_handler() -> (Arc, impl Fn(usize)) { + let count = Arc::new(AtomicUsize::new(0)); + (Arc::clone(&count), move |_| { + count.fetch_add(1, Ordering::SeqCst); + }) +} + +/// Wait until a counter is no longer shared, then return its value. +fn wait_for_counter(mut counter: Arc) -> usize { + use std::{thread, time}; + + for _ in 0..60 { + counter = match Arc::try_unwrap(counter) { + Ok(counter) => return counter.into_inner(), + Err(counter) => { + thread::sleep(time::Duration::from_secs(1)); + counter + } + }; + } + + // That's too long! + panic!("Counter is still shared!"); +} + +#[test] +fn failed_thread_stack() { + // Note: we first tried to force failure with a `usize::MAX` stack, but + // macOS and Windows weren't fazed, or at least didn't fail the way we want. + // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a + // 2GB stack, so it might not fail until the second thread. + let stack_size = ::std::isize::MAX as usize; + + let (start_count, start_handler) = count_handler(); + let (exit_count, exit_handler) = count_handler(); + let builder = ThreadPoolBuilder::new() + .num_threads(10) + .stack_size(stack_size) + .start_handler(start_handler) + .exit_handler(exit_handler); + + let pool = builder.build(); + assert!(pool.is_err(), "thread stack should have failed!"); + + // With such a huge stack, 64-bit will probably fail on the first thread; + // 32-bit might manage the first 2GB, but certainly fail the second. + let start_count = wait_for_counter(start_count); + assert!(start_count <= 1); + assert_eq!(start_count, wait_for_counter(exit_count)); +} + +#[test] +fn panic_thread_name() { + let (start_count, start_handler) = count_handler(); + let (exit_count, exit_handler) = count_handler(); + let builder = ThreadPoolBuilder::new() + .num_threads(10) + .start_handler(start_handler) + .exit_handler(exit_handler) + .thread_name(|i| { + if i >= 5 { + panic!(); + } + format!("panic_thread_name#{}", i) + }); + + let pool = crate::unwind::halt_unwinding(|| builder.build()); + assert!(pool.is_err(), "thread-name panic should propagate!"); + + // Assuming they're created in order, threads 0 through 4 should have + // been started already, and then terminated by the panic. + assert_eq!(5, wait_for_counter(start_count)); + assert_eq!(5, wait_for_counter(exit_count)); +} + +#[test] +fn self_install() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + // If the inner `install` blocks, then nothing will actually run it! + assert!(pool.install(|| pool.install(|| true))); +} + +#[test] +fn mutual_install() { + let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + let ok = pool1.install(|| { + // This creates a dependency from `pool1` -> `pool2` + pool2.install(|| { + // This creates a dependency from `pool2` -> `pool1` + pool1.install(|| { + // If they blocked on inter-pool installs, there would be no + // threads left to run this! + true + }) + }) + }); + assert!(ok); +} + +#[test] +fn mutual_install_sleepy() { + use std::{thread, time}; + + let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + let ok = pool1.install(|| { + // This creates a dependency from `pool1` -> `pool2` + pool2.install(|| { + // Give `pool1` time to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + + // This creates a dependency from `pool2` -> `pool1` + pool1.install(|| { + // Give `pool2` time to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + + // If they blocked on inter-pool installs, there would be no + // threads left to run this! + true + }) + }) + }); + assert!(ok); +} + +#[test] +#[allow(deprecated)] +fn check_thread_pool_new() { + let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); + assert_eq!(pool.current_num_threads(), 22); +} + +macro_rules! test_scope_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + pool.$scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn scope_lifo_order() { + let vec = test_scope_order!(scope => spawn); + let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn scope_fifo_order() { + let vec = test_scope_order!(scope_fifo => spawn_fifo); + let expected: Vec = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_spawn_order { + ($spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = &builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + for i in 0..10 { + let tx = tx.clone(); + pool.$spawn(move || { + tx.send(i).unwrap(); + }); + } + }); + rx.iter().collect::>() + }}; +} + +#[test] +fn spawn_lifo_order() { + let vec = test_spawn_order!(spawn); + let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn spawn_fifo_order() { + let vec = test_spawn_order!(spawn_fifo); + let expected: Vec = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +fn nested_scopes() { + // Create matching scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) + where + OP: FnOnce(&[&Scope<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +} + +#[test] +fn nested_fifo_scopes() { + // Create matching fifo scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) + where + OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope_fifo(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn_fifo(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +} + +#[test] +fn in_place_scope_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} + +#[test] +fn in_place_scope_fifo_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope_fifo(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn_fifo(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} diff --git a/third_party/rust/rayon-core/src/unwind.rs b/third_party/rust/rayon-core/src/unwind.rs new file mode 100644 index 0000000000..9671fa5782 --- /dev/null +++ b/third_party/rust/rayon-core/src/unwind.rs @@ -0,0 +1,31 @@ +//! Package up unwind recovery. Note that if you are in some sensitive +//! place, you can use the `AbortIfPanic` helper to protect against +//! accidental panics in the rayon code itself. + +use std::any::Any; +use std::panic::{self, AssertUnwindSafe}; +use std::thread; + +/// Executes `f` and captures any panic, translating that panic into a +/// `Err` result. The assumption is that any panic will be propagated +/// later with `resume_unwinding`, and hence `f` can be treated as +/// exception safe. +pub(super) fn halt_unwinding(func: F) -> thread::Result +where + F: FnOnce() -> R, +{ + panic::catch_unwind(AssertUnwindSafe(func)) +} + +pub(super) fn resume_unwinding(payload: Box) -> ! { + panic::resume_unwind(payload) +} + +pub(super) struct AbortIfPanic; + +impl Drop for AbortIfPanic { + fn drop(&mut self) { + eprintln!("Rayon: detected unexpected panic; aborting"); + ::std::process::abort(); + } +} -- cgit v1.2.3