diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/gix-features/src | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features/src')
-rw-r--r-- | vendor/gix-features/src/fs.rs | 28 | ||||
-rw-r--r-- | vendor/gix-features/src/parallel/in_parallel.rs | 66 | ||||
-rw-r--r-- | vendor/gix-features/src/parallel/mod.rs | 47 | ||||
-rw-r--r-- | vendor/gix-features/src/parallel/serial.rs | 46 | ||||
-rw-r--r-- | vendor/gix-features/src/progress.rs | 2 | ||||
-rw-r--r-- | vendor/gix-features/src/threading.rs | 2 | ||||
-rw-r--r-- | vendor/gix-features/src/zlib/mod.rs | 5 | ||||
-rw-r--r-- | vendor/gix-features/src/zlib/stream/deflate/mod.rs | 19 | ||||
-rw-r--r-- | vendor/gix-features/src/zlib/stream/deflate/tests.rs | 4 | ||||
-rw-r--r-- | vendor/gix-features/src/zlib/stream/inflate.rs | 6 |
10 files changed, 135 insertions, 90 deletions
diff --git a/vendor/gix-features/src/fs.rs b/vendor/gix-features/src/fs.rs index f07ac1f0f..20d819547 100644 --- a/vendor/gix-features/src/fs.rs +++ b/vendor/gix-features/src/fs.rs @@ -38,19 +38,21 @@ pub mod walkdir { match v { Parallelism::Serial => jwalk::Parallelism::Serial, Parallelism::ThreadPoolPerTraversal { thread_name } => std::thread::available_parallelism() - .map(|threads| { - let pool = jwalk::rayon::ThreadPoolBuilder::new() - .num_threads(threads.get().min(16)) - .stack_size(128 * 1024) - .thread_name(move |idx| format!("{thread_name} {idx}")) - .build() - .expect("we only set options that can't cause a build failure"); - jwalk::Parallelism::RayonExistingPool { - pool: pool.into(), - busy_timeout: None, - } - }) - .unwrap_or_else(|_| Parallelism::Serial.into()), + .map_or_else( + |_| Parallelism::Serial.into(), + |threads| { + let pool = jwalk::rayon::ThreadPoolBuilder::new() + .num_threads(threads.get().min(16)) + .stack_size(128 * 1024) + .thread_name(move |idx| format!("{thread_name} {idx}")) + .build() + .expect("we only set options that can't cause a build failure"); + jwalk::Parallelism::RayonExistingPool { + pool: pool.into(), + busy_timeout: None, + } + }, + ), } } } diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs index e1e2cc3e3..241565b62 100644 --- a/vendor/gix-features/src/parallel/in_parallel.rs +++ b/vendor/gix-features/src/parallel/in_parallel.rs @@ -1,7 +1,10 @@ -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use crate::parallel::{num_threads, Reduce}; +/// A scope to start threads within. +pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>; + /// Runs `left` and `right` in parallel, returning their output when both are done. pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { std::thread::scope(|s| { @@ -47,7 +50,7 @@ pub fn in_parallel<I, S, O, R>( input: impl Iterator<Item = I> + Send, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where @@ -67,7 +70,7 @@ where let send_result = send_result.clone(); let receive_input = receive_input.clone(); let new_thread_state = new_thread_state.clone(); - let consume = consume.clone(); + let mut consume = consume.clone(); move || { let mut state = new_thread_state(thread_id); for item in receive_input { @@ -103,12 +106,19 @@ where /// This is only good for operations where near-random access isn't detrimental, so it's not usually great /// for file-io as it won't make use of sorted inputs well. /// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast. +/// `consume(&mut item, &mut stat, &Scope, &threads_available, &should_interrupt)` is called for performing the actual computation. +/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`), +/// which allows callees to implement their own work-stealing in case the work is distributed unevenly. +/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice +/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads, +/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust +/// the thread-count accordingly. // TODO: better docs pub fn in_parallel_with_slice<I, S, R, E>( input: &mut [I], thread_limit: Option<usize>, new_thread_state: impl FnMut(usize) -> S + Send + Clone, - consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone, + consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone, mut periodic: impl FnMut() -> Option<std::time::Duration> + Send, state_to_rval: impl FnOnce(S) -> R + Send + Clone, ) -> Result<Vec<R>, E> @@ -121,8 +131,8 @@ where let mut results = Vec::with_capacity(num_threads); let stop_everything = &AtomicBool::default(); let index = &AtomicUsize::default(); + let threads_left = &AtomicIsize::new(num_threads as isize); - // TODO: use std::thread::scope() once Rust 1.63 is available. std::thread::scope({ move |s| { std::thread::Builder::new() @@ -163,29 +173,35 @@ where let mut consume = consume.clone(); let input = Input(input as *mut [I]); move || { + let _ = &input; + threads_left.fetch_sub(1, Ordering::SeqCst); let mut state = new_thread_state(thread_id); - while let Ok(input_index) = - index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { - (x < input_len).then_some(x + 1) - }) - { - if stop_everything.load(Ordering::Relaxed) { - break; - } - // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding - // each item exactly once. - let item = { - #[allow(unsafe_code)] - unsafe { - &mut (&mut *input.0)[input_index] + let res = (|| { + while let Ok(input_index) = + index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { + (x < input_len).then_some(x + 1) + }) + { + if stop_everything.load(Ordering::Relaxed) { + break; + } + // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding + // each item exactly once. + let item = { + #[allow(unsafe_code)] + unsafe { + &mut (&mut *input.0)[input_index] + } + }; + if let Err(err) = consume(item, &mut state, threads_left, stop_everything) { + stop_everything.store(true, Ordering::Relaxed); + return Err(err); } - }; - if let Err(err) = consume(item, &mut state) { - stop_everything.store(true, Ordering::Relaxed); - return Err(err); } - } - Ok(state_to_rval(state)) + Ok(state_to_rval(state)) + })(); + threads_left.fetch_add(1, Ordering::SeqCst); + res } }) .expect("valid name") diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs index c994cb3b8..ac644acdd 100644 --- a/vendor/gix-features/src/parallel/mod.rs +++ b/vendor/gix-features/src/parallel/mod.rs @@ -1,12 +1,12 @@ //! Run computations in parallel, or not based the `parallel` feature toggle. //! -//! ### in_parallel(…) +//! ### `in_parallel`(…) //! //! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage //! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling //! thread to aggregate the results into a single output, which is returned by [`in_parallel()`]. //! -//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()]` method fail. +//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()] method fail. //! //! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself //! or the data to work on. @@ -35,11 +35,11 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod serial; #[cfg(not(feature = "parallel"))] -pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod in_order; pub use in_order::{InOrderIter, SequenceId}; @@ -80,13 +80,21 @@ pub fn optimize_chunk_size_and_thread_limit( ) -> (usize, Option<usize>, usize) { let available_threads = available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); - let available_threads = thread_limit - .map(|l| if l == 0 { available_threads } else { l }) - .unwrap_or(available_threads); + let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l }); let (lower, upper) = (50, 1000); - let (chunk_size, thread_limit) = num_items - .map(|num_items| { + let (chunk_size, thread_limit) = num_items.map_or( + { + let chunk_size = if available_threads == 1 { + desired_chunk_size + } else if desired_chunk_size < lower { + lower + } else { + desired_chunk_size.min(upper) + }; + (chunk_size, available_threads) + }, + |num_items| { let desired_chunks_per_thread_at_least = 2; let items = num_items; let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper); @@ -97,17 +105,8 @@ pub fn optimize_chunk_size_and_thread_limit( available_threads }; (chunk_size, thread_limit) - }) - .unwrap_or({ - let chunk_size = if available_threads == 1 { - desired_chunk_size - } else if desired_chunk_size < lower { - lower - } else { - desired_chunk_size.min(upper) - }; - (chunk_size, available_threads) - }); + }, + ); (chunk_size, Some(thread_limit), thread_limit) } @@ -123,9 +122,7 @@ pub fn num_threads(_thread_limit: Option<usize>) -> usize { #[cfg(feature = "parallel")] pub fn num_threads(thread_limit: Option<usize>) -> usize { let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1); - thread_limit - .map(|l| if l == 0 { logical_cores } else { l }) - .unwrap_or(logical_cores) + thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l }) } /// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated. @@ -137,7 +134,7 @@ pub fn in_parallel_if<I, S, O, R>( input: impl Iterator<Item = I> + Send, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where @@ -163,7 +160,7 @@ pub fn in_parallel_if<I, S, O, R>( input: impl Iterator<Item = I>, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + consume: impl FnMut(I, &mut S) -> O, reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs index 00723b2c3..3511c73e3 100644 --- a/vendor/gix-features/src/parallel/serial.rs +++ b/vendor/gix-features/src/parallel/serial.rs @@ -2,14 +2,17 @@ use crate::parallel::Reduce; #[cfg(not(feature = "parallel"))] mod not_parallel { + use std::sync::atomic::{AtomicBool, AtomicIsize}; + /// Runs `left` and then `right`, one after another, returning their output when both are done. pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { (left(), right()) } /// A scope for spawning threads. - pub struct Scope<'env> { - _marker: std::marker::PhantomData<&'env mut &'env ()>, + pub struct Scope<'scope, 'env: 'scope> { + _scope: std::marker::PhantomData<&'scope mut &'scope ()>, + _env: std::marker::PhantomData<&'env mut &'env ()>, } pub struct ThreadBuilder; @@ -20,32 +23,31 @@ mod not_parallel { } #[allow(unsafe_code)] - unsafe impl Sync for Scope<'_> {} + unsafe impl Sync for Scope<'_, '_> {} impl ThreadBuilder { pub fn name(self, _new: String) -> Self { self } - pub fn spawn_scoped<'a, 'env, F, T>( + pub fn spawn_scoped<'scope, 'env, F, T>( &self, - scope: &'a Scope<'env>, + scope: &'scope Scope<'scope, 'env>, f: F, - ) -> std::io::Result<ScopedJoinHandle<'a, T>> + ) -> std::io::Result<ScopedJoinHandle<'scope, T>> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { Ok(scope.spawn(f)) } } - impl<'env> Scope<'env> { - pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + impl<'scope, 'env> Scope<'scope, 'env> { + /// Provided with this scope, let `f` start new threads that live within it. + pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { ScopedJoinHandle { result: f(), @@ -58,10 +60,11 @@ mod not_parallel { /// Note that this implementation will run the spawned functions immediately. pub fn threads<'env, F, R>(f: F) -> R where - F: FnOnce(&Scope<'env>) -> R, + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, { f(&Scope { - _marker: Default::default(), + _scope: Default::default(), + _env: Default::default(), }) } @@ -79,6 +82,9 @@ mod not_parallel { pub fn join(self) -> std::thread::Result<T> { Ok(self.result) } + pub fn is_finished(&self) -> bool { + true + } } /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. @@ -89,13 +95,15 @@ mod not_parallel { input: &mut [I], _thread_limit: Option<usize>, mut new_thread_state: impl FnMut(usize) -> S + Clone, - mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone, + mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, mut periodic: impl FnMut() -> Option<std::time::Duration>, state_to_rval: impl FnOnce(S) -> R + Clone, ) -> Result<Vec<R>, E> { let mut state = new_thread_state(0); + let should_interrupt = &AtomicBool::default(); + let threads_left = &AtomicIsize::default(); for item in input { - consume(item, &mut state)?; + consume(item, &mut state, threads_left, should_interrupt)?; if periodic().is_none() { break; } @@ -121,7 +129,7 @@ pub fn in_parallel<I, S, O, R>( input: impl Iterator<Item = I>, _thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> where diff --git a/vendor/gix-features/src/progress.rs b/vendor/gix-features/src/progress.rs index b7aeda620..8d1e30bc4 100644 --- a/vendor/gix-features/src/progress.rs +++ b/vendor/gix-features/src/progress.rs @@ -7,7 +7,7 @@ pub use prodash::{ self, messages::MessageLevel, progress::{Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN}, - unit, Progress, Unit, + unit, Progress, RawProgress, Unit, }; /// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`. #[cfg(not(feature = "progress-unit-bytes"))] diff --git a/vendor/gix-features/src/threading.rs b/vendor/gix-features/src/threading.rs index ff0c819a5..2b33386d2 100644 --- a/vendor/gix-features/src/threading.rs +++ b/vendor/gix-features/src/threading.rs @@ -17,7 +17,7 @@ mod _impl { pub type Mutable<T> = parking_lot::Mutex<T>; /// A guarded reference suitable for safekeeping in a struct. pub type RefGuard<'a, T> = parking_lot::RwLockReadGuard<'a, T>; - /// A mapped reference created from a RefGuard + /// A mapped reference created from a `RefGuard` pub type MappedRefGuard<'a, U> = parking_lot::MappedRwLockReadGuard<'a, U>; /// Get a shared reference through a [`MutableOnDemand`] for read-only access. diff --git a/vendor/gix-features/src/zlib/mod.rs b/vendor/gix-features/src/zlib/mod.rs index 8dcdfd93f..f55660075 100644 --- a/vendor/gix-features/src/zlib/mod.rs +++ b/vendor/gix-features/src/zlib/mod.rs @@ -41,6 +41,11 @@ impl Inflate { (self.state.total_out() - before_out) as usize, )) } + + /// Ready this instance for decoding another data stream. + pub fn reset(&mut self) { + self.state.reset(true); + } } /// diff --git a/vendor/gix-features/src/zlib/stream/deflate/mod.rs b/vendor/gix-features/src/zlib/stream/deflate/mod.rs index 55f575ea4..567e8fece 100644 --- a/vendor/gix-features/src/zlib/stream/deflate/mod.rs +++ b/vendor/gix-features/src/zlib/stream/deflate/mod.rs @@ -11,6 +11,19 @@ pub struct Write<W> { buf: [u8; BUF_SIZE], } +impl<W> Clone for Write<W> +where + W: Clone, +{ + fn clone(&self) -> Self { + Write { + compressor: impls::new_compress(), + inner: self.inner.clone(), + buf: self.buf, + } + } +} + mod impls { use std::io; @@ -18,6 +31,10 @@ mod impls { use crate::zlib::stream::deflate; + pub(crate) fn new_compress() -> Compress { + Compress::new(Compression::fast(), true) + } + impl<W> deflate::Write<W> where W: io::Write, @@ -25,7 +42,7 @@ mod impls { /// Create a new instance writing compressed bytes to `inner`. pub fn new(inner: W) -> deflate::Write<W> { deflate::Write { - compressor: Compress::new(Compression::fast(), true), + compressor: new_compress(), inner, buf: [0; deflate::BUF_SIZE], } diff --git a/vendor/gix-features/src/zlib/stream/deflate/tests.rs b/vendor/gix-features/src/zlib/stream/deflate/tests.rs index ba0dd2a2c..7c5865e0b 100644 --- a/vendor/gix-features/src/zlib/stream/deflate/tests.rs +++ b/vendor/gix-features/src/zlib/stream/deflate/tests.rs @@ -75,7 +75,7 @@ mod deflate_stream { fn big_file_small_writes() -> Result<(), Box<dyn std::error::Error>> { let mut w = deflate::Write::new(Vec::new()); let bytes = include_bytes!( - "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack" + "../../../../../gix-odb/tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack" ); for chunk in bytes.chunks(2) { assert_eq!(w.write(chunk)?, chunk.len()); @@ -89,7 +89,7 @@ mod deflate_stream { fn big_file_a_few_big_writes() -> Result<(), Box<dyn std::error::Error>> { let mut w = deflate::Write::new(Vec::new()); let bytes = include_bytes!( - "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack" + "../../../../../gix-odb/tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack" ); for chunk in bytes.chunks(4096 * 9) { assert_eq!(w.write(chunk)?, chunk.len()); diff --git a/vendor/gix-features/src/zlib/stream/inflate.rs b/vendor/gix-features/src/zlib/stream/inflate.rs index 007ecedc6..f68f45f57 100644 --- a/vendor/gix-features/src/zlib/stream/inflate.rs +++ b/vendor/gix-features/src/zlib/stream/inflate.rs @@ -46,11 +46,11 @@ pub fn read(rd: &mut impl BufRead, state: &mut Decompress, mut dst: &mut [u8]) - // The stream has officially ended, nothing more to do here. Ok(Status::StreamEnd) => return Ok(total_written), // Either input our output are depleted even though the stream is not depleted yet. - Ok(Status::Ok) | Ok(Status::BufError) if eof || dst.is_empty() => return Ok(total_written), + Ok(Status::Ok | Status::BufError) if eof || dst.is_empty() => return Ok(total_written), // Some progress was made in both the input and the output, it must continue to reach the end. - Ok(Status::Ok) | Ok(Status::BufError) if consumed != 0 || written != 0 => continue, + Ok(Status::Ok | Status::BufError) if consumed != 0 || written != 0 => continue, // A strange state, where zlib makes no progress but isn't done either. Call it out. - Ok(Status::Ok) | Ok(Status::BufError) => unreachable!("Definitely a bug somewhere"), + Ok(Status::Ok | Status::BufError) => unreachable!("Definitely a bug somewhere"), Err(..) => return Err(io::Error::new(io::ErrorKind::InvalidInput, "corrupt deflate stream")), } } |