Diffstat (limited to 'vendor/rustc-rayon-core')
-rw-r--r--  vendor/rustc-rayon-core/.cargo-checksum.json          |   2
-rw-r--r--  vendor/rustc-rayon-core/Cargo.toml                    |  10
-rw-r--r--  vendor/rustc-rayon-core/README.md                     |   2
-rw-r--r--  vendor/rustc-rayon-core/build.rs                      |   7
-rw-r--r--  vendor/rustc-rayon-core/src/broadcast/mod.rs          | 152
-rw-r--r--  vendor/rustc-rayon-core/src/broadcast/test.rs         | 262
-rw-r--r--  vendor/rustc-rayon-core/src/compile_fail/rc_return.rs |   8
-rw-r--r--  vendor/rustc-rayon-core/src/compile_fail/rc_upvar.rs  |   8
-rw-r--r--  vendor/rustc-rayon-core/src/job.rs                    | 123
-rw-r--r--  vendor/rustc-rayon-core/src/join/mod.rs               |   7
-rw-r--r--  vendor/rustc-rayon-core/src/join/test.rs              |   6
-rw-r--r--  vendor/rustc-rayon-core/src/latch.rs                  | 103
-rw-r--r--  vendor/rustc-rayon-core/src/lib.rs                    |  76
-rw-r--r--  vendor/rustc-rayon-core/src/log.rs                    |  64
-rw-r--r--  vendor/rustc-rayon-core/src/registry.rs               | 262
-rw-r--r--  vendor/rustc-rayon-core/src/scope/mod.rs              | 220
-rw-r--r--  vendor/rustc-rayon-core/src/scope/test.rs             | 110
-rw-r--r--  vendor/rustc-rayon-core/src/sleep/README.md           |   4
-rw-r--r--  vendor/rustc-rayon-core/src/sleep/counters.rs         |   2
-rw-r--r--  vendor/rustc-rayon-core/src/sleep/mod.rs              |  22
-rw-r--r--  vendor/rustc-rayon-core/src/spawn/mod.rs              |  20
-rw-r--r--  vendor/rustc-rayon-core/src/spawn/test.rs             |  12
-rw-r--r--  vendor/rustc-rayon-core/src/test.rs                   |  13
-rw-r--r--  vendor/rustc-rayon-core/src/thread_pool/mod.rs        | 137
-rw-r--r--  vendor/rustc-rayon-core/src/thread_pool/test.rs       |  56
-rw-r--r--  vendor/rustc-rayon-core/src/tlv.rs                    |  35
-rw-r--r--  vendor/rustc-rayon-core/tests/double_init_fail.rs     |   3
-rw-r--r--  vendor/rustc-rayon-core/tests/init_zero_threads.rs    |   1
-rw-r--r--  vendor/rustc-rayon-core/tests/scoped_threadpool.rs    |   3
-rw-r--r--  vendor/rustc-rayon-core/tests/stack_overflow_crash.rs |  95
30 files changed, 1445 insertions, 380 deletions
diff --git a/vendor/rustc-rayon-core/.cargo-checksum.json b/vendor/rustc-rayon-core/.cargo-checksum.json
index 77b6ec303..f6e342c94 100644
--- a/vendor/rustc-rayon-core/.cargo-checksum.json
+++ b/vendor/rustc-rayon-core/.cargo-checksum.json
@@ -1 +1 @@
-{"files":{"Cargo.toml":"e72898647de608f5a787b4d4bb1c0d1e4eef5e6ac16eafba2c961f095fcf3be4","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"0621878e61f0d0fda054bcbe02df75192c28bde1ecc8289cbd86aeba2dd72720","README.md":"80ac59d116aeb2630bcd34de55c9a35c201e294330534d4afaeb048da3f27f54","build.rs":"fa31cb198b772600d100a7c403ddedccef637d2e6b2da431fa7f02ca41307fc6","src/compile_fail/mod.rs":"4d70256295bd64691a8c1994b323559cda1888e85f0b45ca55711541c257dcb6","src/compile_fail/quicksort_race1.rs":"35f498cda38f4eb6e00117f78ed68e0fe5a3fa61c25303d9c08a19bda345bc6c","src/compile_fail/quicksort_race2.rs":"cbb40030c7867cae34bb373b6ec5d97c2ac6de39bc917f47879b30eb423924bc","src/compile_fail/quicksort_race3.rs":"8403643e64c969306b1a9b1243378e6ccdd313b57e1878dbd31393618082fd35","src/compile_fail/rc_return.rs":"b6f1f821ef30086e2990db1ac58a92adaf31e57c23c587c7746d6303865c5c0f","src/compile_fail/rc_upvar.rs":"92a799b313fb88a0eb83fc1aaaea4b61604222b45e20f1c22ff5b092a03f8620","src/compile_fail/scope_join_bad.rs":"892959949f77cadfc07458473e7a290301182027ca64428df5a8ce887be0892b","src/job.rs":"b40f608eae25a67253eed82b7aeb51c3dbd02b67dcc53f0f5515508600f4409f","src/join/mod.rs":"b1bdece1512d8af9a19d7bbe1cc4a1caa4fd11f43b0be7761f4d20d67a9ce220","src/join/test.rs":"71e9a5fda81610d0a41004ccdc24dfd1b1ebe3f1f87d2ddf730c48e69659a234","src/latch.rs":"d32eb2caa0b4c5218152d85eebaf3c7013cc5ce542e4e1f087da1e755402582e","src/lib.rs":"f25df8638414aece9fb70ebae48407ea72599c5f1ad67f0e59f8d6d5869afe1b","src/log.rs":"8a25ed8661f5ae3317085a108f78bb5f5a2e40827dbde9b16d6b8a560a5f2c39","src/private.rs":"152f6d65ce4741616a1dec796b9442f78a018d38bb040f76c4cd85008333a3bb","src/registry.rs":"9134c3a1c9cbece2d4219b904a6f07bf199c19705b96cc231a5155b4edb30f42","src/scope/mod.rs":"bae707b50957975aab731efc25223e7d93d0b3ca80c0a31f954d847c7fff57f5","src/scope/test.rs":"34fb0af1286bd2bcc2f7cd03e2f0d35b3d2284074ce6829ba1c319bae83eb5bf","src/sleep/README.md":"f0b707808d88899c6cab75eb9c6ef999067360503fa7794a488e2c9cb9582f1d","src/sleep/counters.rs":"3488d3af9711db78697a42822a87b218aaf80f1c2a41abc50e61aacd47b72bd0","src/sleep/mod.rs":"de0de140b4fe335125cedc4517e075d6ca97f419caa06db395adcf949b45fa95","src/spawn/mod.rs":"83786adbf3ad17104372d156d0ed88c76ae6e8b0327c696016dc3258802f4c8f","src/spawn/test.rs":"836f5a39f35e037164a48a5dfcba7f01abde431fbf49e52e8776256249944d21","src/test.rs":"68334fad0e8ae154097383c6df3c2658dc969e37c88d7a57298dae6c84c491be","src/thread_pool/mod.rs":"535df28a4c37ebb2e5f9ebf7238db47a59b073fd3a5e4c6624675320235b4c86","src/thread_pool/test.rs":"c56e70402295d748fba1f679752d1b434811f8d54c2514a6ffb6284373740690","src/tlv.rs":"fe9997bb586218901355ffde92d377bd0372336de6fe84f26adf1f6bdd2bf920","src/unwind.rs":"7baa4511467d008b14856ea8de7ced92b9251c9df4854f69c81f7efc3cf0cd6c","src/worker_local.rs":"9a5a0c3b3dc1667cc1f7f34d2f3757869f692923b065ae34e1c187648e09278e","tests/double_init_fail.rs":"f6f6e1b45bdba6ef8f18d1981e5356a6d5ef2f3bbecd0c8ce4341c126a3a6f1d","tests/init_zero_threads.rs":"57f66d1662f7782848d45942faa613a608188eb400943e4efa4c3ec3375e1dac","tests/scope_join.rs":"56f570c4b6a01704aacf93e7f17f89fe0f40f46ed6f9ede517abfe9adaf91f83","tests/scoped_threadpool.rs":"0f6475f440a57b6f91ecdd720228395f4feaa6165b136558f65267e185fd7127","tests/simple_panic.rs":"916d40d36c1a0fad3e1dfb31550f0672641feab4b03d480f039143dbe2f2445f","tests/stack_overflow_crash.rs":"e8865d33d51a58f5c6639f457d91f82f2a4379cf5129094eaa521e95bad72d51"},"package":"02269144a0db9bb55cf5d4a41a5a0e95b334b0b78b08269018ca9b0250718c3
0"} \ No newline at end of file
+{"files":{"Cargo.toml":"8babe4c2193c3229b62847b22b8b188932035fd19b24d64ad275ca4b8caa3194","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"0621878e61f0d0fda054bcbe02df75192c28bde1ecc8289cbd86aeba2dd72720","README.md":"1179e1b0eae68794cb6292b8c884f81aae3007c44d22d88df0fdd402e729277d","src/broadcast/mod.rs":"d087b7055778e03cae02b78f5fbbd32ffc439a2acec1792ed4525c2ff1f39b45","src/broadcast/test.rs":"c42d5aa6a3d3a53614ac534811f0fe7a347f18912ecfd63d874a281dc215aca4","src/compile_fail/mod.rs":"4d70256295bd64691a8c1994b323559cda1888e85f0b45ca55711541c257dcb6","src/compile_fail/quicksort_race1.rs":"35f498cda38f4eb6e00117f78ed68e0fe5a3fa61c25303d9c08a19bda345bc6c","src/compile_fail/quicksort_race2.rs":"cbb40030c7867cae34bb373b6ec5d97c2ac6de39bc917f47879b30eb423924bc","src/compile_fail/quicksort_race3.rs":"8403643e64c969306b1a9b1243378e6ccdd313b57e1878dbd31393618082fd35","src/compile_fail/rc_return.rs":"197894803d8df58fc8005d90c86b90cd98f1972f1a4b57438516a565df35903f","src/compile_fail/rc_upvar.rs":"42d110429621f407ef0dada1306dab116583d2c782a99894204dd8e0ccd2312f","src/compile_fail/scope_join_bad.rs":"892959949f77cadfc07458473e7a290301182027ca64428df5a8ce887be0892b","src/job.rs":"b1c2508ce251ba1e10946fb127cb0ca7b639ee414a33b265d82e58b658bcc831","src/join/mod.rs":"9d07f6b69447ee006cfe603a43ea04aacf2322ffa717da103b9ff21b15085631","src/join/test.rs":"157db5306e8df89a8eea19dbba499f26c2f44d9803cb36a796c852a9a695821e","src/latch.rs":"2056effd8b1d71e1df2889c7a163570c975d25fff8404368b0e1554efeeab6c7","src/lib.rs":"d87a8232ac665c1d3445c7238fc646a1b61770d9540e80f736d22b919c67a0da","src/log.rs":"3f901d61125584a50b05892b7e690872bda15be2150b9c0595c6125664f4cf3e","src/private.rs":"152f6d65ce4741616a1dec796b9442f78a018d38bb040f76c4cd85008333a3bb","src/registry.rs":"264ed32c3ed079c106b18da6f382ff4b18c5326464740e8dee7e7233700fbe88","src/scope/mod.rs":"73b5c390a902dc203f1e760afae1e482244c60c6172491c20afc92234d4bd91c","src/scope/test.rs":"d4f068cae4ee4483b41bd3054582d96e74ced46eb57361e7510ef62d4318d340","src/sleep/README.md":"65252d4552cc54d94ccb38f7dc09bef00d64124da605b929154943c7425879dd","src/sleep/counters.rs":"2ce3052f05b3b75b1b96c6604fc0dfb6de867426981b37641410c068f92897bd","src/sleep/mod.rs":"3bbff8427fcdf5f880c47e86abae985f19f0109de0102d6edf6b893d37eae34d","src/spawn/mod.rs":"199a103d446216a085f70526a0bc28b903c70492bf4e25be0c654a424c43d983","src/spawn/test.rs":"a28f8943f28a4cef642b6429c538b1df879c9eb1db9927ce69b97c686bf81173","src/test.rs":"7d0dee06fcf41bddf77449a85cece44133f966a0622a31cf3ed110fbe83e094e","src/thread_pool/mod.rs":"e8feabb2e26b38b40de846f0ada52bc132314d4ff731d0160a1815573eda377c","src/thread_pool/test.rs":"cf63d45ae1f0e7fd3c6d5e4b2aafa8900338c141e04aba15711b02b4a71febb2","src/tlv.rs":"581eb845dd0c3f695fd11aa25e9fca7a9009ba0ca5f4625f81db279ebbd67b20","src/unwind.rs":"7baa4511467d008b14856ea8de7ced92b9251c9df4854f69c81f7efc3cf0cd6c","src/worker_local.rs":"9a5a0c3b3dc1667cc1f7f34d2f3757869f692923b065ae34e1c187648e09278e","tests/double_init_fail.rs":"8c208ce45e83ab1dfc5890353d5b2f06fc8005684ae622827a65d05abb35a072","tests/init_zero_threads.rs":"5c7f7e0e13e9ead3733253e30d6b52ac5ee66fd6c105999d096bdf31cfccaf95","tests/scope_join.rs":"56f570c4b6a01704aacf93e7f17f89fe0f40f46ed6f9ede517abfe9adaf91f83","tests/scoped_threadpool.rs":"9753467b3de37dd1d19327fe0075d3c4ba768430b97e7aface39627592e9b09a","tests/simple_panic.rs":"916d40d36c1a0fad3e1dfb31550f0672641feab4b03d480f039143dbe2f2445f","tests/stack_overflow_crash.rs":"87b962c66f301ac44f808d992d4e8b861305db
0c282f256761a5075c9f018243"},"package":"67668daaf00e359c126f6dcb40d652d89b458a008c8afa727a42a2d20fca0b7f"} \ No newline at end of file
diff --git a/vendor/rustc-rayon-core/Cargo.toml b/vendor/rustc-rayon-core/Cargo.toml
index d8c281cf9..74639080c 100644
--- a/vendor/rustc-rayon-core/Cargo.toml
+++ b/vendor/rustc-rayon-core/Cargo.toml
@@ -10,14 +10,14 @@
# See Cargo.toml.orig for the original contents.
[package]
-edition = "2018"
+edition = "2021"
+rust-version = "1.59"
name = "rustc-rayon-core"
-version = "0.4.1"
+version = "0.5.0"
authors = [
"Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>",
]
-build = "build.rs"
description = "Core APIs for Rayon - fork for rustc"
documentation = "https://docs.rs/rustc-rayon-core/"
readme = "README.md"
@@ -32,10 +32,12 @@ categories = ["concurrency"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/rustc-rayon"
+[lib]
+name = "rayon_core"
+
[[test]]
name = "stack_overflow_crash"
path = "tests/stack_overflow_crash.rs"
-harness = false
[[test]]
name = "double_init_fail"
diff --git a/vendor/rustc-rayon-core/README.md b/vendor/rustc-rayon-core/README.md
index 94c667017..13b8a451e 100644
--- a/vendor/rustc-rayon-core/README.md
+++ b/vendor/rustc-rayon-core/README.md
@@ -10,4 +10,4 @@ Please see [Rayon Docs] for details about using Rayon.
[Rayon Docs]: https://docs.rs/rayon/
-Rayon-core currently requires `rustc 1.36.0` or greater.
+Rayon-core currently requires `rustc 1.59.0` or greater.
diff --git a/vendor/rustc-rayon-core/build.rs b/vendor/rustc-rayon-core/build.rs
deleted file mode 100644
index 8771b63fc..000000000
--- a/vendor/rustc-rayon-core/build.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-// We need a build script to use `link = "rayon-core"`. But we're not
-// *actually* linking to anything, just making sure that we're the only
-// rayon-core in use.
-fn main() {
- // we don't need to rebuild for anything else
- println!("cargo:rerun-if-changed=build.rs");
-}
diff --git a/vendor/rustc-rayon-core/src/broadcast/mod.rs b/vendor/rustc-rayon-core/src/broadcast/mod.rs
new file mode 100644
index 000000000..f9cfc47ac
--- /dev/null
+++ b/vendor/rustc-rayon-core/src/broadcast/mod.rs
@@ -0,0 +1,152 @@
+use crate::job::{ArcJob, StackJob};
+use crate::latch::LatchRef;
+use crate::registry::{Registry, WorkerThread};
+use crate::scope::ScopeLatch;
+use std::fmt;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+mod test;
+
+/// Executes `op` within every thread in the current threadpool. If this is
+/// called from a non-Rayon thread, it will execute in the global threadpool.
+/// Any attempts to use `join`, `scope`, or parallel iterators will then operate
+/// within that threadpool. When the call has completed on each thread, returns
+/// a vector containing all of their return values.
+///
+/// For more information, see the [`ThreadPool::broadcast()`][m] method.
+///
+/// [m]: struct.ThreadPool.html#method.broadcast
+pub fn broadcast<OP, R>(op: OP) -> Vec<R>
+where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+{
+ // We assert that current registry has not terminated.
+ unsafe { broadcast_in(op, &Registry::current()) }
+}
+
+/// Spawns an asynchronous task on every thread in this thread-pool. This task
+/// will run in the implicit, global scope, which means that it may outlast the
+/// current stack frame -- therefore, it cannot capture any references onto the
+/// stack (you will likely need a `move` closure).
+///
+/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method.
+///
+/// [m]: struct.ThreadPool.html#method.spawn_broadcast
+pub fn spawn_broadcast<OP>(op: OP)
+where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+{
+ // We assert that current registry has not terminated.
+ unsafe { spawn_broadcast_in(op, &Registry::current()) }
+}
+
+/// Provides context to a closure called by `broadcast`.
+pub struct BroadcastContext<'a> {
+ worker: &'a WorkerThread,
+
+ /// Make sure to prevent auto-traits like `Send` and `Sync`.
+ _marker: PhantomData<&'a mut dyn Fn()>,
+}
+
+impl<'a> BroadcastContext<'a> {
+ pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R {
+ let worker_thread = WorkerThread::current();
+ assert!(!worker_thread.is_null());
+ f(BroadcastContext {
+ worker: unsafe { &*worker_thread },
+ _marker: PhantomData,
+ })
+ }
+
+ /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`).
+ #[inline]
+ pub fn index(&self) -> usize {
+ self.worker.index()
+ }
+
+ /// The number of threads receiving the broadcast in the thread pool.
+ ///
+ /// # Future compatibility note
+ ///
+ /// Future versions of Rayon might vary the number of threads over time, but
+ /// this method will always return the number of threads which are actually
+ /// receiving your particular `broadcast` call.
+ #[inline]
+ pub fn num_threads(&self) -> usize {
+ self.worker.registry().num_threads()
+ }
+}
+
+impl<'a> fmt::Debug for BroadcastContext<'a> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BroadcastContext")
+ .field("index", &self.index())
+ .field("num_threads", &self.num_threads())
+ .field("pool_id", &self.worker.registry().id())
+ .finish()
+ }
+}
+
+/// Execute `op` on every thread in the pool. It will be executed on each
+/// thread when they have nothing else to do locally, before they try to
+/// steal work from other threads. This function will not return until all
+/// threads have completed the `op`.
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn broadcast_in<OP, R>(op: OP, registry: &Arc<Registry>) -> Vec<R>
+where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+{
+ let f = move |injected: bool| {
+ debug_assert!(injected);
+ BroadcastContext::with(&op)
+ };
+
+ let n_threads = registry.num_threads();
+ let current_thread = WorkerThread::current().as_ref();
+ let tlv = crate::tlv::get();
+ let latch = ScopeLatch::with_count(n_threads, current_thread);
+ let jobs: Vec<_> = (0..n_threads)
+ .map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch)))
+ .collect();
+ let job_refs = jobs.iter().map(|job| job.as_job_ref());
+
+ registry.inject_broadcast(job_refs);
+
+ // Wait for all jobs to complete, then collect the results, maybe propagating a panic.
+ latch.wait(current_thread);
+ jobs.into_iter().map(|job| job.into_result()).collect()
+}
+
+/// Execute `op` on every thread in the pool. It will be executed on each
+/// thread when they have nothing else to do locally, before they try to
+/// steal work from other threads. This function returns immediately after
+/// injecting the jobs.
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn spawn_broadcast_in<OP>(op: OP, registry: &Arc<Registry>)
+where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+{
+ let job = ArcJob::new({
+ let registry = Arc::clone(registry);
+ move || {
+ registry.catch_unwind(|| BroadcastContext::with(&op));
+ registry.terminate(); // (*) permit registry to terminate now
+ }
+ });
+
+ let n_threads = registry.num_threads();
+ let job_refs = (0..n_threads).map(|_| {
+ // Ensure that registry cannot terminate until this job has executed
+ // on each thread. This ref is decremented at the (*) above.
+ registry.increment_terminate_count();
+
+ ArcJob::as_static_job_ref(&job)
+ });
+
+ registry.inject_broadcast(job_refs);
+}
diff --git a/vendor/rustc-rayon-core/src/broadcast/test.rs b/vendor/rustc-rayon-core/src/broadcast/test.rs
new file mode 100644
index 000000000..3ae11f7f6
--- /dev/null
+++ b/vendor/rustc-rayon-core/src/broadcast/test.rs
@@ -0,0 +1,262 @@
+#![cfg(test)]
+
+use crate::ThreadPoolBuilder;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::{thread, time};
+
+#[test]
+fn broadcast_global() {
+ let v = crate::broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_global() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_pool() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_pool() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_self() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_self() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_mutual() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ pool2.broadcast(|_| {
+ pool1.broadcast(|_| {
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_mutual() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_mutual_sleepy() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.broadcast(|_| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.broadcast(|_| {
+ thread::sleep(time::Duration::from_millis(100));
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_mutual_sleepy() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.spawn_broadcast(move |_| {
+ thread::sleep(time::Duration::from_millis(100));
+ tx.send(()).unwrap();
+ })
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn spawn_broadcast_panic_one() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 1);
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn spawn_broadcast_panic_many() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 4);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_sleep_race() {
+ let test_duration = time::Duration::from_secs(1);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let start = time::Instant::now();
+ while start.elapsed() < test_duration {
+ pool.broadcast(|ctx| {
+ // A slight spread of sleep duration increases the chance that one
+ // of the threads will race in the pool's idle sleep afterward.
+ thread::sleep(time::Duration::from_micros(ctx.index() as u64));
+ });
+ }
+}
+
+#[test]
+fn broadcast_after_spawn_broadcast() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+
+ // Queue a non-blocking spawn_broadcast.
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ // This blocking broadcast runs after all prior broadcasts.
+ crate::broadcast(|_| {});
+
+ // The spawn_broadcast **must** have run by now on all threads.
+ let mut v: Vec<_> = rx.try_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+fn broadcast_after_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn on a thread-local deque.
+ crate::registry::in_worker(move |_, _| {
+ crate::spawn(move || tx.send(22).unwrap());
+ });
+
+ // Broadcast runs after the local deque is empty.
+ crate::broadcast(|_| {});
+
+ // The spawn **must** have run by now.
+ assert_eq!(22, rx.try_recv().unwrap());
+}
diff --git a/vendor/rustc-rayon-core/src/compile_fail/rc_return.rs b/vendor/rustc-rayon-core/src/compile_fail/rc_return.rs
index 164f8ce5e..93e3a6038 100644
--- a/vendor/rustc-rayon-core/src/compile_fail/rc_return.rs
+++ b/vendor/rustc-rayon-core/src/compile_fail/rc_return.rs
@@ -2,9 +2,7 @@
use std::rc::Rc;
-fn main() {
- rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
-}
+rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
``` */
mod left {}
@@ -13,9 +11,7 @@ mod left {}
use std::rc::Rc;
-fn main() {
- rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
-}
+rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
``` */
mod right {}
diff --git a/vendor/rustc-rayon-core/src/compile_fail/rc_upvar.rs b/vendor/rustc-rayon-core/src/compile_fail/rc_upvar.rs
index 62895bf22..d8aebcfcb 100644
--- a/vendor/rustc-rayon-core/src/compile_fail/rc_upvar.rs
+++ b/vendor/rustc-rayon-core/src/compile_fail/rc_upvar.rs
@@ -2,10 +2,8 @@
use std::rc::Rc;
-fn main() {
- let r = Rc::new(22);
- rayon_core::join(|| r.clone(), || r.clone());
- //~^ ERROR
-}
+let r = Rc::new(22);
+rayon_core::join(|| r.clone(), || r.clone());
+//~^ ERROR
``` */
diff --git a/vendor/rustc-rayon-core/src/job.rs b/vendor/rustc-rayon-core/src/job.rs
index d8aae8101..394c7576b 100644
--- a/vendor/rustc-rayon-core/src/job.rs
+++ b/vendor/rustc-rayon-core/src/job.rs
@@ -1,10 +1,12 @@
use crate::latch::Latch;
use crate::tlv;
+use crate::tlv::Tlv;
use crate::unwind;
use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
+use std::sync::Arc;
pub(super) enum JobResult<T> {
None,
@@ -21,7 +23,7 @@ pub(super) trait Job {
/// Unsafe: this may be called from a different thread than the one
/// which scheduled the job, so the implementer must ensure the
/// appropriate traits are met, whether `Send`, `Sync`, or both.
- unsafe fn execute(this: *const Self);
+ unsafe fn execute(this: *const ());
}
/// Effectively a Job trait object. Each JobRef **must** be executed
@@ -30,7 +32,6 @@ pub(super) trait Job {
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) struct JobRef {
pointer: *const (),
execute_fn: unsafe fn(*const ()),
@@ -46,17 +47,22 @@ impl JobRef {
where
T: Job,
{
- let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
-
// erase types:
JobRef {
pointer: data as *const (),
- execute_fn: mem::transmute(fn_ptr),
+ execute_fn: <T as Job>::execute,
}
}
+ /// Returns an opaque handle that can be saved and compared,
+ /// without making `JobRef` itself `Copy + Eq`.
+ #[inline]
+ pub(super) fn id(&self) -> impl Eq {
+ (self.pointer, self.execute_fn)
+ }
+
#[inline]
- pub(super) unsafe fn execute(&self) {
+ pub(super) unsafe fn execute(self) {
(self.execute_fn)(self.pointer)
}
}
@@ -74,7 +80,7 @@ where
pub(super) latch: L,
func: UnsafeCell<Option<F>>,
result: UnsafeCell<JobResult<R>>,
- tlv: usize,
+ tlv: Tlv,
}
impl<L, F, R> StackJob<L, F, R>
@@ -83,7 +89,7 @@ where
F: FnOnce(bool) -> R + Send,
R: Send,
{
- pub(super) fn new(tlv: usize, func: F, latch: L) -> StackJob<L, F, R> {
+ pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> {
StackJob {
latch,
func: UnsafeCell::new(Some(func)),
@@ -111,20 +117,13 @@ where
F: FnOnce(bool) -> R + Send,
R: Send,
{
- unsafe fn execute(this: *const Self) {
- fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R {
- move || func(true)
- }
-
- let this = &*this;
+ unsafe fn execute(this: *const ()) {
+ let this = &*(this as *const Self);
tlv::set(this.tlv);
let abort = unwind::AbortIfPanic;
let func = (*this.func.get()).take().unwrap();
- (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
- Ok(x) => JobResult::Ok(x),
- Err(x) => JobResult::Panic(x),
- };
- this.latch.set();
+ (*this.result.get()) = JobResult::call(func);
+ Latch::set(&this.latch);
mem::forget(abort);
}
}
@@ -139,27 +138,31 @@ pub(super) struct HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- job: UnsafeCell<Option<BODY>>,
- tlv: usize,
+ job: BODY,
+ tlv: Tlv,
}
impl<BODY> HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- pub(super) fn new(tlv: usize, func: BODY) -> Self {
- HeapJob {
- job: UnsafeCell::new(Some(func)),
- tlv,
- }
+ pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
+ Box::new(HeapJob { job, tlv })
}
/// Creates a `JobRef` from this job -- note that this hides all
/// lifetimes, so it is up to you to ensure that this JobRef
/// doesn't outlive any data that it closes over.
- pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef {
- let this: *const Self = mem::transmute(self);
- JobRef::new(this)
+ pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
+ JobRef::new(Box::into_raw(self))
+ }
+
+ /// Creates a static `JobRef` from this job.
+ pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
+ where
+ BODY: 'static,
+ {
+ unsafe { self.into_job_ref() }
}
}
@@ -167,15 +170,64 @@ impl<BODY> Job for HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- unsafe fn execute(this: *const Self) {
- let this: Box<Self> = mem::transmute(this);
+ unsafe fn execute(this: *const ()) {
+ let this = Box::from_raw(this as *mut Self);
tlv::set(this.tlv);
- let job = (*this.job.get()).take().unwrap();
- job();
+ (this.job)();
+ }
+}
+
+/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
+/// be turned into multiple `JobRef`s and called multiple times.
+pub(super) struct ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ job: BODY,
+}
+
+impl<BODY> ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ pub(super) fn new(job: BODY) -> Arc<Self> {
+ Arc::new(ArcJob { job })
+ }
+
+ /// Creates a `JobRef` from this job -- note that this hides all
+ /// lifetimes, so it is up to you to ensure that this JobRef
+ /// doesn't outlive any data that it closes over.
+ pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
+ JobRef::new(Arc::into_raw(Arc::clone(this)))
+ }
+
+ /// Creates a static `JobRef` from this job.
+ pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
+ where
+ BODY: 'static,
+ {
+ unsafe { Self::as_job_ref(this) }
+ }
+}
+
+impl<BODY> Job for ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ unsafe fn execute(this: *const ()) {
+ let this = Arc::from_raw(this as *mut Self);
+ (this.job)();
}
}
impl<T> JobResult<T> {
+ fn call(func: impl FnOnce(bool) -> T) -> Self {
+ match unwind::halt_unwinding(|| func(true)) {
+ Ok(x) => JobResult::Ok(x),
+ Err(x) => JobResult::Panic(x),
+ }
+ }
+
/// Convert the `JobResult` for a job that has finished (and hence
/// its JobResult is populated) into its return value.
///
@@ -211,10 +263,11 @@ impl JobFifo {
}
impl Job for JobFifo {
- unsafe fn execute(this: *const Self) {
+ unsafe fn execute(this: *const ()) {
// We "execute" a queue by executing its first job, FIFO.
+ let this = &*(this as *const Self);
loop {
- match (*this).inner.steal() {
+ match this.inner.steal() {
Steal::Success(job_ref) => break job_ref.execute(),
Steal::Empty => panic!("FIFO is empty"),
Steal::Retry => {}
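`JobRef` loses its derived `Copy`/`Eq` here; the new `id()` method hands out an opaque `(pointer, fn)` pair instead, which `join` uses in the next file to recognize its own un-stolen job. A standalone sketch of that pattern with hypothetical names (not the actual rayon-core types):

```
struct ErasedJob {
    pointer: *const (),
    execute_fn: unsafe fn(*const ()),
}

impl ErasedJob {
    // Raw pointers and fn pointers are both `Eq`, so a tuple of the two can
    // serve as a comparable identity without exposing the fields themselves.
    fn id(&self) -> impl Eq {
        (self.pointer, self.execute_fn)
    }
}

fn main() {
    unsafe fn noop(_: *const ()) {}
    let job = ErasedJob { pointer: std::ptr::null(), execute_fn: noop };
    let saved = job.id();
    // Later (e.g. when popping jobs off a deque), compare identities:
    assert!(saved == job.id());
}
```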
diff --git a/vendor/rustc-rayon-core/src/join/mod.rs b/vendor/rustc-rayon-core/src/join/mod.rs
index 1fff3fa30..032eec9c4 100644
--- a/vendor/rustc-rayon-core/src/join/mod.rs
+++ b/vendor/rustc-rayon-core/src/join/mod.rs
@@ -1,7 +1,7 @@
use crate::job::StackJob;
use crate::latch::SpinLatch;
use crate::registry::{self, WorkerThread};
-use crate::tlv;
+use crate::tlv::{self, Tlv};
use crate::unwind;
use std::any::Any;
@@ -137,6 +137,7 @@ where
// long enough.
let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread));
let job_b_ref = job_b.as_job_ref();
+ let job_b_id = job_b_ref.id();
worker_thread.push(job_b_ref);
// Execute task a; hopefully b gets stolen in the meantime.
@@ -153,7 +154,7 @@ where
// those off to get to it.
while !job_b.latch.probe() {
if let Some(job) = worker_thread.take_local_job() {
- if job == job_b_ref {
+ if job_b_id == job.id() {
// Found it! Let's run it.
//
// Note that this could panic, but it's ok if we unwind here.
@@ -190,7 +191,7 @@ unsafe fn join_recover_from_panic(
worker_thread: &WorkerThread,
job_b_latch: &SpinLatch<'_>,
err: Box<dyn Any + Send>,
- tlv: usize,
+ tlv: Tlv,
) -> ! {
worker_thread.wait_until(job_b_latch);
diff --git a/vendor/rustc-rayon-core/src/join/test.rs b/vendor/rustc-rayon-core/src/join/test.rs
index e7f287f6f..b303dbc81 100644
--- a/vendor/rustc-rayon-core/src/join/test.rs
+++ b/vendor/rustc-rayon-core/src/join/test.rs
@@ -47,6 +47,7 @@ fn sort() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sort_in_pool() {
let rng = seeded_rng();
let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
@@ -77,6 +78,7 @@ fn panic_propagate_both() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_b_still_executes() {
let mut x = false;
match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
@@ -86,6 +88,7 @@ fn panic_b_still_executes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_both() {
// If we're not in a pool, both should be marked stolen as they're injected.
let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
@@ -94,6 +97,7 @@ fn join_context_both() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_neither() {
// If we're already in a 1-thread pool, neither job should be stolen.
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -104,6 +108,7 @@ fn join_context_neither() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_second() {
use std::sync::Barrier;
@@ -127,6 +132,7 @@ fn join_context_second() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_counter_overflow() {
const MAX: u32 = 500_000;
diff --git a/vendor/rustc-rayon-core/src/latch.rs b/vendor/rustc-rayon-core/src/latch.rs
index b84fbe371..de4327234 100644
--- a/vendor/rustc-rayon-core/src/latch.rs
+++ b/vendor/rustc-rayon-core/src/latch.rs
@@ -1,3 +1,5 @@
+use std::marker::PhantomData;
+use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;
@@ -37,10 +39,15 @@ pub(super) trait Latch {
///
/// Setting a latch triggers other threads to wake up and (in some
/// cases) complete. This may, in turn, cause memory to be
- /// allocated and so forth. One must be very careful about this,
+ /// deallocated and so forth. One must be very careful about this,
/// and it's typically better to read all the fields you will need
/// to access *before* a latch is set!
- fn set(&self);
+ ///
+ /// This function operates on `*const Self` instead of `&self` to allow it
+ /// to become dangling during this call. The caller must ensure that the
+ /// pointer is valid upon entry, and not invalidated during the call by any
+ /// actions other than `set` itself.
+ unsafe fn set(this: *const Self);
}
pub(super) trait AsCoreLatch {
@@ -123,8 +130,8 @@ impl CoreLatch {
/// doing some wakeups; those are encapsulated in the surrounding
/// latch code.
#[inline]
- fn set(&self) -> bool {
- let old_state = self.state.swap(SET, Ordering::AcqRel);
+ unsafe fn set(this: *const Self) -> bool {
+ let old_state = (*this).state.swap(SET, Ordering::AcqRel);
old_state == SLEEPING
}
@@ -186,29 +193,29 @@ impl<'r> AsCoreLatch for SpinLatch<'r> {
impl<'r> Latch for SpinLatch<'r> {
#[inline]
- fn set(&self) {
+ unsafe fn set(this: *const Self) {
let cross_registry;
- let registry: &Registry = if self.cross {
+ let registry: &Registry = if (*this).cross {
// Ensure the registry stays alive while we notify it.
// Otherwise, it would be possible that we set the spin
// latch and the other thread sees it and exits, causing
// the registry to be deallocated, all before we get a
// chance to invoke `registry.notify_worker_latch_is_set`.
- cross_registry = Arc::clone(self.registry);
- &*cross_registry
+ cross_registry = Arc::clone((*this).registry);
+ &cross_registry
} else {
// If this is not a "cross-registry" spin-latch, then the
// thread which is performing `set` is itself ensuring
// that the registry stays alive. However, that doesn't
// include this *particular* `Arc` handle if the waiting
// thread then exits, so we must completely dereference it.
- &**self.registry
+ (*this).registry
};
- let target_worker_index = self.target_worker_index;
+ let target_worker_index = (*this).target_worker_index;
- // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
- if self.core_latch.set() {
+ // NOTE: Once we `set`, the target may proceed and invalidate `this`!
+ if CoreLatch::set(&(*this).core_latch) {
// Subtle: at this point, we can no longer read from
// `self`, because the thread owning this spin latch may
// have awoken and deallocated the latch. Therefore, we
@@ -255,10 +262,10 @@ impl LockLatch {
impl Latch for LockLatch {
#[inline]
- fn set(&self) {
- let mut guard = self.m.lock().unwrap();
+ unsafe fn set(this: *const Self) {
+ let mut guard = (*this).m.lock().unwrap();
*guard = true;
- self.v.notify_all();
+ (*this).v.notify_all();
}
}
@@ -286,9 +293,14 @@ pub(super) struct CountLatch {
impl CountLatch {
#[inline]
pub(super) fn new() -> CountLatch {
+ Self::with_count(1)
+ }
+
+ #[inline]
+ pub(super) fn with_count(n: usize) -> CountLatch {
CountLatch {
core_latch: CoreLatch::new(),
- counter: AtomicUsize::new(1),
+ counter: AtomicUsize::new(n),
}
}
@@ -302,9 +314,9 @@ impl CountLatch {
/// count, then the latch is **set**, and calls to `probe()` will
/// return true. Returns whether the latch was set.
#[inline]
- pub(super) fn set(&self) -> bool {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.core_latch.set();
+ pub(super) unsafe fn set(this: *const Self) -> bool {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ CoreLatch::set(&(*this).core_latch);
true
} else {
false
@@ -315,8 +327,12 @@ impl CountLatch {
/// the latch is set, then the specific worker thread is tickled,
/// which should be the one that owns this latch.
#[inline]
- pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
- if self.set() {
+ pub(super) unsafe fn set_and_tickle_one(
+ this: *const Self,
+ registry: &Registry,
+ target_worker_index: usize,
+ ) {
+ if Self::set(this) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
@@ -337,10 +353,10 @@ pub(super) struct CountLockLatch {
impl CountLockLatch {
#[inline]
- pub(super) fn new() -> CountLockLatch {
+ pub(super) fn with_count(n: usize) -> CountLockLatch {
CountLockLatch {
lock_latch: LockLatch::new(),
- counter: AtomicUsize::new(1),
+ counter: AtomicUsize::new(n),
}
}
@@ -357,19 +373,42 @@ impl CountLockLatch {
impl Latch for CountLockLatch {
#[inline]
- fn set(&self) {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.lock_latch.set();
+ unsafe fn set(this: *const Self) {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ LockLatch::set(&(*this).lock_latch);
+ }
+ }
+}
+
+/// `&L` without any implication of `dereferenceable` for `Latch::set`
+pub(super) struct LatchRef<'a, L> {
+ inner: *const L,
+ marker: PhantomData<&'a L>,
+}
+
+impl<L> LatchRef<'_, L> {
+ pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
+ LatchRef {
+ inner,
+ marker: PhantomData,
}
}
}
-impl<'a, L> Latch for &'a L
-where
- L: Latch,
-{
+unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
+
+impl<L> Deref for LatchRef<'_, L> {
+ type Target = L;
+
+ fn deref(&self) -> &L {
+ // SAFETY: if we have &self, the inner latch is still alive
+ unsafe { &*self.inner }
+ }
+}
+
+impl<L: Latch> Latch for LatchRef<'_, L> {
#[inline]
- fn set(&self) {
- L::set(self);
+ unsafe fn set(this: *const Self) {
+ L::set((*this).inner);
}
}
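`Latch::set` now takes `*const Self` because the owner of a latch may wake and free it while `set` is still executing; a `&self` held across that moment would dangle. A minimal standalone sketch of the calling convention (a hypothetical `Flag`, not the real latch types):

```
use std::sync::atomic::{AtomicBool, Ordering};

struct Flag {
    ready: AtomicBool,
}

impl Flag {
    // Caller guarantees `this` is valid on entry. After the store, the
    // waiting side may deallocate the flag, so `*this` must not be touched
    // again -- hence no `&self` is held across the call.
    unsafe fn set(this: *const Self) {
        (*this).ready.store(true, Ordering::Release);
    }
}

fn main() {
    let flag = Flag { ready: AtomicBool::new(false) };
    // `&Flag` coerces to `*const Flag` at the call site, matching the
    // `Latch::set(&this.latch)` calls in the hunks above.
    unsafe { Flag::set(&flag) };
    assert!(flag.ready.load(Ordering::Acquire));
}
```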
diff --git a/vendor/rustc-rayon-core/src/lib.rs b/vendor/rustc-rayon-core/src/lib.rs
index 3cf41a212..354bd8f40 100644
--- a/vendor/rustc-rayon-core/src/lib.rs
+++ b/vendor/rustc-rayon-core/src/lib.rs
@@ -26,7 +26,24 @@
//! [`join()`]: struct.ThreadPool.html#method.join
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
-//! ## Restricting multiple versions
+//! # Global fallback when threading is unsupported
+//!
+//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
+//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
+//! targets are notable examples of this. Rather than panicking on the unsupported error when
+//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
+//!
+//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
+//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
+//! there is no other thread to share the work. However, since the pool is not running independent
+//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
+//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
+//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
+//! can also volunteer execution time.
+//!
+//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
+//!
+//! # Restricting multiple versions
//!
//! In order to ensure proper coordination between threadpools, and especially
//! to make sure there's only one global threadpool, `rayon-core` is actively
@@ -44,7 +61,6 @@
//! conflicting requirements will need to be resolved before the build will
//! succeed.
-#![doc(html_root_url = "https://docs.rs/rayon-core/1.9")]
#![warn(rust_2018_idioms)]
use std::any::Any;
@@ -60,6 +76,7 @@ mod log;
#[macro_use]
mod private;
+mod broadcast;
mod job;
mod join;
mod latch;
@@ -76,6 +93,7 @@ mod test;
pub mod tlv;
+pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
pub use self::registry::{mark_blocked, mark_unblocked, Registry};
@@ -85,6 +103,7 @@ pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
+pub use self::thread_pool::{yield_local, yield_now, Yield};
pub use worker_local::WorkerLocal;
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
@@ -196,6 +215,7 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
///
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
#[deprecated(note = "Use `ThreadPoolBuilder`")]
+#[derive(Default)]
pub struct Configuration {
builder: ThreadPoolBuilder,
}
@@ -294,7 +314,7 @@ impl ThreadPoolBuilder {
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
///
/// # Examples
///
@@ -369,7 +389,7 @@ impl<S> ThreadPoolBuilder<S> {
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
///
/// # Examples
///
@@ -414,6 +434,39 @@ impl<S> ThreadPoolBuilder<S> {
/// Ok(())
/// }
/// ```
+ ///
+ /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
+ /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
+ /// [`build_scoped`](#method.build_scoped).
+ ///
+ /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
+ /// std::thread::scope(|scope| {
+ /// let pool = rayon::ThreadPoolBuilder::new()
+ /// .spawn_handler(|thread| {
+ /// let mut builder = std::thread::Builder::new();
+ /// if let Some(name) = thread.name() {
+ /// builder = builder.name(name.to_string());
+ /// }
+ /// if let Some(size) = thread.stack_size() {
+ /// builder = builder.stack_size(size);
+ /// }
+ /// builder.spawn_scoped(scope, || {
+ /// // Add any scoped initialization here, then run!
+ /// thread.run()
+ /// })?;
+ /// Ok(())
+ /// })
+ /// .build()?;
+ ///
+ /// pool.install(|| println!("Hello from my custom scoped thread!"));
+ /// Ok(())
+ /// })
+ /// }
+ /// ```
pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
where
F: FnMut(ThreadBuilder) -> io::Result<()>,
@@ -559,7 +612,7 @@ impl<S> ThreadPoolBuilder<S> {
/// to true, however, workers will prefer to execute in a
/// *breadth-first* fashion -- that is, they will search for jobs at
/// the *bottom* of their local deque. (At present, workers *always*
- /// steal from the bottom of other worker's deques, regardless of
+ /// steal from the bottom of other workers' deques, regardless of
/// the setting of this flag.)
///
/// If you think of the tasks as a tree, where a parent task
@@ -747,6 +800,10 @@ impl ThreadPoolBuildError {
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
ThreadPoolBuildError { kind }
}
+
+ fn is_unsupported(&self) -> bool {
+ matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
+ }
}
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
@@ -833,15 +890,6 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
}
#[allow(deprecated)]
-impl Default for Configuration {
- fn default() -> Self {
- Configuration {
- builder: Default::default(),
- }
- }
-}
-
-#[allow(deprecated)]
impl fmt::Debug for Configuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.builder.fmt(f)
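The new `yield_now`/`yield_local` exports are the "volunteer execution time" hook mentioned in the fallback documentation above. A sketch of polling for a spawned task, assuming the released rayon-core signature `yield_now() -> Option<Yield>` (returning `None` when the current thread is not in any pool):

```
use rayon_core as rayon;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

fn main() {
    let done = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&done);
    rayon::spawn(move || flag.store(true, Ordering::Release));

    // On a worker thread -- including the single-threaded fallback -- this
    // keeps executing pending local/injected work until the spawn has run.
    while !done.load(Ordering::Acquire) {
        if rayon::yield_now().is_none() {
            break; // not on a pool thread; a real worker will run the task
        }
    }
}
```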
diff --git a/vendor/rustc-rayon-core/src/log.rs b/vendor/rustc-rayon-core/src/log.rs
index e1ff827df..7b6daf0ab 100644
--- a/vendor/rustc-rayon-core/src/log.rs
+++ b/vendor/rustc-rayon-core/src/log.rs
@@ -93,6 +93,9 @@ pub(super) enum Event {
/// A job was removed from the global queue.
JobUninjected { worker: usize },
+ /// A job was broadcasted to N threads.
+ JobBroadcast { count: usize },
+
/// When announcing a job, this was the value of the counters we observed.
///
/// No effect on thread state, just a debugging event.
@@ -124,15 +127,15 @@ impl Logger {
let (sender, receiver) = crossbeam_channel::unbounded();
- if env_log.starts_with("tail:") {
- let filename = env_log["tail:".len()..].to_string();
+ if let Some(filename) = env_log.strip_prefix("tail:") {
+ let filename = filename.to_string();
::std::thread::spawn(move || {
Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
});
} else if env_log == "all" {
::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
- } else if env_log.starts_with("profile:") {
- let filename = env_log["profile:".len()..].to_string();
+ } else if let Some(filename) = env_log.strip_prefix("profile:") {
+ let filename = filename.to_string();
::std::thread::spawn(move || {
Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
});
@@ -140,9 +143,9 @@ impl Logger {
panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
}
- return Logger {
+ Logger {
sender: Some(sender),
- };
+ }
}
fn disabled() -> Logger {
@@ -175,19 +178,12 @@ impl Logger {
let timeout = std::time::Duration::from_secs(30);
loop {
- loop {
- match receiver.recv_timeout(timeout) {
- Ok(event) => {
- if let Event::Flush = event {
- break;
- } else {
- events.push(event);
- }
- }
-
- Err(_) => break,
+ while let Ok(event) = receiver.recv_timeout(timeout) {
+ if let Event::Flush = event {
+ break;
}
+ events.push(event);
if events.len() == capacity {
break;
}
@@ -219,31 +215,25 @@ impl Logger {
let mut skipped = false;
loop {
- loop {
- match receiver.recv_timeout(timeout) {
- Ok(event) => {
- if let Event::Flush = event {
- // We ignore Flush events in tail mode --
- // we're really just looking for
- // deadlocks.
- continue;
- } else {
- if events.len() == capacity {
- let event = events.pop_front().unwrap();
- state.simulate(&event);
- skipped = true;
- }
-
- events.push_back(event);
- }
+ while let Ok(event) = receiver.recv_timeout(timeout) {
+ if let Event::Flush = event {
+ // We ignore Flush events in tail mode --
+ // we're really just looking for
+ // deadlocks.
+ continue;
+ } else {
+ if events.len() == capacity {
+ let event = events.pop_front().unwrap();
+ state.simulate(&event);
+ skipped = true;
}
- Err(_) => break,
+ events.push_back(event);
}
}
if skipped {
- write!(writer, "...\n").unwrap();
+ writeln!(writer, "...").unwrap();
skipped = false;
}
@@ -417,7 +407,7 @@ impl SimulatorState {
}
}
- write!(w, "\n")?;
+ writeln!(w)?;
Ok(())
}
}
diff --git a/vendor/rustc-rayon-core/src/registry.rs b/vendor/rustc-rayon-core/src/registry.rs
index 24f73f319..15ceb6b0c 100644
--- a/vendor/rustc-rayon-core/src/registry.rs
+++ b/vendor/rustc-rayon-core/src/registry.rs
@@ -1,15 +1,15 @@
use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
+use crate::tlv::Tlv;
use crate::unwind;
use crate::{
AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
- ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+ ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
-use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
@@ -17,10 +17,8 @@ use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
-#[allow(deprecated)]
-use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Once};
+use std::sync::{Arc, Mutex, Once};
use std::thread;
use std::usize;
@@ -30,6 +28,7 @@ pub struct ThreadBuilder {
name: Option<String>,
stack_size: Option<usize>,
worker: Worker<JobRef>,
+ stealer: Stealer<JobRef>,
registry: Arc<Registry>,
index: usize,
}
@@ -42,7 +41,7 @@ impl ThreadBuilder {
/// Gets the string that was specified by `ThreadPoolBuilder::name()`.
pub fn name(&self) -> Option<&str> {
- self.name.as_ref().map(String::as_str)
+ self.name.as_deref()
}
/// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
@@ -53,7 +52,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
- unsafe { main_loop(self.worker, self.registry, self.index) }
+ unsafe { main_loop(self) }
}
}
@@ -136,6 +135,7 @@ pub struct Registry {
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
+ broadcasts: Mutex<Vec<Worker<JobRef>>>,
panic_handler: Option<Box<PanicHandler>>,
pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
start_handler: Option<Box<StartHandler>>,
@@ -169,7 +169,7 @@ static THE_REGISTRY_SET: Once = Once::new();
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
- set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
+ set_global_registry(default_global_registry)
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
@@ -203,6 +203,46 @@ where
result
}
+fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
+ let result = Registry::new(ThreadPoolBuilder::new());
+
+ // If we're running in an environment that doesn't support threads at all, we can fall back to
+ // using the current thread alone. This is crude, and probably won't work for non-blocking
+ // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
+ //
+ // Notably, this allows current WebAssembly targets to work even though their threading support
+ // is stubbed out, and we won't have to change anything if they do add real threading.
+ let unsupported = matches!(&result, Err(e) if e.is_unsupported());
+ if unsupported && WorkerThread::current().is_null() {
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(1)
+ .spawn_handler(|thread| {
+ // Rather than starting a new thread, we're just taking over the current thread
+ // *without* running the main loop, so we can still return from here.
+ // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+ let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
+ let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
+
+ unsafe {
+ WorkerThread::set_current(worker_thread);
+
+ // let registry know we are ready to do work
+ Latch::set(&registry.thread_infos[index].primed);
+ }
+
+ Ok(())
+ });
+
+ let fallback_result = Registry::new(builder);
+ if fallback_result.is_ok() {
+ return fallback_result;
+ }
+ }
+
+ result
+}
+
struct Terminator<'a>(&'a Arc<Registry>);
impl<'a> Drop for Terminator<'a> {
@@ -236,12 +276,21 @@ impl Registry {
})
.unzip();
+ let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads)
+ .map(|_| {
+ let worker = Worker::new_fifo();
+ let stealer = worker.stealer();
+ (worker, stealer)
+ })
+ .unzip();
+
let logger = Logger::new(n_threads);
let registry = Arc::new(Registry {
logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(logger, n_threads),
injected_jobs: Injector::new(),
+ broadcasts: Mutex::new(broadcasts),
terminate_count: AtomicUsize::new(1),
panic_handler: builder.take_panic_handler(),
deadlock_handler: builder.take_deadlock_handler(),
@@ -254,12 +303,13 @@ impl Registry {
// If we return early or panic, make sure to terminate existing threads.
let t1000 = Terminator(&registry);
- for (index, worker) in workers.into_iter().enumerate() {
+ for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() {
let thread = ThreadBuilder {
name: builder.get_thread_name(index),
stack_size: builder.get_stack_size(),
registry: Arc::clone(&registry),
worker,
+ stealer,
index,
};
if let Err(e) = builder.get_spawn_handler().spawn(thread) {
@@ -329,19 +379,14 @@ impl Registry {
self.thread_infos.len()
}
- pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
- match self.panic_handler {
- Some(ref handler) => {
- // If the customizable panic handler itself panics,
- // then we abort.
- let abort_guard = unwind::AbortIfPanic;
+ pub(super) fn catch_unwind(&self, f: impl FnOnce()) {
+ if let Err(err) = unwind::halt_unwinding(f) {
+ // If there is no handler, or if that handler itself panics, then we abort.
+ let abort_guard = unwind::AbortIfPanic;
+ if let Some(ref handler) = self.panic_handler {
handler(err);
mem::forget(abort_guard);
}
- None => {
- // Default panic handler aborts.
- let _ = unwind::AbortIfPanic; // let this drop.
- }
}
}
@@ -392,18 +437,16 @@ impl Registry {
if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
(*worker_thread).push(job_ref);
} else {
- self.inject(&[job_ref]);
+ self.inject(job_ref);
}
}
}
/// Push a job into the "external jobs" queue; it will be taken by
- /// whatever worker has nothing to do. Use this is you know that
+ /// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
- pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
- self.log(|| JobsInjected {
- count: injected_jobs.len(),
- });
+ pub(super) fn inject(&self, injected_job: JobRef) {
+ self.log(|| JobsInjected { count: 1 });
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
@@ -418,12 +461,8 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
- for &job_ref in injected_jobs {
- self.injected_jobs.push(job_ref);
- }
-
- self.sleep
- .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+ self.injected_jobs.push(injected_job);
+ self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
}
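The "external jobs" queue behind `inject` is a `crossbeam_deque::Injector`: a shared FIFO that any thread may push into and any worker may steal from, with `Steal::Retry` signalling a benign race. A minimal sketch of that primitive on its own, using integers in place of `JobRef`s:

```rust
use crossbeam_deque::{Injector, Steal};

fn main() {
    let queue: Injector<u32> = Injector::new();
    queue.push(1);
    queue.push(2);

    // Stealing can spuriously ask us to retry, so loop until we get a definite answer.
    let mut popped = Vec::new();
    loop {
        match queue.steal() {
            Steal::Success(job) => popped.push(job),
            Steal::Empty => break,
            Steal::Retry => continue,
        }
    }
    assert_eq!(popped, [1, 2]);
}
```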
pub(crate) fn has_injected_job(&self) -> bool {
@@ -445,6 +484,40 @@ impl Registry {
}
}
+ /// Push a job into each thread's own "external jobs" queue; it will be
+ /// executed only on that thread, when it has nothing else to do locally,
+ /// before it tries to steal other work.
+ ///
+ /// **Panics** if not given exactly as many jobs as there are threads.
+ pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
+ assert_eq!(self.num_threads(), injected_jobs.len());
+ self.log(|| JobBroadcast {
+ count: self.num_threads(),
+ });
+ {
+ let broadcasts = self.broadcasts.lock().unwrap();
+
+ // It should not be possible for `state.terminate` to be true
+ // here. It is only set to true when the user creates (and
+ // drops) a `ThreadPool`; and, in that case, they cannot be
+ // calling `inject_broadcast()` later, since they dropped their
+ // `ThreadPool`.
+ debug_assert_ne!(
+ self.terminate_count.load(Ordering::Acquire),
+ 0,
+ "inject_broadcast() sees state.terminate as true"
+ );
+
+ assert_eq!(broadcasts.len(), injected_jobs.len());
+ for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) {
+ worker.push(job_ref);
+ }
+ }
+ for i in 0..self.num_threads() {
+ self.sleep.notify_worker_latch_is_set(i);
+ }
+ }
+
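`inject_broadcast` relies on each thread owning a dedicated FIFO deque: the registry keeps the `Worker` halves (behind a mutex) for pushing, while each worker thread keeps the matching `Stealer`, so only that thread ever pops its own broadcast jobs. A reduced sketch of that ownership split, again with integers standing in for `JobRef`s:

```rust
use crossbeam_deque::{Steal, Worker};

fn main() {
    // Registry side: one FIFO queue per thread, retained for injection.
    let broadcast_queues: Vec<Worker<u32>> = (0..2).map(|_| Worker::new_fifo()).collect();
    // Thread side: each thread holds the stealer for *its own* queue.
    let stealers: Vec<_> = broadcast_queues.iter().map(|w| w.stealer()).collect();

    // "inject_broadcast": push exactly one job per thread.
    for (i, queue) in broadcast_queues.iter().enumerate() {
        queue.push(i as u32 * 10);
    }

    // Each thread drains only its own queue.
    for (i, stealer) in stealers.iter().enumerate() {
        let job = loop {
            match stealer.steal() {
                Steal::Success(job) => break job,
                Steal::Empty => panic!("each thread was given exactly one job"),
                Steal::Retry => continue,
            }
        };
        assert_eq!(job, i as u32 * 10);
    }
}
```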
/// If already in a worker-thread of this registry, just execute `op`.
/// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
/// completes and return its return value. If `op` panics, that panic will
@@ -482,15 +555,15 @@ impl Registry {
// This thread isn't a member of *any* thread pool, so just block.
debug_assert!(WorkerThread::current().is_null());
let job = StackJob::new(
- 0,
+ Tlv::null(),
|injected| {
let worker_thread = WorkerThread::current();
assert!(injected && !worker_thread.is_null());
op(&*worker_thread, true)
},
- l,
+ LatchRef::new(l),
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
self.release_thread();
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
self.acquire_thread();
@@ -513,7 +586,7 @@ impl Registry {
debug_assert!(current_thread.registry().id() != self.id());
let latch = SpinLatch::cross(current_thread);
let job = StackJob::new(
- 0,
+ Tlv::null(),
|injected| {
let worker_thread = WorkerThread::current();
assert!(injected && !worker_thread.is_null());
@@ -521,7 +594,7 @@ impl Registry {
},
latch,
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
current_thread.wait_until(&job.latch);
job.into_result()
}
@@ -561,7 +634,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
- thread_info.terminate.set_and_tickle_one(self, i);
+ unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@@ -636,6 +709,9 @@ pub(super) struct WorkerThread {
/// the "worker" half of our local deque
worker: Worker<JobRef>,
+ /// the "stealer" half of the worker's broadcast deque
+ stealer: Stealer<JobRef>,
+
/// local queue used for `spawn_fifo` indirection
fifo: JobFifo,
@@ -653,7 +729,20 @@ pub(super) struct WorkerThread {
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
- static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
+ static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
+}
+
+impl From<ThreadBuilder> for WorkerThread {
+ fn from(thread: ThreadBuilder) -> Self {
+ Self {
+ worker: thread.worker,
+ stealer: thread.stealer,
+ fifo: JobFifo::new(),
+ index: thread.index,
+ rng: XorShift64Star::new(),
+ registry: thread.registry,
+ }
+ }
}
impl Drop for WorkerThread {
@@ -726,14 +815,25 @@ impl WorkerThread {
/// for breadth-first execution, it would mean dequeuing from the
/// bottom.
#[inline]
- pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
+ pub(super) fn take_local_job(&self) -> Option<JobRef> {
let popped_job = self.worker.pop();
if popped_job.is_some() {
self.log(|| JobPopped { worker: self.index });
+ return popped_job;
}
- popped_job
+ loop {
+ match self.stealer.steal() {
+ Steal::Success(job) => return Some(job),
+ Steal::Empty => return None,
+ Steal::Retry => {}
+ }
+ }
+ }
+
+ pub(super) fn has_injected_job(&self) -> bool {
+ !self.stealer.is_empty() || self.registry.has_injected_job()
}
/// Wait until the latch is set. Try to keep busy by popping and
@@ -757,23 +857,14 @@ impl WorkerThread {
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
- // Try to find some work to do. We give preference first
- // to things in our local deque, then in other workers
- // deques, and finally to injected jobs from the
- // outside. The idea is to finish what we started before
- // we take on something new.
- if let Some(job) = self
- .take_local_job()
- .or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job(self.index))
- {
+ if let Some(job) = self.find_work() {
self.registry.sleep.work_found(idle_state);
self.execute(job);
idle_state = self.registry.sleep.start_looking(self.index, latch);
} else {
self.registry
.sleep
- .no_work_found(&mut idle_state, latch, &self.registry)
+ .no_work_found(&mut idle_state, latch, &self)
}
}
@@ -789,6 +880,37 @@ impl WorkerThread {
mem::forget(abort_guard); // successful execution, do not abort
}
+ fn find_work(&self) -> Option<JobRef> {
+ // Try to find some work to do. We give preference first
+ // to things in our local deque, then in other workers
+ // deques, and finally to injected jobs from the
+ // outside. The idea is to finish what we started before
+ // we take on something new.
+ self.take_local_job()
+ .or_else(|| self.steal())
+ .or_else(|| self.registry.pop_injected_job(self.index))
+ }
+
+ pub(super) fn yield_now(&self) -> Yield {
+ match self.find_work() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
+ pub(super) fn yield_local(&self) -> Yield {
+ match self.take_local_job() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
#[inline]
pub(super) unsafe fn execute(&self, job: JobRef) {
job.execute();
@@ -798,7 +920,7 @@ impl WorkerThread {
///
/// This should only be done as a last resort, when there is no
/// local work to do.
- unsafe fn steal(&self) -> Option<JobRef> {
+ fn steal(&self) -> Option<JobRef> {
// we only steal when we don't have any work to do locally
debug_assert!(self.local_deque_is_empty());
@@ -841,19 +963,14 @@ impl WorkerThread {
/// ////////////////////////////////////////////////////////////////////////
-unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
- let worker_thread = &WorkerThread {
- worker,
- fifo: JobFifo::new(),
- index,
- rng: XorShift64Star::new(),
- registry,
- };
+unsafe fn main_loop(thread: ThreadBuilder) {
+ let worker_thread = &WorkerThread::from(thread);
WorkerThread::set_current(worker_thread);
let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
// let registry know we are ready to do work
- registry.thread_infos[index].primed.set();
+ Latch::set(&registry.thread_infos[index].primed);
// Worker threads should not panic. If they do, just abort, as the
// internal state of the threadpool is corrupted. Note that if
@@ -862,12 +979,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
// Inform a user callback that we started a thread.
if let Some(ref handler) = registry.start_handler {
- match unwind::halt_unwinding(|| handler(index)) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(|| handler(index));
}
let my_terminate_latch = &registry.thread_infos[index].terminate;
@@ -882,7 +994,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
debug_assert!(worker_thread.take_local_job().is_none());
// let registry know we are done
- registry.thread_infos[index].stopped.set();
+ Latch::set(&registry.thread_infos[index].stopped);
// Normal termination, do not abort.
mem::forget(abort_guard);
@@ -891,12 +1003,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
// Inform a user callback that we exited a thread.
if let Some(ref handler) = registry.exit_handler {
- match unwind::halt_unwinding(|| handler(index)) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(|| handler(index));
// We're already exiting the thread, there's nothing else to do.
}
@@ -921,7 +1028,7 @@ where
// invalidated until we return.
op(&*owner_thread, false)
} else {
- global_registry().in_worker_cold(op)
+ global_registry().in_worker(op)
}
}
}
@@ -940,8 +1047,7 @@ impl XorShift64Star {
let mut seed = 0;
while seed == 0 {
let mut hasher = DefaultHasher::new();
- #[allow(deprecated)]
- static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
+ static COUNTER: AtomicUsize = AtomicUsize::new(0);
hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
seed = hasher.finish();
}
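The seeding loop above only has to produce a non-zero value, because the generator itself is the standard xorshift64* recurrence, which maps zero to zero. A small self-contained sketch of that recurrence (kept separate from the crate's own `XorShift64Star`):

```rust
use std::cell::Cell;

/// Minimal xorshift64* PRNG; the seed must be non-zero.
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new(seed: u64) -> Self {
        assert_ne!(seed, 0, "xorshift64* needs a non-zero seed");
        Self { state: Cell::new(seed) }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_F491_4F6C_DD1D)
    }
}

fn main() {
    let rng = XorShift64Star::new(0x9E37_79B9_7F4A_7C15);
    // Pick a steal victim in `0..num_threads`, as the work-stealing loop does.
    let num_threads = 8;
    let victim = (rng.next() % num_threads as u64) as usize;
    assert!(victim < num_threads);
}
```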
diff --git a/vendor/rustc-rayon-core/src/scope/mod.rs b/vendor/rustc-rayon-core/src/scope/mod.rs
index 5a5b7b718..1b74f274d 100644
--- a/vendor/rustc-rayon-core/src/scope/mod.rs
+++ b/vendor/rustc-rayon-core/src/scope/mod.rs
@@ -5,15 +5,16 @@
//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../join/join.fn.html
-use crate::job::{HeapJob, JobFifo};
+use crate::broadcast::BroadcastContext;
+use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
-use crate::tlv;
+use crate::tlv::{self, Tlv};
use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
-use std::mem;
+use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
@@ -39,7 +40,7 @@ pub struct ScopeFifo<'scope> {
fifos: Vec<JobFifo>,
}
-enum ScopeLatch {
+pub(super) enum ScopeLatch {
/// A latch for scopes created on a rayon thread which will participate in work-
/// stealing while it waits for completion. This thread is not necessarily part
/// of the same registry as the scope itself!
@@ -78,7 +79,7 @@ struct ScopeBase<'scope> {
marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
/// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
- tlv: usize,
+ tlv: Tlv,
}
/// Creates a "fork-join" scope `s` and invokes the closure with a
@@ -131,7 +132,7 @@ struct ScopeBase<'scope> {
/// Task execution potentially starts as soon as `spawn()` is called.
/// The task will end sometime before `scope()` returns. Note that the
/// *closure* given to scope may return much earlier. In general
-/// the lifetime of a scope created like `scope(body) goes something like this:
+/// the lifetime of a scope created like `scope(body)` goes something like this:
///
/// - Scope begins when `scope(body)` is called
/// - Scope body `body()` is invoked
@@ -245,7 +246,7 @@ struct ScopeBase<'scope> {
/// });
///
/// // That closure is fine, but now we can't use `ok` anywhere else,
-/// // since it is owend by the previous task:
+/// // since it is owned by the previous task:
/// // s.spawn(|_| println!("ok: {:?}", ok));
/// });
/// ```
@@ -542,18 +543,37 @@ impl<'scope> Scope<'scope> {
where
BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
{
- self.base.increment();
- unsafe {
- let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
- self.base.execute_job(move || body(self))
- }))
- .as_job_ref();
-
- // Since `Scope` implements `Sync`, we can't be sure that we're still in a
- // thread of this pool, so we can't just push to the local worker thread.
- // Also, this might be an in-place scope.
- self.base.registry.inject_or_push(job_ref);
- }
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(self.base.tlv, move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // Since `Scope` implements `Sync`, we can't be sure that we're still in a
+ // thread of this pool, so we can't just push to the local worker thread.
+ // Also, this might be an in-place scope.
+ self.base.registry.inject_or_push(job_ref);
+ }
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ ScopeBase::execute_job(&scope.base, func)
+ });
+ self.base.inject_broadcast(job)
}
}
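Usage-wise, the new `Scope::spawn_broadcast` behaves like `spawn` except that one copy of the closure runs on every thread of the pool before the scope ends; the broadcast tests later in this diff exercise exactly this. A short sketch against the global pool, assuming the `rayon_core` crate name used by the doc-tests:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let hits = AtomicUsize::new(0);
    let n_threads = rayon_core::scope(|s| {
        // One copy of this job runs on every thread of the (global) pool.
        s.spawn_broadcast(|_scope, ctx| {
            hits.fetch_add(1, Ordering::Relaxed);
            let _ = ctx.index(); // which worker is running this copy
        });
        rayon_core::current_num_threads()
    });
    assert_eq!(hits.into_inner(), n_threads);
}
```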
@@ -583,24 +603,44 @@ impl<'scope> ScopeFifo<'scope> {
where
BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
{
- self.base.increment();
- unsafe {
- let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
- self.base.execute_job(move || body(self))
- }))
- .as_job_ref();
-
- // If we're in the pool, use our scope's private fifo for this thread to execute
- // in a locally-FIFO order. Otherwise, just use the pool's global injector.
- match self.base.registry.current_thread() {
- Some(worker) => {
- let fifo = &self.fifos[worker.index()];
- worker.push(fifo.push(job_ref));
- }
- None => self.base.registry.inject(&[job_ref]),
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(self.base.tlv, move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // If we're in the pool, use our scope's private fifo for this thread to execute
+ // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+ match self.base.registry.current_thread() {
+ Some(worker) => {
+ let fifo = &self.fifos[worker.index()];
+ // SAFETY: this job will execute before the scope ends.
+ unsafe { worker.push(fifo.push(job_ref)) };
}
+ None => self.base.registry.inject(job_ref),
}
}
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ ScopeBase::execute_job(&scope.base, func)
+ });
+ self.base.inject_broadcast(job)
+ }
}
impl<'scope> ScopeBase<'scope> {
@@ -624,13 +664,36 @@ impl<'scope> ScopeBase<'scope> {
self.job_completed_latch.increment();
}
+ fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
+ where
+ FUNC: FnOnce() + Send + 'scope,
+ {
+ unsafe {
+ self.increment();
+ job.into_job_ref()
+ }
+ }
+
+ fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
+ where
+ FUNC: Fn() + Send + Sync + 'scope,
+ {
+ let n_threads = self.registry.num_threads();
+ let job_refs = (0..n_threads).map(|_| unsafe {
+ self.increment();
+ ArcJob::as_job_ref(&job)
+ });
+
+ self.registry.inject_broadcast(job_refs);
+ }
+
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
where
FUNC: FnOnce() -> R,
{
- let result = self.execute_job_closure(func);
+ let result = unsafe { Self::execute_job_closure(self, func) };
self.job_completed_latch.wait(owner);
// Restore the TLV if we ran some jobs while waiting
@@ -642,28 +705,28 @@ impl<'scope> ScopeBase<'scope> {
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
- fn execute_job<FUNC>(&self, func: FUNC)
+ unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
where
FUNC: FnOnce(),
{
- let _: Option<()> = self.execute_job_closure(func);
+ let _: Option<()> = Self::execute_job_closure(this, func);
}
/// Executes `func` as a job in scope. Adjusts the "job completed"
/// counters and also catches any panic and stores it into
/// `scope`.
- fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
+ unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
where
FUNC: FnOnce() -> R,
{
match unwind::halt_unwinding(func) {
Ok(r) => {
- self.job_completed_latch.set();
+ Latch::set(&(*this).job_completed_latch);
Some(r)
}
Err(err) => {
- self.job_panicked(err);
- self.job_completed_latch.set();
+ (*this).job_panicked(err);
+ Latch::set(&(*this).job_completed_latch);
None
}
}
@@ -671,14 +734,20 @@ impl<'scope> ScopeBase<'scope> {
fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
// capture the first error we see, free the rest
- let nil = ptr::null_mut();
- let mut err = Box::new(err); // box up the fat ptr
- if self
- .panic
- .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- mem::forget(err); // ownership now transferred into self.panic
+ if self.panic.load(Ordering::Relaxed).is_null() {
+ let nil = ptr::null_mut();
+ let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
+ let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
+ if self
+ .panic
+ .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ // ownership now transferred into self.panic
+ } else {
+ // another panic raced in ahead of us, so drop ours
+ let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
+ }
}
}
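`job_panicked` now implements "first panic wins" without a lock: the first payload is leaked into the `AtomicPtr` via `compare_exchange`, and any later panic drops its own payload instead. The same idiom reduced to plain strings (type and method names here are illustrative only):

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

struct FirstError {
    slot: AtomicPtr<String>,
}

impl FirstError {
    fn new() -> Self {
        Self { slot: AtomicPtr::new(ptr::null_mut()) }
    }

    fn record(&self, err: String) {
        let boxed = Box::into_raw(Box::new(err));
        if self
            .slot
            .compare_exchange(ptr::null_mut(), boxed, Ordering::Release, Ordering::Relaxed)
            .is_err()
        {
            // Another error got there first; reclaim ours so it is not leaked.
            unsafe { drop(Box::from_raw(boxed)) };
        }
    }

    fn take(&self) -> Option<String> {
        let ptr = self.slot.swap(ptr::null_mut(), Ordering::Acquire);
        if ptr.is_null() {
            None
        } else {
            Some(unsafe { *Box::from_raw(ptr) })
        }
    }
}

fn main() {
    let first = FirstError::new();
    first.record("boom 1".to_owned());
    first.record("boom 2".to_owned()); // raced in later, so it is dropped
    assert_eq!(first.take().as_deref(), Some("boom 1"));
}
```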
@@ -700,14 +769,18 @@ impl<'scope> ScopeBase<'scope> {
impl ScopeLatch {
fn new(owner: Option<&WorkerThread>) -> Self {
+ Self::with_count(1, owner)
+ }
+
+ pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
match owner {
Some(owner) => ScopeLatch::Stealing {
- latch: CountLatch::new(),
+ latch: CountLatch::with_count(count),
registry: Arc::clone(owner.registry()),
worker_index: owner.index(),
},
None => ScopeLatch::Blocking {
- latch: CountLockLatch::new(),
+ latch: CountLockLatch::with_count(count),
},
}
}
@@ -719,18 +792,7 @@ impl ScopeLatch {
}
}
- fn set(&self) {
- match self {
- ScopeLatch::Stealing {
- latch,
- registry,
- worker_index,
- } => latch.set_and_tickle_one(registry, *worker_index),
- ScopeLatch::Blocking { latch } => latch.set(),
- }
- }
-
- fn wait(&self, owner: Option<&WorkerThread>) {
+ pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
match self {
ScopeLatch::Stealing {
latch,
@@ -747,6 +809,19 @@ impl ScopeLatch {
}
}
+impl Latch for ScopeLatch {
+ unsafe fn set(this: *const Self) {
+ match &*this {
+ ScopeLatch::Stealing {
+ latch,
+ registry,
+ worker_index,
+ } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
+ ScopeLatch::Blocking { latch } => Latch::set(latch),
+ }
+ }
+}
+
impl<'scope> fmt::Debug for Scope<'scope> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scope")
@@ -782,3 +857,22 @@ impl fmt::Debug for ScopeLatch {
}
}
}
+
+/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
+///
+/// Unsafe code is still required to dereference the pointer, but that's fine in
+/// scope jobs that are guaranteed to execute before the scope ends.
+struct ScopePtr<T>(*const T);
+
+// SAFETY: !Send for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Send for ScopePtr<T> {}
+
+// SAFETY: !Sync for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Sync for ScopePtr<T> {}
+
+impl<T> ScopePtr<T> {
+ // Helper to avoid disjoint captures of `scope_ptr.0`
+ unsafe fn as_ref(&self) -> &T {
+ &*self.0
+ }
+}
diff --git a/vendor/rustc-rayon-core/src/scope/test.rs b/vendor/rustc-rayon-core/src/scope/test.rs
index 88f9e9548..ad8c4af0b 100644
--- a/vendor/rustc-rayon-core/src/scope/test.rs
+++ b/vendor/rustc-rayon-core/src/scope/test.rs
@@ -6,7 +6,7 @@ use rand_xorshift::XorShiftRng;
use std::cmp;
use std::iter::once;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex;
+use std::sync::{Barrier, Mutex};
use std::vec;
#[test]
@@ -42,7 +42,7 @@ fn scope_divide_and_conquer() {
scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024)));
let counter_s = &AtomicUsize::new(0);
- divide_and_conquer_seq(&counter_s, 1024);
+ divide_and_conquer_seq(counter_s, 1024);
let p = counter_p.load(Ordering::SeqCst);
let s = counter_s.load(Ordering::SeqCst);
@@ -75,7 +75,7 @@ struct Tree<T: Send> {
}
impl<T: Send> Tree<T> {
- fn iter<'s>(&'s self) -> vec::IntoIter<&'s T> {
+ fn iter(&self) -> vec::IntoIter<&T> {
once(&self.value)
.chain(self.children.iter().flat_map(Tree::iter))
.collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow
@@ -148,6 +148,7 @@ fn update_tree() {
/// linearly with N. We test this by some unsafe hackery and
/// permitting an approx 10% change with a 10x input change.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn linear_stack_growth() {
let builder = ThreadPoolBuilder::new().num_threads(1);
let pool = builder.build().unwrap();
@@ -213,6 +214,7 @@ fn panic_propagate_nested_scope_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_1() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -227,6 +229,7 @@ fn panic_propagate_still_execute_1() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_2() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -241,6 +244,7 @@ fn panic_propagate_still_execute_2() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_3() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -255,6 +259,7 @@ fn panic_propagate_still_execute_3() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_4() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -292,6 +297,7 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
// In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_order!(scope => spawn);
@@ -300,6 +306,7 @@ fn lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
// In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_order!(scope_fifo => spawn_fifo);
@@ -334,6 +341,7 @@ macro_rules! test_nested_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_order() {
// In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_nested_order!(scope => spawn, scope => spawn);
@@ -342,6 +350,7 @@ fn nested_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_order() {
// In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
@@ -350,6 +359,7 @@ fn nested_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
@@ -361,6 +371,7 @@ fn nested_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
@@ -403,6 +414,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -412,6 +424,7 @@ fn mixed_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
let expected = vec![-1, 0, -2, 1, -3, 2, 3];
@@ -419,6 +432,7 @@ fn mixed_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -428,6 +442,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
let expected = vec![-3, 0, -2, 1, -1, 2, 3];
@@ -513,3 +528,92 @@ fn mixed_lifetime_scope_fifo() {
increment(&[&counter; 100]);
assert_eq!(counter.into_inner(), 100);
}
+
+#[test]
+fn scope_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_fifo_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope_fifo(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_spawn_broadcast_nested() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|s, _| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * n * (n - 1) / 2);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_barrier() {
+ let barrier = Barrier::new(8);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.in_place_scope(|s| {
+ s.spawn_broadcast(|_, _| {
+ barrier.wait();
+ });
+ barrier.wait();
+ });
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
diff --git a/vendor/rustc-rayon-core/src/sleep/README.md b/vendor/rustc-rayon-core/src/sleep/README.md
index bcd74af97..6f27df51d 100644
--- a/vendor/rustc-rayon-core/src/sleep/README.md
+++ b/vendor/rustc-rayon-core/src/sleep/README.md
@@ -75,7 +75,7 @@ These counters are adjusted as follows:
* When a thread awakens a sleeping thread: decrement the sleeping thread counter.
* Subtle point: the thread that *awakens* the sleeping thread decrements the
counter, not the thread that is *sleeping*. This is because there is a delay
- between siganling a thread to wake and the thread actually waking:
+ between signaling a thread to wake and the thread actually waking:
decrementing the counter when awakening the thread means that other threads
that may be posting work will see the up-to-date value that much faster.
* When a thread finds work, exiting the idle state: decrement the inactive
@@ -137,7 +137,7 @@ The full protocol for a thread to fall asleep is as follows:
above, remembering the `final_value` of the JEC. It does one more search for work.
* If no work is found, the thread atomically:
* Checks the JEC to see that it has not changed from `final_value`.
- * If it has, then the thread goes back to searchin for work. We reset to
+ * If it has, then the thread goes back to searching for work. We reset to
just before we got sleepy, so that we will do one more search
before attending to sleep again (rather than searching for many rounds).
* Increments the number of sleeping threads by 1.
diff --git a/vendor/rustc-rayon-core/src/sleep/counters.rs b/vendor/rustc-rayon-core/src/sleep/counters.rs
index 29d3b18e2..f2a3de3e1 100644
--- a/vendor/rustc-rayon-core/src/sleep/counters.rs
+++ b/vendor/rustc-rayon-core/src/sleep/counters.rs
@@ -61,10 +61,12 @@ const THREADS_BITS: usize = 8;
/// Bits to shift to select the sleeping threads
/// (used with `select_bits`).
+#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
/// Bits to shift to select the inactive threads
/// (used with `select_bits`).
+#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
/// Bits to shift to select the JEC
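For orientation, the sleep counters pack several 8-bit fields plus the jobs-event counter into a single atomic word, selected by the shift constants above. A toy sketch of that packing arithmetic; the exact field layout beyond the two shifts shown here is an assumption, and `pack`/`sleeping_threads` are illustrative helpers, not the crate's API:

```rust
const THREADS_BITS: usize = 8;
const THREADS_MASK: usize = (1 << THREADS_BITS) - 1;

const SLEEPING_SHIFT: usize = 0; // 0 * THREADS_BITS
const INACTIVE_SHIFT: usize = THREADS_BITS; // 1 * THREADS_BITS
const JEC_SHIFT: usize = 2 * THREADS_BITS; // assumed: the remaining high bits

fn pack(sleeping: usize, inactive: usize, jec: usize) -> usize {
    (sleeping << SLEEPING_SHIFT) | (inactive << INACTIVE_SHIFT) | (jec << JEC_SHIFT)
}

fn sleeping_threads(word: usize) -> usize {
    (word >> SLEEPING_SHIFT) & THREADS_MASK
}

fn inactive_threads(word: usize) -> usize {
    (word >> INACTIVE_SHIFT) & THREADS_MASK
}

fn main() {
    let word = pack(2, 5, 1000);
    assert_eq!(sleeping_threads(word), 2);
    assert_eq!(inactive_threads(word), 5);
    assert_eq!(word >> JEC_SHIFT, 1000);
}
```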
diff --git a/vendor/rustc-rayon-core/src/sleep/mod.rs b/vendor/rustc-rayon-core/src/sleep/mod.rs
index fb113fac0..96e1a68be 100644
--- a/vendor/rustc-rayon-core/src/sleep/mod.rs
+++ b/vendor/rustc-rayon-core/src/sleep/mod.rs
@@ -4,7 +4,7 @@
use crate::latch::CoreLatch;
use crate::log::Event::*;
use crate::log::Logger;
-use crate::registry::Registry;
+use crate::registry::WorkerThread;
use crate::DeadlockHandler;
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
@@ -161,7 +161,7 @@ impl Sleep {
&self,
idle_state: &mut IdleState,
latch: &CoreLatch,
- registry: &Registry,
+ thread: &WorkerThread,
) {
if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
thread::yield_now();
@@ -175,7 +175,7 @@ impl Sleep {
thread::yield_now();
} else {
debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
- self.sleep(idle_state, latch, registry);
+ self.sleep(idle_state, latch, thread);
}
}
@@ -193,7 +193,7 @@ impl Sleep {
}
#[cold]
- fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, registry: &Registry) {
+ fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, thread: &WorkerThread) {
let worker_index = idle_state.worker_index;
if !latch.get_sleepy() {
@@ -210,7 +210,7 @@ impl Sleep {
debug_assert!(!*is_blocked);
// Our latch was signalled. We should wake back up fully as we
- // wil have some stuff to do.
+ // will have some stuff to do.
if !latch.fall_asleep() {
self.logger.log(|| ThreadSleepInterruptedByLatch {
worker: worker_index,
@@ -260,7 +260,7 @@ impl Sleep {
// - that job triggers the rollover over the JEC such that we don't see it
// - we are the last active worker thread
std::sync::atomic::fence(Ordering::SeqCst);
- if registry.has_injected_job() {
+ if thread.has_injected_job() {
// If we see an externally injected job, then we have to 'wake
// ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
// the one that wakes us.)
@@ -270,7 +270,7 @@ impl Sleep {
// Decrement the number of active threads and check for a deadlock
let mut data = self.data.lock().unwrap();
data.active_threads -= 1;
- data.deadlock_check(&registry.deadlock_handler);
+ data.deadlock_check(&thread.registry.deadlock_handler);
}
// If we don't see an injected job (the normal case), then flag
@@ -281,12 +281,16 @@ impl Sleep {
// that whomever is coming to wake us will have to wait until we
// release the mutex in the call to `wait`, so they will see this
// boolean as true.)
- registry.release_thread();
+ thread.registry.release_thread();
*is_blocked = true;
while *is_blocked {
is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
}
- registry.acquire_thread();
+
+ // Drop `is_blocked` now in case `acquire_thread` blocks
+ drop(is_blocked);
+
+ thread.registry.acquire_thread();
}
// Update other state:
diff --git a/vendor/rustc-rayon-core/src/spawn/mod.rs b/vendor/rustc-rayon-core/src/spawn/mod.rs
index 57b8eb874..c2a8b3b53 100644
--- a/vendor/rustc-rayon-core/src/spawn/mod.rs
+++ b/vendor/rustc-rayon-core/src/spawn/mod.rs
@@ -1,5 +1,6 @@
use crate::job::*;
use crate::registry::Registry;
+use crate::tlv::Tlv;
use crate::unwind;
use std::mem;
use std::sync::Arc;
@@ -73,7 +74,7 @@ where
F: FnOnce() + Send + 'static,
{
// We assert that this does not hold any references (we know
- // this because of the `'static` bound in the inferface);
+ // this because of the `'static` bound in the interface);
// moreover, we assert that the code below is not supposed to
// be able to panic, and hence the data won't leak but will be
// enqueued into some deque for later execution.
@@ -91,19 +92,14 @@ where
// executed. This ref is decremented at the (*) below.
registry.increment_terminate_count();
- Box::new(HeapJob::new(0, {
+ HeapJob::new(Tlv::null(), {
let registry = Arc::clone(registry);
move || {
- match unwind::halt_unwinding(func) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(func);
registry.terminate(); // (*) permit registry to terminate now
}
- }))
- .as_job_ref()
+ })
+ .into_static_job_ref()
}
/// Fires off a task into the Rayon threadpool in the "static" or
@@ -148,7 +144,7 @@ where
F: FnOnce() + Send + 'static,
{
// We assert that this does not hold any references (we know
- // this because of the `'static` bound in the inferface);
+ // this because of the `'static` bound in the interface);
// moreover, we assert that the code below is not supposed to
// be able to panic, and hence the data won't leak but will be
// enqueued into some deque for later execution.
@@ -159,7 +155,7 @@ where
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
match registry.current_thread() {
Some(worker) => worker.push_fifo(job_ref),
- None => registry.inject(&[job_ref]),
+ None => registry.inject(job_ref),
}
mem::forget(abort_guard);
}
diff --git a/vendor/rustc-rayon-core/src/spawn/test.rs b/vendor/rustc-rayon-core/src/spawn/test.rs
index 761fafc77..b7a0535aa 100644
--- a/vendor/rustc-rayon-core/src/spawn/test.rs
+++ b/vendor/rustc-rayon-core/src/spawn/test.rs
@@ -7,6 +7,7 @@ use super::{spawn, spawn_fifo};
use crate::ThreadPoolBuilder;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_in_worker() {
let (tx, rx) = channel();
scope(move |_| {
@@ -16,6 +17,7 @@ fn spawn_then_join_in_worker() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_outside_worker() {
let (tx, rx) = channel();
spawn(move || tx.send(22).unwrap());
@@ -23,6 +25,7 @@ fn spawn_then_join_outside_worker() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_fwd() {
let (tx, rx) = channel();
@@ -54,6 +57,7 @@ fn panic_fwd() {
/// still active asynchronous tasks. We expect the thread-pool to stay
/// alive and executing until those threads are complete.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn termination_while_things_are_executing() {
let (tx0, rx0) = channel();
let (tx1, rx1) = channel();
@@ -80,6 +84,7 @@ fn termination_while_things_are_executing() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_spawn() {
let (tx, rx) = channel();
@@ -107,6 +112,7 @@ fn custom_panic_handler_and_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_nested_spawn() {
let (tx, rx) = channel();
@@ -165,6 +171,7 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
// In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
let vec = test_order!(spawn, spawn);
@@ -173,6 +180,7 @@ fn lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
// In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
let vec = test_order!(spawn_fifo, spawn_fifo);
@@ -181,6 +189,7 @@ fn fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_order!(spawn, spawn_fifo);
@@ -192,6 +201,7 @@ fn lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_order!(spawn_fifo, spawn);
@@ -229,6 +239,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
let vec = test_mixed_order!(spawn, spawn_fifo);
let expected = vec![3, -1, 2, -2, 1, -3, 0];
@@ -236,6 +247,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(spawn_fifo, spawn);
let expected = vec![0, -3, 1, -2, 2, -1, 3];
diff --git a/vendor/rustc-rayon-core/src/test.rs b/vendor/rustc-rayon-core/src/test.rs
index 600e58b11..25b8487f7 100644
--- a/vendor/rustc-rayon-core/src/test.rs
+++ b/vendor/rustc-rayon-core/src/test.rs
@@ -1,12 +1,11 @@
#![cfg(test)]
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn worker_thread_index() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -16,6 +15,7 @@ fn worker_thread_index() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn start_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -42,6 +42,7 @@ fn start_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn exit_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -71,6 +72,7 @@ fn exit_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn handler_panics_handled_correctly() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -121,7 +123,7 @@ fn handler_panics_handled_correctly() {
}
#[test]
-#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_config_build() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -137,6 +139,7 @@ fn check_error_send_sync() {
#[allow(deprecated)]
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn configuration() {
let start_handler = move |_| {};
let exit_handler = move |_| {};
@@ -144,7 +147,7 @@ fn configuration() {
let thread_name = move |i| format!("thread_name_{}", i);
// Ensure we can call all public methods on Configuration
- Configuration::new()
+ crate::Configuration::new()
.thread_name(thread_name)
.num_threads(5)
.panic_handler(panic_handler)
@@ -157,6 +160,7 @@ fn configuration() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn default_pool() {
ThreadPoolBuilder::default().build().unwrap();
}
@@ -165,6 +169,7 @@ fn default_pool() {
/// the pool is done with them, allowing them to be used with rayon again
/// later. e.g. WebAssembly users want to have their own pool of available threads.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
let n_threads = 5;
let mut handles = vec![];
diff --git a/vendor/rustc-rayon-core/src/thread_pool/mod.rs b/vendor/rustc-rayon-core/src/thread_pool/mod.rs
index 0791fc4e9..fd890c9fd 100644
--- a/vendor/rustc-rayon-core/src/thread_pool/mod.rs
+++ b/vendor/rustc-rayon-core/src/thread_pool/mod.rs
@@ -3,12 +3,11 @@
//!
//! [`ThreadPool`]: struct.ThreadPool.html
+use crate::broadcast::{self, BroadcastContext};
use crate::join;
use crate::registry::{Registry, ThreadSpawn, WorkerThread};
use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
use crate::spawn;
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{scope, Scope};
use crate::{scope_fifo, ScopeFifo};
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
@@ -57,7 +56,7 @@ impl ThreadPool {
#[deprecated(note = "Use `ThreadPoolBuilder::build`")]
#[allow(deprecated)]
/// Deprecated in favor of `ThreadPoolBuilder::build`.
- pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<dyn Error>> {
+ pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
Self::build(configuration.into_builder()).map_err(Box::from)
}
@@ -111,6 +110,57 @@ impl ThreadPool {
self.registry.in_worker(|_, _| op())
}
+ /// Executes `op` within every thread in the threadpool. Any attempts to use
+ /// `join`, `scope`, or parallel iterators will then operate within that
+ /// threadpool.
+ ///
+ /// Broadcasts are executed on each thread after they have exhausted their
+ /// local work queue, before they attempt work-stealing from other threads.
+ /// The goal of that strategy is to run everywhere in a timely manner
+ /// *without* being too disruptive to current work. There may be alternative
+ /// broadcast styles added in the future for more or less aggressive
+ /// injection, if the need arises.
+ ///
+ /// # Warning: thread-local data
+ ///
+ /// Because `op` is executing within the Rayon thread-pool,
+ /// thread-local data from the current thread will not be
+ /// accessible.
+ ///
+ /// # Panics
+ ///
+ /// If `op` should panic on one or more threads, exactly one panic
+ /// will be propagated, only after all threads have completed
+ /// (or panicked) their own `op`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// use std::sync::atomic::{AtomicUsize, Ordering};
+ ///
+ /// fn main() {
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
+ ///
+ /// // The argument gives context, including the index of each thread.
+ /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
+ /// assert_eq!(v, &[0, 1, 4, 9, 16]);
+ ///
+ /// // The closure can reference the local stack
+ /// let count = AtomicUsize::new(0);
+ /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
+ /// assert_eq!(count.into_inner(), 5);
+ /// }
+ /// ```
+ pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
+ where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::broadcast_in(op, &self.registry) }
+ }
+
/// Returns the (current) number of threads in the thread pool.
///
/// # Future compatibility note
@@ -278,6 +328,42 @@ impl ThreadPool {
unsafe { spawn::spawn_fifo_in(op, &self.registry) }
}
+ /// Spawns an asynchronous task on every thread in this thread-pool. This task
+ /// will run in the implicit, global scope, which means that it may outlast the
+ /// current stack frame -- therefore, it cannot capture any references onto the
+ /// stack (you will likely need a `move` closure).
+ pub fn spawn_broadcast<OP>(&self, op: OP)
+ where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
+ }
+
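Because `spawn_broadcast` is fire-and-forget, the closure must be `'static` and `Sync`; shared state usually travels in an `Arc`, and a `Barrier` (which is `Sync`, unlike an `mpsc::Sender`) is a convenient way to observe completion. A small sketch under those assumptions:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};

fn main() {
    let pool = rayon_core::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    let count = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(4 + 1)); // four workers plus this thread

    let count2 = Arc::clone(&count);
    let barrier2 = Arc::clone(&barrier);
    pool.spawn_broadcast(move |ctx| {
        count2.fetch_add(1, Ordering::Relaxed);
        let _ = ctx.index(); // which worker ran this copy
        barrier2.wait();
    });

    // Rendezvous with all four copies before checking the counter.
    barrier.wait();
    assert_eq!(count.load(Ordering::Relaxed), 4);
}
```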
+ /// Cooperatively yields execution to Rayon.
+ ///
+ /// This is similar to the general [`yield_now()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_now(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_now())
+ }
+
+ /// Cooperatively yields execution to local Rayon work.
+ ///
+ /// This is similar to the general [`yield_local()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_local(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_local())
+ }
+
pub(crate) fn wait_until_stopped(self) {
let registry = self.registry.clone();
drop(self);
@@ -344,3 +430,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> {
Some(!curr.local_deque_is_empty())
}
}
+
+/// Cooperatively yields execution to Rayon.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in the pool, then executes it. Completion of
+/// that work might include nested work or further work stealing.
+///
+/// This is similar to [`std::thread::yield_now()`], but does not literally make
+/// that call. If you are implementing a polling loop, you may want to also
+/// yield to the OS scheduler yourself if no Rayon work was found.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_now() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_now())
+ }
+}
+
+/// Cooperatively yields execution to local Rayon work.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in this thread's queue, then executes it.
+/// Completion of that work might include nested work or further work stealing.
+///
+/// This is similar to [`yield_now()`], but does not steal from other threads.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_local() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_local())
+ }
+}
+
+/// Result of [`yield_now()`] or [`yield_local()`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Yield {
+ /// Work was found and executed.
+ Executed,
+ /// No available work was found.
+ Idle,
+}
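As the docs above suggest, `yield_now()` is aimed at polling loops: run pending Rayon work when there is some, and otherwise yield to the OS scheduler. A sketch of that pattern, again assuming the `rayon_core` crate name:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Spin until `done` flips, preferring to run pending Rayon work over busy-waiting.
fn wait_while_helping(done: &AtomicBool) {
    while !done.load(Ordering::Acquire) {
        match rayon_core::yield_now() {
            Some(rayon_core::Yield::Executed) => {} // made progress; poll again right away
            _ => std::thread::yield_now(),          // idle, or not a pool thread: yield to the OS
        }
    }
}

fn main() {
    let done = Arc::new(AtomicBool::new(false));
    let done2 = Arc::clone(&done);
    rayon_core::spawn(move || done2.store(true, Ordering::Release));
    wait_while_helping(&done);
}
```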
diff --git a/vendor/rustc-rayon-core/src/thread_pool/test.rs b/vendor/rustc-rayon-core/src/thread_pool/test.rs
index 8de65a5e4..6143e5799 100644
--- a/vendor/rustc-rayon-core/src/thread_pool/test.rs
+++ b/vendor/rustc-rayon-core/src/thread_pool/test.rs
@@ -4,8 +4,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
#[test]
@@ -18,6 +16,7 @@ fn panic_propagate() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
let registry;
@@ -45,6 +44,7 @@ fn join_a_lot(n: usize) {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
use std::{thread, time};
@@ -91,6 +91,7 @@ fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
// Note: we first tried to force failure with a `usize::MAX` stack, but
// macOS and Windows weren't fazed, or at least didn't fail the way we want.
@@ -117,6 +118,7 @@ fn failed_thread_stack() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
let (start_count, start_handler) = count_handler();
let (exit_count, exit_handler) = count_handler();
@@ -141,6 +143,7 @@ fn panic_thread_name() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -149,6 +152,7 @@ fn self_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -168,6 +172,7 @@ fn mutual_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
use std::{thread, time};
@@ -196,8 +201,9 @@ fn mutual_install_sleepy() {
#[test]
#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
- let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap();
+ let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
assert_eq!(pool.current_num_threads(), 22);
}
@@ -221,6 +227,7 @@ macro_rules! test_scope_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
let vec = test_scope_order!(scope => spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -228,6 +235,7 @@ fn scope_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
let vec = test_scope_order!(scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -252,6 +260,7 @@ macro_rules! test_spawn_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
let vec = test_spawn_order!(spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -259,6 +268,7 @@ fn spawn_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
let vec = test_spawn_order!(spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -266,6 +276,7 @@ fn spawn_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
// Create matching scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
@@ -302,6 +313,7 @@ fn nested_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
// Create matching fifo scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
@@ -338,6 +350,7 @@ fn nested_fifo_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -353,6 +366,7 @@ fn in_place_scope_no_deadlock() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -366,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() {
rx_ref.recv().unwrap();
});
}
+
+#[test]
+fn yield_now_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_now();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn yield_local_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_local();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
diff --git a/vendor/rustc-rayon-core/src/tlv.rs b/vendor/rustc-rayon-core/src/tlv.rs
index f035d12d9..ce22f7aa0 100644
--- a/vendor/rustc-rayon-core/src/tlv.rs
+++ b/vendor/rustc-rayon-core/src/tlv.rs
@@ -1,30 +1,31 @@
//! Allows access to Rayon's thread local value
//! which is preserved when moving jobs across threads
-use std::cell::Cell;
+use std::{cell::Cell, ptr};
-thread_local!(pub(crate) static TLV: Cell<usize> = Cell::new(0));
+thread_local!(pub static TLV: Cell<*const ()> = const { Cell::new(ptr::null()) });
-/// Sets the current thread-local value to `value` inside the closure.
-/// The old value is restored when the closure ends
-pub fn with<F: FnOnce() -> R, R>(value: usize, f: F) -> R {
- struct Reset(usize);
- impl Drop for Reset {
- fn drop(&mut self) {
- TLV.with(|tlv| tlv.set(self.0));
- }
+#[derive(Copy, Clone)]
+pub(crate) struct Tlv(pub(crate) *const ());
+
+impl Tlv {
+ #[inline]
+ pub(crate) fn null() -> Self {
+ Self(ptr::null())
}
- let _reset = Reset(get());
- TLV.with(|tlv| tlv.set(value));
- f()
}
+unsafe impl Sync for Tlv {}
+unsafe impl Send for Tlv {}
+
/// Sets the current thread-local value
-pub fn set(value: usize) {
- TLV.with(|tlv| tlv.set(value));
+#[inline]
+pub(crate) fn set(value: Tlv) {
+ TLV.with(|tlv| tlv.set(value.0));
}
/// Returns the current thread-local value
-pub fn get() -> usize {
- TLV.with(|tlv| tlv.get())
+#[inline]
+pub(crate) fn get() -> Tlv {
+ TLV.with(|tlv| Tlv(tlv.get()))
}
diff --git a/vendor/rustc-rayon-core/tests/double_init_fail.rs b/vendor/rustc-rayon-core/tests/double_init_fail.rs
index ea06bf0e1..15915304d 100644
--- a/vendor/rustc-rayon-core/tests/double_init_fail.rs
+++ b/vendor/rustc-rayon-core/tests/double_init_fail.rs
@@ -2,9 +2,10 @@ use rayon_core::ThreadPoolBuilder;
use std::error::Error;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn double_init_fail() {
let result1 = ThreadPoolBuilder::new().build_global();
- assert_eq!(result1.unwrap(), ());
+ assert!(result1.is_ok());
let err = ThreadPoolBuilder::new().build_global().unwrap_err();
assert!(err.source().is_none());
assert_eq!(
diff --git a/vendor/rustc-rayon-core/tests/init_zero_threads.rs b/vendor/rustc-rayon-core/tests/init_zero_threads.rs
index ebd73c585..3c1ad251c 100644
--- a/vendor/rustc-rayon-core/tests/init_zero_threads.rs
+++ b/vendor/rustc-rayon-core/tests/init_zero_threads.rs
@@ -1,6 +1,7 @@
use rayon_core::ThreadPoolBuilder;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn init_zero_threads() {
ThreadPoolBuilder::new()
.num_threads(0)
diff --git a/vendor/rustc-rayon-core/tests/scoped_threadpool.rs b/vendor/rustc-rayon-core/tests/scoped_threadpool.rs
index db3d0b874..534e8bbf4 100644
--- a/vendor/rustc-rayon-core/tests/scoped_threadpool.rs
+++ b/vendor/rustc-rayon-core/tests/scoped_threadpool.rs
@@ -7,6 +7,7 @@ struct Local(i32);
scoped_tls::scoped_thread_local!(static LOCAL: Local);
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn missing_scoped_tls() {
LOCAL.set(&Local(42), || {
let pool = ThreadPoolBuilder::new()
@@ -21,6 +22,7 @@ fn missing_scoped_tls() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_scoped_tls_threadpool() {
LOCAL.set(&Local(42), || {
LOCAL.with(|x| {
@@ -63,6 +65,7 @@ fn spawn_scoped_tls_threadpool() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn build_scoped_tls_threadpool() {
LOCAL.set(&Local(42), || {
LOCAL.with(|x| {
diff --git a/vendor/rustc-rayon-core/tests/stack_overflow_crash.rs b/vendor/rustc-rayon-core/tests/stack_overflow_crash.rs
index 61288982c..7dcde43c4 100644
--- a/vendor/rustc-rayon-core/tests/stack_overflow_crash.rs
+++ b/vendor/rustc-rayon-core/tests/stack_overflow_crash.rs
@@ -1,13 +1,14 @@
use rayon_core::ThreadPoolBuilder;
use std::env;
-use std::process::Command;
+use std::process::{Command, ExitStatus, Stdio};
#[cfg(target_os = "linux")]
use std::os::unix::process::ExitStatusExt;
fn force_stack_overflow(depth: u32) {
- let _buffer = [0u8; 1024 * 1024];
+ let mut buffer = [0u8; 1024 * 1024];
+ std::hint::black_box(&mut buffer);
if depth > 0 {
force_stack_overflow(depth - 1);
}
@@ -34,49 +35,63 @@ fn overflow_code() -> Option<i32> {
#[cfg(windows)]
fn overflow_code() -> Option<i32> {
use std::os::windows::process::ExitStatusExt;
- use std::process::ExitStatus;
ExitStatus::from_raw(0xc00000fd /*STATUS_STACK_OVERFLOW*/).code()
}
-fn main() {
- if env::args().len() == 1 {
- // first check that the recursivecall actually causes a stack overflow, and does not get optimized away
- {
- let status = Command::new(env::current_exe().unwrap())
- .arg("8")
- .status()
- .unwrap();
+#[test]
+#[cfg_attr(not(any(unix, windows)), ignore)]
+fn stack_overflow_crash() {
+ // First check that the recursive call actually causes a stack overflow,
+ // and does not get optimized away.
+ let status = run_ignored("run_with_small_stack");
+ assert!(!status.success());
+ #[cfg(any(unix, windows))]
+ assert_eq!(status.code(), overflow_code());
+ #[cfg(target_os = "linux")]
+ assert!(matches!(
+ status.signal(),
+ Some(libc::SIGABRT | libc::SIGSEGV)
+ ));
- #[cfg(any(unix, windows))]
- assert_eq!(status.code(), overflow_code());
+ // Now run with a larger stack and verify correct operation.
+ let status = run_ignored("run_with_large_stack");
+ assert_eq!(status.code(), Some(0));
+ #[cfg(target_os = "linux")]
+ assert_eq!(status.signal(), None);
+}
- #[cfg(target_os = "linux")]
- assert!(
- status.signal() == Some(11 /*SIGABRT*/) || status.signal() == Some(6 /*SIGSEGV*/)
- );
- }
+fn run_ignored(test: &str) -> ExitStatus {
+ Command::new(env::current_exe().unwrap())
+ .arg("--ignored")
+ .arg("--exact")
+ .arg(test)
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .status()
+ .unwrap()
+}
- // now run with a larger stack and verify correct operation
- {
- let status = Command::new(env::current_exe().unwrap())
- .arg("48")
- .status()
- .unwrap();
- assert_eq!(status.code(), Some(0));
- #[cfg(target_os = "linux")]
- assert_eq!(status.signal(), None);
- }
- } else {
- let stack_size_in_mb: usize = env::args().nth(1).unwrap().parse().unwrap();
- let pool = ThreadPoolBuilder::new()
- .stack_size(stack_size_in_mb * 1024 * 1024)
- .build()
- .unwrap();
- pool.install(|| {
- #[cfg(unix)]
- disable_core();
- force_stack_overflow(32);
- });
- }
+#[test]
+#[ignore]
+fn run_with_small_stack() {
+ run_with_stack(8);
+}
+
+#[test]
+#[ignore]
+fn run_with_large_stack() {
+ run_with_stack(48);
+}
+
+fn run_with_stack(stack_size_in_mb: usize) {
+ let pool = ThreadPoolBuilder::new()
+ .stack_size(stack_size_in_mb * 1024 * 1024)
+ .build()
+ .unwrap();
+ pool.install(|| {
+ #[cfg(unix)]
+ disable_core();
+ force_stack_overflow(32);
+ });
}
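
The rewrite replaces the old argv-dispatching fn main harness with ordinary #[test] functions: the crashing bodies are marked #[ignore] and re-run in a child process through the standard libtest --ignored --exact flags, so only the child dies on overflow. A hedged, generalized sketch of that parent/child pattern for any crash-expected test:

// Hedged sketch generalizing the pattern above: keep the crashing body in an
// #[ignore]d test and re-invoke the current test binary against it, asserting
// only on the child's exit status.
use std::process::{Command, ExitStatus, Stdio};

fn rerun_ignored(test_name: &str) -> ExitStatus {
    Command::new(std::env::current_exe().unwrap())
        .args(["--ignored", "--exact", test_name]) // standard libtest flags
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .expect("failed to spawn child test process")
}

#[test]
#[ignore] // only meaningful when launched through rerun_ignored
fn child_that_aborts() {
    std::process::abort();
}

#[test]
fn parent_observes_abort() {
    assert!(!rerun_ignored("child_that_aborts").success());
}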