author      Daniel Baumann <daniel.baumann@progress-linux.org>  2024-06-07 05:48:48 +0000
committer   Daniel Baumann <daniel.baumann@progress-linux.org>  2024-06-07 05:48:48 +0000
commit      ef24de24a82fe681581cc130f342363c47c0969a (patch)
tree        0d494f7e1a38b95c92426f58fe6eaa877303a86c /vendor/gix-features-0.35.0/src/parallel
parent      Releasing progress-linux version 1.74.1+dfsg1-1~progress7.99u1. (diff)
download    rustc-ef24de24a82fe681581cc130f342363c47c0969a.tar.xz
            rustc-ef24de24a82fe681581cc130f342363c47c0969a.zip
Merging upstream version 1.75.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features-0.35.0/src/parallel')
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/eager_iter.rs   124
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/in_order.rs      83
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/in_parallel.rs  301
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/mod.rs          178
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/reduce.rs       279
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/serial.rs       174
6 files changed, 1139 insertions, 0 deletions
diff --git a/vendor/gix-features-0.35.0/src/parallel/eager_iter.rs b/vendor/gix-features-0.35.0/src/parallel/eager_iter.rs
new file mode 100644
index 000000000..9a1735f72
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/eager_iter.rs
@@ -0,0 +1,124 @@
+/// Evaluate any iterator in its own thread.
+///
+/// This is particularly useful if the wrapped iterator performs IO and/or heavy computations.
+/// Use [`EagerIter::new()`] for instantiation.
+pub struct EagerIter<I: Iterator> {
+ receiver: std::sync::mpsc::Receiver<Vec<I::Item>>,
+ chunk: Option<std::vec::IntoIter<I::Item>>,
+ size_hint: (usize, Option<usize>),
+}
+
+impl<I> EagerIter<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ /// Return a new `EagerIter` which evaluates `iter` in its own thread,
+ /// with a given `chunk_size` allowing a maximum `chunks_in_flight`.
+ ///
+ /// * `chunk_size` describes how many items returned by `iter` are bundled into a single item of this `EagerIter`.
+ /// This helps to reduce the overhead imposed by transferring many small items.
+ /// If this number is 1, each item will become a single chunk. 0 is invalid.
+ /// * `chunks_in_flight` describes how many chunks can be kept in memory in case the consumer of the `EagerIter`s items
+ /// isn't consuming them fast enough. Setting this number to 0 effectively turns off any caching, but blocks `EagerIter`
+ /// if its items aren't consumed fast enough.
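+ ///
+ /// A usage sketch (added for illustration, not part of the upstream docs): chunks of 3 items
+ /// are produced on a background thread while the consumer drains them one item at a time.
+ ///
+ /// ```
+ /// use gix_features::parallel::EagerIter;
+ ///
+ /// let eager = EagerIter::new(0..10, 3, 2);
+ /// // items arrive in their original order - the chunking is invisible to the consumer.
+ /// assert_eq!(eager.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
+ /// ```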
+ pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
+ let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight);
+ let size_hint = iter.size_hint();
+ assert!(chunk_size > 0, "non-zero chunk size is needed");
+
+ std::thread::spawn(move || {
+ let mut out = Vec::with_capacity(chunk_size);
+ for item in iter {
+ out.push(item);
+ if out.len() == chunk_size {
+ if sender.send(out).is_err() {
+ return;
+ }
+ out = Vec::with_capacity(chunk_size);
+ }
+ }
+ if !out.is_empty() {
+ sender.send(out).ok();
+ }
+ });
+ EagerIter {
+ receiver,
+ chunk: None,
+ size_hint,
+ }
+ }
+
+ fn fill_buf_and_pop(&mut self) -> Option<I::Item> {
+ self.chunk = self.receiver.recv().ok().map(|v| {
+ assert!(!v.is_empty());
+ v.into_iter()
+ });
+ self.chunk.as_mut().and_then(Iterator::next)
+ }
+}
+
+impl<I> Iterator for EagerIter<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self.chunk.as_mut() {
+ Some(chunk) => chunk.next().or_else(|| self.fill_buf_and_pop()),
+ None => self.fill_buf_and_pop(),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.size_hint
+ }
+}
+
+/// A conditional `EagerIter`, which may become a just-in-time iterator running in the main thread depending on a condition.
+pub enum EagerIterIf<I: Iterator> {
+ /// A separate thread will eagerly evaluate iterator `I`.
+ Eager(EagerIter<I>),
+ /// The current thread evaluates `I`.
+ OnDemand(I),
+}
+
+impl<I> EagerIterIf<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ /// Return a new `EagerIterIf`, which evaluates `iter` eagerly in its own thread if `condition()` returns true.
+ ///
+ /// For all other parameters, please see [`EagerIter::new()`].
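+ ///
+ /// A sketch (added for illustration): only pay for a background thread when the workload
+ /// is known to be large.
+ ///
+ /// ```
+ /// use gix_features::parallel::EagerIterIf;
+ ///
+ /// let workload = vec![1, 2, 3];
+ /// let is_large = workload.len() > 1000;
+ /// let iter = EagerIterIf::new(move || is_large, workload.into_iter(), 100, 2);
+ /// // here the condition is false, so iteration happens on demand in the current thread.
+ /// assert_eq!(iter.count(), 3);
+ /// ```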
+ pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
+ if condition() {
+ EagerIterIf::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight))
+ } else {
+ EagerIterIf::OnDemand(iter)
+ }
+ }
+}
+impl<I> Iterator for EagerIterIf<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self {
+ EagerIterIf::OnDemand(i) => i.next(),
+ EagerIterIf::Eager(i) => i.next(),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self {
+ EagerIterIf::OnDemand(i) => i.size_hint(),
+ EagerIterIf::Eager(i) => i.size_hint(),
+ }
+ }
+}
diff --git a/vendor/gix-features-0.35.0/src/parallel/in_order.rs b/vendor/gix-features-0.35.0/src/parallel/in_order.rs
new file mode 100644
index 000000000..7928ac692
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/in_order.rs
@@ -0,0 +1,83 @@
+use std::{cmp::Ordering, collections::BTreeMap};
+
+/// A counter for items that are in sequence, to be able to put them back into original order later.
+pub type SequenceId = usize;
+
+/// An iterator which holds iterated items with a **sequential** ID starting at 0 long enough to dispense them in order.
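+///
+/// A sketch (added for illustration): results tagged with sequence IDs arrive out of order
+/// and are yielded back in order.
+///
+/// ```
+/// use gix_features::parallel::InOrderIter;
+///
+/// let results = vec![Ok::<_, ()>((1usize, 'b')), Ok((0usize, 'a'))];
+/// let in_order: Result<Vec<_>, ()> = InOrderIter::from(results.into_iter()).collect();
+/// assert_eq!(in_order, Ok(vec!['a', 'b']));
+/// ```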
+pub struct InOrderIter<T, I> {
+ /// The iterator yielding the out-of-order elements we are to yield in order.
+ pub inner: I,
+ store: BTreeMap<SequenceId, T>,
+ next_chunk: SequenceId,
+ is_done: bool,
+}
+
+impl<T, E, I> From<I> for InOrderIter<T, I>
+where
+ I: Iterator<Item = Result<(SequenceId, T), E>>,
+{
+ fn from(iter: I) -> Self {
+ InOrderIter {
+ inner: iter,
+ store: Default::default(),
+ next_chunk: 0,
+ is_done: false,
+ }
+ }
+}
+
+impl<T, E, I> Iterator for InOrderIter<T, I>
+where
+ I: Iterator<Item = Result<(SequenceId, T), E>>,
+{
+ type Item = Result<T, E>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.is_done {
+ return None;
+ }
+ 'find_next_in_sequence: loop {
+ match self.inner.next() {
+ Some(Ok((c, v))) => match c.cmp(&self.next_chunk) {
+ Ordering::Equal => {
+ self.next_chunk += 1;
+ return Some(Ok(v));
+ }
+ Ordering::Less => {
+ unreachable!("in a correctly ordered sequence we can never see keys again, got {}", c)
+ }
+ Ordering::Greater => {
+ let previous = self.store.insert(c, v);
+ assert!(
+ previous.is_none(),
+ "Chunks are returned only once, input is an invalid sequence"
+ );
+ if let Some(v) = self.store.remove(&self.next_chunk) {
+ self.next_chunk += 1;
+ return Some(Ok(v));
+ }
+ continue 'find_next_in_sequence;
+ }
+ },
+ Some(Err(e)) => {
+ self.is_done = true;
+ self.store.clear();
+ return Some(Err(e));
+ }
+ None => match self.store.remove(&self.next_chunk) {
+ Some(v) => {
+ self.next_chunk += 1;
+ return Some(Ok(v));
+ }
+ None => {
+ debug_assert!(
+ self.store.is_empty(),
+ "When iteration is done we should not have stored items left"
+ );
+ return None;
+ }
+ },
+ }
+ }
+ }
+}
diff --git a/vendor/gix-features-0.35.0/src/parallel/in_parallel.rs b/vendor/gix-features-0.35.0/src/parallel/in_parallel.rs
new file mode 100644
index 000000000..a80762bad
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/in_parallel.rs
@@ -0,0 +1,301 @@
+use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+
+use crate::parallel::{num_threads, Reduce};
+
+/// A scope to start threads within.
+pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>;
+
+/// Runs `left` and `right` in parallel, returning their output when both are done.
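+///
+/// A sketch (added for illustration):
+///
+/// ```
+/// use gix_features::parallel::join;
+///
+/// let (a, b) = join(|| 2 + 2, || "done");
+/// assert_eq!((a, b), (4, "done"));
+/// ```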
+pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
+ std::thread::scope(|s| {
+ let left = std::thread::Builder::new()
+ .name("gitoxide.join.left".into())
+ .spawn_scoped(s, left)
+ .expect("valid name");
+ let right = std::thread::Builder::new()
+ .name("gitoxide.join.right".into())
+ .spawn_scoped(s, right)
+ .expect("valid name");
+ (left.join().unwrap(), right.join().unwrap())
+ })
+}
+
+/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
+/// That way it's possible to handle threads without needing the 'static lifetime for data they interact with.
+///
+/// Note that the threads should not rely on actual parallelism as threading might be turned off entirely, hence they should not
+/// be connected to each other with channels, as this would deadlock in single-threaded mode.
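+///
+/// A sketch (added for illustration): a scoped thread may borrow data from the stack.
+///
+/// ```
+/// use gix_features::parallel::threads;
+///
+/// let data = vec![1, 2, 3];
+/// let sum = threads(|scope| {
+///     std::thread::Builder::new()
+///         .name("summer".into())
+///         .spawn_scoped(scope, || data.iter().sum::<i32>())
+///         .expect("valid name")
+///         .join()
+///         .expect("no panic")
+/// });
+/// assert_eq!(sum, 6);
+/// ```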
+pub fn threads<'env, F, R>(f: F) -> R
+where
+ F: for<'scope> FnOnce(&'scope std::thread::Scope<'scope, 'env>) -> R,
+{
+ std::thread::scope(f)
+}
+
+/// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
+pub fn build_thread() -> std::thread::Builder {
+ std::thread::Builder::new()
+}
+
+/// Read items from `input` and `consume` them in multiple threads,
+/// whose output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
+///
+/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+/// created by `new_thread_state(…)`.
+/// * For `reducer`, see the [`Reduce`] trait
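+///
+/// A usage sketch (added for illustration): square numbers on worker threads and sum the
+/// results with a small reducer running on the calling thread.
+///
+/// ```
+/// use gix_features::parallel::{in_parallel, Reduce};
+///
+/// struct Sum(u64);
+/// impl Reduce for Sum {
+///     type Input = u64;
+///     type FeedProduce = ();
+///     type Output = u64;
+///     type Error = std::convert::Infallible;
+///     fn feed(&mut self, item: u64) -> Result<(), Self::Error> {
+///         self.0 += item;
+///         Ok(())
+///     }
+///     fn finalize(self) -> Result<u64, Self::Error> {
+///         Ok(self.0)
+///     }
+/// }
+///
+/// let total = in_parallel(
+///     1..=100u64,
+///     None,              // use all logical cores
+///     |_thread_id| (),   // no per-thread state needed here
+///     |n, _state| n * n, // runs on worker threads
+///     Sum(0),
+/// ).expect("reducer is infallible");
+/// assert_eq!(total, 338_350);
+/// ```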
+pub fn in_parallel<I, S, O, R>(
+ input: impl Iterator<Item = I> + Send,
+ thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ let num_threads = num_threads(thread_limit);
+ std::thread::scope(move |s| {
+ let receive_result = {
+ let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+ let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
+ for thread_id in 0..num_threads {
+ std::thread::Builder::new()
+ .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
+ .spawn_scoped(s, {
+ let send_result = send_result.clone();
+ let receive_input = receive_input.clone();
+ let new_thread_state = new_thread_state.clone();
+ let mut consume = consume.clone();
+ move || {
+ let mut state = new_thread_state(thread_id);
+ for item in receive_input {
+ if send_result.send(consume(item, &mut state)).is_err() {
+ break;
+ }
+ }
+ }
+ })
+ .expect("valid name");
+ }
+ std::thread::Builder::new()
+ .name("gitoxide.in_parallel.feed".into())
+ .spawn_scoped(s, move || {
+ for item in input {
+ if send_input.send(item).is_err() {
+ break;
+ }
+ }
+ })
+ .expect("valid name");
+ receive_result
+ };
+
+ for item in receive_result {
+ drop(reducer.feed(item)?);
+ }
+ reducer.finalize()
+ })
+}
+
+/// Read items from `input` and `consume` them in multiple threads,
+/// whose output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
+/// `finalize` is called to finish the computation, once per thread, if there was no error sending results earlier.
+///
+/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+/// created by `new_thread_state(…)`.
+/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
+/// * For `reducer`, see the [`Reduce`] trait
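+///
+/// A sketch (added for illustration): each thread counts the items it consumed and reports
+/// that count exactly once via `finalize`, while a reducer adds everything up.
+///
+/// ```
+/// use gix_features::parallel::{in_parallel_with_finalize, Reduce};
+///
+/// struct Total(usize);
+/// impl Reduce for Total {
+///     type Input = usize;
+///     type FeedProduce = ();
+///     type Output = usize;
+///     type Error = std::convert::Infallible;
+///     fn feed(&mut self, item: usize) -> Result<(), Self::Error> {
+///         self.0 += item;
+///         Ok(())
+///     }
+///     fn finalize(self) -> Result<usize, Self::Error> {
+///         Ok(self.0)
+///     }
+/// }
+///
+/// let total = in_parallel_with_finalize(
+///     0..100,
+///     None,
+///     |_thread_id| 0usize,               // per-thread state: an item counter
+///     |_item, count| { *count += 1; 0 }, // count each item, feed nothing yet
+///     |count| count,                     // feed the per-thread count exactly once
+///     Total(0),
+/// ).expect("reducer is infallible");
+/// assert_eq!(total, 100);
+/// ```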
+pub fn in_parallel_with_finalize<I, S, O, R>(
+ input: impl Iterator<Item = I> + Send,
+ thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
+ finalize: impl FnOnce(S) -> O + Send + Clone,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ let num_threads = num_threads(thread_limit);
+ std::thread::scope(move |s| {
+ let receive_result = {
+ let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+ let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
+ for thread_id in 0..num_threads {
+ std::thread::Builder::new()
+ .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
+ .spawn_scoped(s, {
+ let send_result = send_result.clone();
+ let receive_input = receive_input.clone();
+ let new_thread_state = new_thread_state.clone();
+ let mut consume = consume.clone();
+ let finalize = finalize.clone();
+ move || {
+ let mut state = new_thread_state(thread_id);
+ let mut can_send = true;
+ for item in receive_input {
+ if send_result.send(consume(item, &mut state)).is_err() {
+ can_send = false;
+ break;
+ }
+ }
+ if can_send {
+ send_result.send(finalize(state)).ok();
+ }
+ }
+ })
+ .expect("valid name");
+ }
+ std::thread::Builder::new()
+ .name("gitoxide.in_parallel.feed".into())
+ .spawn_scoped(s, move || {
+ for item in input {
+ if send_input.send(item).is_err() {
+ break;
+ }
+ }
+ })
+ .expect("valid name");
+ receive_result
+ };
+
+ for item in receive_result {
+ drop(reducer.feed(item)?);
+ }
+ reducer.finalize()
+ })
+}
+
+/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
+/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
+/// for file-io as it won't make use of sorted inputs well.
+/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
+/// `consume(&mut item, &mut state, &threads_available, &should_interrupt)` is called for performing the actual computation.
+/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`),
+/// which allows callees to implement their own work-stealing in case the work is distributed unevenly.
+/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice
+/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads,
+/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust
+/// the thread-count accordingly.
+// TODO: better docs
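+///
+/// A sketch (added for illustration): square each element of a slice in place while counting
+/// per-thread progress.
+///
+/// ```
+/// use gix_features::parallel::in_parallel_with_slice;
+///
+/// let mut data = [1u32, 2, 3, 4];
+/// let counts = in_parallel_with_slice(
+///     &mut data,
+///     None,
+///     |_thread_id| 0usize,   // per-thread state: the number of items processed
+///     |item, count, _threads_left, _should_interrupt| {
+///         *item *= *item;
+///         *count += 1;
+///         Ok::<_, std::convert::Infallible>(())
+///     },
+///     || Some(std::time::Duration::from_millis(50)), // keep the watchdog polling
+///     |count| count,         // turn each thread's state into its return value
+/// ).expect("consume is infallible");
+/// assert_eq!(data, [1, 4, 9, 16]);
+/// assert_eq!(counts.iter().sum::<usize>(), 4);
+/// ```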
+pub fn in_parallel_with_slice<I, S, R, E>(
+ input: &mut [I],
+ thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+ consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone,
+ mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
+ state_to_rval: impl FnOnce(S) -> R + Send + Clone,
+) -> Result<Vec<R>, E>
+where
+ I: Send,
+ E: Send,
+ R: Send,
+{
+ let num_threads = num_threads(thread_limit);
+ let mut results = Vec::with_capacity(num_threads);
+ let stop_everything = &AtomicBool::default();
+ let index = &AtomicUsize::default();
+ let threads_left = &AtomicIsize::new(num_threads as isize);
+
+ std::thread::scope({
+ move |s| {
+ std::thread::Builder::new()
+ .name("gitoxide.in_parallel_with_slice.watch-interrupts".into())
+ .spawn_scoped(s, {
+ move || loop {
+ if stop_everything.load(Ordering::Relaxed) {
+ break;
+ }
+
+ match periodic() {
+ Some(duration) => std::thread::sleep(duration),
+ None => {
+ stop_everything.store(true, Ordering::Relaxed);
+ break;
+ }
+ }
+ }
+ })
+ .expect("valid name");
+
+ let input_len = input.len();
+ struct Input<I>(*mut [I])
+ where
+ I: Send;
+
+ // SAFETY: `I` is `Send`, so a `*mut [I]` may be sent to another thread as well.
+ #[allow(unsafe_code)]
+ unsafe impl<I> Send for Input<I> where I: Send {}
+
+ let threads: Vec<_> = (0..num_threads)
+ .map(|thread_id| {
+ std::thread::Builder::new()
+ .name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}"))
+ .spawn_scoped(s, {
+ let new_thread_state = new_thread_state.clone();
+ let state_to_rval = state_to_rval.clone();
+ let mut consume = consume.clone();
+ let input = Input(input as *mut [I]);
+ move || {
+ let _ = &input;
+ threads_left.fetch_sub(1, Ordering::SeqCst);
+ let mut state = new_thread_state(thread_id);
+ let res = (|| {
+ while let Ok(input_index) =
+ index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
+ (x < input_len).then_some(x + 1)
+ })
+ {
+ if stop_everything.load(Ordering::Relaxed) {
+ break;
+ }
+ // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
+ // each item exactly once.
+ let item = {
+ #[allow(unsafe_code)]
+ unsafe {
+ &mut (&mut *input.0)[input_index]
+ }
+ };
+ if let Err(err) = consume(item, &mut state, threads_left, stop_everything) {
+ stop_everything.store(true, Ordering::Relaxed);
+ return Err(err);
+ }
+ }
+ Ok(state_to_rval(state))
+ })();
+ threads_left.fetch_add(1, Ordering::SeqCst);
+ res
+ }
+ })
+ .expect("valid name")
+ })
+ .collect();
+ for thread in threads {
+ match thread.join() {
+ Ok(res) => {
+ results.push(res?);
+ }
+ Err(err) => {
+ // a panic happened, stop the world gracefully (even though we panic later)
+ stop_everything.store(true, Ordering::Relaxed);
+ std::panic::resume_unwind(err);
+ }
+ }
+ }
+
+ stop_everything.store(true, Ordering::Relaxed);
+ Ok(results)
+ }
+ })
+}
diff --git a/vendor/gix-features-0.35.0/src/parallel/mod.rs b/vendor/gix-features-0.35.0/src/parallel/mod.rs
new file mode 100644
index 000000000..5a0a4b589
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/mod.rs
@@ -0,0 +1,178 @@
+//! Run computations in parallel, or not, based on the `parallel` feature toggle.
+//!
+//! ### `in_parallel`(…)
+//!
+//! The [`in_parallel(…)`][in_parallel()] function provides the typical fan-out-fan-in mode of parallelism, with thread local storage
+//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
+//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
+//!
+//! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail.
+//!
+//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
+//! or the data to work on.
+//!
+//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
+//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
+//! aggregation.
+//!
+//! ### `reduce::Stepwise`
+//!
+//! The [`Stepwise`][reduce::Stepwise] iterator works exactly as [`in_parallel()`] except that the processing of the output produced by
+//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
+//! buffered, the owner of the iterator controls the progress made.
+//!
+//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
+//! is functionally equivalent to calling [`in_parallel()`].
+//!
+//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
+//! the iterator will wind down the computation without any result.
+//!
+//! #### Maintaining Safety
+//!
+//! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce
+//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
+//! suitable input iterators as long as they can hold something on the heap.
+#[cfg(feature = "parallel")]
+mod in_parallel;
+#[cfg(feature = "parallel")]
+pub use in_parallel::{
+ build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope,
+};
+
+mod serial;
+#[cfg(not(feature = "parallel"))]
+pub use serial::{build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope};
+
+mod in_order;
+pub use in_order::{InOrderIter, SequenceId};
+
+mod eager_iter;
+pub use eager_iter::{EagerIter, EagerIterIf};
+
+/// A no-op returning _(`desired_chunk_size`, `thread_limit`, `num_threads(thread_limit)`)_ unaltered, used
+/// when the `parallel` feature toggle is not set.
+#[cfg(not(feature = "parallel"))]
+pub fn optimize_chunk_size_and_thread_limit(
+ desired_chunk_size: usize,
+ _num_items: Option<usize>,
+ thread_limit: Option<usize>,
+ _available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+ (desired_chunk_size, thread_limit, num_threads(thread_limit))
+}
+
+/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given
+/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
+///
+/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
+/// * `num_items` is the total amount of items in the iteration, if `Some`.
+/// Otherwise this knowledge will not affect the output of this function.
+/// * `thread_limit` is the amount of threads to use at most, if `Some`.
+/// Otherwise this knowledge will not affect the output of this function.
+/// * `available_threads` is the total amount of threads available, if `Some`.
+/// Otherwise the actual amount of available threads is determined by querying the system.
+///
+/// Note that this implementation is available only if the `parallel` feature toggle is set.
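+///
+/// A sketch (added for illustration): with 100 items and 4 available threads, chunks are
+/// sized so that each thread receives at least two of them.
+///
+/// ```
+/// use gix_features::parallel::optimize_chunk_size_and_thread_limit;
+///
+/// let (chunk_size, thread_limit, threads) =
+///     optimize_chunk_size_and_thread_limit(50, Some(100), None, Some(4));
+/// assert_eq!((chunk_size, thread_limit, threads), (12, Some(4), 4));
+/// ```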
+#[cfg(feature = "parallel")]
+pub fn optimize_chunk_size_and_thread_limit(
+ desired_chunk_size: usize,
+ num_items: Option<usize>,
+ thread_limit: Option<usize>,
+ available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+ let available_threads =
+ available_threads.unwrap_or_else(|| std::thread::available_parallelism().map_or(1, Into::into));
+ let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l });
+
+ let (lower, upper) = (50, 1000);
+ let (chunk_size, thread_limit) = num_items.map_or(
+ {
+ let chunk_size = if available_threads == 1 {
+ desired_chunk_size
+ } else if desired_chunk_size < lower {
+ lower
+ } else {
+ desired_chunk_size.min(upper)
+ };
+ (chunk_size, available_threads)
+ },
+ |num_items| {
+ let desired_chunks_per_thread_at_least = 2;
+ let items = num_items;
+ let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper);
+ let num_chunks = items / chunk_size;
+ let thread_limit = if num_chunks <= available_threads {
+ (num_chunks / desired_chunks_per_thread_at_least).max(1)
+ } else {
+ available_threads
+ };
+ (chunk_size, thread_limit)
+ },
+ );
+ (chunk_size, Some(thread_limit), thread_limit)
+}
+
+/// Always returns 1, available when the `parallel` feature toggle is unset.
+#[cfg(not(feature = "parallel"))]
+pub fn num_threads(_thread_limit: Option<usize>) -> usize {
+ 1
+}
+
+/// Returns the amount of threads the system can effectively use, i.e. the given non-zero `thread_limit` or the amount of its logical cores.
+///
+/// Only available with the `parallel` feature toggle set.
+#[cfg(feature = "parallel")]
+pub fn num_threads(thread_limit: Option<usize>) -> usize {
+ let logical_cores = std::thread::available_parallelism().map_or(1, Into::into);
+ thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l })
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`]
+#[cfg(feature = "parallel")]
+pub fn in_parallel_if<I, S, O, R>(
+ condition: impl FnOnce() -> bool,
+ input: impl Iterator<Item = I> + Send,
+ thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
+ reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ if num_threads(thread_limit) > 1 && condition() {
+ in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+ } else {
+ serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+ }
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`]
+///
+/// Note that the non-parallel version is equivalent to [`in_parallel()`].
+#[cfg(not(feature = "parallel"))]
+pub fn in_parallel_if<I, S, O, R>(
+ _condition: impl FnOnce() -> bool,
+ input: impl Iterator<Item = I>,
+ thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S,
+ consume: impl FnMut(I, &mut S) -> O,
+ reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+}
+
+///
+pub mod reduce;
+pub use reduce::Reduce;
diff --git a/vendor/gix-features-0.35.0/src/parallel/reduce.rs b/vendor/gix-features-0.35.0/src/parallel/reduce.rs
new file mode 100644
index 000000000..f9992cfd2
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/reduce.rs
@@ -0,0 +1,279 @@
+#[cfg(feature = "parallel")]
+mod stepped {
+ use crate::parallel::num_threads;
+
+ /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
+ /// for details.
+ pub struct Stepwise<Reduce: super::Reduce> {
+ /// This field is first to assure it's dropped first and cause threads that are dropped next to stop their loops
+ /// as sending results fails when the receiver is dropped.
+ receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
+ /// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When
+ /// that happens, they break out of their loops.
+ threads: Vec<std::thread::JoinHandle<()>>,
+ /// The reducer is called only in the thread using the iterator, dropping it has no side effects.
+ reducer: Option<Reduce>,
+ }
+
+ impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
+ fn drop(&mut self) {
+ let (_, sink) = std::sync::mpsc::channel();
+ drop(std::mem::replace(&mut self.receive_result, sink));
+
+ let mut last_err = None;
+ for handle in std::mem::take(&mut self.threads) {
+ if let Err(err) = handle.join() {
+ last_err = Some(err);
+ };
+ }
+ if let Some(thread_err) = last_err {
+ std::panic::resume_unwind(thread_err);
+ }
+ }
+ }
+
+ impl<Reduce: super::Reduce> Stepwise<Reduce> {
+ /// Instantiate a new iterator and start working in threads.
+ /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
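+ ///
+ /// A sketch (added for illustration): drive the computation one result at a time, using the
+ /// [`IdentityWithResult`][crate::parallel::reduce::IdentityWithResult] reducer to pass each
+ /// product straight through.
+ ///
+ /// ```
+ /// use gix_features::parallel::reduce::{IdentityWithResult, Stepwise};
+ ///
+ /// let reducer = IdentityWithResult::<u32, std::convert::Infallible>::default();
+ /// let stepwise = Stepwise::new(
+ ///     vec![1u32, 2].into_iter(),
+ ///     None,
+ ///     |_thread_id| (),
+ ///     |n, _state| Ok(n * 10),
+ ///     reducer,
+ /// );
+ /// let mut seen: Vec<u32> = stepwise.map(Result::unwrap).collect();
+ /// seen.sort(); // results may arrive in any order when threads are involved
+ /// assert_eq!(seen, vec![10, 20]);
+ /// ```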
+ pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
+ input: InputIter,
+ thread_limit: Option<usize>,
+ new_thread_state: ThreadStateFn,
+ consume: ConsumeFn,
+ reducer: Reduce,
+ ) -> Self
+ where
+ InputIter: Iterator<Item = I> + Send + 'static,
+ ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
+ ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
+ Reduce: super::Reduce<Input = O> + 'static,
+ I: Send + 'static,
+ O: Send + 'static,
+ {
+ let num_threads = num_threads(thread_limit);
+ let mut threads = Vec::with_capacity(num_threads + 1);
+ let receive_result = {
+ let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+ let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
+ for thread_id in 0..num_threads {
+ let handle = std::thread::spawn({
+ let send_result = send_result.clone();
+ let receive_input = receive_input.clone();
+ let new_thread_state = new_thread_state.clone();
+ let consume = consume.clone();
+ move || {
+ let mut state = new_thread_state(thread_id);
+ for item in receive_input {
+ if send_result.send(consume(item, &mut state)).is_err() {
+ break;
+ }
+ }
+ }
+ });
+ threads.push(handle);
+ }
+ threads.push(std::thread::spawn(move || {
+ for item in input {
+ if send_input.send(item).is_err() {
+ break;
+ }
+ }
+ }));
+ receive_result
+ };
+ Stepwise {
+ threads,
+ receive_result,
+ reducer: Some(reducer),
+ }
+ }
+
+ /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
+ pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
+ for value in self.by_ref() {
+ drop(value?);
+ }
+ self.reducer
+ .take()
+ .expect("this is the last call before consumption")
+ .finalize()
+ }
+ }
+
+ impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
+ type Item = Result<Reduce::FeedProduce, Reduce::Error>;
+
+ fn next(&mut self) -> Option<<Self as Iterator>::Item> {
+ self.receive_result
+ .recv()
+ .ok()
+ .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
+ }
+ }
+
+ impl<R: super::Reduce> super::Finalize for Stepwise<R> {
+ type Reduce = R;
+
+ fn finalize(
+ self,
+ ) -> Result<
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
+ > {
+ Stepwise::finalize(self)
+ }
+ }
+}
+
+#[cfg(not(feature = "parallel"))]
+mod stepped {
+ /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
+ /// for details.
+ pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
+ input: InputIter,
+ consume: ConsumeFn,
+ thread_state: ThreadState,
+ reducer: Reduce,
+ }
+
+ impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
+ where
+ InputIter: Iterator<Item = I>,
+ ConsumeFn: Fn(I, &mut S) -> O,
+ Reduce: super::Reduce<Input = O>,
+ {
+ /// Instantiate a new iterator.
+ /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
+ pub fn new<ThreadStateFn>(
+ input: InputIter,
+ _thread_limit: Option<usize>,
+ new_thread_state: ThreadStateFn,
+ consume: ConsumeFn,
+ reducer: Reduce,
+ ) -> Self
+ where
+ ThreadStateFn: Fn(usize) -> S,
+ {
+ Stepwise {
+ input,
+ consume,
+ thread_state: new_thread_state(0),
+ reducer,
+ }
+ }
+
+ /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
+ pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
+ for value in self.by_ref() {
+ drop(value?);
+ }
+ self.reducer.finalize()
+ }
+ }
+
+ impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
+ where
+ InputIter: Iterator<Item = I>,
+ ConsumeFn: Fn(I, &mut ThreadState) -> O,
+ Reduce: super::Reduce<Input = O>,
+ {
+ type Item = Result<Reduce::FeedProduce, Reduce::Error>;
+
+ fn next(&mut self) -> Option<<Self as Iterator>::Item> {
+ self.input
+ .next()
+ .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
+ }
+ }
+
+ impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
+ where
+ InputIter: Iterator<Item = I>,
+ ConsumeFn: Fn(I, &mut S) -> O,
+ R: super::Reduce<Input = O>,
+ {
+ type Reduce = R;
+
+ fn finalize(
+ self,
+ ) -> Result<
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
+ > {
+ Stepwise::finalize(self)
+ }
+ }
+}
+
+use std::marker::PhantomData;
+
+pub use stepped::Stepwise;
+
+/// A trait for aggregating items commonly produced in threads into a single result, without itself
+/// needing to be thread safe.
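+///
+/// A minimal implementation sketch (added for illustration): collect every input into a `Vec`.
+///
+/// ```
+/// use gix_features::parallel::Reduce;
+///
+/// struct Collect<T>(Vec<T>);
+/// impl<T> Reduce for Collect<T> {
+///     type Input = T;
+///     type FeedProduce = ();
+///     type Output = Vec<T>;
+///     type Error = std::convert::Infallible;
+///     fn feed(&mut self, item: T) -> Result<(), Self::Error> {
+///         self.0.push(item);
+///         Ok(())
+///     }
+///     fn finalize(self) -> Result<Vec<T>, Self::Error> {
+///         Ok(self.0)
+///     }
+/// }
+///
+/// let mut reducer = Collect(Vec::new());
+/// reducer.feed(1).unwrap();
+/// reducer.feed(2).unwrap();
+/// assert_eq!(reducer.finalize().unwrap(), vec![1, 2]);
+/// ```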
+pub trait Reduce {
+ /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method.
+ ///
+ /// It's produced by a function that may run on multiple threads.
+ type Input;
+ /// The type produced in Ok(…) by [`feed()`][Reduce::feed()].
+ /// Most reducers by nature use `()` here as the value is in the aggregation.
+ /// However, some may use it to collect statistics only and return their Input
+ /// in some form as a result here for [`Stepwise`] to be useful.
+ type FeedProduce;
+ /// The type produced once by the [`finalize()`][Reduce::finalize()] method.
+ ///
+ /// For traditional reducers, this is the value produced by the entire operation.
+ /// For those made for step-wise iteration this may be aggregated statistics.
+ type Output;
+ /// The error type to use for all methods of this trait.
+ type Error;
+ /// Called each time a new `item` was produced in order to aggregate it into the final result.
+ ///
+ /// If an `Error` is returned, the entire operation will be stopped.
+ fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;
+ /// Called once after all items were passed to `feed()`, producing the final `Output` of the operation or an `Error`.
+ fn finalize(self) -> Result<Self::Output, Self::Error>;
+}
+
+/// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()]
+/// without the need for a non-threaded reduction of products created in threads.
+pub struct IdentityWithResult<Input, Error> {
+ _input: PhantomData<Input>,
+ _error: PhantomData<Error>,
+}
+
+impl<Input, Error> Default for IdentityWithResult<Input, Error> {
+ fn default() -> Self {
+ IdentityWithResult {
+ _input: Default::default(),
+ _error: Default::default(),
+ }
+ }
+}
+
+impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
+ type Input = Result<Input, Self::Error>;
+ type FeedProduce = Input;
+ type Output = ();
+ type Error = Error;
+
+ fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
+ item
+ }
+
+ fn finalize(self) -> Result<Self::Output, Self::Error> {
+ Ok(())
+ }
+}
+
+/// A trait reflecting the `finalize()` method of [`Reduce`] implementations
+pub trait Finalize {
+ /// An implementation of [`Reduce`]
+ type Reduce: self::Reduce;
+
+ /// Similar to the [`Reduce::finalize()`] method
+ fn finalize(
+ self,
+ ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
+}
diff --git a/vendor/gix-features-0.35.0/src/parallel/serial.rs b/vendor/gix-features-0.35.0/src/parallel/serial.rs
new file mode 100644
index 000000000..7665d3ffa
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/serial.rs
@@ -0,0 +1,174 @@
+use crate::parallel::Reduce;
+
+#[cfg(not(feature = "parallel"))]
+mod not_parallel {
+ use std::sync::atomic::{AtomicBool, AtomicIsize};
+
+ /// Runs `left` and then `right`, one after another, returning their output when both are done.
+ pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
+ (left(), right())
+ }
+
+ /// A scope for spawning threads.
+ pub struct Scope<'scope, 'env: 'scope> {
+ _scope: std::marker::PhantomData<&'scope mut &'scope ()>,
+ _env: std::marker::PhantomData<&'env mut &'env ()>,
+ }
+
+ pub struct ThreadBuilder;
+
+ /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
+ pub fn build_thread() -> ThreadBuilder {
+ ThreadBuilder
+ }
+
+ #[allow(unsafe_code)]
+ unsafe impl Sync for Scope<'_, '_> {}
+
+ impl ThreadBuilder {
+ pub fn name(self, _new: String) -> Self {
+ self
+ }
+ pub fn spawn_scoped<'scope, 'env, F, T>(
+ &self,
+ scope: &'scope Scope<'scope, 'env>,
+ f: F,
+ ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
+ where
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
+ {
+ Ok(scope.spawn(f))
+ }
+ }
+
+ impl<'scope, 'env> Scope<'scope, 'env> {
+ /// Provided with this scope, let `f` start new threads that live within it.
+ pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ where
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
+ {
+ ScopedJoinHandle {
+ result: f(),
+ _marker: Default::default(),
+ }
+ }
+ }
+
+ /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
+ /// Note that this implementation will run the spawned functions immediately.
+ pub fn threads<'env, F, R>(f: F) -> R
+ where
+ F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
+ {
+ f(&Scope {
+ _scope: Default::default(),
+ _env: Default::default(),
+ })
+ }
+
+ /// A handle that can be used to join its scoped thread.
+ ///
+ /// This struct is created by the [`Scope::spawn`] method and the
+ /// [`ThreadBuilder::spawn_scoped`] method.
+ pub struct ScopedJoinHandle<'scope, T> {
+ /// Holds the result of the inner closure.
+ result: T,
+ _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
+ }
+
+ impl<T> ScopedJoinHandle<'_, T> {
+ pub fn join(self) -> std::thread::Result<T> {
+ Ok(self.result)
+ }
+ pub fn is_finished(&self) -> bool {
+ true
+ }
+ }
+
+ /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
+ /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
+ /// for file-io as it won't make use of sorted inputs well.
+ // TODO: better docs
+ pub fn in_parallel_with_slice<I, S, R, E>(
+ input: &mut [I],
+ _thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S + Clone,
+ mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
+ mut periodic: impl FnMut() -> Option<std::time::Duration>,
+ state_to_rval: impl FnOnce(S) -> R + Clone,
+ ) -> Result<Vec<R>, E> {
+ let mut state = new_thread_state(0);
+ let should_interrupt = &AtomicBool::default();
+ let threads_left = &AtomicIsize::default();
+ for item in input {
+ consume(item, &mut state, threads_left, should_interrupt)?;
+ if periodic().is_none() {
+ break;
+ }
+ }
+ Ok(vec![state_to_rval(state)])
+ }
+}
+
+#[cfg(not(feature = "parallel"))]
+pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
+
+/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
+/// whose task is to aggregate these outputs into the final result returned by this function.
+///
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
+/// * For `reducer`, see the [`Reduce`] trait
+/// * `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
+/// similar to the parallel version.
+///
+/// **This serial version performs all calculations on the current thread.**
+pub fn in_parallel<I, S, O, R>(
+ input: impl Iterator<Item = I>,
+ _thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S,
+ mut consume: impl FnMut(I, &mut S) -> O,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+{
+ let mut state = new_thread_state(0);
+ for item in input {
+ drop(reducer.feed(consume(item, &mut state))?);
+ }
+ reducer.finalize()
+}
+
+/// Read items from `input` and `consume` them in a single thread,
+/// whose output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
+/// `finalize` is called once to finish the computation after all items were consumed.
+///
+/// * `thread_limit` is ignored, as everything runs on the main thread.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+/// created by `new_thread_state(…)`.
+/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
+/// * For `reducer`, see the [`Reduce`] trait
+#[cfg(not(feature = "parallel"))]
+pub fn in_parallel_with_finalize<I, S, O, R>(
+ input: impl Iterator<Item = I>,
+ _thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S,
+ mut consume: impl FnMut(I, &mut S) -> O,
+ finalize: impl FnOnce(S) -> O + Send + Clone,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+{
+ let mut state = new_thread_state(0);
+ for item in input {
+ drop(reducer.feed(consume(item, &mut state))?);
+ }
+ reducer.feed(finalize(state))?;
+ reducer.finalize()
+}