diff options
Diffstat (limited to 'vendor/gix-features/src/parallel/serial.rs')
-rw-r--r-- | vendor/gix-features/src/parallel/serial.rs | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs index 00723b2c3..3511c73e3 100644 --- a/vendor/gix-features/src/parallel/serial.rs +++ b/vendor/gix-features/src/parallel/serial.rs @@ -2,14 +2,17 @@ use crate::parallel::Reduce; #[cfg(not(feature = "parallel"))] mod not_parallel { + use std::sync::atomic::{AtomicBool, AtomicIsize}; + /// Runs `left` and then `right`, one after another, returning their output when both are done. pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { (left(), right()) } /// A scope for spawning threads. - pub struct Scope<'env> { - _marker: std::marker::PhantomData<&'env mut &'env ()>, + pub struct Scope<'scope, 'env: 'scope> { + _scope: std::marker::PhantomData<&'scope mut &'scope ()>, + _env: std::marker::PhantomData<&'env mut &'env ()>, } pub struct ThreadBuilder; @@ -20,32 +23,31 @@ mod not_parallel { } #[allow(unsafe_code)] - unsafe impl Sync for Scope<'_> {} + unsafe impl Sync for Scope<'_, '_> {} impl ThreadBuilder { pub fn name(self, _new: String) -> Self { self } - pub fn spawn_scoped<'a, 'env, F, T>( + pub fn spawn_scoped<'scope, 'env, F, T>( &self, - scope: &'a Scope<'env>, + scope: &'scope Scope<'scope, 'env>, f: F, - ) -> std::io::Result<ScopedJoinHandle<'a, T>> + ) -> std::io::Result<ScopedJoinHandle<'scope, T>> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { Ok(scope.spawn(f)) } } - impl<'env> Scope<'env> { - pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + impl<'scope, 'env> Scope<'scope, 'env> { + /// Provided with this scope, let `f` start new threads that live within it. + pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { ScopedJoinHandle { result: f(), @@ -58,10 +60,11 @@ mod not_parallel { /// Note that this implementation will run the spawned functions immediately. pub fn threads<'env, F, R>(f: F) -> R where - F: FnOnce(&Scope<'env>) -> R, + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, { f(&Scope { - _marker: Default::default(), + _scope: Default::default(), + _env: Default::default(), }) } @@ -79,6 +82,9 @@ mod not_parallel { pub fn join(self) -> std::thread::Result<T> { Ok(self.result) } + pub fn is_finished(&self) -> bool { + true + } } /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. @@ -89,13 +95,15 @@ mod not_parallel { input: &mut [I], _thread_limit: Option<usize>, mut new_thread_state: impl FnMut(usize) -> S + Clone, - mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone, + mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, mut periodic: impl FnMut() -> Option<std::time::Duration>, state_to_rval: impl FnOnce(S) -> R + Clone, ) -> Result<Vec<R>, E> { let mut state = new_thread_state(0); + let should_interrupt = &AtomicBool::default(); + let threads_left = &AtomicIsize::default(); for item in input { - consume(item, &mut state)?; + consume(item, &mut state, threads_left, should_interrupt)?; if periodic().is_none() { break; } @@ -121,7 +129,7 @@ pub fn in_parallel<I, S, O, R>( input: impl Iterator<Item = I>, _thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where |