summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel/serial.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 18:31:44 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 18:31:44 +0000
commitc23a457e72abe608715ac76f076f47dc42af07a5 (patch)
tree2772049aaf84b5c9d0ed12ec8d86812f7a7904b6 /vendor/gix-features/src/parallel/serial.rs
parentReleasing progress-linux version 1.73.0+dfsg1-1~progress7.99u1. (diff)
downloadrustc-c23a457e72abe608715ac76f076f47dc42af07a5.tar.xz
rustc-c23a457e72abe608715ac76f076f47dc42af07a5.zip
Merging upstream version 1.74.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features/src/parallel/serial.rs')
-rw-r--r--vendor/gix-features/src/parallel/serial.rs35
1 files changed, 33 insertions, 2 deletions
diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs
index 3511c73e3..7665d3ffa 100644
--- a/vendor/gix-features/src/parallel/serial.rs
+++ b/vendor/gix-features/src/parallel/serial.rs
@@ -94,7 +94,7 @@ mod not_parallel {
pub fn in_parallel_with_slice<I, S, R, E>(
input: &mut [I],
_thread_limit: Option<usize>,
- mut new_thread_state: impl FnMut(usize) -> S + Clone,
+ new_thread_state: impl FnOnce(usize) -> S + Clone,
mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration>,
state_to_rval: impl FnOnce(S) -> R + Clone,
@@ -128,7 +128,7 @@ pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scop
pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I>,
_thread_limit: Option<usize>,
- new_thread_state: impl Fn(usize) -> S,
+ new_thread_state: impl FnOnce(usize) -> S,
mut consume: impl FnMut(I, &mut S) -> O,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
@@ -141,3 +141,34 @@ where
}
reducer.finalize()
}
+
+/// Read items from `input` and `consume` them in multiple threads,
+/// whose output output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
+/// Caall `finalize` to finish the computation, once per thread, if there was no error sending results earlier.
+///
+/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
+/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+/// created by `new_thread_state(…)`.
+/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
+/// * For `reducer`, see the [`Reduce`] trait
+#[cfg(not(feature = "parallel"))]
+pub fn in_parallel_with_finalize<I, S, O, R>(
+ input: impl Iterator<Item = I>,
+ _thread_limit: Option<usize>,
+ new_thread_state: impl FnOnce(usize) -> S,
+ mut consume: impl FnMut(I, &mut S) -> O,
+ finalize: impl FnOnce(S) -> O + Send + Clone,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+{
+ let mut state = new_thread_state(0);
+ for item in input {
+ drop(reducer.feed(consume(item, &mut state))?);
+ }
+ reducer.feed(finalize(state))?;
+ reducer.finalize()
+}