diff options
Diffstat (limited to 'vendor/gix-features-0.35.0/src/parallel/serial.rs')
-rw-r--r-- | vendor/gix-features-0.35.0/src/parallel/serial.rs | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/vendor/gix-features-0.35.0/src/parallel/serial.rs b/vendor/gix-features-0.35.0/src/parallel/serial.rs new file mode 100644 index 000000000..7665d3ffa --- /dev/null +++ b/vendor/gix-features-0.35.0/src/parallel/serial.rs @@ -0,0 +1,174 @@ +use crate::parallel::Reduce; + +#[cfg(not(feature = "parallel"))] +mod not_parallel { + use std::sync::atomic::{AtomicBool, AtomicIsize}; + + /// Runs `left` and then `right`, one after another, returning their output when both are done. + pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { + (left(), right()) + } + + /// A scope for spawning threads. + pub struct Scope<'scope, 'env: 'scope> { + _scope: std::marker::PhantomData<&'scope mut &'scope ()>, + _env: std::marker::PhantomData<&'env mut &'env ()>, + } + + pub struct ThreadBuilder; + + /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning. + pub fn build_thread() -> ThreadBuilder { + ThreadBuilder + } + + #[allow(unsafe_code)] + unsafe impl Sync for Scope<'_, '_> {} + + impl ThreadBuilder { + pub fn name(self, _new: String) -> Self { + self + } + pub fn spawn_scoped<'scope, 'env, F, T>( + &self, + scope: &'scope Scope<'scope, 'env>, + f: F, + ) -> std::io::Result<ScopedJoinHandle<'scope, T>> + where + F: FnOnce() -> T + 'scope, + T: 'scope, + { + Ok(scope.spawn(f)) + } + } + + impl<'scope, 'env> Scope<'scope, 'env> { + /// Provided with this scope, let `f` start new threads that live within it. + pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce() -> T + 'scope, + T: 'scope, + { + ScopedJoinHandle { + result: f(), + _marker: Default::default(), + } + } + } + + /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call. + /// Note that this implementation will run the spawned functions immediately. + pub fn threads<'env, F, R>(f: F) -> R + where + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, + { + f(&Scope { + _scope: Default::default(), + _env: Default::default(), + }) + } + + /// A handle that can be used to join its scoped thread. + /// + /// This struct is created by the [`Scope::spawn`] method and the + /// [`ScopedThreadBuilder::spawn`] method. + pub struct ScopedJoinHandle<'scope, T> { + /// Holds the result of the inner closure. + result: T, + _marker: std::marker::PhantomData<&'scope mut &'scope ()>, + } + + impl<T> ScopedJoinHandle<'_, T> { + pub fn join(self) -> std::thread::Result<T> { + Ok(self.result) + } + pub fn is_finished(&self) -> bool { + true + } + } + + /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. + /// This is only good for operations where near-random access isn't detrimental, so it's not usually great + /// for file-io as it won't make use of sorted inputs well. + // TODO: better docs + pub fn in_parallel_with_slice<I, S, R, E>( + input: &mut [I], + _thread_limit: Option<usize>, + new_thread_state: impl FnOnce(usize) -> S + Clone, + mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, + mut periodic: impl FnMut() -> Option<std::time::Duration>, + state_to_rval: impl FnOnce(S) -> R + Clone, + ) -> Result<Vec<R>, E> { + let mut state = new_thread_state(0); + let should_interrupt = &AtomicBool::default(); + let threads_left = &AtomicIsize::default(); + for item in input { + consume(item, &mut state, threads_left, should_interrupt)?; + if periodic().is_none() { + break; + } + } + Ok(vec![state_to_rval(state)]) + } +} + +#[cfg(not(feature = "parallel"))] +pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle}; + +/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`, +/// whose task is to aggregate these outputs into the final result returned by this function. +/// +/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume` +/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state. +/// * For `reducer`, see the [`Reduce`] trait +/// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature +/// similar to the parallel version. +/// +/// **This serial version performing all calculations on the current thread.** +pub fn in_parallel<I, S, O, R>( + input: impl Iterator<Item = I>, + _thread_limit: Option<usize>, + new_thread_state: impl FnOnce(usize) -> S, + mut consume: impl FnMut(I, &mut S) -> O, + mut reducer: R, +) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> +where + R: Reduce<Input = O>, +{ + let mut state = new_thread_state(0); + for item in input { + drop(reducer.feed(consume(item, &mut state))?); + } + reducer.finalize() +} + +/// Read items from `input` and `consume` them in multiple threads, +/// whose output output is collected by a `reducer`. Its task is to +/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe. +/// Caall `finalize` to finish the computation, once per thread, if there was no error sending results earlier. +/// +/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used. +/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume` +/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially +/// created by `new_thread_state(…)`. +/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`. +/// * For `reducer`, see the [`Reduce`] trait +#[cfg(not(feature = "parallel"))] +pub fn in_parallel_with_finalize<I, S, O, R>( + input: impl Iterator<Item = I>, + _thread_limit: Option<usize>, + new_thread_state: impl FnOnce(usize) -> S, + mut consume: impl FnMut(I, &mut S) -> O, + finalize: impl FnOnce(S) -> O + Send + Clone, + mut reducer: R, +) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> +where + R: Reduce<Input = O>, +{ + let mut state = new_thread_state(0); + for item in input { + drop(reducer.feed(consume(item, &mut state))?); + } + reducer.feed(finalize(state))?; + reducer.finalize() +} |