diff options
Diffstat (limited to 'vendor/gix-features/src/parallel/mod.rs')
-rw-r--r-- | vendor/gix-features/src/parallel/mod.rs | 47 |
1 files changed, 22 insertions, 25 deletions
diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs index c994cb3b8..ac644acdd 100644 --- a/vendor/gix-features/src/parallel/mod.rs +++ b/vendor/gix-features/src/parallel/mod.rs @@ -1,12 +1,12 @@ //! Run computations in parallel, or not based the `parallel` feature toggle. //! -//! ### in_parallel(…) +//! ### `in_parallel`(…) //! //! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage //! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling //! thread to aggregate the results into a single output, which is returned by [`in_parallel()`]. //! -//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()]` method fail. +//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()] method fail. //! //! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself //! or the data to work on. @@ -35,11 +35,11 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod serial; #[cfg(not(feature = "parallel"))] -pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod in_order; pub use in_order::{InOrderIter, SequenceId}; @@ -80,13 +80,21 @@ pub fn optimize_chunk_size_and_thread_limit( ) -> (usize, Option<usize>, usize) { let available_threads = available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); - let available_threads = thread_limit - .map(|l| if l == 0 { available_threads } else { l }) - .unwrap_or(available_threads); + let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l }); let (lower, upper) = (50, 1000); - let (chunk_size, thread_limit) = num_items - .map(|num_items| { + let (chunk_size, thread_limit) = num_items.map_or( + { + let chunk_size = if available_threads == 1 { + desired_chunk_size + } else if desired_chunk_size < lower { + lower + } else { + desired_chunk_size.min(upper) + }; + (chunk_size, available_threads) + }, + |num_items| { let desired_chunks_per_thread_at_least = 2; let items = num_items; let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper); @@ -97,17 +105,8 @@ pub fn optimize_chunk_size_and_thread_limit( available_threads }; (chunk_size, thread_limit) - }) - .unwrap_or({ - let chunk_size = if available_threads == 1 { - desired_chunk_size - } else if desired_chunk_size < lower { - lower - } else { - desired_chunk_size.min(upper) - }; - (chunk_size, available_threads) - }); + }, + ); (chunk_size, Some(thread_limit), thread_limit) } @@ -123,9 +122,7 @@ pub fn num_threads(_thread_limit: Option<usize>) -> usize { #[cfg(feature = "parallel")] pub fn num_threads(thread_limit: Option<usize>) -> usize { let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1); - thread_limit - .map(|l| if l == 0 { logical_cores } else { l }) - .unwrap_or(logical_cores) + thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l }) } /// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated. @@ -137,7 +134,7 @@ pub fn in_parallel_if<I, S, O, R>( input: impl Iterator<Item = I> + Send, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where @@ -163,7 +160,7 @@ pub fn in_parallel_if<I, S, O, R>( input: impl Iterator<Item = I>, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + consume: impl FnMut(I, &mut S) -> O, reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where |