diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/gix-features/src/parallel | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features/src/parallel')
-rw-r--r-- | vendor/gix-features/src/parallel/in_parallel.rs | 66 | ||||
-rw-r--r-- | vendor/gix-features/src/parallel/mod.rs | 47 | ||||
-rw-r--r-- | vendor/gix-features/src/parallel/serial.rs | 46 |
3 files changed, 90 insertions, 69 deletions
diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs index e1e2cc3e3..241565b62 100644 --- a/vendor/gix-features/src/parallel/in_parallel.rs +++ b/vendor/gix-features/src/parallel/in_parallel.rs @@ -1,7 +1,10 @@ -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use crate::parallel::{num_threads, Reduce}; +/// A scope to start threads within. +pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>; + /// Runs `left` and `right` in parallel, returning their output when both are done. pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { std::thread::scope(|s| { @@ -47,7 +50,7 @@ pub fn in_parallel<I, S, O, R>( input: impl Iterator<Item = I> + Send, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where @@ -67,7 +70,7 @@ where let send_result = send_result.clone(); let receive_input = receive_input.clone(); let new_thread_state = new_thread_state.clone(); - let consume = consume.clone(); + let mut consume = consume.clone(); move || { let mut state = new_thread_state(thread_id); for item in receive_input { @@ -103,12 +106,19 @@ where /// This is only good for operations where near-random access isn't detrimental, so it's not usually great /// for file-io as it won't make use of sorted inputs well. /// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast. +/// `consume(&mut item, &mut stat, &Scope, &threads_available, &should_interrupt)` is called for performing the actual computation. +/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`), +/// which allows callees to implement their own work-stealing in case the work is distributed unevenly. +/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice +/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads, +/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust +/// the thread-count accordingly. // TODO: better docs pub fn in_parallel_with_slice<I, S, R, E>( input: &mut [I], thread_limit: Option<usize>, new_thread_state: impl FnMut(usize) -> S + Send + Clone, - consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone, + consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone, mut periodic: impl FnMut() -> Option<std::time::Duration> + Send, state_to_rval: impl FnOnce(S) -> R + Send + Clone, ) -> Result<Vec<R>, E> @@ -121,8 +131,8 @@ where let mut results = Vec::with_capacity(num_threads); let stop_everything = &AtomicBool::default(); let index = &AtomicUsize::default(); + let threads_left = &AtomicIsize::new(num_threads as isize); - // TODO: use std::thread::scope() once Rust 1.63 is available. std::thread::scope({ move |s| { std::thread::Builder::new() @@ -163,29 +173,35 @@ where let mut consume = consume.clone(); let input = Input(input as *mut [I]); move || { + let _ = &input; + threads_left.fetch_sub(1, Ordering::SeqCst); let mut state = new_thread_state(thread_id); - while let Ok(input_index) = - index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { - (x < input_len).then_some(x + 1) - }) - { - if stop_everything.load(Ordering::Relaxed) { - break; - } - // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding - // each item exactly once. - let item = { - #[allow(unsafe_code)] - unsafe { - &mut (&mut *input.0)[input_index] + let res = (|| { + while let Ok(input_index) = + index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { + (x < input_len).then_some(x + 1) + }) + { + if stop_everything.load(Ordering::Relaxed) { + break; + } + // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding + // each item exactly once. + let item = { + #[allow(unsafe_code)] + unsafe { + &mut (&mut *input.0)[input_index] + } + }; + if let Err(err) = consume(item, &mut state, threads_left, stop_everything) { + stop_everything.store(true, Ordering::Relaxed); + return Err(err); } - }; - if let Err(err) = consume(item, &mut state) { - stop_everything.store(true, Ordering::Relaxed); - return Err(err); } - } - Ok(state_to_rval(state)) + Ok(state_to_rval(state)) + })(); + threads_left.fetch_add(1, Ordering::SeqCst); + res } }) .expect("valid name") diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs index c994cb3b8..ac644acdd 100644 --- a/vendor/gix-features/src/parallel/mod.rs +++ b/vendor/gix-features/src/parallel/mod.rs @@ -1,12 +1,12 @@ //! Run computations in parallel, or not based the `parallel` feature toggle. //! -//! ### in_parallel(…) +//! ### `in_parallel`(…) //! //! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage //! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling //! thread to aggregate the results into a single output, which is returned by [`in_parallel()`]. //! -//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()]` method fail. +//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()] method fail. //! //! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself //! or the data to work on. @@ -35,11 +35,11 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod serial; #[cfg(not(feature = "parallel"))] -pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod in_order; pub use in_order::{InOrderIter, SequenceId}; @@ -80,13 +80,21 @@ pub fn optimize_chunk_size_and_thread_limit( ) -> (usize, Option<usize>, usize) { let available_threads = available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); - let available_threads = thread_limit - .map(|l| if l == 0 { available_threads } else { l }) - .unwrap_or(available_threads); + let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l }); let (lower, upper) = (50, 1000); - let (chunk_size, thread_limit) = num_items - .map(|num_items| { + let (chunk_size, thread_limit) = num_items.map_or( + { + let chunk_size = if available_threads == 1 { + desired_chunk_size + } else if desired_chunk_size < lower { + lower + } else { + desired_chunk_size.min(upper) + }; + (chunk_size, available_threads) + }, + |num_items| { let desired_chunks_per_thread_at_least = 2; let items = num_items; let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper); @@ -97,17 +105,8 @@ pub fn optimize_chunk_size_and_thread_limit( available_threads }; (chunk_size, thread_limit) - }) - .unwrap_or({ - let chunk_size = if available_threads == 1 { - desired_chunk_size - } else if desired_chunk_size < lower { - lower - } else { - desired_chunk_size.min(upper) - }; - (chunk_size, available_threads) - }); + }, + ); (chunk_size, Some(thread_limit), thread_limit) } @@ -123,9 +122,7 @@ pub fn num_threads(_thread_limit: Option<usize>) -> usize { #[cfg(feature = "parallel")] pub fn num_threads(thread_limit: Option<usize>) -> usize { let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1); - thread_limit - .map(|l| if l == 0 { logical_cores } else { l }) - .unwrap_or(logical_cores) + thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l }) } /// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated. @@ -137,7 +134,7 @@ pub fn in_parallel_if<I, S, O, R>( input: impl Iterator<Item = I> + Send, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where @@ -163,7 +160,7 @@ pub fn in_parallel_if<I, S, O, R>( input: impl Iterator<Item = I>, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + consume: impl FnMut(I, &mut S) -> O, reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs index 00723b2c3..3511c73e3 100644 --- a/vendor/gix-features/src/parallel/serial.rs +++ b/vendor/gix-features/src/parallel/serial.rs @@ -2,14 +2,17 @@ use crate::parallel::Reduce; #[cfg(not(feature = "parallel"))] mod not_parallel { + use std::sync::atomic::{AtomicBool, AtomicIsize}; + /// Runs `left` and then `right`, one after another, returning their output when both are done. pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { (left(), right()) } /// A scope for spawning threads. - pub struct Scope<'env> { - _marker: std::marker::PhantomData<&'env mut &'env ()>, + pub struct Scope<'scope, 'env: 'scope> { + _scope: std::marker::PhantomData<&'scope mut &'scope ()>, + _env: std::marker::PhantomData<&'env mut &'env ()>, } pub struct ThreadBuilder; @@ -20,32 +23,31 @@ mod not_parallel { } #[allow(unsafe_code)] - unsafe impl Sync for Scope<'_> {} + unsafe impl Sync for Scope<'_, '_> {} impl ThreadBuilder { pub fn name(self, _new: String) -> Self { self } - pub fn spawn_scoped<'a, 'env, F, T>( + pub fn spawn_scoped<'scope, 'env, F, T>( &self, - scope: &'a Scope<'env>, + scope: &'scope Scope<'scope, 'env>, f: F, - ) -> std::io::Result<ScopedJoinHandle<'a, T>> + ) -> std::io::Result<ScopedJoinHandle<'scope, T>> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { Ok(scope.spawn(f)) } } - impl<'env> Scope<'env> { - pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + impl<'scope, 'env> Scope<'scope, 'env> { + /// Provided with this scope, let `f` start new threads that live within it. + pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { ScopedJoinHandle { result: f(), @@ -58,10 +60,11 @@ mod not_parallel { /// Note that this implementation will run the spawned functions immediately. pub fn threads<'env, F, R>(f: F) -> R where - F: FnOnce(&Scope<'env>) -> R, + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, { f(&Scope { - _marker: Default::default(), + _scope: Default::default(), + _env: Default::default(), }) } @@ -79,6 +82,9 @@ mod not_parallel { pub fn join(self) -> std::thread::Result<T> { Ok(self.result) } + pub fn is_finished(&self) -> bool { + true + } } /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. @@ -89,13 +95,15 @@ mod not_parallel { input: &mut [I], _thread_limit: Option<usize>, mut new_thread_state: impl FnMut(usize) -> S + Clone, - mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone, + mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, mut periodic: impl FnMut() -> Option<std::time::Duration>, state_to_rval: impl FnOnce(S) -> R + Clone, ) -> Result<Vec<R>, E> { let mut state = new_thread_state(0); + let should_interrupt = &AtomicBool::default(); + let threads_left = &AtomicIsize::default(); for item in input { - consume(item, &mut state)?; + consume(item, &mut state, threads_left, should_interrupt)?; if periodic().is_none() { break; } @@ -121,7 +129,7 @@ pub fn in_parallel<I, S, O, R>( input: impl Iterator<Item = I>, _thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where |