From c23a457e72abe608715ac76f076f47dc42af07a5 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 30 May 2024 20:31:44 +0200 Subject: Merging upstream version 1.74.1+dfsg1. Signed-off-by: Daniel Baumann --- vendor/gix-features/src/cache.rs | 6 +- vendor/gix-features/src/fs.rs | 8 +-- vendor/gix-features/src/hash.rs | 10 ++-- vendor/gix-features/src/interrupt.rs | 38 +++++++++++- vendor/gix-features/src/io.rs | 6 +- vendor/gix-features/src/lib.rs | 2 + vendor/gix-features/src/parallel/eager_iter.rs | 2 +- vendor/gix-features/src/parallel/in_parallel.rs | 80 ++++++++++++++++++++++++- vendor/gix-features/src/parallel/mod.rs | 14 +++-- vendor/gix-features/src/parallel/serial.rs | 35 ++++++++++- vendor/gix-features/src/progress.rs | 20 +++++-- vendor/gix-features/src/threading.rs | 10 ++++ vendor/gix-features/src/zlib/stream/inflate.rs | 17 ------ 13 files changed, 199 insertions(+), 49 deletions(-) (limited to 'vendor/gix-features/src') diff --git a/vendor/gix-features/src/cache.rs b/vendor/gix-features/src/cache.rs index f7a2cf005..c56a34e12 100644 --- a/vendor/gix-features/src/cache.rs +++ b/vendor/gix-features/src/cache.rs @@ -11,9 +11,9 @@ mod impl_ { impl Debug { /// Create a new instance #[inline] - pub fn new(owner: impl Into) -> Self { + pub fn new(owner: String) -> Self { Debug { - owner: owner.into(), + owner, hits: 0, puts: 0, misses: 0, @@ -61,7 +61,7 @@ mod impl_ { impl Debug { /// Create a new instance #[inline] - pub fn new(_owner: impl Into) -> Self { + pub fn new(_owner: String) -> Self { Debug } /// noop diff --git a/vendor/gix-features/src/fs.rs b/vendor/gix-features/src/fs.rs index 20d819547..8b3a7d3e7 100644 --- a/vendor/gix-features/src/fs.rs +++ b/vendor/gix-features/src/fs.rs @@ -58,12 +58,12 @@ pub mod walkdir { } /// Instantiate a new directory iterator which will not skip hidden files, with the given level of `parallelism`. - pub fn walkdir_new(root: impl AsRef, parallelism: Parallelism) -> WalkDir { + pub fn walkdir_new(root: &Path, parallelism: Parallelism) -> WalkDir { WalkDir::new(root).skip_hidden(false).parallelism(parallelism.into()) } /// Instantiate a new directory iterator which will not skip hidden files and is sorted - pub fn walkdir_sorted_new(root: impl AsRef, parallelism: Parallelism) -> WalkDir { + pub fn walkdir_sorted_new(root: &Path, parallelism: Parallelism) -> WalkDir { WalkDir::new(root) .skip_hidden(false) .sort(true) @@ -84,12 +84,12 @@ pub mod walkdir { pub use super::shared::Parallelism; /// Instantiate a new directory iterator which will not skip hidden files, with the given level of `parallelism`. - pub fn walkdir_new(root: impl AsRef, _: Parallelism) -> WalkDir { + pub fn walkdir_new(root: &Path, _: Parallelism) -> WalkDir { WalkDir::new(root) } /// Instantiate a new directory iterator which will not skip hidden files and is sorted, with the given level of `parallelism`. - pub fn walkdir_sorted_new(root: impl AsRef, _: Parallelism) -> WalkDir { + pub fn walkdir_sorted_new(root: &Path, _: Parallelism) -> WalkDir { WalkDir::new(root).sort_by_file_name() } diff --git a/vendor/gix-features/src/hash.rs b/vendor/gix-features/src/hash.rs index fe064139a..435e018e9 100644 --- a/vendor/gix-features/src/hash.rs +++ b/vendor/gix-features/src/hash.rs @@ -95,14 +95,14 @@ pub fn hasher(kind: gix_hash::Kind) -> Sha1 { /// * [Interrupts][crate::interrupt] are supported. 
#[cfg(all(feature = "progress", any(feature = "rustsha1", feature = "fast-sha1")))] pub fn bytes_of_file( - path: impl AsRef, + path: &std::path::Path, num_bytes_from_start: usize, kind: gix_hash::Kind, - progress: &mut impl crate::progress::Progress, + progress: &mut dyn crate::progress::Progress, should_interrupt: &std::sync::atomic::AtomicBool, ) -> std::io::Result { bytes( - std::fs::File::open(path)?, + &mut std::fs::File::open(path)?, num_bytes_from_start, kind, progress, @@ -113,10 +113,10 @@ pub fn bytes_of_file( /// Similar to [`bytes_of_file`], but operates on an already open file. #[cfg(all(feature = "progress", any(feature = "rustsha1", feature = "fast-sha1")))] pub fn bytes( - mut read: impl std::io::Read, + read: &mut dyn std::io::Read, num_bytes_from_start: usize, kind: gix_hash::Kind, - progress: &mut impl crate::progress::Progress, + progress: &mut dyn crate::progress::Progress, should_interrupt: &std::sync::atomic::AtomicBool, ) -> std::io::Result { let mut hasher = hasher(kind); diff --git a/vendor/gix-features/src/interrupt.rs b/vendor/gix-features/src/interrupt.rs index 1f78e613a..dc7a2db17 100644 --- a/vendor/gix-features/src/interrupt.rs +++ b/vendor/gix-features/src/interrupt.rs @@ -91,7 +91,7 @@ where /// A wrapper for implementors of [`std::io::Read`] or [`std::io::BufRead`] with interrupt support. /// -/// It fails a [read][`std::io::Read::read`] while an interrupt was requested. +/// It fails a [read][std::io::Read::read] while an interrupt was requested. pub struct Read<'a, R> { /// The actual implementor of [`std::io::Read`] to which interrupt support will be added. pub inner: R, @@ -123,3 +123,39 @@ where self.inner.consume(amt) } } + +/// A wrapper for implementors of [`std::io::Write`] with interrupt checks on each write call. +/// +/// It fails a [write][std::io::Write::write] while an interrupt was requested. +pub struct Write<'a, W> { + /// The actual implementor of [`std::io::Write`] to which interrupt support will be added. + pub inner: W, + /// The flag to trigger interruption + pub should_interrupt: &'a AtomicBool, +} + +impl io::Write for Write<'_, W> +where + W: std::io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + if self.should_interrupt.load(Ordering::Relaxed) { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "Interrupted")); + } + self.inner.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + // Don't interrupt here, allow flushes to happen to prefer disk consistency. + self.inner.flush() + } +} + +impl io::Seek for Write<'_, W> +where + W: std::io::Seek, +{ + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + self.inner.seek(pos) + } +} diff --git a/vendor/gix-features/src/io.rs b/vendor/gix-features/src/io.rs index 405960c0b..fba273693 100644 --- a/vendor/gix-features/src/io.rs +++ b/vendor/gix-features/src/io.rs @@ -77,9 +77,9 @@ pub mod pipe { /// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe. /// /// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing. - /// If `None` or `0`, the `write` end will always block until the `read` end consumes the transferred bytes. - pub fn unidirectional(in_flight_writes: impl Into>) -> (Writer, Reader) { - let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes.into().unwrap_or(0)); + /// If `0`, the `write` end will always block until the `read` end consumes the transferred bytes. 
+ pub fn unidirectional(in_flight_writes: usize) -> (Writer, Reader) { + let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes); ( Writer { channel: tx, diff --git a/vendor/gix-features/src/lib.rs b/vendor/gix-features/src/lib.rs index 643320c0f..f343b2647 100644 --- a/vendor/gix-features/src/lib.rs +++ b/vendor/gix-features/src/lib.rs @@ -27,6 +27,8 @@ pub mod parallel; #[cfg(feature = "progress")] pub mod progress; pub mod threading; +pub use gix_trace as trace; + /// #[cfg(feature = "zlib")] pub mod zlib; diff --git a/vendor/gix-features/src/parallel/eager_iter.rs b/vendor/gix-features/src/parallel/eager_iter.rs index 60123f54c..9a1735f72 100644 --- a/vendor/gix-features/src/parallel/eager_iter.rs +++ b/vendor/gix-features/src/parallel/eager_iter.rs @@ -54,7 +54,7 @@ where assert!(!v.is_empty()); v.into_iter() }); - self.chunk.as_mut().and_then(|c| c.next()) + self.chunk.as_mut().and_then(Iterator::next) } } diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs index 241565b62..a80762bad 100644 --- a/vendor/gix-features/src/parallel/in_parallel.rs +++ b/vendor/gix-features/src/parallel/in_parallel.rs @@ -49,7 +49,7 @@ pub fn build_thread() -> std::thread::Builder { pub fn in_parallel( input: impl Iterator + Send, thread_limit: Option, - new_thread_state: impl Fn(usize) -> S + Send + Clone, + new_thread_state: impl FnOnce(usize) -> S + Send + Clone, consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<::Output, ::Error> @@ -102,6 +102,80 @@ where }) } +/// Read items from `input` and `consume` them in multiple threads, +/// whose output is collected by a `reducer`. Its task is to +/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe. +/// Call `finalize` to finish the computation, once per thread, if there was no error sending results earlier. +/// +/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used. +/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume` +/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained from `input` along with mutable state initially +/// created by `new_thread_state(…)`. +/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`. 
+/// * For `reducer`, see the [`Reduce`] trait +pub fn in_parallel_with_finalize( + input: impl Iterator + Send, + thread_limit: Option, + new_thread_state: impl FnOnce(usize) -> S + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, + finalize: impl FnOnce(S) -> O + Send + Clone, + mut reducer: R, +) -> Result<::Output, ::Error> +where + R: Reduce, + I: Send, + O: Send, +{ + let num_threads = num_threads(thread_limit); + std::thread::scope(move |s| { + let receive_result = { + let (send_input, receive_input) = crossbeam_channel::bounded::(num_threads); + let (send_result, receive_result) = crossbeam_channel::bounded::(num_threads); + for thread_id in 0..num_threads { + std::thread::Builder::new() + .name(format!("gitoxide.in_parallel.produce.{thread_id}")) + .spawn_scoped(s, { + let send_result = send_result.clone(); + let receive_input = receive_input.clone(); + let new_thread_state = new_thread_state.clone(); + let mut consume = consume.clone(); + let finalize = finalize.clone(); + move || { + let mut state = new_thread_state(thread_id); + let mut can_send = true; + for item in receive_input { + if send_result.send(consume(item, &mut state)).is_err() { + can_send = false; + break; + } + } + if can_send { + send_result.send(finalize(state)).ok(); + } + } + }) + .expect("valid name"); + } + std::thread::Builder::new() + .name("gitoxide.in_parallel.feed".into()) + .spawn_scoped(s, move || { + for item in input { + if send_input.send(item).is_err() { + break; + } + } + }) + .expect("valid name"); + receive_result + }; + + for item in receive_result { + drop(reducer.feed(item)?); + } + reducer.finalize() + }) +} + /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. /// This is only good for operations where near-random access isn't detrimental, so it's not usually great /// for file-io as it won't make use of sorted inputs well. 
@@ -117,7 +191,7 @@ where pub fn in_parallel_with_slice( input: &mut [I], thread_limit: Option, - new_thread_state: impl FnMut(usize) -> S + Send + Clone, + new_thread_state: impl FnOnce(usize) -> S + Send + Clone, consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone, mut periodic: impl FnMut() -> Option + Send, state_to_rval: impl FnOnce(S) -> R + Send + Clone, @@ -168,7 +242,7 @@ where std::thread::Builder::new() .name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}")) .spawn_scoped(s, { - let mut new_thread_state = new_thread_state.clone(); + let new_thread_state = new_thread_state.clone(); let state_to_rval = state_to_rval.clone(); let mut consume = consume.clone(); let input = Input(input as *mut [I]); diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs index ac644acdd..5a0a4b589 100644 --- a/vendor/gix-features/src/parallel/mod.rs +++ b/vendor/gix-features/src/parallel/mod.rs @@ -35,11 +35,13 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; +pub use in_parallel::{ + build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope, +}; mod serial; #[cfg(not(feature = "parallel"))] -pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; +pub use serial::{build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope}; mod in_order; pub use in_order::{InOrderIter, SequenceId}; @@ -79,7 +81,7 @@ pub fn optimize_chunk_size_and_thread_limit( available_threads: Option, ) -> (usize, Option, usize) { let available_threads = - available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); + available_threads.unwrap_or_else(|| std::thread::available_parallelism().map_or(1, Into::into)); let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l }); let (lower, upper) = (50, 1000); @@ -121,7 +123,7 @@ pub fn num_threads(_thread_limit: Option) -> usize { /// Only available with the `parallel` feature toggle set. 
#[cfg(feature = "parallel")] pub fn num_threads(thread_limit: Option) -> usize { - let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1); + let logical_cores = std::thread::available_parallelism().map_or(1, Into::into); thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l }) } @@ -133,7 +135,7 @@ pub fn in_parallel_if( condition: impl FnOnce() -> bool, input: impl Iterator + Send, thread_limit: Option, - new_thread_state: impl Fn(usize) -> S + Send + Clone, + new_thread_state: impl FnOnce(usize) -> S + Send + Clone, consume: impl FnMut(I, &mut S) -> O + Send + Clone, reducer: R, ) -> Result<::Output, ::Error> @@ -159,7 +161,7 @@ pub fn in_parallel_if( _condition: impl FnOnce() -> bool, input: impl Iterator, thread_limit: Option, - new_thread_state: impl Fn(usize) -> S, + new_thread_state: impl FnOnce(usize) -> S, consume: impl FnMut(I, &mut S) -> O, reducer: R, ) -> Result<::Output, ::Error> diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs index 3511c73e3..7665d3ffa 100644 --- a/vendor/gix-features/src/parallel/serial.rs +++ b/vendor/gix-features/src/parallel/serial.rs @@ -94,7 +94,7 @@ mod not_parallel { pub fn in_parallel_with_slice( input: &mut [I], _thread_limit: Option, - mut new_thread_state: impl FnMut(usize) -> S + Clone, + new_thread_state: impl FnOnce(usize) -> S + Clone, mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, mut periodic: impl FnMut() -> Option, state_to_rval: impl FnOnce(S) -> R + Clone, @@ -128,7 +128,7 @@ pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scop pub fn in_parallel( input: impl Iterator, _thread_limit: Option, - new_thread_state: impl Fn(usize) -> S, + new_thread_state: impl FnOnce(usize) -> S, mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<::Output, ::Error> @@ -141,3 +141,34 @@ where } reducer.finalize() } + +/// Read items from `input` and `consume` them in multiple threads, +/// whose output is collected by a `reducer`. Its task is to +/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe. +/// Call `finalize` to finish the computation, once per thread, if there was no error sending results earlier. +/// +/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used. +/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume` +/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained from `input` along with mutable state initially +/// created by `new_thread_state(…)`. +/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`. 
+/// * For `reducer`, see the [`Reduce`] trait +#[cfg(not(feature = "parallel"))] +pub fn in_parallel_with_finalize( + input: impl Iterator, + _thread_limit: Option, + new_thread_state: impl FnOnce(usize) -> S, + mut consume: impl FnMut(I, &mut S) -> O, + finalize: impl FnOnce(S) -> O + Send + Clone, + mut reducer: R, +) -> Result<::Output, ::Error> +where + R: Reduce, +{ + let mut state = new_thread_state(0); + for item in input { + drop(reducer.feed(consume(item, &mut state))?); + } + reducer.feed(finalize(state))?; + reducer.finalize() +} diff --git a/vendor/gix-features/src/progress.rs b/vendor/gix-features/src/progress.rs index 8d1e30bc4..6a8c9e1bd 100644 --- a/vendor/gix-features/src/progress.rs +++ b/vendor/gix-features/src/progress.rs @@ -6,8 +6,11 @@ pub use bytesize; pub use prodash::{ self, messages::MessageLevel, - progress::{Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN}, - unit, Progress, RawProgress, Unit, + progress::{ + AtomicStep, Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN, + }, + unit, BoxedDynNestedProgress, Count, DynNestedProgress, DynNestedProgressToNestedProgress, NestedProgress, + Progress, Unit, }; /// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`. #[cfg(not(feature = "progress-unit-bytes"))] @@ -77,7 +80,7 @@ pub fn steps() -> Option { Some(unit::dynamic(unit::Range::new("steps"))) } -/// A structure passing every [`read`][std::io::Read::read()] call through to the contained Progress instance using [`inc_by(bytes_read)`][Progress::inc_by()]. +/// A structure passing every [`read`](std::io::Read::read()) call through to the contained Progress instance using [`inc_by(bytes_read)`](Count::inc_by()). pub struct Read { /// The implementor of [`std::io::Read`] to which progress is added pub inner: T, @@ -111,7 +114,7 @@ where } } -/// A structure passing every [`write`][std::io::Write::write()] call through to the contained Progress instance using [`inc_by(bytes_written)`][Progress::inc_by()]. +/// A structure passing every [`write`][std::io::Write::write()] call through to the contained Progress instance using [`inc_by(bytes_written)`](Count::inc_by()). /// /// This is particularly useful if the final size of the bytes to write is known or can be estimated precisely enough. pub struct Write { @@ -136,3 +139,12 @@ where self.inner.flush() } } + +impl io::Seek for Write +where + T: io::Seek, +{ + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + self.inner.seek(pos) + } +} diff --git a/vendor/gix-features/src/threading.rs b/vendor/gix-features/src/threading.rs index 2b33386d2..5f92ba2a8 100644 --- a/vendor/gix-features/src/threading.rs +++ b/vendor/gix-features/src/threading.rs @@ -30,6 +30,11 @@ mod _impl { v.write() } + /// Get a mutable reference to the underlying data, with semantics similar to [Arc::make_mut()]. + pub fn make_mut(this: &mut OwnShared) -> &mut T { + OwnShared::make_mut(this) + } + /// Get a mutable reference through a [`Mutable`] for read-write access. pub fn lock(v: &Mutable) -> parking_lot::MutexGuard<'_, T> { v.lock() @@ -75,6 +80,11 @@ mod _impl { v.borrow_mut() } + /// Get a mutable reference to the underlying data, with semantics similar to [Rc::make_mut()]. + pub fn make_mut(this: &mut OwnShared) -> &mut T { + OwnShared::make_mut(this) + } + /// Get a mutable reference through a [`Mutable`] for read-write access. 
pub fn lock(v: &Mutable) -> RefMut<'_, T> { v.borrow_mut() diff --git a/vendor/gix-features/src/zlib/stream/inflate.rs b/vendor/gix-features/src/zlib/stream/inflate.rs index f68f45f57..11dc92800 100644 --- a/vendor/gix-features/src/zlib/stream/inflate.rs +++ b/vendor/gix-features/src/zlib/stream/inflate.rs @@ -2,23 +2,6 @@ use std::{io, io::BufRead}; use flate2::{Decompress, FlushDecompress, Status}; -/// The boxed variant is faster for what we do (moving the decompressor in and out a lot) -pub struct ReadBoxed { - /// The reader from which bytes should be decompressed. - pub inner: R, - /// The decompressor doing all the work. - pub decompressor: Box, -} - -impl io::Read for ReadBoxed -where - R: BufRead, -{ - fn read(&mut self, into: &mut [u8]) -> io::Result { - read(&mut self.inner, &mut self.decompressor, into) - } -} - /// Read bytes from `rd` and decompress them using `state` into a pre-allocated fitting buffer `dst`, returning the amount of bytes written. pub fn read(rd: &mut impl BufRead, state: &mut Decompress, mut dst: &mut [u8]) -> io::Result { let mut total_written = 0; -- cgit v1.2.3
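
Below is a minimal usage sketch (not part of the patch) for the new `in_parallel_with_finalize` entry point added above. It assumes the crate's `Reduce` trait with the `Input`, `FeedProduce`, `Output` and `Error` associated types as found in gix-features; the `Collect` reducer, the batch size of 8 and the squaring workload are made up for illustration only.

use gix_features::parallel::{in_parallel_with_finalize, Reduce};

/// Flattens the chunks emitted by all threads into a single vector.
struct Collect(Vec<u32>);

impl Reduce for Collect {
    type Input = Vec<u32>;
    type FeedProduce = ();
    type Output = Vec<u32>;
    type Error = std::convert::Infallible;

    fn feed(&mut self, chunk: Vec<u32>) -> Result<(), Self::Error> {
        self.0.extend(chunk);
        Ok(())
    }

    fn finalize(self) -> Result<Vec<u32>, Self::Error> {
        Ok(self.0)
    }
}

fn main() -> Result<(), std::convert::Infallible> {
    let squares = in_parallel_with_finalize(
        0u32..100,
        None,                    // None: use all logical cores
        |_thread_id| Vec::new(), // per-thread state: a pending batch
        |n, batch: &mut Vec<u32>| {
            // consume: do the work and emit a full batch, or nothing yet
            batch.push(n * n);
            if batch.len() == 8 {
                std::mem::take(batch)
            } else {
                Vec::new()
            }
        },
        // finalize: flush whatever is still sitting in the per-thread batch,
        // the case plain `in_parallel` could not express before this change
        |batch| batch,
        Collect(Vec::new()),
    )?;
    assert_eq!(squares.len(), 100);
    Ok(())
}

With the serial fallback (the `parallel` feature disabled), the same call runs on the current thread and the result of `finalize` is fed to the reducer as one extra item, mirroring what the threaded version does once per thread.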
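
A similar sketch for the new `interrupt::Write` wrapper, assuming the `interrupt` module is available with the default feature set; the in-memory `Vec<u8>` writer and the static flag are illustrative. Each `write` call fails once the flag is set, while `flush` is deliberately left uninterrupted so buffered data can still reach its destination.

use std::io::Write as _;
use std::sync::atomic::{AtomicBool, Ordering};

static SHOULD_INTERRUPT: AtomicBool = AtomicBool::new(false);

fn main() -> std::io::Result<()> {
    // Wrap any writer; each `write` checks the flag first, `flush` does not.
    let mut out = gix_features::interrupt::Write {
        inner: Vec::<u8>::new(),
        should_interrupt: &SHOULD_INTERRUPT,
    };

    out.write_all(b"first chunk")?;

    // Typically set from a signal handler; done inline here for the example.
    SHOULD_INTERRUPT.store(true, Ordering::Relaxed);
    assert!(out.write_all(b"second chunk").is_err());
    assert_eq!(out.inner, b"first chunk");
    Ok(())
}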
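
And a sketch of the new `threading::make_mut` helper, which forwards to `Arc::make_mut` or `Rc::make_mut` depending on the `parallel` feature; the `String` payload and the presumed `Clone` bound on the stored type are assumptions of this example, not something the patch spells out.

use gix_features::threading::{self, OwnShared};

fn main() {
    let mut shared = OwnShared::new(String::from("config"));
    let other = OwnShared::clone(&shared); // a second owner forces copy-on-write below

    threading::make_mut(&mut shared).push_str(".local");

    assert_eq!(*shared, "config.local");
    assert_eq!(*other, "config"); // the pre-existing owner still sees the old value
}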