summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel/serial.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gix-features/src/parallel/serial.rs')
-rw-r--r--vendor/gix-features/src/parallel/serial.rs135
1 files changed, 135 insertions, 0 deletions
diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs
new file mode 100644
index 000000000..00723b2c3
--- /dev/null
+++ b/vendor/gix-features/src/parallel/serial.rs
@@ -0,0 +1,135 @@
+use crate::parallel::Reduce;
+
+#[cfg(not(feature = "parallel"))]
+mod not_parallel {
+ /// Runs `left` and then `right`, one after another, returning their output when both are done.
+ pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
+ (left(), right())
+ }
+
+ /// A scope for spawning threads.
+ pub struct Scope<'env> {
+ _marker: std::marker::PhantomData<&'env mut &'env ()>,
+ }
+
+ pub struct ThreadBuilder;
+
+ /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
+ pub fn build_thread() -> ThreadBuilder {
+ ThreadBuilder
+ }
+
+ #[allow(unsafe_code)]
+ unsafe impl Sync for Scope<'_> {}
+
+ impl ThreadBuilder {
+ pub fn name(self, _new: String) -> Self {
+ self
+ }
+ pub fn spawn_scoped<'a, 'env, F, T>(
+ &self,
+ scope: &'a Scope<'env>,
+ f: F,
+ ) -> std::io::Result<ScopedJoinHandle<'a, T>>
+ where
+ F: FnOnce() -> T,
+ F: Send + 'env,
+ T: Send + 'env,
+ {
+ Ok(scope.spawn(f))
+ }
+ }
+
+ impl<'env> Scope<'env> {
+ pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ where
+ F: FnOnce() -> T,
+ F: Send + 'env,
+ T: Send + 'env,
+ {
+ ScopedJoinHandle {
+ result: f(),
+ _marker: Default::default(),
+ }
+ }
+ }
+
+ /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
+ /// Note that this implementation will run the spawned functions immediately.
+ pub fn threads<'env, F, R>(f: F) -> R
+ where
+ F: FnOnce(&Scope<'env>) -> R,
+ {
+ f(&Scope {
+ _marker: Default::default(),
+ })
+ }
+
+ /// A handle that can be used to join its scoped thread.
+ ///
+ /// This struct is created by the [`Scope::spawn`] method and the
+ /// [`ScopedThreadBuilder::spawn`] method.
+ pub struct ScopedJoinHandle<'scope, T> {
+ /// Holds the result of the inner closure.
+ result: T,
+ _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
+ }
+
+ impl<T> ScopedJoinHandle<'_, T> {
+ pub fn join(self) -> std::thread::Result<T> {
+ Ok(self.result)
+ }
+ }
+
+ /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
+ /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
+ /// for file-io as it won't make use of sorted inputs well.
+ // TODO: better docs
+ pub fn in_parallel_with_slice<I, S, R, E>(
+ input: &mut [I],
+ _thread_limit: Option<usize>,
+ mut new_thread_state: impl FnMut(usize) -> S + Clone,
+ mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
+ mut periodic: impl FnMut() -> Option<std::time::Duration>,
+ state_to_rval: impl FnOnce(S) -> R + Clone,
+ ) -> Result<Vec<R>, E> {
+ let mut state = new_thread_state(0);
+ for item in input {
+ consume(item, &mut state)?;
+ if periodic().is_none() {
+ break;
+ }
+ }
+ Ok(vec![state_to_rval(state)])
+ }
+}
+
+#[cfg(not(feature = "parallel"))]
+pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
+
+/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
+/// whose task is to aggregate these outputs into the final result returned by this function.
+///
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
+/// * For `reducer`, see the [`Reduce`] trait
+/// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
+/// similar to the parallel version.
+///
+/// **This serial version performing all calculations on the current thread.**
+pub fn in_parallel<I, S, O, R>(
+ input: impl Iterator<Item = I>,
+ _thread_limit: Option<usize>,
+ new_thread_state: impl Fn(usize) -> S,
+ consume: impl Fn(I, &mut S) -> O,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+{
+ let mut state = new_thread_state(0);
+ for item in input {
+ drop(reducer.feed(consume(item, &mut state))?);
+ }
+ reducer.finalize()
+}