author     Daniel Baumann <daniel.baumann@progress-linux.org>    2024-06-07 05:48:48 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>    2024-06-07 05:48:48 +0000
commit     ef24de24a82fe681581cc130f342363c47c0969a (patch)
tree       0d494f7e1a38b95c92426f58fe6eaa877303a86c /vendor/gix-features-0.35.0/src/parallel
parent     Releasing progress-linux version 1.74.1+dfsg1-1~progress7.99u1. (diff)
Merging upstream version 1.75.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features-0.35.0/src/parallel')
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/eager_iter.rs   124
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/in_order.rs      83
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/in_parallel.rs  301
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/mod.rs          178
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/reduce.rs       279
-rw-r--r--  vendor/gix-features-0.35.0/src/parallel/serial.rs       174
6 files changed, 1139 insertions, 0 deletions
diff --git a/vendor/gix-features-0.35.0/src/parallel/eager_iter.rs b/vendor/gix-features-0.35.0/src/parallel/eager_iter.rs
new file mode 100644
index 000000000..9a1735f72
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/eager_iter.rs
@@ -0,0 +1,124 @@
+/// Evaluate any iterator in its own thread.
+///
+/// This is particularly useful if the wrapped iterator performs IO and/or heavy computations.
+/// Use [`EagerIter::new()`] for instantiation.
+pub struct EagerIter<I: Iterator> {
+    receiver: std::sync::mpsc::Receiver<Vec<I::Item>>,
+    chunk: Option<std::vec::IntoIter<I::Item>>,
+    size_hint: (usize, Option<usize>),
+}
+
+impl<I> EagerIter<I>
+where
+    I: Iterator + Send + 'static,
+    <I as Iterator>::Item: Send,
+{
+    /// Return a new `EagerIter` which evaluates `iter` in its own thread,
+    /// with a given `chunk_size`, allowing a maximum of `chunks_in_flight` chunks.
+    ///
+    /// * `chunk_size` describes how many items returned by `iter` will become a single item of this `EagerIter`.
+    ///   This helps to reduce the overhead imposed by transferring many small items.
+    ///   If this number is 1, each item will become a single chunk. 0 is invalid.
+    /// * `chunks_in_flight` describes how many chunks can be kept in memory in case the consumer of the `EagerIter`'s items
+    ///   isn't consuming them fast enough. Setting this number to 0 effectively turns off any caching, but blocks `EagerIter`
+    ///   if its items aren't consumed fast enough.
+    pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
+        let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight);
+        let size_hint = iter.size_hint();
+        assert!(chunk_size > 0, "non-zero chunk size is needed");
+
+        std::thread::spawn(move || {
+            let mut out = Vec::with_capacity(chunk_size);
+            for item in iter {
+                out.push(item);
+                if out.len() == chunk_size {
+                    if sender.send(out).is_err() {
+                        return;
+                    }
+                    out = Vec::with_capacity(chunk_size);
+                }
+            }
+            if !out.is_empty() {
+                sender.send(out).ok();
+            }
+        });
+        EagerIter {
+            receiver,
+            chunk: None,
+            size_hint,
+        }
+    }
+
+    fn fill_buf_and_pop(&mut self) -> Option<I::Item> {
+        self.chunk = self.receiver.recv().ok().map(|v| {
+            assert!(!v.is_empty());
+            v.into_iter()
+        });
+        self.chunk.as_mut().and_then(Iterator::next)
+    }
+}
+
+impl<I> Iterator for EagerIter<I>
+where
+    I: Iterator + Send + 'static,
+    <I as Iterator>::Item: Send,
+{
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.chunk.as_mut() {
+            Some(chunk) => chunk.next().or_else(|| self.fill_buf_and_pop()),
+            None => self.fill_buf_and_pop(),
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.size_hint
+    }
+}
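For orientation while reading this vendored code, here is a minimal usage sketch of `EagerIter` (an editorial example, not part of the patch; it assumes the crate is consumed under its upstream name `gix_features`). The wrapper behaves like the iterator it wraps, only evaluated ahead of time on a background thread:

use gix_features::parallel::EagerIter;

fn main() {
    // The wrapped range is evaluated on a background thread in chunks of 2 items,
    // with at most 4 chunks buffered if the consumer lags behind.
    let eager = EagerIter::new(0..10, 2, 4);
    let collected: Vec<i32> = eager.collect();
    assert_eq!(collected, (0..10).collect::<Vec<_>>());
}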
+/// A conditional `EagerIter`, which may become a just-in-time iterator running in the main thread, depending on a condition.
+pub enum EagerIterIf<I: Iterator> {
+    /// A separate thread will eagerly evaluate iterator `I`.
+    Eager(EagerIter<I>),
+    /// The current thread evaluates `I`.
+    OnDemand(I),
+}
+
+impl<I> EagerIterIf<I>
+where
+    I: Iterator + Send + 'static,
+    <I as Iterator>::Item: Send,
+{
+    /// Return a new `EagerIterIf` which evaluates `iter` eagerly in its own thread only if `condition()` returns true.
+    ///
+    /// For all other parameters, please see [`EagerIter::new()`].
+    pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
+        if condition() {
+            EagerIterIf::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight))
+        } else {
+            EagerIterIf::OnDemand(iter)
+        }
+    }
+}
+
+impl<I> Iterator for EagerIterIf<I>
+where
+    I: Iterator + Send + 'static,
+    <I as Iterator>::Item: Send,
+{
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self {
+            EagerIterIf::OnDemand(i) => i.next(),
+            EagerIterIf::Eager(i) => i.next(),
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        match self {
+            EagerIterIf::OnDemand(i) => i.size_hint(),
+            EagerIterIf::Eager(i) => i.size_hint(),
+        }
+    }
+}
diff --git a/vendor/gix-features-0.35.0/src/parallel/in_order.rs b/vendor/gix-features-0.35.0/src/parallel/in_order.rs
new file mode 100644
index 000000000..7928ac692
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/in_order.rs
@@ -0,0 +1,83 @@
+use std::{cmp::Ordering, collections::BTreeMap};
+
+/// A counter for items that are in sequence, to be able to put them back into original order later.
+pub type SequenceId = usize;
+
+/// An iterator which holds iterated items with a **sequential** ID starting at 0, long enough to dispense them in order.
+pub struct InOrderIter<T, I> {
+    /// The iterator yielding the out-of-order elements we are to yield in order.
+    pub inner: I,
+    store: BTreeMap<SequenceId, T>,
+    next_chunk: SequenceId,
+    is_done: bool,
+}
+
+impl<T, E, I> From<I> for InOrderIter<T, I>
+where
+    I: Iterator<Item = Result<(SequenceId, T), E>>,
+{
+    fn from(iter: I) -> Self {
+        InOrderIter {
+            inner: iter,
+            store: Default::default(),
+            next_chunk: 0,
+            is_done: false,
+        }
+    }
+}
+
+impl<T, E, I> Iterator for InOrderIter<T, I>
+where
+    I: Iterator<Item = Result<(SequenceId, T), E>>,
+{
+    type Item = Result<T, E>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.is_done {
+            return None;
+        }
+        'find_next_in_sequence: loop {
+            match self.inner.next() {
+                Some(Ok((c, v))) => match c.cmp(&self.next_chunk) {
+                    Ordering::Equal => {
+                        self.next_chunk += 1;
+                        return Some(Ok(v));
+                    }
+                    Ordering::Less => {
+                        unreachable!("in a correctly ordered sequence we can never see keys again, got {}", c)
+                    }
+                    Ordering::Greater => {
+                        let previous = self.store.insert(c, v);
+                        assert!(
+                            previous.is_none(),
+                            "Chunks are returned only once, input is an invalid sequence"
+                        );
+                        if let Some(v) = self.store.remove(&self.next_chunk) {
+                            self.next_chunk += 1;
+                            return Some(Ok(v));
+                        }
+                        continue 'find_next_in_sequence;
+                    }
+                },
+                Some(Err(e)) => {
+                    self.is_done = true;
+                    self.store.clear();
+                    return Some(Err(e));
+                }
+                None => match self.store.remove(&self.next_chunk) {
+                    Some(v) => {
+                        self.next_chunk += 1;
+                        return Some(Ok(v));
+                    }
+                    None => {
+                        debug_assert!(
+                            self.store.is_empty(),
+                            "When iteration is done we should not have stored items left"
+                        );
+                        return None;
+                    }
+                },
+            }
+        }
+    }
+}
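A small sketch of how `InOrderIter` restores sequence order (editorial, not part of the patch; same `gix_features` assumption as above). Worker threads would normally produce the tagged results; a plain vector stands in for them here:

use gix_features::parallel::InOrderIter;

fn main() {
    // Results tagged with sequence ids arrive out of order.
    let tagged = vec![Ok::<_, ()>((1usize, "b")), Ok((0, "a")), Ok((2, "c"))];
    let in_order: Result<Vec<_>, ()> = InOrderIter::from(tagged.into_iter()).collect();
    assert_eq!(in_order.unwrap(), ["a", "b", "c"]);
}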
diff --git a/vendor/gix-features-0.35.0/src/parallel/in_parallel.rs b/vendor/gix-features-0.35.0/src/parallel/in_parallel.rs
new file mode 100644
index 000000000..a80762bad
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/in_parallel.rs
@@ -0,0 +1,301 @@
+use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+
+use crate::parallel::{num_threads, Reduce};
+
+/// A scope to start threads within.
+pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>;
+
+/// Runs `left` and `right` in parallel, returning their output when both are done.
+pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
+    std::thread::scope(|s| {
+        let left = std::thread::Builder::new()
+            .name("gitoxide.join.left".into())
+            .spawn_scoped(s, left)
+            .expect("valid name");
+        let right = std::thread::Builder::new()
+            .name("gitoxide.join.right".into())
+            .spawn_scoped(s, right)
+            .expect("valid name");
+        (left.join().unwrap(), right.join().unwrap())
+    })
+}
+
+/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
+/// That way it's possible to handle threads without needing the `'static` lifetime for data they interact with.
+///
+/// Note that the threads should not rely on actual parallelism, as threading might be turned off entirely; hence they should not
+/// be connected with channels, as that would deadlock in single-threaded mode.
+pub fn threads<'env, F, R>(f: F) -> R
+where
+    F: for<'scope> FnOnce(&'scope std::thread::Scope<'scope, 'env>) -> R,
+{
+    std::thread::scope(f)
+}
+
+/// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
+pub fn build_thread() -> std::thread::Builder {
+    std::thread::Builder::new()
+}
+
+/// Read items from `input` and `consume` them in multiple threads,
+/// whose output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
+///
+/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`.
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+///   created by `new_thread_state(…)`.
+/// * For `reducer`, see the [`Reduce`] trait.
+pub fn in_parallel<I, S, O, R>(
+    input: impl Iterator<Item = I> + Send,
+    thread_limit: Option<usize>,
+    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
+    mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+    R: Reduce<Input = O>,
+    I: Send,
+    O: Send,
+{
+    let num_threads = num_threads(thread_limit);
+    std::thread::scope(move |s| {
+        let receive_result = {
+            let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+            let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
+            for thread_id in 0..num_threads {
+                std::thread::Builder::new()
+                    .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
+                    .spawn_scoped(s, {
+                        let send_result = send_result.clone();
+                        let receive_input = receive_input.clone();
+                        let new_thread_state = new_thread_state.clone();
+                        let mut consume = consume.clone();
+                        move || {
+                            let mut state = new_thread_state(thread_id);
+                            for item in receive_input {
+                                if send_result.send(consume(item, &mut state)).is_err() {
+                                    break;
+                                }
+                            }
+                        }
+                    })
+                    .expect("valid name");
+            }
+            std::thread::Builder::new()
+                .name("gitoxide.in_parallel.feed".into())
+                .spawn_scoped(s, move || {
+                    for item in input {
+                        if send_input.send(item).is_err() {
+                            break;
+                        }
+                    }
+                })
+                .expect("valid name");
+            receive_result
+        };
+
+        for item in receive_result {
+            drop(reducer.feed(item)?);
+        }
+        reducer.finalize()
+    })
+}
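To illustrate the fan-out-fan-in shape of this API, a hypothetical sketch (editorial, not part of the patch): worker threads double each input item while a `Sum` reducer, written here only for the example, aggregates on the calling thread.

use gix_features::parallel::{in_parallel, Reduce};

/// Adds up all outputs produced by the worker threads; runs on the calling thread only.
struct Sum {
    total: u64,
}

impl Reduce for Sum {
    type Input = u64;
    type FeedProduce = ();
    type Output = u64;
    type Error = std::convert::Infallible;

    fn feed(&mut self, item: u64) -> Result<(), Self::Error> {
        self.total += item;
        Ok(())
    }

    fn finalize(self) -> Result<u64, Self::Error> {
        Ok(self.total)
    }
}

fn main() {
    let total = in_parallel(
        1u64..=100,                       // input items, fed to the worker threads
        None,                             // thread_limit: use all logical cores
        |_thread_id| (),                  // no per-thread state needed here
        |item, _state: &mut ()| item * 2, // consume: runs on the worker threads
        Sum { total: 0 },
    )
    .expect("the reducer is infallible");
    assert_eq!(total, 10_100);
}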
+/// Read items from `input` and `consume` them in multiple threads,
+/// whose output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
+/// `finalize` is called to finish the computation, once per thread, if there was no error sending results earlier.
+///
+/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`.
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+///   created by `new_thread_state(…)`.
+/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
+/// * For `reducer`, see the [`Reduce`] trait.
+pub fn in_parallel_with_finalize<I, S, O, R>(
+    input: impl Iterator<Item = I> + Send,
+    thread_limit: Option<usize>,
+    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
+    finalize: impl FnOnce(S) -> O + Send + Clone,
+    mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+    R: Reduce<Input = O>,
+    I: Send,
+    O: Send,
+{
+    let num_threads = num_threads(thread_limit);
+    std::thread::scope(move |s| {
+        let receive_result = {
+            let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+            let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
+            for thread_id in 0..num_threads {
+                std::thread::Builder::new()
+                    .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
+                    .spawn_scoped(s, {
+                        let send_result = send_result.clone();
+                        let receive_input = receive_input.clone();
+                        let new_thread_state = new_thread_state.clone();
+                        let mut consume = consume.clone();
+                        let finalize = finalize.clone();
+                        move || {
+                            let mut state = new_thread_state(thread_id);
+                            let mut can_send = true;
+                            for item in receive_input {
+                                if send_result.send(consume(item, &mut state)).is_err() {
+                                    can_send = false;
+                                    break;
+                                }
+                            }
+                            if can_send {
+                                send_result.send(finalize(state)).ok();
+                            }
+                        }
+                    })
+                    .expect("valid name");
+            }
+            std::thread::Builder::new()
+                .name("gitoxide.in_parallel.feed".into())
+                .spawn_scoped(s, move || {
+                    for item in input {
+                        if send_input.send(item).is_err() {
+                            break;
+                        }
+                    }
+                })
+                .expect("valid name");
+            receive_result
+        };
+
+        for item in receive_result {
+            drop(reducer.feed(item)?);
+        }
+        reducer.finalize()
+    })
+}
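A sketch of what `finalize` buys (editorial, not part of the patch): each thread buffers into its state, and only `finalize` emits one output per thread, aggregated here by a `Sum` reducer like the one in the previous sketch.

use gix_features::parallel::{in_parallel_with_finalize, Reduce};

/// Sums all outputs on the calling thread (same idea as the previous sketch).
struct Sum(u64);

impl Reduce for Sum {
    type Input = u64;
    type FeedProduce = ();
    type Output = u64;
    type Error = std::convert::Infallible;

    fn feed(&mut self, item: u64) -> Result<(), Self::Error> {
        self.0 += item;
        Ok(())
    }

    fn finalize(self) -> Result<u64, Self::Error> {
        Ok(self.0)
    }
}

fn main() {
    let total = in_parallel_with_finalize(
        1u64..=10,
        None,
        |_thread_id| 0u64, // per-thread accumulator as state
        |item, acc: &mut u64| {
            *acc += item; // buffer locally…
            0 // …and emit nothing per item
        },
        |acc| acc, // finalize: flush each thread's accumulator as one more output
        Sum(0),
    )
    .expect("the reducer is infallible");
    assert_eq!(total, 55);
}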
+/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
+/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
+/// for file-io as it won't make use of sorted inputs well.
+/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
+/// `consume(&mut item, &mut state, &threads_available, &should_interrupt)` is called for performing the actual computation.
+/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`),
+/// which allows callees to implement their own work-stealing in case the work is distributed unevenly.
+/// Work-stealing should only start after having processed at least one item to give all threads naturally operating on the slice
+/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads,
+/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust
+/// the thread-count accordingly.
+// TODO: better docs
+pub fn in_parallel_with_slice<I, S, R, E>(
+    input: &mut [I],
+    thread_limit: Option<usize>,
+    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+    consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone,
+    mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
+    state_to_rval: impl FnOnce(S) -> R + Send + Clone,
+) -> Result<Vec<R>, E>
+where
+    I: Send,
+    E: Send,
+    R: Send,
+{
+    let num_threads = num_threads(thread_limit);
+    let mut results = Vec::with_capacity(num_threads);
+    let stop_everything = &AtomicBool::default();
+    let index = &AtomicUsize::default();
+    let threads_left = &AtomicIsize::new(num_threads as isize);
+
+    std::thread::scope({
+        move |s| {
+            std::thread::Builder::new()
+                .name("gitoxide.in_parallel_with_slice.watch-interrupts".into())
+                .spawn_scoped(s, {
+                    move || loop {
+                        if stop_everything.load(Ordering::Relaxed) {
+                            break;
+                        }
+
+                        match periodic() {
+                            Some(duration) => std::thread::sleep(duration),
+                            None => {
+                                stop_everything.store(true, Ordering::Relaxed);
+                                break;
+                            }
+                        }
+                    }
+                })
+                .expect("valid name");
+
+            let input_len = input.len();
+            struct Input<I>(*mut [I])
+            where
+                I: Send;
+
+            // SAFETY: `I` is `Send`, so a `*mut [I]` may be sent to other threads as well.
+            #[allow(unsafe_code)]
+            unsafe impl<I> Send for Input<I> where I: Send {}
+
+            let threads: Vec<_> = (0..num_threads)
+                .map(|thread_id| {
+                    std::thread::Builder::new()
+                        .name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}"))
+                        .spawn_scoped(s, {
+                            let new_thread_state = new_thread_state.clone();
+                            let state_to_rval = state_to_rval.clone();
+                            let mut consume = consume.clone();
+                            let input = Input(input as *mut [I]);
+                            move || {
+                                let _ = &input;
+                                threads_left.fetch_sub(1, Ordering::SeqCst);
+                                let mut state = new_thread_state(thread_id);
+                                let res = (|| {
+                                    while let Ok(input_index) =
+                                        index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
+                                            (x < input_len).then_some(x + 1)
+                                        })
+                                    {
+                                        if stop_everything.load(Ordering::Relaxed) {
+                                            break;
+                                        }
+                                        // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
+                                        // each item exactly once.
+                                        let item = {
+                                            #[allow(unsafe_code)]
+                                            unsafe {
+                                                &mut (&mut *input.0)[input_index]
+                                            }
+                                        };
+                                        if let Err(err) = consume(item, &mut state, threads_left, stop_everything) {
+                                            stop_everything.store(true, Ordering::Relaxed);
+                                            return Err(err);
+                                        }
+                                    }
+                                    Ok(state_to_rval(state))
+                                })();
+                                threads_left.fetch_add(1, Ordering::SeqCst);
+                                res
+                            }
+                        })
+                        .expect("valid name")
+                })
+                .collect();
+            for thread in threads {
+                match thread.join() {
+                    Ok(res) => {
+                        results.push(res?);
+                    }
+                    Err(err) => {
+                        // a panic happened, stop the world gracefully (even though we panic later)
+                        stop_everything.store(true, Ordering::Relaxed);
+                        std::panic::resume_unwind(err);
+                    }
+                }
+            }
+
+            stop_everything.store(true, Ordering::Relaxed);
+            Ok(results)
+        }
+    })
+}
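A sketch of the slice-based variant (editorial, not part of the patch): every element is updated in place exactly once, and each thread's state is turned into one entry of the result vector.

use gix_features::parallel::in_parallel_with_slice;

fn main() {
    let mut data: Vec<u32> = (0..1000).collect();
    let per_thread_counts = in_parallel_with_slice(
        &mut data,
        None,
        |_thread_id| 0usize, // per-thread counter as state
        |item, count, _threads_left, _should_interrupt| {
            *item *= 2; // mutate the slice element in place
            *count += 1;
            Ok::<_, std::convert::Infallible>(())
        },
        || Some(std::time::Duration::from_millis(50)), // keep the watcher thread polling
        |count| count, // each thread's state becomes one result
    )
    .expect("consume is infallible");
    assert_eq!(per_thread_counts.iter().sum::<usize>(), 1000);
    assert_eq!(data[3], 6);
}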
diff --git a/vendor/gix-features-0.35.0/src/parallel/mod.rs b/vendor/gix-features-0.35.0/src/parallel/mod.rs
new file mode 100644
index 000000000..5a0a4b589
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/mod.rs
@@ -0,0 +1,178 @@
+//! Run computations in parallel, or not, based on the `parallel` feature toggle.
+//!
+//! ### `in_parallel(…)`
+//!
+//! The [`in_parallel(…)`][in_parallel()] function is the typical fan-out-fan-in mode of parallelism, with thread-local storage
+//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
+//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
+//!
+//! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail.
+//!
+//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
+//! or the data to work on.
+//!
+//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
+//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
+//! aggregation.
+//!
+//! ### `reduce::Stepwise`
+//!
+//! The [`Stepwise`][reduce::Stepwise] iterator works exactly like [`in_parallel()`], except that the processing of the output produced by
+//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
+//! buffered, the owner of the iterator controls the progress made.
+//!
+//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
+//! is functionally equivalent to calling [`in_parallel()`].
+//!
+//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
+//! the iterator will wind down the computation without any result.
+//!
+//! #### Maintaining Safety
+//!
+//! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce
+//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
+//! suitable input iterators as long as they can hold something on the heap.
+#[cfg(feature = "parallel")]
+mod in_parallel;
+#[cfg(feature = "parallel")]
+pub use in_parallel::{
+    build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope,
+};
+
+mod serial;
+#[cfg(not(feature = "parallel"))]
+pub use serial::{build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope};
+
+mod in_order;
+pub use in_order::{InOrderIter, SequenceId};
+
+mod eager_iter;
+pub use eager_iter::{EagerIter, EagerIterIf};
+
+/// A no-op returning the input _(`desired_chunk_size`, `Some(thread_limit)`, `thread_limit`)_, used
+/// when the `parallel` feature toggle is not set.
+#[cfg(not(feature = "parallel"))]
+pub fn optimize_chunk_size_and_thread_limit(
+    desired_chunk_size: usize,
+    _num_items: Option<usize>,
+    thread_limit: Option<usize>,
+    _available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+    (desired_chunk_size, thread_limit, num_threads(thread_limit))
+}
+/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given
+/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
+///
+/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
+/// * `num_items` is the total amount of items in the iteration, if `Some`.
+///   Otherwise this knowledge will not affect the output of this function.
+/// * `thread_limit` is the amount of threads to use at most, if `Some`.
+///   Otherwise this knowledge will not affect the output of this function.
+/// * `available_threads` is the total amount of threads available, if `Some`.
+///   Otherwise the actual amount of available threads is determined by querying the system.
+///
+/// Note that this implementation is only available if the `parallel` feature toggle is set.
+#[cfg(feature = "parallel")]
+pub fn optimize_chunk_size_and_thread_limit(
+    desired_chunk_size: usize,
+    num_items: Option<usize>,
+    thread_limit: Option<usize>,
+    available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+    let available_threads =
+        available_threads.unwrap_or_else(|| std::thread::available_parallelism().map_or(1, Into::into));
+    let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l });
+
+    let (lower, upper) = (50, 1000);
+    let (chunk_size, thread_limit) = num_items.map_or(
+        {
+            let chunk_size = if available_threads == 1 {
+                desired_chunk_size
+            } else if desired_chunk_size < lower {
+                lower
+            } else {
+                desired_chunk_size.min(upper)
+            };
+            (chunk_size, available_threads)
+        },
+        |num_items| {
+            let desired_chunks_per_thread_at_least = 2;
+            let items = num_items;
+            let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper);
+            let num_chunks = items / chunk_size;
+            let thread_limit = if num_chunks <= available_threads {
+                (num_chunks / desired_chunks_per_thread_at_least).max(1)
+            } else {
+                available_threads
+            };
+            (chunk_size, thread_limit)
+        },
+    );
+    (chunk_size, Some(thread_limit), thread_limit)
+}
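A worked example of the heuristic above (editorial, not part of the patch; the numbers assume the `parallel` feature is enabled, otherwise the no-op variant returns `(10, None, 1)`):

use gix_features::parallel::optimize_chunk_size_and_thread_limit;

fn main() {
    // 4000 items, desired chunk size 10, no thread limit, pretend 8 cores are available:
    // 4000 / (8 threads * 2 chunks per thread) = 250 items per chunk, using all 8 threads.
    let (chunk_size, thread_limit, threads) =
        optimize_chunk_size_and_thread_limit(10, Some(4000), None, Some(8));
    assert_eq!((chunk_size, thread_limit, threads), (250, Some(8), 8));
}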
+/// Always returns 1, available when the `parallel` feature toggle is unset.
+#[cfg(not(feature = "parallel"))]
+pub fn num_threads(_thread_limit: Option<usize>) -> usize {
+    1
+}
+
+/// Return the amount of threads the system can effectively use, i.e. the amount of its logical cores,
+/// unless `thread_limit` is set to a non-zero value.
+///
+/// Only available with the `parallel` feature toggle set.
+#[cfg(feature = "parallel")]
+pub fn num_threads(thread_limit: Option<usize>) -> usize {
+    let logical_cores = std::thread::available_parallelism().map_or(1, Into::into);
+    thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l })
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`].
+#[cfg(feature = "parallel")]
+pub fn in_parallel_if<I, S, O, R>(
+    condition: impl FnOnce() -> bool,
+    input: impl Iterator<Item = I> + Send,
+    thread_limit: Option<usize>,
+    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
+    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
+    reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+    R: Reduce<Input = O>,
+    I: Send,
+    O: Send,
+{
+    if num_threads(thread_limit) > 1 && condition() {
+        in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+    } else {
+        serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+    }
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`].
+///
+/// Note that the non-parallel version is equivalent to [`in_parallel()`].
+#[cfg(not(feature = "parallel"))]
+pub fn in_parallel_if<I, S, O, R>(
+    _condition: impl FnOnce() -> bool,
+    input: impl Iterator<Item = I>,
+    thread_limit: Option<usize>,
+    new_thread_state: impl FnOnce(usize) -> S,
+    consume: impl FnMut(I, &mut S) -> O,
+    reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+    R: Reduce<Input = O>,
+    I: Send,
+    O: Send,
+{
+    serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+}
+
+///
+pub mod reduce;
+pub use reduce::Reduce;
diff --git a/vendor/gix-features-0.35.0/src/parallel/reduce.rs b/vendor/gix-features-0.35.0/src/parallel/reduce.rs
new file mode 100644
index 000000000..f9992cfd2
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/reduce.rs
@@ -0,0 +1,279 @@
+#[cfg(feature = "parallel")]
+mod stepped {
+    use crate::parallel::num_threads;
+
+    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner,
+    /// see the [module docs][crate::parallel] for details.
+    pub struct Stepwise<Reduce: super::Reduce> {
+        /// This field is first to assure it's dropped first, causing the threads that are dropped next to stop their loops,
+        /// as sending results fails when the receiver is dropped.
+        receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
+        /// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When
+        /// that happens, they break out of their loops.
+        threads: Vec<std::thread::JoinHandle<()>>,
+        /// The reducer is called only in the thread using the iterator, dropping it has no side effects.
+        reducer: Option<Reduce>,
+    }
+
+    impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
+        fn drop(&mut self) {
+            let (_, sink) = std::sync::mpsc::channel();
+            drop(std::mem::replace(&mut self.receive_result, sink));
+
+            let mut last_err = None;
+            for handle in std::mem::take(&mut self.threads) {
+                if let Err(err) = handle.join() {
+                    last_err = Some(err);
+                };
+            }
+            if let Some(thread_err) = last_err {
+                std::panic::resume_unwind(thread_err);
+            }
+        }
+    }
+    impl<Reduce: super::Reduce> Stepwise<Reduce> {
+        /// Instantiate a new iterator and start working in threads.
+        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
+        pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
+            input: InputIter,
+            thread_limit: Option<usize>,
+            new_thread_state: ThreadStateFn,
+            consume: ConsumeFn,
+            reducer: Reduce,
+        ) -> Self
+        where
+            InputIter: Iterator<Item = I> + Send + 'static,
+            ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
+            ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
+            Reduce: super::Reduce<Input = O> + 'static,
+            I: Send + 'static,
+            O: Send + 'static,
+        {
+            let num_threads = num_threads(thread_limit);
+            let mut threads = Vec::with_capacity(num_threads + 1);
+            let receive_result = {
+                let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+                let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
+                for thread_id in 0..num_threads {
+                    let handle = std::thread::spawn({
+                        let send_result = send_result.clone();
+                        let receive_input = receive_input.clone();
+                        let new_thread_state = new_thread_state.clone();
+                        let consume = consume.clone();
+                        move || {
+                            let mut state = new_thread_state(thread_id);
+                            for item in receive_input {
+                                if send_result.send(consume(item, &mut state)).is_err() {
+                                    break;
+                                }
+                            }
+                        }
+                    });
+                    threads.push(handle);
+                }
+                threads.push(std::thread::spawn(move || {
+                    for item in input {
+                        if send_input.send(item).is_err() {
+                            break;
+                        }
+                    }
+                }));
+                receive_result
+            };
+            Stepwise {
+                threads,
+                receive_result,
+                reducer: Some(reducer),
+            }
+        }
+
+        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
+        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
+            for value in self.by_ref() {
+                drop(value?);
+            }
+            self.reducer
+                .take()
+                .expect("this is the last call before consumption")
+                .finalize()
+        }
+    }
+
+    impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
+        type Item = Result<Reduce::FeedProduce, Reduce::Error>;
+
+        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
+            self.receive_result
+                .recv()
+                .ok()
+                .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
+        }
+    }
+
+    impl<R: super::Reduce> super::Finalize for Stepwise<R> {
+        type Reduce = R;
+
+        fn finalize(
+            self,
+        ) -> Result<
+            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
+            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
+        > {
+            Stepwise::finalize(self)
+        }
+    }
+}
+
+#[cfg(not(feature = "parallel"))]
+mod stepped {
+    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner,
+    /// see the [module docs][crate::parallel] for details.
+    pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
+        input: InputIter,
+        consume: ConsumeFn,
+        thread_state: ThreadState,
+        reducer: Reduce,
+    }
+
+    impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
+    where
+        InputIter: Iterator<Item = I>,
+        ConsumeFn: Fn(I, &mut S) -> O,
+        Reduce: super::Reduce<Input = O>,
+    {
+        /// Instantiate a new iterator.
+        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
+        pub fn new<ThreadStateFn>(
+            input: InputIter,
+            _thread_limit: Option<usize>,
+            new_thread_state: ThreadStateFn,
+            consume: ConsumeFn,
+            reducer: Reduce,
+        ) -> Self
+        where
+            ThreadStateFn: Fn(usize) -> S,
+        {
+            Stepwise {
+                input,
+                consume,
+                thread_state: new_thread_state(0),
+                reducer,
+            }
+        }
+
+        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
+        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
+            for value in self.by_ref() {
+                drop(value?);
+            }
+            self.reducer.finalize()
+        }
+    }
+
+    impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
+    where
+        InputIter: Iterator<Item = I>,
+        ConsumeFn: Fn(I, &mut ThreadState) -> O,
+        Reduce: super::Reduce<Input = O>,
+    {
+        type Item = Result<Reduce::FeedProduce, Reduce::Error>;
+
+        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
+            self.input
+                .next()
+                .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
+        }
+    }
+
+    impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
+    where
+        InputIter: Iterator<Item = I>,
+        ConsumeFn: Fn(I, &mut S) -> O,
+        R: super::Reduce<Input = O>,
+    {
+        type Reduce = R;
+
+        fn finalize(
+            self,
+        ) -> Result<
+            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
+            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
+        > {
+            Stepwise::finalize(self)
+        }
+    }
+}
+
+use std::marker::PhantomData;
+
+pub use stepped::Stepwise;
+
+/// A trait for aggregating items commonly produced in threads into a single result, without itself
+/// needing to be thread-safe.
+pub trait Reduce {
+    /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method.
+    ///
+    /// It's produced by a function that may run on multiple threads.
+    type Input;
+    /// The type produced in `Ok(…)` by [`feed()`][Reduce::feed()].
+    /// Most reducers by nature use `()` here, as the value is in the aggregation.
+    /// However, some may use it to collect statistics only and return their `Input`
+    /// in some form as a result here for [`Stepwise`] to be useful.
+    type FeedProduce;
+    /// The type produced once by the [`finalize()`][Reduce::finalize()] method.
+    ///
+    /// For traditional reducers, this is the value produced by the entire operation.
+    /// For those made for step-wise iteration this may be aggregated statistics.
+    type Output;
+    /// The error type to use for all methods of this trait.
+    type Error;
+    /// Called each time a new `item` was produced in order to aggregate it into the final result.
+    ///
+    /// If an `Error` is returned, the entire operation will be stopped.
+    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;
+    /// Called once all items were passed to `feed()`, producing the final `Output` of the operation or an `Error`.
+    fn finalize(self) -> Result<Self::Output, Self::Error>;
+}
+/// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()]
+/// without a custom, non-threaded reduction step for products created in threads.
+pub struct IdentityWithResult<Input, Error> {
+    _input: PhantomData<Input>,
+    _error: PhantomData<Error>,
+}
+
+impl<Input, Error> Default for IdentityWithResult<Input, Error> {
+    fn default() -> Self {
+        IdentityWithResult {
+            _input: Default::default(),
+            _error: Default::default(),
+        }
+    }
+}
+
+impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
+    type Input = Result<Input, Self::Error>;
+    type FeedProduce = Input;
+    type Output = ();
+    type Error = Error;
+
+    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
+        item
+    }
+
+    fn finalize(self) -> Result<Self::Output, Self::Error> {
+        Ok(())
+    }
+}
+
+/// A trait reflecting the `finalize()` method of [`Reduce`] implementations.
+pub trait Finalize {
+    /// An implementation of [`Reduce`]
+    type Reduce: self::Reduce;
+
+    /// Similar to the [`Reduce::finalize()`] method
+    fn finalize(
+        self,
+    ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
+}
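To close the loop on this module, a hypothetical sketch (editorial, not part of the patch) driving `Stepwise` with the `IdentityWithResult` reducer defined above, so each computed value surfaces through `next()`:

use gix_features::parallel::reduce::{IdentityWithResult, Stepwise};

fn main() {
    let stepwise = Stepwise::new(
        0u32..6,
        Some(2), // at most two worker threads
        |_thread_id| (),
        |item, _state: &mut ()| Ok::<_, String>(item * item),
        IdentityWithResult::default(),
    );
    // Each next() call yields one result; dropping the iterator early winds the threads down.
    let mut squares: Vec<u32> = stepwise.collect::<Result<_, _>>().expect("no errors");
    squares.sort_unstable(); // with the `parallel` feature, results arrive in completion order
    assert_eq!(squares, [0, 1, 4, 9, 16, 25]);
}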
diff --git a/vendor/gix-features-0.35.0/src/parallel/serial.rs b/vendor/gix-features-0.35.0/src/parallel/serial.rs
new file mode 100644
index 000000000..7665d3ffa
--- /dev/null
+++ b/vendor/gix-features-0.35.0/src/parallel/serial.rs
@@ -0,0 +1,174 @@
+use crate::parallel::Reduce;
+
+#[cfg(not(feature = "parallel"))]
+mod not_parallel {
+    use std::sync::atomic::{AtomicBool, AtomicIsize};
+
+    /// Runs `left` and then `right`, one after another, returning their output when both are done.
+    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
+        (left(), right())
+    }
+
+    /// A scope for spawning threads.
+    pub struct Scope<'scope, 'env: 'scope> {
+        _scope: std::marker::PhantomData<&'scope mut &'scope ()>,
+        _env: std::marker::PhantomData<&'env mut &'env ()>,
+    }
+
+    pub struct ThreadBuilder;
+
+    /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
+    pub fn build_thread() -> ThreadBuilder {
+        ThreadBuilder
+    }
+
+    #[allow(unsafe_code)]
+    unsafe impl Sync for Scope<'_, '_> {}
+
+    impl ThreadBuilder {
+        pub fn name(self, _new: String) -> Self {
+            self
+        }
+        pub fn spawn_scoped<'scope, 'env, F, T>(
+            &self,
+            scope: &'scope Scope<'scope, 'env>,
+            f: F,
+        ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
+        where
+            F: FnOnce() -> T + 'scope,
+            T: 'scope,
+        {
+            Ok(scope.spawn(f))
+        }
+    }
+
+    impl<'scope, 'env> Scope<'scope, 'env> {
+        /// Provided with this scope, let `f` start new threads that live within it.
+        pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+        where
+            F: FnOnce() -> T + 'scope,
+            T: 'scope,
+        {
+            ScopedJoinHandle {
+                result: f(),
+                _marker: Default::default(),
+            }
+        }
+    }
+
+    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
+    /// Note that this implementation will run the spawned functions immediately.
+    pub fn threads<'env, F, R>(f: F) -> R
+    where
+        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
+    {
+        f(&Scope {
+            _scope: Default::default(),
+            _env: Default::default(),
+        })
+    }
+
+    /// A handle that can be used to join its scoped thread.
+    ///
+    /// This struct is created by the [`Scope::spawn`] method and the
+    /// [`ScopedThreadBuilder::spawn`] method.
+    pub struct ScopedJoinHandle<'scope, T> {
+        /// Holds the result of the inner closure.
+        result: T,
+        _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
+    }
+
+    impl<T> ScopedJoinHandle<'_, T> {
+        pub fn join(self) -> std::thread::Result<T> {
+            Ok(self.result)
+        }
+        pub fn is_finished(&self) -> bool {
+            true
+        }
+    }
+
+    /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
+    /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
+    /// for file-io as it won't make use of sorted inputs well.
+    // TODO: better docs
+    pub fn in_parallel_with_slice<I, S, R, E>(
+        input: &mut [I],
+        _thread_limit: Option<usize>,
+        new_thread_state: impl FnOnce(usize) -> S + Clone,
+        mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
+        mut periodic: impl FnMut() -> Option<std::time::Duration>,
+        state_to_rval: impl FnOnce(S) -> R + Clone,
+    ) -> Result<Vec<R>, E> {
+        let mut state = new_thread_state(0);
+        let should_interrupt = &AtomicBool::default();
+        let threads_left = &AtomicIsize::default();
+        for item in input {
+            consume(item, &mut state, threads_left, should_interrupt)?;
+            if periodic().is_none() {
+                break;
+            }
+        }
+        Ok(vec![state_to_rval(state)])
+    }
+}
+
+#[cfg(not(feature = "parallel"))]
+pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
+
+/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
+/// whose task is to aggregate these outputs into the final result returned by this function.
+///
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`.
+/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
+/// * For `reducer`, see the [`Reduce`] trait.
+/// * `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
+///   similar to the parallel version.
+///
+/// **This serial version performs all calculations on the current thread.**
+pub fn in_parallel<I, S, O, R>(
+    input: impl Iterator<Item = I>,
+    _thread_limit: Option<usize>,
+    new_thread_state: impl FnOnce(usize) -> S,
+    mut consume: impl FnMut(I, &mut S) -> O,
+    mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+    R: Reduce<Input = O>,
+{
+    let mut state = new_thread_state(0);
+    for item in input {
+        drop(reducer.feed(consume(item, &mut state))?);
+    }
+    reducer.finalize()
+}
+/// Read items from `input` and `consume` them on the current thread,
+/// with the output collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
+/// `finalize` is called to finish the computation once all items were consumed without error.
+///
+/// * `thread_limit` has no effect as everything is run on the main thread.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once to be passed to `consume`.
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+///   created by `new_thread_state(…)`.
+/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
+/// * For `reducer`, see the [`Reduce`] trait.
+#[cfg(not(feature = "parallel"))]
+pub fn in_parallel_with_finalize<I, S, O, R>(
+    input: impl Iterator<Item = I>,
+    _thread_limit: Option<usize>,
+    new_thread_state: impl FnOnce(usize) -> S,
+    mut consume: impl FnMut(I, &mut S) -> O,
+    finalize: impl FnOnce(S) -> O + Send + Clone,
+    mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+    R: Reduce<Input = O>,
+{
+    let mut state = new_thread_state(0);
+    for item in input {
+        drop(reducer.feed(consume(item, &mut state))?);
+    }
+    reducer.feed(finalize(state))?;
+    reducer.finalize()
+}