summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel/in_parallel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gix-features/src/parallel/in_parallel.rs')
-rw-r--r--vendor/gix-features/src/parallel/in_parallel.rs80
1 files changed, 77 insertions, 3 deletions
diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs
index 241565b62..a80762bad 100644
--- a/vendor/gix-features/src/parallel/in_parallel.rs
+++ b/vendor/gix-features/src/parallel/in_parallel.rs
@@ -49,7 +49,7 @@ pub fn build_thread() -> std::thread::Builder {
pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
- new_thread_state: impl Fn(usize) -> S + Send + Clone,
+ new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
consume: impl FnMut(I, &mut S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
@@ -102,6 +102,80 @@ where
})
}
+/// Read items from `input` and `consume` them in multiple threads,
+/// whose output output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
+/// Caall `finalize` to finish the computation, once per thread, if there was no error sending results earlier.
+///
+/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+/// created by `new_thread_state(…)`.
+/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
+/// * For `reducer`, see the [`Reduce`] trait
+pub fn in_parallel_with_finalize<I, S, O, R>(
+ input: impl Iterator<Item = I> + Send,
+ thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
+ finalize: impl FnOnce(S) -> O + Send + Clone,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ let num_threads = num_threads(thread_limit);
+ std::thread::scope(move |s| {
+ let receive_result = {
+ let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+ let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
+ for thread_id in 0..num_threads {
+ std::thread::Builder::new()
+ .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
+ .spawn_scoped(s, {
+ let send_result = send_result.clone();
+ let receive_input = receive_input.clone();
+ let new_thread_state = new_thread_state.clone();
+ let mut consume = consume.clone();
+ let finalize = finalize.clone();
+ move || {
+ let mut state = new_thread_state(thread_id);
+ let mut can_send = true;
+ for item in receive_input {
+ if send_result.send(consume(item, &mut state)).is_err() {
+ can_send = false;
+ break;
+ }
+ }
+ if can_send {
+ send_result.send(finalize(state)).ok();
+ }
+ }
+ })
+ .expect("valid name");
+ }
+ std::thread::Builder::new()
+ .name("gitoxide.in_parallel.feed".into())
+ .spawn_scoped(s, move || {
+ for item in input {
+ if send_input.send(item).is_err() {
+ break;
+ }
+ }
+ })
+ .expect("valid name");
+ receive_result
+ };
+
+ for item in receive_result {
+ drop(reducer.feed(item)?);
+ }
+ reducer.finalize()
+ })
+}
+
/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
/// for file-io as it won't make use of sorted inputs well.
@@ -117,7 +191,7 @@ where
pub fn in_parallel_with_slice<I, S, R, E>(
input: &mut [I],
thread_limit: Option<usize>,
- new_thread_state: impl FnMut(usize) -> S + Send + Clone,
+ new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
state_to_rval: impl FnOnce(S) -> R + Send + Clone,
@@ -168,7 +242,7 @@ where
std::thread::Builder::new()
.name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}"))
.spawn_scoped(s, {
- let mut new_thread_state = new_thread_state.clone();
+ let new_thread_state = new_thread_state.clone();
let state_to_rval = state_to_rval.clone();
let mut consume = consume.clone();
let input = Input(input as *mut [I]);