summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel/serial.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gix-features/src/parallel/serial.rs')
-rw-r--r--vendor/gix-features/src/parallel/serial.rs46
1 files changed, 27 insertions, 19 deletions
diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs
index 00723b2c3..3511c73e3 100644
--- a/vendor/gix-features/src/parallel/serial.rs
+++ b/vendor/gix-features/src/parallel/serial.rs
@@ -2,14 +2,17 @@ use crate::parallel::Reduce;
#[cfg(not(feature = "parallel"))]
mod not_parallel {
+ use std::sync::atomic::{AtomicBool, AtomicIsize};
+
/// Runs `left` and then `right`, one after another, returning their output when both are done.
pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
(left(), right())
}
/// A scope for spawning threads.
- pub struct Scope<'env> {
- _marker: std::marker::PhantomData<&'env mut &'env ()>,
+ pub struct Scope<'scope, 'env: 'scope> {
+ _scope: std::marker::PhantomData<&'scope mut &'scope ()>,
+ _env: std::marker::PhantomData<&'env mut &'env ()>,
}
pub struct ThreadBuilder;
@@ -20,32 +23,31 @@ mod not_parallel {
}
#[allow(unsafe_code)]
- unsafe impl Sync for Scope<'_> {}
+ unsafe impl Sync for Scope<'_, '_> {}
impl ThreadBuilder {
pub fn name(self, _new: String) -> Self {
self
}
- pub fn spawn_scoped<'a, 'env, F, T>(
+ pub fn spawn_scoped<'scope, 'env, F, T>(
&self,
- scope: &'a Scope<'env>,
+ scope: &'scope Scope<'scope, 'env>,
f: F,
- ) -> std::io::Result<ScopedJoinHandle<'a, T>>
+ ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
where
- F: FnOnce() -> T,
- F: Send + 'env,
- T: Send + 'env,
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
{
Ok(scope.spawn(f))
}
}
- impl<'env> Scope<'env> {
- pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ impl<'scope, 'env> Scope<'scope, 'env> {
+ /// Provided with this scope, let `f` start new threads that live within it.
+ pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
- F: FnOnce() -> T,
- F: Send + 'env,
- T: Send + 'env,
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
{
ScopedJoinHandle {
result: f(),
@@ -58,10 +60,11 @@ mod not_parallel {
/// Note that this implementation will run the spawned functions immediately.
pub fn threads<'env, F, R>(f: F) -> R
where
- F: FnOnce(&Scope<'env>) -> R,
+ F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
{
f(&Scope {
- _marker: Default::default(),
+ _scope: Default::default(),
+ _env: Default::default(),
})
}
@@ -79,6 +82,9 @@ mod not_parallel {
pub fn join(self) -> std::thread::Result<T> {
Ok(self.result)
}
+ pub fn is_finished(&self) -> bool {
+ true
+ }
}
/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
@@ -89,13 +95,15 @@ mod not_parallel {
input: &mut [I],
_thread_limit: Option<usize>,
mut new_thread_state: impl FnMut(usize) -> S + Clone,
- mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
+ mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration>,
state_to_rval: impl FnOnce(S) -> R + Clone,
) -> Result<Vec<R>, E> {
let mut state = new_thread_state(0);
+ let should_interrupt = &AtomicBool::default();
+ let threads_left = &AtomicIsize::default();
for item in input {
- consume(item, &mut state)?;
+ consume(item, &mut state, threads_left, should_interrupt)?;
if periodic().is_none() {
break;
}
@@ -121,7 +129,7 @@ pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I>,
_thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S,
- consume: impl Fn(I, &mut S) -> O,
+ mut consume: impl FnMut(I, &mut S) -> O,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where