summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/gix-features/src/parallel
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features/src/parallel')
-rw-r--r--vendor/gix-features/src/parallel/in_parallel.rs66
-rw-r--r--vendor/gix-features/src/parallel/mod.rs47
-rw-r--r--vendor/gix-features/src/parallel/serial.rs46
3 files changed, 90 insertions, 69 deletions
diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs
index e1e2cc3e3..241565b62 100644
--- a/vendor/gix-features/src/parallel/in_parallel.rs
+++ b/vendor/gix-features/src/parallel/in_parallel.rs
@@ -1,7 +1,10 @@
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::parallel::{num_threads, Reduce};
+/// A scope to start threads within.
+pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>;
+
/// Runs `left` and `right` in parallel, returning their output when both are done.
pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
std::thread::scope(|s| {
@@ -47,7 +50,7 @@ pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S + Send + Clone,
- consume: impl Fn(I, &mut S) -> O + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
@@ -67,7 +70,7 @@ where
let send_result = send_result.clone();
let receive_input = receive_input.clone();
let new_thread_state = new_thread_state.clone();
- let consume = consume.clone();
+ let mut consume = consume.clone();
move || {
let mut state = new_thread_state(thread_id);
for item in receive_input {
@@ -103,12 +106,19 @@ where
/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
/// for file-io as it won't make use of sorted inputs well.
/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
+/// `consume(&mut item, &mut stat, &Scope, &threads_available, &should_interrupt)` is called for performing the actual computation.
+/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`),
+/// which allows callees to implement their own work-stealing in case the work is distributed unevenly.
+/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice
+/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads,
+/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust
+/// the thread-count accordingly.
// TODO: better docs
pub fn in_parallel_with_slice<I, S, R, E>(
input: &mut [I],
thread_limit: Option<usize>,
new_thread_state: impl FnMut(usize) -> S + Send + Clone,
- consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone,
+ consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
state_to_rval: impl FnOnce(S) -> R + Send + Clone,
) -> Result<Vec<R>, E>
@@ -121,8 +131,8 @@ where
let mut results = Vec::with_capacity(num_threads);
let stop_everything = &AtomicBool::default();
let index = &AtomicUsize::default();
+ let threads_left = &AtomicIsize::new(num_threads as isize);
- // TODO: use std::thread::scope() once Rust 1.63 is available.
std::thread::scope({
move |s| {
std::thread::Builder::new()
@@ -163,29 +173,35 @@ where
let mut consume = consume.clone();
let input = Input(input as *mut [I]);
move || {
+ let _ = &input;
+ threads_left.fetch_sub(1, Ordering::SeqCst);
let mut state = new_thread_state(thread_id);
- while let Ok(input_index) =
- index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
- (x < input_len).then_some(x + 1)
- })
- {
- if stop_everything.load(Ordering::Relaxed) {
- break;
- }
- // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
- // each item exactly once.
- let item = {
- #[allow(unsafe_code)]
- unsafe {
- &mut (&mut *input.0)[input_index]
+ let res = (|| {
+ while let Ok(input_index) =
+ index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
+ (x < input_len).then_some(x + 1)
+ })
+ {
+ if stop_everything.load(Ordering::Relaxed) {
+ break;
+ }
+ // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
+ // each item exactly once.
+ let item = {
+ #[allow(unsafe_code)]
+ unsafe {
+ &mut (&mut *input.0)[input_index]
+ }
+ };
+ if let Err(err) = consume(item, &mut state, threads_left, stop_everything) {
+ stop_everything.store(true, Ordering::Relaxed);
+ return Err(err);
}
- };
- if let Err(err) = consume(item, &mut state) {
- stop_everything.store(true, Ordering::Relaxed);
- return Err(err);
}
- }
- Ok(state_to_rval(state))
+ Ok(state_to_rval(state))
+ })();
+ threads_left.fetch_add(1, Ordering::SeqCst);
+ res
}
})
.expect("valid name")
diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs
index c994cb3b8..ac644acdd 100644
--- a/vendor/gix-features/src/parallel/mod.rs
+++ b/vendor/gix-features/src/parallel/mod.rs
@@ -1,12 +1,12 @@
//! Run computations in parallel, or not based the `parallel` feature toggle.
//!
-//! ### in_parallel(…)
+//! ### `in_parallel`(…)
//!
//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
//!
-//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()]` method fail.
+//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()] method fail.
//!
//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
//! or the data to work on.
@@ -35,11 +35,11 @@
#[cfg(feature = "parallel")]
mod in_parallel;
#[cfg(feature = "parallel")]
-pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope};
mod serial;
#[cfg(not(feature = "parallel"))]
-pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope};
mod in_order;
pub use in_order::{InOrderIter, SequenceId};
@@ -80,13 +80,21 @@ pub fn optimize_chunk_size_and_thread_limit(
) -> (usize, Option<usize>, usize) {
let available_threads =
available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1));
- let available_threads = thread_limit
- .map(|l| if l == 0 { available_threads } else { l })
- .unwrap_or(available_threads);
+ let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l });
let (lower, upper) = (50, 1000);
- let (chunk_size, thread_limit) = num_items
- .map(|num_items| {
+ let (chunk_size, thread_limit) = num_items.map_or(
+ {
+ let chunk_size = if available_threads == 1 {
+ desired_chunk_size
+ } else if desired_chunk_size < lower {
+ lower
+ } else {
+ desired_chunk_size.min(upper)
+ };
+ (chunk_size, available_threads)
+ },
+ |num_items| {
let desired_chunks_per_thread_at_least = 2;
let items = num_items;
let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper);
@@ -97,17 +105,8 @@ pub fn optimize_chunk_size_and_thread_limit(
available_threads
};
(chunk_size, thread_limit)
- })
- .unwrap_or({
- let chunk_size = if available_threads == 1 {
- desired_chunk_size
- } else if desired_chunk_size < lower {
- lower
- } else {
- desired_chunk_size.min(upper)
- };
- (chunk_size, available_threads)
- });
+ },
+ );
(chunk_size, Some(thread_limit), thread_limit)
}
@@ -123,9 +122,7 @@ pub fn num_threads(_thread_limit: Option<usize>) -> usize {
#[cfg(feature = "parallel")]
pub fn num_threads(thread_limit: Option<usize>) -> usize {
let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
- thread_limit
- .map(|l| if l == 0 { logical_cores } else { l })
- .unwrap_or(logical_cores)
+ thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l })
}
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
@@ -137,7 +134,7 @@ pub fn in_parallel_if<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S + Send + Clone,
- consume: impl Fn(I, &mut S) -> O + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
@@ -163,7 +160,7 @@ pub fn in_parallel_if<I, S, O, R>(
input: impl Iterator<Item = I>,
thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S,
- consume: impl Fn(I, &mut S) -> O,
+ consume: impl FnMut(I, &mut S) -> O,
reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs
index 00723b2c3..3511c73e3 100644
--- a/vendor/gix-features/src/parallel/serial.rs
+++ b/vendor/gix-features/src/parallel/serial.rs
@@ -2,14 +2,17 @@ use crate::parallel::Reduce;
#[cfg(not(feature = "parallel"))]
mod not_parallel {
+ use std::sync::atomic::{AtomicBool, AtomicIsize};
+
/// Runs `left` and then `right`, one after another, returning their output when both are done.
pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
(left(), right())
}
/// A scope for spawning threads.
- pub struct Scope<'env> {
- _marker: std::marker::PhantomData<&'env mut &'env ()>,
+ pub struct Scope<'scope, 'env: 'scope> {
+ _scope: std::marker::PhantomData<&'scope mut &'scope ()>,
+ _env: std::marker::PhantomData<&'env mut &'env ()>,
}
pub struct ThreadBuilder;
@@ -20,32 +23,31 @@ mod not_parallel {
}
#[allow(unsafe_code)]
- unsafe impl Sync for Scope<'_> {}
+ unsafe impl Sync for Scope<'_, '_> {}
impl ThreadBuilder {
pub fn name(self, _new: String) -> Self {
self
}
- pub fn spawn_scoped<'a, 'env, F, T>(
+ pub fn spawn_scoped<'scope, 'env, F, T>(
&self,
- scope: &'a Scope<'env>,
+ scope: &'scope Scope<'scope, 'env>,
f: F,
- ) -> std::io::Result<ScopedJoinHandle<'a, T>>
+ ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
where
- F: FnOnce() -> T,
- F: Send + 'env,
- T: Send + 'env,
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
{
Ok(scope.spawn(f))
}
}
- impl<'env> Scope<'env> {
- pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ impl<'scope, 'env> Scope<'scope, 'env> {
+ /// Provided with this scope, let `f` start new threads that live within it.
+ pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
- F: FnOnce() -> T,
- F: Send + 'env,
- T: Send + 'env,
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
{
ScopedJoinHandle {
result: f(),
@@ -58,10 +60,11 @@ mod not_parallel {
/// Note that this implementation will run the spawned functions immediately.
pub fn threads<'env, F, R>(f: F) -> R
where
- F: FnOnce(&Scope<'env>) -> R,
+ F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
{
f(&Scope {
- _marker: Default::default(),
+ _scope: Default::default(),
+ _env: Default::default(),
})
}
@@ -79,6 +82,9 @@ mod not_parallel {
pub fn join(self) -> std::thread::Result<T> {
Ok(self.result)
}
+ pub fn is_finished(&self) -> bool {
+ true
+ }
}
/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
@@ -89,13 +95,15 @@ mod not_parallel {
input: &mut [I],
_thread_limit: Option<usize>,
mut new_thread_state: impl FnMut(usize) -> S + Clone,
- mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
+ mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration>,
state_to_rval: impl FnOnce(S) -> R + Clone,
) -> Result<Vec<R>, E> {
let mut state = new_thread_state(0);
+ let should_interrupt = &AtomicBool::default();
+ let threads_left = &AtomicIsize::default();
for item in input {
- consume(item, &mut state)?;
+ consume(item, &mut state, threads_left, should_interrupt)?;
if periodic().is_none() {
break;
}
@@ -121,7 +129,7 @@ pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I>,
_thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S,
- consume: impl Fn(I, &mut S) -> O,
+ mut consume: impl FnMut(I, &mut S) -> O,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where