summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gix-features/src/parallel/mod.rs')
-rw-r--r--vendor/gix-features/src/parallel/mod.rs179
1 files changed, 179 insertions, 0 deletions
diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs
new file mode 100644
index 000000000..c994cb3b8
--- /dev/null
+++ b/vendor/gix-features/src/parallel/mod.rs
@@ -0,0 +1,179 @@
+//! Run computations in parallel, or not based the `parallel` feature toggle.
+//!
+//! ### in_parallel(…)
+//!
+//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
+//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
+//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
+//!
+//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()]` method fail.
+//!
+//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
+//! or the data to work on.
+//!
+//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
+//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
+//! aggregation.
+//!
+//! ### `reduce::Stepwise`
+//!
+//! The [`Stepwise`][reduce::Stepwise] iterator works exactly as [`in_parallel()`] except that the processing of the output produced by
+//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
+//! buffered, the owner of the iterator controls the progress made.
+//!
+//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
+//! is functionally equivalent to calling [`in_parallel()`].
+//!
+//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
+//! the iterator will wind down the computation without any result.
+//!
+//! #### Maintaining Safety
+//!
+//! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce
+//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
+//! suitable input iterators as long as they can hold something on the heap.
+#[cfg(feature = "parallel")]
+mod in_parallel;
+#[cfg(feature = "parallel")]
+pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+
+mod serial;
+#[cfg(not(feature = "parallel"))]
+pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+
+mod in_order;
+pub use in_order::{InOrderIter, SequenceId};
+
+mod eager_iter;
+pub use eager_iter::{EagerIter, EagerIterIf};
+
+/// A no-op returning the input _(`desired_chunk_size`, `Some(thread_limit)`, `thread_limit)_ used
+/// when the `parallel` feature toggle is not set.
+#[cfg(not(feature = "parallel"))]
+pub fn optimize_chunk_size_and_thread_limit(
+ desired_chunk_size: usize,
+ _num_items: Option<usize>,
+ thread_limit: Option<usize>,
+ _available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+ (desired_chunk_size, thread_limit, num_threads(thread_limit))
+}
+
+/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given
+/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
+///
+/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
+/// * `num_items` is the total amount of items in the iteration, if `Some`.
+/// Otherwise this knowledge will not affect the output of this function.
+/// * `thread_limit` is the amount of threads to use at most, if `Some`.
+/// Otherwise this knowledge will not affect the output of this function.
+/// * `available_threads` is the total amount of threads available, if `Some`.
+/// Otherwise the actual amount of available threads is determined by querying the system.
+///
+/// `Note` that this implementation is available only if the `parallel` feature toggle is set.
+#[cfg(feature = "parallel")]
+pub fn optimize_chunk_size_and_thread_limit(
+ desired_chunk_size: usize,
+ num_items: Option<usize>,
+ thread_limit: Option<usize>,
+ available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+ let available_threads =
+ available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1));
+ let available_threads = thread_limit
+ .map(|l| if l == 0 { available_threads } else { l })
+ .unwrap_or(available_threads);
+
+ let (lower, upper) = (50, 1000);
+ let (chunk_size, thread_limit) = num_items
+ .map(|num_items| {
+ let desired_chunks_per_thread_at_least = 2;
+ let items = num_items;
+ let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper);
+ let num_chunks = items / chunk_size;
+ let thread_limit = if num_chunks <= available_threads {
+ (num_chunks / desired_chunks_per_thread_at_least).max(1)
+ } else {
+ available_threads
+ };
+ (chunk_size, thread_limit)
+ })
+ .unwrap_or({
+ let chunk_size = if available_threads == 1 {
+ desired_chunk_size
+ } else if desired_chunk_size < lower {
+ lower
+ } else {
+ desired_chunk_size.min(upper)
+ };
+ (chunk_size, available_threads)
+ });
+ (chunk_size, Some(thread_limit), thread_limit)
+}
+
+/// Always returns 1, available when the `parallel` feature toggle is unset.
+#[cfg(not(feature = "parallel"))]
+pub fn num_threads(_thread_limit: Option<usize>) -> usize {
+ 1
+}
+
+/// Returns the amount of threads the system can effectively use as the amount of its logical cores.
+///
+/// Only available with the `parallel` feature toggle set.
+#[cfg(feature = "parallel")]
+pub fn num_threads(thread_limit: Option<usize>) -> usize {
+ let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
+ thread_limit
+ .map(|l| if l == 0 { logical_cores } else { l })
+ .unwrap_or(logical_cores)
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`]
+#[cfg(feature = "parallel")]
+pub fn in_parallel_if<I, S, O, R>(
+ condition: impl FnOnce() -> bool,
+ input: impl Iterator<Item = I> + Send,
+ thread_limit: Option<usize>,
+ new_thread_state: impl Fn(usize) -> S + Send + Clone,
+ consume: impl Fn(I, &mut S) -> O + Send + Clone,
+ reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ if num_threads(thread_limit) > 1 && condition() {
+ in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+ } else {
+ serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+ }
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`]
+///
+/// Note that the non-parallel version is equivalent to [`in_parallel()`].
+#[cfg(not(feature = "parallel"))]
+pub fn in_parallel_if<I, S, O, R>(
+ _condition: impl FnOnce() -> bool,
+ input: impl Iterator<Item = I>,
+ thread_limit: Option<usize>,
+ new_thread_state: impl Fn(usize) -> S,
+ consume: impl Fn(I, &mut S) -> O,
+ reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+}
+
+///
+pub mod reduce;
+pub use reduce::Reduce;