summary refs log tree commit diff stats
path: root/vendor/gix-features/src
diff options
context:
space:
mode:
author Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-30 03:57:31 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-30 03:57:31 +0000
commit dc0db358abe19481e475e10c32149b53370f1a1c (patch)
tree ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/gix-features/src
parent Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
download rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features/src')
-rw-r--r-- vendor/gix-features/src/fs.rs 28
-rw-r--r-- vendor/gix-features/src/parallel/in_parallel.rs 66
-rw-r--r-- vendor/gix-features/src/parallel/mod.rs 47
-rw-r--r-- vendor/gix-features/src/parallel/serial.rs 46
-rw-r--r-- vendor/gix-features/src/progress.rs 2
-rw-r--r-- vendor/gix-features/src/threading.rs 2
-rw-r--r-- vendor/gix-features/src/zlib/mod.rs 5
-rw-r--r-- vendor/gix-features/src/zlib/stream/deflate/mod.rs 19
-rw-r--r-- vendor/gix-features/src/zlib/stream/deflate/tests.rs 4
-rw-r--r-- vendor/gix-features/src/zlib/stream/inflate.rs 6
10 files changed, 135 insertions, 90 deletions
diff --git a/vendor/gix-features/src/fs.rs b/vendor/gix-features/src/fs.rs
index f07ac1f0f..20d819547 100644
--- a/vendor/gix-features/src/fs.rs
+++ b/vendor/gix-features/src/fs.rs
@@ -38,19 +38,21 @@ pub mod walkdir {
match v {
Parallelism::Serial => jwalk::Parallelism::Serial,
Parallelism::ThreadPoolPerTraversal { thread_name } => std::thread::available_parallelism()
- .map(|threads| {
- let pool = jwalk::rayon::ThreadPoolBuilder::new()
- .num_threads(threads.get().min(16))
- .stack_size(128 * 1024)
- .thread_name(move |idx| format!("{thread_name} {idx}"))
- .build()
- .expect("we only set options that can't cause a build failure");
- jwalk::Parallelism::RayonExistingPool {
- pool: pool.into(),
- busy_timeout: None,
- }
- })
- .unwrap_or_else(|_| Parallelism::Serial.into()),
+ .map_or_else(
+ |_| Parallelism::Serial.into(),
+ |threads| {
+ let pool = jwalk::rayon::ThreadPoolBuilder::new()
+ .num_threads(threads.get().min(16))
+ .stack_size(128 * 1024)
+ .thread_name(move |idx| format!("{thread_name} {idx}"))
+ .build()
+ .expect("we only set options that can't cause a build failure");
+ jwalk::Parallelism::RayonExistingPool {
+ pool: pool.into(),
+ busy_timeout: None,
+ }
+ },
+ ),
}
}
}
diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs
index e1e2cc3e3..241565b62 100644
--- a/vendor/gix-features/src/parallel/in_parallel.rs
+++ b/vendor/gix-features/src/parallel/in_parallel.rs
@@ -1,7 +1,10 @@
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::parallel::{num_threads, Reduce};
+/// A scope to start threads within.
+pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>;
+
/// Runs `left` and `right` in parallel, returning their output when both are done.
pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
std::thread::scope(|s| {
@@ -47,7 +50,7 @@ pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S + Send + Clone,
- consume: impl Fn(I, &mut S) -> O + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
@@ -67,7 +70,7 @@ where
let send_result = send_result.clone();
let receive_input = receive_input.clone();
let new_thread_state = new_thread_state.clone();
- let consume = consume.clone();
+ let mut consume = consume.clone();
move || {
let mut state = new_thread_state(thread_id);
for item in receive_input {
@@ -103,12 +106,19 @@ where
/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
/// for file-io as it won't make use of sorted inputs well.
/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
+/// `consume(&mut item, &mut state, &threads_available, &should_interrupt)` is called for performing the actual computation.
+/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`),
+/// which allows callees to implement their own work-stealing in case the work is distributed unevenly.
+/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice
+/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads,
+/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust
+/// the thread-count accordingly.
// TODO: better docs
pub fn in_parallel_with_slice<I, S, R, E>(
input: &mut [I],
thread_limit: Option<usize>,
new_thread_state: impl FnMut(usize) -> S + Send + Clone,
- consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone,
+ consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
state_to_rval: impl FnOnce(S) -> R + Send + Clone,
) -> Result<Vec<R>, E>
@@ -121,8 +131,8 @@ where
let mut results = Vec::with_capacity(num_threads);
let stop_everything = &AtomicBool::default();
let index = &AtomicUsize::default();
+ let threads_left = &AtomicIsize::new(num_threads as isize);
- // TODO: use std::thread::scope() once Rust 1.63 is available.
std::thread::scope({
move |s| {
std::thread::Builder::new()
@@ -163,29 +173,35 @@ where
let mut consume = consume.clone();
let input = Input(input as *mut [I]);
move || {
+ let _ = &input;
+ threads_left.fetch_sub(1, Ordering::SeqCst);
let mut state = new_thread_state(thread_id);
- while let Ok(input_index) =
- index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
- (x < input_len).then_some(x + 1)
- })
- {
- if stop_everything.load(Ordering::Relaxed) {
- break;
- }
- // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
- // each item exactly once.
- let item = {
- #[allow(unsafe_code)]
- unsafe {
- &mut (&mut *input.0)[input_index]
+ let res = (|| {
+ while let Ok(input_index) =
+ index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
+ (x < input_len).then_some(x + 1)
+ })
+ {
+ if stop_everything.load(Ordering::Relaxed) {
+ break;
+ }
+ // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
+ // each item exactly once.
+ let item = {
+ #[allow(unsafe_code)]
+ unsafe {
+ &mut (&mut *input.0)[input_index]
+ }
+ };
+ if let Err(err) = consume(item, &mut state, threads_left, stop_everything) {
+ stop_everything.store(true, Ordering::Relaxed);
+ return Err(err);
}
- };
- if let Err(err) = consume(item, &mut state) {
- stop_everything.store(true, Ordering::Relaxed);
- return Err(err);
}
- }
- Ok(state_to_rval(state))
+ Ok(state_to_rval(state))
+ })();
+ threads_left.fetch_add(1, Ordering::SeqCst);
+ res
}
})
.expect("valid name")
diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs
index c994cb3b8..ac644acdd 100644
--- a/vendor/gix-features/src/parallel/mod.rs
+++ b/vendor/gix-features/src/parallel/mod.rs
@@ -1,12 +1,12 @@
//! Run computations in parallel, or not, based on the `parallel` feature toggle.
//!
-//! ### in_parallel(…)
+//! ### `in_parallel`(…)
//!
//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
//!
-//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()]` method fail.
+//! Interruptions can be achieved by letting the reducers [`feed(…)`][Reduce::feed()] method fail.
//!
//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
//! or the data to work on.
@@ -35,11 +35,11 @@
#[cfg(feature = "parallel")]
mod in_parallel;
#[cfg(feature = "parallel")]
-pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope};
mod serial;
#[cfg(not(feature = "parallel"))]
-pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope};
mod in_order;
pub use in_order::{InOrderIter, SequenceId};
@@ -80,13 +80,21 @@ pub fn optimize_chunk_size_and_thread_limit(
) -> (usize, Option<usize>, usize) {
let available_threads =
available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1));
- let available_threads = thread_limit
- .map(|l| if l == 0 { available_threads } else { l })
- .unwrap_or(available_threads);
+ let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l });
let (lower, upper) = (50, 1000);
- let (chunk_size, thread_limit) = num_items
- .map(|num_items| {
+ let (chunk_size, thread_limit) = num_items.map_or(
+ {
+ let chunk_size = if available_threads == 1 {
+ desired_chunk_size
+ } else if desired_chunk_size < lower {
+ lower
+ } else {
+ desired_chunk_size.min(upper)
+ };
+ (chunk_size, available_threads)
+ },
+ |num_items| {
let desired_chunks_per_thread_at_least = 2;
let items = num_items;
let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper);
@@ -97,17 +105,8 @@ pub fn optimize_chunk_size_and_thread_limit(
available_threads
};
(chunk_size, thread_limit)
- })
- .unwrap_or({
- let chunk_size = if available_threads == 1 {
- desired_chunk_size
- } else if desired_chunk_size < lower {
- lower
- } else {
- desired_chunk_size.min(upper)
- };
- (chunk_size, available_threads)
- });
+ },
+ );
(chunk_size, Some(thread_limit), thread_limit)
}
@@ -123,9 +122,7 @@ pub fn num_threads(_thread_limit: Option<usize>) -> usize {
#[cfg(feature = "parallel")]
pub fn num_threads(thread_limit: Option<usize>) -> usize {
let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
- thread_limit
- .map(|l| if l == 0 { logical_cores } else { l })
- .unwrap_or(logical_cores)
+ thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l })
}
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
@@ -137,7 +134,7 @@ pub fn in_parallel_if<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S + Send + Clone,
- consume: impl Fn(I, &mut S) -> O + Send + Clone,
+ consume: impl FnMut(I, &mut S) -> O + Send + Clone,
reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
@@ -163,7 +160,7 @@ pub fn in_parallel_if<I, S, O, R>(
input: impl Iterator<Item = I>,
thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S,
- consume: impl Fn(I, &mut S) -> O,
+ consume: impl FnMut(I, &mut S) -> O,
reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs
index 00723b2c3..3511c73e3 100644
--- a/vendor/gix-features/src/parallel/serial.rs
+++ b/vendor/gix-features/src/parallel/serial.rs
@@ -2,14 +2,17 @@ use crate::parallel::Reduce;
#[cfg(not(feature = "parallel"))]
mod not_parallel {
+ use std::sync::atomic::{AtomicBool, AtomicIsize};
+
/// Runs `left` and then `right`, one after another, returning their output when both are done.
pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
(left(), right())
}
/// A scope for spawning threads.
- pub struct Scope<'env> {
- _marker: std::marker::PhantomData<&'env mut &'env ()>,
+ pub struct Scope<'scope, 'env: 'scope> {
+ _scope: std::marker::PhantomData<&'scope mut &'scope ()>,
+ _env: std::marker::PhantomData<&'env mut &'env ()>,
}
pub struct ThreadBuilder;
@@ -20,32 +23,31 @@ mod not_parallel {
}
#[allow(unsafe_code)]
- unsafe impl Sync for Scope<'_> {}
+ unsafe impl Sync for Scope<'_, '_> {}
impl ThreadBuilder {
pub fn name(self, _new: String) -> Self {
self
}
- pub fn spawn_scoped<'a, 'env, F, T>(
+ pub fn spawn_scoped<'scope, 'env, F, T>(
&self,
- scope: &'a Scope<'env>,
+ scope: &'scope Scope<'scope, 'env>,
f: F,
- ) -> std::io::Result<ScopedJoinHandle<'a, T>>
+ ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
where
- F: FnOnce() -> T,
- F: Send + 'env,
- T: Send + 'env,
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
{
Ok(scope.spawn(f))
}
}
- impl<'env> Scope<'env> {
- pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ impl<'scope, 'env> Scope<'scope, 'env> {
+ /// Provided with this scope, let `f` start new threads that live within it.
+ pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
- F: FnOnce() -> T,
- F: Send + 'env,
- T: Send + 'env,
+ F: FnOnce() -> T + 'scope,
+ T: 'scope,
{
ScopedJoinHandle {
result: f(),
@@ -58,10 +60,11 @@ mod not_parallel {
/// Note that this implementation will run the spawned functions immediately.
pub fn threads<'env, F, R>(f: F) -> R
where
- F: FnOnce(&Scope<'env>) -> R,
+ F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
{
f(&Scope {
- _marker: Default::default(),
+ _scope: Default::default(),
+ _env: Default::default(),
})
}
@@ -79,6 +82,9 @@ mod not_parallel {
pub fn join(self) -> std::thread::Result<T> {
Ok(self.result)
}
+ pub fn is_finished(&self) -> bool {
+ true
+ }
}
/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
@@ -89,13 +95,15 @@ mod not_parallel {
input: &mut [I],
_thread_limit: Option<usize>,
mut new_thread_state: impl FnMut(usize) -> S + Clone,
- mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
+ mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration>,
state_to_rval: impl FnOnce(S) -> R + Clone,
) -> Result<Vec<R>, E> {
let mut state = new_thread_state(0);
+ let should_interrupt = &AtomicBool::default();
+ let threads_left = &AtomicIsize::default();
for item in input {
- consume(item, &mut state)?;
+ consume(item, &mut state, threads_left, should_interrupt)?;
if periodic().is_none() {
break;
}
@@ -121,7 +129,7 @@ pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I>,
_thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S,
- consume: impl Fn(I, &mut S) -> O,
+ mut consume: impl FnMut(I, &mut S) -> O,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
diff --git a/vendor/gix-features/src/progress.rs b/vendor/gix-features/src/progress.rs
index b7aeda620..8d1e30bc4 100644
--- a/vendor/gix-features/src/progress.rs
+++ b/vendor/gix-features/src/progress.rs
@@ -7,7 +7,7 @@ pub use prodash::{
self,
messages::MessageLevel,
progress::{Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN},
- unit, Progress, Unit,
+ unit, Progress, RawProgress, Unit,
};
/// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`.
#[cfg(not(feature = "progress-unit-bytes"))]
diff --git a/vendor/gix-features/src/threading.rs b/vendor/gix-features/src/threading.rs
index ff0c819a5..2b33386d2 100644
--- a/vendor/gix-features/src/threading.rs
+++ b/vendor/gix-features/src/threading.rs
@@ -17,7 +17,7 @@ mod _impl {
pub type Mutable<T> = parking_lot::Mutex<T>;
/// A guarded reference suitable for safekeeping in a struct.
pub type RefGuard<'a, T> = parking_lot::RwLockReadGuard<'a, T>;
- /// A mapped reference created from a RefGuard
+ /// A mapped reference created from a `RefGuard`
pub type MappedRefGuard<'a, U> = parking_lot::MappedRwLockReadGuard<'a, U>;
/// Get a shared reference through a [`MutableOnDemand`] for read-only access.
diff --git a/vendor/gix-features/src/zlib/mod.rs b/vendor/gix-features/src/zlib/mod.rs
index 8dcdfd93f..f55660075 100644
--- a/vendor/gix-features/src/zlib/mod.rs
+++ b/vendor/gix-features/src/zlib/mod.rs
@@ -41,6 +41,11 @@ impl Inflate {
(self.state.total_out() - before_out) as usize,
))
}
+
+ /// Ready this instance for decoding another data stream.
+ pub fn reset(&mut self) {
+ self.state.reset(true);
+ }
}
///
diff --git a/vendor/gix-features/src/zlib/stream/deflate/mod.rs b/vendor/gix-features/src/zlib/stream/deflate/mod.rs
index 55f575ea4..567e8fece 100644
--- a/vendor/gix-features/src/zlib/stream/deflate/mod.rs
+++ b/vendor/gix-features/src/zlib/stream/deflate/mod.rs
@@ -11,6 +11,19 @@ pub struct Write<W> {
buf: [u8; BUF_SIZE],
}
+impl<W> Clone for Write<W>
+where
+ W: Clone,
+{
+ fn clone(&self) -> Self {
+ Write {
+ compressor: impls::new_compress(),
+ inner: self.inner.clone(),
+ buf: self.buf,
+ }
+ }
+}
+
mod impls {
use std::io;
@@ -18,6 +31,10 @@ mod impls {
use crate::zlib::stream::deflate;
+ pub(crate) fn new_compress() -> Compress {
+ Compress::new(Compression::fast(), true)
+ }
+
impl<W> deflate::Write<W>
where
W: io::Write,
@@ -25,7 +42,7 @@ mod impls {
/// Create a new instance writing compressed bytes to `inner`.
pub fn new(inner: W) -> deflate::Write<W> {
deflate::Write {
- compressor: Compress::new(Compression::fast(), true),
+ compressor: new_compress(),
inner,
buf: [0; deflate::BUF_SIZE],
}
diff --git a/vendor/gix-features/src/zlib/stream/deflate/tests.rs b/vendor/gix-features/src/zlib/stream/deflate/tests.rs
index ba0dd2a2c..7c5865e0b 100644
--- a/vendor/gix-features/src/zlib/stream/deflate/tests.rs
+++ b/vendor/gix-features/src/zlib/stream/deflate/tests.rs
@@ -75,7 +75,7 @@ mod deflate_stream {
fn big_file_small_writes() -> Result<(), Box<dyn std::error::Error>> {
let mut w = deflate::Write::new(Vec::new());
let bytes = include_bytes!(
- "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
+ "../../../../../gix-odb/tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
);
for chunk in bytes.chunks(2) {
assert_eq!(w.write(chunk)?, chunk.len());
@@ -89,7 +89,7 @@ mod deflate_stream {
fn big_file_a_few_big_writes() -> Result<(), Box<dyn std::error::Error>> {
let mut w = deflate::Write::new(Vec::new());
let bytes = include_bytes!(
- "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
+ "../../../../../gix-odb/tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
);
for chunk in bytes.chunks(4096 * 9) {
assert_eq!(w.write(chunk)?, chunk.len());
diff --git a/vendor/gix-features/src/zlib/stream/inflate.rs b/vendor/gix-features/src/zlib/stream/inflate.rs
index 007ecedc6..f68f45f57 100644
--- a/vendor/gix-features/src/zlib/stream/inflate.rs
+++ b/vendor/gix-features/src/zlib/stream/inflate.rs
@@ -46,11 +46,11 @@ pub fn read(rd: &mut impl BufRead, state: &mut Decompress, mut dst: &mut [u8]) -
// The stream has officially ended, nothing more to do here.
Ok(Status::StreamEnd) => return Ok(total_written),
// Either input or output are depleted even though the stream is not depleted yet.
- Ok(Status::Ok) | Ok(Status::BufError) if eof || dst.is_empty() => return Ok(total_written),
+ Ok(Status::Ok | Status::BufError) if eof || dst.is_empty() => return Ok(total_written),
// Some progress was made in both the input and the output, it must continue to reach the end.
- Ok(Status::Ok) | Ok(Status::BufError) if consumed != 0 || written != 0 => continue,
+ Ok(Status::Ok | Status::BufError) if consumed != 0 || written != 0 => continue,
// A strange state, where zlib makes no progress but isn't done either. Call it out.
- Ok(Status::Ok) | Ok(Status::BufError) => unreachable!("Definitely a bug somewhere"),
+ Ok(Status::Ok | Status::BufError) => unreachable!("Definitely a bug somewhere"),
Err(..) => return Err(io::Error::new(io::ErrorKind::InvalidInput, "corrupt deflate stream")),
}
}