diff options
Diffstat (limited to 'vendor/gix-features/src/parallel/in_parallel.rs')
-rw-r--r-- | vendor/gix-features/src/parallel/in_parallel.rs | 80 |
1 files changed, 77 insertions, 3 deletions
diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs index 241565b62..a80762bad 100644 --- a/vendor/gix-features/src/parallel/in_parallel.rs +++ b/vendor/gix-features/src/parallel/in_parallel.rs @@ -49,7 +49,7 @@ pub fn build_thread() -> std::thread::Builder { pub fn in_parallel<I, S, O, R>( input: impl Iterator<Item = I> + Send, thread_limit: Option<usize>, - new_thread_state: impl Fn(usize) -> S + Send + Clone, + new_thread_state: impl FnOnce(usize) -> S + Send + Clone, consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> @@ -102,6 +102,80 @@ where }) } +/// Read items from `input` and `consume` them in multiple threads, +/// whose output output is collected by a `reducer`. Its task is to +/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe. +/// Caall `finalize` to finish the computation, once per thread, if there was no error sending results earlier. +/// +/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used. +/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume` +/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially +/// created by `new_thread_state(…)`. +/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`. +/// * For `reducer`, see the [`Reduce`] trait +pub fn in_parallel_with_finalize<I, S, O, R>( + input: impl Iterator<Item = I> + Send, + thread_limit: Option<usize>, + new_thread_state: impl FnOnce(usize) -> S + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, + finalize: impl FnOnce(S) -> O + Send + Clone, + mut reducer: R, +) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> +where + R: Reduce<Input = O>, + I: Send, + O: Send, +{ + let num_threads = num_threads(thread_limit); + std::thread::scope(move |s| { + let receive_result = { + let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads); + let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads); + for thread_id in 0..num_threads { + std::thread::Builder::new() + .name(format!("gitoxide.in_parallel.produce.{thread_id}")) + .spawn_scoped(s, { + let send_result = send_result.clone(); + let receive_input = receive_input.clone(); + let new_thread_state = new_thread_state.clone(); + let mut consume = consume.clone(); + let finalize = finalize.clone(); + move || { + let mut state = new_thread_state(thread_id); + let mut can_send = true; + for item in receive_input { + if send_result.send(consume(item, &mut state)).is_err() { + can_send = false; + break; + } + } + if can_send { + send_result.send(finalize(state)).ok(); + } + } + }) + .expect("valid name"); + } + std::thread::Builder::new() + .name("gitoxide.in_parallel.feed".into()) + .spawn_scoped(s, move || { + for item in input { + if send_input.send(item).is_err() { + break; + } + } + }) + .expect("valid name"); + receive_result + }; + + for item in receive_result { + drop(reducer.feed(item)?); + } + reducer.finalize() + }) +} + /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. /// This is only good for operations where near-random access isn't detrimental, so it's not usually great /// for file-io as it won't make use of sorted inputs well. @@ -117,7 +191,7 @@ where pub fn in_parallel_with_slice<I, S, R, E>( input: &mut [I], thread_limit: Option<usize>, - new_thread_state: impl FnMut(usize) -> S + Send + Clone, + new_thread_state: impl FnOnce(usize) -> S + Send + Clone, consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone, mut periodic: impl FnMut() -> Option<std::time::Duration> + Send, state_to_rval: impl FnOnce(S) -> R + Send + Clone, @@ -168,7 +242,7 @@ where std::thread::Builder::new() .name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}")) .spawn_scoped(s, { - let mut new_thread_state = new_thread_state.clone(); + let new_thread_state = new_thread_state.clone(); let state_to_rval = state_to_rval.clone(); let mut consume = consume.clone(); let input = Input(input as *mut [I]); |