author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-17 12:18:32 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-17 12:18:32 +0000
commit     4547b622d8d29df964fa2914213088b148c498fc (patch)
tree       9fc6b25f3c3add6b745be9a2400a6e96140046e9 /vendor/rayon-core/src
parent     Releasing progress-linux version 1.66.0+dfsg1-1~progress7.99u1. (diff)
Merging upstream version 1.67.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/rayon-core/src')
-rw-r--r--  vendor/rayon-core/src/broadcast/mod.rs            148
-rwxr-xr-x  vendor/rayon-core/src/broadcast/test.rs           216
-rw-r--r--  vendor/rayon-core/src/compile_fail/rc_return.rs     8
-rw-r--r--  vendor/rayon-core/src/compile_fail/rc_upvar.rs      8
-rw-r--r--  vendor/rayon-core/src/job.rs                      105
-rw-r--r--  vendor/rayon-core/src/latch.rs                     15
-rw-r--r--  vendor/rayon-core/src/lib.rs                       52
-rw-r--r--  vendor/rayon-core/src/log.rs                       64
-rw-r--r--  vendor/rayon-core/src/registry.rs                 117
-rw-r--r--  vendor/rayon-core/src/scope/mod.rs                176
-rw-r--r--  vendor/rayon-core/src/scope/test.rs                92
-rw-r--r--  vendor/rayon-core/src/sleep/README.md               4
-rw-r--r--  vendor/rayon-core/src/sleep/counters.rs             2
-rw-r--r--  vendor/rayon-core/src/sleep/mod.rs                  2
-rw-r--r--  vendor/rayon-core/src/spawn/mod.rs                 17
-rw-r--r--  vendor/rayon-core/src/test.rs                       5
-rw-r--r--  vendor/rayon-core/src/thread_pool/mod.rs           68
-rw-r--r--  vendor/rayon-core/src/thread_pool/test.rs           4
18 files changed, 902 insertions, 201 deletions
diff --git a/vendor/rayon-core/src/broadcast/mod.rs b/vendor/rayon-core/src/broadcast/mod.rs
new file mode 100644
index 000000000..452aa71b6
--- /dev/null
+++ b/vendor/rayon-core/src/broadcast/mod.rs
@@ -0,0 +1,148 @@
+use crate::job::{ArcJob, StackJob};
+use crate::registry::{Registry, WorkerThread};
+use crate::scope::ScopeLatch;
+use std::fmt;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+mod test;
+
+/// Executes `op` within every thread in the current threadpool. If this is
+/// called from a non-Rayon thread, it will execute in the global threadpool.
+/// Any attempts to use `join`, `scope`, or parallel iterators will then operate
+/// within that threadpool. When the call has completed on each thread, returns
+/// a vector containing all of their return values.
+///
+/// For more information, see the [`ThreadPool::broadcast()`][m] method.
+///
+/// [m]: struct.ThreadPool.html#method.broadcast
+pub fn broadcast<OP, R>(op: OP) -> Vec<R>
+where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+{
+ // We assert that current registry has not terminated.
+ unsafe { broadcast_in(op, &Registry::current()) }
+}
+
+/// Spawns an asynchronous task on every thread in this thread-pool. This task
+/// will run in the implicit, global scope, which means that it may outlast the
+/// current stack frame -- therefore, it cannot capture any references onto the
+/// stack (you will likely need a `move` closure).
+///
+/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method.
+///
+/// [m]: struct.ThreadPool.html#method.spawn_broadcast
+pub fn spawn_broadcast<OP>(op: OP)
+where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+{
+ // We assert that current registry has not terminated.
+ unsafe { spawn_broadcast_in(op, &Registry::current()) }
+}
+
+/// Provides context to a closure called by `broadcast`.
+pub struct BroadcastContext<'a> {
+ worker: &'a WorkerThread,
+
+ /// Make sure to prevent auto-traits like `Send` and `Sync`.
+ _marker: PhantomData<&'a mut dyn Fn()>,
+}
+
+impl<'a> BroadcastContext<'a> {
+ pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R {
+ let worker_thread = WorkerThread::current();
+ assert!(!worker_thread.is_null());
+ f(BroadcastContext {
+ worker: unsafe { &*worker_thread },
+ _marker: PhantomData,
+ })
+ }
+
+ /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`).
+ #[inline]
+ pub fn index(&self) -> usize {
+ self.worker.index()
+ }
+
+ /// The number of threads receiving the broadcast in the thread pool.
+ ///
+ /// # Future compatibility note
+ ///
+ /// Future versions of Rayon might vary the number of threads over time, but
+ /// this method will always return the number of threads which are actually
+ /// receiving your particular `broadcast` call.
+ #[inline]
+ pub fn num_threads(&self) -> usize {
+ self.worker.registry().num_threads()
+ }
+}
+
+impl<'a> fmt::Debug for BroadcastContext<'a> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BroadcastContext")
+ .field("index", &self.index())
+ .field("num_threads", &self.num_threads())
+ .field("pool_id", &self.worker.registry().id())
+ .finish()
+ }
+}
+
+/// Execute `op` on every thread in the pool. It will be executed on each
+/// thread when they have nothing else to do locally, before they try to
+/// steal work from other threads. This function will not return until all
+/// threads have completed the `op`.
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn broadcast_in<OP, R>(op: OP, registry: &Arc<Registry>) -> Vec<R>
+where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+{
+ let f = move |injected: bool| {
+ debug_assert!(injected);
+ BroadcastContext::with(&op)
+ };
+
+ let n_threads = registry.num_threads();
+ let current_thread = WorkerThread::current().as_ref();
+ let latch = ScopeLatch::with_count(n_threads, current_thread);
+ let jobs: Vec<_> = (0..n_threads).map(|_| StackJob::new(&f, &latch)).collect();
+ let job_refs = jobs.iter().map(|job| job.as_job_ref());
+
+ registry.inject_broadcast(job_refs);
+
+ // Wait for all jobs to complete, then collect the results, maybe propagating a panic.
+ latch.wait(current_thread);
+ jobs.into_iter().map(|job| job.into_result()).collect()
+}
+
+/// Execute `op` on every thread in the pool. It will be executed on each
+/// thread when they have nothing else to do locally, before they try to
+/// steal work from other threads. This function returns immediately after
+/// injecting the jobs.
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn spawn_broadcast_in<OP>(op: OP, registry: &Arc<Registry>)
+where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+{
+ let job = ArcJob::new({
+ let registry = Arc::clone(registry);
+ move || {
+ registry.catch_unwind(|| BroadcastContext::with(&op));
+ registry.terminate(); // (*) permit registry to terminate now
+ }
+ });
+
+ let n_threads = registry.num_threads();
+ let job_refs = (0..n_threads).map(|_| {
+ // Ensure that registry cannot terminate until this job has executed
+ // on each thread. This ref is decremented at the (*) above.
+ registry.increment_terminate_count();
+
+ ArcJob::as_static_job_ref(&job)
+ });
+
+ registry.inject_broadcast(job_refs);
+}
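
For illustration, a minimal sketch of how the new public API is meant to be called by crate consumers; nothing below is part of the patch itself, and the 4-thread pool size is arbitrary:

    use rayon_core::ThreadPoolBuilder;

    fn main() {
        // Blocking broadcast on the global pool: runs the closure once on every
        // worker thread and collects the results in thread-index order.
        let v: Vec<usize> = rayon_core::broadcast(|ctx| ctx.index());
        assert_eq!(v.len(), rayon_core::current_num_threads());

        // The same call scoped to a custom pool via the new ThreadPool methods.
        let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
        let squares: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
        assert_eq!(squares, vec![0, 1, 4, 9]);

        // Fire-and-forget variant: injected on every thread, returns immediately.
        pool.spawn_broadcast(|ctx| println!("hello from worker {}", ctx.index()));
    }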
diff --git a/vendor/rayon-core/src/broadcast/test.rs b/vendor/rayon-core/src/broadcast/test.rs
new file mode 100755
index 000000000..a765cb034
--- /dev/null
+++ b/vendor/rayon-core/src/broadcast/test.rs
@@ -0,0 +1,216 @@
+#![cfg(test)]
+
+use crate::ThreadPoolBuilder;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::{thread, time};
+
+#[test]
+fn broadcast_global() {
+ let v = crate::broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+fn spawn_broadcast_global() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+fn broadcast_pool() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+fn spawn_broadcast_pool() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+fn broadcast_self() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+fn spawn_broadcast_self() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+fn broadcast_mutual() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ pool2.broadcast(|_| {
+ pool1.broadcast(|_| {
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+fn spawn_broadcast_mutual() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+fn broadcast_mutual_sleepy() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.broadcast(|_| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.broadcast(|_| {
+ thread::sleep(time::Duration::from_millis(100));
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+fn spawn_broadcast_mutual_sleepy() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.spawn_broadcast(move |_| {
+ thread::sleep(time::Duration::from_millis(100));
+ tx.send(()).unwrap();
+ })
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+fn broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+fn spawn_broadcast_panic_one() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 1);
+}
+
+#[test]
+fn broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+fn spawn_broadcast_panic_many() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 4);
+}
+
+#[test]
+fn broadcast_sleep_race() {
+ let test_duration = time::Duration::from_secs(1);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let start = time::Instant::now();
+ while start.elapsed() < test_duration {
+ pool.broadcast(|ctx| {
+ // A slight spread of sleep duration increases the chance that one
+ // of the threads will race in the pool's idle sleep afterward.
+ thread::sleep(time::Duration::from_micros(ctx.index() as u64));
+ });
+ }
+}
diff --git a/vendor/rayon-core/src/compile_fail/rc_return.rs b/vendor/rayon-core/src/compile_fail/rc_return.rs
index 164f8ce5e..93e3a6038 100644
--- a/vendor/rayon-core/src/compile_fail/rc_return.rs
+++ b/vendor/rayon-core/src/compile_fail/rc_return.rs
@@ -2,9 +2,7 @@
use std::rc::Rc;
-fn main() {
- rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
-}
+rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
``` */
mod left {}
@@ -13,9 +11,7 @@ mod left {}
use std::rc::Rc;
-fn main() {
- rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
-}
+rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
``` */
mod right {}
diff --git a/vendor/rayon-core/src/compile_fail/rc_upvar.rs b/vendor/rayon-core/src/compile_fail/rc_upvar.rs
index 62895bf22..d8aebcfcb 100644
--- a/vendor/rayon-core/src/compile_fail/rc_upvar.rs
+++ b/vendor/rayon-core/src/compile_fail/rc_upvar.rs
@@ -2,10 +2,8 @@
use std::rc::Rc;
-fn main() {
- let r = Rc::new(22);
- rayon_core::join(|| r.clone(), || r.clone());
- //~^ ERROR
-}
+let r = Rc::new(22);
+rayon_core::join(|| r.clone(), || r.clone());
+//~^ ERROR
``` */
diff --git a/vendor/rayon-core/src/job.rs b/vendor/rayon-core/src/job.rs
index a71f1b0e9..b7a3dae18 100644
--- a/vendor/rayon-core/src/job.rs
+++ b/vendor/rayon-core/src/job.rs
@@ -4,6 +4,7 @@ use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
+use std::sync::Arc;
pub(super) enum JobResult<T> {
None,
@@ -20,7 +21,7 @@ pub(super) trait Job {
/// Unsafe: this may be called from a different thread than the one
/// which scheduled the job, so the implementer must ensure the
/// appropriate traits are met, whether `Send`, `Sync`, or both.
- unsafe fn execute(this: *const Self);
+ unsafe fn execute(this: *const ());
}
/// Effectively a Job trait object. Each JobRef **must** be executed
@@ -45,17 +46,15 @@ impl JobRef {
where
T: Job,
{
- let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
-
// erase types:
JobRef {
pointer: data as *const (),
- execute_fn: mem::transmute(fn_ptr),
+ execute_fn: <T as Job>::execute,
}
}
#[inline]
- pub(super) unsafe fn execute(&self) {
+ pub(super) unsafe fn execute(self) {
(self.execute_fn)(self.pointer)
}
}
@@ -108,18 +107,11 @@ where
F: FnOnce(bool) -> R + Send,
R: Send,
{
- unsafe fn execute(this: *const Self) {
- fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R {
- move || func(true)
- }
-
- let this = &*this;
+ unsafe fn execute(this: *const ()) {
+ let this = &*(this as *const Self);
let abort = unwind::AbortIfPanic;
let func = (*this.func.get()).take().unwrap();
- (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
- Ok(x) => JobResult::Ok(x),
- Err(x) => JobResult::Panic(x),
- };
+ (*this.result.get()) = JobResult::call(func);
this.latch.set();
mem::forget(abort);
}
@@ -135,25 +127,30 @@ pub(super) struct HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- job: UnsafeCell<Option<BODY>>,
+ job: BODY,
}
impl<BODY> HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- pub(super) fn new(func: BODY) -> Self {
- HeapJob {
- job: UnsafeCell::new(Some(func)),
- }
+ pub(super) fn new(job: BODY) -> Box<Self> {
+ Box::new(HeapJob { job })
}
/// Creates a `JobRef` from this job -- note that this hides all
/// lifetimes, so it is up to you to ensure that this JobRef
/// doesn't outlive any data that it closes over.
- pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef {
- let this: *const Self = mem::transmute(self);
- JobRef::new(this)
+ pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
+ JobRef::new(Box::into_raw(self))
+ }
+
+ /// Creates a static `JobRef` from this job.
+ pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
+ where
+ BODY: 'static,
+ {
+ unsafe { self.into_job_ref() }
}
}
@@ -161,14 +158,63 @@ impl<BODY> Job for HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- unsafe fn execute(this: *const Self) {
- let this: Box<Self> = mem::transmute(this);
- let job = (*this.job.get()).take().unwrap();
- job();
+ unsafe fn execute(this: *const ()) {
+ let this = Box::from_raw(this as *mut Self);
+ (this.job)();
+ }
+}
+
+/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
+/// be turned into multiple `JobRef`s and called multiple times.
+pub(super) struct ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ job: BODY,
+}
+
+impl<BODY> ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ pub(super) fn new(job: BODY) -> Arc<Self> {
+ Arc::new(ArcJob { job })
+ }
+
+ /// Creates a `JobRef` from this job -- note that this hides all
+ /// lifetimes, so it is up to you to ensure that this JobRef
+ /// doesn't outlive any data that it closes over.
+ pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
+ JobRef::new(Arc::into_raw(Arc::clone(this)))
+ }
+
+ /// Creates a static `JobRef` from this job.
+ pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
+ where
+ BODY: 'static,
+ {
+ unsafe { Self::as_job_ref(this) }
+ }
+}
+
+impl<BODY> Job for ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ unsafe fn execute(this: *const ()) {
+ let this = Arc::from_raw(this as *mut Self);
+ (this.job)();
}
}
impl<T> JobResult<T> {
+ fn call(func: impl FnOnce(bool) -> T) -> Self {
+ match unwind::halt_unwinding(|| func(true)) {
+ Ok(x) => JobResult::Ok(x),
+ Err(x) => JobResult::Panic(x),
+ }
+ }
+
/// Convert the `JobResult` for a job that has finished (and hence
/// its JobResult is populated) into its return value.
///
@@ -204,10 +250,11 @@ impl JobFifo {
}
impl Job for JobFifo {
- unsafe fn execute(this: *const Self) {
+ unsafe fn execute(this: *const ()) {
// We "execute" a queue by executing its first job, FIFO.
+ let this = &*(this as *const Self);
loop {
- match (*this).inner.steal() {
+ match this.inner.steal() {
Steal::Success(job_ref) => break job_ref.execute(),
Steal::Empty => panic!("FIFO is empty"),
Steal::Retry => {}
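
The key change above is that `Job::execute` now takes `*const ()` directly, so `JobRef` can store `<T as Job>::execute` without transmuting a function pointer. A standalone sketch of that erased-pointer pattern (the names are illustrative, not rayon's internals):

    trait Job {
        unsafe fn execute(this: *const ());
    }

    struct ErasedJobRef {
        pointer: *const (),
        execute_fn: unsafe fn(*const ()),
    }

    impl ErasedJobRef {
        unsafe fn new<T: Job>(data: *const T) -> ErasedJobRef {
            ErasedJobRef {
                pointer: data as *const (),
                // Already has type `unsafe fn(*const ())`, so no transmute is needed.
                execute_fn: <T as Job>::execute,
            }
        }

        unsafe fn execute(self) {
            (self.execute_fn)(self.pointer)
        }
    }

    struct PrintJob(&'static str);

    impl Job for PrintJob {
        unsafe fn execute(this: *const ()) {
            // Cast back to the concrete type that was erased in `new`.
            let this = &*(this as *const Self);
            println!("{}", this.0);
        }
    }

    fn main() {
        let job = PrintJob("hello");
        let job_ref = unsafe { ErasedJobRef::new(&job) };
        unsafe { job_ref.execute() };
    }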
diff --git a/vendor/rayon-core/src/latch.rs b/vendor/rayon-core/src/latch.rs
index b84fbe371..090929374 100644
--- a/vendor/rayon-core/src/latch.rs
+++ b/vendor/rayon-core/src/latch.rs
@@ -196,14 +196,14 @@ impl<'r> Latch for SpinLatch<'r> {
// the registry to be deallocated, all before we get a
// chance to invoke `registry.notify_worker_latch_is_set`.
cross_registry = Arc::clone(self.registry);
- &*cross_registry
+ &cross_registry
} else {
// If this is not a "cross-registry" spin-latch, then the
// thread which is performing `set` is itself ensuring
// that the registry stays alive. However, that doesn't
// include this *particular* `Arc` handle if the waiting
// thread then exits, so we must completely dereference it.
- &**self.registry
+ self.registry
};
let target_worker_index = self.target_worker_index;
@@ -286,9 +286,14 @@ pub(super) struct CountLatch {
impl CountLatch {
#[inline]
pub(super) fn new() -> CountLatch {
+ Self::with_count(1)
+ }
+
+ #[inline]
+ pub(super) fn with_count(n: usize) -> CountLatch {
CountLatch {
core_latch: CoreLatch::new(),
- counter: AtomicUsize::new(1),
+ counter: AtomicUsize::new(n),
}
}
@@ -337,10 +342,10 @@ pub(super) struct CountLockLatch {
impl CountLockLatch {
#[inline]
- pub(super) fn new() -> CountLockLatch {
+ pub(super) fn with_count(n: usize) -> CountLockLatch {
CountLockLatch {
lock_latch: LockLatch::new(),
- counter: AtomicUsize::new(1),
+ counter: AtomicUsize::new(n),
}
}
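
`CountLatch::with_count` and `CountLockLatch::with_count` let a caller start the counter at N instead of 1, which is what `broadcast_in` needs: one decrement per thread, with the waiter released only after all N jobs have run. A minimal counting-latch sketch built on std, just to illustrate the semantics (not rayon's implementation):

    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;

    struct CountingLatch {
        count: Mutex<usize>,
        cond: Condvar,
    }

    impl CountingLatch {
        fn with_count(n: usize) -> Self {
            CountingLatch {
                count: Mutex::new(n),
                cond: Condvar::new(),
            }
        }

        // Each finished job calls `set` exactly once.
        fn set(&self) {
            let mut count = self.count.lock().unwrap();
            *count -= 1;
            if *count == 0 {
                self.cond.notify_all();
            }
        }

        // The broadcaster blocks here until every job has called `set`.
        fn wait(&self) {
            let mut count = self.count.lock().unwrap();
            while *count > 0 {
                count = self.cond.wait(count).unwrap();
            }
        }
    }

    fn main() {
        let latch = Arc::new(CountingLatch::with_count(3));
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let latch = Arc::clone(&latch);
                thread::spawn(move || latch.set())
            })
            .collect();
        latch.wait();
        for h in handles {
            h.join().unwrap();
        }
    }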
diff --git a/vendor/rayon-core/src/lib.rs b/vendor/rayon-core/src/lib.rs
index 246b80070..b31a2d7e0 100644
--- a/vendor/rayon-core/src/lib.rs
+++ b/vendor/rayon-core/src/lib.rs
@@ -44,7 +44,6 @@
//! conflicting requirements will need to be resolved before the build will
//! succeed.
-#![doc(html_root_url = "https://docs.rs/rayon-core/1.9")]
#![deny(missing_debug_implementations)]
#![deny(missing_docs)]
#![deny(unreachable_pub)]
@@ -63,6 +62,7 @@ mod log;
#[macro_use]
mod private;
+mod broadcast;
mod job;
mod join;
mod latch;
@@ -76,6 +76,7 @@ mod unwind;
mod compile_fail;
mod test;
+pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
pub use self::scope::{in_place_scope, scope, Scope};
@@ -185,6 +186,7 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
///
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
#[deprecated(note = "Use `ThreadPoolBuilder`")]
+#[derive(Default)]
pub struct Configuration {
builder: ThreadPoolBuilder,
}
@@ -269,7 +271,7 @@ impl ThreadPoolBuilder {
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
///
/// # Examples
///
@@ -339,7 +341,7 @@ impl<S> ThreadPoolBuilder<S> {
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
///
/// # Examples
///
@@ -384,6 +386,39 @@ impl<S> ThreadPoolBuilder<S> {
/// Ok(())
/// }
/// ```
+ ///
+ /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
+ /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
+ /// [`build_scoped`](#method.build_scoped).
+ ///
+ /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
+ /// std::thread::scope(|scope| {
+ /// let pool = rayon::ThreadPoolBuilder::new()
+ /// .spawn_handler(|thread| {
+ /// let mut builder = std::thread::Builder::new();
+ /// if let Some(name) = thread.name() {
+ /// builder = builder.name(name.to_string());
+ /// }
+ /// if let Some(size) = thread.stack_size() {
+ /// builder = builder.stack_size(size);
+ /// }
+ /// builder.spawn_scoped(scope, || {
+ /// // Add any scoped initialization here, then run!
+ /// thread.run()
+ /// })?;
+ /// Ok(())
+ /// })
+ /// .build()?;
+ ///
+ /// pool.install(|| println!("Hello from my custom scoped thread!"));
+ /// Ok(())
+ /// })
+ /// }
+ /// ```
pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
where
F: FnMut(ThreadBuilder) -> io::Result<()>,
@@ -526,7 +561,7 @@ impl<S> ThreadPoolBuilder<S> {
/// to true, however, workers will prefer to execute in a
/// *breadth-first* fashion -- that is, they will search for jobs at
/// the *bottom* of their local deque. (At present, workers *always*
- /// steal from the bottom of other worker's deques, regardless of
+ /// steal from the bottom of other workers' deques, regardless of
/// the setting of this flag.)
///
/// If you think of the tasks as a tree, where a parent task
@@ -749,15 +784,6 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
}
#[allow(deprecated)]
-impl Default for Configuration {
- fn default() -> Self {
- Configuration {
- builder: Default::default(),
- }
- }
-}
-
-#[allow(deprecated)]
impl fmt::Debug for Configuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.builder.fmt(f)
diff --git a/vendor/rayon-core/src/log.rs b/vendor/rayon-core/src/log.rs
index e1ff827df..7b6daf0ab 100644
--- a/vendor/rayon-core/src/log.rs
+++ b/vendor/rayon-core/src/log.rs
@@ -93,6 +93,9 @@ pub(super) enum Event {
/// A job was removed from the global queue.
JobUninjected { worker: usize },
+ /// A job was broadcasted to N threads.
+ JobBroadcast { count: usize },
+
/// When announcing a job, this was the value of the counters we observed.
///
/// No effect on thread state, just a debugging event.
@@ -124,15 +127,15 @@ impl Logger {
let (sender, receiver) = crossbeam_channel::unbounded();
- if env_log.starts_with("tail:") {
- let filename = env_log["tail:".len()..].to_string();
+ if let Some(filename) = env_log.strip_prefix("tail:") {
+ let filename = filename.to_string();
::std::thread::spawn(move || {
Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
});
} else if env_log == "all" {
::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
- } else if env_log.starts_with("profile:") {
- let filename = env_log["profile:".len()..].to_string();
+ } else if let Some(filename) = env_log.strip_prefix("profile:") {
+ let filename = filename.to_string();
::std::thread::spawn(move || {
Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
});
@@ -140,9 +143,9 @@ impl Logger {
panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
}
- return Logger {
+ Logger {
sender: Some(sender),
- };
+ }
}
fn disabled() -> Logger {
@@ -175,19 +178,12 @@ impl Logger {
let timeout = std::time::Duration::from_secs(30);
loop {
- loop {
- match receiver.recv_timeout(timeout) {
- Ok(event) => {
- if let Event::Flush = event {
- break;
- } else {
- events.push(event);
- }
- }
-
- Err(_) => break,
+ while let Ok(event) = receiver.recv_timeout(timeout) {
+ if let Event::Flush = event {
+ break;
}
+ events.push(event);
if events.len() == capacity {
break;
}
@@ -219,31 +215,25 @@ impl Logger {
let mut skipped = false;
loop {
- loop {
- match receiver.recv_timeout(timeout) {
- Ok(event) => {
- if let Event::Flush = event {
- // We ignore Flush events in tail mode --
- // we're really just looking for
- // deadlocks.
- continue;
- } else {
- if events.len() == capacity {
- let event = events.pop_front().unwrap();
- state.simulate(&event);
- skipped = true;
- }
-
- events.push_back(event);
- }
+ while let Ok(event) = receiver.recv_timeout(timeout) {
+ if let Event::Flush = event {
+ // We ignore Flush events in tail mode --
+ // we're really just looking for
+ // deadlocks.
+ continue;
+ } else {
+ if events.len() == capacity {
+ let event = events.pop_front().unwrap();
+ state.simulate(&event);
+ skipped = true;
}
- Err(_) => break,
+ events.push_back(event);
}
}
if skipped {
- write!(writer, "...\n").unwrap();
+ writeln!(writer, "...").unwrap();
skipped = false;
}
@@ -417,7 +407,7 @@ impl SimulatorState {
}
}
- write!(w, "\n")?;
+ writeln!(w)?;
Ok(())
}
}
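
The logger changes are mechanical cleanups: `str::strip_prefix` replaces the manual `starts_with` plus slicing, and `writeln!` replaces writing a bare newline. A tiny standalone illustration of the prefix pattern (the RAYON_LOG values shown are just the ones the panic message documents):

    fn main() {
        for env_log in ["tail:/tmp/rayon.log", "profile:/tmp/rayon.prof", "all"] {
            if let Some(filename) = env_log.strip_prefix("tail:") {
                println!("tail mode, writing to {}", filename);
            } else if let Some(filename) = env_log.strip_prefix("profile:") {
                println!("profile mode, writing to {}", filename);
            } else if env_log == "all" {
                println!("logging every event");
            }
        }
    }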
diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs
index 7405fe8e7..279e298d2 100644
--- a/vendor/rayon-core/src/registry.rs
+++ b/vendor/rayon-core/src/registry.rs
@@ -8,7 +8,6 @@ use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
-use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
@@ -16,10 +15,8 @@ use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
-#[allow(deprecated)]
-use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Once};
+use std::sync::{Arc, Mutex, Once};
use std::thread;
use std::usize;
@@ -29,6 +26,7 @@ pub struct ThreadBuilder {
name: Option<String>,
stack_size: Option<usize>,
worker: Worker<JobRef>,
+ stealer: Stealer<JobRef>,
registry: Arc<Registry>,
index: usize,
}
@@ -41,7 +39,7 @@ impl ThreadBuilder {
/// Gets the string that was specified by `ThreadPoolBuilder::name()`.
pub fn name(&self) -> Option<&str> {
- self.name.as_ref().map(String::as_str)
+ self.name.as_deref()
}
/// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
@@ -52,7 +50,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
- unsafe { main_loop(self.worker, self.registry, self.index) }
+ unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
}
}
@@ -135,6 +133,7 @@ pub(super) struct Registry {
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
+ broadcasts: Mutex<Vec<Worker<JobRef>>>,
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
@@ -232,12 +231,21 @@ impl Registry {
})
.unzip();
+ let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads)
+ .map(|_| {
+ let worker = Worker::new_fifo();
+ let stealer = worker.stealer();
+ (worker, stealer)
+ })
+ .unzip();
+
let logger = Logger::new(n_threads);
let registry = Arc::new(Registry {
logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(logger, n_threads),
injected_jobs: Injector::new(),
+ broadcasts: Mutex::new(broadcasts),
terminate_count: AtomicUsize::new(1),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
@@ -247,12 +255,13 @@ impl Registry {
// If we return early or panic, make sure to terminate existing threads.
let t1000 = Terminator(&registry);
- for (index, worker) in workers.into_iter().enumerate() {
+ for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() {
let thread = ThreadBuilder {
name: builder.get_thread_name(index),
stack_size: builder.get_stack_size(),
registry: Arc::clone(&registry),
worker,
+ stealer,
index,
};
if let Err(e) = builder.get_spawn_handler().spawn(thread) {
@@ -322,19 +331,14 @@ impl Registry {
self.thread_infos.len()
}
- pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
- match self.panic_handler {
- Some(ref handler) => {
- // If the customizable panic handler itself panics,
- // then we abort.
- let abort_guard = unwind::AbortIfPanic;
+ pub(super) fn catch_unwind(&self, f: impl FnOnce()) {
+ if let Err(err) = unwind::halt_unwinding(f) {
+ // If there is no handler, or if that handler itself panics, then we abort.
+ let abort_guard = unwind::AbortIfPanic;
+ if let Some(ref handler) = self.panic_handler {
handler(err);
mem::forget(abort_guard);
}
- None => {
- // Default panic handler aborts.
- let _ = unwind::AbortIfPanic; // let this drop.
- }
}
}
@@ -378,7 +382,7 @@ impl Registry {
}
/// Push a job into the "external jobs" queue; it will be taken by
- /// whatever worker has nothing to do. Use this is you know that
+ /// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
self.log(|| JobsInjected {
@@ -425,6 +429,40 @@ impl Registry {
}
}
+ /// Push a job into each thread's own "external jobs" queue; it will be
+ /// executed only on that thread, when it has nothing else to do locally,
+ /// before it tries to steal other work.
+ ///
+ /// **Panics** if not given exactly as many jobs as there are threads.
+ pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
+ assert_eq!(self.num_threads(), injected_jobs.len());
+ self.log(|| JobBroadcast {
+ count: self.num_threads(),
+ });
+ {
+ let broadcasts = self.broadcasts.lock().unwrap();
+
+ // It should not be possible for `state.terminate` to be true
+ // here. It is only set to true when the user creates (and
+ // drops) a `ThreadPool`; and, in that case, they cannot be
+ // calling `inject_broadcast()` later, since they dropped their
+ // `ThreadPool`.
+ debug_assert_ne!(
+ self.terminate_count.load(Ordering::Acquire),
+ 0,
+ "inject_broadcast() sees state.terminate as true"
+ );
+
+ assert_eq!(broadcasts.len(), injected_jobs.len());
+ for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) {
+ worker.push(job_ref);
+ }
+ }
+ for i in 0..self.num_threads() {
+ self.sleep.notify_worker_latch_is_set(i);
+ }
+ }
+
/// If already in a worker-thread of this registry, just execute `op`.
/// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
/// completes and return its return value. If `op` panics, that panic will
@@ -594,6 +632,9 @@ pub(super) struct WorkerThread {
/// the "worker" half of our local deque
worker: Worker<JobRef>,
+ /// the "stealer" half of the worker's broadcast deque
+ stealer: Stealer<JobRef>,
+
/// local queue used for `spawn_fifo` indirection
fifo: JobFifo,
@@ -689,9 +730,20 @@ impl WorkerThread {
if popped_job.is_some() {
self.log(|| JobPopped { worker: self.index });
+ return popped_job;
}
- popped_job
+ loop {
+ match self.stealer.steal() {
+ Steal::Success(job) => return Some(job),
+ Steal::Empty => return None,
+ Steal::Retry => {}
+ }
+ }
+ }
+
+ fn has_injected_job(&self) -> bool {
+ !self.stealer.is_empty() || self.registry.has_injected_job()
}
/// Wait until the latch is set. Try to keep busy by popping and
@@ -731,7 +783,7 @@ impl WorkerThread {
} else {
self.registry
.sleep
- .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
+ .no_work_found(&mut idle_state, latch, || self.has_injected_job())
}
}
@@ -799,9 +851,15 @@ impl WorkerThread {
/// ////////////////////////////////////////////////////////////////////////
-unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
+unsafe fn main_loop(
+ worker: Worker<JobRef>,
+ stealer: Stealer<JobRef>,
+ registry: Arc<Registry>,
+ index: usize,
+) {
let worker_thread = &WorkerThread {
worker,
+ stealer,
fifo: JobFifo::new(),
index,
rng: XorShift64Star::new(),
@@ -820,12 +878,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
// Inform a user callback that we started a thread.
if let Some(ref handler) = registry.start_handler {
- match unwind::halt_unwinding(|| handler(index)) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(|| handler(index));
}
let my_terminate_latch = &registry.thread_infos[index].terminate;
@@ -848,12 +901,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
// Inform a user callback that we exited a thread.
if let Some(ref handler) = registry.exit_handler {
- match unwind::halt_unwinding(|| handler(index)) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(|| handler(index));
// We're already exiting the thread, there's nothing else to do.
}
}
@@ -895,8 +943,7 @@ impl XorShift64Star {
let mut seed = 0;
while seed == 0 {
let mut hasher = DefaultHasher::new();
- #[allow(deprecated)]
- static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
+ static COUNTER: AtomicUsize = AtomicUsize::new(0);
hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
seed = hasher.finish();
}
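
The registry plumbing above gives each worker a dedicated broadcast deque: the `Worker` (push) halves stay in `Registry::broadcasts` behind a mutex, the matching `Stealer` halves move into each `WorkerThread`, and `inject_broadcast` pushes one `JobRef` into every queue. A standalone sketch of that split using `crossbeam-deque` (the data and loop structure are illustrative only):

    use crossbeam_deque::{Steal, Worker};

    fn main() {
        // One FIFO queue per worker; keep the push half, hand out the steal half.
        let (pushers, stealers): (Vec<_>, Vec<_>) = (0..4)
            .map(|_| {
                let worker = Worker::new_fifo();
                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        // "Broadcast": one copy of the job description into every per-worker queue.
        for (i, w) in pushers.iter().enumerate() {
            w.push(format!("broadcast job for worker {}", i));
        }

        // Each worker later drains only its own queue, retrying on contention.
        for (i, s) in stealers.iter().enumerate() {
            loop {
                match s.steal() {
                    Steal::Success(job) => println!("worker {} ran: {}", i, job),
                    Steal::Empty => break,
                    Steal::Retry => {}
                }
            }
        }
    }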
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs
index f8d90ce20..25cda832e 100644
--- a/vendor/rayon-core/src/scope/mod.rs
+++ b/vendor/rayon-core/src/scope/mod.rs
@@ -5,7 +5,8 @@
//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../join/join.fn.html
-use crate::job::{HeapJob, JobFifo};
+use crate::broadcast::BroadcastContext;
+use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
@@ -38,7 +39,7 @@ pub struct ScopeFifo<'scope> {
fifos: Vec<JobFifo>,
}
-enum ScopeLatch {
+pub(super) enum ScopeLatch {
/// A latch for scopes created on a rayon thread which will participate in work-
/// stealing while it waits for completion. This thread is not necessarily part
/// of the same registry as the scope itself!
@@ -127,7 +128,7 @@ struct ScopeBase<'scope> {
/// Task execution potentially starts as soon as `spawn()` is called.
/// The task will end sometime before `scope()` returns. Note that the
/// *closure* given to scope may return much earlier. In general
-/// the lifetime of a scope created like `scope(body) goes something like this:
+/// the lifetime of a scope created like `scope(body)` goes something like this:
///
/// - Scope begins when `scope(body)` is called
/// - Scope body `body()` is invoked
@@ -241,7 +242,7 @@ struct ScopeBase<'scope> {
/// });
///
/// // That closure is fine, but now we can't use `ok` anywhere else,
-/// // since it is owend by the previous task:
+/// // since it is owned by the previous task:
/// // s.spawn(|_| println!("ok: {:?}", ok));
/// });
/// ```
@@ -538,18 +539,37 @@ impl<'scope> Scope<'scope> {
where
BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
{
- self.base.increment();
- unsafe {
- let job_ref = Box::new(HeapJob::new(move || {
- self.base.execute_job(move || body(self))
- }))
- .as_job_ref();
-
- // Since `Scope` implements `Sync`, we can't be sure that we're still in a
- // thread of this pool, so we can't just push to the local worker thread.
- // Also, this might be an in-place scope.
- self.base.registry.inject_or_push(job_ref);
- }
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(move || {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = unsafe { scope_ptr.as_ref() };
+ scope.base.execute_job(move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // Since `Scope` implements `Sync`, we can't be sure that we're still in a
+ // thread of this pool, so we can't just push to the local worker thread.
+ // Also, this might be an in-place scope.
+ self.base.registry.inject_or_push(job_ref);
+ }
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = unsafe { scope_ptr.as_ref() };
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ scope.base.execute_job(func);
+ });
+ self.base.inject_broadcast(job)
}
}
@@ -579,24 +599,44 @@ impl<'scope> ScopeFifo<'scope> {
where
BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
{
- self.base.increment();
- unsafe {
- let job_ref = Box::new(HeapJob::new(move || {
- self.base.execute_job(move || body(self))
- }))
- .as_job_ref();
-
- // If we're in the pool, use our scope's private fifo for this thread to execute
- // in a locally-FIFO order. Otherwise, just use the pool's global injector.
- match self.base.registry.current_thread() {
- Some(worker) => {
- let fifo = &self.fifos[worker.index()];
- worker.push(fifo.push(job_ref));
- }
- None => self.base.registry.inject(&[job_ref]),
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(move || {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = unsafe { scope_ptr.as_ref() };
+ scope.base.execute_job(move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // If we're in the pool, use our scope's private fifo for this thread to execute
+ // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+ match self.base.registry.current_thread() {
+ Some(worker) => {
+ let fifo = &self.fifos[worker.index()];
+ // SAFETY: this job will execute before the scope ends.
+ unsafe { worker.push(fifo.push(job_ref)) };
}
+ None => self.base.registry.inject(&[job_ref]),
}
}
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = unsafe { scope_ptr.as_ref() };
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ scope.base.execute_job(func);
+ });
+ self.base.inject_broadcast(job)
+ }
}
impl<'scope> ScopeBase<'scope> {
@@ -619,6 +659,29 @@ impl<'scope> ScopeBase<'scope> {
self.job_completed_latch.increment();
}
+ fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
+ where
+ FUNC: FnOnce() + Send + 'scope,
+ {
+ unsafe {
+ self.increment();
+ job.into_job_ref()
+ }
+ }
+
+ fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
+ where
+ FUNC: Fn() + Send + Sync + 'scope,
+ {
+ let n_threads = self.registry.num_threads();
+ let job_refs = (0..n_threads).map(|_| unsafe {
+ self.increment();
+ ArcJob::as_job_ref(&job)
+ });
+
+ self.registry.inject_broadcast(job_refs);
+ }
+
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
@@ -687,14 +750,18 @@ impl<'scope> ScopeBase<'scope> {
impl ScopeLatch {
fn new(owner: Option<&WorkerThread>) -> Self {
+ Self::with_count(1, owner)
+ }
+
+ pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
match owner {
Some(owner) => ScopeLatch::Stealing {
- latch: CountLatch::new(),
+ latch: CountLatch::with_count(count),
registry: Arc::clone(owner.registry()),
worker_index: owner.index(),
},
None => ScopeLatch::Blocking {
- latch: CountLockLatch::new(),
+ latch: CountLockLatch::with_count(count),
},
}
}
@@ -706,30 +773,32 @@ impl ScopeLatch {
}
}
- fn set(&self) {
+ pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
match self {
ScopeLatch::Stealing {
latch,
registry,
worker_index,
- } => latch.set_and_tickle_one(registry, *worker_index),
- ScopeLatch::Blocking { latch } => latch.set(),
+ } => unsafe {
+ let owner = owner.expect("owner thread");
+ debug_assert_eq!(registry.id(), owner.registry().id());
+ debug_assert_eq!(*worker_index, owner.index());
+ owner.wait_until(latch);
+ },
+ ScopeLatch::Blocking { latch } => latch.wait(),
}
}
+}
- fn wait(&self, owner: Option<&WorkerThread>) {
+impl Latch for ScopeLatch {
+ fn set(&self) {
match self {
ScopeLatch::Stealing {
latch,
registry,
worker_index,
- } => unsafe {
- let owner = owner.expect("owner thread");
- debug_assert_eq!(registry.id(), owner.registry().id());
- debug_assert_eq!(*worker_index, owner.index());
- owner.wait_until(latch);
- },
- ScopeLatch::Blocking { latch } => latch.wait(),
+ } => latch.set_and_tickle_one(registry, *worker_index),
+ ScopeLatch::Blocking { latch } => latch.set(),
}
}
}
@@ -769,3 +838,22 @@ impl fmt::Debug for ScopeLatch {
}
}
}
+
+/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
+///
+/// Unsafe code is still required to dereference the pointer, but that's fine in
+/// scope jobs that are guaranteed to execute before the scope ends.
+struct ScopePtr<T>(*const T);
+
+// SAFETY: !Send for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Send for ScopePtr<T> {}
+
+// SAFETY: !Sync for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Sync for ScopePtr<T> {}
+
+impl<T> ScopePtr<T> {
+ // Helper to avoid disjoint captures of `scope_ptr.0`
+ unsafe fn as_ref(&self) -> &T {
+ &*self.0
+ }
+}
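
Unlike the free `spawn_broadcast`, the new `Scope::spawn_broadcast` and `ScopeFifo::spawn_broadcast` jobs may borrow from the enclosing stack frame, because the scope guarantees they finish before it returns; that is exactly what makes the `ScopePtr` dereference sound. A minimal sketch, assuming a 4-thread pool:

    use rayon_core::ThreadPoolBuilder;
    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
        let sum = AtomicUsize::new(0);

        // `sum` is borrowed from this stack frame; the scope waits for every
        // broadcast job before returning, so the borrow is sound.
        pool.in_place_scope(|s| {
            s.spawn_broadcast(|_, ctx| {
                sum.fetch_add(ctx.index(), Ordering::Relaxed);
            });
        });

        // Indices 0 + 1 + 2 + 3.
        assert_eq!(sum.into_inner(), 6);
    }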
diff --git a/vendor/rayon-core/src/scope/test.rs b/vendor/rayon-core/src/scope/test.rs
index 88f9e9548..00dd18c92 100644
--- a/vendor/rayon-core/src/scope/test.rs
+++ b/vendor/rayon-core/src/scope/test.rs
@@ -6,7 +6,7 @@ use rand_xorshift::XorShiftRng;
use std::cmp;
use std::iter::once;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex;
+use std::sync::{Barrier, Mutex};
use std::vec;
#[test]
@@ -42,7 +42,7 @@ fn scope_divide_and_conquer() {
scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024)));
let counter_s = &AtomicUsize::new(0);
- divide_and_conquer_seq(&counter_s, 1024);
+ divide_and_conquer_seq(counter_s, 1024);
let p = counter_p.load(Ordering::SeqCst);
let s = counter_s.load(Ordering::SeqCst);
@@ -75,7 +75,7 @@ struct Tree<T: Send> {
}
impl<T: Send> Tree<T> {
- fn iter<'s>(&'s self) -> vec::IntoIter<&'s T> {
+ fn iter(&self) -> vec::IntoIter<&T> {
once(&self.value)
.chain(self.children.iter().flat_map(Tree::iter))
.collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow
@@ -513,3 +513,89 @@ fn mixed_lifetime_scope_fifo() {
increment(&[&counter; 100]);
assert_eq!(counter.into_inner(), 100);
}
+
+#[test]
+fn scope_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_fifo_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope_fifo(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_spawn_broadcast_nested() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|s, _| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_spawn_broadcast_barrier() {
+ let barrier = Barrier::new(8);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.in_place_scope(|s| {
+ s.spawn_broadcast(|_, _| {
+ barrier.wait();
+ });
+ barrier.wait();
+ });
+}
+
+#[test]
+fn scope_spawn_broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+fn scope_spawn_broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
diff --git a/vendor/rayon-core/src/sleep/README.md b/vendor/rayon-core/src/sleep/README.md
index c62c3975d..55426c88c 100644
--- a/vendor/rayon-core/src/sleep/README.md
+++ b/vendor/rayon-core/src/sleep/README.md
@@ -75,7 +75,7 @@ These counters are adjusted as follows:
* When a thread awakens a sleeping thread: decrement the sleeping thread counter.
* Subtle point: the thread that *awakens* the sleeping thread decrements the
counter, not the thread that is *sleeping*. This is because there is a delay
- between siganling a thread to wake and the thread actually waking:
+ between signaling a thread to wake and the thread actually waking:
decrementing the counter when awakening the thread means that other threads
that may be posting work will see the up-to-date value that much faster.
* When a thread finds work, exiting the idle state: decrement the inactive
@@ -137,7 +137,7 @@ The full protocol for a thread to fall asleep is as follows:
above, remembering the `final_value` of the JEC. It does one more search for work.
* If no work is found, the thread atomically:
* Checks the JEC to see that it has not changed from `final_value`.
- * If it has, then the thread goes back to searchin for work. We reset to
+ * If it has, then the thread goes back to searching for work. We reset to
just before we got sleepy, so that we will do one more search
before attending to sleep again (rather than searching for many rounds).
* Increments the number of sleeping threads by 1.
diff --git a/vendor/rayon-core/src/sleep/counters.rs b/vendor/rayon-core/src/sleep/counters.rs
index 29d3b18e2..f2a3de3e1 100644
--- a/vendor/rayon-core/src/sleep/counters.rs
+++ b/vendor/rayon-core/src/sleep/counters.rs
@@ -61,10 +61,12 @@ const THREADS_BITS: usize = 8;
/// Bits to shift to select the sleeping threads
/// (used with `select_bits`).
+#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
/// Bits to shift to select the inactive threads
/// (used with `select_bits`).
+#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
/// Bits to shift to select the JEC
diff --git a/vendor/rayon-core/src/sleep/mod.rs b/vendor/rayon-core/src/sleep/mod.rs
index 2c4ac7c28..af7225a35 100644
--- a/vendor/rayon-core/src/sleep/mod.rs
+++ b/vendor/rayon-core/src/sleep/mod.rs
@@ -159,7 +159,7 @@ impl Sleep {
debug_assert!(!*is_blocked);
// Our latch was signalled. We should wake back up fully as we
- // wil have some stuff to do.
+ // will have some stuff to do.
if !latch.fall_asleep() {
self.logger.log(|| ThreadSleepInterruptedByLatch {
worker: worker_index,
diff --git a/vendor/rayon-core/src/spawn/mod.rs b/vendor/rayon-core/src/spawn/mod.rs
index dfa47818e..ae1f211ef 100644
--- a/vendor/rayon-core/src/spawn/mod.rs
+++ b/vendor/rayon-core/src/spawn/mod.rs
@@ -73,7 +73,7 @@ where
F: FnOnce() + Send + 'static,
{
// We assert that this does not hold any references (we know
- // this because of the `'static` bound in the inferface);
+ // this because of the `'static` bound in the interface);
// moreover, we assert that the code below is not supposed to
// be able to panic, and hence the data won't leak but will be
// enqueued into some deque for later execution.
@@ -91,19 +91,14 @@ where
// executed. This ref is decremented at the (*) below.
registry.increment_terminate_count();
- Box::new(HeapJob::new({
+ HeapJob::new({
let registry = Arc::clone(registry);
move || {
- match unwind::halt_unwinding(func) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(func);
registry.terminate(); // (*) permit registry to terminate now
}
- }))
- .as_job_ref()
+ })
+ .into_static_job_ref()
}
/// Fires off a task into the Rayon threadpool in the "static" or
@@ -148,7 +143,7 @@ where
F: FnOnce() + Send + 'static,
{
// We assert that this does not hold any references (we know
- // this because of the `'static` bound in the inferface);
+ // this because of the `'static` bound in the interface);
// moreover, we assert that the code below is not supposed to
// be able to panic, and hence the data won't leak but will be
// enqueued into some deque for later execution.
diff --git a/vendor/rayon-core/src/test.rs b/vendor/rayon-core/src/test.rs
index 600e58b11..46d63a7df 100644
--- a/vendor/rayon-core/src/test.rs
+++ b/vendor/rayon-core/src/test.rs
@@ -1,7 +1,5 @@
#![cfg(test)]
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
@@ -121,7 +119,6 @@ fn handler_panics_handled_correctly() {
}
#[test]
-#[allow(deprecated)]
fn check_config_build() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -144,7 +141,7 @@ fn configuration() {
let thread_name = move |i| format!("thread_name_{}", i);
// Ensure we can call all public methods on Configuration
- Configuration::new()
+ crate::Configuration::new()
.thread_name(thread_name)
.num_threads(5)
.panic_handler(panic_handler)
diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs
index 5edaedc37..0fc06dd6b 100644
--- a/vendor/rayon-core/src/thread_pool/mod.rs
+++ b/vendor/rayon-core/src/thread_pool/mod.rs
@@ -3,12 +3,11 @@
//!
//! [`ThreadPool`]: struct.ThreadPool.html
+use crate::broadcast::{self, BroadcastContext};
use crate::join;
use crate::registry::{Registry, ThreadSpawn, WorkerThread};
use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
use crate::spawn;
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{scope, Scope};
use crate::{scope_fifo, ScopeFifo};
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
@@ -57,7 +56,7 @@ impl ThreadPool {
#[deprecated(note = "Use `ThreadPoolBuilder::build`")]
#[allow(deprecated)]
/// Deprecated in favor of `ThreadPoolBuilder::build`.
- pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<dyn Error>> {
+ pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
Self::build(configuration.into_builder()).map_err(Box::from)
}
@@ -111,6 +110,57 @@ impl ThreadPool {
self.registry.in_worker(|_, _| op())
}
+ /// Executes `op` within every thread in the threadpool. Any attempts to use
+ /// `join`, `scope`, or parallel iterators will then operate within that
+ /// threadpool.
+ ///
+ /// Broadcasts are executed on each thread after they have exhausted their
+ /// local work queue, before they attempt work-stealing from other threads.
+ /// The goal of that strategy is to run everywhere in a timely manner
+ /// *without* being too disruptive to current work. There may be alternative
+ /// broadcast styles added in the future for more or less aggressive
+ /// injection, if the need arises.
+ ///
+ /// # Warning: thread-local data
+ ///
+ /// Because `op` is executing within the Rayon thread-pool,
+ /// thread-local data from the current thread will not be
+ /// accessible.
+ ///
+ /// # Panics
+ ///
+ /// If `op` should panic on one or more threads, exactly one panic
+ /// will be propagated, only after all threads have completed
+ /// (or panicked) their own `op`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// use std::sync::atomic::{AtomicUsize, Ordering};
+ ///
+ /// fn main() {
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
+ ///
+ /// // The argument gives context, including the index of each thread.
+ /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
+ /// assert_eq!(v, &[0, 1, 4, 9, 16]);
+ ///
+ /// // The closure can reference the local stack
+ /// let count = AtomicUsize::new(0);
+ /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
+ /// assert_eq!(count.into_inner(), 5);
+ /// }
+ /// ```
+ pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
+ where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::broadcast_in(op, &self.registry) }
+ }
+
/// Returns the (current) number of threads in the thread pool.
///
/// # Future compatibility note
@@ -277,6 +327,18 @@ impl ThreadPool {
// We assert that `self.registry` has not terminated.
unsafe { spawn::spawn_fifo_in(op, &self.registry) }
}
+
+ /// Spawns an asynchronous task on every thread in this thread-pool. This task
+ /// will run in the implicit, global scope, which means that it may outlast the
+ /// current stack frame -- therefore, it cannot capture any references onto the
+ /// stack (you will likely need a `move` closure).
+ pub fn spawn_broadcast<OP>(&self, op: OP)
+ where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
+ }
}
impl Drop for ThreadPool {
diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs
index 8de65a5e4..ac750a6dc 100644
--- a/vendor/rayon-core/src/thread_pool/test.rs
+++ b/vendor/rayon-core/src/thread_pool/test.rs
@@ -4,8 +4,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
#[test]
@@ -197,7 +195,7 @@ fn mutual_install_sleepy() {
#[test]
#[allow(deprecated)]
fn check_thread_pool_new() {
- let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap();
+ let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
assert_eq!(pool.current_num_threads(), 22);
}